Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ members = [
"datafusion/sql",
"datafusion/sqllogictest",
"datafusion/substrait",
"datafusion/tests",
"datafusion/wasmtest",
"datafusion-cli",
"datafusion-examples",
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2755,8 +2755,8 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() {
);
}

// Not portable to sqllogictest: asserts on `HashJoinExec::dynamic_filter_for_test().is_used()`
// which is a debug-only API. The observable behavior (probe-side scan
// Not portable to sqllogictest: asserts on `HashJoinExec::dynamic_filter().is_used()`
// which is a Rust API. The observable behavior (probe-side scan
// receiving the dynamic filter when the data source supports it) is
// already covered by the simpler CollectLeft port in push_down_filter_parquet.slt;
// the with_support(false) branch has no SQL analog (parquet always supports
Expand Down Expand Up @@ -2835,7 +2835,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() {

// Verify that a dynamic filter was created
let dynamic_filter = hash_join
.dynamic_filter_for_test()
.dynamic_filter()
.expect("Dynamic filter should be created");

// Verify that is_used() returns the expected value based on probe side support.
Expand Down
15 changes: 15 additions & 0 deletions datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,21 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
0
}

/// Return a stable, globally-unique identifier for this `PhysicalExpr`, if it has one.
///
/// Expressions that carry shared mutable state (e.g. `DynamicFilterPhysicalExpr`)
/// use this to preserve their identity across proto serialize/deserialize round-trips
/// so that multiple references in a plan continue to point at the same state.
///
/// The id must be stable across [`PhysicalExpr::with_new_children`] — it follows the
/// underlying shared state, not the outer wrapper.
///
/// Default is `None`: the expression has no identity worth preserving across a
/// serialization boundary, and consumers should fall back to structural equality.
fn expression_id(&self) -> Option<u64> {
None
}

/// Returns true if the expression node is volatile, i.e. whether it can return
/// different results when evaluated multiple times with the same input.
///
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ indexmap = { workspace = true }
itertools = { workspace = true, features = ["use_std"] }
parking_lot = { workspace = true }
petgraph = "0.8.3"
rand = { workspace = true }
recursive = { workspace = true, optional = true }
tokio = { workspace = true }
half = { workspace = true }
Expand All @@ -64,7 +65,6 @@ arrow = { workspace = true, features = ["test_utils"] }
criterion = { workspace = true }
datafusion-functions = { workspace = true }
insta = { workspace = true }
rand = { workspace = true }
rstest = { workspace = true }

[[bench]]
Expand Down
128 changes: 115 additions & 13 deletions datafusion/physical-expr/src/expressions/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,16 @@ impl FilterState {
/// For more background, please also see the [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]
///
/// [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters
#[derive(Debug)]
pub struct DynamicFilterPhysicalExpr {
/// Stable identifier shared by all wrappers that observe the same `inner`.
/// Generated once at construction; copied verbatim through `with_new_children`
/// so that every `Arc<DynamicFilterPhysicalExpr>` pointing at the same mutable
/// state reports the same id.
///
/// Kept on the outer struct (rather than inside `Inner`) so that
/// `expression_id()` is a pure field read and does not require acquiring the
/// `inner` RwLock — the id never changes after construction anyway.
id: u64,
/// The original children of this PhysicalExpr, if any.
/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
/// and later remapped to the actual expressions that are being filtered.
Expand Down Expand Up @@ -89,16 +97,6 @@ struct Inner {
}

impl Inner {
fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
Self {
// Start with generation 1 which gives us a different result for [`PhysicalExpr::generation`] than the default 0.
// This is not currently used anywhere but it seems useful to have this simple distinction.
generation: 1,
expr,
is_complete: false,
}
}

/// Clone the inner expression.
fn expr(&self) -> &Arc<dyn PhysicalExpr> {
&self.expr
Expand Down Expand Up @@ -137,6 +135,23 @@ impl Display for DynamicFilterPhysicalExpr {
}
}

impl std::fmt::Debug for DynamicFilterPhysicalExpr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// `id` is a random per-instance value; including it in Debug output
// makes plan snapshots nondeterministic across runs and across
// proto round-trips (receiver reconstructs the filter with the same
// id but the Debug representation would still differ from any freshly
// constructed filter on another machine). Skip it here.
f.debug_struct("DynamicFilterPhysicalExpr")
.field("children", &self.children)
.field("remapped_children", &self.remapped_children)
.field("inner", &self.inner)
.field("data_type", &self.data_type)
.field("nullable", &self.nullable)
.finish_non_exhaustive()
}
}

