From da8ee49875e47178952af046ef8f2147de6d6afa Mon Sep 17 00:00:00 2001
From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com>
Date: Mon, 26 Jan 2026 13:00:35 +0000
Subject: [PATCH] feat: Add fast path for single-batch build side in
 HashJoinExec

This optimization adds a fast path to `collect_left_input` for the common
case where the build side of a hash join consists of a single `RecordBatch`.
By handling this case separately, it avoids the expensive `concat_batches`
operation, which allocates new memory and copies data. This results in
reduced memory allocation and CPU usage for single-batch build sides.
---
 .../physical-plan/src/joins/hash_join/exec.rs | 64 ++++++++++++++-----
 1 file changed, 48 insertions(+), 16 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 131b07461ebe5..85e8b2ad02b6b 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -1653,34 +1653,66 @@ async fn collect_left_input(
             Box::new(JoinHashMapU32::with_capacity(num_rows))
         };
 
-        let mut hashes_buffer = Vec::new();
-        let mut offset = 0;
+        // FAST PATH: If the build side is a single batch, we can bypass the expensive
+        // `concat_batches` operation and build the hash map directly.
+        //
+        // What: This is a performance optimization for the common case where the build side
+        // of a hash join is small enough to fit into a single `RecordBatch`.
+        //
+        // Why: The standard logic iterates over multiple batches, reverses them, and then
+        // calls `concat_batches`. The `concat_batches` function is expensive as it allocates
+        // new memory buffers and copies all the data.
+        //
+        // Impact: This fast path avoids the allocations and copies from `concat_batches`,
+        // and also saves the overhead of iterating over a vector of batches, resulting in
+        // a significant performance improvement for single-batch build sides.
+        if batches.len() == 1 {
+            let batch = batches.into_iter().next().unwrap();
+            let mut hashes_buffer = vec![0; batch.num_rows()];
 
-        let batches_iter = batches.iter().rev();
-
-        // Updating hashmap starting from the last batch
-        for batch in batches_iter.clone() {
-            hashes_buffer.clear();
-            hashes_buffer.resize(batch.num_rows(), 0);
             update_hash(
                 &on_left,
-                batch,
+                &batch,
                 &mut *hashmap,
-                offset,
+                0,
                 &random_state,
                 &mut hashes_buffer,
                 0,
                 true,
             )?;
-            offset += batch.num_rows();
-        }
 
-        // Merge all batches into a single batch, so we can directly index into the arrays
-        let batch = concat_batches(&schema, batches_iter.clone())?;
+            let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;
+            (Map::HashMap(hashmap), batch, left_values)
+        } else {
+            let mut hashes_buffer = Vec::new();
+            let mut offset = 0;
+
+            let batches_iter = batches.iter().rev();
+
+            // Updating hashmap starting from the last batch
+            for batch in batches_iter.clone() {
+                hashes_buffer.clear();
+                hashes_buffer.resize(batch.num_rows(), 0);
+                update_hash(
+                    &on_left,
+                    batch,
+                    &mut *hashmap,
+                    offset,
+                    &random_state,
+                    &mut hashes_buffer,
+                    0,
+                    true,
+                )?;
+                offset += batch.num_rows();
+            }
+
+            // Merge all batches into a single batch, so we can directly index into the arrays
+            let batch = concat_batches(&schema, batches_iter.clone())?;
 
-        let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;
+            let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;
 
-        (Map::HashMap(hashmap), batch, left_values)
+            (Map::HashMap(hashmap), batch, left_values)
+        }
     };
 
     // Reserve additional memory for visited indices bitmap and create shared builder