Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions crates/adapters/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6269,15 +6269,6 @@ impl ControllerInner {
)
.map_err(|e| ControllerError::pipeline_config_parse_error(&e))?;

if let Some(max_queued_records) = resolved_connector_config.max_queued_records
&& resolved_connector_config.max_queued_bytes.is_some()
{
let max_queued_bytes = resolved_connector_config.max_queued_bytes();
warn!(
"Input endpoint {endpoint_name} configures `max_queued_records` to {max_queued_records}, but not `max_queued_bytes`, which will default to {max_queued_bytes}. We recommend setting `max_queued_bytes` explicitly (`max_queued_records` is less useful and may be omitted)."
)
}

// Create preprocessor if specified by the configuration
let mut preprocessor = None;
let mut streaming_preprocessor = false;
Expand Down
13 changes: 7 additions & 6 deletions crates/feldera-types/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ pub use dev_tweaks::DevTweaks;
const DEFAULT_MAX_PARALLEL_CONNECTOR_INIT: u64 = 10;

/// Default value of `ConnectorConfig::max_queued_records`.
pub const DEFAULT_MAX_QUEUED_RECORDS: u64 = 1_000_000;
pub const fn default_max_queued_records() -> u64 {
1_000_000
}

pub const DEFAULT_MAX_WORKER_BATCH_SIZE: u64 = 10_000;

Expand Down Expand Up @@ -1616,8 +1618,8 @@ pub struct ConnectorConfig {
/// which more data may be queued.
///
/// The default is 1 million.
#[serde(skip_serializing_if = "Option::is_none")]
pub max_queued_records: Option<u64>,
#[serde(default = "default_max_queued_records")]
pub max_queued_records: u64,

/// Backpressure threshold, in bytes.
///
Expand Down Expand Up @@ -1673,7 +1675,7 @@ impl ConnectorConfig {
output_buffer_config: Default::default(),
max_batch_size: None,
max_worker_batch_size: None,
max_queued_records: None,
max_queued_records: default_max_queued_records(),
max_queued_bytes: None,
paused: false,
labels: Vec::new(),
Expand All @@ -1687,7 +1689,7 @@ impl ConnectorConfig {
}

pub fn with_max_queued_records(mut self, max_queued_records: u64) -> Self {
self.max_queued_records = Some(max_queued_records);
self.max_queued_records = max_queued_records;
self
}

Expand All @@ -1705,7 +1707,6 @@ impl ConnectorConfig {
/// Returns `max_queued_records` or, if it is not set, the default.
pub fn max_queued_records(&self) -> u64 {
self.max_queued_records
.unwrap_or(DEFAULT_MAX_QUEUED_RECORDS)
}

/// Returns `max_queued_bytes` or, if it is not set, the default based on
Expand Down
2 changes: 1 addition & 1 deletion crates/pipeline-manager/src/db/types/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ mod tests {
output_buffer_config: Default::default(),
max_batch_size: Some(0),
max_worker_batch_size: None,
max_queued_records: Some(0),
max_queued_records: 0,
max_queued_bytes: None,
paused: false,
labels: vec![],
Expand Down
1 change: 0 additions & 1 deletion openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -7966,7 +7966,6 @@
"type": "integer",
"format": "int64",
"description": "Backpressure threshold, in records.\n\nMaximal number of records queued by the endpoint before the endpoint\nis paused by the backpressure mechanism.\n\nFor input endpoints, this setting bounds the number of records that have\nbeen received from the input transport but haven't yet been consumed by\nthe circuit, since the circuit is still busy processing previous inputs.\n\nFor output endpoints, this setting bounds the number of records that have\nbeen produced by the circuit but not yet sent via the output transport endpoint\nnor stored in the output buffer (see `enable_output_buffer`).\n\nNote that this is not a hard bound: there can be a small delay between\nthe backpressure mechanism is triggered and the endpoint is paused, during\nwhich more data may be queued.\n\nThe default is 1 million.",
"nullable": true,
"minimum": 0
},
"max_worker_batch_size": {
Expand Down
Loading