feat(execution): add PhysicalOp.requiresMaterializedExecution, consumed by the scheduler #5720
aglinxinyuan wants to merge 4 commits into
…submission
Add a general LogicalOp.requiresMaterializedExecution flag (default false) and WorkflowExecutionService.validateExecutionMode, which rejects a non-MATERIALIZED submission when any operator requires materialized execution -- keyed off the flag, not a specific operator class. Default false keeps all existing behavior; operators opt in by overriding the flag. Split out of the loop-operators PR (apache#5700) to keep that PR reviewable; the loop operators set the flag.
Pull request overview
Adds an operator-declared capability flag to enforce execution-mode constraints at workflow submission time, preventing workflows that require materialization from being run in PIPELINED mode (avoiding UI/engine mode mismatches).
Changes:
- Added `LogicalOp.requiresMaterializedExecution: Boolean = false` for operators to declare a MATERIALIZED-only requirement.
- Introduced `WorkflowExecutionService.validateExecutionMode(operators, settings)` to reject incompatible submissions with an actionable error message.
- Added ScalaTest coverage for the default flag value and validate/reject/accept paths.
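The submission-time check summarized above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the `ExecutionMode`, `LogicalOp`, `PlainOp`, and `MaterializedOnlyOp` types are hypothetical stand-ins, the helper takes the mode directly instead of a settings object, and returning `Either` rather than throwing is an assumption made to keep the example self-contained.

```scala
object ValidationSketch {
  // Hypothetical stand-in for the engine's execution modes.
  sealed trait ExecutionMode
  case object PIPELINED extends ExecutionMode
  case object MATERIALIZED extends ExecutionMode

  // Hypothetical stand-in for LogicalOp; only the new flag is modelled.
  trait LogicalOp {
    def name: String
    // Default false: existing operators keep their current behavior.
    def requiresMaterializedExecution: Boolean = false
  }

  final case class PlainOp(name: String) extends LogicalOp

  // An operator opts in by overriding the flag.
  final case class MaterializedOnlyOp(name: String) extends LogicalOp {
    override def requiresMaterializedExecution: Boolean = true
  }

  // Assumed shape: Left(message) when a non-MATERIALIZED submission
  // contains at least one operator that requires materialization.
  def validateExecutionMode(
      operators: Seq[LogicalOp],
      mode: ExecutionMode
  ): Either[String, Unit] = {
    val offending = operators.filter(_.requiresMaterializedExecution).map(_.name)
    if (mode != MATERIALIZED && offending.nonEmpty)
      Left(s"Operators [${offending.mkString(", ")}] require MATERIALIZED execution mode")
    else
      Right(())
  }
}
```

The check is keyed off the flag alone, so it never needs to know which operator classes exist.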
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala | Adds requiresMaterializedExecution flag (default false) for operator-declared mode requirements. |
| amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala | Implements and invokes submission-time execution mode validation keyed off the new flag. |
| amber/src/test/scala/org/apache/texera/web/service/WorkflowExecutionServiceSpec.scala | Adds unit tests covering the flag default and validation behavior. |
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@ Coverage Diff @@
## main #5720 +/- ##
============================================
- Coverage 52.93% 52.71% -0.22%
- Complexity 2626 2627 +1
============================================
Files 1090 1089 -1
Lines 42208 42175 -33
Branches 4534 4535 +1
============================================
- Hits 22341 22231 -110
- Misses 18557 18645 +88
+ Partials 1310 1299 -11
…e#5720) The requiresMaterializedExecution flag + WorkflowExecutionService.validateExecutionMode are split out to apache#5720. loop-feb keeps the flag (set on the loop operators via LoopOpDesc) and the submission-time validation call; drop the loop-based WorkflowExecutionServiceSpec (the split adds a stub-based one on main) and move the loop-flag coverage into LoopStart/EndOpDescSpec.
| | config | throughput (tuples/sec) | MB/s | latency p50/p95/p99 | max Δ latest / 7d |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 410 | 0.25 | 24,164/30,424/30,424 us | 🟢 -15.1% / 🟢 -13.0% |
| 🔴 | bs=100 sw=10 sl=64 | 938 | 0.572 | 105,709/132,360/132,360 us | 🔴 +11.9% / 🟢 -5.9% |
| ⚪ | bs=1000 sw=10 sl=64 | 1,132 | 0.691 | 884,705/917,905/917,905 us | ⚪ within ±5% / 🟢 -10.3% |
Baseline details
Latest main 83df2b5 from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 410 tuples/sec | 438 tuples/sec | 410.82 tuples/sec | -6.4% | -0.2% |
| bs=10 sw=10 sl=64 | MB/s | 0.25 MB/s | 0.267 MB/s | 0.251 MB/s | -6.4% | -0.3% |
| bs=10 sw=10 sl=64 | p50 | 24,164 us | 21,108 us | 23,785 us | +14.5% | +1.6% |
| bs=10 sw=10 sl=64 | p95 | 30,424 us | 35,830 us | 34,980 us | -15.1% | -13.0% |
| bs=10 sw=10 sl=64 | p99 | 30,424 us | 35,830 us | 34,980 us | -15.1% | -13.0% |
| bs=100 sw=10 sl=64 | throughput | 938 tuples/sec | 983 tuples/sec | 891.94 tuples/sec | -4.6% | +5.2% |
| bs=100 sw=10 sl=64 | MB/s | 0.572 MB/s | 0.6 MB/s | 0.544 MB/s | -4.7% | +5.1% |
| bs=100 sw=10 sl=64 | p50 | 105,709 us | 99,028 us | 112,277 us | +6.7% | -5.9% |
| bs=100 sw=10 sl=64 | p95 | 132,360 us | 118,235 us | 139,802 us | +11.9% | -5.3% |
| bs=100 sw=10 sl=64 | p99 | 132,360 us | 118,235 us | 139,802 us | +11.9% | -5.3% |
| bs=1000 sw=10 sl=64 | throughput | 1,132 tuples/sec | 1,123 tuples/sec | 1,041 tuples/sec | +0.8% | +8.7% |
| bs=1000 sw=10 sl=64 | MB/s | 0.691 MB/s | 0.685 MB/s | 0.635 MB/s | +0.9% | +8.8% |
| bs=1000 sw=10 sl=64 | p50 | 884,705 us | 897,490 us | 972,714 us | -1.4% | -9.0% |
| bs=1000 sw=10 sl=64 | p95 | 917,905 us | 939,450 us | 1,023,057 us | -2.3% | -10.3% |
| bs=1000 sw=10 sl=64 | p99 | 917,905 us | 939,450 us | 1,023,057 us | -2.3% | -10.3% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,487.65,200,128000,410,0.250,24164.33,30423.92,30423.92
1,100,10,64,20,2132.92,2000,1280000,938,0.572,105709.23,132360.14,132360.14
2,1000,10,64,20,17666.74,20000,12800000,1132,0.691,884705.37,917904.68,917904.68
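The derived columns in the CSV can be checked against the raw ones. The sketch below is an inference from the numbers, not something the benchmark report states: it assumes `tuples_per_sec` is `total_tuples` divided by elapsed seconds, that `mb_per_sec` uses 1024 × 1024 bytes per MB (the reported values match that divisor, not 10^6), and that the Δ columns are plain relative differences. `Row` and the function names are hypothetical.

```scala
object BenchmarkArithmetic {
  // Only the raw CSV columns needed for the derived metrics.
  final case class Row(totalMs: Double, totalTuples: Long, totalBytes: Long)

  // Assumed: throughput = tuples / elapsed seconds.
  def tuplesPerSec(r: Row): Double =
    r.totalTuples / (r.totalMs / 1000.0)

  // Assumed: "MB" here means 1024 * 1024 bytes.
  def mbPerSec(r: Row): Double =
    r.totalBytes / (r.totalMs / 1000.0) / (1024.0 * 1024.0)

  // Assumed: relative change of the PR value against a baseline, in percent.
  def percentDelta(pr: Double, baseline: Double): Double =
    (pr - baseline) / baseline * 100.0
}
```

For the first row (`bs=10`), `Row(487.65, 200, 128000)` gives about 410.1 tuples/sec and about 0.250 MB/s, and `percentDelta(410, 438)` is about -6.4, in line with the table.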
had a discussion with @aglinxinyuan, two comments:
would suffice. Note that this simple consumer can help reviewers understand how this property will be used.
… + consume in scheduler
Per @Yicong-Huang's review on apache#5720:
- It is a physical-execution property, so move the flag from LogicalOp to PhysicalOp (+ a withRequiresMaterializedExecution builder).
- Give it a real consumer in the scheduler: CostBasedScheduleGenerator forces a fully-materialized schedule when any physical op requires it, instead of validating at submission. Drops WorkflowExecutionService.validateExecutionMode and its spec.
Default false -> dormant and behavior-preserving (no operator requires it yet); the loop operators set it on their PhysicalOp.
Addressed both points in
Default |
…ion on PhysicalOp
apache#5720 was redesigned: the flag moved LogicalOp -> PhysicalOp and the consumer moved from WorkflowExecutionService.validateExecutionMode to the scheduler (CostBasedScheduleGenerator). Match that here:
- mechanism files (PhysicalOp flag + builder, CostBasedScheduleGenerator consumer, WorkflowCoreTypesSpec test) brought in line with apache#5720; WorkflowExecutionService reverted (validateExecutionMode dropped).
- LoopOpDesc.getPhysicalOp sets requiresMaterializedExecution on the physical op (Loop Start and Loop End); dropped the LogicalOp-level override.
- LoopStart/EndOpDescSpec assert the flag via getPhysicalOp.
// An operator may require a fully-materialized schedule (e.g. a loop,
// whose back-edge is a cross-region materialized state channel). When any
// does, materialize fully regardless of the requested execution mode.
val effectiveMode =
  if (physicalPlan.operators.exists(_.requiresMaterializedExecution))
    ExecutionMode.MATERIALIZED
  else
    workflowContext.workflowSettings.executionMode
effectiveMode match {
let's add a test to cover these lines
Done in 2bbf760 — extracted the schedule-mode decision into CostBasedScheduleGenerator.effectiveExecutionMode(physicalPlan, requestedMode) and added two CostBasedScheduleGeneratorSpec cases: an operator that requires materialization forces MATERIALIZED even when PIPELINED is requested, and a plan with no such operator keeps the requested mode.
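For readers following the thread, the extracted helper and its two branches might look roughly like this. It is a sketch under stated assumptions: `ExecutionMode`, `PhysicalOp`, and `PhysicalPlan` are simplified hypothetical stand-ins for the real types, and only the helper name and its two parameters come from the comment above.

```scala
object SchedulerSketch {
  // Hypothetical stand-ins; the real types carry far more state.
  sealed trait ExecutionMode
  case object PIPELINED extends ExecutionMode
  case object MATERIALIZED extends ExecutionMode

  final case class PhysicalOp(id: String, requiresMaterializedExecution: Boolean = false)
  final case class PhysicalPlan(operators: Set[PhysicalOp])

  // Force MATERIALIZED when any physical op requires it;
  // otherwise keep whatever mode was requested.
  def effectiveExecutionMode(
      physicalPlan: PhysicalPlan,
      requestedMode: ExecutionMode
  ): ExecutionMode =
    if (physicalPlan.operators.exists(_.requiresMaterializedExecution)) MATERIALIZED
    else requestedMode
}
```

Keeping the decision in a pure function of the plan and the requested mode is what makes both branches testable without building a full schedule.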
Yicong-Huang left a comment
LGTM, left one comment inline
…onsumer Address @Yicong-Huang's review on apache#5720: extract the schedule-mode decision in CostBasedScheduleGenerator into a testable `effectiveExecutionMode` helper (forces MATERIALIZED when any physical op requires it, otherwise the requested mode) and add CostBasedScheduleGeneratorSpec cases covering both branches.
…rom apache#5720 apache#5720 extracted the schedule-mode decision into CostBasedScheduleGenerator.effectiveExecutionMode and added test coverage. Sync the helper refactor + tests here so the files match apache#5720 (loop ops set the flag, so the helper forces a materialized schedule for loop workflows).
Thanks. Let's get @Xiao-zhen-Liu's approval before merging.
What changes were proposed in this PR?
Lets an operator declare it can only run under a fully-materialized schedule, and has the scheduler honor it:
- `PhysicalOp` gains `requiresMaterializedExecution: Boolean = false` (+ a `withRequiresMaterializedExecution` builder). It is a physical-execution property, so it lives on the physical op.
- `CostBasedScheduleGenerator` consumes it: when any physical op requires materialized execution it forces a fully-materialized schedule regardless of the requested execution mode; otherwise the existing PIPELINED/MATERIALIZED logic runs unchanged.
Default `false` ⇒ dormant and behavior-preserving: no operator requires it yet, so the scheduler's effective mode is unchanged today. The loop operators set the flag on their physical op.
Any related issues, documentation, discussions?
Resolves #5719 (sub-issue of #4442 "Introduce for loop"). Split out of #5700. Reflects the review discussion with @Yicong-Huang: the property belongs on `PhysicalOp`, and it is consumed by the scheduler.
How was this PR tested?
`WorkflowCoreTypesSpec` covers the `PhysicalOp.requiresMaterializedExecution` default + builder. `WorkflowExecutionService/Test/compile`, `scalafixAll --check`, and `scalafmtCheckAll` pass locally. The scheduler consumer is exercised end-to-end by the loop integration tests once the loop operators (which set the flag) land.
Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.
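As an illustration of the flag and builder named in the description, a copy-based builder on an immutable case class could look like the sketch below. The `PhysicalOp` here is a hypothetical two-field stand-in, and the builder taking a `Boolean` argument is an assumption; the description gives only the builder's name.

```scala
object PhysicalOpSketch {
  // Hypothetical, heavily simplified PhysicalOp.
  final case class PhysicalOp(
      id: String,
      // Default false: dormant unless an operator opts in.
      requiresMaterializedExecution: Boolean = false
  ) {
    // Assumed signature: returns a modified copy, leaving the original untouched.
    def withRequiresMaterializedExecution(required: Boolean): PhysicalOp =
      this.copy(requiresMaterializedExecution = required)
  }
}
```

Under these assumptions, an operator descriptor would opt in with something like `PhysicalOp("loopStart").withRequiresMaterializedExecution(true)`, while every other operator keeps the default.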