diff --git a/crates/dbsp/src/circuit/checkpointer.rs b/crates/dbsp/src/circuit/checkpointer.rs index 7c154946c1e..7a0cc3f95ee 100644 --- a/crates/dbsp/src/circuit/checkpointer.rs +++ b/crates/dbsp/src/circuit/checkpointer.rs @@ -3,6 +3,7 @@ use crate::dynamic::{self, data::DataTyped}; use crate::storage::file::SerializerInner; use crate::{Error, NumEntries, TypedBox}; +use enum_map::{Enum, EnumMap}; use feldera_types::checkpoint::{ CheckpointDependencies, CheckpointDependenciesWrite, CheckpointMetadata, }; @@ -10,8 +11,12 @@ use feldera_types::constants::{ ACTIVATION_MARKER_FILE, ADHOC_TEMP_DIR, CHECKPOINT_DEPENDENCIES, CHECKPOINT_FILE_NAME, DBSP_FILE_EXTENSION, STATE_FILE, STATUS_FILE, STEPS_FILE, }; +use itertools::Itertools; use size_of::SizeOf; +use smallvec::SmallVec; +use tracing::{debug, error, info, warn}; +use std::collections::{BTreeMap, BTreeSet}; use std::io::ErrorKind; use std::sync::atomic::Ordering; use std::{ @@ -104,26 +109,32 @@ impl Checkpointer { /// directory. Returns the amount of storage still in use. pub fn gc_startup(&self) -> Result { // Collect all directories and files still referenced by a checkpoint - let mut in_use_paths: HashSet = HashSet::new(); - in_use_paths.insert(CHECKPOINT_FILE_NAME.into()); - in_use_paths.insert(STEPS_FILE.into()); - in_use_paths.insert(STATE_FILE.into()); + let mut in_use_paths = BTreeMap::>::new(); + in_use_paths.insert(CHECKPOINT_FILE_NAME.into(), SmallVec::new()); + in_use_paths.insert(STEPS_FILE.into(), SmallVec::new()); + in_use_paths.insert(STATE_FILE.into(), SmallVec::new()); // Don't delete either `status.json` or `status.json.mut` either because // these files get updated asynchronously and we must not interfere with // it. - in_use_paths.insert(STATUS_FILE.into()); - in_use_paths.insert(format!("{}.mut", STATUS_FILE).into()); - in_use_paths.insert(ADHOC_TEMP_DIR.into()); - in_use_paths.insert(ACTIVATION_MARKER_FILE.into()); - for cpm in self.checkpoint_list.iter() { - in_use_paths.insert(cpm.uuid.to_string().into()); + in_use_paths.insert(STATUS_FILE.into(), SmallVec::new()); + in_use_paths.insert(format!("{}.mut", STATUS_FILE).into(), SmallVec::new()); + in_use_paths.insert(ADHOC_TEMP_DIR.into(), SmallVec::new()); + in_use_paths.insert(ACTIVATION_MARKER_FILE.into(), SmallVec::new()); + for (checkpoint_index, cpm) in self.checkpoint_list.iter().enumerate() { + in_use_paths + .entry(cpm.uuid.to_string().into()) + .or_default() + .push(checkpoint_index); let batches = self.gather_batches_for_checkpoint(cpm)?; for batch in batches { - in_use_paths.insert(batch); + in_use_paths + .entry(batch) + .or_default() + .push(checkpoint_index); } } // Give the coordinator a namespace for persistent files. - in_use_paths.insert("coordinator".into()); + in_use_paths.insert("coordinator".into(), SmallVec::new()); /// True if `path` is a name that we might have created ourselves. fn is_feldera_filename(path: &StoragePath) -> bool { @@ -132,21 +143,126 @@ impl Checkpointer { } // Collect everything found in the storage directory + // + // This code is designed to tolerate `list()` reporting a single file + // name more than once (it should not happen but who knows?). + + /// Simplified [StorageFileType] (without a file size). + #[derive(Copy, Clone, Debug, Enum)] + enum Class { + File, + Directory, + Other, + } + impl From for Class { + fn from(value: StorageFileType) -> Self { + match value { + StorageFileType::File { .. } => Self::File, + StorageFileType::Directory => Self::Directory, + StorageFileType::Other => Self::Other, + } + } + } + + /// What to do to a file. + #[derive(Copy, Clone, Debug, Enum)] + enum Disposition { + /// Keep it because we know what it is and we need it. + KeepExpected, + /// Keep it because it is not ours and we do not want to risk it. + KeepUnexpected, + /// Remove it because we know what it is but we no longer need it. + Remove, + } let mut usage = 0; - self.backend.list(&StoragePath::default(), &mut |path, file_type| { - if !in_use_paths.contains(path) && (is_feldera_filename(path) || file_type == StorageFileType::Directory) { - match self.backend.delete_recursive(path) { - Ok(_) => { - tracing::debug!("Removed unused {file_type:?} '{path}'"); + let mut counts = EnumMap::from_fn(|_| EnumMap::from_fn(|_| 0usize)); + self.backend + .list(&StoragePath::default(), &mut |path, file_type| { + let class = Class::from(file_type); + let disposition = if let Some(checkpoint_indexes) = in_use_paths.get_mut(path) { + // Clear the indexes but don't remove the entry in case `list()` + // reports the same file name again, which should not happen but + // who knows?. (Otherwise, on the second appearance, we would + // delete the file.) + checkpoint_indexes.clear(); + Disposition::KeepExpected + } else if is_feldera_filename(path) || file_type == StorageFileType::Directory { + Disposition::Remove + } else { + Disposition::KeepUnexpected + }; + counts[disposition][class] += 1; + + match disposition { + Disposition::KeepExpected => (), + Disposition::KeepUnexpected => { + if counts[disposition][class] < 10 { + info!("Keeping unexpected {file_type:?} {path}"); + } else { + debug!("Keeping unexpected {file_type:?} {path}"); + } } - Err(e) => { - tracing::warn!("Unable to remove old-checkpoint file {path}: {e} (the pipeline will try to delete the file again on a restart)"); + Disposition::Remove => { + if counts[disposition][class] < 10 { + info!("Removing unused {file_type:?} {path}"); + } else { + debug!("Removing unused {file_type:?} {path}"); + } + if let Err(e) = self.backend.delete_recursive(path) { + warn!("Failed to delete {file_type:?} {path}: {e} (the pipeline will try to delete the file again on a restart)"); + } else { + return; + } } } - } else if let StorageFileType::File { size } = file_type { - usage += size; + if let StorageFileType::File { size } = file_type { + usage += size; + } + })?; + info!( + "GC kept {}/{}/{} expected files/directories/other, kept {}/{}/{} unexpected, and deleted {}/{}/{} unused", + counts[Disposition::KeepExpected][Class::File], + counts[Disposition::KeepExpected][Class::Directory], + counts[Disposition::KeepExpected][Class::Other], + counts[Disposition::KeepUnexpected][Class::File], + counts[Disposition::KeepUnexpected][Class::Directory], + counts[Disposition::KeepUnexpected][Class::Other], + counts[Disposition::Remove][Class::File], + counts[Disposition::Remove][Class::Directory], + counts[Disposition::Remove][Class::Other], + ); + + // Log errors if any files that should be there do not appear. + in_use_paths.retain(|_name, indexes| !indexes.is_empty()); + if !in_use_paths.is_empty() { + let mut incomplete_checkpoints = BTreeSet::new(); + for checkpoint_indexes in in_use_paths.values() { + for checkpoint_index in checkpoint_indexes { + incomplete_checkpoints.insert(*checkpoint_index); + } } - })?; + error!( + "{} checkpoint(s) with the following UUID(s) have {} missing file(s) in storage: {}", + incomplete_checkpoints.len(), + in_use_paths.len(), + incomplete_checkpoints + .iter() + .copied() + .map(|index| &self.checkpoint_list[index].uuid) + .format(", ") + ); + + for (index, (name, checkpoint_indexes)) in in_use_paths.iter().enumerate() { + if index >= 32 { + error!("(only first {index} missing files logged by name)"); + break; + } + error!( + "{} checkpoint(s) need missing file: {name}", + checkpoint_indexes.len() + ); + } + } Ok(usage) } @@ -354,10 +470,10 @@ impl Checkpointer { fn remove_batch_file(&self, file: &StoragePath) { match self.backend.delete_if_exists(file) { Ok(_) => { - tracing::debug!("Removed file {file}"); + debug!("Removed file {file}"); } Err(e) => { - tracing::warn!( + warn!( "Unable to remove old-checkpoint file {file}: {e} (the pipeline will try to delete the file again on a restart)" ); } @@ -443,7 +559,7 @@ impl Checkpointer { self.remove_checkpoint_dir(*cpm)?; } - tracing::info!( + info!( "cleaned up {} checkpoints; exception list: {except:?}, retaining checkpoints: {:?}", to_remove.len(), self.checkpoint_list @@ -607,9 +723,12 @@ mod test { use feldera_storage::{StorageBackend, StoragePath}; use feldera_types::config::{FileBackendConfig, StorageCacheConfig}; use feldera_types::constants::CHECKPOINT_FILE_NAME; + use itertools::Itertools; use std::collections::HashSet; + use uuid::uuid; use crate::storage::backend::posixio_impl::PosixBackend; + use crate::utils::LogCapture; use super::Checkpointer; @@ -678,7 +797,12 @@ mod test { StorageCacheConfig::default(), &FileBackendConfig::default(), )); - let mut checkpointer = Checkpointer::new(backend.clone()).unwrap(); + let (mut checkpointer, log) = + LogCapture::new(|| Checkpointer::new(backend.clone()).unwrap()).into_parts(); + assert_eq!( + log, + " INFO dbsp::circuit::checkpointer: GC kept 0/0/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused\n" + ); let uuid = uuid::Uuid::now_v7(); // Pre-populate a per-operator state file before commit so the @@ -697,6 +821,38 @@ mod test { Checkpointer::verify_checkpoint_intact(backend.as_ref(), &cp_path) .expect("intact checkpoint should verify"); + assert_eq!( + LogCapture::new(|| drop(Checkpointer::new(backend.clone()).unwrap())).log, + " INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused\n" + ); + + // Add a couple of stray files, one that matches Feldera's pattern and + // one that doesn't, plus a stray directory, and make sure that they get + // handled as they should. + // + // We sort the results because there's no guarantee that the directory + // will be in any particular order. + let expected_file = tempdir.path().join("expected.feldera"); + let unexpected_file = tempdir.path().join("unexpected.xyzzy"); + let unexpected_dir = tempdir.path().join("unexpected-dir"); + std::fs::write(&expected_file, b"").unwrap(); + std::fs::write(&unexpected_file, b"").unwrap(); + std::fs::create_dir(&unexpected_dir).unwrap(); + let log = LogCapture::new(|| drop(Checkpointer::new(backend.clone()).unwrap())).log; + let log_sorted = log.lines().sorted().collect_vec(); + assert_eq!( + log_sorted, + vec![ + " INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 1/0/0 unexpected, and deleted 1/1/0 unused", + " INFO dbsp::circuit::checkpointer: Keeping unexpected File { size: 0 } unexpected.xyzzy", + " INFO dbsp::circuit::checkpointer: Removing unused Directory unexpected-dir", + " INFO dbsp::circuit::checkpointer: Removing unused File { size: 0 } expected.feldera" + ] + ); + assert!(!expected_file.exists()); + assert!(unexpected_file.exists()); + assert!(!unexpected_dir.exists()); + // Lose the per-operator file. The dependencies record still names it. std::fs::remove_file(&state_file).unwrap(); let err = Checkpointer::verify_checkpoint_intact(backend.as_ref(), &cp_path) @@ -722,7 +878,7 @@ mod test { }; let mut checkpointer = Checkpointer::new(make_backend()).unwrap(); - let uuid = uuid::Uuid::now_v7(); + let uuid = uuid!("019e4708-bac8-7c10-b2f7-5cb248871905"); // Pre-populate a batch file and a pspine-batches entry that // references it before calling commit, so the commit-time // dependencies.json captures the batch in its snapshot. @@ -751,12 +907,26 @@ mod test { assert!(cp_dir.join("dependencies.json").exists()); assert!(batch_path.exists()); - let _restarted = Checkpointer::new(make_backend()).unwrap(); + let log = LogCapture::new(|| drop(Checkpointer::new(make_backend()).unwrap())).log; + assert_eq!( + log, + " INFO dbsp::circuit::checkpointer: GC kept 2/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused\n" + ); assert!( batch_path.exists(), "startup GC deleted live batch {} after losing pspine-batches metadata", batch_path.display(), ); + + // Now delete the batch and ensure that startup reports that it's missing. + std::fs::remove_file(&batch_path).unwrap(); + assert_eq!( + LogCapture::new(|| drop(Checkpointer::new(make_backend()).unwrap())).log, + r#" INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused +ERROR dbsp::circuit::checkpointer: 1 checkpoint(s) with the following UUID(s) have 1 missing file(s) in storage: 019e4708-bac8-7c10-b2f7-5cb248871905 +ERROR dbsp::circuit::checkpointer: 1 checkpoint(s) need missing file: w0-aaaaaaaa.feldera +"# + ); } /// Older checkpoints predate `dependencies.json` and only carry the diff --git a/crates/dbsp/src/utils.rs b/crates/dbsp/src/utils.rs index a8528842407..ea05239a3c2 100644 --- a/crates/dbsp/src/utils.rs +++ b/crates/dbsp/src/utils.rs @@ -117,3 +117,69 @@ pub fn process_rss_bytes() -> Option { memory_stats().map(|usage| usage.physical_mem as u64) } + +/// Results of capturing [tracing] output. +#[cfg(test)] +pub(crate) struct LogCapture { + /// Return value of the closure passed to [LogCapture::new]. + pub retval: T, + + /// Log output during [LogCapture::new]. + pub log: String, +} + +#[cfg(test)] +impl LogCapture { + /// Calls `f`, capturing [tracing] output to the log. + pub fn new(f: F) -> Self + where + F: FnOnce() -> T, + { + use std::{ + io::Write, + sync::{Arc, Mutex}, + }; + + use tracing::subscriber::with_default; + + #[derive(Debug, Default)] + struct CaptureBuf(Mutex>); + impl<'a> Write for &'a CaptureBuf { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.lock().unwrap().extend_from_slice(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } + } + impl CaptureBuf { + fn take(&self) -> String { + String::from_utf8(std::mem::take(&mut *self.0.lock().unwrap())).unwrap() + } + } + + // Without the following line, capture occasionally fails to capture + // anything at all. There might be some kind of race between + // initializing the global and per-thread subscribers. + let _ = tracing_subscriber::fmt::try_init(); + + let tracing_capture = Arc::new(CaptureBuf::default()); + let retval = with_default( + tracing_subscriber::FmtSubscriber::builder() + .with_ansi(false) + .without_time() + .with_writer(tracing_capture.clone()) + .finish(), + f, + ); + Self { + log: tracing_capture.take(), + retval, + } + } + + pub fn into_parts(self) -> (T, String) { + (self.retval, self.log) + } +}