cloud_topics: make min local threshold a compaction floor#30757
Draft
Lazin wants to merge 6 commits into
…cal_threshold
Rename the ctp_stm local-retention floor field from allowed_local_start_offset to min_allowed_local_threshold -- MASH, for Min Allowed local threSHold. This is a pure mechanical rename of the existing field, its state/api accessors, and the set_min_allowed_local_threshold_cmd command identifier. The command's numeric ctp_stm_key is unchanged, so the on-wire format is preserved.
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
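To illustrate why a rename with an unchanged numeric key keeps the on-wire format intact, here is a minimal sketch. The enumerator values, the neighbouring enumerator, and the old command name in the comment are assumptions for illustration; the PR page does not show the real `ctp_stm_key` definition.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the command key enum. The numeric values and the
// neighbouring enumerator are illustrative assumptions, not the real ones.
enum class ctp_stm_key : std::uint8_t {
    some_other_cmd = 0,                  // assumed neighbour
    set_min_allowed_local_threshold = 1, // assumed old name: set_allowed_local_start_offset
};

// Only the numeric value is serialized; the identifier never reaches the wire.
constexpr std::uint8_t encode_key(ctp_stm_key k) {
    return static_cast<std::uint8_t>(k);
}

constexpr ctp_stm_key decode_key(std::uint8_t raw) {
    return static_cast<ctp_stm_key>(raw);
}

// Round-trip check: a byte written before the rename decodes to the renamed
// enumerator, because the value behind the name did not change.
inline void check_roundtrip(ctp_stm_key k) {
    assert(decode_key(encode_key(k)) == k);
}
```

Under these assumptions, a record written by a pre-rename binary and one written by a post-rename binary carry the same key byte, so replay of existing logs is unaffected.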
ctp_stm housekeeping needs the same local-retention offset that do_gc computes, including the adjustment of bogus (future) retention timestamps that mutates segment indexes. Factor that logic out of do_gc into a new async log query, adjusted_retention_offset, which applies retention overrides, adjusts timestamps, and returns the eviction offset without requesting eviction. do_gc now delegates to it, and the query is exposed on the storage::log interface (and the failure_injectable_log test wrapper) so ctp_stm can share it.
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
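The shape of this refactor can be sketched as a query that computes the offset (and performs the timestamp adjustment) plus a GC entry point that delegates to it. This is a synchronous, simplified model: `segment`, `gc_config_sketch`, `log_sketch`, and the clamp-to-now adjustment rule are all assumptions for illustration, not the storage layer's actual types or logic.

```cpp
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical, simplified model: one entry per segment.
struct segment {
    std::int64_t last_offset;
    std::int64_t max_timestamp_ms;
};

struct gc_config_sketch {
    std::int64_t eviction_time_ms; // segments older than this are evictable
    std::int64_t now_ms;           // used to detect future timestamps
};

class log_sketch {
public:
    explicit log_sketch(std::vector<segment> segs)
      : _segs(std::move(segs)) {}

    // Query: adjusts bogus (future) timestamps, then returns the highest
    // offset eligible for eviction. It does NOT request eviction.
    std::optional<std::int64_t>
    adjusted_retention_offset(const gc_config_sketch& cfg) {
        std::optional<std::int64_t> result;
        for (auto& s : _segs) {
            if (s.max_timestamp_ms > cfg.now_ms) {
                // Assumed adjustment rule: clamp to "now".
                s.max_timestamp_ms = cfg.now_ms;
            }
            if (s.max_timestamp_ms >= cfg.eviction_time_ms) {
                break; // first segment still within retention
            }
            result = s.last_offset;
        }
        return result;
    }

    // Action: delegates to the query, then requests eviction.
    void do_gc(const gc_config_sketch& cfg) {
        if (auto offset = adjusted_retention_offset(cfg)) {
            _eviction_requested = offset;
        }
    }

    std::optional<std::int64_t> eviction_requested() const {
        return _eviction_requested;
    }

private:
    std::vector<segment> _segs;
    std::optional<std::int64_t> _eviction_requested;
};
```

Separating the query from the action lets a second caller (here, ctp_stm) obtain the same offset without triggering eviction as a side effect.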
Add level_zero_notifier, a peering_sharded_service under
cloud_topics/level_zero/notifier/ and owned directly by app. Given an
{ntp, min_allowed_local_threshold}, it resolves the partition's home
shard via shard_table, hops there with container().invoke_on, and
replicates the floor through ctp_stm_api, retrying transient failures
(not-leader / timeout) up to a bounded number of attempts. Replication
on each shard is bounded by a semaphore, and a gate guards shutdown.
The call is not fire-and-forget: set_min_allowed_local_threshold returns
the replication result so the caller decides whether a failure is fatal.
A partition that is not hosted on this node (or has no ctp_stm) is
reported as a no-op success.
app constructs the notifier with shard_table and partition_manager;
construct_service drives its stop(). The housekeeper is left untouched.
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
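The bounded retry and the no-op success for partitions not hosted on this node can be sketched as follows. This is a synchronous model with no Seastar types, no semaphore, and no backoff; `errc_sketch`, `replicate_with_retry`, and `notify_sketch` are hypothetical names, and treating only not-leader and timeout as transient follows the commit message above.

```cpp
#include <cassert>
#include <functional>

// Hypothetical error code; the real type in the PR is ctp_stm_api_errc.
enum class errc_sketch { ok, not_leader, timeout, failure };

constexpr bool is_transient(errc_sketch e) {
    return e == errc_sketch::not_leader || e == errc_sketch::timeout;
}

// Calls `replicate_once` until it succeeds, fails permanently, the attempt
// budget is exhausted, or `aborted` reports shutdown.
inline errc_sketch replicate_with_retry(
  const std::function<errc_sketch()>& replicate_once,
  const std::function<bool()>& aborted,
  int max_attempts) {
    assert(max_attempts > 0);
    auto last_error = errc_sketch::timeout;
    for (int attempt = 0; attempt < max_attempts && !aborted(); ++attempt) {
        last_error = replicate_once();
        if (!is_transient(last_error)) {
            return last_error; // success or a non-retryable failure
        }
    }
    return last_error;
}

// A partition that is not hosted here is reported as a no-op success.
inline errc_sketch notify_sketch(
  bool hosted_here,
  const std::function<errc_sketch()>& replicate_once,
  const std::function<bool()>& aborted,
  int max_attempts) {
    if (!hosted_here) {
      return errc_sketch::ok;
    }
    return replicate_with_retry(replicate_once, aborted, max_attempts);
}
```

Returning the final error instead of swallowing it is what lets the caller decide whether a failed replication is fatal.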
After compaction, compaction_sink computes the new min_allowed_local_threshold floor (the exclusive lower bound for local reads) from the newly cleaned ranges and replicates it through the level_zero_notifier before committing the metastore replace. If replication fails and tombstones were removed at or above the floor, the commit is skipped, because local reads in that range could otherwise serve records that compaction has just deleted in L1. When no tombstones were removed above the floor, the failure is logged and the commit proceeds.
The notifier is threaded scheduler -> worker_manager (resolved via the app-owned ss::sharded<level_zero_notifier>) -> worker -> compaction_sink. Compaction takes no ctp_stm / shard_table / partition_manager dependency of its own; the notifier handles the cross-shard routing internally. The sink is given a fresh copy of the partition ntp (log->ntp) rather than the local ntp already moved into compaction_source, so the floor update targets a valid ntp.
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
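The commit rule described in this message reduces to a small decision function. The sketch below is an assumption-laden model: `finalize_outcome`, `commit_decision`, and `decide_commit` are hypothetical names, and offsets are plain integers rather than kafka::offset.

```cpp
#include <cstdint>
#include <optional>

enum class commit_decision { commit, commit_and_log_failure, skip };

// Hypothetical summary of what the sink knows when it finalizes.
struct finalize_outcome {
    bool floor_replicated; // did the notifier report success?
    std::int64_t new_floor; // floor computed from the cleaned ranges
    // Highest offset of a removed tombstone, or nullopt if none were removed.
    std::optional<std::int64_t> highest_removed_tombstone;
};

inline commit_decision decide_commit(const finalize_outcome& o) {
    if (o.floor_replicated) {
        return commit_decision::commit;
    }
    // Replication failed. The commit is only unsafe if a tombstone was
    // removed at or above the floor, since local reads in that range could
    // still serve the records that the tombstone deleted.
    const bool tombstone_at_or_above_floor
      = o.highest_removed_tombstone.has_value()
        && *o.highest_removed_tombstone >= o.new_floor;
    return tombstone_at_or_above_floor
             ? commit_decision::skip
             : commit_decision::commit_and_log_failure;
}
```

Skipping the commit is the conservative choice here: the compaction work is lost for this round, but local and L1 reads cannot diverge.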
ctp_stm prefix-truncation now folds the L1-compaction floor (min_allowed_local_threshold) into its target alongside local retention, active readers, and the LRLO cap. The floor is enforced monotonically in the stm state and replication short-circuits no-op updates, so the advance from compaction converges safely. prefix_truncate_target consumes storage's adjusted_retention_offset (which handles bogus retention timestamps) instead of the removed pure query, and so becomes async. The floor field is spelled out as min_allowed_local_threshold throughout (no 'also' abbreviation), and the to_log_offset translation is wrapped in try/catch.
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
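One plausible way to combine the four inputs is sketched below. The commit message does not spell out the exact composition, so this rests on stated assumptions: retention and the compaction floor both push the target forward (the larger wins), while the LRLO cap and the lowest active reader hold it back (the smaller wins). `truncate_inputs` and `prefix_truncate_target_sketch` are hypothetical names, and the kafka-to-log offset translation is omitted.

```cpp
#include <algorithm>
#include <cstdint>
#include <optional>

// Hypothetical inputs, all expressed in one offset space for simplicity.
struct truncate_inputs {
    std::optional<std::int64_t> retention_offset;     // from adjusted_retention_offset
    std::optional<std::int64_t> compaction_floor;     // min_allowed_local_threshold
    std::optional<std::int64_t> lowest_active_reader; // nullopt if no readers
    std::int64_t lrlo_cap;                            // the LRLO cap
};

inline std::optional<std::int64_t>
prefix_truncate_target_sketch(const truncate_inputs& in) {
    // Assumption: retention and the floor both ask for truncation, so the
    // furthest of the two is the desired target.
    std::optional<std::int64_t> desired = in.retention_offset;
    if (in.compaction_floor) {
        desired = desired ? std::max(*desired, *in.compaction_floor)
                          : in.compaction_floor;
    }
    if (!desired) {
        return std::nullopt; // nothing asks for truncation
    }
    // Assumption: the cap and active readers bound the target from above.
    std::int64_t target = std::min(*desired, in.lrlo_cap);
    if (in.lowest_active_reader) {
        target = std::min(target, *in.lowest_active_reader);
    }
    return target;
}
```

If the real composition differs (for example in how readers interact with the floor), only the two marked assumptions need to change.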
Cover local retention for cloud-topic partitions: the local log is trimmed toward retention.local.target.bytes while the authoritative data lives in L1, bounded by active readers and the compaction floor. The steady-state assertion uses a config-derived ceiling (local target plus two segments of headroom per replica) so a regression that ignores the floor or local retention is caught rather than passing vacuously.
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
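The config-derived ceiling can be written out as simple arithmetic. The sketch reads "local target plus two segments of headroom per replica" as (target + 2 × segment size) for each replica, summed over replicas; that reading, and the names `retention_ceiling_inputs` and `steady_state_ceiling_bytes`, are assumptions. The actual test is a ducktape (Python) test; C++ is used here only to keep the examples in one language.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical inputs taken from topic and cluster configuration.
struct retention_ceiling_inputs {
    std::uint64_t local_target_bytes; // retention.local.target.bytes
    std::uint64_t segment_bytes;      // assumed configured segment size
    std::uint32_t replicas;           // replication factor
};

// Upper bound on total local bytes across replicas in steady state.
inline std::uint64_t
steady_state_ceiling_bytes(const retention_ceiling_inputs& in) {
    assert(in.replicas > 0);
    const std::uint64_t per_replica
      = in.local_target_bytes + 2 * in.segment_bytes;
    return per_replica * in.replicas;
}
```

Deriving the bound from configuration, rather than hard-coding a generous constant, is what keeps the assertion from passing when local retention is not applied at all.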
This was referenced Jun 10, 2026
Pull request overview
Implements the core “local retention driven by L1 compaction” behavior for cloud topics by (a) introducing a cross-shard notifier to replicate L1-derived floors into ctp_stm, and (b) updating ctp_stm to treat min_allowed_local_threshold as a compaction floor when deciding how far to prefix-truncate the local raft log. Adds ducktape and unit test coverage for tiered cloud local retention behavior.
Changes:
- Add `storage::log::adjusted_retention_offset(gc_config)` and implement it in `disk_log_impl` so `ctp_stm` can reuse storage-layer retention computation (including strict/local overrides and cloud GC offsets).
- Replace the previous “allowed local start offset hint” with a monotonic `min_allowed_local_threshold` floor, and update `ctp_stm` prefix truncation to combine retention target + floor + LRLO cap.
- Wire L1 compaction finalization to replicate the new floor back to the owning partition via `level_zero_notifier`, and add unit + ducktape tests.
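The monotonic floor semantics summarized above (decreases ignored, `nullopt` allowed as a reset, no-op updates short-circuited) can be sketched as a small state holder. `floor_state_sketch` is a hypothetical stand-in for the relevant part of the ctp_stm state, and the boolean return value is an assumption used here to model the "nothing to replicate" case.

```cpp
#include <cstdint>
#include <optional>

class floor_state_sketch {
public:
    // Applies an update and returns true if the stored floor changed.
    // A false return models a no-op that need not be replicated.
    bool set(std::optional<std::int64_t> new_floor) {
        if (!new_floor) {
            // nullopt is an explicit reset and is always allowed.
            const bool changed = _floor.has_value();
            _floor.reset();
            return changed;
        }
        if (_floor && *new_floor <= *_floor) {
            return false; // decreases and repeats are ignored
        }
        _floor = new_floor;
        return true;
    }

    std::optional<std::int64_t> get() const { return _floor; }

private:
    std::optional<std::int64_t> _floor;
};
```

Because stale or duplicated updates are dropped, retried replication from the notifier cannot move the floor backwards.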
Reviewed changes
Copilot reviewed 36 out of 36 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| tests/rptest/tests/tiered_cloud_local_retention_test.py | New ducktape E2E test validating tiered cloud local retention behavior under strict targets, mode flips, disk pressure, and compaction. |
| src/v/storage/log.h | Adds new virtual adjusted_retention_offset API for retention-driven prefix truncation. |
| src/v/storage/disk_log_impl.h | Declares disk_log_impl::adjusted_retention_offset. |
| src/v/storage/disk_log_impl.cc | Implements adjusted_retention_offset, updates local override gating for cloud topics, and switches GC path to call the new method. |
| src/v/raft/tests/failure_injectable_log.h | Extends test log wrapper to implement adjusted_retention_offset. |
| src/v/raft/tests/failure_injectable_log.cc | Delegates adjusted_retention_offset to underlying log in test wrapper. |
| src/v/cloud_topics/level_zero/stm/types.h | Renames command key enum to set_min_allowed_local_threshold. |
| src/v/cloud_topics/level_zero/stm/types.cc | Updates formatter string for renamed command key. |
| src/v/cloud_topics/level_zero/stm/tests/ctp_stm_test.cc | Updates tests for async prefix_truncate_target and new floor semantics/command naming. |
| src/v/cloud_topics/level_zero/stm/tests/ctp_stm_state_test.cc | Renames tests and adds monotonicity/serde roundtrip coverage for min threshold. |
| src/v/cloud_topics/level_zero/stm/ctp_stm.h | Makes prefix_truncate_target async, documents composition of retention/floor/cap, adds build_gc_config. |
| src/v/cloud_topics/level_zero/stm/ctp_stm.cc | Implements async truncation target combining storage retention and compaction floor; applies new command handling. |
| src/v/cloud_topics/level_zero/stm/ctp_stm_state.h | Renames state field/accessors to min_allowed_local_threshold and updates serde fields. |
| src/v/cloud_topics/level_zero/stm/ctp_stm_state.cc | Implements monotonic floor semantics (ignores decreases, allows nullopt reset). |
| src/v/cloud_topics/level_zero/stm/ctp_stm_commands.h | Renames set-floor command type/key to set_min_allowed_local_threshold_cmd. |
| src/v/cloud_topics/level_zero/stm/ctp_stm_api.h | Renames API method to set_min_allowed_local_threshold. |
| src/v/cloud_topics/level_zero/stm/ctp_stm_api.cc | Implements monotonic/idempotent replication semantics for the new floor command. |
| src/v/cloud_topics/level_zero/notifier/level_zero_notifier.h | New per-shard service to replicate L1-derived floors to the owning partition’s ctp_stm. |
| src/v/cloud_topics/level_zero/notifier/level_zero_notifier.cc | Implements cross-shard lookup + retrying replication to ctp_stm_api. |
| src/v/cloud_topics/level_zero/notifier/tests/level_zero_notifier_test.cc | New unit tests for retry/leader vs follower behavior. |
| src/v/cloud_topics/level_zero/notifier/tests/BUILD | Bazel target for level_zero_notifier unit tests. |
| src/v/cloud_topics/level_zero/notifier/BUILD | Bazel library target for level_zero_notifier. |
| src/v/cloud_topics/level_one/maintenance/worker.h | Threads notifier pointer through compaction worker to sinks. |
| src/v/cloud_topics/level_one/maintenance/worker.cc | Passes NTP + notifier into compaction_sink. |
| src/v/cloud_topics/level_one/maintenance/worker_manager.h | Accepts optional notifier shard service and stores it. |
| src/v/cloud_topics/level_one/maintenance/worker_manager.cc | Wires notifier into per-shard worker construction. |
| src/v/cloud_topics/level_one/maintenance/scheduler.h | Extends scheduler constructor signature to accept notifier. |
| src/v/cloud_topics/level_one/maintenance/scheduler.cc | Wires notifier into worker_manager from scheduler. |
| src/v/cloud_topics/level_one/maintenance/compaction/tests/reducer_test.cc | Updates compaction sink construction to pass NTP. |
| src/v/cloud_topics/level_one/maintenance/compaction/compaction_sink.h | Extends sink to carry NTP + notifier and notify floor after finalize. |
| src/v/cloud_topics/level_one/maintenance/compaction/compaction_sink.cc | Computes new floor from cleaned ranges; replicates it (conditionally blocking commit if required). |
| src/v/cloud_topics/level_one/maintenance/compaction/BUILD | Adds notifier dep for compaction code. |
| src/v/cloud_topics/level_one/maintenance/BUILD | Adds notifier dep for maintenance module. |
| src/v/cloud_topics/BUILD | Adds notifier dep for cloud_topics top-level library. |
| src/v/cloud_topics/app.h | Adds sharded l0_notifier service to cloud topics app. |
| src/v/cloud_topics/app.cc | Constructs notifier service and wires it into L1 compaction scheduler. |

        cfg);

    -   auto max_offset = co_await maybe_adjusted_retention_offset(cfg);
    +   auto max_offset = co_await adjusted_retention_offset(cfg);

Comment on lines +83 to +85

    auto last_error = ctp_stm_api_errc::timeout;
    for (int attempt = 0; attempt < max_attempts && !_as.abort_requested();
         ++attempt) {

    ss::future<std::expected<void, ctp_stm_api_errc>>
    level_zero_notifier::set_min_allowed_local_threshold(
      model::ntp ntp, kafka::offset new_floor) {
        auto shard = _shard_table->local().shard_for(ntp);

Comment on lines +113 to 118

    /// Sets the cached `min_allowed_local_threshold` hint on ctp_stm.
    ///
    /// The reconciler replicates this command after computing the offset
    /// from local segment stats and effective local-retention targets.
    /// nullopt clears the hint (used when storage.mode != tiered_cloud or
    /// when compaction is enabled on the topic).

Comment on lines +133 to 138

    /// Set the min allowed local threshold hint.
    ///
    /// This value is produced by the reconciler and indicates the lower bound
    /// of kafka offsets that may be retained locally. The STM caches it but
    /// does not enforce truncation directly; a separate path applies it as a
    /// prefix-truncate target on the raft log.

Comment on lines +82 to 83

    /// Replicate a set_min_allowed_local_threshold_cmd. The STM stores the
    /// value as a hint for prefix_truncate_below_lro.

Comment on lines +236 to 237

    /// Min allowed local threshold hint, produced by the reconciler.
    /// Cached state only; truncation is applied elsewhere.

This is PR 2 of 3. It builds on PR 1 and implements the core behavior: driving
cloud topics local retention from L1 compaction.
- Wiring the `level_zero_notifier` (added in PR 1) into the maintenance worker.
- Making `min_allowed_local_threshold` a compaction floor so local retention never truncates below what L1 still needs.
Stack
This PR is part of a 3-PR stack; please review/merge in order:
This is PR 2 of 3. All three target `dev`, so GitHub shows this PR's diff
against `dev`; it includes commits from earlier PRs in the stack until those
merge. Review only the commits owned by this PR.
Backports Required
Release Notes