Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
run buildall |
There was a problem hiding this comment.
Pull request overview
This PR introduces a new vectorized column API (insert_to_multi_column) to scatter rows from one source column into multiple destination columns, and uses it to optimize row distribution into exchange channels by directly scattering block columns into each channel’s mutable block before flushing.
Changes:
- Add
IColumn::insert_to_multi_column(...)with a COWHelper default implementation, plus optimized overrides forColumnVector,ColumnStr(string), andColumnNullable. - Extend
BlockSerializer/Channelto support external mutable-block initialization/access and a post-scatter flush path (try_flush_after_scatter). - Update
ExchangeTrivialWriter::_channel_add_rowsto distribute rows via per-column scatter instead of building an_origin_row_idxindirection array.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| be/src/vec/sink/vdata_stream_sender.h | Add serializer helpers (ensure/init, flush decision, serialize helper) and expose serializer/flush method on Channel. |
| be/src/vec/sink/vdata_stream_sender.cpp | Implement Channel::try_flush_after_scatter. |
| be/src/vec/common/cow.h | Wire insert_to_multi_column through COWHelper to the base implementation. |
| be/src/vec/columns/column.h | Add new pure virtual API + default impl helper; update column interface surface. |
| be/src/vec/columns/column_vector.h | Declare ColumnVector::insert_to_multi_column. |
| be/src/vec/columns/column_vector.cpp | Implement optimized numeric scatter into multiple destination columns. |
| be/src/vec/columns/column_string.h | Declare ColumnStr::insert_to_multi_column. |
| be/src/vec/columns/column_string.cpp | Implement optimized string scatter (offsets/chars) into multiple destinations. |
| be/src/vec/columns/column_nullable.h | Declare ColumnNullable::insert_to_multi_column. |
| be/src/vec/columns/column_nullable.cpp | Implement nullable scatter by delegating to nested column + null-map scatter. |
| be/src/pipeline/shuffle/exchange_writer.cpp | Use insert_to_multi_column to scatter each input column into channel mutable blocks, then flush channels. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| RETURN_IF_CATCH_EXCEPTION({ | ||
| // Ensure each channel's mutable block is initialized. | ||
| // Even EOF channels need a valid mutable block as a dummy destination, | ||
| // since insert_to_multi_column scatters unconditionally. | ||
| for (size_t i = 0; i < channel_count; ++i) { | ||
| channels[i]->serializer().ensure_mutable_block(block); | ||
| } |
There was a problem hiding this comment.
insert_to_multi_column currently forces ensure_mutable_block() for all channels (including is_receiver_eof() ones). For EOF channels this re-allocates a mutable block after close() has reset it, and since EOF channels are skipped in the flush loops, rows routed to them will accumulate in memory and never be released until query end. Consider routing EOF channels to a per-call dummy MutableBlock/columns (or teaching scatter to ignore nullptr dsts), and only initializing real channel blocks for non-EOF channels that actually receive rows in this batch.
| // positions.size() == this->size(), positions[i] is the index into dsts | ||
| // that row i should be inserted into. |
There was a problem hiding this comment.
The new API comment mentions positions.size() == this->size(), but the signature now takes a raw positions pointer plus an explicit rows count. Please update the comment to reflect the actual contract (e.g., positions has length rows, and rows may be <= size()).
| // positions.size() == this->size(), positions[i] is the index into dsts | |
| // that row i should be inserted into. | |
| // `positions` points to an array of length `rows`, and `rows` is the number of | |
| // source rows to scatter (rows <= this->size()). positions[i] is the index into | |
| // `dsts` that row i should be inserted into. |
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)