Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add topic write retrier, test reader and continues reauth #400

Merged
merged 25 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .env.dev.sample
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
YDB_ANONYMOUS_CREDENTIALS=1
YDB_SSL_ROOT_CERTIFICATES_FILE=../slo-tests/playground/data/ydb_certs/ca.pem
YDB_ENDPOINT=grpc://<localhost / fqdn / ip-address>:<2135 / 2136>
YDB_ENDPOINT=grpc://localhost:2136
YDB_LOG_LEVEL=debug
YDB_DETAILED_TRACE_STACK=true
1 change: 1 addition & 0 deletions examples/topic-service-example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Due to a problem with a reference to json - TEMPORARY example is as md-file
70 changes: 70 additions & 0 deletions examples/topic-service-example/index.ts.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import {Driver as YDB} from '../../src';
import {AnonymousAuthService} from "../../src/credentials/anonymous-auth-service";
import {Ydb} from "ydb-sdk-proto";
import {SimpleLogger} from "../../src/logger/simple-logger";
import {Context} from "../../src/context";

require('dotenv').config();

const DATABASE = '/local';
const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136';

async function main() {
const db = new YDB({
endpoint: ENDPOINT,
database: DATABASE,
authService: new AnonymousAuthService(),
logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}),
});
if (!(await db.ready(3000))) throw new Error('Driver is not ready!');
await db.topic.createTopic({
path: 'demoTopic',
consumers: [{
name: 'demo',
}],
});
const writer = await db.topic.createWriter({
path: 'demoTopic',
// producerId: '...', // will be genereted automatically
// messageGroupId: '...' // will be the same as producerId
getLastSeqNo: true, // seqNo will be assigned automatically
});
await writer.sendMessages({
codec: Ydb.Topic.Codec.CODEC_RAW,
messages: [{
data: Buffer.from('Hello, world'),
uncompressedSize: 'Hello, world'.length,
}],
});
const promises = [];
for (let n = 0; n < 4; n++) {
// ((writer as any).innerWriteStream as TopicWriteStreamWithEvents).close(Context.createNew().ctx, new Error('Fake error'));

// await sleep(3000); // TODO:

promises.push(writer.sendMessages({
codec: Ydb.Topic.Codec.CODEC_RAW,
messages: [{
data: Buffer.from(`Message N${n}`),
uncompressedSize: `Message N${n}`.length,
}],
}));
}
await Promise.all(promises);
const reader = await db.topic.createReader(Context.createNew({
timeout: 3000,
}).ctx, {
topicsReadSettings: [{
path: 'demoTopic',
}],
consumer: 'demo',
receiveBufferSizeInBytes: 10_000_000,
});
for await (const message of reader.messages) {
console.info(`Message: ${message.data!.toString()}`);
await message.commit();
}
await reader.close(); // graceful close() - complete when all messages are commited
}

main();
1 change: 1 addition & 0 deletions jest.config.dev.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ module.exports = {
},
testRegex: '(/__tests__/.*|(\\.|/)(test|spec))\\.tsx?$',
moduleFileExtensions: ['ts', 'tsx', 'js', 'jsx', 'json', 'node'],
noStackTrace: true,
}
6 changes: 3 additions & 3 deletions slo-workload/DEV.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ in the _slo-workload_ folder

### Create the test database

`npx ts-node src/index.ts create grpcs://localhost:2135 local`
`npx ts-node src/index.ts.md create grpcs://localhost:2135 local`

### Run the test - for 5 min

`npx ts-node src/index.ts run grpcs://localhost:2135 local`
`npx ts-node src/index.ts.md run grpcs://localhost:2135 local`

### Clean the baseClean the base

`npx ts-node src/index.ts cleanup grpcs://localhost:2135 local`
`npx ts-node src/index.ts.md cleanup grpcs://localhost:2135 local`

### What to do in case of problems

