Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions java/testfiles/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[toolchain]
channel = "1.91.0"
components = ["rust-src", "rustfmt", "clippy", "rust-analyzer"]
profile = "minimal"
profile = "minimal"
198 changes: 198 additions & 0 deletions vortex-array/public-api.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions vortex-array/src/aggregate_fn/fns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ pub mod is_sorted;
pub mod last;
pub mod min_max;
pub mod nan_count;
pub mod row_count;
pub mod sum;
143 changes: 143 additions & 0 deletions vortex-array/src/aggregate_fn/fns/row_count/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use vortex_error::VortexExpect;
use vortex_error::VortexResult;

use crate::ArrayRef;
use crate::Columnar;
use crate::ExecutionCtx;
use crate::aggregate_fn::AggregateFnId;
use crate::aggregate_fn::AggregateFnVTable;
use crate::aggregate_fn::EmptyOptions;
use crate::dtype::DType;
use crate::dtype::Nullability;
use crate::dtype::PType;
use crate::scalar::Scalar;

/// Aggregate that counts every element of its input, nulls included.
///
/// Applies to all input types and produces a non-nullable `u64`.
/// The identity value (the partial for an empty input) is zero.
///
/// Unlike [`Count`][crate::aggregate_fn::fns::count::Count], this aggregate includes
/// null elements in the total: each batch contributes its length, regardless of validity.
/// It is primarily used as a marker inside pruning predicates that need to refer to the
/// scope row count.
#[derive(Clone, Debug)]
pub struct RowCount;

impl AggregateFnVTable for RowCount {
    type Options = EmptyOptions;
    /// Running total of rows seen so far.
    type Partial = u64;

    /// Identifier of this aggregate function: `vortex.row_count`.
    fn id(&self) -> AggregateFnId {
        AggregateFnId::new("vortex.row_count")
    }

    /// Serialization is not supported yet.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!`.
    fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
        unimplemented!("RowCount is not yet serializable");
    }

    /// The result is a non-nullable `u64`, whatever the input dtype is.
    fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option<DType> {
        let count_dtype = DType::Primitive(PType::U64, Nullability::NonNullable);
        Some(count_dtype)
    }

    /// Partial results share the dtype of the final result.
    fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
        self.return_dtype(options, input_dtype)
    }

    /// A fresh partial starts at zero rows.
    fn empty_partial(
        &self,
        _options: &Self::Options,
        _input_dtype: &DType,
    ) -> VortexResult<Self::Partial> {
        Ok(0)
    }

    /// Adds the count carried by `other` onto `partial`.
    ///
    /// # Panics
    ///
    /// Panics if `other` holds a null value.
    fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
        let primitive = other.as_primitive();
        let incoming = primitive
            .typed_value::<u64>()
            .vortex_expect("row_count partial should not be null");
        *partial += incoming;
        Ok(())
    }

    /// Wraps the current count in a non-nullable primitive scalar.
    fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
        let count: u64 = *partial;
        Ok(Scalar::primitive(count, Nullability::NonNullable))
    }

    /// Sets the count back to zero.
    fn reset(&self, partial: &mut Self::Partial) {
        *partial = 0;
    }

    /// Always `false`: this implementation never reports the partial as saturated.
    #[inline]
    fn is_saturated(&self, _partial: &Self::Partial) -> bool {
        false
    }

    /// Adds the length of `batch` to the running count.
    ///
    /// Only the length is read, so null elements are counted too. Always returns `Ok(true)`;
    /// per the message in [`Self::accumulate`], this means every array is handled here.
    fn try_accumulate(
        &self,
        state: &mut Self::Partial,
        batch: &ArrayRef,
        _ctx: &mut ExecutionCtx,
    ) -> VortexResult<bool> {
        // `usize` -> `u64` does not truncate on targets whose pointers are at most 64 bits wide.
        let rows = batch.len() as u64;
        *state += rows;
        Ok(true)
    }

    /// Fallback path that is never expected to run.
    ///
    /// # Panics
    ///
    /// Always panics via `unreachable!` if it is called.
    fn accumulate(
        &self,
        _partial: &mut Self::Partial,
        _batch: &Columnar,
        _ctx: &mut ExecutionCtx,
    ) -> VortexResult<()> {
        unreachable!("RowCount::try_accumulate handles all arrays")
    }

    /// Partials are already final counts, so the array is returned unchanged.
    fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
        Ok(partials)
    }

    /// The final scalar is the partial converted with [`Self::to_scalar`].
    fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
        self.to_scalar(partial)
    }
}

