-
Notifications
You must be signed in to change notification settings - Fork 131
[dbsp] Better handle errors encountered listing storage files. #6296
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,7 +26,7 @@ use std::{ | |
|
|
||
| use feldera_storage::error::StorageError; | ||
| use feldera_storage::fbuf::FBuf; | ||
| use feldera_storage::{StorageBackend, StorageFileType, StoragePath}; | ||
| use feldera_storage::{DirEntry, StorageBackend, StorageFileType, StoragePath}; | ||
| use uuid::Uuid; | ||
|
|
||
| use super::RuntimeError; | ||
|
|
@@ -89,8 +89,8 @@ impl Checkpointer { | |
| let mut usage = 0; | ||
| StorageError::ignore_notfound(self.backend.list( | ||
| &Self::checkpoint_dir(uuid), | ||
| &mut |_path, file_type| { | ||
| if let StorageFileType::File { size } = file_type { | ||
| &mut |entry| { | ||
| if let Ok(StorageFileType::File { size }) = entry.file_type { | ||
| usage += size; | ||
| } | ||
| }, | ||
|
|
@@ -148,10 +148,14 @@ impl Checkpointer { | |
| // name more than once (it should not happen but who knows?). | ||
|
|
||
| /// Simplified [StorageFileType] (without a file size). | ||
| #[derive(Copy, Clone, Debug, Enum)] | ||
| #[derive(Copy, Clone, Debug, PartialEq, Eq, Enum)] | ||
| enum Class { | ||
| /// Regular file. | ||
| File, | ||
| /// Directory. | ||
| Directory, | ||
| /// Another kind of file, or unknown type (i.e. there was an error | ||
| /// determining the file type). | ||
| Other, | ||
| } | ||
| impl From<StorageFileType> for Class { | ||
|
|
@@ -176,17 +180,22 @@ impl Checkpointer { | |
| } | ||
| let mut usage = 0; | ||
| let mut counts = EnumMap::from_fn(|_| EnumMap::from_fn(|_| 0usize)); | ||
| let mut n_errors = 0; | ||
| self.backend | ||
| .list(&StoragePath::default(), &mut |path, file_type| { | ||
| .list(&StoragePath::default(), &mut |DirEntry { name, file_type}| { | ||
| let file_type = file_type.unwrap_or_else(|_| { | ||
| n_errors += 1; | ||
| StorageFileType::Other | ||
| }); | ||
| let class = Class::from(file_type); | ||
| let disposition = if let Some(checkpoint_indexes) = in_use_paths.get_mut(path) { | ||
| let disposition = if let Some(checkpoint_indexes) = in_use_paths.get_mut(&name) { | ||
| // Clear the indexes but don't remove the entry in case `list()` | ||
| // reports the same file name again, which should not happen but | ||
| // who knows?. (Otherwise, on the second appearance, we would | ||
| // who knows? (Otherwise, on the second appearance, we would | ||
| // delete the file.) | ||
| checkpoint_indexes.clear(); | ||
| Disposition::KeepExpected | ||
| } else if is_feldera_filename(path) || file_type == StorageFileType::Directory { | ||
| } else if is_feldera_filename(&name) || class == Class::Directory { | ||
| Disposition::Remove | ||
| } else { | ||
| Disposition::KeepUnexpected | ||
|
|
@@ -197,19 +206,19 @@ impl Checkpointer { | |
| Disposition::KeepExpected => (), | ||
| Disposition::KeepUnexpected => { | ||
| if counts[disposition][class] < 10 { | ||
| info!("Keeping unexpected {file_type:?} {path}"); | ||
| info!("Keeping unexpected {class:?} {name}"); | ||
| } else { | ||
| debug!("Keeping unexpected {file_type:?} {path}"); | ||
| debug!("Keeping unexpected {class:?} {name}"); | ||
| } | ||
| } | ||
| Disposition::Remove => { | ||
| if counts[disposition][class] < 10 { | ||
| info!("Removing unused {file_type:?} {path}"); | ||
| info!("Removing unused {class:?} {name}"); | ||
| } else { | ||
| debug!("Removing unused {file_type:?} {path}"); | ||
| debug!("Removing unused {class:?} {name}"); | ||
| } | ||
| if let Err(e) = self.backend.delete_recursive(path) { | ||
| warn!("Failed to delete {file_type:?} {path}: {e} (the pipeline will try to delete the file again on a restart)"); | ||
| if let Err(e) = self.backend.delete_recursive(&name) { | ||
| warn!("Failed to delete {class:?} {name}: {e} (the pipeline will try to delete the file again on a restart)"); | ||
| } else { | ||
| return; | ||
| } | ||
|
|
@@ -220,7 +229,7 @@ impl Checkpointer { | |
| } | ||
| })?; | ||
| info!( | ||
| "GC kept {}/{}/{} expected files/directories/other, kept {}/{}/{} unexpected, and deleted {}/{}/{} unused", | ||
| "GC kept {}/{}/{} expected files/directories/other, kept {}/{}/{} unexpected, and deleted {}/{}/{} unused; {n_errors} error(s) reading directory entries", | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: the GC log line is now wide enough that I'd flip it to fielded logging ( |
||
| counts[Disposition::KeepExpected][Class::File], | ||
| counts[Disposition::KeepExpected][Class::Directory], | ||
| counts[Disposition::KeepExpected][Class::Other], | ||
|
|
@@ -296,9 +305,9 @@ impl Checkpointer { | |
| } | ||
|
|
||
| let mut present: HashSet<String> = HashSet::new(); | ||
| backend.list(cp_dir, &mut |path, file_type| { | ||
| if file_type != StorageFileType::Directory | ||
| && let Some(name) = path.filename() | ||
| backend.list(cp_dir, &mut |entry| { | ||
| if !matches!(&entry.file_type, Ok(StorageFileType::Directory)) | ||
| && let Some(name) = entry.name.filename() | ||
| { | ||
| present.insert(name.to_string()); | ||
| } | ||
|
|
@@ -374,9 +383,9 @@ impl Checkpointer { | |
| // also catches a missing marker on restore. | ||
| let cp_dir = Self::checkpoint_dir(uuid); | ||
| let mut state_files: Vec<String> = Vec::new(); | ||
| self.backend.list(&cp_dir, &mut |path, file_type| { | ||
| if file_type != StorageFileType::Directory | ||
| && let Some(name) = path.filename() | ||
| self.backend.list(&cp_dir, &mut |entry| { | ||
| if !matches!(&entry.file_type, Ok(StorageFileType::Directory)) | ||
| && let Some(name) = entry.name.filename() | ||
| && name != CHECKPOINT_DEPENDENCIES | ||
| { | ||
| state_files.push(name.to_string()); | ||
|
|
@@ -435,9 +444,9 @@ impl Checkpointer { | |
| Ok(checkpoints) => Ok(checkpoints), | ||
| Err(error) if error.kind() == ErrorKind::NotFound => { | ||
| let mut orphan_uuid_dirs: Vec<String> = Vec::new(); | ||
| backend.list(&StoragePath::default(), &mut |path, file_type| { | ||
| if file_type == StorageFileType::Directory | ||
| && let Some(name) = path.filename() | ||
| backend.list(&StoragePath::default(), &mut |entry| { | ||
| if matches!(&entry.file_type, Ok(StorageFileType::Directory)) | ||
| && let Some(name) = entry.name.filename() | ||
| && Uuid::parse_str(name).is_ok() | ||
| { | ||
| orphan_uuid_dirs.push(name.to_string()); | ||
|
|
@@ -720,7 +729,7 @@ impl<T: Default> Checkpoint for EmptyCheckpoint<T> { | |
| mod test { | ||
| use std::sync::Arc; | ||
|
|
||
| use feldera_storage::{StorageBackend, StoragePath}; | ||
| use feldera_storage::{DirEntry, StorageBackend, StoragePath}; | ||
| use feldera_types::config::{FileBackendConfig, StorageCacheConfig}; | ||
| use feldera_types::constants::CHECKPOINT_FILE_NAME; | ||
| use itertools::Itertools; | ||
|
|
@@ -766,7 +775,7 @@ mod test { | |
| fn list( | ||
| &self, | ||
| parent: &StoragePath, | ||
| cb: &mut dyn FnMut(&StoragePath, feldera_storage::StorageFileType), | ||
| cb: &mut dyn FnMut(DirEntry), | ||
| ) -> Result<(), feldera_storage::error::StorageError> { | ||
| self.inner.list(parent, cb) | ||
| } | ||
|
|
@@ -801,7 +810,7 @@ mod test { | |
| LogCapture::new(|| Checkpointer::new(backend.clone()).unwrap()).into_parts(); | ||
| assert_eq!( | ||
| log, | ||
| " INFO dbsp::circuit::checkpointer: GC kept 0/0/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused\n" | ||
| " INFO dbsp::circuit::checkpointer: GC kept 0/0/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused; 0 error(s) reading directory entries\n" | ||
| ); | ||
|
|
||
| let uuid = uuid::Uuid::now_v7(); | ||
|
|
@@ -823,7 +832,7 @@ mod test { | |
|
|
||
| assert_eq!( | ||
| LogCapture::new(|| drop(Checkpointer::new(backend.clone()).unwrap())).log, | ||
| " INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused\n" | ||
| " INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused; 0 error(s) reading directory entries\n" | ||
| ); | ||
|
|
||
| // Add a couple of stray files, one that matches Feldera's pattern and | ||
|
|
@@ -843,10 +852,10 @@ mod test { | |
| assert_eq!( | ||
| log_sorted, | ||
| vec![ | ||
| " INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 1/0/0 unexpected, and deleted 1/1/0 unused", | ||
| " INFO dbsp::circuit::checkpointer: Keeping unexpected File { size: 0 } unexpected.xyzzy", | ||
| " INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 1/0/0 unexpected, and deleted 1/1/0 unused; 0 error(s) reading directory entries", | ||
| " INFO dbsp::circuit::checkpointer: Keeping unexpected File unexpected.xyzzy", | ||
| " INFO dbsp::circuit::checkpointer: Removing unused Directory unexpected-dir", | ||
| " INFO dbsp::circuit::checkpointer: Removing unused File { size: 0 } expected.feldera" | ||
| " INFO dbsp::circuit::checkpointer: Removing unused File expected.feldera" | ||
| ] | ||
| ); | ||
| assert!(!expected_file.exists()); | ||
|
|
@@ -910,7 +919,7 @@ mod test { | |
| let log = LogCapture::new(|| drop(Checkpointer::new(make_backend()).unwrap())).log; | ||
| assert_eq!( | ||
| log, | ||
| " INFO dbsp::circuit::checkpointer: GC kept 2/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused\n" | ||
| " INFO dbsp::circuit::checkpointer: GC kept 2/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused; 0 error(s) reading directory entries\n" | ||
| ); | ||
| assert!( | ||
| batch_path.exists(), | ||
|
|
@@ -922,7 +931,7 @@ mod test { | |
| std::fs::remove_file(&batch_path).unwrap(); | ||
| assert_eq!( | ||
| LogCapture::new(|| drop(Checkpointer::new(make_backend()).unwrap())).log, | ||
| r#" INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused | ||
| r#" INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused; 0 error(s) reading directory entries | ||
| ERROR dbsp::circuit::checkpointer: 1 checkpoint(s) with the following UUID(s) have 1 missing file(s) in storage: 019e4708-bac8-7c10-b2f7-5cb248871905 | ||
| ERROR dbsp::circuit::checkpointer: 1 checkpoint(s) need missing file: w0-aaaaaaaa.feldera | ||
| "# | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,7 +5,7 @@ | |
| use super::{BlockLocation, FileId, FileReader, FileRw, FileWriter, StorageBackend, StorageError}; | ||
| use crate::circuit::metrics::FILES_CREATED; | ||
| use crate::storage::buffer_cache::FBuf; | ||
| use feldera_storage::{FileCommitter, StorageFileType, StoragePath}; | ||
| use feldera_storage::{DirEntry, FileCommitter, StorageFileType, StoragePath}; | ||
| use std::fmt::Debug; | ||
| use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; | ||
| use std::{ | ||
|
|
@@ -237,11 +237,7 @@ impl StorageBackend for MemoryBackend { | |
| } | ||
| } | ||
|
|
||
| fn list( | ||
| &self, | ||
| parent: &StoragePath, | ||
| cb: &mut dyn FnMut(&StoragePath, StorageFileType), | ||
| ) -> Result<(), StorageError> { | ||
| fn list(&self, parent: &StoragePath, cb: &mut dyn FnMut(DirEntry)) -> Result<(), StorageError> { | ||
| let entries = self | ||
| .0 | ||
| .files | ||
|
|
@@ -251,8 +247,11 @@ impl StorageBackend for MemoryBackend { | |
| .filter(|&(name, _file)| name.prefix_matches(parent)) | ||
| .map(|(name, file)| (name.clone(), file.size)) | ||
| .collect::<Vec<_>>(); | ||
| for (path, size) in entries { | ||
| cb(&path, StorageFileType::File { size }); | ||
| for (name, size) in entries { | ||
| cb(DirEntry { | ||
| name, | ||
| file_type: Ok(StorageFileType::File { size }), | ||
| }); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The new contract — that |
||
| } | ||
| Ok(()) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,6 @@ use feldera_storage::{ | |
| use feldera_types::config::{ | ||
| FileBackendConfig, StorageBackendConfig, StorageCacheConfig, StorageConfig, | ||
| }; | ||
| use std::ffi::OsString; | ||
| use std::fmt::Debug; | ||
| use std::fs::{DirEntry, create_dir_all}; | ||
| use std::io::{ErrorKind, IoSlice, Write}; | ||
|
|
@@ -35,7 +34,7 @@ use std::{ | |
| atomic::{AtomicBool, AtomicI64, Ordering}, | ||
| }, | ||
| }; | ||
| use tracing::warn; | ||
| use tracing::{debug, warn}; | ||
|
|
||
| /// fsync the directory at `path` so a freshly-created child entry (a | ||
| /// rename target or a new subdirectory) becomes durable. Without this, | ||
|
|
@@ -506,9 +505,9 @@ impl StorageBackend for PosixBackend { | |
| fn list( | ||
| &self, | ||
| parent: &StoragePath, | ||
| cb: &mut dyn FnMut(&StoragePath, StorageFileType), | ||
| cb: &mut dyn FnMut(feldera_storage::DirEntry), | ||
| ) -> Result<(), StorageError> { | ||
| fn parse_entry(entry: &DirEntry) -> Result<(OsString, StorageFileType), StorageError> { | ||
| fn get_file_type(entry: &DirEntry) -> Result<StorageFileType, StorageError> { | ||
| let file_type = entry.file_type().map_err(|e| { | ||
| StorageError::stdio(e.kind(), "readdir type", entry.path().display()) | ||
| })?; | ||
|
|
@@ -526,33 +525,50 @@ impl StorageBackend for PosixBackend { | |
| } else { | ||
| StorageFileType::Other | ||
| }; | ||
| Ok((entry.file_name(), file_type)) | ||
| Ok(file_type) | ||
| } | ||
|
|
||
| let mut result = Ok(()); | ||
| let path = self.fs_path(parent); | ||
| for entry in path | ||
| .read_dir() | ||
| .map_err(|e| StorageError::stdio(e.kind(), "readdir", self.fs_path(parent).display()))? | ||
| { | ||
| match entry | ||
| .map_err(|e| StorageError::stdio(e.kind(), "readdir entry", path.display())) | ||
| .and_then(|entry| parse_entry(&entry)) | ||
| { | ||
| Err(e) => { | ||
| if e.kind() != ErrorKind::NotFound { | ||
| result = Err(e) | ||
| } else { | ||
| let entries = path.read_dir().map_err(|e| { | ||
| StorageError::stdio(e.kind(), "readdir", self.fs_path(parent).display()) | ||
| })?; | ||
| let mut warnings = 0usize..20; | ||
| for entry in entries { | ||
| match entry { | ||
| Ok(entry) => { | ||
| let entry = feldera_storage::DirEntry { | ||
| name: parent | ||
| .child(StoragePathPart::from(entry.file_name().as_encoded_bytes())), | ||
| file_type: get_file_type(&entry), | ||
| }; | ||
| if let Err(e) = &entry.file_type | ||
| && e.kind() == ErrorKind::NotFound | ||
| { | ||
| // Ignore NotFound error. The file was probably | ||
| // `status.json.mut`, renamed by the adapters server to | ||
| // `status.json` between the call to readdir and the | ||
| // call to stat. Don't succumb to a race for it. | ||
| } else { | ||
| if let Err(e) = &entry.file_type { | ||
| match warnings.next_back() { | ||
| Some(1..) => warn!("I/O error listing {parent}: {e}"), | ||
| Some(0) => warn!( | ||
| "I/O error listing {parent} (further warnings will be at debug level): {e}" | ||
| ), | ||
| None => debug!("I/O error listing {parent}: {e}"), | ||
| } | ||
| } | ||
| cb(entry); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Style nit, non-blocking: this checks `entry.file_type` for `Err` twice; a single `match` would read flatter:
```rust
match &entry.file_type {
    Err(e) if e.kind() == ErrorKind::NotFound => continue,
    Err(e) => match warnings.next().unwrap() { /* … */ },
    Ok(_) => {}
}
cb(entry);
``` |
||
| } | ||
| } | ||
| Ok((name, file_type)) => cb( | ||
| &parent.child(StoragePathPart::from(name.as_encoded_bytes())), | ||
| file_type, | ||
| ), | ||
| Err(error) => { | ||
| result = Err(StorageError::stdio( | ||
| error.kind(), | ||
| "readdir entry", | ||
| path.display(), | ||
| )); | ||
| } | ||
| } | ||
| } | ||
| result | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An `Err(_)` here collapses any per-entry metadata failure into `Class::Other` + `Disposition::KeepUnexpected`. That's reasonable — refuse to delete what you can't classify — but the GC log line will show those entries as `Keeping unexpected Other …` even though the real reason is "could not stat". Worth a one-line debug log at the point of the `unwrap_or_else` (`debug!("Cannot determine type of {name}: {e}, treating as Other")`) so operators can correlate the `n_errors` count with concrete paths.