From dc683d388d45e3d12e6b9dd29c206806b93f1226 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Tue, 17 Feb 2026 21:48:32 +0000 Subject: [PATCH 1/3] proto: serialize and dedupe dynamic filters Informs: https://github.com/datafusion-contrib/datafusion-distributed/issues/180 Closes: https://github.com/apache/datafusion/issues/20418 Consider you have a plan with a `HashJoinExec` and `DataSourceExec` ``` HashJoinExec(dynamic_filter_1 on a@0) (...left side of join) ProjectionExec(a := Column("a", source_index)) DataSourceExec ParquetSource(predicate = dynamic_filter_2) ``` You serialize the plan, deserialize it, and execute it. What should happen is that the dynamic filter should "work", meaning: 1. When you deserialize the plan, both the `HashJoinExec` and `DataSourceExec` should have pointers to the same `DynamicFilterPhysicalExpr` 2. The `DynamicFilterPhysicalExpr` should be updated during execution by the `HashJoinExec` and the `DataSourceExec` should filter out rows This does not happen today for a few reasons, a couple of which this PR aims to address 1. `DynamicFilterPhysicalExpr` does not survive round-tripping. The internal exprs get inlined (ex. it may be serialized as `Literal`) due to the `PhysicalExpr::snapshot()` API 2. Even if `DynamicFilterPhysicalExpr` survives round-tripping, the one pushed down to the `DataSourceExec` often has different children. In this case, you have two `DynamicFilterPhysicalExpr` which do not survive deduping, causing referential integrity to be lost. This PR aims to fix those problems by: 1. Removing the `snapshot()` call from the serialization process 2. Adding protos for `DynamicFilterPhysicalExpr` so it can be serialized and deserialized 3. Adding a new concept, a `PhysicalExprId`, which has two identifiers, a "shallow" identifier to indicate two equal expressions which may have different children, and an "exact" identifier to indicate two exprs that are exactly the same. 4. 
Updating the deduping deserializer and protos to now be aware of the new "shallow" id, deduping exprs which are the same but have different children accordingly. This change adds tests which roundtrip dynamic filters and assert that referential integrity is maintained. --- .../physical-expr-common/src/physical_expr.rs | 46 ++- .../src/expressions/dynamic_filters.rs | 232 +++++++++++++- .../physical-expr/src/expressions/mod.rs | 2 +- datafusion/proto/proto/datafusion.proto | 26 +- datafusion/proto/src/generated/pbjson.rs | 202 ++++++++++++ datafusion/proto/src/generated/prost.rs | 28 +- .../proto/src/physical_plan/from_proto.rs | 45 +++ datafusion/proto/src/physical_plan/mod.rs | 94 +++--- .../proto/src/physical_plan/to_proto.rs | 65 +++- .../tests/cases/roundtrip_physical_plan.rs | 291 +++++++++++++++++- 10 files changed, 965 insertions(+), 66 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index a42a1560cb769..4fad4a09f5088 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -18,7 +18,7 @@ use std::any::Any; use std::fmt; use std::fmt::{Debug, Display, Formatter}; -use std::hash::{Hash, Hasher}; +use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; use crate::utils::scatter; @@ -438,6 +438,50 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { fn placement(&self) -> ExpressionPlacement { ExpressionPlacement::KeepInPlace } + + /// Returns a composite identifier for a [`PhysicalExpr`]. Note that if the expression + /// is dropped, then the returned id is no longer valid. + fn expr_id(self: Arc, salt: &[u64]) -> Option { + Some(PhysicalExprId::new(expr_id_from_arc(&self, salt), None)) + } +} + +/// A composite identifier for [`PhysicalExpr`]. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct PhysicalExprId { + exact: u64, + shallow: Option, +} + +impl PhysicalExprId { + /// Create a new [`PhysicalExprId`]. Both ids must be globally unique within + /// a process. + pub fn new(exact: u64, shallow: Option) -> Self { + Self { exact, shallow } + } + + /// Returns the identifier for the full expression tree, including children. + pub fn exact(&self) -> u64 { + self.exact + } + + /// Returns the identifier for just the expression root, ignoring children. + pub fn shallow(&self) -> Option { + self.shallow + } +} + +/// Computes a unique identifier for a type contained within an [`Arc`]. It hashes +/// the [`Arc`] pointer to create a process-local identifier that remains valid +/// only while that allocation is still alive. +pub fn expr_id_from_arc(expr: &Arc, salt: &[u64]) -> u64 { + let mut hasher = DefaultHasher::new(); + let ptr = Arc::as_ptr(expr) as *const () as u64; + ptr.hash(&mut hasher); + for &salt in salt { + salt.hash(&mut hasher); + } + hasher.finish() } #[deprecated( diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 47398d87e26a5..b5857cc1fbbef 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -26,7 +26,9 @@ use datafusion_common::{ tree_node::{Transformed, TransformedResult, TreeNode}, }; use datafusion_expr::ColumnarValue; -use datafusion_physical_expr_common::physical_expr::DynHash; +use datafusion_physical_expr_common::physical_expr::{ + DynHash, PhysicalExprId, expr_id_from_arc, +}; /// State of a dynamic filter, tracking both updates and completion. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -88,6 +90,118 @@ struct Inner { is_complete: bool, } +/// An atomic snapshot of a [`DynamicFilterPhysicalExpr`] used to reconstruct the expression during +/// serialization / deserialization. 
+pub struct DynamicFilterSnapshot { + children: Vec>, + remapped_children: Option>>, + // Inner state. + generation: u64, + inner_expr: Arc, + is_complete: bool, +} + +impl DynamicFilterSnapshot { + pub fn new( + children: Vec>, + remapped_children: Option>>, + generation: u64, + inner_expr: Arc, + is_complete: bool, + ) -> Self { + Self { + children, + remapped_children, + generation, + inner_expr, + is_complete, + } + } + + pub fn children(&self) -> &[Arc] { + &self.children + } + + pub fn remapped_children(&self) -> Option<&[Arc]> { + self.remapped_children.as_deref() + } + + pub fn generation(&self) -> u64 { + self.generation + } + + pub fn inner_expr(&self) -> &Arc { + &self.inner_expr + } + + pub fn is_complete(&self) -> bool { + self.is_complete + } +} + +impl Display for DynamicFilterSnapshot { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "DynamicFilterSnapshot {{ children: {:?}, remapped_children: {:?}, generation: {}, inner_expr: {:?}, is_complete: {} }}", + self.children, + self.remapped_children, + self.generation, + self.inner_expr, + self.is_complete + ) + } +} + +impl From for DynamicFilterPhysicalExpr { + fn from(snapshot: DynamicFilterSnapshot) -> Self { + let DynamicFilterSnapshot { + children, + remapped_children, + generation, + inner_expr, + is_complete, + } = snapshot; + + let state = if is_complete { + FilterState::Complete { generation } + } else { + FilterState::InProgress { generation } + }; + let (state_watch, _) = watch::channel(state); + + Self { + children, + remapped_children, + inner: Arc::new(RwLock::new(Inner { + generation, + expr: inner_expr, + is_complete, + })), + state_watch, + data_type: Arc::new(RwLock::new(None)), + nullable: Arc::new(RwLock::new(None)), + } + } +} + +impl From<&DynamicFilterPhysicalExpr> for DynamicFilterSnapshot { + fn from(expr: &DynamicFilterPhysicalExpr) -> Self { + // Snapshot everything in the mutex atomically. 
+ let (generation, inner_expr, is_complete) = { + let inner = expr.inner.read(); + (inner.generation, Arc::clone(&inner.expr), inner.is_complete) + }; + DynamicFilterSnapshot { + children: expr.children.clone(), + remapped_children: expr.remapped_children.clone(), + generation, + inner_expr, + is_complete, + } + } +} + impl Inner { fn new(expr: Arc) -> Self { Self { @@ -444,6 +558,15 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { // Return the current generation of the expression. self.inner.read().generation } + + fn expr_id(self: Arc, salt: &[u64]) -> Option { + Some(PhysicalExprId::new( + // Capture the outer arc, which contains children and the expr. + expr_id_from_arc(&self, salt), + // Capture the inner arc, which contains the expr only. + Some(expr_id_from_arc(&self.inner, salt)), + )) + } } #[cfg(test)] @@ -861,4 +984,111 @@ mod test { "Hash should be stable after update (identity-based)" ); } + + #[test] + fn test_current_snapshot_roundtrip() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let col_a = col("a", &schema).unwrap(); + + // Create a dynamic filter with children + let expr = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + datafusion_expr::Operator::Gt, + lit(10) as Arc, + )); + let filter = DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + expr as Arc, + ); + + // Update expression and mark complete + filter + .update(lit(42) as Arc) + .expect("Update should succeed"); + filter.mark_complete(); + + // Change the children of the expr. 
+ let reassigned_schema = Arc::new(Schema::new(vec![ + Field::new("b", DataType::Int32, false), + Field::new("a", DataType::Int32, false), + ])); + let reassigned = reassign_expr_columns( + Arc::new(filter) as Arc, + &reassigned_schema, + ) + .expect("reassign_expr_columns should succeed"); + let reassigned = reassigned + .as_any() + .downcast_ref::() + .expect("Expected dynamic filter after reassignment"); + + // Take a snapshot and reconstruct + let snapshot = DynamicFilterSnapshot::from(reassigned); + let reconstructed = DynamicFilterPhysicalExpr::from(snapshot); + + // Assert snapshots are equal. + assert_eq!( + DynamicFilterSnapshot::from(reassigned).to_string(), + DynamicFilterSnapshot::from(&reconstructed).to_string(), + ); + } + + #[tokio::test] + async fn test_expr_id() { + let source_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let col_a = col("a", &source_schema).unwrap(); + + // Create a source filter + let source = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + lit(true) as Arc, + )); + let source_clone = Arc::clone(&source); + + // Create a derived filter by reassigning the source filter to a different schema. 
+ let derived_schema = Arc::new(Schema::new(vec![ + Field::new("x", DataType::Int32, false), + Field::new("a", DataType::Int32, false), + ])); + let derived = reassign_expr_columns( + Arc::clone(&source) as Arc, + &derived_schema, + ) + .expect("reassign_expr_columns should succeed"); + + let derived_expr_id = Arc::clone(&derived) + .expr_id(&[]) + .expect("combined filter should have an expr_id"); + let source_expr_id = Arc::clone(&source) + .expr_id(&[]) + .expect("source filter should have an expr_id"); + let source_clone_expr_id = Arc::clone(&source_clone) + .expr_id(&[]) + .expect("source clone should have an expr_id"); + + assert_eq!( + source_clone_expr_id.exact(), + source_expr_id.exact(), + "cloned filter should have the same exact id because the children are the same", + ); + + assert_eq!( + source_clone_expr_id.shallow(), + source_expr_id.shallow(), + "cloned filter should have the same shallow id because the exprs are the same", + ); + + assert_ne!( + derived_expr_id.exact(), + source_expr_id.exact(), + "filters should have different exact ids because the children are different", + ); + + assert_eq!( + derived_expr_id.shallow(), + source_expr_id.shallow(), + "filters should have the same shallow id because the exprs are the same", + ); + } } diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 1e082355486f6..b40848828fa41 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -43,7 +43,7 @@ pub use case::{CaseExpr, case}; pub use cast::{CastExpr, cast}; pub use column::{Column, col, with_new_schema}; pub use datafusion_expr::utils::format_state_name; -pub use dynamic_filters::DynamicFilterPhysicalExpr; +pub use dynamic_filters::{DynamicFilterPhysicalExpr, DynamicFilterSnapshot}; pub use in_list::{InListExpr, in_list}; pub use is_not_null::{IsNotNullExpr, is_not_null}; pub use is_null::{IsNullExpr, is_null}; diff --git 
a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index c61226fb526f6..d5dae0f4cab79 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -870,18 +870,18 @@ message PhysicalExtensionNode { repeated PhysicalPlanNode inputs = 2; } -// physical expressions message PhysicalExprNode { // Was date_time_interval_expr reserved 17; - // Unique identifier for this expression to do deduplication during deserialization. - // When serializing, this is set to a unique identifier for each combination of - // expression, process and serialization run. - // When deserializing, if this ID has been seen before, the cached Arc is returned - // instead of creating a new one, enabling reconstruction of referential integrity - // across serde roundtrips. + // Unique identifiers for this expression used during deserialization to restore + // referential integrity across serde roundtrips. + // + // expr_id: if two exprs have the same expr_id, they are identical (including children) + // shallow_expr_id: if two exprs have the same shallow_expr_id, they are identical but may + // have different children optional uint64 expr_id = 30; + optional uint64 shallow_expr_id = 31; oneof ExprType { // column references @@ -920,9 +920,19 @@ message PhysicalExprNode { UnknownColumn unknown_column = 20; PhysicalHashExprNode hash_expr = 21; + + PhysicalDynamicFilterNode dynamic_filter = 22; } } +message PhysicalDynamicFilterNode { + repeated PhysicalExprNode children = 1; + repeated PhysicalExprNode remapped_children = 2; + uint64 generation = 3; + PhysicalExprNode inner_expr = 4; + bool is_complete = 5; +} + message PhysicalScalarUdfNode { string name = 1; repeated PhysicalExprNode args = 2; @@ -1477,4 +1487,4 @@ message AsyncFuncExecNode { message BufferExecNode { PhysicalPlanNode input = 1; uint64 capacity = 2; -} \ No newline at end of file +} diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs index 82bcdac898204..3075c3c9342f4 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -16531,6 +16531,172 @@ impl<'de> serde::Deserialize<'de> for PhysicalDateTimeIntervalExprNode { deserializer.deserialize_struct("datafusion.PhysicalDateTimeIntervalExprNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalDynamicFilterNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.children.is_empty() { + len += 1; + } + if !self.remapped_children.is_empty() { + len += 1; + } + if self.generation != 0 { + len += 1; + } + if self.inner_expr.is_some() { + len += 1; + } + if self.is_complete { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalDynamicFilterNode", len)?; + if !self.children.is_empty() { + struct_ser.serialize_field("children", &self.children)?; + } + if !self.remapped_children.is_empty() { + struct_ser.serialize_field("remappedChildren", &self.remapped_children)?; + } + if self.generation != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("generation", ToString::to_string(&self.generation).as_str())?; + } + if let Some(v) = self.inner_expr.as_ref() { + struct_ser.serialize_field("innerExpr", v)?; + } + if self.is_complete { + struct_ser.serialize_field("isComplete", &self.is_complete)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "children", + "remapped_children", + "remappedChildren", + "generation", + "inner_expr", + "innerExpr", + "is_complete", + "isComplete", + ]; + + 
#[allow(clippy::enum_variant_names)] + enum GeneratedField { + Children, + RemappedChildren, + Generation, + InnerExpr, + IsComplete, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "children" => Ok(GeneratedField::Children), + "remappedChildren" | "remapped_children" => Ok(GeneratedField::RemappedChildren), + "generation" => Ok(GeneratedField::Generation), + "innerExpr" | "inner_expr" => Ok(GeneratedField::InnerExpr), + "isComplete" | "is_complete" => Ok(GeneratedField::IsComplete), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalDynamicFilterNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalDynamicFilterNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut children__ = None; + let mut remapped_children__ = None; + let mut generation__ = None; + let mut inner_expr__ = None; + let mut is_complete__ = None; + while let Some(k) = map_.next_key()? 
{ + match k { + GeneratedField::Children => { + if children__.is_some() { + return Err(serde::de::Error::duplicate_field("children")); + } + children__ = Some(map_.next_value()?); + } + GeneratedField::RemappedChildren => { + if remapped_children__.is_some() { + return Err(serde::de::Error::duplicate_field("remappedChildren")); + } + remapped_children__ = Some(map_.next_value()?); + } + GeneratedField::Generation => { + if generation__.is_some() { + return Err(serde::de::Error::duplicate_field("generation")); + } + generation__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::InnerExpr => { + if inner_expr__.is_some() { + return Err(serde::de::Error::duplicate_field("innerExpr")); + } + inner_expr__ = map_.next_value()?; + } + GeneratedField::IsComplete => { + if is_complete__.is_some() { + return Err(serde::de::Error::duplicate_field("isComplete")); + } + is_complete__ = Some(map_.next_value()?); + } + } + } + Ok(PhysicalDynamicFilterNode { + children: children__.unwrap_or_default(), + remapped_children: remapped_children__.unwrap_or_default(), + generation: generation__.unwrap_or_default(), + inner_expr: inner_expr__, + is_complete: is_complete__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalDynamicFilterNode", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalExprNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -16542,6 +16708,9 @@ impl serde::Serialize for PhysicalExprNode { if self.expr_id.is_some() { len += 1; } + if self.shallow_expr_id.is_some() { + len += 1; + } if self.expr_type.is_some() { len += 1; } @@ -16551,6 +16720,11 @@ impl serde::Serialize for PhysicalExprNode { #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("exprId", ToString::to_string(&v).as_str())?; } + if let Some(v) = self.shallow_expr_id.as_ref() { + #[allow(clippy::needless_borrow)] + 
#[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("shallowExprId", ToString::to_string(&v).as_str())?; + } if let Some(v) = self.expr_type.as_ref() { match v { physical_expr_node::ExprType::Column(v) => { @@ -16610,6 +16784,9 @@ impl serde::Serialize for PhysicalExprNode { physical_expr_node::ExprType::HashExpr(v) => { struct_ser.serialize_field("hashExpr", v)?; } + physical_expr_node::ExprType::DynamicFilter(v) => { + struct_ser.serialize_field("dynamicFilter", v)?; + } } } struct_ser.end() @@ -16624,6 +16801,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { const FIELDS: &[&str] = &[ "expr_id", "exprId", + "shallow_expr_id", + "shallowExprId", "column", "literal", "binary_expr", @@ -16656,11 +16835,14 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "unknownColumn", "hash_expr", "hashExpr", + "dynamic_filter", + "dynamicFilter", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { ExprId, + ShallowExprId, Column, Literal, BinaryExpr, @@ -16680,6 +16862,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { Extension, UnknownColumn, HashExpr, + DynamicFilter, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -16702,6 +16885,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { { match value { "exprId" | "expr_id" => Ok(GeneratedField::ExprId), + "shallowExprId" | "shallow_expr_id" => Ok(GeneratedField::ShallowExprId), "column" => Ok(GeneratedField::Column), "literal" => Ok(GeneratedField::Literal), "binaryExpr" | "binary_expr" => Ok(GeneratedField::BinaryExpr), @@ -16721,6 +16905,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "extension" => Ok(GeneratedField::Extension), "unknownColumn" | "unknown_column" => Ok(GeneratedField::UnknownColumn), "hashExpr" | "hash_expr" => Ok(GeneratedField::HashExpr), + "dynamicFilter" | "dynamic_filter" => Ok(GeneratedField::DynamicFilter), _ => 
Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -16741,6 +16926,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { V: serde::de::MapAccess<'de>, { let mut expr_id__ = None; + let mut shallow_expr_id__ = None; let mut expr_type__ = None; while let Some(k) = map_.next_key()? { match k { @@ -16752,6 +16938,14 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) ; } + GeneratedField::ShallowExprId => { + if shallow_expr_id__.is_some() { + return Err(serde::de::Error::duplicate_field("shallowExprId")); + } + shallow_expr_id__ = + map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) + ; + } GeneratedField::Column => { if expr_type__.is_some() { return Err(serde::de::Error::duplicate_field("column")); @@ -16883,12 +17077,20 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { return Err(serde::de::Error::duplicate_field("hashExpr")); } expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::HashExpr) +; + } + GeneratedField::DynamicFilter => { + if expr_type__.is_some() { + return Err(serde::de::Error::duplicate_field("dynamicFilter")); + } + expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::DynamicFilter) ; } } } Ok(PhysicalExprNode { expr_id: expr_id__, + shallow_expr_id: shallow_expr_id__, expr_type: expr_type__, }) } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index ab60c3058dbde..4ea1a77bfbfd3 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1304,20 +1304,17 @@ pub struct PhysicalExtensionNode { #[prost(message, repeated, tag = "2")] pub inputs: ::prost::alloc::vec::Vec, } -/// physical expressions #[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalExprNode { - /// Unique identifier for this 
expression to do deduplication during deserialization. - /// When serializing, this is set to a unique identifier for each combination of - /// expression, process and serialization run. - /// When deserializing, if this ID has been seen before, the cached Arc is returned - /// instead of creating a new one, enabling reconstruction of referential integrity - /// across serde roundtrips. + /// Unique identifier for this expression used during deserialization to restore + /// referential integrity across serde roundtrips. #[prost(uint64, optional, tag = "30")] pub expr_id: ::core::option::Option, + #[prost(uint64, optional, tag = "31")] + pub shallow_expr_id: ::core::option::Option, #[prost( oneof = "physical_expr_node::ExprType", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21" + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21, 22" )] pub expr_type: ::core::option::Option, } @@ -1370,9 +1367,24 @@ pub mod physical_expr_node { UnknownColumn(super::UnknownColumn), #[prost(message, tag = "21")] HashExpr(super::PhysicalHashExprNode), + #[prost(message, tag = "22")] + DynamicFilter(::prost::alloc::boxed::Box), } } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalDynamicFilterNode { + #[prost(message, repeated, tag = "1")] + pub children: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "2")] + pub remapped_children: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "3")] + pub generation: u64, + #[prost(message, optional, boxed, tag = "4")] + pub inner_expr: ::core::option::Option<::prost::alloc::boxed::Box>, + #[prost(bool, tag = "5")] + pub is_complete: bool, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalScalarUdfNode { #[prost(string, tag = "1")] pub name: ::prost::alloc::string::String, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 61665db607757..2a122e8c9b974 100644 --- 
a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -58,6 +58,9 @@ use super::{ use crate::logical_plan::{self}; use crate::protobuf::physical_expr_node::ExprType; use crate::{convert_required, protobuf}; +use datafusion_physical_expr::expressions::{ + DynamicFilterPhysicalExpr, DynamicFilterSnapshot, +}; impl From<&protobuf::PhysicalColumn> for Column { fn from(c: &protobuf::PhysicalColumn) -> Column { @@ -524,6 +527,48 @@ pub fn parse_physical_expr_with_converter( hash_expr.description.clone(), )) } + ExprType::DynamicFilter(dynamic_filter) => { + let children = parse_physical_exprs( + &dynamic_filter.children, + ctx, + input_schema, + codec, + proto_converter, + )?; + + let remapped_children = if !dynamic_filter.remapped_children.is_empty() { + Some(parse_physical_exprs( + &dynamic_filter.remapped_children, + ctx, + input_schema, + codec, + proto_converter, + )?) + } else { + None + }; + + let inner_expr = parse_required_physical_expr( + dynamic_filter.inner_expr.as_deref(), + ctx, + "inner_expr", + input_schema, + codec, + proto_converter, + )?; + + // Recreate filter from snapshot + let snapshot = DynamicFilterSnapshot::new( + children, + remapped_children, + dynamic_filter.generation, + inner_expr, + dynamic_filter.is_complete, + ); + let base_filter: Arc = + Arc::new(DynamicFilterPhysicalExpr::from(snapshot)); + base_filter + } ExprType::Extension(extension) => { let inputs: Vec> = extension .inputs diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 206f4378d3d3b..bd8ec7664ffa6 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -19,7 +19,6 @@ use std::any::Any; use std::cell::RefCell; use std::collections::HashMap; use std::fmt::Debug; -use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; use arrow::compute::SortOptions; @@ -58,6 +57,7 @@ use 
datafusion_functions_table::generate_series::{ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr; use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef}; +use datafusion_physical_expr_common::physical_expr::PhysicalExprId; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, LimitOptions, PhysicalGroupBy, }; @@ -3098,6 +3098,7 @@ impl protobuf::PhysicalPlanNode { }); Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(ExprType::Sort(sort_expr)), }) }) @@ -3184,6 +3185,7 @@ impl protobuf::PhysicalPlanNode { }); Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(ExprType::Sort(sort_expr)), }) }) @@ -3791,6 +3793,13 @@ struct DataEncoderTuple { } pub struct DefaultPhysicalProtoConverter; + +fn from_proto_expr_id(proto: &protobuf::PhysicalExprNode) -> Option { + proto + .expr_id + .map(|exact| PhysicalExprId::new(exact, proto.shallow_expr_id)) +} + impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter { fn proto_to_execution_plan( &self, @@ -3839,10 +3848,10 @@ impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter { } } -/// Internal serializer that adds expr_id to expressions. -/// Created fresh for each serialization operation. +/// Internal serializer that makes distinct expr_ids for each serialization session. struct DeduplicatingSerializer { - /// Random salt combined with pointer addresses and process ID to create globally unique expr_ids. + /// Random salt for this serializer which gets hashed with expression ids to create + /// unique ids for this serialization session. 
session_id: u64, } @@ -3898,17 +3907,10 @@ impl PhysicalProtoConverterExtension for DeduplicatingSerializer { codec: &dyn PhysicalExtensionCodec, ) -> Result { let mut proto = serialize_physical_expr_with_converter(expr, codec, self)?; - - // Hash session_id, pointer address, and process ID together to create expr_id. - // - session_id: random per serializer, prevents collisions when merging serializations - // - ptr: unique address per Arc within a process - // - pid: prevents collisions if serializer is shared across processes - let mut hasher = DefaultHasher::new(); - self.session_id.hash(&mut hasher); - (Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher); - std::process::id().hash(&mut hasher); - proto.expr_id = Some(hasher.finish()); - + if let Some(expr_id) = Arc::clone(expr).expr_id(&[self.session_id]) { + proto.expr_id = Some(expr_id.exact()); + proto.shallow_expr_id = expr_id.shallow(); + } Ok(proto) } } @@ -3917,7 +3919,7 @@ impl PhysicalProtoConverterExtension for DeduplicatingSerializer { /// Created fresh for each deserialization operation. #[derive(Default)] struct DeduplicatingDeserializer { - /// Cache mapping expr_id to deserialized expressions. + /// Cache mapping exact and shallow expr ids to deserialized expressions. cache: RefCell>>, } @@ -3952,24 +3954,40 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { where Self: Sized, { - if let Some(expr_id) = proto.expr_id { - // Check cache first - if let Some(cached) = self.cache.borrow().get(&expr_id) { - return Ok(Arc::clone(cached)); + let expr_id = from_proto_expr_id(proto); + + // The expressions are exactly the same, so return the cached expr. + if let Some(expr_id) = expr_id.as_ref() + && let Some(cached) = self.cache.borrow().get(&expr_id.exact()) + { + return Ok(Arc::clone(cached)); + } + + // Cache miss, we must deserialize the expr. 
+ let mut expr = + parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)?; + + if let Some(expr_id) = expr_id { + if let Some(shallow_id) = expr_id.shallow() { + if let Some(cached_expr) = self.cache.borrow().get(&shallow_id) { + // Cache hit on shallow id. Re-use the cached expr with children from the + // deserialized expr. + let children = expr.children().into_iter().cloned().collect(); + expr = Arc::clone(cached_expr).with_new_children(children)?; + } else { + // Cache miss on shallow id. Cache the expr. + self.cache + .borrow_mut() + .insert(shallow_id, Arc::clone(&expr)); + } } - // Deserialize and cache - let expr = parse_physical_expr_with_converter( - proto, - ctx, - input_schema, - codec, - self, - )?; - self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr)); - Ok(expr) - } else { - parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self) + + self.cache + .borrow_mut() + .insert(expr_id.exact(), Arc::clone(&expr)); } + + Ok(expr) } fn physical_expr_to_proto( @@ -3984,13 +4002,13 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { /// A proto converter that adds expression deduplication during serialization /// and deserialization. /// -/// During serialization, each expression's Arc pointer address is XORed with a -/// random session_id to create a salted `expr_id`. This prevents cross-process -/// collisions when serialized plans are merged. +/// During serialization, each expression's identifier is salted with a random +/// session_id. This prevents cross-process collisions when serialized plans are merged. /// -/// During deserialization, expressions with the same `expr_id` share the same -/// Arc, reducing memory usage for plans with duplicate expressions (e.g., large -/// IN lists) and supporting correctly linking [`DynamicFilterPhysicalExpr`] instances. 
+/// During deserialization, expressions with the same `expr_id` are reconstructed +/// according to the id variant, reducing memory usage for plans with duplicate +/// expressions (e.g., large IN lists) and preserving referential integrity for +/// [`DynamicFilterPhysicalExpr`] instances. /// /// This converter is stateless - it creates internal serializers/deserializers /// on demand for each operation. diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index f0eb6d27aac30..8556df6b19347 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -33,11 +33,11 @@ use datafusion_datasource_parquet::file_format::ParquetSink; use datafusion_expr::WindowFrame; use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; -use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ - BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, - LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, + BinaryExpr, CaseExpr, CastExpr, Column, DynamicFilterPhysicalExpr, + DynamicFilterSnapshot, InListExpr, IsNotNullExpr, IsNullExpr, LikeExpr, Literal, + NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, }; use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; @@ -72,6 +72,7 @@ pub fn serialize_physical_aggr_expr( codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?; Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr( protobuf::PhysicalAggregateExprNode { aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)), @@ 
-257,10 +258,7 @@ pub fn serialize_physical_expr_with_converter( codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result { - // Snapshot the expr in case it has dynamic predicate state so - // it can be serialized - let expr = snapshot_physical_expr(Arc::clone(value))?; - + let expr = value.as_ref(); // HashTableLookupExpr is used for dynamic filter pushdown in hash joins. // It contains an Arc (the build-side hash table) which // cannot be serialized - the hash table is a runtime structure built during @@ -282,6 +280,7 @@ pub fn serialize_physical_expr_with_converter( }; return Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)), }); } @@ -289,6 +288,7 @@ pub fn serialize_physical_expr_with_converter( if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Column( protobuf::PhysicalColumn { name: expr.name().to_string(), @@ -299,6 +299,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn( protobuf::UnknownColumn { name: expr.name().to_string(), @@ -341,6 +342,7 @@ pub fn serialize_physical_expr_with_converter( Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr( binary_expr, )), @@ -348,6 +350,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some( protobuf::physical_expr_node::ExprType::Case( Box::new( @@ -391,6 +394,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(expr) = 
expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new( protobuf::PhysicalNot { expr: Some(Box::new( @@ -402,6 +406,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr( Box::new(protobuf::PhysicalIsNull { expr: Some(Box::new( @@ -413,6 +418,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr( Box::new(protobuf::PhysicalIsNotNull { expr: Some(Box::new( @@ -424,6 +430,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new( protobuf::PhysicalInListNode { expr: Some(Box::new( @@ -437,6 +444,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new( protobuf::PhysicalNegativeNode { expr: Some(Box::new( @@ -448,6 +456,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(lit) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Literal( lit.value().try_into()?, )), @@ -455,6 +464,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(cast) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: 
Some(protobuf::physical_expr_node::ExprType::Cast(Box::new( protobuf::PhysicalCastNode { expr: Some(Box::new( @@ -467,6 +477,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(cast) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new( protobuf::PhysicalTryCastNode { expr: Some(Box::new( @@ -481,6 +492,7 @@ pub fn serialize_physical_expr_with_converter( codec.try_encode_udf(expr.fun(), &mut buf)?; Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf( protobuf::PhysicalScalarUdfNode { name: expr.name().to_string(), @@ -498,6 +510,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new( protobuf::PhysicalLikeExprNode { negated: expr.negated(), @@ -514,6 +527,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr( protobuf::PhysicalHashExprNode { on_columns: serialize_physical_exprs( @@ -526,6 +540,42 @@ pub fn serialize_physical_expr_with_converter( }, )), }) + } else if let Some(df) = expr.downcast_ref::() { + // Capture all state atomically. + let snapshot = DynamicFilterSnapshot::from(df); + + let children = snapshot + .children() + .iter() + .map(|child| proto_converter.physical_expr_to_proto(child, codec)) + .collect::>>()?; + + let remapped_children = if let Some(remapped) = snapshot.remapped_children() { + remapped + .iter() + .map(|child| proto_converter.physical_expr_to_proto(child, codec)) + .collect::>>()? 
+ } else { + vec![] + }; + + let inner_expr = Box::new( + proto_converter.physical_expr_to_proto(snapshot.inner_expr(), codec)?, + ); + + Ok(protobuf::PhysicalExprNode { + expr_id: None, + shallow_expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::DynamicFilter( + Box::new(protobuf::PhysicalDynamicFilterNode { + children, + remapped_children, + generation: snapshot.generation(), + inner_expr: Some(inner_expr), + is_complete: snapshot.is_complete(), + }), + )), + }) } else { let mut buf: Vec = vec![]; match codec.try_encode_expr(value, &mut buf) { @@ -537,6 +587,7 @@ pub fn serialize_physical_expr_with_converter( .collect::>()?; Ok(protobuf::PhysicalExprNode { expr_id: None, + shallow_expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Extension( protobuf::PhysicalExtensionExprNode { expr: buf, inputs }, )), diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 3f1c306603bc1..b164925afa9df 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -128,6 +128,11 @@ use crate::cases::{ CustomUDWF, CustomUDWFNode, MyAggregateUDF, MyAggregateUdfNode, MyRegexUdf, MyRegexUdfNode, }; +use datafusion_physical_expr::expressions::{ + DynamicFilterPhysicalExpr, DynamicFilterSnapshot, +}; +use datafusion_physical_expr::utils::reassign_expr_columns; +use datafusion_physical_expr_common::physical_expr::PhysicalExprId; /// Perform a serde roundtrip and assert that the string representation of the before and after plans /// are identical. 
Note that this often isn't sufficient to guarantee that no information is @@ -2824,6 +2829,7 @@ fn test_backward_compatibility_no_expr_id() -> Result<()> { // Manually create a proto without expr_id set let proto = PhysicalExprNode { expr_id: None, // Simulating old proto without this field + shallow_expr_id: None, expr_type: Some( datafusion_proto::protobuf::physical_expr_node::ExprType::Column( datafusion_proto::protobuf::PhysicalColumn { @@ -3015,6 +3021,287 @@ fn test_deduplication_within_expr_deserialization() -> Result<()> { Ok(()) } +/// Create a [`DynamicFilterPhysicalExpr`] with child column expression "a" @ index 0. +fn make_dynamic_filter() -> Arc { + Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("a", 0)) as Arc], + lit(true), + )) as Arc +} + +/// Update a [`DynamicFilterPhysicalExpr`]'s children to support child schema "b" @ 0, "a" @ 1. +fn make_reassigned_dynamic_filter( + filter: Arc, +) -> Result<(Arc, Arc)> { + let schema = Arc::new(Schema::new(vec![ + Field::new("b", DataType::Int64, false), + Field::new("a", DataType::Int64, false), + ])); + let reassigned = reassign_expr_columns(filter, &schema)?; + Ok((schema, reassigned)) +} + +/// Extract a [`PhysicalExprId`] from a [`PhysicalExpr`] proto. +fn proto_expr_id(expr: &PhysicalExprNode) -> PhysicalExprId { + let expr_id = expr.expr_id.expect("physical expr should have expr_id"); + PhysicalExprId::new(expr_id, expr.shallow_expr_id) +} + +/// Roundtrip a single physical expression shaped like so: +/// +/// ```text +/// BinaryExpr(AND) +/// / \ +/// filter_expr_1 filter_expr_2 +/// ``` +/// +/// Returns filter_expr_1 and filter_expr_2 after deserialization. 
+fn roundtrip_dynamic_filter_expr_pair( + filter_expr_1: Arc, + filter_expr_2: Arc, + schema: Arc, +) -> Result<(Arc, Arc)> { + let pair_expr = Arc::new(BinaryExpr::new( + Arc::clone(&filter_expr_1), + Operator::And, + Arc::clone(&filter_expr_2), + )) as Arc; + + let codec = DefaultPhysicalExtensionCodec {}; + let converter = DeduplicatingProtoConverter {}; + let proto = converter.physical_expr_to_proto(&pair_expr, &codec)?; + let ctx = SessionContext::new(); + let deserialized_expr = converter.proto_to_physical_expr( + &proto, + ctx.task_ctx().as_ref(), + &schema, + &codec, + )?; + + let binary = deserialized_expr + .downcast_ref::() + .expect("Expected BinaryExpr"); + + Ok((Arc::clone(binary.left()), Arc::clone(binary.right()))) +} + +/// Roundtrip an execution plan shaped like so: +/// +/// ```text +/// FilterExec(dynamic_filter_1 on a@0) +/// ProjectionExec(a := Column("a", source_index)) +/// DataSourceExec +/// ParquetSource(predicate = dynamic_filter_2) +/// ``` +/// +/// `dynamic_filter_1` and `dynamic_filter_2` are the same dynamic filter, except with +/// different children. 
+/// +/// Returns +/// - `dynamic_filter_1` before serialization +/// - `dynamic_filter_2` before serialization +/// - `dynamic_filter_1` after serialization +/// - `dynamic_filter_2` after serialization +fn roundtrip_dynamic_filter_plan_pair() -> Result<( + Arc, + Arc, + Arc, + Arc, +)> { + let filter_expr_1 = make_dynamic_filter(); + let (data_source_schema, filter_expr_2) = + make_reassigned_dynamic_filter(Arc::clone(&filter_expr_1))?; + let left_before = Arc::clone(&filter_expr_1); + let right_before = Arc::clone(&filter_expr_2); + let file_source = Arc::new( + ParquetSource::new(Arc::clone(&data_source_schema)) + .with_predicate(Arc::clone(&filter_expr_2)), + ); + let scan_config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new( + "/path/to/file.parquet".to_string(), + 1024, + )])]) + .build(); + let data_source_exec = + DataSourceExec::from_data_source(scan_config) as Arc; + + let projection_exec = Arc::new(ProjectionExec::try_new( + vec![ProjectionExpr { + expr: Arc::new(Column::new("a", 1)) as Arc, + alias: "a".to_string(), + }], + data_source_exec, + )?) as Arc; + let filter_exec = Arc::new(FilterExec::try_new( + Arc::clone(&filter_expr_1), + projection_exec, + )?) 
as Arc; + + let codec = DefaultPhysicalExtensionCodec {}; + let converter = DeduplicatingProtoConverter {}; + let proto = converter.execution_plan_to_proto(&filter_exec, &codec)?; + + let ctx = SessionContext::new(); + let deserialized_plan = + converter.proto_to_execution_plan(ctx.task_ctx().as_ref(), &codec, &proto)?; + + let outer_filter = deserialized_plan + .downcast_ref::() + .expect("Expected outer FilterExec"); + let left_filter = Arc::clone(outer_filter.predicate()); + let projection = outer_filter.children()[0] + .downcast_ref::() + .expect("Expected ProjectionExec"); + let data_source = projection + .input() + .downcast_ref::() + .expect("Expected DataSourceExec"); + let scan_config = data_source + .data_source() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let right_filter = scan_config + .file_source() + .filter() + .expect("Expected pushed-down predicate"); + + Ok((left_before, right_before, left_filter, right_filter)) +} + +/// Takes two [`DynamicFilterPhysicalExpr`] and asserts that updates to one are visible +/// via the other. This helps assert that referential integrity is maintained after +/// deserializing. +fn assert_dynamic_filter_update_is_visible( + left_filter: &Arc, + right_filter: &Arc, +) -> Result<()> { + let left_filter = left_filter + .downcast_ref::() + .expect("Expected dynamic filter"); + let right_filter = right_filter + .downcast_ref::() + .expect("Expected dynamic filter"); + + // Sanity check that the filters have the same generation. + let original_generation = left_filter.snapshot_generation(); + assert_eq!(original_generation, right_filter.snapshot_generation(),); + + left_filter.update(lit(123_i64))?; + + // Assert that both generations updated. + assert_eq!(original_generation + 1, right_filter.snapshot_generation(),); + assert_eq!( + left_filter.snapshot_generation(), + right_filter.snapshot_generation(), + ); + + // Ensure both filters have the updated expr. 
+ let expected_current = r#"Literal { value: Int64(123), field: Field { name: "lit", data_type: Int64 } }"#; + assert_eq!(expected_current, format!("{:?}", left_filter.current()?),); + assert_eq!(expected_current, format!("{:?}", right_filter.current()?),); + + Ok(()) +} + +fn assert_dynamic_filter_snapshot_matches( + expected: &Arc, + actual: &Arc, +) { + let expected = expected + .downcast_ref::() + .expect("Expected dynamic filter"); + let actual = actual + .downcast_ref::() + .expect("Expected dynamic filter"); + + assert_eq!( + DynamicFilterSnapshot::from(expected).to_string(), + DynamicFilterSnapshot::from(actual).to_string(), + ); +} + +// Two clones of a dynamic filter expression should be deduped to the exact same expression. +#[test] +fn test_dynamic_filter_roundtrip_dedupe() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let filter_expr_1 = make_dynamic_filter(); + let filter_expr_2 = Arc::clone(&filter_expr_1); + + let (filter_expr_1_after_roundtrip, filter_expr_2_after_roundtrip) = + roundtrip_dynamic_filter_expr_pair( + Arc::clone(&filter_expr_1), + Arc::clone(&filter_expr_2), + schema, + )?; + + // Assert the filters are not modified during roundtrip. + assert_dynamic_filter_snapshot_matches( + &filter_expr_1, + &filter_expr_1_after_roundtrip, + ); + assert_dynamic_filter_snapshot_matches( + &filter_expr_2, + &filter_expr_2_after_roundtrip, + ); + assert_dynamic_filter_snapshot_matches( + &filter_expr_1_after_roundtrip, + &filter_expr_2_after_roundtrip, + ); + + // Assert referential integrity. + assert_dynamic_filter_update_is_visible( + &filter_expr_1_after_roundtrip, + &filter_expr_2_after_roundtrip, + )?; + + Ok(()) +} + +/// Rountrip test for an execution plan where there are multiple instances of a dynamic filter +/// with different children. 
+#[test] +fn test_dynamic_filter_plan_roundtrip_dedupe_shallow_expr_id() -> Result<()> { + let ( + filter_expr_1, + filter_expr_2, + filter_expr_1_after_roundtrip, + filter_expr_2_after_roundtrip, + ) = roundtrip_dynamic_filter_plan_pair()?; + + // Assert the filters are not modified during roundtrip. + // + // There's a small technicality that `filter_expr_1` is rewritten to an eqivalent expression + // during deserialization, so we capture that here by calling + // `filter.with_new_children(filter.children)`. + let filter_expr_1_children = Arc::clone(&filter_expr_1) + .children() + .iter() + .map(|child| Arc::clone(child)) + .collect::>(); + let filter_expr_1 = filter_expr_1 + .clone() + .with_new_children(filter_expr_1_children) + .unwrap(); + assert_dynamic_filter_snapshot_matches( + &filter_expr_1, + &filter_expr_1_after_roundtrip, + ); + assert_dynamic_filter_snapshot_matches( + &filter_expr_2, + &filter_expr_2_after_roundtrip, + ); + + // Assert referential integrity. + assert_dynamic_filter_update_is_visible( + &filter_expr_1_after_roundtrip, + &filter_expr_2_after_roundtrip, + )?; + + Ok(()) +} + /// Test that session_id rotates between top-level serialization operations. /// This verifies that each top-level serialization gets a fresh session_id, /// which prevents cross-process collisions when serialized plans are merged. 
@@ -3031,13 +3318,13 @@ fn test_session_id_rotation_between_serializations() -> Result<()> { // First serialization let proto1 = proto_converter.physical_expr_to_proto(&col_expr, &codec)?; - let expr_id1 = proto1.expr_id.expect("Expected expr_id to be set"); + let expr_id1 = proto_expr_id(&proto1); // Second serialization with the same converter // The session_id should have rotated, so the expr_id should be different // even though we're serializing the same expression (same pointer address) let proto2 = proto_converter.physical_expr_to_proto(&col_expr, &codec)?; - let expr_id2 = proto2.expr_id.expect("Expected expr_id to be set"); + let expr_id2 = proto_expr_id(&proto2); // The expr_ids should be different because session_id rotated assert_ne!( From b4f84c4598e5af0c770da02ad8a4a6f87553d07f Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Wed, 22 Apr 2026 04:31:32 +0000 Subject: [PATCH 2/3] fix ci --- datafusion/physical-expr/src/expressions/dynamic_filters.rs | 1 - datafusion/proto/src/generated/prost.rs | 6 +++++- datafusion/proto/tests/cases/roundtrip_physical_plan.rs | 4 ++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index b5857cc1fbbef..13cd527dae8dd 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -1018,7 +1018,6 @@ mod test { ) .expect("reassign_expr_columns should succeed"); let reassigned = reassigned - .as_any() .downcast_ref::() .expect("Expected dynamic filter after reassignment"); diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 4ea1a77bfbfd3..9c6456f78bd24 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1306,8 +1306,12 @@ pub struct PhysicalExtensionNode { } #[derive(Clone, PartialEq, ::prost::Message)] pub 
struct PhysicalExprNode { - /// Unique identifier for this expression used during deserialization to restore + /// Unique identifiers for this expression used during deserialization to restore /// referential integrity across serde roundtrips. + /// + /// expr_id: if two exprs have the same expr_id, they are identical (including children) + /// shallow_expr_id: if two exprs have the same shallow_expr_id, they are identical but may + /// have different children #[prost(uint64, optional, tag = "30")] pub expr_id: ::core::option::Option, #[prost(uint64, optional, tag = "31")] diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index b164925afa9df..bf7f82359cc66 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3259,7 +3259,7 @@ fn test_dynamic_filter_roundtrip_dedupe() -> Result<()> { Ok(()) } -/// Rountrip test for an execution plan where there are multiple instances of a dynamic filter +/// Roundtrip test for an execution plan where there are multiple instances of a dynamic filter /// with different children. #[test] fn test_dynamic_filter_plan_roundtrip_dedupe_shallow_expr_id() -> Result<()> { @@ -3272,7 +3272,7 @@ fn test_dynamic_filter_plan_roundtrip_dedupe_shallow_expr_id() -> Result<()> { // Assert the filters are not modified during roundtrip. // - // There's a small technicality that `filter_expr_1` is rewritten to an eqivalent expression + // There's a small technicality that `filter_expr_1` is rewritten to an equivalent expression // during deserialization, so we capture that here by calling // `filter.with_new_children(filter.children)`. 
let filter_expr_1_children = Arc::clone(&filter_expr_1) From 244a848da57f9bb828de244fd1bbea3193346146 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Wed, 22 Apr 2026 15:35:01 +0000 Subject: [PATCH 3/3] clippy --- datafusion/proto/tests/cases/roundtrip_physical_plan.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index bf7f82359cc66..16dba752b61ab 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3102,6 +3102,7 @@ fn roundtrip_dynamic_filter_expr_pair( /// - `dynamic_filter_2` before serialization /// - `dynamic_filter_1` after serialization /// - `dynamic_filter_2` after serialization +#[allow(clippy::type_complexity)] fn roundtrip_dynamic_filter_plan_pair() -> Result<( Arc, Arc,