@@ -15,11 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Integration tests for reading Paimon log tables (system tables).
//!
//! Paimon log tables are system tables that contain metadata about the table,
//! such as snapshots, manifests, schemas, etc. They are stored as Parquet files
//! and can be read using the Arrow reader.
//! Integration tests for reading Paimon tables provisioned by Spark.

use arrow_array::{Int32Array, StringArray};
use futures::TryStreamExt;
@@ -31,30 +27,21 @@ fn get_test_warehouse() -> String {
std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_| "/tmp/paimon-warehouse".to_string())
}

/// Test reading a table and verifying the data matches expected values.
///
/// The table was populated with: (1, 'alice'), (2, 'bob'), (3, 'carol')
#[tokio::test]
async fn test_read_log_table() {
async fn read_rows(table_name: &str) -> Vec<(i32, String)> {
let warehouse = get_test_warehouse();
let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create catalog");

// Get the table
let identifier = Identifier::new("default", "simple_log_table");

let identifier = Identifier::new("default", table_name);
let table = catalog
.get_table(&identifier)
.await
.expect("Failed to get table");

// Scan the table
let read_builder = table.new_read_builder();
let read = read_builder.new_read().expect("Failed to create read");
let scan = read_builder.new_scan();

let plan = scan.plan().await.expect("Failed to plan scan");

// Read to Arrow
let stream = read
.to_arrow(plan.splits())
.expect("Failed to create arrow stream");
@@ -66,10 +53,9 @@ async fn test_read_log_table() {

assert!(
!batches.is_empty(),
"Expected at least one batch from table"
"Expected at least one batch from table {table_name}"
);

// Collect all rows as (id, name) tuples
let mut actual_rows: Vec<(i32, String)> = Vec::new();

for batch in &batches {
@@ -87,18 +73,39 @@
}
}

// Expected data: (1, 'alice'), (2, 'bob'), (3, 'carol')
actual_rows.sort_by_key(|(id, _)| *id);
actual_rows
}

#[tokio::test]
async fn test_read_log_table() {
let actual_rows = read_rows("simple_log_table").await;
let expected_rows = vec![
(1, "alice".to_string()),
(2, "bob".to_string()),
(3, "carol".to_string()),
];

// Sort for consistent comparison
actual_rows.sort_by_key(|(id, _)| *id);

assert_eq!(
actual_rows, expected_rows,
"Rows should match expected values"
);
}

#[tokio::test]
async fn test_read_dv_primary_key_table() {
let actual_rows = read_rows("simple_dv_pk_table").await;
let expected_rows = vec![
(1, "alice-v2".to_string()),
(2, "bob-v2".to_string()),
(3, "carol-v2".to_string()),
(4, "dave-v2".to_string()),
(5, "eve-v2".to_string()),
(6, "frank-v1".to_string()),
];

assert_eq!(
actual_rows, expected_rows,
"DV-enabled PK table should only expose the latest row per key"
);
}
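
The DV expectation above is simply "last write wins per primary key". A small self-contained sketch of that merge rule, with a made-up upsert order (the actual Spark provisioning script is not shown in this diff):

```rust
use std::collections::BTreeMap;

fn main() {
    // Hypothetical upsert order: v1 rows for keys 1-6, then v2 rewrites for keys 1-5.
    let upserts = [
        (1, "alice-v1"), (2, "bob-v1"), (3, "carol-v1"),
        (4, "dave-v1"), (5, "eve-v1"), (6, "frank-v1"),
        (1, "alice-v2"), (2, "bob-v2"), (3, "carol-v2"),
        (4, "dave-v2"), (5, "eve-v2"),
    ];

    // Last write wins per key, which is what the DV-enabled PK table exposes on read.
    let mut latest: BTreeMap<i32, &str> = BTreeMap::new();
    for (id, name) in upserts {
        latest.insert(id, name);
    }

    let rows: Vec<(i32, String)> = latest
        .into_iter()
        .map(|(id, name)| (id, name.to_string()))
        .collect();
    assert_eq!(rows.len(), 6);
    assert_eq!(rows.last(), Some(&(6, "frank-v1".to_string()))); // key 6 was never rewritten
}
```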
198 changes: 162 additions & 36 deletions crates/paimon/src/arrow/reader.rs
@@ -15,18 +15,20 @@
// specific language governing permissions and limitations
// under the License.

use crate::deletion_vector::{DeletionVector, DeletionVectorFactory};
use crate::io::{FileIO, FileRead, FileStatus};
use crate::spec::DataField;
use crate::table::ArrowRecordBatchStream;
use crate::{DataSplit, Error};
use async_stream::try_stream;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{StreamExt, TryFutureExt};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, RowSelector};
use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch};
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::file::metadata::ParquetMetaData;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::ParquetMetaDataReader;
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use std::ops::Range;
use std::sync::Arc;
use tokio::try_join;
@@ -46,11 +48,13 @@ impl ArrowReaderBuilder {
}
}

/// Build the ArrowReader.
pub fn build(self) -> ArrowReader {
/// Build the ArrowReader with the given read type (logical row type or projected subset).
/// Used to clip the Parquet schema to the requested columns only.
pub fn build(self, read_type: Vec<DataField>) -> ArrowReader {
ArrowReader {
batch_size: self.batch_size,
file_io: self.file_io,
read_type,
}
}
}
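
For reference, a minimal sketch of driving this builder directly. `ArrowReaderBuilder::new` and the stream's error type are assumptions for illustration; the supported path goes through `Table::new_read_builder()`, as in the integration tests above.

```rust
// Hypothetical wiring of ArrowReaderBuilder; the constructor name and read_type
// contents are illustrative assumptions, not confirmed API. Types (FileIO, DataSplit,
// DataField) come from the surrounding crate modules.
use futures::TryStreamExt;

async fn read_projected(
    file_io: FileIO,
    splits: &[DataSplit],
    read_type: Vec<DataField>, // e.g. just the `id` and `name` fields of the table schema
) -> crate::Result<Vec<arrow_array::RecordBatch>> {
    let reader = ArrowReaderBuilder::new(file_io) // assumed constructor
        .build(read_type); // clips the Parquet schema to the requested columns
    let stream = reader.read(splits)?;
    // Collect every batch; rows covered by deletion vectors are already filtered out.
    stream.try_collect().await
}
```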
@@ -60,57 +64,179 @@ impl ArrowReaderBuilder {
pub struct ArrowReader {
batch_size: Option<usize>,
file_io: FileIO,
read_type: Vec<DataField>,
}

impl ArrowReader {
/// Take a slice of [DataSplit]s and read every data file in each split.
/// Returns a stream of Arrow RecordBatches from all files.
/// When a split has deletion files (see [DataSplit::data_deletion_files]), the corresponding
/// deletion vectors are loaded and applied so that deleted rows are filtered out from the stream.
/// Row positions are 0-based within each data file, matching Java's ApplyDeletionVectorReader.
///
/// Matches [RawFileSplitRead.createReader](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java):
/// one DV factory per DataSplit (created from that split's data files and deletion files).
///
/// Parquet schema is clipped to this reader's read type (column names from [DataField]s).
/// File-only columns are not read. See [ParquetReaderFactory.clipParquetSchema](https://github.com/apache/paimon/blob/master/paimon-format/paimon-format-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java).
pub fn read(self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> {
let file_io = self.file_io.clone();
let batch_size = self.batch_size;
let paths_to_read: Vec<String> = data_splits
.iter()
.flat_map(|ds| ds.data_file_entries().map(|(p, _)| p))
.map(|p| {
if !p.to_ascii_lowercase().ends_with(".parquet") {
Err(Error::Unsupported {
message: format!(
"unsupported file format: only .parquet is supported, got: {p}"
),
})
} else {
Ok(p)
}
})
.collect::<Result<Vec<_>, _>>()?;
// Owned list of splits so the stream does not hold references.
let splits: Vec<DataSplit> = data_splits.to_vec();
let read_type = self.read_type;
let projected_column_names = (!read_type.is_empty()).then(|| {
read_type
.iter()
.map(|field| field.name().to_string())
.collect::<Vec<_>>()
});
Ok(try_stream! {
for path_to_read in paths_to_read {
let parquet_file = file_io.new_input(&path_to_read)?;
for split in splits {
// Create DV factory for this split only (like Java createReader(partition, bucket, files, deletionFiles)).
let core_data_files = split.data_files();
let dv_factory = if split
.data_deletion_files()
.is_some_and(|files| files.iter().any(Option::is_some))
{
Some(
DeletionVectorFactory::new(
&file_io,
core_data_files,
split.data_deletion_files(),
)
.await?,
)
} else {
None
};

let (parquet_metadata, parquet_reader) = try_join!(
parquet_file.metadata(),
parquet_file.reader()
)?;
for file_meta in core_data_files {
let path_to_read = split.data_file_path(file_meta);
if !path_to_read.to_ascii_lowercase().ends_with(".parquet") {
Err(Error::Unsupported {
message: format!(
"unsupported file format: only .parquet is supported, got: {path_to_read}"
),
})?
}
let dv = dv_factory
.as_ref()
.and_then(|factory| factory.get_deletion_vector(&file_meta.file_name));

let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
let parquet_file = file_io.new_input(&path_to_read)?;
let (parquet_metadata, parquet_reader) = try_join!(
parquet_file.metadata(),
parquet_file.reader()
)?;
let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

let mut batch_stream_builder =
ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
.await?;
let mut batch_stream_builder =
ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
.await?;
// Clip to read type columns; file-only columns are dropped.
if let Some(projected_column_names) = projected_column_names.as_ref() {
let parquet_schema = batch_stream_builder.parquet_schema();
let mask = ProjectionMask::columns(
parquet_schema,
projected_column_names.iter().map(String::as_str),
);
batch_stream_builder = batch_stream_builder.with_projection(mask);
}

if let Some(size) = batch_size {
batch_stream_builder = batch_stream_builder.with_batch_size(size);
if let Some(dv) = dv {
if !dv.is_empty() {
let row_selection =
build_deletes_row_selection(batch_stream_builder.metadata().row_groups(), dv)?;
batch_stream_builder = batch_stream_builder.with_row_selection(row_selection);
}
}
if let Some(size) = batch_size {
batch_stream_builder = batch_stream_builder.with_batch_size(size);
}
let mut batch_stream = batch_stream_builder.build()?;

while let Some(batch) = batch_stream.next().await {
let batch = batch?;
if batch.num_rows() > 0 {
yield batch;
}
}
}
}
}
.boxed())
}
}
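
The clipping above delegates to parquet's `ProjectionMask::columns`. A standalone sketch of that call, using a made-up three-column file schema, shows how file-only columns fall out of the mask:

```rust
use std::sync::Arc;

use parquet::arrow::ProjectionMask;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A file schema with three leaf columns; the read type only asks for `id` and `name`.
    let message_type = parse_message_type(
        "message schema {
            required int32 id;
            required binary name (UTF8);
            required int64 file_only_col;
        }",
    )?;
    let schema = SchemaDescriptor::new(Arc::new(message_type));

    // Same call the reader makes: columns absent from the read type stay unread.
    let mask = ProjectionMask::columns(&schema, ["id", "name"]);
    assert!(mask.leaf_included(0)); // id
    assert!(mask.leaf_included(1)); // name
    assert!(!mask.leaf_included(2)); // file_only_col is dropped
    Ok(())
}
```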

/// Builds a Parquet [RowSelection] from a deletion vector.
/// Only rows not in the deletion vector are selected; deleted rows are skipped at read time.
/// Walks the deletion vector's iterator in step with the row-group boundaries, emitting select/skip runs.
fn build_deletes_row_selection(
row_group_metadata_list: &[RowGroupMetaData],
deletion_vector: &DeletionVector,
) -> crate::Result<RowSelection> {
let mut delete_iter = deletion_vector.iter();

let mut batch_stream = batch_stream_builder.build()?;
let mut results: Vec<RowSelector> = Vec::new();
let mut current_row_group_base_idx: u64 = 0;
let mut next_deleted_row_idx_opt = delete_iter.next();

while let Some(batch) = batch_stream.next().await {
yield batch?
for row_group_metadata in row_group_metadata_list {
let row_group_num_rows = row_group_metadata.num_rows() as u64;
let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;

let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
Some(next_deleted_row_idx) => {
if next_deleted_row_idx >= next_row_group_base_idx {
results.push(RowSelector::select(row_group_num_rows as usize));
current_row_group_base_idx += row_group_num_rows;
continue;
}
next_deleted_row_idx
}
None => {
results.push(RowSelector::select(row_group_num_rows as usize));
current_row_group_base_idx += row_group_num_rows;
continue;
}
};

let mut current_idx = current_row_group_base_idx;
'chunks: while next_deleted_row_idx < next_row_group_base_idx {
if current_idx < next_deleted_row_idx {
let run_length = next_deleted_row_idx - current_idx;
results.push(RowSelector::select(run_length as usize));
current_idx += run_length;
}
let mut run_length = 0u64;
while next_deleted_row_idx == current_idx
&& next_deleted_row_idx < next_row_group_base_idx
{
run_length += 1;
current_idx += 1;
next_deleted_row_idx_opt = delete_iter.next();
next_deleted_row_idx = match next_deleted_row_idx_opt {
Some(v) => v,
None => {
results.push(RowSelector::skip(run_length as usize));
break 'chunks;
}
};
}
if run_length > 0 {
results.push(RowSelector::skip(run_length as usize));
}
}
.boxed())
if current_idx < next_row_group_base_idx {
results.push(RowSelector::select(
(next_row_group_base_idx - current_idx) as usize,
));
}
current_row_group_base_idx += row_group_num_rows;
}

Ok(results.into())
}
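
A worked example of the selection this routine produces, assuming two 4-row row groups and a deletion vector marking file positions 1, 5, and 6 (numbers invented for illustration):

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // Two row groups of 4 rows each; deletes at 0-based file positions 1, 5, 6.
    // Walking the groups as above yields alternating select/skip runs:
    let selectors = vec![
        RowSelector::select(1), // row 0
        RowSelector::skip(1),   // row 1 (deleted)
        RowSelector::select(2), // rows 2-3
        RowSelector::select(1), // row 4
        RowSelector::skip(2),   // rows 5-6 (deleted)
        RowSelector::select(1), // row 7
    ];

    // 5 of the 8 rows survive the deletes.
    let surviving: usize = selectors
        .iter()
        .filter(|s| !s.skip)
        .map(|s| s.row_count)
        .sum();
    assert_eq!(surviving, 5);

    // Adjacent runs of the same kind may be coalesced when converted into a RowSelection
    // (rows 2-3 and row 4 become a single select(3)).
    let _selection: RowSelection = selectors.into();
}
```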

/// ArrowFileReader is a wrapper around a FileRead that implements parquet's AsyncFileReader.