#[cfg(test)]
mod tests {
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::row_count::RowCount;
    use crate::arrays::PrimitiveArray;

    /// Feeds `values` as a single batch through a fresh `RowCount` accumulator and returns the
    /// finished count.
    fn count_rows(values: impl IntoArray) -> VortexResult<Option<u64>> {
        let batch = values.into_array();
        let mut exec_ctx = LEGACY_SESSION.create_execution_ctx();
        let mut accumulator = Accumulator::try_new(RowCount, EmptyOptions, batch.dtype().clone())?;
        accumulator.accumulate(&batch, &mut exec_ctx)?;
        let finished = accumulator.finish()?;
        let count = finished.as_primitive().typed_value::<u64>();
        Ok(count)
    }

    /// Five valid elements count as five rows.
    #[test]
    fn row_count_all_valid() -> VortexResult<()> {
        assert_eq!(count_rows(buffer![1i32, 2, 3, 4, 5])?, Some(5));
        Ok(())
    }

    /// Null elements are part of the total: three valid plus two null is still five rows.
    #[test]
    fn row_count_includes_nulls() -> VortexResult<()> {
        let with_nulls =
            PrimitiveArray::from_option_iter([Some(1i32), None, Some(3), None, Some(5)]);
        assert_eq!(count_rows(with_nulls)?, Some(5));
        Ok(())
    }
}
2 changes: 2 additions & 0 deletions vortex-array/src/aggregate_fn/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::aggregate_fn::fns::is_sorted::IsSorted;
use crate::aggregate_fn::fns::last::Last;
use crate::aggregate_fn::fns::min_max::MinMax;
use crate::aggregate_fn::fns::nan_count::NanCount;
use crate::aggregate_fn::fns::row_count::RowCount;
use crate::aggregate_fn::fns::sum::Sum;
use crate::aggregate_fn::kernels::DynAggregateKernel;
use crate::aggregate_fn::kernels::DynGroupedAggregateKernel;
Expand Down Expand Up @@ -59,6 +60,7 @@ impl Default for AggregateFnSession {
this.register(Last);
this.register(MinMax);
this.register(NanCount);
this.register(RowCount);
this.register(Sum);

// Register the built-in aggregate kernels.
Expand Down
22 changes: 22 additions & 0 deletions vortex-array/src/expr/exprs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use vortex_error::VortexExpect;
use vortex_error::vortex_panic;
use vortex_utils::iter::ReduceBalancedIterExt;

use crate::aggregate_fn::AggregateFnRef;
use crate::aggregate_fn::AggregateFnVTableExt;
use crate::aggregate_fn::fns::row_count::RowCount;
use crate::dtype::DType;
use crate::dtype::FieldName;
use crate::dtype::FieldNames;
Expand Down Expand Up @@ -46,6 +49,7 @@ use crate::scalar_fn::fns::pack::PackOptions;
use crate::scalar_fn::fns::root::Root;
use crate::scalar_fn::fns::select::FieldSelection;
use crate::scalar_fn::fns::select::Select;
use crate::scalar_fn::fns::stats_expression::StatsExpression;
use crate::scalar_fn::fns::zip::Zip;

// ---- Root ----
Expand Down Expand Up @@ -701,3 +705,21 @@ pub fn dynamic(
pub fn list_contains(list: Expression, value: Expression) -> Expression {
ListContains.new_expr(EmptyOptions, [list, value])
}

// ---- StatsExpression ----

/// Creates a placeholder [`StatsExpression`] wrapping the given aggregate.
///
/// `agg` is passed straight through to [`StatsExpression`]; the expression is built with an
/// empty child list, so it has no child expressions.
///
/// The expression must be substituted before evaluation by the layer that owns the
/// evaluation scope — see [`StatsExpression`] for details.
pub fn stats_expression(agg: AggregateFnRef) -> Expression {
StatsExpression.new_expr(agg, [])
}

/// Creates a [`StatsExpression`] wrapping the [`RowCount`] aggregate.
///
/// This is the canonical way to refer to the row count of the current evaluation scope inside
/// a pruning predicate.
pub fn row_count() -> Expression {
stats_expression(RowCount.bind(crate::aggregate_fn::EmptyOptions))
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
stats_expression(RowCount.bind(crate::aggregate_fn::EmptyOptions))
stats_expression(RowCount.bind(EmptyOptions))

}
4 changes: 4 additions & 0 deletions vortex-array/src/expr/pruning/pruning_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ pub fn field_path_stat_field_name(field_path: &FieldPath, stat: Stat) -> FieldNa
/// cannot hold, and false if it cannot be determined from stats alone whether the positions can
/// be pruned.
///
/// Row-count-aware pruning (for example `is_not_null(...)`) emits
/// [`row_count`][crate::expr::row_count] placeholders that the evaluation layer must substitute
/// before executing the returned expression.
///
/// If the falsification logic attempts to access an unknown stat,
/// this function will return `None`.
pub fn checked_pruning_expr(
Expand Down
47 changes: 12 additions & 35 deletions vortex-array/src/scalar_fn/fns/is_not_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ use crate::dtype::DType;
use crate::dtype::Nullability;
use crate::expr::Expression;
use crate::expr::StatsCatalog;
use crate::expr::and;
use crate::expr::eq;
use crate::expr::gt;
use crate::expr::lit;
use crate::expr::row_count;
use crate::expr::stats::Stat;
use crate::scalar_fn::Arity;
use crate::scalar_fn::ChildName;
Expand Down Expand Up @@ -106,20 +104,10 @@ impl ScalarFnVTable for IsNotNull {
expr: &Expression,
catalog: &dyn StatsCatalog,
) -> Option<Expression> {
// is_not_null is falsified when ALL values are null, i.e. null_count == len.
// Since there is no len stat in the zone map, we approximate using IsConstant:
// if the zone is constant and has any nulls, then all values must be null.
//
// TODO(#7187): Add a len stat to enable the more general falsification:
// null_count == len => is_not_null is all false.
let null_count_expr = expr.child(0).stat_expression(Stat::NullCount, catalog)?;
let is_constant_expr = expr.child(0).stat_expression(Stat::IsConstant, catalog)?;
// If the zone is constant (is_constant == true) and has nulls (null_count > 0),
// then all values must be null, so is_not_null is all false.
Some(and(
eq(is_constant_expr, lit(true)),
gt(null_count_expr, lit(0u64)),
))
// is_not_null is falsified when ALL values are null, i.e. null_count == row_count.
let child = expr.child(0);
let null_count_expr = child.stat_expression(Stat::NullCount, catalog)?;
Some(eq(null_count_expr, row_count()))
}
}

Expand Down Expand Up @@ -267,38 +255,27 @@ mod tests {
use crate::dtype::Field;
use crate::dtype::FieldPath;
use crate::dtype::FieldPathSet;
use crate::expr::and;
use crate::expr::col;
use crate::expr::eq;
use crate::expr::gt;
use crate::expr::lit;
use crate::expr::pruning::checked_pruning_expr;
use crate::expr::row_count;
use crate::expr::stats::Stat;

let expr = is_not_null(col("a"));

let (pruning_expr, st) = checked_pruning_expr(
&expr,
&FieldPathSet::from_iter([
FieldPath::from_iter([Field::Name("a".into()), Field::Name("null_count".into())]),
FieldPath::from_iter([Field::Name("a".into()), Field::Name("is_constant".into())]),
]),
&FieldPathSet::from_iter([FieldPath::from_iter([
Field::Name("a".into()),
Field::Name("null_count".into()),
])]),
)
.unwrap();

assert_eq!(
&pruning_expr,
&and(
eq(col("a_is_constant"), lit(true)),
gt(col("a_null_count"), lit(0u64)),
)
);
assert_eq!(&pruning_expr, &eq(col("a_null_count"), row_count()));
assert_eq!(
st.map(),
&HashMap::from_iter([(
FieldPath::from_name("a"),
HashSet::from([Stat::NullCount, Stat::IsConstant])
)])
&HashMap::from_iter([(FieldPath::from_name("a"), HashSet::from([Stat::NullCount]))])
);
}
}
1 change: 1 addition & 0 deletions vortex-array/src/scalar_fn/fns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ pub mod operators;
pub mod pack;
pub mod root;
pub mod select;
pub mod stats_expression;
pub mod zip;
Loading
Loading