Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ pub use native::BoolArray256DistinctCountAccumulator;
pub use native::BoolArray256DistinctCountAccumulatorI8;
pub use native::FloatDistinctCountAccumulator;
pub use native::PrimitiveDistinctCountAccumulator;
pub use native::SlidingPrimitiveDistinctCountAccumulator;
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
//!
//! [`Int64Array`]: arrow::array::Int64Array
//! [`Float64Array`]: arrow::array::Float64Array
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::hash::Hash;
use std::mem::size_of_val;
use std::mem::{size_of, size_of_val};
use std::sync::Arc;

use arrow::array::ArrayRef;
use arrow::array::PrimitiveArray;
use arrow::array::types::ArrowPrimitiveType;
use arrow::array::{Array, ArrayRef};
use arrow::datatypes::DataType;
use datafusion_common::hash_utils::RandomState;

Expand Down Expand Up @@ -85,11 +85,13 @@ where
}

let arr = as_primitive_array::<T>(&values[0])?;
arr.iter().for_each(|value| {
if let Some(value) = value {
if arr.null_count() > 0 {
arr.iter().flatten().for_each(|value| {
self.values.insert(value);
}
});
});
} else {
self.values.extend(arr.values().iter().copied());
}

Ok(())
}
Expand Down Expand Up @@ -518,3 +520,162 @@ impl Accumulator for Bitmap65536DistinctCountAccumulatorI16 {
size_of_val(self) + 8192
}
}

/// Sliding-window variant of [`PrimitiveDistinctCountAccumulator`].
///
/// Keeps an occurrence count per non-null value so that rows leaving the
/// window can be retracted: `update_batch` increments a value's count,
/// `retract_batch` decrements it and drops the entry once it reaches zero,
/// and `evaluate` reports how many entries remain.
#[derive(Debug)]
pub struct SlidingPrimitiveDistinctCountAccumulator<T>
where
T: ArrowPrimitiveType + Send,
T::Native: Eq + Hash,
{
/// Occurrence count for every value currently tracked; an entry is removed
/// when its count drops to zero, so the map length is the distinct count.
counts: HashMap<T::Native, usize, RandomState>,
/// Data type applied to the values array that `state` emits.
data_type: DataType,
}

impl<T> SlidingPrimitiveDistinctCountAccumulator<T>
where
    T: ArrowPrimitiveType + Send,
    T::Native: Eq + Hash,
{
    /// Creates an accumulator that tracks no values yet.
    ///
    /// `data_type` is cloned and kept so that `state` can tag the array it
    /// emits with it.
    pub fn new(data_type: &DataType) -> Self {
        let counts = HashMap::default();
        let data_type = data_type.clone();
        Self { counts, data_type }
    }
}

