From 91a5391f69293faf61194b4e1d8a58a6c6115110 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Wed, 22 Apr 2026 22:34:08 +0800 Subject: [PATCH 1/5] add benchmark --- .../benches/count_distinct.rs | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index 4d9e8c5b67b3..79f6a196dba5 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -49,6 +49,25 @@ fn prepare_accumulator(data_type: DataType) -> Box { Count::new().accumulator(accumulator_args).unwrap() } +fn prepare_sliding_accumulator(data_type: DataType) -> Box { + let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)])); + let expr = col("f", &schema).unwrap(); + let accumulator_args = AccumulatorArgs { + return_field: Field::new("f", DataType::Int64, true).into(), + schema: &schema, + expr_fields: &[expr.return_field(&schema).unwrap()], + ignore_nulls: false, + order_bys: &[], + is_reversed: false, + name: "count(distinct f)", + is_distinct: true, + exprs: &[expr], + }; + Count::new() + .create_sliding_accumulator(accumulator_args) + .unwrap() +} + fn create_i64_array(n_distinct: usize) -> Int64Array { let mut rng = StdRng::seed_from_u64(42); (0..BATCH_SIZE) @@ -216,6 +235,55 @@ fn count_distinct_benchmark(c: &mut Criterion) { } } +fn count_distinct_sliding_benchmark(c: &mut Criterion) { + fn bench_sliding_count_distinct( + c: &mut Criterion, + cardinality_name: &str, + values: ArrayRef, + window_size: usize, + ) { + let name = + format!("count_distinct_sliding i64 {cardinality_name} window_{window_size}"); + c.bench_function(&name, |b| { + b.iter(|| { + let mut accumulator = prepare_sliding_accumulator(DataType::Int64); + + for idx in 0..BATCH_SIZE { + accumulator + .update_batch(std::slice::from_ref(&values.slice(idx, 1))) + .unwrap(); + if idx >= window_size { + accumulator + .retract_batch(std::slice::from_ref( + &values.slice(idx - window_size, 1), + )) + .unwrap(); + } + } + + accumulator.evaluate().unwrap() + }) + }); + } + + let cardinalities = [("low", 20), ("mid", 80), ("high", 99)]; + let window_sizes = [256, 1024]; + + for (cardinality_name, distinct_pct) in cardinalities { + let n_distinct = BATCH_SIZE * distinct_pct / 100; + let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef; + + for window_size in window_sizes { + bench_sliding_count_distinct( + c, + cardinality_name, + Arc::clone(&values), + window_size, + ); + } + } +} + /// Create group indices with uniform distribution fn create_uniform_groups(num_groups: usize) -> Vec { let mut rng = StdRng::seed_from_u64(42); @@ -454,6 +522,7 @@ fn count_distinct_groups_benchmark(c: &mut Criterion) { criterion_group!( benches, count_distinct_benchmark, + count_distinct_sliding_benchmark, count_distinct_groups_benchmark ); criterion_main!(benches); From fb8c6ed6e76de874e8c09e71e22979cb929d3ddf Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Wed, 22 Apr 2026 22:35:35 +0800 Subject: [PATCH 2/5] perf count distinct null fast path --- .../src/aggregate/count_distinct/native.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs index fb9cfb379a26..27547b08f27c 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs @@ -26,8 +26,8 @@ use std::hash::Hash; use std::mem::size_of_val; use std::sync::Arc; -use arrow::array::ArrayRef; use arrow::array::PrimitiveArray; +use arrow::array::ArrayRef; use arrow::array::types::ArrowPrimitiveType; use arrow::datatypes::DataType; use datafusion_common::hash_utils::RandomState; @@ -85,11 +85,13 @@ where } let arr = as_primitive_array::(&values[0])?; - arr.iter().for_each(|value| { - if let Some(value) = value { + if arr.null_count() > 0 { + arr.iter().flatten().for_each(|value| { self.values.insert(value); - } - }); + }); + } else { + self.values.extend(arr.values().iter().copied()); + } Ok(()) } From 335210532f85364ada640395d28520820f8dddab Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Wed, 22 Apr 2026 22:38:33 +0800 Subject: [PATCH 3/5] perf count sliding path --- .../src/aggregate/count_distinct.rs | 1 + .../src/aggregate/count_distinct/native.rs | 138 +++++++++++++++++- datafusion/functions-aggregate/src/count.rs | 37 ++++- 3 files changed, 170 insertions(+), 6 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs index 83cc5cded836..09208c3052ff 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs @@ -30,3 +30,4 @@ pub use native::BoolArray256DistinctCountAccumulator; pub use native::BoolArray256DistinctCountAccumulatorI8; pub use native::FloatDistinctCountAccumulator; pub use native::PrimitiveDistinctCountAccumulator; +pub use native::SlidingPrimitiveDistinctCountAccumulator; diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs index 27547b08f27c..2571c1845960 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs @@ -20,15 +20,15 @@ //! //! [`Int64Array`]: arrow::array::Int64Array //! [`Float64Array`]: arrow::array::Float64Array -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::hash::Hash; -use std::mem::size_of_val; +use std::mem::{size_of, size_of_val}; use std::sync::Arc; use arrow::array::PrimitiveArray; -use arrow::array::ArrayRef; use arrow::array::types::ArrowPrimitiveType; +use arrow::array::{Array, ArrayRef}; use arrow::datatypes::DataType; use datafusion_common::hash_utils::RandomState; @@ -520,3 +520,135 @@ impl Accumulator for Bitmap65536DistinctCountAccumulatorI16 { size_of_val(self) + 8192 } } + +/// Sliding-window variant of [`PrimitiveDistinctCountAccumulator`]. +#[derive(Debug)] +pub struct SlidingPrimitiveDistinctCountAccumulator +where + T: ArrowPrimitiveType + Send, + T::Native: Eq + Hash, +{ + counts: HashMap, + data_type: DataType, +} + +impl SlidingPrimitiveDistinctCountAccumulator +where + T: ArrowPrimitiveType + Send, + T::Native: Eq + Hash, +{ + pub fn new(data_type: &DataType) -> Self { + Self { + counts: HashMap::default(), + data_type: data_type.clone(), + } + } +} + +impl Accumulator for SlidingPrimitiveDistinctCountAccumulator +where + T: ArrowPrimitiveType + Send + Debug, + T::Native: Eq + Hash, +{ + fn state(&mut self) -> datafusion_common::Result> { + let arr = Arc::new( + PrimitiveArray::::from_iter_values(self.counts.keys().cloned()) + .with_data_type(self.data_type.clone()), + ); + Ok(vec![ + SingleRowListArrayBuilder::new(arr).build_list_scalar(), + ]) + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { + if values.is_empty() { + return Ok(()); + } + + let arr = as_primitive_array::(&values[0])?; + if arr.null_count() == 0 { + for value in arr.values().iter() { + *self.counts.entry(*value).or_insert(0) += 1; + } + } else { + for idx in 0..arr.len() { + if arr.is_valid(idx) { + *self.counts.entry(arr.value(idx)).or_insert(0) += 1; + } + } + } + Ok(()) + } + + fn retract_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { + if values.is_empty() { + return Ok(()); + } + + let arr = as_primitive_array::(&values[0])?; + if arr.null_count() == 0 { + for value in arr.values().iter() { + if let Some(count) = self.counts.get_mut(value) { + *count -= 1; + if *count == 0 { + self.counts.remove(value); + } + } + } + } else { + for idx in 0..arr.len() { + if arr.is_valid(idx) { + let value = arr.value(idx); + if let Some(count) = self.counts.get_mut(&value) { + *count -= 1; + if *count == 0 { + self.counts.remove(&value); + } + } + } + } + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> { + if states.is_empty() { + return Ok(()); + } + assert_eq!( + states.len(), + 1, + "count_distinct states must be single array" + ); + + let arr = as_list_array(&states[0])?; + arr.iter().try_for_each(|maybe_list| { + if let Some(list) = maybe_list { + let list = as_primitive_array::(&list)?; + for value in list.values().iter() { + *self.counts.entry(*value).or_insert(0) += 1; + } + }; + Ok(()) + }) + } + + fn evaluate(&mut self) -> datafusion_common::Result { + Ok(ScalarValue::Int64(Some(self.counts.len() as i64))) + } + + fn supports_retract_batch(&self) -> bool { + true + } + + fn size(&self) -> usize { + let num_elements = self.counts.len(); + let fixed_size = size_of_val(self) + size_of_val(&self.counts); + + estimate_memory_size::<(T::Native, usize)>( + num_elements, + fixed_size + size_of::(), + ) + .unwrap() + } +} diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index eab36d4951a9..1b49b7088e02 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -52,6 +52,7 @@ use datafusion_functions_aggregate_common::aggregate::{ count_distinct::DictionaryCountAccumulator, count_distinct::FloatDistinctCountAccumulator, count_distinct::PrimitiveDistinctCountAccumulator, + count_distinct::SlidingPrimitiveDistinctCountAccumulator, groups_accumulator::accumulate::accumulate_indices, }; use datafusion_macros::user_doc; @@ -267,6 +268,38 @@ fn get_count_accumulator(data_type: &DataType) -> Box { } } +fn get_sliding_distinct_count_accumulator( + data_type: &DataType, +) -> Result> { + Ok(match data_type { + DataType::Int8 => Box::new( + SlidingPrimitiveDistinctCountAccumulator::::new(data_type), + ), + DataType::Int16 => Box::new( + SlidingPrimitiveDistinctCountAccumulator::::new(data_type), + ), + DataType::Int32 => Box::new( + SlidingPrimitiveDistinctCountAccumulator::::new(data_type), + ), + DataType::Int64 => Box::new( + SlidingPrimitiveDistinctCountAccumulator::::new(data_type), + ), + DataType::UInt8 => Box::new( + SlidingPrimitiveDistinctCountAccumulator::::new(data_type), + ), + DataType::UInt16 => Box::new(SlidingPrimitiveDistinctCountAccumulator::< + UInt16Type, + >::new(data_type)), + DataType::UInt32 => Box::new(SlidingPrimitiveDistinctCountAccumulator::< + UInt32Type, + >::new(data_type)), + DataType::UInt64 => Box::new(SlidingPrimitiveDistinctCountAccumulator::< + UInt64Type, + >::new(data_type)), + _ => Box::new(SlidingDistinctCountAccumulator::try_new(data_type)?), + }) +} + /// Uses optimized bitmap accumulators but separated to keep hot path small #[cold] fn get_small_int_accumulator(data_type: &DataType) -> Result> { @@ -434,9 +467,7 @@ impl AggregateUDFImpl for Count { args: AccumulatorArgs, ) -> Result> { if args.is_distinct { - let acc = - SlidingDistinctCountAccumulator::try_new(args.return_field.data_type())?; - Ok(Box::new(acc)) + get_sliding_distinct_count_accumulator(args.expr_fields[0].data_type()) } else { let acc = CountAccumulator::new(); Ok(Box::new(acc)) From f63c1b028939341bb0295a9013c673e95823bf5e Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Wed, 22 Apr 2026 22:39:54 +0800 Subject: [PATCH 4/5] add test and slt test --- .../src/aggregate/count_distinct/native.rs | 27 +++++++++++++ datafusion/sqllogictest/test_files/window.slt | 40 +++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs index 2571c1845960..1ff144f7f766 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs @@ -652,3 +652,30 @@ where .unwrap() } } + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Int32Type}; + + #[test] + fn sliding_primitive_distinct_count_accumulator_update_retract() { + let mut acc = + SlidingPrimitiveDistinctCountAccumulator::::new(&DataType::Int32); + let values: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(1), + Some(2), + Some(2), + Some(3), + None, + ])); + + acc.update_batch(&[Arc::clone(&values)]).unwrap(); + assert_eq!(acc.evaluate().unwrap(), ScalarValue::Int64(Some(3))); + + let leaving: ArrayRef = Arc::new(Int32Array::from(vec![Some(2), Some(3), None])); + acc.retract_batch(&[leaving]).unwrap(); + assert_eq!(acc.evaluate().unwrap(), ScalarValue::Int64(Some(2))); + } +} diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index caaf22f0adbd..57507c19c09b 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -5826,6 +5826,46 @@ physical_plan 05)--------RepartitionExec: partitioning=Hash([k@0], 2), input_partitions=2 06)----------DataSourceExec: partitions=2, partition_sizes=[5, 4] +statement ok +CREATE TABLE table_test_distinct_count_rows_null ( + k VARCHAR, + seq INT, + v INT +); + +statement ok +INSERT INTO table_test_distinct_count_rows_null (k, seq, v) VALUES + ('a', 1, 1), + ('a', 2, 1), + ('a', 3, NULL), + ('a', 4, 2), + ('a', 5, 1), + ('b', 1, NULL), + ('b', 2, 3), + ('b', 3, 3), + ('b', 4, 4); + +query TII +SELECT + k, + seq, + COUNT(DISTINCT v) OVER ( + PARTITION BY k + ORDER BY seq + ROWS BETWEEN 2 PRECEDING AND CURRENT ROW + ) AS distinct_count +FROM table_test_distinct_count_rows_null +ORDER BY k, seq; +---- +a 1 1 +a 2 1 +a 3 1 +a 4 2 +a 5 2 +b 1 0 +b 2 1 +b 3 1 +b 4 2 # Add testing for distinct sum query TPII From d71d55bd95658e9c5276307349562c0214abc3b9 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Wed, 22 Apr 2026 23:39:46 +0800 Subject: [PATCH 5/5] fix clippy --- datafusion/functions-aggregate/benches/count_distinct.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index 79f6a196dba5..b3b27298e992 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -239,7 +239,7 @@ fn count_distinct_sliding_benchmark(c: &mut Criterion) { fn bench_sliding_count_distinct( c: &mut Criterion, cardinality_name: &str, - values: ArrayRef, + values: &ArrayRef, window_size: usize, ) { let name = @@ -274,12 +274,7 @@ fn count_distinct_sliding_benchmark(c: &mut Criterion) { let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef; for window_size in window_sizes { - bench_sliding_count_distinct( - c, - cardinality_name, - Arc::clone(&values), - window_size, - ); + bench_sliding_count_distinct(c, cardinality_name, &values, window_size); } } }