diff --git a/arrow-ord/src/cmp.rs b/arrow-ord/src/cmp.rs index 54b3b505d0f..53ca2588700 100644 --- a/arrow-ord/src/cmp.rs +++ b/arrow-ord/src/cmp.rs @@ -27,10 +27,10 @@ use arrow_array::cast::AsArray; use arrow_array::types::{ByteArrayType, ByteViewType}; use arrow_array::{ AnyDictionaryArray, Array, ArrowNativeTypeOp, BooleanArray, Datum, FixedSizeBinaryArray, - GenericByteArray, GenericByteViewArray, downcast_primitive_array, + GenericByteArray, GenericByteViewArray, downcast_primitive_array, downcast_run_array, }; use arrow_buffer::bit_util::ceil; -use arrow_buffer::{BooleanBuffer, NullBuffer}; +use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer}; use arrow_schema::{ArrowError, DataType}; use arrow_select::take::take; use std::cmp::Ordering; @@ -239,6 +239,12 @@ fn compare_op(op: Op, lhs: &dyn Datum, rhs: &dyn Datum) -> Result Result BooleanBuffer { let d = downcast_primitive_array! { - (l, r) => apply(op, l.values().as_ref(), l_s, l_v, r.values().as_ref(), r_s, r_v), - (Boolean, Boolean) => apply(op, l.as_boolean(), l_s, l_v, r.as_boolean(), r_s, r_v), - (Utf8, Utf8) => apply(op, l.as_string::(), l_s, l_v, r.as_string::(), r_s, r_v), - (Utf8View, Utf8View) => apply(op, l.as_string_view(), l_s, l_v, r.as_string_view(), r_s, r_v), - (LargeUtf8, LargeUtf8) => apply(op, l.as_string::(), l_s, l_v, r.as_string::(), r_s, r_v), - (Binary, Binary) => apply(op, l.as_binary::(), l_s, l_v, r.as_binary::(), r_s, r_v), - (BinaryView, BinaryView) => apply(op, l.as_binary_view(), l_s, l_v, r.as_binary_view(), r_s, r_v), - (LargeBinary, LargeBinary) => apply(op, l.as_binary::(), l_s, l_v, r.as_binary::(), r_s, r_v), - (FixedSizeBinary(_), FixedSizeBinary(_)) => apply(op, l.as_fixed_size_binary(), l_s, l_v, r.as_fixed_size_binary(), r_s, r_v), + (l, r) => apply(op, l.values().as_ref(), &l_side, r.values().as_ref(), &r_side), + (Boolean, Boolean) => apply(op, l.as_boolean(), &l_side, r.as_boolean(), &r_side), + (Utf8, Utf8) => apply(op, 
l.as_string::(), &l_side, r.as_string::(), &r_side), + (Utf8View, Utf8View) => apply(op, l.as_string_view(), &l_side, r.as_string_view(), &r_side), + (LargeUtf8, LargeUtf8) => apply(op, l.as_string::(), &l_side, r.as_string::(), &r_side), + (Binary, Binary) => apply(op, l.as_binary::(), &l_side, r.as_binary::(), &r_side), + (BinaryView, BinaryView) => apply(op, l.as_binary_view(), &l_side, r.as_binary_view(), &r_side), + (LargeBinary, LargeBinary) => apply(op, l.as_binary::(), &l_side, r.as_binary::(), &r_side), + (FixedSizeBinary(_), FixedSizeBinary(_)) => apply(op, l.as_fixed_size_binary(), &l_side, r.as_fixed_size_binary(), &r_side), (Null, Null) => None, _ => unreachable!(), }; @@ -340,29 +357,44 @@ fn compare_op(op: Op, lhs: &dyn Datum, rhs: &dyn Datum) -> Result { + is_scalar: bool, + dict: Option<&'a dyn AnyDictionaryArray>, + ree: Option<&'a ReeInfo<'a>>, +} + +impl SideInfo<'_> { + fn has_indirection(&self) -> bool { + self.dict.is_some() || self.ree.is_some() + } +} + /// Perform a potentially vectored `op` on the provided `ArrayOrd` fn apply( op: Op, l: T, - l_s: bool, - l_v: Option<&dyn AnyDictionaryArray>, + l_info: &SideInfo, r: T, - r_s: bool, - r_v: Option<&dyn AnyDictionaryArray>, + r_info: &SideInfo, ) -> Option { if l.len() == 0 || r.len() == 0 { return None; // Handle empty dictionaries } - if !l_s && !r_s && (l_v.is_some() || r_v.is_some()) { - // Not scalar and at least one side has a dictionary, need to perform vectored comparison - let l_v = l_v - .map(|x| x.normalized_keys()) - .unwrap_or_else(|| (0..l.len()).collect()); + if !l_info.is_scalar + && !r_info.is_scalar + && (l_info.has_indirection() || r_info.has_indirection()) + { + // Both non-scalar with indirection. Pure REE-vs-REE uses segment-based + // bulk comparison; other combinations fall back to index vectors. 
+ if let (Some(li), None, Some(ri), None) = (l_info.ree, l_info.dict, r_info.ree, r_info.dict) + { + return Some(apply_ree(op, l, li, r, ri)); + } - let r_v = r_v - .map(|x| x.normalized_keys()) - .unwrap_or_else(|| (0..r.len()).collect()); + let l_v = logical_indices(l.len(), l_info.dict, l_info.ree); + let r_v = logical_indices(r.len(), r_info.dict, r_info.ree); assert_eq!(l_v.len(), r_v.len()); // Sanity check @@ -375,8 +407,12 @@ fn apply( Op::GreaterEqual => apply_op_vectored(l, &l_v, r, &r_v, true, T::is_lt), }) } else { - let l_s = l_s.then(|| l_v.map(|x| x.normalized_keys()[0]).unwrap_or_default()); - let r_s = r_s.then(|| r_v.map(|x| x.normalized_keys()[0]).unwrap_or_default()); + let l_s = l_info + .is_scalar + .then(|| scalar_index(l_info.dict, l_info.ree)); + let r_s = r_info + .is_scalar + .then(|| scalar_index(r_info.dict, r_info.ree)); let buffer = match op { Op::Equal | Op::NotDistinct => apply_op(l, l_s, r, r_s, false, T::is_eq), @@ -387,16 +423,87 @@ fn apply( Op::GreaterEqual => apply_op(l, l_s, r, r_s, true, T::is_lt), }; - // If a side had a dictionary, and was not scalar, we need to materialize this - Some(match (l_v, r_v) { - (Some(l_v), _) if l_s.is_none() => take_bits(l_v, buffer), - (_, Some(r_v)) if r_s.is_none() => take_bits(r_v, buffer), - _ => buffer, + // Expand the physical-length result back to logical length. + // Find the non-scalar side that needs expansion (at most one). + let side = if l_s.is_none() { l_info } else { r_info }; + let buffer = match side.dict { + Some(d) => take_bits(d, buffer), + None => buffer, + }; + Some(match side.ree { + Some(info) => expand_from_runs(info, buffer), + None => buffer, }) } } -/// Perform a take operation on `buffer` with the given dictionary +/// Build a logical→physical index vector for one side of a non-scalar comparison. 
+fn logical_indices( + len: usize, + dict: Option<&dyn AnyDictionaryArray>, + ree: Option<&ReeInfo>, +) -> Vec { + match (dict, ree) { + (Some(d), Some(info)) => { + let keys = d.normalized_keys(); + ree_physical_indices(info) + .iter() + .map(|&i| keys[i]) + .collect() + } + (Some(d), None) => d.normalized_keys(), + (None, Some(info)) => ree_physical_indices(info), + (None, None) => (0..len).collect(), + } +} + +fn ree_physical_indices(info: &ReeInfo) -> Vec { + let run_ends = info.run_ends_as_usize(); + let end = info.offset + info.len; + let mut indices = Vec::with_capacity(info.len); + let mut pos = info.offset; + for (physical, &run_end) in run_ends.iter().enumerate().skip(info.start_physical) { + let run_end = run_end.min(end); + indices.extend(std::iter::repeat_n(physical, run_end - pos)); + pos = run_end; + if pos >= end { + break; + } + } + indices +} + +fn scalar_index(dict: Option<&dyn AnyDictionaryArray>, ree: Option<&ReeInfo>) -> usize { + let idx = ree.map(|r| r.start_physical).unwrap_or_default(); + dict.map(|d| d.normalized_keys()[idx]).unwrap_or(idx) +} + +/// Expand a physical-length BooleanBuffer to logical length by bulk-appending +/// each run's result. Avoids materializing a per-element index vector — downcasts internally to access typed +/// run_ends directly. 
+fn expand_from_runs(info: &ReeInfo, buffer: BooleanBuffer) -> BooleanBuffer { + let array = info.array; + downcast_run_array!( + array => { + let run_ends = array.run_ends().values(); + let end = info.offset + info.len; + let mut builder = BooleanBufferBuilder::new(info.len); + let mut pos = info.offset; + for (physical, re) in run_ends.iter().enumerate().skip(info.start_physical) { + let run_end = re.as_usize().min(end); + // SAFETY: physical < buffer.len() (one entry per run in the values array) + builder.append_n(run_end - pos, unsafe { buffer.value_unchecked(physical) }); + pos = run_end; + if pos >= end { + break; + } + } + builder.finish() + }, + _ => unreachable!() + ) +} + fn take_bits(v: &dyn AnyDictionaryArray, buffer: BooleanBuffer) -> BooleanBuffer { let array = take(&BooleanArray::new(buffer, None), v.keys(), None).unwrap(); array.as_boolean().values().clone() @@ -493,6 +600,62 @@ fn apply_op_vectored( }) } +/// Dispatch `op` for a REE-vs-REE comparison (no dictionary on either side) +/// using segment-based bulk comparison. +fn apply_ree(op: Op, l: T, l_info: &ReeInfo, r: T, r_info: &ReeInfo) -> BooleanBuffer { + match op { + Op::Equal | Op::NotDistinct => apply_op_ree_segments(l, l_info, r, r_info, false, T::is_eq), + Op::NotEqual | Op::Distinct => apply_op_ree_segments(l, l_info, r, r_info, true, T::is_eq), + Op::Less => apply_op_ree_segments(l, l_info, r, r_info, false, T::is_lt), + Op::LessEqual => apply_op_ree_segments(r, r_info, l, l_info, true, T::is_lt), + Op::Greater => apply_op_ree_segments(r, r_info, l, l_info, false, T::is_lt), + Op::GreaterEqual => apply_op_ree_segments(l, l_info, r, r_info, true, T::is_lt), + } +} + +/// Compare two REE arrays by walking both run_ends simultaneously, comparing +/// once per aligned segment and bulk-filling the result. 
+fn apply_op_ree_segments( + l: T, + l_info: &ReeInfo, + r: T, + r_info: &ReeInfo, + neg: bool, + op: fn(T::Item, T::Item) -> bool, +) -> BooleanBuffer { + let l_re = l_info.run_ends_as_usize(); + let r_re = r_info.run_ends_as_usize(); + let end = l_info.len; + let mut builder = BooleanBufferBuilder::new(l_info.len); + let mut l_phys = l_info.start_physical; + let mut r_phys = r_info.start_physical; + let mut pos = 0usize; + + while pos < end { + // SAFETY: l_phys/r_phys are bounded by their respective run counts — + // they advance only when pos reaches a run boundary, and pos < end + // guarantees we haven't exhausted all runs. + // Subtract each side's offset to convert absolute run-ends to logical + // coordinates so that arrays with different offsets align correctly. + let l_end = (unsafe { *l_re.get_unchecked(l_phys) } - l_info.offset).min(end); + let r_end = (unsafe { *r_re.get_unchecked(r_phys) } - r_info.offset).min(end); + let seg_end = l_end.min(r_end); + + let result = unsafe { op(l.value_unchecked(l_phys), r.value_unchecked(r_phys)) } ^ neg; + builder.append_n(seg_end - pos, result); + + pos = seg_end; + if pos >= l_end { + l_phys += 1; + } + if pos >= r_end { + r_phys += 1; + } + } + + builder.finish() +} + trait ArrayOrd { type Item: Copy; @@ -715,10 +878,48 @@ pub fn compare_byte_view( unsafe { GenericByteViewArray::compare_unchecked(left, left_idx, right, right_idx) } } +/// Run-end encoding metadata for one side of a comparison. Holds a reference +/// to the original REE array for deferred typed access to run_ends. +struct ReeInfo<'a> { + array: &'a dyn Array, + offset: usize, + start_physical: usize, + len: usize, +} + +impl ReeInfo<'_> { + /// Materialize run_ends as `Vec`. 
+ fn run_ends_as_usize(&self) -> Vec { + let array = self.array; + downcast_run_array!( + array => array.run_ends().values().iter().map(|v| v.as_usize()).collect(), + _ => unreachable!() + ) + } +} + +/// If `array` is RunEndEncoded, return its physical values array and run metadata. +fn ree_info_opt(array: &dyn Array) -> Option<(&dyn Array, ReeInfo<'_>)> { + downcast_run_array!( + array => { + let run_ends = array.run_ends(); + let info = ReeInfo { + array, + offset: run_ends.offset(), + start_physical: run_ends.get_start_physical_index(), + len: run_ends.len(), + }; + Some((array.values().as_ref(), info)) + }, + _ => None + ) +} + #[cfg(test)] mod tests { use std::sync::Arc; + use arrow_array::types::Int32Type; use arrow_array::{DictionaryArray, Int32Array, Scalar, StringArray}; use arrow_buffer::{Buffer, ScalarBuffer}; @@ -1092,6 +1293,184 @@ mod tests { !array.data_buffers().is_empty() } + fn ree_str(runs: &[(Option<&str>, i32)]) -> arrow_array::RunArray { + let mut ends = Vec::new(); + let mut vals = Vec::new(); + let mut end = 0i32; + for &(v, n) in runs { + end += n; + ends.push(end); + vals.push(v); + } + arrow_array::RunArray::try_new(&Int32Array::from(ends), &StringArray::from(vals)).unwrap() + } + + #[test] + fn test_ree_scalar() { + let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]); + + let s = Scalar::new(StringArray::from(vec!["b"])); + assert_eq!( + eq(&a, &s).unwrap(), + BooleanArray::from(vec![false, false, false, true, true]) + ); + assert_eq!( + neq(&a, &s).unwrap(), + BooleanArray::from(vec![true, true, true, false, false]) + ); + assert_eq!( + lt(&a, &s).unwrap(), + BooleanArray::from(vec![true, true, true, false, false]) + ); + assert_eq!(lt_eq(&a, &s).unwrap(), BooleanArray::from(vec![true; 5])); + assert_eq!(gt(&a, &s).unwrap(), BooleanArray::from(vec![false; 5])); + assert_eq!( + gt_eq(&a, &s).unwrap(), + BooleanArray::from(vec![false, false, false, true, true]) + ); + + // Scalar on left side + let scalar = 
Scalar::new(ree_str(&[(Some("a"), 1)])); + assert_eq!( + eq(&scalar, &a).unwrap(), + BooleanArray::from(vec![true, true, true, false, false]) + ); + assert_eq!( + lt_eq(&scalar, &a).unwrap(), + BooleanArray::from(vec![true; 5]) + ); + + // REE-wrapped scalar (DataFusion's ScalarValue::RunEndEncoded) + assert_eq!( + eq(&a, &Scalar::new(ree_str(&[(Some("a"), 1)]))).unwrap(), + BooleanArray::from(vec![true, true, true, false, false]), + ); + + // Single run + let a = ree_str(&[(Some("x"), 100)]); + let r = eq(&a, &Scalar::new(StringArray::from(vec!["x"]))).unwrap(); + assert_eq!(r.true_count(), 100); + } + + #[test] + fn test_ree_ree() { + // Different run boundaries, all ops. + let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]); + let b = ree_str(&[(Some("a"), 2), (Some("b"), 3)]); + // a=[a,a,a,b,b] vs b=[a,a,b,b,b] + assert_eq!( + eq(&a, &b).unwrap(), + BooleanArray::from(vec![true, true, false, true, true]) + ); + assert_eq!( + neq(&a, &b).unwrap(), + BooleanArray::from(vec![false, false, true, false, false]) + ); + assert_eq!( + lt(&a, &b).unwrap(), + BooleanArray::from(vec![false, false, true, false, false]) + ); + assert_eq!( + gt_eq(&a, &b).unwrap(), + BooleanArray::from(vec![true, true, false, true, true]) + ); + } + + #[test] + fn test_ree_sliced() { + // Scalar with sliced REE + let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]).slice(2, 3); + let s = Scalar::new(StringArray::from(vec!["b"])); + assert_eq!( + eq(&a, &s).unwrap(), + BooleanArray::from(vec![false, true, true]) + ); + + // Both sides sliced, REE vs REE + let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]).slice(1, 4); + let b = ree_str(&[(Some("a"), 2), (Some("b"), 3)]).slice(1, 4); + assert_eq!( + eq(&a, &b).unwrap(), + BooleanArray::from(vec![true, false, true, true]) + ); + } + + #[test] + fn test_ree_sliced_different_offsets() { + // left expands to ["a", "a", "b", "b"] + let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]).slice(1, 4); + // right expands to ["a", "a", "b", "b"] + let b = 
ree_str(&[(Some("a"), 2), (Some("b"), 3)]).slice(0, 4); + assert_eq!( + eq(&a, &b).unwrap(), + BooleanArray::from(vec![true, true, true, true]) + ); + } + + #[test] + fn test_ree_nullable() { + let a = ree_str(&[(Some("a"), 2), (None, 1), (Some("b"), 2)]); + + // Scalar: null-aware ops + let s = Scalar::new(StringArray::from(vec!["a"])); + assert_eq!( + not_distinct(&a, &s).unwrap(), + BooleanArray::from(vec![true, true, false, false, false]) + ); + assert_eq!( + distinct(&a, &s).unwrap(), + BooleanArray::from(vec![false, false, true, true, true]) + ); + + // REE vs REE with nulls + let b = ree_str(&[(Some("a"), 3), (None, 2)]); + assert_eq!( + eq(&a, &b).unwrap(), + BooleanArray::from(vec![Some(true), Some(true), None, None, None]) + ); + } + + #[test] + fn test_ree_mixed() { + let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]); + + // REE vs plain array + let b = StringArray::from(vec!["a", "a", "b", "b", "b"]); + assert_eq!( + eq(&a, &b).unwrap(), + BooleanArray::from(vec![true, true, false, true, true]) + ); + + // REE wrapping a DictionaryArray + let dict = DictionaryArray::new( + Int32Array::from(vec![1, 0]), + Arc::new(StringArray::from(vec!["x", "y"])), + ); + let ree_dict = + arrow_array::RunArray::try_new(&Int32Array::from(vec![3, 5]), &dict).unwrap(); + let s = Scalar::new(StringArray::from(vec!["y"])); + assert_eq!( + eq(&ree_dict, &s).unwrap(), + BooleanArray::from(vec![true, true, true, false, false]) + ); + + // Numeric REE (Int32 values) + let ree_int = arrow_array::RunArray::try_new( + &Int32Array::from(vec![3, 5]), + &Int32Array::from(vec![10, 20]), + ) + .unwrap(); + assert_eq!( + eq(&ree_int, &Scalar::new(Int32Array::from(vec![10]))).unwrap(), + BooleanArray::from(vec![true, true, true, false, false]) + ); + + // Empty REE + let empty_a = ree_str(&[(Some("a"), 1)]).slice(0, 0); + let empty_b = ree_str(&[(Some("b"), 1)]).slice(0, 0); + assert_eq!(eq(&empty_a, &empty_b).unwrap().len(), 0); + } + #[test] fn test_compare_byte_view() { let a = 
arrow_array::StringViewArray::from(vec![ diff --git a/arrow/benches/comparison_kernels.rs b/arrow/benches/comparison_kernels.rs index 00c01374b62..3b3e72241ca 100644 --- a/arrow/benches/comparison_kernels.rs +++ b/arrow/benches/comparison_kernels.rs @@ -530,6 +530,42 @@ fn add_benchmark(c: &mut Criterion) { c.bench_function("eq dictionary[10] string[4])", |b| { b.iter(|| eq(&dict_arr_a, &dict_arr_b).unwrap()) }); + + // RunEndEncoded benchmarks + + let mut group = c.benchmark_group("ree_comparison"); + + for (physical, logical) in [(64, SIZE), (1024, SIZE), (SIZE / 2, SIZE)] { + let ree_a = create_primitive_run_array::(logical, physical); + let ree_b = create_primitive_run_array::(logical, physical); + let scalar = Int32Array::from(vec![1]); + + let tag = format!("phys={physical},log={logical}"); + + group.bench_function(format!("eq_ree_scalar({tag})"), |b| { + b.iter(|| eq(&ree_a, &Scalar::new(&scalar)).unwrap()) + }); + + group.bench_function(format!("lt_ree_scalar({tag})"), |b| { + b.iter(|| lt(&ree_a, &Scalar::new(&scalar)).unwrap()) + }); + + group.bench_function(format!("eq_ree_ree({tag})"), |b| { + b.iter(|| eq(&ree_a, &ree_b).unwrap()) + }); + + group.bench_function(format!("lt_ree_ree({tag})"), |b| { + b.iter(|| lt(&ree_a, &ree_b).unwrap()) + }); + + let flat = create_primitive_array_with_seed::(logical, 0., 42); + + group.bench_function(format!("eq_ree_flat({tag})"), |b| { + b.iter(|| eq(&ree_a, &flat).unwrap()) + }); + } + + group.finish(); } criterion_group!(benches, add_benchmark);