Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions datafusion/sql/src/unparser/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,48 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields(
.map(|e| map.get(e).unwrap_or(e).clone())
.collect::<Vec<_>>();

// The inner Projection may define aliases that the Sort references
// but the outer Projection does not include. Since we are about to
// replace the inner Projection's expressions with `new_exprs` (which
// only contains the outer Projection's columns), those alias
// definitions will be lost. To keep the Sort valid, rewrite any
// sort expression that references a dropped alias so that it uses
// the alias's underlying expression instead.
let projected_aliases: HashSet<&str> = new_exprs
.iter()
.filter_map(|e| match e {
Expr::Alias(alias) => Some(alias.name.as_str()),
_ => None,
})
.collect();

let dropped_aliases: HashMap<String, Expr> = inner_p
.expr
.iter()
.filter_map(|e| match e {
Expr::Alias(alias)
if !projected_aliases.contains(alias.name.as_str()) =>
{
Some((alias.name.clone(), (*alias.expr).clone()))
}
_ => None,
})
.collect();

if !dropped_aliases.is_empty() {
for sort_expr in &mut sort.expr {
let mut expr = sort_expr.expr.clone();
while let Expr::Alias(alias) = expr {
expr = *alias.expr;
}
if let Expr::Column(ref col) = expr
&& let Some(underlying) = dropped_aliases.get(col.name())
{
sort_expr.expr = underlying.clone();
}
}
}

inner_p.expr.clone_from(&new_exprs);
sort.input = Arc::new(LogicalPlan::Projection(inner_p));

Expand Down
49 changes: 49 additions & 0 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3008,6 +3008,55 @@ fn test_unparse_manual_join_with_subquery_aggregate() -> Result<()> {
Ok(())
}

/// Regression test for https://github.com/apache/datafusion/issues/21490
///
/// When the outer Projection excludes a Sort column whose definition only
/// exists as an alias in the inner Projection, the Unparser must inline the
/// underlying expression into ORDER BY rather than emitting the now-missing
/// alias name.
#[test]
fn test_sort_on_aliased_column_dropped_by_outer_projection() -> Result<()> {
let schema = Schema::new(vec![
Field::new("X", DataType::Utf8, true),
Field::new("Y", DataType::Utf8, true),
Field::new("Z", DataType::Utf8, true),
]);

// Build:
// Projection: [a, b] -- outer: excludes sort column "c"
// Sort: [c DESC, fetch=1] -- references alias "c"
// Projection: [X AS a, Y AS b, Z AS c] -- defines alias "c"
// SubqueryAlias: t
// TableScan: phys_table [X, Y, Z]
let plan = table_scan(Some("phys_table"), &schema, None)?
.alias("t")?
.project(vec![
Expr::Column(Column::new(Some(TableReference::bare("t")), "X")).alias("a"),
Expr::Column(Column::new(Some(TableReference::bare("t")), "Y")).alias("b"),
Expr::Column(Column::new(Some(TableReference::bare("t")), "Z")).alias("c"),
])?
.sort_with_limit(
vec![Expr::Column(Column::new_unqualified("c")).sort(false, true)],
Some(1),
)?
.project(vec![
Expr::Column(Column::new_unqualified("a")),
Expr::Column(Column::new_unqualified("b")),
])?
.build()?;

let unparser = Unparser::default();
let sql = unparser.plan_to_sql(&plan)?;

// ORDER BY must reference the physical column, not the dropped alias.
assert_snapshot!(
sql,
@r#"SELECT t."X" AS a, t."Y" AS b FROM phys_table AS t ORDER BY t."Z" DESC NULLS FIRST LIMIT 1"#
);

Ok(())
}

#[test]
fn snowflake_unnest_to_lateral_flatten_simple() -> Result<(), DataFusionError> {
let snowflake = SnowflakeDialect::new();
Expand Down
Loading