Skip to content

proto: preserve DynamicFilterPhysicalExpr identity across round-trip#21786

Draft
adriangb wants to merge 7 commits intoapache:mainfrom
adriangb:ser-filters
Draft

proto: preserve DynamicFilterPhysicalExpr identity across round-trip#21786
adriangb wants to merge 7 commits intoapache:mainfrom
adriangb:ser-filters

Conversation

@adriangb
Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Rationale for this change

Given a plan like

HashJoinExec(dynamic_filter_1 on a@0)
  (...left side of join)
  ProjectionExec(a := Column("a", source_index))
    DataSourceExec
      ParquetSource(predicate = dynamic_filter_2)

after serialize/deserialize the two DynamicFilterPhysicalExpr wrappers should still share the same mutable inner, so that a HashJoinExec update is visible at the pushed-down ParquetSource. Today this breaks for two reasons:

  1. serialize_physical_expr_with_converter calls snapshot_physical_expr, which replaces DynamicFilterPhysicalExpr with its current inner expression (often lit(true)) — identity is lost.
  2. The existing dedup key hashes the outer Arc::as_ptr, but the HashJoinExec side and the ParquetSource side hold different outer Arcs (one comes from with_new_children), so they never share expr_id.

What changes are included in this PR?

  • PhysicalExpr::expression_id(&self) -> Option<u64> added to the trait, defaulting to None. Only DynamicFilterPhysicalExpr reports an identity today.
  • DynamicFilterPhysicalExpr gains a stable u64 id (random at construction). It follows the shared Arc<RwLock<Inner>> across with_new_children — two wrappers that observe the same mutable state report the same id. id is omitted from Debug to keep plan snapshots deterministic.
  • New with_id_and_state(id, children, inner, generation, is_complete) constructor used on the deserialize side to rehydrate the filter. generation and is_complete survive the proto round-trip for fidelity (matches the state that already lives on Inner). state_watch is always fresh on the receiver; cross-process update propagation remains out of scope.
  • New proto message PhysicalDynamicFilterExprNode { id, current_expr, original_children, effective_children, generation, is_complete }. The serializer downcasts DynamicFilterPhysicalExpr and emits this variant directly; the top-level snapshot_physical_expr call is removed. The HashTableLookupExpr → lit(true) replacement still fires (unchanged).
  • DeduplicatingSerializer is now stateless: it stamps expr_id = expr.expression_id(). The old session_id/Arc::as_ptr/pid hashing is dropped, along with the implicit within-process dedup for generic exprs. A follow-up can restore that for specific types (e.g. InList) by implementing expression_id() on them.
  • DeduplicatingDeserializer intercepts DynamicFilter before the generic cache lookup: the cache stores the canonical (un-remapped) wrapper keyed by id, and each call site's effective_children gets applied via with_new_children to produce the site-specific wrapper while sharing the canonical's inner.

Compared to #20416: that PR introduces a generic PhysicalExprId { exact, shallow } and keeps the Arc-ptr default so every expression is stamped. This PR instead makes the identity hook opt-in per type (expression_id()), restricting the blast radius to the one type that actually needs it and letting the follow-up decide per-type whether generic dedup is worth re-introducing.

Are these changes tested?

Yes:

  • dynamic_filters::test::test_expression_id_stable_across_with_new_children — unit test verifying the id survives with_new_children.
  • roundtrip_dynamic_filter_preserves_shared_inner — two wrappers in a BinaryExpr(And) predicate share inner after round-trip; an update() on one is observable via the other.
  • roundtrip_dynamic_filter_preserves_remapped_children — two wrappers with different effective children; each preserves its site-specific projection after round-trip, both share identity, update() propagates, and current() on the remapped side applies the column substitution.
  • roundtrip_dynamic_filter_in_parquet_pushdown — the plan shape from the PR description (FilterExec → ProjectionExec → DataSourceExec(ParquetSource with predicate)). Asserts that the top FilterExec's predicate and the pushed-down ParquetSource predicate end up sharing inner and that an update() at the top is observed at the scan site.

Removed: the pre-existing test_expression_deduplication_arc_sharing, test_deduplication_within_plan_deserialization, test_deduplication_within_expr_deserialization, and two test_session_id_rotation_* tests — they asserted the generic Arc-ptr dedup contract that this PR deliberately drops.

cargo fmt --all and cargo clippy --all-targets --all-features clean on the affected crates. Full cargo test -p datafusion-proto -p datafusion-physical-expr passes.

Are there any user-facing changes?

  • PhysicalExpr::expression_id is a new trait method with a safe default (None), so existing implementations keep compiling.
  • DeduplicatingProtoConverter no longer deduplicates arbitrary expressions by Arc pointer; plans that relied on that happening for, say, a large shared InList will serialize independently until a per-type expression_id() is added. Dynamic filters now round-trip with shared state, which is the primary motivation.
  • New public DynamicFilterPhysicalExpr surface: with_id, with_id_and_state, original_children, raw_inner_expr, raw_inner_state.

🤖 Generated with Claude Code

@github-actions github-actions Bot added physical-expr Changes to the physical-expr crates proto Related to proto crate labels Apr 22, 2026
@adriangb adriangb force-pushed the ser-filters branch 2 times, most recently from 2df2293 to 955117c Compare April 23, 2026 00:35
@github-actions github-actions Bot added core Core DataFusion crate physical-plan Changes to the physical-plan crate labels Apr 23, 2026
adriangb and others added 7 commits April 23, 2026 07:13
Without this change a plan with a DynamicFilterPhysicalExpr referenced
from two sites (e.g. a HashJoinExec and a pushed-down ParquetSource
predicate) loses referential integrity when serialized and deserialized:
the filter is snapshotted away at serialize time, and even if it
survived the existing Arc-pointer dedup scheme would give two sites
different ids because the pushed-down side comes from `with_new_children`
with a different outer Arc.

