Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions python/tests/platform/fixtures/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""PySpark builders for Delta table test fixtures.

Each module here writes a Delta table that exercises a feature only Delta
Spark can produce (deletion vectors, column-mapping schema evolution). They
run as subprocesses via ``tests.utils.ensure_delta_spark_fixture`` and keep
their PySpark imports function-local, so importing this package never pulls in
the JVM/Spark stack.
"""
97 changes: 97 additions & 0 deletions python/tests/platform/fixtures/column_mapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""Build a column-mapped, schema-evolved Delta table fixture with PySpark.

``tests.utils.ensure_delta_spark_fixture`` runs this as a subprocess
(``uv run --with "delta-spark>=4.2,<5" python column_mapping.py <output_dir>
<mode>``), where ``<mode>`` is a ``delta.columnMapping.mode`` value: ``name``
(physical Parquet columns are renamed to ``col-<uuid>``) or ``id`` (columns are
matched by Parquet field ID). PySpark is currently the only writer that can
produce a Delta table with column mapping enabled *and* perform the rename /
drop schema-evolution operations that make logical column names diverge from
the physical ones; neither ``delta-rs`` nor the ``deltalake`` Python wheel can.
DROP COLUMN under column mapping requires delta-spark 4.x (Delta writer v5,
reader v2). PySpark imports stay function-local so importing this module (for
``EXPECTED_ROWS``) never pulls in the JVM/Spark stack.

Changing this builder changes the fixture it produces, but a cached fixture is
reused based on its path alone — bump ``FIXTURE_VERSION`` in
``test_delta_input_column_mapping.py`` on any builder change.

The resulting table's history (one commit per step):

* ``v0`` CREATE TABLE with ``delta.columnMapping.mode = '<mode>'``
* ``v1`` INSERT two rows under the original ``(id, name, amount)`` schema
* ``v2`` RENAME COLUMN ``name`` -> ``full_name``
* ``v3`` ADD COLUMN ``country``
* ``v4`` INSERT two rows under the evolved ``(id, full_name, amount, country)``
  schema
* ``v5`` DROP COLUMN ``amount``
* ``v6`` INSERT one row under the final ``(id, full_name, country)`` schema

A snapshot read at the latest version must therefore return the final logical
schema with ``amount`` gone, ``country`` NULL for the rows written before it
existed, and every value resolved through column mapping.
"""

from __future__ import annotations

import sys


# Rows a snapshot read of the latest table version must return, ordered by
# ``id``. The test imports this list, so the builder and the assertion share a
# single definition and cannot drift apart.
EXPECTED_ROWS = [
    {"id": row_id, "full_name": full_name, "country": country}
    for row_id, full_name, country in (
        # Inserted before ``country`` was added, so it reads back as NULL.
        (1, "alice", None),
        (2, "bob", None),
        # Inserted after ``country`` was added; ``amount`` is dropped later.
        (3, "carol", "US"),
        (4, "dave", "UK"),
        # Inserted under the final ``(id, full_name, country)`` schema.
        (5, "erin", "FR"),
    )
]


def build(table_path: str, mode: str = "name") -> None:
    """Write the column-mapped, schema-evolved Delta table to ``table_path``.

    Seven SQL statements run in order against a local Spark session: create
    the table, insert two rows, rename ``name`` to ``full_name``, add
    ``country``, insert two more rows, drop ``amount``, and insert a final
    row. The Spark session is stopped even if a statement fails.

    :param table_path: Location of the table; every statement addresses it
        through a backtick-quoted ``delta.`` path identifier.
    :param mode: ``delta.columnMapping.mode`` for the table: ``name`` or ``id``.
    :raises ValueError: If ``mode`` is neither ``name`` nor ``id``.
    """
    supported_modes = ("name", "id")
    if mode not in supported_modes:
        raise ValueError(f"unsupported column-mapping mode: {mode!r}")

    # Imported here, after validation, so that merely importing this module
    # never loads PySpark.
    from delta import configure_spark_with_delta_pip
    from pyspark.sql import SparkSession

    spark_settings = {
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": (
            "org.apache.spark.sql.delta.catalog.DeltaCatalog"
        ),
        "spark.ui.showConsoleProgress": "false",
    }
    session_builder = SparkSession.builder.appName(
        "feldera-column-mapping-fixture"
    ).master("local[2]")
    for setting, value in spark_settings.items():
        session_builder = session_builder.config(setting, value)

    spark = configure_spark_with_delta_pip(session_builder).getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    try:
        table_ref = f"delta.`{table_path}`"

        create_table = (
            "\n"
            f"CREATE TABLE {table_ref} (id BIGINT, name STRING, amount DOUBLE)\n"
            "USING delta\n"
            f"TBLPROPERTIES ('delta.columnMapping.mode' = '{mode}',\n"
            "'delta.minReaderVersion' = '2',\n"
            "'delta.minWriterVersion' = '5')\n"
        )
        statements = (
            # Table creation with column mapping enabled.
            create_table,
            # Two rows under the original (id, name, amount) schema.
            f"INSERT INTO {table_ref} VALUES (1,'alice',10.0),(2,'bob',20.0)",
            # Schema evolution: rename, then add a column.
            f"ALTER TABLE {table_ref} RENAME COLUMN name TO full_name",
            f"ALTER TABLE {table_ref} ADD COLUMN (country STRING)",
            # Two rows under the evolved four-column schema.
            f"INSERT INTO {table_ref} VALUES "
            "(3,'carol',30.0,'US'),(4,'dave',40.0,'UK')",
            # Drop a column, then write one row under the final schema.
            f"ALTER TABLE {table_ref} DROP COLUMN amount",
            f"INSERT INTO {table_ref} VALUES (5,'erin','FR')",
        )
        for statement in statements:
            spark.sql(statement)
    finally:
        spark.stop()