impl<T> Accumulator for SlidingPrimitiveDistinctCountAccumulator<T>
where
    T: ArrowPrimitiveType + Send + Debug,
    T::Native: Eq + Hash,
{
    /// Returns the currently tracked distinct values as a single list scalar.
    ///
    /// Only the keys of `counts` are emitted; the per-value occurrence counts
    /// are not part of the state.
    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
        let arr = Arc::new(
            PrimitiveArray::<T>::from_iter_values(self.counts.keys().copied())
                .with_data_type(self.data_type.clone()),
        );
        Ok(vec![
            SingleRowListArrayBuilder::new(arr).build_list_scalar(),
        ])
    }

    /// Adds every non-null value of `values[0]` to the window by bumping its
    /// occurrence count. An empty `values` slice is a no-op.
    ///
    /// # Errors
    /// Returns an error if `values[0]` cannot be viewed as a `PrimitiveArray<T>`.
    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
        if values.is_empty() {
            return Ok(());
        }

        let arr = as_primitive_array::<T>(&values[0])?;
        if arr.null_count() == 0 {
            // No nulls: walk the raw value buffer without validity checks.
            for &value in arr.values().iter() {
                *self.counts.entry(value).or_insert(0) += 1;
            }
        } else {
            for value in arr.iter().flatten() {
                *self.counts.entry(value).or_insert(0) += 1;
            }
        }
        Ok(())
    }

    /// Removes every non-null value of `values[0]` from the window.
    ///
    /// Each occurrence decrements the value's count; the entry is dropped once
    /// the count reaches zero. Values that are not currently tracked are
    /// ignored. An empty `values` slice is a no-op.
    ///
    /// # Errors
    /// Returns an error if `values[0]` cannot be viewed as a `PrimitiveArray<T>`.
    fn retract_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
        use std::collections::hash_map::Entry;

        if values.is_empty() {
            return Ok(());
        }

        let arr = as_primitive_array::<T>(&values[0])?;

        let counts = &mut self.counts;
        // Going through the entry lets a count that drops to zero be removed
        // without hashing the key a second time.
        let mut retract = |value: T::Native| {
            if let Entry::Occupied(mut slot) = counts.entry(value) {
                *slot.get_mut() -= 1;
                if *slot.get() == 0 {
                    slot.remove();
                }
            }
        };

        if arr.null_count() == 0 {
            // No nulls: walk the raw value buffer without validity checks.
            for &value in arr.values().iter() {
                retract(value);
            }
        } else {
            for value in arr.iter().flatten() {
                retract(value);
            }
        }
        Ok(())
    }

    /// Merges serialized states (as produced by `state`) into this accumulator.
    ///
    /// Every value found in a non-null list row has its count incremented by
    /// one. An empty `states` slice is a no-op.
    ///
    /// # Panics
    /// Panics if more than one state array is supplied.
    ///
    /// # Errors
    /// Returns an error if `states[0]` is not a list array or one of its rows
    /// cannot be viewed as a `PrimitiveArray<T>`.
    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
        if states.is_empty() {
            return Ok(());
        }
        assert_eq!(
            states.len(),
            1,
            "count_distinct states must be single array"
        );

        let arr = as_list_array(&states[0])?;
        arr.iter().try_for_each(|maybe_list| {
            if let Some(list) = maybe_list {
                let list = as_primitive_array::<T>(&list)?;
                for &value in list.values().iter() {
                    *self.counts.entry(value).or_insert(0) += 1;
                }
            };
            Ok(())
        })
    }

    /// Returns the number of distinct values currently in the window.
    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
        Ok(ScalarValue::Int64(Some(self.counts.len() as i64)))
    }

    /// This accumulator implements `retract_batch`.
    fn supports_retract_batch(&self) -> bool {
        true
    }

    /// Estimates the memory used by this accumulator, including the map entries.
    ///
    /// # Panics
    /// Panics if `estimate_memory_size` returns an error.
    fn size(&self) -> usize {
        let num_elements = self.counts.len();
        // NOTE(review): `size_of_val(self)` already covers the inline `counts`
        // header, so that header is counted twice here. Left unchanged --
        // confirm whether the over-estimate is intentional.
        let fixed_size = size_of_val(self) + size_of_val(&self.counts);

        estimate_memory_size::<(T::Native, usize)>(
            num_elements,
            fixed_size + size_of::<RandomState>(),
        )
        .unwrap()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Int32Type};

    /// Nullable input: nulls are ignored on both update and retract, and a
    /// value only stops counting once all of its occurrences have left.
    #[test]
    fn sliding_primitive_distinct_count_accumulator_update_retract() {
        let mut acc =
            SlidingPrimitiveDistinctCountAccumulator::<Int32Type>::new(&DataType::Int32);
        let values: ArrayRef = Arc::new(Int32Array::from(vec![
            Some(1),
            Some(2),
            Some(2),
            Some(3),
            None,
        ]));

        acc.update_batch(&[Arc::clone(&values)]).unwrap();
        assert_eq!(acc.evaluate().unwrap(), ScalarValue::Int64(Some(3)));

        let leaving: ArrayRef = Arc::new(Int32Array::from(vec![Some(2), Some(3), None]));
        acc.retract_batch(&[leaving]).unwrap();
        assert_eq!(acc.evaluate().unwrap(), ScalarValue::Int64(Some(2)));
    }

    /// Null-free input exercises the fast path; retracting a value that was
    /// never added is a no-op, and the window can be drained to zero.
    #[test]
    fn sliding_primitive_distinct_count_accumulator_no_nulls() {
        let mut acc =
            SlidingPrimitiveDistinctCountAccumulator::<Int32Type>::new(&DataType::Int32);

        let entering: ArrayRef = Arc::new(Int32Array::from(vec![1, 1, 2]));
        acc.update_batch(&[entering]).unwrap();
        assert_eq!(acc.evaluate().unwrap(), ScalarValue::Int64(Some(2)));

        // One occurrence of 1 remains, 2 disappears, 7 was never tracked.
        let leaving: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7]));
        acc.retract_batch(&[leaving]).unwrap();
        assert_eq!(acc.evaluate().unwrap(), ScalarValue::Int64(Some(1)));

        let leaving: ArrayRef = Arc::new(Int32Array::from(vec![1]));
        acc.retract_batch(&[leaving]).unwrap();
        assert_eq!(acc.evaluate().unwrap(), ScalarValue::Int64(Some(0)));
    }
}
64 changes: 64 additions & 0 deletions datafusion/functions-aggregate/benches/count_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,25 @@ fn prepare_accumulator(data_type: DataType) -> Box<dyn Accumulator> {
Count::new().accumulator(accumulator_args).unwrap()
}

