When the total tablet count is much larger than pipeline capacity, one-tablet-per-instance planning creates excessive BE concurrency pressure in query-cache workloads.

Trigger partition-based planning when:

    total_tablets > parallel_pipeline_task_num * participating_be_num

- Before: instance_num ~= total_tablets
- After: instance_num ~= partitions_on_each_be

Per-BE planning example:

    BE1 tablets: p1[t1,t2], p2[t3]   -> instances: [p1:t1,t2], [p2:t3]
    BE2 tablets: p1[t4], p2[t5,t6]   -> instances: [p1:t4], [p2:t5,t6]

This keeps tablets from the same partition in one instance and separates different partitions into different instances. If the partition mapping is incomplete or partition planning fails, planning falls back to the default path for correctness.

Tests:
- partition-based planning path
- fallback-to-default path (incomplete mapping)
- non-query-cache default planning path
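The grouping rule in the description can be sketched in a few lines. This is only an illustration under stated assumptions: the actual planner lives in the Java FE (`UnassignedScanSingleOlapTableJob`), and `TabletLocation`, `should_plan_by_partition`, and `plan_instances_for_be` are hypothetical names that do not appear in the PR.

```cpp
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for one tablet placed on a single BE.
struct TabletLocation {
    int64_t partition_id;
    int64_t tablet_id;
};

// Trigger condition quoted in the PR description.
bool should_plan_by_partition(int64_t total_tablets, int64_t parallel_pipeline_task_num,
                              int64_t participating_be_num) {
    return total_tablets > parallel_pipeline_task_num * participating_be_num;
}

// Group the tablets of one BE so that each instance holds the tablets of
// exactly one partition. Instances come out ordered by partition id.
std::vector<std::vector<int64_t>> plan_instances_for_be(
        const std::vector<TabletLocation>& tablets) {
    std::map<int64_t, std::vector<int64_t>> by_partition;
    for (const auto& tablet : tablets) {
        by_partition[tablet.partition_id].push_back(tablet.tablet_id);
    }
    std::vector<std::vector<int64_t>> instances;
    instances.reserve(by_partition.size());
    for (auto& entry : by_partition) {
        instances.push_back(std::move(entry.second));
    }
    return instances;
}
```

With the BE1 input from the example (`p1[t1,t2], p2[t3]`), this sketch yields two instances, one per partition, instead of three single-tablet instances.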
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
Pull request overview
This PR extends query-cache execution to support multiple tablets per instance (when appropriate) and updates FE planning to group tablets by partition to reduce BE instance concurrency/pressure while preserving cache correctness constraints.
Changes:
- FE: Add “best-effort” partition-based instance assignment for query-cache scans of a single OLAP table, with fallback to default planning.
- BE: Update query cache key construction to support multiple scan ranges (multiple tablets) in one instance and update cache source profiling accordingly.
- Tests: Add FE unit tests for partition-based assignment/fallback, and expand BE unit tests for multi-tablet cache-key behavior.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| fe/fe-core/src/main/java/.../UnassignedScanSingleOlapTableJob.java | Adds partition-based instance assignment for query-cache scans with safe fallback. |
| fe/fe-core/src/test/java/.../UnassignedScanSingleOlapTableJobTest.java | Adds unit tests covering partition grouping, fallback, and non-query-cache behavior. |
| be/src/pipeline/query_cache/query_cache.h | Enhances cache key building to handle multiple tablets per instance and validates consistency constraints. |
| be/src/pipeline/exec/cache_source_operator.cpp | Updates profiling output to reflect multiple tablets used for cache lookup. |
| be/test/pipeline/exec/query_cache_test.cpp | Adds/updates tests for multi-tablet cache-key construction and failure cases. |

    std::vector<int64_t> tablet_ids;
    tablet_ids.reserve(scan_ranges.size());
    for (const auto& scan_range : scan_ranges) {
        auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
        tablet_ids.push_back(tablet_id);
    }
    std::sort(tablet_ids.begin(), tablet_ids.end());

QueryCache::build_cache_key now uses std::sort / std::find_if (and std::from_chars) in this header, but the header doesn't include <algorithm> (and should also explicitly include <charconv> if not guaranteed elsewhere). Depending on include order this can break compilation; please add the required standard headers here.

            tablet_ids_str += ",";
        }
    }
    custom_profile()->add_info_string("CacheTabletId", tablet_ids_str);

CacheTabletId profile entry now contains a comma-separated list of multiple tablet ids. Consider renaming the profile key to something plural (e.g. CacheTabletIds) to avoid confusion when debugging profiles.
Suggested change:

    - custom_profile()->add_info_string("CacheTabletId", tablet_ids_str);
    + custom_profile()->add_info_string("CacheTabletIds", tablet_ids_str);


    TPaloScanRange palp_scan_range1;
    palp_scan_range1.__set_tablet_id(1);
    palp_scan_range1.__set_version("100");

The variable name palp_scan_range* looks like a typo (should likely be palo_scan_range* to match the Thrift type TPaloScanRange). Renaming would improve readability and avoid confusion in future test edits.

    ConnectContext connectContext = new ConnectContext();
    connectContext.setThreadLocalInfo();
    connectContext.setQueryId(new TUniqueId(1, 1));
    connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
    StatementContext statementContext = new StatementContext(

These tests set the thread-local ConnectContext via setThreadLocalInfo() but never clear it (e.g. ConnectContext.remove()). Many other FE tests clean up the thread-local to avoid cross-test interference; please add an @AfterEach (or finally block) to remove the context.

    for (const auto& scan_range : scan_ranges) {
        auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
        tablet_ids.push_back(tablet_id);
    }

QueryCache::build_cache_key reads scan_range.scan_range.palo_scan_range.* without checking scan_range.scan_range.__isset.palo_scan_range (the previous DCHECK is gone). If a non-palo scan range reaches here this will build an invalid key / version; please validate the field is set for every range and return a clear error instead of assuming it.
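One way to picture the requested check is a validation pass that runs before any field is read. The sketch below uses hypothetical mock structs (`MockScanRange`, `MockPaloScanRange`) instead of the real Thrift types, and a plain `has_palo_scan_range` flag stands in for `__isset.palo_scan_range`; in Doris the function would presumably return a `Status` rather than a string.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical mocks; the real code uses Thrift-generated types.
struct MockPaloScanRange {
    int64_t tablet_id = 0;
};

struct MockScanRange {
    bool has_palo_scan_range = false; // stands in for __isset.palo_scan_range
    MockPaloScanRange palo_scan_range;
};

// Returns an empty string when every range carries a palo scan range,
// otherwise an error message naming the first offending index.
std::string validate_scan_ranges(const std::vector<MockScanRange>& ranges) {
    for (std::size_t i = 0; i < ranges.size(); ++i) {
        if (!ranges[i].has_palo_scan_range) {
            return "scan range " + std::to_string(i) +
                   " has no palo_scan_range, plan error";
        }
    }
    return "";
}
```

Running the check once up front keeps the later loops free of per-field guards and gives the error a precise location.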

    std::from_chars(scan_range_iter->scan_range.palo_scan_range.version.data(),
                    scan_range_iter->scan_range.palo_scan_range.version.data() +
                            scan_range_iter->scan_range.palo_scan_range.version.size(),
                    current_version);

std::from_chars(...) parse result is ignored. If version is empty/non-numeric/partially-numeric, current_version can remain -1 (or parse partially) and still be treated as valid, leading to wrong cache versioning. Please check the returned std::from_chars_result (ec/ptr) and fail fast on parse errors.
Suggested change:

    - std::from_chars(scan_range_iter->scan_range.palo_scan_range.version.data(),
    -                 scan_range_iter->scan_range.palo_scan_range.version.data() +
    -                         scan_range_iter->scan_range.palo_scan_range.version.size(),
    -                 current_version);
    + const auto& version_str = scan_range_iter->scan_range.palo_scan_range.version;
    + const char* version_begin = version_str.data();
    + const char* version_end = version_begin + version_str.size();
    + auto parse_result = std::from_chars(version_begin, version_end, current_version);
    + if (parse_result.ec != std::errc() || parse_result.ptr != version_end) {
    +     return Status::InternalError("tablet version is invalid, plan error");
    + }

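The same strict-parse rule can be isolated in a small helper, which makes the edge cases easy to exercise. This is a minimal sketch; `parse_tablet_version` is a hypothetical name, and it returns `std::optional` only to stay self-contained, where the PR code would return a `Status`.

```cpp
#include <charconv>
#include <cstdint>
#include <optional>
#include <string_view>
#include <system_error>

// Hypothetical helper: parses a decimal version string strictly.
// Returns std::nullopt when the text is empty, non-numeric, only
// partially numeric, or out of range for int64_t.
std::optional<int64_t> parse_tablet_version(std::string_view text) {
    int64_t value = -1;
    const char* begin = text.data();
    const char* end = begin + text.size();
    auto result = std::from_chars(begin, end, value);
    // ec reports invalid or out-of-range input; ptr != end means trailing
    // characters were left unparsed.
    if (result.ec != std::errc() || result.ptr != end) {
        return std::nullopt;
    }
    return value;
}
```

Checking both `ec` and `ptr` matters: `"100abc"` parses successfully as 100 with `ec` clear, and only the `ptr != end` comparison rejects it.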

    auto scan_range_iter =
            std::find_if(scan_ranges.begin(), scan_ranges.end(),
                         [&tablet_id](const TScanRangeParams& range) {
                             return range.scan_range.palo_scan_range.tablet_id == tablet_id;
                         });

build_cache_key does a std::find_if(scan_ranges.begin(), scan_ranges.end(), ...) inside a loop over tablet_ids, making it O(n^2) per instance. For large instances this adds avoidable CPU overhead; consider a single pass that builds a map (tablet_id -> parsed version) or collects (tablet_id, version, range) pairs and then sorts.
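The "collect pairs, then sort" alternative mentioned in this comment might look like the sketch below. It assumes versions have already been parsed to integers; `SimpleScanRange` and `sorted_tablet_versions` are hypothetical names, and the real cache-key format is not reproduced here.

```cpp
#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical simplified stand-in for a Thrift scan range.
struct SimpleScanRange {
    int64_t tablet_id;
    int64_t version; // assumed to be parsed and validated already
};

// One pass to collect (tablet_id, version) pairs, then a single sort:
// O(n log n) overall, with no per-tablet linear search.
std::vector<std::pair<int64_t, int64_t>> sorted_tablet_versions(
        const std::vector<SimpleScanRange>& ranges) {
    std::vector<std::pair<int64_t, int64_t>> pairs;
    pairs.reserve(ranges.size());
    for (const auto& range : ranges) {
        pairs.emplace_back(range.tablet_id, range.version);
    }
    // std::pair compares by first, then second, so this orders by tablet id.
    std::sort(pairs.begin(), pairs.end());
    return pairs;
}
```

Because each pair carries its version along through the sort, the later key-building loop can read it directly instead of searching `scan_ranges` again.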

    auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
    if (find_tablet == cache_param.tablet_to_range.end()) {
        return Status::InternalError("Not find tablet in partition_to_tablets, plan error");

The error message says partition_to_tablets, but the code is actually checking cache_param.tablet_to_range. Please update the message to reference the correct field to make diagnosing plan issues easier.
Suggested change:

    - return Status::InternalError("Not find tablet in partition_to_tablets, plan error");
    + return Status::InternalError("Not find tablet in tablet_to_range, plan error");

What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)