Skip to content

Commit

Permalink
YQ-3561 first version of FQ run tool (#14259)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Feb 9, 2025
1 parent c62ddf2 commit d6360ab
Show file tree
Hide file tree
Showing 27 changed files with 1,442 additions and 441 deletions.
12 changes: 7 additions & 5 deletions ydb/core/testlib/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ namespace Tests {
GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxies[0], true));
GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies, true, 1));
GRpcServer->AddService(new NGRpcService::TGRpcYdbTabletService(system, counters, grpcRequestProxies, true, 1));
if (Settings->EnableYq) {
if (Settings->EnableYq || Settings->EnableYqGrpc) {
GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxies[0]));
GRpcServer->AddService(new NGRpcService::TGRpcFqPrivateTaskService(system, counters, grpcRequestProxies[0]));
}
Expand Down Expand Up @@ -1364,13 +1364,15 @@ namespace Tests {
Runtime->GetAppData(nodeIdx).Counters);
const TActorId controllerActorId = Runtime->Register(controllerActor, nodeIdx, Runtime->GetAppData(nodeIdx).BatchPoolId);
Runtime->RegisterService(NMemory::MakeMemoryControllerId(0), controllerActorId, nodeIdx);

auto statActor = NStat::CreateStatService();
const TActorId statActorId = Runtime->Register(statActor.Release(), nodeIdx, Runtime->GetAppData(nodeIdx).UserPoolId);
Runtime->RegisterService(NStat::MakeStatServiceID(Runtime->GetNodeId(nodeIdx)), statActorId, nodeIdx);
}
}

{
auto statActor = NStat::CreateStatService();
const TActorId statActorId = Runtime->Register(statActor.Release(), nodeIdx, Runtime->GetAppData(nodeIdx).UserPoolId);
Runtime->RegisterService(NStat::MakeStatServiceID(Runtime->GetNodeId(nodeIdx)), statActorId, nodeIdx);
}

