Skip to content

feat: Re-spill sort stream if unable to reserve for 2 streams#22945

Open
EmilyMatt wants to merge 8 commits into
apache:mainfrom
EmilyMatt:sort-skew-resilience
Open

feat: Re-spill sort stream if unable to reserve for 2 streams#22945
EmilyMatt wants to merge 8 commits into
apache:mainfrom
EmilyMatt:sort-skew-resilience

Conversation

@EmilyMatt

Copy link
Copy Markdown
Contributor

Rationale for this change

I've encountered several cases where the merge reservation cannot acquire the minimum reservation needed(2 streams, with a buffer size of 1), we currently error in these cases but that's not a necessity.
I've implemented a simple re-spill mechanism for when the first 2 streams cannot be reserved: we take the larger of those streams, and we re-spill it with all its batches split in half(done using slice() so no copying happens at that stage and we'll have a smaller memory peak)
This converges until we have enough memory to perform the merge(because max_record_batch_size is halved, ideally)
I've encountered this in situations where there is heavy skew, so maybe in the future might be worth it running this in general whenever one stream has a max_record_batch_size that is far above the other streams, could greatly improve performance of the entire merge stream at the cost of re-spilling once.

What changes are included in this PR?

Aforementioned implementation

Are these changes tested?

Yes

Are there any user-facing changes?

No

@github-actions github-actions Bot added the physical-plan Changes to the physical-plan crate label Jun 14, 2026
@rluvaton

Copy link
Copy Markdown
Member

Could you please add a test in memory constraint env fuzz testing we have?

@github-actions github-actions Bot added core Core DataFusion crate auto detected api change Auto detected API change labels Jun 14, 2026
@EmilyMatt

Copy link
Copy Markdown
Contributor Author

I'm not sure what the supposed API change is, as all the functions I've modified are private 🤔 and the "details" tab shows unrelated stuff

@2010YOUY01 2010YOUY01 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to construct a end-to-end reproducer (ideally at SQL level) for the following reasons.

  • I think this implementation makes sense to me, but it inevatiably introduced some complexity inside operator, if we can know the end-to-end goal from the specific workload, we can try to think about is there any easier alternative.
  • The UTs are very low-level, if we decide to refactor the implementation someday, this coverage is very likely to get lost, an e2e test would be more robust to the rapidly moving codebase.

@github-actions github-actions Bot removed the auto detected api change Auto detected API change label Jun 15, 2026
Comment thread datafusion/physical-plan/src/sorts/multi_level_merge.rs

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another optimization ideas (not in this PR):

scenerio

Let's say you have 8 files:

M,L,M,M,M,M,M,M

(M is file with medium max batch size, L is file with large max batch size)

in your implementation: you see that you can't merge the first 2 files since L is very large so you spilt L batch size by half and try again
so after spilt you have:

M,L_Split,M,M,M,M,M,M

and now you can merge the first 4 streams: M,L_Split,M,M
and continue as usual.

but what you did was changing batch size for everything (which is required so you don't go back to the same large batch in the worst case scenerio)

but this also harm the performance of the entire multi level merge since you now spill and merge in smaller batches.

My idea is that you can delay the split of L to last so all the sort and spill files before it will use the old batch size but only the last one will use the smaller batch size and this will increase performance.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you are very smart Raz ;)

Comment on lines +442 to +447
if number_of_spills_to_read_for_current_phase == 0 {
// We couldn't even reserve a single stream - one record batch
// is larger than the whole merge budget. That's the lone-batch
// case, not the 2-stream merge skew we rescue here - surface it.
return Err(err);
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can can possibly still split batch size into 2, the calculation for amount of memory for merge stream is more than what is needed for only splitting a batch in 2, so you can try to check if have enough memory for splitting in 2

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worst case is still 2

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or aspiring to 2 on a level where it doesn't matter(1 huge string with everything else nulls, for example)

Comment thread datafusion/physical-plan/src/sorts/multi_level_merge.rs
@@ -182,7 +191,17 @@ impl MultiLevelMergeBuilder {

async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please try to revert the batch size back to the original value for the last stream if it is possible to stay in the limit?

so if the original very large batch was spread across many rows that are now in different batches or files we may still keep the batch size as one last try

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer not to as this complicates the code, I don't really have a guarantee that the last stream is not a part of a merge with the results of the first streams, as there can be multiple levels, I don't wanna fuck with that

/// next attempt can seat both streams. One stream's worth of memory is reserved
/// for the duration and freed afterwards. Makes the merge resilient to skew.
async fn split_spill_file_in_half(&mut self, index: usize) -> Result<()> {
let target = self.sorted_spill_files.remove(index);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to avoid shifting everything can we swap the index with the last index, then pop, create a new file, push and swap again.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied against my better judgment, as this is both negligible compared to the work done here and makes the code more complex.
(and besides the point may even be optimized out by the compiler, but that is untested)

@EmilyMatt

Copy link
Copy Markdown
Contributor Author

It would be great to construct a end-to-end reproducer (ideally at SQL level) for the following reasons.

* I think this implementation makes sense to me, but it inevatiably introduced some complexity inside operator, if we can know the end-to-end goal from the specific workload, we can try to think about is there any easier alternative.

* The UTs are very low-level, if we decide to refactor the implementation someday, this coverage is very likely to get lost, an e2e test would be more robust to the rapidly moving codebase.

I believe I've added an e2e test, but let me know if it is not what you intended. can confirm it fails without this implementation

Comment on lines +538 to +541
return resources_err!(
"Cannot merge sorted runs: a single record batch of {old_max} bytes \
exceeds the available merge memory and cannot be split further"
);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be useful (not for now, or even at all) if we could print the largest row size to help the user with understanding the issue


return Err(err);
// buffer_len == 1 and we still can't seat the minimum of 2 streams.
if number_of_spills_to_read_for_current_phase == 0 {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can happen if we have 1 sorted stream and the first spill file is very large, why can't we split it in those cases? (but propagate the error when minimum_number_of_required_streams is greater than 1)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be a good expansion but a bigger refactor than I want to do right now, this feature was unobtrusive because the code paths are separated.
Perhaps a good solution would even be to spill the sorted streams and only then call this function, at a small perf cost in exchange for resilience.

Comment on lines +729 to +732
/// Proves the fix: two sorted runs whose largest batches are too big to both
/// be seated in the merge budget at once are re-spilled (halved) until they
/// fit, and the merge then completes with fully sorted, complete output.
/// Before the fix this returned `ResourcesExhausted` instead of merging.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to mention the "fix"

/// which lowers the per-stream merge reservation so the
/// next attempt can seat both streams. One stream's worth of memory is reserved
/// for the duration and freed afterwards. Makes the merge resilient to skew.
async fn split_spill_file_in_half(&mut self, index: usize) -> Result<()> {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add log::debug! when this happens

@rluvaton rluvaton left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, great idea!

this will make sort even more resilient

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants