Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support QueryMeta and diagnostics #11371

Merged
merged 26 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,25 @@ bool NeedReportAst(const Ydb::Query::ExecuteQueryRequest& req) {
}
}

bool NeedCollectDiagnostics(const Ydb::Query::ExecuteQueryRequest& req) {
switch (req.exec_mode()) {
case Ydb::Query::EXEC_MODE_EXPLAIN:
return true;

case Ydb::Query::EXEC_MODE_EXECUTE:
switch (req.stats_mode()) {
case Ydb::Query::StatsMode::STATS_MODE_FULL:
case Ydb::Query::StatsMode::STATS_MODE_PROFILE:
return true;
default:
return false;
}

default:
return false;
}
}

class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
Expand Down Expand Up @@ -284,6 +303,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
req->pool_id());

ev->SetProgressStatsPeriod(TDuration::MilliSeconds(req->stats_period_ms()));
ev->Record.MutableRequest()->SetCollectDiagnostics(NeedCollectDiagnostics(*req));

if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0, Span_.GetTraceId())) {
NYql::TIssues issues;
Expand Down Expand Up @@ -403,6 +423,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
if (NeedReportAst(*Request_->GetProtoRequest())) {
response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst());
}
if (NeedCollectDiagnostics(*Request_->GetProtoRequest())) {
response.mutable_exec_stats()->set_query_meta(kqpResponse.GetQueryDiagnostics());
}
}

if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/grpc_services/rpc_execute_data_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ using namespace Ydb;
using namespace Ydb::Table;
using namespace NKqp;

bool NeedCollectDiagnostics(const Ydb::Table::ExecuteDataQueryRequest& req) {
switch (req.collect_stats()) {
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL:
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE:
return true;
default:
return false;
}
}

using TEvExecuteDataQueryRequest = TGrpcRequestOperationCall<Ydb::Table::ExecuteDataQueryRequest,
Ydb::Table::ExecuteDataQueryResponse>;

Expand Down Expand Up @@ -147,6 +157,8 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
req->has_query_cache_policy() ? &req->query_cache_policy() : nullptr,
req->has_operation_params() ? &req->operation_params() : nullptr);

ev->Record.MutableRequest()->SetCollectDiagnostics(NeedCollectDiagnostics(*req));

ReportCostInfo_ = req->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED;

ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0, Span_.GetTraceId());
Expand All @@ -166,6 +178,9 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
if (from.HasQueryStats()) {
FillQueryStats(*to->mutable_query_stats(), from);
to->mutable_query_stats()->set_query_ast(from.GetQueryAst());
if (from.HasQueryDiagnostics()) {
to->mutable_query_stats()->set_query_meta(from.GetQueryDiagnostics());
}
return;
}
}
Expand Down
28 changes: 26 additions & 2 deletions ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,27 @@ bool NeedReportPlan(const Ydb::Table::ExecuteScanQueryRequest& req) {
}
}

bool NeedCollectDiagnostics(const Ydb::Table::ExecuteScanQueryRequest& req) {
switch (req.mode()) {
case ExecuteScanQueryRequest_Mode_MODE_EXPLAIN:
return true;

case ExecuteScanQueryRequest_Mode_MODE_EXEC:
switch (req.collect_stats()) {
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL:
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE:
return true;
default:
break;
}

return false;

default:
return false;
}
}

