Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/catalog-listing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ log = { workspace = true }
object_store = { workspace = true }

[dev-dependencies]
chrono = { workspace = true }
datafusion-datasource-parquet = { workspace = true }

# Note: add additional linter rules in lib.rs.
Expand Down
150 changes: 142 additions & 8 deletions datafusion/catalog-listing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,17 +340,25 @@ fn filter_partitions(
Ok(None)
}

/// Returns `Ok(None)` when the file is not inside a valid partition path
/// (e.g. a stale file in the table root directory). Such files are skipped
/// because hive-style partition values are never null and there is no valid
/// value to assign for non-partitioned files.
fn try_into_partitioned_file(
object_meta: ObjectMeta,
partition_cols: &[(String, DataType)],
table_path: &ListingTableUrl,
) -> Result<PartitionedFile> {
) -> Result<Option<PartitionedFile>> {
let cols = partition_cols.iter().map(|(name, _)| name.as_str());
let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);

let Some(parsed) = parsed else {
// parse_partitions_for_path already logs a debug message
return Ok(None);
};

let partition_values = parsed
.into_iter()
.flatten()
.zip(partition_cols)
.map(|(parsed, (_, datatype))| {
ScalarValue::try_from_string(parsed.to_string(), datatype)
Expand All @@ -360,7 +368,7 @@ fn try_into_partitioned_file(
let mut pf: PartitionedFile = object_meta.into();
pf.partition_values = partition_values;

Ok(pf)
Ok(Some(pf))
}

/// Discover the partitions on the given path and prune out files
Expand Down Expand Up @@ -405,13 +413,15 @@ pub async fn pruned_partition_list<'a>(
)?;

Ok(objects
.map_ok(|object_meta| {
try_into_partitioned_file(object_meta, partition_cols, table_path)
.try_filter_map(|object_meta| {
futures::future::ready(try_into_partitioned_file(
object_meta,
partition_cols,
table_path,
))
})
.try_filter_map(move |pf| {
futures::future::ready(
pf.and_then(|pf| filter_partitions(pf, filters, &df_schema)),
)
futures::future::ready(filter_partitions(pf, filters, &df_schema))
})
.boxed())
}
Expand Down Expand Up @@ -574,6 +584,130 @@ mod tests {
);
}

#[test]
fn test_try_into_partitioned_file_valid_partition() {
    // A file living under `year_month=2024-01/` must be kept and carry
    // the parsed partition value for the single partition column.
    let url = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
    let cols = [("year_month".to_string(), DataType::Utf8)];
    let object_meta = ObjectMeta {
        location: Path::from("bucket/mytable/year_month=2024-01/data.parquet"),
        last_modified: chrono::Utc::now(),
        size: 100,
        e_tag: None,
        version: None,
    };

    let pf = try_into_partitioned_file(object_meta, &cols, &url)
        .unwrap()
        .expect("file inside a valid partition path should be kept");
    // One value, parsed from the directory name.
    assert_eq!(
        pf.partition_values,
        vec![ScalarValue::Utf8(Some("2024-01".to_string()))]
    );
}

#[test]
fn test_try_into_partitioned_file_root_file_skipped() {
    // A stale file sitting directly in the table root — e.g. left over
    // from before hive partitioning was introduced — has no partition
    // values, so it must be skipped rather than surfaced.
    let url = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
    let cols = [("year_month".to_string(), DataType::Utf8)];
    let object_meta = ObjectMeta {
        location: Path::from("bucket/mytable/data.parquet"),
        last_modified: chrono::Utc::now(),
        size: 100,
        e_tag: None,
        version: None,
    };

    let skipped = try_into_partitioned_file(object_meta, &cols, &url).unwrap();
    assert!(
        skipped.is_none(),
        "Files outside partition structure should be skipped"
    );
}

#[test]
fn test_try_into_partitioned_file_wrong_partition_name() {
    // The directory name does not match the configured partition column
    // (`wrong_col=` vs the expected `year_month=`), so the file is dropped.
    let url = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
    let cols = [("year_month".to_string(), DataType::Utf8)];
    let object_meta = ObjectMeta {
        location: Path::from("bucket/mytable/wrong_col=2024-01/data.parquet"),
        last_modified: chrono::Utc::now(),
        size: 100,
        e_tag: None,
        version: None,
    };

    let skipped = try_into_partitioned_file(object_meta, &cols, &url).unwrap();
    assert!(
        skipped.is_none(),
        "Files with wrong partition column name should be skipped"
    );
}

#[test]
fn test_try_into_partitioned_file_multiple_partitions() {
    // Two-level hive layout: both partition values must be parsed, in
    // the same order as the configured partition columns.
    let url = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
    let cols = [
        ("year".to_string(), DataType::Utf8),
        ("month".to_string(), DataType::Utf8),
    ];
    let object_meta = ObjectMeta {
        location: Path::from("bucket/mytable/year=2024/month=01/data.parquet"),
        last_modified: chrono::Utc::now(),
        size: 100,
        e_tag: None,
        version: None,
    };

    let pf = try_into_partitioned_file(object_meta, &cols, &url)
        .unwrap()
        .expect("fully partitioned file should be kept");
    assert_eq!(
        pf.partition_values,
        vec![
            ScalarValue::Utf8(Some("2024".to_string())),
            ScalarValue::Utf8(Some("01".to_string())),
        ]
    );
}

#[test]
fn test_try_into_partitioned_file_partial_partition_skipped() {
    // The path provides `year=2024` but the next component is the data
    // file itself instead of a `month=...` directory, so
    // parse_partitions_for_path yields None and the file is skipped.
    let url = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
    let cols = [
        ("year".to_string(), DataType::Utf8),
        ("month".to_string(), DataType::Utf8),
    ];
    let object_meta = ObjectMeta {
        location: Path::from("bucket/mytable/year=2024/data.parquet"),
        last_modified: chrono::Utc::now(),
        size: 100,
        e_tag: None,
        version: None,
    };

    let skipped = try_into_partitioned_file(object_meta, &cols, &url).unwrap();
    assert!(
        skipped.is_none(),
        "Files with incomplete partition structure should be skipped"
    );
}

#[test]
fn test_expr_applicable_for_cols() {
assert!(expr_applicable_for_cols(
Expand Down
67 changes: 67 additions & 0 deletions datafusion/sqllogictest/test_files/listing_table_partitions.slt
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,70 @@ foo

statement count 0
set datafusion.execution.listing_table_factory_infer_partitions = true;

# Test: files outside partition structure are skipped
Comment thread
zhuqi-lucas marked this conversation as resolved.
# This simulates a table that transitioned from non-partitioned to
# hive-partitioned storage, leaving a stale file in the root directory.

# Create partitioned files first
query I
copy (values(1, 'alice'), (2, 'bob'))
to 'test_files/scratch/listing_table_partitions/root_file_skipped/year=2024/data.parquet';
----
2

query I
copy (values(3, 'charlie'))
to 'test_files/scratch/listing_table_partitions/root_file_skipped/year=2025/data.parquet';
----
1

# Create the table before adding the stale root file, so partition
# inference succeeds (it only runs at CREATE TABLE time).
statement count 0
create external table root_file_test
stored as parquet
location 'test_files/scratch/listing_table_partitions/root_file_skipped/';

# Now add a stale root-level file (outside any partition directory).
# This simulates a file left over from before partitioning was added.
query I
copy (values(99, 'stale'))
to 'test_files/scratch/listing_table_partitions/root_file_skipped/stale.parquet';
----
1

# The root file should be skipped — only partitioned files are included
query IT
select column1, column2 from root_file_test order by column1;
----
1 alice
2 bob
3 charlie

# Partition column should be accessible
query ITT
select column1, column2, year from root_file_test order by column1;
----
1 alice 2024
2 bob 2024
3 charlie 2025

# Partition filter should work
query ITT
select column1, column2, year from root_file_test where year = '2025';
----
3 charlie 2025

# COUNT should not include the root file's rows
query I
select count(*) from root_file_test;
----
3

# GROUP BY partition column should work
query TI
select year, count(*) from root_file_test group by year order by year;
----
2024 2
2025 1
Loading