Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
fe226dd
Update min_max.rs to support dictionary scalars
kosiew Apr 2, 2026
caafe1c
Refactor dictionary min/max logic and tests
kosiew Apr 2, 2026
0bbc56e
Simplify min/max flow in dictionary handling
kosiew Apr 2, 2026
9240400
Fix dictionary min/max behavior in DataFusion
kosiew Apr 2, 2026
ed2d3fd
Refactor min/max logic for shared row-wise handling
kosiew Apr 7, 2026
dad6e02
Refactor dictionary handling and simplify batch logic
kosiew Apr 7, 2026
b92aeef
fix(min_max): rename helper to scalar_row_extreme and update document…
kosiew Apr 17, 2026
a80fc77
feat(min_max): rename predicate to requires_logical_row_scan
kosiew Apr 17, 2026
7ea7cb4
feat(min_max): enhance documentation and clarify error messages
kosiew Apr 17, 2026
377fb5d
feat(min_max): add dictionary key-type validation and improve error h…
kosiew Apr 17, 2026
150bc6f
feat(min_max): rename row-scan helper and update match arms
kosiew Apr 21, 2026
7bd29e1
feat: enhance dictionary comparison logic and add unit tests
kosiew Apr 21, 2026
47f75b2
fix: extract scalar comparison logic into min_max_scalar function
kosiew Apr 21, 2026
bd8f1ad
feat(aggregate): simplify min/max helper and enhance testing for Dict…
kosiew Apr 21, 2026
bca94be
chore: rename variables in min_max.rs for clarity
kosiew Apr 21, 2026
ccbff59
feat: refactor min_max to utilize choose_min_max for improved interna…
kosiew Apr 21, 2026
77a518e
feat: reintroduce min_max_batch_generic function for dictionary array…
kosiew Apr 21, 2026
0b8592d
feat: reorder imports in min_max.rs for improved clarity
kosiew Apr 21, 2026
a34ddf1
docs: update helper documentation in min_max.rs for dictionary routin…
kosiew Apr 22, 2026
ba96f77
feat(min_max): update min_max_batch_generic to handle raw values from…
kosiew Apr 22, 2026
2669a30
Revert "feat(min_max): update min_max_batch_generic to handle raw val…
kosiew Apr 22, 2026
e716c92
feat(tests): enhance dictionary array tests with raw values extraction
kosiew Apr 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 203 additions & 39 deletions datafusion/functions-aggregate-common/src/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
//! Basic min/max functionality shared across DataFusion aggregate functions

use arrow::array::{
ArrayRef, AsArray as _, BinaryArray, BinaryViewArray, BooleanArray, Date32Array,
Date64Array, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array,
ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array,
Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array,
DurationMicrosecondArray, DurationMillisecondArray, DurationNanosecondArray,
DurationSecondArray, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array,
Int8Array, Int16Array, Int32Array, Int64Array, IntervalDayTimeArray,
Expand Down Expand Up @@ -141,10 +141,25 @@ macro_rules! min_max_generic {
}};
}

// Computes the min/max of two scalar values of the same logical type.
//
// `$OP` (`min` or `max`) is translated by `choose_min_max!` into the
// `Ordering` that signals "replace the current extreme", then delegated to
// `min_max_scalar_impl!` with the matching operation ident.
//
// NOTE(review): the diff scrape left a stale removed line
// (`Ok(match ($VALUE, $DELTA) {`) inside this span; this is the clean
// post-change macro body.
macro_rules! min_max {
    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
        match choose_min_max!($OP) {
            Ordering::Greater => Ok(min_max_scalar_impl!($VALUE, $DELTA, min)),
            Ordering::Less => Ok(min_max_scalar_impl!($VALUE, $DELTA, max)),
            Ordering::Equal => {
                // choose_min_max! only ever yields Greater or Less.
                unreachable!("min/max comparisons do not use equal ordering")
            }
        }
    }};
}

// min/max of two logically compatible scalar values.
// Dictionary scalars participate by comparing their inner logical values.
// When both inputs are dictionaries, matching key types are preserved in the
// result; differing key types remain an unexpected invariant violation.
macro_rules! min_max_scalar_impl {
($VALUE:expr, $DELTA:expr, $OP:ident) => {{
match ($VALUE, $DELTA) {
(ScalarValue::Null, ScalarValue::Null) => ScalarValue::Null,
(
lhs @ ScalarValue::Decimal32(lhsv, lhsp, lhss),
Expand Down Expand Up @@ -413,16 +428,54 @@ macro_rules! min_max {
min_max_generic!(lhs, rhs, $OP)
}

(
ScalarValue::Dictionary(lhs_dict_key_type, lhs_dict_value),
ScalarValue::Dictionary(rhs_dict_key_type, rhs_dict_value),
) => {
if lhs_dict_key_type != rhs_dict_key_type {
return internal_err!(
"MIN/MAX is not expected to receive dictionary scalars with different key types ({:?} vs {:?})",
lhs_dict_key_type,
rhs_dict_key_type
);
}

let result = min_max_scalar(
lhs_dict_value.as_ref(),
rhs_dict_value.as_ref(),
choose_min_max!($OP),
)?;
ScalarValue::Dictionary(lhs_dict_key_type.clone(), Box::new(result))
}
(ScalarValue::Dictionary(_, lhs_dict_value), rhs_scalar) => {
min_max_scalar(lhs_dict_value.as_ref(), rhs_scalar, choose_min_max!($OP))?
}
(lhs_scalar, ScalarValue::Dictionary(_, rhs_dict_value)) => {
min_max_scalar(lhs_scalar, rhs_dict_value.as_ref(), choose_min_max!($OP))?
}

e => {
return internal_err!(
"MIN/MAX is not expected to receive scalars of incompatible types {:?}",
"MIN/MAX is not expected to receive logically incompatible scalar values {:?}",
e
)
}
})
}
}};
}

