Skip to content

[dbsp] Gate streaming exchange with a DevTweak.#6307

Merged
ryzhyk merged 1 commit into
mainfrom
gate_streaming_exchange
May 28, 2026
Merged

[dbsp] Gate streaming exchange with a DevTweak.#6307
ryzhyk merged 1 commit into
mainfrom
gate_streaming_exchange

Conversation

@ryzhyk

@ryzhyk ryzhyk commented May 24, 2026

Copy link
Copy Markdown
Contributor

It looks like the streaming exchange operator may need more tuning. This commit puts it behind a streaming_exchange dev tweak.

Describe Manual Test Plan

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

This change will affect multihost performance.

@ryzhyk ryzhyk requested a review from gz May 24, 2026 16:50
@ryzhyk ryzhyk added the DBSP core Related to the core DBSP library label May 24, 2026
@gz gz enabled auto-merge May 24, 2026 17:11
@gz gz disabled auto-merge May 24, 2026 17:11
} else if Runtime::num_workers() == 1 {
self.dyn_accumulate(factories)
} else if Runtime::with_dev_tweaks(|d| !d.streaming_exchange()) {
self.dyn_shard(factories).dyn_accumulate(factories)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is where it takes effect?

@mythical-fred mythical-fred left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clean dev-tweak gate; reverts the multihost-sharded-accumulator path to shard().accumulate() by default while keeping the streaming-exchange path reachable for tuning. Tests in sharded_accumulator.rs opt into the new code path explicitly, so the existing coverage stays meaningful.

Two small nits inline; nothing blocking.

/// Enable streaming exchange.
///
/// `false`
#[serde(skip_serializing_if = "Option::is_none")]

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc comment /// "false" is a little cryptic — other fields use prose like "The default is false." Worth matching that convention so the rendered openapi help isn't just a backticked literal.

if config.dev_tweaks.streaming_exchange() {
info!("dev_tweaks.streaming_exchange enabled");
} else {
info!("dev_tweaks.streaming_exchange disabled");

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging both branches at info! means every pipeline start emits a dev_tweaks.streaming_exchange disabled line forever, even though that's the default and uninteresting. Consider debug! for the disabled branch, or log only when explicitly set (config.dev_tweaks.streaming_exchange.is_some()). Soft.

Comment thread crates/feldera-types/src/config/dev_tweaks.rs
Add a dev tweak to disable streaming exchange. This was motivated by a
suspected performance regression, which ended up not being a real issue. I
think it is still helpful to be able to disable this new feature.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
@ryzhyk ryzhyk force-pushed the gate_streaming_exchange branch from cb4c167 to 497baef Compare May 26, 2026 15:55
@ryzhyk ryzhyk enabled auto-merge May 26, 2026 15:59

@mythical-fred mythical-fred left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New commit 497baef4 flips DevTweaks::streaming_exchange() default from false to true (dev_tweaks.rs:351). That contradicts both the PR title ("Gate streaming exchange with a DevTweak") and the PR description ("puts it behind a streaming_exchange dev tweak (disabled by default)"). The commit message still says "streaming exchange dev tweak (disabled by default)".

If the intent of this PR is in fact to enable streaming exchange by default and gate the legacy path, the description, title, commit message and rustdoc need to reflect that — and the rationale ("streaming exchange may need more tuning") no longer holds. If the intent is still to disable by default, the unwrap_or(true) is a bug.

My prior APPROVE was on cb4c1673 where the default was false. I'm not blocking, but please reconcile before merge.

@ryzhyk ryzhyk added this pull request to the merge queue May 26, 2026
@gz gz removed this pull request from the merge queue due to a manual request May 26, 2026
@ryzhyk ryzhyk added this pull request to the merge queue May 28, 2026
Merged via the queue into main with commit ef64f19 May 28, 2026
1 check passed
@ryzhyk ryzhyk deleted the gate_streaming_exchange branch May 28, 2026 20:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

DBSP core Related to the core DBSP library

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants