diff --git a/crates/integration_tests/tests/read_log_tables.rs b/crates/integration_tests/tests/read_tables.rs similarity index 77% rename from crates/integration_tests/tests/read_log_tables.rs rename to crates/integration_tests/tests/read_tables.rs index e7b42f7..69a05f7 100644 --- a/crates/integration_tests/tests/read_log_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -15,11 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Integration tests for reading Paimon log tables (system tables). -//! -//! Paimon log tables are system tables that contain metadata about the table, -//! such as snapshots, manifests, schemas, etc. They are stored as Parquet files -//! and can be read using the Arrow reader. +//! Integration tests for reading Paimon tables provisioned by Spark. use arrow_array::{Int32Array, StringArray}; use futures::TryStreamExt; @@ -31,30 +27,21 @@ fn get_test_warehouse() -> String { std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_| "/tmp/paimon-warehouse".to_string()) } -/// Test reading a table and verifying the data matches expected values. -/// -/// The table was populated with: (1, 'alice'), (2, 'bob'), (3, 'carol') -#[tokio::test] -async fn test_read_log_table() { +async fn read_rows(table_name: &str) -> Vec<(i32, String)> { let warehouse = get_test_warehouse(); let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create catalog"); - // Get the table - let identifier = Identifier::new("default", "simple_log_table"); - + let identifier = Identifier::new("default", table_name); let table = catalog .get_table(&identifier) .await .expect("Failed to get table"); - // Scan the table let read_builder = table.new_read_builder(); let read = read_builder.new_read().expect("Failed to create read"); let scan = read_builder.new_scan(); - let plan = scan.plan().await.expect("Failed to plan scan"); - // Read to Arrow let stream = read .to_arrow(plan.splits()) .expect("Failed to create arrow stream"); @@ -66,10 +53,9 @@ async fn test_read_log_table() { assert!( !batches.is_empty(), - "Expected at least one batch from table" + "Expected at least one batch from table {table_name}" ); - // Collect all rows as (id, name) tuples let mut actual_rows: Vec<(i32, String)> = Vec::new(); for batch in &batches { @@ -87,18 +73,39 @@ async fn test_read_log_table() { } } - // Expected data: (1, 'alice'), (2, 'bob'), (3, 'carol') + actual_rows.sort_by_key(|(id, _)| *id); + actual_rows +} + +#[tokio::test] +async fn test_read_log_table() { + let actual_rows = read_rows("simple_log_table").await; let expected_rows = vec![ (1, "alice".to_string()), (2, "bob".to_string()), (3, "carol".to_string()), ]; - // Sort for consistent comparison - actual_rows.sort_by_key(|(id, _)| *id); - assert_eq!( actual_rows, expected_rows, "Rows should match expected values" ); } + +#[tokio::test] +async fn test_read_dv_primary_key_table() { + let actual_rows = read_rows("simple_dv_pk_table").await; + let expected_rows = vec![ + (1, "alice-v2".to_string()), + (2, "bob-v2".to_string()), + (3, "carol-v2".to_string()), + (4, "dave-v2".to_string()), + (5, "eve-v2".to_string()), + (6, "frank-v1".to_string()), + ]; + + assert_eq!( + actual_rows, expected_rows, + "DV-enabled PK table should only expose the latest row per key" + ); +} diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index 209874f..7690e92 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -15,18 +15,20 @@ // specific language governing permissions and limitations // under the License. +use crate::deletion_vector::{DeletionVector, DeletionVectorFactory}; use crate::io::{FileIO, FileRead, FileStatus}; +use crate::spec::DataField; use crate::table::ArrowRecordBatchStream; use crate::{DataSplit, Error}; use async_stream::try_stream; use bytes::Bytes; use futures::future::BoxFuture; use futures::{StreamExt, TryFutureExt}; -use parquet::arrow::arrow_reader::ArrowReaderOptions; +use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, RowSelector}; use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch}; -use parquet::arrow::ParquetRecordBatchStreamBuilder; -use parquet::file::metadata::ParquetMetaData; +use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::file::metadata::ParquetMetaDataReader; +use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; use std::ops::Range; use std::sync::Arc; use tokio::try_join; @@ -46,11 +48,13 @@ impl ArrowReaderBuilder { } } - /// Build the ArrowReader. - pub fn build(self) -> ArrowReader { + /// Build the ArrowReader with the given read type (logical row type or projected subset). + /// Used to clip Parquet schema to requested columns only. + pub fn build(self, read_type: Vec) -> ArrowReader { ArrowReader { batch_size: self.batch_size, file_io: self.file_io, + read_type, } } } @@ -60,57 +64,176 @@ impl ArrowReaderBuilder { pub struct ArrowReader { batch_size: Option, file_io: FileIO, + read_type: Vec, } impl ArrowReader { /// Take a stream of DataSplits and read every data file in each split. /// Returns a stream of Arrow RecordBatches from all files. + /// When a split has deletion files (see [DataSplit::data_deletion_files]), the corresponding + /// deletion vectors are loaded and applied so that deleted rows are filtered out from the stream. + /// Row positions are 0-based within each data file, matching Java's ApplyDeletionVectorReader. + /// + /// Matches [RawFileSplitRead.createReader](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java): + /// one DV factory per DataSplit (created from that split's data files and deletion files). + /// + /// Parquet schema is clipped to this reader's read type (column names from [DataField]s). + /// File-only columns are not read. See [ParquetReaderFactory.clipParquetSchema](https://github.com/apache/paimon/blob/master/paimon-format/paimon-format-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java). pub fn read(self, data_splits: &[DataSplit]) -> crate::Result { let file_io = self.file_io.clone(); let batch_size = self.batch_size; - let paths_to_read: Vec = data_splits - .iter() - .flat_map(|ds| ds.data_file_entries().map(|(p, _)| p)) - .map(|p| { - if !p.to_ascii_lowercase().ends_with(".parquet") { - Err(Error::Unsupported { - message: format!( - "unsupported file format: only .parquet is supported, got: {p}" - ), - }) - } else { - Ok(p) - } - }) - .collect::, _>>()?; + // Owned list of splits so the stream does not hold references. + let splits: Vec = data_splits.to_vec(); + let read_type = self.read_type; + let projected_column_names = (!read_type.is_empty()).then(|| { + read_type + .iter() + .map(|field| field.name().to_string()) + .collect::>() + }); Ok(try_stream! { - for path_to_read in paths_to_read { - let parquet_file = file_io.new_input(&path_to_read)?; + for split in splits { + // Create DV factory for this split only (like Java createReader(partition, bucket, files, deletionFiles)). + let core_data_files = split.data_files(); + let dv_factory = if split + .data_deletion_files() + .is_some_and(|files| files.iter().any(Option::is_some)) + { + Some( + DeletionVectorFactory::new( + &file_io, + core_data_files, + split.data_deletion_files(), + ) + .await?, + ) + } else { + None + }; - let (parquet_metadata, parquet_reader) = try_join!( - parquet_file.metadata(), - parquet_file.reader() - )?; + for file_meta in core_data_files { + let path_to_read = split.data_file_path(file_meta); + if !path_to_read.to_ascii_lowercase().ends_with(".parquet") { + Err(Error::Unsupported { + message: format!( + "unsupported file format: only .parquet is supported, got: {path_to_read}" + ), + })? + } + let dv = dv_factory + .as_ref() + .and_then(|factory| factory.get_deletion_vector(&file_meta.file_name)); - let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader); + let parquet_file = file_io.new_input(&path_to_read)?; + let (parquet_metadata, parquet_reader) = try_join!( + parquet_file.metadata(), + parquet_file.reader() + )?; + let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader); - let mut batch_stream_builder = - ParquetRecordBatchStreamBuilder::new(arrow_file_reader) - .await?; + let mut batch_stream_builder = + ParquetRecordBatchStreamBuilder::new(arrow_file_reader) + .await?; + // Clip to read type columns; file-only columns are dropped. + if let Some(projected_column_names) = projected_column_names.as_ref() { + let parquet_schema = batch_stream_builder.parquet_schema(); + let mask = ProjectionMask::columns( + parquet_schema, + projected_column_names.iter().map(String::as_str), + ); + batch_stream_builder = batch_stream_builder.with_projection(mask); + } - if let Some(size) = batch_size { - batch_stream_builder = batch_stream_builder.with_batch_size(size); + if let Some(dv) = dv { + if !dv.is_empty() { + let row_selection = + build_deletes_row_selection(batch_stream_builder.metadata().row_groups(), dv)?; + batch_stream_builder = batch_stream_builder.with_row_selection(row_selection); + } + } + if let Some(size) = batch_size { + batch_stream_builder = batch_stream_builder.with_batch_size(size); + } + let mut batch_stream = batch_stream_builder.build()?; + + while let Some(batch) = batch_stream.next().await { + yield batch? + } } + } + } + .boxed()) + } +} + +/// Builds a Parquet [RowSelection] from deletion vector. +/// Only rows not in the deletion vector are selected; deleted rows are skipped at read time. +/// todo: Uses [DeletionVectorIterator] with [advance_to](DeletionVectorIterator::advance_to) when skipping row groups similar to iceberg-rust +fn build_deletes_row_selection( + row_group_metadata_list: &[RowGroupMetaData], + deletion_vector: &DeletionVector, +) -> crate::Result { + let mut delete_iter = deletion_vector.iter(); - let mut batch_stream = batch_stream_builder.build()?; + let mut results: Vec = Vec::new(); + let mut current_row_group_base_idx: u64 = 0; + let mut next_deleted_row_idx_opt = delete_iter.next(); - while let Some(batch) = batch_stream.next().await { - yield batch? + for row_group_metadata in row_group_metadata_list { + let row_group_num_rows = row_group_metadata.num_rows() as u64; + let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows; + + let mut next_deleted_row_idx = match next_deleted_row_idx_opt { + Some(next_deleted_row_idx) => { + if next_deleted_row_idx >= next_row_group_base_idx { + results.push(RowSelector::select(row_group_num_rows as usize)); + current_row_group_base_idx += row_group_num_rows; + continue; } + next_deleted_row_idx + } + None => { + results.push(RowSelector::select(row_group_num_rows as usize)); + current_row_group_base_idx += row_group_num_rows; + continue; + } + }; + + let mut current_idx = current_row_group_base_idx; + 'chunks: while next_deleted_row_idx < next_row_group_base_idx { + if current_idx < next_deleted_row_idx { + let run_length = next_deleted_row_idx - current_idx; + results.push(RowSelector::select(run_length as usize)); + current_idx += run_length; + } + let mut run_length = 0u64; + while next_deleted_row_idx == current_idx + && next_deleted_row_idx < next_row_group_base_idx + { + run_length += 1; + current_idx += 1; + next_deleted_row_idx_opt = delete_iter.next(); + next_deleted_row_idx = match next_deleted_row_idx_opt { + Some(v) => v, + None => { + results.push(RowSelector::skip(run_length as usize)); + break 'chunks; + } + }; + } + if run_length > 0 { + results.push(RowSelector::skip(run_length as usize)); } } - .boxed()) + if current_idx < next_row_group_base_idx { + results.push(RowSelector::select( + (next_row_group_base_idx - current_idx) as usize, + )); + } + current_row_group_base_idx += row_group_num_rows; } + + Ok(results.into()) } /// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader. diff --git a/crates/paimon/src/deletion_vector/core.rs b/crates/paimon/src/deletion_vector/core.rs index 94a3c18..6940608 100644 --- a/crates/paimon/src/deletion_vector/core.rs +++ b/crates/paimon/src/deletion_vector/core.rs @@ -62,6 +62,18 @@ impl DeletionVector { Ok(self.bitmap.contains(row_position as u32)) } + /// Returns an iterator over deleted positions that supports [DeletionVectorIterator::advance_to]. + /// Required for efficient row selection building when skipping row groups (avoid re-scanning + /// deletes in skipped ranges). + /// + /// Ideally we would wrap `roaring::RoaringBitmap::iter()` directly, but that iterator does not + /// expose `advance_to`. There is a PR open on roaring to add this + /// (); once merged we can simplify + /// by delegating `advance_to` to the underlying iterator. + pub fn iter(&self) -> DeletionVectorIterator { + DeletionVectorIterator::new(self.bitmap.iter().map(u64::from).collect()) + } + /// Get the number of deleted rows (cardinality) pub fn deleted_count(&self) -> u64 { self.bitmap.len() @@ -123,7 +135,6 @@ impl DeletionVector { // Read bitmap data (bitmapLength - 4 bytes, since magic is already included in bitmapLength) let bitmap_data_size = bitmap_length - MAGIC_NUMBER_SIZE_BYTES; - // 4(bitmap_length) + 4(magic_number) + bitmap_data_size + 4(crc) if bytes.len() < 8 + bitmap_data_size + 4 { return Err(crate::Error::DataInvalid { @@ -159,6 +170,40 @@ impl Default for DeletionVector { } } +/// Iterator over deleted row positions with [advance_to](DeletionVectorIterator::advance_to) support. +/// +/// See [DeletionVector::iter] for why we use an internal sorted vec instead of wrapping +/// `roaring::RoaringBitmap::iter()` (which does not provide `advance_to`). +#[derive(Debug)] +pub struct DeletionVectorIterator { + /// Sorted deleted positions (from bitmap.iter()). + positions: Vec, + cursor: usize, +} + +impl DeletionVectorIterator { + pub(crate) fn new(positions: Vec) -> Self { + Self { + positions, + cursor: 0, + } + } +} + +impl Iterator for DeletionVectorIterator { + type Item = u64; + + fn next(&mut self) -> Option { + if self.cursor < self.positions.len() { + let v = self.positions[self.cursor]; + self.cursor += 1; + Some(v) + } else { + None + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/paimon/src/deletion_vector/factory.rs b/crates/paimon/src/deletion_vector/factory.rs index 2537e82..e0a2d3b 100644 --- a/crates/paimon/src/deletion_vector/factory.rs +++ b/crates/paimon/src/deletion_vector/factory.rs @@ -19,6 +19,7 @@ use crate::deletion_vector::core::DeletionVector; use crate::io::{FileIO, FileRead}; +use crate::spec::DataFileMeta; use crate::Result; use std::collections::HashMap; use std::sync::Arc; @@ -39,23 +40,27 @@ impl DeletionVectorFactory { /// has a DeletionFile, reads path/offset/length and loads the DV. pub async fn new( file_io: &FileIO, - entries: Vec<(String, Option)>, + data_files: &[DataFileMeta], + data_deletion_files: Option<&[Option]>, ) -> Result { let mut deletion_vectors = HashMap::new(); - for (data_file_name, opt_df) in entries { - let df = match &opt_df { - Some(d) => d, - _ => continue, + let Some(data_deletion_files) = data_deletion_files else { + return Ok(DeletionVectorFactory { deletion_vectors }); + }; + + for (data_file, opt_df) in data_files.iter().zip(data_deletion_files.iter()) { + let Some(df) = opt_df.as_ref() else { + continue; }; let dv = Self::read(file_io, df).await?; - deletion_vectors.insert(data_file_name, Arc::new(dv)); + deletion_vectors.insert(data_file.file_name.clone(), Arc::new(dv)); } Ok(DeletionVectorFactory { deletion_vectors }) } - /// Get the deletion vector for a specific data file - pub fn get_deletion_vector(&self, data_file_name: &str) -> Option> { - self.deletion_vectors.get(data_file_name).cloned() + /// Get the deletion vector for a specific data file. + pub fn get_deletion_vector(&self, data_file_name: &str) -> Option<&Arc> { + self.deletion_vectors.get(data_file_name) } /// Read a single DeletionVector from storage using DeletionFile (path/offset/length). @@ -65,7 +70,10 @@ impl DeletionVectorFactory { let reader = input.reader().await?; let offset = df.offset() as u64; let len = df.length() as u64; - let bytes = reader.read(offset..offset.saturating_add(len)).await?; + let bytes = reader + // 4 bytes for bitmap length, 4 bytes for magic number + .read(offset..offset.saturating_add(len).saturating_add(8)) + .await?; DeletionVector::read_from_bytes(&bytes, Some(len)) } } diff --git a/crates/paimon/src/deletion_vector/mod.rs b/crates/paimon/src/deletion_vector/mod.rs index 4f7005d..4554987 100644 --- a/crates/paimon/src/deletion_vector/mod.rs +++ b/crates/paimon/src/deletion_vector/mod.rs @@ -16,4 +16,8 @@ // under the License. mod core; + +pub use core::DeletionVector; mod factory; + +pub use factory::DeletionVectorFactory; diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs new file mode 100644 index 0000000..484ab57 --- /dev/null +++ b/crates/paimon/src/spec/core_options.rs @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +const DELETION_VECTORS_ENABLED_OPTION: &str = "deletion-vectors.enabled"; +const SOURCE_SPLIT_TARGET_SIZE_OPTION: &str = "source.split.target-size"; +const SOURCE_SPLIT_OPEN_FILE_COST_OPTION: &str = "source.split.open-file-cost"; +const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024; +const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024; + +/// Typed accessors for common table options. +/// +/// This mirrors pypaimon's `CoreOptions` pattern while staying lightweight. +#[derive(Debug, Clone, Copy)] +pub struct CoreOptions<'a> { + options: &'a HashMap, +} + +impl<'a> CoreOptions<'a> { + pub fn new(options: &'a HashMap) -> Self { + Self { options } + } + + pub fn deletion_vectors_enabled(&self) -> bool { + self.options + .get(DELETION_VECTORS_ENABLED_OPTION) + .map(|value| matches!(value.to_ascii_lowercase().as_str(), "true")) + .unwrap_or(false) + } + + pub fn source_split_target_size(&self) -> i64 { + self.options + .get(SOURCE_SPLIT_TARGET_SIZE_OPTION) + .and_then(|value| parse_memory_size(value)) + .unwrap_or(DEFAULT_SOURCE_SPLIT_TARGET_SIZE) + } + + pub fn source_split_open_file_cost(&self) -> i64 { + self.options + .get(SOURCE_SPLIT_OPEN_FILE_COST_OPTION) + .and_then(|value| parse_memory_size(value)) + .unwrap_or(DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST) + } +} + +/// Parse a memory size string to bytes using binary (1024-based) semantics. +/// +/// Supports formats like `128 mb`, `128mb`, `4 gb`, `1024` (plain bytes). +/// Uses binary units: `kb` = 1024, `mb` = 1024², `gb` = 1024³, matching Java Paimon's `MemorySize`. +/// +/// NOTE: Java Paimon's `MemorySize` also accepts long unit names such as `bytes`, +/// `kibibytes`, `mebibytes`, `gibibytes`, and `tebibytes`. This implementation +/// only supports short units (`b`, `kb`, `mb`, `gb`, `tb`), which covers all practical usage. +fn parse_memory_size(value: &str) -> Option { + let value = value.trim(); + if value.is_empty() { + return None; + } + + let pos = value + .find(|c: char| !c.is_ascii_digit()) + .unwrap_or(value.len()); + let (num_str, unit_str) = value.split_at(pos); + let num: i64 = num_str.trim().parse().ok()?; + let multiplier = match unit_str.trim().to_ascii_lowercase().as_str() { + "" | "b" => 1, + "kb" | "k" => 1024, + "mb" | "m" => 1024 * 1024, + "gb" | "g" => 1024 * 1024 * 1024, + "tb" | "t" => 1024 * 1024 * 1024 * 1024, + _ => return None, + }; + Some(num * multiplier) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_source_split_defaults() { + let options = HashMap::new(); + let core_options = CoreOptions::new(&options); + + assert_eq!(core_options.source_split_target_size(), 128 * 1024 * 1024); + assert_eq!(core_options.source_split_open_file_cost(), 4 * 1024 * 1024); + } + + #[test] + fn test_source_split_custom_values() { + let options = HashMap::from([ + ( + SOURCE_SPLIT_TARGET_SIZE_OPTION.to_string(), + "256 mb".to_string(), + ), + ( + SOURCE_SPLIT_OPEN_FILE_COST_OPTION.to_string(), + "8 mb".to_string(), + ), + ]); + let core_options = CoreOptions::new(&options); + + assert_eq!(core_options.source_split_target_size(), 256 * 1024 * 1024); + assert_eq!(core_options.source_split_open_file_cost(), 8 * 1024 * 1024); + } + + #[test] + fn test_parse_memory_size() { + assert_eq!(parse_memory_size("1024"), Some(1024)); + assert_eq!(parse_memory_size("128 mb"), Some(128 * 1024 * 1024)); + assert_eq!(parse_memory_size("128mb"), Some(128 * 1024 * 1024)); + assert_eq!(parse_memory_size("4MB"), Some(4 * 1024 * 1024)); + assert_eq!(parse_memory_size("1 gb"), Some(1024 * 1024 * 1024)); + assert_eq!(parse_memory_size("1024 kb"), Some(1024 * 1024)); + assert_eq!(parse_memory_size("100 b"), Some(100)); + assert_eq!(parse_memory_size(""), None); + assert_eq!(parse_memory_size("abc"), None); + } +} diff --git a/crates/paimon/src/spec/data_file.rs b/crates/paimon/src/spec/data_file.rs index 1ea99fa..8931f8c 100644 --- a/crates/paimon/src/spec/data_file.rs +++ b/crates/paimon/src/spec/data_file.rs @@ -226,7 +226,7 @@ impl BinaryRow { /// Metadata of a data file. /// /// Impl References: -#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Eq, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct DataFileMeta { #[serde(rename = "_FILE_NAME")] diff --git a/crates/paimon/src/spec/manifest_entry.rs b/crates/paimon/src/spec/manifest_entry.rs index b86d7eb..397c7f4 100644 --- a/crates/paimon/src/spec/manifest_entry.rs +++ b/crates/paimon/src/spec/manifest_entry.rs @@ -84,7 +84,7 @@ impl ManifestEntry { &self.file.max_key } - fn identifier(&self) -> Identifier { + pub(crate) fn identifier(&self) -> Identifier { Identifier { partition: self.partition.clone(), bucket: self.bucket, diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs index b437d30..358e10b 100644 --- a/crates/paimon/src/spec/mod.rs +++ b/crates/paimon/src/spec/mod.rs @@ -22,6 +22,9 @@ mod data_file; pub use data_file::*; +mod core_options; +pub use core_options::*; + mod schema; pub use schema::*; diff --git a/crates/paimon/src/table/bin_pack.rs b/crates/paimon/src/table/bin_pack.rs index 885c961..2d64e6b 100644 --- a/crates/paimon/src/table/bin_pack.rs +++ b/crates/paimon/src/table/bin_pack.rs @@ -21,20 +21,6 @@ use crate::spec::DataFileMeta; use std::cmp; -use std::collections::HashMap; - -/// Option key for target size of a source split when scanning a bucket. -pub const SOURCE_SPLIT_TARGET_SIZE: &str = "source.split.target-size"; - -/// Option key for open file cost of a source file, used as the minimum weight per file -/// to avoid reading too many files with a source split. -pub const SOURCE_SPLIT_OPEN_FILE_COST: &str = "source.split.open-file-cost"; - -/// Default target split size: 128 MB. -const DEFAULT_TARGET_SPLIT_SIZE: i64 = 128 * 1024 * 1024; - -/// Default open file cost: 4 MB. -const DEFAULT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024; /// Greedy order-preserving bin packing. /// @@ -87,47 +73,6 @@ pub fn split_for_batch( pack_for_ordered(files, weight_func, target_split_size) } -/// Read split configuration from table options, returning `(target_split_size, open_file_cost)`. -pub fn read_split_config(options: &HashMap) -> (i64, i64) { - let target_size = options - .get(SOURCE_SPLIT_TARGET_SIZE) - .and_then(|v| parse_memory_size(v)) - .unwrap_or(DEFAULT_TARGET_SPLIT_SIZE); - let open_cost = options - .get(SOURCE_SPLIT_OPEN_FILE_COST) - .and_then(|v| parse_memory_size(v)) - .unwrap_or(DEFAULT_OPEN_FILE_COST); - (target_size, open_cost) -} - -/// Parse a memory size string to bytes using binary (1024-based) semantics. -/// -/// Supports formats like `"128 mb"`, `"128mb"`, `"4 gb"`, `"1024"` (plain bytes). -/// Uses binary units: `kb` = 1024, `mb` = 1024², `gb` = 1024³, matching Java Paimon's `MemorySize`. -/// -/// NOTE: Java Paimon's `MemorySize` also accepts long unit names such as `"bytes"`, -/// `"kibibytes"`, `"mebibytes"`, `"gibibytes"`, and `"tebibytes"`. This implementation -/// only supports short units (`b`, `kb`, `mb`, `gb`, `tb`), which covers all practical usage. -fn parse_memory_size(s: &str) -> Option { - let s = s.trim(); - if s.is_empty() { - return None; - } - // Split into numeric part and optional unit suffix. - let pos = s.find(|c: char| !c.is_ascii_digit()).unwrap_or(s.len()); - let (num_str, unit_str) = s.split_at(pos); - let num: i64 = num_str.trim().parse().ok()?; - let multiplier: i64 = match unit_str.trim().to_lowercase().as_str() { - "" | "b" => 1, - "kb" | "k" => 1024, - "mb" | "m" => 1024 * 1024, - "gb" | "g" => 1024 * 1024 * 1024, - "tb" | "t" => 1024 * 1024 * 1024 * 1024, - _ => return None, - }; - Some(num * multiplier) -} - #[cfg(test)] mod tests { use super::*; @@ -238,35 +183,4 @@ mod tests { let groups = split_for_batch(vec![], 128, 4); assert!(groups.is_empty()); } - - #[test] - fn test_parse_memory_size() { - assert_eq!(parse_memory_size("1024"), Some(1024)); - assert_eq!(parse_memory_size("128 mb"), Some(128 * 1024 * 1024)); - assert_eq!(parse_memory_size("128mb"), Some(128 * 1024 * 1024)); - assert_eq!(parse_memory_size("4MB"), Some(4 * 1024 * 1024)); - assert_eq!(parse_memory_size("1 gb"), Some(1024 * 1024 * 1024)); - assert_eq!(parse_memory_size("1024 kb"), Some(1024 * 1024)); - assert_eq!(parse_memory_size("100 b"), Some(100)); - assert_eq!(parse_memory_size(""), None); - assert_eq!(parse_memory_size("abc"), None); - } - - #[test] - fn test_read_split_config_defaults() { - let options = HashMap::new(); - let (target, open_cost) = read_split_config(&options); - assert_eq!(target, 128 * 1024 * 1024); - assert_eq!(open_cost, 4 * 1024 * 1024); - } - - #[test] - fn test_read_split_config_custom() { - let mut options = HashMap::new(); - options.insert(SOURCE_SPLIT_TARGET_SIZE.to_string(), "256 mb".to_string()); - options.insert(SOURCE_SPLIT_OPEN_FILE_COST.to_string(), "8 mb".to_string()); - let (target, open_cost) = read_split_config(&options); - assert_eq!(target, 256 * 1024 * 1024); - assert_eq!(open_cost, 8 * 1024 * 1024); - } } diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index 4054aa7..dded84c 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -22,7 +22,7 @@ use super::{ArrowRecordBatchStream, Table, TableScan}; use crate::arrow::ArrowReaderBuilder; -use crate::spec::DataField; +use crate::spec::{CoreOptions, DataField}; use crate::Result; use crate::{DataSplit, Error}; @@ -77,14 +77,19 @@ impl<'a> TableRead<'a> { /// Returns an [`ArrowRecordBatchStream`]. pub fn to_arrow(&self, data_splits: &[DataSplit]) -> crate::Result { // todo: consider get read batch size from table - if !self.table.schema.primary_keys().is_empty() { + let has_primary_keys = !self.table.schema.primary_keys().is_empty(); + let core_options = CoreOptions::new(self.table.schema.options()); + let deletion_vectors_enabled = core_options.deletion_vectors_enabled(); + + if has_primary_keys && !deletion_vectors_enabled { return Err(Error::Unsupported { message: format!( - "Reading tables with primary keys is not yet supported. Primary keys: {:?}", + "Reading primary-key tables without deletion vectors is not yet supported. Primary keys: {:?}", self.table.schema.primary_keys() ), }); } + if !self.table.schema.partition_keys().is_empty() { return Err(Error::Unsupported { message: format!( @@ -93,7 +98,8 @@ impl<'a> TableRead<'a> { ), }); } - let arrow_reader_builder = ArrowReaderBuilder::new(self.table.file_io.clone()).build(); - arrow_reader_builder.read(data_splits) + let reader = + ArrowReaderBuilder::new(self.table.file_io.clone()).build(self.read_type().to_vec()); + reader.read(data_splits) } } diff --git a/crates/paimon/src/table/source.rs b/crates/paimon/src/table/source.rs index ae08631..ca9ba9b 100644 --- a/crates/paimon/src/table/source.rs +++ b/crates/paimon/src/table/source.rs @@ -97,7 +97,7 @@ impl PartitionBucket { /// Input split for reading: partition + bucket + list of data files and optional deletion files. /// /// Reference: [org.apache.paimon.table.source.DataSplit](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java) -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DataSplit { snapshot_id: i64, partition: BinaryRow, @@ -274,6 +274,18 @@ impl DataSplitBuilder { source: None, }); } + if let Some(ref data_deletion_files) = self.data_deletion_files { + if data_deletion_files.len() != data_files.len() { + return Err(crate::Error::UnexpectedError { + message: format!( + "DataSplit deletion files length {} must match data_files length {}", + data_deletion_files.len(), + data_files.len() + ), + source: None, + }); + } + } Ok(DataSplit { snapshot_id: self.snapshot_id, partition, diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index f702f79..5d5bc4a 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -22,8 +22,8 @@ use super::Table; use crate::io::FileIO; -use crate::spec::{BinaryRow, FileKind, IndexManifest, ManifestEntry, Snapshot}; -use crate::table::bin_pack::{read_split_config, split_for_batch}; +use crate::spec::{BinaryRow, CoreOptions, FileKind, IndexManifest, ManifestEntry, Snapshot}; +use crate::table::bin_pack::split_for_batch; use crate::table::source::{DataSplitBuilder, DeletionFile, PartitionBucket, Plan}; use crate::table::SnapshotManager; use crate::Error; @@ -76,6 +76,16 @@ async fn read_all_manifest_entries( Ok(all_entries) } +fn filter_manifest_entries( + entries: Vec, + deletion_vectors_enabled: bool, +) -> Vec { + entries + .into_iter() + .filter(|entry| !(deletion_vectors_enabled && entry.file().level == 0)) + .collect() +} + /// Builds a map from (partition, bucket) to (data_file_name -> DeletionFile) from index manifest entries. /// Only considers ADD entries with index_type "DELETION_VECTORS" and their deletion_vectors_ranges. fn build_deletion_files_map( @@ -116,33 +126,26 @@ fn build_deletion_files_map( map } -/// Merges add/delete manifest entries: keeps only ADD entries whose (partition, bucket, file_name) is not in DELETE set. +/// Merges add/delete manifest entries following pypaimon's `adds - deletes` behavior. +/// +/// The identifier must be rich enough to match Paimon's file identity, otherwise a delete +/// for one file version can incorrectly remove another with the same file name. fn merge_manifest_entries(entries: Vec) -> Vec { - let mut deleted = HashSet::new(); - let mut added = Vec::new(); - for e in entries { - // follow python code to use partition, bucket, filename as duplicator - let key = ( - e.partition().to_vec(), - e.bucket(), - e.file().file_name.clone(), - ); - match e.kind() { - FileKind::Add => added.push(e), + let mut deleted_entry_keys = HashSet::new(); + let mut added_entries = Vec::new(); + + for entry in entries { + match entry.kind() { + FileKind::Add => added_entries.push(entry), FileKind::Delete => { - deleted.insert(key); + deleted_entry_keys.insert(entry.identifier()); } } } - added + + added_entries .into_iter() - .filter(|e| { - !deleted.contains(&( - e.partition().to_vec(), - e.bucket(), - e.file().file_name.clone(), - )) - }) + .filter(|entry| !deleted_entry_keys.contains(&entry.identifier())) .collect() } @@ -169,13 +172,17 @@ impl<'a> TableScan<'a> { Some(s) => s, None => return Ok(Plan::new(Vec::new())), }; - let (target_split_size, open_file_cost) = read_split_config(self.table.schema().options()); + let core_options = CoreOptions::new(self.table.schema().options()); + let target_split_size = core_options.source_split_target_size(); + let open_file_cost = core_options.source_split_open_file_cost(); + let deletion_vectors_enabled = core_options.deletion_vectors_enabled(); Self::plan_snapshot( snapshot, file_io, table_path, target_split_size, open_file_cost, + deletion_vectors_enabled, ) .await } @@ -186,8 +193,10 @@ impl<'a> TableScan<'a> { table_path: &str, target_split_size: i64, open_file_cost: i64, + deletion_vectors_enabled: bool, ) -> crate::Result { let entries = read_all_manifest_entries(file_io, table_path, &snapshot).await?; + let entries = filter_manifest_entries(entries, deletion_vectors_enabled); let entries = merge_manifest_entries(entries); if entries.is_empty() { return Ok(Plan::new(Vec::new())); diff --git a/dev/spark/provision.py b/dev/spark/provision.py index 435a5b2..21a7a52 100644 --- a/dev/spark/provision.py +++ b/dev/spark/provision.py @@ -22,6 +22,7 @@ from pyspark.sql import SparkSession + def main(): spark = SparkSession.builder.getOrCreate() @@ -29,13 +30,86 @@ def main(): spark.sql("USE paimon.default") # Table: simple log table for read tests - spark.sql(""" + spark.sql( + """ CREATE TABLE IF NOT EXISTS simple_log_table ( id INT, name STRING ) USING paimon - """) + """ + ) spark.sql("INSERT INTO simple_log_table VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')") + # Spark SQL here does not accept table constraints like + # PRIMARY KEY (id) NOT ENFORCED inside the column list, so use + # Paimon table properties instead. + spark.sql( + """ + CREATE TABLE IF NOT EXISTS simple_pk_table ( + id INT, + name STRING + ) USING paimon + TBLPROPERTIES ( + 'primary-key' = 'id', + 'bucket' = '1' + ) + """ + ) + spark.sql( + """ + INSERT INTO simple_pk_table VALUES + (1, 'alice'), + (2, 'bob'), + (3, 'carol') + """ + ) + + # Table: primary key table with deletion vectors enabled. + # Re-inserting the same keys with newer values creates deleted historical + # rows that readers must filter via deletion vectors. + spark.sql( + """ + CREATE TABLE IF NOT EXISTS simple_dv_pk_table ( + id INT, + name STRING + ) USING paimon + TBLPROPERTIES ( + 'primary-key' = 'id', + 'bucket' = '2', + 'deletion-vectors.enabled' = 'true' + ) + """ + ) + + spark.sql( + """ + INSERT INTO simple_dv_pk_table VALUES + (1, 'alice-v1'), + (2, 'bob-v1'), + (3, 'carol-v1'), + (5, 'eve-v1') + """ + ) + + spark.sql( + """ + INSERT INTO simple_dv_pk_table VALUES + (2, 'bob-v2'), + (3, 'carol-v2'), + (4, 'dave-v1'), + (6, 'frank-v1') + """ + ) + + spark.sql( + """ + INSERT INTO simple_dv_pk_table VALUES + (1, 'alice-v2'), + (4, 'dave-v2'), + (5, 'eve-v2') + """ + ) + + if __name__ == "__main__": main()