diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs
index 6126793145efd..96b911e8db130 100644
--- a/datafusion/core/tests/sql/select.rs
+++ b/datafusion/core/tests/sql/select.rs
@@ -431,3 +431,69 @@ async fn test_select_cast_date_literal_to_timestamp_overflow() -> Result<()> {
     );
     Ok(())
 }
+
+// Regression test: a recursive CTE whose anchor aliases a computed column
+// (`upper(val) AS val`) and whose recursive term leaves the same expression
+// un-aliased must still produce batches whose schema field names come from
+// the anchor term — especially when the outer query uses ORDER BY + LIMIT
+// (the TopK path passes batch schemas through verbatim, so any drift in
+// RecursiveQueryExec's emitted batches is visible downstream).
+#[tokio::test]
+async fn test_recursive_cte_batch_schema_stable_with_order_by_limit() -> Result<()> {
+    let ctx = SessionContext::new();
+    ctx.sql(
+        "CREATE TABLE records (\
+            id VARCHAR NOT NULL, \
+            parent_id VARCHAR, \
+            ts TIMESTAMP NOT NULL, \
+            val VARCHAR\
+        )",
+    )
+    .await?
+    .collect()
+    .await?;
+    ctx.sql(
+        "INSERT INTO records VALUES \
+            ('a00', NULL, TIMESTAMP '2025-01-01 00:00:00', 'v_span'), \
+            ('a01', 'a00', TIMESTAMP '2025-01-01 00:00:01', 'v_log_1'), \
+            ('a02', 'a01', TIMESTAMP '2025-01-01 00:00:02', 'v_log_2'), \
+            ('a03', 'a02', TIMESTAMP '2025-01-01 00:00:03', 'v_log_3'), \
+            ('a04', 'a03', TIMESTAMP '2025-01-01 00:00:04', 'v_log_4'), \
+            ('a05', 'a04', TIMESTAMP '2025-01-01 00:00:05', 'v_log_5')",
+    )
+    .await?
+    .collect()
+    .await?;
+
+    let results = ctx
+        .sql(
+            "WITH RECURSIVE descendants AS (\
+                SELECT id, parent_id, ts, upper(val) AS val \
+                FROM records WHERE id = 'a00' \
+                UNION ALL \
+                SELECT r.id, r.parent_id, r.ts, upper(r.val) \
+                FROM records r INNER JOIN descendants d ON r.parent_id = d.id \
+            ) \
+            SELECT id, parent_id, ts, val FROM descendants ORDER BY ts ASC LIMIT 10",
+        )
+        .await?
+        .collect()
+        .await?;
+
+    let expected_names = ["id", "parent_id", "ts", "val"];
+    assert!(!results.is_empty(), "expected at least one batch");
+    // The anchor row `a00` plus its five descendants must all come back; without
+    // this check the schema assertions below could pass on anchor-only output.
+    let total_rows: usize = results.iter().map(|batch| batch.num_rows()).sum();
+    assert_eq!(total_rows, 6, "expected the anchor row plus five recursive rows");
+    for (i, batch) in results.iter().enumerate() {
+        let schema = batch.schema();
+        let actual_names: Vec<&str> =
+            schema.fields().iter().map(|f| f.name().as_str()).collect();
+        assert_eq!(
+            actual_names, expected_names,
+            "batch {i} schema field names leaked from recursive branch"
+        );
+    }
+    Ok(())
+}
diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index 35b787759441c..6eee5d1237d29 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -317,6 +317,20 @@ impl RecursiveQueryStream {
         mut batch: RecordBatch,
     ) -> Poll<Option<Result<RecordBatch>>> {
         let baseline_metrics = self.baseline_metrics.clone();
+
+        // Rebind to the declared output schema. The recursive term is planned
+        // independently from the static term and its projection may leave
+        // columns un-aliased (e.g. `upper(r.val)` vs the anchor's
+        // `upper(val) AS val`); downstream consumers that key on
+        // `batch.schema().field(i).name()` (TopK, CSV/JSON writers, custom
+        // collectors) would otherwise see the recursive branch's names leak
+        // through. Logical-plan coercion guarantees matching types, so this
+        // is a zero-copy field rebind.
+        if batch.schema() != self.schema {
+            batch =
+                RecordBatch::try_new(Arc::clone(&self.schema), batch.columns().to_vec())?;
+        }
+
         if let Some(deduplicator) = &mut self.distinct_deduplicator {
             let _timer_guard = baseline_metrics.elapsed_compute().timer();
             batch = deduplicator.deduplicate(&batch)?;
diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt
index d56b6f52034fb..e9c1c0245d1c8 100644
--- a/datafusion/sqllogictest/test_files/cte.slt
+++ b/datafusion/sqllogictest/test_files/cte.slt
@@ -1223,6 +1223,83 @@ physical_plan
 07)------RepartitionExec: partitioning=Hash([start@0], 4), input_partitions=1
 08)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/recursive_cte/closure.csv]]}, projection=[start, end], file_type=csv, has_header=true
 