{
IActor* kesusService = NKesus::CreateKesusProxyService();
TActorId kesusServiceId = Runtime->Register(kesusService, nodeIdx, userPoolId);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/testlib/test_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ namespace Tests {
bool UseRealThreads = true;
bool EnableKqpSpilling = false;
bool EnableYq = false;
bool EnableYqGrpc = false;
TDuration KeepSnapshotTimeout = TDuration::Zero();
ui64 ChangesQueueItemsLimit = 0;
ui64 ChangesQueueBytesLimit = 0;
Expand Down Expand Up @@ -207,6 +208,7 @@ namespace Tests {
TServerSettings& SetEnableDbCounters(bool value) { FeatureFlags.SetEnableDbCounters(value); return *this; }
TServerSettings& SetEnablePersistentQueryStats(bool value) { FeatureFlags.SetEnablePersistentQueryStats(value); return *this; }
TServerSettings& SetEnableYq(bool value) { EnableYq = value; return *this; }
TServerSettings& SetEnableYqGrpc(bool value) { EnableYqGrpc = value; return *this; }
TServerSettings& SetKeepSnapshotTimeout(TDuration value) { KeepSnapshotTimeout = value; return *this; }
TServerSettings& SetChangesQueueItemsLimit(ui64 value) { ChangesQueueItemsLimit = value; return *this; }
TServerSettings& SetChangesQueueBytesLimit(ui64 value) { ChangesQueueBytesLimit = value; return *this; }
Expand Down
4 changes: 4 additions & 0 deletions ydb/tests/tools/fqrun/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
sync_dir

*.log
*.sql
141 changes: 141 additions & 0 deletions ydb/tests/tools/fqrun/configuration/fq_config.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
Enabled: true
EnableDynamicNameservice: true
EnableTaskCounters: true

CheckpointCoordinator {
Enabled: true

Storage {
TablePrefix: "yq/checkpoints"
ClientTimeoutSec: 70
OperationTimeoutSec: 60
CancelAfterSec: 60
}
}

Common {
MdbGateway: "https://mdb.api.cloud.yandex.net:443"
ObjectStorageEndpoint: "https://storage-internal.cloud.yandex.net"
IdsPrefix: "kr"
QueryArtifactsCompressionMethod: "zstd_6"
MonitoringEndpoint: "monitoring.api.cloud.yandex.net"
KeepInternalErrors: true
UseNativeProtocolForClickHouse: true
DisableSslForGenericDataSources: true
ShowQueryTimeline: true
}

ControlPlaneProxy {
Enabled: true
}

ControlPlaneStorage {
Enabled: true
StatsMode: STATS_MODE_PROFILE

AvailableConnection: "OBJECT_STORAGE"
AvailableConnection: "DATA_STREAMS"
AvailableConnection: "MONITORING"
AvailableConnection: "POSTGRESQL_CLUSTER"
AvailableConnection: "CLICKHOUSE_CLUSTER"
AvailableConnection: "YDB_DATABASE"
AvailableConnection: "GREENPLUM_CLUSTER"
AvailableConnection: "MYSQL_CLUSTER"

AvailableStreamingConnection: "OBJECT_STORAGE"
AvailableStreamingConnection: "DATA_STREAMS"
AvailableStreamingConnection: "MONITORING"
AvailableStreamingConnection: "YDB_DATABASE"

AvailableBinding: "OBJECT_STORAGE"
AvailableBinding: "DATA_STREAMS"

Storage {
TablePrefix: "yq/control_plane"
ClientTimeoutSec: 70
OperationTimeoutSec: 60
CancelAfterSec: 60
}
}

DbPool {
Enabled: true

Storage {
TablePrefix: "yq/db_pool"
ClientTimeoutSec: 70
OperationTimeoutSec: 60
CancelAfterSec: 60
}
}

NodesManager {
Enabled: true
}

PendingFetcher {
Enabled: true
}

PrivateApi {
Enabled: true
}

PrivateProxy {
Enabled: true
}

QuotasManager {
Enabled: true
}

RateLimiter {
Enabled: true
ControlPlaneEnabled: true
DataPlaneEnabled: true

Database {
TablePrefix: "yq/rate_limiter"
ClientTimeoutSec: 70
OperationTimeoutSec: 60
CancelAfterSec: 60
}

Limiters {
CoordinationNodePath: "limiter_alpha"
}
}

ResourceManager {
Enabled: true
}

RowDispatcher {
Enabled: true
SendStatusPeriodSec: 10
TimeoutBeforeStartSessionSec: 10

CompileService {
ParallelCompilationLimit: 20
}

Coordinator {
CoordinationNodePath: "yq/row_dispatcher"

Database {
TablePrefix: "yq/row_dispatcher"
ClientTimeoutSec: 70
OperationTimeoutSec: 60
CancelAfterSec: 60
}
}

JsonParser {
BatchSizeBytes: 1048576
BatchCreationTimeoutMs: 1000
}
}

TestConnection {
Enabled: true
}
166 changes: 166 additions & 0 deletions ydb/tests/tools/fqrun/fqprun.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
#include <library/cpp/colorizer/colors.h>
#include <library/cpp/getopt/last_getopt.h>

#include <util/datetime/base.h>

#include <ydb/tests/tools/fqrun/src/fq_runner.h>
#include <ydb/tests/tools/kqprun/runlib/application.h>
#include <ydb/tests/tools/kqprun/runlib/utils.h>

using namespace NKikimrRun;

namespace NFqRun {

namespace {

struct TExecutionOptions {
TString Query;

bool HasResults() const {
return !Query.empty();
}

TRequestOptions GetQueryOptions() const {
return {
.Query = Query
};
}

void Validate(const TRunnerOptions& runnerOptions) const {
if (!Query && !runnerOptions.FqSettings.MonitoringEnabled && !runnerOptions.FqSettings.GrpcEnabled) {
ythrow yexception() << "Nothing to execute and is not running as daemon";
}
}
};

void RunArgumentQueries(const TExecutionOptions& executionOptions, TFqRunner& runner) {
NColorizer::TColors colors = NColorizer::AutoColors(Cout);

if (executionOptions.Query) {
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing query..." << colors.Default() << Endl;
if (!runner.ExecuteStreamQuery(executionOptions.GetQueryOptions())) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed";
}
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching query results..." << colors.Default() << Endl;
if (!runner.FetchQueryResults()) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Fetch query results failed";
}
}

if (executionOptions.HasResults()) {
try {
runner.PrintQueryResults();
} catch (...) {
ythrow yexception() << "Failed to print script results, reason:\n" << CurrentExceptionMessage();
}
}
}

void RunAsDaemon() {
NColorizer::TColors colors = NColorizer::AutoColors(Cout);

Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Initialization finished" << colors.Default() << Endl;
while (true) {
Sleep(TDuration::Seconds(1));
}
}

void RunScript(const TExecutionOptions& executionOptions, const TRunnerOptions& runnerOptions) {
NColorizer::TColors colors = NColorizer::AutoColors(Cout);

Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Initialization of fq runner..." << colors.Default() << Endl;
TFqRunner runner(runnerOptions);

try {
RunArgumentQueries(executionOptions, runner);
} catch (const yexception& exception) {
if (runnerOptions.FqSettings.MonitoringEnabled) {
Cerr << colors.Red() << CurrentExceptionMessage() << colors.Default() << Endl;
} else {
throw exception;
}
}

if (runnerOptions.FqSettings.MonitoringEnabled || runnerOptions.FqSettings.GrpcEnabled) {
RunAsDaemon();
}

Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Finalization of fq runner..." << colors.Default() << Endl;
}

class TMain : public TMainBase {
protected:
void RegisterOptions(NLastGetopt::TOpts& options) override {
options.SetTitle("FqRun -- tool to execute stream queries through FQ proxy");
options.AddHelpOption('h');
options.SetFreeArgsNum(0);

// Inputs

options.AddLongOption('p', "query", "Query to execute")
.RequiredArgument("file")
.StoreMappedResult(&ExecutionOptions.Query, &LoadFile);

options.AddLongOption("fq-cfg", "File with FQ config (NFq::NConfig::TConfig for FQ proxy)")
.RequiredArgument("file")
.DefaultValue("./configuration/fq_config.conf")
.Handler1([this](const NLastGetopt::TOptsParser* option) {
if (!google::protobuf::TextFormat::ParseFromString(LoadFile(TString(option->CurValOrDef())), &RunnerOptions.FqSettings.FqConfig)) {
ythrow yexception() << "Bad format of FQ configuration";
}
});

// Outputs

options.AddLongOption("result-file", "File with query results (use '-' to write in stdout)")
.RequiredArgument("file")
.DefaultValue("-")
.StoreMappedResultT<TString>(&RunnerOptions.ResultOutput, &GetDefaultOutput);

TChoices<EResultOutputFormat> resultFormat({
{"rows", EResultOutputFormat::RowsJson},
{"full-json", EResultOutputFormat::FullJson},
{"full-proto", EResultOutputFormat::FullProto}
});
options.AddLongOption('R', "result-format", "Query result format")
.RequiredArgument("result-format")
.DefaultValue("rows")
.Choices(resultFormat.GetChoices())
.StoreMappedResultT<TString>(&RunnerOptions.ResultOutputFormat, resultFormat);

RegisterKikimrOptions(options, RunnerOptions.FqSettings);
}

int DoRun(NLastGetopt::TOptsParseResult&&) override {
ExecutionOptions.Validate(RunnerOptions);

auto& logConfig = RunnerOptions.FqSettings.LogConfig;
logConfig.SetDefaultLevel(NActors::NLog::EPriority::PRI_CRIT);
FillLogConfig(logConfig);

RunScript(ExecutionOptions, RunnerOptions);

return 0;
}

private:
TExecutionOptions ExecutionOptions;
TRunnerOptions RunnerOptions;
};

} // anonymous namespace

} // namespace NFqRun

int main(int argc, const char* argv[]) {
SetupSignalActions();

try {
NFqRun::TMain().Run(argc, argv);
} catch (...) {
NColorizer::TColors colors = NColorizer::AutoColors(Cerr);

Cerr << colors.Red() << CurrentExceptionMessage() << colors.Default() << Endl;
return 1;
}
}
29 changes: 29 additions & 0 deletions ydb/tests/tools/fqrun/src/common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once

#include <util/generic/string.h>

#include <ydb/core/fq/libs/config/protos/fq_config.pb.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/tests/tools/kqprun/runlib/settings.h>

namespace NFqRun {

constexpr i64 MAX_RESULT_SET_ROWS = 1000;

struct TFqSetupSettings : public NKikimrRun::TServerSettings {
NFq::NConfig::TConfig FqConfig;
NKikimrConfig::TLogConfig LogConfig;
};

struct TRunnerOptions {
IOutputStream* ResultOutput = nullptr;
NKikimrRun::EResultOutputFormat ResultOutputFormat = NKikimrRun::EResultOutputFormat::RowsJson;

TFqSetupSettings FqSettings;
};

struct TRequestOptions {
TString Query;
};

} // namespace NFqRun
Loading

0 comments on commit d6360ab

Please sign in to comment.