diff --git a/datafusion/physical-plan/src/sorts/builder.rs b/datafusion/physical-plan/src/sorts/builder.rs index 73386212a2a90..75eb2ff980325 100644 --- a/datafusion/physical-plan/src/sorts/builder.rs +++ b/datafusion/physical-plan/src/sorts/builder.rs @@ -24,8 +24,6 @@ use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, Result}; use datafusion_execution::memory_pool::MemoryReservation; use log::warn; -use std::any::Any; -use std::panic::{AssertUnwindSafe, catch_unwind}; use std::sync::Arc; #[derive(Debug, Copy, Clone, Default)] @@ -143,7 +141,9 @@ impl BatchBuilder { .iter() .map(|(_, batch)| batch.column(column_idx).as_ref()) .collect(); - recover_offset_overflow_from_panic(|| interleave(&arrays, indices)) + // Arrow 58.1.0+ returns OffsetOverflowError directly from + // interleave, allowing retry_interleave to shrink the batch. + interleave(&arrays, indices).map_err(Into::into) }) .collect::>>() } @@ -243,33 +243,11 @@ fn is_offset_overflow(e: &DataFusionError) -> bool { ) } +#[cfg(test)] fn offset_overflow_error() -> DataFusionError { DataFusionError::ArrowError(Box::new(ArrowError::OffsetOverflowError(0)), None) } -fn recover_offset_overflow_from_panic(f: F) -> Result -where - F: FnOnce() -> std::result::Result, -{ - // Arrow's interleave can panic on i32 offset overflow with - // `.expect("overflow")` / `.expect("offset overflow")`. - // Catch only those specific panics so the caller can retry - // with fewer rows while unrelated defects still unwind. - // - // TODO: remove once arrow-rs#9549 lands — interleave will return - // OffsetOverflowError directly instead of panicking. - match catch_unwind(AssertUnwindSafe(f)) { - Ok(result) => Ok(result?), - Err(panic_payload) => { - if is_arrow_offset_overflow_panic(panic_payload.as_ref()) { - Err(offset_overflow_error()) - } else { - std::panic::resume_unwind(panic_payload); - } - } - } -} - fn retry_interleave( mut rows_to_emit: usize, total_rows: usize, @@ -281,6 +259,7 @@ where loop { match interleave(rows_to_emit) { Ok(value) => return Ok((rows_to_emit, value)), + // Only offset overflow is recoverable by emitting fewer rows. Err(e) if is_offset_overflow(&e) => { rows_to_emit /= 2; if rows_to_emit == 0 { @@ -295,26 +274,34 @@ where } } -fn panic_message(payload: &(dyn Any + Send)) -> Option<&str> { - if let Some(msg) = payload.downcast_ref::<&str>() { - return Some(msg); - } - if let Some(msg) = payload.downcast_ref::() { - return Some(msg.as_str()); - } - None -} - -/// Returns true if a caught panic payload matches the Arrow offset overflows -/// raised by interleave's offset builders. -fn is_arrow_offset_overflow_panic(payload: &(dyn Any + Send)) -> bool { - matches!(panic_message(payload), Some("overflow" | "offset overflow")) -} - #[cfg(test)] mod tests { use super::*; - use arrow::error::ArrowError; + use arrow::array::{Array, ArrayDataBuilder, Int32Array, ListArray}; + use arrow::buffer::Buffer; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_execution::memory_pool::{ + MemoryConsumer, MemoryPool, UnboundedMemoryPool, + }; + + fn overflow_list_batch() -> RecordBatch { + let values_field = Arc::new(Field::new_list_field(DataType::Int32, true)); + // SAFETY: This intentionally constructs an invalid child length so + // Arrow's interleave hits offset overflow before touching child data. + let list = ListArray::from(unsafe { + ArrayDataBuilder::new(DataType::List(Arc::clone(&values_field))) + .len(1) + .add_buffer(Buffer::from_slice_ref([0_i32, i32::MAX])) + .add_child_data(Int32Array::from(Vec::::new()).to_data()) + .build_unchecked() + }); + let schema = Arc::new(Schema::new(vec![Field::new( + "list_col", + DataType::List(values_field), + true, + )])); + RecordBatch::try_new(schema, vec![Arc::new(list)]).unwrap() + } #[test] fn test_retry_interleave_halves_rows_until_success() { @@ -336,43 +323,37 @@ mod tests { } #[test] - fn test_recover_offset_overflow_from_panic() { - let error = recover_offset_overflow_from_panic( - || -> std::result::Result<(), ArrowError> { panic!("offset overflow") }, - ) - .unwrap_err(); - - assert!(is_offset_overflow(&error)); + fn test_is_offset_overflow_matches_arrow_error() { + assert!(is_offset_overflow(&offset_overflow_error())); } #[test] - fn test_recover_offset_overflow_from_panic_rethrows_unrelated_panics() { - let panic_payload = catch_unwind(AssertUnwindSafe(|| { - let _ = recover_offset_overflow_from_panic( - || -> std::result::Result<(), ArrowError> { panic!("capacity overflow") }, - ); - })); - - assert!(panic_payload.is_err()); + fn test_retry_interleave_does_not_retry_non_offset_errors() { + let mut attempts = Vec::new(); + + let error = retry_interleave(4, 4, |rows_to_emit| { + attempts.push(rows_to_emit); + Err::<(), _>(DataFusionError::Execution("boom".into())) + }) + .unwrap_err(); + + assert_eq!(attempts, vec![4]); + assert!(matches!(error, DataFusionError::Execution(msg) if msg == "boom")); } #[test] - fn test_is_arrow_offset_overflow_panic() { - let overflow = Box::new("overflow") as Box; - assert!(is_arrow_offset_overflow_panic(overflow.as_ref())); - - let offset_overflow = - Box::new(String::from("offset overflow")) as Box; - assert!(is_arrow_offset_overflow_panic(offset_overflow.as_ref())); - - let capacity_overflow = Box::new("capacity overflow") as Box; - assert!(!is_arrow_offset_overflow_panic(capacity_overflow.as_ref())); - - let arithmetic_overflow = - Box::new(String::from("attempt to multiply with overflow")) - as Box; - assert!(!is_arrow_offset_overflow_panic( - arithmetic_overflow.as_ref() - )); + fn test_try_interleave_columns_surfaces_arrow_offset_overflow() { + let batch = overflow_list_batch(); + let schema = batch.schema(); + let pool: Arc = Arc::new(UnboundedMemoryPool::default()); + let reservation = MemoryConsumer::new("test").register(&pool); + let mut builder = BatchBuilder::new(schema, 1, 2, reservation); + builder.push_batch(0, batch).unwrap(); + + let error = builder + .try_interleave_columns(&[(0, 0), (0, 0)]) + .unwrap_err(); + + assert!(is_offset_overflow(&error)); } }