diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 85361ef5e17e..eb15f5097f4c 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1348,6 +1348,13 @@ config_namespace! { /// closer to the leaf table scans, and push those projections down /// towards the leaf nodes. pub enable_leaf_expression_pushdown: bool, default = true + + /// When set to true, the logical optimizer will rewrite `UNION DISTINCT` branches that + /// read from the same source and differ only by filter predicates into a single branch + /// with a combined filter. This optimization is conservative and only applies when the + /// branches share the same source and compatible wrapper nodes such as identical + /// projections or aliases. + pub enable_unions_to_filter: bool, default = false } } diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index a767526feb93..0822a17f24f1 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -83,3 +83,7 @@ harness = false [[bench]] name = "optimize_projections" harness = false + +[[bench]] +name = "unions_to_filter" +harness = false diff --git a/datafusion/optimizer/benches/unions_to_filter.rs b/datafusion/optimizer/benches/unions_to_filter.rs new file mode 100644 index 000000000000..3f7ef1e58241 --- /dev/null +++ b/datafusion/optimizer/benches/unions_to_filter.rs @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Microbenchmarks for the [`UnionsToFilter`] optimizer rule. +//! +//! Three scenarios are covered: +//! +//! 1. **merge** – N branches over the *same* table, each with a simple +//! equality filter. All branches should be merged into a single +//! `DISTINCT(Filter(OR …))` plan. +//! +//! 2. **no_merge** – N branches over *different* tables. The rule must +//! recognise that no merge is possible and leave the plan unchanged. +//! This exercises the "cold path" without any rewrite work. +//! +//! 3. **merge_with_projection** – N branches over the same table but each +//! branch wraps the filter in a `Projection`. This exercises the wrapper- +//! peeling and re-wrapping paths in addition to the core merge logic. +//! +//! To generate a flamegraph (requires `cargo-flamegraph`): +//! ```text +//! cargo flamegraph -p datafusion-optimizer --bench unions_to_filter \ +//! --flamechart --root --profile profiling --freq 1000 -- --bench +//! ``` + +use arrow::datatypes::{DataType, Field, Schema}; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, logical_plan::table_scan}; +use datafusion_expr::{col, lit}; +use datafusion_optimizer::OptimizerContext; +use datafusion_optimizer::unions_to_filter::UnionsToFilter; +use datafusion_optimizer::{Optimizer, OptimizerRule}; +use std::hint::black_box; +use std::sync::Arc; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/// Build a three-column table scan for `name`. +fn scan(name: &str) -> LogicalPlan { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ]); + table_scan(Some(name), &schema, None) + .unwrap() + .build() + .unwrap() +} + +/// Build a `DISTINCT (UNION ALL …)` plan whose `n` branches all filter over +/// the *same* table (`t`), so the rule can merge them. +fn build_merge_plan(n: usize) -> LogicalPlan { + assert!(n >= 2); + let mut builder: Option = None; + for i in 0..n { + let branch = LogicalPlanBuilder::from(scan("t")) + .filter(col("a").eq(lit(i as i32))) + .unwrap() + .build() + .unwrap(); + builder = Some(match builder { + None => LogicalPlanBuilder::from(branch), + Some(b) => b.union(branch).unwrap(), + }); + } + builder.unwrap().distinct().unwrap().build().unwrap() +} + +/// Build a `DISTINCT (UNION ALL …)` plan whose `n` branches each filter over a +/// *different* table, so no merge is possible. +fn build_no_merge_plan(n: usize) -> LogicalPlan { + assert!(n >= 2); + let mut builder: Option = None; + for i in 0..n { + let branch = LogicalPlanBuilder::from(scan(&format!("t{i}"))) + .filter(col("a").eq(lit(i as i32))) + .unwrap() + .build() + .unwrap(); + builder = Some(match builder { + None => LogicalPlanBuilder::from(branch), + Some(b) => b.union(branch).unwrap(), + }); + } + builder.unwrap().distinct().unwrap().build().unwrap() +} + +/// Build a `DISTINCT (UNION ALL …)` plan whose `n` branches each wrap the +/// filter inside a `Projection` over the *same* table. +fn build_merge_with_projection_plan(n: usize) -> LogicalPlan { + assert!(n >= 2); + let mut builder: Option = None; + for i in 0..n { + let branch = LogicalPlanBuilder::from(scan("t")) + .filter(col("a").eq(lit(i as i32))) + .unwrap() + .project(vec![col("a"), col("b")]) + .unwrap() + .build() + .unwrap(); + builder = Some(match builder { + None => LogicalPlanBuilder::from(branch), + Some(b) => b.union(branch).unwrap(), + }); + } + builder.unwrap().distinct().unwrap().build().unwrap() +} + +/// Run the [`UnionsToFilter`] rule through the full [`Optimizer`] pipeline +/// (single pass, feature flag enabled). +fn run_optimizer(plan: &LogicalPlan, ctx: &OptimizerContext) -> LogicalPlan { + let rules: Vec> = + vec![Arc::new(UnionsToFilter::new())]; + Optimizer::with_rules(rules) + .optimize(plan.clone(), ctx, |_, _| {}) + .unwrap() +} + +// --------------------------------------------------------------------------- +// Benchmark functions +// --------------------------------------------------------------------------- + +fn bench_merge(c: &mut Criterion) { + let mut options = ConfigOptions::default(); + options.optimizer.enable_unions_to_filter = true; + let ctx = + OptimizerContext::new_with_config_options(Arc::new(options)).with_max_passes(1); + + let mut group = c.benchmark_group("unions_to_filter/merge"); + for n in [2, 8, 32, 128] { + let plan = build_merge_plan(n); + group.bench_with_input(BenchmarkId::from_parameter(n), &plan, |b, p| { + b.iter(|| black_box(run_optimizer(p, &ctx))); + }); + } + group.finish(); +} + +fn bench_no_merge(c: &mut Criterion) { + let mut options = ConfigOptions::default(); + options.optimizer.enable_unions_to_filter = true; + let ctx = + OptimizerContext::new_with_config_options(Arc::new(options)).with_max_passes(1); + + let mut group = c.benchmark_group("unions_to_filter/no_merge"); + for n in [2, 8, 32, 128] { + let plan = build_no_merge_plan(n); + group.bench_with_input(BenchmarkId::from_parameter(n), &plan, |b, p| { + b.iter(|| black_box(run_optimizer(p, &ctx))); + }); + } + group.finish(); +} + +fn bench_merge_with_projection(c: &mut Criterion) { + let mut options = ConfigOptions::default(); + options.optimizer.enable_unions_to_filter = true; + let ctx = + OptimizerContext::new_with_config_options(Arc::new(options)).with_max_passes(1); + + let mut group = c.benchmark_group("unions_to_filter/merge_with_projection"); + for n in [2, 8, 32, 128] { + let plan = build_merge_with_projection_plan(n); + group.bench_with_input(BenchmarkId::from_parameter(n), &plan, |b, p| { + b.iter(|| black_box(run_optimizer(p, &ctx))); + }); + } + group.finish(); +} + +criterion_group!( + benches, + bench_merge, + bench_no_merge, + bench_merge_with_projection +); +criterion_main!(benches); diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index e61009182409..47adc99ff21b 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -70,6 +70,7 @@ pub mod rewrite_set_comparison; pub mod scalar_subquery_to_join; pub mod simplify_expressions; pub mod single_distinct_to_groupby; +pub mod unions_to_filter; pub mod utils; #[cfg(test)] diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index d0fbb31414da..31f8088f79c9 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -56,6 +56,7 @@ use crate::rewrite_set_comparison::RewriteSetComparison; use crate::scalar_subquery_to_join::ScalarSubqueryToJoin; use crate::simplify_expressions::SimplifyExpressions; use crate::single_distinct_to_groupby::SingleDistinctToGroupBy; +use crate::unions_to_filter::UnionsToFilter; use crate::utils::log_plan; /// Transforms one [`LogicalPlan`] into another which computes the same results, @@ -280,6 +281,7 @@ impl Optimizer { let rules: Vec> = vec![ Arc::new(RewriteSetComparison::new()), Arc::new(OptimizeUnions::new()), + Arc::new(UnionsToFilter::new()), Arc::new(SimplifyExpressions::new()), Arc::new(ReplaceDistinctWithAggregate::new()), Arc::new(EliminateJoin::new()), diff --git a/datafusion/optimizer/src/unions_to_filter.rs b/datafusion/optimizer/src/unions_to_filter.rs new file mode 100644 index 000000000000..852a5eed70e8 --- /dev/null +++ b/datafusion/optimizer/src/unions_to_filter.rs @@ -0,0 +1,652 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Rewrites `UNION DISTINCT` branches that differ only by filter predicates +//! into a single filtered branch plus `DISTINCT`. + +use crate::{OptimizerConfig, OptimizerRule}; +use datafusion_common::Result; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; +use datafusion_expr::expr_rewriter::coerce_plan_expr_for_schema; +use datafusion_expr::logical_plan::builder::LogicalPlanBuilder; +use datafusion_expr::utils::disjunction; +use datafusion_expr::{ + Distinct, Expr, Filter, LogicalPlan, Projection, SubqueryAlias, Union, +}; +use log::debug; +use std::collections::HashMap; +use std::sync::Arc; + +#[derive(Default, Debug)] +pub struct UnionsToFilter; + +impl UnionsToFilter { + #[expect(missing_docs)] + pub fn new() -> Self { + Self + } +} + +impl OptimizerRule for UnionsToFilter { + fn name(&self) -> &str { + "unions_to_filter" + } + + fn supports_rewrite(&self) -> bool { + true + } + + fn rewrite( + &self, + plan: LogicalPlan, + config: &dyn OptimizerConfig, + ) -> Result> { + if !config.options().optimizer.enable_unions_to_filter { + return Ok(Transformed::no(plan)); + } + + // Fast pre-check: if the plan tree has no Distinct::All node at all we can + // skip the expensive bottom-up rewrite_with_subqueries traversal entirely. + // This matters for large UNION ALL plans (e.g. TPC-DS Q4) where the rule + // can never fire and the traversal overhead is otherwise measurable. + if !plan.exists(|p| Ok(matches!(p, LogicalPlan::Distinct(Distinct::All(_)))))? { + return Ok(Transformed::no(plan)); + } + + plan.rewrite_with_subqueries(&mut UnionsToFilterRewriter) + } +} + +struct UnionsToFilterRewriter; + +impl TreeNodeRewriter for UnionsToFilterRewriter { + type Node = LogicalPlan; + + fn f_up(&mut self, plan: LogicalPlan) -> Result> { + match plan { + LogicalPlan::Distinct(Distinct::All(input)) => { + let inner = Arc::unwrap_or_clone(input); + match try_rewrite_distinct_union(inner.clone())? { + Some(rewritten) => Ok(Transformed::yes(rewritten)), + None => Ok(Transformed::no(LogicalPlan::Distinct(Distinct::All( + Arc::new(inner), + )))), + } + } + _ => Ok(Transformed::no(plan)), + } + } +} + +fn try_rewrite_distinct_union(plan: LogicalPlan) -> Result> { + let LogicalPlan::Union(Union { inputs, schema }) = plan else { + debug!("unions_to_filter skipped: input is not a UNION"); + return Ok(None); + }; + + if inputs.len() < 2 { + debug!( + "unions_to_filter skipped: UNION has {} input(s), need at least 2", + inputs.len() + ); + return Ok(None); + } + + let mut grouped: HashMap> = HashMap::new(); + let mut input_order: Vec = Vec::new(); + let mut transformed = false; + + for input in inputs { + let Some(branch) = extract_branch(Arc::unwrap_or_clone(input))? else { + return Ok(None); + }; + + let key = GroupKey { + source: branch.source, + wrappers: branch.wrappers, + }; + if let Some(conds) = grouped.get_mut(&key) { + conds.push(branch.predicate); + transformed = true; + } else { + input_order.push(key.clone()); + grouped.insert(key, vec![branch.predicate]); + } + } + + if !transformed { + debug!("unions_to_filter skipped: no branch groups could be merged"); + return Ok(None); + } + + let mut builder: Option = None; + for key in input_order { + let predicates = grouped + .remove(&key) + .expect("grouped predicates should exist for every source"); + let combined = + disjunction(predicates).expect("union branches always provide predicates"); + let branch = LogicalPlanBuilder::from(key.source) + .filter(combined)? + .build()?; + let branch = wrap_branch(branch, &key.wrappers)?; + let branch = coerce_plan_expr_for_schema(branch, &schema)?; + let branch = align_plan_to_schema(branch, Arc::clone(&schema))?; + builder = Some(match builder { + None => LogicalPlanBuilder::from(branch), + Some(builder) => builder.union(branch)?, + }); + } + + let union = builder + .expect("at least one branch after rewrite") + .build()?; + Ok(Some(LogicalPlan::Distinct(Distinct::All(Arc::new(union))))) +} + +struct UnionBranch { + source: LogicalPlan, + predicate: Expr, + wrappers: Vec, +} + +fn extract_branch(plan: LogicalPlan) -> Result> { + let (wrappers, plan) = peel_wrappers(plan); + + // Volatile or subquery expressions in the projection must not be merged: + // they are evaluated once per branch in the original plan but would be + // evaluated once per combined row after the rewrite, which can change the + // output row set. + if !wrapper_projections_are_safe(&wrappers) { + debug!( + "unions_to_filter skipped: projection wrapper contains volatile expression or subquery" + ); + return Ok(None); + } + + match plan { + LogicalPlan::Filter(Filter { + predicate, input, .. + }) => { + if !is_mergeable_predicate(&predicate) { + debug!( + "unions_to_filter skipped: branch predicate contains volatility or a subquery" + ); + return Ok(None); + } + Ok(Some(UnionBranch { + source: strip_passthrough_nodes(Arc::unwrap_or_clone(input)), + predicate, + wrappers, + })) + } + // A Limit or Sort node changes the row-set semantics of the branch. + // Merging two such branches into one would silently drop the per-branch + // row restriction (LIMIT) or rely on an order guarantee that UNION does + // not preserve (ORDER BY). Bail out to leave the UNION unchanged. + LogicalPlan::Limit(_) => { + debug!("unions_to_filter skipped: branch contains LIMIT"); + Ok(None) + } + LogicalPlan::Sort(_) => { + debug!("unions_to_filter skipped: branch contains ORDER BY / SORT"); + Ok(None) + } + other => Ok(Some(UnionBranch { + source: strip_passthrough_nodes(other.clone()), + predicate: Expr::Literal( + datafusion_common::ScalarValue::Boolean(Some(true)), + None, + ), + wrappers, + })), + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct GroupKey { + source: LogicalPlan, + wrappers: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +enum Wrapper { + Projection { + expr: Vec, + schema: datafusion_common::DFSchemaRef, + }, + SubqueryAlias { + alias: datafusion_common::TableReference, + schema: datafusion_common::DFSchemaRef, + }, +} + +fn peel_wrappers(mut plan: LogicalPlan) -> (Vec, LogicalPlan) { + let mut wrappers = vec![]; + loop { + match plan { + LogicalPlan::Projection(Projection { + expr, + input, + schema, + .. + }) => { + wrappers.push(Wrapper::Projection { expr, schema }); + plan = Arc::unwrap_or_clone(input); + } + LogicalPlan::SubqueryAlias(SubqueryAlias { + input, + alias, + schema, + .. + }) => { + wrappers.push(Wrapper::SubqueryAlias { alias, schema }); + plan = Arc::unwrap_or_clone(input); + } + other => return (wrappers, other), + } + } +} + +fn wrap_branch(mut plan: LogicalPlan, wrappers: &[Wrapper]) -> Result { + for wrapper in wrappers.iter().rev() { + plan = match wrapper { + Wrapper::Projection { expr, schema } => { + LogicalPlan::Projection(Projection::try_new_with_schema( + expr.clone(), + Arc::new(plan), + Arc::clone(schema), + )?) + } + Wrapper::SubqueryAlias { alias, .. } => LogicalPlan::SubqueryAlias( + SubqueryAlias::try_new(Arc::new(plan), alias.clone())?, + ), + }; + } + Ok(plan) +} + +fn strip_passthrough_nodes(plan: LogicalPlan) -> LogicalPlan { + match plan { + LogicalPlan::Projection(Projection { input, .. }) => { + strip_passthrough_nodes(Arc::unwrap_or_clone(input)) + } + LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => { + strip_passthrough_nodes(Arc::unwrap_or_clone(input)) + } + other => other, + } +} + +fn align_plan_to_schema( + plan: LogicalPlan, + schema: datafusion_common::DFSchemaRef, +) -> Result { + if plan.schema() == &schema { + return Ok(plan); + } + + let expr = plan + .schema() + .iter() + .enumerate() + .map(|(i, _)| { + Expr::Column(datafusion_common::Column::from( + plan.schema().qualified_field(i), + )) + }) + .collect::>(); + + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + expr, + Arc::new(plan), + schema, + )?)) +} + +fn is_mergeable_predicate(expr: &Expr) -> bool { + !expr.is_volatile() && !expr_contains_subquery(expr) +} + +/// Returns `true` when every projection expression in `wrappers` is both +/// non-volatile and subquery-free. +/// +/// Volatile expressions (e.g. `random()`, `now()`) or correlated subqueries +/// in the SELECT list cannot be safely merged: the original plan evaluates +/// them once per branch execution, while the rewritten plan evaluates them +/// once per combined row, which can change the set of output rows. +fn wrapper_projections_are_safe(wrappers: &[Wrapper]) -> bool { + wrappers.iter().all(|w| match w { + Wrapper::Projection { expr, .. } => expr + .iter() + .all(|e| !e.is_volatile() && !expr_contains_subquery(e)), + Wrapper::SubqueryAlias { .. } => true, + }) +} + +fn expr_contains_subquery(expr: &Expr) -> bool { + expr.exists(|e| match e { + Expr::ScalarSubquery(_) | Expr::Exists(_) | Expr::InSubquery(_) => Ok(true), + _ => Ok(false), + }) + .expect("boolean expression walk is infallible") +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::OptimizerContext; + use crate::assert_optimized_plan_eq_snapshot; + use crate::test::test_table_scan_with_name; + use arrow::datatypes::DataType; + use datafusion_common::Result; + use datafusion_expr::{ + ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, + Volatility, col, lit, + }; + + macro_rules! assert_optimized_plan_equal { + ( + $plan:expr, + @ $expected:literal $(,)? + ) => {{ + let mut options = datafusion_common::config::ConfigOptions::default(); + options.optimizer.enable_unions_to_filter = true; + let optimizer_ctx = OptimizerContext::new_with_config_options(Arc::new(options)) + .with_max_passes(1); + let rules: Vec> = + vec![Arc::new(UnionsToFilter::new())]; + assert_optimized_plan_eq_snapshot!( + optimizer_ctx, + rules, + $plan, + @ $expected, + ) + }}; + } + + #[derive(Debug, PartialEq, Eq, Hash)] + struct VolatileTestUdf; + + impl ScalarUDFImpl for VolatileTestUdf { + fn name(&self) -> &str { + "volatile_test" + } + + fn signature(&self) -> &Signature { + static SIGNATURE: std::sync::LazyLock = + std::sync::LazyLock::new(|| Signature::nullary(Volatility::Volatile)); + &SIGNATURE + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Float64) + } + + fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { + panic!("VolatileTestUdf is not intended for execution") + } + } + + fn volatile_expr() -> Expr { + ScalarUDF::new_from_impl(VolatileTestUdf).call(vec![]) + } + + #[test] + fn rewrite_union_distinct_same_source_filters() -> Result<()> { + let left = LogicalPlanBuilder::from(test_table_scan_with_name("t")?) + .filter(col("a").eq(lit(1)))? + .build()?; + let right = LogicalPlanBuilder::from(test_table_scan_with_name("t")?) + .filter(col("a").eq(lit(2)))? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .union_distinct(right)? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Distinct: + Projection: t.a, t.b, t.c + Filter: t.a = Int32(1) OR t.a = Int32(2) + TableScan: t + ")?; + Ok(()) + } + + #[test] + fn keep_union_distinct_different_sources() -> Result<()> { + let left = LogicalPlanBuilder::from(test_table_scan_with_name("t1")?) + .filter(col("a").eq(lit(1)))? + .build()?; + let right = LogicalPlanBuilder::from(test_table_scan_with_name("t2")?) + .filter(col("a").eq(lit(2)))? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .union_distinct(right)? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Distinct: + Union + Filter: t1.a = Int32(1) + TableScan: t1 + Filter: t2.a = Int32(2) + TableScan: t2 + ")?; + Ok(()) + } + + #[test] + fn keep_union_distinct_with_volatile_predicate() -> Result<()> { + let left = LogicalPlanBuilder::from(test_table_scan_with_name("t")?) + .filter(volatile_expr().gt(lit(0.5_f64)))? + .build()?; + let right = LogicalPlanBuilder::from(test_table_scan_with_name("t")?) + .filter(col("a").eq(lit(2)))? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .union_distinct(right)? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Distinct: + Union + Filter: volatile_test() > Float64(0.5) + TableScan: t + Filter: t.a = Int32(2) + TableScan: t + ")?; + Ok(()) + } + + #[test] + fn rewrite_union_distinct_with_matching_projection_prefix() -> Result<()> { + let left = LogicalPlanBuilder::from(test_table_scan_with_name("emp")?) + .project(vec![col("a").alias("mgr"), col("b").alias("comm")])? + .build()?; + let right = LogicalPlanBuilder::from(test_table_scan_with_name("emp")?) + .filter(col("b").eq(lit(5)))? + .project(vec![col("a").alias("mgr"), col("b").alias("comm")])? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .union_distinct(right)? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Distinct: + Projection: emp.a AS mgr, emp.b AS comm + Filter: Boolean(true) OR emp.b = Int32(5) + TableScan: emp + ")?; + Ok(()) + } + + /// A volatile expression in the **projection** (SELECT list) must block the + /// rewrite. Each original branch evaluates it independently; merging them + /// would evaluate it once per combined row, changing the row set. + #[test] + fn keep_union_distinct_with_volatile_projection() -> Result<()> { + // Both branches project volatile_test() AS v over the same source. + let left = LogicalPlanBuilder::from(test_table_scan_with_name("t")?) + .filter(col("a").eq(lit(1)))? + .project(vec![volatile_expr().alias("v"), col("a")])? + .build()?; + let right = LogicalPlanBuilder::from(test_table_scan_with_name("t")?) + .filter(col("a").eq(lit(2)))? + .project(vec![volatile_expr().alias("v"), col("a")])? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .union_distinct(right)? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Distinct: + Union + Projection: volatile_test() AS v, t.a + Filter: t.a = Int32(1) + TableScan: t + Projection: volatile_test() AS v, t.a + Filter: t.a = Int32(2) + TableScan: t + ")?; + Ok(()) + } + + /// A scalar subquery in the **projection** must also block the rewrite. + #[test] + fn keep_union_distinct_with_subquery_in_projection() -> Result<()> { + use datafusion_expr::scalar_subquery; + + // Build a simple scalar subquery: (SELECT t2.b FROM t2 WHERE t2.a = t.a) + let t2 = test_table_scan_with_name("t2")?; + let subquery_plan = Arc::new( + LogicalPlanBuilder::from(t2) + .filter(col("t2.a").eq(col("t.a")))? + .project(vec![col("t2.b")])? + .build()?, + ); + let sq = scalar_subquery(subquery_plan); + + let left = LogicalPlanBuilder::from(test_table_scan_with_name("t")?) + .filter(col("a").eq(lit(1)))? + .project(vec![sq.clone().alias("sub"), col("a")])? + .build()?; + let right = LogicalPlanBuilder::from(test_table_scan_with_name("t")?) + .filter(col("a").eq(lit(2)))? + .project(vec![sq.alias("sub"), col("a")])? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .union_distinct(right)? + .build()?; + + // Plan should be left untouched because the projection contains a subquery. + let optimized = { + let mut options = datafusion_common::config::ConfigOptions::default(); + options.optimizer.enable_unions_to_filter = true; + let optimizer_ctx = + OptimizerContext::new_with_config_options(Arc::new(options)) + .with_max_passes(1); + let rules: Vec> = + vec![Arc::new(UnionsToFilter::new())]; + crate::Optimizer::with_rules(rules).optimize( + plan.clone(), + &optimizer_ctx, + |_, _| {}, + )? + }; + // The Distinct(Union(...)) structure must be preserved. + assert!( + matches!( + &optimized, + LogicalPlan::Distinct(Distinct::All(inner)) + if matches!(inner.as_ref(), LogicalPlan::Union(_)) + ), + "expected Distinct(Union(...)) to be preserved, got:\n{optimized:?}" + ); + Ok(()) + } + + /// A UNION where both branches have a LIMIT must **not** be rewritten. + /// Each branch independently restricts the row-set; collapsing them into a + /// single branch would lose the per-branch LIMIT semantics. + #[test] + fn keep_union_distinct_with_limit_branches() -> Result<()> { + let left = LogicalPlanBuilder::from(test_table_scan_with_name("emp")?) + .project(vec![col("a").alias("mgr"), col("b").alias("comm")])? + .limit(0, Some(2))? + .build()?; + let right = LogicalPlanBuilder::from(test_table_scan_with_name("emp")?) + .project(vec![col("a").alias("mgr"), col("b").alias("comm")])? + .limit(0, Some(2))? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .union_distinct(right)? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Distinct: + Union + Limit: skip=0, fetch=2 + Projection: emp.a AS mgr, emp.b AS comm + TableScan: emp + Limit: skip=0, fetch=2 + Projection: emp.a AS mgr, emp.b AS comm + TableScan: emp + ")?; + Ok(()) + } + + /// A UNION where both branches have an ORDER BY (Sort) must **not** be + /// rewritten. ORDER BY inside a UNION subquery does not guarantee ordering + /// in the result; merging the branches would silently discard the Sort. + #[test] + fn keep_union_distinct_with_sort_branches() -> Result<()> { + let left = LogicalPlanBuilder::from(test_table_scan_with_name("emp")?) + .project(vec![col("a").alias("mgr"), col("b").alias("comm")])? + .sort(vec![col("a").sort(true, true)])? + .build()?; + let right = LogicalPlanBuilder::from(test_table_scan_with_name("emp")?) + .project(vec![col("a").alias("mgr"), col("b").alias("comm")])? + .sort(vec![col("a").sort(true, true)])? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .union_distinct(right)? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Distinct: + Union + Projection: mgr, comm + Sort: emp.a ASC NULLS FIRST + Projection: emp.a AS mgr, emp.b AS comm, emp.a + TableScan: emp + Projection: mgr, comm + Sort: emp.a ASC NULLS FIRST + Projection: emp.a AS mgr, emp.b AS comm, emp.a + TableScan: emp + ")?; + Ok(()) + } +} diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 2e8a65385541..67d2c1e7b516 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -178,6 +178,7 @@ logical_plan after type_coercion SAME TEXT AS ABOVE analyzed_logical_plan SAME TEXT AS ABOVE logical_plan after rewrite_set_comparison SAME TEXT AS ABOVE logical_plan after optimize_unions SAME TEXT AS ABOVE +logical_plan after unions_to_filter SAME TEXT AS ABOVE logical_plan after simplify_expressions SAME TEXT AS ABOVE logical_plan after replace_distinct_aggregate SAME TEXT AS ABOVE logical_plan after eliminate_join SAME TEXT AS ABOVE @@ -202,6 +203,7 @@ logical_plan after push_down_leaf_projections SAME TEXT AS ABOVE logical_plan after optimize_projections TableScan: simple_explain_test projection=[a, b, c] logical_plan after rewrite_set_comparison SAME TEXT AS ABOVE logical_plan after optimize_unions SAME TEXT AS ABOVE +logical_plan after unions_to_filter SAME TEXT AS ABOVE logical_plan after simplify_expressions SAME TEXT AS ABOVE logical_plan after replace_distinct_aggregate SAME TEXT AS ABOVE logical_plan after eliminate_join SAME TEXT AS ABOVE @@ -553,6 +555,7 @@ logical_plan after type_coercion SAME TEXT AS ABOVE analyzed_logical_plan SAME TEXT AS ABOVE logical_plan after rewrite_set_comparison SAME TEXT AS ABOVE logical_plan after optimize_unions SAME TEXT AS ABOVE +logical_plan after unions_to_filter SAME TEXT AS ABOVE logical_plan after simplify_expressions SAME TEXT AS ABOVE logical_plan after replace_distinct_aggregate SAME TEXT AS ABOVE logical_plan after eliminate_join SAME TEXT AS ABOVE @@ -577,6 +580,7 @@ logical_plan after push_down_leaf_projections SAME TEXT AS ABOVE logical_plan after optimize_projections TableScan: simple_explain_test projection=[a, b, c] logical_plan after rewrite_set_comparison SAME TEXT AS ABOVE logical_plan after optimize_unions SAME TEXT AS ABOVE +logical_plan after unions_to_filter SAME TEXT AS ABOVE logical_plan after simplify_expressions SAME TEXT AS ABOVE logical_plan after replace_distinct_aggregate SAME TEXT AS ABOVE logical_plan after eliminate_join SAME TEXT AS ABOVE diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b04c78bd2774..8ec72d1b9946 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -308,6 +308,7 @@ datafusion.optimizer.enable_sort_pushdown true datafusion.optimizer.enable_topk_aggregation true datafusion.optimizer.enable_topk_dynamic_filter_pushdown true datafusion.optimizer.enable_topk_repartition true +datafusion.optimizer.enable_unions_to_filter false datafusion.optimizer.enable_window_limits true datafusion.optimizer.enable_window_topn false datafusion.optimizer.expand_views_at_output false @@ -455,6 +456,7 @@ datafusion.optimizer.enable_sort_pushdown true Enable sort pushdown optimization datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible datafusion.optimizer.enable_topk_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase. datafusion.optimizer.enable_topk_repartition true When set to true, the optimizer will push TopK (Sort with fetch) below hash repartition when the partition key is a prefix of the sort key, reducing data volume before the shuffle. +datafusion.optimizer.enable_unions_to_filter false When set to true, the logical optimizer will rewrite `UNION DISTINCT` branches that read from the same source and differ only by filter predicates into a single branch with a combined filter. This optimization is conservative and only applies when the branches share the same source and compatible wrapper nodes such as identical projections or aliases. datafusion.optimizer.enable_window_limits true When set to true, the optimizer will attempt to push limit operations past window functions, if possible datafusion.optimizer.enable_window_topn false When set to true, the optimizer will replace Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a PartitionedTopKExec that maintains per-partition heaps, avoiding a full sort of the input. When the window partition key has low cardinality, enabling this optimization can improve performance. However, for high cardinality keys, it may cause regressions in both memory usage and runtime. datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index 3871468411c4..500c9d0d9064 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -297,6 +297,53 @@ physical_plan 04)--ProjectionExec: expr=[name@0 || _new as name] 05)----DataSourceExec: partitions=1, partition_sizes=[1] +# unions_to_filter is disabled by default + +statement ok +set datafusion.optimizer.enable_unions_to_filter = false; + +query TT +EXPLAIN SELECT id, name FROM t1 WHERE id = 1 UNION SELECT id, name FROM t1 WHERE id = 2 +---- +logical_plan +01)Aggregate: groupBy=[[id, name]], aggr=[[]] +02)--Union +03)----Filter: t1.id = Int32(1) +04)------TableScan: t1 projection=[id, name] +05)----Filter: t1.id = Int32(2) +06)------TableScan: t1 projection=[id, name] +physical_plan +01)AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[] +02)--RepartitionExec: partitioning=Hash([id@0, name@1], 4), input_partitions=4 +03)----AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[] +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 +05)--------UnionExec +06)----------FilterExec: id@0 = 1 +07)------------DataSourceExec: partitions=1, partition_sizes=[1] +08)----------FilterExec: id@0 = 2 +09)------------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +set datafusion.optimizer.enable_unions_to_filter = true; + +query TT +EXPLAIN SELECT id, name FROM t1 WHERE id = 1 UNION SELECT id, name FROM t1 WHERE id = 2 +---- +logical_plan +01)Aggregate: groupBy=[[id, name]], aggr=[[]] +02)--Filter: t1.id = Int32(1) OR t1.id = Int32(2) +03)----TableScan: t1 projection=[id, name] +physical_plan +01)AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[] +02)--RepartitionExec: partitioning=Hash([id@0, name@1], 4), input_partitions=4 +03)----AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[] +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)--------FilterExec: id@0 = 1 OR id@0 = 2 +06)----------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +set datafusion.optimizer.enable_unions_to_filter = false; + # Make sure to choose a small batch size to introduce parallelism to the plan. statement ok set datafusion.execution.batch_size = 2; diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index 030ca729f265..6ed71761cdd7 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -372,3 +372,37 @@ impl Default for MyTreeNode { } } ``` + +[#21075]: https://github.com/apache/datafusion/pull/21075 + +### `UnionsToFilter` optimizer rule is now disabled by default + +The `datafusion.optimizer.enable_unions_to_filter` option now defaults to +`false`. When enabled, the rule rewrites `UNION DISTINCT` branches that read the +same source and differ only by filter predicates into a single scan with a +combined `OR` predicate: + +```sql +-- Before: two separate scans +SELECT * FROM t WHERE a = 1 +UNION +SELECT * FROM t WHERE a = 2 + +-- After: one scan +SELECT DISTINCT * FROM t WHERE a = 1 OR a = 2 +``` + +**Who is affected:** + +- Queries using `UNION` against the same table with different filter + conditions may benefit from enabling this rule. + +**Migration guide:** + +Enable the rule when your `UNION` queries scan the same large table +multiple times with different predicates. Avoid it when the data source handles individual equality predicates more efficiently than +a combined `OR` (e.g., index-backed sources). + +```sql +SET datafusion.optimizer.enable_unions_to_filter = true; +``` diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 46039f3c99c2..d2841ceb4ca5 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -173,6 +173,7 @@ The following configuration settings are available: | datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. | | datafusion.optimizer.enable_sort_pushdown | true | Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true | | datafusion.optimizer.enable_leaf_expression_pushdown | true | When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes. | +| datafusion.optimizer.enable_unions_to_filter | false | When set to true, the logical optimizer will rewrite `UNION DISTINCT` branches that read from the same source and differ only by filter predicates into a single branch with a combined filter. This optimization is conservative and only applies when the branches share the same source and compatible wrapper nodes such as identical projections or aliases. | | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |