Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions datafusion/datasource-parquet/src/row_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,14 +692,12 @@ mod test {
let metadata = reader.metadata();

let table_schema =
parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
.expect("parsing schema");
Arc::new(parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
.expect("parsing schema"));

let expr = col("int64_list").is_not_null();
let expr = logical2physical(&expr, &table_schema);

let table_schema = Arc::new(table_schema.clone());

let list_index = table_schema
.index_of("int64_list")
.expect("list column should exist");
Expand All @@ -725,11 +723,11 @@ mod test {

// This is the schema we would like to coerce to,
// which is different from the physical schema of the file.
let table_schema = Schema::new(vec![Field::new(
let table_schema = Arc::new(Schema::new(vec![Field::new(
"timestamp_col",
DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
false,
)]);
)]));

// Test all should fail
let expr = col("timestamp_col").lt(Expr::Literal(
Expand All @@ -738,7 +736,7 @@ mod test {
));
let expr = logical2physical(&expr, &table_schema);
let expr = DefaultPhysicalExprAdapterFactory {}
.create(Arc::new(table_schema.clone()), Arc::clone(&file_schema))
.create(Arc::clone(&table_schema), Arc::clone(&file_schema))
.rewrite(expr)
.expect("rewriting expression");
let candidate = FilterCandidateBuilder::new(expr, file_schema.clone())
Expand Down Expand Up @@ -777,7 +775,7 @@ mod test {
let expr = logical2physical(&expr, &table_schema);
// Rewrite the expression to add CastExpr for type coercion
let expr = DefaultPhysicalExprAdapterFactory {}
.create(Arc::new(table_schema), Arc::clone(&file_schema))
.create(Arc::clone(&table_schema), Arc::clone(&file_schema))
.rewrite(expr)
.expect("rewriting expression");
let candidate = FilterCandidateBuilder::new(expr, file_schema)
Expand Down Expand Up @@ -807,6 +805,7 @@ mod test {
),
true,
)]));
let table_schema: SchemaRef = table_schema;

let expr = col("struct_col").is_not_null();
let expr = logical2physical(&expr, &table_schema);
Expand All @@ -829,6 +828,7 @@ mod test {
),
Field::new("int_col", DataType::Int32, false),
]));
let table_schema: SchemaRef = table_schema;