/// Resolves the min/max of two logically compatible scalar values.
///
/// The `ordering` argument encodes which operation to perform (mirroring
/// `choose_min_max!`): `Ordering::Greater` selects the minimum and
/// `Ordering::Less` selects the maximum. `Ordering::Equal` is never produced
/// by callers and indicates a broken invariant.
fn min_max_scalar(
    lhs: &ScalarValue,
    rhs: &ScalarValue,
    ordering: Ordering,
) -> Result<ScalarValue> {
    if ordering == Ordering::Greater {
        Ok(min_max_scalar_impl!(lhs, rhs, min))
    } else if ordering == Ordering::Less {
        Ok(min_max_scalar_impl!(lhs, rhs, max))
    } else {
        unreachable!("min/max comparisons do not use equal ordering")
    }
}

/// An accumulator to compute the maximum value
#[derive(Debug, Clone)]
pub struct MaxAccumulator {
Expand Down Expand Up @@ -760,37 +813,40 @@ pub fn min_batch(values: &ArrayRef) -> Result<ScalarValue> {
min_binary_view
)
}
DataType::Struct(_) => min_max_batch_generic(values, Ordering::Greater)?,
DataType::List(_) => min_max_batch_generic(values, Ordering::Greater)?,
DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Greater)?,
DataType::FixedSizeList(_, _) => {
min_max_batch_generic(values, Ordering::Greater)?
}
DataType::Dictionary(_, _) => {
let values = values.as_any_dictionary().values();
min_batch(values)?
}
DataType::Struct(_)
| DataType::List(_)
| DataType::LargeList(_)
| DataType::FixedSizeList(_, _)
| DataType::Dictionary(_, _) => min_max_batch_generic(values, Ordering::Greater)?,
_ => min_max_batch!(values, min),
})
}

