From 75cd4d79021e0a0bec6e245e1912b64202e8c209 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Sun, 22 Mar 2026 00:02:39 -0400 Subject: [PATCH 1/6] fix: preserve subquery structure when unparsing SubqueryAlias over Aggregate When the SQL unparser encountered a SubqueryAlias node whose direct child was an Aggregate (or other clause-building plan like Window, Sort, Limit, Union), it would flatten the subquery into a simple table alias, losing the aggregate entirely. For example, a plan representing: SELECT j1.col FROM j1 JOIN (SELECT max(id) AS m FROM j2) AS b ON j1.id = b.m would unparse to: SELECT j1.col FROM j1 INNER JOIN j2 AS b ON j1.id = b.m dropping the MAX aggregate and the subquery. Root cause: the SubqueryAlias handler in select_to_sql_recursively would call subquery_alias_inner_query_and_columns (which only unwraps Projection children) and unparse_table_scan_pushdown (which only handles TableScan/SubqueryAlias/Projection). When both returned nothing useful for an Aggregate child, the code recursed directly into the Aggregate, merging its GROUP BY into the outer SELECT instead of wrapping it in a derived subquery. The fix adds an early check: if the SubqueryAlias's direct child is a plan type that builds its own SELECT clauses (Aggregate, Window, Sort, Limit, Union), emit it as a derived subquery via self.derive() with the alias always attached, rather than falling through to the recursive path that would flatten it. --- datafusion/sql/src/unparser/plan.rs | 37 ++++++++++++++++ datafusion/sql/tests/cases/plan_to_sql.rs | 54 ++++++++++++++++++++++- 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index ca8dfa431b4f5..73950b961e8ba 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -828,6 +828,27 @@ impl Unparser<'_> { Some(plan_alias.alias.clone()), select.already_projected(), )?; + + // If the SubqueryAlias directly wraps a plan that builds its + // own SELECT clauses (e.g. Aggregate adds GROUP BY, Window adds + // OVER, etc.) and unparse_table_scan_pushdown couldn't reduce it, + // we must emit a derived subquery: (SELECT ...) AS alias. + // Without this, the recursive handler would merge those clauses + // into the outer SELECT, losing the subquery structure entirely. + if unparsed_table_scan.is_none() + && Self::requires_derived_subquery(plan_alias.input.as_ref()) + { + return self.derive( + &plan_alias.input, + relation, + Some(self.new_table_alias( + plan_alias.alias.table().to_string(), + columns, + )), + false, + ); + } + // if the child plan is a TableScan with pushdown operations, we don't need to // create an additional subquery for it if !select.already_projected() && unparsed_table_scan.is_none() { @@ -1060,6 +1081,22 @@ impl Unparser<'_> { scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some() } + /// Returns true if a plan, when used as the direct child of a SubqueryAlias, + /// must be emitted as a derived subquery `(SELECT ...) AS alias`. + /// + /// Plans like Aggregate or Window build their own SELECT clauses (GROUP BY, + /// window functions). + fn requires_derived_subquery(plan: &LogicalPlan) -> bool { + matches!( + plan, + LogicalPlan::Aggregate(_) + | LogicalPlan::Window(_) + | LogicalPlan::Sort(_) + | LogicalPlan::Limit(_) + | LogicalPlan::Union(_) + ) + } + /// Try to unparse a table scan with pushdown operations into a new subquery plan. /// If the table scan is without any pushdown operations, return None. fn unparse_table_scan_pushdown( diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index aefb404ba4106..24f9226636455 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -23,7 +23,7 @@ use datafusion_common::{ }; use datafusion_expr::expr::{WindowFunction, WindowFunctionParams}; use datafusion_expr::test::function_stub::{ - count_udaf, max_udaf, min_udaf, sum, sum_udaf, + count_udaf, max, max_udaf, min_udaf, sum, sum_udaf, }; use datafusion_expr::{ EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder, Union, @@ -2893,3 +2893,55 @@ fn test_json_access_3() { @r#"SELECT (j1.j1_string : 'field.inner1[''inner2'']') FROM j1"# ); } + +/// Test that unparsing a manually constructed join with a subquery aggregate +/// preserves the MAX aggregate function. +/// +/// Builds the equivalent of: +/// SELECT j1.j1_string FROM j1 +/// JOIN (SELECT max(j2_id) AS max_id FROM j2) AS b +/// ON j1.j1_id = b.max_id +#[test] +fn test_unparse_manual_join_with_subquery_aggregate() -> Result<()> { + let context = MockContextProvider { + state: MockSessionState::default(), + }; + let j1_schema = context + .get_table_source(TableReference::bare("j1"))? + .schema(); + let j2_schema = context + .get_table_source(TableReference::bare("j2"))? + .schema(); + + // Build the right side: SELECT max(j2_id) AS max_id FROM j2 + let right_scan = table_scan(Some("j2"), &j2_schema, None)?.build()?; + let right_agg = LogicalPlanBuilder::from(right_scan) + .aggregate(vec![] as Vec, vec![max(col("j2.j2_id")).alias("max_id")])? + .build()?; + let right_subquery = subquery_alias(right_agg, "b")?; + + // Build the full plan: SELECT j1.j1_string FROM j1 JOIN (...) AS b ON j1.j1_id = b.max_id + let left_scan = table_scan(Some("j1"), &j1_schema, None)?.build()?; + let plan = LogicalPlanBuilder::from(left_scan) + .join( + right_subquery, + datafusion_expr::JoinType::Inner, + ( + vec![Column::from_qualified_name("j1.j1_id")], + vec![Column::from_qualified_name("b.max_id")], + ), + None, + )? + .project(vec![col("j1.j1_string")])? + .build()?; + + let unparser = Unparser::default(); + let sql = unparser.plan_to_sql(&plan)?.to_string(); + let sql_upper = sql.to_uppercase(); + assert!( + sql_upper.contains("MAX("), + "Unparsed SQL should preserve the MAX aggregate function call, got: {sql}" + ); + + Ok(()) +} From 0d223f9fea321ca4667aca57c5bca483cc201aa9 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Fri, 27 Mar 2026 12:41:57 -0400 Subject: [PATCH 2/6] fix: preserve subquery structure when unparsing SubqueryAlias over Aggregate When the SQL unparser encountered a SubqueryAlias node whose direct child was an Aggregate (or other clause-building plan like Window, Sort, Limit, Union), it would flatten the subquery into a simple table alias, losing the aggregate entirely. For example, a plan representing: SELECT j1.col FROM j1 JOIN (SELECT max(id) AS m FROM j2) AS b ON j1.id = b.m would unparse to: SELECT j1.col FROM j1 INNER JOIN j2 AS b ON j1.id = b.m dropping the MAX aggregate and the subquery. Root cause: the SubqueryAlias handler in select_to_sql_recursively would call subquery_alias_inner_query_and_columns (which only unwraps Projection children) and unparse_table_scan_pushdown (which only handles TableScan/SubqueryAlias/Projection). When both returned nothing useful for an Aggregate child, the code recursed directly into the Aggregate, merging its GROUP BY into the outer SELECT instead of wrapping it in a derived subquery. The fix adds an early check: if the SubqueryAlias's direct child is a plan type that builds its own SELECT clauses (Aggregate, Window, Sort, Limit, Union), emit it as a derived subquery via self.derive() with the alias always attached, rather than falling through to the recursive path that would flatten it. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/sql/src/unparser/plan.rs | 2 +- datafusion/sql/tests/cases/plan_to_sql.rs | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 73950b961e8ba..0b9f24bc87327 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -1085,7 +1085,7 @@ impl Unparser<'_> { /// must be emitted as a derived subquery `(SELECT ...) AS alias`. /// /// Plans like Aggregate or Window build their own SELECT clauses (GROUP BY, - /// window functions). + /// window functions). fn requires_derived_subquery(plan: &LogicalPlan) -> bool { matches!( plan, diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 24f9226636455..db94e32c8d913 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -2916,7 +2916,10 @@ fn test_unparse_manual_join_with_subquery_aggregate() -> Result<()> { // Build the right side: SELECT max(j2_id) AS max_id FROM j2 let right_scan = table_scan(Some("j2"), &j2_schema, None)?.build()?; let right_agg = LogicalPlanBuilder::from(right_scan) - .aggregate(vec![] as Vec, vec![max(col("j2.j2_id")).alias("max_id")])? + .aggregate( + vec![] as Vec, + vec![max(col("j2.j2_id")).alias("max_id")], + )? .build()?; let right_subquery = subquery_alias(right_agg, "b")?; From 42f7f64e6a9cd400dc03e10a498bc32ffd67be57 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Fri, 3 Apr 2026 22:21:44 -0400 Subject: [PATCH 3/6] Fixes in PR --- datafusion/sql/src/unparser/plan.rs | 33 +++++++++++++++++++---- datafusion/sql/tests/cases/plan_to_sql.rs | 14 ++++++++++ 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 0b9f24bc87327..c0b77b9dba88a 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -829,17 +829,40 @@ impl Unparser<'_> { select.already_projected(), )?; - // If the SubqueryAlias directly wraps a plan that builds its - // own SELECT clauses (e.g. Aggregate adds GROUP BY, Window adds + // If the (possibly rewritten) inner plan builds its own + // SELECT clauses (e.g. Aggregate adds GROUP BY, Window adds // OVER, etc.) and unparse_table_scan_pushdown couldn't reduce it, // we must emit a derived subquery: (SELECT ...) AS alias. // Without this, the recursive handler would merge those clauses // into the outer SELECT, losing the subquery structure entirely. - if unparsed_table_scan.is_none() - && Self::requires_derived_subquery(plan_alias.input.as_ref()) + if unparsed_table_scan.is_none() && Self::requires_derived_subquery(plan) { + // When the dialect does not support column aliases in + // table aliases (e.g. SQLite), inject the aliases into + // the inner projection before wrapping as a derived + // subquery. + if !columns.is_empty() + && !self.dialect.supports_column_alias_in_table_alias() + { + let Ok(rewritten_plan) = + inject_column_aliases_into_subquery(plan.clone(), columns) + else { + return internal_err!( + "Failed to transform SubqueryAlias plan" + ); + }; + return self.derive( + &rewritten_plan, + relation, + Some(self.new_table_alias( + plan_alias.alias.table().to_string(), + vec![], + )), + false, + ); + } return self.derive( - &plan_alias.input, + plan, relation, Some(self.new_table_alias( plan_alias.alias.table().to_string(), diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index db94e32c8d913..9aff555825776 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -2894,6 +2894,20 @@ fn test_json_access_3() { ); } +/// Roundtrip test for a subquery aggregate with column aliases. +/// Ensures that `subquery_alias_inner_query_and_columns` unwrapping +/// a Projection -> Aggregate still triggers the derived-subquery path. +#[test] +fn roundtrip_subquery_aggregate_with_column_alias() -> Result<(), DataFusionError> { + roundtrip_statement_with_dialect_helper!( + sql: "SELECT id FROM (SELECT max(j1_id) FROM j1) AS c(id)", + parser_dialect: GenericDialect {}, + unparser_dialect: UnparserDefaultDialect {}, + expected: @"SELECT c.id FROM (SELECT max(j1.j1_id) FROM j1) AS c (id)", + ); + Ok(()) +} + /// Test that unparsing a manually constructed join with a subquery aggregate /// preserves the MAX aggregate function. /// From 9a31259f7cf05b673ebea4d6466a6a6e8c2cb2d7 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Sat, 4 Apr 2026 12:24:22 -0400 Subject: [PATCH 4/6] test: add tests for aggregate over subquery unparsing Add roundtrip_aggregate_over_subquery and test_unparse_aggregate_over_subquery_no_inner_proj to cover aggregate expressions over subquery aliases. The latter demonstrates a bug where the outer projection resolves aggregate expressions instead of column references when the Projection between Limit and Aggregate is absent. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/sql/tests/cases/plan_to_sql.rs | 85 +++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 1d80b5b4816c2..9ccb06249031d 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -2919,6 +2919,91 @@ fn roundtrip_subquery_aggregate_with_column_alias() -> Result<(), DataFusionErro Ok(()) } +/// Roundtrip: aggregate over a subquery projection. +#[test] +fn roundtrip_aggregate_over_subquery() -> Result<(), DataFusionError> { + let sql = r#"SELECT __agg_0 AS "min(j1_id)", __agg_1 AS "max(j1_id)" FROM (SELECT min(j1_rename) AS __agg_0, max(j1_rename) AS __agg_1 FROM (SELECT j1_id AS j1_rename FROM j1) AS bla LIMIT 20)"#; + + let statement = Parser::new(&GenericDialect {}) + .try_with_sql(sql)? + .parse_statement()?; + + let state = MockSessionState::default() + .with_aggregate_function(max_udaf()) + .with_aggregate_function(min_udaf()) + .with_expr_planner(Arc::new(CoreFunctionPlanner::default())) + .with_expr_planner(Arc::new(NestedFunctionPlanner)) + .with_expr_planner(Arc::new(FieldAccessPlanner)); + + let context = MockContextProvider { state }; + let sql_to_rel = SqlToRel::new(&context); + let plan = sql_to_rel + .sql_statement_to_plan(statement) + .unwrap_or_else(|e| panic!("Failed to parse sql: {sql}\n{e}")); + + println!("Logical plan:\n{plan}"); + println!("\nLogical plan (verbose):\n{}", plan.display_indent_schema()); + + let unparser = Unparser::new(&UnparserDefaultDialect {}); + let roundtrip_statement = unparser.plan_to_sql(&plan)?; + let actual = &roundtrip_statement.to_string(); + + insta::assert_snapshot!(actual, @r#"SELECT __agg_0 AS "min(j1_id)", __agg_1 AS "max(j1_id)" FROM (SELECT min(bla.j1_rename) AS __agg_0, max(bla.j1_rename) AS __agg_1 FROM (SELECT j1.j1_id AS j1_rename FROM j1) AS bla LIMIT 20)"#); + Ok(()) +} + +/// Same as roundtrip_aggregate_over_subquery but with the Projection between +/// Limit and Aggregate removed — the aliases are inlined into the Aggregate. +/// +/// Plan shape: +/// Projection: __agg_0 AS "max1(j1_id)", __agg_1 AS "max2(j1_id)" +/// Limit: fetch=20 +/// Aggregate: aggr=[[max(bla.j1_rename) AS __agg_0, max(bla.j1_rename) AS __agg_1]] +/// SubqueryAlias: bla +/// Projection: j1.j1_id AS j1_rename +/// TableScan: j1 +#[test] +fn test_unparse_aggregate_over_subquery_no_inner_proj() -> Result<()> { + let context = MockContextProvider { + state: MockSessionState::default(), + }; + let j1_schema = context + .get_table_source(TableReference::bare("j1"))? + .schema(); + + // (SELECT j1_id AS j1_rename FROM j1) AS bla + let scan = table_scan(Some("j1"), &j1_schema, None)?.build()?; + let inner_subquery = LogicalPlanBuilder::from(scan) + .project(vec![col("j1.j1_id").alias("j1_rename")])? + .alias("bla")? + .build()?; + + // Aggregate with aliases inlined (no separate Projection) + let plan = LogicalPlanBuilder::from(inner_subquery) + .aggregate( + vec![] as Vec, + vec![ + max(col("bla.j1_rename")).alias("__agg_0"), + max(col("bla.j1_rename")).alias("__agg_1"), + ], + )? + .limit(0, Some(20))? + .project(vec![ + col("__agg_0").alias("max1(j1_id)"), + col("__agg_1").alias("max2(j1_id)"), + ])? + .build()?; + + println!("Logical plan:\n{plan}"); + println!("\nLogical plan (verbose):\n{}", plan.display_indent_schema()); + + let unparser = Unparser::default(); + let sql = unparser.plan_to_sql(&plan)?.to_string(); + println!("\nUnparsed SQL:\n{sql}"); + + Ok(()) +} + /// Test that unparsing a manually constructed join with a subquery aggregate /// preserves the MAX aggregate function. /// From 66629b7307467dff9642e74c986222b408d245ed Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Sat, 4 Apr 2026 17:39:28 -0400 Subject: [PATCH 5/6] test: add no-outer-rename aggregate unparse test Adds test_unparse_aggregate_no_outer_rename to verify behavior when the outer Projection references aggregate columns without renaming them. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/sql/tests/cases/plan_to_sql.rs | 59 ++++++++++++++++++++++- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 9ccb06249031d..379b48eef8d75 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -2942,7 +2942,10 @@ fn roundtrip_aggregate_over_subquery() -> Result<(), DataFusionError> { .unwrap_or_else(|e| panic!("Failed to parse sql: {sql}\n{e}")); println!("Logical plan:\n{plan}"); - println!("\nLogical plan (verbose):\n{}", plan.display_indent_schema()); + println!( + "\nLogical plan (verbose):\n{}", + plan.display_indent_schema() + ); let unparser = Unparser::new(&UnparserDefaultDialect {}); let roundtrip_statement = unparser.plan_to_sql(&plan)?; @@ -2995,7 +2998,59 @@ fn test_unparse_aggregate_over_subquery_no_inner_proj() -> Result<()> { .build()?; println!("Logical plan:\n{plan}"); - println!("\nLogical plan (verbose):\n{}", plan.display_indent_schema()); + println!( + "\nLogical plan (verbose):\n{}", + plan.display_indent_schema() + ); + + let unparser = Unparser::default(); + let sql = unparser.plan_to_sql(&plan)?.to_string(); + println!("\nUnparsed SQL:\n{sql}"); + + Ok(()) +} + +/// Same as test_unparse_aggregate_over_subquery_no_inner_proj but the outer +/// Projection references the aggregate columns WITHOUT renaming them. +/// The output column names should still match the Aggregate's aliases. +/// +/// Plan shape: +/// Projection: __agg_0, __agg_1 +/// Aggregate: aggr=[[max(bla.j1_rename) AS __agg_0, max(bla.j1_rename) AS __agg_1]] +/// SubqueryAlias: bla +/// Projection: j1.j1_id AS j1_rename +/// TableScan: j1 +#[test] +fn test_unparse_aggregate_no_outer_rename() -> Result<()> { + let context = MockContextProvider { + state: MockSessionState::default(), + }; + let j1_schema = context + .get_table_source(TableReference::bare("j1"))? + .schema(); + + let scan = table_scan(Some("j1"), &j1_schema, None)?.build()?; + let inner_subquery = LogicalPlanBuilder::from(scan) + .project(vec![col("j1.j1_id").alias("j1_rename")])? + .alias("bla")? + .build()?; + + let plan = LogicalPlanBuilder::from(inner_subquery) + .aggregate( + vec![] as Vec, + vec![ + max(col("bla.j1_rename")).alias("__agg_0"), + max(col("bla.j1_rename")).alias("__agg_1"), + ], + )? + .project(vec![col("__agg_0"), col("__agg_1")])? + .build()?; + + println!("Logical plan:\n{plan}"); + println!( + "\nLogical plan (verbose):\n{}", + plan.display_indent_schema() + ); let unparser = Unparser::default(); let sql = unparser.plan_to_sql(&plan)?.to_string(); From da4306c432b67f1e9d433da868aee0395cf03be8 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Sat, 4 Apr 2026 18:08:11 -0400 Subject: [PATCH 6/6] fix: fold Limit/Sort into outer SELECT when Projection claims Aggregate through them When a Projection's `reconstruct_select_statement` reaches through a Limit or Sort to claim an Aggregate, the Limit/Sort arm would later see `already_projected` and wrap everything in a spurious derived subquery, emitting the aggregate twice. Fix: in the Projection arm, after claiming the Aggregate, detect if the direct child is a Limit or Sort. If so, fold its clauses (LIMIT/OFFSET or ORDER BY) into the current query and recurse into the Limit/Sort's child, skipping the node entirely. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/sql/src/unparser/plan.rs | 79 +++++++++++++- datafusion/sql/tests/cases/plan_to_sql.rs | 122 ++++++++-------------- 2 files changed, 120 insertions(+), 81 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index c0b77b9dba88a..df0a18d870b17 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -226,12 +226,14 @@ impl Unparser<'_> { /// Reconstructs a SELECT SQL statement from a logical plan by unprojecting column expressions /// found in a [Projection] node. This requires scanning the plan tree for relevant Aggregate /// and Window nodes and matching column expressions to the appropriate agg or window expressions. + /// + /// Returns `true` if an Aggregate node was found and claimed for this SELECT. fn reconstruct_select_statement( &self, plan: &LogicalPlan, p: &Projection, select: &mut SelectBuilder, - ) -> Result<()> { + ) -> Result { let mut exprs = p.expr.clone(); // If an Unnest node is found within the select, find and unproject the unnest column @@ -264,6 +266,7 @@ impl Unparser<'_> { .collect::>>()?, vec![], )); + Ok(true) } (None, Some(window)) => { let items = exprs @@ -275,6 +278,7 @@ impl Unparser<'_> { .collect::>>()?; select.projection(items); + Ok(false) } _ => { let items = exprs @@ -282,9 +286,9 @@ impl Unparser<'_> { .map(|e| self.select_item_to_sql(e)) .collect::>>()?; select.projection(items); + Ok(false) } } - Ok(()) } fn derive( @@ -423,7 +427,76 @@ impl Unparser<'_> { columns, ); } - self.reconstruct_select_statement(plan, p, select)?; + let found_agg = self.reconstruct_select_statement(plan, p, select)?; + + // If the Projection claimed an Aggregate by reaching through + // a Limit or Sort, fold those clauses into the current query + // and skip the node during recursion. Otherwise the Limit/Sort + // arm would see `already_projected` and wrap everything in a + // spurious derived subquery. + if found_agg { + if let LogicalPlan::Limit(limit) = p.input.as_ref() { + if let Some(fetch) = &limit.fetch { + let Some(query) = query.as_mut() else { + return internal_err!( + "Limit operator only valid in a statement context." + ); + }; + query.limit(Some(self.expr_to_sql(fetch)?)); + } + if let Some(skip) = &limit.skip { + let Some(query) = query.as_mut() else { + return internal_err!( + "Offset operator only valid in a statement context." + ); + }; + query.offset(Some(ast::Offset { + rows: ast::OffsetRows::None, + value: self.expr_to_sql(skip)?, + })); + } + return self.select_to_sql_recursively( + limit.input.as_ref(), + query, + select, + relation, + ); + } + if let LogicalPlan::Sort(sort) = p.input.as_ref() { + let Some(query_ref) = query.as_mut() else { + return internal_err!( + "Sort operator only valid in a statement context." + ); + }; + if let Some(fetch) = sort.fetch { + query_ref.limit(Some(ast::Expr::value(ast::Value::Number( + fetch.to_string(), + false, + )))); + } + let agg = + find_agg_node_within_select(plan, select.already_projected()); + let sort_exprs: Vec = sort + .expr + .iter() + .map(|sort_expr| { + unproject_sort_expr( + sort_expr.clone(), + agg, + sort.input.as_ref(), + ) + }) + .collect::>>()?; + query_ref.order_by(self.sorts_to_sql(&sort_exprs)?); + return self.select_to_sql_recursively( + sort.input.as_ref(), + query, + select, + relation, + ); + } + } + self.select_to_sql_recursively(p.input.as_ref(), query, select, relation) } LogicalPlan::Filter(filter) => { diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 379b48eef8d75..f45886e2f6cbe 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -2919,52 +2919,21 @@ fn roundtrip_subquery_aggregate_with_column_alias() -> Result<(), DataFusionErro Ok(()) } -/// Roundtrip: aggregate over a subquery projection. +/// Roundtrip: aggregate over a subquery projection with limit. #[test] fn roundtrip_aggregate_over_subquery() -> Result<(), DataFusionError> { - let sql = r#"SELECT __agg_0 AS "min(j1_id)", __agg_1 AS "max(j1_id)" FROM (SELECT min(j1_rename) AS __agg_0, max(j1_rename) AS __agg_1 FROM (SELECT j1_id AS j1_rename FROM j1) AS bla LIMIT 20)"#; - - let statement = Parser::new(&GenericDialect {}) - .try_with_sql(sql)? - .parse_statement()?; - - let state = MockSessionState::default() - .with_aggregate_function(max_udaf()) - .with_aggregate_function(min_udaf()) - .with_expr_planner(Arc::new(CoreFunctionPlanner::default())) - .with_expr_planner(Arc::new(NestedFunctionPlanner)) - .with_expr_planner(Arc::new(FieldAccessPlanner)); - - let context = MockContextProvider { state }; - let sql_to_rel = SqlToRel::new(&context); - let plan = sql_to_rel - .sql_statement_to_plan(statement) - .unwrap_or_else(|e| panic!("Failed to parse sql: {sql}\n{e}")); - - println!("Logical plan:\n{plan}"); - println!( - "\nLogical plan (verbose):\n{}", - plan.display_indent_schema() + roundtrip_statement_with_dialect_helper!( + sql: r#"SELECT __agg_0 AS "min(j1_id)", __agg_1 AS "max(j1_id)" FROM (SELECT min(j1_rename) AS __agg_0, max(j1_rename) AS __agg_1 FROM (SELECT j1_id AS j1_rename FROM j1) AS bla LIMIT 20)"#, + parser_dialect: GenericDialect {}, + unparser_dialect: UnparserDefaultDialect {}, + expected: @r#"SELECT __agg_0 AS "min(j1_id)", __agg_1 AS "max(j1_id)" FROM (SELECT min(bla.j1_rename) AS __agg_0, max(bla.j1_rename) AS __agg_1 FROM (SELECT j1.j1_id AS j1_rename FROM j1) AS bla LIMIT 20)"#, ); - - let unparser = Unparser::new(&UnparserDefaultDialect {}); - let roundtrip_statement = unparser.plan_to_sql(&plan)?; - let actual = &roundtrip_statement.to_string(); - - insta::assert_snapshot!(actual, @r#"SELECT __agg_0 AS "min(j1_id)", __agg_1 AS "max(j1_id)" FROM (SELECT min(bla.j1_rename) AS __agg_0, max(bla.j1_rename) AS __agg_1 FROM (SELECT j1.j1_id AS j1_rename FROM j1) AS bla LIMIT 20)"#); Ok(()) } -/// Same as roundtrip_aggregate_over_subquery but with the Projection between -/// Limit and Aggregate removed — the aliases are inlined into the Aggregate. -/// -/// Plan shape: -/// Projection: __agg_0 AS "max1(j1_id)", __agg_1 AS "max2(j1_id)" -/// Limit: fetch=20 -/// Aggregate: aggr=[[max(bla.j1_rename) AS __agg_0, max(bla.j1_rename) AS __agg_1]] -/// SubqueryAlias: bla -/// Projection: j1.j1_id AS j1_rename -/// TableScan: j1 +/// Projection → Limit → Aggregate (aliases inlined into Aggregate, no +/// intermediate Projection). Verifies the Limit is folded into the outer +/// SELECT rather than creating a spurious derived subquery. #[test] fn test_unparse_aggregate_over_subquery_no_inner_proj() -> Result<()> { let context = MockContextProvider { @@ -2974,15 +2943,10 @@ fn test_unparse_aggregate_over_subquery_no_inner_proj() -> Result<()> { .get_table_source(TableReference::bare("j1"))? .schema(); - // (SELECT j1_id AS j1_rename FROM j1) AS bla let scan = table_scan(Some("j1"), &j1_schema, None)?.build()?; - let inner_subquery = LogicalPlanBuilder::from(scan) + let plan = LogicalPlanBuilder::from(scan) .project(vec![col("j1.j1_id").alias("j1_rename")])? .alias("bla")? - .build()?; - - // Aggregate with aliases inlined (no separate Projection) - let plan = LogicalPlanBuilder::from(inner_subquery) .aggregate( vec![] as Vec, vec![ @@ -2997,29 +2961,13 @@ fn test_unparse_aggregate_over_subquery_no_inner_proj() -> Result<()> { ])? .build()?; - println!("Logical plan:\n{plan}"); - println!( - "\nLogical plan (verbose):\n{}", - plan.display_indent_schema() - ); - - let unparser = Unparser::default(); - let sql = unparser.plan_to_sql(&plan)?.to_string(); - println!("\nUnparsed SQL:\n{sql}"); - + let sql = Unparser::default().plan_to_sql(&plan)?.to_string(); + insta::assert_snapshot!(sql, @r#"SELECT max(bla.j1_rename) AS "max1(j1_id)", max(bla.j1_rename) AS "max2(j1_id)" FROM (SELECT j1.j1_id AS j1_rename FROM j1) AS bla LIMIT 20"#); Ok(()) } -/// Same as test_unparse_aggregate_over_subquery_no_inner_proj but the outer -/// Projection references the aggregate columns WITHOUT renaming them. -/// The output column names should still match the Aggregate's aliases. -/// -/// Plan shape: -/// Projection: __agg_0, __agg_1 -/// Aggregate: aggr=[[max(bla.j1_rename) AS __agg_0, max(bla.j1_rename) AS __agg_1]] -/// SubqueryAlias: bla -/// Projection: j1.j1_id AS j1_rename -/// TableScan: j1 +/// Projection → Aggregate (aliases inlined, no rename in outer Projection). +/// Verifies the aggregate aliases are preserved as output column names. #[test] fn test_unparse_aggregate_no_outer_rename() -> Result<()> { let context = MockContextProvider { @@ -3030,12 +2978,9 @@ fn test_unparse_aggregate_no_outer_rename() -> Result<()> { .schema(); let scan = table_scan(Some("j1"), &j1_schema, None)?.build()?; - let inner_subquery = LogicalPlanBuilder::from(scan) + let plan = LogicalPlanBuilder::from(scan) .project(vec![col("j1.j1_id").alias("j1_rename")])? .alias("bla")? - .build()?; - - let plan = LogicalPlanBuilder::from(inner_subquery) .aggregate( vec![] as Vec, vec![ @@ -3046,16 +2991,37 @@ fn test_unparse_aggregate_no_outer_rename() -> Result<()> { .project(vec![col("__agg_0"), col("__agg_1")])? .build()?; - println!("Logical plan:\n{plan}"); - println!( - "\nLogical plan (verbose):\n{}", - plan.display_indent_schema() - ); + let sql = Unparser::default().plan_to_sql(&plan)?.to_string(); + insta::assert_snapshot!(sql, @"SELECT max(bla.j1_rename) AS __agg_0, max(bla.j1_rename) AS __agg_1 FROM (SELECT j1.j1_id AS j1_rename FROM j1) AS bla"); + Ok(()) +} - let unparser = Unparser::default(); - let sql = unparser.plan_to_sql(&plan)?.to_string(); - println!("\nUnparsed SQL:\n{sql}"); +/// Projection → Sort → Aggregate (aliases inlined into Aggregate). +/// Verifies the Sort is folded into the outer SELECT rather than creating +/// a spurious derived subquery. +#[test] +fn test_unparse_aggregate_with_sort_no_inner_proj() -> Result<()> { + let context = MockContextProvider { + state: MockSessionState::default(), + }; + let j1_schema = context + .get_table_source(TableReference::bare("j1"))? + .schema(); + + let scan = table_scan(Some("j1"), &j1_schema, None)?.build()?; + let plan = LogicalPlanBuilder::from(scan) + .project(vec![col("j1.j1_id").alias("j1_rename")])? + .alias("bla")? + .aggregate( + vec![] as Vec, + vec![max(col("bla.j1_rename")).alias("__agg_0")], + )? + .sort(vec![col("__agg_0").sort(true, true)])? + .project(vec![col("__agg_0").alias("max1(j1_id)")])? + .build()?; + let sql = Unparser::default().plan_to_sql(&plan)?.to_string(); + insta::assert_snapshot!(sql, @r#"SELECT max(bla.j1_rename) AS "max1(j1_id)" FROM (SELECT j1.j1_id AS j1_rename FROM j1) AS bla ORDER BY max(bla.j1_rename) ASC NULLS FIRST"#); Ok(()) }