Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 63 additions & 19 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Planner/Utils.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <QueryPipeline/narrowPipe.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/extractTableFunctionFromSelectQuery.h>
#include <Storages/buildQueryTreeForShard.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Planner/Utils.h>
Expand Down Expand Up @@ -115,11 +117,14 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito
using Base = InDepthQueryTreeVisitorWithContext<SearcherVisitor>;
using Base::Base;

explicit SearcherVisitor(std::unordered_set<QueryTreeNodeType> types_, ContextPtr context) : Base(context), types(types_) {}
explicit SearcherVisitor(std::unordered_set<QueryTreeNodeType> types_, size_t entry_, ContextPtr context)
: Base(context)
, types(types_)
, entry(entry_) {}

bool needChildVisit(QueryTreeNodePtr & /*parent*/, QueryTreeNodePtr & /*child*/)
{
return getSubqueryDepth() <= 2 && !passed_node;
return getSubqueryDepth() <= 2 && !passed_node && !current_entry;
}

void enterImpl(QueryTreeNodePtr & node)
Expand All @@ -130,13 +135,19 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito
auto node_type = node->getNodeType();

if (types.contains(node_type))
passed_node = node;
{
++current_entry;
if (current_entry == entry)
passed_node = node;
}
}

QueryTreeNodePtr getNode() const { return passed_node; }

private:
std::unordered_set<QueryTreeNodeType> types;
size_t entry;
size_t current_entry = 0;
QueryTreeNodePtr passed_node;
};

Expand Down Expand Up @@ -203,33 +214,42 @@ Converts
localtable as t
ON s3.key == t.key

to
to (object_storage_cluster_join_mode='local')

SELECT s3.c1, s3.c2, s3.key
FROM
s3Cluster(...) AS s3

or (object_storage_cluster_join_mode='global')

SELECT s3.c1, s3.c2, t.c3
FROM
s3Cluster(...) as s3
JOIN
values('key UInt32, data String', (1, 'one'), (2, 'two'), ...) as t
ON s3.key == t.key
*/
void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
ASTPtr & query_to_send,
QueryTreeNodePtr query_tree,
SelectQueryInfo query_info,
const ContextPtr & context)
{
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode];
switch (object_storage_cluster_join_mode)
{
case ObjectStorageClusterJoinMode::LOCAL:
{
auto info = getQueryTreeInfo(query_tree, context);
auto info = getQueryTreeInfo(query_info.query_tree, context);

if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
{
auto modified_query_tree = query_tree->clone();
auto modified_query_tree = query_info.query_tree->clone();

SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context);
left_table_expression_searcher.visit(modified_query_tree);
auto table_function_node = left_table_expression_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find left table function node");

QueryTreeNodePtr query_tree_distributed;

Expand All @@ -242,7 +262,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
}
else if (info.has_cross_join)
{
SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context);
SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, 1, context);
join_searcher.visit(modified_query_tree);
auto cross_join_node = join_searcher.getNode();
if (!cross_join_node)
Expand Down Expand Up @@ -297,8 +317,24 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
return;
}
case ObjectStorageClusterJoinMode::GLOBAL:
// TODO
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now");
{
auto info = getQueryTreeInfo(query_info.query_tree, context);

if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
{
auto modified_query_tree = query_info.query_tree->clone();

rewriteJoinToGlobalJoin(modified_query_tree, context);
modified_query_tree = buildQueryTreeForShard(
query_info.planner_context,
modified_query_tree,
/*allow_global_join_for_right_table*/ true,
/*find_cross_join*/ true);
query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree);
}

return;
}
case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special
return;
}
Expand Down Expand Up @@ -336,7 +372,7 @@ void IStorageCluster::read(
SharedHeader sample_block;
ASTPtr query_to_send = query_info.query;

updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context);
updateQueryWithJoinToSendIfNeeded(query_to_send, query_info, context);

if (settings[Setting::allow_experimental_analyzer])
{
Expand Down Expand Up @@ -374,6 +410,10 @@ void IStorageCluster::read(

auto this_ptr = std::static_pointer_cast<IStorageCluster>(shared_from_this());

std::optional<Tables> external_tables = std::nullopt;
if (query_info.planner_context && query_info.planner_context->getMutableQueryContext())
external_tables = query_info.planner_context->getMutableQueryContext()->getExternalTables();

auto reading = std::make_unique<ReadFromCluster>(
column_names,
query_info,
Expand All @@ -384,7 +424,8 @@ void IStorageCluster::read(
std::move(query_to_send),
processed_stage,
cluster,
log);
log,
external_tables);

query_plan.addStep(std::move(reading));
}
Expand Down Expand Up @@ -502,7 +543,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
new_context,
/*throttler=*/nullptr,
scalars,
Tables(),
external_tables.has_value() ? *external_tables : Tables(),
processed_stage,
nullptr,
RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)});
Expand Down Expand Up @@ -540,7 +581,7 @@ IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePt
info.has_cross_join = true;
}

SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context);
left_table_expression_searcher.visit(query_tree);
auto table_function_node = left_table_expression_searcher.getNode();
if (!table_function_node)
Expand Down Expand Up @@ -575,9 +616,12 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");

auto info = getQueryTreeInfo(query_info.query_tree, context);
if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
return QueryProcessingStage::Enum::FetchColumns;
if (object_storage_cluster_join_mode == ObjectStorageClusterJoinMode::LOCAL)
{
auto info = getQueryTreeInfo(query_info.query_tree, context);
if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
return QueryProcessingStage::Enum::FetchColumns;
}
}

/// Initiator executes query on remote node.
Expand Down
7 changes: 5 additions & 2 deletions src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class IStorageCluster : public IStorage

protected:
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}
void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context);
void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, SelectQueryInfo query_info, const ContextPtr & context);

virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {}

Expand Down Expand Up @@ -137,7 +137,8 @@ class ReadFromCluster : public SourceStepWithFilter
ASTPtr query_to_send_,
QueryProcessingStage::Enum processed_stage_,
ClusterPtr cluster_,
LoggerPtr log_)
LoggerPtr log_,
std::optional<Tables> external_tables_)
: SourceStepWithFilter(
std::move(sample_block),
column_names_,
Expand All @@ -149,6 +150,7 @@ class ReadFromCluster : public SourceStepWithFilter
, processed_stage(processed_stage_)
, cluster(std::move(cluster_))
, log(log_)
, external_tables(external_tables_)
{
}

Expand All @@ -160,6 +162,7 @@ class ReadFromCluster : public SourceStepWithFilter
LoggerPtr log;

std::optional<RemoteQueryExecutor::Extension> extension;
std::optional<Tables> external_tables;

void createExtension(const ActionsDAG::Node * predicate);
ContextPtr updateSettings(const Settings & settings);
Expand Down
43 changes: 37 additions & 6 deletions src/Storages/buildQueryTreeForShard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ namespace Setting
extern const SettingsBool prefer_global_in_and_join;
extern const SettingsBool enable_add_distinct_to_in_subqueries;
extern const SettingsInt64 optimize_const_name_size;
extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode;
}

namespace ErrorCodes
Expand Down Expand Up @@ -120,8 +121,9 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito
using Base = InDepthQueryTreeVisitorWithContext<DistributedProductModeRewriteInJoinVisitor>;
using Base::Base;

explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_)
explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_, bool find_cross_join_)
: Base(context_)
, find_cross_join(find_cross_join_)
{}

