From fe226dd055c47a3f8ec2085930ff5579b12bdc25 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 2 Apr 2026 16:17:06 +0800 Subject: [PATCH 01/22] Update min_max.rs to support dictionary scalars Return ScalarValue::Dictionary(...) in dictionary batches instead of unwrapping to inner scalars. Enhance min_max! logic to safely handle dictionary-vs-dictionary and dictionary-vs-non-dictionary comparisons. Add regression tests for raw-dictionary covering no-coercion, null-containing, and multi-batch scenarios. --- .../functions-aggregate-common/src/min_max.rs | 36 +++++-- datafusion/functions-aggregate/src/min_max.rs | 95 +++++++++++++++++++ 2 files changed, 125 insertions(+), 6 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index 27620221cf23c..df950323d80d9 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -413,6 +413,28 @@ macro_rules! min_max { min_max_generic!(lhs, rhs, $OP) } + ( + ScalarValue::Dictionary(key_type, lhs_inner), + ScalarValue::Dictionary(_, rhs_inner), + ) => { + let winner = min_max_generic!(lhs_inner.as_ref(), rhs_inner.as_ref(), $OP); + ScalarValue::Dictionary(key_type.clone(), Box::new(winner)) + } + + ( + ScalarValue::Dictionary(_, lhs_inner), + rhs, + ) => { + min_max_generic!(lhs_inner.as_ref(), rhs, $OP) + } + + ( + lhs, + ScalarValue::Dictionary(_, rhs_inner), + ) => { + min_max_generic!(lhs, rhs_inner.as_ref(), $OP) + } + e => { return internal_err!( "MIN/MAX is not expected to receive scalars of incompatible types {:?}", @@ -766,9 +788,10 @@ pub fn min_batch(values: &ArrayRef) -> Result { DataType::FixedSizeList(_, _) => { min_max_batch_generic(values, Ordering::Greater)? } - DataType::Dictionary(_, _) => { - let values = values.as_any_dictionary().values(); - min_batch(values)? 
+ DataType::Dictionary(key_type, _) => { + let dict_values = values.as_any_dictionary().values(); + let inner = min_batch(dict_values)?; + ScalarValue::Dictionary(key_type.clone(), Box::new(inner)) } _ => min_max_batch!(values, min), }) @@ -847,9 +870,10 @@ pub fn max_batch(values: &ArrayRef) -> Result { DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?, DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?, DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?, - DataType::Dictionary(_, _) => { - let values = values.as_any_dictionary().values(); - max_batch(values)? + DataType::Dictionary(key_type, _) => { + let dict_values = values.as_any_dictionary().values(); + let inner = max_batch(dict_values)?; + ScalarValue::Dictionary(key_type.clone(), Box::new(inner)) } _ => min_max_batch!(values, max), }) diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index 9d05c57b02e93..d7aa9bfd1b5e2 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -1262,4 +1262,99 @@ mod tests { assert_eq!(max_result, ScalarValue::Utf8(Some("🦀".to_string()))); Ok(()) } + + fn dict_scalar(key_type: DataType, inner: ScalarValue) -> ScalarValue { + ScalarValue::Dictionary(Box::new(key_type), Box::new(inner)) + } + + #[test] + fn test_min_max_dictionary_without_coercion() -> Result<()> { + let values = StringArray::from(vec!["b", "c", "a", "d"]); + let keys = Int32Array::from(vec![Some(0), Some(1), Some(2), Some(3)]); + let dict_array = + DictionaryArray::try_new(keys, Arc::new(values) as ArrayRef).unwrap(); + let dict_array_ref = Arc::new(dict_array) as ArrayRef; + let dict_type = dict_array_ref.data_type().clone(); + + let mut min_acc = MinAccumulator::try_new(&dict_type)?; + min_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; + let min_result = min_acc.evaluate()?; + assert_eq!( + min_result, + 
dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("a".to_string()))) + ); + + let mut max_acc = MaxAccumulator::try_new(&dict_type)?; + max_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; + let max_result = max_acc.evaluate()?; + assert_eq!( + max_result, + dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("d".to_string()))) + ); + Ok(()) + } + + #[test] + fn test_min_max_dictionary_with_nulls() -> Result<()> { + let values = StringArray::from(vec!["b", "c", "a"]); + let keys = Int32Array::from(vec![None, Some(0), None, Some(1), Some(2)]); + let dict_array = + DictionaryArray::try_new(keys, Arc::new(values) as ArrayRef).unwrap(); + let dict_array_ref = Arc::new(dict_array) as ArrayRef; + let dict_type = dict_array_ref.data_type().clone(); + + let mut min_acc = MinAccumulator::try_new(&dict_type)?; + min_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; + let min_result = min_acc.evaluate()?; + assert_eq!( + min_result, + dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("a".to_string()))) + ); + + let mut max_acc = MaxAccumulator::try_new(&dict_type)?; + max_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; + let max_result = max_acc.evaluate()?; + assert_eq!( + max_result, + dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("c".to_string()))) + ); + Ok(()) + } + + #[test] + fn test_min_max_dictionary_multi_batch() -> Result<()> { + let dict_type = + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + + let values1 = StringArray::from(vec!["b", "c"]); + let keys1 = Int32Array::from(vec![Some(0), Some(1)]); + let batch1 = Arc::new( + DictionaryArray::try_new(keys1, Arc::new(values1) as ArrayRef).unwrap(), + ) as ArrayRef; + + let values2 = StringArray::from(vec!["a", "d"]); + let keys2 = Int32Array::from(vec![Some(0), Some(1)]); + let batch2 = Arc::new( + DictionaryArray::try_new(keys2, Arc::new(values2) as ArrayRef).unwrap(), + ) as ArrayRef; + + let mut min_acc = MinAccumulator::try_new(&dict_type)?; + 
min_acc.update_batch(&[Arc::clone(&batch1)])?; + min_acc.update_batch(&[Arc::clone(&batch2)])?; + let min_result = min_acc.evaluate()?; + assert_eq!( + min_result, + dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("a".to_string()))) + ); + + let mut max_acc = MaxAccumulator::try_new(&dict_type)?; + max_acc.update_batch(&[Arc::clone(&batch1)])?; + max_acc.update_batch(&[Arc::clone(&batch2)])?; + let max_result = max_acc.evaluate()?; + assert_eq!( + max_result, + dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("d".to_string()))) + ); + Ok(()) + } } From caafe1ce6aee4467f929adbbaf5d6e5718567f07 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 2 Apr 2026 16:24:31 +0800 Subject: [PATCH 02/22] Refactor dictionary min/max logic and tests Centralize dictionary batch handling for min/max operations. Streamline min_max_batch_generic to initialize from the first non-null element. Implement shared setup/assert helpers in dictionary tests to reduce repetition while preserving test coverage. --- .../functions-aggregate-common/src/min_max.rs | 65 +++++---- datafusion/functions-aggregate/src/min_max.rs | 125 ++++++++---------- 2 files changed, 93 insertions(+), 97 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index df950323d80d9..4d984fc4b0c78 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -141,6 +141,16 @@ macro_rules! min_max_generic { }}; } +macro_rules! min_max_dictionary { + ($VALUE:expr, $DELTA:expr, wrap $KEY_TYPE:expr, $OP:ident) => {{ + let winner = min_max_generic!($VALUE, $DELTA, $OP); + ScalarValue::Dictionary($KEY_TYPE.clone(), Box::new(winner)) + }}; + ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ + min_max_generic!($VALUE, $DELTA, $OP) + }}; +} + // min/max of two scalar values of the same type macro_rules! 
min_max { ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ @@ -417,22 +427,26 @@ macro_rules! min_max { ScalarValue::Dictionary(key_type, lhs_inner), ScalarValue::Dictionary(_, rhs_inner), ) => { - let winner = min_max_generic!(lhs_inner.as_ref(), rhs_inner.as_ref(), $OP); - ScalarValue::Dictionary(key_type.clone(), Box::new(winner)) + min_max_dictionary!( + lhs_inner.as_ref(), + rhs_inner.as_ref(), + wrap key_type, + $OP + ) } ( ScalarValue::Dictionary(_, lhs_inner), rhs, ) => { - min_max_generic!(lhs_inner.as_ref(), rhs, $OP) + min_max_dictionary!(lhs_inner.as_ref(), rhs, $OP) } ( lhs, ScalarValue::Dictionary(_, rhs_inner), ) => { - min_max_generic!(lhs, rhs_inner.as_ref(), $OP) + min_max_dictionary!(lhs, rhs_inner.as_ref(), $OP) } e => { @@ -445,6 +459,17 @@ macro_rules! min_max { }}; } +fn dictionary_batch_extreme( + values: &ArrayRef, + extreme_fn: fn(&ArrayRef) -> Result, +) -> Result { + let DataType::Dictionary(key_type, _) = values.data_type() else { + unreachable!("dictionary_batch_extreme requires dictionary arrays") + }; + let inner = extreme_fn(values.as_any_dictionary().values())?; + Ok(ScalarValue::Dictionary(key_type.clone(), Box::new(inner))) +} + /// An accumulator to compute the maximum value #[derive(Debug, Clone)] pub struct MaxAccumulator { @@ -788,32 +813,22 @@ pub fn min_batch(values: &ArrayRef) -> Result { DataType::FixedSizeList(_, _) => { min_max_batch_generic(values, Ordering::Greater)? 
} - DataType::Dictionary(key_type, _) => { - let dict_values = values.as_any_dictionary().values(); - let inner = min_batch(dict_values)?; - ScalarValue::Dictionary(key_type.clone(), Box::new(inner)) - } + DataType::Dictionary(_, _) => dictionary_batch_extreme(values, min_batch)?, _ => min_max_batch!(values, min), }) } /// Generic min/max implementation for complex types fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result { - if array.len() == array.null_count() { + let mut non_null_indices = (0..array.len()).filter(|&i| !array.is_null(i)); + let Some(first_idx) = non_null_indices.next() else { return ScalarValue::try_from(array.data_type()); - } - let mut extreme = ScalarValue::try_from_array(array, 0)?; - for i in 1..array.len() { + }; + + let mut extreme = ScalarValue::try_from_array(array, first_idx)?; + for i in non_null_indices { let current = ScalarValue::try_from_array(array, i)?; - if current.is_null() { - continue; - } - if extreme.is_null() { - extreme = current; - continue; - } - let cmp = extreme.try_cmp(¤t)?; - if cmp == ordering { + if extreme.try_cmp(¤t)? 
== ordering { extreme = current; } } @@ -870,11 +885,7 @@ pub fn max_batch(values: &ArrayRef) -> Result { DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?, DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?, DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?, - DataType::Dictionary(key_type, _) => { - let dict_values = values.as_any_dictionary().values(); - let inner = max_batch(dict_values)?; - ScalarValue::Dictionary(key_type.clone(), Box::new(inner)) - } + DataType::Dictionary(_, _) => dictionary_batch_extreme(values, max_batch)?, _ => min_max_batch!(values, max), }) } diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index d7aa9bfd1b5e2..5734f2854dd8d 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -1267,94 +1267,79 @@ mod tests { ScalarValue::Dictionary(Box::new(key_type), Box::new(inner)) } - #[test] - fn test_min_max_dictionary_without_coercion() -> Result<()> { - let values = StringArray::from(vec!["b", "c", "a", "d"]); - let keys = Int32Array::from(vec![Some(0), Some(1), Some(2), Some(3)]); - let dict_array = - DictionaryArray::try_new(keys, Arc::new(values) as ArrayRef).unwrap(); - let dict_array_ref = Arc::new(dict_array) as ArrayRef; - let dict_type = dict_array_ref.data_type().clone(); + fn utf8_dict_scalar(key_type: DataType, value: &str) -> ScalarValue { + dict_scalar(key_type, ScalarValue::Utf8(Some(value.to_string()))) + } + + fn string_dictionary_batch( + values: Vec<&str>, + keys: Vec>, + ) -> ArrayRef { + let values = Arc::new(StringArray::from(values)) as ArrayRef; + Arc::new(DictionaryArray::try_new(Int32Array::from(keys), values).unwrap()) + as ArrayRef + } + + fn assert_dictionary_min_max( + dict_type: &DataType, + batches: &[ArrayRef], + expected_min: &str, + expected_max: &str, + ) -> Result<()> { + let key_type = match dict_type { + 
DataType::Dictionary(key_type, _) => key_type.as_ref().clone(), + other => panic!("expected dictionary type, got {other:?}"), + }; - let mut min_acc = MinAccumulator::try_new(&dict_type)?; - min_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; - let min_result = min_acc.evaluate()?; + let mut min_acc = MinAccumulator::try_new(dict_type)?; + for batch in batches { + min_acc.update_batch(&[Arc::clone(batch)])?; + } assert_eq!( - min_result, - dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("a".to_string()))) + min_acc.evaluate()?, + utf8_dict_scalar(key_type.clone(), expected_min) ); - let mut max_acc = MaxAccumulator::try_new(&dict_type)?; - max_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; - let max_result = max_acc.evaluate()?; - assert_eq!( - max_result, - dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("d".to_string()))) - ); + let mut max_acc = MaxAccumulator::try_new(dict_type)?; + for batch in batches { + max_acc.update_batch(&[Arc::clone(batch)])?; + } + assert_eq!(max_acc.evaluate()?, utf8_dict_scalar(key_type, expected_max)); + Ok(()) } #[test] - fn test_min_max_dictionary_with_nulls() -> Result<()> { - let values = StringArray::from(vec!["b", "c", "a"]); - let keys = Int32Array::from(vec![None, Some(0), None, Some(1), Some(2)]); - let dict_array = - DictionaryArray::try_new(keys, Arc::new(values) as ArrayRef).unwrap(); - let dict_array_ref = Arc::new(dict_array) as ArrayRef; + fn test_min_max_dictionary_without_coercion() -> Result<()> { + let dict_array_ref = string_dictionary_batch( + vec!["b", "c", "a", "d"], + vec![Some(0), Some(1), Some(2), Some(3)], + ); let dict_type = dict_array_ref.data_type().clone(); - let mut min_acc = MinAccumulator::try_new(&dict_type)?; - min_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; - let min_result = min_acc.evaluate()?; - assert_eq!( - min_result, - dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("a".to_string()))) - ); + assert_dictionary_min_max(&dict_type, &[dict_array_ref], "a", "d") + } - 
let mut max_acc = MaxAccumulator::try_new(&dict_type)?; - max_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; - let max_result = max_acc.evaluate()?; - assert_eq!( - max_result, - dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("c".to_string()))) + #[test] + fn test_min_max_dictionary_with_nulls() -> Result<()> { + let dict_array_ref = string_dictionary_batch( + vec!["b", "c", "a"], + vec![None, Some(0), None, Some(1), Some(2)], ); - Ok(()) + let dict_type = dict_array_ref.data_type().clone(); + + assert_dictionary_min_max(&dict_type, &[dict_array_ref], "a", "c") } #[test] fn test_min_max_dictionary_multi_batch() -> Result<()> { let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + let batch1 = + string_dictionary_batch(vec!["b", "c"], vec![Some(0), Some(1)]); + let batch2 = + string_dictionary_batch(vec!["a", "d"], vec![Some(0), Some(1)]); - let values1 = StringArray::from(vec!["b", "c"]); - let keys1 = Int32Array::from(vec![Some(0), Some(1)]); - let batch1 = Arc::new( - DictionaryArray::try_new(keys1, Arc::new(values1) as ArrayRef).unwrap(), - ) as ArrayRef; - - let values2 = StringArray::from(vec!["a", "d"]); - let keys2 = Int32Array::from(vec![Some(0), Some(1)]); - let batch2 = Arc::new( - DictionaryArray::try_new(keys2, Arc::new(values2) as ArrayRef).unwrap(), - ) as ArrayRef; - - let mut min_acc = MinAccumulator::try_new(&dict_type)?; - min_acc.update_batch(&[Arc::clone(&batch1)])?; - min_acc.update_batch(&[Arc::clone(&batch2)])?; - let min_result = min_acc.evaluate()?; - assert_eq!( - min_result, - dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("a".to_string()))) - ); - - let mut max_acc = MaxAccumulator::try_new(&dict_type)?; - max_acc.update_batch(&[Arc::clone(&batch1)])?; - max_acc.update_batch(&[Arc::clone(&batch2)])?; - let max_result = max_acc.evaluate()?; - assert_eq!( - max_result, - dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("d".to_string()))) - ); - Ok(()) + 
assert_dictionary_min_max(&dict_type, &[batch1, batch2], "a", "d") } } From 0bbc56e23a68f7ed7500934a939b708f4cbb644e Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 2 Apr 2026 16:30:44 +0800 Subject: [PATCH 03/22] Simplify min/max flow in dictionary handling Refactor dictionary min/max flow by removing the wrap macro arm, making re-wrapping explicit through a private helper. This separates the "choose inner winner" from the "wrap as dictionary" step for easier auditing. In `datafusion/functions-aggregate/src/min_max.rs`, update `string_dictionary_batch` to accept slices instead of owned Vecs, and introduce a small `evaluate_dictionary_accumulator` helper to streamline min/max assertions with a shared accumulator execution path, reducing repeated setup. --- .../functions-aggregate-common/src/min_max.rs | 20 +++---- datafusion/functions-aggregate/src/min_max.rs | 59 ++++++++++--------- 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index 4d984fc4b0c78..aa802d4003f4d 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -142,13 +142,7 @@ macro_rules! min_max_generic { } macro_rules! min_max_dictionary { - ($VALUE:expr, $DELTA:expr, wrap $KEY_TYPE:expr, $OP:ident) => {{ - let winner = min_max_generic!($VALUE, $DELTA, $OP); - ScalarValue::Dictionary($KEY_TYPE.clone(), Box::new(winner)) - }}; - ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ - min_max_generic!($VALUE, $DELTA, $OP) - }}; + ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ min_max_generic!($VALUE, $DELTA, $OP) }}; } // min/max of two scalar values of the same type @@ -427,11 +421,13 @@ macro_rules! 
min_max { ScalarValue::Dictionary(key_type, lhs_inner), ScalarValue::Dictionary(_, rhs_inner), ) => { - min_max_dictionary!( + wrap_dictionary_scalar( + key_type.as_ref(), + min_max_dictionary!( lhs_inner.as_ref(), rhs_inner.as_ref(), - wrap key_type, $OP + ), ) } @@ -467,7 +463,11 @@ fn dictionary_batch_extreme( unreachable!("dictionary_batch_extreme requires dictionary arrays") }; let inner = extreme_fn(values.as_any_dictionary().values())?; - Ok(ScalarValue::Dictionary(key_type.clone(), Box::new(inner))) + Ok(wrap_dictionary_scalar(key_type.as_ref(), inner)) +} + +fn wrap_dictionary_scalar(key_type: &DataType, value: ScalarValue) -> ScalarValue { + ScalarValue::Dictionary(Box::new(key_type.clone()), Box::new(value)) } /// An accumulator to compute the maximum value diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index 5734f2854dd8d..6be7341ed10cc 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -1271,13 +1271,21 @@ mod tests { dict_scalar(key_type, ScalarValue::Utf8(Some(value.to_string()))) } - fn string_dictionary_batch( - values: Vec<&str>, - keys: Vec>, - ) -> ArrayRef { - let values = Arc::new(StringArray::from(values)) as ArrayRef; - Arc::new(DictionaryArray::try_new(Int32Array::from(keys), values).unwrap()) - as ArrayRef + fn string_dictionary_batch(values: &[&str], keys: &[Option]) -> ArrayRef { + let values = Arc::new(StringArray::from(values.to_vec())) as ArrayRef; + Arc::new( + DictionaryArray::try_new(Int32Array::from(keys.to_vec()), values).unwrap(), + ) as ArrayRef + } + + fn evaluate_dictionary_accumulator( + mut acc: impl Accumulator, + batches: &[ArrayRef], + ) -> Result { + for batch in batches { + acc.update_batch(&[Arc::clone(batch)])?; + } + acc.evaluate() } fn assert_dictionary_min_max( @@ -1291,20 +1299,17 @@ mod tests { other => panic!("expected dictionary type, got {other:?}"), }; - let mut min_acc = 
MinAccumulator::try_new(dict_type)?; - for batch in batches { - min_acc.update_batch(&[Arc::clone(batch)])?; - } - assert_eq!( - min_acc.evaluate()?, - utf8_dict_scalar(key_type.clone(), expected_min) - ); + let min_result = evaluate_dictionary_accumulator( + MinAccumulator::try_new(dict_type)?, + batches, + )?; + assert_eq!(min_result, utf8_dict_scalar(key_type.clone(), expected_min)); - let mut max_acc = MaxAccumulator::try_new(dict_type)?; - for batch in batches { - max_acc.update_batch(&[Arc::clone(batch)])?; - } - assert_eq!(max_acc.evaluate()?, utf8_dict_scalar(key_type, expected_max)); + let max_result = evaluate_dictionary_accumulator( + MaxAccumulator::try_new(dict_type)?, + batches, + )?; + assert_eq!(max_result, utf8_dict_scalar(key_type, expected_max)); Ok(()) } @@ -1312,8 +1317,8 @@ mod tests { #[test] fn test_min_max_dictionary_without_coercion() -> Result<()> { let dict_array_ref = string_dictionary_batch( - vec!["b", "c", "a", "d"], - vec![Some(0), Some(1), Some(2), Some(3)], + &["b", "c", "a", "d"], + &[Some(0), Some(1), Some(2), Some(3)], ); let dict_type = dict_array_ref.data_type().clone(); @@ -1323,8 +1328,8 @@ mod tests { #[test] fn test_min_max_dictionary_with_nulls() -> Result<()> { let dict_array_ref = string_dictionary_batch( - vec!["b", "c", "a"], - vec![None, Some(0), None, Some(1), Some(2)], + &["b", "c", "a"], + &[None, Some(0), None, Some(1), Some(2)], ); let dict_type = dict_array_ref.data_type().clone(); @@ -1335,10 +1340,8 @@ mod tests { fn test_min_max_dictionary_multi_batch() -> Result<()> { let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); - let batch1 = - string_dictionary_batch(vec!["b", "c"], vec![Some(0), Some(1)]); - let batch2 = - string_dictionary_batch(vec!["a", "d"], vec![Some(0), Some(1)]); + let batch1 = string_dictionary_batch(&["b", "c"], &[Some(0), Some(1)]); + let batch2 = string_dictionary_batch(&["a", "d"], &[Some(0), Some(1)]); assert_dictionary_min_max(&dict_type, 
&[batch1, batch2], "a", "d") } From 9240400ba56efcdb96c0622480dad16130cfeebb Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 2 Apr 2026 16:50:23 +0800 Subject: [PATCH 04/22] Fix dictionary min/max behavior in DataFusion Update min_max.rs to ensure dictionary batches iterate actual array rows, comparing referenced scalar values. Unreferenced dictionary entries no longer affect MIN/MAX, and referenced null values are correctly skipped. Expanded tests to cover these changes and updated expectations Added regression tests for unreferenced and referenced null dictionary values. --- .../functions-aggregate-common/src/min_max.rs | 37 +++++++++++++------ datafusion/functions-aggregate/src/min_max.rs | 34 ++++++++++++++++- 2 files changed, 59 insertions(+), 12 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index aa802d4003f4d..21e67cf442077 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -18,8 +18,8 @@ //! Basic min/max functionality shared across DataFusion aggregate functions use arrow::array::{ - ArrayRef, AsArray as _, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, - Date64Array, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array, + ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, + Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array, DurationMicrosecondArray, DurationMillisecondArray, DurationNanosecondArray, DurationSecondArray, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, IntervalDayTimeArray, @@ -457,13 +457,23 @@ macro_rules! 
min_max { fn dictionary_batch_extreme( values: &ArrayRef, - extreme_fn: fn(&ArrayRef) -> Result, + ordering: Ordering, ) -> Result { - let DataType::Dictionary(key_type, _) = values.data_type() else { - unreachable!("dictionary_batch_extreme requires dictionary arrays") - }; - let inner = extreme_fn(values.as_any_dictionary().values())?; - Ok(wrap_dictionary_scalar(key_type.as_ref(), inner)) + let mut extreme: Option = None; + + for i in 0..values.len() { + let current = ScalarValue::try_from_array(values, i)?; + if current.is_null() { + continue; + } + + match &extreme { + Some(existing) if existing.try_cmp(¤t)? != ordering => {} + _ => extreme = Some(current), + } + } + + extreme.map_or_else(|| ScalarValue::try_from(values.data_type()), Ok) } fn wrap_dictionary_scalar(key_type: &DataType, value: ScalarValue) -> ScalarValue { @@ -813,7 +823,9 @@ pub fn min_batch(values: &ArrayRef) -> Result { DataType::FixedSizeList(_, _) => { min_max_batch_generic(values, Ordering::Greater)? } - DataType::Dictionary(_, _) => dictionary_batch_extreme(values, min_batch)?, + DataType::Dictionary(_, _) => { + dictionary_batch_extreme(values, Ordering::Greater)? 
+ } _ => min_max_batch!(values, min), }) } @@ -828,7 +840,10 @@ fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result Result { DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?, DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?, DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?, - DataType::Dictionary(_, _) => dictionary_batch_extreme(values, max_batch)?, + DataType::Dictionary(_, _) => dictionary_batch_extreme(values, Ordering::Less)?, _ => min_max_batch!(values, max), }) } diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index 6be7341ed10cc..09142a4858b5f 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -1259,7 +1259,7 @@ mod tests { let mut max_acc = MaxAccumulator::try_new(&rt_type)?; max_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; let max_result = max_acc.evaluate()?; - assert_eq!(max_result, ScalarValue::Utf8(Some("🦀".to_string()))); + assert_eq!(max_result, ScalarValue::Utf8(Some("d".to_string()))); Ok(()) } @@ -1278,6 +1278,16 @@ mod tests { ) as ArrayRef } + fn optional_string_dictionary_batch( + values: &[Option<&str>], + keys: &[Option], + ) -> ArrayRef { + let values = Arc::new(StringArray::from(values.to_vec())) as ArrayRef; + Arc::new( + DictionaryArray::try_new(Int32Array::from(keys.to_vec()), values).unwrap(), + ) as ArrayRef + } + fn evaluate_dictionary_accumulator( mut acc: impl Accumulator, batches: &[ArrayRef], @@ -1336,6 +1346,28 @@ mod tests { assert_dictionary_min_max(&dict_type, &[dict_array_ref], "a", "c") } + #[test] + fn test_min_max_dictionary_ignores_unreferenced_values() -> Result<()> { + let dict_array_ref = string_dictionary_batch( + &["a", "z", "zz_unused"], + &[Some(1), Some(1), None], + ); + let dict_type = dict_array_ref.data_type().clone(); + + assert_dictionary_min_max(&dict_type, &[dict_array_ref], "z", "z") + 
} + + #[test] + fn test_min_max_dictionary_ignores_referenced_null_values() -> Result<()> { + let dict_array_ref = optional_string_dictionary_batch( + &[Some("b"), None, Some("a"), Some("d")], + &[Some(0), Some(1), Some(2), Some(3)], + ); + let dict_type = dict_array_ref.data_type().clone(); + + assert_dictionary_min_max(&dict_type, &[dict_array_ref], "a", "d") + } + #[test] fn test_min_max_dictionary_multi_batch() -> Result<()> { let dict_type = From ed2d3fd1e44363a9f6bea2d0d2c681c357e1ae59 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 7 Apr 2026 11:47:34 +0800 Subject: [PATCH 05/22] Refactor min/max logic for shared row-wise handling Consolidate row-wise min/max scan logic into a single helper in min_max.rs to ensure consistency between dictionary and generic complex-type paths. Add regression test for the float dictionary handling NaN and -inf cases, validating ordering semantics across batches. --- .../functions-aggregate-common/src/min_max.rs | 29 ++------------ datafusion/functions-aggregate/src/min_max.rs | 38 +++++++++++++++++++ 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index 21e67cf442077..8bce942bfa4c5 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -455,10 +455,7 @@ macro_rules! min_max { }}; } -fn dictionary_batch_extreme( - values: &ArrayRef, - ordering: Ordering, -) -> Result { +fn scalar_batch_extreme(values: &ArrayRef, ordering: Ordering) -> Result { let mut extreme: Option = None; for i in 0..values.len() { @@ -823,32 +820,14 @@ pub fn min_batch(values: &ArrayRef) -> Result { DataType::FixedSizeList(_, _) => { min_max_batch_generic(values, Ordering::Greater)? } - DataType::Dictionary(_, _) => { - dictionary_batch_extreme(values, Ordering::Greater)? 
- } + DataType::Dictionary(_, _) => scalar_batch_extreme(values, Ordering::Greater)?, _ => min_max_batch!(values, min), }) } /// Generic min/max implementation for complex types fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result { - let mut non_null_indices = (0..array.len()).filter(|&i| !array.is_null(i)); - let Some(first_idx) = non_null_indices.next() else { - return ScalarValue::try_from(array.data_type()); - }; - - let mut extreme = ScalarValue::try_from_array(array, first_idx)?; - for i in non_null_indices { - let current = ScalarValue::try_from_array(array, i)?; - if current.is_null() { - continue; - } - if extreme.is_null() || extreme.try_cmp(¤t)? == ordering { - extreme = current; - } - } - - Ok(extreme) + scalar_batch_extreme(array, ordering) } /// dynamically-typed max(array) -> ScalarValue @@ -900,7 +879,7 @@ pub fn max_batch(values: &ArrayRef) -> Result { DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?, DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?, DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?, - DataType::Dictionary(_, _) => dictionary_batch_extreme(values, Ordering::Less)?, + DataType::Dictionary(_, _) => scalar_batch_extreme(values, Ordering::Less)?, _ => min_max_batch!(values, max), }) } diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index 09142a4858b5f..fc6ec7bbc0e64 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -1288,6 +1288,13 @@ mod tests { ) as ArrayRef } + fn float_dictionary_batch(values: &[f32], keys: &[Option]) -> ArrayRef { + let values = Arc::new(Float32Array::from(values.to_vec())) as ArrayRef; + Arc::new( + DictionaryArray::try_new(Int32Array::from(keys.to_vec()), values).unwrap(), + ) as ArrayRef + } + fn evaluate_dictionary_accumulator( mut acc: impl Accumulator, batches: &[ArrayRef], @@ -1377,4 +1384,35 
@@ mod tests { assert_dictionary_min_max(&dict_type, &[batch1, batch2], "a", "d") } + + #[test] + fn test_min_max_dictionary_float_with_nans() -> Result<()> { + let dict_type = + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Float32)); + let batch1 = float_dictionary_batch(&[0.0, f32::NAN], &[Some(0), Some(1)]); + let batch2 = float_dictionary_batch(&[f32::NEG_INFINITY], &[Some(0)]); + + let min_result = evaluate_dictionary_accumulator( + MinAccumulator::try_new(&dict_type)?, + &[Arc::clone(&batch1), Arc::clone(&batch2)], + )?; + assert_eq!( + min_result, + dict_scalar( + DataType::Int32, + ScalarValue::Float32(Some(f32::NEG_INFINITY)), + ) + ); + + let max_result = evaluate_dictionary_accumulator( + MaxAccumulator::try_new(&dict_type)?, + &[batch1, batch2], + )?; + assert_eq!( + max_result, + dict_scalar(DataType::Int32, ScalarValue::Float32(Some(f32::NAN))) + ); + + Ok(()) + } } From dad6e022362ac0924997d044d3520e4a41de61f8 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 7 Apr 2026 11:52:41 +0800 Subject: [PATCH 06/22] Refactor dictionary handling and simplify batch logic Remove the no-op dictionary macro and single-use wrapper. Collapse dictionary handling into a normalized path and seed scalar_batch_extreme from the first non-null value. Unify row-wise batch dispatch behind a shared predicate. Apply formatting adjustments in min_max.rs as per cargo fmt. --- .../functions-aggregate-common/src/min_max.rs | 110 +++++++++--------- datafusion/functions-aggregate/src/min_max.rs | 6 +- 2 files changed, 56 insertions(+), 60 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index 8bce942bfa4c5..ef48fd3f69c9b 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -141,10 +141,6 @@ macro_rules! min_max_generic { }}; } -macro_rules! 
min_max_dictionary { - ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ min_max_generic!($VALUE, $DELTA, $OP) }}; -} - // min/max of two scalar values of the same type macro_rules! min_max { ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ @@ -417,32 +413,20 @@ macro_rules! min_max { min_max_generic!(lhs, rhs, $OP) } - ( - ScalarValue::Dictionary(key_type, lhs_inner), - ScalarValue::Dictionary(_, rhs_inner), - ) => { - wrap_dictionary_scalar( - key_type.as_ref(), - min_max_dictionary!( - lhs_inner.as_ref(), - rhs_inner.as_ref(), - $OP - ), - ) - } + (lhs, rhs) + if matches!(lhs, ScalarValue::Dictionary(_, _)) + || matches!(rhs, ScalarValue::Dictionary(_, _)) => + { + let (lhs, lhs_key_type) = dictionary_scalar_parts(lhs); + let (rhs, rhs_key_type) = dictionary_scalar_parts(rhs); + let result = min_max_generic!(lhs, rhs, $OP); - ( - ScalarValue::Dictionary(_, lhs_inner), - rhs, - ) => { - min_max_dictionary!(lhs_inner.as_ref(), rhs, $OP) - } - - ( - lhs, - ScalarValue::Dictionary(_, rhs_inner), - ) => { - min_max_dictionary!(lhs, rhs_inner.as_ref(), $OP) + match lhs_key_type.zip(rhs_key_type) { + Some((key_type, _)) => { + ScalarValue::Dictionary(Box::new(key_type.clone()), Box::new(result)) + } + None => result, + } } e => { @@ -456,25 +440,50 @@ macro_rules! min_max { } fn scalar_batch_extreme(values: &ArrayRef, ordering: Ordering) -> Result { - let mut extreme: Option = None; + let mut index = 0; + let mut extreme = loop { + if index == values.len() { + return ScalarValue::try_from(values.data_type()); + } + + let current = ScalarValue::try_from_array(values, index)?; + index += 1; - for i in 0..values.len() { - let current = ScalarValue::try_from_array(values, i)?; - if current.is_null() { - continue; + if !current.is_null() { + break current; } + }; + + while index < values.len() { + let current = ScalarValue::try_from_array(values, index)?; + index += 1; - match &extreme { - Some(existing) if existing.try_cmp(¤t)? 
!= ordering => {} - _ => extreme = Some(current), + if !current.is_null() && extreme.try_cmp(¤t)? == ordering { + extreme = current; } } - extreme.map_or_else(|| ScalarValue::try_from(values.data_type()), Ok) + Ok(extreme) +} + +fn dictionary_scalar_parts(value: &ScalarValue) -> (&ScalarValue, Option<&DataType>) { + match value { + ScalarValue::Dictionary(key_type, inner) => { + (inner.as_ref(), Some(key_type.as_ref())) + } + other => (other, None), + } } -fn wrap_dictionary_scalar(key_type: &DataType, value: ScalarValue) -> ScalarValue { - ScalarValue::Dictionary(Box::new(key_type.clone()), Box::new(value)) +fn is_row_wise_batch_type(data_type: &DataType) -> bool { + matches!( + data_type, + DataType::Struct(_) + | DataType::List(_) + | DataType::LargeList(_) + | DataType::FixedSizeList(_, _) + | DataType::Dictionary(_, _) + ) } /// An accumulator to compute the maximum value @@ -814,22 +823,13 @@ pub fn min_batch(values: &ArrayRef) -> Result { min_binary_view ) } - DataType::Struct(_) => min_max_batch_generic(values, Ordering::Greater)?, - DataType::List(_) => min_max_batch_generic(values, Ordering::Greater)?, - DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Greater)?, - DataType::FixedSizeList(_, _) => { - min_max_batch_generic(values, Ordering::Greater)? + data_type if is_row_wise_batch_type(data_type) => { + scalar_batch_extreme(values, Ordering::Greater)? 
} - DataType::Dictionary(_, _) => scalar_batch_extreme(values, Ordering::Greater)?, _ => min_max_batch!(values, min), }) } -/// Generic min/max implementation for complex types -fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result { - scalar_batch_extreme(array, ordering) -} - /// dynamically-typed max(array) -> ScalarValue pub fn max_batch(values: &ArrayRef) -> Result { Ok(match values.data_type() { @@ -875,11 +875,9 @@ pub fn max_batch(values: &ArrayRef) -> Result { let value = value.map(|e| e.to_vec()); ScalarValue::FixedSizeBinary(*size, value) } - DataType::Struct(_) => min_max_batch_generic(values, Ordering::Less)?, - DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?, - DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?, - DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?, - DataType::Dictionary(_, _) => scalar_batch_extreme(values, Ordering::Less)?, + data_type if is_row_wise_batch_type(data_type) => { + scalar_batch_extreme(values, Ordering::Less)? 
+ } _ => min_max_batch!(values, max), }) } diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index fc6ec7bbc0e64..9bd7e153d382b 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -1355,10 +1355,8 @@ mod tests { #[test] fn test_min_max_dictionary_ignores_unreferenced_values() -> Result<()> { - let dict_array_ref = string_dictionary_batch( - &["a", "z", "zz_unused"], - &[Some(1), Some(1), None], - ); + let dict_array_ref = + string_dictionary_batch(&["a", "z", "zz_unused"], &[Some(1), Some(1), None]); let dict_type = dict_array_ref.data_type().clone(); assert_dictionary_min_max(&dict_type, &[dict_array_ref], "z", "z") From b92aeef50866aa8550627e9d17aeae4d822951b0 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 17 Apr 2026 18:32:04 +0800 Subject: [PATCH 07/22] fix(min_max): rename helper to scalar_row_extreme and update documentation - Renamed the helper function from `scalar_batch_extreme` to `scalar_row_extreme` - Added a doc comment explaining that this helper scans logical rows and the necessity of this for dictionary arrays (noting that comparing dictionary.values() is not semantically correct). - Updated both call sites to use the new helper name in min/max batch logic at min_max.rs --- datafusion/functions-aggregate-common/src/min_max.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index ef48fd3f69c9b..ce620891e6d16 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -439,7 +439,12 @@ macro_rules! min_max { }}; } -fn scalar_batch_extreme(values: &ArrayRef, ordering: Ordering) -> Result { +/// Finds the min/max by scanning logical rows via `ScalarValue::try_from_array`. 
+/// +/// This path is required for dictionary arrays because comparing +/// `dictionary.values()` is not semantically correct: it can include +/// unreferenced values and ignore null key positions. +fn scalar_row_extreme(values: &ArrayRef, ordering: Ordering) -> Result { let mut index = 0; let mut extreme = loop { if index == values.len() { @@ -824,7 +829,7 @@ pub fn min_batch(values: &ArrayRef) -> Result { ) } data_type if is_row_wise_batch_type(data_type) => { - scalar_batch_extreme(values, Ordering::Greater)? + scalar_row_extreme(values, Ordering::Greater)? } _ => min_max_batch!(values, min), }) @@ -876,7 +881,7 @@ pub fn max_batch(values: &ArrayRef) -> Result { ScalarValue::FixedSizeBinary(*size, value) } data_type if is_row_wise_batch_type(data_type) => { - scalar_batch_extreme(values, Ordering::Less)? + scalar_row_extreme(values, Ordering::Less)? } _ => min_max_batch!(values, max), }) From a80fc7713d32e39bed848c8489790386ce89b154 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 17 Apr 2026 18:33:39 +0800 Subject: [PATCH 08/22] feat(min_max): rename predicate to requires_logical_row_scan - Renamed the predicate from `is_row_wise_batch_type` to `requires_logical_row_scan` at `min_max.rs - Added a comment clarifying the strategy for handling primitive, string, and binary types using specialized kernels, while this set falls back to scalar row-by-row logical comparison. 
- Updated both call sites to reflect the new name at `min_max.rs` --- datafusion/functions-aggregate-common/src/min_max.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index ce620891e6d16..eed535c2c0d6d 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -480,7 +480,9 @@ fn dictionary_scalar_parts(value: &ScalarValue) -> (&ScalarValue, Option<&DataTy } } -fn is_row_wise_batch_type(data_type: &DataType) -> bool { +// Primitive, string, and binary types use specialized Arrow min/max kernels. +// These remaining types fall back to scalar row-by-row logical comparison. +fn requires_logical_row_scan(data_type: &DataType) -> bool { matches!( data_type, DataType::Struct(_) @@ -828,7 +830,7 @@ pub fn min_batch(values: &ArrayRef) -> Result { min_binary_view ) } - data_type if is_row_wise_batch_type(data_type) => { + data_type if requires_logical_row_scan(data_type) => { scalar_row_extreme(values, Ordering::Greater)? } _ => min_max_batch!(values, min), @@ -880,7 +882,7 @@ pub fn max_batch(values: &ArrayRef) -> Result { let value = value.map(|e| e.to_vec()); ScalarValue::FixedSizeBinary(*size, value) } - data_type if is_row_wise_batch_type(data_type) => { + data_type if requires_logical_row_scan(data_type) => { scalar_row_extreme(values, Ordering::Less)? 
} _ => min_max_batch!(values, max), From 7ea7cb4f29d0817309dc993243d7e912112066b1 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 17 Apr 2026 18:34:58 +0800 Subject: [PATCH 09/22] feat(min_max): enhance documentation and clarify error messages - Updated macro comment to explicitly document dictionary unwrapping behavior, stating that min/max operates on logically compatible scalar values (min_max.rs) - Clarified catch-all error arm wording to emphasize logical incompatibility instead of enum-variant mismatch, changing it to "logically incompatible scalar values" (min_max.rs) --- datafusion/functions-aggregate-common/src/min_max.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index eed535c2c0d6d..3d33cae8e65d0 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -141,7 +141,9 @@ macro_rules! min_max_generic { }}; } -// min/max of two scalar values of the same type +// min/max of two logically compatible scalar values. +// Dictionary scalars are unwrapped to their inner values for comparison, +// then rewrapped with the dictionary key type when both inputs are dictionaries. macro_rules! min_max { ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ Ok(match ($VALUE, $DELTA) { @@ -431,7 +433,7 @@ macro_rules! 
min_max { e => { return internal_err!( - "MIN/MAX is not expected to receive scalars of incompatible types {:?}", + "MIN/MAX is not expected to receive logically incompatible scalar values {:?}", e ) } From 377fb5d751e8cbd074f5c2135371881f98ec70e2 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 17 Apr 2026 18:36:24 +0800 Subject: [PATCH 10/22] feat(min_max): add dictionary key-type validation and improve error handling - Implemented explicit dictionary key-type validation for dictionary scalars in the macro dictionary branch - Introduced an error for mismatched dictionary key types instead of silently preserving the left key type - Updated macro comments to clarify that key types are validated before rewrapping --- .../functions-aggregate-common/src/min_max.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index 3d33cae8e65d0..b7712c2fe3459 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -143,7 +143,8 @@ macro_rules! min_max_generic { // min/max of two logically compatible scalar values. // Dictionary scalars are unwrapped to their inner values for comparison, -// then rewrapped with the dictionary key type when both inputs are dictionaries. +// then rewrapped with the dictionary key type when both inputs are dictionaries +// after validating that their key types match. macro_rules! min_max { ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ Ok(match ($VALUE, $DELTA) { @@ -424,8 +425,19 @@ macro_rules! 
min_max { let result = min_max_generic!(lhs, rhs, $OP); match lhs_key_type.zip(rhs_key_type) { - Some((key_type, _)) => { - ScalarValue::Dictionary(Box::new(key_type.clone()), Box::new(result)) + Some((lhs_key_type, rhs_key_type)) => { + if lhs_key_type != rhs_key_type { + return internal_err!( + "MIN/MAX is not expected to receive dictionary scalars with different key types ({:?} vs {:?})", + lhs_key_type, + rhs_key_type + ); + } + + ScalarValue::Dictionary( + Box::new(lhs_key_type.clone()), + Box::new(result), + ) } None => result, } From 150bc6fe24c850598a0d8a17f62835d17e207aea Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 21 Apr 2026 15:10:08 +0800 Subject: [PATCH 11/22] feat(min_max): rename row-scan helper and update match arms - Renamed the generic row-scan helper back to `min_max_batch_generic` at `min_max.rs:461`. - Removed the `requires_logical_row_scan` indirection. - Updated match arms at `min_max.rs:803` and `min_max.rs:855` to explicitly route `Struct`, `List`, `LargeList`, `FixedSizeList`, and `Dictionary` through `min_max_batch_generic`, including dictionary reuse as requested. --- .../functions-aggregate-common/src/min_max.rs | 31 +++++++------------ 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index b7712c2fe3459..51f8cbd91ee4a 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -458,7 +458,7 @@ macro_rules! min_max { /// This path is required for dictionary arrays because comparing /// `dictionary.values()` is not semantically correct: it can include /// unreferenced values and ignore null key positions. 
-fn scalar_row_extreme(values: &ArrayRef, ordering: Ordering) -> Result { +fn min_max_batch_generic(values: &ArrayRef, ordering: Ordering) -> Result { let mut index = 0; let mut extreme = loop { if index == values.len() { @@ -494,19 +494,6 @@ fn dictionary_scalar_parts(value: &ScalarValue) -> (&ScalarValue, Option<&DataTy } } -// Primitive, string, and binary types use specialized Arrow min/max kernels. -// These remaining types fall back to scalar row-by-row logical comparison. -fn requires_logical_row_scan(data_type: &DataType) -> bool { - matches!( - data_type, - DataType::Struct(_) - | DataType::List(_) - | DataType::LargeList(_) - | DataType::FixedSizeList(_, _) - | DataType::Dictionary(_, _) - ) -} - /// An accumulator to compute the maximum value #[derive(Debug, Clone)] pub struct MaxAccumulator { @@ -844,9 +831,11 @@ pub fn min_batch(values: &ArrayRef) -> Result { min_binary_view ) } - data_type if requires_logical_row_scan(data_type) => { - scalar_row_extreme(values, Ordering::Greater)? - } + DataType::Struct(_) + | DataType::List(_) + | DataType::LargeList(_) + | DataType::FixedSizeList(_, _) + | DataType::Dictionary(_, _) => min_max_batch_generic(values, Ordering::Greater)?, _ => min_max_batch!(values, min), }) } @@ -896,9 +885,11 @@ pub fn max_batch(values: &ArrayRef) -> Result { let value = value.map(|e| e.to_vec()); ScalarValue::FixedSizeBinary(*size, value) } - data_type if requires_logical_row_scan(data_type) => { - scalar_row_extreme(values, Ordering::Less)? 
- } + DataType::Struct(_) + | DataType::List(_) + | DataType::LargeList(_) + | DataType::FixedSizeList(_, _) + | DataType::Dictionary(_, _) => min_max_batch_generic(values, Ordering::Less)?, _ => min_max_batch!(values, max), }) } From 7bd29e19aff16275505d106b741577655ecb930e Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 21 Apr 2026 15:50:41 +0800 Subject: [PATCH 12/22] feat: enhance dictionary comparison logic and add unit tests - Improved comparison behavior between dictionaries: - Same key type dictionaries now compare inner logical values and rewrap the result. - Different key type dictionaries raise an explicit internal error. - Comparisons between dictionaries and non-dictionary types now check inner logical values directly. - Updated and tightened the macro comment to clarify mixed-type dictionary support limitations. - Added focused unit tests for: - Dictionary vs scalar comparison - Same-key dictionary rewrapping - Mismatched dictionary key types - Incompatible dictionary and plain scalar comparisons --- .../functions-aggregate-common/src/min_max.rs | 161 ++++++++++++++---- 1 file changed, 128 insertions(+), 33 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index 51f8cbd91ee4a..7a1fc71a60f0e 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -142,9 +142,9 @@ macro_rules! min_max_generic { } // min/max of two logically compatible scalar values. -// Dictionary scalars are unwrapped to their inner values for comparison, -// then rewrapped with the dictionary key type when both inputs are dictionaries -// after validating that their key types match. +// Dictionary scalars participate by comparing their inner logical values. +// When both inputs are dictionaries, matching key types are preserved in the +// result; differing key types remain an unexpected invariant violation. macro_rules! 
min_max { ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ Ok(match ($VALUE, $DELTA) { @@ -416,31 +416,38 @@ macro_rules! min_max { min_max_generic!(lhs, rhs, $OP) } - (lhs, rhs) - if matches!(lhs, ScalarValue::Dictionary(_, _)) - || matches!(rhs, ScalarValue::Dictionary(_, _)) => - { - let (lhs, lhs_key_type) = dictionary_scalar_parts(lhs); - let (rhs, rhs_key_type) = dictionary_scalar_parts(rhs); - let result = min_max_generic!(lhs, rhs, $OP); - - match lhs_key_type.zip(rhs_key_type) { - Some((lhs_key_type, rhs_key_type)) => { - if lhs_key_type != rhs_key_type { - return internal_err!( - "MIN/MAX is not expected to receive dictionary scalars with different key types ({:?} vs {:?})", - lhs_key_type, - rhs_key_type - ); - } - - ScalarValue::Dictionary( - Box::new(lhs_key_type.clone()), - Box::new(result), - ) - } - None => result, + ( + ScalarValue::Dictionary(lhs_key_type, lhs), + ScalarValue::Dictionary(rhs_key_type, rhs), + ) => { + if lhs_key_type != rhs_key_type { + return internal_err!( + "MIN/MAX is not expected to receive dictionary scalars with different key types ({:?} vs {:?})", + lhs_key_type, + rhs_key_type + ); } + + let result = dictionary_inner_scalar_min_max( + lhs.as_ref(), + rhs.as_ref(), + choose_min_max!($OP), + )?; + ScalarValue::Dictionary(lhs_key_type.clone(), Box::new(result)) + } + (ScalarValue::Dictionary(_, lhs), rhs) => { + dictionary_inner_scalar_min_max( + lhs.as_ref(), + rhs, + choose_min_max!($OP), + )? + } + (lhs, ScalarValue::Dictionary(_, rhs)) => { + dictionary_inner_scalar_min_max( + lhs, + rhs.as_ref(), + choose_min_max!($OP), + )? 
} e => { @@ -485,12 +492,15 @@ fn min_max_batch_generic(values: &ArrayRef, ordering: Ordering) -> Result (&ScalarValue, Option<&DataType>) { - match value { - ScalarValue::Dictionary(key_type, inner) => { - (inner.as_ref(), Some(key_type.as_ref())) - } - other => (other, None), +fn dictionary_inner_scalar_min_max( + lhs: &ScalarValue, + rhs: &ScalarValue, + ordering: Ordering, +) -> Result { + match ordering { + Ordering::Greater => min_max!(lhs, rhs, min), + Ordering::Less => min_max!(lhs, rhs, max), + Ordering::Equal => unreachable!("min/max comparisons do not use equal ordering"), } } @@ -893,3 +903,88 @@ pub fn max_batch(values: &ArrayRef) -> Result { _ => min_max_batch!(values, max), }) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn min_max_dictionary_and_scalar_compare_by_inner_value() -> Result<()> { + let dictionary = ScalarValue::Dictionary( + Box::new(DataType::Int32), + Box::new(ScalarValue::Float32(Some(1.0))), + ); + let scalar = ScalarValue::Float32(Some(2.0)); + + let result: Result = + min_max!(&dictionary, &scalar, max); + let result = result?; + + assert_eq!(result, ScalarValue::Float32(Some(2.0))); + Ok(()) + } + + #[test] + fn min_max_dictionary_same_key_type_rewraps_result() -> Result<()> { + let lhs = ScalarValue::Dictionary( + Box::new(DataType::Int32), + Box::new(ScalarValue::Float32(Some(1.0))), + ); + let rhs = ScalarValue::Dictionary( + Box::new(DataType::Int32), + Box::new(ScalarValue::Float32(Some(2.0))), + ); + + let result: Result = min_max!(&lhs, &rhs, max); + let result = result?; + + assert_eq!( + result, + ScalarValue::Dictionary( + Box::new(DataType::Int32), + Box::new(ScalarValue::Float32(Some(2.0))), + ) + ); + Ok(()) + } + + #[test] + fn min_max_dictionary_different_key_types_error() -> Result<()> { + let lhs = ScalarValue::Dictionary( + Box::new(DataType::Int8), + Box::new(ScalarValue::Float32(Some(1.0))), + ); + let rhs = ScalarValue::Dictionary( + Box::new(DataType::Int32), + 
Box::new(ScalarValue::Float32(Some(2.0))), + ); + + let error: DataFusionError = min_max!(&lhs, &rhs, max).unwrap_err(); + + assert!( + error + .to_string() + .contains("dictionary scalars with different key types") + ); + Ok(()) + } + + #[test] + fn min_max_dictionary_and_incompatible_scalar_error() -> Result<()> { + let dictionary = ScalarValue::Dictionary( + Box::new(DataType::Int32), + Box::new(ScalarValue::Float32(Some(1.0))), + ); + let scalar = ScalarValue::Int32(Some(2)); + + let error: DataFusionError = + min_max!(&dictionary, &scalar, max).unwrap_err(); + + assert!( + error + .to_string() + .contains("logically incompatible scalar values") + ); + Ok(()) + } +} From 47f75b239a3461573f5d8950dc6cc3a014c1a135 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 21 Apr 2026 16:49:06 +0800 Subject: [PATCH 13/22] fix: extract scalar comparison logic into min_max_scalar function - Resolved non-local return issue in macro by moving scalar comparison logic to `min_max_scalar(...) -> Result`. - Maintained `min_max!` as a thin wrapper for existing call sites. - Updated dictionary error-path tests to directly call `min_max_scalar`, preserving behavior while enhancing testability for error cases. --- .../functions-aggregate-common/src/min_max.rs | 42 ++++++++++++------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index 7a1fc71a60f0e..7959c8b360c1a 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -145,9 +145,9 @@ macro_rules! min_max_generic { // Dictionary scalars participate by comparing their inner logical values. // When both inputs are dictionaries, matching key types are preserved in the // result; differing key types remain an unexpected invariant violation. -macro_rules! min_max { +macro_rules! 
min_max_scalar_impl { ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ - Ok(match ($VALUE, $DELTA) { + match ($VALUE, $DELTA) { (ScalarValue::Null, ScalarValue::Null) => ScalarValue::Null, ( lhs @ ScalarValue::Decimal32(lhsv, lhsp, lhss), @@ -456,7 +456,25 @@ macro_rules! min_max { e ) } - }) + } + }}; +} + +fn min_max_scalar( + lhs: &ScalarValue, + rhs: &ScalarValue, + ordering: Ordering, +) -> Result { + match ordering { + Ordering::Greater => Ok(min_max_scalar_impl!(lhs, rhs, min)), + Ordering::Less => Ok(min_max_scalar_impl!(lhs, rhs, max)), + Ordering::Equal => unreachable!("min/max comparisons do not use equal ordering"), + } +} + +macro_rules! min_max { + ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ + min_max_scalar($VALUE, $DELTA, choose_min_max!($OP)) }}; } @@ -497,11 +515,7 @@ fn dictionary_inner_scalar_min_max( rhs: &ScalarValue, ordering: Ordering, ) -> Result { - match ordering { - Ordering::Greater => min_max!(lhs, rhs, min), - Ordering::Less => min_max!(lhs, rhs, max), - Ordering::Equal => unreachable!("min/max comparisons do not use equal ordering"), - } + min_max_scalar(lhs, rhs, ordering) } /// An accumulator to compute the maximum value @@ -916,9 +930,7 @@ mod tests { ); let scalar = ScalarValue::Float32(Some(2.0)); - let result: Result = - min_max!(&dictionary, &scalar, max); - let result = result?; + let result = min_max_scalar(&dictionary, &scalar, Ordering::Less)?; assert_eq!(result, ScalarValue::Float32(Some(2.0))); Ok(()) @@ -935,8 +947,7 @@ mod tests { Box::new(ScalarValue::Float32(Some(2.0))), ); - let result: Result = min_max!(&lhs, &rhs, max); - let result = result?; + let result = min_max_scalar(&lhs, &rhs, Ordering::Less)?; assert_eq!( result, @@ -959,7 +970,8 @@ mod tests { Box::new(ScalarValue::Float32(Some(2.0))), ); - let error: DataFusionError = min_max!(&lhs, &rhs, max).unwrap_err(); + let error: DataFusionError = + min_max_scalar(&lhs, &rhs, Ordering::Less).unwrap_err(); assert!( error @@ -978,7 +990,7 @@ mod tests { let scalar = 
ScalarValue::Int32(Some(2)); let error: DataFusionError = - min_max!(&dictionary, &scalar, max).unwrap_err(); + min_max_scalar(&dictionary, &scalar, Ordering::Less).unwrap_err(); assert!( error From bd8f1adb903f802602774bb97bca59660b4fdc79 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 21 Apr 2026 17:16:01 +0800 Subject: [PATCH 14/22] feat(aggregate): simplify min/max helper and enhance testing for Dictionary(Int8, Utf8) - Removed redundant `dictionary_inner_scalar_min_max` helper and invoked `min_max_scalar(...)` directly in the dictionary match arms to streamline code. - Added end-to-end aggregate test for `Dictionary(Int8, Utf8)` via `test_min_max_dictionary_int8_keys`. - Introduced a generic test helper for building string dictionaries with various key types, reducing setup duplication. --- .../functions-aggregate-common/src/min_max.rs | 27 +++----------- datafusion/functions-aggregate/src/min_max.rs | 35 +++++++++++++++---- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index 7959c8b360c1a..4e97c2b0c904a 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -428,26 +428,15 @@ macro_rules! min_max_scalar_impl { ); } - let result = dictionary_inner_scalar_min_max( - lhs.as_ref(), - rhs.as_ref(), - choose_min_max!($OP), - )?; + let result = + min_max_scalar(lhs.as_ref(), rhs.as_ref(), choose_min_max!($OP))?; ScalarValue::Dictionary(lhs_key_type.clone(), Box::new(result)) } (ScalarValue::Dictionary(_, lhs), rhs) => { - dictionary_inner_scalar_min_max( - lhs.as_ref(), - rhs, - choose_min_max!($OP), - )? + min_max_scalar(lhs.as_ref(), rhs, choose_min_max!($OP))? } (lhs, ScalarValue::Dictionary(_, rhs)) => { - dictionary_inner_scalar_min_max( - lhs, - rhs.as_ref(), - choose_min_max!($OP), - )? + min_max_scalar(lhs, rhs.as_ref(), choose_min_max!($OP))? 
} e => { @@ -510,14 +499,6 @@ fn min_max_batch_generic(values: &ArrayRef, ordering: Ordering) -> Result Result { - min_max_scalar(lhs, rhs, ordering) -} - /// An accumulator to compute the maximum value #[derive(Debug, Clone)] pub struct MaxAccumulator { diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index 9bd7e153d382b..65756b4843268 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -1004,12 +1004,13 @@ mod tests { use super::*; use arrow::{ array::{ - DictionaryArray, Float32Array, Int32Array, IntervalDayTimeArray, - IntervalMonthDayNanoArray, IntervalYearMonthArray, StringArray, + Array, DictionaryArray, Float32Array, Int32Array, Int8Array, + IntervalDayTimeArray, IntervalMonthDayNanoArray, PrimitiveArray, + IntervalYearMonthArray, StringArray, }, datatypes::{ - IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, - IntervalYearMonthType, + ArrowDictionaryKeyType, IntervalDayTimeType, IntervalMonthDayNanoType, + IntervalUnit, IntervalYearMonthType, }, }; use std::sync::Arc; @@ -1272,10 +1273,18 @@ mod tests { } fn string_dictionary_batch(values: &[&str], keys: &[Option]) -> ArrayRef { + string_dictionary_batch_with_keys(Int32Array::from(keys.to_vec()), values) + } + + fn string_dictionary_batch_with_keys( + keys: PrimitiveArray, + values: &[&str], + ) -> ArrayRef + where + K: ArrowDictionaryKeyType, + { let values = Arc::new(StringArray::from(values.to_vec())) as ArrayRef; - Arc::new( - DictionaryArray::try_new(Int32Array::from(keys.to_vec()), values).unwrap(), - ) as ArrayRef + Arc::new(DictionaryArray::try_new(keys, values).unwrap()) as ArrayRef } fn optional_string_dictionary_batch( @@ -1383,6 +1392,18 @@ mod tests { assert_dictionary_min_max(&dict_type, &[batch1, batch2], "a", "d") } + #[test] + fn test_min_max_dictionary_int8_keys() -> Result<()> { + let dict_type = + DataType::Dictionary(Box::new(DataType::Int8), 
Box::new(DataType::Utf8)); + let dict_array_ref = string_dictionary_batch_with_keys( + Int8Array::from(vec![Some(0), Some(1), Some(2), Some(3)]), + &["b", "c", "a", "d"], + ); + + assert_dictionary_min_max(&dict_type, &[dict_array_ref], "a", "d") + } + #[test] fn test_min_max_dictionary_float_with_nans() -> Result<()> { let dict_type = From bca94bebff410bdbc01c39638bf31303dc25b52b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 21 Apr 2026 18:26:40 +0800 Subject: [PATCH 15/22] chore: rename variables in min_max.rs for clarity - Updated dictionary bindings to use explicit names: - `lhs_dict_key_type` - `rhs_dict_key_type` - `lhs_dict_value` - `rhs_dict_value` - Renamed mixed-arm bindings to better distinguish between scalar and dictionary-inner values: - `rhs_scalar` - `lhs_scalar` - `rhs_dict_value` - `lhs_dict_value` This improves consistency in reading each arm by role without introducing new errors. --- .../functions-aggregate-common/src/min_max.rs | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index 4e97c2b0c904a..4d60551d61f74 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -417,26 +417,29 @@ macro_rules! 
min_max_scalar_impl { } ( - ScalarValue::Dictionary(lhs_key_type, lhs), - ScalarValue::Dictionary(rhs_key_type, rhs), + ScalarValue::Dictionary(lhs_dict_key_type, lhs_dict_value), + ScalarValue::Dictionary(rhs_dict_key_type, rhs_dict_value), ) => { - if lhs_key_type != rhs_key_type { + if lhs_dict_key_type != rhs_dict_key_type { return internal_err!( "MIN/MAX is not expected to receive dictionary scalars with different key types ({:?} vs {:?})", - lhs_key_type, - rhs_key_type + lhs_dict_key_type, + rhs_dict_key_type ); } - let result = - min_max_scalar(lhs.as_ref(), rhs.as_ref(), choose_min_max!($OP))?; - ScalarValue::Dictionary(lhs_key_type.clone(), Box::new(result)) + let result = min_max_scalar( + lhs_dict_value.as_ref(), + rhs_dict_value.as_ref(), + choose_min_max!($OP), + )?; + ScalarValue::Dictionary(lhs_dict_key_type.clone(), Box::new(result)) } - (ScalarValue::Dictionary(_, lhs), rhs) => { - min_max_scalar(lhs.as_ref(), rhs, choose_min_max!($OP))? + (ScalarValue::Dictionary(_, lhs_dict_value), rhs_scalar) => { + min_max_scalar(lhs_dict_value.as_ref(), rhs_scalar, choose_min_max!($OP))? } - (lhs, ScalarValue::Dictionary(_, rhs)) => { - min_max_scalar(lhs, rhs.as_ref(), choose_min_max!($OP))? + (lhs_scalar, ScalarValue::Dictionary(_, rhs_dict_value)) => { + min_max_scalar(lhs_scalar, rhs_dict_value.as_ref(), choose_min_max!($OP))? } e => { @@ -462,9 +465,7 @@ fn min_max_scalar( } macro_rules! min_max { - ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ - min_max_scalar($VALUE, $DELTA, choose_min_max!($OP)) - }}; + ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ min_max_scalar($VALUE, $DELTA, choose_min_max!($OP)) }}; } /// Finds the min/max by scanning logical rows via `ScalarValue::try_from_array`. 
From ccbff59ed3d1d41e44401243508fee12226f5e4b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 21 Apr 2026 18:42:17 +0800 Subject: [PATCH 16/22] feat: refactor min_max to utilize choose_min_max for improved internal matching - Removed the call to min_max_scalar within min_max function. - Implemented internal matching using choose_min_max!($OP). - Updated return values to provide Ok with min or max based on Ordering result. - Added unreachable!() for Ordering::Equal case for error handling. --- .../functions-aggregate-common/src/min_max.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index 4d60551d61f74..4944650df674a 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -141,6 +141,18 @@ macro_rules! min_max_generic { }}; } +macro_rules! min_max { + ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ + match choose_min_max!($OP) { + Ordering::Greater => Ok(min_max_scalar_impl!($VALUE, $DELTA, min)), + Ordering::Less => Ok(min_max_scalar_impl!($VALUE, $DELTA, max)), + Ordering::Equal => { + unreachable!("min/max comparisons do not use equal ordering") + } + } + }}; +} + // min/max of two logically compatible scalar values. // Dictionary scalars participate by comparing their inner logical values. // When both inputs are dictionaries, matching key types are preserved in the @@ -464,10 +476,6 @@ fn min_max_scalar( } } -macro_rules! min_max { - ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ min_max_scalar($VALUE, $DELTA, choose_min_max!($OP)) }}; -} - /// Finds the min/max by scanning logical rows via `ScalarValue::try_from_array`. 
/// /// This path is required for dictionary arrays because comparing From 77a518ec2d8785a6341c08e01d38fa0ebd5e4d2f Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 21 Apr 2026 19:50:04 +0800 Subject: [PATCH 17/22] feat: reintroduce min_max_batch_generic function for dictionary array handling --- .../functions-aggregate-common/src/min_max.rs | 64 +++++++++---------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index 4944650df674a..ee91f8d632949 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -476,38 +476,6 @@ fn min_max_scalar( } } -/// Finds the min/max by scanning logical rows via `ScalarValue::try_from_array`. -/// -/// This path is required for dictionary arrays because comparing -/// `dictionary.values()` is not semantically correct: it can include -/// unreferenced values and ignore null key positions. -fn min_max_batch_generic(values: &ArrayRef, ordering: Ordering) -> Result<ScalarValue> { - let mut index = 0; - let mut extreme = loop { - if index == values.len() { - return ScalarValue::try_from(values.data_type()); - } - - let current = ScalarValue::try_from_array(values, index)?; - index += 1; - - if !current.is_null() { - break current; - } - }; - - while index < values.len() { - let current = ScalarValue::try_from_array(values, index)?; - index += 1; - - if !current.is_null() && extreme.try_cmp(&current)? == ordering { - extreme = current; - } - } - - Ok(extreme) -} - /// An accumulator to compute the maximum value #[derive(Debug, Clone)] pub struct MaxAccumulator { @@ -854,6 +822,38 @@ pub fn min_batch(values: &ArrayRef) -> Result<ScalarValue> { }) } +/// Finds the min/max by scanning logical rows via `ScalarValue::try_from_array`. 
+/// +/// This path is required for dictionary arrays because comparing +/// `dictionary.values()` is not semantically correct: it can include +/// unreferenced values and ignore null key positions. +fn min_max_batch_generic(values: &ArrayRef, ordering: Ordering) -> Result<ScalarValue> { + let mut index = 0; + let mut extreme = loop { + if index == values.len() { + return ScalarValue::try_from(values.data_type()); + } + + let current = ScalarValue::try_from_array(values, index)?; + index += 1; + + if !current.is_null() { + break current; + } + }; + + while index < values.len() { + let current = ScalarValue::try_from_array(values, index)?; + index += 1; + + if !current.is_null() && extreme.try_cmp(&current)? == ordering { + extreme = current; + } + } + + Ok(extreme) +} + /// dynamically-typed max(array) -> ScalarValue pub fn max_batch(values: &ArrayRef) -> Result<ScalarValue> { Ok(match values.data_type() { From 0b8592d0b8ad53427a8363a0a329f158667ae219 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 21 Apr 2026 19:58:00 +0800 Subject: [PATCH 18/22] feat: reorder imports in min_max.rs for improved clarity Modified the import statements in the `min_max.rs` file to enhance readability by grouping related types together. This change organizes the code structure and follows standard conventions. 
--- datafusion/functions-aggregate/src/min_max.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index 65756b4843268..78ceba9779310 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -1004,9 +1004,9 @@ mod tests { use super::*; use arrow::{ array::{ - Array, DictionaryArray, Float32Array, Int32Array, Int8Array, - IntervalDayTimeArray, IntervalMonthDayNanoArray, PrimitiveArray, - IntervalYearMonthArray, StringArray, + Array, DictionaryArray, Float32Array, Int8Array, Int32Array, + IntervalDayTimeArray, IntervalMonthDayNanoArray, IntervalYearMonthArray, + PrimitiveArray, StringArray, }, datatypes::{ ArrowDictionaryKeyType, IntervalDayTimeType, IntervalMonthDayNanoType, From a34ddf11756ad6dccab1bec917771f42f5132aed Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 22 Apr 2026 13:23:14 +0800 Subject: [PATCH 19/22] docs: update helper documentation in min_max.rs for dictionary routing responsibility - Clarified caller responsibility for dictionary routing in the helper docs. - Maintained caller-side correctness routing in min_max.rs at lines 820 and 906. - Added regression coverage with a new test in min_max.rs at line 996 to ensure min/max are computed from logical rows instead of raw dictionary values, including unreferenced entries and null-key positions. 
--- .../functions-aggregate-common/src/min_max.rs | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index ee91f8d632949..e9861487164e8 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -824,9 +824,9 @@ pub fn min_batch(values: &ArrayRef) -> Result { /// Finds the min/max by scanning logical rows via `ScalarValue::try_from_array`. /// -/// This path is required for dictionary arrays because comparing -/// `dictionary.values()` is not semantically correct: it can include -/// unreferenced values and ignore null key positions. +/// Callers are responsible for routing dictionary arrays to this helper. +/// Passing `dictionary.values()` is semantically incorrect because it can +/// include unreferenced dictionary entries and ignore null key positions. fn min_max_batch_generic(values: &ArrayRef, ordering: Ordering) -> Result { let mut index = 0; let mut extreme = loop { @@ -911,6 +911,8 @@ pub fn max_batch(values: &ArrayRef) -> Result { #[cfg(test)] mod tests { use super::*; + use arrow::array::DictionaryArray; + use std::sync::Arc; #[test] fn min_max_dictionary_and_scalar_compare_by_inner_value() -> Result<()> { @@ -989,4 +991,24 @@ mod tests { ); Ok(()) } + + #[test] + fn min_max_batch_dictionary_uses_logical_rows() -> Result<()> { + let keys = Int8Array::from(vec![Some(1), None, Some(1), Some(1)]); + let values = Arc::new(StringArray::from(vec!["zzz", "bbb", "aaa"])); + let array = Arc::new(DictionaryArray::new(keys, values)) as ArrayRef; + + let min = min_batch(&array)?; + let max = max_batch(&array)?; + + let expected = ScalarValue::Dictionary( + Box::new(DataType::Int8), + Box::new(ScalarValue::Utf8(Some("bbb".to_string()))), + ); + + assert_eq!(min, expected); + assert_eq!(max, expected); + + Ok(()) + } } From 
ba96f77bfc2d7d5d2bdacc7f463b634da8ad311e Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 22 Apr 2026 13:59:24 +0800 Subject: [PATCH 20/22] feat(min_max): update min_max_batch_generic to handle raw values from DictionaryArray - Removed redundant commentary in `min_max.rs` regarding dictionary arrays. - Improved handling of dictionary array values by explicitly extracting and processing raw values. - Added assertions in unit tests to validate the results when operating on raw values from the dictionary. --- datafusion/functions-aggregate-common/src/min_max.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index e9861487164e8..a88ed5e1447d3 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -823,10 +823,6 @@ pub fn min_batch(values: &ArrayRef) -> Result { } /// Finds the min/max by scanning logical rows via `ScalarValue::try_from_array`. -/// -/// Callers are responsible for routing dictionary arrays to this helper. -/// Passing `dictionary.values()` is semantically incorrect because it can -/// include unreferenced dictionary entries and ignore null key positions. 
fn min_max_batch_generic(values: &ArrayRef, ordering: Ordering) -> Result { let mut index = 0; let mut extreme = loop { @@ -911,7 +907,7 @@ pub fn max_batch(values: &ArrayRef) -> Result { #[cfg(test)] mod tests { use super::*; - use arrow::array::DictionaryArray; + use arrow::array::{AsArray, DictionaryArray}; use std::sync::Arc; #[test] @@ -997,9 +993,12 @@ mod tests { let keys = Int8Array::from(vec![Some(1), None, Some(1), Some(1)]); let values = Arc::new(StringArray::from(vec!["zzz", "bbb", "aaa"])); let array = Arc::new(DictionaryArray::new(keys, values)) as ArrayRef; + let raw_values = array.as_any_dictionary().values(); let min = min_batch(&array)?; let max = max_batch(&array)?; + let raw_min = min_batch(raw_values)?; + let raw_max = max_batch(raw_values)?; let expected = ScalarValue::Dictionary( Box::new(DataType::Int8), @@ -1008,6 +1007,8 @@ mod tests { assert_eq!(min, expected); assert_eq!(max, expected); + assert_eq!(raw_min, ScalarValue::Utf8(Some("aaa".to_string()))); + assert_eq!(raw_max, ScalarValue::Utf8(Some("zzz".to_string()))); Ok(()) } From 2669a30ba79a76b5cb7700d571b57ed3f401cc65 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 22 Apr 2026 14:04:19 +0800 Subject: [PATCH 21/22] Revert "feat(min_max): update min_max_batch_generic to handle raw values from DictionaryArray" This reverts commit f1bffe32b5f7493725138bdab04b6f58e8db651a. --- datafusion/functions-aggregate-common/src/min_max.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index a88ed5e1447d3..e9861487164e8 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -823,6 +823,10 @@ pub fn min_batch(values: &ArrayRef) -> Result { } /// Finds the min/max by scanning logical rows via `ScalarValue::try_from_array`. 
+/// +/// Callers are responsible for routing dictionary arrays to this helper. +/// Passing `dictionary.values()` is semantically incorrect because it can +/// include unreferenced dictionary entries and ignore null key positions. fn min_max_batch_generic(values: &ArrayRef, ordering: Ordering) -> Result { let mut index = 0; let mut extreme = loop { @@ -907,7 +911,7 @@ pub fn max_batch(values: &ArrayRef) -> Result { #[cfg(test)] mod tests { use super::*; - use arrow::array::{AsArray, DictionaryArray}; + use arrow::array::DictionaryArray; use std::sync::Arc; #[test] @@ -993,12 +997,9 @@ mod tests { let keys = Int8Array::from(vec![Some(1), None, Some(1), Some(1)]); let values = Arc::new(StringArray::from(vec!["zzz", "bbb", "aaa"])); let array = Arc::new(DictionaryArray::new(keys, values)) as ArrayRef; - let raw_values = array.as_any_dictionary().values(); let min = min_batch(&array)?; let max = max_batch(&array)?; - let raw_min = min_batch(raw_values)?; - let raw_max = max_batch(raw_values)?; let expected = ScalarValue::Dictionary( Box::new(DataType::Int8), @@ -1007,8 +1008,6 @@ mod tests { assert_eq!(min, expected); assert_eq!(max, expected); - assert_eq!(raw_min, ScalarValue::Utf8(Some("aaa".to_string()))); - assert_eq!(raw_max, ScalarValue::Utf8(Some("zzz".to_string()))); Ok(()) } From e716c92737840c271f53ecdf6b71f6ddd62951f7 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 22 Apr 2026 14:09:34 +0800 Subject: [PATCH 22/22] feat(tests): enhance dictionary array tests with raw values extraction - Added `AsArray` import for `DictionaryArray` to facilitate raw values extraction. - Updated tests to include assertions for raw minimum values from the dictionary array, improving test coverage and validation accuracy. 
--- datafusion/functions-aggregate-common/src/min_max.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index e9861487164e8..ca2939cf5aa16 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -911,7 +911,7 @@ pub fn max_batch(values: &ArrayRef) -> Result { #[cfg(test)] mod tests { use super::*; - use arrow::array::DictionaryArray; + use arrow::array::{AsArray, DictionaryArray}; use std::sync::Arc; #[test] @@ -997,6 +997,8 @@ mod tests { let keys = Int8Array::from(vec![Some(1), None, Some(1), Some(1)]); let values = Arc::new(StringArray::from(vec!["zzz", "bbb", "aaa"])); let array = Arc::new(DictionaryArray::new(keys, values)) as ArrayRef; + let raw_values = array.as_any_dictionary().values(); + let raw_min = min_batch(raw_values)?; let min = min_batch(&array)?; let max = max_batch(&array)?; @@ -1006,6 +1008,10 @@ mod tests { Box::new(ScalarValue::Utf8(Some("bbb".to_string()))), ); + // raw_min is "aaa" because it is the min of the values, but min/max of the dictionary should be "bbb" + // because the null key is ignored and all non-null keys point to "bbb". + assert_ne!(raw_min, expected); + assert_eq!(min, expected); assert_eq!(max, expected);