From d6360ab274545ac214eae79ba2d20423fa8db5dc Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Sun, 9 Feb 2025 19:11:13 +0500 Subject: [PATCH] YQ-3561 first version of FQ run tool (#14259) --- ydb/core/testlib/test_client.cpp | 12 +- ydb/core/testlib/test_client.h | 2 + ydb/tests/tools/fqrun/.gitignore | 4 + .../tools/fqrun/configuration/fq_config.conf | 141 +++++++++ ydb/tests/tools/fqrun/fqprun.cpp | 166 +++++++++++ ydb/tests/tools/fqrun/src/common.h | 29 ++ ydb/tests/tools/fqrun/src/fq_runner.cpp | 136 +++++++++ ydb/tests/tools/fqrun/src/fq_runner.h | 24 ++ ydb/tests/tools/fqrun/src/fq_setup.cpp | 261 +++++++++++++++++ ydb/tests/tools/fqrun/src/fq_setup.h | 33 +++ ydb/tests/tools/fqrun/src/ya.make | 25 ++ ydb/tests/tools/fqrun/ya.make | 18 ++ ydb/tests/tools/kqprun/kqprun.cpp | 187 +----------- ydb/tests/tools/kqprun/runlib/application.cpp | 113 ++++++++ ydb/tests/tools/kqprun/runlib/application.h | 30 ++ ydb/tests/tools/kqprun/runlib/settings.h | 25 ++ ydb/tests/tools/kqprun/runlib/utils.cpp | 274 ++++++++++++++++++ ydb/tests/tools/kqprun/runlib/utils.h | 95 ++++++ ydb/tests/tools/kqprun/runlib/ya.make | 28 ++ ydb/tests/tools/kqprun/src/common.cpp | 29 -- ydb/tests/tools/kqprun/src/common.h | 24 +- ydb/tests/tools/kqprun/src/kqp_runner.cpp | 163 +---------- ydb/tests/tools/kqprun/src/ya.make | 2 +- ydb/tests/tools/kqprun/src/ydb_setup.cpp | 40 +-- ydb/tests/tools/kqprun/src/ydb_setup.h | 20 +- ydb/tests/tools/kqprun/ya.make | 1 + ydb/tests/tools/ya.make | 1 + 27 files changed, 1442 insertions(+), 441 deletions(-) create mode 100644 ydb/tests/tools/fqrun/.gitignore create mode 100644 ydb/tests/tools/fqrun/configuration/fq_config.conf create mode 100644 ydb/tests/tools/fqrun/fqprun.cpp create mode 100644 ydb/tests/tools/fqrun/src/common.h create mode 100644 ydb/tests/tools/fqrun/src/fq_runner.cpp create mode 100644 ydb/tests/tools/fqrun/src/fq_runner.h create mode 100644 ydb/tests/tools/fqrun/src/fq_setup.cpp create mode 100644 ydb/tests/tools/fqrun/src/fq_setup.h create mode 100644 ydb/tests/tools/fqrun/src/ya.make create mode 100644 ydb/tests/tools/fqrun/ya.make create mode 100644 ydb/tests/tools/kqprun/runlib/application.cpp create mode 100644 ydb/tests/tools/kqprun/runlib/application.h create mode 100644 ydb/tests/tools/kqprun/runlib/settings.h create mode 100644 ydb/tests/tools/kqprun/runlib/utils.cpp create mode 100644 ydb/tests/tools/kqprun/runlib/utils.h create mode 100644 ydb/tests/tools/kqprun/runlib/ya.make delete mode 100644 ydb/tests/tools/kqprun/src/common.cpp diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 1eb901280cb1..b47e27d5f5cd 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -707,7 +707,7 @@ namespace Tests { GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies, true, 1)); GRpcServer->AddService(new NGRpcService::TGRpcYdbTabletService(system, counters, grpcRequestProxies, true, 1)); - if (Settings->EnableYq) { + if (Settings->EnableYq || Settings->EnableYqGrpc) { GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxies[0])); GRpcServer->AddService(new NGRpcService::TGRpcFqPrivateTaskService(system, counters, grpcRequestProxies[0])); } @@ -1364,13 +1364,15 @@ namespace Tests { Runtime->GetAppData(nodeIdx).Counters); const TActorId controllerActorId = Runtime->Register(controllerActor, nodeIdx, Runtime->GetAppData(nodeIdx).BatchPoolId); Runtime->RegisterService(NMemory::MakeMemoryControllerId(0), controllerActorId, nodeIdx); - - auto statActor = NStat::CreateStatService(); - const TActorId statActorId = Runtime->Register(statActor.Release(), nodeIdx, Runtime->GetAppData(nodeIdx).UserPoolId); - Runtime->RegisterService(NStat::MakeStatServiceID(Runtime->GetNodeId(nodeIdx)), statActorId, nodeIdx); } } + { + auto statActor = NStat::CreateStatService(); + const TActorId statActorId = Runtime->Register(statActor.Release(), nodeIdx, Runtime->GetAppData(nodeIdx).UserPoolId); + Runtime->RegisterService(NStat::MakeStatServiceID(Runtime->GetNodeId(nodeIdx)), statActorId, nodeIdx); + } + { IActor* kesusService = NKesus::CreateKesusProxyService(); TActorId kesusServiceId = Runtime->Register(kesusService, nodeIdx, userPoolId); diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index 0f1075bfa67b..1d04e48a1075 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -142,6 +142,7 @@ namespace Tests { bool UseRealThreads = true; bool EnableKqpSpilling = false; bool EnableYq = false; + bool EnableYqGrpc = false; TDuration KeepSnapshotTimeout = TDuration::Zero(); ui64 ChangesQueueItemsLimit = 0; ui64 ChangesQueueBytesLimit = 0; @@ -207,6 +208,7 @@ namespace Tests { TServerSettings& SetEnableDbCounters(bool value) { FeatureFlags.SetEnableDbCounters(value); return *this; } TServerSettings& SetEnablePersistentQueryStats(bool value) { FeatureFlags.SetEnablePersistentQueryStats(value); return *this; } TServerSettings& SetEnableYq(bool value) { EnableYq = value; return *this; } + TServerSettings& SetEnableYqGrpc(bool value) { EnableYqGrpc = value; return *this; } TServerSettings& SetKeepSnapshotTimeout(TDuration value) { KeepSnapshotTimeout = value; return *this; } TServerSettings& SetChangesQueueItemsLimit(ui64 value) { ChangesQueueItemsLimit = value; return *this; } TServerSettings& SetChangesQueueBytesLimit(ui64 value) { ChangesQueueBytesLimit = value; return *this; } diff --git a/ydb/tests/tools/fqrun/.gitignore b/ydb/tests/tools/fqrun/.gitignore new file mode 100644 index 000000000000..51aaf6608d57 --- /dev/null +++ b/ydb/tests/tools/fqrun/.gitignore @@ -0,0 +1,4 @@ +sync_dir + +*.log +*.sql diff --git a/ydb/tests/tools/fqrun/configuration/fq_config.conf b/ydb/tests/tools/fqrun/configuration/fq_config.conf new file mode 100644 index 000000000000..8a980fa5daec --- /dev/null +++ b/ydb/tests/tools/fqrun/configuration/fq_config.conf @@ -0,0 +1,141 @@ +Enabled: true +EnableDynamicNameservice: true +EnableTaskCounters: true + +CheckpointCoordinator { + Enabled: true + + Storage { + TablePrefix: "yq/checkpoints" + ClientTimeoutSec: 70 + OperationTimeoutSec: 60 + CancelAfterSec: 60 + } +} + +Common { + MdbGateway: "https://mdb.api.cloud.yandex.net:443" + ObjectStorageEndpoint: "https://storage-internal.cloud.yandex.net" + IdsPrefix: "kr" + QueryArtifactsCompressionMethod: "zstd_6" + MonitoringEndpoint: "monitoring.api.cloud.yandex.net" + KeepInternalErrors: true + UseNativeProtocolForClickHouse: true + DisableSslForGenericDataSources: true + ShowQueryTimeline: true +} + +ControlPlaneProxy { + Enabled: true +} + +ControlPlaneStorage { + Enabled: true + StatsMode: STATS_MODE_PROFILE + + AvailableConnection: "OBJECT_STORAGE" + AvailableConnection: "DATA_STREAMS" + AvailableConnection: "MONITORING" + AvailableConnection: "POSTGRESQL_CLUSTER" + AvailableConnection: "CLICKHOUSE_CLUSTER" + AvailableConnection: "YDB_DATABASE" + AvailableConnection: "GREENPLUM_CLUSTER" + AvailableConnection: "MYSQL_CLUSTER" + + AvailableStreamingConnection: "OBJECT_STORAGE" + AvailableStreamingConnection: "DATA_STREAMS" + AvailableStreamingConnection: "MONITORING" + AvailableStreamingConnection: "YDB_DATABASE" + + AvailableBinding: "OBJECT_STORAGE" + AvailableBinding: "DATA_STREAMS" + + Storage { + TablePrefix: "yq/control_plane" + ClientTimeoutSec: 70 + OperationTimeoutSec: 60 + CancelAfterSec: 60 + } +} + +DbPool { + Enabled: true + + Storage { + TablePrefix: "yq/db_pool" + ClientTimeoutSec: 70 + OperationTimeoutSec: 60 + CancelAfterSec: 60 + } +} + +NodesManager { + Enabled: true +} + +PendingFetcher { + Enabled: true +} + +PrivateApi { + Enabled: true +} + +PrivateProxy { + Enabled: true +} + +QuotasManager { + Enabled: true +} + +RateLimiter { + Enabled: true + ControlPlaneEnabled: true + DataPlaneEnabled: true + + Database { + TablePrefix: "yq/rate_limiter" + ClientTimeoutSec: 70 + OperationTimeoutSec: 60 + CancelAfterSec: 60 + } + + Limiters { + CoordinationNodePath: "limiter_alpha" + } +} + +ResourceManager { + Enabled: true +} + +RowDispatcher { + Enabled: true + SendStatusPeriodSec: 10 + TimeoutBeforeStartSessionSec: 10 + + CompileService { + ParallelCompilationLimit: 20 + } + + Coordinator { + CoordinationNodePath: "yq/row_dispatcher" + + Database { + TablePrefix: "yq/row_dispatcher" + ClientTimeoutSec: 70 + OperationTimeoutSec: 60 + CancelAfterSec: 60 + } + } + + JsonParser { + BatchSizeBytes: 1048576 + BatchCreationTimeoutMs: 1000 + } +} + +TestConnection { + Enabled: true +} diff --git a/ydb/tests/tools/fqrun/fqprun.cpp b/ydb/tests/tools/fqrun/fqprun.cpp new file mode 100644 index 000000000000..277aa8d2ed90 --- /dev/null +++ b/ydb/tests/tools/fqrun/fqprun.cpp @@ -0,0 +1,166 @@ +#include +#include + +#include + +#include +#include +#include + +using namespace NKikimrRun; + +namespace NFqRun { + +namespace { + +struct TExecutionOptions { + TString Query; + + bool HasResults() const { + return !Query.empty(); + } + + TRequestOptions GetQueryOptions() const { + return { + .Query = Query + }; + } + + void Validate(const TRunnerOptions& runnerOptions) const { + if (!Query && !runnerOptions.FqSettings.MonitoringEnabled && !runnerOptions.FqSettings.GrpcEnabled) { + ythrow yexception() << "Nothing to execute and is not running as daemon"; + } + } +}; + +void RunArgumentQueries(const TExecutionOptions& executionOptions, TFqRunner& runner) { + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + + if (executionOptions.Query) { + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing query..." << colors.Default() << Endl; + if (!runner.ExecuteStreamQuery(executionOptions.GetQueryOptions())) { + ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed"; + } + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching query results..." << colors.Default() << Endl; + if (!runner.FetchQueryResults()) { + ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Fetch query results failed"; + } + } + + if (executionOptions.HasResults()) { + try { + runner.PrintQueryResults(); + } catch (...) { + ythrow yexception() << "Failed to print script results, reason:\n" << CurrentExceptionMessage(); + } + } +} + +void RunAsDaemon() { + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Initialization finished" << colors.Default() << Endl; + while (true) { + Sleep(TDuration::Seconds(1)); + } +} + +void RunScript(const TExecutionOptions& executionOptions, const TRunnerOptions& runnerOptions) { + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Initialization of fq runner..." << colors.Default() << Endl; + TFqRunner runner(runnerOptions); + + try { + RunArgumentQueries(executionOptions, runner); + } catch (const yexception& exception) { + if (runnerOptions.FqSettings.MonitoringEnabled) { + Cerr << colors.Red() << CurrentExceptionMessage() << colors.Default() << Endl; + } else { + throw exception; + } + } + + if (runnerOptions.FqSettings.MonitoringEnabled || runnerOptions.FqSettings.GrpcEnabled) { + RunAsDaemon(); + } + + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Finalization of fq runner..." << colors.Default() << Endl; +} + +class TMain : public TMainBase { +protected: + void RegisterOptions(NLastGetopt::TOpts& options) override { + options.SetTitle("FqRun -- tool to execute stream queries through FQ proxy"); + options.AddHelpOption('h'); + options.SetFreeArgsNum(0); + + // Inputs + + options.AddLongOption('p', "query", "Query to execute") + .RequiredArgument("file") + .StoreMappedResult(&ExecutionOptions.Query, &LoadFile); + + options.AddLongOption("fq-cfg", "File with FQ config (NFq::NConfig::TConfig for FQ proxy)") + .RequiredArgument("file") + .DefaultValue("./configuration/fq_config.conf") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + if (!google::protobuf::TextFormat::ParseFromString(LoadFile(TString(option->CurValOrDef())), &RunnerOptions.FqSettings.FqConfig)) { + ythrow yexception() << "Bad format of FQ configuration"; + } + }); + + // Outputs + + options.AddLongOption("result-file", "File with query results (use '-' to write in stdout)") + .RequiredArgument("file") + .DefaultValue("-") + .StoreMappedResultT(&RunnerOptions.ResultOutput, &GetDefaultOutput); + + TChoices resultFormat({ + {"rows", EResultOutputFormat::RowsJson}, + {"full-json", EResultOutputFormat::FullJson}, + {"full-proto", EResultOutputFormat::FullProto} + }); + options.AddLongOption('R', "result-format", "Query result format") + .RequiredArgument("result-format") + .DefaultValue("rows") + .Choices(resultFormat.GetChoices()) + .StoreMappedResultT(&RunnerOptions.ResultOutputFormat, resultFormat); + + RegisterKikimrOptions(options, RunnerOptions.FqSettings); + } + + int DoRun(NLastGetopt::TOptsParseResult&&) override { + ExecutionOptions.Validate(RunnerOptions); + + auto& logConfig = RunnerOptions.FqSettings.LogConfig; + logConfig.SetDefaultLevel(NActors::NLog::EPriority::PRI_CRIT); + FillLogConfig(logConfig); + + RunScript(ExecutionOptions, RunnerOptions); + + return 0; + } + +private: + TExecutionOptions ExecutionOptions; + TRunnerOptions RunnerOptions; +}; + +} // anonymous namespace + +} // namespace NFqRun + +int main(int argc, const char* argv[]) { + SetupSignalActions(); + + try { + NFqRun::TMain().Run(argc, argv); + } catch (...) { + NColorizer::TColors colors = NColorizer::AutoColors(Cerr); + + Cerr << colors.Red() << CurrentExceptionMessage() << colors.Default() << Endl; + return 1; + } +} diff --git a/ydb/tests/tools/fqrun/src/common.h b/ydb/tests/tools/fqrun/src/common.h new file mode 100644 index 000000000000..5af4b019dbc3 --- /dev/null +++ b/ydb/tests/tools/fqrun/src/common.h @@ -0,0 +1,29 @@ +#pragma once + +#include + +#include +#include +#include + +namespace NFqRun { + +constexpr i64 MAX_RESULT_SET_ROWS = 1000; + +struct TFqSetupSettings : public NKikimrRun::TServerSettings { + NFq::NConfig::TConfig FqConfig; + NKikimrConfig::TLogConfig LogConfig; +}; + +struct TRunnerOptions { + IOutputStream* ResultOutput = nullptr; + NKikimrRun::EResultOutputFormat ResultOutputFormat = NKikimrRun::EResultOutputFormat::RowsJson; + + TFqSetupSettings FqSettings; +}; + +struct TRequestOptions { + TString Query; +}; + +} // namespace NFqRun diff --git a/ydb/tests/tools/fqrun/src/fq_runner.cpp b/ydb/tests/tools/fqrun/src/fq_runner.cpp new file mode 100644 index 000000000000..6fe51cfb50f9 --- /dev/null +++ b/ydb/tests/tools/fqrun/src/fq_runner.cpp @@ -0,0 +1,136 @@ +#include "fq_runner.h" +#include "fq_setup.h" + +#include + +using namespace NKikimrRun; + +namespace NFqRun { + +class TFqRunner::TImpl { + static constexpr TDuration REFRESH_PERIOD = TDuration::Seconds(1); + +public: + explicit TImpl(const TRunnerOptions& options) + : Options(options) + , FqSetup(options.FqSettings) + , CerrColors(NColorizer::AutoColors(Cerr)) + , CoutColors(NColorizer::AutoColors(Cout)) + {} + + bool ExecuteStreamQuery(const TRequestOptions& query) { + const TRequestResult status = FqSetup.StreamRequest(query, StreamQueryId); + + if (!status.IsSuccess()) { + Cerr << CerrColors.Red() << "Failed to start stream request execution, reason:" << CerrColors.Default() << Endl << status.ToString() << Endl; + return false; + } + + return WaitStreamQuery(); + } + + bool FetchQueryResults() { + ResultSets.clear(); + ResultSets.resize(ExecutionMeta.ResultSetSizes.size()); + for (i32 resultSetId = 0; resultSetId < static_cast(ExecutionMeta.ResultSetSizes.size()); ++resultSetId) { + const auto rowsCount = ExecutionMeta.ResultSetSizes[resultSetId]; + if (rowsCount > MAX_RESULT_SET_ROWS) { + Cerr << CerrColors.Red() << "Result set with id " << resultSetId << " have " << rowsCount << " rows, it is larger than allowed limit " << MAX_RESULT_SET_ROWS << ", results will be truncated" << CerrColors.Default() << Endl; + } + + const TRequestResult status = FqSetup.FetchQueryResults(StreamQueryId, resultSetId, ResultSets[resultSetId]); + if (!status.IsSuccess()) { + Cerr << CerrColors.Red() << "Failed to fetch result set with id " << resultSetId << ", reason:" << CerrColors.Default() << Endl << status.ToString() << Endl; + return false; + } + } + + return true; + } + + void PrintQueryResults() { + if (Options.ResultOutput) { + Cout << CoutColors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Writing query results..." << CoutColors.Default() << Endl; + for (size_t i = 0; i < ResultSets.size(); ++i) { + if (ResultSets.size() > 1) { + *Options.ResultOutput << CoutColors.Cyan() << "Result set " << i + 1 << ":" << CoutColors.Default() << Endl; + } + PrintResultSet(Options.ResultOutputFormat, *Options.ResultOutput, ResultSets[i]); + } + } + } + +private: + static bool IsFinalStatus(FederatedQuery::QueryMeta::ComputeStatus status) { + using EStatus = FederatedQuery::QueryMeta; + return IsIn({EStatus::FAILED, EStatus::COMPLETED, EStatus::ABORTED_BY_USER, EStatus::ABORTED_BY_SYSTEM}, status); + } + + bool WaitStreamQuery() { + StartTime = TInstant::Now(); + + while (true) { + const TRequestResult status = FqSetup.DescribeQuery(StreamQueryId, ExecutionMeta); + + if (IsFinalStatus(ExecutionMeta.Status)) { + break; + } + + if (!status.IsSuccess()) { + Cerr << CerrColors.Red() << "Failed to describe query, reason:" << CerrColors.Default() << Endl << status.ToString() << Endl; + return false; + } + + Sleep(REFRESH_PERIOD); + } + + Cout << CoutColors.Cyan() << "Query finished. Duration: " << TInstant::Now() - StartTime << CoutColors.Default() << Endl; + + if (ExecutionMeta.Status != FederatedQuery::QueryMeta::COMPLETED) { + Cerr << CerrColors.Red() << "Failed to execute query, invalid final status " << FederatedQuery::QueryMeta::ComputeStatus_Name(ExecutionMeta.Status) << ", issues:" << CerrColors.Default() << Endl << ExecutionMeta.Issues.ToString() << Endl; + if (ExecutionMeta.TransientIssues) { + Cerr << CerrColors.Red() << "Transient issues:" << CerrColors.Default() << Endl << ExecutionMeta.TransientIssues.ToString() << Endl; + } + return false; + } + + if (ExecutionMeta.Issues) { + Cerr << CerrColors.Red() << "Query finished with issues:" << CerrColors.Default() << Endl << ExecutionMeta.Issues.ToString() << Endl; + } + + if (ExecutionMeta.TransientIssues) { + Cerr << CerrColors.Red() << "Query finished with transient issues:" << CerrColors.Default() << Endl << ExecutionMeta.TransientIssues.ToString() << Endl; + } + + return true; + } + +private: + const TRunnerOptions Options; + const TFqSetup FqSetup; + const NColorizer::TColors CerrColors; + const NColorizer::TColors CoutColors; + + TString StreamQueryId; + TInstant StartTime; + TExecutionMeta ExecutionMeta; + std::vector ResultSets; +}; + +TFqRunner::TFqRunner(const TRunnerOptions& options) + : Impl(new TImpl(options)) +{} + +bool TFqRunner::ExecuteStreamQuery(const TRequestOptions& query) const { + return Impl->ExecuteStreamQuery(query); +} + +bool TFqRunner::FetchQueryResults() const { + return Impl->FetchQueryResults(); +} + +void TFqRunner::PrintQueryResults() const { + Impl->PrintQueryResults(); +} + +} // namespace NFqRun diff --git a/ydb/tests/tools/fqrun/src/fq_runner.h b/ydb/tests/tools/fqrun/src/fq_runner.h new file mode 100644 index 000000000000..7b803648dcf9 --- /dev/null +++ b/ydb/tests/tools/fqrun/src/fq_runner.h @@ -0,0 +1,24 @@ +#pragma once + +#include "common.h" + +#include + +namespace NFqRun { + +class TFqRunner { +public: + explicit TFqRunner(const TRunnerOptions& options); + + bool ExecuteStreamQuery(const TRequestOptions& query) const; + + bool FetchQueryResults() const; + + void PrintQueryResults() const; + +private: + class TImpl; + std::shared_ptr Impl; +}; + +} // namespace NFqRun diff --git a/ydb/tests/tools/fqrun/src/fq_setup.cpp b/ydb/tests/tools/fqrun/src/fq_setup.cpp new file mode 100644 index 000000000000..6b16b4f5af15 --- /dev/null +++ b/ydb/tests/tools/fqrun/src/fq_setup.cpp @@ -0,0 +1,261 @@ +#include "fq_setup.h" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +using namespace NKikimrRun; + +namespace NFqRun { + +namespace { + +Ydb::StatusIds::StatusCode GetStatus(const NYql::TIssues& issues) { + return issues ? Ydb::StatusIds::BAD_REQUEST : Ydb::StatusIds::SUCCESS; +} + +} // anonymous namespace + +class TFqSetup::TImpl { +private: + TAutoPtr CreateLogBackend() const { + if (Settings.LogOutputFile) { + return NActors::CreateFileBackend(Settings.LogOutputFile); + } else { + return NActors::CreateStderrBackend(); + } + } + + void SetLoggerSettings(NKikimr::Tests::TServerSettings& serverSettings) const { + auto loggerInitializer = [this](NActors::TTestActorRuntime& runtime) { + InitLogSettings(Settings.LogConfig, runtime); + runtime.SetLogBackendFactory([this]() { return CreateLogBackend(); }); + }; + + serverSettings.SetLoggerInitializer(loggerInitializer); + } + + NKikimr::Tests::TServerSettings GetServerSettings(ui32 grpcPort) { + NKikimr::Tests::TServerSettings serverSettings(PortManager.GetPort()); + + serverSettings.SetDomainName(Settings.DomainName); + serverSettings.SetVerbose(false); + + NKikimrConfig::TAppConfig config; + *config.MutableLogConfig() = Settings.LogConfig; + serverSettings.SetAppConfig(config); + + SetLoggerSettings(serverSettings); + + if (Settings.MonitoringEnabled) { + serverSettings.InitKikimrRunConfig(); + serverSettings.SetMonitoringPortOffset(Settings.MonitoringPortOffset, true); + serverSettings.SetNeedStatsCollectors(true); + } + + serverSettings.SetGrpcPort(grpcPort); + serverSettings.SetEnableYqGrpc(true); + + return serverSettings; + } + + void InitializeServer(ui32 grpcPort) { + const auto& serverSettings = GetServerSettings(grpcPort); + + Server = MakeIntrusive(serverSettings); + Server->GetRuntime()->SetDispatchTimeout(TDuration::Max()); + + Server->EnableGRpc(NYdbGrpc::TServerOptions() + .SetHost("localhost") + .SetPort(grpcPort) + .SetLogger(NYdbGrpc::CreateActorSystemLogger(*GetRuntime()->GetActorSystem(0), NKikimrServices::GRPC_SERVER)) + .SetGRpcShutdownDeadline(TDuration::Zero()) + ); + + Client = std::make_unique(serverSettings); + Client->InitRootScheme(); + } + + NFq::NConfig::TConfig GetFqProxyConfig(ui32 grpcPort, ui32 httpPort) const { + auto fqConfig = Settings.FqConfig; + + fqConfig.MutableControlPlaneStorage()->AddSuperUsers(BUILTIN_ACL_ROOT); + fqConfig.MutablePrivateProxy()->AddGrantedUsers(BUILTIN_ACL_ROOT); + + const TString endpoint = TStringBuilder() << "localhost:" << grpcPort; + const TString database = NKikimr::CanonizePath(Settings.DomainName); + const auto fillStorageConfig = [endpoint, database](NFq::NConfig::TYdbStorageConfig* config) { + config->SetEndpoint(endpoint); + config->SetDatabase(database); + }; + fillStorageConfig(fqConfig.MutableControlPlaneStorage()->MutableStorage()); + fillStorageConfig(fqConfig.MutableDbPool()->MutableStorage()); + fillStorageConfig(fqConfig.MutableCheckpointCoordinator()->MutableStorage()); + fillStorageConfig(fqConfig.MutableRateLimiter()->MutableDatabase()); + fillStorageConfig(fqConfig.MutableRowDispatcher()->MutableCoordinator()->MutableDatabase()); + + auto* privateApiConfig = fqConfig.MutablePrivateApi(); + privateApiConfig->SetTaskServiceEndpoint(endpoint); + privateApiConfig->SetTaskServiceDatabase(database); + + auto* nodesMenagerConfig = fqConfig.MutableNodesManager(); + nodesMenagerConfig->SetPort(grpcPort); + nodesMenagerConfig->SetHost("localhost"); + + fqConfig.MutableCommon()->SetYdbMvpCloudEndpoint(TStringBuilder() << "http://localhost:" << httpPort << "/yql-mock/abc"); + + return fqConfig; + } + + void InitializeFqProxy(ui32 grpcPort) { + const ui32 httpPort = PortManager.GetPort(); + const auto& fqConfig = GetFqProxyConfig(grpcPort, httpPort); + const auto counters = GetRuntime()->GetAppData().Counters->GetSubgroup("counters", "yq"); + YqSharedResources = NFq::CreateYqSharedResources(fqConfig, NKikimr::CreateYdbCredentialsProviderFactory, counters); + + const auto actorRegistrator = [runtime = GetRuntime()](NActors::TActorId serviceActorId, NActors::IActor* actor) { + auto actorId = runtime->Register(actor, 0, runtime->GetAppData().UserPoolId); + runtime->RegisterService(serviceActorId, actorId); + }; + + const auto folderServiceFactory = [](auto& config) { + return NKikimr::NFolderService::CreateMockFolderServiceAdapterActor(config, ""); + }; + + NFq::Init( + fqConfig, GetRuntime()->GetNodeId(), actorRegistrator, &GetRuntime()->GetAppData(), + Settings.DomainName, nullptr, YqSharedResources, folderServiceFactory, 0, {} + ); + + NFq::InitTest(GetRuntime(), httpPort, grpcPort, YqSharedResources); + } + +public: + explicit TImpl(const TFqSetupSettings& settings) + : Settings(settings) + { + const ui32 grpcPort = Settings.GrpcPort ? Settings.GrpcPort : PortManager.GetPort(); + InitializeServer(grpcPort); + InitializeFqProxy(grpcPort); + + if (Settings.MonitoringEnabled) { + Cout << CoutColors.Cyan() << "Monitoring port: " << CoutColors.Default() << GetRuntime()->GetMonPort() << Endl; + } + + if (Settings.GrpcEnabled) { + Cout << CoutColors.Cyan() << "Domain gRPC port: " << CoutColors.Default() << grpcPort << Endl; + } + } + + ~TImpl() { + if (YqSharedResources) { + YqSharedResources->Stop(); + } + } + + NFq::TEvControlPlaneProxy::TEvCreateQueryResponse::TPtr StreamRequest(const TRequestOptions& query) const { + FederatedQuery::CreateQueryRequest request; + request.set_execute_mode(FederatedQuery::ExecuteMode::RUN); + + auto& content = *request.mutable_content(); + content.set_type(FederatedQuery::QueryContent::STREAMING); + content.set_text(query.Query); + content.mutable_acl()->set_visibility(::FederatedQuery::Acl::SCOPE); + + return RunControlPlaneProxyRequest(request); + } + + NFq::TEvControlPlaneProxy::TEvDescribeQueryResponse::TPtr DescribeQuery(const TString& queryId) const { + FederatedQuery::DescribeQueryRequest request; + request.set_query_id(queryId); + + return RunControlPlaneProxyRequest(request); + } + + NFq::TEvControlPlaneProxy::TEvGetResultDataResponse::TPtr FetchQueryResults(const TString& queryId, i32 resultSetId) const { + FederatedQuery::GetResultDataRequest request; + request.set_query_id(queryId); + request.set_result_set_index(resultSetId); + request.set_limit(MAX_RESULT_SET_ROWS); + + return RunControlPlaneProxyRequest(request); + } + +private: + NActors::TTestActorRuntime* GetRuntime() const { + return Server->GetRuntime(); + } + + template + typename TResponse::TPtr RunControlPlaneProxyRequest(const TProto& request) const { + auto event = std::make_unique("yandexcloud://kqprun", request, BUILTIN_ACL_ROOT, BUILTIN_ACL_ROOT, TVector{}); + return RunControlPlaneProxyRequest(std::move(event)); + } + + template + typename TResponse::TPtr RunControlPlaneProxyRequest(std::unique_ptr event) const { + NActors::TActorId edgeActor = GetRuntime()->AllocateEdgeActor(); + NActors::TActorId controlPlaneProxy = NFq::ControlPlaneProxyActorId(); + + GetRuntime()->Send(controlPlaneProxy, edgeActor, event.release()); + + return GetRuntime()->GrabEdgeEvent(edgeActor); + } + +private: + const TFqSetupSettings Settings; + const NColorizer::TColors CoutColors; + + NKikimr::Tests::TServer::TPtr Server; + std::unique_ptr Client; + NFq::IYqSharedResources::TPtr YqSharedResources; + TPortManager PortManager; +}; + +TFqSetup::TFqSetup(const TFqSetupSettings& settings) + : Impl(new TImpl(settings)) +{} + +TRequestResult TFqSetup::StreamRequest(const TRequestOptions& query, TString& queryId) const { + const auto response = Impl->StreamRequest(query); + + queryId = response->Get()->Result.query_id(); + + const auto& issues = response->Get()->Issues; + return TRequestResult(GetStatus(issues), issues); +} + +TRequestResult TFqSetup::DescribeQuery(const TString& queryId, TExecutionMeta& meta) const { + const auto response = Impl->DescribeQuery(queryId); + + const auto& result = response->Get()->Result.query(); + meta.Status = result.meta().status(); + NYql::IssuesFromMessage(result.issue(), meta.Issues); + NYql::IssuesFromMessage(result.transient_issue(), meta.TransientIssues); + + meta.ResultSetSizes.clear(); + for (const auto& resultMeta : result.result_set_meta()) { + meta.ResultSetSizes.emplace_back(resultMeta.rows_count()); + } + + const auto& issues = response->Get()->Issues; + return TRequestResult(GetStatus(issues), issues); +} + +TRequestResult TFqSetup::FetchQueryResults(const TString& queryId, i32 resultSetId, Ydb::ResultSet& resultSet) const { + const auto response = Impl->FetchQueryResults(queryId, resultSetId); + + resultSet = response->Get()->Result.result_set(); + + const auto& issues = response->Get()->Issues; + return TRequestResult(GetStatus(issues), issues); +} + +} // namespace NFqRun diff --git a/ydb/tests/tools/fqrun/src/fq_setup.h b/ydb/tests/tools/fqrun/src/fq_setup.h new file mode 100644 index 000000000000..c78fad43e2dd --- /dev/null +++ b/ydb/tests/tools/fqrun/src/fq_setup.h @@ -0,0 +1,33 @@ +#pragma once + +#include "common.h" + +#include + +namespace NFqRun { + +struct TExecutionMeta { + FederatedQuery::QueryMeta::ComputeStatus Status; + NYql::TIssues Issues; + NYql::TIssues TransientIssues; + std::vector ResultSetSizes; +}; + +class TFqSetup { + using TRequestResult = NKikimrRun::TRequestResult; + +public: + explicit TFqSetup(const TFqSetupSettings& settings); + + TRequestResult StreamRequest(const TRequestOptions& query, TString& queryId) const; + + TRequestResult DescribeQuery(const TString& queryId, TExecutionMeta& meta) const; + + TRequestResult FetchQueryResults(const TString& queryId, i32 resultSetId, Ydb::ResultSet& resultSet) const; + +private: + class TImpl; + std::shared_ptr Impl; +}; + +} // namespace NFqRun diff --git a/ydb/tests/tools/fqrun/src/ya.make b/ydb/tests/tools/fqrun/src/ya.make new file mode 100644 index 000000000000..5173c30f4be2 --- /dev/null +++ b/ydb/tests/tools/fqrun/src/ya.make @@ -0,0 +1,25 @@ +LIBRARY() + +SRCS( + fq_runner.cpp + fq_setup.cpp +) + +PEERDIR( + library/cpp/colorizer + library/cpp/testing/unittest + util + ydb/core/fq/libs/config/protos + ydb/core/fq/libs/control_plane_proxy/events + ydb/core/fq/libs/init + ydb/core/fq/libs/mock + ydb/core/testlib + ydb/library/folder_service/mock + ydb/library/grpc/server/actors + ydb/library/security + ydb/tests/tools/kqprun/runlib +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/tests/tools/fqrun/ya.make b/ydb/tests/tools/fqrun/ya.make new file mode 100644 index 000000000000..fa7212f0f8bc --- /dev/null +++ b/ydb/tests/tools/fqrun/ya.make @@ -0,0 +1,18 @@ +PROGRAM(fqprun) + +SRCS( + fqprun.cpp +) + +PEERDIR( + library/cpp/colorizer + util + ydb/tests/tools/fqrun/src + ydb/tests/tools/kqprun/runlib + yql/essentials/parser/pg_wrapper + yql/essentials/sql/pg +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 1ddbd4adc78d..4505610ce417 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -1,5 +1,3 @@ -#include "src/kqp_runner.h" - #include #include @@ -9,11 +7,12 @@ #include #include -#include #include - #include #include +#include +#include +#include #include #include @@ -26,6 +25,7 @@ #include #endif +using namespace NKikimrRun; namespace NKqpRun { @@ -423,9 +423,8 @@ TIntrusivePtr CreateFunctionRegistr } -class TMain : public TMainClassArgs { +class TMain : public TMainBase { inline static const TString YqlToken = GetEnv(YQL_TOKEN_VARIABLE); - inline static std::vector> FileHolders; inline static IOutputStream* ProfileAllocationsOutput = nullptr; inline static NColorizer::TColors CoutColors = NColorizer::AutoColors(Cout); @@ -439,52 +438,6 @@ class TMain : public TMainClassArgs { bool ExcludeLinkedUdfs = false; bool EmulateYt = false; - std::optional DefaultLogPriority; - std::unordered_map LogPriorities; - - static TString LoadFile(const TString& file) { - return TFileInput(file).ReadAll(); - } - - static IOutputStream* GetDefaultOutput(const TString& file) { - if (file == "-") { - return &Cout; - } - if (file) { - FileHolders.emplace_back(new TFileOutput(file)); - return FileHolders.back().get(); - } - return nullptr; - } - - template - class TChoices { - public: - explicit TChoices(std::map choicesMap) - : ChoicesMap(std::move(choicesMap)) - {} - - TResult operator()(const TString& choice) const { - return ChoicesMap.at(choice); - } - - TVector GetChoices() const { - TVector choices; - choices.reserve(ChoicesMap.size()); - for (const auto& [choice, _] : ChoicesMap) { - choices.emplace_back(choice); - } - return choices; - } - - bool Contains(const TString& choice) const { - return ChoicesMap.contains(choice); - } - - private: - const std::map ChoicesMap; - }; - #ifdef PROFILE_MEMORY_ALLOCATIONS public: static void FinishProfileMemoryAllocations() { @@ -590,51 +543,6 @@ class TMain : public TMainClassArgs { // Outputs - options.AddLongOption("log-file", "File with execution logs (writes in stderr if empty)") - .RequiredArgument("file") - .StoreResult(&RunnerOptions.YdbSettings.LogOutputFile) - .Handler1([](const NLastGetopt::TOptsParser* option) { - if (const TString& file = option->CurVal()) { - std::remove(file.c_str()); - } - }); - - TChoices logPriority({ - {"emerg", NActors::NLog::EPriority::PRI_EMERG}, - {"alert", NActors::NLog::EPriority::PRI_ALERT}, - {"crit", NActors::NLog::EPriority::PRI_CRIT}, - {"error", NActors::NLog::EPriority::PRI_ERROR}, - {"warn", NActors::NLog::EPriority::PRI_WARN}, - {"notice", NActors::NLog::EPriority::PRI_NOTICE}, - {"info", NActors::NLog::EPriority::PRI_INFO}, - {"debug", NActors::NLog::EPriority::PRI_DEBUG}, - {"trace", NActors::NLog::EPriority::PRI_TRACE}, - }); - options.AddLongOption("log-default", "Default log priority") - .RequiredArgument("priority") - .Choices(logPriority.GetChoices()) - .StoreMappedResultT(&DefaultLogPriority, logPriority); - - options.AddLongOption("log", "Component log priority in format = (e. g. KQP_YQL=trace)") - .RequiredArgument("component priority") - .Handler1([this, logPriority](const NLastGetopt::TOptsParser* option) { - TStringBuf component; - TStringBuf priority; - TStringBuf(option->CurVal()).Split('=', component, priority); - if (component.empty() || priority.empty()) { - ythrow yexception() << "Incorrect log setting, expected form component=priority, e. g. KQP_YQL=trace"; - } - - if (!logPriority.Contains(TString(priority))) { - ythrow yexception() << "Incorrect log priority: " << priority; - } - - const auto service = GetLogService(TString(component)); - if (!LogPriorities.emplace(service, logPriority(TString(priority))).second) { - ythrow yexception() << "Got duplicated log service name: " << component; - } - }); - TChoices traceOpt({ {"all", TRunnerOptions::ETraceOptType::All}, {"scheme", TRunnerOptions::ETraceOptType::Scheme}, @@ -669,10 +577,10 @@ class TMain : public TMainClassArgs { .DefaultValue(0) .StoreResult(&ExecutionOptions.ResultsRowsLimit); - TChoices resultFormat({ - {"rows", TRunnerOptions::EResultOutputFormat::RowsJson}, - {"full-json", TRunnerOptions::EResultOutputFormat::FullJson}, - {"full-proto", TRunnerOptions::EResultOutputFormat::FullProto} + TChoices resultFormat({ + {"rows", EResultOutputFormat::RowsJson}, + {"full-json", EResultOutputFormat::FullJson}, + {"full-proto", EResultOutputFormat::FullProto} }); options.AddLongOption('R', "result-format", "Script query result format") .RequiredArgument("result-format") @@ -837,24 +745,6 @@ class TMain : public TMainClassArgs { return nodeCount; }); - options.AddLongOption('M', "monitoring", "Embedded UI port (use 0 to start on random free port), if used kqprun will be run as daemon") - .RequiredArgument("uint") - .Handler1([this](const NLastGetopt::TOptsParser* option) { - if (const TString& port = option->CurVal()) { - RunnerOptions.YdbSettings.MonitoringEnabled = true; - RunnerOptions.YdbSettings.MonitoringPortOffset = FromString(port); - } - }); - - options.AddLongOption('G', "grpc", "gRPC port (use 0 to start on random free port), if used kqprun will be run as daemon") - .RequiredArgument("uint") - .Handler1([this](const NLastGetopt::TOptsParser* option) { - if (const TString& port = option->CurVal()) { - RunnerOptions.YdbSettings.GrpcEnabled = true; - RunnerOptions.YdbSettings.GrpcPort = FromString(port); - } - }); - options.AddLongOption('E', "emulate-yt", "Emulate YT tables (use file gateway instead of native gateway)") .NoArgument() .SetFlag(&EmulateYt); @@ -871,11 +761,6 @@ class TMain : public TMainClassArgs { .DefaultValue(10) .StoreMappedResultT(&RunnerOptions.YdbSettings.HealthCheckTimeout, &TDuration::Seconds); - options.AddLongOption("domain", "Test cluster domain name") - .RequiredArgument("name") - .DefaultValue(RunnerOptions.YdbSettings.DomainName) - .StoreResult(&RunnerOptions.YdbSettings.DomainName); - const auto addTenant = [this](const TString& type, TStorageMeta::TTenant::EType protoType, const NLastGetopt::TOptsParser* option) { TStringBuf tenant; TStringBuf nodesCountStr; @@ -939,18 +824,7 @@ class TMain : public TMainClassArgs { .NoArgument() .SetFlag(&RunnerOptions.YdbSettings.DisableDiskMock); - TChoices> backtrace({ - {"heavy", &NKikimr::EnableYDBBacktraceFormat}, - {"light", []() { SetFormatBackTraceFn(FormatBackTrace); }} - }); - options.AddLongOption("backtrace", "Default backtrace format function") - .RequiredArgument("backtrace-type") - .DefaultValue("heavy") - .Choices(backtrace.GetChoices()) - .Handler1([backtrace](const NLastGetopt::TOptsParser* option) { - TString choice(option->CurValOrDef()); - backtrace(choice)(); - }); + RegisterKikimrOptions(options, RunnerOptions.YdbSettings); } int DoRun(NLastGetopt::TOptsParseResult&&) override { @@ -969,10 +843,7 @@ class TMain : public TMainClassArgs { appConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ExecutionOptions.ResultsRowsLimit); } - if (DefaultLogPriority) { - appConfig.MutableLogConfig()->SetDefaultLevel(*DefaultLogPriority); - } - ModifyLogPriorities(LogPriorities, *appConfig.MutableLogConfig()); + FillLogConfig(*appConfig.MutableLogConfig()); if (EmulateYt) { const auto& fileStorageConfig = appConfig.GetQueryServiceConfig().GetFileStorage(); @@ -1023,38 +894,6 @@ class TMain : public TMainClassArgs { } }; - -void KqprunTerminateHandler() { - NColorizer::TColors colors = NColorizer::AutoColors(Cerr); - - Cerr << colors.Red() << "======= terminate() call stack ========" << colors.Default() << Endl; - FormatBackTrace(&Cerr); - Cerr << colors.Red() << "=======================================" << colors.Default() << Endl; - - abort(); -} - - -void SegmentationFaultHandler(int) { - NColorizer::TColors colors = NColorizer::AutoColors(Cerr); - - Cerr << colors.Red() << "======= segmentation fault call stack ========" << colors.Default() << Endl; - FormatBackTrace(&Cerr); - Cerr << colors.Red() << "==============================================" << colors.Default() << Endl; - - abort(); -} - -void FloatingPointExceptionHandler(int) { - NColorizer::TColors colors = NColorizer::AutoColors(Cerr); - - Cerr << colors.Red() << "======= floating point exception call stack ========" << colors.Default() << Endl; - FormatBackTrace(&Cerr); - Cerr << colors.Red() << "====================================================" << colors.Default() << Endl; - - abort(); -} - #ifdef PROFILE_MEMORY_ALLOCATIONS void InterruptHandler(int) { NColorizer::TColors colors = NColorizer::AutoColors(Cerr); @@ -1071,9 +910,7 @@ void InterruptHandler(int) { } // namespace NKqpRun int main(int argc, const char* argv[]) { - std::set_terminate(NKqpRun::KqprunTerminateHandler); - signal(SIGSEGV, &NKqpRun::SegmentationFaultHandler); - signal(SIGFPE, &NKqpRun::FloatingPointExceptionHandler); + SetupSignalActions(); #ifdef PROFILE_MEMORY_ALLOCATIONS signal(SIGINT, &NKqpRun::InterruptHandler); diff --git a/ydb/tests/tools/kqprun/runlib/application.cpp b/ydb/tests/tools/kqprun/runlib/application.cpp new file mode 100644 index 000000000000..9a35d584f19e --- /dev/null +++ b/ydb/tests/tools/kqprun/runlib/application.cpp @@ -0,0 +1,113 @@ +#include "application.h" +#include "utils.h" + +#include + +#include + +#include + +namespace NKikimrRun { + +void TMainBase::RegisterKikimrOptions(NLastGetopt::TOpts& options, TServerSettings& settings) { + options.AddLongOption("log-file", "File with execution logs (writes in stderr if empty)") + .RequiredArgument("file") + .StoreResult(&settings.LogOutputFile) + .Handler1([](const NLastGetopt::TOptsParser* option) { + if (const TString& file = option->CurVal()) { + std::remove(file.c_str()); + } + }); + + TChoices logPriority({ + {"emerg", NActors::NLog::EPriority::PRI_EMERG}, + {"alert", NActors::NLog::EPriority::PRI_ALERT}, + {"crit", NActors::NLog::EPriority::PRI_CRIT}, + {"error", NActors::NLog::EPriority::PRI_ERROR}, + {"warn", NActors::NLog::EPriority::PRI_WARN}, + {"notice", NActors::NLog::EPriority::PRI_NOTICE}, + {"info", NActors::NLog::EPriority::PRI_INFO}, + {"debug", NActors::NLog::EPriority::PRI_DEBUG}, + {"trace", NActors::NLog::EPriority::PRI_TRACE}, + }); + options.AddLongOption("log-default", "Default log priority") + .RequiredArgument("priority") + .Choices(logPriority.GetChoices()) + .StoreMappedResultT(&DefaultLogPriority, logPriority); + + options.AddLongOption("log", "Component log priority in format = (e. g. KQP_YQL=trace)") + .RequiredArgument("component priority") + .Handler1([this, logPriority](const NLastGetopt::TOptsParser* option) { + TStringBuf component; + TStringBuf priority; + TStringBuf(option->CurVal()).Split('=', component, priority); + if (component.empty() || priority.empty()) { + ythrow yexception() << "Incorrect log setting, expected form component=priority, e. g. KQP_YQL=trace"; + } + + if (!logPriority.Contains(TString(priority))) { + ythrow yexception() << "Incorrect log priority: " << priority; + } + + const auto service = GetLogService(TString(component)); + if (!LogPriorities.emplace(service, logPriority(TString(priority))).second) { + ythrow yexception() << "Got duplicated log service name: " << component; + } + }); + + options.AddLongOption('M', "monitoring", "Embedded UI port (use 0 to start on random free port), if used will be run as daemon") + .RequiredArgument("uint") + .Handler1([&settings](const NLastGetopt::TOptsParser* option) { + if (const TString& port = option->CurVal()) { + settings.MonitoringEnabled = true; + settings.MonitoringPortOffset = FromString(port); + } + }); + + options.AddLongOption('G', "grpc", "gRPC port (use 0 to start on random free port), if used will be run as daemon") + .RequiredArgument("uint") + .Handler1([&settings](const NLastGetopt::TOptsParser* option) { + if (const TString& port = option->CurVal()) { + settings.GrpcEnabled = true; + settings.GrpcPort = FromString(port); + } + }); + + options.AddLongOption("domain", "Test cluster domain name") + .RequiredArgument("name") + .DefaultValue(settings.DomainName) + .StoreResult(&settings.DomainName); + + TChoices> backtrace({ + {"heavy", &NKikimr::EnableYDBBacktraceFormat}, + {"light", []() { SetFormatBackTraceFn(FormatBackTrace); }} + }); + options.AddLongOption("backtrace", "Default backtrace format function") + .RequiredArgument("backtrace-type") + .DefaultValue("heavy") + .Choices(backtrace.GetChoices()) + .Handler1([backtrace](const NLastGetopt::TOptsParser* option) { + TString choice(option->CurValOrDef()); + backtrace(choice)(); + }); +} + +void TMainBase::FillLogConfig(NKikimrConfig::TLogConfig& config) const { + if (DefaultLogPriority) { + config.SetDefaultLevel(*DefaultLogPriority); + } + ModifyLogPriorities(LogPriorities, config); +} + +IOutputStream* TMainBase::GetDefaultOutput(const TString& file) { + if (file == "-") { + return &Cout; + } + if (file) { + FileHolders.emplace_back(new TFileOutput(file)); + return FileHolders.back().get(); + } + return nullptr; +} + +} // namespace NKikimrRun diff --git a/ydb/tests/tools/kqprun/runlib/application.h b/ydb/tests/tools/kqprun/runlib/application.h new file mode 100644 index 000000000000..6004d30fcb7d --- /dev/null +++ b/ydb/tests/tools/kqprun/runlib/application.h @@ -0,0 +1,30 @@ +#pragma once + +#include "settings.h" + +#include + +#include + +#include +#include +#include + +namespace NKikimrRun { + +class TMainBase : public TMainClassArgs { +protected: + void RegisterKikimrOptions(NLastGetopt::TOpts& options, TServerSettings& settings); + + void FillLogConfig(NKikimrConfig::TLogConfig& config) const; + + static IOutputStream* GetDefaultOutput(const TString& file); + +private: + inline static std::vector> FileHolders; + + std::optional DefaultLogPriority; + std::unordered_map LogPriorities; +}; + +} // namespace NKikimrRun diff --git a/ydb/tests/tools/kqprun/runlib/settings.h b/ydb/tests/tools/kqprun/runlib/settings.h new file mode 100644 index 000000000000..c77caf1876f2 --- /dev/null +++ b/ydb/tests/tools/kqprun/runlib/settings.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +namespace NKikimrRun { + +struct TServerSettings { + TString DomainName = "Root"; + + bool MonitoringEnabled = false; + ui16 MonitoringPortOffset = 0; + + bool GrpcEnabled = false; + ui16 GrpcPort = 0; + + TString LogOutputFile; +}; + +enum class EResultOutputFormat { + RowsJson, // Rows in json format + FullJson, // Columns, rows and types in json format + FullProto, // Columns, rows and types in proto string format +}; + +} // namespace NKikimrRun diff --git a/ydb/tests/tools/kqprun/runlib/utils.cpp b/ydb/tests/tools/kqprun/runlib/utils.cpp new file mode 100644 index 000000000000..208cf4daf045 --- /dev/null +++ b/ydb/tests/tools/kqprun/runlib/utils.cpp @@ -0,0 +1,274 @@ +#include "utils.h" + +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +#include + +namespace NKikimrRun { + +namespace { + +void TerminateHandler() { + NColorizer::TColors colors = NColorizer::AutoColors(Cerr); + + Cerr << colors.Red() << "======= terminate() call stack ========" << colors.Default() << Endl; + FormatBackTrace(&Cerr); + Cerr << colors.Red() << "=======================================" << colors.Default() << Endl; + + abort(); +} + + +void SegmentationFaultHandler(int) { + NColorizer::TColors colors = NColorizer::AutoColors(Cerr); + + Cerr << colors.Red() << "======= segmentation fault call stack ========" << colors.Default() << Endl; + FormatBackTrace(&Cerr); + Cerr << colors.Red() << "==============================================" << colors.Default() << Endl; + + abort(); +} + +void FloatingPointExceptionHandler(int) { + NColorizer::TColors colors = NColorizer::AutoColors(Cerr); + + Cerr << colors.Red() << "======= floating point exception call stack ========" << colors.Default() << Endl; + FormatBackTrace(&Cerr); + Cerr << colors.Red() << "====================================================" << colors.Default() << Endl; + + abort(); +} + +} // nonymous namespace + + +TRequestResult::TRequestResult() + : Status(Ydb::StatusIds::STATUS_CODE_UNSPECIFIED) +{} + +TRequestResult::TRequestResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) + : Status(status) + , Issues(issues) +{} + +TRequestResult::TRequestResult(Ydb::StatusIds::StatusCode status, const google::protobuf::RepeatedPtrField& issues) + : Status(status) +{ + NYql::IssuesFromMessage(issues, Issues); +} + +bool TRequestResult::IsSuccess() const { + return Status == Ydb::StatusIds::SUCCESS; +} + +TString TRequestResult::ToString() const { + return TStringBuilder() << "Request finished with status: " << Status << "\nIssues:\n" << Issues.ToString() << "\n"; +} + +TStatsPrinter::TStatsPrinter(NYdb::NConsoleClient::EDataFormat planFormat) + : PlanFormat(planFormat) + , StatProcessor(NFq::CreateStatProcessor("stat_full")) +{} + +void TStatsPrinter::PrintPlan(const TString& plan, IOutputStream& output) const { + if (!plan) { + return; + } + + NJson::TJsonValue planJson; + NJson::ReadJsonTree(plan, &planJson, true); + if (!planJson.GetMapSafe().contains("meta")) { + return; + } + + NYdb::NConsoleClient::TQueryPlanPrinter printer(PlanFormat, true, output); + printer.Print(plan); +} + +void TStatsPrinter::PrintInProgressStatistics(const TString& plan, IOutputStream& output) const { + output << TInstant::Now().ToIsoStringLocal() << " Script in progress statistics" << Endl; + + auto convertedPlan = plan; + try { + convertedPlan = StatProcessor->ConvertPlan(plan); + } catch (const NJson::TJsonException& ex) { + output << "Error plan conversion: " << ex.what() << Endl; + return; + } + + try { + double cpuUsage = 0.0; + auto fullStat = StatProcessor->GetQueryStat(convertedPlan, cpuUsage, nullptr); + auto flatStat = StatProcessor->GetFlatStat(convertedPlan); + auto publicStat = StatProcessor->GetPublicStat(fullStat); + + output << "\nCPU usage: " << cpuUsage << Endl; + PrintStatistics(fullStat, flatStat, publicStat, output); + } catch (const NJson::TJsonException& ex) { + output << "Error stat conversion: " << ex.what() << Endl; + return; + } + + output << "\nPlan visualization:" << Endl; + PrintPlan(convertedPlan, output); +} + +void TStatsPrinter::PrintTimeline(const TString& plan, IOutputStream& output) { + TPlanVisualizer planVisualizer; + planVisualizer.LoadPlans(plan); + output.Write(planVisualizer.PrintSvg()); +} + +void TStatsPrinter::PrintStatistics(const TString& fullStat, const THashMap& flatStat, const NFq::TPublicStat& publicStat, IOutputStream& output) { + output << "\nFlat statistics:" << Endl; + for (const auto& [propery, value] : flatStat) { + TString valueString = ToString(value); + if (propery.Contains("Bytes")) { + valueString = NKikimr::NBlobDepot::FormatByteSize(value); + } else if (propery.Contains("TimeUs")) { + valueString = NFq::FormatDurationUs(value); + } else if (propery.Contains("TimeMs")) { + valueString = NFq::FormatDurationMs(value); + } else { + valueString = FormatNumber(value); + } + output << propery << " = " << valueString << Endl; + } + + output << "\nPublic statistics:" << Endl; + if (auto memoryUsageBytes = publicStat.MemoryUsageBytes) { + output << "MemoryUsage = " << NKikimr::NBlobDepot::FormatByteSize(*memoryUsageBytes) << Endl; + } + if (auto cpuUsageUs = publicStat.CpuUsageUs) { + output << "CpuUsage = " << NFq::FormatDurationUs(*cpuUsageUs) << Endl; + } + if (auto inputBytes = publicStat.InputBytes) { + output << "InputSize = " << NKikimr::NBlobDepot::FormatByteSize(*inputBytes) << Endl; + } + if (auto outputBytes = publicStat.OutputBytes) { + output << "OutputSize = " << NKikimr::NBlobDepot::FormatByteSize(*outputBytes) << Endl; + } + if (auto sourceInputRecords = publicStat.SourceInputRecords) { + output << "SourceInputRecords = " << FormatNumber(*sourceInputRecords) << Endl; + } + if (auto sinkOutputRecords = publicStat.SinkOutputRecords) { + output << "SinkOutputRecords = " << FormatNumber(*sinkOutputRecords) << Endl; + } + if (auto runningTasks = publicStat.RunningTasks) { + output << "RunningTasks = " << FormatNumber(*runningTasks) << Endl; + } + + output << "\nFull statistics:" << Endl; + NJson::TJsonValue statsJson; + NJson::ReadJsonTree(fullStat, &statsJson); + NJson::WriteJson(&output, &statsJson, true, true, true); + output << Endl; +} + +TString TStatsPrinter::FormatNumber(i64 number) { + struct TSeparator : public std::numpunct { + char do_thousands_sep() const final { + return '.'; + } + + std::string do_grouping() const final { + return "\03"; + } + }; + + std::ostringstream stream; + stream.imbue(std::locale(stream.getloc(), new TSeparator())); + stream << number; + return stream.str(); +} + +TString LoadFile(const TString& file) { + return TFileInput(file).ReadAll(); +} + +NKikimrServices::EServiceKikimr GetLogService(const TString& serviceName) { + NKikimrServices::EServiceKikimr service; + if (!NKikimrServices::EServiceKikimr_Parse(serviceName, &service)) { + ythrow yexception() << "Invalid kikimr service name " << serviceName; + } + return service; +} + +void ModifyLogPriorities(std::unordered_map logPriorities, NKikimrConfig::TLogConfig& logConfig) { + for (auto& entry : *logConfig.MutableEntry()) { + const auto it = logPriorities.find(GetLogService(entry.GetComponent())); + if (it != logPriorities.end()) { + entry.SetLevel(it->second); + logPriorities.erase(it); + } + } + for (const auto& [service, priority] : logPriorities) { + auto* entry = logConfig.AddEntry(); + entry->SetComponent(NKikimrServices::EServiceKikimr_Name(service)); + entry->SetLevel(priority); + } +} + +void InitLogSettings(const NKikimrConfig::TLogConfig& logConfig, NActors::TTestActorRuntimeBase& runtime) { + if (logConfig.HasDefaultLevel()) { + auto priority = NActors::NLog::EPriority(logConfig.GetDefaultLevel()); + auto descriptor = NKikimrServices::EServiceKikimr_descriptor(); + for (int i = 0; i < descriptor->value_count(); ++i) { + runtime.SetLogPriority(static_cast(descriptor->value(i)->number()), priority); + } + } + + for (const auto& setting : logConfig.get_arr_entry()) { + runtime.SetLogPriority(GetLogService(setting.GetComponent()), NActors::NLog::EPriority(setting.GetLevel())); + } +} + +void SetupSignalActions() { + std::set_terminate(&TerminateHandler); + signal(SIGSEGV, &SegmentationFaultHandler); + signal(SIGFPE, &FloatingPointExceptionHandler); +} + +void PrintResultSet(EResultOutputFormat format, IOutputStream& output, const Ydb::ResultSet& resultSet) { + switch (format) { + case EResultOutputFormat::RowsJson: { + NYdb::TResultSet result(resultSet); + NYdb::TResultSetParser parser(result); + while (parser.TryNextRow()) { + NJsonWriter::TBuf writer(NJsonWriter::HEM_UNSAFE, &output); + writer.SetWriteNanAsString(true); + NYdb::FormatResultRowJson(parser, result.GetColumnsMeta(), writer, NYdb::EBinaryStringEncoding::Unicode); + output << Endl; + } + break; + } + + case EResultOutputFormat::FullJson: { + resultSet.PrintJSON(output); + output << Endl; + break; + } + + case EResultOutputFormat::FullProto: { + TString resultSetString; + google::protobuf::TextFormat::Printer printer; + printer.SetSingleLineMode(false); + printer.SetUseUtf8StringEscaping(true); + printer.PrintToString(resultSet, &resultSetString); + output << resultSetString; + break; + } + } +} + +} // namespace NKikimrRun diff --git a/ydb/tests/tools/kqprun/runlib/utils.h b/ydb/tests/tools/kqprun/runlib/utils.h new file mode 100644 index 000000000000..9e4d69c14501 --- /dev/null +++ b/ydb/tests/tools/kqprun/runlib/utils.h @@ -0,0 +1,95 @@ +#pragma once + +#include "settings.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace NKikimrRun { + +struct TRequestResult { + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; + + TRequestResult(); + + TRequestResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues); + + TRequestResult(Ydb::StatusIds::StatusCode status, const google::protobuf::RepeatedPtrField& issues); + + bool IsSuccess() const; + + TString ToString() const; +}; + +template +class TChoices { +public: + explicit TChoices(std::map choicesMap) + : ChoicesMap(std::move(choicesMap)) + {} + + TResult operator()(const TString& choice) const { + return ChoicesMap.at(choice); + } + + TVector GetChoices() const { + TVector choices; + choices.reserve(ChoicesMap.size()); + for (const auto& [choice, _] : ChoicesMap) { + choices.emplace_back(choice); + } + return choices; + } + + bool Contains(const TString& choice) const { + return ChoicesMap.contains(choice); + } + +private: + const std::map ChoicesMap; +}; + +class TStatsPrinter { +public: + explicit TStatsPrinter(NYdb::NConsoleClient::EDataFormat planFormat); + + void PrintPlan(const TString& plan, IOutputStream& output) const; + + void PrintInProgressStatistics(const TString& plan, IOutputStream& output) const; + + static void PrintTimeline(const TString& plan, IOutputStream& output); + + static void PrintStatistics(const TString& fullStat, const THashMap& flatStat, const NFq::TPublicStat& publicStat, IOutputStream& output); + + // Function adds thousands separators + // 123456789 -> 123.456.789 + static TString FormatNumber(i64 number); + +private: + const NYdb::NConsoleClient::EDataFormat PlanFormat; + const std::unique_ptr StatProcessor; +}; + +TString LoadFile(const TString& file); + +NKikimrServices::EServiceKikimr GetLogService(const TString& serviceName); + +void ModifyLogPriorities(std::unordered_map logPriorities, NKikimrConfig::TLogConfig& logConfig); + +void InitLogSettings(const NKikimrConfig::TLogConfig& logConfig, NActors::TTestActorRuntimeBase& runtime); + +void SetupSignalActions(); + +void PrintResultSet(EResultOutputFormat format, IOutputStream& output, const Ydb::ResultSet& resultSet); + +} // namespace NKikimrRun diff --git a/ydb/tests/tools/kqprun/runlib/ya.make b/ydb/tests/tools/kqprun/runlib/ya.make new file mode 100644 index 000000000000..352a3a423105 --- /dev/null +++ b/ydb/tests/tools/kqprun/runlib/ya.make @@ -0,0 +1,28 @@ +LIBRARY() + +SRCS( + application.cpp + utils.cpp +) + +PEERDIR( + library/cpp/colorizer + library/cpp/getopt + library/cpp/json + util + ydb/core/base + ydb/core/blob_depot + ydb/core/fq/libs/compute/common + ydb/core/protos + ydb/library/actors/core + ydb/library/actors/testlib + ydb/library/services + ydb/public/api/protos + ydb/public/lib/json_value + ydb/public/lib/ydb_cli/common + yql/essentials/public/issue +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/tests/tools/kqprun/src/common.cpp b/ydb/tests/tools/kqprun/src/common.cpp deleted file mode 100644 index 0241ca6a932c..000000000000 --- a/ydb/tests/tools/kqprun/src/common.cpp +++ /dev/null @@ -1,29 +0,0 @@ -#include "common.h" - - -namespace NKqpRun { - -NKikimrServices::EServiceKikimr GetLogService(const TString& serviceName) { - NKikimrServices::EServiceKikimr service; - if (!NKikimrServices::EServiceKikimr_Parse(serviceName, &service)) { - ythrow yexception() << "Invalid kikimr service name " << serviceName; - } - return service; -} - -void ModifyLogPriorities(std::unordered_map logPriorities, NKikimrConfig::TLogConfig& logConfig) { - for (auto& entry : *logConfig.MutableEntry()) { - const auto it = logPriorities.find(GetLogService(entry.GetComponent())); - if (it != logPriorities.end()) { - entry.SetLevel(it->second); - logPriorities.erase(it); - } - } - for (const auto& [service, priority] : logPriorities) { - auto* entry = logConfig.AddEntry(); - entry->SetComponent(NKikimrServices::EServiceKikimr_Name(service)); - entry->SetLevel(priority); - } -} - -} // namespace NKqpRun diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index 4f4f4ac36913..427dc6c03947 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -2,12 +2,11 @@ #include #include - #include #include - #include #include +#include #include @@ -32,7 +31,7 @@ struct TAsyncQueriesSettings { EVerbose Verbose = EVerbose::EachQuery; }; -struct TYdbSetupSettings { +struct TYdbSetupSettings : public NKikimrRun::TServerSettings { enum class EVerbose { None, Info, @@ -49,7 +48,6 @@ struct TYdbSetupSettings { }; ui32 NodeCount = 1; - TString DomainName = "Root"; std::map Tenants; TDuration HealthCheckTimeout = TDuration::Seconds(10); EHealthCheck HealthCheckLevel = EHealthCheck::NodesCount; @@ -60,14 +58,7 @@ struct TYdbSetupSettings { std::optional PDisksPath; std::optional DiskSize; - bool MonitoringEnabled = false; - ui16 MonitoringPortOffset = 0; - - bool GrpcEnabled = false; - ui16 GrpcPort = 0; - bool TraceOptEnabled = false; - TString LogOutputFile; EVerbose VerboseLevel = EVerbose::Info; TString YqlToken; @@ -87,12 +78,6 @@ struct TRunnerOptions { All, }; - enum class EResultOutputFormat { - RowsJson, // Rows in json format - FullJson, // Columns, rows and types in json format - FullProto, // Columns, rows and types in proto string format - }; - IOutputStream* ResultOutput = nullptr; IOutputStream* SchemeQueryAstOutput = nullptr; std::vector ScriptQueryAstOutputs; @@ -100,7 +85,7 @@ struct TRunnerOptions { std::vector ScriptQueryTimelineFiles; std::vector InProgressStatisticsOutputFiles; - EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson; + NKikimrRun::EResultOutputFormat ResultOutputFormat = NKikimrRun::EResultOutputFormat::RowsJson; NYdb::NConsoleClient::EDataFormat PlanOutputFormat = NYdb::NConsoleClient::EDataFormat::Default; ETraceOptType TraceOptType = ETraceOptType::Disabled; std::optional TraceOptScriptId; @@ -130,7 +115,4 @@ TValue GetValue(size_t index, const std::vector& values, TValue defaultV return values[std::min(index, values.size() - 1)]; } -NKikimrServices::EServiceKikimr GetLogService(const TString& serviceName); -void ModifyLogPriorities(std::unordered_map logPriorities, NKikimrConfig::TLogConfig& logConfig); - } // namespace NKqpRun diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index ef98165db8ca..f2378965461b 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -2,88 +2,11 @@ #include "ydb_setup.h" #include -#include - -#include -#include - -#include -#include -#include +using namespace NKikimrRun; namespace NKqpRun { -namespace { - -// Function adds thousands separators -// 123456789 -> 123.456.789 -TString FormatNumber(i64 number) { - struct TSeparator : public std::numpunct { - char do_thousands_sep() const final { - return '.'; - } - - std::string do_grouping() const final { - return "\03"; - } - }; - - std::ostringstream stream; - stream.imbue(std::locale(stream.getloc(), new TSeparator())); - stream << number; - return stream.str(); -} - -void PrintStatistics(const TString& fullStat, const THashMap& flatStat, const NFq::TPublicStat& publicStat, IOutputStream& output) { - output << "\nFlat statistics:" << Endl; - for (const auto& [propery, value] : flatStat) { - TString valueString = ToString(value); - if (propery.Contains("Bytes")) { - valueString = NKikimr::NBlobDepot::FormatByteSize(value); - } else if (propery.Contains("TimeUs")) { - valueString = NFq::FormatDurationUs(value); - } else if (propery.Contains("TimeMs")) { - valueString = NFq::FormatDurationMs(value); - } else { - valueString = FormatNumber(value); - } - output << propery << " = " << valueString << Endl; - } - - output << "\nPublic statistics:" << Endl; - if (auto memoryUsageBytes = publicStat.MemoryUsageBytes) { - output << "MemoryUsage = " << NKikimr::NBlobDepot::FormatByteSize(*memoryUsageBytes) << Endl; - } - if (auto cpuUsageUs = publicStat.CpuUsageUs) { - output << "CpuUsage = " << NFq::FormatDurationUs(*cpuUsageUs) << Endl; - } - if (auto inputBytes = publicStat.InputBytes) { - output << "InputSize = " << NKikimr::NBlobDepot::FormatByteSize(*inputBytes) << Endl; - } - if (auto outputBytes = publicStat.OutputBytes) { - output << "OutputSize = " << NKikimr::NBlobDepot::FormatByteSize(*outputBytes) << Endl; - } - if (auto sourceInputRecords = publicStat.SourceInputRecords) { - output << "SourceInputRecords = " << FormatNumber(*sourceInputRecords) << Endl; - } - if (auto sinkOutputRecords = publicStat.SinkOutputRecords) { - output << "SinkOutputRecords = " << FormatNumber(*sinkOutputRecords) << Endl; - } - if (auto runningTasks = publicStat.RunningTasks) { - output << "RunningTasks = " << FormatNumber(*runningTasks) << Endl; - } - - output << "\nFull statistics:" << Endl; - NJson::TJsonValue statsJson; - NJson::ReadJsonTree(fullStat, &statsJson); - NJson::WriteJson(&output, &statsJson, true, true, true); - output << Endl; -} - -} // anonymous namespace - - //// TKqpRunner::TImpl class TKqpRunner::TImpl { @@ -100,7 +23,7 @@ class TKqpRunner::TImpl { : Options_(options) , VerboseLevel_(Options_.YdbSettings.VerboseLevel) , YdbSetup_(options.YdbSettings) - , StatProcessor_(NFq::CreateStatProcessor("stat_full")) + , StatsPrinter_(Options_.PlanOutputFormat) , CerrColors_(NColorizer::AutoColors(Cerr)) , CoutColors_(NColorizer::AutoColors(Cout)) {} @@ -242,7 +165,7 @@ class TKqpRunner::TImpl { if (ResultSets_.size() > 1 && Options_.YdbSettings.VerboseLevel >= EVerbose::Info) { *Options_.ResultOutput << CoutColors_.Cyan() << "Result set " << i + 1 << ":" << CoutColors_.Default() << Endl; } - PrintScriptResult(ResultSets_[i]); + PrintResultSet(Options_.ResultOutputFormat, *Options_.ResultOutput, ResultSets_[i]); } } } @@ -337,66 +260,24 @@ class TKqpRunner::TImpl { } } - void PrintPlan(const TString& plan, IOutputStream* output) const { - if (!plan) { - return; - } - - NJson::TJsonValue planJson; - NJson::ReadJsonTree(plan, &planJson, true); - if (!planJson.GetMapSafe().contains("meta")) { - return; - } - - NYdb::NConsoleClient::TQueryPlanPrinter printer(Options_.PlanOutputFormat, true, *output); - printer.Print(plan); - } - void PrintScriptPlan(size_t queryId, const TString& plan) const { if (const auto output = GetValue(queryId, Options_.ScriptQueryPlanOutputs, nullptr)) { if (Options_.YdbSettings.VerboseLevel >= EVerbose::Info) { Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl; } - PrintPlan(plan, output); + StatsPrinter_.PrintPlan(plan, *output); } } void PrintScriptProgress(size_t queryId, const TString& plan) const { if (const auto& output = GetValue(queryId, Options_.InProgressStatisticsOutputFiles, {})) { TFileOutput outputStream(output); - outputStream << TInstant::Now().ToIsoStringLocal() << " Script in progress statistics" << Endl; - - auto convertedPlan = plan; - try { - convertedPlan = StatProcessor_->ConvertPlan(plan); - } catch (const NJson::TJsonException& ex) { - outputStream << "Error plan conversion: " << ex.what() << Endl; - } - - try { - double cpuUsage = 0.0; - auto fullStat = StatProcessor_->GetQueryStat(convertedPlan, cpuUsage, nullptr); - auto flatStat = StatProcessor_->GetFlatStat(convertedPlan); - auto publicStat = StatProcessor_->GetPublicStat(fullStat); - - outputStream << "\nCPU usage: " << cpuUsage << Endl; - PrintStatistics(fullStat, flatStat, publicStat, outputStream); - } catch (const NJson::TJsonException& ex) { - outputStream << "Error stat conversion: " << ex.what() << Endl; - } - - outputStream << "\nPlan visualization:" << Endl; - PrintPlan(convertedPlan, &outputStream); - + StatsPrinter_.PrintInProgressStatistics(plan, outputStream); outputStream.Finish(); } if (const auto& output = GetValue(queryId, Options_.ScriptQueryTimelineFiles, {})) { TFileOutput outputStream(output); - - TPlanVisualizer planVisualizer; - planVisualizer.LoadPlans(plan); - outputStream.Write(planVisualizer.PrintSvg()); - + StatsPrinter_.PrintTimeline(plan, outputStream); outputStream.Finish(); } } @@ -409,36 +290,6 @@ class TKqpRunner::TImpl { }; } - void PrintScriptResult(const Ydb::ResultSet& resultSet) const { - switch (Options_.ResultOutputFormat) { - case TRunnerOptions::EResultOutputFormat::RowsJson: { - NYdb::TResultSet result(resultSet); - NYdb::TResultSetParser parser(result); - while (parser.TryNextRow()) { - NJsonWriter::TBuf writer(NJsonWriter::HEM_UNSAFE, Options_.ResultOutput); - writer.SetWriteNanAsString(true); - NYdb::FormatResultRowJson(parser, result.GetColumnsMeta(), writer, NYdb::EBinaryStringEncoding::Unicode); - *Options_.ResultOutput << Endl; - } - break; - } - - case TRunnerOptions::EResultOutputFormat::FullJson: - resultSet.PrintJSON(*Options_.ResultOutput); - *Options_.ResultOutput << Endl; - break; - - case TRunnerOptions::EResultOutputFormat::FullProto: - TString resultSetString; - google::protobuf::TextFormat::Printer printer; - printer.SetSingleLineMode(false); - printer.SetUseUtf8StringEscaping(true); - printer.PrintToString(resultSet, &resultSetString); - *Options_.ResultOutput << resultSetString; - break; - } - } - void PrintScriptFinish(const TQueryMeta& meta, const TString& queryType) const { if (Options_.YdbSettings.VerboseLevel < EVerbose::Info) { return; @@ -457,7 +308,7 @@ class TKqpRunner::TImpl { EVerbose VerboseLevel_; TYdbSetup YdbSetup_; - std::unique_ptr StatProcessor_; + TStatsPrinter StatsPrinter_; NColorizer::TColors CerrColors_; NColorizer::TColors CoutColors_; diff --git a/ydb/tests/tools/kqprun/src/ya.make b/ydb/tests/tools/kqprun/src/ya.make index 59097668037a..ed5f5a619bd5 100644 --- a/ydb/tests/tools/kqprun/src/ya.make +++ b/ydb/tests/tools/kqprun/src/ya.make @@ -2,7 +2,6 @@ LIBRARY() SRCS( actors.cpp - common.cpp kqp_runner.cpp ydb_setup.cpp ) @@ -10,6 +9,7 @@ SRCS( PEERDIR( ydb/core/testlib + ydb/tests/tools/kqprun/runlib ydb/tests/tools/kqprun/src/proto ) diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index d754d22e254c..3bdd5061882d 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -14,6 +14,7 @@ #include +using namespace NKikimrRun; namespace NKqpRun { @@ -147,18 +148,7 @@ class TYdbSetup::TImpl { void SetLoggerSettings(NKikimr::Tests::TServerSettings& serverSettings) const { auto loggerInitializer = [this](NActors::TTestActorRuntime& runtime) { - if (Settings_.AppConfig.GetLogConfig().HasDefaultLevel()) { - auto priority = NActors::NLog::EPriority(Settings_.AppConfig.GetLogConfig().GetDefaultLevel()); - auto descriptor = NKikimrServices::EServiceKikimr_descriptor(); - for (int i = 0; i < descriptor->value_count(); ++i) { - runtime.SetLogPriority(static_cast(descriptor->value(i)->number()), priority); - } - } - - for (const auto& setting : Settings_.AppConfig.GetLogConfig().get_arr_entry()) { - runtime.SetLogPriority(GetLogService(setting.GetComponent()), NActors::NLog::EPriority(setting.GetLevel())); - } - + InitLogSettings(Settings_.AppConfig.GetLogConfig(), runtime); runtime.SetLogBackendFactory([this]() { return CreateLogBackend(); }); }; @@ -696,32 +686,6 @@ class TYdbSetup::TImpl { }; -//// TRequestResult - -TRequestResult::TRequestResult() - : Status(Ydb::StatusIds::STATUS_CODE_UNSPECIFIED) -{} - -TRequestResult::TRequestResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) - : Status(status) - , Issues(issues) -{} - -TRequestResult::TRequestResult(Ydb::StatusIds::StatusCode status, const google::protobuf::RepeatedPtrField& issues) - : Status(status) -{ - NYql::IssuesFromMessage(issues, Issues); -} - -bool TRequestResult::IsSuccess() const { - return Status == Ydb::StatusIds::SUCCESS; -} - -TString TRequestResult::ToString() const { - return TStringBuilder() << "Request finished with status: " << Status << "\nIssues:\n" << Issues.ToString() << "\n"; -} - - //// TYdbSetup TYdbSetup::TYdbSetup(const TYdbSetupSettings& settings) diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.h b/ydb/tests/tools/kqprun/src/ydb_setup.h index fcb4249aa10c..8ebf82d1c6e6 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.h +++ b/ydb/tests/tools/kqprun/src/ydb_setup.h @@ -3,6 +3,8 @@ #include "common.h" #include "actors.h" +#include + #include @@ -30,23 +32,9 @@ struct TExecutionMeta : public TQueryMeta { }; -struct TRequestResult { - Ydb::StatusIds::StatusCode Status; - NYql::TIssues Issues; - - TRequestResult(); - - TRequestResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues); - - TRequestResult(Ydb::StatusIds::StatusCode status, const google::protobuf::RepeatedPtrField& issues); - - bool IsSuccess() const; - - TString ToString() const; -}; - - class TYdbSetup { + using TRequestResult = NKikimrRun::TRequestResult; + public: explicit TYdbSetup(const TYdbSetupSettings& settings); diff --git a/ydb/tests/tools/kqprun/ya.make b/ydb/tests/tools/kqprun/ya.make index 05b119d8e099..81689d3bd5a3 100644 --- a/ydb/tests/tools/kqprun/ya.make +++ b/ydb/tests/tools/kqprun/ya.make @@ -17,6 +17,7 @@ PEERDIR( yt/yql/providers/yt/gateway/file yql/essentials/sql/pg + ydb/tests/tools/kqprun/runlib ydb/tests/tools/kqprun/src ) diff --git a/ydb/tests/tools/ya.make b/ydb/tests/tools/ya.make index b7586673bba3..bcae74c00dc2 100644 --- a/ydb/tests/tools/ya.make +++ b/ydb/tests/tools/ya.make @@ -2,6 +2,7 @@ RECURSE( canondata_sync datastreams_helpers fq_runner + fqrun idx_test kqprun mdb_mock