struct InFunctionOrJoin
Expand Down Expand Up @@ -157,9 +159,11 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito
{
auto * function_node = node->as<FunctionNode>();
auto * join_node = node->as<JoinNode>();
CrossJoinNode * cross_join_node = find_cross_join ? node->as<CrossJoinNode>() : nullptr;

if ((function_node && isNameOfGlobalInFunction(function_node->getFunctionName())) ||
(join_node && join_node->getLocality() == JoinLocality::Global))
(join_node && join_node->getLocality() == JoinLocality::Global) ||
cross_join_node)
{
InFunctionOrJoin in_function_or_join_entry;
in_function_or_join_entry.query_node = node;
Expand Down Expand Up @@ -223,7 +227,9 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito
replacement_table_expression->setTableExpressionModifiers(*table_expression_modifiers);
replacement_map.emplace(table_node.get(), std::move(replacement_table_expression));
}
else if ((distributed_product_mode == DistributedProductMode::GLOBAL || getSettings()[Setting::prefer_global_in_and_join]) &&
else if ((distributed_product_mode == DistributedProductMode::GLOBAL ||
getSettings()[Setting::prefer_global_in_and_join] ||
(find_cross_join && getSettings()[Setting::object_storage_cluster_join_mode] == ObjectStorageClusterJoinMode::GLOBAL)) &&
!in_function_or_join_stack.empty())
{
auto * in_or_join_node_to_modify = in_function_or_join_stack.back().query_node.get();
Expand Down Expand Up @@ -257,6 +263,8 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito
std::vector<InFunctionOrJoin> in_function_or_join_stack;
std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map;
std::vector<InFunctionOrJoin> global_in_or_join_nodes;

bool find_cross_join = false;
};

/** Replaces large constant values with `__getScalar` function calls to avoid
Expand Down Expand Up @@ -504,14 +512,18 @@ QueryTreeNodePtr getSubqueryFromTableExpression(

}

QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify, bool allow_global_join_for_right_table)
QueryTreeNodePtr buildQueryTreeForShard(
const PlannerContextPtr & planner_context,
QueryTreeNodePtr query_tree_to_modify,
bool allow_global_join_for_right_table,
bool find_cross_join)
{
CollectColumnSourceToColumnsVisitor collect_column_source_to_columns_visitor;
collect_column_source_to_columns_visitor.visit(query_tree_to_modify);

const auto & column_source_to_columns = collect_column_source_to_columns_visitor.getColumnSourceToColumns();

DistributedProductModeRewriteInJoinVisitor visitor(planner_context->getQueryContext());
DistributedProductModeRewriteInJoinVisitor visitor(planner_context->getQueryContext(), find_cross_join);
visitor.visit(query_tree_to_modify);

auto replacement_map = visitor.getReplacementMap();
Expand Down Expand Up @@ -550,6 +562,24 @@ QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_contex
replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node));
continue;
}
if (auto * cross_join_node = global_in_or_join_node.query_node->as<CrossJoinNode>())
{
auto tables_count = cross_join_node->getTableExpressions().size();
for (size_t i = 1; i < tables_count; ++i)
{
QueryTreeNodePtr join_table_expression = cross_join_node->getTableExpressions()[i];

auto subquery_node = getSubqueryFromTableExpression(join_table_expression, column_source_to_columns, planner_context->getQueryContext());

auto temporary_table_expression_node = executeSubqueryNode(subquery_node,
planner_context->getMutableQueryContext(),
global_in_or_join_node.subquery_depth);
temporary_table_expression_node->setAlias(join_table_expression->getAlias());

replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node));
}
continue;
}
if (auto * in_function_node = global_in_or_join_node.query_node->as<FunctionNode>())
{
auto & in_function_subquery_node = in_function_node->getArguments().getNodes().at(1);
Expand Down Expand Up @@ -661,7 +691,8 @@ class RewriteJoinToGlobalJoinVisitor : public InDepthQueryTreeVisitorWithContext
{
if (auto * join_node = node->as<JoinNode>())
{
bool prefer_local_join = getContext()->getSettingsRef()[Setting::parallel_replicas_prefer_local_join];
bool prefer_local_join = getContext()->getSettingsRef()[Setting::parallel_replicas_prefer_local_join]
&& getContext()->getSettingsRef()[Setting::object_storage_cluster_join_mode] != ObjectStorageClusterJoinMode::GLOBAL;
Comment on lines +694 to +695

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep local-join preference outside object-storage mode

rewriteJoinToGlobalJoin is also used by parallel-replica planning (src/Planner/findParallelReplicasQuery.cpp and src/Interpreters/ClusterProxy/executeQuery.cpp), so tying prefer_local_join to object_storage_cluster_join_mode here makes unrelated non-object-storage queries ignore parallel_replicas_prefer_local_join whenever the session sets object_storage_cluster_join_mode='global'. That forces extra GLOBAL JOIN rewrites in those flows and can significantly increase broadcast/memory costs; this override should be scoped to the object-storage cluster path instead of the shared visitor.

Useful? React with 👍 / 👎.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, object_storage_cluster_join_mode=global has priority and overrides the parallel_replicas_prefer_local_join setting here.

bool should_use_global_join = !prefer_local_join || !allStoragesAreMergeTree(join_node->getRightTableExpression());
if (should_use_global_join)
join_node->setLocality(JoinLocality::Global);
Expand Down
6 changes: 5 additions & 1 deletion src/Storages/buildQueryTreeForShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ using PlannerContextPtr = std::shared_ptr<PlannerContext>;
class Context;
using ContextPtr = std::shared_ptr<const Context>;

QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify, bool allow_global_join_for_right_table);
/// Rewrites `query_tree_to_modify` for execution on a shard and returns the resulting query tree.
///
/// `planner_context` supplies the query context that the rewrite visitor and subquery execution use.
/// `allow_global_join_for_right_table`: NOTE(review) its effect is not visible in this diff; confirm in the .cpp.
/// `find_cross_join`: when true, CROSS JOIN nodes are collected together with GLOBAL IN / GLOBAL JOIN nodes.
/// For each collected cross join, every table expression except the first is turned into a subquery, passed
/// through executeSubqueryNode, and the result (carrying the original alias) is registered as its replacement.
/// Defaults to false, so callers that pass only three arguments do not get the cross-join handling.
QueryTreeNodePtr buildQueryTreeForShard(
const PlannerContextPtr & planner_context,
QueryTreeNodePtr query_tree_to_modify,
bool allow_global_join_for_right_table,
bool find_cross_join = false);

void rewriteJoinToGlobalJoin(QueryTreeNodePtr query_tree_to_modify, ContextPtr context);

Expand Down
Loading
Loading