bool CheckRequest(const Ydb::Table::ExecuteScanQueryRequest& req, TParseRequestError& error)
{
switch (req.mode()) {
Expand Down Expand Up @@ -228,7 +249,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
nullptr
);

ev->Record.MutableRequest()->SetCollectDiagnostics(req->Getcollect_full_diagnostics());
ev->Record.MutableRequest()->SetCollectDiagnostics(NeedCollectDiagnostics(*req));

if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
NYql::TIssues issues;
Expand Down Expand Up @@ -291,6 +312,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ

bool reportStats = NeedReportStats(*Request_->GetProtoRequest());
bool reportPlan = reportStats && NeedReportPlan(*Request_->GetProtoRequest());
bool collectDiagnostics = NeedCollectDiagnostics(*Request_->GetProtoRequest());

if (reportStats) {
if (kqpResponse.HasQueryStats()) {
Expand All @@ -308,7 +330,9 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
response.mutable_result()->mutable_query_stats()->set_query_ast(kqpResponse.GetQueryAst());
}

response.mutable_result()->set_query_full_diagnostics(kqpResponse.GetQueryDiagnostics());
if (collectDiagnostics) {
response.mutable_result()->mutable_query_stats()->set_query_meta(kqpResponse.GetQueryDiagnostics());
}

Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
Request_->SendSerializedResult(std::move(out), record.GetYdbStatus());
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/kqp/common/compilation/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,14 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
};

struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> {
TEvCompileResponse(const TKqpCompileResult::TConstPtr& compileResult, NLWTrace::TOrbit orbit = {}, const std::optional<TString>& replayMessage = std::nullopt)
TEvCompileResponse(const TKqpCompileResult::TConstPtr& compileResult, NLWTrace::TOrbit orbit = {})
: CompileResult(compileResult)
, ReplayMessage(replayMessage)
, Orbit(std::move(orbit)) {
}

TKqpCompileResult::TConstPtr CompileResult;
TKqpStatsCompile Stats;
std::optional<TString> ReplayMessage;
std::optional<TString> ReplayMessageUserView;

NLWTrace::TOrbit Orbit;
};
Expand Down
11 changes: 7 additions & 4 deletions ydb/core/kqp/common/compilation/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,22 @@ struct TKqpCompileResult {

TKqpCompileResult(const TString& uid, const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues,
ETableReadType maxReadType, TMaybe<TKqpQueryId> query = {}, TMaybe<TQueryAst> queryAst = {},
bool needToSplit = false, const TMaybe<TString>& commandTagName = {})
bool needToSplit = false, const TMaybe<TString>& commandTagName = {}, const TMaybe<TString>& replayMessageUserView = {})
: Status(status)
, Issues(issues)
, Query(std::move(query))
, Uid(uid)
, MaxReadType(maxReadType)
, QueryAst(std::move(queryAst))
, NeedToSplit(needToSplit)
, CommandTagName(commandTagName) {}
, CommandTagName(commandTagName)
, ReplayMessageUserView(replayMessageUserView) {}

static std::shared_ptr<TKqpCompileResult> Make(const TString& uid, const Ydb::StatusIds::StatusCode& status,
const NYql::TIssues& issues, ETableReadType maxReadType, TMaybe<TKqpQueryId> query = {},
TMaybe<TQueryAst> queryAst = {}, bool needToSplit = false, const TMaybe<TString>& commandTagName = {})
TMaybe<TQueryAst> queryAst = {}, bool needToSplit = false, const TMaybe<TString>& commandTagName = {}, const TMaybe<TString>& replayMessageUserView = {})
{
return std::make_shared<TKqpCompileResult>(uid, status, issues, maxReadType, std::move(query), std::move(queryAst), needToSplit, commandTagName);
return std::make_shared<TKqpCompileResult>(uid, status, issues, maxReadType, std::move(query), std::move(queryAst), needToSplit, commandTagName, replayMessageUserView);
}

std::shared_ptr<NYql::TAstParseResult> GetAst() const;
Expand All @@ -47,6 +48,8 @@ struct TKqpCompileResult {
bool NeedToSplit = false;
TMaybe<TString> CommandTagName = {};

TMaybe<TString> ReplayMessageUserView;

std::shared_ptr<const TPreparedQueryHolder> PreparedQuery;
};

Expand Down
8 changes: 5 additions & 3 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {

replayMessage.InsertValue("query_id", Uid);
replayMessage.InsertValue("version", "1.0");
replayMessage.InsertValue("query_text", EscapeC(QueryId.Text));
NJson::TJsonValue queryParameterTypes(NJson::JSON_MAP);
if (QueryId.QueryParameterTypes) {
for (const auto& [paramName, paramType] : *QueryId.QueryParameterTypes) {
Expand All @@ -365,7 +364,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
replayMessage.InsertValue("query_syntax", ToString(Config->_KqpYqlSyntaxVersion.Get().GetRef()));
replayMessage.InsertValue("query_database", QueryId.Database);
replayMessage.InsertValue("query_cluster", QueryId.Cluster);
replayMessage.InsertValue("query_plan", queryPlan);
replayMessage.InsertValue("query_type", ToString(QueryId.Settings.QueryType));

if (CollectFullDiagnostics) {
Expand All @@ -380,6 +378,8 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
ReplayMessageUserView = NJson::WriteJson(replayMessage, /*formatOutput*/ false);
}

replayMessage.InsertValue("query_plan", queryPlan);
replayMessage.InsertValue("query_text", EscapeC(QueryId.Text));
replayMessage.InsertValue("table_metadata", TString(NJson::WriteJson(tablesMeta, false)));
replayMessage.InsertValue("table_meta_serialization_type", EMetaSerializationType::EncodedProto);

Expand All @@ -401,10 +401,12 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
<< ", issues: " << KqpCompileResult->Issues.ToString()
<< ", uid: " << KqpCompileResult->Uid);

if (ReplayMessageUserView) {
KqpCompileResult->ReplayMessageUserView = std::move(*ReplayMessageUserView);
}
auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(KqpCompileResult);

responseEv->ReplayMessage = std::move(ReplayMessage);
responseEv->ReplayMessageUserView = std::move(ReplayMessageUserView);
ReplayMessage = std::nullopt;
ReplayMessageUserView = std::nullopt;
auto& stats = responseEv->Stats;
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

if (compileResult->NeedToSplit) {
Reply(compileRequest.Sender, compileResult, compileStats, ctx,
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt));
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
ProcessQueue(ctx);
return;
}
Expand All @@ -635,7 +635,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
for (auto& request : requests) {
LWTRACK(KqpCompileServiceGetCompilation, request.Orbit, request.Query.UserSid, compileActorId.ToString());
Reply(request.Sender, compileResult, compileStats, ctx,
request.Cookie, std::move(request.Orbit), std::move(request.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt));
request.Cookie, std::move(request.Orbit), std::move(request.CompileServiceSpan));
}
} else {
if (!hasTempTablesNameClashes) {
Expand All @@ -647,7 +647,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

LWTRACK(KqpCompileServiceGetCompilation, compileRequest.Orbit, compileRequest.Query.UserSid, compileActorId.ToString());
Reply(compileRequest.Sender, compileResult, compileStats, ctx,
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt));
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
}
catch (const std::exception& e) {
LogException("TEvCompileResponse", ev->Sender, e, ctx);
Expand Down Expand Up @@ -809,7 +809,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
if (compileResult->GetAst() && QueryCache->FindByAst(query, *compileResult->GetAst(), keepInCache)) {
return false;
}
auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), compileResult->Status, compileResult->Issues, compileResult->MaxReadType, std::move(query), compileResult->QueryAst);
auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), compileResult->Status, compileResult->Issues, compileResult->MaxReadType, std::move(query), compileResult->QueryAst,
false, {}, compileResult->ReplayMessageUserView);
newCompileResult->AllowCache = compileResult->AllowCache;
newCompileResult->PreparedQuery = compileResult->PreparedQuery;
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Insert preparing query with params, queryId: " << query.SerializeToString());
Expand Down Expand Up @@ -865,7 +866,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