+# Regression test: recursive CTE column-name stability under ORDER BY + LIMIT.
+#
+# The recursive CTE below has an anchor that aliases a computed column
+# (`upper(val) AS val`) while the recursive term leaves the same expression
+# un-aliased (`upper(r.val)`). RecursiveQueryExec declares its output schema
+# from the anchor, so the plan-level column is `val`; but if it forwards the
+# recursive term's batches without rebinding their schema to the anchor's,
+# downstream consumers that read `batch.schema().field(i).name()` observe
+# the leaked `upper(r.val)` name. The outer `ORDER BY ts LIMIT 10` routes
+# through the TopK path in SortExec, which passes batch schemas through
+# verbatim — so the leak surfaces there.
+#
+# sqllogictest compares row values and cannot assert column names directly.
+# To surface the leak inside SLT, we `COPY` the result to a CSV file with a
+# header row (the CSV writer names header columns from each batch's own
+# schema), then re-read it as headerless CSV so the header line shows up as
+# a data row. COPY reports 6 rows: the anchor `a00` plus five descendants.
+statement ok
+CREATE TABLE cte_schema_records (
+    id VARCHAR NOT NULL,
+    parent_id VARCHAR,
+    ts TIMESTAMP NOT NULL,
+    val VARCHAR
+);
+
+statement ok
+INSERT INTO cte_schema_records VALUES
+    ('a00', NULL, TIMESTAMP '2025-01-01 00:00:00', 'v_span'),
+    ('a01', 'a00', TIMESTAMP '2025-01-01 00:00:01', 'v_log_1'),
+    ('a02', 'a01', TIMESTAMP '2025-01-01 00:00:02', 'v_log_2'),
+    ('a03', 'a02', TIMESTAMP '2025-01-01 00:00:03', 'v_log_3'),
+    ('a04', 'a03', TIMESTAMP '2025-01-01 00:00:04', 'v_log_4'),
+    ('a05', 'a04', TIMESTAMP '2025-01-01 00:00:05', 'v_log_5');
+
+query I
+COPY (WITH RECURSIVE descendants AS (
+        SELECT id, parent_id, ts, upper(val) AS val
+        FROM cte_schema_records WHERE id = 'a00'
+        UNION ALL
+        SELECT r.id, r.parent_id, r.ts, upper(r.val)
+        FROM cte_schema_records r
+        INNER JOIN descendants d ON r.parent_id = d.id
+      )
+      SELECT id, parent_id, ts, val
+      FROM descendants
+      ORDER BY ts ASC
+      LIMIT 10)
+TO 'test_files/scratch/cte/recursive_order_limit_schema/'
+STORED AS CSV OPTIONS ('format.has_header' 'true');
+----
+6
+
+statement ok
+CREATE EXTERNAL TABLE cte_schema_reread (
+    id VARCHAR, parent_id VARCHAR, ts VARCHAR, val VARCHAR
+)
+STORED AS CSV
+LOCATION 'test_files/scratch/cte/recursive_order_limit_schema/'
+OPTIONS ('format.has_header' 'false');
+
+# Re-reading with has_header='false' turns the CSV header into a data row,
+# and `WHERE id = 'id'` selects exactly that row. It must match the CTE's
+# declared column names; if RecursiveQueryExec leaked the recursive branch's
+# schema, the last field would read `upper(r.val)` instead of `val`.
+query TTTT
+SELECT id, parent_id, ts, val
+  FROM cte_schema_reread
+  WHERE id = 'id';
+----
+id parent_id ts val
+
+statement ok
+DROP TABLE cte_schema_reread;
+
+statement ok
+DROP TABLE cte_schema_records;
+
 statement count 0
 set datafusion.execution.enable_recursive_ctes = false;
 