if __name__ == "__main__":
    # Accept ``<output_dir>`` alone, or followed by the column-mapping mode.
    cli_args = sys.argv[1:]
    if not 1 <= len(cli_args) <= 2:
        raise SystemExit("usage: column_mapping.py <output_dir> [name|id]")
    build(*cli_args)
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,24 @@
auto-resolves the matching Delta JAR. Bare ``pyspark`` could write Delta too,
but only by hardcoding the Delta Maven coordinate and Scala suffix and keeping
them in lockstep with the pyspark version — fragile, and no cheaper (the JARs
download at runtime either way). The companion test invokes this file with::
download at runtime either way). ``tests.utils.ensure_delta_spark_fixture``
invokes this file with::

uv run --with "delta-spark>=4.2,<5" python _dv_fixture_builder.py \\
uv run --with "delta-spark>=4.2,<5" python deletion_vectors.py \\
<dest> <total_rows> <expected_active>

It writes ``total_rows`` rows to ``dest`` with DVs enabled, runs a ``DELETE``
on the even ``id`` rows that produces deletion vectors, then asserts that
exactly ``expected_active`` rows remain readable.

The ``_`` prefix keeps pytest from collecting this module as a test; it is only
ever run as a subprocess.
"""

import sys

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip


def main() -> None:
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

dest = sys.argv[1]
total_rows = int(sys.argv[2])
expected_active = int(sys.argv[3])
Expand Down
118 changes: 118 additions & 0 deletions python/tests/platform/test_delta_input_column_mapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""Snapshot read of a column-mapped Delta table after schema evolution.

A table with ``delta.columnMapping.mode = 'name'`` stores opaque physical
Parquet column names (``col-<uuid>``) that differ from the logical names; with
``mode = 'id'`` columns are matched by Parquet field ID instead. Either way, a
correct snapshot read must resolve every value through the column-mapping
metadata in the Delta log. PySpark seeds the fixture (it is currently the only
writer that supports column-mapping rename/drop); the builder lives in
``fixtures/column_mapping.py`` and runs via ``uv run --with delta-spark`` so
the Spark/JVM wheels are only fetched on cache miss.
"""

from __future__ import annotations

import json
from pathlib import Path

from feldera import PipelineBuilder
from feldera.runtime_config import RuntimeConfig
from feldera.testutils import FELDERA_TEST_NUM_HOSTS, FELDERA_TEST_NUM_WORKERS

from tests import TEST_CLIENT
from tests.platform.fixtures.column_mapping import EXPECTED_ROWS
from tests.utils import DeltaTestLocation, ensure_delta_spark_fixture

# Name of the SQL table the pipeline declares, and of its input connector.
TABLE = "t"
CONNECTOR = "delta_in"
# Part of the fixture's stable storage subpath: bump it whenever the fixture
# definition changes so cached MinIO copies are no longer reused.
FIXTURE_VERSION = "v1"

# PySpark script that writes the column-mapped, schema-evolved table. The
# build itself runs in a subprocess (see ensure_delta_spark_fixture); this
# module only imports EXPECTED_ROWS from it.
_FIXTURE_BUILDER = Path(__file__).parent.joinpath("fixtures", "column_mapping.py")


