Skip to content

cloud_topics: make min local threshold a compaction floor#30757

Draft
Lazin wants to merge 6 commits into
redpanda-data:devfrom
Lazin:ct/min-local-threshold-2-floor
Draft

cloud_topics: make min local threshold a compaction floor#30757
Lazin wants to merge 6 commits into
redpanda-data:devfrom
Lazin:ct/min-local-threshold-2-floor

Conversation

@Lazin

@Lazin Lazin commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

This is PR 2 of 3. It builds on PR 1 and implements the core behavior: driving
cloud topics local retention from L1 compaction.

  • cloud_topics/l1: advance the min allowed local threshold after compaction,
    wiring the level_zero_notifier (added in PR 1) into the maintenance worker.
  • cloud_topics/stm: make min_allowed_local_threshold a compaction floor so
    local retention never truncates below what L1 still needs.
  • tests: ducktape coverage for tiered_cloud local retention.

Stack

This PR is part of a 3-PR stack — please review/merge in order:

  1. cloud_topics: prep for compaction-driven local retention #30756 — cloud_topics: prep for compaction-driven local retention
  2. cloud_topics: make min local threshold a compaction floor #30757 — cloud_topics: make min local threshold a compaction floor
  3. cloud_topics: pivot tiered_cloud reads on the local log start #30758 — cloud_topics: pivot tiered_cloud reads on the local log start

This is PR 2 of 3. All three target dev, so GitHub shows this PR's diff
against dev; it includes commits from earlier PRs in the stack until those
merge. Review only the commits owned by this PR.

Backports Required

  • none - not a bug fix

Release Notes

  • none

Lazin added 6 commits June 10, 2026 07:05
…cal_threshold

Rename the ctp_stm local-retention floor field from
allowed_local_start_offset to min_allowed_local_threshold -- MASH, for
Min Allowed local threSHold. This is a pure mechanical rename of the
existing field, its state/api accessors, and the
set_min_allowed_local_threshold_cmd command identifier. The command's
numeric ctp_stm_key is unchanged, so the on-wire format is preserved.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
ctp_stm housekeeping needs the same local-retention offset that do_gc
computes, including the adjustment of bogus (future) retention
timestamps that mutates segment indexes. Factor that logic out of do_gc
into a new async log query, adjusted_retention_offset, which applies
retention overrides, adjusts timestamps, and returns the eviction offset
without requesting eviction. do_gc now delegates to it, and the query is
exposed on the storage::log interface (and the failure_injectable_log
test wrapper) so ctp_stm can share it.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
Add level_zero_notifier, a peering_sharded_service under
cloud_topics/level_zero/notifier/ and owned directly by app. Given an
{ntp, min_allowed_local_threshold}, it resolves the partition's home
shard via shard_table, hops there with container().invoke_on, and
replicates the floor through ctp_stm_api, retrying transient failures
(not-leader / timeout) up to a bounded number of attempts. Replication
on each shard is bounded by a semaphore, and a gate guards shutdown.

The call is not fire-and-forget: set_min_allowed_local_threshold returns
the replication result so the caller decides whether a failure is fatal.
A partition that is not hosted on this node (or has no ctp_stm) is
reported as a no-op success.

app constructs the notifier with shard_table and partition_manager;
construct_service drives its stop(). The housekeeper is left untouched.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
After compaction, compaction_sink computes the new
min_allowed_local_threshold floor (the exclusive lower bound for local
reads) from the newly cleaned ranges and replicates it through the
level_zero_notifier before committing the metastore replace. If
replication fails and tombstones were removed at or above the floor the
commit is skipped, because local reads in that range could otherwise
serve records that compaction has just deleted in L1. When no tombstones
were removed above the floor the failure is logged and the commit
proceeds.

The notifier is threaded scheduler -> worker_manager (resolved via the
app-owned ss::sharded<level_zero_notifier>) -> worker -> compaction_sink.
Compaction takes no ctp_stm / shard_table / partition_manager dependency
of its own; the notifier handles the cross-shard routing internally.

