From f9b205e662bdcdffc46a20880bbf35954fadff52 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 22 Apr 2026 19:35:28 -0500 Subject: [PATCH 1/7] proto: preserve DynamicFilterPhysicalExpr identity across round-trip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Without this change a plan with a DynamicFilterPhysicalExpr referenced from two sites (e.g. a HashJoinExec and a pushed-down ParquetSource predicate) loses referential integrity when serialized and deserialized: the filter is snapshotted away at serialize time, and even if it survived the existing Arc-pointer dedup scheme would give two sites different ids because the pushed-down side comes from `with_new_children` with a different outer Arc. Changes: - `PhysicalExpr::expression_id(&self) -> Option` new trait method, defaulting to None. Only DynamicFilterPhysicalExpr reports an identity. - DynamicFilterPhysicalExpr stores a stable random u64 id that follows the shared inner Arc through `with_new_children`. Custom Debug hides the random id so plan snapshots stay deterministic. - New proto variant PhysicalDynamicFilterExprNode carrying the current expression, the site's children view, generation, and is_complete. - serialize_physical_expr_with_converter stops calling snapshot_physical_expr at the top — dynamic filters survive as themselves. HashTableLookupExpr still gets the lit(true) replacement. - DeduplicatingSerializer is now stateless: it stamps `expr_id = expr.expression_id()`. The old session_id/Arc::as_ptr/pid hashing is dropped; dedup only fires for expressions that opt in via expression_id. Restoring within-process dedup for other types is a follow-up (implement expression_id on InList, literals, etc.). - DeduplicatingDeserializer has a single unified cache path: on miss parse + cache, on hit parse once to recover this site's children and overlay via `with_new_children` on the cached canonical. 
This gives DynamicFilter its shared-inner semantics without any type-specific code in the deserializer. - Three integration tests in roundtrip_physical_plan.rs cover shared inner preservation, per-site remapped children + update propagation including column remap, and the FilterExec → ProjectionExec → DataSourceExec(ParquetSource) shape from the PR description. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../physical-expr-common/src/physical_expr.rs | 15 + datafusion/physical-expr/Cargo.toml | 2 +- .../src/expressions/dynamic_filters.rs | 103 +++- datafusion/proto/proto/datafusion.proto | 32 ++ datafusion/proto/src/generated/pbjson.rs | 162 ++++++ datafusion/proto/src/generated/prost.rs | 40 +- .../proto/src/physical_plan/from_proto.rs | 38 ++ datafusion/proto/src/physical_plan/mod.rs | 105 ++-- .../proto/src/physical_plan/to_proto.rs | 29 +- .../tests/cases/roundtrip_physical_plan.rs | 520 +++++++++--------- 10 files changed, 700 insertions(+), 346 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index a42a1560cb769..19f46440a1eb5 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -413,6 +413,21 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { 0 } + /// Return a stable, globally-unique identifier for this `PhysicalExpr`, if it has one. + /// + /// Expressions that carry shared mutable state (e.g. `DynamicFilterPhysicalExpr`) + /// use this to preserve their identity across proto serialize/deserialize round-trips + /// so that multiple references in a plan continue to point at the same state. + /// + /// The id must be stable across [`PhysicalExpr::with_new_children`] — it follows the + /// underlying shared state, not the outer wrapper. 
+ /// + /// Default is `None`: the expression has no identity worth preserving across a + /// serialization boundary, and consumers should fall back to structural equality. + fn expression_id(&self) -> Option { + None + } + /// Returns true if the expression node is volatile, i.e. whether it can return /// different results when evaluated multiple times with the same input. /// diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index b755353d75658..41be7b18b9119 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -55,6 +55,7 @@ indexmap = { workspace = true } itertools = { workspace = true, features = ["use_std"] } parking_lot = { workspace = true } petgraph = "0.8.3" +rand = { workspace = true } recursive = { workspace = true, optional = true } tokio = { workspace = true } half = { workspace = true } @@ -64,7 +65,6 @@ arrow = { workspace = true, features = ["test_utils"] } criterion = { workspace = true } datafusion-functions = { workspace = true } insta = { workspace = true } -rand = { workspace = true } rstest = { workspace = true } [[bench]] diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 47398d87e26a5..cab756b484cf2 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -55,8 +55,16 @@ impl FilterState { /// For more background, please also see the [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog] /// /// [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters -#[derive(Debug)] pub struct DynamicFilterPhysicalExpr { + /// Stable identifier shared by all wrappers that observe the same `inner`. 
+ /// Generated once at construction; copied verbatim through `with_new_children` + /// so that every `Arc` pointing at the same mutable + /// state reports the same id. + /// + /// Kept on the outer struct (rather than inside `Inner`) so that + /// `expression_id()` is a pure field read and does not require acquiring the + /// `inner` RwLock — the id never changes after construction anyway. + id: u64, /// The original children of this PhysicalExpr, if any. /// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`) /// and later remapped to the actual expressions that are being filtered. @@ -89,16 +97,6 @@ struct Inner { } impl Inner { - fn new(expr: Arc) -> Self { - Self { - // Start with generation 1 which gives us a different result for [`PhysicalExpr::generation`] than the default 0. - // This is not currently used anywhere but it seems useful to have this simple distinction. - generation: 1, - expr, - is_complete: false, - } - } - /// Clone the inner expression. fn expr(&self) -> &Arc { &self.expr @@ -137,6 +135,23 @@ impl Display for DynamicFilterPhysicalExpr { } } +impl std::fmt::Debug for DynamicFilterPhysicalExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // `id` is a random per-instance value; including it in Debug output + // makes plan snapshots nondeterministic across runs and across + // proto round-trips (receiver reconstructs the filter with the same + // id but the Debug representation would still differ from any freshly + // constructed filter on another machine). Skip it here. 
+ f.debug_struct("DynamicFilterPhysicalExpr") + .field("children", &self.children) + .field("remapped_children", &self.remapped_children) + .field("inner", &self.inner) + .field("data_type", &self.data_type) + .field("nullable", &self.nullable) + .finish_non_exhaustive() + } +} + impl DynamicFilterPhysicalExpr { /// Create a new [`DynamicFilterPhysicalExpr`] /// from an initial expression and a list of children. @@ -169,11 +184,39 @@ impl DynamicFilterPhysicalExpr { children: Vec>, inner: Arc, ) -> Self { - let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 }); + Self::with_id_and_state(rand::random(), children, inner, 1, false) + } + + /// Construct a `DynamicFilterPhysicalExpr` with all identity + mutable-state + /// fields supplied by the caller. Used on the deserialize side of proto + /// round-trip to rehydrate a filter with the id, generation counter, and + /// completion flag captured at serialization time. + /// + /// The `state_watch` channel is always fresh: cross-process update + /// propagation is not provided by proto, so waiters registered on the + /// sender do not carry across. + pub fn with_id_and_state( + id: u64, + children: Vec>, + inner: Arc, + generation: u64, + is_complete: bool, + ) -> Self { + let initial_state = if is_complete { + FilterState::Complete { generation } + } else { + FilterState::InProgress { generation } + }; + let (state_watch, _) = watch::channel(initial_state); Self { + id, children, - remapped_children: None, // Initially no remapped children - inner: Arc::new(RwLock::new(Inner::new(inner))), + remapped_children: None, + inner: Arc::new(RwLock::new(Inner { + generation, + expr: inner, + is_complete, + })), state_watch, data_type: Arc::new(RwLock::new(None)), nullable: Arc::new(RwLock::new(None)), @@ -214,6 +257,12 @@ impl DynamicFilterPhysicalExpr { self.inner.read().generation } + /// Whether [`Self::mark_complete`] has been called on this filter's shared + /// state. 
+ pub fn is_complete(&self) -> bool { + self.inner.read().is_complete + } + /// Get the current expression. /// This will return the current expression with any children /// remapped to match calls to [`PhysicalExpr::with_new_children`]. @@ -362,6 +411,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { children: Vec>, ) -> Result> { Ok(Arc::new(Self { + id: self.id, children: self.children.clone(), remapped_children: Some(children), inner: Arc::clone(&self.inner), @@ -444,6 +494,10 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { // Return the current generation of the expression. self.inner.read().generation } + + fn expression_id(&self) -> Option { + Some(self.id) + } } #[cfg(test)] @@ -861,4 +915,25 @@ mod test { "Hash should be stable after update (identity-based)" ); } + + /// `expression_id` identifies the shared `inner` state, so it must be + /// identical on any wrapper produced by `with_new_children` — even one + /// whose `children()` view has been remapped. This is what lets proto + /// serialization link the two sides after deserialization. 
+ #[test] + fn test_expression_id_stable_across_with_new_children() { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![col("a", &schema).unwrap()], + lit(true) as Arc, + )); + let id = filter + .expression_id() + .expect("dynamic filter must have an id"); + + let remapped = Arc::clone(&filter) + .with_new_children(vec![col("a", &schema).unwrap()]) + .expect("with_new_children should succeed"); + assert_eq!(remapped.expression_id(), Some(id)); + } } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index c61226fb526f6..0a55058ad30ab 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -920,6 +920,8 @@ message PhysicalExprNode { UnknownColumn unknown_column = 20; PhysicalHashExprNode hash_expr = 21; + + PhysicalDynamicFilterExprNode dynamic_filter = 22; } } @@ -1047,6 +1049,36 @@ message PhysicalHashExprNode { string description = 6; } +// A DynamicFilterPhysicalExpr: a filter whose inner expression is shared, +// mutable state owned by a producer (e.g. a HashJoinExec build side). +// +// Multiple `PhysicalExprNode`s in a plan can reference the same dynamic filter; +// they are linked on the deserialize side via `id`. The first site to be +// deserialized becomes the canonical wrapper and is cached; each subsequent +// site overlays its own `children` on the canonical via +// `PhysicalExpr::with_new_children`, preserving the canonical's mutable state +// while keeping per-site projections. +message PhysicalDynamicFilterExprNode { + // The filter expression as visible at this call site (equivalent to calling + // `current()` on the `DynamicFilterPhysicalExpr`, with any per-site child + // remapping already applied). 
+ // + // Identity (for linking multiple call sites to the same shared state) is + // carried on the enclosing `PhysicalExprNode.expr_id` — populated by the + // `DeduplicatingProtoConverter` from `PhysicalExpr::expression_id`. + PhysicalExprNode current_expr = 1; + // The children visible at this call site — what `PhysicalExpr::children` + // returns. These are the `current_expr`'s leaf column references. + repeated PhysicalExprNode children = 2; + // Generation counter captured at serialization time. The receiver installs + // it on the reconstructed `Inner` so subsequent local `update()` calls keep + // a monotonic sequence. Does not propagate live updates from the sender. + uint64 generation = 3; + // Whether the sender had already called `mark_complete()` when this + // expression was serialized. + bool is_complete = 4; +} + message FilterExecNode { PhysicalPlanNode input = 1; PhysicalExprNode expr = 2; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 82bcdac898204..faee84dca2e2d 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -16531,6 +16531,154 @@ impl<'de> serde::Deserialize<'de> for PhysicalDateTimeIntervalExprNode { deserializer.deserialize_struct("datafusion.PhysicalDateTimeIntervalExprNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalDynamicFilterExprNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.current_expr.is_some() { + len += 1; + } + if !self.children.is_empty() { + len += 1; + } + if self.generation != 0 { + len += 1; + } + if self.is_complete { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalDynamicFilterExprNode", len)?; + if let Some(v) = self.current_expr.as_ref() { + struct_ser.serialize_field("currentExpr", v)?; + } + if 
!self.children.is_empty() { + struct_ser.serialize_field("children", &self.children)?; + } + if self.generation != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("generation", ToString::to_string(&self.generation).as_str())?; + } + if self.is_complete { + struct_ser.serialize_field("isComplete", &self.is_complete)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterExprNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "current_expr", + "currentExpr", + "children", + "generation", + "is_complete", + "isComplete", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + CurrentExpr, + Children, + Generation, + IsComplete, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "currentExpr" | "current_expr" => Ok(GeneratedField::CurrentExpr), + "children" => Ok(GeneratedField::Children), + "generation" => Ok(GeneratedField::Generation), + "isComplete" | "is_complete" => Ok(GeneratedField::IsComplete), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalDynamicFilterExprNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalDynamicFilterExprNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut current_expr__ = None; + let mut children__ = None; + let mut generation__ = None; + let mut is_complete__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::CurrentExpr => { + if current_expr__.is_some() { + return Err(serde::de::Error::duplicate_field("currentExpr")); + } + current_expr__ = map_.next_value()?; + } + GeneratedField::Children => { + if children__.is_some() { + return Err(serde::de::Error::duplicate_field("children")); + } + children__ = Some(map_.next_value()?); + } + GeneratedField::Generation => { + if generation__.is_some() { + return Err(serde::de::Error::duplicate_field("generation")); + } + generation__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::IsComplete => { + if is_complete__.is_some() { + return Err(serde::de::Error::duplicate_field("isComplete")); + } + is_complete__ = Some(map_.next_value()?); + } + } + } + Ok(PhysicalDynamicFilterExprNode { + current_expr: current_expr__, + children: children__.unwrap_or_default(), + generation: generation__.unwrap_or_default(), + is_complete: is_complete__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalDynamicFilterExprNode", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalExprNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -16610,6 +16758,9 @@ impl serde::Serialize for PhysicalExprNode { physical_expr_node::ExprType::HashExpr(v) => { struct_ser.serialize_field("hashExpr", v)?; } + physical_expr_node::ExprType::DynamicFilter(v) => { + struct_ser.serialize_field("dynamicFilter", v)?; + } } } struct_ser.end() @@ -16656,6 +16807,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "unknownColumn", 
"hash_expr", "hashExpr", + "dynamic_filter", + "dynamicFilter", ]; #[allow(clippy::enum_variant_names)] @@ -16680,6 +16833,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { Extension, UnknownColumn, HashExpr, + DynamicFilter, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -16721,6 +16875,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "extension" => Ok(GeneratedField::Extension), "unknownColumn" | "unknown_column" => Ok(GeneratedField::UnknownColumn), "hashExpr" | "hash_expr" => Ok(GeneratedField::HashExpr), + "dynamicFilter" | "dynamic_filter" => Ok(GeneratedField::DynamicFilter), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -16883,6 +17038,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { return Err(serde::de::Error::duplicate_field("hashExpr")); } expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::HashExpr) +; + } + GeneratedField::DynamicFilter => { + if expr_type__.is_some() { + return Err(serde::de::Error::duplicate_field("dynamicFilter")); + } + expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::DynamicFilter) ; } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index ab60c3058dbde..4dfbe9d5bf757 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1317,7 +1317,7 @@ pub struct PhysicalExprNode { pub expr_id: ::core::option::Option, #[prost( oneof = "physical_expr_node::ExprType", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21" + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21, 22" )] pub expr_type: ::core::option::Option, } @@ -1370,6 +1370,8 @@ pub mod physical_expr_node { UnknownColumn(super::UnknownColumn), #[prost(message, tag = "21")] HashExpr(super::PhysicalHashExprNode), + #[prost(message, 
tag = "22")] + DynamicFilter(::prost::alloc::boxed::Box), } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -1572,6 +1574,42 @@ pub struct PhysicalHashExprNode { #[prost(string, tag = "6")] pub description: ::prost::alloc::string::String, } +/// A DynamicFilterPhysicalExpr: a filter whose inner expression is shared, +/// mutable state owned by a producer (e.g. a HashJoinExec build side). +/// +/// Multiple `PhysicalExprNode`s in a plan can reference the same dynamic filter; +/// they are linked on the deserialize side via `id`. The first site to be +/// deserialized becomes the canonical wrapper and is cached; each subsequent +/// site overlays its own `children` on the canonical via +/// `PhysicalExpr::with_new_children`, preserving the canonical's mutable state +/// while keeping per-site projections. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalDynamicFilterExprNode { + /// The filter expression as visible at this call site (equivalent to calling + /// `current()` on the `DynamicFilterPhysicalExpr`, with any per-site child + /// remapping already applied). + /// + /// Identity (for linking multiple call sites to the same shared state) is + /// carried on the enclosing `PhysicalExprNode.expr_id` — populated by the + /// `DeduplicatingProtoConverter` from `PhysicalExpr::expression_id`. + #[prost(message, optional, boxed, tag = "1")] + pub current_expr: ::core::option::Option< + ::prost::alloc::boxed::Box, + >, + /// The children visible at this call site — what `PhysicalExpr::children` + /// returns. These are the `current_expr`'s leaf column references. + #[prost(message, repeated, tag = "2")] + pub children: ::prost::alloc::vec::Vec, + /// Generation counter captured at serialization time. The receiver installs + /// it on the reconstructed `Inner` so subsequent local `update()` calls keep + /// a monotonic sequence. Does not propagate live updates from the sender. 
+ #[prost(uint64, tag = "3")] + pub generation: u64, + /// Whether the sender had already called `mark_complete()` when this + /// expression was serialized. + #[prost(bool, tag = "4")] + pub is_complete: bool, +} #[derive(Clone, PartialEq, ::prost::Message)] pub struct FilterExecNode { #[prost(message, optional, boxed, tag = "1")] diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 61665db607757..c41d767a8fc31 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -38,6 +38,7 @@ use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::{FunctionRegistry, TaskContext}; use datafusion_expr::WindowFunctionDefinition; use datafusion_expr::dml::InsertOp; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs}; use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr}; use datafusion_physical_plan::expressions::{ @@ -534,6 +535,43 @@ pub fn parse_physical_expr_with_converter( .collect::>()?; codec.try_decode_expr(extension.expr.as_slice(), &inputs)? as _ } + ExprType::DynamicFilter(node) => { + // Build a fresh wrapper from this site's view. When deserialized + // with `DeduplicatingDeserializer`, the first site to reach here + // becomes the canonical Arc kept in the id cache; subsequent sites + // bypass this construction and instead overlay their `children` on + // the cached canonical via `with_new_children` — preserving the + // canonical's `inner` state. + // + // Identity comes from the enclosing `PhysicalExprNode.expr_id` + // (stamped by the deduplicating serializer). Without that — e.g. + // when the default converter is used — the filter gets a fresh + // random id, giving each deserialized wrapper an independent + // identity, which matches the pre-dedup behavior. 
+ let children = parse_physical_exprs( + &node.children, + ctx, + input_schema, + codec, + proto_converter, + )?; + let current_expr = parse_required_physical_expr( + node.current_expr.as_deref(), + ctx, + "current_expr", + input_schema, + codec, + proto_converter, + )?; + let id = proto.expr_id.unwrap_or_else(rand::random); + Arc::new(DynamicFilterPhysicalExpr::with_id_and_state( + id, + children, + current_expr, + node.generation, + node.is_complete, + )) as Arc + } }; Ok(pexpr) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 206f4378d3d3b..61419a155b8de 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -19,7 +19,6 @@ use std::any::Any; use std::cell::RefCell; use std::collections::HashMap; use std::fmt::Debug; -use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; use arrow::compute::SortOptions; @@ -3839,20 +3838,15 @@ impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter { } } -/// Internal serializer that adds expr_id to expressions. -/// Created fresh for each serialization operation. -struct DeduplicatingSerializer { - /// Random salt combined with pointer addresses and process ID to create globally unique expr_ids. - session_id: u64, -} - -impl DeduplicatingSerializer { - fn new() -> Self { - Self { - session_id: rand::random(), - } - } -} +/// Internal serializer that stamps each proto expression node with the +/// source expression's stable [`PhysicalExpr::expression_id`] when one exists. +/// +/// Currently only `DynamicFilterPhysicalExpr` reports an identity; other +/// expressions serialize without an `expr_id`. Adding `expression_id()` to +/// other types (e.g. large IN lists) would restore cross-site deduplication +/// for them too. 
+#[derive(Default)] +struct DeduplicatingSerializer; impl PhysicalProtoConverterExtension for DeduplicatingSerializer { fn proto_to_execution_plan( @@ -3898,26 +3892,21 @@ impl PhysicalProtoConverterExtension for DeduplicatingSerializer { codec: &dyn PhysicalExtensionCodec, ) -> Result { let mut proto = serialize_physical_expr_with_converter(expr, codec, self)?; - - // Hash session_id, pointer address, and process ID together to create expr_id. - // - session_id: random per serializer, prevents collisions when merging serializations - // - ptr: unique address per Arc within a process - // - pid: prevents collisions if serializer is shared across processes - let mut hasher = DefaultHasher::new(); - self.session_id.hash(&mut hasher); - (Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher); - std::process::id().hash(&mut hasher); - proto.expr_id = Some(hasher.finish()); - + proto.expr_id = expr.expression_id(); Ok(proto) } } -/// Internal deserializer that caches expressions by expr_id. -/// Created fresh for each deserialization operation. +/// Internal deserializer that caches expressions by `expr_id` so that every +/// call site reporting the same id shares mutable state across the plan. +/// +/// Cache miss: deserialize normally, cache the result. +/// Cache hit: deserialize just to recover this site's children, then overlay +/// them on the cached canonical via `with_new_children`. The canonical keeps +/// its shared state (e.g. the `Arc>` in +/// `DynamicFilterPhysicalExpr`); each site ends up with its own child overlay. #[derive(Default)] struct DeduplicatingDeserializer { - /// Cache mapping expr_id to deserialized expressions. 
cache: RefCell>>, } @@ -3952,23 +3941,32 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { where Self: Sized, { - if let Some(expr_id) = proto.expr_id { - // Check cache first - if let Some(cached) = self.cache.borrow().get(&expr_id) { - return Ok(Arc::clone(cached)); - } - // Deserialize and cache - let expr = parse_physical_expr_with_converter( + let Some(expr_id) = proto.expr_id else { + return parse_physical_expr_with_converter( proto, ctx, input_schema, codec, self, - )?; - self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr)); - Ok(expr) - } else { - parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self) + ); + }; + + let cached = self.cache.borrow().get(&expr_id).cloned(); + let parsed = + parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)?; + match cached { + Some(canonical) => { + // Overlay this site's children on the cached canonical so that + // shared state (e.g. `DynamicFilterPhysicalExpr`'s `inner`) is + // preserved but each site keeps its own child references. + let children: Vec> = + parsed.children().into_iter().cloned().collect(); + canonical.with_new_children(children) + } + None => { + self.cache.borrow_mut().insert(expr_id, Arc::clone(&parsed)); + Ok(parsed) + } } } @@ -3981,20 +3979,23 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { } } -/// A proto converter that adds expression deduplication during serialization -/// and deserialization. +/// A proto converter that preserves expression identity across serialize / +/// deserialize round-trips. /// -/// During serialization, each expression's Arc pointer address is XORed with a -/// random session_id to create a salted `expr_id`. This prevents cross-process -/// collisions when serialized plans are merged. +/// During serialization, each proto `PhysicalExprNode` is stamped with the +/// source expression's stable [`PhysicalExpr::expression_id`] if one is +/// reported. 
Today only [`DynamicFilterPhysicalExpr`] reports an identity. /// -/// During deserialization, expressions with the same `expr_id` share the same -/// Arc, reducing memory usage for plans with duplicate expressions (e.g., large -/// IN lists) and supporting correctly linking [`DynamicFilterPhysicalExpr`] instances. +/// During deserialization, expressions sharing the same `expr_id` are unified +/// so that multiple call sites reference the same `Arc`. For +/// [`DynamicFilterPhysicalExpr`] specifically, the cache stores the canonical +/// (non-remapped) wrapper; each call site then overlays its own projection via +/// `with_new_children` while sharing the canonical's mutable `inner` state. /// -/// This converter is stateless - it creates internal serializers/deserializers +/// This converter is stateless — it creates internal serializers/deserializers /// on demand for each operation. /// +/// [`PhysicalExpr::expression_id`]: https://docs.rs/datafusion-physical-expr-common/latest/datafusion_physical_expr_common/physical_expr/trait.PhysicalExpr.html#method.expression_id /// [`DynamicFilterPhysicalExpr`]: https://docs.rs/datafusion-physical-expr/latest/datafusion_physical_expr/expressions/struct.DynamicFilterPhysicalExpr.html #[derive(Debug, Default, Clone, Copy)] pub struct DeduplicatingProtoConverter {} @@ -4018,7 +4019,7 @@ impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter { where Self: Sized, { - let serializer = DeduplicatingSerializer::new(); + let serializer = DeduplicatingSerializer; protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter( Arc::clone(plan), codec, @@ -4045,7 +4046,7 @@ impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter { expr: &Arc, codec: &dyn PhysicalExtensionCodec, ) -> Result { - let serializer = DeduplicatingSerializer::new(); + let serializer = DeduplicatingSerializer; serializer.physical_expr_to_proto(expr, codec) } } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs 
b/datafusion/proto/src/physical_plan/to_proto.rs index f0eb6d27aac30..10281046b0bcb 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -32,8 +32,8 @@ use datafusion_datasource_json::file_format::JsonSink; use datafusion_datasource_parquet::file_format::ParquetSink; use datafusion_expr::WindowFrame; use datafusion_physical_expr::ScalarFunctionExpr; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; -use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, @@ -257,9 +257,7 @@ pub fn serialize_physical_expr_with_converter( codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result { - // Snapshot the expr in case it has dynamic predicate state so - // it can be serialized - let expr = snapshot_physical_expr(Arc::clone(value))?; + let expr = Arc::clone(value); // HashTableLookupExpr is used for dynamic filter pushdown in hash joins. // It contains an Arc (the build-side hash table) which @@ -286,6 +284,29 @@ pub fn serialize_physical_expr_with_converter( }); } + if let Some(df) = expr.downcast_ref::() { + // Emit this site's view: `children()` as observed here (remapped if + // this wrapper went through `with_new_children`) and `current()` with + // the same remapping applied. Identity for linking multiple sites is + // carried on the enclosing `PhysicalExprNode.expr_id` by the + // deduplicating converter via `PhysicalExpr::expression_id`. 
+ let current_expr = proto_converter + .physical_expr_to_proto(&df.current()?, codec) + .map(Box::new)?; + let children = serialize_physical_exprs(df.children(), codec, proto_converter)?; + return Ok(protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::DynamicFilter( + Box::new(protobuf::PhysicalDynamicFilterExprNode { + current_expr: Some(current_expr), + children, + generation: df.snapshot_generation(), + is_complete: df.is_complete(), + }), + )), + }); + } + if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 3f1c306603bc1..ab09c23d28a4f 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -2746,75 +2746,6 @@ fn test_expression_deduplication() -> Result<()> { Ok(()) } -/// Test that expression deduplication correctly shares Arcs for identical expressions. -/// This test verifies the core deduplication behavior. -#[test] -fn test_expression_deduplication_arc_sharing() -> Result<()> { - use datafusion_proto::bytes::{ - physical_plan_from_bytes_with_proto_converter, - physical_plan_to_bytes_with_proto_converter, - }; - - let field_a = Field::new("a", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a])); - - // Create a column expression - let col_expr: Arc = Arc::new(Column::new("a", 0)); - - // Create a projection that uses the SAME Arc twice - // After roundtrip, both should point to the same Arc - let projection_exprs = vec![ - ProjectionExpr { - expr: Arc::clone(&col_expr), - alias: "a1".to_string(), - }, - ProjectionExpr { - expr: Arc::clone(&col_expr), // Same Arc! 
- alias: "a2".to_string(), - }, - ]; - - let input = Arc::new(EmptyExec::new(schema)); - let exec_plan = Arc::new(ProjectionExec::try_new(projection_exprs, input)?); - - let ctx = SessionContext::new(); - let codec = DefaultPhysicalExtensionCodec {}; - let proto_converter = DeduplicatingProtoConverter {}; - - // Serialize - let bytes = physical_plan_to_bytes_with_proto_converter( - Arc::clone(&exec_plan) as Arc, - &codec, - &proto_converter, - )?; - - // Deserialize with a fresh converter - let deser_converter = DeduplicatingProtoConverter {}; - let result_plan = physical_plan_from_bytes_with_proto_converter( - bytes.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &deser_converter, - )?; - - // Get the projection from the result - let projection = result_plan - .downcast_ref::() - .expect("Expected ProjectionExec"); - - let exprs: Vec<_> = projection.expr().iter().collect(); - assert_eq!(exprs.len(), 2); - - // The key test: both expressions should point to the same Arc after deduplication - // This is because they were the same Arc before serialization - assert!( - Arc::ptr_eq(&exprs[0].expr, &exprs[1].expr), - "Expected both expressions to share the same Arc after deduplication" - ); - - Ok(()) -} - /// Test backward compatibility: protos without expr_id should still deserialize correctly. #[test] fn test_backward_compatibility_no_expr_id() -> Result<()> { @@ -2854,270 +2785,311 @@ fn test_backward_compatibility_no_expr_id() -> Result<()> { Ok(()) } -/// Test that deduplication works within a single plan deserialization and that -/// separate deserializations produce independent expressions (no cross-operation sharing). +/// Round-trip a plan whose predicate contains two `DynamicFilterPhysicalExpr` +/// wrappers that share the same mutable `inner` state. 
After deserialization +/// both wrappers must still share that state so a single `update()` call +/// propagates to both — the core invariant that keeps HashJoinExec ↔ +/// pushed-down ParquetSource dynamic filtering working across the wire. #[test] -fn test_deduplication_within_plan_deserialization() -> Result<()> { - use datafusion_proto::bytes::{ - physical_plan_from_bytes_with_proto_converter, - physical_plan_to_bytes_with_proto_converter, - }; +fn roundtrip_dynamic_filter_preserves_shared_inner() -> Result<()> { + use datafusion::physical_expr::expressions::DynamicFilterPhysicalExpr; - let field_a = Field::new("a", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a])); + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let col_a = col("a", &schema)?; + let initial: Arc = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + Operator::Gt, + lit(10i32), + )); + let df = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + Arc::clone(&initial), + )); - // Create a plan with expressions that will be deduplicated - let col_expr: Arc = Arc::new(Column::new("a", 0)); - let projection_exprs = vec![ - ProjectionExpr { - expr: Arc::clone(&col_expr), - alias: "a1".to_string(), - }, - ProjectionExpr { - expr: Arc::clone(&col_expr), // Same Arc - will be deduplicated - alias: "a2".to_string(), - }, - ]; - let exec_plan = Arc::new(ProjectionExec::try_new( - projection_exprs, - Arc::new(EmptyExec::new(schema)), - )?); + // Simulate a second call site (e.g. a pushed-down predicate) obtained via + // `with_new_children`. Both wrappers still share `inner`. 
+ let df_other: Arc = + Arc::clone(&df).with_new_children(vec![Arc::clone(&col_a)])?; + let df_as_expr: Arc = Arc::clone(&df) as Arc; + let predicate: Arc = + Arc::new(BinaryExpr::new(df_as_expr, Operator::And, df_other)); + + let input = Arc::new(EmptyExec::new(Arc::clone(&schema))); + let plan = Arc::new(FilterExec::try_new(predicate, input)?); let ctx = SessionContext::new(); let codec = DefaultPhysicalExtensionCodec {}; let proto_converter = DeduplicatingProtoConverter {}; - - // Serialize let bytes = physical_plan_to_bytes_with_proto_converter( - Arc::clone(&exec_plan) as Arc, + Arc::clone(&plan) as Arc, &codec, &proto_converter, )?; - - // First deserialization - let plan1 = physical_plan_from_bytes_with_proto_converter( - bytes.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &proto_converter, - )?; - - // Check that the plan was deserialized correctly with deduplication - let projection1 = plan1 - .downcast_ref::() - .expect("Expected ProjectionExec"); - let exprs1: Vec<_> = projection1.expr().iter().collect(); - assert_eq!(exprs1.len(), 2); - assert!( - Arc::ptr_eq(&exprs1[0].expr, &exprs1[1].expr), - "Expected both expressions to share the same Arc after deduplication" - ); - - // Second deserialization - let plan2 = physical_plan_from_bytes_with_proto_converter( + let result_plan = physical_plan_from_bytes_with_proto_converter( bytes.as_ref(), ctx.task_ctx().as_ref(), &codec, &proto_converter, )?; - // Check that the second plan was also deserialized correctly - let projection2 = plan2 - .downcast_ref::() - .expect("Expected ProjectionExec"); - let exprs2: Vec<_> = projection2.expr().iter().collect(); - assert_eq!(exprs2.len(), 2); - assert!( - Arc::ptr_eq(&exprs2[0].expr, &exprs2[1].expr), - "Expected both expressions to share the same Arc after deduplication" - ); - - // Check that there was no deduplication across deserializations - assert!( - !Arc::ptr_eq(&exprs1[0].expr, &exprs2[0].expr), - "Expected expressions from different deserializations to be 
different Arcs" - ); - assert!( - !Arc::ptr_eq(&exprs1[1].expr, &exprs2[1].expr), - "Expected expressions from different deserializations to be different Arcs" - ); + let result_filter = result_plan + .downcast_ref::() + .expect("Expected FilterExec"); + let binary = result_filter + .predicate() + .downcast_ref::() + .expect("Expected BinaryExpr predicate"); + let left_df = binary + .left() + .downcast_ref::() + .expect("Expected left DynamicFilterPhysicalExpr"); + let right_df = binary + .right() + .downcast_ref::() + .expect("Expected right DynamicFilterPhysicalExpr"); + + // Identity preserved + assert_eq!(df.expression_id(), left_df.expression_id()); + assert_eq!(left_df.expression_id(), right_df.expression_id()); + + // Mutating one wrapper is visible through the other — the real test. + let updated: Arc = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + Operator::Gt, + lit(42i32), + )); + left_df.update(Arc::clone(&updated))?; + // Both wrappers have identical children, so `current()` returns the raw + // inner expression unchanged — no remapping applied. + assert_eq!(&right_df.current()?, &updated); Ok(()) } -/// Test that deduplication works within direct expression deserialization and that -/// separate deserializations produce independent expressions (no cross-operation sharing). +/// Round-trip a plan where one call site has been through `with_new_children` +/// with a *different* set of children than the canonical original. On +/// deserialize the canonical wrapper must be shared but each site must also +/// carry its own effective children. 
#[test] -fn test_deduplication_within_expr_deserialization() -> Result<()> { - let field_a = Field::new("a", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a])); +fn roundtrip_dynamic_filter_preserves_remapped_children() -> Result<()> { + use datafusion::physical_expr::expressions::DynamicFilterPhysicalExpr; - // Create a binary expression where both sides are the same Arc - // This allows us to test deduplication within a single deserialization - let col_expr: Arc = Arc::new(Column::new("a", 0)); - let binary_expr: Arc = Arc::new(BinaryExpr::new( - Arc::clone(&col_expr), - Operator::Plus, - Arc::clone(&col_expr), // Same Arc - will be deduplicated + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + let col_a: Arc = Arc::new(Column::new("a", 0)); + let col_b: Arc = Arc::new(Column::new("b", 1)); + let initial: Arc = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + Operator::Gt, + lit(10i32), + )); + let df = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + Arc::clone(&initial), )); + // Second wrapper remaps `a` -> `b` to simulate a pushed-down site with a + // different projection. 
+ let df_remapped: Arc = + Arc::clone(&df).with_new_children(vec![Arc::clone(&col_b)])?; + + let df_as_expr: Arc = Arc::clone(&df) as Arc; + let predicate: Arc = + Arc::new(BinaryExpr::new(df_as_expr, Operator::And, df_remapped)); + + let input = Arc::new(EmptyExec::new(Arc::clone(&schema))); + let plan = Arc::new(FilterExec::try_new(predicate, input)?); let ctx = SessionContext::new(); let codec = DefaultPhysicalExtensionCodec {}; let proto_converter = DeduplicatingProtoConverter {}; - - // Serialize the expression - let proto = proto_converter.physical_expr_to_proto(&binary_expr, &codec)?; - - // First expression deserialization - let expr1 = proto_converter.proto_to_physical_expr( - &proto, - ctx.task_ctx().as_ref(), - &schema, + let bytes = physical_plan_to_bytes_with_proto_converter( + Arc::clone(&plan) as Arc, &codec, + &proto_converter, )?; - - // Check that deduplication worked within the deserialization - let binary1 = expr1 - .downcast_ref::() - .expect("Expected BinaryExpr"); - assert!( - Arc::ptr_eq(binary1.left(), binary1.right()), - "Expected both sides to share the same Arc after deduplication" - ); - - // Second expression deserialization - let expr2 = proto_converter.proto_to_physical_expr( - &proto, + let result_plan = physical_plan_from_bytes_with_proto_converter( + bytes.as_ref(), ctx.task_ctx().as_ref(), - &schema, &codec, + &proto_converter, )?; - // Check that the second expression was also deserialized correctly - let binary2 = expr2 + let binary = result_plan + .downcast_ref::() + .expect("Expected FilterExec") + .predicate() .downcast_ref::() - .expect("Expected BinaryExpr"); - assert!( - Arc::ptr_eq(binary2.left(), binary2.right()), - "Expected both sides to share the same Arc after deduplication" - ); - - // Check that there was no deduplication across deserializations - assert!( - !Arc::ptr_eq(binary1.left(), binary2.left()), - "Expected expressions from different deserializations to be different Arcs" - ); - assert!( - 
!Arc::ptr_eq(binary1.right(), binary2.right()), - "Expected expressions from different deserializations to be different Arcs" - ); + .expect("Expected BinaryExpr predicate"); + let left_df = binary + .left() + .downcast_ref::() + .expect("Expected left DynamicFilter"); + let right_df = binary + .right() + .downcast_ref::() + .expect("Expected right DynamicFilter"); + + // Shared identity. + assert_eq!(left_df.expression_id(), right_df.expression_id()); + + // Site-specific effective children differ: left keeps `a`, right maps to `b`. + let left_children: Vec<_> = left_df.children().into_iter().cloned().collect(); + let right_children: Vec<_> = right_df.children().into_iter().cloned().collect(); + assert_eq!(left_children, vec![Arc::clone(&col_a)]); + assert_eq!(right_children, vec![Arc::clone(&col_b)]); + + // The load-bearing invariant: even though the two sites observe different + // columns, they share `inner`. An update through one wrapper must be + // visible through the other — and the remapping applied by `current()` on + // the remapped side must substitute col_a → col_b in the updated + // expression. + let gen_before = left_df.snapshot_generation(); + assert_eq!(gen_before, right_df.snapshot_generation()); + + let updated_ref_a: Arc = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + Operator::Lt, + lit(99i32), + )); + left_df.update(Arc::clone(&updated_ref_a))?; + + assert_eq!(left_df.snapshot_generation(), gen_before + 1); + assert_eq!(right_df.snapshot_generation(), gen_before + 1); + + // Left is the canonical (non-remapped) wrapper, so `current()` returns + // the stored inner expression unchanged. + assert_eq!(&left_df.current()?, &updated_ref_a); + // Right's `current()` remaps col_a → col_b. + let expected_right: Arc = Arc::new(BinaryExpr::new( + Arc::clone(&col_b), + Operator::Lt, + lit(99i32), + )); + assert_eq!(&right_df.current()?, &expected_right); Ok(()) } -/// Test that session_id rotates between top-level serialization operations. 
-/// This verifies that each top-level serialization gets a fresh session_id, -/// which prevents cross-process collisions when serialized plans are merged. +/// Round-trip the shape from the PR description: +/// +/// ```text +/// FilterExec(dynamic_filter) +/// ProjectionExec(a := Column("a", source_index)) +/// DataSourceExec +/// ParquetSource(predicate = dynamic_filter) +/// ``` +/// +/// After deserialization the `FilterExec`'s predicate and the `ParquetSource`'s +/// pushed-down predicate must be two wrappers around the same shared `inner`, +/// even though the `ParquetSource` site is reached through a `ProjectionExec`. #[test] -fn test_session_id_rotation_between_serializations() -> Result<()> { - let field_a = Field::new("a", DataType::Int64, false); - let _schema = Arc::new(Schema::new(vec![field_a])); - - // Create a simple expression - let col_expr: Arc = Arc::new(Column::new("a", 0)); - - let codec = DefaultPhysicalExtensionCodec {}; - let proto_converter = DeduplicatingProtoConverter {}; - - // First serialization - let proto1 = proto_converter.physical_expr_to_proto(&col_expr, &codec)?; - let expr_id1 = proto1.expr_id.expect("Expected expr_id to be set"); +fn roundtrip_dynamic_filter_in_parquet_pushdown() -> Result<()> { + use datafusion::physical_expr::expressions::DynamicFilterPhysicalExpr; + + let data_source_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let projection_schema = Arc::clone(&data_source_schema); + + // Canonical filter held by the (conceptual) producer side. 
+ let producer_col: Arc = Arc::new(Column::new("a", 0)); + let df_producer = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&producer_col)], + lit(true), + )); - // Second serialization with the same converter - // The session_id should have rotated, so the expr_id should be different - // even though we're serializing the same expression (same pointer address) - let proto2 = proto_converter.physical_expr_to_proto(&col_expr, &codec)?; - let expr_id2 = proto2.expr_id.expect("Expected expr_id to be set"); + // Pushed-down wrapper — same inner state, potentially remapped children. + let df_pushed: Arc = Arc::clone(&df_producer) + .with_new_children(vec![Arc::new(Column::new("a", 0))])?; - // The expr_ids should be different because session_id rotated - assert_ne!( - expr_id1, expr_id2, - "Expected different expr_ids due to session_id rotation between serializations" + let parquet = Arc::new( + ParquetSource::new(Arc::clone(&data_source_schema)) + .with_predicate(Arc::clone(&df_pushed)), ); + let scan_config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), parquet) + .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new( + "/path/to/file.parquet".to_string(), + 1024, + )])]) + .build(); + let data_source = DataSourceExec::from_data_source(scan_config); - // Also test that serializing the same expression multiple times within - // the same top-level operation would give the same expr_id (not testable - // here directly since each physical_expr_to_proto is a top-level operation, - // but the deduplication tests verify this indirectly) - - Ok(()) -} - -/// Test that session_id rotation works correctly with execution plans. -/// This verifies the end-to-end behavior with plan serialization. 
-#[test] -fn test_session_id_rotation_with_execution_plans() -> Result<()> { - use datafusion_proto::bytes::physical_plan_to_bytes_with_proto_converter; - - let field_a = Field::new("a", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a])); + let projection = Arc::new(ProjectionExec::try_new( + vec![ProjectionExpr { + expr: Arc::new(Column::new("a", 0)) as Arc, + alias: "a".to_string(), + }], + data_source, + )?) as Arc; - // Create a simple plan - let col_expr: Arc = Arc::new(Column::new("a", 0)); - let projection_exprs = vec![ProjectionExpr { - expr: Arc::clone(&col_expr), - alias: "a1".to_string(), - }]; - let exec_plan = Arc::new(ProjectionExec::try_new( - projection_exprs.clone(), - Arc::new(EmptyExec::new(Arc::clone(&schema))), - )?); + let df_top: Arc = Arc::clone(&df_producer) as Arc; + let filter_exec = + Arc::new(FilterExec::try_new(df_top, projection)?) as Arc; + let ctx = SessionContext::new(); let codec = DefaultPhysicalExtensionCodec {}; let proto_converter = DeduplicatingProtoConverter {}; - - // First serialization - let bytes1 = physical_plan_to_bytes_with_proto_converter( - Arc::clone(&exec_plan) as Arc, + let bytes = physical_plan_to_bytes_with_proto_converter( + Arc::clone(&filter_exec), &codec, &proto_converter, )?; - - // Second serialization with the same converter - let bytes2 = physical_plan_to_bytes_with_proto_converter( - Arc::clone(&exec_plan) as Arc, + let result_plan = physical_plan_from_bytes_with_proto_converter( + bytes.as_ref(), + ctx.task_ctx().as_ref(), &codec, &proto_converter, )?; - // The serialized bytes should be different due to different session_ids - // (specifically, the expr_id values embedded in the protobuf will differ) - assert_ne!( - bytes1.as_ref(), - bytes2.as_ref(), - "Expected different serialized bytes due to session_id rotation" - ); + let top_filter = result_plan + .downcast_ref::() + .expect("Expected top FilterExec"); + let top_pred = Arc::clone(top_filter.predicate()); + let top_df 
= top_pred + .downcast_ref::() + .expect("Expected top predicate to be DynamicFilterPhysicalExpr"); - // But both should deserialize correctly - let ctx = SessionContext::new(); - let deser_converter = DeduplicatingProtoConverter {}; + let projection = top_filter.children()[0] + .downcast_ref::() + .expect("Expected ProjectionExec"); + let data_source = projection + .input() + .downcast_ref::() + .expect("Expected DataSourceExec"); + let scan_config = data_source + .data_source() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let pushed_pred = scan_config + .file_source() + .filter() + .expect("ParquetSource should have a pushed-down predicate after round-trip"); + let pushed_df = pushed_pred + .downcast_ref::() + .expect("Expected pushed predicate to be DynamicFilterPhysicalExpr"); - let plan1 = datafusion_proto::bytes::physical_plan_from_bytes_with_proto_converter( - bytes1.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &deser_converter, - )?; + // Both sites share identity. + assert_eq!(top_df.expression_id(), pushed_df.expression_id()); - let plan2 = datafusion_proto::bytes::physical_plan_from_bytes_with_proto_converter( - bytes2.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &deser_converter, - )?; + // An update on the top (producer) side must be visible at the scan site. + let before = top_df.snapshot_generation(); + assert_eq!(before, pushed_df.snapshot_generation()); + + let new_filter: Arc = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Gt, + lit(7i64), + )); + top_df.update(Arc::clone(&new_filter))?; + + assert_eq!(pushed_df.snapshot_generation(), before + 1); + assert_eq!(&pushed_df.current()?, &new_filter); - // Verify both plans have the expected structure - assert_eq!(plan1.schema(), plan2.schema()); + // Verify the projection schema is still sane — guards against the plan + // being reshaped into something unrepresentative. 
+ assert_eq!(projection.schema().fields(), projection_schema.fields()); Ok(()) } From 2036a0c773a8a02c41196bdf25f8d8f005c820ab Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 22 Apr 2026 19:38:08 -0500 Subject: [PATCH 2/7] physical-expr: builder setters on DynamicFilterPhysicalExpr MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the 5-arg with_id_and_state(id, children, inner, generation, is_complete) constructor with three fluent setters on Self that match the HashJoinExecBuilder style used elsewhere in the crate: DynamicFilterPhysicalExpr::new(children, inner) .with_id(id) .with_generation(generation) .with_is_complete(is_complete) new() keeps the default case (random id, generation 1, not complete). The setters are intended for the deserialize side of proto round-trip and assume the filter hasn't been shared yet; update() and mark_complete() remain the correct path for live mutation. No behavior change — only from_proto is migrated. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/expressions/dynamic_filters.rs | 81 ++++++++++++------- .../proto/src/physical_plan/from_proto.rs | 13 ++- 2 files changed, 60 insertions(+), 34 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index cab756b484cf2..ea6ff5d9bc387 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -184,38 +184,15 @@ impl DynamicFilterPhysicalExpr { children: Vec>, inner: Arc, ) -> Self { - Self::with_id_and_state(rand::random(), children, inner, 1, false) - } - - /// Construct a `DynamicFilterPhysicalExpr` with all identity + mutable-state - /// fields supplied by the caller. 
Used on the deserialize side of proto - /// round-trip to rehydrate a filter with the id, generation counter, and - /// completion flag captured at serialization time. - /// - /// The `state_watch` channel is always fresh: cross-process update - /// propagation is not provided by proto, so waiters registered on the - /// sender do not carry across. - pub fn with_id_and_state( - id: u64, - children: Vec>, - inner: Arc, - generation: u64, - is_complete: bool, - ) -> Self { - let initial_state = if is_complete { - FilterState::Complete { generation } - } else { - FilterState::InProgress { generation } - }; - let (state_watch, _) = watch::channel(initial_state); + let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 }); Self { - id, + id: rand::random(), children, remapped_children: None, inner: Arc::new(RwLock::new(Inner { - generation, + generation: 1, expr: inner, - is_complete, + is_complete: false, })), state_watch, data_type: Arc::new(RwLock::new(None)), @@ -223,6 +200,56 @@ impl DynamicFilterPhysicalExpr { } } + /// Override this filter's [`Self::expression_id`]. + /// + /// Typically used on the deserialize side of a proto round-trip so a + /// reconstructed wrapper keeps the same identity as the sender's, letting + /// multiple call sites share mutable state via the id cache. + pub fn with_id(mut self, id: u64) -> Self { + self.id = id; + self + } + + /// Override the initial generation counter. Intended for the deserialize + /// side of a proto round-trip, to rehydrate the sender's generation so + /// later local [`Self::update`] calls keep a monotonic sequence. + /// + /// Only safe to call on a freshly-constructed filter that hasn't been + /// shared yet — use [`Self::update`] for live mutation of a shared filter. 
+ pub fn with_generation(self, generation: u64) -> Self { + let is_complete = { + let mut inner = self.inner.write(); + inner.generation = generation; + inner.is_complete + }; + let _ = self.state_watch.send(if is_complete { + FilterState::Complete { generation } + } else { + FilterState::InProgress { generation } + }); + self + } + + /// Override the completion flag. Intended for the deserialize side of a + /// proto round-trip, to rehydrate a sender that had already called + /// [`Self::mark_complete`] before serialization. + /// + /// Only safe to call on a freshly-constructed filter that hasn't been + /// shared yet — use [`Self::mark_complete`] for live mutation. + pub fn with_is_complete(self, is_complete: bool) -> Self { + let generation = { + let mut inner = self.inner.write(); + inner.is_complete = is_complete; + inner.generation + }; + let _ = self.state_watch.send(if is_complete { + FilterState::Complete { generation } + } else { + FilterState::InProgress { generation } + }); + self + } + fn remap_children( children: &[Arc], remapped_children: Option<&Vec>>, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index c41d767a8fc31..55bc78be49af6 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -564,13 +564,12 @@ pub fn parse_physical_expr_with_converter( proto_converter, )?; let id = proto.expr_id.unwrap_or_else(rand::random); - Arc::new(DynamicFilterPhysicalExpr::with_id_and_state( - id, - children, - current_expr, - node.generation, - node.is_complete, - )) as Arc + Arc::new( + DynamicFilterPhysicalExpr::new(children, current_expr) + .with_id(id) + .with_generation(node.generation) + .with_is_complete(node.is_complete), + ) as Arc } }; From 54e3b5847f7c5566bba1e5dfa76a3a58cec0274d Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 22 Apr 2026 19:43:18 -0500 Subject: 
[PATCH 3/7] hash_join: public builder setter + accessor for the dynamic filter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change HashJoinExecBuilder::with_dynamic_filter from a crate-private `Option` setter into a public API that takes just `Arc` and wraps it internally. The inner `HashJoinExecDynamicFilter` and its `SharedBuildAccumulator` stay private; only the filter itself crosses the API boundary. Promote `dynamic_filter_for_test` to a plain `pub fn dynamic_filter()` accessor — proto serialization has a legitimate non-test use for it. Existing test caller migrated. No behavior change. Sets up HashJoinExec.dynamic_filter to be round-trippable through proto in the next commit. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../physical_optimizer/filter_pushdown.rs | 6 ++-- .../physical-plan/src/joins/hash_join/exec.rs | 28 +++++++++++-------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index 4ff1fad8f52b9..b08abe2bf9773 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -2755,8 +2755,8 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() { ); } -// Not portable to sqllogictest: asserts on `HashJoinExec::dynamic_filter_for_test().is_used()` -// which is a debug-only API. The observable behavior (probe-side scan +// Not portable to sqllogictest: asserts on `HashJoinExec::dynamic_filter().is_used()` +// which is a Rust API. 
The observable behavior (probe-side scan // receiving the dynamic filter when the data source supports it) is // already covered by the simpler CollectLeft port in push_down_filter_parquet.slt; // the with_support(false) branch has no SQL analog (parquet always supports @@ -2835,7 +2835,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() { // Verify that a dynamic filter was created let dynamic_filter = hash_join - .dynamic_filter_for_test() + .dynamic_filter() .expect("Dynamic filter should be created"); // Verify that is_used() returns the expected value based on probe side support. diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 0eca270ebb06f..f702483e431c2 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -490,8 +490,17 @@ impl HashJoinExecBuilder { }) } - fn with_dynamic_filter(mut self, filter: Option) -> Self { - self.exec.dynamic_filter = filter; + /// Attach a pre-existing [`DynamicFilterPhysicalExpr`] to this + /// `HashJoinExec`. The filter's shared mutable state is preserved; the + /// build-side update coordinator is reinitialized per execution. + /// + /// Callers typically get this filter from a proto round-trip so that the + /// `HashJoinExec` and any pushed-down scan site observe the same `inner`. + pub fn with_dynamic_filter(mut self, filter: Arc) -> Self { + self.exec.dynamic_filter = Some(HashJoinExecDynamicFilter { + filter, + build_accumulator: OnceLock::new(), + }); self } } @@ -902,12 +911,10 @@ impl HashJoinExec { self.null_equality } - /// Get the dynamic filter expression for testing purposes. - /// Returns `None` if no dynamic filter has been set. - /// - /// This method is intended for testing only and should not be used in production code. 
- #[doc(hidden)] - pub fn dynamic_filter_for_test(&self) -> Option<&Arc> { + /// The [`DynamicFilterPhysicalExpr`] this join maintains at runtime, if + /// any. Updated on build completion and observed by consumers that had + /// the filter pushed down into them. + pub fn dynamic_filter(&self) -> Option<&Arc> { self.dynamic_filter.as_ref().map(|df| &df.filter) } @@ -1674,10 +1681,7 @@ impl ExecutionPlan for HashJoinExec { // We successfully pushed down our self filter - we need to make a new node with the dynamic filter let new_node = self .builder() - .with_dynamic_filter(Some(HashJoinExecDynamicFilter { - filter: dynamic_filter, - build_accumulator: OnceLock::new(), - })) + .with_dynamic_filter(dynamic_filter) .build_exec()?; result = result.with_updated_node(new_node); } From 89348641e7497685dd2817329935d96a5ae96202 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 22 Apr 2026 23:52:10 -0500 Subject: [PATCH 4/7] sort: add SortExec::with_dynamic_filter builder setter + accessor Mirroring HashJoinExecBuilder::with_dynamic_filter: expose a fluent setter on SortExec that installs a caller-provided `Arc` as the TopK dynamic filter, replacing any auto-created one. Add a matching `dynamic_filter()` accessor that reads through the `TopKDynamicFilters` wrapper. No behavior change. Sets up SortExec.filter to be round-trippable through proto in the next commit. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/physical-plan/src/sorts/sort.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 6c02af8dec6d3..e9a82f6b62831 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -933,6 +933,25 @@ impl SortExec { } } + /// Install a pre-existing [`DynamicFilterPhysicalExpr`] as this sort's + /// TopK dynamic filter, replacing any auto-created one. + /// + /// Used on the deserialize side of a proto round-trip so the `SortExec` + /// and any pushed-down scan site observe the same `inner`. Only + /// meaningful when combined with [`Self::with_fetch`]; the filter is + /// consulted during TopK execution. + pub fn with_dynamic_filter(mut self, filter: Arc) -> Self { + self.filter = Some(Arc::new(RwLock::new(TopKDynamicFilters::new(filter)))); + self + } + + /// Return this sort's TopK dynamic filter, if any. Set by + /// [`Self::with_fetch`] (auto-created) or [`Self::with_dynamic_filter`] + /// (caller-supplied). + pub fn dynamic_filter(&self) -> Option> { + self.filter.as_ref().map(|f| f.read().expr()) + } + /// Modify how many rows to include in the result /// /// If None, then all rows will be returned, in sorted order. From 3d9daeff174fa9d14ad1f655dd8b77548b36e6d9 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 23 Apr 2026 07:39:37 -0500 Subject: [PATCH 5/7] proto: round-trip HashJoinExec dynamic_filter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add optional PhysicalExprNode dynamic_filter field to HashJoinExecNode. Emit it from to_proto via the new HashJoinExec::dynamic_filter() accessor; on deserialize, parse it and install via HashJoinExecBuilder::with_dynamic_filter. 
Critically, because the scan-side pushed-down predicate is already in the id cache by the time we deserialize the HashJoinExec's field, the cache hit returns the same canonical Arc — the join's filter and the scan's filter share `inner` automatically. Build-side update() during execution is observed by the scan, and the scan prunes rows. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/proto/proto/datafusion.proto | 4 ++ datafusion/proto/src/generated/pbjson.rs | 18 ++++++++ datafusion/proto/src/generated/prost.rs | 5 +++ datafusion/proto/src/physical_plan/mod.rs | 50 +++++++++++++++++------ 4 files changed, 64 insertions(+), 13 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 0a55058ad30ab..c6c37aaaa1d1b 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1196,6 +1196,10 @@ message HashJoinExecNode { JoinFilter filter = 8; repeated uint32 projection = 9; bool null_aware = 10; + // Dynamic filter built on the build side and observed by pushed-down scans. + // Carried here so receiver's HashJoinExec shares mutable state with the + // scan sites via the `DeduplicatingProtoConverter`'s expr_id cache. 
+ optional PhysicalExprNode dynamic_filter = 11; } enum StreamPartitionMode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index faee84dca2e2d..edf799bf56fbd 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -8674,6 +8674,9 @@ impl serde::Serialize for HashJoinExecNode { if self.null_aware { len += 1; } + if self.dynamic_filter.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.HashJoinExecNode", len)?; if let Some(v) = self.left.as_ref() { struct_ser.serialize_field("left", v)?; @@ -8708,6 +8711,9 @@ impl serde::Serialize for HashJoinExecNode { if self.null_aware { struct_ser.serialize_field("nullAware", &self.null_aware)?; } + if let Some(v) = self.dynamic_filter.as_ref() { + struct_ser.serialize_field("dynamicFilter", v)?; + } struct_ser.end() } } @@ -8731,6 +8737,8 @@ impl<'de> serde::Deserialize<'de> for HashJoinExecNode { "projection", "null_aware", "nullAware", + "dynamic_filter", + "dynamicFilter", ]; #[allow(clippy::enum_variant_names)] @@ -8744,6 +8752,7 @@ impl<'de> serde::Deserialize<'de> for HashJoinExecNode { Filter, Projection, NullAware, + DynamicFilter, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -8774,6 +8783,7 @@ impl<'de> serde::Deserialize<'de> for HashJoinExecNode { "filter" => Ok(GeneratedField::Filter), "projection" => Ok(GeneratedField::Projection), "nullAware" | "null_aware" => Ok(GeneratedField::NullAware), + "dynamicFilter" | "dynamic_filter" => Ok(GeneratedField::DynamicFilter), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -8802,6 +8812,7 @@ impl<'de> serde::Deserialize<'de> for HashJoinExecNode { let mut filter__ = None; let mut projection__ = None; let mut null_aware__ = None; + let mut dynamic_filter__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::Left => { @@ -8861,6 +8872,12 @@ impl<'de> serde::Deserialize<'de> for HashJoinExecNode { } null_aware__ = Some(map_.next_value()?); } + GeneratedField::DynamicFilter => { + if dynamic_filter__.is_some() { + return Err(serde::de::Error::duplicate_field("dynamicFilter")); + } + dynamic_filter__ = map_.next_value()?; + } } } Ok(HashJoinExecNode { @@ -8873,6 +8890,7 @@ impl<'de> serde::Deserialize<'de> for HashJoinExecNode { filter: filter__, projection: projection__.unwrap_or_default(), null_aware: null_aware__.unwrap_or_default(), + dynamic_filter: dynamic_filter__, }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 4dfbe9d5bf757..fd48aa8684798 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1776,6 +1776,11 @@ pub struct HashJoinExecNode { pub projection: ::prost::alloc::vec::Vec, #[prost(bool, tag = "10")] pub null_aware: bool, + /// Dynamic filter built on the build side and observed by pushed-down scans. + /// Carried here so receiver's HashJoinExec shares mutable state with the + /// scan sites via the `DeduplicatingProtoConverter`'s expr_id cache. 
+ #[prost(message, optional, tag = "11")] + pub dynamic_filter: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct SymmetricHashJoinExecNode { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 61419a155b8de..53a43f9b948ad 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -56,6 +56,7 @@ use datafusion_functions_table::generate_series::{ }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef}; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, LimitOptions, PhysicalGroupBy, @@ -73,8 +74,8 @@ use datafusion_physical_plan::expressions::PhysicalSortExpr; use datafusion_physical_plan::filter::{FilterExec, FilterExecBuilder}; use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter}; use datafusion_physical_plan::joins::{ - CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec, - StreamJoinPartitionMode, SymmetricHashJoinExec, + CrossJoinExec, HashJoinExec, HashJoinExecBuilder, NestedLoopJoinExec, PartitionMode, + SortMergeJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec, }; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::memory::LazyMemoryExec; @@ -1407,17 +1408,31 @@ impl protobuf::PhysicalPlanNode { } else { None }; - Ok(Arc::new(HashJoinExec::try_new( - left, - right, - on, - filter, - &join_type.into(), - projection, - partition_mode, - null_equality.into(), - hashjoin.null_aware, - )?)) + let mut builder = HashJoinExecBuilder::new(left, right, on, join_type.into()) + .with_filter(filter) + .with_partition_mode(partition_mode) + 
            .with_null_equality(null_equality.into())
+            .with_null_aware(hashjoin.null_aware)
+            .with_projection(projection);
+
+        if let Some(proto_filter) = &hashjoin.dynamic_filter {
+            let filter_expr = proto_converter.proto_to_physical_expr(
+                proto_filter,
+                ctx,
+                &right_schema,
+                codec,
+            )?;
+            let df = Arc::downcast::<DynamicFilterPhysicalExpr>(filter_expr).map_err(
+                |_| {
+                    proto_error(
+                        "HashJoinExec.dynamic_filter was not a DynamicFilterPhysicalExpr",
+                    )
+                },
+            )?;
+            builder = builder.with_dynamic_filter(df);
+        }
+
+        Ok(Arc::new(builder.build()?))
     }

     fn try_into_symmetric_hash_join_physical_plan(
@@ -2465,6 +2480,14 @@ impl protobuf::PhysicalPlanNode {
             PartitionMode::Auto => protobuf::PartitionMode::Auto,
         };

+        let dynamic_filter = exec
+            .dynamic_filter()
+            .map(|df| {
+                let df_expr: Arc<dyn PhysicalExpr> = Arc::clone(df) as _;
+                proto_converter.physical_expr_to_proto(&df_expr, codec)
+            })
+            .transpose()?;
+
         Ok(protobuf::PhysicalPlanNode {
             physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
                 protobuf::HashJoinExecNode {
@@ -2479,6 +2502,7 @@ impl protobuf::PhysicalPlanNode {
                     v.iter().map(|x| *x as u32).collect::<Vec<_>>()
                 }),
                 null_aware: exec.null_aware,
+                dynamic_filter,
             },
         ))),
     })

