From c7573ecc343445f7a8d6a7f01f2aa8e06c4fb2d0 Mon Sep 17 00:00:00 2001 From: Alexey Zorkaltsev Date: Tue, 15 Oct 2024 08:53:07 +0300 Subject: [PATCH 1/4] fix: update protobufjs to the latest version and add dependencies to protobufjs in the proto file --- .../index.ts | 2 +- .../index.ts | 2 +- examples/url-shortener/database.ts | 11 +++++----- package-lock.json | 22 ++++++++++++------- package.json | 2 +- src/topic/topic-client.ts | 1 - 6 files changed, 22 insertions(+), 18 deletions(-) diff --git a/examples/basic-example-v1-with-table-service/index.ts b/examples/basic-example-v1-with-table-service/index.ts index 5129251b..cfb498a7 100644 --- a/examples/basic-example-v1-with-table-service/index.ts +++ b/examples/basic-example-v1-with-table-service/index.ts @@ -224,7 +224,7 @@ async function selectPrepared(session: Session, data: ThreeIds[], logger: Logger '$episodeId': episode.getTypedValue('episodeId') }); const result = Series.createNativeObjects(resultSets[0]); - logger.info(`Select prepared query ${JSON.stringify(result, null, 2)}`); + logger.info(`Parametrized select query ${JSON.stringify(result, null, 2)}`); } } await withRetries(select); diff --git a/examples/basic-example-v2-with-query-service/index.ts b/examples/basic-example-v2-with-query-service/index.ts index e3170c41..6d78578f 100644 --- a/examples/basic-example-v2-with-query-service/index.ts +++ b/examples/basic-example-v2-with-query-service/index.ts @@ -165,7 +165,7 @@ async function selectWithParameters(driver: Driver, data: ThreeIds[], logger: Lo const {value: resultSet} = await resultSets.next(); const {value: row} = await resultSet.rows.next(); await opFinished; - logger.info(`Select prepared query ${JSON.stringify(row, null, 2)}`); + logger.info(`Parametrized select query ${JSON.stringify(row, null, 2)}`); } } }); diff --git a/examples/url-shortener/database.ts b/examples/url-shortener/database.ts index 32b375eb..568d655f 100644 --- a/examples/url-shortener/database.ts +++ b/examples/url-shortener/database.ts @@ -53,18 +53,17 @@ VALUES ($shortenUrl, $sourceUrl);`; return shortenUrl; } - export async function selectSource( shortenUrl: string, session: Session, logger: Logger): Promise { const query = ` ${SYNTAX_V1} - DECLARE $shortenUrl as Utf8; +DECLARE $shortenUrl as Utf8; - SELECT * - FROM ${URLS_TABLE} - WHERE shorten = $shortenUrl;`; +SELECT * +FROM ${URLS_TABLE} +WHERE shorten = $shortenUrl;`; async function execute() : Promise { logger.info('Preparing query...'); @@ -75,7 +74,7 @@ export async function selectSource( '$shortenUrl': requestSourceUrl.getTypedValue('shorten') }); const result = UrlsMatch.createNativeObjects(resultSets[0]); - logger.info('Select prepared query', result); + logger.info('Parametrized select query', result); if (result.length == 0) { return ''; } else { diff --git a/package-lock.json b/package-lock.json index 4bf01bac..bd9cc228 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18,7 +18,7 @@ "reflect-metadata": "^0.1.13", "typed-emitter": "^2.1.0", "uuid": "^8.3.2", - "ydb-sdk-proto": "^1.1.0" + "ydb-sdk-proto": "^1.2.5" }, "devDependencies": { "@commitlint/cli": "^17.6.1", @@ -10198,10 +10198,13 @@ } }, "node_modules/ydb-sdk-proto": { - "version": "1.2.3", - "resolved": "https://registry.npmjs.org/ydb-sdk-proto/-/ydb-sdk-proto-1.2.3.tgz", - "integrity": "sha512-rz8NeYoxsw/tU/7W1QAgsSH42y9vHcexLZUyWtQ2pp7Uu0YORltG5cXkXb7YKVomAao736Il/PL9fonDl/7CMQ==", - "license": "Apache" + "version": "1.2.5", + "resolved": "https://registry.npmjs.org/ydb-sdk-proto/-/ydb-sdk-proto-1.2.5.tgz", + "integrity": "sha512-Sk74VpLQF5k0h+Ev0z83s1BMjpJmiyD2p2fqe3SrusXAmBL2JtB7DbNaejgTJVRlDUfJBjjreQ4Ah0Q16n+XSw==", + "license": "Apache", + "dependencies": { + "protobufjs": "^7.4.0" + } }, "node_modules/yn": { "version": "3.1.1", @@ -17958,9 +17961,12 @@ "dev": true }, "ydb-sdk-proto": { - "version": "1.2.3", - "resolved": "https://registry.npmjs.org/ydb-sdk-proto/-/ydb-sdk-proto-1.2.3.tgz", - "integrity": "sha512-rz8NeYoxsw/tU/7W1QAgsSH42y9vHcexLZUyWtQ2pp7Uu0YORltG5cXkXb7YKVomAao736Il/PL9fonDl/7CMQ==" + "version": "1.2.5", + "resolved": "https://registry.npmjs.org/ydb-sdk-proto/-/ydb-sdk-proto-1.2.5.tgz", + "integrity": "sha512-Sk74VpLQF5k0h+Ev0z83s1BMjpJmiyD2p2fqe3SrusXAmBL2JtB7DbNaejgTJVRlDUfJBjjreQ4Ah0Q16n+XSw==", + "requires": { + "protobufjs": "^7.4.0" + } }, "yn": { "version": "3.1.1", diff --git a/package.json b/package.json index cb4af6ee..e627b865 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,7 @@ "reflect-metadata": "^0.1.13", "typed-emitter": "^2.1.0", "uuid": "^8.3.2", - "ydb-sdk-proto": "^1.1.0" + "ydb-sdk-proto": "^1.2.5" }, "devDependencies": { "@commitlint/cli": "^17.6.1", diff --git a/src/topic/topic-client.ts b/src/topic/topic-client.ts index 2dec6334..040aa7d8 100644 --- a/src/topic/topic-client.ts +++ b/src/topic/topic-client.ts @@ -26,7 +26,6 @@ export type ICreateReaderArgs = { }[]; consumer?: (string|null); readerName?: (string|null); - } export type ICommitOffsetArgs = { path: (string|null); From 9c2518e6b22b4f1e14a473a17414278ae3765f0e Mon Sep 17 00:00:00 2001 From: Alexey Zorkaltsev Date: Tue, 15 Oct 2024 09:04:35 +0300 Subject: [PATCH 2/4] fix: fix topic example and make Context publically available --- examples/topic-service/index.ts | 8 ++++---- src/index.ts | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/examples/topic-service/index.ts b/examples/topic-service/index.ts index 8e19ac24..d5551f63 100644 --- a/examples/topic-service/index.ts +++ b/examples/topic-service/index.ts @@ -1,9 +1,8 @@ -import {Driver as YDB, getCredentialsFromEnv} from 'ydb-sdk'; +import {Driver as YDB, getCredentialsFromEnv, Context} from 'ydb-sdk'; import {Ydb} from "ydb-sdk-proto"; import {getDefaultLogger} from "../../src/logger/get-default-logger"; -import {main} from "../utils"; +// import {main} from "../utils"; import Codec = Ydb.Topic.Codec; -import {Context} from "ydb-sdk/build/cjs/src/context/context"; require('dotenv').config(); @@ -113,4 +112,5 @@ async function run() { } } -main(run); +// main(run); +run(); diff --git a/src/index.ts b/src/index.ts index 2a903efb..7d8ca493 100644 --- a/src/index.ts +++ b/src/index.ts @@ -36,6 +36,7 @@ export {getCredentialsFromEnv, getSACredentialsFromJson} from './utils/parse-env export {ISslCredentials} from './utils/ssl-credentials'; export {withRetries, RetryParameters} from './retries_obsoleted'; +export {Context} from './context'; export {YdbError, StatusCode} from './errors'; From 6adca13c4b470e73fb300cdda0340b135fa66377 Mon Sep 17 00:00:00 2001 From: Alexey Zorkaltsev Date: Tue, 22 Oct 2024 14:17:33 +0300 Subject: [PATCH 3/4] fix: topic stream auth in cloud --- examples/topic-service/index.ts | 21 ++++++------- src/errors.ts | 4 ++- src/index.ts | 4 ++- src/topic/internal/internal-topic-client.ts | 30 +++++++++++++------ .../internal/internal-topic-read-stream.ts | 19 +++++------- .../internal/internal-topic-write-stream.ts | 22 +++++++++----- src/topic/topic-client.ts | 4 +-- src/topic/topic-reader.ts | 3 +- src/topic/topic-writer.ts | 2 +- 9 files changed, 62 insertions(+), 47 deletions(-) diff --git a/examples/topic-service/index.ts b/examples/topic-service/index.ts index d5551f63..982b2602 100644 --- a/examples/topic-service/index.ts +++ b/examples/topic-service/index.ts @@ -1,24 +1,21 @@ import {Driver as YDB, getCredentialsFromEnv, Context} from 'ydb-sdk'; import {Ydb} from "ydb-sdk-proto"; -import {getDefaultLogger} from "../../src/logger/get-default-logger"; +// import {getDefaultLogger} from "../../src/logger/get-default-logger"; // import {main} from "../utils"; import Codec = Ydb.Topic.Codec; +import {SimpleLogger} from "../../src"; require('dotenv').config(); -const DATABASE = '/local'; -const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136'; - async function run() { - const logger = getDefaultLogger(); + // const logger = getDefaultLogger(); + const logger = new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}); const authService = getCredentialsFromEnv(logger); const db = new YDB({ - endpoint: ENDPOINT, // i.e.: grc(s):// - database: DATABASE, // i.e.: '/local' - authService, logger - // logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}), + connectionString: process.env.YDB_CONNECTION_STRING || 'grpc://localhost:2136?database=/local', + authService, logger, }); - if (!(await db.ready(3000))) throw new Error('Driver is not ready!'); + if (!(await db.ready(30000))) throw new Error('Driver is not ready!'); try { await db.topic.createTopic({ path: 'demoTopic', @@ -43,9 +40,9 @@ async function run() { }, }); - logger.info(await db.topic.describeTopic({ + await db.topic.describeTopic({ path: 'demoTopic', - })); + }); const writer = await db.topic.createWriter({ path: 'demoTopic', diff --git a/src/errors.ts b/src/errors.ts index 1cef9d4b..bf105a37 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -293,9 +293,11 @@ export class TransportError extends YdbError { } catch (error) {} return new Error(`Unexpected transport error code ${e.code}! Error itself: ${errStr}`); } else { - return new ErrCls( + const ydbErr = new ErrCls( `${ErrCls.name} (code ${ErrCls.status}): ${e.name}: ${e.message}. ${e.details}`, ); + ydbErr.stack = e.stack; + return ydbErr; } } } diff --git a/src/index.ts b/src/index.ts index 7d8ca493..1112b559 100644 --- a/src/index.ts +++ b/src/index.ts @@ -86,5 +86,7 @@ export {RemoveDirectorySettings} from "./schema/scheme-service"; export {MakeDirectorySettings} from "./schema/scheme-service"; export {ParsedConnectionString, parseConnectionString} from "./utils/parse-connection-string"; - export {QueryClient, ResultSet, RowType} from "./query"; + +export {SimpleLogger} from "./logger/simple-logger"; +export {getDefaultLogger} from "./logger/get-default-logger"; diff --git a/src/topic/internal/internal-topic-client.ts b/src/topic/internal/internal-topic-client.ts index 9dc56af8..a474c478 100644 --- a/src/topic/internal/internal-topic-client.ts +++ b/src/topic/internal/internal-topic-client.ts @@ -64,6 +64,7 @@ export class InternalTopicClient extends AuthenticatedService 0) { destroyPromise = new Promise((resolve) => { @@ -78,13 +79,15 @@ export class InternalTopicClient extends AuthenticatedService) { + this.logger.trace('%s: InternalTopicClient.openWriteStreamWithEvents()', ctx); if (args.producerId === undefined || args.producerId === null) { const newGUID = uuid_v4(); args = {...args, producerId: newGUID, messageGroupId: newGUID} } else if (args.messageGroupId === undefined || args.messageGroupId === null) { args = {...args, messageGroupId: args.producerId}; } - const writerStream = new InternalTopicWriteStream(ctx, args, this, this.logger); + const writerStream = new InternalTopicWriteStream(ctx, this, this.logger); + await writerStream.init(ctx, args); writerStream.events.once('end', () => { const index = this.allStreams.findIndex(v => v === writerStream) if (index >= 0) this.allStreams.splice(index, 1); @@ -95,7 +98,9 @@ export class InternalTopicClient extends AuthenticatedService { const index = this.allStreams.findIndex(v => v === readStream) if (index >= 0) this.allStreams.splice(index, 1); @@ -105,31 +110,38 @@ export class InternalTopicClient extends AuthenticatedService void, } -export const enum TopicWriteStreamState { - Init, - Active, - Closing, - Closed -} - export class InternalTopicReadStream { public events = new EventEmitter() as TypedEmitter; @@ -61,12 +54,15 @@ export class InternalTopicReadStream { constructor( ctx: Context, - args: InternalReadStreamInitArgs, private topicService: InternalTopicClient, // @ts-ignore public readonly logger: Logger) { this.logger.trace('%s: new TopicReadStreamWithEvents()', ctx); - this.topicService.updateMetadata(); + }; + + public async init(ctx: Context, args: InternalReadStreamInitArgs) { + this.logger.trace('%s: InternalTopicReadStream.init()', ctx); + await this.topicService.updateMetadata(); this.readBidiStream = this.topicService.grpcServiceClient! .makeBidiStreamRequest( '/Ydb.Topic.V1.TopicService/StreamRead', @@ -111,9 +107,10 @@ export class InternalTopicReadStream { } }) this.initRequest(ctx, args); - }; - private initRequest(ctx: Context, args: InternalReadStreamInitArgs) { + } + + public initRequest(ctx: Context, args: InternalReadStreamInitArgs) { this.logger.trace('%s: TopicReadStreamWithEvents.initRequest()', ctx); this.readBidiStream!.write( Ydb.Topic.StreamReadMessage.create({ diff --git a/src/topic/internal/internal-topic-write-stream.ts b/src/topic/internal/internal-topic-write-stream.ts index a78ab8bf..1a4ea109 100644 --- a/src/topic/internal/internal-topic-write-stream.ts +++ b/src/topic/internal/internal-topic-write-stream.ts @@ -38,18 +38,24 @@ export type WriteStreamEvents = { export class InternalTopicWriteStream { private reasonForClose?: Error; - private writeBidiStream: ClientDuplexStream; + private writeBidiStream?: ClientDuplexStream; public readonly events = new EventEmitter() as TypedEmitter; constructor( ctx: Context, - args: InternalWriteStreamInitArgs, private topicService: InternalTopicClient, // @ts-ignore private logger: Logger) { - this.logger.trace('%s: new TopicWriteStreamWithEvents|()', ctx); - this.topicService.updateMetadata(); + this.logger.trace('%s: new TopicWriteStreamWithEvents()', ctx); + }; + + public async init( + ctx: Context, + args: InternalWriteStreamInitArgs + ) { + this.logger.trace('%s: TopicWriteStreamWithEvents.init()', ctx); + await this.topicService.updateMetadata(); this.writeBidiStream = this.topicService.grpcServiceClient! .makeBidiStreamRequest( '/Ydb.Topic.V1.TopicService/StreamWrite', @@ -86,10 +92,10 @@ export class InternalTopicWriteStream { err = TransportError.convertToYdbError(err as (Error & StatusObject)); this.events.emit('error', err); } - this.writeBidiStream.end(); + this.writeBidiStream!.end(); }); this.initRequest(ctx, args); - }; + } private initRequest(ctx: Context, args: InternalWriteStreamInitArgs) { this.logger.trace('%s: TopicWriteStreamWithEvents.initRequest()', ctx); @@ -107,7 +113,7 @@ export class InternalTopicWriteStream { this.logger.trace('%s: TopicWriteStreamWithEvents.writeRequest()', ctx); if (this.reasonForClose) throw new Error('Stream is not open'); await this.updateToken(ctx); - this.writeBidiStream.write( + this.writeBidiStream!.write( Ydb.Topic.StreamWriteMessage.FromClient.create({ writeRequest: Ydb.Topic.StreamWriteMessage.WriteRequest.create(args), })); @@ -117,7 +123,7 @@ export class InternalTopicWriteStream { this.logger.trace('%s: TopicWriteStreamWithEvents.updateTokenRequest()', ctx); if (this.reasonForClose) throw new Error('Stream is not open'); await this.updateToken(ctx); - this.writeBidiStream.write( + this.writeBidiStream!.write( Ydb.Topic.StreamWriteMessage.FromClient.create({ updateTokenRequest: Ydb.Topic.UpdateTokenRequest.create(args), })); diff --git a/src/topic/topic-client.ts b/src/topic/topic-client.ts index 040aa7d8..52139aac 100644 --- a/src/topic/topic-client.ts +++ b/src/topic/topic-client.ts @@ -6,7 +6,7 @@ import {asIdempotentRetryableLambda} from "../retries/asIdempotentRetryableLambd import {google, Ydb} from "ydb-sdk-proto"; import {InternalTopicClient} from "./internal/internal-topic-client"; -// TODO: Consider support for "operationParams?: (Ydb.Operations.IOperationParams|null);". It presents in eve+ry jdbc operation +// TODO: Consider support for "operationParams?: (Ydb.Operations.IOperationParams|null);". It presents in every jdbc operation export type ICreateWriterArgs = { path: string; @@ -184,7 +184,7 @@ export class TopicClient { return new TopicReader(ctx, args, this.settings.retrier, this.settings.discoveryService, this.settings.logger); } - // TODO: Add commit a queue - same as in writer, to confirm commits + // TODO: Add commit queue - same as in writer, to confirm commits // @ts-ignore public commitOffset(request: ICommitOffsetArgs): Promise; diff --git a/src/topic/topic-reader.ts b/src/topic/topic-reader.ts index e9e021b6..522c9cbd 100644 --- a/src/topic/topic-reader.ts +++ b/src/topic/topic-reader.ts @@ -157,8 +157,7 @@ export class TopicReader { private async initInnerStream(ctx: Context) { this.logger.trace('%s: TopicReader.initInnerStream()', ctx); - this.innerReadStream = new InternalTopicReadStream(ctx, this.readStreamArgs, await this.discovery.getTopicNodeClient(), this.logger); - + this.innerReadStream = await (await this.discovery.getTopicNodeClient()).openReadStreamWithEvents(ctx, this.readStreamArgs); // this.innerReadStream.events.on('initResponse', async (resp) => { // try { // // TODO: Impl diff --git a/src/topic/topic-writer.ts b/src/topic/topic-writer.ts index 249d9b65..c40c970c 100644 --- a/src/topic/topic-writer.ts +++ b/src/topic/topic-writer.ts @@ -107,7 +107,7 @@ export class TopicWriter { delete this.writeStreamArgs.getLastSeqNo; } delete this.firstInnerStreamInitResp; - const stream = new InternalTopicWriteStream(ctx, this.writeStreamArgs, await this.discovery.getTopicNodeClient(), this.logger); + const stream = await (await this.discovery.getTopicNodeClient()).openWriteStreamWithEvents(ctx, this.writeStreamArgs); stream.events.on('initResponse', (resp) => { this.logger.trace('%s: TopicWriter.on "initResponse"', ctx); try { From dc64f5badf9863f3284902e7947be13943554701 Mon Sep 17 00:00:00 2001 From: Alexey Zorkaltsev Date: Tue, 22 Oct 2024 14:22:32 +0300 Subject: [PATCH 4/4] chore: little fix --- examples/topic-service/index.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/topic-service/index.ts b/examples/topic-service/index.ts index 33f8e3da..5195fc59 100644 --- a/examples/topic-service/index.ts +++ b/examples/topic-service/index.ts @@ -1,6 +1,5 @@ -import {Driver as YDB, getCredentialsFromEnv, Context} from 'ydb-sdk'; +import {Driver as YDB, getCredentialsFromEnv, Context, SimpleLogger} from 'ydb-sdk'; import {Ydb} from "ydb-sdk-proto"; -import {getDefaultLogger} from "../../src/logger/get-default-logger"; import Codec = Ydb.Topic.Codec; require('dotenv').config();