Skip to content

Commit

Permalink
chore: parameters for documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Zorkaltsev committed Oct 9, 2024
1 parent f1b0688 commit 6608df0
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 180 deletions.
4 changes: 2 additions & 2 deletions examples/topic-service/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {Driver as YDB, getCredentialsFromEnv} from '../../src';
import {Driver as YDB, getCredentialsFromEnv} from 'ydb-sdk';
import {Ydb} from "ydb-sdk-proto";
import {Context} from "../../src/context";
import {getDefaultLogger} from "../../src/logger/get-default-logger";
import {main} from "../utils";
import Codec = Ydb.Topic.Codec;
import {Context} from "ydb-sdk/build/cjs/src/context/context";

require('dotenv').config();

Expand Down
102 changes: 52 additions & 50 deletions src/query/query-session-execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,35 @@ import {CtxUnsubcribe} from "../context";
import IExecuteQueryRequest = Ydb.Query.IExecuteQueryRequest;
import IColumn = Ydb.IColumn;

export type IExecuteArgs = {
/**
* SQL query / DDL etc.
*
*/
text: string,
/**
* Default value is SYNTAX_YQL_V1.
*/
syntax?: Ydb.Query.Syntax,
/**
* SQL query parameters.
*/
parameters?: { [k: string]: Ydb.ITypedValue },
txControl?: Ydb.Query.ITransactionControl,
execMode?: Ydb.Query.ExecMode,
statsMode?: Ydb.Query.StatsMode,
concurrentResultSets?: boolean,
/**
* Operation timeout in ms
*/
// timeout?: number, // TODO: that make sense to timeout one op?
/**
* Default Native.
*/
rowMode?: RowType,
idempotent?: boolean,
};