From 256b00eccb4480482f16b80fbd150c6f667aed2a Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Thu, 23 Apr 2026 07:40:20 -0500
Subject: [PATCH 6/7] proto: round-trip SortExec's TopK dynamic filter

Add optional PhysicalExprNode dynamic_filter field to SortExecNode. Emit
from to_proto via SortExec::dynamic_filter(); on deserialize parse it
and install via SortExec::with_dynamic_filter after the usual with_fetch
/ with_preserve_partitioning chain. The with_fetch step may auto-create
a TopK filter when fetch is set; with_dynamic_filter then replaces it
with the one from the sender so the id matches the pushed-down scan's
copy (shared via the id cache).
Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/proto/proto/datafusion.proto | 5 ++++ datafusion/proto/src/generated/pbjson.rs | 18 +++++++++++++++ datafusion/proto/src/generated/prost.rs | 6 +++++ datafusion/proto/src/physical_plan/mod.rs | 28 ++++++++++++++++++++++- 4 files changed, 56 insertions(+), 1 deletion(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index c6c37aaaa1d1b..5e46c601196f6 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1352,6 +1352,11 @@ message SortExecNode { // Maximum number of highest/lowest rows to fetch; negative means no limit int64 fetch = 3; bool preserve_partitioning = 4; + // TopK dynamic filter built by the sort and observed by pushed-down scans. + // Only present when `fetch` is set. Carried here so receiver's SortExec + // shares mutable state with the scan sites via the DeduplicatingProtoConverter's + // expr_id cache. + optional PhysicalExprNode dynamic_filter = 5; } message SortPreservingMergeExecNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index edf799bf56fbd..471cbae37f64b 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -22016,6 +22016,9 @@ impl serde::Serialize for SortExecNode { if self.preserve_partitioning { len += 1; } + if self.dynamic_filter.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.SortExecNode", len)?; if let Some(v) = self.input.as_ref() { struct_ser.serialize_field("input", v)?; @@ -22031,6 +22034,9 @@ impl serde::Serialize for SortExecNode { if self.preserve_partitioning { struct_ser.serialize_field("preservePartitioning", &self.preserve_partitioning)?; } + if let Some(v) = self.dynamic_filter.as_ref() { + struct_ser.serialize_field("dynamicFilter", v)?; + } struct_ser.end() } } @@ -22046,6 +22052,8 @@ impl<'de> serde::Deserialize<'de> 
for SortExecNode { "fetch", "preserve_partitioning", "preservePartitioning", + "dynamic_filter", + "dynamicFilter", ]; #[allow(clippy::enum_variant_names)] @@ -22054,6 +22062,7 @@ impl<'de> serde::Deserialize<'de> for SortExecNode { Expr, Fetch, PreservePartitioning, + DynamicFilter, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -22079,6 +22088,7 @@ impl<'de> serde::Deserialize<'de> for SortExecNode { "expr" => Ok(GeneratedField::Expr), "fetch" => Ok(GeneratedField::Fetch), "preservePartitioning" | "preserve_partitioning" => Ok(GeneratedField::PreservePartitioning), + "dynamicFilter" | "dynamic_filter" => Ok(GeneratedField::DynamicFilter), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -22102,6 +22112,7 @@ impl<'de> serde::Deserialize<'de> for SortExecNode { let mut expr__ = None; let mut fetch__ = None; let mut preserve_partitioning__ = None; + let mut dynamic_filter__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::Input => { @@ -22130,6 +22141,12 @@ impl<'de> serde::Deserialize<'de> for SortExecNode { } preserve_partitioning__ = Some(map_.next_value()?); } + GeneratedField::DynamicFilter => { + if dynamic_filter__.is_some() { + return Err(serde::de::Error::duplicate_field("dynamicFilter")); + } + dynamic_filter__ = map_.next_value()?; + } } } Ok(SortExecNode { @@ -22137,6 +22154,7 @@ impl<'de> serde::Deserialize<'de> for SortExecNode { expr: expr__.unwrap_or_default(), fetch: fetch__.unwrap_or_default(), preserve_partitioning: preserve_partitioning__.unwrap_or_default(), + dynamic_filter: dynamic_filter__, }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index fd48aa8684798..843f65f614b19 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1990,6 +1990,12 @@ pub struct SortExecNode { pub fetch: i64, #[prost(bool, tag = "4")] pub preserve_partitioning: bool, + /// TopK dynamic filter built by the sort and observed by pushed-down scans. + /// Only present when `fetch` is set. Carried here so receiver's SortExec + /// shares mutable state with the scan sites via the DeduplicatingProtoConverter's + /// expr_id cache. 
+ #[prost(message, optional, tag = "5")] + pub dynamic_filter: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct SortPreservingMergeExecNode { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 53a43f9b948ad..4071de4a6ebb1 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -1680,10 +1680,28 @@ impl protobuf::PhysicalPlanNode { return internal_err!("SortExec requires an ordering"); }; let fetch = (sort.fetch >= 0).then_some(sort.fetch as _); - let new_sort = SortExec::new(ordering, input) + let input_schema = input.schema(); + let mut new_sort = SortExec::new(ordering, input) .with_fetch(fetch) .with_preserve_partitioning(sort.preserve_partitioning); + if let Some(proto_filter) = &sort.dynamic_filter { + let filter_expr = proto_converter.proto_to_physical_expr( + proto_filter, + ctx, + &input_schema, + codec, + )?; + let df = Arc::downcast::(filter_expr).map_err( + |_| { + proto_error( + "SortExec.dynamic_filter was not a DynamicFilterPhysicalExpr", + ) + }, + )?; + new_sort = new_sort.with_dynamic_filter(df); + } + Ok(Arc::new(new_sort)) } @@ -3125,6 +3143,13 @@ impl protobuf::PhysicalPlanNode { }) }) .collect::>>()?; + let dynamic_filter = exec + .dynamic_filter() + .map(|df| { + let df_expr: Arc = df as _; + proto_converter.physical_expr_to_proto(&df_expr, codec) + }) + .transpose()?; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Sort(Box::new( protobuf::SortExecNode { @@ -3135,6 +3160,7 @@ impl protobuf::PhysicalPlanNode { _ => -1, }, preserve_partitioning: exec.preserve_partitioning(), + dynamic_filter, }, ))), }) From 96f90402836810c8fe88aac31a54b7f80e2b4294 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 23 Apr 2026 07:59:35 -0500 Subject: [PATCH 7/7] test: add datafusion-tests workspace crate for cross-crate integration tests 
Introduce a new workspace member `datafusion-tests` at datafusion/tests dedicated to integration tests that need to depend on both `datafusion` and `datafusion-proto`. Placing such tests in either of those crates' own `tests/` directory would close a dev-dependency cycle caught by the workspace's circular-dependency check (see `dev/depcheck`). The first two tests exercise end-to-end SQL round-trips proving the DynamicFilterPhysicalExpr wiring added in preceding commits actually drives scan-level pruning: - `hash_join_dynamic_filter_prunes_via_sql`: INNER JOIN with a WHERE on the build side; after round-trip the probe-side `ParquetSource` emits fewer rows than the full table. - `topk_dynamic_filter_prunes_files_via_sql`: ORDER BY ... LIMIT 1 over two single-row parquet files; after round-trip the second file is pruned by row-group statistics because TopK's dynamic filter tightens after reading the first, and the scan sees exactly one row. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 13 ++ Cargo.toml | 1 + datafusion/tests/Cargo.toml | 44 +++++ datafusion/tests/src/lib.rs | 23 +++ .../tests/dynamic_filter_proto_roundtrip.rs | 184 ++++++++++++++++++ 5 files changed, 265 insertions(+) create mode 100644 datafusion/tests/Cargo.toml create mode 100644 datafusion/tests/src/lib.rs create mode 100644 datafusion/tests/tests/dynamic_filter_proto_roundtrip.rs diff --git a/Cargo.lock b/Cargo.lock index 5a76c063bbfad..5801fb6deb6b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2721,6 +2721,19 @@ dependencies = [ "url", ] +[[package]] +name = "datafusion-tests" +version = "53.1.0" +dependencies = [ + "arrow", + "datafusion", + "datafusion-common", + "datafusion-proto", + "insta", + "tempfile", + "tokio", +] + [[package]] name = "datafusion-wasmtest" version = "53.1.0" diff --git a/Cargo.toml b/Cargo.toml index 59707ba8e3f27..17825380bec63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ members = [ "datafusion/sql", "datafusion/sqllogictest", 
"datafusion/substrait", + "datafusion/tests", "datafusion/wasmtest", "datafusion-cli", "datafusion-examples", diff --git a/datafusion/tests/Cargo.toml b/datafusion/tests/Cargo.toml new file mode 100644 index 0000000000000..6a8956217c7af --- /dev/null +++ b/datafusion/tests/Cargo.toml @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-tests" +description = "Integration tests that span multiple DataFusion crates (e.g. datafusion + datafusion-proto)" +publish = false +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +authors = { workspace = true } +rust-version = { workspace = true } + +[lints] +workspace = true + +[lib] +# Empty lib to give Cargo a target; tests/ is where the actual code lives. 
+path = "src/lib.rs" + +[dev-dependencies] +arrow = { workspace = true } +datafusion = { workspace = true, default-features = true } +datafusion-common = { workspace = true } +datafusion-proto = { workspace = true } +insta = { workspace = true } +tempfile = { workspace = true } +tokio = { workspace = true, features = ["rt-multi-thread"] } diff --git a/datafusion/tests/src/lib.rs b/datafusion/tests/src/lib.rs new file mode 100644 index 0000000000000..837e1d1f13348 --- /dev/null +++ b/datafusion/tests/src/lib.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Cross-crate integration tests for DataFusion. +//! +//! This crate intentionally has no library code — it exists only to host +//! integration tests under `tests/` that need to depend on both `datafusion` +//! and `datafusion-proto` simultaneously, which can't live in either of those +//! crates without creating a dev-dependency cycle. 
diff --git a/datafusion/tests/tests/dynamic_filter_proto_roundtrip.rs b/datafusion/tests/tests/dynamic_filter_proto_roundtrip.rs new file mode 100644 index 0000000000000..d85dd58e45f8d --- /dev/null +++ b/datafusion/tests/tests/dynamic_filter_proto_roundtrip.rs @@ -0,0 +1,184 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! End-to-end tests asserting that `datafusion-proto` round-trip preserves +//! the identity + shared mutable state of `DynamicFilterPhysicalExpr` +//! instances attached to operators that produce them (`HashJoinExec` and +//! `SortExec`'s TopK). +//! +//! Each test plans `EXPLAIN ANALYZE ` via SQL, round-trips the +//! resulting physical plan through proto, executes the deserialized plan, +//! and snapshots the `EXPLAIN ANALYZE` output text with insta. With +//! `datafusion.explain.analyze_categories = 'rows'`, the text contains only +//! deterministic row-count metrics — no timing — so the snapshot captures +//! plan shape, dynamic-filter state after execution, and scan-level pruning +//! all in one. +//! +//! These tests live in a standalone crate (`datafusion-tests`) because they +//! need dev-dependencies on both `datafusion` and `datafusion-proto`; putting +//! 
them in either of those crates' own `tests/` directory would close a +//! dev-dependency cycle caught by the workspace's circular-dependency check. + +use std::sync::Arc; + +use arrow::util::pretty::pretty_format_batches; +use datafusion::physical_plan::collect; +use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; +use datafusion_proto::bytes::{ + physical_plan_from_bytes_with_proto_converter, + physical_plan_to_bytes_with_proto_converter, +}; +use datafusion_proto::physical_plan::{ + DeduplicatingProtoConverter, DefaultPhysicalExtensionCodec, +}; + +/// Execute a SQL statement for its side effect (table creation, `COPY`, +/// etc.), discarding any result batches. +async fn exec(ctx: &SessionContext, sql: &str) -> datafusion_common::Result<()> { + ctx.sql(sql).await?.collect().await?; + Ok(()) +} + +/// Round-trip an EXPLAIN ANALYZE plan through proto, execute it, and return +/// the rendered text. +async fn roundtrip_and_explain_analyze( + ctx: &SessionContext, + sql: &str, +) -> datafusion_common::Result { + let explain_sql = format!("EXPLAIN ANALYZE {sql}"); + let plan = ctx.sql(&explain_sql).await?.create_physical_plan().await?; + + let codec = DefaultPhysicalExtensionCodec {}; + let converter = DeduplicatingProtoConverter {}; + let bytes = physical_plan_to_bytes_with_proto_converter( + Arc::clone(&plan), + &codec, + &converter, + )?; + let result_plan = physical_plan_from_bytes_with_proto_converter( + bytes.as_ref(), + ctx.task_ctx().as_ref(), + &codec, + &converter, + )?; + + let batches = collect(result_plan, ctx.task_ctx()).await?; + Ok(pretty_format_batches(&batches)?.to_string()) +} + +/// Filters applied to insta snapshots to strip non-deterministic text +/// (tempdir paths, workspace-absolute paths). 
+fn settings() -> insta::Settings { + let mut settings = insta::Settings::clone_current(); + // Replace absolute paths ending in `foo.parquet` with just `foo.parquet` + // so snapshots are stable across machines and across tempdir runs. + settings.add_filter(r"[^\s\[\]]*/([A-Za-z0-9_\-]+\.parquet)", "$1"); + settings +} + +/// End-to-end: a SQL hash join with a selective WHERE on the build side +/// produces a dynamic filter that's pushed into the probe-side +/// `ParquetSource`. After proto round-trip, the `HashJoinExec`'s dynamic +/// filter Arc and the pushed predicate still share mutable state, so +/// build-side `update()` during execution is visible to the scan and the +/// scan prunes rows. The snapshot shows the plan shape + the dynamic filter +/// expression after execution + `output_rows` on every operator. +#[tokio::test] +async fn hash_join_dynamic_filter_prunes_via_sql() -> datafusion_common::Result<()> { + let config = SessionConfig::new() + .set_bool("datafusion.execution.parquet.pushdown_filters", true) + .set_str("datafusion.explain.analyze_categories", "rows"); + let ctx = SessionContext::new_with_config(config); + let parquet_path = concat!( + env!("CARGO_MANIFEST_DIR"), + "/../core/tests/data/tpch_nation_small.parquet" + ); + ctx.register_parquet("build", parquet_path, ParquetReadOptions::default()) + .await?; + ctx.register_parquet("probe", parquet_path, ParquetReadOptions::default()) + .await?; + + // Self-join with a selective WHERE on the build side. The build side + // reduces to a single row; the dynamic filter derived from it is pushed + // into the probe-side scan. 
+ let sql = "SELECT p.n_name FROM probe p \ + INNER JOIN build b ON p.n_nationkey = b.n_nationkey \ + WHERE b.n_nationkey = 5"; + let output = roundtrip_and_explain_analyze(&ctx, sql).await?; + + settings().bind(|| insta::assert_snapshot!(output, @" + +-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + | plan_type | plan | + 
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + | Plan with Metrics | HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(n_nationkey@0, n_nationkey@0)], projection=[n_name@1], metrics=[output_rows=1, output_batches=1, array_map_created_count=1, build_input_batches=1, build_input_rows=1, input_batches=1, input_rows=1, avg_fanout=100% (1/1), probe_hit_rate=100% (1/1)] | + | | DataSourceExec: file_groups={1 group: [[tpch_nation_small.parquet]]}, projection=[n_nationkey, n_name], file_type=parquet, predicate=n_nationkey@0 = 5, pruning_predicate=n_nationkey_null_count@2 != row_count@3 AND n_nationkey_min@0 <= 5 AND 5 <= n_nationkey_max@1, required_guarantees=[n_nationkey in 
(5)], metrics=[output_rows=1, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=1, pushdown_rows_pruned=19, predicate_cache_inner_records=20, predicate_cache_records=1, scan_efficiency_ratio=10.56% (461/4.36 K)] | + | | RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[output_rows=1, output_batches=1, spill_count=0, spilled_rows=0] | + | | DataSourceExec: file_groups={1 group: [[tpch_nation_small.parquet]]}, projection=[n_nationkey], file_type=parquet, predicate=n_nationkey@0 = 5 AND DynamicFilter [ n_nationkey@0 >= 5 AND n_nationkey@0 <= 5 AND n_nationkey@0 IN (SET) ([5]) ], pruning_predicate=n_nationkey_null_count@2 != row_count@3 AND n_nationkey_min@0 <= 5 AND 5 <= n_nationkey_max@1 AND n_nationkey_null_count@2 != row_count@3 AND n_nationkey_max@1 >= 5 AND n_nationkey_null_count@2 != row_count@3 AND n_nationkey_min@0 <= 5 AND n_nationkey_null_count@2 != row_count@3 AND n_nationkey_min@0 <= 5 AND 5 <= n_nationkey_max@1, required_guarantees=[n_nationkey in (5)], metrics=[output_rows=1, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=1, pushdown_rows_pruned=19, 
predicate_cache_inner_records=20, predicate_cache_records=2, scan_efficiency_ratio=4.42% (193/4.36 K)] | + | | | + +-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + ")); + + Ok(()) +} + +/// End-to-end: an `ORDER BY ... LIMIT 1` over two single-row parquet files +/// (`a.parquet` key=1, `b.parquet` key=2). With `target_partitions=1`, both +/// files are read sequentially; after `a.parquet` TopK's filter tightens and +/// `b.parquet` gets pruned by row-group statistics — the scan never yields +/// b's row. The snapshot captures the updated dynamic filter expression, the +/// single emitted row, and the `row_groups_pruned` metric. 
+#[tokio::test] +async fn topk_dynamic_filter_proto_round_trip() -> datafusion_common::Result<()> { + let tmp = tempfile::TempDir::new()?; + let tmp_path = tmp.path().to_str().unwrap(); + + // pushdown_filters pushes the TopK dynamic filter into the parquet scan (visible as + // `predicate=DynamicFilter [...]` in the snapshot); target_partitions=1 makes the two + // files read sequentially so a.parquet's filter update is in effect before b.parquet. + let config = SessionConfig::new() + .set_bool("datafusion.execution.parquet.pushdown_filters", true) + .set_str("datafusion.explain.analyze_categories", "rows") + .with_target_partitions(1); + let ctx = SessionContext::new_with_config(config); + + // Write two single-row parquet files and register the directory as a + // listing table — all via SQL. + exec(&ctx, &format!( + "COPY (SELECT 1 AS id, 'foo' AS name) TO '{tmp_path}/a.parquet' STORED AS PARQUET" + )) + .await?; + exec(&ctx, &format!( + "COPY (SELECT 2 AS id, 'bar' AS name) TO '{tmp_path}/b.parquet' STORED AS PARQUET" + )) + .await?; + exec(&ctx, &format!( + "CREATE EXTERNAL TABLE t (id bigint, name text) STORED AS PARQUET LOCATION '{tmp_path}'" + )) + .await?; + + // Round-trip the physical plan through proto and run the restored plan under + // EXPLAIN ANALYZE (behavior per the roundtrip_and_explain_analyze helper). + let sql = "SELECT name FROM t ORDER BY id ASC LIMIT 1"; + let output = roundtrip_and_explain_analyze(&ctx, sql).await?; + + // Snapshot pins: the tightened dynamic filter (id@1 < 1), the single emitted row + // (output_rows=1), and b.parquet pruned by statistics + // (files_ranges_pruned_statistics=2 total → 1 matched). + settings().bind(|| insta::assert_snapshot!(output, @" + 
+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + | plan_type | plan | + 
+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + | Plan with Metrics | ProjectionExec: expr=[name@0 as name], metrics=[output_rows=1, output_batches=1] | + | | SortExec: TopK(fetch=1), expr=[id@1 ASC NULLS LAST], preserve_partitioning=[false], filter=[id@1 < 1], metrics=[output_rows=1, output_batches=1, row_replacements=1] | + | | DataSourceExec: file_groups={1 group: [[a.parquet, b.parquet]]}, projection=[name, id], file_type=parquet, predicate=DynamicFilter [ id@1 < 1 ], pruning_predicate=id_null_count@1 != row_count@2 AND id_min@0 < 1, required_guarantees=[], metrics=[output_rows=1, output_batches=1, files_ranges_pruned_statistics=2 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=2, files_processed=2, num_predicate_creation_errors=0, 
predicate_evaluation_errors=0, pushdown_rows_matched=1, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=0% (0/742)] | + | | | + +-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + ")); + Ok(()) +}