From 3c417d1c55daa0fd9ec8173b917af3731f69694d Mon Sep 17 00:00:00 2001 From: Ben Pfaff Date: Thu, 21 May 2026 10:23:48 -0700 Subject: [PATCH] [dbsp] Better handle errors encountered listing storage files. The storage interface `StorageBackend::list` did not distinguish between errors that meant that some or all of the files in a directory were not listed and errors that meant that an individual file's metadata could not be obtained; it reported them all the same way, through its return value. This commit changes that. Now, errors that could lose files are still reported through the return value, and errors that mean that the metadata for an individual file cannot be obtained are reported through the callback. This allows the caller to more intelligently interpret the errors. This commit also improves error logging: the POSIX backend now logs warnings for errors obtaining metadata (up to a limit), and the checkpointer code logs the number of directory entry errors encountered during GC. Signed-off-by: Ben Pfaff --- crates/adapters/src/controller/journal.rs | 8 +- crates/dbsp/src/circuit/checkpointer.rs | 75 +++++++++++-------- .../dbsp/src/storage/backend/memory_impl.rs | 15 ++-- .../dbsp/src/storage/backend/posixio_impl.rs | 58 ++++++++------ crates/storage/src/lib.rs | 38 +++++++--- 5 files changed, 117 insertions(+), 77 deletions(-) diff --git a/crates/adapters/src/controller/journal.rs b/crates/adapters/src/controller/journal.rs index ed037c77641..50877d00566 100644 --- a/crates/adapters/src/controller/journal.rs +++ b/crates/adapters/src/controller/journal.rs @@ -89,8 +89,8 @@ impl Journal { } else { let mut result = Ok(()); self.backend - .list(&self.path, &mut |path, _kind| { - let Some(file_name) = path.filename() else { + .list(&self.path, &mut |entry| { + let Some(file_name) = entry.name.filename() else { return; }; let Some(number) = file_name.strip_suffix(".bin") else { @@ -102,8 +102,8 @@ impl Journal { if !steps.contains(&step) { return; } - if let Err(error) = self.backend.delete(path) { - result = Err(StepError::storage_error(path, error)); + if let Err(error) = self.backend.delete(&entry.name) { + result = Err(StepError::storage_error(&entry.name, error)); } }) .map_err(|error| self.storage_error(error))?; diff --git a/crates/dbsp/src/circuit/checkpointer.rs b/crates/dbsp/src/circuit/checkpointer.rs index 947c840ae41..04a3a255dba 100644 --- a/crates/dbsp/src/circuit/checkpointer.rs +++ b/crates/dbsp/src/circuit/checkpointer.rs @@ -26,7 +26,7 @@ use std::{ use feldera_storage::error::StorageError; use feldera_storage::fbuf::FBuf; -use feldera_storage::{StorageBackend, StorageFileType, StoragePath}; +use feldera_storage::{DirEntry, StorageBackend, StorageFileType, StoragePath}; use uuid::Uuid; use super::RuntimeError; @@ -89,8 +89,8 @@ impl Checkpointer { let mut usage = 0; StorageError::ignore_notfound(self.backend.list( &Self::checkpoint_dir(uuid), - &mut |_path, file_type| { - if let StorageFileType::File { size } = file_type { + &mut |entry| { + if let Ok(StorageFileType::File { size }) = entry.file_type { usage += size; } }, @@ -148,10 +148,14 @@ impl Checkpointer { // name more than once (it should not happen but who knows?). /// Simplified [StorageFileType] (without a file size). - #[derive(Copy, Clone, Debug, Enum)] + #[derive(Copy, Clone, Debug, PartialEq, Eq, Enum)] enum Class { + /// Regular file. File, + /// Directory. Directory, + /// Another kind of file, or unknown type (i.e. there was an error + /// determining the file type). Other, } impl From for Class { @@ -176,17 +180,22 @@ impl Checkpointer { } let mut usage = 0; let mut counts = EnumMap::from_fn(|_| EnumMap::from_fn(|_| 0usize)); + let mut n_errors = 0; self.backend - .list(&StoragePath::default(), &mut |path, file_type| { + .list(&StoragePath::default(), &mut |DirEntry { name, file_type}| { + let file_type = file_type.unwrap_or_else(|_| { + n_errors += 1; + StorageFileType::Other + }); let class = Class::from(file_type); - let disposition = if let Some(checkpoint_indexes) = in_use_paths.get_mut(path) { + let disposition = if let Some(checkpoint_indexes) = in_use_paths.get_mut(&name) { // Clear the indexes but don't remove the entry in case `list()` // reports the same file name again, which should not happen but - // who knows?. (Otherwise, on the second appearance, we would + // who knows? (Otherwise, on the second appearance, we would // delete the file.) checkpoint_indexes.clear(); Disposition::KeepExpected - } else if is_feldera_filename(path) || file_type == StorageFileType::Directory { + } else if is_feldera_filename(&name) || class == Class::Directory { Disposition::Remove } else { Disposition::KeepUnexpected @@ -197,19 +206,19 @@ impl Checkpointer { Disposition::KeepExpected => (), Disposition::KeepUnexpected => { if counts[disposition][class] < 10 { - info!("Keeping unexpected {file_type:?} {path}"); + info!("Keeping unexpected {class:?} {name}"); } else { - debug!("Keeping unexpected {file_type:?} {path}"); + debug!("Keeping unexpected {class:?} {name}"); } } Disposition::Remove => { if counts[disposition][class] < 10 { - info!("Removing unused {file_type:?} {path}"); + info!("Removing unused {class:?} {name}"); } else { - debug!("Removing unused {file_type:?} {path}"); + debug!("Removing unused {class:?} {name}"); } - if let Err(e) = self.backend.delete_recursive(path) { - warn!("Failed to delete {file_type:?} {path}: {e} (the pipeline will try to delete the file again on a restart)"); + if let Err(e) = self.backend.delete_recursive(&name) { + warn!("Failed to delete {class:?} {name}: {e} (the pipeline will try to delete the file again on a restart)"); } else { return; } @@ -220,7 +229,7 @@ impl Checkpointer { } })?; info!( - "GC kept {}/{}/{} expected files/directories/other, kept {}/{}/{} unexpected, and deleted {}/{}/{} unused", + "GC kept {}/{}/{} expected files/directories/other, kept {}/{}/{} unexpected, and deleted {}/{}/{} unused; {n_errors} error(s) reading directory entries", counts[Disposition::KeepExpected][Class::File], counts[Disposition::KeepExpected][Class::Directory], counts[Disposition::KeepExpected][Class::Other], @@ -296,9 +305,9 @@ impl Checkpointer { } let mut present: HashSet = HashSet::new(); - backend.list(cp_dir, &mut |path, file_type| { - if file_type != StorageFileType::Directory - && let Some(name) = path.filename() + backend.list(cp_dir, &mut |entry| { + if !matches!(&entry.file_type, Ok(StorageFileType::Directory)) + && let Some(name) = entry.name.filename() { present.insert(name.to_string()); } @@ -374,9 +383,9 @@ impl Checkpointer { // also catches a missing marker on restore. let cp_dir = Self::checkpoint_dir(uuid); let mut state_files: Vec = Vec::new(); - self.backend.list(&cp_dir, &mut |path, file_type| { - if file_type != StorageFileType::Directory - && let Some(name) = path.filename() + self.backend.list(&cp_dir, &mut |entry| { + if !matches!(&entry.file_type, Ok(StorageFileType::Directory)) + && let Some(name) = entry.name.filename() && name != CHECKPOINT_DEPENDENCIES { state_files.push(name.to_string()); @@ -435,9 +444,9 @@ impl Checkpointer { Ok(checkpoints) => Ok(checkpoints), Err(error) if error.kind() == ErrorKind::NotFound => { let mut orphan_uuid_dirs: Vec = Vec::new(); - backend.list(&StoragePath::default(), &mut |path, file_type| { - if file_type == StorageFileType::Directory - && let Some(name) = path.filename() + backend.list(&StoragePath::default(), &mut |entry| { + if matches!(&entry.file_type, Ok(StorageFileType::Directory)) + && let Some(name) = entry.name.filename() && Uuid::parse_str(name).is_ok() { orphan_uuid_dirs.push(name.to_string()); @@ -720,7 +729,7 @@ impl Checkpoint for EmptyCheckpoint { mod test { use std::sync::Arc; - use feldera_storage::{StorageBackend, StoragePath}; + use feldera_storage::{DirEntry, StorageBackend, StoragePath}; use feldera_types::config::{FileBackendConfig, StorageCacheConfig}; use feldera_types::constants::CHECKPOINT_FILE_NAME; use itertools::Itertools; @@ -766,7 +775,7 @@ mod test { fn list( &self, parent: &StoragePath, - cb: &mut dyn FnMut(&StoragePath, feldera_storage::StorageFileType), + cb: &mut dyn FnMut(DirEntry), ) -> Result<(), feldera_storage::error::StorageError> { self.inner.list(parent, cb) } @@ -801,7 +810,7 @@ mod test { LogCapture::new(|| Checkpointer::new(backend.clone()).unwrap()).into_parts(); assert_eq!( log, - " INFO dbsp::circuit::checkpointer: GC kept 0/0/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused\n" + " INFO dbsp::circuit::checkpointer: GC kept 0/0/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused; 0 error(s) reading directory entries\n" ); let uuid = uuid::Uuid::now_v7(); @@ -823,7 +832,7 @@ mod test { assert_eq!( LogCapture::new(|| drop(Checkpointer::new(backend.clone()).unwrap())).log, - " INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused\n" + " INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused; 0 error(s) reading directory entries\n" ); // Add a couple of stray files, one that matches Feldera's pattern and @@ -843,10 +852,10 @@ mod test { assert_eq!( log_sorted, vec![ - " INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 1/0/0 unexpected, and deleted 1/1/0 unused", - " INFO dbsp::circuit::checkpointer: Keeping unexpected File { size: 0 } unexpected.xyzzy", + " INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 1/0/0 unexpected, and deleted 1/1/0 unused; 0 error(s) reading directory entries", + " INFO dbsp::circuit::checkpointer: Keeping unexpected File unexpected.xyzzy", " INFO dbsp::circuit::checkpointer: Removing unused Directory unexpected-dir", - " INFO dbsp::circuit::checkpointer: Removing unused File { size: 0 } expected.feldera" + " INFO dbsp::circuit::checkpointer: Removing unused File expected.feldera" ] ); assert!(!expected_file.exists()); @@ -910,7 +919,7 @@ mod test { let log = LogCapture::new(|| drop(Checkpointer::new(make_backend()).unwrap())).log; assert_eq!( log, - " INFO dbsp::circuit::checkpointer: GC kept 2/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused\n" + " INFO dbsp::circuit::checkpointer: GC kept 2/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused; 0 error(s) reading directory entries\n" ); assert!( batch_path.exists(), @@ -922,7 +931,7 @@ mod test { std::fs::remove_file(&batch_path).unwrap(); assert_eq!( LogCapture::new(|| drop(Checkpointer::new(make_backend()).unwrap())).log, - r#" INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused + r#" INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused; 0 error(s) reading directory entries ERROR dbsp::circuit::checkpointer: 1 checkpoint(s) with the following UUID(s) have 1 missing file(s) in storage: 019e4708-bac8-7c10-b2f7-5cb248871905 ERROR dbsp::circuit::checkpointer: 1 checkpoint(s) need missing file: w0-aaaaaaaa.feldera "# diff --git a/crates/dbsp/src/storage/backend/memory_impl.rs b/crates/dbsp/src/storage/backend/memory_impl.rs index fe11063792c..05ccd8f8f0f 100644 --- a/crates/dbsp/src/storage/backend/memory_impl.rs +++ b/crates/dbsp/src/storage/backend/memory_impl.rs @@ -5,7 +5,7 @@ use super::{BlockLocation, FileId, FileReader, FileRw, FileWriter, StorageBackend, StorageError}; use crate::circuit::metrics::FILES_CREATED; use crate::storage::buffer_cache::FBuf; -use feldera_storage::{FileCommitter, StorageFileType, StoragePath}; +use feldera_storage::{DirEntry, FileCommitter, StorageFileType, StoragePath}; use std::fmt::Debug; use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; use std::{ @@ -237,11 +237,7 @@ impl StorageBackend for MemoryBackend { } } - fn list( - &self, - parent: &StoragePath, - cb: &mut dyn FnMut(&StoragePath, StorageFileType), - ) -> Result<(), StorageError> { + fn list(&self, parent: &StoragePath, cb: &mut dyn FnMut(DirEntry)) -> Result<(), StorageError> { let entries = self .0 .files @@ -251,8 +247,11 @@ impl StorageBackend for MemoryBackend { .filter(|&(name, _file)| name.prefix_matches(parent)) .map(|(name, file)| (name.clone(), file.size)) .collect::>(); - for (path, size) in entries { - cb(&path, StorageFileType::File { size }); + for (name, size) in entries { + cb(DirEntry { + name, + file_type: Ok(StorageFileType::File { size }), + }); } Ok(()) } diff --git a/crates/dbsp/src/storage/backend/posixio_impl.rs b/crates/dbsp/src/storage/backend/posixio_impl.rs index f64730835b1..38d3f6a8635 100644 --- a/crates/dbsp/src/storage/backend/posixio_impl.rs +++ b/crates/dbsp/src/storage/backend/posixio_impl.rs @@ -19,7 +19,6 @@ use feldera_storage::{ use feldera_types::config::{ FileBackendConfig, StorageBackendConfig, StorageCacheConfig, StorageConfig, }; -use std::ffi::OsString; use std::fmt::Debug; use std::fs::{DirEntry, create_dir_all}; use std::io::{ErrorKind, IoSlice, Write}; @@ -35,7 +34,7 @@ use std::{ atomic::{AtomicBool, AtomicI64, Ordering}, }, }; -use tracing::warn; +use tracing::{debug, warn}; /// fsync the directory at `path` so a freshly-created child entry (a /// rename target or a new subdirectory) becomes durable. Without this, @@ -506,9 +505,9 @@ impl StorageBackend for PosixBackend { fn list( &self, parent: &StoragePath, - cb: &mut dyn FnMut(&StoragePath, StorageFileType), + cb: &mut dyn FnMut(feldera_storage::DirEntry), ) -> Result<(), StorageError> { - fn parse_entry(entry: &DirEntry) -> Result<(OsString, StorageFileType), StorageError> { + fn get_file_type(entry: &DirEntry) -> Result { let file_type = entry.file_type().map_err(|e| { StorageError::stdio(e.kind(), "readdir type", entry.path().display()) })?; @@ -526,33 +525,50 @@ impl StorageBackend for PosixBackend { } else { StorageFileType::Other }; - Ok((entry.file_name(), file_type)) + Ok(file_type) } let mut result = Ok(()); let path = self.fs_path(parent); - for entry in path - .read_dir() - .map_err(|e| StorageError::stdio(e.kind(), "readdir", self.fs_path(parent).display()))? - { - match entry - .map_err(|e| StorageError::stdio(e.kind(), "readdir entry", path.display())) - .and_then(|entry| parse_entry(&entry)) - { - Err(e) => { - if e.kind() != ErrorKind::NotFound { - result = Err(e) - } else { + let entries = path.read_dir().map_err(|e| { + StorageError::stdio(e.kind(), "readdir", self.fs_path(parent).display()) + })?; + let mut warnings = 0usize..20; + for entry in entries { + match entry { + Ok(entry) => { + let entry = feldera_storage::DirEntry { + name: parent + .child(StoragePathPart::from(entry.file_name().as_encoded_bytes())), + file_type: get_file_type(&entry), + }; + if let Err(e) = &entry.file_type + && e.kind() == ErrorKind::NotFound + { // Ignore NotFound error. The file was probably // `status.json.mut`, renamed by the adapters server to // `status.json` between the call to readdir and the // call to stat. Don't succumb to a race for it. + } else { + if let Err(e) = &entry.file_type { + match warnings.next_back() { + Some(1..) => warn!("I/O error listing {parent}: {e}"), + Some(0) => warn!( + "I/O error listing {parent} (further warnings will be at debug level): {e}" + ), + None => debug!("I/O error listing {parent}: {e}"), + } + } + cb(entry); } } - Ok((name, file_type)) => cb( - &parent.child(StoragePathPart::from(name.as_encoded_bytes())), - file_type, - ), + Err(error) => { + result = Err(StorageError::stdio( + error.kind(), + "readdir entry", + path.display(), + )); + } } } result diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 7a1a9097c1d..f06aa16e496 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -80,14 +80,22 @@ pub trait StorageBackend: Send + Sync { None } - /// Calls `cb` with the name of each of the files under `parent`. This is a + /// Calls `cb` with the name and type of each file under `parent`. This is a /// non-recursive list: it does not include files under sub-directories of /// `parent`. - fn list( - &self, - parent: &StoragePath, - cb: &mut dyn FnMut(&StoragePath, StorageFileType), - ) -> Result<(), StorageError>; + /// + /// This method can report two classes of errors: + /// + /// - The return value indicates errors that could prevent `cb` from being + /// called for some or all of the files in the directory. These errors + /// indicate that `parent` does not exist or cannot be (fully) read + /// successfully. If more than one such error occurs, the method returns + /// the last one. + /// + /// - [DirEntry::file_type] in the argument to the callback reports errors + /// obtaining the file type. Errors reported this way only mean that + /// there was a problem obtaining metadata for the file itself. + fn list(&self, parent: &StoragePath, cb: &mut dyn FnMut(DirEntry)) -> Result<(), StorageError>; fn delete(&self, name: &StoragePath) -> Result<(), StorageError>; @@ -242,12 +250,11 @@ impl dyn StorageBackend { // runs for checkpoints written before that file existed. // TODO: remove once no such old checkpoints remain in use. let mut spines = Vec::new(); - self.list(&checkpoint_dir, &mut |path, _file_type| { - if path - .filename() - .is_some_and(|filename| filename.starts_with("pspine-batches")) + self.list(&checkpoint_dir, &mut |entry| { + if let Some(filename) = entry.name.filename() + && filename.starts_with("pspine-batches") { - spines.push(path.clone()); + spines.push(entry.name); } })?; @@ -302,6 +309,15 @@ impl dyn StorageBackend + '_ { } } +/// A directory entry read by [StorageBackend::list]. +pub struct DirEntry { + /// File name. + pub name: StoragePath, + + /// File type, if it could be obtained. + pub file_type: Result, +} + /// A file being read or written. pub trait FileRw { /// Returns the file's unique ID.