diff --git a/packages/cubejs-athena-driver/src/AthenaDriver.ts b/packages/cubejs-athena-driver/src/AthenaDriver.ts index fe60a1dd71722..aaea16c04a275 100644 --- a/packages/cubejs-athena-driver/src/AthenaDriver.ts +++ b/packages/cubejs-athena-driver/src/AthenaDriver.ts @@ -76,7 +76,7 @@ export class AthenaDriver extends BaseDriver implements DriverInterface { * Returns default concurrency value. */ public static getDefaultConcurrency(): number { - return 5; + return 10; } private config: AthenaDriverOptionsInitialized; diff --git a/packages/cubejs-backend-shared/src/env.ts b/packages/cubejs-backend-shared/src/env.ts index cc5417eac04ab..2f8760f412c35 100644 --- a/packages/cubejs-backend-shared/src/env.ts +++ b/packages/cubejs-backend-shared/src/env.ts @@ -170,6 +170,33 @@ const variables: Record any> = { // It's true by default for development return process.env.NODE_ENV !== 'production'; }, + scheduledRefreshQueriesPerAppId: () => { + const refreshQueries = get('CUBEJS_SCHEDULED_REFRESH_QUERIES_PER_APP_ID').asIntPositive(); + + if (refreshQueries) { + return refreshQueries; + } + + const refreshConcurrency = get('CUBEJS_SCHEDULED_REFRESH_CONCURRENCY').asIntPositive(); + + if (refreshConcurrency) { + console.warn( + 'The CUBEJS_SCHEDULED_REFRESH_CONCURRENCY is deprecated. Please, use the CUBEJS_SCHEDULED_REFRESH_QUERIES_PER_APP_ID instead.' + ); + } + + return refreshConcurrency; + }, + refreshWorkerConcurrency: () => get('CUBEJS_REFRESH_WORKER_CONCURRENCY') + .asIntPositive(), + // eslint-disable-next-line consistent-return + scheduledRefreshTimezones: () => { + const timezones = get('CUBEJS_SCHEDULED_REFRESH_TIMEZONES').asString(); + + if (timezones) { + return timezones.split(',').map(t => t.trim()); + } + }, preAggregationsBuilder: () => get('CUBEJS_PRE_AGGREGATIONS_BUILDER').asBool(), gracefulShutdown: () => get('CUBEJS_GRACEFUL_SHUTDOWN') .asIntPositive(), diff --git a/packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts b/packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts index 6ef8ba7cd93ad..19a2f037c2e91 100644 --- a/packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts +++ b/packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts @@ -119,7 +119,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface { * Returns default concurrency value. */ public static getDefaultConcurrency(): number { - return 5; + return 10; } // ClickHouseClient has internal pool of several sockets, no need for generic-pool diff --git a/packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts b/packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts index d3754207a3d07..d79ca9a0ec1d7 100644 --- a/packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts +++ b/packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts @@ -147,7 +147,7 @@ export class DatabricksDriver extends JDBCDriver { * Returns default concurrency value. */ public static getDefaultConcurrency(): number { - return 2; + return 10; } /** diff --git a/packages/cubejs-firebolt-driver/src/FireboltDriver.ts b/packages/cubejs-firebolt-driver/src/FireboltDriver.ts index 10ce708a2d5d8..a9ccc9e4161b3 100644 --- a/packages/cubejs-firebolt-driver/src/FireboltDriver.ts +++ b/packages/cubejs-firebolt-driver/src/FireboltDriver.ts @@ -46,7 +46,7 @@ export class FireboltDriver extends BaseDriver implements DriverInterface { * Returns default concurrency value. */ public static getDefaultConcurrency(): number { - return 5; + return 10; } private config: FireboltDriverConfiguration; diff --git a/packages/cubejs-pinot-driver/src/PinotDriver.ts b/packages/cubejs-pinot-driver/src/PinotDriver.ts index c9f96de3f02b6..be02426001acf 100644 --- a/packages/cubejs-pinot-driver/src/PinotDriver.ts +++ b/packages/cubejs-pinot-driver/src/PinotDriver.ts @@ -74,7 +74,7 @@ export class PinotDriver extends BaseDriver implements DriverInterface { * Returns default concurrency value. */ public static getDefaultConcurrency() { - return 2; + return 10; } private config: PinotDriverConfiguration; diff --git a/packages/cubejs-redshift-driver/src/RedshiftDriver.ts b/packages/cubejs-redshift-driver/src/RedshiftDriver.ts index ecc39bd31e167..5ce9870cd331d 100644 --- a/packages/cubejs-redshift-driver/src/RedshiftDriver.ts +++ b/packages/cubejs-redshift-driver/src/RedshiftDriver.ts @@ -57,7 +57,7 @@ export class RedshiftDriver extends PostgresDriver * Returns default concurrency value. */ public static getDefaultConcurrency(): number { - return 4; + return 5; } /** diff --git a/packages/cubejs-server-core/src/core/OptsHandler.ts b/packages/cubejs-server-core/src/core/OptsHandler.ts index 55731d1aa0c9b..c0d79990d945a 100644 --- a/packages/cubejs-server-core/src/core/OptsHandler.ts +++ b/packages/cubejs-server-core/src/core/OptsHandler.ts @@ -287,6 +287,7 @@ export class OptsHandler { private queueOptionsWrapper( context: RequestContext, queueOptions: unknown | ((dataSource?: string) => QueueOptions), + queueType: 'query' | 'pre-aggs', ): (dataSource?: string) => Promise { return async (dataSource = 'default') => { const options = ( @@ -298,6 +299,14 @@ export class OptsHandler { // concurrency specified in cube.js return options; } else { + const workerConcurrency = getEnv('refreshWorkerConcurrency'); + if (queueType === 'pre-aggs' && workerConcurrency) { + return { + ...options, + concurrency: workerConcurrency, + }; + } + const envConcurrency: number = getEnv('concurrency', { dataSource }); if (envConcurrency) { // concurrency specified in CUBEJS_CONCURRENCY @@ -320,7 +329,7 @@ export class OptsHandler { // no specified concurrency return { ...options, - concurrency: 2, + concurrency: 5, }; } } @@ -453,15 +462,12 @@ export class OptsHandler { externalDialectFactory, apiSecret: process.env.CUBEJS_API_SECRET, telemetry: getEnv('telemetry'), - scheduledRefreshTimeZones: - process.env.CUBEJS_SCHEDULED_REFRESH_TIMEZONES && - process.env.CUBEJS_SCHEDULED_REFRESH_TIMEZONES.split(',').map(t => t.trim()), + scheduledRefreshTimeZones: getEnv('scheduledRefreshTimezones'), scheduledRefreshContexts: async () => [null], basePath: '/cubejs-api', dashboardAppPath: 'dashboard-app', dashboardAppPort: 3000, - scheduledRefreshConcurrency: - parseInt(process.env.CUBEJS_SCHEDULED_REFRESH_CONCURRENCY, 10), + scheduledRefreshConcurrency: getEnv('scheduledRefreshQueriesPerAppId'), scheduledRefreshBatchSize: getEnv('scheduledRefreshBatchSize'), preAggregationsSchema: getEnv('preAggregationsSchema') || @@ -662,6 +668,7 @@ export class OptsHandler { clone.queryCacheOptions.queueOptions = this.queueOptionsWrapper( context, clone.queryCacheOptions.queueOptions, + 'query' ); // pre-aggs queue options @@ -669,6 +676,7 @@ export class OptsHandler { clone.preAggregationsOptions.queueOptions = this.queueOptionsWrapper( context, clone.preAggregationsOptions.queueOptions, + 'pre-aggs' ); // pre-aggs external refresh flag (force to run pre-aggs build flow first if diff --git a/packages/cubejs-server-core/test/unit/OptsHandler.test.ts b/packages/cubejs-server-core/test/unit/OptsHandler.test.ts index 3d37757679c8b..306b810192ec0 100644 --- a/packages/cubejs-server-core/test/unit/OptsHandler.test.ts +++ b/packages/cubejs-server-core/test/unit/OptsHandler.test.ts @@ -525,13 +525,13 @@ describe('OptsHandler class', () => { expect(opts.queryCacheOptions.queueOptions).toBeDefined(); expect(typeof opts.queryCacheOptions.queueOptions).toEqual('function'); expect(await opts.queryCacheOptions.queueOptions()).toEqual({ - concurrency: 2, + concurrency: 5, }); expect(opts.preAggregationsOptions.queueOptions).toBeDefined(); expect(typeof opts.preAggregationsOptions.queueOptions).toEqual('function'); expect(await opts.preAggregationsOptions.queueOptions()).toEqual({ - concurrency: 2, + concurrency: 5, }); } ); @@ -555,13 +555,13 @@ describe('OptsHandler class', () => { expect(opts.queryCacheOptions.queueOptions).toBeDefined(); expect(typeof opts.queryCacheOptions.queueOptions).toEqual('function'); expect(await opts.queryCacheOptions.queueOptions()).toEqual({ - concurrency: 2, + concurrency: 5, }); expect(opts.preAggregationsOptions.queueOptions).toBeDefined(); expect(typeof opts.preAggregationsOptions.queueOptions).toEqual('function'); expect(await opts.preAggregationsOptions.queueOptions()).toEqual({ - concurrency: 2, + concurrency: 5, }); } ); @@ -585,13 +585,13 @@ describe('OptsHandler class', () => { expect(opts.queryCacheOptions.queueOptions).toBeDefined(); expect(typeof opts.queryCacheOptions.queueOptions).toEqual('function'); expect(await opts.queryCacheOptions.queueOptions()).toEqual({ - concurrency: 2, + concurrency: 5, }); expect(opts.preAggregationsOptions.queueOptions).toBeDefined(); expect(typeof opts.preAggregationsOptions.queueOptions).toEqual('function'); expect(await opts.preAggregationsOptions.queueOptions()).toEqual({ - concurrency: 2, + concurrency: 5, }); } ); @@ -658,6 +658,41 @@ describe('OptsHandler class', () => { } ); + test( + 'must configure queueOptions with empty orchestratorOptions function, ' + + 'with CUBEJS_REFRESH_WORKER_CONCURRENCY, CUBEJS_CONCURRENCY and with default driver concurrency', + async () => { + process.env.CUBEJS_CONCURRENCY = '11'; + process.env.CUBEJS_REFRESH_WORKER_CONCURRENCY = '22'; + process.env.CUBEJS_DB_TYPE = 'postgres'; + + const core = new CubejsServerCoreExposed({ + ...conf, + dbType: undefined, + driverFactory: () => ({ type: process.env.CUBEJS_DB_TYPE }), + orchestratorOptions: () => ({}), + }); + + const opts = ( await core.getOrchestratorApi({})).options; + + expect(opts.queryCacheOptions.queueOptions).toBeDefined(); + expect(typeof opts.queryCacheOptions.queueOptions).toEqual('function'); + expect(await opts.queryCacheOptions.queueOptions()).toEqual({ + concurrency: parseInt(process.env.CUBEJS_CONCURRENCY, 10), + }); + + expect(opts.preAggregationsOptions.queueOptions).toBeDefined(); + expect(typeof opts.preAggregationsOptions.queueOptions).toEqual('function'); + expect(await opts.preAggregationsOptions.queueOptions()).toEqual({ + concurrency: parseInt(process.env.CUBEJS_REFRESH_WORKER_CONCURRENCY, 10), + }); + + delete process.env.CUBEJS_CONCURRENCY; + delete process.env.CUBEJS_REFRESH_WORKER_CONCURRENCY; + delete process.env.CUBEJS_DB_TYPE; + } + ); + test( 'multi data source concurrency', async () => { diff --git a/packages/cubejs-snowflake-driver/src/SnowflakeDriver.ts b/packages/cubejs-snowflake-driver/src/SnowflakeDriver.ts index b8422155af569..8561e24b2052d 100644 --- a/packages/cubejs-snowflake-driver/src/SnowflakeDriver.ts +++ b/packages/cubejs-snowflake-driver/src/SnowflakeDriver.ts @@ -195,7 +195,7 @@ export class SnowflakeDriver extends BaseDriver implements DriverInterface { * Returns default concurrency value. */ public static getDefaultConcurrency(): number { - return 5; + return 8; } public static driverEnvVariables() {