From ac1b582ee3968d1aa94036b6e526d962f57c8b89 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Wed, 4 Feb 2026 13:03:46 +0000 Subject: [PATCH] =?UTF-8?q?=E2=9A=A1=20Bolt:=20optimize=20hash=20join=20bu?= =?UTF-8?q?ild=20side=20for=20single-batch=20inputs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This optimization speeds up hash joins with a small build side (single batch) by: 1. Bypassing `concat_batches` which is unnecessary for a single batch. 2. Reusing pre-evaluated join key arrays to avoid redundant `evaluate_expressions_to_arrays` calls in both the ArrayMap attempt and standard HashMap fallback. 3. Only performing pre-evaluation for single-column joins to avoid unnecessary work for multi-column joins. The implementation carefully handles the borrow checker by ensuring ownership of the batch vector is only taken when appropriate. Tests passed in datafusion-physical-plan. 
Co-authored-by: Dandandan <163737+Dandandan@users.noreply.github.com> --- .../physical-plan/src/joins/hash_join/exec.rs | 58 +++++++++++++++---- 1 file changed, 47 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index c249dfb10aacf..72309f3bfe44b 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -107,17 +107,24 @@ fn try_create_array_map( perfect_hash_join_small_build_threshold: usize, perfect_hash_join_min_key_density: f64, null_equality: NullEquality, + pre_evaluated_left_values: Option<Vec<ArrayRef>>, ) -> Result<Option<(ArrayMap, RecordBatch, Vec<ArrayRef>)>> { if on_left.len() != 1 { return Ok(None); } if null_equality == NullEquality::NullEqualsNull { - for batch in batches.iter() { - let arrays = evaluate_expressions_to_arrays(on_left, batch)?; - if arrays[0].null_count() > 0 { + if let Some(ref left_values) = pre_evaluated_left_values { + if left_values[0].null_count() > 0 { return Ok(None); } + } else { + for batch in batches.iter() { + let arrays = evaluate_expressions_to_arrays(on_left, batch)?; + if arrays[0].null_count() > 0 { + return Ok(None); + } + } } } @@ -172,8 +179,19 @@ fn try_create_array_map( let mem_size = ArrayMap::estimate_memory_size(min_val, max_val, num_row); reservation.try_grow(mem_size)?; - let batch = concat_batches(schema, batches)?; - let left_values = evaluate_expressions_to_arrays(on_left, &batch)?; + let (batch, left_values) = if batches.len() == 1 { + let batch = batches[0].clone(); + let left_values = if let Some(left_values) = pre_evaluated_left_values { + left_values + } else { + evaluate_expressions_to_arrays(on_left, &batch)? 
+ }; + (batch, left_values) + } else { + let batch = concat_batches(schema, batches)?; + let left_values = evaluate_expressions_to_arrays(on_left, &batch)?; + (batch, left_values) + }; let array_map = ArrayMap::try_new(&left_values[0], min_val, max_val)?; @@ -1615,6 +1633,15 @@ async fn collect_left_input( _ => None, }; + // Pre-evaluate join keys if we have a single batch and a single join key. + // This optimization targets the common case of small joins, avoiding redundant evaluations + // between ArrayMap creation attempts and the standard HashMap fallback. + let single_batch_keys = if batches.len() == 1 && on_left.len() == 1 { + Some(evaluate_expressions_to_arrays(&on_left, &batches[0])?) + } else { + None + }; + let (join_hash_map, batch, left_values) = if let Some((array_map, batch, left_value)) = try_create_array_map( &bounds, @@ -1625,6 +1652,7 @@ async fn collect_left_input( config.execution.perfect_hash_join_small_build_threshold, config.execution.perfect_hash_join_min_key_density, null_equality, + single_batch_keys.clone(), )? { array_map_created_count.add(1); metrics.build_mem_used.add(array_map.size()); @@ -1656,10 +1684,8 @@ async fn collect_left_input( let mut hashes_buffer = Vec::new(); let mut offset = 0; - let batches_iter = batches.iter().rev(); - // Updating hashmap starting from the last batch - for batch in batches_iter.clone() { + for batch in batches.iter().rev() { hashes_buffer.clear(); hashes_buffer.resize(batch.num_rows(), 0); update_hash( @@ -1676,9 +1702,19 @@ async fn collect_left_input( } // Merge all batches into a single batch, so we can directly index into the arrays - let batch = concat_batches(&schema, batches_iter.clone())?; - - let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?; + let (batch, left_values) = if batches.len() == 1 { + let batch = batches.into_iter().next().unwrap(); + let left_values = if let Some(keys) = single_batch_keys { + keys + } else { + evaluate_expressions_to_arrays(&on_left, &batch)? 
+ }; + (batch, left_values) + } else { + let batch = concat_batches(&schema, batches.iter().rev())?; + let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?; + (batch, left_values) + }; (Map::HashMap(hashmap), batch, left_values) };