Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/pipeline-manager/src/db/operations/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,16 @@ pub(crate) async fn update_pipeline(
{
not_allowed.push("`runtime_config.resources.namespace`");
}
if runtime_config
.get("resources")
.map(|v| v.get("storage_class"))
!= current
.runtime_config
.get("resources")
.map(|v| v.get("storage_class"))
{
not_allowed.push("`runtime_config.resources.storage_class`");
}
}

if !not_allowed.is_empty() {
Expand Down
43 changes: 23 additions & 20 deletions crates/pipeline-manager/src/db/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3912,8 +3912,11 @@ impl ModelHelpers for Mutex<DbModel> {
if runtime_config.get("workers") != pipeline.runtime_config.get("workers") {
not_allowed.push("`runtime_config.workers`");
}
if runtime_config.get("storage") != pipeline.runtime_config.get("storage") {
not_allowed.push("`runtime_config.storage`");
let one = json!(1);
if runtime_config.get("hosts").unwrap_or(&one)
!= pipeline.runtime_config.get("hosts").unwrap_or(&one)
{
not_allowed.push("`runtime_config.hosts`");
}
if runtime_config.get("fault_tolerance")
!= pipeline.runtime_config.get("fault_tolerance")
Expand All @@ -3930,24 +3933,24 @@ impl ModelHelpers for Mutex<DbModel> {
{
not_allowed.push("`runtime_config.resources.storage_mb_max`");
}
}
if program_code
.as_ref()
.is_some_and(|v| *v != pipeline.program_code)
{
not_allowed.push("`program_code`")
}
if udf_rust.as_ref().is_some_and(|v| *v != pipeline.udf_rust) {
not_allowed.push("`udf_rust`")
}
if udf_toml.as_ref().is_some_and(|v| *v != pipeline.udf_toml) {
not_allowed.push("`udf_toml`")
}
if program_config
.as_ref()
.is_some_and(|v| *v != pipeline.program_config)
{
not_allowed.push("`program_config`")
if runtime_config.get("resources").map(|v| v.get("namespace"))
!= pipeline
.runtime_config
.get("resources")
.map(|v| v.get("namespace"))
{
not_allowed.push("`runtime_config.resources.namespace`");
}
if runtime_config
.get("resources")
.map(|v| v.get("storage_class"))
!= pipeline
.runtime_config
.get("resources")
.map(|v| v.get("storage_class"))
{
not_allowed.push("`runtime_config.resources.storage_class`");
}
}
if !not_allowed.is_empty() {
return Err(DBError::EditRestrictedToClearedStorage {
Expand Down
2 changes: 2 additions & 0 deletions docs.feldera.com/docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import TabItem from '@theme/TabItem';

## Unreleased

- No longer allowed to edit `runtime_config.resources.storage_class` if the pipeline storage is not cleared.

- Calling `/start` on a pipeline that already failed to compile will directly return an error instead of
the runner later on setting the `deployment_error` during its check whether to proceed to provisioning.

Expand Down
13 changes: 13 additions & 0 deletions docs.feldera.com/docs/pipelines/configuration.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ Make sure to appropriately size resource limits (memory and storage), the number
<ApiSchema pointer="#/components/schemas/RuntimeConfig"/>
</div>

#### Runtime configuration edit restrictions

The runtime configuration cannot be edited at all if the pipeline is not stopped.

If the pipeline is stopped, but its storage has not been cleared, the following runtime configuration fields cannot be
edited:
- `workers`
- `hosts`
- `fault_tolerance`
- `resources.storage_mb_max`
- `resources.namespace`
- `resources.storage_class`

### Program configuration

The "optimized" compilation profile (default) should be used when running production pipelines where performance is important.
Expand Down
120 changes: 100 additions & 20 deletions python/tests/platform/test_pipeline_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,44 +691,124 @@ def test_refresh_version_due_to_status_changes(pipeline_name):
)


@gen_pipeline_name
def test_resources_namespace_edit_needs_cleared_storage(pipeline_name):
pipeline = PipelineBuilder(TEST_CLIENT, pipeline_name, "").create_or_replace()
def helper_test_restricted_runtime_config_edit(
pipeline_name, pipeline, field, edit, retrieve, new_value
):
pipeline.start()
pipeline.stop(force=True)

# Attempt to patch without cleared storage will fail
runtime_config = TEST_CLIENT.http.get(f"/pipelines/{pipeline_name}?selector=all")[
"runtime_config"
]
runtime_config["resources"]["namespace"] = "example"
runtime_config: dict = TEST_CLIENT.http.get(
f"/pipelines/{pipeline_name}?selector=all"
)["runtime_config"]
runtime_config.update(edit)
error = None
try:
TEST_CLIENT.patch_pipeline(name=pipeline_name, runtime_config=runtime_config)
except FelderaAPIError as e:
error = e
assert (
error is not None
and error.error_code == "EditRestrictedToClearedStorage"
and error.details["not_allowed"] == ["`runtime_config.resources.namespace`"]
assert error is not None, f"assert failed for: {field} -- error was: {error}"
assert error.error_code == "EditRestrictedToClearedStorage", (
f"assert failed for: {field} -- error was: {error}"
)
assert (
TEST_CLIENT.http.get(f"/pipelines/{pipeline_name}?selector=all")[
"runtime_config"
]["resources"]["namespace"]
is None
assert error.details["not_allowed"] == [field], (
f"assert failed for: {field} -- error was: {error}"
)
assert (
retrieve(
TEST_CLIENT.http.get(f"/pipelines/{pipeline_name}?selector=all")[
"runtime_config"
]
)
!= new_value
), f"assert failed for: {field}"

# Clear storage
pipeline.clear_storage()

# After clearing storage, it should work
TEST_CLIENT.patch_pipeline(name=pipeline_name, runtime_config=runtime_config)
assert (
TEST_CLIENT.http.get(f"/pipelines/{pipeline_name}?selector=all")[
"runtime_config"
]["resources"]["namespace"]
== "example"
retrieve(
TEST_CLIENT.http.get(f"/pipelines/{pipeline_name}?selector=all")[
"runtime_config"
]
)
== new_value
), f"assert failed for: {field}"

# Clear runtime config for the next field to test
TEST_CLIENT.patch_pipeline(name=pipeline_name, runtime_config={})


@gen_pipeline_name
def test_runtime_config_edit_restricted(pipeline_name):
pipeline = PipelineBuilder(TEST_CLIENT, pipeline_name, "").create_or_replace()

# runtime_config.workers
helper_test_restricted_runtime_config_edit(
pipeline_name,
pipeline,
"`runtime_config.workers`",
{"workers": 16},
lambda r: r["workers"],
16,
)

# runtime_config.resources.storage_mb_max
helper_test_restricted_runtime_config_edit(
pipeline_name,
pipeline,
"`runtime_config.resources.storage_mb_max`",
{"resources": {"storage_mb_max": 10000}},
lambda r: r["resources"]["storage_mb_max"],
10000,
)

# runtime_config.resources.namespace
helper_test_restricted_runtime_config_edit(
pipeline_name,
pipeline,
"`runtime_config.resources.namespace`",
{"resources": {"namespace": "example"}},
lambda r: r["resources"]["namespace"],
"example",
)

# runtime_config.resources.storage_class
helper_test_restricted_runtime_config_edit(
pipeline_name,
pipeline,
"`runtime_config.resources.storage_class`",
{"resources": {"storage_class": "example"}},
lambda r: r["resources"]["storage_class"],
"example",
)


@gen_pipeline_name
@enterprise_only
def test_runtime_config_edit_restricted_enterprise(pipeline_name):
pipeline = PipelineBuilder(TEST_CLIENT, pipeline_name, "").create_or_replace()

# runtime_config.hosts
helper_test_restricted_runtime_config_edit(
pipeline_name,
pipeline,
"`runtime_config.hosts`",
{"hosts": 2},
lambda r: r["hosts"],
2,
)

# runtime_config.fault_tolerance
helper_test_restricted_runtime_config_edit(
pipeline_name,
pipeline,
"`runtime_config.fault_tolerance`",
{"fault_tolerance": {"model": "exactly_once", "checkpoint_interval_secs": 60}},
lambda r: r["fault_tolerance"],
{"model": "exactly_once", "checkpoint_interval_secs": 60},
)


Expand Down
Loading