Changes:

- `PhysicalExpr::expression_id(&self) -> Option<u64>` new trait method,
  defaulting to None. Only DynamicFilterPhysicalExpr reports an identity.
- DynamicFilterPhysicalExpr stores a stable random u64 id that follows
  the shared inner Arc through `with_new_children`. Custom Debug hides
  the random id so plan snapshots stay deterministic.
- New proto variant PhysicalDynamicFilterExprNode carrying the current
  expression, the site's children view, generation, and is_complete.
- serialize_physical_expr_with_converter stops calling
  snapshot_physical_expr at the top — dynamic filters survive as
  themselves. HashTableLookupExpr still gets the lit(true) replacement.
- DeduplicatingSerializer is now stateless: it stamps
  `expr_id = expr.expression_id()`. The old session_id/Arc::as_ptr/pid
  hashing is dropped; dedup only fires for expressions that opt in via
  expression_id. Restoring within-process dedup for other types is a
  follow-up (implement expression_id on InList, literals, etc.).
- DeduplicatingDeserializer has a single unified cache path: on miss
  parse + cache, on hit parse once to recover this site's children and
  overlay via `with_new_children` on the cached canonical. This gives
  DynamicFilter its shared-inner semantics without any type-specific
  code in the deserializer.
- Three integration tests in roundtrip_physical_plan.rs cover shared
  inner preservation, per-site remapped children + update propagation
  including column remap, and the FilterExec → ProjectionExec →
  DataSourceExec(ParquetSource) shape from the PR description.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the 5-arg with_id_and_state(id, children, inner, generation,
is_complete) constructor with three fluent setters on Self that match
the HashJoinExecBuilder style used elsewhere in the crate:

    DynamicFilterPhysicalExpr::new(children, inner)
        .with_id(id)
        .with_generation(generation)
        .with_is_complete(is_complete)

new() keeps the default case (random id, generation 1, not complete).
The setters are intended for the deserialize side of proto round-trip
and assume the filter hasn't been shared yet; update() and
mark_complete() remain the correct path for live mutation.

No behavior change — only from_proto is migrated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Change HashJoinExecBuilder::with_dynamic_filter from a crate-private
`Option<HashJoinExecDynamicFilter>` setter into a public API that takes
just `Arc<DynamicFilterPhysicalExpr>` and wraps it internally. The inner
`HashJoinExecDynamicFilter` and its `SharedBuildAccumulator` stay
private; only the filter itself crosses the API boundary.

Promote `dynamic_filter_for_test` to a plain `pub fn dynamic_filter()`
accessor — proto serialization has a legitimate non-test use for it.
Existing test caller migrated.

No behavior change. Sets up HashJoinExec.dynamic_filter to be
round-trippable through proto in the next commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirroring HashJoinExecBuilder::with_dynamic_filter: expose a fluent
setter on SortExec that installs a caller-provided
`Arc<DynamicFilterPhysicalExpr>` as the TopK dynamic filter, replacing
any auto-created one. Add a matching `dynamic_filter()` accessor that
reads through the `TopKDynamicFilters` wrapper.

No behavior change. Sets up SortExec.filter to be round-trippable
through proto in the next commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add optional PhysicalExprNode dynamic_filter field to HashJoinExecNode.
Emit it from to_proto via the new HashJoinExec::dynamic_filter()
accessor; on deserialize, parse it and install via
HashJoinExecBuilder::with_dynamic_filter.

Critically, because the scan-side pushed-down predicate is already in
the id cache by the time we deserialize the HashJoinExec's field, the
cache hit returns the same canonical Arc — the join's filter and the
scan's filter share `inner` automatically. Build-side update() during
execution is observed by the scan, and the scan prunes rows.

Add an end-to-end SQL test in core/tests/physical_optimizer/filter_pushdown.rs
(under a new `proto_roundtrip` mod) that registers a parquet file, runs
an INNER JOIN with WHERE on the build side, round-trips via
DeduplicatingProtoConverter, executes the deserialized plan, and
asserts the probe-side scan emitted strictly fewer rows than the full
table.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add optional PhysicalExprNode dynamic_filter field to SortExecNode.
Emit from to_proto via SortExec::dynamic_filter(); on deserialize parse
it and install via SortExec::with_dynamic_filter after the usual
with_fetch / with_preserve_partitioning chain. The with_fetch step may
auto-create a TopK filter when fetch is set; with_dynamic_filter then
replaces it with the one from the sender so the id matches the
pushed-down scan's copy (shared via the id cache).

Add an end-to-end SQL test in
core/tests/physical_optimizer/filter_pushdown.rs that writes two
single-row parquet files (`a.parquet` with key=1, `b.parquet` with
key=2), runs `ORDER BY n_nationkey ASC LIMIT 1` with
`target_partitions=1`, round-trips via DeduplicatingProtoConverter,
executes, and asserts the scan emitted exactly 1 row — b.parquet was
pruned by row-group statistics after TopK saw a's row and tightened
the shared dynamic filter.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate physical-expr Changes to the physical-expr crates physical-plan Changes to the physical-plan crate proto Related to proto crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Serialize dynamic filters across network boundaries

1 participant