From 776a7d0c25a862cdbf2595a44b5c1e956cacd436 Mon Sep 17 00:00:00 2001 From: Ben Pfaff Date: Mon, 8 Jun 2026 17:16:51 -0700 Subject: [PATCH] [types] Revert change of `ConnectorConfig::max_queued_records` to `Option`. Commit 753cdd81716d ("Add support for bytewise limiting input connector buffering.") changed `ConnectorConfig::max_queued_records` from u64 to `Option`. This should have been a helpful change because it allows the user to configure only the new `max_queued_bytes` setting, since bytes are more meaningful than records for the purpose of limiting the amount of input buffering, but it caused surprising behavior because a configuration that did not include `max_queued_records` was now interpreted differently from before, which triggered bootstrapping at startup. The best fix for the bootstrapping problem would be for the code that computes differences between pipelines to be updated so that it doesn't treat `max_queued_records` (and some other fields) as significant for the purpose of bootstrapping. That preferred fix is filed as https://github.com/feldera/feldera/issues/6427. However, that change would take some consideration, so instead this commit just falls back to the previous definition of `max_queued_records`. This commit may be reverted once https://github.com/feldera/feldera/issues/6427 is fixed. Signed-off-by: Ben Pfaff --- crates/adapters/src/controller.rs | 9 --------- crates/feldera-types/src/config.rs | 13 +++++++------ crates/pipeline-manager/src/db/types/program.rs | 2 +- openapi.json | 1 - 4 files changed, 8 insertions(+), 17 deletions(-) diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs index b55cbbc8743..8e26b4895ec 100644 --- a/crates/adapters/src/controller.rs +++ b/crates/adapters/src/controller.rs @@ -6269,15 +6269,6 @@ impl ControllerInner { ) .map_err(|e| ControllerError::pipeline_config_parse_error(&e))?; - if let Some(max_queued_records) = resolved_connector_config.max_queued_records - && resolved_connector_config.max_queued_bytes.is_some() - { - let max_queued_bytes = resolved_connector_config.max_queued_bytes(); - warn!( - "Input endpoint {endpoint_name} configures `max_queued_records` to {max_queued_records}, but not `max_queued_bytes`, which will default to {max_queued_bytes}. We recommend setting `max_queued_bytes` explicitly (`max_queued_records` is less useful and may be omitted)." - ) - } - // Create preprocessor if specified by the configuration let mut preprocessor = None; let mut streaming_preprocessor = false; diff --git a/crates/feldera-types/src/config.rs b/crates/feldera-types/src/config.rs index 7b253dc93d7..264db56be07 100644 --- a/crates/feldera-types/src/config.rs +++ b/crates/feldera-types/src/config.rs @@ -44,7 +44,9 @@ pub use dev_tweaks::DevTweaks; const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10; /// Default value of `ConnectorConfig::max_queued_records`. -pub const DEFAULT_MAX_QUEUED_RECORDS: u64 = 1_000_000; +pub const fn default_max_queued_records() -> u64 { + 1_000_000 +} pub const DEFAULT_MAX_WORKER_BATCH_SIZE: u64 = 10_000; @@ -1616,8 +1618,8 @@ pub struct ConnectorConfig { /// which more data may be queued. /// /// The default is 1 million. - #[serde(skip_serializing_if = "Option::is_none")] - pub max_queued_records: Option, + #[serde(default = "default_max_queued_records")] + pub max_queued_records: u64, /// Backpressure threshold, in bytes. /// @@ -1673,7 +1675,7 @@ impl ConnectorConfig { output_buffer_config: Default::default(), max_batch_size: None, max_worker_batch_size: None, - max_queued_records: None, + max_queued_records: default_max_queued_records(), max_queued_bytes: None, paused: false, labels: Vec::new(), @@ -1687,7 +1689,7 @@ impl ConnectorConfig { } pub fn with_max_queued_records(mut self, max_queued_records: u64) -> Self { - self.max_queued_records = Some(max_queued_records); + self.max_queued_records = max_queued_records; self } @@ -1705,7 +1707,6 @@ impl ConnectorConfig { /// Returns `max_queued_records` or, if it is not set, the default. pub fn max_queued_records(&self) -> u64 { self.max_queued_records - .unwrap_or(DEFAULT_MAX_QUEUED_RECORDS) } /// Returns `max_queued_bytes` or, if it is not set, the default based on diff --git a/crates/pipeline-manager/src/db/types/program.rs b/crates/pipeline-manager/src/db/types/program.rs index 4b409f2cf39..8dc96c7f9e3 100644 --- a/crates/pipeline-manager/src/db/types/program.rs +++ b/crates/pipeline-manager/src/db/types/program.rs @@ -881,7 +881,7 @@ mod tests { output_buffer_config: Default::default(), max_batch_size: Some(0), max_worker_batch_size: None, - max_queued_records: Some(0), + max_queued_records: 0, max_queued_bytes: None, paused: false, labels: vec![], diff --git a/openapi.json b/openapi.json index 7b216160552..d20070efb7e 100644 --- a/openapi.json +++ b/openapi.json @@ -7966,7 +7966,6 @@ "type": "integer", "format": "int64", "description": "Backpressure threshold, in records.\n\nMaximal number of records queued by the endpoint before the endpoint\nis paused by the backpressure mechanism.\n\nFor input endpoints, this setting bounds the number of records that have\nbeen received from the input transport but haven't yet been consumed by\nthe circuit, since the circuit is still busy processing previous inputs.\n\nFor output endpoints, this setting bounds the number of records that have\nbeen produced by the circuit but not yet sent via the output transport endpoint\nnor stored in the output buffer (see `enable_output_buffer`).\n\nNote that this is not a hard bound: there can be a small delay between\nthe backpressure mechanism is triggered and the endpoint is paused, during\nwhich more data may be queued.\n\nThe default is 1 million.", - "nullable": true, "minimum": 0 }, "max_worker_batch_size": {