// Expression: (struct_col IS NOT NULL) AND (int_col = 5)
// Even though int_col is primitive, the presence of struct_col in the
Expand All @@ -853,7 +853,7 @@ mod test {

#[test]
fn nested_lists_allow_pushdown_checks() {
let table_schema = Arc::new(get_lists_table_schema());
let table_schema = get_lists_table_schema();

let expr = col("utf8_list").is_not_null();
let expr = logical2physical(&expr, &table_schema);
Expand Down Expand Up @@ -1060,7 +1060,7 @@ mod test {
assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
}

fn get_basic_table_schema() -> Schema {
fn get_basic_table_schema() -> SchemaRef {
let testdata = datafusion_common::test_util::parquet_test_data();
let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
.expect("opening file");
Expand All @@ -1069,11 +1069,13 @@ mod test {

let metadata = reader.metadata();

parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
.expect("parsing schema")
Arc::new(
parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
.expect("parsing schema"),
)
}

fn get_lists_table_schema() -> Schema {
fn get_lists_table_schema() -> SchemaRef {
let testdata = datafusion_common::test_util::parquet_test_data();
let file = std::fs::File::open(format!("{testdata}/list_columns.parquet"))
.expect("opening file");
Expand All @@ -1082,8 +1084,10 @@ mod test {

let metadata = reader.metadata();

parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
.expect("parsing schema")
Arc::new(
parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
.expect("parsing schema"),
)
}

/// Sanity check that the given expression could be evaluated against the given schema without any errors.
Expand Down
7 changes: 4 additions & 3 deletions datafusion/datasource-parquet/src/row_group_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1498,7 +1498,7 @@ mod tests {
let data = bytes::Bytes::from(std::fs::read(path).unwrap());

// generate pruning predicate
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
let schema = Arc::new(Schema::new(vec![Field::new("String", DataType::Utf8, false)]));

let expr = col(r#""String""#).in_list(
(1..25)
Expand All @@ -1508,7 +1508,7 @@ mod tests {
);
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();

let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
file_name,
Expand Down Expand Up @@ -1729,9 +1729,10 @@ mod tests {
let path = format!("{testdata}/{file_name}");
let data = bytes::Bytes::from(std::fs::read(path).unwrap());

let schema = Arc::new(schema);
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();

let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
&file_name,
Expand Down
12 changes: 6 additions & 6 deletions datafusion/physical-expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ use crate::{
expressions::{self, Column, Literal, binary, like, similar_to},
};

use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::metadata::FieldMetadata;
use datafusion_common::{
DFSchema, Result, ScalarValue, ToDFSchema, exec_err, not_impl_err, plan_err,
DFSchema, Result, ScalarValue, exec_err, not_impl_err, plan_err,
};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::{Alias, Cast, InList, Placeholder, ScalarFunction};
Expand Down Expand Up @@ -406,17 +406,17 @@ where
}

/// Convert a logical expression to a physical expression (without any simplification, etc)
pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
// TODO this makes a deep copy of the Schema. Should take SchemaRef instead and avoid deep copy
let df_schema = schema.clone().to_dfschema().unwrap();
pub fn logical2physical(expr: &Expr, schema: &SchemaRef) -> Arc<dyn PhysicalExpr> {
// This avoids a deep copy of the Schema by taking a SchemaRef (Arc<Schema>)
let df_schema = DFSchema::try_from(Arc::clone(schema)).unwrap();
let execution_props = ExecutionProps::new();
create_physical_expr(expr, &df_schema, &execution_props).unwrap()
}

#[cfg(test)]
mod tests {
use arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
use arrow::datatypes::{DataType, Field};
use arrow::datatypes::{DataType, Field, Schema};

use datafusion_expr::{Operator, col, lit};

Expand Down
21 changes: 12 additions & 9 deletions datafusion/pruning/src/pruning_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4784,8 +4784,8 @@ mod tests {

#[test]
fn test_rewrite_expr_to_prunable() {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
let df_schema = DFSchema::try_from(schema.clone()).unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();

// column op lit
let left_input = col("a");
Expand Down Expand Up @@ -4853,6 +4853,8 @@ mod tests {
let rewriter = PredicateRewriter::new()
.with_unhandled_hook(Arc::new(CustomUnhandledHook {}));

let schema = Arc::new(schema);
let schema_with_b = Arc::new(schema_with_b);
let transform_expr = |expr| {
let expr = logical2physical(&expr, &schema_with_b);
rewriter.rewrite_predicate_to_statistics_predicate(&expr, &schema)
Expand Down Expand Up @@ -4903,8 +4905,8 @@ mod tests {
fn test_rewrite_expr_to_prunable_error() {
// cast string value to numeric value
// this cast is not supported
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
let df_schema = DFSchema::try_from(schema.clone()).unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
let left_input = cast(col("a"), DataType::Int64);
let left_input = logical2physical(&left_input, &schema);
let right_input = lit(ScalarValue::Int64(Some(12)));
Expand Down Expand Up @@ -5401,11 +5403,12 @@ mod tests {
schema: &Schema,
required_columns: &mut RequiredColumns,
) -> Arc<dyn PhysicalExpr> {
let expr = logical2physical(expr, schema);
let schema_ref = Arc::new(schema.clone());
let expr = logical2physical(expr, &schema_ref);
let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
build_predicate_expression(
&expr,
&Arc::new(schema.clone()),
&schema_ref,
required_columns,
&unhandled_hook,
)
Expand All @@ -5414,7 +5417,7 @@ mod tests {
#[test]
fn test_build_predicate_expression_with_false() {
let expr = lit(ScalarValue::Boolean(Some(false)));
let schema = Schema::empty();
let schema = Arc::new(Schema::empty());
let res =
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
let expected = logical2physical(&expr, &schema);
Expand All @@ -5423,7 +5426,7 @@ mod tests {

#[test]
fn test_build_predicate_expression_with_and_false() {
let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]));
let expr = and(
col("c1").eq(lit("a")),
lit(ScalarValue::Boolean(Some(false))),
Expand All @@ -5436,7 +5439,7 @@ mod tests {

#[test]
fn test_build_predicate_expression_with_or_false() {
let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]));
let left_expr = col("c1").eq(lit("a"));
let right_expr = lit(ScalarValue::Boolean(Some(false)));
let res = test_build_predicate_expression(
Expand Down
Loading