feat(parquet): row-group morselization for sibling FileStream stealing #21766
Dandandan wants to merge 8 commits into apache:main from row-group-morselization
When a parquet file is scanned inside a shared sibling-stream pool (the `SharedWorkSource` introduced by apache#21351), the first stream to open the file now donates its remaining row groups back to the shared queue so idle sibling partitions can steal them. A single large parquet file no longer bottlenecks on one worker.

Implementation:
- `ParquetOpenState` gains a `SplitAndDonate` state between `LoadMetadata` and `PrepareFilters`. The donor keeps the first eligible row group and pushes each remaining one onto the front of the shared queue as a `PartitionedFile` clone whose `range` is a one-byte `FileRange` at the row group's starting offset. The existing `prune_by_range` path matches that offset and scopes the stealer to just that row group — no new extension types, no metadata carriage, no access-plan donation.
- If the caller pre-narrowed the scan with a `file_range` that still covers multiple row groups (byte-range file partitioning), splitting stays inside that range: donated ranges remain subsets of the caller's.
- A caller-supplied `ParquetAccessPlan` in `extensions` and single-row-group scopes suppress donation.
- `SharedWorkSource` is `pub` and gains `push_front` / `pop_front` / `Default`. `row_group_start_offset` is extracted so it is shared with `prune_by_range`. Stealers re-load the parquet footer; object stores typically cache the range, so this is cheap. Sharing loaded metadata across siblings is left for a follow-up.

Five new tests cover: basic donation + stealer round-trip, single-RG files, caller access-plan suppression, splitting inside a caller `file_range`, and single-RG caller ranges.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
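A minimal Rust sketch of the one-byte-range trick described above. The types and the pruning rule here are simplified stand-ins, not the actual DataFusion `PartitionedFile`/`prune_by_range` code: a donated range of `[start, start + 1)` at a row group's starting byte offset matches exactly that row group.

```rust
// Hypothetical stand-in for a byte range attached to a scanned file.
#[derive(Clone, Copy, Debug, PartialEq)]
struct FileRange {
    start: u64,
    end: u64, // exclusive
}

/// Simplified range pruning: keep the row groups whose starting byte
/// offset falls inside `range`.
fn row_groups_in_range(row_group_starts: &[u64], range: FileRange) -> Vec<usize> {
    row_group_starts
        .iter()
        .enumerate()
        .filter(|(_, &start)| range.start <= start && start < range.end)
        .map(|(i, _)| i)
        .collect()
}

/// The donor keeps row group 0 and donates each remaining row group as a
/// one-byte range anchored at its starting offset.
fn donate(row_group_starts: &[u64]) -> Vec<FileRange> {
    row_group_starts[1..]
        .iter()
        .map(|&start| FileRange { start, end: start + 1 })
        .collect()
}

fn main() {
    let starts = [4u64, 1000, 2500]; // example row-group start offsets
    for donated in donate(&starts) {
        // Each donated one-byte range scopes a stealer to exactly one row group.
        assert_eq!(row_groups_in_range(&starts, donated).len(), 1);
    }
}
```

Because row-group start offsets within one file are distinct, a one-byte range can never match two row groups, which is why no extra metadata needs to travel with the clone.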
run benchmarks

🤖 Benchmark running (GKE): comparing row-group-morselization (bfa1a93) to 9a1ed57 (merge-base) using clickbench_partitioned

🤖 Benchmark running (GKE): comparing row-group-morselization (bfa1a93) to 9a1ed57 (merge-base) using tpcds

🤖 Benchmark running (GKE): comparing row-group-morselization (bfa1a93) to 9a1ed57 (merge-base) using tpch

🤖 Benchmark completed (GKE): clickbench_partitioned

🤖 Benchmark completed (GKE): tpcds

Benchmark for this request hit the 7200s job deadline before finishing.
…m directly

Moves `SplitAndDonate` from between `LoadMetadata` and `PrepareFilters` to *after* `PruneWithBloomFilters`, and restructures stealer paths so row-group morselization composes with the full pruning pipeline.

**Donor path**:
- Runs the existing pipeline unchanged: file-level pruning → metadata load → prepare filters → page index → stats pruning → bloom pruning.
- `SplitAndDonate` then runs `prune_by_limit` (moved out of `build_stream`) as a separate file-level pass, picks the first surviving row group, and packages each remaining one into a `ParquetOpenChunk` containing the access plan, loaded `ArrowReaderMetadata`, prepared `PruningPredicate`, `PagePruningAccessPlanFilter`, physical schema, and rewritten predicate/projection.

**Stealer path**:
- `ParquetMorselPlanner::try_new` detects a `ParquetOpenChunk` on the incoming `PartitionedFile` and constructs state directly at `BuildStream` via `build_stealer_state`. No metadata load, no predicate rebuild, no pruning traversal — the stealer just builds its reader against the donor's finalized access plan.

**Shared work queue split**:
- `SharedWorkSource` now has two queues: `morsels` (pre-prepared chunks with finalized state) and `files` (whole unopened files). `pop_front` drains morsels first so their latency stays low. Donors call `push_morsels` instead of the old `push_front` convention.

**Removed state/guards** (no longer needed with direct-BuildStream entry):
- `PreparedParquetOpen::is_donated_chunk` and `preloaded_reader_metadata` fields.
- The `is_donated_chunk` short-circuits in `prune_file`, `prepare_open_file`, `load`, `prune_row_groups`, and `split_and_donate`.

Limit-pruning tests (`test_limit_pruning_*` in `datafusion/core/tests/parquet/row_group_pruning.rs`) pass — the donor sees the full row-group picture for `prune_by_limit`, and stealers inherit the pruned plan.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
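The two-queue shape can be sketched as follows. These are toy types, not the DataFusion `SharedWorkSource`: the point is only the drain order — prepared morsels are served before whole unopened files, so donated row groups are picked up with low latency.

```rust
use std::collections::VecDeque;

// Hypothetical work item: either a pre-prepared row-group chunk (morsel)
// or a whole unopened file, both represented here by a name string.
#[derive(Debug, PartialEq)]
enum Work {
    Morsel(&'static str),
    File(&'static str),
}

#[derive(Default)]
struct SharedWorkSource {
    morsels: VecDeque<&'static str>, // donated, fully-prepared chunks
    files: VecDeque<&'static str>,   // whole unopened files
}

impl SharedWorkSource {
    /// Donors push their remaining prepared row-group chunks here.
    fn push_morsels(&mut self, chunks: impl IntoIterator<Item = &'static str>) {
        self.morsels.extend(chunks);
    }

    /// Morsels drain first so their latency stays low; whole files come after.
    fn pop_front(&mut self) -> Option<Work> {
        if let Some(m) = self.morsels.pop_front() {
            return Some(Work::Morsel(m));
        }
        self.files.pop_front().map(Work::File)
    }
}

fn main() {
    let mut source = SharedWorkSource::default();
    source.files.push_back("file_b.parquet");
    source.push_morsels(["rg_1", "rg_2"]);
    // Donated morsels are served before the queued whole file.
    assert_eq!(source.pop_front(), Some(Work::Morsel("rg_1")));
    assert_eq!(source.pop_front(), Some(Work::Morsel("rg_2")));
    assert_eq!(source.pop_front(), Some(Work::File("file_b.parquet")));
    assert_eq!(source.pop_front(), None);
}
```

Morsels carry already-finalized open state, so serving them first converts waiting siblings into useful readers sooner than handing them a fresh file that still needs the whole pruning pipeline.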
run benchmarks
- Inline the `caller_range` construction in
`row_group_split_within_caller_file_range` test and drop the
vestigial `let _ = caller_range;` binding left over from the earlier
file-range-based donation mechanism.
- Update `split_and_donate` docstring: the stale `is_donated_chunk`
reference predates the direct-to-BuildStream entry path. Stealers
now never reach this function.
- Drop `rg_metadata.to_vec()` in the LIMIT pruning pass —
`prune_by_limit` takes `&[RowGroupMetaData]`, so the slice is enough
and we save one allocation per limit-pruned file.
- Delete two "what-not-why" narrating comments from the donation
path ("Bundle everything the stealer needs..." and "Narrow the
donor's access plan...") — the code is self-explanatory.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
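The slice-versus-`to_vec()` point above in miniature. `prune_by_limit` here is a hypothetical stand-in with the same borrowing shape as described (takes `&[...]`), not the real DataFusion function: any function taking a slice can be fed the borrowed data directly, and calling `.to_vec()` first would copy every element only to borrow the copy.

```rust
/// Stand-in for a limit pass: keep row groups, in order, until their
/// cumulative row count reaches `limit`; returns how many are kept.
fn prune_by_limit(row_counts: &[u64], limit: u64) -> usize {
    let mut total = 0u64;
    let mut kept = 0;
    for &rows in row_counts {
        if total >= limit {
            break;
        }
        total += rows;
        kept += 1;
    }
    kept
}

fn main() {
    let rg_row_counts = vec![100u64, 100, 100];
    // Pass the borrowed slice directly — no allocation.
    assert_eq!(prune_by_limit(&rg_row_counts, 150), 2);
    // `prune_by_limit(&rg_row_counts.to_vec(), 150)` would give the same
    // answer but allocate and copy the whole vector first.
}
```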
run benchmarks

🤖 Benchmark running (GKE): comparing row-group-morselization (16967cd) to 9a1ed57 (merge-base) using tpch

🤖 Benchmark running (GKE): comparing row-group-morselization (16967cd) to 9a1ed57 (merge-base) using clickbench_partitioned

🤖 Benchmark running (GKE): comparing row-group-morselization (16967cd) to 9a1ed57 (merge-base) using tpcds
🤖 Benchmark completed (GKE): clickbench_partitioned

🤖 Benchmark completed (GKE): tpcds

run benchmark clickbench_1

🤖 Benchmark running (GKE): comparing row-group-morselization (16967cd) to 9a1ed57 (merge-base) using clickbench_1
…ealers

`build_row_filter` does two things at very different costs:
1. Walk the predicate, split conjuncts, and build a `FilterCandidate` per conjunct (resolving the ProjectionMask, projected schema, and required-bytes estimate), optionally reordering by cost. This is schema/metadata work that is identical for every open of the same file.
2. Bind each candidate to the current open's metrics counters.

Before this change, both ran per open — so a 226-RG file split into 226 chunks paid the analysis cost 226×. After this change, the donor (or an un-split file open) builds the `Vec<FilterCandidate>` once in `prepare_filters`; donated chunks carry it through `ParquetOpenChunk`; each `build_stream` does only the cheap metric binding via the new `row_filter_from_candidates`.

Refactor:
- Split `row_filter::build_row_filter` into `build_row_filter_candidates` (expensive, metrics-free) and `row_filter_from_candidates` (cheap, per-open). `build_row_filter` becomes a thin wrapper.
- `FilterCandidate` is now `Clone`.
- `FiltersPreparedParquetOpen` gains `row_filter_candidates: Option<Arc<Vec<FilterCandidate>>>`, built in `prepare_filters` from the donor's rewritten predicate.
- `ParquetOpenChunk` carries the same `Arc` across the handoff so stealers reuse it in `build_stream`.
- `build_stream` now calls `row_filter_from_candidates` on the cached vec instead of re-running the full builder.

Correctness: each open still gets its own metric bindings — only the candidate analysis is shared. Existing tests pass (103 lib + 200 integration).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
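The expensive/cheap split can be sketched like this. The types are toys standing in for `FilterCandidate` and the metrics counters — the real code resolves projection masks and schemas, not string columns — but the sharing structure is the same: one `Arc`-shared analysis, fresh metric bindings per open.

```rust
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

// Toy stand-in for a filter candidate; building one is "expensive" in the
// real code (projection mask + projected schema + byte estimate).
#[derive(Clone, Debug)]
struct FilterCandidate {
    column: &'static str,
    estimated_bytes: u64,
}

// A candidate bound to this open's (hypothetical) metric counter.
struct BoundFilter {
    candidate: FilterCandidate,
    rows_filtered: Arc<AtomicU64>,
}

/// Expensive, metrics-free: run once per file (by the donor).
/// Input: one (column, estimated_bytes) pair per predicate conjunct.
fn build_row_filter_candidates(conjuncts: &[(&'static str, u64)]) -> Arc<Vec<FilterCandidate>> {
    let mut candidates: Vec<FilterCandidate> = conjuncts
        .iter()
        .map(|&(column, estimated_bytes)| FilterCandidate { column, estimated_bytes })
        .collect();
    // Cost-based reorder: cheapest candidate first.
    candidates.sort_by_key(|c| c.estimated_bytes);
    Arc::new(candidates)
}

/// Cheap, per-open: bind the shared candidates to fresh metric counters.
fn row_filter_from_candidates(candidates: &Arc<Vec<FilterCandidate>>) -> Vec<BoundFilter> {
    candidates
        .iter()
        .cloned()
        .map(|candidate| BoundFilter {
            candidate,
            rows_filtered: Arc::new(AtomicU64::new(0)),
        })
        .collect()
}

fn main() {
    // Donor analyses once; every stolen chunk reuses the Arc.
    let candidates = build_row_filter_candidates(&[("url", 4096), ("id", 8)]);
    let open_a = row_filter_from_candidates(&candidates);
    let open_b = row_filter_from_candidates(&candidates);
    assert_eq!(open_a[0].candidate.column, "id"); // reordered: cheapest first
    // Each open gets its own metric counter — only the analysis is shared.
    assert!(!Arc::ptr_eq(&open_a[0].rows_filtered, &open_b[0].rows_filtered));
}
```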
run benchmarks

🤖 Benchmark running (GKE): comparing row-group-morselization (641e6cc) to 9a1ed57 (merge-base) using tpcds

🤖 Benchmark running (GKE): comparing row-group-morselization (641e6cc) to 9a1ed57 (merge-base) using clickbench_partitioned

🤖 Benchmark running (GKE): comparing row-group-morselization (641e6cc) to 9a1ed57 (merge-base) using tpch
run benchmarks

🤖 Benchmark running (GKE): comparing row-group-morselization (26f09e4) to 9a1ed57 (merge-base) using clickbench_partitioned

🤖 Benchmark running (GKE): comparing row-group-morselization (26f09e4) to 9a1ed57 (merge-base) using tpcds

🤖 Benchmark running (GKE): comparing row-group-morselization (26f09e4) to 9a1ed57 (merge-base) using tpch
🤖 Benchmark completed (GKE): tpcds

🤖 Benchmark completed (GKE): clickbench_partitioned

Benchmark for this request hit the 7200s job deadline before finishing.
FYI tpch will continue to fail until #21625 is resolved
run benchmark tpch10

🤖 Criterion benchmark running (GKE): comparing row-group-morselization (26f09e4) to 9a1ed57 (merge-base)

Benchmark for this request failed.

run benchmark tpch10

🤖 Benchmark running (GKE): comparing row-group-morselization (26f09e4) to 9a1ed57 (merge-base) using tpch10

🤖 Benchmark completed (GKE): tpch10
…scan

Before: when every shared file is popped but no donor has reached its split point yet, an idle sibling saw empty queues and returned Done. Any row groups the donor subsequently pushed to the morsel queue were missed by that sibling.

Now `SharedWorkSource` tracks an in-flight donor count via a `FileLease` RAII guard. Idle siblings that find both queues empty check the count and, if it is non-zero, `wake_by_ref` + `Poll::Pending` to re-poll. The lease drops at the morsel-to-reader transition — once a file is streaming, the donation window is closed, so we don't block siblings on the donor's assigned row groups.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
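A minimal sketch of the RAII lease described above. Names mirror the description but the types are simplified stand-ins for the DataFusion implementation; the `Poll::Pending`/`wake_by_ref` machinery is reduced to a string decision for clarity.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// RAII guard: held by a donor from file pop until the morsel-to-reader
// transition. Its existence means row groups may still be donated.
struct FileLease {
    in_flight: Arc<AtomicUsize>,
}

impl FileLease {
    fn new(in_flight: Arc<AtomicUsize>) -> Self {
        in_flight.fetch_add(1, Ordering::SeqCst);
        FileLease { in_flight }
    }
}

impl Drop for FileLease {
    // Dropped once the file is streaming: the donation window closes and
    // idle siblings no longer need to wait on this donor.
    fn drop(&mut self) {
        self.in_flight.fetch_sub(1, Ordering::SeqCst);
    }
}

/// What an idle sibling does when both queues are empty: if any donor still
/// holds a lease, report "pending" (wake_by_ref + Poll::Pending in the real
/// stream) instead of "done", so it re-polls and can catch late morsels.
fn idle_decision(in_flight: &AtomicUsize) -> &'static str {
    if in_flight.load(Ordering::SeqCst) > 0 {
        "pending"
    } else {
        "done"
    }
}

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let lease = FileLease::new(Arc::clone(&counter));
    assert_eq!(idle_decision(&counter), "pending"); // donor may still donate
    drop(lease); // file is streaming; donation window closed
    assert_eq!(idle_decision(&counter), "done");
}
```

Tying the count to `Drop` means the window cannot leak: however the donor path exits (success, error, early return), the lease is released and idle siblings stop spinning.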
run benchmarks

🤖 Benchmark running (GKE): comparing row-group-morselization (52d6bce) to 9a1ed57 (merge-base) using tpcds

🤖 Benchmark running (GKE): comparing row-group-morselization (52d6bce) to 9a1ed57 (merge-base) using clickbench_partitioned

🤖 Benchmark running (GKE): comparing row-group-morselization (52d6bce) to 9a1ed57 (merge-base) using tpch

🤖 Benchmark completed (GKE): tpcds

🤖 Benchmark completed (GKE): clickbench_partitioned

Benchmark for this request hit the 7200s job deadline before finishing.
@Dandandan can you rebase / merge main to get #21625 and we can see if that fixes tpch timing out?

run benchmark tpch

🤖 Benchmark running (GKE): comparing row-group-morselization (a5c02d8) to 4bff17e (merge-base) using tpch

This looks really cool @Dandandan -- please let me know when it is ready for review (or if you would like help, etc)

🤖 Benchmark completed (GKE): tpch
I think the perf results look good - I think we mostly need to find out the right API that will also extend to further splitting/merging and other optimizations... |
Which issue does this PR close?
Follow-up to #21351 (Dynamic work scheduling in FileStream), which closed #20529 and explicitly deferred "splitting files into smaller units (e.g. across row groups)" as future work. This PR implements that.
Rationale for this change
With #21351, sibling FileStreams already steal whole files from a `SharedWorkSource` queue. But a single large parquet file still bottlenecks on one worker — the other N−1 sibling partitions sit idle even though each row group is independently readable. This shows up on single-file queries (ClickBench-style) and on the long-tail large-file case in multi-file scans.

This PR adds row-group granularity: the worker that pops a file donates its other row groups back to the shared queue so idle siblings steal them.
What changes are included in this PR?
Donation path (`datafusion/datasource-parquet/src/opener.rs`):

Shared queue plumbing:

Trade-offs (v1):
Are these changes tested?
Yes. Five new unit tests in `datafusion/datasource-parquet/src/opener.rs`:
All existing `datafusion-datasource` and `datafusion-datasource-parquet` tests continue to pass. `cargo clippy --all-targets --all-features -- -D warnings` is clean on both crates.
Are there any user-facing changes?
Performance only — faster single-file and tail-file scans under sibling work stealing. No semantic or API changes visible to SQL users. `SharedWorkSource` becomes `pub` (it was `pub(crate)`); `FileSource::create_morselizer` gains one parameter — default implementations ignore it.
🤖 Generated with Claude Code