impl DynamicFilterPhysicalExpr {
/// Create a new [`DynamicFilterPhysicalExpr`]
/// from an initial expression and a list of children.
Expand Down Expand Up @@ -171,15 +186,70 @@ impl DynamicFilterPhysicalExpr {
) -> Self {
let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 });
Self {
id: rand::random(),
children,
remapped_children: None, // Initially no remapped children
inner: Arc::new(RwLock::new(Inner::new(inner))),
remapped_children: None,
inner: Arc::new(RwLock::new(Inner {
generation: 1,
expr: inner,
is_complete: false,
})),
state_watch,
data_type: Arc::new(RwLock::new(None)),
nullable: Arc::new(RwLock::new(None)),
}
}

/// Override this filter's [`Self::expression_id`].
///
/// Typically used on the deserialize side of a proto round-trip so a
/// reconstructed wrapper keeps the same identity as the sender's, letting
/// multiple call sites share mutable state via the id cache.
pub fn with_id(mut self, id: u64) -> Self {
self.id = id;
self
}

/// Override the initial generation counter. Intended for the deserialize
/// side of a proto round-trip, to rehydrate the sender's generation so
/// later local [`Self::update`] calls keep a monotonic sequence.
///
/// Only safe to call on a freshly-constructed filter that hasn't been
/// shared yet — use [`Self::update`] for live mutation of a shared filter.
pub fn with_generation(self, generation: u64) -> Self {
let is_complete = {
let mut inner = self.inner.write();
inner.generation = generation;
inner.is_complete
};
let _ = self.state_watch.send(if is_complete {
FilterState::Complete { generation }
} else {
FilterState::InProgress { generation }
});
self
}

/// Override the completion flag. Intended for the deserialize side of a
/// proto round-trip, to rehydrate a sender that had already called
/// [`Self::mark_complete`] before serialization.
///
/// Only safe to call on a freshly-constructed filter that hasn't been
/// shared yet — use [`Self::mark_complete`] for live mutation.
pub fn with_is_complete(self, is_complete: bool) -> Self {
let generation = {
let mut inner = self.inner.write();
inner.is_complete = is_complete;
inner.generation
};
let _ = self.state_watch.send(if is_complete {
FilterState::Complete { generation }
} else {
FilterState::InProgress { generation }
});
self
}

fn remap_children(
children: &[Arc<dyn PhysicalExpr>],
remapped_children: Option<&Vec<Arc<dyn PhysicalExpr>>>,
Expand Down Expand Up @@ -214,6 +284,12 @@ impl DynamicFilterPhysicalExpr {
self.inner.read().generation
}

/// Whether [`Self::mark_complete`] has been called on this filter's shared
/// state.
pub fn is_complete(&self) -> bool {
self.inner.read().is_complete
}

/// Get the current expression.
/// This will return the current expression with any children
/// remapped to match calls to [`PhysicalExpr::with_new_children`].
Expand Down Expand Up @@ -362,6 +438,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(Self {
id: self.id,
children: self.children.clone(),
remapped_children: Some(children),
inner: Arc::clone(&self.inner),
Expand Down Expand Up @@ -444,6 +521,10 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
// Return the current generation of the expression.
self.inner.read().generation
}

fn expression_id(&self) -> Option<u64> {
Some(self.id)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -861,4 +942,25 @@ mod test {
"Hash should be stable after update (identity-based)"
);
}

/// `expression_id` identifies the shared `inner` state, so it must be
/// identical on any wrapper produced by `with_new_children` — even one
/// whose `children()` view has been remapped. This is what lets proto
/// serialization link the two sides after deserialization.
#[test]
fn test_expression_id_stable_across_with_new_children() {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let filter = Arc::new(DynamicFilterPhysicalExpr::new(
vec![col("a", &schema).unwrap()],
lit(true) as Arc<dyn PhysicalExpr>,
));
let id = filter
.expression_id()
.expect("dynamic filter must have an id");

let remapped = Arc::clone(&filter)
.with_new_children(vec![col("a", &schema).unwrap()])
.expect("with_new_children should succeed");
assert_eq!(remapped.expression_id(), Some(id));
}
}
28 changes: 16 additions & 12 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,17 @@ impl HashJoinExecBuilder {
})
}