def _build_sql(loc: DeltaTestLocation) -> str:
    """Return the ``CREATE TABLE`` statement for the pipeline under test.

    The table declares the final, post-evolution logical columns and carries
    a single ``delta_table_input`` connector whose config is a copy of
    ``loc.connector_config``.
    """
    connector = {
        "name": CONNECTOR,
        "transport": {
            "name": "delta_table_input",
            "config": dict(loc.connector_config),
        },
    }
    # The JSON ends up inside a single-quoted SQL string literal, so every
    # single quote in it has to be doubled.
    connectors_json = json.dumps([connector]).replace("'", "''")
    columns = ",".join(
        (
            "id BIGINT NOT NULL",
            "full_name VARCHAR",
            "country VARCHAR",
        )
    )
    return (
        f"CREATE TABLE {TABLE} ({columns}) "
        f"WITH ('materialized' = 'true', 'connectors' = '{connectors_json}');"
    )


def _snapshot_rows(pipeline) -> list[dict]:
    """Query ``TABLE`` through ``pipeline`` and return its rows ordered by ``id``.

    Each returned dict holds exactly the ``id``, ``full_name`` and ``country``
    keys, in that order, taken from the corresponding query result row.
    """
    wanted_columns = ("id", "full_name", "country")
    query = f"SELECT id, full_name, country FROM {TABLE}"
    projected = [
        {column: record[column] for column in wanted_columns}
        for record in pipeline.query(query)
    ]
    projected.sort(key=lambda record: record["id"])
    return projected


def _run_column_mapping_snapshot_test(pipeline_name: str, mapping_mode: str) -> None:
    """Check a snapshot read of the column-mapped, schema-evolved fixture.

    Ensures the fixture for ``mapping_mode`` exists, runs a snapshot-mode
    pipeline over it until completion, and asserts that the table then holds
    exactly ``EXPECTED_ROWS``. The test location is cleaned up whether or not
    the check passes.

    Each ``delta.columnMapping.mode`` gets its own wrapper test instead of a
    ``pytest.mark.parametrize`` case: the ``pipeline_name`` fixture derives
    the name from the test function, so parametrized cases would share one
    pipeline name and could collide under ``pytest -n``.

    :param pipeline_name: Name of the pipeline to create or replace.
    :param mapping_mode: ``delta.columnMapping.mode`` of the fixture to read.
    """
    fixture_subpath = f"column_mapping_{mapping_mode}_{FIXTURE_VERSION}"
    loc = DeltaTestLocation.create(
        pipeline_name,
        mode="snapshot",
        stable_subpath=fixture_subpath,
    )
    try:
        ensure_delta_spark_fixture(loc, _FIXTURE_BUILDER, builder_args=(mapping_mode,))

        sql = _build_sql(loc)
        runtime_config = RuntimeConfig(
            workers=FELDERA_TEST_NUM_WORKERS,
            hosts=FELDERA_TEST_NUM_HOSTS,
            logging="debug",
        )
        pipeline_builder = PipelineBuilder(
            TEST_CLIENT,
            pipeline_name,
            sql=sql,
            runtime_config=runtime_config,
        )
        pipeline = pipeline_builder.create_or_replace()
        pipeline.start()
        # Presumably force_stop=False leaves the pipeline up so the table can
        # still be queried below — confirm against the SDK.
        pipeline.wait_for_completion(force_stop=False, timeout_s=600)

        actual_rows = _snapshot_rows(pipeline)
        assert actual_rows == EXPECTED_ROWS, (
            "snapshot read must resolve column-mapped, schema-evolved data: "
            "dropped 'amount' gone, 'country' NULL for pre-add rows; "
            f"got {actual_rows}"
        )

        # NOTE(review): only reached on success; a failure above skips this
        # stop and goes straight to cleanup — confirm that is intended.
        pipeline.stop(force=True)
    finally:
        loc.cleanup()


def test_delta_input_column_mapping_name_snapshot(pipeline_name):
    """Run the snapshot check on the ``delta.columnMapping.mode = 'name'`` fixture."""
    _run_column_mapping_snapshot_test(
        pipeline_name=pipeline_name,
        mapping_mode="name",
    )


def test_delta_input_column_mapping_id_snapshot(pipeline_name):
    """Run the snapshot check on the ``delta.columnMapping.mode = 'id'`` fixture."""
    _run_column_mapping_snapshot_test(
        pipeline_name=pipeline_name,
        mapping_mode="id",
    )
81 changes: 11 additions & 70 deletions python/tests/platform/test_delta_input_deletion_vectors.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
"""Snapshot read of a Delta table with deletion vectors.

PySpark seeds the fixture (delta-rs cannot write DVs). The builder lives in
``_dv_fixture_builder.py`` and runs via ``uv run --with delta-spark`` so the
Spark/JVM wheels are only fetched on cache miss.
``fixtures/deletion_vectors.py`` and runs via ``uv run --with delta-spark`` so
the Spark/JVM wheels are only fetched on cache miss.
"""

from __future__ import annotations

