Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic #421

Merged
merged 5 commits into from
Oct 22, 2024
Merged

Topic #421

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 8 additions & 14 deletions examples/topic-service/index.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
import {Driver as YDB, getCredentialsFromEnv, Context} from 'ydb-sdk';
import {Driver as YDB, getCredentialsFromEnv, Context, SimpleLogger} from 'ydb-sdk';
import {Ydb} from "ydb-sdk-proto";
import {getDefaultLogger} from "../../src/logger/get-default-logger";
// import {main} from "../utils";
import Codec = Ydb.Topic.Codec;

require('dotenv').config();

const DATABASE = '/local';
const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136';

async function run() {
const logger = getDefaultLogger();
// const logger = getDefaultLogger();
const logger = new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'});
const authService = getCredentialsFromEnv(logger);
const db = new YDB({
endpoint: ENDPOINT, // i.e.: grc(s)://<x.x.x.x>
database: DATABASE, // i.e.: '/local'
authService, logger
// logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}),
connectionString: process.env.YDB_CONNECTION_STRING || 'grpc://localhost:2136?database=/local',
authService, logger,
});
if (!(await db.ready(3000))) throw new Error('Driver is not ready!');
if (!(await db.ready(30000))) throw new Error('Driver is not ready!');
try {
await db.topic.createTopic({
path: 'demoTopic',
Expand All @@ -43,9 +37,9 @@ async function run() {
},
});

logger.info(await db.topic.describeTopic({
await db.topic.describeTopic({
path: 'demoTopic',
}));
});

const writer = await db.topic.createWriter({
path: 'demoTopic',
Expand Down
4 changes: 3 additions & 1 deletion src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,11 @@ export class TransportError extends YdbError {
} catch (error) {}
return new Error(`Unexpected transport error code ${e.code}! Error itself: ${errStr}`);
} else {
return new ErrCls(
const ydbErr = new ErrCls(
`${ErrCls.name} (code ${ErrCls.status}): ${e.name}: ${e.message}. ${e.details}`,
);
ydbErr.stack = e.stack;
return ydbErr;
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,7 @@ export {RemoveDirectorySettings} from "./schema/scheme-service";
export {MakeDirectorySettings} from "./schema/scheme-service";

export {ParsedConnectionString, parseConnectionString} from "./utils/parse-connection-string";

export {QueryClient, ResultSet, RowType} from "./query";

export {SimpleLogger} from "./logger/simple-logger";
export {getDefaultLogger} from "./logger/get-default-logger";
30 changes: 21 additions & 9 deletions src/topic/internal/internal-topic-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export class InternalTopicClient extends AuthenticatedService<Ydb.Topic.V1.Topic
// @ts-ignore
public destroy();
public /*async*/ destroy(ctx: Context) {
this.logger.trace('%s: InternalTopicClient.destroy()', ctx);
let destroyPromise;
if (this.allStreams.length > 0) {
destroyPromise = new Promise((resolve) => {
Expand All @@ -78,13 +79,15 @@ export class InternalTopicClient extends AuthenticatedService<Ydb.Topic.V1.Topic
}

public async openWriteStreamWithEvents(ctx: Context, args: InternalWriteStreamInitArgs & Pick<Ydb.Topic.StreamWriteMessage.IInitRequest, 'messageGroupId'>) {
this.logger.trace('%s: InternalTopicClient.openWriteStreamWithEvents()', ctx);
if (args.producerId === undefined || args.producerId === null) {
const newGUID = uuid_v4();
args = {...args, producerId: newGUID, messageGroupId: newGUID}
} else if (args.messageGroupId === undefined || args.messageGroupId === null) {
args = {...args, messageGroupId: args.producerId};
}
const writerStream = new InternalTopicWriteStream(ctx, args, this, this.logger);
const writerStream = new InternalTopicWriteStream(ctx, this, this.logger);
await writerStream.init(ctx, args);
writerStream.events.once('end', () => {
const index = this.allStreams.findIndex(v => v === writerStream)
if (index >= 0) this.allStreams.splice(index, 1);
Expand All @@ -95,7 +98,9 @@ export class InternalTopicClient extends AuthenticatedService<Ydb.Topic.V1.Topic
}

public async openReadStreamWithEvents(ctx: Context, args: InternalReadStreamInitArgs) {
const readStream = new InternalTopicReadStream(ctx, args, this, this.logger);
this.logger.trace('%s: InternalTopicClient.openReadStreamWithEvents()', ctx);
const readStream = new InternalTopicReadStream(ctx, this, this.logger);
await readStream.init(ctx, args);
readStream.events.once('end', () => {
const index = this.allStreams.findIndex(v => v === readStream)
if (index >= 0) this.allStreams.splice(index, 1);
Expand All @@ -105,31 +110,38 @@ export class InternalTopicClient extends AuthenticatedService<Ydb.Topic.V1.Topic
return readStream;
}

public async commitOffset(_ctx: Context, request: InternalCommitOffsetArgs) {
public async commitOffset(ctx: Context, request: InternalCommitOffsetArgs) {
this.logger.trace('%s: InternalTopicClient.commitOffset()', ctx);
return (await this.api.commitOffset(request)) as InternalCommitOffsetResult;
}

public async updateOffsetsInTransaction(_ctx: Context, request: InternalUpdateOffsetsInTransactionArgs) {
public async updateOffsetsInTransaction(ctx: Context, request: InternalUpdateOffsetsInTransactionArgs) {
this.logger.trace('%s: InternalTopicClient.updateOffsetsInTransaction()', ctx);
return (await this.api.updateOffsetsInTransaction(request)) as InternalUpdateOffsetsInTransactionResult;
}

public async createTopic(_ctx: Context, request: InternalCreateTopicArgs) {
public async createTopic(ctx: Context, request: InternalCreateTopicArgs) {
this.logger.trace('%s: InternalTopicClient.createTopic()', ctx);
return (await this.api.createTopic(request)) as InternalCreateTopicResult;
}

public async describeTopic(_ctx: Context, request: InternalDescribeTopicArgs) {
public async describeTopic(ctx: Context, request: InternalDescribeTopicArgs) {
this.logger.trace('%s: InternalTopicClient.describeTopic()', ctx);
return (await this.api.describeTopic(request)) as InternalDescribeTopicResult;
}

public async describeConsumer(_ctx: Context, request: InternalDescribeConsumerArgs) {
public async describeConsumer(ctx: Context, request: InternalDescribeConsumerArgs) {
this.logger.trace('%s: InternalTopicClient.describeConsumer()', ctx);
return (await this.api.describeConsumer(request)) as InternalDescribeConsumerResult;
}

public async alterTopic(_ctx: Context, request: InternalAlterTopicArgs) {
public async alterTopic(ctx: Context, request: InternalAlterTopicArgs) {
this.logger.trace('%s: InternalTopicClient.alterTopic()', ctx);
return (await this.api.alterTopic(request)) as InternalAlterTopicResult;
}

public async dropTopic(_ctx: Context, request: InternalDropTopicArgs) {
public async dropTopic(ctx: Context, request: InternalDropTopicArgs) {
this.logger.trace('%s: InternalTopicClient.dropTopic()', ctx);
return (await this.api.dropTopic(request)) as InternalDropTopicResult;
}
}
19 changes: 8 additions & 11 deletions src/topic/internal/internal-topic-read-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,6 @@ export type ReadStreamEvents = {
end: (cause: Error) => void,
}

export const enum TopicWriteStreamState {
Init,
Active,
Closing,
Closed
}

export class InternalTopicReadStream {
public events = new EventEmitter() as TypedEmitter<ReadStreamEvents>;

Expand All @@ -61,12 +54,15 @@ export class InternalTopicReadStream {

constructor(
ctx: Context,
args: InternalReadStreamInitArgs,
private topicService: InternalTopicClient,
// @ts-ignore
public readonly logger: Logger) {
this.logger.trace('%s: new TopicReadStreamWithEvents()', ctx);
this.topicService.updateMetadata();
};

public async init(ctx: Context, args: InternalReadStreamInitArgs) {
this.logger.trace('%s: InternalTopicReadStream.init()', ctx);
await this.topicService.updateMetadata();
this.readBidiStream = this.topicService.grpcServiceClient!
.makeBidiStreamRequest<Ydb.Topic.StreamReadMessage.FromClient, Ydb.Topic.StreamReadMessage.FromServer>(
'/Ydb.Topic.V1.TopicService/StreamRead',
Expand Down Expand Up @@ -111,9 +107,10 @@ export class InternalTopicReadStream {
}
})
this.initRequest(ctx, args);
};

private initRequest(ctx: Context, args: InternalReadStreamInitArgs) {
}

public initRequest(ctx: Context, args: InternalReadStreamInitArgs) {
this.logger.trace('%s: TopicReadStreamWithEvents.initRequest()', ctx);
this.readBidiStream!.write(
Ydb.Topic.StreamReadMessage.create({
Expand Down
22 changes: 14 additions & 8 deletions src/topic/internal/internal-topic-write-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,24 @@ export type WriteStreamEvents = {

export class InternalTopicWriteStream {
private reasonForClose?: Error;
private writeBidiStream: ClientDuplexStream<Ydb.Topic.StreamWriteMessage.FromClient, Ydb.Topic.StreamWriteMessage.FromServer>;
private writeBidiStream?: ClientDuplexStream<Ydb.Topic.StreamWriteMessage.FromClient, Ydb.Topic.StreamWriteMessage.FromServer>;

public readonly events = new EventEmitter() as TypedEmitter<WriteStreamEvents>;

constructor(
ctx: Context,
args: InternalWriteStreamInitArgs,
private topicService: InternalTopicClient,
// @ts-ignore
private logger: Logger) {
this.logger.trace('%s: new TopicWriteStreamWithEvents|()', ctx);
this.topicService.updateMetadata();
this.logger.trace('%s: new TopicWriteStreamWithEvents()', ctx);
};

public async init(
ctx: Context,
args: InternalWriteStreamInitArgs
) {
this.logger.trace('%s: TopicWriteStreamWithEvents.init()', ctx);
await this.topicService.updateMetadata();
this.writeBidiStream = this.topicService.grpcServiceClient!
.makeBidiStreamRequest<Ydb.Topic.StreamWriteMessage.FromClient, Ydb.Topic.StreamWriteMessage.FromServer>(
'/Ydb.Topic.V1.TopicService/StreamWrite',
Expand Down Expand Up @@ -86,10 +92,10 @@ export class InternalTopicWriteStream {
err = TransportError.convertToYdbError(err as (Error & StatusObject));
this.events.emit('error', err);
}
this.writeBidiStream.end();
this.writeBidiStream!.end();
});
this.initRequest(ctx, args);
};
}

private initRequest(ctx: Context, args: InternalWriteStreamInitArgs) {
this.logger.trace('%s: TopicWriteStreamWithEvents.initRequest()', ctx);
Expand All @@ -107,7 +113,7 @@ export class InternalTopicWriteStream {
this.logger.trace('%s: TopicWriteStreamWithEvents.writeRequest()', ctx);
if (this.reasonForClose) throw new Error('Stream is not open');
await this.updateToken(ctx);
this.writeBidiStream.write(
this.writeBidiStream!.write(
Ydb.Topic.StreamWriteMessage.FromClient.create({
writeRequest: Ydb.Topic.StreamWriteMessage.WriteRequest.create(args),
}));
Expand All @@ -117,7 +123,7 @@ export class InternalTopicWriteStream {
this.logger.trace('%s: TopicWriteStreamWithEvents.updateTokenRequest()', ctx);
if (this.reasonForClose) throw new Error('Stream is not open');
await this.updateToken(ctx);
this.writeBidiStream.write(
this.writeBidiStream!.write(
Ydb.Topic.StreamWriteMessage.FromClient.create({
updateTokenRequest: Ydb.Topic.UpdateTokenRequest.create(args),
}));
Expand Down
4 changes: 2 additions & 2 deletions src/topic/topic-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {asIdempotentRetryableLambda} from "../retries/asIdempotentRetryableLambd
import {google, Ydb} from "ydb-sdk-proto";
import {InternalTopicClient} from "./internal/internal-topic-client";

// TODO: Consider support for "operationParams?: (Ydb.Operations.IOperationParams|null);". It presents in eve+ry jdbc operation
// TODO: Consider support for "operationParams?: (Ydb.Operations.IOperationParams|null);". It presents in every jdbc operation

export type ICreateWriterArgs = {
path: string;
Expand Down Expand Up @@ -184,7 +184,7 @@ export class TopicClient {
return new TopicReader(ctx, args, this.settings.retrier, this.settings.discoveryService, this.settings.logger);
}

// TODO: Add commit a queue - same as in writer, to confirm commits
// TODO: Add commit queue - same as in writer, to confirm commits

// @ts-ignore
public commitOffset(request: ICommitOffsetArgs): Promise<IOperationResult>;
Expand Down
3 changes: 1 addition & 2 deletions src/topic/topic-reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ export class TopicReader {

private async initInnerStream(ctx: Context) {
this.logger.trace('%s: TopicReader.initInnerStream()', ctx);
this.innerReadStream = new InternalTopicReadStream(ctx, this.readStreamArgs, await this.discovery.getTopicNodeClient(), this.logger);

this.innerReadStream = await (await this.discovery.getTopicNodeClient()).openReadStreamWithEvents(ctx, this.readStreamArgs);
// this.innerReadStream.events.on('initResponse', async (resp) => {
// try {
// // TODO: Impl
Expand Down
2 changes: 1 addition & 1 deletion src/topic/topic-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export class TopicWriter {
delete this.writeStreamArgs.getLastSeqNo;
}
delete this.firstInnerStreamInitResp;
const stream = new InternalTopicWriteStream(ctx, this.writeStreamArgs, await this.discovery.getTopicNodeClient(), this.logger);
const stream = await (await this.discovery.getTopicNodeClient()).openWriteStreamWithEvents(ctx, this.writeStreamArgs);
stream.events.on('initResponse', (resp) => {
this.logger.trace('%s: TopicWriter.on "initResponse"', ctx);
try {
Expand Down
Loading