feat: statistics-driven TopK optimization for parquet (file reorder + RG reorder + threshold init + cumulative prune) #21580
run benchmarks
🤖 Benchmark running (GKE): comparing feat/reorder-row-groups-by-stats (a013bf6) to 29c5dd5 (merge-base) using tpch
🤖 Benchmark running (GKE): comparing feat/reorder-row-groups-by-stats (a013bf6) to 29c5dd5 (merge-base) using clickbench_partitioned
🤖 Benchmark running (GKE): comparing feat/reorder-row-groups-by-stats (a013bf6) to 29c5dd5 (merge-base) using tpcds
Pull request overview
Note: Copilot was unable to run its full agentic suite in this review.
This PR improves TopK performance for Parquet scans when sort pushdown is Inexact by enabling row-group reordering based on statistics, so likely “best” row groups are read earlier and dynamic filters can tighten sooner.
Changes:
- Thread an optional `LexOrdering` from `ParquetSource::try_pushdown_sort` through `ParquetMorselizer` to the access-plan preparation step.
- Add `PreparedAccessPlan::reorder_by_statistics` to reorder `row_group_indexes` using Parquet statistics.
- Add unit tests covering reorder/skip behavior for multiple edge cases.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| datafusion/datasource-parquet/src/source.rs | Plumbs sort ordering into the file source for later row-group reordering. |
| datafusion/datasource-parquet/src/opener.rs | Carries optional sort ordering into the opener and applies reorder_by_statistics during plan preparation. |
| datafusion/datasource-parquet/src/access_plan.rs | Implements row-group reordering by statistics and adds focused unit tests. |
```rust
let sort_order = LexOrdering::new(order.iter().cloned());
let mut new_source = self.clone().with_reverse_row_groups(true);
new_source.sort_order_for_reorder = sort_order;
```
LexOrdering::new(...) appears to return a Result<LexOrdering, _> (as used with .unwrap() in the new unit tests), but here it’s assigned directly without ?/unwrap, and then assigned to sort_order_for_reorder: Option<LexOrdering> without wrapping in Some(...). This should be changed to construct a LexOrdering with error propagation and store it as Some(sort_order) (or skip setting the field on error). Otherwise this won’t compile.
Suggested change:

```rust
let sort_order = LexOrdering::new(order.iter().cloned())?;
let mut new_source = self.clone().with_reverse_row_groups(true);
new_source.sort_order_for_reorder = Some(sort_order);
```
```rust
// LexOrdering is guaranteed non-empty, so first() returns &PhysicalSortExpr
let first_sort_expr = sort_order.first();
```
sort_order.first() (if LexOrdering is Vec-like) returns Option<&PhysicalSortExpr>, but the code uses it as if it were &PhysicalSortExpr (first_sort_expr.expr...). This is likely a compile error. A concrete fix is to obtain the first element via iteration and handle the empty case (e.g., early-return Ok(self) if no sort expressions), then use the returned &PhysicalSortExpr.
Suggested change:

```rust
let first_sort_expr = match sort_order.iter().next() {
    Some(expr) => expr,
    None => {
        debug!("Skipping RG reorder: empty sort order");
        return Ok(self);
    }
};
```
```rust
let descending = first_sort_expr.options.descending;
```
For DESC ordering, reordering by min values is often a poor proxy for “row group likely contains the largest values first”; typically you want to sort by max when descending == true (and by min when ascending). This can significantly reduce the intended TopK benefit (and can even choose a worse first row group when ranges overlap). Consider switching to row_group_maxs(...) for descending order, and update the doc comment (currently mentions “min/max”) and the DESC unit test accordingly.
```rust
// Get min values for the selected row groups
let rg_metadata: Vec<&RowGroupMetaData> = self
    .row_group_indexes
    .iter()
    .map(|&idx| file_metadata.row_group(idx))
    .collect();

let min_values = match converter.row_group_mins(rg_metadata.iter().copied()) {
    Ok(vals) => vals,
    Err(e) => {
        debug!("Skipping RG reorder: cannot get min values: {e}");
        return Ok(self);
    }
};

// Sort indices by min values
let sort_options = arrow::compute::SortOptions {
    descending,
    nulls_first: first_sort_expr.options.nulls_first,
};
let sorted_indices = match arrow::compute::sort_to_indices(
    &min_values,
```
Suggested change:

```rust
// Get values for the selected row groups: mins for ASC, maxs for DESC
let rg_metadata: Vec<&RowGroupMetaData> = self
    .row_group_indexes
    .iter()
    .map(|&idx| file_metadata.row_group(idx))
    .collect();
let sort_values = match if descending {
    converter.row_group_maxs(rg_metadata.iter().copied())
} else {
    converter.row_group_mins(rg_metadata.iter().copied())
} {
    Ok(vals) => vals,
    Err(e) => {
        debug!("Skipping RG reorder: cannot get min/max values: {e}");
        return Ok(self);
    }
};
// Sort indices by the statistics that best match the requested order
let sort_options = arrow::compute::SortOptions {
    descending,
    nulls_first: first_sort_expr.options.nulls_first,
};
let sorted_indices = match arrow::compute::sort_to_indices(
    &sort_values,
```
Yes, this is a good point.
```rust
let sorted_indices = match arrow::compute::sort_to_indices(
    &min_values,
    Some(sort_options),
    None,
) {
```
If multiple row groups share the same min (or max) statistic, sort_to_indices may not guarantee a deterministic/stable tie-breaker across platforms/versions. Since row-group order can affect scan reproducibility and performance debugging, consider adding a stable secondary key (e.g., original row group index) when statistics are equal.
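To make the suggestion concrete, here is a minimal sketch (plain `i64` values standing in for the per-row-group min statistics; all names illustrative) of reordering row-group indexes with the original position as a stable secondary key:

```rust
// Illustrative sketch: deterministic reordering of row-group indexes by a
// statistic, with the original position as a tie-breaker. `mins[i]` stands in
// for the min statistic of the row group at position i in `row_group_indexes`.
fn reorder_stable(row_group_indexes: &[usize], mins: &[i64], descending: bool) -> Vec<usize> {
    let mut order: Vec<usize> = (0..row_group_indexes.len()).collect();
    order.sort_by(|&a, &b| {
        let primary = if descending {
            mins[b].cmp(&mins[a])
        } else {
            mins[a].cmp(&mins[b])
        };
        // Tie-break on the original position so equal statistics keep a
        // deterministic, platform-independent order.
        primary.then(a.cmp(&b))
    });
    order.into_iter().map(|i| row_group_indexes[i]).collect()
}
```

With equal mins (positions 0 and 2 below), the lower original position always comes first, regardless of sort-algorithm internals.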
```rust
/// - 0 or 1 row groups (nothing to reorder)
/// - Sort expression is not a simple column reference
/// - Statistics are unavailable
pub(crate) fn reorder_by_statistics(
```
I think @adriangb had the great idea to also order by grouping keys which can
- reduce cardinality within partitions (partition-local state can be smaller)
- allow for better cache locality (row groups with more equal keys are grouped together)
Doesn't have to be in this PR but perhaps we can think about how it fits in.
Thanks @Dandandan for the review! That's a great extension. The `reorder_by_statistics` method is generic enough to take any `LexOrdering` — it doesn't need to be tied to TopK specifically. So extending this for GROUP BY should be a matter of:
- Computing a preferred RG ordering from grouping keys in the aggregate planner
- Passing it through to `ParquetSource::sort_order_for_reorder`
Happy to track this as a follow-up issue. Will open one after this PR lands.
Thanks @Dandandan! Created #21581 to track this. The existing infrastructure from this PR should be directly reusable — mainly needs the aggregate planner to populate sort_order_for_reorder from grouping keys.
run benchmarks

🤖 Benchmark running (GKE): comparing feat/reorder-row-groups-by-stats (5018882) to 29c5dd5 (merge-base) using clickbench_partitioned
🤖 Benchmark running (GKE): comparing feat/reorder-row-groups-by-stats (5018882) to 29c5dd5 (merge-base) using tpcds
🤖 Benchmark running (GKE): comparing feat/reorder-row-groups-by-stats (5018882) to 29c5dd5 (merge-base) using tpch
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch
run benchmark clickbench_partitioned clickbench_extended

🤖 Benchmark running (GKE): comparing feat/reorder-row-groups-by-stats (5018882) to 29c5dd5 (merge-base) using clickbench_partitioned
🤖 Benchmark running (GKE): comparing feat/reorder-row-groups-by-stats (5018882) to 29c5dd5 (merge-base) using clickbench_extended
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch
I wonder if the ordering should be done before the files / row groups are assigned to partitions, so they are more globally sorted instead of just per partition? It seems now they are sorted within each partition, which should help, but perhaps not nearly as much as it would if all the partitions contained the optimal row groups. This would also help in the case of #21581
🤖 Benchmark completed (GKE): clickbench_extended — base (merge-base) vs branch
Great point @Dandandan — you're right that global reorder is much more effective than per-partition reorder. With global reorder + round-robin distribution, each partition's first RG is close to the global optimum.
The current per-partition reorder is limited because even after sorting, partition 0's "best" RG might be much worse than the global best (which may have landed in partition 2). Moving to global reorder would require changes at the planning / EnforceDistribution layer to load RG statistics and redistribute RGs across partitions. I'd prefer to keep this PR as an incremental step (per-partition) and track global reorder as a follow-up — it would benefit both #21317 and #21581. Does this make sense?
Sure, makes sense.
…lly exclusive Previously reorder_by_statistics and reverse_row_groups were mutually exclusive (else-if). This meant DESC queries on unsorted data could only get one optimization. Now they compose: reorder always sorts RGs by min ASC, then reverse flips for DESC. This ensures correct results for both sorted and unsorted inputs without regression. Also removes prepare_with_optimizer in favor of calling optimize() directly on each optimizer, and simplifies reorder_by_statistics to always use min ASC (direction handled by reverse).
The previous jitter formula only added overlap between adjacent RGs but kept the overall RG order ascending by min values. This meant reorder_by_statistics was a no-op — there was nothing to reorder. Fix by bucketing rows into 60 chunks, sorting within each chunk (with jitter for overlap), then scrambling chunk order using a deterministic permutation. This produces RGs that are individually sorted but appear in scrambled order in the file, so reorder_by_statistics has real work to do.
Add FileSource::reorder_files() trait method and ParquetSource implementation that sorts files by column statistics before placing them in the shared work queue. For DESC queries, files with the highest min value come first; for ASC, lowest max first. This ensures the first file read by any partition is the globally optimal one for TopK threshold convergence, complementing the intra-file RG reorder.
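The file-ordering rule above (DESC: highest min first; ASC: lowest max first) can be sketched on plain integers — `(min, max)` pairs stand in for each file's statistics on the sort column, and the function name is illustrative, not the actual trait method:

```rust
// Sketch: compute the order in which files should be read for a TopK query.
// stats[i] = (min, max) of the sort column in file i (hypothetical stand-in
// for real per-file Parquet statistics).
fn file_read_order(stats: &[(i64, i64)], descending: bool) -> Vec<usize> {
    let mut order: Vec<usize> = (0..stats.len()).collect();
    if descending {
        // DESC: the file whose min is highest is most likely to hold the top-K
        order.sort_by(|&a, &b| stats[b].0.cmp(&stats[a].0));
    } else {
        // ASC: the file whose max is lowest is most likely to hold the top-K
        order.sort_by(|&a, &b| stats[a].1.cmp(&stats[b].1));
    }
    order
}
```

Because `sort_by` is stable, files with equal statistics keep their original relative order, which keeps the work-queue order deterministic.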
Before reading any parquet data, scan row group min/max statistics to compute an initial threshold for TopK's dynamic filter. This allows row-level filtering to benefit immediately from the first file opened, rather than waiting until TopK processes enough rows to build a threshold organically.
Algorithm (single-column sort):
- DESC LIMIT K: threshold = max(min) across RGs with num_rows >= K; filter: col > threshold
- ASC LIMIT K: threshold = min(max) across RGs with num_rows >= K; filter: col < threshold
Sort direction is read from sort_options on DynamicFilterPhysicalExpr, which is now set by SortExec::create_filter() for TopK queries. This makes the optimization work for ALL TopK queries on parquet, not just those with sort pushdown. The DynamicFilterPhysicalExpr is shared across all partitions, so each file's threshold update is visible to subsequent files globally.
Graceful fallback: skips initialization when sort_options is absent, statistics are unavailable, the column is not found, or the sort is multi-column.
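The max(min)/min(max) rule above can be sketched on plain integers — `(min, max, num_rows)` triples stand in for per-row-group statistics, and the function name is illustrative:

```rust
// Sketch of statistics-based TopK threshold initialization.
// stats[i] = (min, max, num_rows) of row group i for the sort column.
fn init_topk_threshold(stats: &[(i64, i64, usize)], k: usize, descending: bool) -> Option<i64> {
    // Only row groups with at least K rows can bound the top-K set on their own.
    let eligible = stats.iter().filter(|&&(_, _, rows)| rows >= k);
    if descending {
        // DESC LIMIT K: some single RG holds K values all >= its min, so the
        // largest such min is a valid lower bound for the Kth value.
        eligible.map(|&(min, _, _)| min).max()
    } else {
        // ASC LIMIT K: symmetric — the smallest max over eligible RGs is a
        // valid upper bound for the Kth value.
        eligible.map(|&(_, max, _)| max).min()
    }
}
```

Returning `None` when no row group has at least K rows is the graceful-fallback case: the threshold cannot be bounded by a single RG, so initialization is skipped.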
Previously file reorder and RG reorder only worked with sort pushdown (Inexact path, WITH ORDER). Now they extract sort info from DynamicFilterPhysicalExpr.sort_options in the predicate, which is set by SortExec for ALL TopK queries regardless of WITH ORDER. This means ORDER BY col DESC LIMIT K on any parquet table benefits from file reorder (best file first in shared queue), RG reorder (best RG first within file), and stats init (threshold before I/O).
Move try_init_topk_threshold() from build_stream() to prune_row_groups(), before prune_by_statistics(). This way:
- File 1: stats init sets the threshold from ALL its RG statistics, then prune_by_statistics uses it to prune file 1's own RGs. Only the best RG(s) are read; the rest are skipped with zero I/O.
- File 2+: the dynamic filter already has a tight threshold from file 1, so most RGs are pruned immediately.
This effectively achieves dynamic RG pruning without needing morsel-level scheduling — the threshold is computed from statistics (no data read), then used to prune RGs in the same file.
RG reorder and reverse must only trigger when sort pushdown is active (sort_order_for_reorder is set). Applying them to non-sort-pushdown TopK queries changes the RG read order, which alters tie-breaking for equal values (e.g. NULLs) and causes non-deterministic results. File reorder and stats init remain enabled for ALL TopK queries since they only affect pruning (which rows are skipped), not the relative order of rows within remaining RGs. Fixes fuzz_cases::topk_filter_pushdown::test_fuzz_topk_filter_pushdown
…wrap Critical fix: PruningPredicate compiles the expression at build time, so the DynamicFilterPhysicalExpr must be updated BEFORE the predicate is built. Previously stats init ran after, making RG pruning ineffective for the current file.
Also fixes:
- Unwrap CastExpr to find the inner Column (projection may add casts)
- Use limit=1 default when the scan limit is None (TopK fetch is at the SortExec level, not pushed to the scan)
- Only init the threshold in the sort pushdown path to avoid tie-breaking changes for non-sort-pushdown TopK queries
Local benchmark (single file with 61 sorted RGs, DESC LIMIT): baseline 22-25ms per query; feature 0.4-1.2ms per query (20-58x faster)
Add null-aware filter for NULLS FIRST sort: `col IS NULL OR col > threshold` ensures RGs with NULLs are not incorrectly pruned. Stats init remains restricted to sort pushdown path because pruning changes tie-breaking for equal values across RGs, which causes non-deterministic results in non-sort-pushdown TopK queries. The null-aware filter is still useful for sort pushdown DESC NULLS FIRST.
Stats init now fires for all TopK queries, not just sort pushdown path. The null-aware filter (IS NULL OR col > threshold for NULLS FIRST) ensures correctness when NULLs are present. Fix fuzz test: add remaining columns as ASC NULLS LAST tiebreakers to ORDER BY, making the sort fully deterministic. This is the correct approach since SQL doesn't guarantee tie-breaking order, and any optimization that changes RG read order may produce different but equally valid results for tied rows.
Stats init with max(min) threshold can over-prune for non-sorted data: the threshold may exceed the actual Kth value when rows are distributed across multiple RGs. This caused output_rows=0 in explain_analyze tests. Restrict stats init to sort pushdown path where data ordering guarantees the threshold is a valid lower bound. Keep fuzz test tiebreaker fix as it's independently correct (SQL doesn't guarantee tie-breaking order).
The max(min)/min(max) algorithm is only a valid threshold bound when RGs are non-overlapping (guaranteed by sorted data with sort pushdown). For overlapping RGs, top-K values may span multiple RGs and the threshold can over-prune, producing fewer results than expected. Keep stats init restricted to sort pushdown path. Keep fuzz test tiebreaker fix (independently correct).
Stats init now fires for ALL TopK queries where the predicate is only the DynamicFilterPhysicalExpr (no WHERE clause combined). This is safe because without WHERE, raw RG statistics accurately represent the qualifying rows. For TopK + WHERE queries, stats init remains restricted to sort pushdown path because the WHERE clause narrows qualifying rows below what raw statistics suggest, making the threshold potentially unsafe. Also adds surviving-rows safety check: after computing threshold, verify that remaining RGs have enough total rows (>= K) before applying. This prevents over-pruning when top-K values span multiple overlapping RGs.
Stats init is only safe when:
1. Sort pushdown is active (sorted, non-overlapping RGs)
2. The predicate is the DynamicFilter only (no WHERE clause)
A WHERE clause narrows qualifying rows below what raw statistics suggest, making the threshold potentially unsafe even on sorted data. Type coercion (CastExpr) issues also need resolution for general TopK support — tracked as follow-up work. Includes the type cast fix (parquet stats type → column type) and the surviving-rows safety check for future use.
Instead of threshold-based pruning (which fails with WHERE clauses due to unknown qualifying row counts), use cumulative row counting: after reorder + reverse, accumulate rows from the front until we have enough for the TopK fetch limit (K), then prune the rest. This works for sort pushdown with or without WHERE because it only depends on row counts + RG ordering, not threshold values or types. Adds fetch field to DynamicFilterPhysicalExpr (set by SortExec) so the parquet reader knows the TopK K value. Keeps stats init for the no-WHERE sort pushdown case (20-58x speedup) as a complementary optimization that also helps cross-file pruning via the shared DynamicFilter.
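The cumulative row-counting step above can be sketched in a few lines — row counts are plain `usize` values here, and the signature is a simplified stand-in for the real method:

```rust
// Sketch of cumulative RG pruning: after reorder (+ reverse for DESC), keep
// row groups from the front until their row counts cover the fetch limit K.
// rg_indexes is the post-reorder read order; row_counts[i] is the row count
// of row group i in file order.
fn truncate_row_groups(rg_indexes: &mut Vec<usize>, row_counts: &[usize], k: usize) {
    let mut covered = 0;
    let mut keep = 0;
    for &idx in rg_indexes.iter() {
        keep += 1;
        covered += row_counts[idx];
        if covered >= k {
            // Enough rows to satisfy LIMIT K; everything after this can be skipped.
            break;
        }
    }
    rg_indexes.truncate(keep);
}
```

Note this depends only on row counts and RG ordering — no threshold values or types — which is why it composes with WHERE clauses where threshold init does not.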
Extend RG reorder, reverse, and cumulative pruning beyond sort pushdown to ALL TopK queries via DynamicFilterPhysicalExpr sort_options. For non-sort-pushdown TopK, cumulative pruning is guarded by a non-overlap check: after reorder, verify adjacent RGs satisfy max[i] <= min[i+1]. Only prune when RGs are non-overlapping (guarantees top-K values are in the first N RGs). Sort pushdown path skips the overlap check (sorted data is guaranteed non-overlapping).
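The non-overlap guard described above (adjacent RGs must satisfy max[i] <= min[i+1] after reorder) reduces to a one-line check — `(min, max)` pairs stand in for per-RG statistics, and the function name is illustrative:

```rust
// Sketch of the non-overlap guard for cumulative pruning: after reordering by
// min, pruning is only safe if adjacent row groups do not overlap, i.e. every
// RG's max is <= the next RG's min.
fn row_groups_non_overlapping(ranges: &[(i64, i64)]) -> bool {
    ranges.windows(2).all(|w| w[0].1 <= w[1].0)
}
```

When ranges overlap, the top-K values may span multiple RGs, so pruning by cumulative row count could drop qualifying rows — hence pruning is skipped in that case.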
… regression) Reverse and cumulative pruning from DynamicFilter now only trigger when reorder_optimizer is Some (the sort column was found in parquet stats). For GROUP BY + ORDER BY queries, the sort column is an aggregate output not in parquet — reorder bails out, so reverse and cumulative prune should also skip. Previously, reverse ran regardless, changing I/O patterns with no benefit (Q23 2x slower in ClickBench).
…s PR
- benchmarks/bench.sh: data generation fix belongs in apache#21711
- listing_table_partitions.slt: unrelated change from another branch
Fuzz test tiebreaker retained (needed for RG reorder on all TopK).
…LT tests Remove try_init_topk_threshold and compute_best_threshold_from_stats. Stats init had multiple issues:
- Gt vs GtEq boundary (excluded valid top-K values)
- Conflicted with cumulative prune when K spans multiple RGs
- Type coercion (CastExpr) and WHERE clause interaction
Cumulative RG pruning is strictly better: works with WHERE, no threshold computation, no type issues. After reorder + reverse, just count rows from the front until >= K and truncate the rest.
Add comprehensive SLT tests:
- Test I: WITH ORDER + DESC LIMIT (stats init + cumulative prune)
- Test J: non-overlapping RGs without WITH ORDER (DynamicFilter path)
- Test K: overlapping RGs (cumulative prune must NOT trigger)
Bring back stats init with all issues fixed:
- GtEq/LtEq instead of Gt/Lt (include boundary values)
- Use df.fetch() as the limit (TopK K value, not scan limit). When K > single RG rows, stats init skips → cumulative prune handles it
- Cast the threshold to the column data type (parquet vs table schema mismatch)
- Null-aware filter for NULLS FIRST
- Generation check prevents overwrite by later partitions
- Restricted to sort pushdown + pure DynamicFilter (no WHERE)
Stats init and cumulative prune are complementary:
- Stats init: updates PruningPredicate → prunes at the RG statistics level
- Cumulative prune: truncates after reorder + reverse → prunes by row count
Both work together without conflict when using df.fetch().
create_filter() was called before new_sort.fetch was set, so DynamicFilterPhysicalExpr.fetch was always 0 (or None from old self). Fix by setting fetch before creating the filter. This was the root cause of stats init and cumulative prune not triggering on CI — fetch=0 meant "no rows needed" → skip.
For GROUP BY + ORDER BY queries, the TopK sort column is an aggregate output (e.g. COUNT(*)) that doesn't exist in the parquet file schema. Previously we still created ReorderByStatistics which tried to look up the column in statistics — wasted work. Now check column existence in file schema before creating the optimizer. This eliminates overhead for non-scan-level TopK queries (ClickBench Q40-Q42 regression fix).
- truncate_row_groups now skips when row_selection is present to preserve page-level pruning state (xudong review feedback)
- Remove incomplete DynamicRgPruner exploration code
Multi-key ORDER BY: use the first sort key for RG-level optimizations. Secondary keys only affect tie-breaking within RGs, not RG decisions.
truncate_row_groups: skip truncation when row_selection exists to preserve page-level pruning state (xudong review).
Tests:
- Test L: multi-key DESC/ASC LIMIT (6 sub-tests)
- truncate unit tests: basic, row_selection skip, no-op overflow
Test M: file declared WITH ORDER (id ASC, value ASC), multi-key queries testing:
- M.1: EXPLAIN showing reverse_row_groups=true for a fully reversed match
- M.2: DESC, DESC LIMIT 3 — correct results
- M.3: larger LIMIT spanning multiple RGs
- M.4: ASC, ASC (same direction = Exact, sort elimination)
- M.5: partial match (first key reversed, second key same) — NOT Inexact
- M.6: full sort, data integrity check
Which issue does this PR close?
Closes #21691
Partial fix for #21399
Rationale for this change
TopK queries (`ORDER BY col DESC/ASC LIMIT K`) on parquet data have several inefficiencies: among them, the dynamic TopK filter starts as `lit(true)`, so early RGs are never pruned.
What changes are included in this PR?
A chain of composable optimizations that minimize I/O for TopK queries:
1. Global file reorder (`FileSource::reorder_files`)
Sort files in the shared work queue by column statistics. DESC: highest min first; ASC: lowest max first. Works for ALL TopK via `DynamicFilterPhysicalExpr.sort_options`. Bails fast when the sort column is not in the file schema (GROUP BY + ORDER BY).

2. RG reorder within file (`reorder_by_statistics`)
Reorder row groups by min values (ASC). Works for all TopK via DynamicFilter `sort_options` (with file schema check). Combined with reverse for DESC queries.

3. TopK threshold init from statistics (`try_init_topk_threshold`)
Before reading data, compute the threshold from RG min/max stats. Runs BEFORE the `PruningPredicate` build so the threshold is compiled into the predicate. Uses `GtEq`/`LtEq` to include boundary values. Null-aware filter for NULLS FIRST. Uses `df.fetch()` (the TopK K value) so stats init skips when K spans multiple RGs. Restricted to sort pushdown + no WHERE (pure DynamicFilter predicate).

4. Cumulative RG pruning (`truncate_row_groups`)
After reorder + reverse, accumulate rows from the front until >= K and prune the rest. For non-sort-pushdown TopK, guarded by a non-overlap check (`max(i) <= min(i+1)`). Only when the predicate is pure DynamicFilter (no WHERE).

5. Compose reorder + reverse
Sequential steps instead of mutually exclusive. Reverse only triggers when reorder succeeds (sort column found in file schema).
How they work together
Coverage matrix
Local benchmark (single file, 61 sorted RGs, DESC LIMIT, 1 partition)
Key bug fix: `SortExec.fetch` ordering
`create_filter()` was called before `new_sort.fetch` was set, so `DynamicFilterPhysicalExpr.fetch` was always 0. Fixed by setting fetch before creating the filter.

Changes to `DynamicFilterPhysicalExpr`:
- `sort_options: Option<Vec<SortOptions>>` — sort direction for each child
- `fetch: Option<usize>` — TopK K value for cumulative pruning
- `new_with_sort_options()` constructor, `sort_options()` and `fetch()` getters
- Set by `SortExec::create_filter()` for all TopK queries

Are these changes tested?
- Unit tests in `datafusion-datasource-parquet` (all pass)
- `test_fuzz_topk_filter_pushdown` — updated with tiebreaker columns for deterministic ORDER BY

Are there any user-facing changes?
No. Transparent optimization — same results, faster TopK on parquet with statistics.