diff --git a/Cargo.toml b/Cargo.toml index 9c6a0a54d4365..6ffacad94e32d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,6 +101,7 @@ arrow-flight = { version = "57.2.0", features = [ ] } arrow-ipc = { version = "57.2.0", default-features = false, features = [ "lz4", + "zstd", ] } arrow-ord = { version = "57.2.0", default-features = false } arrow-schema = { version = "57.2.0", default-features = false } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 131b07461ebe5..0e2fb82adfcf4 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -172,8 +172,16 @@ fn try_create_array_map( let mem_size = ArrayMap::estimate_memory_size(min_val, max_val, num_row); reservation.try_grow(mem_size)?; - let batch = concat_batches(schema, batches)?; - let left_values = evaluate_expressions_to_arrays(on_left, &batch)?; + // Fast path for single batch, avoiding `concat_batches` overhead + let (batch, left_values) = if batches.len() == 1 { + let batch = &batches[0]; + let left_values = evaluate_expressions_to_arrays(on_left, batch)?; + (batch.clone(), left_values) + } else { + let batch = concat_batches(schema, batches)?; + let left_values = evaluate_expressions_to_arrays(on_left, &batch)?; + (batch, left_values) + }; let array_map = ArrayMap::try_new(&left_values[0], min_val, max_val)?; @@ -1675,10 +1683,18 @@ async fn collect_left_input( offset += batch.num_rows(); } - // Merge all batches into a single batch, so we can directly index into the arrays - let batch = concat_batches(&schema, batches_iter.clone())?; - - let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?; + let (batch, left_values) = if batches.len() == 1 { + // Fast path for single batch, avoiding `concat_batches` overhead + let batch = batches.into_iter().next().unwrap(); + let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?; + (batch, left_values) + } else { + // The build side has multiple batches, so we need to concatenate them + // Merge all batches into a single batch, so we can directly index into the arrays + let batch = concat_batches(&schema, batches_iter.clone())?; + let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?; + (batch, left_values) + }; (Map::HashMap(hashmap), batch, left_values) };