feat: add approx_top_k aggregate function #20968

sesteves wants to merge 3 commits into apache:main
Conversation
(force-pushed from 0a2f5f8 to 1465c70)
(force-pushed from 7d12109 to aa7bc47)
cc @Dandandan @martin-g Should be good for another look

(force-pushed from aa7bc47 to b7a06ea)

Thanks for the review @Dandandan @martin-g, I believe I've addressed all of your feedback.

Unfortunately, after the squash+force push I will have to review the whole PR again ...

Sorry about that, the squash slipped through accidentally!

No problem!

Hi @martin-g and @Dandandan! Friendly ping on this when you have a moment. I know you mentioned wanting a second person to review - let me know if there is someone specific I should tag, or if there is anything else I can do from my side to help get this across the finish line.
```rust
#[derive(Debug, Clone)]
struct SpaceSavingSummary {
    counters: Vec<Counter>,
    counter_map: HashMap<Vec<u8>, usize>,
```
`Vec<u8>`: this is quite wasteful to store in the map.
We can store the hash inline in the map using the `HashTable` API instead: `HashTable<(u64, usize)>`.
We can then remove the hash from the counters and look up the item using the `usize` index via `counter_map.find`.
(Look up the pattern in other places like aggregation / join ...)
This saves many allocations / expensive rehashing / memory usage (for larger k, lots of replacements ...).
Good call! Switched to `HashTable<(u64, usize)>` following the same pattern as `group_values/row.rs` and `join_hash_map.rs`. No key bytes cloned anymore, rebuild is zero-allocation, and double-hashing is gone.
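The lookup pattern discussed in this thread can be sketched with std collections only. This is a simplified stand-in for hashbrown's `HashTable` (which is what the PR actually uses); `Counter`, `Summary`, `hash_item`, `insert`, and `find` are illustrative names, not the PR's code. The key idea survives the simplification: the map never owns a copy of the item bytes.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Illustrative counter type: the item bytes live only here, never as a
// second copy inside a map key.
struct Counter {
    item: Vec<u8>,
    count: u64,
}

// The map stores only hash -> counter indices; collisions are resolved by
// comparing against the bytes already owned by the counter, so no key bytes
// are cloned. hashbrown's HashTable<(u64, usize)> achieves the same effect
// with open addressing and no inner Vec.
struct Summary {
    counters: Vec<Counter>,
    index: HashMap<u64, Vec<usize>>,
}

fn hash_item(item: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    item.hash(&mut h);
    h.finish()
}

impl Summary {
    fn insert(&mut self, item: &[u8]) {
        if let Some(i) = self.find(item) {
            self.counters[i].count = self.counters[i].count.saturating_add(1);
            return;
        }
        // New item: allocate one Counter and record only its index.
        let h = hash_item(item);
        let idx = self.counters.len();
        self.counters.push(Counter { item: item.to_vec(), count: 1 });
        self.index.entry(h).or_default().push(idx);
    }

    fn find(&self, item: &[u8]) -> Option<usize> {
        let h = hash_item(item);
        self.index
            .get(&h)?
            .iter()
            .copied()
            .find(|&i| self.counters[i].item == item)
    }
}
```

With this layout, rebuilding the index after a counter re-sort only re-inserts `(hash, index)` pairs and allocates no key bytes, which is the saving the review comment is after.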
Add a new approx_top_k(expression, k) aggregate function that returns
the approximate top-k most frequent values with their estimated counts,
using the Filtered Space-Saving algorithm.
The implementation uses a capacity multiplier of 3 (matching ClickHouse's
default) and includes an alpha map for improved accuracy by filtering
low-frequency noise before it enters the main summary.
Return type is List(Struct({value: T, count: UInt64})) ordered by count
descending, where T matches the input column type.
Closes apache#20967
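The core Space-Saving eviction step described above can be sketched as follows. This is a minimal illustration with hypothetical names, using a linear scan instead of a hash index and omitting the alpha-map filter and the capacity multiplier; it is not the PR's implementation.

```rust
// One Space-Saving update: at most `capacity` counters are kept; a new item
// replaces the minimum counter and inherits its count as the error bound.
#[derive(Debug)]
struct Counter {
    item: String,
    count: u64,
    error: u64, // overestimation bound inherited on eviction
}

fn space_saving_update(counters: &mut Vec<Counter>, capacity: usize, item: &str) {
    // Existing item: just bump its count.
    if let Some(c) = counters.iter_mut().find(|c| c.item == item) {
        c.count = c.count.saturating_add(1);
        return;
    }
    // Room left: start a fresh counter.
    if counters.len() < capacity {
        counters.push(Counter { item: item.to_string(), count: 1, error: 0 });
        return;
    }
    // Full: evict the minimum counter. The newcomer's count may be
    // overestimated by up to the evicted count, recorded as `error`.
    let min = counters
        .iter_mut()
        .min_by_key(|c| c.count)
        .expect("capacity > 0");
    min.error = min.count;
    min.count = min.count.saturating_add(1);
    min.item = item.to_string();
}
```

The alpha map in the Filtered variant sits in front of this step: items whose filtered estimate cannot beat the current minimum never enter the summary at all.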
- Replace HashMap<Vec<u8>, usize> with hashbrown::HashTable<(u64, usize)> to eliminate key cloning, double-hashing, and O(n) rebuild allocations
- Fix merge() to use Parallel Space-Saving m1/m2 algorithm (min counter count correction) matching ClickHouse's SpaceSaving::merge()
- Fix serialize() to select top-N counters and fold evicted ones into alpha_map when requested_capacity < counters.len() < target_capacity
- Harden from_bytes() with validation: requested_capacity > 0, num_counters <= requested_capacity, alpha_map_size is power-of-two, and overflow-safe bounds checking throughout
- Replace all raw += on count/error/alpha with saturating_add
- Track item_heap_bytes incrementally for O(1) size() accounting
- Pre-allocate serialize() output buffer
- Add TIMEZONE_WILDCARD timestamp variants to signature
- Remove unused k and data_type from state_fields/state/merge_batch
- Replace all manual DataFusionError construction with exec_err!/plan_err!
- Replace .unwrap() downcasts in update_batch with proper error returns
- Fix test_accumulator_large_utf8_input to use DataType::LargeUtf8
- Update #[user_doc] to document approximate counts, NULL handling, supported types, and return shape
- Use with_timezone() instead of arrow::compute::cast for timestamps
- Adapt to upstream API changes (remove as_any from AggregateUDFImpl, use direct downcast_ref for Literal)
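The Parallel Space-Saving m1/m2 merge correction mentioned above can be sketched roughly as follows. This is my reading of the idea, not the PR's or ClickHouse's code; the types are deliberately simplified to `(count, error)` pairs, and the real merge also re-sorts and truncates back to capacity afterwards.

```rust
use std::collections::HashMap;

// Simplified parallel Space-Saving merge. m1/m2 are each side's minimum
// count when that side was at capacity (0 otherwise). An item missing from
// one side may have occurred up to that side's minimum count without being
// tracked, so that minimum is folded into both the count and the error.
fn merge(
    left: &HashMap<String, (u64, u64)>,
    right: &HashMap<String, (u64, u64)>,
    m1: u64, // left's minimum count if left was full, else 0
    m2: u64, // right's minimum count if right was full, else 0
) -> HashMap<String, (u64, u64)> {
    let mut out = HashMap::new();
    for (item, &(c, e)) in left {
        match right.get(item) {
            // Present on both sides: counts and errors simply add.
            Some(&(c2, e2)) => {
                out.insert(item.clone(), (c.saturating_add(c2), e.saturating_add(e2)));
            }
            // Only on the left: the right side may have seen it up to m2 times.
            None => {
                out.insert(item.clone(), (c.saturating_add(m2), e.saturating_add(m2)));
            }
        }
    }
    for (item, &(c, e)) in right {
        // Only on the right: symmetric correction with the left's minimum.
        if !left.contains_key(item) {
            out.insert(item.clone(), (c.saturating_add(m1), e.saturating_add(m1)));
        }
    }
    out
}
```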
(force-pushed from b7a06ea to 3d1f008)
Starts to look good! Can you also remove the testing submodule commit / update? And fix remaining CI issues?
- Fix testing submodule pointer (reverted to match upstream main)
- Fix Cargo.toml formatting (taplo)
- Fix unresolved rustdoc link: [`size`] -> `size()`
Which issue does this PR close?
Closes #20967.
Rationale
Finding the most frequently occurring values in a column is a very common analytical pattern: top error codes, most popular products, most active users, frequent URL paths, etc. The exact approach (`GROUP BY value ORDER BY COUNT(*) DESC LIMIT k`) requires materializing all distinct groups, which is memory-intensive and slow on high-cardinality columns. `approx_top_k` solves this with a streaming approximation algorithm that operates in bounded memory, making it practical for large-scale analytics.

This function is already available in ClickHouse (`topK`) and via extensions in PostgreSQL and Druid, and is a natural addition to DataFusion's existing family of approximate aggregate functions (`approx_distinct`, `approx_median`, `approx_percentile_cont`).

What changes are included in this PR?
Core implementation (`datafusion/functions-aggregate/src/approx_top_k.rs`)

- `SpaceSavingSummary`: implements the Filtered Space-Saving algorithm (Metwally et al., 2005), with `CAPACITY_MULTIPLIER = 3` (matching ClickHouse's default), so the summary tracks `k × 3` counters internally
- `ApproxTopK`: `AggregateUDFImpl` with `#[user_doc]` documentation, supporting any hashable scalar type
- `ApproxTopKAccumulator`: accumulator with `update_batch`, `merge_batch`, `evaluate` (returns `List(Struct({value: T, count: UInt64}))` ordered by count descending), and `state` (serialized summary + data type)
- 9 unit tests covering basic operations, eviction, alpha map, merge, serialization, accumulator update/evaluate, merge_batch, and a distributed merge simulation

Registration (`datafusion/functions-aggregate/src/lib.rs`)

- `expr_fn` re-export, and registration in `all_default_aggregate_functions()`

SQL logic tests (`datafusion/sqllogictest/test_files/approx_top_k.slt`)

Documentation

- `docs/source/user-guide/sql/aggregate_functions.md`: TOC entry + full documentation section
- `docs/source/user-guide/expressions.md`: quick-reference table entry

Integration tests

- `datafusion/proto/tests/cases/roundtrip_logical_plan.rs`: `approx_top_k(col("a"), lit(3))`
- `datafusion/core/tests/dataframe/dataframe_functions.rs`: `test_fn_approx_top_k` with `insta::assert_snapshot!`

Are these changes tested?
Yes:

- Unit tests in `approx_top_k.rs` (algorithm correctness, serialization, accumulator lifecycle, distributed merge)
- SQL logic tests in `approx_top_k.slt` (end-to-end SQL behavior including edge cases and errors)

Are there any user-facing changes?
Yes: a new `approx_top_k` aggregate function is available in SQL and the DataFrame API.
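As a rough illustration of the SQL shape (the table and column names here are hypothetical, and the actual counts are approximate by design):

```sql
-- Approximate top 3 most frequent status codes with estimated counts.
-- Returns a List(Struct({value, count})) ordered by count descending.
SELECT approx_top_k(status_code, 3) FROM requests;
```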