/// Generic min/max implementation for complex types
fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result<ScalarValue> {
if array.len() == array.null_count() {
return ScalarValue::try_from(array.data_type());
}
let mut extreme = ScalarValue::try_from_array(array, 0)?;
for i in 1..array.len() {
let current = ScalarValue::try_from_array(array, i)?;
if current.is_null() {
continue;
/// Finds the min/max by scanning logical rows via `ScalarValue::try_from_array`.
///
/// Callers are responsible for routing dictionary arrays to this helper.
/// Passing `dictionary.values()` is semantically incorrect because it can
/// include unreferenced dictionary entries and ignore null key positions.
fn min_max_batch_generic(values: &ArrayRef, ordering: Ordering) -> Result<ScalarValue> {
let mut index = 0;
Comment on lines +825 to +831
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still do not understand why this function is being changed. The provided reasoning makes no sense considering the original version already works as intended. Especially considering it is not the responsibility of this function that dictionary.values() isn't semantically correct; it is the responsibility of the caller, which is not being fixed by refactoring this function. Can we please revert the changes to this function.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not the responsibility of this function that dictionary.values() isn't semantically correct

I amended the comment and locked this down with a new test.

..why this function is being changed....the original version already works as intended.

The changed min_max_batch_generic shape is a secondary improvement (independent of the bug fix): it first finds the first non-null row and only then enters comparison.

The two-phase version is easier to follow because it separates setup from comparison.

  1. Phase 1 finds the first non-null value.
  2. Phase 2 compares remaining non-null values against that baseline.

After phase 1, extreme is always non-null, so the loop only needs to:

  1. Skip null current values.
  2. Compare/update for non-null values.

The old single-loop mixed these concerns in every iteration (current null, extreme null, or compare), which made the logic more branchy and harder to read.

let mut extreme = loop {
if index == values.len() {
return ScalarValue::try_from(values.data_type());
}
if extreme.is_null() {
extreme = current;
continue;

let current = ScalarValue::try_from_array(values, index)?;
index += 1;

if !current.is_null() {
break current;
}
let cmp = extreme.try_cmp(&current)?;
if cmp == ordering {
};

while index < values.len() {
let current = ScalarValue::try_from_array(values, index)?;
index += 1;

if !current.is_null() && extreme.try_cmp(&current)? == ordering {
extreme = current;
}
}
Expand Down Expand Up @@ -843,14 +899,122 @@ pub fn max_batch(values: &ArrayRef) -> Result<ScalarValue> {
let value = value.map(|e| e.to_vec());
ScalarValue::FixedSizeBinary(*size, value)
}
DataType::Struct(_) => min_max_batch_generic(values, Ordering::Less)?,
DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?,
DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?,
DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?,
DataType::Dictionary(_, _) => {
let values = values.as_any_dictionary().values();
max_batch(values)?
}
DataType::Struct(_)
| DataType::List(_)
| DataType::LargeList(_)
| DataType::FixedSizeList(_, _)
| DataType::Dictionary(_, _) => min_max_batch_generic(values, Ordering::Less)?,
_ => min_max_batch!(values, max),
})
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{AsArray, DictionaryArray};
    use std::sync::Arc;

    /// A dictionary scalar compared against a plain scalar is resolved via
    /// the dictionary's inner logical value; when the plain side wins it is
    /// returned without a dictionary wrapper.
    #[test]
    fn min_max_dictionary_and_scalar_compare_by_inner_value() -> Result<()> {
        let dict_side = ScalarValue::Dictionary(
            Box::new(DataType::Int32),
            Box::new(ScalarValue::Float32(Some(1.0))),
        );
        let plain_side = ScalarValue::Float32(Some(2.0));

        let winner = min_max_scalar(&dict_side, &plain_side, Ordering::Less)?;

        assert_eq!(winner, ScalarValue::Float32(Some(2.0)));
        Ok(())
    }

    /// Two dictionary scalars with matching key types compare by inner value
    /// and the winner is re-wrapped with the shared key type.
    #[test]
    fn min_max_dictionary_same_key_type_rewraps_result() -> Result<()> {
        let left = ScalarValue::Dictionary(
            Box::new(DataType::Int32),
            Box::new(ScalarValue::Float32(Some(1.0))),
        );
        let right = ScalarValue::Dictionary(
            Box::new(DataType::Int32),
            Box::new(ScalarValue::Float32(Some(2.0))),
        );

        let winner = min_max_scalar(&left, &right, Ordering::Less)?;

        let expected = ScalarValue::Dictionary(
            Box::new(DataType::Int32),
            Box::new(ScalarValue::Float32(Some(2.0))),
        );
        assert_eq!(winner, expected);
        Ok(())
    }

    /// Mismatched dictionary key types are an invariant violation and must
    /// surface as an internal error.
    #[test]
    fn min_max_dictionary_different_key_types_error() -> Result<()> {
        let left = ScalarValue::Dictionary(
            Box::new(DataType::Int8),
            Box::new(ScalarValue::Float32(Some(1.0))),
        );
        let right = ScalarValue::Dictionary(
            Box::new(DataType::Int32),
            Box::new(ScalarValue::Float32(Some(2.0))),
        );

        let failure: DataFusionError =
            min_max_scalar(&left, &right, Ordering::Less).unwrap_err();

        assert!(
            failure
                .to_string()
                .contains("dictionary scalars with different key types")
        );
        Ok(())
    }

    /// A dictionary scalar whose inner value is logically incompatible with
    /// the other operand must surface the incompatible-scalar error.
    #[test]
    fn min_max_dictionary_and_incompatible_scalar_error() -> Result<()> {
        let dict_side = ScalarValue::Dictionary(
            Box::new(DataType::Int32),
            Box::new(ScalarValue::Float32(Some(1.0))),
        );
        let plain_side = ScalarValue::Int32(Some(2));

        let failure: DataFusionError =
            min_max_scalar(&dict_side, &plain_side, Ordering::Less).unwrap_err();

        assert!(
            failure
                .to_string()
                .contains("logically incompatible scalar values")
        );
        Ok(())
    }

    /// Batch min/max over a dictionary array must scan logical rows, not the
    /// raw values buffer: unreferenced dictionary entries and null keys must
    /// not influence the result.
    #[test]
    fn min_max_batch_dictionary_uses_logical_rows() -> Result<()> {
        // Keys reference only "bbb" (index 1); "zzz" and "aaa" are present in
        // the values buffer but never referenced, and one key is null.
        let keys = Int8Array::from(vec![Some(1), None, Some(1), Some(1)]);
        let values = Arc::new(StringArray::from(vec!["zzz", "bbb", "aaa"]));
        let array = Arc::new(DictionaryArray::new(keys, values)) as ArrayRef;

        let raw_values = array.as_any_dictionary().values();
        let raw_min = min_batch(raw_values)?;

        let logical_min = min_batch(&array)?;
        let logical_max = max_batch(&array)?;

        let expected = ScalarValue::Dictionary(
            Box::new(DataType::Int8),
            Box::new(ScalarValue::Utf8(Some("bbb".to_string()))),
        );

        // raw_min is "aaa" because it is the min of the values, but min/max of the dictionary should be "bbb"
        // because the null key is ignored and all non-null keys point to "bbb".
        assert_ne!(raw_min, expected);

        assert_eq!(logical_min, expected);
        assert_eq!(logical_max, expected);

        Ok(())
    }
}
Loading
Loading