feat(execution): add PhysicalOp.requiresMaterializedExecution, consumed by the scheduler #5720

Open
aglinxinyuan wants to merge 4 commits into apache:main from aglinxinyuan:materialized-execution-validation

Conversation

@aglinxinyuan aglinxinyuan commented Jun 15, 2026

Contributor

What changes were proposed in this PR?

Lets an operator declare it can only run under a fully-materialized schedule, and has the scheduler honor it:

  • PhysicalOp gains requiresMaterializedExecution: Boolean = false (+ a withRequiresMaterializedExecution builder). It is a physical-execution property, so it lives on the physical op.
  • CostBasedScheduleGenerator consumes it: when any physical op requires materialized execution it forces a fully-materialized schedule regardless of the requested execution mode; otherwise the existing PIPELINED/MATERIALIZED logic runs unchanged.

Default false ⇒ dormant and behavior-preserving: no operator on main requires it yet, so the scheduler's effective mode is unchanged today. The loop operators, which land separately, will set the flag on their physical op.
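
A minimal sketch of the flag and its builder, assuming a heavily simplified `PhysicalOp`. The `id` field, the parameter name `required`, and the two example operators are hypothetical and exist only for illustration; the real class in Texera carries many more fields.

```scala
// Illustrative stand-in only; not the actual Texera PhysicalOp.
object PhysicalOpSketch {
  final case class PhysicalOp(
      id: String, // hypothetical identifier, used only in this example
      requiresMaterializedExecution: Boolean = false // dormant by default
  ) {
    // Builder in the usual "withX" style: returns a modified copy and
    // leaves the receiver untouched.
    def withRequiresMaterializedExecution(required: Boolean): PhysicalOp =
      this.copy(requiresMaterializedExecution = required)
  }

  // An ordinary operator keeps the default.
  val filterOp: PhysicalOp = PhysicalOp("filter")

  // A loop-like operator opts in through the builder.
  val loopStartOp: PhysicalOp =
    PhysicalOp("loop-start").withRequiresMaterializedExecution(true)
}
```

Because the default is `false`, existing operators need no change; only operators that call the builder affect scheduling.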

Any related issues, documentation, discussions?

Resolves #5719 (sub-issue of #4442 "Introduce for loop"). Split out of #5700. Reflects the review discussion with @Yicong-Huang: the property belongs on PhysicalOp, and it is consumed by the scheduler.

How was this PR tested?

WorkflowCoreTypesSpec covers the PhysicalOp.requiresMaterializedExecution default + builder. WorkflowExecutionService/Test/compile, scalafixAll --check, and scalafmtCheckAll pass locally. The scheduler consumer is exercised end-to-end by the loop integration tests once the loop operators (which set the flag) land.

Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.

…submission

Add a general LogicalOp.requiresMaterializedExecution flag (default false) and
WorkflowExecutionService.validateExecutionMode, which rejects a non-MATERIALIZED
submission when any operator requires materialized execution -- keyed off the
flag, not a specific operator class. Default false keeps all existing behavior;
operators opt in by overriding the flag.

Split out of the loop-operators PR (apache#5700) to keep that PR
reviewable; the loop operators set the flag.

Copilot AI left a comment

Contributor

Pull request overview

Adds an operator-declared capability flag to enforce execution-mode constraints at workflow submission time, preventing workflows that require materialization from being run in PIPELINED mode (avoiding UI/engine mode mismatches).

Changes:

  • Added LogicalOp.requiresMaterializedExecution: Boolean = false for operators to declare a MATERIALIZED-only requirement.
  • Introduced WorkflowExecutionService.validateExecutionMode(operators, settings) to reject incompatible submissions with an actionable error message.
  • Added ScalaTest coverage for the default flag value and validate/reject/accept paths.
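
For context, the submission-time check summarized above (the first revision of this PR, later dropped in favor of a scheduler-side consumer) might look roughly like the sketch below. All types here are simplified stand-ins, and both the `Either` return type and the error wording are assumptions; the overview only says that incompatible submissions are rejected with an actionable message.

```scala
// Hypothetical sketch of the superseded submission-time validation.
object SubmissionValidationSketch {
  sealed trait ExecutionMode
  object ExecutionMode {
    case object PIPELINED extends ExecutionMode
    case object MATERIALIZED extends ExecutionMode
  }

  // Stand-in for a logical operator that can declare the requirement.
  trait LogicalOp {
    def name: String
    def requiresMaterializedExecution: Boolean = false
  }
  final case class StubOp(
      name: String,
      override val requiresMaterializedExecution: Boolean = false
  ) extends LogicalOp

  final case class WorkflowSettings(executionMode: ExecutionMode)

  // Reject any non-MATERIALIZED submission that contains an operator
  // declaring the requirement; accept everything else.
  def validateExecutionMode(
      operators: Seq[LogicalOp],
      settings: WorkflowSettings
  ): Either[String, Unit] = {
    val offenders = operators.filter(_.requiresMaterializedExecution).map(_.name)
    val names = offenders.mkString(", ")
    if (offenders.nonEmpty && settings.executionMode != ExecutionMode.MATERIALIZED)
      Left(s"Operators [$names] require MATERIALIZED execution mode.")
    else
      Right(())
  }
}
```

The check keys off the flag rather than a specific operator class, which matches the commit message's description of the first revision.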

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala | Adds requiresMaterializedExecution flag (default false) for operator-declared mode requirements. |
| amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala | Implements and invokes submission-time execution mode validation keyed off the new flag. |
| amber/src/test/scala/org/apache/texera/web/service/WorkflowExecutionServiceSpec.scala | Adds unit tests covering the flag default and validation behavior. |

@codecov-commenter

codecov-commenter commented Jun 15, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 52.71%. Comparing base (83df2b5) to head (2bbf760).

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #5720      +/-   ##
============================================
- Coverage     52.93%   52.71%   -0.22%     
- Complexity     2626     2627       +1     
============================================
  Files          1090     1089       -1     
  Lines         42208    42175      -33     
  Branches       4534     4535       +1     
============================================
- Hits          22341    22231     -110     
- Misses        18557    18645      +88     
+ Partials       1310     1299      -11     
| Flag | Coverage Δ | | *Carryforward flag |
| --- | --- | --- | --- |
| access-control-service | 70.91% <ø> (ø) | | |
| agent-service | 34.36% <ø> (ø) | | Carriedforward from f577a4a |
| amber | 53.13% <100.00%> (-0.01%) | ⬇️ | |
| computing-unit-managing-service | 1.65% <ø> (ø) | | |
| config-service | 56.71% <ø> (ø) | | |
| file-service | 57.06% <ø> (ø) | | |
| frontend | 47.35% <ø> (-0.52%) | ⬇️ | Carriedforward from f577a4a |
| pyamber | 90.71% <ø> (+0.93%) | ⬆️ | Carriedforward from f577a4a |
| python | 90.73% <ø> (ø) | | Carriedforward from f577a4a |
| workflow-compiling-service | 58.69% <ø> (ø) | | |

*This pull request uses carry forward flags.

aglinxinyuan added a commit to aglinxinyuan/texera that referenced this pull request Jun 15, 2026
…e#5720)

The requiresMaterializedExecution flag + WorkflowExecutionService.validateExecutionMode
are split out to apache#5720. loop-feb keeps the flag (set on the loop
operators via LoopOpDesc) and the submission-time validation call; drop the
loop-based WorkflowExecutionServiceSpec (the split adds a stub-based one on
main) and move the loop-flag coverage into LoopStart/EndOpDescSpec.
@github-actions

github-actions Bot commented Jun 15, 2026

Contributor

⚠️ Benchmark changes need a look

🟢 2 better · 🔴 6 worse · ⚪ 7 noise (<±5%) · 0 without baseline

Compared against main 83df2b5 benchmarked on this same runner, so the delta is largely free of cross-runner hardware noise. The "7d avg" column still reflects the gh-pages dashboard. Treat <±5% as noise unless repeated.

| config | throughput | MB/s | latency | max Δ latest / 7d |
| --- | --- | --- | --- | --- |
| 🔴 bs=10 sw=10 sl=64 | 410 | 0.25 | 24,164/30,424/30,424 us | 🟢 -15.1% / 🟢 -13.0% |
| 🔴 bs=100 sw=10 sl=64 | 938 | 0.572 | 105,709/132,360/132,360 us | 🔴 +11.9% / 🟢 -5.9% |
| bs=1000 sw=10 sl=64 | 1,132 | 0.691 | 884,705/917,905/917,905 us | ⚪ within ±5% / 🟢 -10.3% |

Baseline details

Latest main 83df2b5 from same runner

| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
| --- | --- | --- | --- | --- | --- | --- |
| bs=10 sw=10 sl=64 | throughput | 410 tuples/sec | 438 tuples/sec | 410.82 tuples/sec | -6.4% | -0.2% |
| bs=10 sw=10 sl=64 | MB/s | 0.25 MB/s | 0.267 MB/s | 0.251 MB/s | -6.4% | -0.3% |
| bs=10 sw=10 sl=64 | p50 | 24,164 us | 21,108 us | 23,785 us | +14.5% | +1.6% |
| bs=10 sw=10 sl=64 | p95 | 30,424 us | 35,830 us | 34,980 us | -15.1% | -13.0% |
| bs=10 sw=10 sl=64 | p99 | 30,424 us | 35,830 us | 34,980 us | -15.1% | -13.0% |
| bs=100 sw=10 sl=64 | throughput | 938 tuples/sec | 983 tuples/sec | 891.94 tuples/sec | -4.6% | +5.2% |
| bs=100 sw=10 sl=64 | MB/s | 0.572 MB/s | 0.6 MB/s | 0.544 MB/s | -4.7% | +5.1% |
| bs=100 sw=10 sl=64 | p50 | 105,709 us | 99,028 us | 112,277 us | +6.7% | -5.9% |
| bs=100 sw=10 sl=64 | p95 | 132,360 us | 118,235 us | 139,802 us | +11.9% | -5.3% |
| bs=100 sw=10 sl=64 | p99 | 132,360 us | 118,235 us | 139,802 us | +11.9% | -5.3% |
| bs=1000 sw=10 sl=64 | throughput | 1,132 tuples/sec | 1,123 tuples/sec | 1,041 tuples/sec | +0.8% | +8.7% |
| bs=1000 sw=10 sl=64 | MB/s | 0.691 MB/s | 0.685 MB/s | 0.635 MB/s | +0.9% | +8.8% |
| bs=1000 sw=10 sl=64 | p50 | 884,705 us | 897,490 us | 972,714 us | -1.4% | -9.0% |
| bs=1000 sw=10 sl=64 | p95 | 917,905 us | 939,450 us | 1,023,057 us | -2.3% | -10.3% |
| bs=1000 sw=10 sl=64 | p99 | 917,905 us | 939,450 us | 1,023,057 us | -2.3% | -10.3% |

Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,487.65,200,128000,410,0.250,24164.33,30423.92,30423.92
1,100,10,64,20,2132.92,2000,1280000,938,0.572,105709.23,132360.14,132360.14
2,1000,10,64,20,17666.74,20000,12800000,1132,0.691,884705.37,917904.68,917904.68
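
The derived columns in the report can be reproduced from the raw CSV with simple arithmetic, sketched below. The helper names are hypothetical. The 1024 × 1024 divisor for MB/s is an inference from the reported numbers (it reproduces 0.250, 0.572, and 0.691), not something the report states.

```scala
// Hypothetical helpers that recompute the report's derived columns.
object BenchmarkArithmeticSketch {
  // Throughput: tuples divided by elapsed seconds.
  def tuplesPerSec(totalTuples: Long, totalMs: Double): Double =
    totalTuples / (totalMs / 1000.0)

  // Bandwidth: bytes per second scaled by 1024 * 1024 (inferred divisor).
  def mbPerSec(totalBytes: Long, totalMs: Double): Double =
    totalBytes / (totalMs / 1000.0) / (1024.0 * 1024.0)

  // Relative change of the PR value against a baseline, in percent.
  def percentDelta(pr: Double, baseline: Double): Double =
    (pr - baseline) / baseline * 100.0

  // First CSV row: 200 tuples, 128000 bytes, 487.65 ms.
  val row0Throughput: Double = tuplesPerSec(200L, 487.65)
  val row0Bandwidth: Double = mbPerSec(128000L, 487.65)
  // Throughput of bs=10 against latest main (410 vs 438 tuples/sec).
  val row0DeltaLatest: Double = percentDelta(410.0, 438.0)
}
```

For the first row this gives roughly 410 tuples/sec, 0.250 MB/s, and a -6.4% throughput delta against latest main, matching the table.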

@Yicong-Huang

Yicong-Huang commented Jun 15, 2026

Contributor

had a discussion with @aglinxinyuan, two comments:

  1. I think this property belongs to physical execution, so it should be a property of the physical op, not the logical op.
  2. From the PR's perspective, it is not ideal to introduce a property without a consumer. To keep the PR scope small, the consumer can be simple. For instance, I assume this property should be consumed by the scheduler (?), so logic like this
if (physicalPlan.operators.exists(_.requiresMaterializedExecution)) {
    // TODO: require non-pipelined mode
}

// original pipelined execution mode logic
.....

would suffice. Even a simple consumer like this helps reviewers understand how the property will be used.

… + consume in scheduler

Per @Yicong-Huang's review on apache#5720:
- It is a physical-execution property, so move the flag from LogicalOp to
  PhysicalOp (+ a withRequiresMaterializedExecution builder).
- Give it a real consumer in the scheduler: CostBasedScheduleGenerator forces a
  fully-materialized schedule when any physical op requires it, instead of
  validating at submission. Drops WorkflowExecutionService.validateExecutionMode
  and its spec.

Default false -> dormant and behavior-preserving (no operator requires it yet);
the loop operators set it on their PhysicalOp.
aglinxinyuan changed the title from "feat(execution): add requiresMaterializedExecution flag, validate at workflow submission" to "feat(execution): add PhysicalOp.requiresMaterializedExecution, consumed by the scheduler" on Jun 15, 2026
@aglinxinyuan

Contributor Author

Addressed both points in 01c8603:

  1. Moved the flag to PhysicalOp (it's a physical-execution property) + a withRequiresMaterializedExecution builder; removed it from LogicalOp.
  2. Real consumer in the scheduler: CostBasedScheduleGenerator now forces a fully-materialized schedule when any physical op requires it (otherwise the existing PIPELINED/MATERIALIZED logic runs unchanged). Dropped the submission-time WorkflowExecutionService.validateExecutionMode.

Default false, so it's dormant and behavior-preserving today; the loop operators set it on their PhysicalOp. PR is now 3 files: PhysicalOp, CostBasedScheduleGenerator, WorkflowCoreTypesSpec.

aglinxinyuan added a commit to aglinxinyuan/texera that referenced this pull request Jun 15, 2026
…ion on PhysicalOp

apache#5720 was redesigned: the flag moved LogicalOp -> PhysicalOp and the consumer
moved from WorkflowExecutionService.validateExecutionMode to the scheduler
(CostBasedScheduleGenerator). Match that here:
  - mechanism files (PhysicalOp flag + builder, CostBasedScheduleGenerator
    consumer, WorkflowCoreTypesSpec test) brought in line with apache#5720;
    WorkflowExecutionService reverted (validateExecutionMode dropped).
  - LoopOpDesc.getPhysicalOp sets requiresMaterializedExecution on the physical
    op (Loop Start and Loop End); dropped the LogicalOp-level override.
  - LoopStart/EndOpDescSpec assert the flag via getPhysicalOp.
Comment on lines +307 to +315
// An operator may require a fully-materialized schedule (e.g. a loop,
// whose back-edge is a cross-region materialized state channel). When any
// does, materialize fully regardless of the requested execution mode.
val effectiveMode =
  if (physicalPlan.operators.exists(_.requiresMaterializedExecution))
    ExecutionMode.MATERIALIZED
  else
    workflowContext.workflowSettings.executionMode
effectiveMode match {

Contributor

let's add a test to cover these lines

Contributor Author

Done in 2bbf760 — extracted the schedule-mode decision into CostBasedScheduleGenerator.effectiveExecutionMode(physicalPlan, requestedMode) and added two CostBasedScheduleGeneratorSpec cases: an operator that requires materialization forces MATERIALIZED even when PIPELINED is requested, and a plan with no such operator keeps the requested mode.
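
A rough sketch of what the extracted helper and its two test cases could look like. `ExecutionMode`, `PhysicalOp`, and `PhysicalPlan` are simplified stand-ins defined here for illustration, and the plain `assert`-style checks in the usage note stand in for the CostBasedScheduleGeneratorSpec cases, whose exact form is not shown in this thread.

```scala
// Simplified stand-ins; the real Texera types are richer than this.
object ScheduleModeSketch {
  sealed trait ExecutionMode
  object ExecutionMode {
    case object PIPELINED extends ExecutionMode
    case object MATERIALIZED extends ExecutionMode
  }

  final case class PhysicalOp(
      id: String, // hypothetical identifier, used only in this example
      requiresMaterializedExecution: Boolean = false
  )
  final case class PhysicalPlan(operators: Set[PhysicalOp])

  // Force MATERIALIZED when any operator requires it; otherwise pass the
  // requested mode through unchanged.
  def effectiveExecutionMode(
      physicalPlan: PhysicalPlan,
      requestedMode: ExecutionMode
  ): ExecutionMode =
    if (physicalPlan.operators.exists(_.requiresMaterializedExecution))
      ExecutionMode.MATERIALIZED
    else
      requestedMode

  // Example plans mirroring the two branches described above.
  val planWithLoop: PhysicalPlan = PhysicalPlan(
    Set(PhysicalOp("scan"), PhysicalOp("loop-start", requiresMaterializedExecution = true))
  )
  val plainPlan: PhysicalPlan = PhysicalPlan(Set(PhysicalOp("scan"), PhysicalOp("filter")))
}
```

With these definitions, `effectiveExecutionMode(planWithLoop, ExecutionMode.PIPELINED)` yields `MATERIALIZED`, while `effectiveExecutionMode(plainPlan, ExecutionMode.PIPELINED)` keeps `PIPELINED`. Keeping the decision in a pure function of the plan and the requested mode is what makes both branches testable without running a workflow.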

@Yicong-Huang Yicong-Huang left a comment

Contributor

LGTM, left one comment inline

…onsumer

Address @Yicong-Huang's review on apache#5720: extract the schedule-mode
decision in CostBasedScheduleGenerator into a testable `effectiveExecutionMode`
helper (forces MATERIALIZED when any physical op requires it, otherwise the
requested mode) and add CostBasedScheduleGeneratorSpec cases covering both
branches.
aglinxinyuan added a commit to aglinxinyuan/texera that referenced this pull request Jun 15, 2026
…rom apache#5720

apache#5720 extracted the schedule-mode decision into
CostBasedScheduleGenerator.effectiveExecutionMode and added test coverage. Sync
the helper refactor + tests here so the files match apache#5720 (loop ops set the
flag, so the helper forces a materialized schedule for loop workflows).
@Yicong-Huang

Contributor

Thanks. Let's get @Xiao-zhen-Liu's approval before merging

Development

Successfully merging this pull request may close these issues.

Add a requiresMaterializedExecution flag and enforce it at workflow submission

4 participants