diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index bb330c3f4caa1..b960d8e6a03f0 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -48,7 +48,7 @@ use datafusion_physical_expr_common::physical_expr::{ PhysicalExpr, is_dynamic_physical_expr, }; use datafusion_physical_plan::metrics::{ - Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics, + BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics, }; use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate}; @@ -135,6 +135,7 @@ impl FileOpener for ParquetOpener { let file_name = file_location.to_string(); let file_metrics = ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics); + let baseline_metrics = BaselineMetrics::new(&self.metrics, self.partition_index); let metadata_size_hint = partitioned_file .metadata_size_hint @@ -605,6 +606,7 @@ impl FileOpener for ParquetOpener { arrow_reader_metrics, predicate_cache_inner_records, predicate_cache_records, + baseline_metrics, }, |mut state| async move { let result = state.transition().await; @@ -646,6 +648,7 @@ struct PushDecoderStreamState { arrow_reader_metrics: ArrowReaderMetrics, predicate_cache_inner_records: Gauge, predicate_cache_records: Gauge, + baseline_metrics: BaselineMetrics, } impl PushDecoderStreamState { @@ -671,8 +674,11 @@ impl PushDecoderStreamState { } } Ok(DecodeResult::Data(batch)) => { + let mut timer = self.baseline_metrics.elapsed_compute().timer(); self.copy_arrow_reader_metrics(); - return Some(self.project_batch(&batch)); + let result = self.project_batch(&batch); + timer.stop(); + return Some(result); } Ok(DecodeResult::Finished) => { return None; diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt index 5a1058b8a203f..b0d9fceeff604 100644 --- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt +++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt @@ -104,7 +104,7 @@ Plan with Metrics 03)----ProjectionExec: expr=[id@0 as id, value@1 as v, value@1 + id@0 as name], metrics=[output_rows=10, ] 04)------FilterExec: value@1 > 3, metrics=[output_rows=10, , selectivity=100% (10/10)] 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, metrics=[output_rows=10, ] -06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=10, elapsed_compute=1ns, output_bytes=80.0 B, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=210, metadata_load_time=, scan_efficiency_ratio=18% (210/1.15 K)] +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=10, elapsed_compute=, output_bytes=80.0 B, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=210, metadata_load_time=, scan_efficiency_ratio=18% (210/1.15 K)] statement ok set datafusion.explain.analyze_level = dev;