The sink is given a fresh copy of the partition ntp (log->ntp) rather
than the local ntp already moved into compaction_source, so the floor
update targets a valid ntp.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
ctp_stm prefix-truncation now folds the L1-compaction floor
(min_allowed_local_threshold) into its target alongside local retention,
active readers, and the LRLO cap. The floor is enforced monotonically in
the stm state and replication short-circuits no-op updates, so the
advance from compaction converges safely.

prefix_truncate_target consumes storage's adjusted_retention_offset
(which handles bogus retention timestamps) instead of the removed pure
query, and so becomes async. The floor field is spelled out as
min_allowed_local_threshold throughout (no 'also' abbreviation), and the
to_log_offset translation is wrapped in try/catch.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
Cover local retention for cloud-topic partitions: the local log is
trimmed toward retention.local.target.bytes while the authoritative data
lives in L1, bounded by active readers and the compaction floor. The
steady-state assertion uses a config-derived ceiling (local target plus
two segments of headroom per replica) so a regression that ignores the
floor or local retention is caught rather than passing vacuously.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Implements the core “local retention driven by L1 compaction” behavior for cloud topics by (a) introducing a cross-shard notifier to replicate L1-derived floors into ctp_stm, and (b) updating ctp_stm to treat min_allowed_local_threshold as a compaction floor when deciding how far to prefix-truncate the local raft log. Adds ducktape and unit test coverage for tiered cloud local retention behavior.

Changes:

  • Add storage::log::adjusted_retention_offset(gc_config) and implement it in disk_log_impl so ctp_stm can reuse storage-layer retention computation (including strict/local overrides and cloud GC offsets).
  • Replace the previous “allowed local start offset hint” with a monotonic min_allowed_local_threshold floor, and update ctp_stm prefix truncation to combine retention target + floor + LRLO cap.
  • Wire L1 compaction finalization to replicate the new floor back to the owning partition via level_zero_notifier, and add unit + ducktape tests.

Reviewed changes