void Reply(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
const TKqpStatsCompile& compileStats, const TActorContext& ctx, ui64 cookie,
NLWTrace::TOrbit orbit, NWilson::TSpan span, const std::optional<TString>& replayMessage = std::nullopt)
NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
const auto& query = compileResult->Query;
LWTRACK(KqpCompileServiceReply,
Expand All @@ -878,7 +879,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
<< ", queryUid: " << compileResult->Uid
<< ", status:" << compileResult->Status);

auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(compileResult, std::move(orbit), replayMessage);
auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(compileResult, std::move(orbit));
responseEv->Stats = compileStats;

if (span) {
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,6 @@ bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) {
return false;
}
Orbit = std::move(ev->Orbit);
if (ev->ReplayMessage) {
ReplayMessage = *ev->ReplayMessage;
}

return true;
}
Expand All @@ -160,6 +157,10 @@ bool TKqpQueryState::SaveAndCheckCompileResult(TKqpCompileResult::TConstPtr comp
return false;
}

if (compileResult->ReplayMessageUserView && GetCollectDiagnostics()) {
ReplayMessage = *compileResult->ReplayMessageUserView;
}

YQL_ENSURE(CompileResult->PreparedQuery);
const ui32 compiledVersion = CompileResult->PreparedQuery->GetVersion();
YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1,
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2698,15 +2698,15 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
UNIT_ASSERT_C(value.IsMap(), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_id"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("version"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_text"), "Incorrect Diagnostics");
UNIT_ASSERT_C(!value.Has("query_text"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_parameter_types"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("table_metadata"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value["table_metadata"].IsArray(), "Incorrect Diagnostics: table_metadata type should be an array");
UNIT_ASSERT_C(value.Has("created_at"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_syntax"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_database"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_cluster"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_plan"), "Incorrect Diagnostics");
UNIT_ASSERT_C(!value.Has("query_plan"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_type"), "Incorrect Diagnostics");
}

Expand Down
16 changes: 7 additions & 9 deletions ydb/core/kqp/ut/olap/helpers/query_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

namespace NKikimr::NKqp {

TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it, NJson::TJsonValue* statInfo /*= nullptr*/, NJson::TJsonValue* diagnostics /*= nullptr*/) {
TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it, NJson::TJsonValue* statInfo /*= nullptr*/, NJson::TJsonValue* meta /*= nullptr*/) {
TVector<THashMap<TString, NYdb::TValue>> rows;
if (statInfo) {
*statInfo = NJson::JSON_NULL;
}
if (diagnostics) {
*diagnostics = NJson::JSON_NULL;
if (meta) {
*meta = NJson::JSON_NULL;
}
for (;;) {
auto streamPart = it.ReadNext().GetValueSync();
Expand All @@ -28,12 +28,10 @@ TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPar
if (plan && statInfo) {
UNIT_ASSERT(NJson::ReadJsonFastTree(*plan, statInfo));
}
}

if (streamPart.HasDiagnostics()) {
auto diagnosticsString = TString{streamPart.GetDiagnostics()};
if (!diagnosticsString.empty() && diagnostics) {
UNIT_ASSERT(NJson::ReadJsonFastTree(diagnosticsString, diagnostics));
auto metaString = streamPart.GetQueryStats().GetMeta();
if (metaString && !metaString->empty() && meta) {
UNIT_ASSERT(NJson::ReadJsonFastTree(*metaString, meta));
}
}

Expand Down Expand Up @@ -70,4 +68,4 @@ TVector<THashMap<TString, NYdb::TValue>> ExecuteScanQuery(NYdb::NTable::TTableCl
return rows;
}

}
}
Loading
Loading