/// Builds the accumulator that `Count::create_sliding_accumulator` returns for
/// `count(distinct f)` over a single nullable column `f` of type `data_type`.
fn prepare_sliding_accumulator(data_type: DataType) -> Box<dyn Accumulator> {
    let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)]));
    let column = col("f", &schema).unwrap();
    let column_fields = [column.return_field(&schema).unwrap()];
    let columns = [column];

    let args = AccumulatorArgs {
        return_field: Field::new("f", DataType::Int64, true).into(),
        schema: &schema,
        expr_fields: &column_fields,
        ignore_nulls: false,
        order_bys: &[],
        is_reversed: false,
        name: "count(distinct f)",
        is_distinct: true,
        exprs: &columns,
    };

    Count::new().create_sliding_accumulator(args).unwrap()
}

fn create_i64_array(n_distinct: usize) -> Int64Array {
let mut rng = StdRng::seed_from_u64(42);
(0..BATCH_SIZE)
Expand Down Expand Up @@ -216,6 +235,50 @@ fn count_distinct_benchmark(c: &mut Criterion) {
}
}

/// Benchmarks the sliding `count(distinct)` accumulator over `i64` data for
/// each combination of cardinality and window size.
fn count_distinct_sliding_benchmark(c: &mut Criterion) {
    /// Runs one benchmark case: rows are fed to a fresh accumulator one at a
    /// time, and once more than `window_size` rows have entered, the row that
    /// is `window_size` positions behind is retracted after each update.
    fn bench_sliding_count_distinct(
        c: &mut Criterion,
        cardinality_name: &str,
        values: &ArrayRef,
        window_size: usize,
    ) {
        let bench_id =
            format!("count_distinct_sliding i64 {cardinality_name} window_{window_size}");
        c.bench_function(&bench_id, |bencher| {
            bencher.iter(|| {
                let mut acc = prepare_sliding_accumulator(DataType::Int64);

                for row in 0..BATCH_SIZE {
                    let entering = values.slice(row, 1);
                    acc.update_batch(std::slice::from_ref(&entering)).unwrap();

                    if row >= window_size {
                        let leaving = values.slice(row - window_size, 1);
                        acc.retract_batch(std::slice::from_ref(&leaving)).unwrap();
                    }
                }

                acc.evaluate().unwrap()
            })
        });
    }

    // (label, distinct values as a percentage of BATCH_SIZE)
    let cardinalities = [("low", 20), ("mid", 80), ("high", 99)];
    let window_sizes = [256, 1024];

    for (cardinality_name, distinct_pct) in cardinalities {
        let n_distinct = BATCH_SIZE * distinct_pct / 100;
        let values: ArrayRef = Arc::new(create_i64_array(n_distinct));

        for window_size in window_sizes {
            bench_sliding_count_distinct(c, cardinality_name, &values, window_size);
        }
    }
}