Expand Down
1 change: 1 addition & 0 deletions src/__tests__/e2e/connection.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import {initDriver, destroyDriver} from "../../utils/test";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
Expand Down
4 changes: 4 additions & 0 deletions src/__tests__/e2e/query-service/method-execute.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import DiscoveryService from "../../../discovery/discovery-service";
import {ENDPOINT_DISCOVERY_PERIOD} from "../../../constants";
import {AnonymousAuthService} from "../../../credentials/anonymous-auth-service";
Expand All @@ -10,6 +11,8 @@ import {Context} from "../../../context";
import {ctxSymbol} from "../../../query/symbols";
import StatsMode = Ydb.Query.StatsMode;
import ExecMode = Ydb.Query.ExecMode;
import {RetryParameters} from "../../../retries/retryParameters";
import {RetryStrategy} from "../../../retries/retryStrategy";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

Expand Down Expand Up @@ -238,6 +241,7 @@ describe('Query.execute()', () => {
database: DATABASE,
authService,
discoveryPeriod: ENDPOINT_DISCOVERY_PERIOD,
retrier: new RetryStrategy(new RetryParameters(), logger),
logger,
});
await discoveryService.ready(ENDPOINT_DISCOVERY_PERIOD);
Expand Down
1 change: 1 addition & 0 deletions src/__tests__/e2e/query-service/query-service-client.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import Driver from "../../../driver";
import {AnonymousAuthService} from "../../../credentials/anonymous-auth-service";
import * as errors from "../../../errors";
Expand Down
4 changes: 4 additions & 0 deletions src/__tests__/e2e/query-service/rows-conversion.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import DiscoveryService from "../../../discovery/discovery-service";
import {QuerySession, RowType} from "../../../query";
import {AnonymousAuthService} from "../../../credentials/anonymous-auth-service";
Expand All @@ -8,6 +9,8 @@ import {Ydb} from "ydb-sdk-proto";
import {getDefaultLogger} from "../../../logger/get-default-logger";
import {ctxSymbol} from "../../../query/symbols";
import {Context} from "../../../context";
import {RetryParameters} from "../../../retries/retryParameters";
import {RetryStrategy} from "../../../retries/retryStrategy";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

Expand Down Expand Up @@ -155,6 +158,7 @@ describe('Rows conversion', () => {
database: DATABASE,
authService,
discoveryPeriod: ENDPOINT_DISCOVERY_PERIOD,
retrier: new RetryStrategy(new RetryParameters(), logger),
logger,
});

Expand Down
5 changes: 4 additions & 1 deletion src/__tests__/e2e/query-service/transactions.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import {AnonymousAuthService} from "../../../credentials/anonymous-auth-service";
import DiscoveryService from "../../../discovery/discovery-service";
import {ENDPOINT_DISCOVERY_PERIOD} from "../../../constants";
Expand All @@ -7,6 +8,8 @@ import * as symbols from "../../../query/symbols";
import {getDefaultLogger} from "../../../logger/get-default-logger";
import {ctxSymbol} from "../../../query/symbols";
import {Context} from "../../../context";
import {RetryParameters} from "../../../retries/retryParameters";
import {RetryStrategy} from "../../../retries/retryStrategy";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

Expand Down Expand Up @@ -122,8 +125,8 @@ describe('Query service transactions', () => {
database: DATABASE,
authService,
discoveryPeriod: ENDPOINT_DISCOVERY_PERIOD,
retrier: new RetryStrategy(new RetryParameters(), logger),
logger,

});

await discoveryService.ready(ENDPOINT_DISCOVERY_PERIOD);
Expand Down
19 changes: 12 additions & 7 deletions src/__tests__/e2e/retries.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
Unavailable,
Undetermined,
YdbError,
// ExternalError // TODO: Add test for this error
} from '../../errors';
import {retryable, RetryParameters} from '../../retries_obsoleted';
import {Endpoint} from "../../discovery";
Expand All @@ -26,12 +27,14 @@ import {LogLevel, SimpleLogger} from "../../logger/simple-logger";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

const MAX_RETRIES = 3;