Copilot reviewed 36 out of 36 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
tests/rptest/tests/tiered_cloud_local_retention_test.py New ducktape E2E test validating tiered cloud local retention behavior under strict targets, mode flips, disk pressure, and compaction.
src/v/storage/log.h Adds new virtual adjusted_retention_offset API for retention-driven prefix truncation.
src/v/storage/disk_log_impl.h Declares disk_log_impl::adjusted_retention_offset.
src/v/storage/disk_log_impl.cc Implements adjusted_retention_offset, updates local override gating for cloud topics, and switches GC path to call the new method.
src/v/raft/tests/failure_injectable_log.h Extends test log wrapper to implement adjusted_retention_offset.
src/v/raft/tests/failure_injectable_log.cc Delegates adjusted_retention_offset to underlying log in test wrapper.
src/v/cloud_topics/level_zero/stm/types.h Renames command key enum to set_min_allowed_local_threshold.
src/v/cloud_topics/level_zero/stm/types.cc Updates formatter string for renamed command key.
src/v/cloud_topics/level_zero/stm/tests/ctp_stm_test.cc Updates tests for async prefix_truncate_target and new floor semantics/command naming.
src/v/cloud_topics/level_zero/stm/tests/ctp_stm_state_test.cc Renames tests and adds monotonicity/serde roundtrip coverage for min threshold.
src/v/cloud_topics/level_zero/stm/ctp_stm.h Makes prefix_truncate_target async, documents composition of retention/floor/cap, adds build_gc_config.
src/v/cloud_topics/level_zero/stm/ctp_stm.cc Implements async truncation target combining storage retention and compaction floor; applies new command handling.
src/v/cloud_topics/level_zero/stm/ctp_stm_state.h Renames state field/accessors to min_allowed_local_threshold and updates serde fields.
src/v/cloud_topics/level_zero/stm/ctp_stm_state.cc Implements monotonic floor semantics (ignores decreases, allows nullopt reset).
src/v/cloud_topics/level_zero/stm/ctp_stm_commands.h Renames set-floor command type/key to set_min_allowed_local_threshold_cmd.
src/v/cloud_topics/level_zero/stm/ctp_stm_api.h Renames API method to set_min_allowed_local_threshold.
src/v/cloud_topics/level_zero/stm/ctp_stm_api.cc Implements monotonic/idempotent replication semantics for the new floor command.
src/v/cloud_topics/level_zero/notifier/level_zero_notifier.h New per-shard service to replicate L1-derived floors to the owning partition’s ctp_stm.
src/v/cloud_topics/level_zero/notifier/level_zero_notifier.cc Implements cross-shard lookup + retrying replication to ctp_stm_api.
src/v/cloud_topics/level_zero/notifier/tests/level_zero_notifier_test.cc New unit tests for retry/leader vs follower behavior.
src/v/cloud_topics/level_zero/notifier/tests/BUILD Bazel target for level_zero_notifier unit tests.
src/v/cloud_topics/level_zero/notifier/BUILD Bazel library target for level_zero_notifier.
src/v/cloud_topics/level_one/maintenance/worker.h Threads notifier pointer through compaction worker to sinks.
src/v/cloud_topics/level_one/maintenance/worker.cc Passes NTP + notifier into compaction_sink.
src/v/cloud_topics/level_one/maintenance/worker_manager.h Accepts optional notifier shard service and stores it.
src/v/cloud_topics/level_one/maintenance/worker_manager.cc Wires notifier into per-shard worker construction.
src/v/cloud_topics/level_one/maintenance/scheduler.h Extends scheduler constructor signature to accept notifier.
src/v/cloud_topics/level_one/maintenance/scheduler.cc Wires notifier into worker_manager from scheduler.
src/v/cloud_topics/level_one/maintenance/compaction/tests/reducer_test.cc Updates compaction sink construction to pass NTP.
src/v/cloud_topics/level_one/maintenance/compaction/compaction_sink.h Extends sink to carry NTP + notifier and notify floor after finalize.
src/v/cloud_topics/level_one/maintenance/compaction/compaction_sink.cc Computes new floor from cleaned ranges; replicates it (conditionally blocking commit if required).
src/v/cloud_topics/level_one/maintenance/compaction/BUILD Adds notifier dep for compaction code.
src/v/cloud_topics/level_one/maintenance/BUILD Adds notifier dep for maintenance module.
src/v/cloud_topics/BUILD Adds notifier dep for cloud_topics top-level library.
src/v/cloud_topics/app.h Adds sharded l0_notifier service to cloud topics app.
src/v/cloud_topics/app.cc Constructs notifier service and wires it into L1 compaction scheduler.

cfg);

auto max_offset = co_await maybe_adjusted_retention_offset(cfg);
auto max_offset = co_await adjusted_retention_offset(cfg);
Comment on lines +83 to +85
auto last_error = ctp_stm_api_errc::timeout;
for (int attempt = 0; attempt < max_attempts && !_as.abort_requested();
++attempt) {
ss::future<std::expected<void, ctp_stm_api_errc>>
level_zero_notifier::set_min_allowed_local_threshold(
model::ntp ntp, kafka::offset new_floor) {
auto shard = _shard_table->local().shard_for(ntp);
Comment on lines +113 to 118
/// Sets the cached `min_allowed_local_threshold` hint on ctp_stm.
///
/// The reconciler replicates this command after computing the offset
/// from local segment stats and effective local-retention targets.
/// nullopt clears the hint (used when storage.mode != tiered_cloud or
/// when compaction is enabled on the topic).
Comment on lines +133 to 138
/// Set the min allowed local threshold hint.
///
/// This value is produced by the reconciler and indicates the lower bound
/// of kafka offsets that may be retained locally. The STM caches it but
/// does not enforce truncation directly; a separate path applies it as a
/// prefix-truncate target on the raft log.
Comment on lines +82 to 83
/// Replicate a set_min_allowed_local_threshold_cmd. The STM stores the
/// value as a hint for prefix_truncate_below_lro.
Comment on lines +236 to 237
/// Min allowed local threshold hint, produced by the reconciler.
/// Cached state only; truncation is applied elsewhere.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants