Skip to content

Add support for bytewise limiting input connector buffering.#6345

Merged
blp merged 2 commits into
mainfrom
bytewise-buffer-limit
Jun 2, 2026
Merged

Add support for bytewise limiting input connector buffering.#6345
blp merged 2 commits into
mainfrom
bytewise-buffer-limit

Conversation

@blp

@blp blp commented May 29, 2026

Copy link
Copy Markdown
Member

Fixes: #6318

Describe Manual Test Plan

I haven't tested this yet but I'm submitting a draft anyway to get an initial CI run.

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

@blp blp self-assigned this May 29, 2026
@blp blp added performance connectors Issues related to the adapters/connectors crate rust Pull requests that update Rust code labels May 29, 2026
buffered_records >= max_queued_records
}

fn is_full_of_bytes(&self) -> bool {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the name of this: "he's so full of bytes!"

/// True if the number of records buffered by the endpoint exceeds
/// its `max_queued_records` config parameter.
pub fn is_full(&self) -> bool {
fn is_full_of_records(&self) -> bool {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed, I think counting records is pointless, a record can be arbitrarily large

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using records relies on the tacit assumption that humans know something about the size of the input records for each table.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worse, it leaves us guessing all the time based on support bundles

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We could reasonably drop max_queued_records except that people actually configure it, so it would break people's pipelines.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A warning would be great for users still using max-queued-records (and nudging them to use the other one instead).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea. I can log a warning.

(I don't think we elevate log messages beyond the Log pane, which means that users probably won't see it, though, unless @mihaibudiu wants to eventually make the SQL compiler emit such a warning by parsing the connector configuration, or the web-console starts elevating log warnings/errors.)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm adding this:

        if let Some(max_queued_records) = resolved_connector_config.max_queued_records
            && !resolved_connector_config.max_queued_bytes.is_none()
        {
            let max_queued_bytes = resolved_connector_config.max_queued_bytes();
            warn!(
                "Input endpoint {endpoint_name} configures `max_queued_records` to {max_queued_records}, but not `max_queued_bytes`, which will default to {max_queued_bytes}.  We recommend setting `max_queued_bytes` explicitly (`max_queued_records` is less useful and may be omitted)."
            )
        }

@blp blp force-pushed the bytewise-buffer-limit branch from 5b2b99c to 596d18e Compare May 29, 2026 17:39
@ryzhyk

ryzhyk commented May 29, 2026

Copy link
Copy Markdown
Contributor

This is simpler than I expected! We should revise all connectors to make sure they accurately report completed bytes.

@blp

blp commented May 29, 2026

Copy link
Copy Markdown
Member Author

This is simpler than I expected!

I was surprised too.

We should revise all connectors to make sure they accurately report completed bytes.

That's a good point.

@blp blp force-pushed the bytewise-buffer-limit branch from ac4f2fb to 7f8eee5 Compare June 1, 2026 16:01

@mythical-fred mythical-fred left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Draft-level comments — high-level only. Shape is right: BufferSize struct, is_full = records OR bytes, default tied to max_queued_records * 1000.

Two design-level things worth deciding before this leaves draft:

  1. Silent behavior change for existing pipelines on upgrade. Default max_queued_bytes = max_queued_records.saturating_mul(1000) means every pipeline running today with no explicit byte cap inherits a 1 GB ceiling. For workloads with >1 KB/record averages (Avro/Protobuf payloads, large JSON, wide CDC rows from a few-hundred-column table) the byte threshold will trip first and pause earlier than max_queued_records did before this PR. That's a real semantic change on upgrade, not just an additive feature. Two cleaner shapes:

    • Treat None as "no byte limit" (opt-in only); docs can still recommend setting it. My preference — defaults shouldn't change throughput for anyone.
    • Or keep the default but call it out in the changelog as a behavior change, not just "New max_queued_bytes setting." Operators tuning around max_queued_records will want to know.
  2. Output endpoints diverge from input. is_full on output keys on max_queued_records only; this PR documents that explicitly, but it means max_queued_bytes is silently a no-op on output. Either reject it at config-validation time on output connectors ("not yet implemented") or at minimum log a warning on pipeline start. A silently-ignored memory-bound knob is exactly the thing that bites at 2 AM.

Smaller stuff (skip if you'd rather batch post-draft):

  • crates/feldera-types/src/config.rs (and the inherited max_queued_records doc on the same struct): "received from the input transport but haven't yet been consumed by the circuit since the circuit, since the circuit is still busy processing previous inputs" — duplicated clause, copy-paste from the original. Worth fixing the records doc too while you're in here.
  • docs/operations/memory.md: "The defaults of 1,000,000 records and 1,000,000,000 bytes limits the memory used by a connector to 1 GB" — should be "limit." Also the byte default = 1e9 only when both stay at default; if someone tunes records, the byte default tracks it. Doc doesn't quite say that.
  • Nested fn crossed inside the closure is fine, but an inline (old.records < r && old.records + incr >= r) || (...) lets the reader see both arms in one shape. Style call.
  • Tests exercise both thresholds independently and the saturation case. Good.

Full diff pass when it's marked ready.

Code by me, tests by claude.  Also ran a TPC-H run with byte limits to see
whether it works in practice.

Fixes: #6318

Signed-off-by: Ben Pfaff <blp@feldera.com>
@blp blp force-pushed the bytewise-buffer-limit branch from 474c716 to 3a9c9c4 Compare June 2, 2026 17:30
@blp blp marked this pull request as ready for review June 2, 2026 17:30
@blp blp enabled auto-merge June 2, 2026 17:30
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
@blp blp added this pull request to the merge queue Jun 2, 2026

@mythical-fred mythical-fred left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two real issues plus one carried-over typo. Otherwise nice extension of the backpressure machinery with thorough unit tests.

See inline.

{
let max_queued_bytes = resolved_connector_config.max_queued_bytes();
warn!(
"Input endpoint {endpoint_name} configures `max_queued_records` to {max_queued_records}, but not `max_queued_bytes`, which will default to {max_queued_bytes}. We recommend setting `max_queued_bytes` explicitly (`max_queued_records` is less useful and may be omitted)."

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inverted condition. The message says "configures max_queued_records ... but not max_queued_bytes, which will default to ..." — i.e. it is meant to fire when the user set records but left bytes implicit. But the guard is max_queued_bytes.is_some(), so it actually fires only when both are set (and never produces a defaulted value), and stays silent in the case the message was written for. Either flip to max_queued_bytes.is_none() (and then the printed max_queued_bytes is genuinely the implied default), or rewrite the message to match the current condition.

Also: the message ends with (max_queued_records is less useful and may be omitted). That advice contradicts the changelog entry, which tells users to keep max_queued_records and tune max_queued_bytes. Pick one story.

///
/// For input endpoints, this setting bounds the number of bytes that have
/// been received from the input transport but haven't yet been consumed by
/// the circuit since the circuit, since the circuit is still busy processing

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Carried-over typo: "the circuit since the circuit, since the circuit is still busy...". The original max_queued_records doc got this fixed in this same PR (line 1608 now reads cleanly); the new max_queued_bytes doc was copy-pasted from the broken version. Drop the first "since the circuit".

/// which more data may be queued.
///
/// When this is unspecified, it defaults to `1000 * max_queued_records`.
#[serde(skip_serializing_if = "Option::is_none")]

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rustdoc nit: 1000 * max_queued_records is true for the default case, but if the user sets max_queued_records explicitly and leaves max_queued_bytes unset, the implied byte cap is 1000 * <user value>, which can be a very different number from 1 GB and is easy to under-estimate on upgrade. Worth saying explicitly here (the changelog covers it; the field doc should too, since this is what most config readers will look at).

fn crossed(old: usize, increment: usize, threshold: usize) -> bool {
old < threshold && old + increment >= threshold
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Soft: max_queued_records() as usize (and the bytes counterpart) silently truncate on 32-bit targets. Feldera doesn't run there today, but if someone passes u64::MAX for "unlimited" on a future 32-bit build, this collapses to a small number and re-engages backpressure. A try_into().unwrap_or(usize::MAX) (or just keeping the comparison in u64) is cheap insurance. Same for old.records + increment — if a misconfigured limit lives near usize::MAX, the addition wraps. saturating_add would do.

let max_queued_records = self.config.connector_config.max_queued_records;
let max_queued_records = self.config.connector_config.max_queued_records();
buffered_records >= max_queued_records
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming: is_full_of_records / is_full_of_bytes reads a little odd — is_records_full / is_bytes_full, or records_at_threshold / bytes_at_threshold, is closer to what these actually check (they fire at >=, not when the buffer is literally full). Non-blocking.

fn is_full_bytes_triggers_independently_of_records() {
let endpoint = make_endpoint(1_000_000, Some(100));
endpoint
.metrics

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max_queued_bytes defaults to 1000 * max_queued_records — fine. The boundary worth pinning explicitly: max_queued_records = u64::MAX (or any value > u64::MAX / 1000) saturates, which max_queued_bytes_default_saturates does cover for max_queued_bytes. Worth adding a one-liner that confirms max_queued_records() itself does NOT saturate so the two getters' semantics are documented side-by-side.

},
&unparker2,
);
assert!(

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These threading tests rely on recv_timeout(100ms) for the positive case and recv_timeout(50ms) for the negative case. On a loaded CI runner the 50ms negative wait can occasionally be too short to be sure the second batch wouldn't eventually unpark, and 100ms can flake under heavy contention. Consider using parker.park_timeout() directly or, at minimum, bumping the negative wait to ~200ms.

Also: backpressure_unparked_only_on_threshold_crossing doesn't cover the symmetric case for bytes (cross bytes once, then add more bytes — should not re-unpark). Cheap to add.

- Calling `/start` on a pipeline that already failed to compile will directly return an error instead of
the runner later on setting the `deployment_error` during its check whether to proceed to provisioning.

- New `max_queued_bytes` setting for input connectors. The default is 1,000 times

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth calling out more sharply that this is a behavior change on upgrade: any pipeline whose average record size is well under 1 kB is unaffected, but Avro/Protobuf/wide-CDC records easily exceed 1 kB and those pipelines will hit the new byte cap earlier than the old record cap and pause sooner. Recommend either (a) explicit "if your records average > 1 kB, expect earlier pauses; mitigation: raise max_queued_bytes", or (b) flip to opt-in (None = no byte limit) for one release.

[`max_queued_records`] and [`max_queued_bytes`] properties. These
should be large enough to hide the latency of communication, but
small enough to avoid wasting memory. The defaults of 1,000,000
records and 1,000,000,000 bytes limit the memory used by a connector

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: "The defaults of 1,000,000 records and 1,000,000,000 bytes limit the memory used by a connector to 1 GB." The two limits are connected (bytes = 1000 * records), so a user reading only this row may not realize that changing max_queued_records silently changes max_queued_bytes too. One sentence — "unless overridden, max_queued_bytes tracks max_queued_records" — would prevent surprises.

Merged via the queue into main with commit e749371 Jun 2, 2026
1 check passed
@blp blp deleted the bytewise-buffer-limit branch June 2, 2026 20:58
@mythical-fred

Copy link
Copy Markdown

Post-merge nit — spotted while reviewing a follow-up PR that rebased over this one. Not blocking anything; flagging for a tiny follow-up.

Inverted warning conditioncrates/adapters/src/controller.rs:6272-6279 (current main):

if let Some(max_queued_records) = resolved_connector_config.max_queued_records
    && resolved_connector_config.max_queued_bytes.is_some()
{
    let max_queued_bytes = resolved_connector_config.max_queued_bytes();
    warn!(
        "Input endpoint {endpoint_name} configures `max_queued_records` to {max_queued_records}, but not `max_queued_bytes`, which will default to {max_queued_bytes}. ..."
    )
}

The guard checks max_queued_bytes.is_some() but the warning says "but not max_queued_bytes". Should be .is_none() so the warning fires when the user sets only max_queued_records and leaves max_queued_bytes at the computed default — which is exactly the case the prose describes.

Doc typocrates/feldera-types/src/config.rs:1629-1631, max_queued_bytes rustdoc has the clause "since the circuit" twice in one sentence:

"...haven't yet been consumed by the circuit since the circuit, since the circuit is still busy processing previous inputs."

Drop one occurrence; the surrounding max_queued_records doc reads cleanly with a single "since the circuit is still busy processing previous inputs."

Both should be quick one-liners.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

connectors Issues related to the adapters/connectors crate performance rust Pull requests that update Rust code

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Limit input buffers by bytes rather than (or in addition to) records

6 participants