const logger = new SimpleLogger({level: LogLevel.error});
class ErrorThrower {
constructor(public endpoint: Endpoint) {}

@retryable(
new RetryParameters({maxRetries: 3, backoffCeiling: 3, backoffSlotDuration: 5}),
new RetryParameters({maxRetries: MAX_RETRIES, backoffCeiling: 3, backoffSlotDuration: 5}),
logger,
)
@pessimizable
Expand All @@ -40,6 +43,7 @@ class ErrorThrower {
}
}

// TODO: Remake for new retry policy - no attempts limit, only optional timeout
describe('Retries on errors', () => {
let driver: Driver;

Expand Down Expand Up @@ -71,20 +75,21 @@ describe('Retries on errors', () => {

createError(BadRequest);
createError(InternalError);
createError(Aborted, 3); // have retries
createError(Aborted, MAX_RETRIES); // have retries
createError(Unauthenticated);
createError(Unauthorized);
createError(Unavailable, 3); // have retries
createError(Unavailable, MAX_RETRIES); // have retries
createError(Undetermined); // TODO: have retries for idempotent queries
createError(Overloaded, 3); // have retries
// createError(ExternalError); // TODO: have retries for idempotent queries
createError(Overloaded, MAX_RETRIES); // have retries
createError(SchemeError);
createError(GenericError);
createError(Timeout); // TODO: have retries for idempotent queries
createError(BadSession); // WHY?
createError(PreconditionFailed);
// Transport/Client errors
createError(TransportUnavailable, 3); // TODO: have retries for idempotent queries, BUT now always have retries
createError(ClientResourceExhausted, 3);
createError(ClientDeadlineExceeded, 3);
createError(TransportUnavailable, MAX_RETRIES); // TODO: have retries for idempotent queries, BUT now always have retries
createError(ClientResourceExhausted, MAX_RETRIES);
createError(ClientDeadlineExceeded, MAX_RETRIES);
// TODO: Add EXTERNAL ERROR
});
3 changes: 2 additions & 1 deletion src/__tests__/e2e/table-service/alter-table.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import Driver from '../../../driver';
import { Types } from '../../../types';
import {
Expand Down Expand Up @@ -128,7 +129,7 @@ describe('Alter table', () => {
alterTableDescription.addIndexes = [idxOverTestBool];

await session.alterTable(TABLE_NAME, alterTableDescription);
await new Promise((resolve) => setTimeout(resolve, 200)); // wait 200ms
await new Promise((resolve) => setTimeout(resolve, 1000)); // wait 1000ms
const alteredTableDescription = await session.describeTable(TABLE_NAME);

expect(JSON.stringify(alteredTableDescription.indexes)).toBe(
Expand Down
1 change: 1 addition & 0 deletions src/__tests__/e2e/table-service/bulk-upsert.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import {Ydb} from 'ydb-sdk-proto';
import Driver from '../../../driver';
import {TableSession} from "../../../table";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import Driver from '../../../driver';
import {declareType, TypedData, Types} from '../../../types';
import {withRetries} from '../../../retries_obsoleted';
Expand Down
1 change: 1 addition & 0 deletions src/__tests__/e2e/table-service/create-table.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import Driver from '../../../driver';
import {TypedValues, Types} from '../../../types';
import Long from 'long';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import http from 'http';
import Driver from "../../../driver";

Expand Down
1 change: 1 addition & 0 deletions src/__tests__/e2e/table-service/read-table.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import Driver from '../../../driver';
import {TypedValues, TypedData} from '../../../types';
import {ReadTableSettings, TableSession} from "../../../table";
Expand Down
1 change: 1 addition & 0 deletions src/__tests__/e2e/table-service/scan-query.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import Driver from '../../../driver';
import {TypedData} from '../../../types';
import {TableSession} from "../../../table";
Expand Down
1 change: 1 addition & 0 deletions src/__tests__/e2e/table-service/types.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import Long from 'long';
import {google, Ydb} from 'ydb-sdk-proto';
import Driver from '../../../driver';
Expand Down
Loading
Loading