/// Create group indices with uniform distribution
fn create_uniform_groups(num_groups: usize) -> Vec<usize> {
let mut rng = StdRng::seed_from_u64(42);
Expand Down Expand Up @@ -454,6 +517,7 @@ fn count_distinct_groups_benchmark(c: &mut Criterion) {
// Benchmark functions that make up the `benches` group, which is handed to
// `criterion_main!` below.
criterion_group!(
benches,
count_distinct_benchmark,
count_distinct_sliding_benchmark,
count_distinct_groups_benchmark
);
criterion_main!(benches);
37 changes: 34 additions & 3 deletions datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use datafusion_functions_aggregate_common::aggregate::{
count_distinct::DictionaryCountAccumulator,
count_distinct::FloatDistinctCountAccumulator,
count_distinct::PrimitiveDistinctCountAccumulator,
count_distinct::SlidingPrimitiveDistinctCountAccumulator,
groups_accumulator::accumulate::accumulate_indices,
};
use datafusion_macros::user_doc;
Expand Down Expand Up @@ -267,6 +268,38 @@ fn get_count_accumulator(data_type: &DataType) -> Box<dyn Accumulator> {
}
}

fn get_sliding_distinct_count_accumulator(
data_type: &DataType,
) -> Result<Box<dyn Accumulator>> {
Ok(match data_type {
DataType::Int8 => Box::new(
SlidingPrimitiveDistinctCountAccumulator::<Int8Type>::new(data_type),
),
DataType::Int16 => Box::new(
SlidingPrimitiveDistinctCountAccumulator::<Int16Type>::new(data_type),
),
DataType::Int32 => Box::new(
SlidingPrimitiveDistinctCountAccumulator::<Int32Type>::new(data_type),
),
DataType::Int64 => Box::new(
SlidingPrimitiveDistinctCountAccumulator::<Int64Type>::new(data_type),
),
DataType::UInt8 => Box::new(
SlidingPrimitiveDistinctCountAccumulator::<UInt8Type>::new(data_type),
),
DataType::UInt16 => Box::new(SlidingPrimitiveDistinctCountAccumulator::<
UInt16Type,
>::new(data_type)),
DataType::UInt32 => Box::new(SlidingPrimitiveDistinctCountAccumulator::<
UInt32Type,
>::new(data_type)),
DataType::UInt64 => Box::new(SlidingPrimitiveDistinctCountAccumulator::<
UInt64Type,
>::new(data_type)),
_ => Box::new(SlidingDistinctCountAccumulator::try_new(data_type)?),
})
}

/// Uses optimized bitmap accumulators but separated to keep hot path small
#[cold]
fn get_small_int_accumulator(data_type: &DataType) -> Result<Box<dyn Accumulator>> {
Expand Down Expand Up @@ -434,9 +467,7 @@ impl AggregateUDFImpl for Count {
args: AccumulatorArgs,
) -> Result<Box<dyn Accumulator>> {
if args.is_distinct {
let acc =
SlidingDistinctCountAccumulator::try_new(args.return_field.data_type())?;
Ok(Box::new(acc))
get_sliding_distinct_count_accumulator(args.expr_fields[0].data_type())
} else {
let acc = CountAccumulator::new();
Ok(Box::new(acc))
Expand Down
40 changes: 40 additions & 0 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5826,6 +5826,46 @@ physical_plan
05)--------RepartitionExec: partitioning=Hash([k@0], 2), input_partitions=2
06)----------DataSourceExec: partitions=2, partition_sizes=[5, 4]

statement ok
CREATE TABLE table_test_distinct_count_rows_null (
k VARCHAR,
seq INT,
v INT
);

statement ok
INSERT INTO table_test_distinct_count_rows_null (k, seq, v) VALUES
('a', 1, 1),
('a', 2, 1),
('a', 3, NULL),
('a', 4, 2),
('a', 5, 1),
('b', 1, NULL),
('b', 2, 3),
('b', 3, 3),
('b', 4, 4);

query TII
SELECT
k,
seq,
COUNT(DISTINCT v) OVER (
PARTITION BY k
ORDER BY seq
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) AS distinct_count
FROM table_test_distinct_count_rows_null
ORDER BY k, seq;
----
a 1 1
a 2 1
a 3 1
a 4 2
a 5 2
b 1 0
b 2 1
b 3 1
b 4 2

# Add testing for distinct sum
query TPII
Expand Down
Loading