diff --git a/crates/pipeline-manager/src/db/operations/pipeline.rs b/crates/pipeline-manager/src/db/operations/pipeline.rs index 4f5c95bd136..851da2817bb 100644 --- a/crates/pipeline-manager/src/db/operations/pipeline.rs +++ b/crates/pipeline-manager/src/db/operations/pipeline.rs @@ -449,6 +449,16 @@ pub(crate) async fn update_pipeline( { not_allowed.push("`runtime_config.resources.namespace`"); } + if runtime_config + .get("resources") + .map(|v| v.get("storage_class")) + != current + .runtime_config + .get("resources") + .map(|v| v.get("storage_class")) + { + not_allowed.push("`runtime_config.resources.storage_class`"); + } } if !not_allowed.is_empty() { diff --git a/crates/pipeline-manager/src/db/test.rs b/crates/pipeline-manager/src/db/test.rs index 680cdf8a4c6..ce59ac51525 100644 --- a/crates/pipeline-manager/src/db/test.rs +++ b/crates/pipeline-manager/src/db/test.rs @@ -3912,8 +3912,11 @@ impl ModelHelpers for Mutex { if runtime_config.get("workers") != pipeline.runtime_config.get("workers") { not_allowed.push("`runtime_config.workers`"); } - if runtime_config.get("storage") != pipeline.runtime_config.get("storage") { - not_allowed.push("`runtime_config.storage`"); + let one = json!(1); + if runtime_config.get("hosts").unwrap_or(&one) + != pipeline.runtime_config.get("hosts").unwrap_or(&one) + { + not_allowed.push("`runtime_config.hosts`"); } if runtime_config.get("fault_tolerance") != pipeline.runtime_config.get("fault_tolerance") @@ -3930,24 +3933,24 @@ impl ModelHelpers for Mutex { { not_allowed.push("`runtime_config.resources.storage_mb_max`"); } - } - if program_code - .as_ref() - .is_some_and(|v| *v != pipeline.program_code) - { - not_allowed.push("`program_code`") - } - if udf_rust.as_ref().is_some_and(|v| *v != pipeline.udf_rust) { - not_allowed.push("`udf_rust`") - } - if udf_toml.as_ref().is_some_and(|v| *v != pipeline.udf_toml) { - not_allowed.push("`udf_toml`") - } - if program_config - .as_ref() - .is_some_and(|v| *v != pipeline.program_config) - { - not_allowed.push("`program_config`") + if runtime_config.get("resources").map(|v| v.get("namespace")) + != pipeline + .runtime_config + .get("resources") + .map(|v| v.get("namespace")) + { + not_allowed.push("`runtime_config.resources.namespace`"); + } + if runtime_config + .get("resources") + .map(|v| v.get("storage_class")) + != pipeline + .runtime_config + .get("resources") + .map(|v| v.get("storage_class")) + { + not_allowed.push("`runtime_config.resources.storage_class`"); + } } if !not_allowed.is_empty() { return Err(DBError::EditRestrictedToClearedStorage { diff --git a/docs.feldera.com/docs/changelog.md b/docs.feldera.com/docs/changelog.md index 0635097e487..332e4c87522 100644 --- a/docs.feldera.com/docs/changelog.md +++ b/docs.feldera.com/docs/changelog.md @@ -14,6 +14,8 @@ import TabItem from '@theme/TabItem'; ## Unreleased + - No longer allowed to edit `runtime_config.resources.storage_class` if the pipeline storage is not cleared. + - Calling `/start` on a pipeline that already failed to compile will directly return an error instead of the runner later on setting the `deployment_error` during its check whether to proceed to provisioning. diff --git a/docs.feldera.com/docs/pipelines/configuration.mdx b/docs.feldera.com/docs/pipelines/configuration.mdx index a3476bfd8bc..12e559ce671 100644 --- a/docs.feldera.com/docs/pipelines/configuration.mdx +++ b/docs.feldera.com/docs/pipelines/configuration.mdx @@ -51,6 +51,19 @@ Make sure to appropriately size resource limits (memory and storage), the number +#### Runtime configuration edit restrictions + +The runtime configuration cannot be edited at all if the pipeline is not stopped. + +If the pipeline is stopped, but its storage has not been cleared, the following runtime configuration fields cannot be +edited: +- `workers` +- `hosts` +- `fault_tolerance` +- `resources.storage_mb_max` +- `resources.namespace` +- `resources.storage_class` + ### Program configuration The "optimized" compilation profile (default) should be used when running production pipelines where performance is important. diff --git a/python/tests/platform/test_pipeline_lifecycle.py b/python/tests/platform/test_pipeline_lifecycle.py index f775b3a2028..2c6e61b02ec 100644 --- a/python/tests/platform/test_pipeline_lifecycle.py +++ b/python/tests/platform/test_pipeline_lifecycle.py @@ -691,33 +691,37 @@ def test_refresh_version_due_to_status_changes(pipeline_name): ) -@gen_pipeline_name -def test_resources_namespace_edit_needs_cleared_storage(pipeline_name): - pipeline = PipelineBuilder(TEST_CLIENT, pipeline_name, "").create_or_replace() +def helper_test_restricted_runtime_config_edit( + pipeline_name, pipeline, field, edit, retrieve, new_value +): pipeline.start() pipeline.stop(force=True) # Attempt to patch without cleared storage will fail - runtime_config = TEST_CLIENT.http.get(f"/pipelines/{pipeline_name}?selector=all")[ - "runtime_config" - ] - runtime_config["resources"]["namespace"] = "example" + runtime_config: dict = TEST_CLIENT.http.get( + f"/pipelines/{pipeline_name}?selector=all" + )["runtime_config"] + runtime_config.update(edit) error = None try: TEST_CLIENT.patch_pipeline(name=pipeline_name, runtime_config=runtime_config) except FelderaAPIError as e: error = e - assert ( - error is not None - and error.error_code == "EditRestrictedToClearedStorage" - and error.details["not_allowed"] == ["`runtime_config.resources.namespace`"] + assert error is not None, f"assert failed for: {field} -- error was: {error}" + assert error.error_code == "EditRestrictedToClearedStorage", ( + f"assert failed for: {field} -- error was: {error}" ) - assert ( - TEST_CLIENT.http.get(f"/pipelines/{pipeline_name}?selector=all")[ - "runtime_config" - ]["resources"]["namespace"] - is None + assert error.details["not_allowed"] == [field], ( + f"assert failed for: {field} -- error was: {error}" ) + assert ( + retrieve( + TEST_CLIENT.http.get(f"/pipelines/{pipeline_name}?selector=all")[ + "runtime_config" + ] + ) + != new_value + ), f"assert failed for: {field}" # Clear storage pipeline.clear_storage() @@ -725,10 +729,86 @@ def test_resources_namespace_edit_needs_cleared_storage(pipeline_name): # After clearing storage, it should work TEST_CLIENT.patch_pipeline(name=pipeline_name, runtime_config=runtime_config) assert ( - TEST_CLIENT.http.get(f"/pipelines/{pipeline_name}?selector=all")[ - "runtime_config" - ]["resources"]["namespace"] - == "example" + retrieve( + TEST_CLIENT.http.get(f"/pipelines/{pipeline_name}?selector=all")[ + "runtime_config" + ] + ) + == new_value + ), f"assert failed for: {field}" + + # Clear runtime config for the next field to test + TEST_CLIENT.patch_pipeline(name=pipeline_name, runtime_config={}) + + +@gen_pipeline_name +def test_runtime_config_edit_restricted(pipeline_name): + pipeline = PipelineBuilder(TEST_CLIENT, pipeline_name, "").create_or_replace() + + # runtime_config.workers + helper_test_restricted_runtime_config_edit( + pipeline_name, + pipeline, + "`runtime_config.workers`", + {"workers": 16}, + lambda r: r["workers"], + 16, + ) + + # runtime_config.resources.storage_mb_max + helper_test_restricted_runtime_config_edit( + pipeline_name, + pipeline, + "`runtime_config.resources.storage_mb_max`", + {"resources": {"storage_mb_max": 10000}}, + lambda r: r["resources"]["storage_mb_max"], + 10000, + ) + + # runtime_config.resources.namespace + helper_test_restricted_runtime_config_edit( + pipeline_name, + pipeline, + "`runtime_config.resources.namespace`", + {"resources": {"namespace": "example"}}, + lambda r: r["resources"]["namespace"], + "example", + ) + + # runtime_config.resources.storage_class + helper_test_restricted_runtime_config_edit( + pipeline_name, + pipeline, + "`runtime_config.resources.storage_class`", + {"resources": {"storage_class": "example"}}, + lambda r: r["resources"]["storage_class"], + "example", + ) + + +@gen_pipeline_name +@enterprise_only +def test_runtime_config_edit_restricted_enterprise(pipeline_name): + pipeline = PipelineBuilder(TEST_CLIENT, pipeline_name, "").create_or_replace() + + # runtime_config.hosts + helper_test_restricted_runtime_config_edit( + pipeline_name, + pipeline, + "`runtime_config.hosts`", + {"hosts": 2}, + lambda r: r["hosts"], + 2, + ) + + # runtime_config.fault_tolerance + helper_test_restricted_runtime_config_edit( + pipeline_name, + pipeline, + "`runtime_config.fault_tolerance`", + {"fault_tolerance": {"model": "exactly_once", "checkpoint_interval_secs": 60}}, + lambda r: r["fault_tolerance"], + {"model": "exactly_once", "checkpoint_interval_secs": 60}, )