br: Cherry pick from feature restore from replication storage #68601
Leavrth wants to merge 4 commits
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
This cherry-pick PR targets a release branch and has not yet been approved by triage owners.
[APPROVALNOTIFIER] This PR is NOT APPROVED.
📝 Walkthrough
Adds BatchDownloadLatestMVCC and a support probe; implements retain-latest-MVCC SST download and routing in SnapFileImporter; refactors SST restore into collect+restore phases; adds two-phase stream restore and replication resume-state; introduces tagged backupmeta filename flags and makes metadata reads concurrency-safe; updates tests and build pins.
Changes
- Latest MVCC SST Download and Restoration
- Two-Phase Stream Restore with Checkpoint Preservation
- Metadata filename flags and cache concurrency
- Utility refactors and build updates
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 3 | ❌ 2
❌ Failed checks (2 warnings)
✅ Passed checks (3 passed)
Warning: There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.
🔧 golangci-lint (2.12.2)
Error: can't load config: unsupported version of the configuration: ""
See https://golangci-lint.run/docs/product/migration-guide for migration instructions
Actionable comments posted: 4
🧹 Nitpick comments (1)
br/pkg/restore/log_client/client.go (1)
337-338: ⚡ Quick win
Add contextual wrapping when SST collection fails.
Returning raw iterator/rewrite errors here makes restore failures harder to triage (which table/subcompaction failed). Please annotate with table context before returning.
As per coding guidelines: Go code: Keep error handling actionable and contextual; avoid silently swallowing errors.
Suggested diff
    @@
    - if r.Err != nil {
    -     return nil, 0, r.Err
    + if r.Err != nil {
    +     return nil, 0, errors.Annotate(r.Err, "failed while iterating compacted SSTs")
      }
    @@
    - newRules, err := rc.rewriteRulesFor(i, rewriteRules)
    + newRules, err := rc.rewriteRulesFor(i, rewriteRules)
      if err != nil {
    -     return nil, 0, err
    +     return nil, 0, errors.Annotatef(err, "failed to prepare rewrite rules for table %d", i.TableID())
      }
Also applies to: 351-353
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@br/pkg/task/restore.go`:
- Around line 535-553: The validation and normalization of RestorePhase,
ReplicationStatusSubPrefix and their derived booleans (RestoreInPhase,
FromReplicationStorage) are currently only performed in ParseFromFlags; move
that logic into the config's Adjust() method so non-CLI code paths also enforce
the invariant and normalize values. Specifically, perform: (1) compute
RestoreInPhase = flags-equivalent check (i.e. whether RestorePhase != 0 or
ReplicationStatusSubPrefix non-empty) inside Adjust(), (2) validate RestorePhase
is 1 or 2 when RestoreInPhase is true, (3) enforce the checkpoint requirement
(UseCheckpoint) when RestoreInPhase is true, and (4) set FromReplicationStorage
based on ReplicationStatusSubPrefix presence inside Adjust(); remove or keep
only minimal flag parsing in ParseFromFlags so it only reads raw values and
defers normalization/validation to Adjust().
- Around line 1084-1093: The code currently treats a failed
restoreRegistry.PauseTask as non-fatal: it logs a warning then unconditionally
logs that phase 1 finished and returns nil; change this so that inside the
IsStreamRestore && cfg.RestorePhase == 1 branch, if cfg.RestoreID != 0 and
restoreRegistry.PauseTask(c, cfg.RestoreID) returns an error you return that
error (or wrap it) instead of continuing; only emit the success
log.Info("restore phase 1 finished...") and return nil when PauseTask succeeded
(or when cfg.RestoreID == 0), and remove the misleading success log when
PauseTask failed. Ensure the change touches the PauseTask call, the surrounding
if block (cfg.RestoreID check), and the final return path.
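A minimal sketch of the PauseTask control flow described above, not the actual BR code. The `pauser` interface, `fakeRegistry`, `errRegistryDown`, and `finishPhaseOne` are hypothetical stand-ins; only the `PauseTask` name and the `RestoreID != 0` check come from the review comment. The standard library's `fmt.Errorf` stands in for whatever error package the repository uses.

```go
package main

import (
	"errors"
	"fmt"
)

// errRegistryDown is a hypothetical failure used only for this demonstration.
var errRegistryDown = errors.New("registry unavailable")

// pauser is a hypothetical stand-in for the restore registry.
type pauser interface {
	PauseTask(restoreID uint64) error
}

// fakeRegistry returns a preconfigured error from PauseTask.
type fakeRegistry struct{ err error }

func (f fakeRegistry) PauseTask(uint64) error { return f.err }

// finishPhaseOne reports success only when the task was paused, or when
// there is no registered task (restoreID == 0). A pause failure is returned
// to the caller instead of being logged and ignored.
func finishPhaseOne(reg pauser, restoreID uint64) error {
	if restoreID != 0 {
		if err := reg.PauseTask(restoreID); err != nil {
			return fmt.Errorf("pause restore task %d after phase 1: %w", restoreID, err)
		}
	}
	// The "phase 1 finished" log line would go here, after the pause succeeded.
	return nil
}

func main() {
	fmt.Println(finishPhaseOne(fakeRegistry{}, 7))
	fmt.Println(finishPhaseOne(fakeRegistry{err: errRegistryDown}, 7))
}
```

With this shape, the success message can only be reached on the path where the registry entry is actually paused.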
In `@br/pkg/task/stream.go`:
- Around line 1738-1745: The async DML-check using
hasDMLFilesResultCh/hasAnyLogFiles must be made synchronous and performed before
calling RestoreSSTFileSets when cfg.RetainLatestMVCCVersion is true so we
fail-fast on DML-bearing inputs; replace the goroutine+buffered channel pattern
that defers waiting with a direct call to hasAnyLogFiles(ctx, logFilesIter),
handle its error immediately, and if hasFiles is true return the appropriate
error (or abort) before invoking RestoreSSTFileSets (references:
hasDMLFilesResultCh, hasAnyLogFiles, RestoreSSTFileSets,
cfg.RetainLatestMVCCVersion, logFilesIter).
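The fail-fast ordering requested above could look roughly like the following sketch. `probeFunc` and `restoreFunc` are hypothetical stand-ins for `hasAnyLogFiles` and `RestoreSSTFileSets`, whose real signatures are not shown in this review, and `errDMLFilesFound` is an invented sentinel.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errDMLFilesFound is a hypothetical sentinel for DML-bearing input.
var errDMLFilesFound = errors.New("log backup contains DML files")

// probeFunc stands in for hasAnyLogFiles; restoreFunc for RestoreSSTFileSets.
type probeFunc func(ctx context.Context) (bool, error)
type restoreFunc func(ctx context.Context) error

// restoreWithProbe runs the probe synchronously and aborts before any SST
// data is applied when the probe fails or finds DML files.
func restoreWithProbe(ctx context.Context, retainLatest bool, probe probeFunc, restore restoreFunc) error {
	if retainLatest {
		hasFiles, err := probe(ctx)
		if err != nil {
			return fmt.Errorf("probe DML log files: %w", err)
		}
		if hasFiles {
			return errDMLFilesFound
		}
	}
	return restore(ctx)
}

// runProbeDemo reports whether the restore step ran for a given probe result.
func runProbeDemo(hasDML bool) (restored bool, err error) {
	err = restoreWithProbe(context.Background(), true,
		func(context.Context) (bool, error) { return hasDML, nil },
		func(context.Context) error { restored = true; return nil })
	return restored, err
}

func main() {
	fmt.Println(runProbeDemo(true))
	fmt.Println(runProbeDemo(false))
}
```

The trade-off is that the probe no longer overlaps with the SST restore, in exchange for never leaving a partially restored cluster behind on this error path.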
In `@go.mod`:
- Around line 349-350: The go.mod currently replaces github.com/pingcap/kvproto
with a fork github.com/leavrth/kvproto at v0.0.0-20260505121738-8ce467678bf2
which is a fork-only commit; for a release either remove that replace and
restore usage of github.com/pingcap/kvproto (undo the replace line) or, if the
forked changes are absolutely required, keep the replace but add a clear
justification and remediation steps: add a short comment above the replace in
go.mod referencing a tracking issue/PR ID, document the reason and
compatibility/interop implications in RELEASE_NOTES (or
SECURITY/THIRD-PARTY.md), open an upstreaming PR to pingcap/kvproto and include
a TODO in go.mod to remove the replace once upstreamed (and state the target
release version). Ensure the unique symbol to change is the replace directive
for github.com/pingcap/kvproto => github.com/leavrth/kvproto
v0.0.0-20260505121738-8ce467678bf2.
---
Nitpick comments:
In `@br/pkg/restore/log_client/client.go`:
- Around line 337-338: Wrap the raw iterator/rewrite error (r.Err) at the return
sites so the error includes table and subcompaction context before returning;
replace returns like "return nil, 0, r.Err" with a wrapped error using
fmt.Errorf (or errors.Wrapf) that adds identifiers present in scope (e.g.
tableID/tableName, cfName, subCompactionID or range/start/end) such as
fmt.Errorf("collect SSTs failed for table %s cf %s subcompaction %d: %w",
tableName, cfName, subCompactionID, r.Err); do the same for the other return
site(s) that currently return r.Err (lines around the second occurrence).
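As an illustration of the wrapping suggested above, here is a small sketch that uses the standard library's `fmt.Errorf` with `%w`. The helper name `wrapCollectErr`, the `tableID` parameter, and `errIter` are assumptions for the example; which identifiers are actually in scope at the return sites needs to be checked against the code.

```go
package main

import (
	"errors"
	"fmt"
)

// errIter is a hypothetical iterator failure used for the demonstration.
var errIter = errors.New("iterator failed")

// wrapCollectErr adds table context to an error while keeping the original
// error reachable through errors.Is and errors.Unwrap.
func wrapCollectErr(tableID int64, err error) error {
	if err == nil {
		return nil
	}
	return fmt.Errorf("collect compacted SSTs for table %d: %w", tableID, err)
}

func main() {
	err := wrapCollectErr(42, errIter)
	fmt.Println(err)
	fmt.Println(errors.Is(err, errIter))
}
```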
⛔ Files ignored due to path filters (1)
- go.sum is excluded by !**/*.sum
📒 Files selected for processing (11)
- br/pkg/restore/internal/import_client/import_client.go
- br/pkg/restore/log_client/client.go
- br/pkg/restore/log_client/client_test.go
- br/pkg/restore/snap_client/client.go
- br/pkg/restore/snap_client/export_test.go
- br/pkg/restore/snap_client/import.go
- br/pkg/task/BUILD.bazel
- br/pkg/task/config_test.go
- br/pkg/task/restore.go
- br/pkg/task/stream.go
- go.mod
Codecov Report
Additional details and impacted files
@@ Coverage Diff @@
## release-8.5 #68601 +/- ##
================================================
Coverage ? 55.5705%
================================================
Files ? 1823
Lines ? 656274
Branches ? 0
================================================
Hits ? 364695
Misses ? 264475
Partials ? 27104
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
Force-pushed from e525781 to d455458.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
br/pkg/task/stream.go (1)
2083-2089: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Add a doc comment for `GetStatusFileName`.
This new exported helper is missing a Go doc comment.
As per coding guidelines, "Go code: Keep exported-symbol doc comments, and prefer semantic constraints over name restatement".
♻️ Duplicate comments (3)
br/pkg/task/stream.go (1)
1738-1745: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Block on the DML probe before restoring SSTs.
This still applies SST data before waiting on `hasDMLFilesResultCh`, so `--retain-latest-mvcc-version` can fail only after a partial restore has already happened. The DML-file check needs to complete before `RestoreSSTFileSets`.
Also applies to: 1764-1785
br/pkg/task/restore.go (2)
1084-1093: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Make phase-1 registry pause failure fatal.
If `PauseTask` fails here, the function still logs phase-1 success and returns `nil`, which leaves checkpoint data behind without a paused restore registration for the follow-up phase.
535-552: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Normalize phase/replication restore state in `Adjust()`, not only during flag parsing.
These derived fields are still set only in `ParseFromFlags`, so non-CLI paths that populate `RestoreConfig` and then call `Adjust()` can miss replication-storage mode or skip phase validation entirely.
🧹 Nitpick comments (1)
br/pkg/stream/stream_metas.go (1)
295-315: ⚡ Quick win
Add doc comments for the exported helpers in this hunk.
`TryParseTaggedBackupMetaFileNameWrapper`, `UpdateShiftTS`, and `UpdateShiftTSFromMetadata` are exported but undocumented, which makes the package surface harder to understand and breaks the Go guideline we follow.
As per coding guidelines, "Go code: Keep exported-symbol doc comments, and prefer semantic constraints over name restatement".
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@br/pkg/restore/log_client/log_file_manager.go`:
- Around line 179-199: The anonymous callback that calls
stream.TryParseTaggedBackupMetaFileNameWrapper currently returns false on parse
error, which prevents the raw-metadata fallback from running; instead, change
the error branch so that parse failures do not abort the callback (i.e., do not
return false on err) and allow the function to return true so the raw-metadata
handler runs (preserving shiftStartTS behavior); update the block around
stream.TryParseTaggedBackupMetaFileNameWrapper, the err branch, and the
surrounding logic that sets shiftTS (the closure referencing lm.startTS,
lm.restoreTS and shiftTS) to ensure untagged/older .meta files trigger the
fallback path.
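To make the intended fallback concrete, here is a hedged sketch. The file name format (`shift-<decimal ts>.meta`), `parseTaggedName`, and `resolveShiftTS` are invented for illustration and are not the real `TryParseTaggedBackupMetaFileNameWrapper` behavior; the point is only that a parse failure routes to the raw-metadata path instead of ending the lookup.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseTaggedName is a hypothetical parser: it accepts names such as
// "shift-100.meta" and rejects anything without the "shift-" tag.
func parseTaggedName(name string) (uint64, error) {
	base := strings.TrimSuffix(name, ".meta")
	if !strings.HasPrefix(base, "shift-") {
		return 0, fmt.Errorf("file name %q carries no tag", name)
	}
	return strconv.ParseUint(strings.TrimPrefix(base, "shift-"), 10, 64)
}

// resolveShiftTS prefers the timestamp encoded in the file name and falls
// back to reading the metadata when the name cannot be parsed, so untagged
// or older files are still handled.
func resolveShiftTS(name string, readMetadata func(string) (uint64, error)) (uint64, error) {
	if ts, err := parseTaggedName(name); err == nil {
		return ts, nil
	}
	return readMetadata(name)
}

func main() {
	fromMetadata := func(string) (uint64, error) { return 7, nil }
	fmt.Println(resolveShiftTS("shift-100.meta", fromMetadata))
	fmt.Println(resolveShiftTS("v1-old.meta", fromMetadata))
}
```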
In `@br/pkg/stream/stream_mgr.go`:
- Around line 438-440: The call to the callback skipCondition in stream_mgr.go
is not nil-checked and will panic if callers pass nil; update the code that
currently does if skipCondition(path) { return nil } to first guard the callback
(e.g., check skipCondition != nil) before invoking it, or initialize
skipCondition to a no-op default earlier where it's accepted, so that calls to
skipCondition(path) are safe; ensure this change references the skipCondition
variable in the function or method that contains the current check so callers
who pass nil no longer cause a panic.
- Around line 208-214: In decodeCompressedData, avoid passing rawLength directly
into make (which can panic if it's too large); validate rawLength before using
it to set slice capacity in the call to m.decoder.DecodeAll: ensure rawLength is
within a safe bound and convertible to int (e.g., check rawLength <= math.MaxInt
and <= a reasonable maximum like 1<<30), fallback to 0 or a safe default when
invalid, then convert to int and call m.decoder.DecodeAll(data, make([]byte, 0,
int(validatedRawLength))). This prevents a "cap out of range" panic while
keeping the use of m.decoder.DecodeAll consistent.
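Both stream_mgr.go findings reduce to small guards, sketched below. The `uint64` type of `rawLength`, the 1 GiB limit `maxPreallocBytes`, and the helper names `safeCapacity` and `shouldSkip` are assumptions; the real types and a sensible bound depend on the surrounding code.

```go
package main

import "fmt"

// maxPreallocBytes is an assumed upper bound for preallocation (1 GiB).
// It is below the 32-bit int maximum, so the int conversion below is safe
// on every platform.
const maxPreallocBytes = 1 << 30

// safeCapacity returns a capacity hint that make() will accept. Oversized
// values fall back to 0, which lets the destination slice grow on demand.
func safeCapacity(rawLength uint64) int {
	if rawLength > maxPreallocBytes {
		return 0
	}
	return int(rawLength)
}

// shouldSkip treats a nil callback as "never skip" instead of panicking.
func shouldSkip(skipCondition func(string) bool, path string) bool {
	return skipCondition != nil && skipCondition(path)
}

func main() {
	buf := make([]byte, 0, safeCapacity(1024))
	fmt.Println(cap(buf))
	fmt.Println(safeCapacity(1 << 40))
	fmt.Println(shouldSkip(nil, "v1/backupmeta/a.meta"))
}
```

The destination slice built from `safeCapacity` can then be passed to the decoder as before.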
In `@br/pkg/stream/stream_misc_test.go`:
- Around line 26-39: The ReadFile implementation on gatedReadStorage currently
recurses into itself; replace the final recursive call with a call to the
underlying storage by delegating to the embedded ExternalStorage (e.g., call
s.ExternalStorage.ReadFile(ctx, name)), preserving the active counter defer and
the readGate wait behavior; ensure you reference gatedReadStorage.ReadFile, the
embedded ExternalStorage, and fields active, maxActive and readGate when making
the change.
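The recursion fix can be illustrated with a reduced version of the test wrapper. This is a sketch under assumptions: `storage` and `memStorage` are hypothetical stand-ins for the embedded `ExternalStorage`, the `maxActive` bookkeeping is omitted, and the field types are guesses.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// storage is a minimal hypothetical stand-in for ExternalStorage.
type storage interface {
	ReadFile(ctx context.Context, name string) ([]byte, error)
}

// memStorage serves files from an in-memory map.
type memStorage map[string][]byte

func (m memStorage) ReadFile(_ context.Context, name string) ([]byte, error) {
	data, ok := m[name]
	if !ok {
		return nil, fmt.Errorf("file %q not found", name)
	}
	return data, nil
}

// gatedReadStorage counts active reads and blocks each read on readGate.
type gatedReadStorage struct {
	storage
	active   atomic.Int32
	readGate chan struct{}
}

func (s *gatedReadStorage) ReadFile(ctx context.Context, name string) ([]byte, error) {
	s.active.Add(1)
	defer s.active.Add(-1)
	select {
	case <-s.readGate: // proceed once the gate is closed
	case <-ctx.Done():
		return nil, ctx.Err()
	}
	// Delegate to the embedded storage. Calling s.ReadFile here would
	// re-enter this method and recurse without end.
	return s.storage.ReadFile(ctx, name)
}

// readThroughGate reads one file through a wrapper whose gate is already open.
func readThroughGate(name string) (string, error) {
	gate := make(chan struct{})
	close(gate)
	s := &gatedReadStorage{storage: memStorage{"a.meta": []byte("payload")}, readGate: gate}
	data, err := s.ReadFile(context.Background(), name)
	return string(data), err
}

func main() {
	fmt.Println(readThroughGate("a.meta"))
}
```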
In `@br/pkg/utils/iter/combinator_types.go`:
- Around line 48-52: The code is double-draining the m.outstanding channel
causing possible deadlock: remove or guard the second blocking receive so
outstanding is only drained once when an upstream error occurs. In TryNext (the
method calling m.inner.TryNext(ctx)), after receiving r := m.inner.TryNext(ctx)
and before sending the error result, drain outstanding exactly once; then when
you receive back from the mapper result avoid doing a second <-m.outstanding if
r.Err != nil (either skip the receive when r.Err != nil or replace it with a
non-blocking select to drain only if available). Update the logic around
m.inner.TryNext, m.cancel, m.finished and uses of <-m.outstanding so the channel
is drained in one place and the second drain is conditional/non-blocking to
prevent blocking forever.
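The non-blocking variant mentioned above can be sketched with a `select` that has a `default` branch. The element type of `outstanding` is assumed to be `struct{}` and `tryDrain` is a hypothetical helper; the real iterator has more state (`cancel`, `finished`) that is not modeled here.

```go
package main

import "fmt"

// tryDrain consumes one pending token if there is one and reports whether
// it did. Unlike a plain receive, it never blocks on an empty channel.
func tryDrain(outstanding <-chan struct{}) bool {
	select {
	case <-outstanding:
		return true
	default:
		return false
	}
}

func main() {
	outstanding := make(chan struct{}, 1)
	outstanding <- struct{}{}
	fmt.Println(tryDrain(outstanding)) // consumes the single pending token
	fmt.Println(tryDrain(outstanding)) // returns instead of blocking
}
```

Draining in exactly one place remains the simpler option; the non-blocking receive only makes a second drain harmless.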
---
Outside diff comments:
In `@br/pkg/task/stream.go`:
- Around line 2083-2089: Add a Go doc comment for the exported function
GetStatusFileName that explains what the function returns and under what
conditions it can error: mention that it normalizes the provided storage
sub-directory using normalizeStorageSubDir and returns the path to the status
file "resume-state.json" within that normalized subdirectory, and that an error
is returned if normalization fails; prefer a semantic description rather than
repeating the function name.
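One possible wording for the requested doc comment is shown in the sketch below. The function signature and the body of `normalizeStorageSubDir` are assumptions made so the example compiles; only the function names and the `resume-state.json` file name come from the review comment.

```go
package main

import (
	"errors"
	"fmt"
	"path"
	"strings"
)

// normalizeStorageSubDir is a hypothetical stand-in: it cleans the path,
// strips leading and trailing slashes, and rejects an empty result.
func normalizeStorageSubDir(subDir string) (string, error) {
	cleaned := strings.Trim(path.Clean("/"+subDir), "/")
	if cleaned == "" {
		return "", errors.New("storage sub-directory must not be empty")
	}
	return cleaned, nil
}

// GetStatusFileName returns the location of the replication resume-state
// file ("resume-state.json") under subDir. subDir is normalized first, and
// an error is returned when it cannot be normalized.
func GetStatusFileName(subDir string) (string, error) {
	normalized, err := normalizeStorageSubDir(subDir)
	if err != nil {
		return "", err
	}
	return path.Join(normalized, "resume-state.json"), nil
}

func main() {
	fmt.Println(GetStatusFileName("/replication/status/"))
}
```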
---
Duplicate comments:
In `@br/pkg/task/restore.go`:
- Around line 1084-1093: The current block treats PauseTask failure as
non-fatal: in IsStreamRestore + cfg.RestorePhase == 1 path, if
restoreRegistry.PauseTask(c, cfg.RestoreID) returns an error the code only logs
a warning and then logs phase-1 success and returns nil; change this so that
when restoreRegistry.PauseTask(...) fails you log the error (use log.Error or
similar) and return that error (or a wrapped error) immediately instead of
proceeding to log success and return nil; ensure the success log ("restore phase
1 finished...") only runs when PauseTask succeeds and reference the same
cfg.RestoreID and restoreRegistry.PauseTask symbols when modifying control flow.
- Around line 535-552: Move the derived/validation logic for phase and
replication-storage out of ParseFromFlags into RestoreConfig.Adjust(): in
Adjust(), compute RestoreInPhase = (RestorePhase > 0), validate RestorePhase is
either 1 or 2 when RestoreInPhase, enforce that RestoreInPhase requires
UseCheckpoint, and set FromReplicationStorage = (len(ReplicationStatusSubPrefix)
> 0); keep ParseFromFlags responsible only for reading flags into fields (it can
still call cfg.Adjust() at the end) so non-CLI code paths that populate
RestoreConfig and call Adjust() get consistent normalization and validation.
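A minimal sketch of the normalization described above, assuming the field types shown here and an `Adjust()` that can return an error; the real `RestoreConfig` has many more fields and its `Adjust()` signature may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// RestoreConfig mirrors only the fields named in the review comment.
// The field types are assumptions.
type RestoreConfig struct {
	RestorePhase               int
	ReplicationStatusSubPrefix string
	UseCheckpoint              bool
	RestoreInPhase             bool // derived in Adjust
	FromReplicationStorage     bool // derived in Adjust
}

// Adjust derives RestoreInPhase and FromReplicationStorage and validates
// the phase settings, so CLI and non-CLI callers see the same invariants.
func (cfg *RestoreConfig) Adjust() error {
	cfg.RestoreInPhase = cfg.RestorePhase > 0
	if cfg.RestoreInPhase {
		if cfg.RestorePhase != 1 && cfg.RestorePhase != 2 {
			return fmt.Errorf("restore phase must be 1 or 2, got %d", cfg.RestorePhase)
		}
		if !cfg.UseCheckpoint {
			return errors.New("phased restore requires checkpoint to be enabled")
		}
	}
	cfg.FromReplicationStorage = len(cfg.ReplicationStatusSubPrefix) > 0
	return nil
}

func main() {
	cfg := &RestoreConfig{RestorePhase: 1, UseCheckpoint: true, ReplicationStatusSubPrefix: "status"}
	fmt.Println(cfg.Adjust(), cfg.RestoreInPhase, cfg.FromReplicationStorage)
}
```

With this split, `ParseFromFlags` would only copy raw flag values into the struct and then call `Adjust()`.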
In `@br/pkg/task/stream.go`:
- Around line 1738-1745: The DML probe launched into hasDMLFilesResultCh must be
awaited before applying SSTs: when cfg.RetainLatestMVCCVersion is true, block on
reading from hasDMLFilesResultCh (the value produced by hasAnyLogFiles via
hasLogFilesResult) and handle its error/result before calling
RestoreSSTFileSets; do the same for the other occurrence that currently applies
SST data (the second block using hasDMLFilesResultCh around RestoreSSTFileSets)
so SST restore is skipped or aborted if the probe reports an error or indicates
DML files exist.
---
Nitpick comments:
In `@br/pkg/stream/stream_metas.go`:
- Around line 295-315: Add concise Go doc comments for the three exported
helpers: TryParseTaggedBackupMetaFileNameWrapper, UpdateShiftTS, and
UpdateShiftTSFromMetadata. For each function, add a one- or two-sentence comment
immediately above the function declaration that explains what the function does,
its key inputs/outputs (e.g., filename and metadata for parsing; startTS and
restoreTS and the returned shifted timestamp and found flag), and any important
semantic behavior (e.g., that TryParse... strips metaSuffix and delegates to
backupmetas.TryParseTaggedBackupMetaFileName, that UpdateShiftTS prefers
filename parsing and falls back to metadata, and that UpdateShiftTSFromMetadata
computes shift from Metadata). Keep comments idiomatic (no restating the name)
and follow Go doc style (start with the function name).
⛔ Files ignored due to path filters (1)
- go.sum is excluded by !**/*.sum
📒 Files selected for processing (23)
- DEPS.bzl
- br/pkg/restore/internal/import_client/import_client.go
- br/pkg/restore/log_client/BUILD.bazel
- br/pkg/restore/log_client/client.go
- br/pkg/restore/log_client/client_test.go
- br/pkg/restore/log_client/export_test.go
- br/pkg/restore/log_client/log_file_manager.go
- br/pkg/restore/log_client/log_file_manager_test.go
- br/pkg/restore/snap_client/client.go
- br/pkg/restore/snap_client/export_test.go
- br/pkg/restore/snap_client/import.go
- br/pkg/stream/backupmetas/parser.go
- br/pkg/stream/stream_metas.go
- br/pkg/stream/stream_metas_test.go
- br/pkg/stream/stream_mgr.go
- br/pkg/stream/stream_mgr_fuzz_test.go
- br/pkg/stream/stream_misc_test.go
- br/pkg/task/restore.go
- br/pkg/task/stream.go
- br/pkg/utils/iter/BUILD.bazel
- br/pkg/utils/iter/combinator_test.go
- br/pkg/utils/iter/combinator_types.go
- go.mod
💤 Files with no reviewable changes (1)
- br/pkg/utils/iter/BUILD.bazel
✅ Files skipped from review due to trivial changes (1)
- br/pkg/restore/log_client/BUILD.bazel
/retest
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
@Leavrth: The following test failed, say
What problem does this PR solve?
Issue Number: close #xxx
Problem Summary:
What changed and how does it work?
Check List
Tests
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.
Summary by CodeRabbit
New Features
Refactor
Tests