Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions datafusion/core/tests/sql/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,3 +431,65 @@ async fn test_select_cast_date_literal_to_timestamp_overflow() -> Result<()> {
);
Ok(())
}

// Regression test for field-name stability in recursive CTE output.
//
// The anchor term aliases a computed column (`upper(val) AS val`) while the
// recursive term projects the same expression without an alias. Every batch
// returned by the query must still carry the anchor term's field names.
// The outer ORDER BY + LIMIT is deliberate: the TopK path forwards batch
// schemas verbatim, so any name drift in the batches emitted by
// RecursiveQueryExec is observable in the collected results.
#[tokio::test]
async fn test_recursive_cte_batch_schema_stable_with_order_by_limit() -> Result<()> {
    let ctx = SessionContext::new();

    // Fixture: a small parent/child hierarchy rooted at 'a00'.
    let create_table = "CREATE TABLE records (\
        id VARCHAR NOT NULL, \
        parent_id VARCHAR, \
        ts TIMESTAMP NOT NULL, \
        val VARCHAR\
        )";
    ctx.sql(create_table).await?.collect().await?;

    let insert_rows = "INSERT INTO records VALUES \
        ('a00', NULL, TIMESTAMP '2025-01-01 00:00:00', 'v_span'), \
        ('a01', 'a00', TIMESTAMP '2025-01-01 00:00:01', 'v_log_1'), \
        ('a02', 'a01', TIMESTAMP '2025-01-01 00:00:02', 'v_log_2'), \
        ('a03', 'a02', TIMESTAMP '2025-01-01 00:00:03', 'v_log_3'), \
        ('a04', 'a03', TIMESTAMP '2025-01-01 00:00:04', 'v_log_4'), \
        ('a05', 'a04', TIMESTAMP '2025-01-01 00:00:05', 'v_log_5')";
    ctx.sql(insert_rows).await?.collect().await?;

    // Anchor aliases `upper(val)` as `val`; the recursive term does not.
    let recursive_query = "WITH RECURSIVE descendants AS (\
        SELECT id, parent_id, ts, upper(val) AS val \
        FROM records WHERE id = 'a00' \
        UNION ALL \
        SELECT r.id, r.parent_id, r.ts, upper(r.val) \
        FROM records r INNER JOIN descendants d ON r.parent_id = d.id \
        ) \
        SELECT id, parent_id, ts, val FROM descendants ORDER BY ts ASC LIMIT 10";
    let batches = ctx.sql(recursive_query).await?.collect().await?;

    let expected_names = ["id", "parent_id", "ts", "val"];
    assert!(!batches.is_empty(), "expected at least one batch");

    // Check each batch's own schema, not just the plan-level schema.
    for (i, batch) in batches.iter().enumerate() {
        let batch_schema = batch.schema();
        let mut field_names: Vec<&str> = Vec::with_capacity(batch_schema.fields().len());
        for field in batch_schema.fields().iter() {
            field_names.push(field.name().as_str());
        }
        assert_eq!(
            field_names, expected_names,
            "batch {i} schema field names leaked from recursive branch"
        );
    }

    Ok(())
}
14 changes: 14 additions & 0 deletions datafusion/physical-plan/src/recursive_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,20 @@ impl RecursiveQueryStream {
mut batch: RecordBatch,
) -> Poll<Option<Result<RecordBatch>>> {
let baseline_metrics = self.baseline_metrics.clone();

// Rebind to the declared output schema. The recursive term is planned
// independently from the static term and its projection may leave
// columns un-aliased (e.g. `upper(r.val)` vs the anchor's
// `upper(val) AS val`); downstream consumers that key on
// `batch.schema().field(i).name()` (TopK, CSV/JSON writers, custom
// collectors) would otherwise see the recursive branch's names leak
// through. Logical-plan coercion guarantees matching types, so this
// is a zero-copy field rebind.
if batch.schema() != self.schema {
batch =
RecordBatch::try_new(Arc::clone(&self.schema), batch.columns().to_vec())?;
}

if let Some(deduplicator) = &mut self.distinct_deduplicator {
let _timer_guard = baseline_metrics.elapsed_compute().timer();
batch = deduplicator.deduplicate(&batch)?;
Expand Down
77 changes: 77 additions & 0 deletions datafusion/sqllogictest/test_files/cte.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,83 @@ physical_plan
07)------RepartitionExec: partitioning=Hash([start@0], 4), input_partitions=1
08)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/recursive_cte/closure.csv]]}, projection=[start, end], file_type=csv, has_header=true

