diff --git a/python/tests/platform/fixtures/__init__.py b/python/tests/platform/fixtures/__init__.py new file mode 100644 index 00000000000..595cfbee8c1 --- /dev/null +++ b/python/tests/platform/fixtures/__init__.py @@ -0,0 +1,8 @@ +"""PySpark builders for Delta table test fixtures. + +Each module here writes a Delta table that exercises a feature only Delta +Spark can produce (deletion vectors, column-mapping schema evolution). They +run as subprocesses via ``tests.utils.ensure_delta_spark_fixture`` and keep +their PySpark imports function-local, so importing this package never pulls in +the JVM/Spark stack. +""" diff --git a/python/tests/platform/fixtures/column_mapping.py b/python/tests/platform/fixtures/column_mapping.py new file mode 100644 index 00000000000..30a4564d84b --- /dev/null +++ b/python/tests/platform/fixtures/column_mapping.py @@ -0,0 +1,97 @@ +"""Build a column-mapped, schema-evolved Delta table fixture with PySpark. + +``tests.utils.ensure_delta_spark_fixture`` runs this as a subprocess +(``uv run --with "delta-spark>=4.2,<5" python column_mapping.py <dest> +<mode>``), where ``<mode>`` is a ``delta.columnMapping.mode`` value: ``name`` +(physical Parquet columns are renamed to ``col-<uuid>``) or ``id`` (columns are +matched by Parquet field ID). PySpark is currently the only writer that can +produce a Delta table with column mapping enabled *and* perform the rename / +drop schema-evolution operations that make logical column names diverge from +the physical ones; neither ``delta-rs`` nor the ``deltalake`` Python wheel can. +DROP COLUMN under column mapping requires delta-spark 4.x (Delta writer v5, +reader v2). PySpark imports stay function-local so importing this module (for +``EXPECTED_ROWS``) never pulls in the JVM/Spark stack. + +Changing this builder changes the fixture it produces, but a cached fixture is +reused based on its path alone — bump ``FIXTURE_VERSION`` in +``test_delta_input_column_mapping.py`` on any builder change. 
+ +The resulting table's history (one commit per step): + +* ``v0`` CREATE TABLE with ``delta.columnMapping.mode = '<mode>'`` +* ``v1`` INSERT two rows under the original ``(id, name, amount)`` schema +* ``v2`` RENAME COLUMN ``name`` -> ``full_name`` +* ``v3`` ADD COLUMN ``country`` +* ``v4`` INSERT two rows under the evolved ``(id, full_name, amount, country)`` +* ``v5`` DROP COLUMN ``amount`` +* ``v6`` INSERT one row under the final ``(id, full_name, country)`` schema + +A snapshot read at the latest version must therefore return the final logical +schema with ``amount`` gone, ``country`` NULL for the rows written before it +existed, and every value resolved through column mapping. +""" + +from __future__ import annotations + +import sys + + +# Final logical rows expected from a snapshot read of the latest version. +# Shared with the test via import so the two never drift. +EXPECTED_ROWS = [ + {"id": 1, "full_name": "alice", "country": None}, + {"id": 2, "full_name": "bob", "country": None}, + {"id": 3, "full_name": "carol", "country": "US"}, + {"id": 4, "full_name": "dave", "country": "UK"}, + {"id": 5, "full_name": "erin", "country": "FR"}, +] + + +def build(table_path: str, mode: str = "name") -> None: + """Create the column-mapped, schema-evolved table at ``table_path``. + + :param mode: ``delta.columnMapping.mode`` for the table: ``name`` or ``id``. 
+ """ + if mode not in ("name", "id"): + raise ValueError(f"unsupported column-mapping mode: {mode!r}") + from delta import configure_spark_with_delta_pip + from pyspark.sql import SparkSession + + builder = ( + SparkSession.builder.appName("feldera-column-mapping-fixture") + .master("local[2]") + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") + .config( + "spark.sql.catalog.spark_catalog", + "org.apache.spark.sql.delta.catalog.DeltaCatalog", + ) + .config("spark.ui.showConsoleProgress", "false") + ) + spark = configure_spark_with_delta_pip(builder).getOrCreate() + spark.sparkContext.setLogLevel("ERROR") + try: + t = f"delta.`{table_path}`" + + spark.sql( + f""" + CREATE TABLE {t} (id BIGINT, name STRING, amount DOUBLE) + USING delta + TBLPROPERTIES ('delta.columnMapping.mode' = '{mode}', + 'delta.minReaderVersion' = '2', + 'delta.minWriterVersion' = '5') + """ + ) + spark.sql(f"INSERT INTO {t} VALUES (1,'alice',10.0),(2,'bob',20.0)") + spark.sql(f"ALTER TABLE {t} RENAME COLUMN name TO full_name") + spark.sql(f"ALTER TABLE {t} ADD COLUMN (country STRING)") + spark.sql(f"INSERT INTO {t} VALUES (3,'carol',30.0,'US'),(4,'dave',40.0,'UK')") + spark.sql(f"ALTER TABLE {t} DROP COLUMN amount") + spark.sql(f"INSERT INTO {t} VALUES (5,'erin','FR')") + finally: + spark.stop() + + +if __name__ == "__main__": + if len(sys.argv) not in (2, 3): + raise SystemExit("usage: column_mapping.py [name|id]") + build(*sys.argv[1:]) diff --git a/python/tests/platform/_dv_fixture_builder.py b/python/tests/platform/fixtures/deletion_vectors.py similarity index 87% rename from python/tests/platform/_dv_fixture_builder.py rename to python/tests/platform/fixtures/deletion_vectors.py index f0fd4ed0a90..c928f9641e5 100644 --- a/python/tests/platform/_dv_fixture_builder.py +++ b/python/tests/platform/fixtures/deletion_vectors.py @@ -7,26 +7,24 @@ auto-resolves the matching Delta JAR. 
Bare ``pyspark`` could write Delta too, but only by hardcoding the Delta Maven coordinate and Scala suffix and keeping them in lockstep with the pyspark version — fragile, and no cheaper (the JARs -download at runtime either way). The companion test invokes this file with:: +download at runtime either way). ``tests.utils.ensure_delta_spark_fixture`` +invokes this file with:: - uv run --with "delta-spark>=4.2,<5" python _dv_fixture_builder.py \\ + uv run --with "delta-spark>=4.2,<5" python deletion_vectors.py \\ <dest> <total_rows> <expected_active> It writes ``total_rows`` rows to ``dest`` with DVs enabled, runs a ``DELETE`` on the even ``id`` rows that produces deletion vectors, then asserts that exactly ``expected_active`` rows remain readable. - -The ``_`` prefix keeps pytest from collecting this module as a test; it is only -ever run as a subprocess. """ import sys -from pyspark.sql import SparkSession -from delta import configure_spark_with_delta_pip - def main() -> None: + from delta import configure_spark_with_delta_pip + from pyspark.sql import SparkSession + dest = sys.argv[1] total_rows = int(sys.argv[2]) expected_active = int(sys.argv[3]) diff --git a/python/tests/platform/test_delta_input_column_mapping.py b/python/tests/platform/test_delta_input_column_mapping.py new file mode 100644 index 00000000000..970685933ce --- /dev/null +++ b/python/tests/platform/test_delta_input_column_mapping.py @@ -0,0 +1,118 @@ +"""Snapshot read of a column-mapped Delta table after schema evolution. + +A table with ``delta.columnMapping.mode = 'name'`` stores opaque physical +Parquet column names (``col-<uuid>``) that differ from the logical names; with +``mode = 'id'`` columns are matched by Parquet field ID instead. Either way, a +correct snapshot read must resolve every value through the column-mapping +metadata in the Delta log. 
PySpark seeds the fixture (it is currently the only +writer that supports column-mapping rename/drop); the builder lives in +``fixtures/column_mapping.py`` and runs via ``uv run --with delta-spark`` so +the Spark/JVM wheels are only fetched on cache miss. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +from feldera import PipelineBuilder +from feldera.runtime_config import RuntimeConfig +from feldera.testutils import FELDERA_TEST_NUM_HOSTS, FELDERA_TEST_NUM_WORKERS + +from tests import TEST_CLIENT +from tests.platform.fixtures.column_mapping import EXPECTED_ROWS +from tests.utils import DeltaTestLocation, ensure_delta_spark_fixture + +TABLE = "t" +CONNECTOR = "delta_in" +# Bump to invalidate cached MinIO copies when the fixture definition changes. +FIXTURE_VERSION = "v1" + +# Spark builder that writes the column-mapped, schema-evolved table. It runs in +# a subprocess (see ensure_delta_spark_fixture) rather than being imported here. +_FIXTURE_BUILDER = Path(__file__).parent / "fixtures" / "column_mapping.py" + + +def _build_sql(loc: DeltaTestLocation) -> str: + connectors = json.dumps( + [ + { + "name": CONNECTOR, + "transport": { + "name": "delta_table_input", + "config": dict(loc.connector_config), + }, + } + ] + ).replace("'", "''") + # The SQL schema declares the final, post-evolution logical columns. 
+ return ( + f"CREATE TABLE {TABLE} (" + "id BIGINT NOT NULL," + "full_name VARCHAR," + "country VARCHAR" + f") WITH ('materialized' = 'true', 'connectors' = '{connectors}');" + ) + + +def _snapshot_rows(pipeline) -> list[dict]: + rows = pipeline.query(f"SELECT id, full_name, country FROM {TABLE}") + return sorted( + ( + {"id": r["id"], "full_name": r["full_name"], "country": r["country"]} + for r in rows + ), + key=lambda r: r["id"], + ) + + +def _run_column_mapping_snapshot_test(pipeline_name: str, mapping_mode: str) -> None: + """A snapshot read of a column-mapped, schema-evolved table returns the + final logical schema and correctly resolves physical column names. + + Driven by one wrapper test per ``delta.columnMapping.mode`` (rather than + ``pytest.mark.parametrize``) so each case gets a distinct pipeline name — + the ``pipeline_name`` fixture derives the name from the test function, and + parametrized cases sharing one name could collide under ``pytest -n``. + """ + loc = DeltaTestLocation.create( + pipeline_name, + mode="snapshot", + stable_subpath=f"column_mapping_{mapping_mode}_{FIXTURE_VERSION}", + ) + try: + ensure_delta_spark_fixture(loc, _FIXTURE_BUILDER, builder_args=(mapping_mode,)) + + pipeline = PipelineBuilder( + TEST_CLIENT, + pipeline_name, + sql=_build_sql(loc), + runtime_config=RuntimeConfig( + workers=FELDERA_TEST_NUM_WORKERS, + hosts=FELDERA_TEST_NUM_HOSTS, + logging="debug", + ), + ).create_or_replace() + pipeline.start() + pipeline.wait_for_completion(force_stop=False, timeout_s=600) + + rows = _snapshot_rows(pipeline) + assert rows == EXPECTED_ROWS, ( + "snapshot read must resolve column-mapped, schema-evolved data: " + "dropped 'amount' gone, 'country' NULL for pre-add rows; " + f"got {rows}" + ) + + pipeline.stop(force=True) + finally: + loc.cleanup() + + +def test_delta_input_column_mapping_name_snapshot(pipeline_name): + """Snapshot read with ``delta.columnMapping.mode = 'name'``.""" + _run_column_mapping_snapshot_test(pipeline_name, 
"name") + + +def test_delta_input_column_mapping_id_snapshot(pipeline_name): + """Snapshot read with ``delta.columnMapping.mode = 'id'``.""" + _run_column_mapping_snapshot_test(pipeline_name, "id") diff --git a/python/tests/platform/test_delta_input_deletion_vectors.py b/python/tests/platform/test_delta_input_deletion_vectors.py index fc3b2e80cd7..686feada379 100644 --- a/python/tests/platform/test_delta_input_deletion_vectors.py +++ b/python/tests/platform/test_delta_input_deletion_vectors.py @@ -1,16 +1,13 @@ """Snapshot read of a Delta table with deletion vectors. PySpark seeds the fixture (delta-rs cannot write DVs). The builder lives in -``_dv_fixture_builder.py`` and runs via ``uv run --with delta-spark`` so the -Spark/JVM wheels are only fetched on cache miss. +``fixtures/deletion_vectors.py`` and runs via ``uv run --with delta-spark`` so +the Spark/JVM wheels are only fetched on cache miss. """ from __future__ import annotations import json -import shutil -import subprocess -import tempfile from pathlib import Path from feldera import PipelineBuilder @@ -18,7 +15,7 @@ from feldera.testutils import FELDERA_TEST_NUM_HOSTS, FELDERA_TEST_NUM_WORKERS from tests import TEST_CLIENT -from tests.utils import DeltaTestLocation +from tests.utils import DeltaTestLocation, ensure_delta_spark_fixture TABLE = "dv_data" @@ -29,8 +26,8 @@ FIXTURE_VERSION = "dv_snapshot_v1" # Spark builder that writes the DV-enabled table. It runs in a subprocess -# (see _ensure_dv_snapshot_fixture) rather than being imported here. -_FIXTURE_BUILDER = Path(__file__).parent / "_dv_fixture_builder.py" +# (see ensure_delta_spark_fixture) rather than being imported here. 
+_FIXTURE_BUILDER = Path(__file__).parent / "fixtures" / "deletion_vectors.py" def _log_has_dv_entries(loc: DeltaTestLocation) -> bool: @@ -57,67 +54,6 @@ def _log_has_dv_entries(loc: DeltaTestLocation) -> bool: return False -def _ensure_dv_snapshot_fixture(loc: DeltaTestLocation) -> None: - """Build the DV fixture if absent; reuse the cached copy otherwise. - - The fixture lives at a shared, commit-independent path - (``stable_subpath``), so the Spark build runs at most once per - ``FIXTURE_VERSION`` and later runs just read the cached table. - """ - if _log_has_dv_entries(loc): - return - - if shutil.which("uv") is None: - raise RuntimeError( - "`uv` is required on PATH to rebuild the DV fixture " - "(builder runs via `uv run --with delta-spark`)." - ) - - # Stage in a temp dir so a half-finished build cannot leak into the upload. - # Writing DV-enabled Delta tables needs the Delta Lake Spark JARs; - # `delta-spark` is the clean way to pull them plus a matching pyspark - # (see _dv_fixture_builder.py for why not bare pyspark). `uv run --with` - # installs the stack only on this rare rebuild path. - staging = Path(tempfile.mkdtemp(prefix="feldera_dv_stage_")) - try: - subprocess.run( - [ - "uv", - "run", - "--with", - "delta-spark>=4.2,<5", - "python", - str(_FIXTURE_BUILDER), - str(staging), - str(TOTAL_ROWS), - str(EXPECTED_ROWS_AFTER_DV), - ], - check=True, - ) - # Upload data files first, then _delta_log in version order, so a - # reader observing mid-upload never sees a log referencing a missing - # parquet. 
- if loc.local_dir is not None: - if loc.local_dir.exists(): - shutil.rmtree(loc.local_dir) - shutil.copytree(staging, loc.local_dir) - else: - fs = loc._s3_filesystem() - files = [f for f in sorted(staging.rglob("*")) if f.is_file()] - for f in sorted(files, key=lambda p: ("_delta_log" in p.parts, p.name)): - rel = f.relative_to(staging).as_posix() - with fs.open_output_stream(f"{loc.root_path}/{rel}") as out: - out.write(f.read_bytes()) - finally: - shutil.rmtree(staging, ignore_errors=True) - - if not _log_has_dv_entries(loc): - raise RuntimeError( - f"DV fixture at {loc.uri} has no deletion-vector log entries " - "after upload — partial upload, or DV-stripping middleware?" - ) - - def _build_sql(loc: DeltaTestLocation) -> str: connectors = json.dumps( [ @@ -147,7 +83,12 @@ def test_delta_input_snapshot_with_deletion_vectors(pipeline_name): stable_subpath=FIXTURE_VERSION, ) try: - _ensure_dv_snapshot_fixture(loc) + ensure_delta_spark_fixture( + loc, + _FIXTURE_BUILDER, + [TOTAL_ROWS, EXPECTED_ROWS_AFTER_DV], + is_present=_log_has_dv_entries, + ) pipeline = PipelineBuilder( TEST_CLIENT, diff --git a/python/tests/utils.py b/python/tests/utils.py index e642fb3c5a9..a279a606f0b 100644 --- a/python/tests/utils.py +++ b/python/tests/utils.py @@ -2,9 +2,11 @@ import os import pathlib import shutil +import subprocess import tempfile import time import uuid +from collections.abc import Callable, Sequence from dataclasses import dataclass from urllib.parse import urlparse @@ -248,6 +250,41 @@ def row_count(self, missing_ok: bool = False) -> int: raise return dt.count() + def delta_log_exists(self) -> bool: + """Return True when a Delta log is present at this location. + + Used to decide whether a cached fixture (see ``stable_subpath``) can + be reused instead of rebuilt. 
+ """ + try: + return len(self.log_json_paths()) > 0 + except FileNotFoundError: + return False + + def _place_tree(self, staging: pathlib.Path) -> None: + """Copy a locally-built Delta table tree to where this location stores + its data: the local directory, or the S3/MinIO bucket, depending on + how this location was created. + + Some fixtures can only be produced on the local filesystem (e.g. a + PySpark-written table). For a local target, any existing content at + ``self.local_dir`` is deleted before the copy. S3/MinIO targets get + the data files first and ``_delta_log`` last, so a reader observing + the upload mid-flight never sees a log referencing a missing parquet. + """ + if self.local_dir is not None: + if self.local_dir.exists(): + shutil.rmtree(self.local_dir) + shutil.copytree(staging, self.local_dir) + return + + fs = self._s3_filesystem() + files = [path for path in sorted(staging.rglob("*")) if path.is_file()] + for path in sorted(files, key=lambda p: ("_delta_log" in p.parts, p.name)): + rel = path.relative_to(staging).as_posix() + with fs.open_output_stream(f"{self.root_path}/{rel}") as out: + out.write(path.read_bytes()) + def cleanup(self) -> None: """Remove the local temp directory, if any. @@ -266,6 +303,79 @@ def cleanup(self) -> None: self.local_dir = None +def ensure_delta_spark_fixture( + loc: DeltaTestLocation, + builder_script: str | os.PathLike[str], + builder_args: Sequence[object] = (), + *, + delta_spark_spec: str = "delta-spark>=4.2,<5", + is_present: Callable[[DeltaTestLocation], bool] | None = None, +) -> None: + """Ensure a PySpark-authored Delta fixture exists at ``loc`` (cached). + + Some Delta features (deletion vectors, column-mapping schema evolution) + can only be written by Delta Spark, not by delta-rs or the ``deltalake`` + wheel. 
This builds such a fixture once and reuses it: + + * If the fixture is already present (``is_present``, defaulting to + :meth:`DeltaTestLocation.delta_log_exists`), do nothing — so a + ``stable_subpath`` cache is reused across runs. + * Otherwise run ``builder_script`` in an isolated environment + (``uv run --no-project --with <delta_spark_spec> python <builder_script> + <staging> *builder_args``), staging into a temp dir so a half-finished + build can never leak into the upload, then place the tree onto ``loc``'s + backend. ``--no-project`` keeps the builder hermetic: it depends only on + ``delta_spark_spec``, never on building the enclosing project. + + The heavy PySpark + JVM stack is pulled only on this rare rebuild path. + + :param builder_script: Path to a standalone script that writes a Delta + table to the directory given as its first argument. + :param builder_args: Extra positional arguments passed after the staging + directory. Each is stringified verbatim with ``str()`` — pass + primitives; ``None`` would become the literal string ``"None"``. + :param is_present: Predicate deciding whether the fixture already exists; + also re-checked after upload to catch partial uploads. + """ + present = ( + is_present if is_present is not None else DeltaTestLocation.delta_log_exists + ) + if present(loc): + return + + if shutil.which("uv") is None: + raise RuntimeError( + "`uv` is required on PATH to build the PySpark Delta fixture " + f"(builder runs via `uv run --with {delta_spark_spec}`)." + ) + + staging = pathlib.Path(tempfile.mkdtemp(prefix="feldera_delta_fixture_")) + try: + subprocess.run( + [ + "uv", + "run", + "--no-project", + "--with", + delta_spark_spec, + "python", + str(builder_script), + str(staging), + *(str(arg) for arg in builder_args), + ], + check=True, + ) + loc._place_tree(staging) + finally: + shutil.rmtree(staging, ignore_errors=True) + + if not present(loc): + raise RuntimeError( + f"Delta fixture at {loc.uri} is still absent after the builder " + "ran and the tree was uploaded." 
+ ) + + def wait_for_condition( description: str, predicate_func,