Skip to content

Commit

Permalink
feat(server-core): Introduce CUBEJS_REFRESH_WORKER_CONCURRENCY env an…
Browse files Browse the repository at this point in the history
…d update default concurrency settings for drivers (#9168)

* add CUBEJS_SCHEDULED_REFRESH_QUERIES_PER_APP_ID

and deprecate CUBEJS_SCHEDULED_REFRESH_CONCURRENCY

* increase DefaultConcurrency settings for drivers

* use refreshWorkerConcurrency only for refresh workers

* correct warn flow for refreshConcurrency

* correct queueOptionsWrapper flow

* fix unit tests

* add tests for CUBEJS_REFRESH_WORKER_CONCURRENCY
  • Loading branch information
KSDaemon authored Feb 7, 2025
1 parent ae5d977 commit 7ef6282
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 19 deletions.
2 changes: 1 addition & 1 deletion packages/cubejs-athena-driver/src/AthenaDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export class AthenaDriver extends BaseDriver implements DriverInterface {
* Returns default concurrency value.
*/
public static getDefaultConcurrency(): number {
return 5;
return 10;
}

private config: AthenaDriverOptionsInitialized;
Expand Down
27 changes: 27 additions & 0 deletions packages/cubejs-backend-shared/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,33 @@ const variables: Record<string, (...args: any) => any> = {
// It's true by default for development
return process.env.NODE_ENV !== 'production';
},
/**
 * Resolves the scheduled refresh "queries per app id" setting.
 *
 * CUBEJS_SCHEDULED_REFRESH_QUERIES_PER_APP_ID wins whenever it yields a truthy
 * value. Otherwise the deprecated CUBEJS_SCHEDULED_REFRESH_CONCURRENCY is read,
 * and a deprecation warning is printed if that legacy variable is in use.
 *
 * @returns the value of the preferred variable, else the value of the legacy
 * variable (which may itself be falsy when neither is configured).
 */
scheduledRefreshQueriesPerAppId: () => {
  const queriesPerAppId = get('CUBEJS_SCHEDULED_REFRESH_QUERIES_PER_APP_ID').asIntPositive();

  if (!queriesPerAppId) {
    // Preferred variable gave nothing usable: fall back to the legacy one.
    const legacyConcurrency = get('CUBEJS_SCHEDULED_REFRESH_CONCURRENCY').asIntPositive();

    if (legacyConcurrency) {
      console.warn(
        'The CUBEJS_SCHEDULED_REFRESH_CONCURRENCY is deprecated. Please, use the CUBEJS_SCHEDULED_REFRESH_QUERIES_PER_APP_ID instead.'
      );
    }

    return legacyConcurrency;
  }

  return queriesPerAppId;
},
/** Reads CUBEJS_REFRESH_WORKER_CONCURRENCY, parsed with asIntPositive(). */
refreshWorkerConcurrency: () => get('CUBEJS_REFRESH_WORKER_CONCURRENCY').asIntPositive(),
/**
 * Reads CUBEJS_SCHEDULED_REFRESH_TIMEZONES as a comma-separated list.
 *
 * @returns the entries with surrounding whitespace trimmed, or undefined when
 * asString() yields a falsy value.
 */
scheduledRefreshTimezones: () => {
  const rawList = get('CUBEJS_SCHEDULED_REFRESH_TIMEZONES').asString();

  // Single explicit return on both paths, so no consistent-return suppression is needed.
  return rawList ? rawList.split(',').map(zone => zone.trim()) : undefined;
},
// Reads CUBEJS_PRE_AGGREGATIONS_BUILDER, parsed with asBool().
preAggregationsBuilder: () => get('CUBEJS_PRE_AGGREGATIONS_BUILDER').asBool(),
/** Reads CUBEJS_GRACEFUL_SHUTDOWN, parsed with asIntPositive(). */
gracefulShutdown: () => get('CUBEJS_GRACEFUL_SHUTDOWN').asIntPositive(),
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
* Returns default concurrency value.
*/
public static getDefaultConcurrency(): number {
return 5;
return 10;
}

// ClickHouseClient has internal pool of several sockets, no need for generic-pool
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ export class DatabricksDriver extends JDBCDriver {
* Returns default concurrency value.
*/
public static getDefaultConcurrency(): number {
return 2;
return 10;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-firebolt-driver/src/FireboltDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class FireboltDriver extends BaseDriver implements DriverInterface {
* Returns default concurrency value.
*/
public static getDefaultConcurrency(): number {
return 5;
return 10;
}

private config: FireboltDriverConfiguration;
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-pinot-driver/src/PinotDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class PinotDriver extends BaseDriver implements DriverInterface {
* Returns default concurrency value.
*/
public static getDefaultConcurrency() {
return 2;
return 10;
}

private config: PinotDriverConfiguration;
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-redshift-driver/src/RedshiftDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export class RedshiftDriver extends PostgresDriver<RedshiftDriverConfiguration>
* Returns default concurrency value.
*/
public static getDefaultConcurrency(): number {
return 4;
return 5;
}

/**
Expand Down
20 changes: 14 additions & 6 deletions packages/cubejs-server-core/src/core/OptsHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ export class OptsHandler {
private queueOptionsWrapper(
context: RequestContext,
queueOptions: unknown | ((dataSource?: string) => QueueOptions),
queueType: 'query' | 'pre-aggs',
): (dataSource?: string) => Promise<QueueOptions> {
return async (dataSource = 'default') => {
const options = (
Expand All @@ -298,6 +299,14 @@ export class OptsHandler {
// concurrency specified in cube.js
return options;
} else {
const workerConcurrency = getEnv('refreshWorkerConcurrency');
if (queueType === 'pre-aggs' && workerConcurrency) {
return {
...options,
concurrency: workerConcurrency,
};
}

const envConcurrency: number = getEnv('concurrency', { dataSource });
if (envConcurrency) {
// concurrency specified in CUBEJS_CONCURRENCY
Expand All @@ -320,7 +329,7 @@ export class OptsHandler {
// no specified concurrency
return {
...options,
concurrency: 2,
concurrency: 5,
};
}
}
Expand Down Expand Up @@ -453,15 +462,12 @@ export class OptsHandler {
externalDialectFactory,
apiSecret: process.env.CUBEJS_API_SECRET,
telemetry: getEnv('telemetry'),
scheduledRefreshTimeZones:
process.env.CUBEJS_SCHEDULED_REFRESH_TIMEZONES &&
process.env.CUBEJS_SCHEDULED_REFRESH_TIMEZONES.split(',').map(t => t.trim()),
scheduledRefreshTimeZones: getEnv('scheduledRefreshTimezones'),
scheduledRefreshContexts: async () => [null],
basePath: '/cubejs-api',
dashboardAppPath: 'dashboard-app',
dashboardAppPort: 3000,
scheduledRefreshConcurrency:
parseInt(process.env.CUBEJS_SCHEDULED_REFRESH_CONCURRENCY, 10),
scheduledRefreshConcurrency: getEnv('scheduledRefreshQueriesPerAppId'),
scheduledRefreshBatchSize: getEnv('scheduledRefreshBatchSize'),
preAggregationsSchema:
getEnv('preAggregationsSchema') ||
Expand Down Expand Up @@ -662,13 +668,15 @@ export class OptsHandler {
clone.queryCacheOptions.queueOptions = this.queueOptionsWrapper(
context,
clone.queryCacheOptions.queueOptions,
'query'
);

// pre-aggs queue options
clone.preAggregationsOptions = clone.preAggregationsOptions || {};
clone.preAggregationsOptions.queueOptions = this.queueOptionsWrapper(
context,
clone.preAggregationsOptions.queueOptions,
'pre-aggs'
);

// pre-aggs external refresh flag (force to run pre-aggs build flow first if
Expand Down
47 changes: 41 additions & 6 deletions packages/cubejs-server-core/test/unit/OptsHandler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -525,13 +525,13 @@ describe('OptsHandler class', () => {
expect(opts.queryCacheOptions.queueOptions).toBeDefined();
expect(typeof opts.queryCacheOptions.queueOptions).toEqual('function');
expect(await opts.queryCacheOptions.queueOptions()).toEqual({
concurrency: 2,
concurrency: 5,
});

expect(opts.preAggregationsOptions.queueOptions).toBeDefined();
expect(typeof opts.preAggregationsOptions.queueOptions).toEqual('function');
expect(await opts.preAggregationsOptions.queueOptions()).toEqual({
concurrency: 2,
concurrency: 5,
});
}
);
Expand All @@ -555,13 +555,13 @@ describe('OptsHandler class', () => {
expect(opts.queryCacheOptions.queueOptions).toBeDefined();
expect(typeof opts.queryCacheOptions.queueOptions).toEqual('function');
expect(await opts.queryCacheOptions.queueOptions()).toEqual({
concurrency: 2,
concurrency: 5,
});

expect(opts.preAggregationsOptions.queueOptions).toBeDefined();
expect(typeof opts.preAggregationsOptions.queueOptions).toEqual('function');
expect(await opts.preAggregationsOptions.queueOptions()).toEqual({
concurrency: 2,
concurrency: 5,
});
}
);
Expand All @@ -585,13 +585,13 @@ describe('OptsHandler class', () => {
expect(opts.queryCacheOptions.queueOptions).toBeDefined();
expect(typeof opts.queryCacheOptions.queueOptions).toEqual('function');
expect(await opts.queryCacheOptions.queueOptions()).toEqual({
concurrency: 2,
concurrency: 5,
});

expect(opts.preAggregationsOptions.queueOptions).toBeDefined();
expect(typeof opts.preAggregationsOptions.queueOptions).toEqual('function');
expect(await opts.preAggregationsOptions.queueOptions()).toEqual({
concurrency: 2,
concurrency: 5,
});
}
);
Expand Down Expand Up @@ -658,6 +658,41 @@ describe('OptsHandler class', () => {
}
);

// Checks queue concurrency resolution when orchestratorOptions sets no
// concurrency and both CUBEJS_CONCURRENCY and CUBEJS_REFRESH_WORKER_CONCURRENCY
// are present: the query queue must take CUBEJS_CONCURRENCY, while the
// pre-aggregations queue must take CUBEJS_REFRESH_WORKER_CONCURRENCY.
test(
  'must configure queueOptions with empty orchestratorOptions function, ' +
  'with CUBEJS_REFRESH_WORKER_CONCURRENCY, CUBEJS_CONCURRENCY and with default driver concurrency',
  async () => {
    process.env.CUBEJS_CONCURRENCY = '11';
    process.env.CUBEJS_REFRESH_WORKER_CONCURRENCY = '22';
    process.env.CUBEJS_DB_TYPE = 'postgres';

    try {
      const core = new CubejsServerCoreExposed({
        ...conf,
        dbType: undefined,
        driverFactory: () => ({ type: <DatabaseType>process.env.CUBEJS_DB_TYPE }),
        orchestratorOptions: () => ({}),
      });

      const opts = (<any> await core.getOrchestratorApi(<RequestContext>{})).options;

      expect(opts.queryCacheOptions.queueOptions).toBeDefined();
      expect(typeof opts.queryCacheOptions.queueOptions).toEqual('function');
      expect(await opts.queryCacheOptions.queueOptions()).toEqual({
        concurrency: parseInt(process.env.CUBEJS_CONCURRENCY, 10),
      });

      expect(opts.preAggregationsOptions.queueOptions).toBeDefined();
      expect(typeof opts.preAggregationsOptions.queueOptions).toEqual('function');
      expect(await opts.preAggregationsOptions.queueOptions()).toEqual({
        concurrency: parseInt(process.env.CUBEJS_REFRESH_WORKER_CONCURRENCY, 10),
      });
    } finally {
      // Always restore the environment: a failed expectation above throws, and
      // without `finally` these variables would leak into every later test.
      delete process.env.CUBEJS_CONCURRENCY;
      delete process.env.CUBEJS_REFRESH_WORKER_CONCURRENCY;
      delete process.env.CUBEJS_DB_TYPE;
    }
  }
);

test(
'multi data source concurrency',
async () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-snowflake-driver/src/SnowflakeDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ export class SnowflakeDriver extends BaseDriver implements DriverInterface {
* Returns default concurrency value.
*/
public static getDefaultConcurrency(): number {
return 5;
return 8;
}

public static driverEnvVariables() {
Expand Down

0 comments on commit 7ef6282

Please sign in to comment.