Add support for bytewise limiting input connector buffering.#6345
Conversation
| buffered_records >= max_queued_records | ||
| } | ||
|
|
||
| fn is_full_of_bytes(&self) -> bool { |
There was a problem hiding this comment.
I like the name of this: "he's so full of bytes!"
| /// True if the number of records buffered by the endpoint exceeds | ||
| /// its `max_queued_records` config parameter. | ||
| pub fn is_full(&self) -> bool { | ||
| fn is_full_of_records(&self) -> bool { |
There was a problem hiding this comment.
indeed, I think counting records is pointless, a record can be arbitrarily large
There was a problem hiding this comment.
Using records relies on the tacit assumption that humans know something about the size of the input records for each table.
There was a problem hiding this comment.
Worse, it leaves us guessing all the time based on support bundles
There was a problem hiding this comment.
Yes. We could reasonably drop max_queued_records except that people actually configure it, so it would break people's pipelines.
There was a problem hiding this comment.
A warning would be great for users still using max-queued-records (and nudging them to use the other one instead).
There was a problem hiding this comment.
That's a good idea. I can log a warning.
(I don't think we elevate log messages beyond the Log pane, which means that users probably won't see it, though, unless @mihaibudiu wants to eventually make the SQL compiler emit such a warning by parsing the connector configuration, or the web-console starts elevating log warnings/errors.)
There was a problem hiding this comment.
I'm adding this:
if let Some(max_queued_records) = resolved_connector_config.max_queued_records
&& !resolved_connector_config.max_queued_bytes.is_none()
{
let max_queued_bytes = resolved_connector_config.max_queued_bytes();
warn!(
"Input endpoint {endpoint_name} configures `max_queued_records` to {max_queued_records}, but not `max_queued_bytes`, which will default to {max_queued_bytes}. We recommend setting `max_queued_bytes` explicitly (`max_queued_records` is less useful and may be omitted)."
)
}5b2b99c to
596d18e
Compare
|
This is simpler than I expected! We should revise all connectors to make sure they accurately report completed bytes. |
I was surprised too.
That's a good point. |
ac4f2fb to
7f8eee5
Compare
mythical-fred
left a comment
There was a problem hiding this comment.
Draft-level comments — high-level only. Shape is right: BufferSize struct, is_full = records OR bytes, default tied to max_queued_records * 1000.
Two design-level things worth deciding before this leaves draft:
-
Silent behavior change for existing pipelines on upgrade. Default
max_queued_bytes = max_queued_records.saturating_mul(1000)means every pipeline running today with no explicit byte cap inherits a 1 GB ceiling. For workloads with>1 KB/recordaverages (Avro/Protobuf payloads, large JSON, wide CDC rows from a few-hundred-column table) the byte threshold will trip first and pause earlier thanmax_queued_recordsdid before this PR. That's a real semantic change on upgrade, not just an additive feature. Two cleaner shapes:- Treat
Noneas "no byte limit" (opt-in only); docs can still recommend setting it. My preference — defaults shouldn't change throughput for anyone. - Or keep the default but call it out in the changelog as a behavior change, not just "New
max_queued_bytessetting." Operators tuning aroundmax_queued_recordswill want to know.
- Treat
-
Output endpoints diverge from input.
is_fullon output keys onmax_queued_recordsonly; this PR documents that explicitly, but it meansmax_queued_bytesis silently a no-op on output. Either reject it at config-validation time on output connectors ("not yet implemented") or at minimum log a warning on pipeline start. A silently-ignored memory-bound knob is exactly the thing that bites at 2 AM.
Smaller stuff (skip if you'd rather batch post-draft):
crates/feldera-types/src/config.rs(and the inheritedmax_queued_recordsdoc on the same struct): "received from the input transport but haven't yet been consumed by the circuit since the circuit, since the circuit is still busy processing previous inputs" — duplicated clause, copy-paste from the original. Worth fixing the records doc too while you're in here.docs/operations/memory.md: "The defaults of 1,000,000 records and 1,000,000,000 bytes limits the memory used by a connector to 1 GB" — should be "limit." Also the byte default =1e9only when both stay at default; if someone tunes records, the byte default tracks it. Doc doesn't quite say that.- Nested
fn crossedinside the closure is fine, but an inline(old.records < r && old.records + incr >= r) || (...)lets the reader see both arms in one shape. Style call. - Tests exercise both thresholds independently and the saturation case. Good.
Full diff pass when it's marked ready.
Code by me, tests by claude. Also ran a TPC-H run with byte limits to see whether it works in practice. Fixes: #6318 Signed-off-by: Ben Pfaff <blp@feldera.com>
474c716 to
3a9c9c4
Compare
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
mythical-fred
left a comment
There was a problem hiding this comment.
Two real issues plus one carried-over typo. Otherwise nice extension of the backpressure machinery with thorough unit tests.
See inline.
| { | ||
| let max_queued_bytes = resolved_connector_config.max_queued_bytes(); | ||
| warn!( | ||
| "Input endpoint {endpoint_name} configures `max_queued_records` to {max_queued_records}, but not `max_queued_bytes`, which will default to {max_queued_bytes}. We recommend setting `max_queued_bytes` explicitly (`max_queued_records` is less useful and may be omitted)." |
There was a problem hiding this comment.
Inverted condition. The message says "configures max_queued_records ... but not max_queued_bytes, which will default to ..." — i.e. it is meant to fire when the user set records but left bytes implicit. But the guard is max_queued_bytes.is_some(), so it actually fires only when both are set (and never produces a defaulted value), and stays silent in the case the message was written for. Either flip to max_queued_bytes.is_none() (and then the printed max_queued_bytes is genuinely the implied default), or rewrite the message to match the current condition.
Also: the message ends with (max_queued_records is less useful and may be omitted). That advice contradicts the changelog entry, which tells users to keep max_queued_records and tune max_queued_bytes. Pick one story.
| /// | ||
| /// For input endpoints, this setting bounds the number of bytes that have | ||
| /// been received from the input transport but haven't yet been consumed by | ||
| /// the circuit since the circuit, since the circuit is still busy processing |
There was a problem hiding this comment.
Carried-over typo: "the circuit since the circuit, since the circuit is still busy...". The original max_queued_records doc got this fixed in this same PR (line 1608 now reads cleanly); the new max_queued_bytes doc was copy-pasted from the broken version. Drop the first "since the circuit".
| /// which more data may be queued. | ||
| /// | ||
| /// When this is unspecified, it defaults to `1000 * max_queued_records`. | ||
| #[serde(skip_serializing_if = "Option::is_none")] |
There was a problem hiding this comment.
Rustdoc nit: 1000 * max_queued_records is true for the default case, but if the user sets max_queued_records explicitly and leaves max_queued_bytes unset, the implied byte cap is 1000 * <user value>, which can be a very different number from 1 GB and is easy to under-estimate on upgrade. Worth saying explicitly here (the changelog covers it; the field doc should too, since this is what most config readers will look at).
| fn crossed(old: usize, increment: usize, threshold: usize) -> bool { | ||
| old < threshold && old + increment >= threshold | ||
| } | ||
|
|
There was a problem hiding this comment.
Soft: max_queued_records() as usize (and the bytes counterpart) silently truncate on 32-bit targets. Feldera doesn't run there today, but if someone passes u64::MAX for "unlimited" on a future 32-bit build, this collapses to a small number and re-engages backpressure. A try_into().unwrap_or(usize::MAX) (or just keeping the comparison in u64) is cheap insurance. Same for old.records + increment — if a misconfigured limit lives near usize::MAX, the addition wraps. saturating_add would do.
| let max_queued_records = self.config.connector_config.max_queued_records; | ||
| let max_queued_records = self.config.connector_config.max_queued_records(); | ||
| buffered_records >= max_queued_records | ||
| } |
There was a problem hiding this comment.
Naming: is_full_of_records / is_full_of_bytes reads a little odd — is_records_full / is_bytes_full, or records_at_threshold / bytes_at_threshold, is closer to what these actually check (they fire at >=, not when the buffer is literally full). Non-blocking.
| fn is_full_bytes_triggers_independently_of_records() { | ||
| let endpoint = make_endpoint(1_000_000, Some(100)); | ||
| endpoint | ||
| .metrics |
There was a problem hiding this comment.
max_queued_bytes defaults to 1000 * max_queued_records — fine. The boundary worth pinning explicitly: max_queued_records = u64::MAX (or any value > u64::MAX / 1000) saturates, which max_queued_bytes_default_saturates does cover for max_queued_bytes. Worth adding a one-liner that confirms max_queued_records() itself does NOT saturate so the two getters' semantics are documented side-by-side.
| }, | ||
| &unparker2, | ||
| ); | ||
| assert!( |
There was a problem hiding this comment.
These threading tests rely on recv_timeout(100ms) for the positive case and recv_timeout(50ms) for the negative case. On a loaded CI runner the 50ms negative wait can occasionally be too short to be sure the second batch wouldn't eventually unpark, and 100ms can flake under heavy contention. Consider using parker.park_timeout() directly or, at minimum, bumping the negative wait to ~200ms.
Also: backpressure_unparked_only_on_threshold_crossing doesn't cover the symmetric case for bytes (cross bytes once, then add more bytes — should not re-unpark). Cheap to add.
| - Calling `/start` on a pipeline that already failed to compile will directly return an error instead of | ||
| the runner later on setting the `deployment_error` during its check whether to proceed to provisioning. | ||
|
|
||
| - New `max_queued_bytes` setting for input connectors. The default is 1,000 times |
There was a problem hiding this comment.
Worth calling out more sharply that this is a behavior change on upgrade: any pipeline whose average record size is well under 1 kB is unaffected, but Avro/Protobuf/wide-CDC records easily exceed 1 kB and those pipelines will hit the new byte cap earlier than the old record cap and pause sooner. Recommend either (a) explicit "if your records average > 1 kB, expect earlier pauses; mitigation: raise max_queued_bytes", or (b) flip to opt-in (None = no byte limit) for one release.
| [`max_queued_records`] and [`max_queued_bytes`] properties. These | ||
| should be large enough to hide the latency of communication, but | ||
| small enough to avoid wasting memory. The defaults of 1,000,000 | ||
| records and 1,000,000,000 bytes limit the memory used by a connector |
There was a problem hiding this comment.
Minor: "The defaults of 1,000,000 records and 1,000,000,000 bytes limit the memory used by a connector to 1 GB." The two limits are connected (bytes = 1000 * records), so a user reading only this row may not realize that changing max_queued_records silently changes max_queued_bytes too. One sentence — "unless overridden, max_queued_bytes tracks max_queued_records" — would prevent surprises.
|
Post-merge nit — spotted while reviewing a follow-up PR that rebased over this one. Not blocking anything; flagging for a tiny follow-up. Inverted warning condition — if let Some(max_queued_records) = resolved_connector_config.max_queued_records
&& resolved_connector_config.max_queued_bytes.is_some()
{
let max_queued_bytes = resolved_connector_config.max_queued_bytes();
warn!(
"Input endpoint {endpoint_name} configures `max_queued_records` to {max_queued_records}, but not `max_queued_bytes`, which will default to {max_queued_bytes}. ..."
)
}The guard checks Doc typo —
Drop one occurrence; the surrounding Both should be quick one-liners. |
Fixes: #6318
Describe Manual Test Plan
I haven't tested this yet but I'm submitting a draft anyway to get an initial CI run.
Checklist