Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 198 additions & 28 deletions crates/dbsp/src/circuit/checkpointer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,20 @@
use crate::dynamic::{self, data::DataTyped};
use crate::storage::file::SerializerInner;
use crate::{Error, NumEntries, TypedBox};
use enum_map::{Enum, EnumMap};
use feldera_types::checkpoint::{
CheckpointDependencies, CheckpointDependenciesWrite, CheckpointMetadata,
};
use feldera_types::constants::{
ACTIVATION_MARKER_FILE, ADHOC_TEMP_DIR, CHECKPOINT_DEPENDENCIES, CHECKPOINT_FILE_NAME,
DBSP_FILE_EXTENSION, STATE_FILE, STATUS_FILE, STEPS_FILE,
};
use itertools::Itertools;
use size_of::SizeOf;
use smallvec::SmallVec;
use tracing::{debug, error, info, warn};

use std::collections::{BTreeMap, BTreeSet};
use std::io::ErrorKind;
use std::sync::atomic::Ordering;
use std::{
Expand Down Expand Up @@ -104,26 +109,32 @@ impl Checkpointer {
/// directory. Returns the amount of storage still in use.
pub fn gc_startup(&self) -> Result<u64, Error> {
// Collect all directories and files still referenced by a checkpoint
let mut in_use_paths: HashSet<StoragePath> = HashSet::new();
in_use_paths.insert(CHECKPOINT_FILE_NAME.into());
in_use_paths.insert(STEPS_FILE.into());
in_use_paths.insert(STATE_FILE.into());
let mut in_use_paths = BTreeMap::<StoragePath, SmallVec<[usize; 2]>>::new();
in_use_paths.insert(CHECKPOINT_FILE_NAME.into(), SmallVec::new());
in_use_paths.insert(STEPS_FILE.into(), SmallVec::new());
in_use_paths.insert(STATE_FILE.into(), SmallVec::new());
// Don't delete either `status.json` or `status.json.mut` either because
// these files get updated asynchronously and we must not interfere with
// it.
in_use_paths.insert(STATUS_FILE.into());
in_use_paths.insert(format!("{}.mut", STATUS_FILE).into());
in_use_paths.insert(ADHOC_TEMP_DIR.into());
in_use_paths.insert(ACTIVATION_MARKER_FILE.into());
for cpm in self.checkpoint_list.iter() {
in_use_paths.insert(cpm.uuid.to_string().into());
in_use_paths.insert(STATUS_FILE.into(), SmallVec::new());
in_use_paths.insert(format!("{}.mut", STATUS_FILE).into(), SmallVec::new());
in_use_paths.insert(ADHOC_TEMP_DIR.into(), SmallVec::new());
in_use_paths.insert(ACTIVATION_MARKER_FILE.into(), SmallVec::new());
for (checkpoint_index, cpm) in self.checkpoint_list.iter().enumerate() {
in_use_paths
.entry(cpm.uuid.to_string().into())
.or_default()
.push(checkpoint_index);
let batches = self.gather_batches_for_checkpoint(cpm)?;
Comment thread
blp marked this conversation as resolved.
for batch in batches {
in_use_paths.insert(batch);
in_use_paths
.entry(batch)
.or_default()
.push(checkpoint_index);
}
}
// Give the coordinator a namespace for persistent files.
in_use_paths.insert("coordinator".into());
in_use_paths.insert("coordinator".into(), SmallVec::new());

/// True if `path` is a name that we might have created ourselves.
fn is_feldera_filename(path: &StoragePath) -> bool {
Expand All @@ -132,21 +143,126 @@ impl Checkpointer {
}

// Collect everything found in the storage directory
//
// This code is designed to tolerate `list()` reporting a single file
// name more than once (it should not happen but who knows?).

/// Simplified [StorageFileType] (without a file size).
#[derive(Copy, Clone, Debug, Enum)]
enum Class {
File,
Directory,
Other,
}
impl From<StorageFileType> for Class {
fn from(value: StorageFileType) -> Self {
match value {
StorageFileType::File { .. } => Self::File,
StorageFileType::Directory => Self::Directory,
StorageFileType::Other => Self::Other,
}
}
}

/// What to do to a file.
#[derive(Copy, Clone, Debug, Enum)]
enum Disposition {
/// Keep it because we know what it is and we need it.
KeepExpected,
/// Keep it because it is not ours and we do not want to risk it.
KeepUnexpected,
/// Remove it because we know what it is but we no longer need it.
Remove,
}
let mut usage = 0;
self.backend.list(&StoragePath::default(), &mut |path, file_type| {
if !in_use_paths.contains(path) && (is_feldera_filename(path) || file_type == StorageFileType::Directory) {
match self.backend.delete_recursive(path) {
Ok(_) => {
tracing::debug!("Removed unused {file_type:?} '{path}'");
let mut counts = EnumMap::from_fn(|_| EnumMap::from_fn(|_| 0usize));
self.backend
.list(&StoragePath::default(), &mut |path, file_type| {
let class = Class::from(file_type);
let disposition = if let Some(checkpoint_indexes) = in_use_paths.get_mut(path) {
// Clear the indexes but don't remove the entry in case `list()`
// reports the same file name again, which should not happen but
// who knows?. (Otherwise, on the second appearance, we would
// delete the file.)
checkpoint_indexes.clear();
Disposition::KeepExpected
} else if is_feldera_filename(path) || file_type == StorageFileType::Directory {
Disposition::Remove
} else {
Disposition::KeepUnexpected
};
counts[disposition][class] += 1;

match disposition {
Disposition::KeepExpected => (),
Disposition::KeepUnexpected => {
if counts[disposition][class] < 10 {
info!("Keeping unexpected {file_type:?} {path}");
} else {
debug!("Keeping unexpected {file_type:?} {path}");
}
}
Err(e) => {
tracing::warn!("Unable to remove old-checkpoint file {path}: {e} (the pipeline will try to delete the file again on a restart)");
Disposition::Remove => {
if counts[disposition][class] < 10 {
Comment thread
blp marked this conversation as resolved.
info!("Removing unused {file_type:?} {path}");
} else {
debug!("Removing unused {file_type:?} {path}");
}
if let Err(e) = self.backend.delete_recursive(path) {
warn!("Failed to delete {file_type:?} {path}: {e} (the pipeline will try to delete the file again on a restart)");
} else {
Comment thread
blp marked this conversation as resolved.
return;
}
}
}
} else if let StorageFileType::File { size } = file_type {
usage += size;
if let StorageFileType::File { size } = file_type {
usage += size;
}
})?;
info!(
"GC kept {}/{}/{} expected files/directories/other, kept {}/{}/{} unexpected, and deleted {}/{}/{} unused",
counts[Disposition::KeepExpected][Class::File],
counts[Disposition::KeepExpected][Class::Directory],
counts[Disposition::KeepExpected][Class::Other],
counts[Disposition::KeepUnexpected][Class::File],
counts[Disposition::KeepUnexpected][Class::Directory],
counts[Disposition::KeepUnexpected][Class::Other],
counts[Disposition::Remove][Class::File],
counts[Disposition::Remove][Class::Directory],
counts[Disposition::Remove][Class::Other],
);

// Log errors if any files that should be there do not appear.
in_use_paths.retain(|_name, indexes| !indexes.is_empty());
if !in_use_paths.is_empty() {
let mut incomplete_checkpoints = BTreeSet::new();
for checkpoint_indexes in in_use_paths.values() {
for checkpoint_index in checkpoint_indexes {
incomplete_checkpoints.insert(*checkpoint_index);
}
}
})?;
error!(
"{} checkpoint(s) with the following UUID(s) have {} missing file(s) in storage: {}",
incomplete_checkpoints.len(),
Comment thread
blp marked this conversation as resolved.
in_use_paths.len(),
incomplete_checkpoints
.iter()
.copied()
.map(|index| &self.checkpoint_list[index].uuid)
.format(", ")
);

for (index, (name, checkpoint_indexes)) in in_use_paths.iter().enumerate() {
if index >= 32 {
error!("(only first {index} missing files logged by name)");
break;
}
error!(
"{} checkpoint(s) need missing file: {name}",
checkpoint_indexes.len()
);
}
Comment thread
blp marked this conversation as resolved.
}

Ok(usage)
}
Expand Down Expand Up @@ -354,10 +470,10 @@ impl Checkpointer {
fn remove_batch_file(&self, file: &StoragePath) {
match self.backend.delete_if_exists(file) {
Ok(_) => {
tracing::debug!("Removed file {file}");
debug!("Removed file {file}");
}
Err(e) => {
tracing::warn!(
warn!(
"Unable to remove old-checkpoint file {file}: {e} (the pipeline will try to delete the file again on a restart)"
);
}
Expand Down Expand Up @@ -443,7 +559,7 @@ impl Checkpointer {
self.remove_checkpoint_dir(*cpm)?;
}

tracing::info!(
info!(
"cleaned up {} checkpoints; exception list: {except:?}, retaining checkpoints: {:?}",
to_remove.len(),
self.checkpoint_list
Expand Down Expand Up @@ -607,9 +723,12 @@ mod test {
use feldera_storage::{StorageBackend, StoragePath};
use feldera_types::config::{FileBackendConfig, StorageCacheConfig};
use feldera_types::constants::CHECKPOINT_FILE_NAME;
use itertools::Itertools;
use std::collections::HashSet;
use uuid::uuid;

use crate::storage::backend::posixio_impl::PosixBackend;
use crate::utils::LogCapture;

use super::Checkpointer;

Expand Down Expand Up @@ -678,7 +797,12 @@ mod test {
StorageCacheConfig::default(),
&FileBackendConfig::default(),
));
let mut checkpointer = Checkpointer::new(backend.clone()).unwrap();
let (mut checkpointer, log) =
LogCapture::new(|| Checkpointer::new(backend.clone()).unwrap()).into_parts();
assert_eq!(
Comment thread
blp marked this conversation as resolved.
log,
" INFO dbsp::circuit::checkpointer: GC kept 0/0/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused\n"
);

let uuid = uuid::Uuid::now_v7();
// Pre-populate a per-operator state file before commit so the
Expand All @@ -697,6 +821,38 @@ mod test {
Checkpointer::verify_checkpoint_intact(backend.as_ref(), &cp_path)
.expect("intact checkpoint should verify");

assert_eq!(
LogCapture::new(|| drop(Checkpointer::new(backend.clone()).unwrap())).log,
" INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused\n"
);

// Add a couple of stray files, one that matches Feldera's pattern and
// one that doesn't, plus a stray directory, and make sure that they get
// handled as they should.
//
// We sort the results because there's no guarantee that the directory
// will be in any particular order.
let expected_file = tempdir.path().join("expected.feldera");
let unexpected_file = tempdir.path().join("unexpected.xyzzy");
let unexpected_dir = tempdir.path().join("unexpected-dir");
std::fs::write(&expected_file, b"").unwrap();
std::fs::write(&unexpected_file, b"").unwrap();
std::fs::create_dir(&unexpected_dir).unwrap();
let log = LogCapture::new(|| drop(Checkpointer::new(backend.clone()).unwrap())).log;
let log_sorted = log.lines().sorted().collect_vec();
assert_eq!(
log_sorted,
vec![
" INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 1/0/0 unexpected, and deleted 1/1/0 unused",
" INFO dbsp::circuit::checkpointer: Keeping unexpected File { size: 0 } unexpected.xyzzy",
" INFO dbsp::circuit::checkpointer: Removing unused Directory unexpected-dir",
" INFO dbsp::circuit::checkpointer: Removing unused File { size: 0 } expected.feldera"
]
);
assert!(!expected_file.exists());
assert!(unexpected_file.exists());
assert!(!unexpected_dir.exists());

// Lose the per-operator file. The dependencies record still names it.
std::fs::remove_file(&state_file).unwrap();
let err = Checkpointer::verify_checkpoint_intact(backend.as_ref(), &cp_path)
Expand All @@ -722,7 +878,7 @@ mod test {
};

let mut checkpointer = Checkpointer::new(make_backend()).unwrap();
let uuid = uuid::Uuid::now_v7();
let uuid = uuid!("019e4708-bac8-7c10-b2f7-5cb248871905");
// Pre-populate a batch file and a pspine-batches entry that
// references it before calling commit, so the commit-time
// dependencies.json captures the batch in its snapshot.
Expand Down Expand Up @@ -751,12 +907,26 @@ mod test {
assert!(cp_dir.join("dependencies.json").exists());
assert!(batch_path.exists());

let _restarted = Checkpointer::new(make_backend()).unwrap();
let log = LogCapture::new(|| drop(Checkpointer::new(make_backend()).unwrap())).log;
assert_eq!(
log,
" INFO dbsp::circuit::checkpointer: GC kept 2/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused\n"
);
assert!(
batch_path.exists(),
"startup GC deleted live batch {} after losing pspine-batches metadata",
batch_path.display(),
);

// Now delete the batch and ensure that startup reports that it's missing.
std::fs::remove_file(&batch_path).unwrap();
assert_eq!(
LogCapture::new(|| drop(Checkpointer::new(make_backend()).unwrap())).log,
r#" INFO dbsp::circuit::checkpointer: GC kept 1/1/0 expected files/directories/other, kept 0/0/0 unexpected, and deleted 0/0/0 unused
ERROR dbsp::circuit::checkpointer: 1 checkpoint(s) with the following UUID(s) have 1 missing file(s) in storage: 019e4708-bac8-7c10-b2f7-5cb248871905
ERROR dbsp::circuit::checkpointer: 1 checkpoint(s) need missing file: w0-aaaaaaaa.feldera
"#
);
}

/// Older checkpoints predate `dependencies.json` and only carry the
Expand Down
66 changes: 66 additions & 0 deletions crates/dbsp/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,69 @@ pub fn process_rss_bytes() -> Option<u64> {

memory_stats().map(|usage| usage.physical_mem as u64)
}

/// Results of capturing [tracing] output.
#[cfg(test)]
pub(crate) struct LogCapture<T> {
/// Return value of the closure passed to [LogCapture::new].
pub retval: T,

/// Log output during [LogCapture::new].
pub log: String,
}

#[cfg(test)]
impl<T> LogCapture<T> {
/// Calls `f`, capturing [tracing] output to the log.
pub fn new<F>(f: F) -> Self
where
F: FnOnce() -> T,
{
use std::{
io::Write,
sync::{Arc, Mutex},
};

use tracing::subscriber::with_default;

#[derive(Debug, Default)]
struct CaptureBuf(Mutex<Vec<u8>>);
impl<'a> Write for &'a CaptureBuf {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.0.lock().unwrap().extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
impl CaptureBuf {
fn take(&self) -> String {
String::from_utf8(std::mem::take(&mut *self.0.lock().unwrap())).unwrap()
}
}

// Without the following line, capture occasionally fails to capture
// anything at all. There might be some kind of race between
// initializing the global and per-thread subscribers.
let _ = tracing_subscriber::fmt::try_init();

let tracing_capture = Arc::new(CaptureBuf::default());
let retval = with_default(
tracing_subscriber::FmtSubscriber::builder()
.with_ansi(false)
.without_time()
.with_writer(tracing_capture.clone())
.finish(),
f,
);
Self {
log: tracing_capture.take(),
retval,
}
}

pub fn into_parts(self) -> (T, String) {
(self.retval, self.log)
}
}
Loading