# Regression test: recursive CTE column-name stability under ORDER BY + LIMIT.
#
# The recursive CTE below has an anchor that aliases a computed column
# (`upper(val) AS val`) while the recursive term leaves the same expression
# un-aliased (`upper(r.val)`). RecursiveQueryExec declares its output schema
# from the anchor, so the plan-level column is `val`; but if it forwards the
# recursive term's batches without rebinding their schema to the anchor's,
# downstream consumers that read `batch.schema().field(i).name()` observe
# the leaked `upper(r.val)` name. The outer `ORDER BY ts LIMIT 10` routes
# through the TopK path in SortExec, which passes batch schemas through
# verbatim — so the leak surfaces there.
#
# sqllogictest compares row values and can't assert column names directly.
# To surface the leak inside SLT, we `COPY` the result to a CSV file with a
# header row (the CSV writer names header columns from each batch's own
# schema) and then read the file back as headerless CSV so the header line
# appears as a data row we can compare against the expected column names.
statement ok
CREATE TABLE cte_schema_records (
id VARCHAR NOT NULL,
parent_id VARCHAR,
ts TIMESTAMP NOT NULL,
val VARCHAR
);

# Six rows forming a single chain a00 <- a01 <- ... <- a05: each row's
# parent_id is the previous row's id, so a recursive walk starting at 'a00'
# reaches every row.
statement ok
INSERT INTO cte_schema_records VALUES
('a00', NULL, TIMESTAMP '2025-01-01 00:00:00', 'v_span'),
('a01', 'a00', TIMESTAMP '2025-01-01 00:00:01', 'v_log_1'),
('a02', 'a01', TIMESTAMP '2025-01-01 00:00:02', 'v_log_2'),
('a03', 'a02', TIMESTAMP '2025-01-01 00:00:03', 'v_log_3'),
('a04', 'a03', TIMESTAMP '2025-01-01 00:00:04', 'v_log_4'),
('a05', 'a04', TIMESTAMP '2025-01-01 00:00:05', 'v_log_5');

# Write the CTE result to CSV with a header row. The expected `6` matches
# the six rows reachable from 'a00' (all fit under LIMIT 10); presumably it
# is the row count reported by COPY.
query I
COPY (WITH RECURSIVE descendants AS (
SELECT id, parent_id, ts, upper(val) AS val
FROM cte_schema_records WHERE id = 'a00'
UNION ALL
SELECT r.id, r.parent_id, r.ts, upper(r.val)
FROM cte_schema_records r
INNER JOIN descendants d ON r.parent_id = d.id
)
SELECT id, parent_id, ts, val
FROM descendants
ORDER BY ts ASC
LIMIT 10)
TO 'test_files/scratch/cte/recursive_order_limit_schema/'
STORED AS CSV OPTIONS ('format.has_header' 'true');
----
6

# Re-read the directory written above as headerless CSV. Every column,
# including `ts`, is declared VARCHAR so the textual header line can be
# loaded as an ordinary data row.
statement ok
CREATE EXTERNAL TABLE cte_schema_reread (
id VARCHAR, parent_id VARCHAR, ts VARCHAR, val VARCHAR
)
STORED AS CSV
LOCATION 'test_files/scratch/cte/recursive_order_limit_schema/'
OPTIONS ('format.has_header' 'false');

# The CSV header row appears as a data row here because we re-read with
# has_header='false'. It must match the CTE's declared column names; if
# RecursiveQueryExec leaked the recursive branch's schema, the last field
# would read `upper(r.val)` instead of `val`.
# NOTE(review): exactly one row is expected, which assumes the COPY above
# produced a single header line (i.e. a single output file) — confirm.
query TTTT
SELECT id, parent_id, ts, val
FROM cte_schema_reread
WHERE id = 'id';
----
id parent_id ts val

# Clean up the tables created by this regression test.
statement ok
DROP TABLE cte_schema_reread;

statement ok
DROP TABLE cte_schema_records;

statement count 0
set datafusion.execution.enable_recursive_ctes = false;

Expand Down
Loading