import json
import shutil
import subprocess
import tempfile
from pathlib import Path

from feldera import PipelineBuilder
from feldera.runtime_config import RuntimeConfig
from feldera.testutils import FELDERA_TEST_NUM_HOSTS, FELDERA_TEST_NUM_WORKERS

from tests import TEST_CLIENT
from tests.utils import DeltaTestLocation
from tests.utils import DeltaTestLocation, ensure_delta_spark_fixture


TABLE = "dv_data"
Expand All @@ -29,8 +26,8 @@
FIXTURE_VERSION = "dv_snapshot_v1"

# Spark builder that writes the DV-enabled table. It runs in a subprocess
# (see _ensure_dv_snapshot_fixture) rather than being imported here.
_FIXTURE_BUILDER = Path(__file__).parent / "_dv_fixture_builder.py"
# (see ensure_delta_spark_fixture) rather than being imported here.
_FIXTURE_BUILDER = Path(__file__).parent / "fixtures" / "deletion_vectors.py"


def _log_has_dv_entries(loc: DeltaTestLocation) -> bool:
Expand All @@ -57,67 +54,6 @@ def _log_has_dv_entries(loc: DeltaTestLocation) -> bool:
return False


def _ensure_dv_snapshot_fixture(loc: DeltaTestLocation) -> None:
    """Make sure the deletion-vector fixture exists at ``loc``.

    Returns immediately when ``_log_has_dv_entries(loc)`` is already true.
    Otherwise the Spark builder runs in a subprocess against a temporary
    staging directory, the staged table is copied to ``loc``, and the log is
    checked again.

    The fixture lives at a shared, commit-independent path
    (``stable_subpath``), so the Spark build runs at most once per
    ``FIXTURE_VERSION`` and later runs just read the cached table.

    :raises RuntimeError: If ``uv`` is not on ``PATH``, or if the table still
        has no deletion-vector log entries after the upload.
    :raises subprocess.CalledProcessError: If the builder exits non-zero.
    """
    if _log_has_dv_entries(loc):
        return

    if shutil.which("uv") is None:
        raise RuntimeError(
            "`uv` is required on PATH to rebuild the DV fixture "
            "(builder runs via `uv run --with delta-spark`)."
        )

    # Build into a scratch directory so a half-finished build cannot leak into
    # the upload. Writing DV-enabled Delta tables needs the Delta Lake Spark
    # JARs; `delta-spark` pulls them together with a matching pyspark, and
    # `uv run --with` installs that stack only on this rare rebuild path.
    staging = Path(tempfile.mkdtemp(prefix="feldera_dv_stage_"))
    try:
        builder_command = [
            "uv",
            "run",
            "--with",
            "delta-spark>=4.2,<5",
            "python",
            str(_FIXTURE_BUILDER),
            str(staging),
            str(TOTAL_ROWS),
            str(EXPECTED_ROWS_AFTER_DV),
        ]
        subprocess.run(builder_command, check=True)

        if loc.local_dir is None:
            # Remote target: upload data files first, then the _delta_log
            # files ordered by name, so a reader observing a partial upload
            # never sees a log entry that references a missing parquet file.
            fs = loc._s3_filesystem()
            staged_files = [p for p in sorted(staging.rglob("*")) if p.is_file()]
            staged_files.sort(key=lambda p: ("_delta_log" in p.parts, p.name))
            for staged_file in staged_files:
                rel = staged_file.relative_to(staging).as_posix()
                with fs.open_output_stream(f"{loc.root_path}/{rel}") as out:
                    out.write(staged_file.read_bytes())
        else:
            # Local target: replace any existing directory wholesale.
            if loc.local_dir.exists():
                shutil.rmtree(loc.local_dir)
            shutil.copytree(staging, loc.local_dir)
    finally:
        shutil.rmtree(staging, ignore_errors=True)

    if _log_has_dv_entries(loc):
        return
    raise RuntimeError(
        f"DV fixture at {loc.uri} has no deletion-vector log entries "
        "after upload — partial upload, or DV-stripping middleware?"
    )


def _build_sql(loc: DeltaTestLocation) -> str:
connectors = json.dumps(
[
Expand Down Expand Up @@ -147,7 +83,12 @@ def test_delta_input_snapshot_with_deletion_vectors(pipeline_name):
stable_subpath=FIXTURE_VERSION,
)
try:
_ensure_dv_snapshot_fixture(loc)
ensure_delta_spark_fixture(
loc,
_FIXTURE_BUILDER,
[TOTAL_ROWS, EXPECTED_ROWS_AFTER_DV],
is_present=_log_has_dv_entries,
)

pipeline = PipelineBuilder(
TEST_CLIENT,
Expand Down
Loading
Loading