Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions crates/adapters/src/controller/journal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ impl Journal {
} else {
let mut result = Ok(());
self.backend
.list(&self.path, &mut |path, _kind| {
let Some(file_name) = path.filename() else {
.list(&self.path, &mut |entry| {
let Some(file_name) = entry.name.filename() else {
return;
};
let Some(number) = file_name.strip_suffix(".bin") else {
Expand All @@ -102,8 +102,8 @@ impl Journal {
if !steps.contains(&step) {
return;
}
if let Err(error) = self.backend.delete(path) {
result = Err(StepError::storage_error(path, error));
if let Err(error) = self.backend.delete(&entry.name) {
result = Err(StepError::storage_error(&entry.name, error));
}
})
.map_err(|error| self.storage_error(error))?;
Expand Down
75 changes: 42 additions & 33 deletions crates/dbsp/src/circuit/checkpointer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::{

use feldera_storage::error::StorageError;
use feldera_storage::fbuf::FBuf;
use feldera_storage::{StorageBackend, StorageFileType, StoragePath};
use feldera_storage::{DirEntry, StorageBackend, StorageFileType, StoragePath};
use uuid::Uuid;

use super::RuntimeError;
Expand Down Expand Up @@ -89,8 +89,8 @@ impl Checkpointer {
let mut usage = 0;
StorageError::ignore_notfound(self.backend.list(
&Self::checkpoint_dir(uuid),
&mut |_path, file_type| {
if let StorageFileType::File { size } = file_type {
&mut |entry| {
if let Ok(StorageFileType::File { size }) = entry.file_type {
usage += size;
}
},
Expand Down Expand Up @@ -148,10 +148,14 @@ impl Checkpointer {
// name more than once (it should not happen but who knows?).

/// Simplified [StorageFileType] (without a file size).
#[derive(Copy, Clone, Debug, Enum)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Enum)]
enum Class {
/// Regular file.
File,
/// Directory.
Directory,
/// Another kind of file, or unknown type (i.e. there was an error
/// determining the file type).
Other,
}
impl From<StorageFileType> for Class {
Expand All @@ -176,17 +180,22 @@ impl Checkpointer {
}
let mut usage = 0;
let mut counts = EnumMap::from_fn(|_| EnumMap::from_fn(|_| 0usize));
let mut n_errors = 0;
self.backend
.list(&StoragePath::default(), &mut |path, file_type| {
.list(&StoragePath::default(), &mut |DirEntry { name, file_type}| {
let file_type = file_type.unwrap_or_else(|_| {
n_errors += 1;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

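An `Err(_)` here collapses any per-entry metadata failure into `StorageFileType::Other`, and therefore `Class::Other`. For an entry that is neither in `in_use_paths` nor Feldera-named, that ends up as `Disposition::KeepUnexpected`, which is reasonable — refuse to delete what you can't classify. (Note that an entry matching `is_feldera_filename` is still given `Disposition::Remove` regardless of its class, so an unclassifiable Feldera-named entry is deleted, not kept.) But the GC log line will show the kept entries as `Keeping unexpected Other …` even though the real reason is "could not stat". Worth a one-line debug log at the point of the `unwrap_or_else` — bind the error as `|e|` instead of `|_|`, then `debug!("Cannot determine type of {name}: {e}, treating as Other")` — so operators can correlate the `n_errors` count with concrete paths.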

StorageFileType::Other
});
let class = Class::from(file_type);
let disposition = if let Some(checkpoint_indexes) = in_use_paths.get_mut(path) {
let disposition = if let Some(checkpoint_indexes) = in_use_paths.get_mut(&name) {
// Clear the indexes but don't remove the entry in case `list()`
// reports the same file name again, which should not happen but
// who knows?. (Otherwise, on the second appearance, we would
// who knows? (Otherwise, on the second appearance, we would
// delete the file.)
checkpoint_indexes.clear();
Disposition::KeepExpected
} else if is_feldera_filename(path) || file_type == StorageFileType::Directory {
} else if is_feldera_filename(&name) || class == Class::Directory {
Disposition::Remove
} else {
Disposition::KeepUnexpected
Expand All @@ -197,19 +206,19 @@ impl Checkpointer {
Disposition::KeepExpected => (),
Disposition::KeepUnexpected => {
if counts[disposition][class] < 10 {
info!("Keeping unexpected {file_type:?} {path}");
info!("Keeping unexpected {class:?} {name}");
} else {
debug!("Keeping unexpected {file_type:?} {path}");
debug!("Keeping unexpected {class:?} {name}");
}
}
Disposition::Remove => {
if counts[disposition][class] < 10 {
info!("Removing unused {file_type:?} {path}");
info!("Removing unused {class:?} {name}");
} else {
debug!("Removing unused {file_type:?} {path}");
debug!("Removing unused {class:?} {name}");
}
if let Err(e) = self.backend.delete_recursive(path) {
warn!("Failed to delete {file_type:?} {path}: {e} (the pipeline will try to delete the file again on a restart)");
if let Err(e) = self.backend.delete_recursive(&name) {
warn!("Failed to delete {class:?} {name}: {e} (the pipeline will try to delete the file again on a restart)");
} else {
return;
}
Expand All @@ -220,7 +229,7 @@ impl Checkpointer {
}
})?;
info!(
"GC kept {}/{}/{} expected files/directories/other, kept {}/{}/{} unexpected, and deleted {}/{}/{} unused",
"GC kept {}/{}/{} expected files/directories/other, kept {}/{}/{} unexpected, and deleted {}/{}/{} unused; {n_errors} error(s) reading directory entries",

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: the GC log line is now wide enough that I'd flip it to fielded logging (`info!(expected_files = …, n_errors, "GC summary")`) so log aggregators can index on `n_errors` and alert on `> 0`. The format-string version stringifies a number that operators will want to filter on.

counts[Disposition::KeepExpected][Class::File],
counts[Disposition::KeepExpected][Class::Directory],
counts[Disposition::KeepExpected][Class::Other],
Expand Down Expand Up @@ -296,9 +305,9 @@ impl Checkpointer {
}

let mut present: HashSet<String> = HashSet::new();
backend.list(cp_dir, &mut |path, file_type| {
if file_type != StorageFileType::Directory
&& let Some(name) = path.filename()
backend.list(cp_dir, &mut |entry| {
if !matches!(&entry.file_type, Ok(StorageFileType::Directory))
&& let Some(name) = entry.name.filename()
{
present.insert(name.to_string());
}
Expand Down Expand Up @@ -374,9 +383,9 @@ impl Checkpointer {
// also catches a missing marker on restore.
let cp_dir = Self::checkpoint_dir(uuid);
let mut state_files: Vec<String> = Vec::new();
self.backend.list(&cp_dir, &mut |path, file_type| {
if file_type != StorageFileType::Directory
&& let Some(name) = path.filename()
self.backend.list(&cp_dir, &mut |entry| {
if !matches!(&entry.file_type, Ok(StorageFileType::Directory))
&& let Some(name) = entry.name.filename()
&& name != CHECKPOINT_DEPENDENCIES
{
state_files.push(name.to_string());
Expand Down Expand Up @@ -435,9 +444,9 @@ impl Checkpointer {
Ok(checkpoints) => Ok(checkpoints),
Err(error) if error.kind() == ErrorKind::NotFound => {
let mut orphan_uuid_dirs: Vec<String> = Vec::new();
backend.list(&StoragePath::default(), &mut |path, file_type| {
if file_type == StorageFileType::Directory
&& let Some(name) = path.filename()
backend.list(&StoragePath::default(), &mut |entry| {
if matches!(&entry.file_type, Ok(StorageFileType::Directory))
&& let Some(name) = entry.name.filename()
&& Uuid::parse_str(name).is_ok()
{
orphan_uuid_dirs.push(name.to_string());
Expand Down Expand Up @@ -720,7 +729,7 @@ impl<T: Default> Checkpoint for EmptyCheckpoint<T> {
mod test {
use std::sync::Arc;

use feldera_storage::{StorageBackend, StoragePath};
use feldera_storage::{DirEntry, StorageBackend, StoragePath};
use feldera_types::config::{FileBackendConfig, StorageCacheConfig};
use feldera_types::constants::CHECKPOINT_FILE_NAME;
use itertools::Itertools;
Expand Down Expand Up @@ -766,7 +775,7 @@ mod test {
fn list(
&self,
parent: &StoragePath,
cb: &mut dyn FnMut(&StoragePath, feldera_storage::StorageFileType),
cb: &mut dyn FnMut(DirEntry),
) -> Result<(), feldera_storage::error::StorageError> {
self.inner.list(parent, cb)
}
Expand Down Expand Up @@ -801,7 +810,7 @@ mod test {
LogCapture::new(|| Checkpointer::new(backend.clone()).unwrap()).into_parts();
assert_eq!(
log,
" INFO dbsp::circuit::checkpointer: GC kept 0/0/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused\n"
" INFO dbsp::circuit::checkpointer: GC kept 0/0/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused; 0 error(s) reading directory entries\n"
);

let uuid = uuid::Uuid::now_v7();
Expand All @@ -823,7 +832,7 @@ mod test {

assert_eq!(
LogCapture::new(|| drop(Checkpointer::new(backend.clone()).unwrap())).log,
" INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused\n"
" INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused; 0 error(s) reading directory entries\n"
);

// Add a couple of stray files, one that matches Feldera's pattern and
Expand All @@ -843,10 +852,10 @@ mod test {
assert_eq!(
log_sorted,
vec![
" INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 1/0/0 unexpected, and deleted 1/1/0 unused",
" INFO dbsp::circuit::checkpointer: Keeping unexpected File { size: 0 } unexpected.xyzzy",
" INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 1/0/0 unexpected, and deleted 1/1/0 unused; 0 error(s) reading directory entries",
" INFO dbsp::circuit::checkpointer: Keeping unexpected File unexpected.xyzzy",
" INFO dbsp::circuit::checkpointer: Removing unused Directory unexpected-dir",
" INFO dbsp::circuit::checkpointer: Removing unused File { size: 0 } expected.feldera"
" INFO dbsp::circuit::checkpointer: Removing unused File expected.feldera"
]
);
assert!(!expected_file.exists());
Expand Down Expand Up @@ -910,7 +919,7 @@ mod test {
let log = LogCapture::new(|| drop(Checkpointer::new(make_backend()).unwrap())).log;
assert_eq!(
log,
" INFO dbsp::circuit::checkpointer: GC kept 2/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused\n"
" INFO dbsp::circuit::checkpointer: GC kept 2/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused; 0 error(s) reading directory entries\n"
);
assert!(
batch_path.exists(),
Expand All @@ -922,7 +931,7 @@ mod test {
std::fs::remove_file(&batch_path).unwrap();
assert_eq!(
LogCapture::new(|| drop(Checkpointer::new(make_backend()).unwrap())).log,
r#" INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused
r#" INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused; 0 error(s) reading directory entries
ERROR dbsp::circuit::checkpointer: 1 checkpoint(s) with the following UUID(s) have 1 missing file(s) in storage: 019e4708-bac8-7c10-b2f7-5cb248871905
ERROR dbsp::circuit::checkpointer: 1 checkpoint(s) need missing file: w0-aaaaaaaa.feldera
"#
Expand Down
15 changes: 7 additions & 8 deletions crates/dbsp/src/storage/backend/memory_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use super::{BlockLocation, FileId, FileReader, FileRw, FileWriter, StorageBackend, StorageError};
use crate::circuit::metrics::FILES_CREATED;
use crate::storage::buffer_cache::FBuf;
use feldera_storage::{FileCommitter, StorageFileType, StoragePath};
use feldera_storage::{DirEntry, FileCommitter, StorageFileType, StoragePath};
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::{
Expand Down Expand Up @@ -237,11 +237,7 @@ impl StorageBackend for MemoryBackend {
}
}

fn list(
&self,
parent: &StoragePath,
cb: &mut dyn FnMut(&StoragePath, StorageFileType),
) -> Result<(), StorageError> {
fn list(&self, parent: &StoragePath, cb: &mut dyn FnMut(DirEntry)) -> Result<(), StorageError> {
let entries = self
.0
.files
Expand All @@ -251,8 +247,11 @@ impl StorageBackend for MemoryBackend {
.filter(|&(name, _file)| name.prefix_matches(parent))
.map(|(name, file)| (name.clone(), file.size))
.collect::<Vec<_>>();
for (path, size) in entries {
cb(&path, StorageFileType::File { size });
for (name, size) in entries {
cb(DirEntry {
name,
file_type: Ok(StorageFileType::File { size }),
});

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new contract — that `cb` may receive a `DirEntry` whose `file_type` is `Err(..)` — is not exercised by any unit test. The memory backend always returns `Ok(File { size })`, and the POSIX path that produces an `Err` is hard to drive deterministically. Consider adding a tiny test-only backend (or a knob on `MemoryBackend`) that returns an `Err` `file_type` for a planted entry, so the checkpointer's `n_errors`/`Class::Other` accounting and the per-call-site `matches!(.., Ok(..))` checks have at least one regression test. Non-blocking, but the whole point of this PR is the new error path — it should be covered.

}
Ok(())
}
Expand Down
58 changes: 37 additions & 21 deletions crates/dbsp/src/storage/backend/posixio_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use feldera_storage::{
use feldera_types::config::{
FileBackendConfig, StorageBackendConfig, StorageCacheConfig, StorageConfig,
};
use std::ffi::OsString;
use std::fmt::Debug;
use std::fs::{DirEntry, create_dir_all};
use std::io::{ErrorKind, IoSlice, Write};
Expand All @@ -35,7 +34,7 @@ use std::{
atomic::{AtomicBool, AtomicI64, Ordering},
},
};
use tracing::warn;
use tracing::{debug, warn};

/// fsync the directory at `path` so a freshly-created child entry (a
/// rename target or a new subdirectory) becomes durable. Without this,
Expand Down Expand Up @@ -506,9 +505,9 @@ impl StorageBackend for PosixBackend {
fn list(
&self,
parent: &StoragePath,
cb: &mut dyn FnMut(&StoragePath, StorageFileType),
cb: &mut dyn FnMut(feldera_storage::DirEntry),
) -> Result<(), StorageError> {
fn parse_entry(entry: &DirEntry) -> Result<(OsString, StorageFileType), StorageError> {
fn get_file_type(entry: &DirEntry) -> Result<StorageFileType, StorageError> {
let file_type = entry.file_type().map_err(|e| {
StorageError::stdio(e.kind(), "readdir type", entry.path().display())
})?;
Expand All @@ -526,33 +525,50 @@ impl StorageBackend for PosixBackend {
} else {
StorageFileType::Other
};
Ok((entry.file_name(), file_type))
Ok(file_type)
}

let mut result = Ok(());
let path = self.fs_path(parent);
for entry in path
.read_dir()
.map_err(|e| StorageError::stdio(e.kind(), "readdir", self.fs_path(parent).display()))?
{
match entry
.map_err(|e| StorageError::stdio(e.kind(), "readdir entry", path.display()))
.and_then(|entry| parse_entry(&entry))
{
Err(e) => {
if e.kind() != ErrorKind::NotFound {
result = Err(e)
} else {
let entries = path.read_dir().map_err(|e| {
StorageError::stdio(e.kind(), "readdir", self.fs_path(parent).display())
})?;
let mut warnings = 0usize..20;
for entry in entries {
match entry {
Ok(entry) => {
let entry = feldera_storage::DirEntry {
name: parent
.child(StoragePathPart::from(entry.file_name().as_encoded_bytes())),
file_type: get_file_type(&entry),
};
if let Err(e) = &entry.file_type
&& e.kind() == ErrorKind::NotFound
{
// Ignore NotFound error. The file was probably
// `status.json.mut`, renamed by the adapters server to
// `status.json` between the call to readdir and the
// call to stat. Don't succumb to a race for it.
} else {
if let Err(e) = &entry.file_type {
match warnings.next_back() {
Some(1..) => warn!("I/O error listing {parent}: {e}"),
Some(0) => warn!(
"I/O error listing {parent} (further warnings will be at debug level): {e}"
),
None => debug!("I/O error listing {parent}: {e}"),
}
}
cb(entry);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit, non-blocking: this checks &entry.file_type twice (once for the NotFound short-circuit, once for the warn). Reads cleaner as a single match:

match &entry.file_type {
    Err(e) if e.kind() == ErrorKind::NotFound => continue,
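    // Keep `next_back()` and its `None` arm: `next().unwrap()` would panic once the 20 warnings are used up.
    Err(e) => match warnings.next_back() { /* Some(1..) / Some(0) / None arms as before */ },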
    Ok(_) => {}
}
cb(entry);

}
}
Ok((name, file_type)) => cb(
&parent.child(StoragePathPart::from(name.as_encoded_bytes())),
file_type,
),
Err(error) => {
result = Err(StorageError::stdio(
error.kind(),
"readdir entry",
path.display(),
));
}
}
}
result
Expand Down
Loading
Loading