python: add support for adhoc query as pyarrow table #5814

Merged
abhizer merged 3 commits into feldera:main from monochromatti:arrow-ipc-sdk on Apr 10, 2026

@monochromatti

@monochromatti monochromatti commented Mar 13, 2026

Contributor

Ran tests locally against a running Feldera API.

From python/:

  • Full Python SDK suite (excluding tests/runtime_aggtest):
    • uv run python -m pytest tests/ --ignore=tests/runtime_aggtest -ra
    • Local result: 122 passed, 45 skipped
  • Targeted reruns:
    • uv run python -m pytest tests/platform/test_shared_pipeline.py::TestPipeline::test_adhoc_query_arrow -q
    • uv run python -m pytest tests/unit/test_query_as_arrow.py -q

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

Mark if you think the answer is yes for any of these components:

Describe Incompatible Changes

None.


Summary

This PR adds Arrow IPC query support to the Python SDK so ad-hoc query results can be consumed as streamed PyArrow record batches.

What changed

  • Added a new client API:
    • FelderaClient.query_as_arrow(pipeline_name, query) -> Generator[pyarrow.RecordBatch, None, None]
  • Added a pipeline convenience method:
    • Pipeline.query_arrow(query) -> Generator[pyarrow.RecordBatch, None, None]
  • Added optional Arrow dependency extra:
    • pip install "feldera[arrow]"
  • Updated Python README with Arrow installation guidance
  • Added unit and platform tests for Arrow IPC query behavior

Notes

  • The Arrow response is consumed from an HTTP stream (stream=True) and yielded batch-by-batch.
  • Users can materialize a pyarrow.Table when desired via pyarrow.Table.from_batches(...).

@monochromatti monochromatti force-pushed the arrow-ipc-sdk branch 2 times, most recently from 4065f37 to edcaa7e Compare March 13, 2026 06:54

@mythical-fred mythical-fred left a comment


LGTM — but see inline: there is an existing open PR covering the same feature.

Comment thread python/feldera/rest/feldera_client.py
@gz

gz commented Mar 13, 2026

Contributor

hi @monochromatti this looks good thanks a lot for your contribution. @abhizer can you review this

@monochromatti

Contributor Author

I'd like input on whether to return Generator[pyarrow.RecordBatch, ...] or a pyarrow.Table directly. The latter is the current state of the PR, but after some thinking it feels like generating batches is more in style with similar existing functionality and better suited for big payloads.

@abhizer abhizer left a comment

Contributor

Thank you!

As a heads up, the reason we didn't merge the prior PR is because the server intermittently sent bad data and we were unable to figure out why.

@abhizer

abhizer commented Mar 13, 2026

Contributor

I'd like input on whether to return Generator[pyarrow.RecordBatch, ...] or a pyarrow.Table directly

We normally return a generator, and it might be a good idea to keep this behavior consistent.

@mihaibudiu

Contributor

@monochromatti please re-request a review from @abhizer when this is ready again

@monochromatti monochromatti force-pushed the arrow-ipc-sdk branch 2 times, most recently from 379bfe8 to 5f06e6a Compare March 24, 2026 12:16
@monochromatti monochromatti requested a review from abhizer March 24, 2026 12:18

@abhizer abhizer left a comment

Contributor

Thank you!

@abhizer abhizer changed the title arrow ipc sdk python: add support for adhoc query as pyarrow table Apr 2, 2026
@monochromatti

Contributor Author

Rebased on main to solve a uv.lock conflict

@mythical-fred mythical-fred left a comment


LGTM

@monochromatti

Contributor Author

Sorry I might be missing something, but the PR still requires an approval to run CI?

@abhizer

abhizer commented Apr 4, 2026

Contributor

Done!

@mythical-fred


The "Pre Merge Queue Tasks" CI failure looks transient — the failing step is the Rust build check, but this PR has no Rust changes. The same step failed and then passed for other PRs around the same time. Could someone re-trigger CI?

@mythical-fred


CI is still showing a failure on "Pre Merge Queue Tasks" from Apr 4 — looks like nobody re-triggered it yet. Could someone queue a fresh run? This is a Python-only PR and that step has been transiently failing for unrelated Rust check reasons.

@abhizer

abhizer commented Apr 6, 2026

Contributor

You might have to run "ruff format" for it to pass the pre merge queue.

@mythical-fred mythical-fred left a comment


LGTM

Signed-off-by: Mattias Matthiesen <mattias.matthiesen@eviny.no>
Signed-off-by: Mattias Matthiesen <mattias.matthiesen@eviny.no>
Signed-off-by: Mattias Matthiesen <mattias.matthiesen@eviny.no>
@monochromatti

monochromatti commented Apr 8, 2026

Contributor Author

Updated PR body and solved uv.lock conflict (exclude-newer timestamp). @abhizer

@monochromatti monochromatti requested a review from abhizer April 8, 2026 05:45
@abhizer

abhizer commented Apr 8, 2026

Contributor

Thank you!

@gz

gz commented Apr 10, 2026

Contributor

@abhizer can we merge this?

@abhizer abhizer added this pull request to the merge queue Apr 10, 2026
Merged via the queue into feldera:main with commit 0be9804 Apr 10, 2026
1 check passed
@gz

gz commented Apr 12, 2026

Contributor

@monochromatti unfortunately there is an issue in the Arrow streaming that caused some CI tests to fail non-deterministically. I reverted this for now, but we should be able to bring it back once we fix it.

@gz

gz commented Apr 12, 2026

Contributor

#4287 tracking issue

gz added a commit that referenced this pull request May 13, 2026
 revert)

stream_arrow_query bridged a synchronous Arrow IPC StreamWriter to an
async mpsc by spawning one tokio task per std::io::Write::write call:

    let handle = TOKIO.spawn(async move { tx.send(bytes).await });
    self.handles.push(handle);

Each StreamWriter::write(&batch) makes ~6 sequential write_all calls.
The spawned tasks have no ordering relation; on a multi-thread tokio
runtime they race to send into the receiver, so bytes arrive in
arbitrary order and the resulting Arrow IPC stream gets corrupted.

The fix is to not call sync Write from inside an async future at
all. stream_arrow_query now hands StreamWriter a Vec<u8> and drains
the buffer between batches via std::mem::take(writer.get_mut()), then
yields a single ordered Bytes chunk per batch. Memory cost is bounded
by one record batch; behaviour matches stream_json_query, which has
always used this shape.

ChannelWriter retains its AsyncFileWriter impl for the parquet path
(AsyncArrowWriter awaits each write future before issuing the next,
so ordering there is already safe); the racy std::io::Write impl, the
handles vec, and the cfg(test) reordering shim are all removed.

Refs: #3923 #3792 #4287 #4226 #5814 #4240
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
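
The shape of the fix (encode into an in-memory buffer, drain it once per batch, emit one ordered chunk) can be sketched in Python as follows. This is an analogue under stated assumptions, not the Rust code from the commit: `write_batch` and `stream_chunks` are hypothetical names, and the toy length-prefixed framing merely stands in for the several writes an Arrow IPC writer issues per batch.

```python
import io
from typing import Iterable, Iterator


def write_batch(sink: io.BytesIO, payload: bytes) -> None:
    """Toy encoder: issues several sequential writes per batch."""
    sink.write(len(payload).to_bytes(4, "little"))  # length prefix
    sink.write(payload)                             # body
    sink.write(b"\x00" * (-len(payload) % 8))       # pad body to 8 bytes


def stream_chunks(batches: Iterable[bytes]) -> Iterator[bytes]:
    """Yield exactly one ordered chunk per batch.

    All writes for a batch land in the same in-memory buffer, so their
    relative order is fixed before anything is handed to the consumer.
    """
    sink = io.BytesIO()
    for payload in batches:
        write_batch(sink, payload)
        chunk = sink.getvalue()
        # Drain the buffer so memory stays bounded by one batch.
        sink.seek(0)
        sink.truncate()
        yield chunk


chunks = list(stream_chunks([b"abc", b"defghijkl"]))
```

Because each chunk is assembled synchronously and sent as a unit, there is nothing left for concurrent tasks to reorder.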
gz added a commit that referenced this pull request May 13, 2026
PR #5814 introduced pyarrow as an optional extra (feldera[arrow]),
gated behind a lazy import that surfaced a 'pip install feldera[arrow]'

We make this a non-optional import here because this is supposed
to become the default format anyway going forward.

Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
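
For context, a lazy import that gates an optional extra generally follows the pattern sketched below. This is an illustration only, not the SDK's code: `require_optional` is a hypothetical helper, and the exact wording of the install hint is an assumption.

```python
import importlib
from types import ModuleType


def require_optional(module_name: str, extra: str) -> ModuleType:
    """Import an optional dependency on first use, or fail with an install hint."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name} is required for this feature; "
            f'install it with: pip install "feldera[{extra}]"'
        ) from exc


# A module that is present imports normally.
json_mod = require_optional("json", "arrow")

# A missing module surfaces the install hint instead of a bare ImportError.
try:
    require_optional("definitely_not_installed_module", "arrow")
except ImportError as exc:
    hint = str(exc)
```

Making the dependency mandatory removes this indirection: the module is imported at the top of the file and installed with the package.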
gz added a commit that referenced this pull request May 13, 2026
…tically

Adds a unit test that drives StreamWriter through the existing
ChannelWriter (the sync std::io::Write adapter that spawns one tokio
task per write call) and verifies the receiver-side StreamReader can
parse the byte stream back.

With four record batches and a four-worker tokio runtime the test
fails reliably with the same errors reported in the issues:

    ParseError("Unable to get root as message:
                RangeOutOfBounds { range: 655360..655364, .. }")
    IpcError("Expected schema message, found empty stream.")

Refs: #3923 #3792 #4287 #4226 #5814
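
Why reordered writes break the reader can be shown deterministically with a toy framed stream. The sketch below is a Python analogue, not the Rust unit test: `encode` and `decode` are hypothetical helpers, and the length-prefixed framing only approximates how an IPC reader trusts the sizes it finds in the byte stream.

```python
import struct
from typing import List


def encode(messages: List[bytes]) -> List[bytes]:
    """Return the individual writes an encoder issues: a header, then a body."""
    writes = []
    for body in messages:
        writes.append(struct.pack("<I", len(body)))
        writes.append(body)
    return writes


def decode(stream: bytes) -> List[bytes]:
    """Parse length-prefixed messages, failing on an impossible length."""
    out, pos = [], 0
    while pos < len(stream):
        if pos + 4 > len(stream):
            raise ValueError("truncated length prefix")
        (size,) = struct.unpack_from("<I", stream, pos)
        pos += 4
        if pos + size > len(stream):
            raise ValueError(f"range out of bounds: {pos}..{pos + size}")
        out.append(stream[pos:pos + size])
        pos += size
    return out


writes = encode([b"schema", b"batch-0"])
in_order = decode(b"".join(writes))

# Swap the first header and body, as an unordered delivery might.
reordered = [writes[1], writes[0], writes[2], writes[3]]
try:
    decode(b"".join(reordered))
    error = None
except ValueError as exc:
    error = str(exc)
```

With the first two writes swapped, the reader interprets payload bytes as a length and requests a range far beyond the end of the stream.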
mihaibudiu pushed a commit to mihaibudiu/dbsp that referenced this pull request May 14, 2026
…tically

Adds a unit test that drives StreamWriter through the existing
ChannelWriter (the sync std::io::Write adapter that spawns one tokio
task per write call) and verifies the receiver-side StreamReader can
parse the byte stream back.

With four record batches and a four-worker tokio runtime the test
fails reliably with the same errors reported in the issues:

    ParseError("Unable to get root as message:
                RangeOutOfBounds { range: 655360..655364, .. }")
    IpcError("Expected schema message, found empty stream.")

Refs: feldera#3923 feldera#3792 feldera#4287 feldera#4226 feldera#5814
mihaibudiu pushed a commit to mihaibudiu/dbsp that referenced this pull request May 14, 2026
…eldera#4226 feldera#5814 revert)

stream_arrow_query bridged a synchronous Arrow IPC StreamWriter to an
async mpsc by spawning one tokio task per std::io::Write::write call:

    let handle = TOKIO.spawn(async move { tx.send(bytes).await });
    self.handles.push(handle);

Each StreamWriter::write(&batch) makes ~6 sequential write_all calls.
The spawned tasks have no ordering relation; on a multi-thread tokio
runtime they race to send into the receiver, so bytes arrive in
arbitrary order and the resulting Arrow IPC stream gets corrupted.

The fix is to not call sync Write from inside an async future at
all. stream_arrow_query now hands StreamWriter a Vec<u8> and drains
the buffer between batches via std::mem::take(writer.get_mut()), then
yields a single ordered Bytes chunk per batch. Memory cost is bounded
by one record batch; behaviour matches stream_json_query, which has
always used this shape.

ChannelWriter retains its AsyncFileWriter impl for the parquet path
(AsyncArrowWriter awaits each write future before issuing the next,
so ordering there is already safe); the racy std::io::Write impl, the
handles vec, and the cfg(test) reordering shim are all removed.

Refs: feldera#3923 feldera#3792 feldera#4287 feldera#4226 feldera#5814 feldera#4240
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
mihaibudiu pushed a commit to mihaibudiu/dbsp that referenced this pull request May 14, 2026
PR feldera#5814 introduced pyarrow as an optional extra (feldera[arrow]),
gated behind a lazy import that surfaced a 'pip install feldera[arrow]'

We make this a non-optional import here because this is supposed
to become the default format anyway going forward.

Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>