fn with_dynamic_filter(mut self, filter: Option<HashJoinExecDynamicFilter>) -> Self {
self.exec.dynamic_filter = filter;
/// Attach a pre-existing [`DynamicFilterPhysicalExpr`] to this
/// `HashJoinExec`. The filter's shared mutable state is preserved; the
/// build-side update coordinator is reinitialized per execution.
///
/// Callers typically get this filter from a proto round-trip so that the
/// `HashJoinExec` and any pushed-down scan site observe the same `inner`.
pub fn with_dynamic_filter(mut self, filter: Arc<DynamicFilterPhysicalExpr>) -> Self {
self.exec.dynamic_filter = Some(HashJoinExecDynamicFilter {
filter,
build_accumulator: OnceLock::new(),
});
self
}
}
Expand Down Expand Up @@ -902,12 +911,10 @@ impl HashJoinExec {
self.null_equality
}

/// Get the dynamic filter expression for testing purposes.
/// Returns `None` if no dynamic filter has been set.
///
/// This method is intended for testing only and should not be used in production code.
#[doc(hidden)]
pub fn dynamic_filter_for_test(&self) -> Option<&Arc<DynamicFilterPhysicalExpr>> {
/// The [`DynamicFilterPhysicalExpr`] this join maintains at runtime, if
/// any. Updated on build completion and observed by consumers that had
/// the filter pushed down into them.
pub fn dynamic_filter(&self) -> Option<&Arc<DynamicFilterPhysicalExpr>> {
self.dynamic_filter.as_ref().map(|df| &df.filter)
}

Expand Down Expand Up @@ -1674,10 +1681,7 @@ impl ExecutionPlan for HashJoinExec {
// We successfully pushed down our self filter - we need to make a new node with the dynamic filter
let new_node = self
.builder()
.with_dynamic_filter(Some(HashJoinExecDynamicFilter {
filter: dynamic_filter,
build_accumulator: OnceLock::new(),
}))
.with_dynamic_filter(dynamic_filter)
.build_exec()?;
result = result.with_updated_node(new_node);
}
Expand Down
19 changes: 19 additions & 0 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,25 @@ impl SortExec {
}
}

/// Install a pre-existing [`DynamicFilterPhysicalExpr`] as this sort's
/// TopK dynamic filter, replacing any auto-created one.
///
/// Used on the deserialize side of a proto round-trip so the `SortExec`
/// and any pushed-down scan site observe the same `inner`. Only
/// meaningful when combined with [`Self::with_fetch`]; the filter is
/// consulted during TopK execution.
pub fn with_dynamic_filter(mut self, filter: Arc<DynamicFilterPhysicalExpr>) -> Self {
self.filter = Some(Arc::new(RwLock::new(TopKDynamicFilters::new(filter))));
self
}

/// Return this sort's TopK dynamic filter, if any. Set by
/// [`Self::with_fetch`] (auto-created) or [`Self::with_dynamic_filter`]
/// (caller-supplied).
pub fn dynamic_filter(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
self.filter.as_ref().map(|f| f.read().expr())
}

/// Modify how many rows to include in the result
///
/// If None, then all rows will be returned, in sorted order.
Expand Down
Loading
Loading