[py]: delta input snapshot test for column mapping#6376
Conversation
mythical-fred
left a comment
There was a problem hiding this comment.
Nice extraction. The ensure_delta_spark_fixture helper is the right level of abstraction — both the DV and column-mapping tests now read like a one-liner, the heavy uv+Spark stack stays gated behind cache-miss, and the _delta_log-last upload ordering invariant is preserved as a single method on DeltaTestLocation instead of being copy-pasted. EXPECTED_ROWS exported from fixtures/column_mapping.py so the test and the builder can't drift is exactly right.
A few non-blocking observations:
-
The column-mapping test's default
is_presentisDeltaTestLocation.table_exists, i.e. "any Delta log entries here at all". If a previous, differently-shaped fixture ever lands at the samestable_subpathyou'd silently reuse it.FIXTURE_VERSION = "column_mapping_v1"is the intended escape hatch, but a stronger predicate (e.g. "log v6 exists AND has a drop-column metadata op") would catch the case where someone tweaks the builder and forgets to bump the version. The DV test does exactly this via_log_has_dv_entries. Worth either documenting "you must bump FIXTURE_VERSION on any builder change" incolumn_mapping.py, or pointing the predicate at something the schema-evolution path uniquely produces. -
ALTER TABLE ... DROP COLUMNunder column mapping requires Delta writer v5 + reader v2 (which you set) plus thecolumnMappingtable feature; on Spark 4.x Delta 4.x this is fine, but thedelta-spark>=4.2,<5pin in_FIXTURE_BUILDER's docstring is what guarantees it. If anyone ever relaxes that pin, the fixture will start failing in a confusing way. A one-line note in the builder ("requires delta-spark>=4.x for ALTER COLUMN under column mapping") would help future-you. -
ensure_delta_spark_fixture(builder_args=(...))stringifies every arg viastr(arg), so aNonebecomes the literal"None". Today both callers pass ints, but a single-line note in the docstring ("args are stringified verbatim — pass primitives, not None") would head off the eventual footgun. -
Tiny:
test_delta_input_column_mapping.pybuilds a SQL fragment that re-implements connector JSON wrapping that's near-identical to the DV test (json.dumps([...]).replace("'", "''")). If you have a third call site coming, that's a candidate for atests.utilshelper too — not for this PR.
Approve.
| raise | ||
| return dt.count() | ||
|
|
||
| def table_exists(self) -> bool: |
There was a problem hiding this comment.
If the comment is right I would call this function "delta_log_exists"
| """Place a locally-built Delta table tree onto this location's backend. | ||
|
|
||
| Some fixtures can only be produced on the local filesystem (e.g. a | ||
| PySpark-written table) yet must be read back from the configured |
There was a problem hiding this comment.
"configured"? This comment seems to imply a bunch of stuff that I don't understand.
| if not present(loc): | ||
| raise RuntimeError( | ||
| f"Delta fixture at {loc.uri} is still absent after upload — " | ||
| "partial upload, or content-stripping middleware?" |
There was a problem hiding this comment.
Does this guess about who did it help in any way?
Is it always right?
| Parquet column names (``col-<uuid>``) that differ from the logical names, so a | ||
| correct snapshot read must resolve every value through the column-mapping | ||
| metadata in the Delta log. PySpark seeds the fixture (it is the only writer | ||
| that supports column-mapping rename/drop); the builder lives in |
| f""" | ||
| CREATE TABLE {t} (id BIGINT, name STRING, amount DOUBLE) | ||
| USING delta | ||
| TBLPROPERTIES ('delta.columnMapping.mode' = 'name', |
There was a problem hiding this comment.
I think we need to test the id mode as well.
Test delta input connector in snapshot mode on a table with column mapping (columnMapping.mode='name') with rename/add/drop schema evolution, asserting snapshot reads resolve values through the column-mapping metadata. Factor the shared fixture orchestration out of the deletion-vectors test into tests.utils.ensure_delta_spark_fixture (used by both), move the Spark builders under tests/platform/fixtures/ Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com>
3b98419 to
bda9434
Compare
mythical-fred
left a comment
There was a problem hiding this comment.
Re-APPROVE on bda9434.
All four prior nits addressed:
- Default presence check renamed
table_exists->delta_log_exists(utils.py), and the column-mapping path now has its ownFIXTURE_VERSIONbumped on builder changes — the dependency between builder mutation and cache invalidation is now spelled out in the builder module docstring. - Builder docstring now states the delta-spark 4.x requirement explicitly ("DROP COLUMN under column mapping requires delta-spark 4.x (Delta writer v5, reader v2)").
builder_argsstringification semantics documented onensure_delta_spark_fixture— each arg goes throughstr()verbatim,Nonewould render as the literal"None". Cheap, clear, no validator wedged in.- Coverage expanded by adding the
mode='id'case via a_run_column_mapping_snapshot_testhelper rather thanpytest.mark.parametrize, with an inline rationale (parametrize would collide pipeline names underpytest -n). Good call.
Bonus cleanup: --no-project on uv run keeps the builder hermetic; the failure message no longer speculates about content-stripping middleware.
LGTM.
Test delta input connector in snapshot mode on a table with column mapping (columnMapping.mode='name') with rename/add/drop schema evolution, asserting snapshot reads resolve values through the column-mapping metadata.
Factor the shared fixture orchestration out of the deletion-vectors test into tests.utils.ensure_delta_spark_fixture (used by both), move the Spark builders under tests/platform/fixtures/
Describe Manual Test Plan
Ran it locally
Checklist
Breaking Changes?
Adding a test, no breaking change