export type IExecuteResult = {
resultSets: AsyncGenerator<ResultSet>,
execStats?: Ydb.TableStats.IQueryStats;
Expand Down Expand Up @@ -51,73 +80,46 @@ export const enum RowType {
* Finishes when the first data block is received or when the end of the stream is received. So if you are sure
* that the operation does not return any data, you may not process resultSets.
*/
export function execute(this: QuerySession, opts: {
/**
* SQL query / DDL etc.
*
*/
text: string,
/**
* Default value is SYNTAX_YQL_V1.
*/
syntax?: Ydb.Query.Syntax,
/**
* SQL query parameters.
*/
parameters?: { [k: string]: Ydb.ITypedValue },
txControl?: Ydb.Query.ITransactionControl,
execMode?: Ydb.Query.ExecMode,
statsMode?: Ydb.Query.StatsMode,
concurrentResultSets?: boolean,
/**
* Operation timeout in ms
*/
// timeout?: number, // TODO: that make sense to timeout one op?
/**
* Default Native.
*/
rowMode?: RowType,
idempotent?: boolean,
}): Promise<IExecuteResult> {
// Validate opts
if (!opts.text.trim()) throw new Error('"text" parameter is empty')
if (opts.parameters)
Object.keys(opts.parameters).forEach(n => {
export function execute(this: QuerySession, args: IExecuteArgs): Promise<IExecuteResult> {
// Validate args
if (!args.text.trim()) throw new Error('"text" parameter is empty')
if (args.parameters)
Object.keys(args.parameters).forEach(n => {
if (!n.startsWith('$')) throw new Error(`Parameter name must start with "$": ${n}`);
})
if (opts.txControl && this[sessionTxSettingsSymbol])
if (args.txControl && this[sessionTxSettingsSymbol])
throw new Error(CANNOT_MANAGE_TRASACTIONS_ERROR);
if (opts.txControl?.txId)
if (args.txControl?.txId)
throw new Error('Cannot contain txControl.txId because the current session transaction is used (see session.txId)');
if (this[sessionTxIdSymbol]) {
if (opts.txControl?.beginTx)
if (args.txControl?.beginTx)
throw new Error('txControl.beginTx when there\'s already an open transaction');
} else {
if (opts.txControl?.commitTx && !opts.txControl?.beginTx)
if (args.txControl?.commitTx && !args.txControl?.beginTx)
throw new Error('txControl.commitTx === true when no open transaction and there\'s no txControl.beginTx');
}

// Build params
const executeQueryRequest: IExecuteQueryRequest = {
sessionId: this.sessionId,
queryContent: {
text: opts.text,
syntax: opts.syntax ?? Ydb.Query.Syntax.SYNTAX_YQL_V1,
text: args.text,
syntax: args.syntax ?? Ydb.Query.Syntax.SYNTAX_YQL_V1,
},
execMode: opts.execMode ?? Ydb.Query.ExecMode.EXEC_MODE_EXECUTE,
execMode: args.execMode ?? Ydb.Query.ExecMode.EXEC_MODE_EXECUTE,
};
if (opts.statsMode) executeQueryRequest.statsMode = opts.statsMode;
if (opts.parameters) executeQueryRequest.parameters = opts.parameters;
if (args.statsMode) executeQueryRequest.statsMode = args.statsMode;
if (args.parameters) executeQueryRequest.parameters = args.parameters;
if (this[sessionTxSettingsSymbol] && !this[sessionTxIdSymbol])
executeQueryRequest.txControl = {beginTx: this[sessionTxSettingsSymbol], commitTx: false};
else if (opts.txControl)
executeQueryRequest.txControl = opts.txControl;
else if (args.txControl)
executeQueryRequest.txControl = args.txControl;
if (this[sessionTxIdSymbol])
(executeQueryRequest.txControl || (executeQueryRequest.txControl = {})).txId = this[sessionTxIdSymbol];
executeQueryRequest.concurrentResultSets = opts.concurrentResultSets ?? false;
if (opts.hasOwnProperty('idempotent')) {
executeQueryRequest.concurrentResultSets = args.concurrentResultSets ?? false;
if (args.hasOwnProperty('idempotent')) {
if (this[isIdempotentDoLevelSymbol]) throw new Error('The attribute of idempotency is already set at the level of do()');
if (opts.idempotent) this[isIdempotentSymbol] = true;
if (args.idempotent) this[isIdempotentSymbol] = true;
}

// Run the operation
Expand Down Expand Up @@ -197,13 +199,13 @@ export function execute(this: QuerySession, opts: {
let resultSetTuple = resultSetByIndex[index];
if (!resultSetTuple) {
iterator = buildAsyncQueueIterator<Ydb.IValue>();
switch (opts.rowMode) {
switch (args.rowMode) {
case RowType.Ydb:
resultSet = new ResultSet(index, partialResp.resultSet!.columns as IColumn[], opts.rowMode ?? RowType.Native, iterator);
resultSet = new ResultSet(index, partialResp.resultSet!.columns as IColumn[], args.rowMode ?? RowType.Native, iterator);
break;
default: // Native
const nativeColumnsNames = (partialResp.resultSet!.columns as IColumn[]).map(v => snakeToCamelCaseConversion.ydbToJs(v.name!));
resultSet = new ResultSet(index, nativeColumnsNames, opts.rowMode ?? RowType.Native, iterator);
resultSet = new ResultSet(index, nativeColumnsNames, args.rowMode ?? RowType.Native, iterator);
resultSet[resultsetYdbColumnsSymbol] = partialResp.resultSet!.columns as IColumn[];
}
resultSetIterator.push(resultSet);
Expand All @@ -216,7 +218,7 @@ export function execute(this: QuerySession, opts: {
[iterator, resultSet] = resultSetTuple;
}

switch (opts.rowMode) {
switch (args.rowMode) {
case RowType.Ydb:
for (const row of partialResp.resultSet!.rows!) iterator.push(row);
break;
Expand Down
75 changes: 31 additions & 44 deletions src/topic/topic-client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import EventEmitter from "events";
import {InternalWriteStreamInitArgs} from "./internal/internal-topic-write-stream";
import {InternalReadStreamInitArgs} from "./internal/internal-topic-read-stream";
import {TopicWriter} from "./topic-writer";
Expand All @@ -11,19 +10,19 @@ import {InternalTopicClient} from "./internal/internal-topic-client";

// TODO: Consider support for "operationParams?: (Ydb.Operations.IOperationParams|null);". It presents in eve+ry jdbc operation

export type CommitOffsetArgs = {
export type ICommitOffsetArgs = {
path: (string|null);
partitionId?: (number|Long|null);
consumer: (string|null);
offset: (number|Long|null);
}
export type UpdateOffsetsInTransactionArgs = {
export type IUpdateOffsetsInTransactionArgs = {
operationParams?: (Ydb.Operations.IOperationParams|null);
tx?: (Ydb.Topic.ITransactionIdentity|null);
topics: Ydb.Topic.UpdateOffsetsInTransactionRequest.ITopicOffsets[];
consumer: string;
}
export type CreateTopicArgs = {
export type ICreateTopicArgs = {
path: (string|null);
partitioningSettings?: ({
minActivePartitions?: (number|Long|null);
Expand Down Expand Up @@ -58,15 +57,15 @@ export type CreateTopicArgs = {
}[]|null);
meteringMode?: (Ydb.Topic.MeteringMode|null); // UNSPECIFIED, RESERVED_CAPACITY, REQUEST_UNITS
};
export type DescribeTopicArgs = {
export type IDescribeTopicArgs = {
path: string;
includeStats?: (boolean|null);
}
export type DescribeConsumerArgs = {
export type IDescribeConsumerArgs = {
path: string;
consumer: string;
}
export type AlterTopicArgs = {
export type IAlterTopicArgs = {
path: string;
alterPartitioningSettings?: ({
setMinActivePartitions?: (number|Long|null);
Expand Down Expand Up @@ -112,11 +111,11 @@ export type AlterTopicArgs = {
setMeteringMode?: (Ydb.Topic.MeteringMode|null); // UNSPECIFIED, RESERVED_CAPACITY, REQUEST_UNITS

};
export type DropTopicArgs = {
export type IDropTopicArgs = {
path: string;
};

export type OperationResult = {
export type IOperationResult = {
readonly operation?: ({
readonly id?: (string|null);
readonly ready?: (boolean|null);
Expand All @@ -128,11 +127,10 @@ export type OperationResult = {
}|null);
};

export class TopicClient extends EventEmitter { // TODO: Reconsider why I need to have EventEmitter in any client
export class TopicClient {
private service?: InternalTopicClient;

constructor(private settings: IClientSettings) {
super();
}

/**
Expand All @@ -149,7 +147,7 @@ export class TopicClient extends EventEmitter { // TODO: Reconsider why I need t
public destroy(_ctx: Context): void;
@ensureContext(true)
public async destroy(_ctx: Context): Promise<void> {
// if (this.service) await this.service.destroy(); // TODO: service should be destroyed at the end
// TODO: Close opened readers and writers
}

// @ts-ignore
Expand All @@ -172,14 +170,10 @@ export class TopicClient extends EventEmitter { // TODO: Reconsider why I need t
// TODO: Add commit a queue - same as in writer, to confirm commits

// @ts-ignore
public commitOffset(request: CommitOffsetArgs): Promise<OperationResult>;
public commitOffset(ctx: Context, request: CommitOffsetArgs): Promise<OperationResult>;
public commitOffset(request: ICommitOffsetArgs): Promise<IOperationResult>;
public commitOffset(ctx: Context, request: ICommitOffsetArgs): Promise<IOperationResult>;
@ensureContext(true)
// TODO: Add retryer
public async commitOffset(ctx: Context, request: CommitOffsetArgs): Promise<OperationResult> {
// if (!(typeof request.path === 'string' && request.path!.length > 0)) throw new Error('path is required');
// if (!(typeof request.consumer === 'string' && request.consumer!.length > 0)) throw new Error('consumer is required');
// if (!(typeof request.offset !== undefined && request.offset !== null)) throw new Error('offset is required');
public async commitOffset(ctx: Context, request: ICommitOffsetArgs): Promise<IOperationResult> {
return this.settings.retrier.retry(ctx, /*async*/ () => {
return /*await*/ asIdempotentRetryableLambda(async () => {
return /*await*/ (await this.nextNodeService()).commitOffset(ctx, request);
Expand All @@ -188,12 +182,10 @@ export class TopicClient extends EventEmitter { // TODO: Reconsider why I need t
}

// @ts-ignore
public updateOffsetsInTransaction(request: UpdateOffsetsInTransactionArgs): Promise<OperationResult>;
public updateOffsetsInTransaction(ctx: Context, request: UpdateOffsetsInTransactionArgs): Promise<OperationResult>;
public updateOffsetsInTransaction(request: IUpdateOffsetsInTransactionArgs): Promise<IOperationResult>;
public updateOffsetsInTransaction(ctx: Context, request: IUpdateOffsetsInTransactionArgs): Promise<IOperationResult>;
@ensureContext(true)
public async updateOffsetsInTransaction(ctx: Context, request: UpdateOffsetsInTransactionArgs): Promise<OperationResult> {
// if (!(request.topics && request.topics.length > 0)) throw new Error('topics is required');
// if (!(typeof request.consumer === 'string' && request.consumer!.length > 0)) throw new Error('consumer is required');
public async updateOffsetsInTransaction(ctx: Context, request: IUpdateOffsetsInTransactionArgs): Promise<IOperationResult> {
return this.settings.retrier.retry(ctx, /*async*/ () => {
return /*await*/ asIdempotentRetryableLambda(async () => {
return /*await*/ (await this.nextNodeService()).updateOffsetsInTransaction(ctx, request);
Expand All @@ -202,11 +194,10 @@ export class TopicClient extends EventEmitter { // TODO: Reconsider why I need t
}

// @ts-ignore
public createTopic(request: CreateTopicArgs): Promise<OperationResult>;
public createTopic(ctx: Context, request: CreateTopicArgs): Promise<OperationResult>;
public createTopic(request: ICreateTopicArgs): Promise<IOperationResult>;
public createTopic(ctx: Context, request: ICreateTopicArgs): Promise<IOperationResult>;
@ensureContext(true)
public async createTopic(ctx: Context, request: CreateTopicArgs): Promise<OperationResult> {
// if (!(typeof request.path === 'string' && request.path!.length > 0)) throw new Error('path is required');
public async createTopic(ctx: Context, request: ICreateTopicArgs): Promise<IOperationResult> {
return this.settings.retrier.retry(ctx, /*async*/ () => {
return /*await*/ asIdempotentRetryableLambda(async () => {
return /*await*/ (await this.nextNodeService()).createTopic(ctx, request);
Expand All @@ -215,11 +206,10 @@ export class TopicClient extends EventEmitter { // TODO: Reconsider why I need t
}

// @ts-ignore
public describeTopic(request: DescribeTopicArgs): Promise<OperationResult>;
public describeTopic(ctx: Context, request: DescribeTopicArgs): Promise<OperationResult>;
public describeTopic(request: IDescribeTopicArgs): Promise<IOperationResult>;
public describeTopic(ctx: Context, request: IDescribeTopicArgs): Promise<IOperationResult>;
@ensureContext(true)
public async describeTopic(ctx: Context, request: DescribeTopicArgs): Promise<OperationResult> {
// if (!(typeof request.path === 'string' && request.path!.length > 0)) throw new Error('path is required');
public async describeTopic(ctx: Context, request: IDescribeTopicArgs): Promise<IOperationResult> {
return this.settings.retrier.retry(ctx, /*async*/ () => {
return /*await*/ asIdempotentRetryableLambda(async () => {
return /*await*/ (await this.nextNodeService()).describeTopic(ctx, request);
Expand All @@ -228,11 +218,10 @@ export class TopicClient extends EventEmitter { // TODO: Reconsider why I need t
}

// @ts-ignore
public describeConsumer(request: DescribeConsumerArgs): Promise<OperationResult>;
public describeConsumer(ctx: Context, request: DescribeConsumerArgs): Promise<OperationResult>;
public describeConsumer(request: IDescribeConsumerArgs): Promise<IOperationResult>;
public describeConsumer(ctx: Context, request: IDescribeConsumerArgs): Promise<IOperationResult>;
@ensureContext(true)
public async describeConsumer(ctx: Context, request: DescribeConsumerArgs): Promise<OperationResult> {
// if (!(typeof request.path === 'string' && request.path!.length > 0)) throw new Error('path is required');
public async describeConsumer(ctx: Context, request: IDescribeConsumerArgs): Promise<IOperationResult> {
return this.settings.retrier.retry(ctx, /*async*/ () => {
return /*await*/ asIdempotentRetryableLambda(async () => {
return /*await*/ (await this.nextNodeService()).describeConsumer(ctx, request);
Expand All @@ -241,11 +230,10 @@ export class TopicClient extends EventEmitter { // TODO: Reconsider why I need t
}

// @ts-ignore
public alterTopic(request: AlterTopicArgs): Promise<OperationResult>;
public alterTopic(ctx: Context, request: AlterTopicArgs): Promise<OperationResult>;
public alterTopic(request: IAlterTopicArgs): Promise<IOperationResult>;
public alterTopic(ctx: Context, request: IAlterTopicArgs): Promise<IOperationResult>;
@ensureContext(true)
public async alterTopic(ctx: Context, request: AlterTopicArgs): Promise<OperationResult> {
// if (!(typeof request.path === 'string' && request.path!.length > 0)) throw new Error('path is required');
public async alterTopic(ctx: Context, request: IAlterTopicArgs): Promise<IOperationResult> {
return this.settings.retrier.retry(ctx, /*async*/ () => {
return /*await*/ asIdempotentRetryableLambda(async () => {
return /*await*/ (await this.nextNodeService()).alterTopic(ctx, request);
Expand All @@ -254,11 +242,10 @@ export class TopicClient extends EventEmitter { // TODO: Reconsider why I need t
}

// @ts-ignore
public dropTopic(request: DropTopicArgs): Promise<OperationResult>;
public dropTopic(ctx: Context, request: DropTopicArgs): Promise<OperationResult>;
public dropTopic(request: IDropTopicArgs): Promise<IOperationResult>;
public dropTopic(ctx: Context, request: IDropTopicArgs): Promise<IOperationResult>;
@ensureContext(true)
public async dropTopic(ctx: Context, request: DropTopicArgs): Promise<OperationResult> {
// if (!(typeof request.path === 'string' && request.path!.length > 0)) throw new Error('path is required');
public async dropTopic(ctx: Context, request: IDropTopicArgs): Promise<IOperationResult> {
return this.settings.retrier.retry(ctx, /*async*/ () => {
return asIdempotentRetryableLambda(async () => {
return /*await*/ (await this.nextNodeService()).dropTopic(ctx, request);
Expand Down
12 changes: 2 additions & 10 deletions src/topic/topic-reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,7 @@ import {closeSymbol} from "./symbols";
import {google, Ydb} from "ydb-sdk-proto";
import Long from "long";

type IReadResponseFields = Omit<Ydb.Topic.StreamReadMessage.IReadResponse, 'partitionData'>;
type IDataFields = Omit<Ydb.Topic.StreamReadMessage.ReadResponse.IPartitionData, 'batches'>;
type IBatchFields = Omit<Ydb.Topic.StreamReadMessage.ReadResponse.IBatch, 'messageData'>;

export class Message implements
IReadResponseFields,
IDataFields,
IBatchFields,
Ydb.Topic.StreamReadMessage.ReadResponse.IMessageData {
export class Message {
// from IReadResponse
bytesSize?: number | Long | null;

Expand Down Expand Up @@ -271,7 +263,7 @@ export class TopicReader {
}
});

this.innerReadStream.readRequest(ctx,{
this.innerReadStream.readRequest(ctx, {
bytesSize: this.readStreamArgs.receiveBufferSizeInBytes,
});
}
Expand Down
Loading

0 comments on commit 6608df0

Please sign in to comment.