diff --git a/Cargo.lock b/Cargo.lock
index 02da8661eedea..2b8af8f12371a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1883,6 +1883,7 @@ version = "53.1.0"
 dependencies = [
  "arrow",
  "async-trait",
+ "chrono",
  "datafusion-catalog",
  "datafusion-common",
  "datafusion-datasource",
diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml
index be1374b371485..61b55397137df 100644
--- a/datafusion/catalog-listing/Cargo.toml
+++ b/datafusion/catalog-listing/Cargo.toml
@@ -48,6 +48,7 @@ log = { workspace = true }
 object_store = { workspace = true }
 
 [dev-dependencies]
+chrono = { workspace = true }
 datafusion-datasource-parquet = { workspace = true }
 
 # Note: add additional linter rules in lib.rs.
diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs
index 9c30028ddd547..62257ec027c6f 100644
--- a/datafusion/catalog-listing/src/helpers.rs
+++ b/datafusion/catalog-listing/src/helpers.rs
@@ -340,17 +340,25 @@ fn filter_partitions(
     Ok(None)
 }
 
+/// Returns `Ok(None)` when the file is not inside a valid partition path
+/// (e.g. a stale file in the table root directory). Such files are skipped
+/// because hive-style partition values are never null and there is no valid
+/// value to assign for non-partitioned files.
 fn try_into_partitioned_file(
     object_meta: ObjectMeta,
     partition_cols: &[(String, DataType)],
     table_path: &ListingTableUrl,
-) -> Result<PartitionedFile> {
+) -> Result<Option<PartitionedFile>> {
     let cols = partition_cols.iter().map(|(name, _)| name.as_str());
     let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);
+    let Some(parsed) = parsed else {
+        // parse_partitions_for_path already logs a debug message
+        return Ok(None);
+    };
+
     let partition_values = parsed
         .into_iter()
-        .flatten()
         .zip(partition_cols)
         .map(|(parsed, (_, datatype))| {
             ScalarValue::try_from_string(parsed.to_string(), datatype)
         })
@@ -360,7 +368,7 @@ fn try_into_partitioned_file(
     let mut pf: PartitionedFile = object_meta.into();
     pf.partition_values = partition_values;
 
-    Ok(pf)
+    Ok(Some(pf))
 }
 
 /// Discover the partitions on the given path and prune out files
@@ -405,13 +413,15 @@ pub async fn pruned_partition_list<'a>(
     )?;
 
     Ok(objects
-        .map_ok(|object_meta| {
-            try_into_partitioned_file(object_meta, partition_cols, table_path)
+        .try_filter_map(|object_meta| {
+            futures::future::ready(try_into_partitioned_file(
+                object_meta,
+                partition_cols,
+                table_path,
+            ))
         })
         .try_filter_map(move |pf| {
-            futures::future::ready(
-                pf.and_then(|pf| filter_partitions(pf, filters, &df_schema)),
-            )
+            futures::future::ready(filter_partitions(pf, filters, &df_schema))
         })
         .boxed())
 }
@@ -574,6 +584,130 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_try_into_partitioned_file_valid_partition() {
+        let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
+        let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
+        let meta = ObjectMeta {
+            location: Path::from("bucket/mytable/year_month=2024-01/data.parquet"),
+            last_modified: chrono::Utc::now(),
+            size: 100,
+            e_tag: None,
+            version: None,
+        };
+
+        let result =
+            try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
+        assert!(result.is_some());
+        let pf = result.unwrap();
+        assert_eq!(pf.partition_values.len(), 1);
+        assert_eq!(
+            pf.partition_values[0],
+            ScalarValue::Utf8(Some("2024-01".to_string()))
+        );
+    }
+
+    #[test]
+    fn test_try_into_partitioned_file_root_file_skipped() {
+        // File in root directory (not inside any partition path) should be
+        // skipped — this is the case where a stale file exists from before
+        // hive partitioning was added.
+        let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
+        let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
+        let meta = ObjectMeta {
+            location: Path::from("bucket/mytable/data.parquet"),
+            last_modified: chrono::Utc::now(),
+            size: 100,
+            e_tag: None,
+            version: None,
+        };
+
+        let result =
+            try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
+        assert!(
+            result.is_none(),
+            "Files outside partition structure should be skipped"
+        );
+    }
+
+    #[test]
+    fn test_try_into_partitioned_file_wrong_partition_name() {
+        // File in a directory that doesn't match the expected partition column
+        let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
+        let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
+        let meta = ObjectMeta {
+            location: Path::from("bucket/mytable/wrong_col=2024-01/data.parquet"),
+            last_modified: chrono::Utc::now(),
+            size: 100,
+            e_tag: None,
+            version: None,
+        };
+
+        let result =
+            try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
+        assert!(
+            result.is_none(),
+            "Files with wrong partition column name should be skipped"
+        );
+    }
+
+    #[test]
+    fn test_try_into_partitioned_file_multiple_partitions() {
+        let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
+        let partition_cols = vec![
+            ("year".to_string(), DataType::Utf8),
+            ("month".to_string(), DataType::Utf8),
+        ];
+        let meta = ObjectMeta {
+            location: Path::from("bucket/mytable/year=2024/month=01/data.parquet"),
+            last_modified: chrono::Utc::now(),
+            size: 100,
+            e_tag: None,
+            version: None,
+        };
+
+        let result =
+            try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
+        assert!(result.is_some());
+        let pf = result.unwrap();
+        assert_eq!(pf.partition_values.len(), 2);
+        assert_eq!(
+            pf.partition_values[0],
+            ScalarValue::Utf8(Some("2024".to_string()))
+        );
+        assert_eq!(
+            pf.partition_values[1],
+            ScalarValue::Utf8(Some("01".to_string()))
+        );
+    }
+
+    #[test]
+    fn test_try_into_partitioned_file_partial_partition_skipped() {
+        // File has first partition but not second — should be skipped
+        let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
+        let partition_cols = vec![
+            ("year".to_string(), DataType::Utf8),
+            ("month".to_string(), DataType::Utf8),
+        ];
+        let meta = ObjectMeta {
+            location: Path::from("bucket/mytable/year=2024/data.parquet"),
+            last_modified: chrono::Utc::now(),
+            size: 100,
+            e_tag: None,
+            version: None,
+        };
+
+        let result =
+            try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
+        // File has year=2024 but no month= directory — parse_partitions_for_path
+        // returns None because the path component "data.parquet" doesn't match
+        // the expected "month=..." pattern.
+        assert!(
+            result.is_none(),
+            "Files with incomplete partition structure should be skipped"
+        );
+    }
+
     #[test]
     fn test_expr_applicable_for_cols() {
         assert!(expr_applicable_for_cols(
diff --git a/datafusion/sqllogictest/test_files/listing_table_partitions.slt b/datafusion/sqllogictest/test_files/listing_table_partitions.slt
index 52433429cfe80..5df78b674fe8d 100644
--- a/datafusion/sqllogictest/test_files/listing_table_partitions.slt
+++ b/datafusion/sqllogictest/test_files/listing_table_partitions.slt
@@ -73,3 +73,70 @@ foo
 
 statement count 0
 set datafusion.execution.listing_table_factory_infer_partitions = true;
+
+# Test: files outside partition structure are skipped
+# This simulates a table that transitioned from non-partitioned to
+# hive-partitioned storage, leaving a stale file in the root directory.
+
+# Create partitioned files first
+query I
+copy (values(1, 'alice'), (2, 'bob'))
+to 'test_files/scratch/listing_table_partitions/root_file_skipped/year=2024/data.parquet';
+----
+2
+
+query I
+copy (values(3, 'charlie'))
+to 'test_files/scratch/listing_table_partitions/root_file_skipped/year=2025/data.parquet';
+----
+1
+
+# Create the table before adding the stale root file, so partition
+# inference succeeds (it only runs at CREATE TABLE time).
+statement count 0
+create external table root_file_test
+stored as parquet
+location 'test_files/scratch/listing_table_partitions/root_file_skipped/';
+
+# Now add a stale root-level file (outside any partition directory).
+# This simulates a file left over from before partitioning was added.
+query I
+copy (values(99, 'stale'))
+to 'test_files/scratch/listing_table_partitions/root_file_skipped/stale.parquet';
+----
+1
+
+# The root file should be skipped — only partitioned files are included
+query IT
+select column1, column2 from root_file_test order by column1;
+----
+1 alice
+2 bob
+3 charlie
+
+# Partition column should be accessible
+query ITT
+select column1, column2, year from root_file_test order by column1;
+----
+1 alice 2024
+2 bob 2024
+3 charlie 2025
+
+# Partition filter should work
+query ITT
+select column1, column2, year from root_file_test where year = '2025';
+----
+3 charlie 2025
+
+# COUNT should not include the root file's rows
+query I
+select count(*) from root_file_test;
+----
+3
+
+# GROUP BY partition column should work
+query TI
+select year, count(*) from root_file_test group by year order by year;
+----
+2024 2
+2025 1