From 7b2ae8f4ce995fac08adc7697c69a90432641007 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Thu, 20 Feb 2025 22:46:06 +0200 Subject: [PATCH 01/14] some typos and refactoring --- .../src/orchestrator/PreAggregations.ts | 11 ++++++----- .../src/orchestrator/QueryCache.ts | 4 ++-- .../cubejs-server-core/src/core/OrchestratorApi.ts | 8 ++++---- .../cubejs-server-core/src/core/RefreshScheduler.ts | 2 +- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts index 4c858c3b7d32e..3ec4c61580cd4 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts @@ -8,7 +8,9 @@ import { FROM_PARTITION_RANGE, getEnv, inDbTimeZone, - MAX_SOURCE_ROW_LIMIT, MaybeCancelablePromise, reformatInIsoLocal, + MAX_SOURCE_ROW_LIMIT, + MaybeCancelablePromise, + reformatInIsoLocal, timeSeries, TO_PARTITION_RANGE, utcToLocalTimeZone, @@ -504,7 +506,7 @@ export class PreAggregationLoader { private preAggregationsTablesToTempTables: any; /** - * Determines whether current instance instantiated for a jobed build query + * Determines whether current instance instantiated for a jobbed build query * (initialized by the /cubejs-system/v1/pre-aggregations/jobs endpoint) or * not. */ @@ -1505,7 +1507,7 @@ interface PreAggsPartitionRangeLoaderOpts { export class PreAggregationPartitionRangeLoader { /** - * Determines whether current instance instantiated for a jobed build query + * Determines whether current instance instantiated for a jobbed build query * (initialized by the /cubejs-system/v1/pre-aggregations/jobs endpoint) or * not. */ @@ -2518,8 +2520,7 @@ export class PreAggregations { public async getQueueState(dataSource: string) { const queue = await this.getQueue(dataSource); - const queries = await queue.getQueries(); - return queries; + return queue.getQueries(); } public async cancelQueriesFromQueue(queryKeys: string[], dataSource: string) { diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index acac7b568a2d0..337836730a7d1 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -96,7 +96,7 @@ export type TempTable = { * definition (stored in the second element) link. */ export type PreAggTableToTempTable = [ - string, // common table name (without sufix) + string, // common table name (without suffix) TempTable, ]; @@ -187,7 +187,7 @@ export class QueryCache { /** * Generates from the `queryBody` the final `sql` query and push it to * the queue. Returns promise which will be resolved by the different - * objects, depend from the original `queryBody` object. For the + * objects, depend on the original `queryBody` object. For the * persistent queries returns the `stream.Writable` instance. * * @throw Error diff --git a/packages/cubejs-server-core/src/core/OrchestratorApi.ts b/packages/cubejs-server-core/src/core/OrchestratorApi.ts index 90ba2fcb2da07..4579923353015 100644 --- a/packages/cubejs-server-core/src/core/OrchestratorApi.ts +++ b/packages/cubejs-server-core/src/core/OrchestratorApi.ts @@ -2,12 +2,12 @@ import * as stream from 'stream'; import pt from 'promise-timeout'; import { - QueryOrchestrator, ContinueWaitError, DriverFactoryByDataSource, DriverType, - QueryOrchestratorOptions, QueryBody, + QueryOrchestrator, + QueryOrchestratorOptions, } from '@cubejs-backend/query-orchestrator'; import { DatabaseType, RequestContext } from './types'; @@ -86,13 +86,13 @@ export class OrchestratorApi { : this.orchestrator.fetchQuery(query); if (query.isJob) { - // We want to immediately resolve and return a jobed build query result + // We want to immediately resolve and return a jobbed build query result // (initialized by the /cubejs-system/v1/pre-aggregations/jobs endpoint) // because the following stack was optimized for such behavior. const job = await fetchQueryPromise; return job; } - + fetchQueryPromise = pt.timeout(fetchQueryPromise, this.continueWaitTimeout * 1000); const data = await fetchQueryPromise; diff --git a/packages/cubejs-server-core/src/core/RefreshScheduler.ts b/packages/cubejs-server-core/src/core/RefreshScheduler.ts index c79365f7d1f54..d6692b583873a 100644 --- a/packages/cubejs-server-core/src/core/RefreshScheduler.ts +++ b/packages/cubejs-server-core/src/core/RefreshScheduler.ts @@ -147,7 +147,7 @@ export class RefreshScheduler { const orchestratorApi = await this.serverCore.getOrchestratorApi(context); const preAggregationsLoadCacheByDataSource = {}; - // Return a empty array for cases with 2 same pre-aggregations but with different partitionGranularity + // Return an empty array for cases with 2 same pre-aggregations but with different partitionGranularity // Only the most detailed pre-aggregations will be use if (!preAggregationDescription) { return { From 0e11582e9b741879d38b73f73fb7cc063fc11d18 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 21 Feb 2025 18:40:48 +0200 Subject: [PATCH 02/14] rafactor CacheKey type def --- .../src/orchestrator/QueryCache.ts | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 337836730a7d1..63baa253cae4d 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -100,12 +100,11 @@ export type PreAggTableToTempTable = [ TempTable, ]; -export type CacheKey = +export type CacheKey = Array< | string - | [ - query: string | QueryTuple, - options?: string[] - ]; + | string[] + | QueryTuple +>; type CacheEntry = { time: number; From c97cabe94745832736a404fb85603ecb8f6b3920 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 21 Feb 2025 19:38:01 +0200 Subject: [PATCH 03/14] typo --- .../cubejs-query-orchestrator/src/orchestrator/QueryQueue.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js index 5536aa8805884..fee5aa7250fc6 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js @@ -236,7 +236,7 @@ export class QueryQueue { throw new Error('Priority should be between -10000 and 10000'); } - // Result here won't be fetched for a forced build query and a jobed build + // Result here won't be fetched for a forced build query and a jobbed build // query (initialized by the /cubejs-system/v1/pre-aggregations/jobs // endpoint). let result = !query.forceBuild && await queueConnection.getResult(queryKey); From 8b9b97b40db4ea5d29bf46fe854dbdd26e2a9c29 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Mon, 24 Feb 2025 18:58:57 +0200 Subject: [PATCH 04/14] refactor imports in time.ts --- packages/cubejs-backend-shared/src/time.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/cubejs-backend-shared/src/time.ts b/packages/cubejs-backend-shared/src/time.ts index e828fd978fc73..8b239bc283bf9 100644 --- a/packages/cubejs-backend-shared/src/time.ts +++ b/packages/cubejs-backend-shared/src/time.ts @@ -1,9 +1,9 @@ -import { DateRange, extendMoment } from 'moment-range'; -import { unitOfTime } from 'moment-timezone'; +import type { unitOfTime } from 'moment-timezone'; +import type { DateRange } from 'moment-range'; +import Moment from 'moment-timezone'; +import { extendMoment } from 'moment-range'; -const Moment = require('moment-timezone'); - -const moment = extendMoment(Moment); +const moment = extendMoment(Moment as any); export type QueryDateRange = [string, string]; type SqlInterval = string; From 186e2b338efde782bdca5e44c0fa1271bb9ae804 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Tue, 25 Feb 2025 20:45:03 +0200 Subject: [PATCH 05/14] just typo --- packages/cubejs-backend-shared/src/promises.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cubejs-backend-shared/src/promises.ts b/packages/cubejs-backend-shared/src/promises.ts index 81e9f76909080..d458296060554 100644 --- a/packages/cubejs-backend-shared/src/promises.ts +++ b/packages/cubejs-backend-shared/src/promises.ts @@ -122,7 +122,7 @@ export interface CancelableIntervalOptions { } /** - * It's helps to create an interval that can be canceled with awaiting latest execution + * It helps to create an interval that can be canceled with awaiting latest execution */ export function createCancelableInterval( fn: (token: CancelToken) => Promise, From 33bd00374b601f4868dc9fbce3a811ba6e5b160f Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Tue, 25 Feb 2025 20:45:32 +0200 Subject: [PATCH 06/14] remove unused: redisPrefix from PreAggregationLoadCache class --- .../src/orchestrator/PreAggregations.ts | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts index 3ec4c61580cd4..ab0cb6914fdc1 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts @@ -255,8 +255,6 @@ type VersionEntriesObj = { }; class PreAggregationLoadCache { - private redisPrefix: string; - private driverFactory: DriverFactory; private queryCache: QueryCache; @@ -286,13 +284,11 @@ class PreAggregationLoadCache { private tablePrefixes: string[] | null; public constructor( - redisPrefix, clientFactory: DriverFactory, queryCache, preAggregations, options: PreAggregationLoadCacheOptions = { dataSource: 'default' } ) { - this.redisPrefix = `${redisPrefix}_${options.dataSource}`; this.dataSource = options.dataSource; this.driverFactory = clientFactory; this.queryCache = queryCache; @@ -2116,7 +2112,6 @@ export class PreAggregations { ): Promise<[boolean, string]> { // fetching tables const loadCache = new PreAggregationLoadCache( - this.redisPrefix, () => this.driverFactory(dataSource), this.queryCache, this, @@ -2189,7 +2184,6 @@ export class PreAggregations { if (!loadCacheByDataSource[`${dataSource}_${preAggregationSchema}`]) { loadCacheByDataSource[`${dataSource}_${preAggregationSchema}`] = new PreAggregationLoadCache( - this.redisPrefix, () => this.driverFactory(dataSource), this.queryCache, this, @@ -2307,7 +2301,6 @@ export class PreAggregations { if (!loadCacheByDataSource[`${dataSource}_${preAggregationSchema}`]) { loadCacheByDataSource[`${dataSource}_${preAggregationSchema}`] = new PreAggregationLoadCache( - this.redisPrefix, () => this.driverFactory(dataSource), this.queryCache, this, @@ -2375,7 +2368,6 @@ export class PreAggregations { preAggregation, preAggregationsTablesToTempTables, new PreAggregationLoadCache( - this.redisPrefix, () => this.driverFactory(dataSource), this.queryCache, this, @@ -2423,7 +2415,6 @@ export class PreAggregations { requestId } = q; const loadCache = new PreAggregationLoadCache( - this.redisPrefix, () => this.driverFactory(dataSource), this.queryCache, this, @@ -2485,7 +2476,6 @@ export class PreAggregations { if (!loadCacheByDataSource[`${dataSource}_${preAggregationSchema}`]) { loadCacheByDataSource[`${dataSource}_${preAggregationSchema}`] = new PreAggregationLoadCache( - this.redisPrefix, () => this.driverFactory(dataSource), this.queryCache, this, From 0c38be7fe242b791ccdd1e8d55105ed0543571d6 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Tue, 25 Feb 2025 20:55:35 +0200 Subject: [PATCH 07/14] remove unused: redisPrefix from PreAggregationLoader & PreAggregationPartitionRangeLoader --- .../src/orchestrator/PreAggregations.ts | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts index ab0cb6914fdc1..3e9baa32ccfad 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts @@ -525,7 +525,6 @@ export class PreAggregationLoader { private externalRefresh: boolean; public constructor( - private readonly redisPrefix: string, private readonly driverFactory: DriverFactory, private readonly logger: any, private readonly queryCache: QueryCache, @@ -588,7 +587,7 @@ export class PreAggregationLoader { : undefined, }; } else { - // Case 3: pre-agg is exists + // Case 3: pre-agg exists const structureVersion = getStructureVersion(this.preAggregation); const getVersionsStarted = new Date(); const { byStructure } = await this.loadCache.getVersionEntries(this.preAggregation); @@ -619,7 +618,7 @@ export class PreAggregationLoader { } if (versionEntryByStructureVersion) { - // this triggers an asyncronous/background load of the pre-aggregation but immediately + // this triggers an asynchronous/background load of the pre-aggregation but immediately // returns the latest data it already has this.loadPreAggregationWithKeys().catch(e => { if (!(e instanceof ContinueWaitError)) { @@ -1520,7 +1519,6 @@ export class PreAggregationPartitionRangeLoader { protected compilerCacheFn: (subKey: string[], cacheFn: () => T) => T; public constructor( - private readonly redisPrefix: string, private readonly driverFactory: DriverFactory, private readonly logger: any, private readonly queryCache: QueryCache, @@ -1684,7 +1682,6 @@ export class PreAggregationPartitionRangeLoader { if (this.preAggregation.partitionGranularity && !this.preAggregation.expandedPartition) { const loadPreAggregationsByPartitionRanges = async ({ buildRange, partitionRanges }: PartitionRanges) => { const partitionLoaders = partitionRanges.map(range => new PreAggregationLoader( - this.redisPrefix, this.driverFactory, this.logger, this.queryCache, @@ -1766,7 +1763,6 @@ export class PreAggregationPartitionRangeLoader { }; } else { return new PreAggregationLoader( - this.redisPrefix, this.driverFactory, this.logger, this.queryCache, @@ -2207,7 +2203,6 @@ export class PreAggregations { const preAggregationsTablesToTempTablesPromise = preAggregations.map((p: PreAggregationDescription, i) => (preAggregationsTablesToTempTables) => { const loader = new PreAggregationPartitionRangeLoader( - this.redisPrefix, () => this.driverFactory(p.dataSource || 'default'), this.logger, this.queryCache, @@ -2316,7 +2311,6 @@ export class PreAggregations { const expandedPreAggregations: PreAggregationDescription[][] = await Promise.all(preAggregations.map(p => { const loader = new PreAggregationPartitionRangeLoader( - this.redisPrefix, () => this.driverFactory(p.dataSource || 'default'), this.logger, this.queryCache, @@ -2360,7 +2354,6 @@ export class PreAggregations { preAggregation, preAggregationsTablesToTempTables, newVersionEntry, requestId, invalidationKeys, buildRangeEnd } = q; const loader = new PreAggregationLoader( - this.redisPrefix, () => this.driverFactory(dataSource), this.logger, this.queryCache, From 797168c4623b8793189687659949c11f7429c8c0 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Wed, 26 Feb 2025 11:53:47 +0200 Subject: [PATCH 08/14] refactor PreAggregations: move every class to a separate file --- .../orchestrator/PreAggregationLoadCache.ts | 245 ++++ .../src/orchestrator/PreAggregationLoader.ts | 1045 +++++++++++++++++ .../PreAggregationPartitionRangeLoader.ts | 504 ++++++++ .../src/orchestrator/index.ts | 3 + 4 files changed, 1797 insertions(+) create mode 100644 packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts create mode 100644 packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts create mode 100644 packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts new file mode 100644 index 0000000000000..0c0d0737757f6 --- /dev/null +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts @@ -0,0 +1,245 @@ +import { TableStructure } from '@cubejs-backend/base-driver'; +import { DriverFactory } from './DriverFactory'; +import { QueryCache, QueryTuple, QueryWithParams } from './QueryCache'; +import { + PreAggregationDescription, + PreAggregations, + TableCacheEntry, + tablesToVersionEntries, + VersionEntriesObj, + VersionEntry +} from './PreAggregations'; + +type PreAggregationLoadCacheOptions = { + requestId?: string, + dataSource: string, + tablePrefixes?: string[], +}; + +export class PreAggregationLoadCache { + private driverFactory: DriverFactory; + + private queryCache: QueryCache; + + // eslint-disable-next-line no-use-before-define + private preAggregations: PreAggregations; + + private queryResults: any; + + private externalDriverFactory: any; + + private requestId: any; + + private versionEntries: { [redisKey: string]: Promise }; + + private tables: { [redisKey: string]: TableCacheEntry[] }; + + private tableColumnTypes: { [cacheKey: string]: { [tableName: string]: TableStructure } }; + + // TODO this is in memory cache structure as well however it depends on + // data source only and load cache is per data source for now. + // Make it per data source key in case load cache scope is broaden. + private queryStageState: any; + + private dataSource: string; + + private tablePrefixes: string[] | null; + + public constructor( + clientFactory: DriverFactory, + queryCache, + preAggregations, + options: PreAggregationLoadCacheOptions = { dataSource: 'default' } + ) { + this.dataSource = options.dataSource; + this.driverFactory = clientFactory; + this.queryCache = queryCache; + this.preAggregations = preAggregations; + this.queryResults = {}; + this.externalDriverFactory = preAggregations.externalDriverFactory; + this.requestId = options.requestId; + this.tablePrefixes = options.tablePrefixes; + this.versionEntries = {}; + this.tables = {}; + this.tableColumnTypes = {}; + } + + protected async tablesFromCache(preAggregation, forceRenew?) { + let tables = forceRenew ? null : await this.queryCache.getCacheDriver().get(this.tablesCachePrefixKey(preAggregation)); + if (!tables) { + tables = await this.preAggregations.getLoadCacheQueue(this.dataSource).executeInQueue( + 'query', + `Fetch tables for ${preAggregation.preAggregationsSchema}`, + { + preAggregation, requestId: this.requestId + }, + 0, + { requestId: this.requestId } + ); + } + return tables; + } + + public async fetchTables(preAggregation: PreAggregationDescription) { + if (preAggregation.external && !this.externalDriverFactory) { + throw new Error('externalDriverFactory is not provided. Please use CUBEJS_DEV_MODE=true or provide Cube Store connection env variables for production usage.'); + } + + const newTables = await this.fetchTablesNoCache(preAggregation); + await this.queryCache.getCacheDriver().set( + this.tablesCachePrefixKey(preAggregation), + newTables, + this.preAggregations.options.preAggregationsSchemaCacheExpire || 60 * 60 + ); + return newTables; + } + + private async fetchTablesNoCache(preAggregation: PreAggregationDescription) { + const client = preAggregation.external ? + await this.externalDriverFactory() : + await this.driverFactory(); + if (this.tablePrefixes && client.getPrefixTablesQuery && this.preAggregations.options.skipExternalCacheAndQueue) { + return client.getPrefixTablesQuery(preAggregation.preAggregationsSchema, this.tablePrefixes); + } + return client.getTablesQuery(preAggregation.preAggregationsSchema); + } + + public tablesCachePrefixKey(preAggregation: PreAggregationDescription) { + return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES', `${preAggregation.dataSource}${preAggregation.preAggregationsSchema}${preAggregation.external ? '_EXT' : ''}`); + } + + protected async getTablesQuery(preAggregation) { + const redisKey = this.tablesCachePrefixKey(preAggregation); + if (!this.tables[redisKey]) { + const tables = this.preAggregations.options.skipExternalCacheAndQueue && preAggregation.external ? + await this.fetchTablesNoCache(preAggregation) : + await this.tablesFromCache(preAggregation); + if (tables === undefined) { + throw new Error('Pre-aggregation tables are undefined.'); + } + this.tables[redisKey] = tables; + } + return this.tables[redisKey]; + } + + public async getTableColumnTypes(preAggregation: PreAggregationDescription, tableName: string): Promise { + const prefixKey = this.tablesCachePrefixKey(preAggregation); + if (!this.tableColumnTypes[prefixKey]?.[tableName]) { + if (!this.preAggregations.options.skipExternalCacheAndQueue && preAggregation.external) { + throw new Error(`Lambda union with source data feature is supported only by external rollups stored in Cube Store but was invoked for '${preAggregation.preAggregationId}'`); + } + const client = await this.externalDriverFactory(); + const columnTypes = await client.tableColumnTypes(tableName); + if (!this.tableColumnTypes[prefixKey]) { + this.tableColumnTypes[prefixKey] = {}; + } + this.tableColumnTypes[prefixKey][tableName] = columnTypes; + } + return this.tableColumnTypes[prefixKey][tableName]; + } + + private async calculateVersionEntries(preAggregation): Promise { + let versionEntries = tablesToVersionEntries( + preAggregation.preAggregationsSchema, + await this.getTablesQuery(preAggregation) + ); + // It presumes strong consistency guarantees for external pre-aggregation tables ingestion + if (!preAggregation.external) { + // eslint-disable-next-line + const [active, toProcess, queries] = await this.fetchQueryStageState(); + const targetTableNamesInQueue = (Object.keys(queries)) + // eslint-disable-next-line no-use-before-define + .map(q => PreAggregations.targetTableName(queries[q].query.newVersionEntry)); + + versionEntries = versionEntries.filter( + // eslint-disable-next-line no-use-before-define + e => targetTableNamesInQueue.indexOf(PreAggregations.targetTableName(e)) === -1 + ); + } + + const byContent: { [key: string]: VersionEntry } = {}; + const byStructure: { [key: string]: VersionEntry } = {}; + const byTableName: { [key: string]: VersionEntry } = {}; + + versionEntries.forEach(e => { + const contentKey = `${e.table_name}_${e.content_version}`; + if (!byContent[contentKey]) { + byContent[contentKey] = e; + } + const structureKey = `${e.table_name}_${e.structure_version}`; + if (!byStructure[structureKey]) { + byStructure[structureKey] = e; + } + if (!byTableName[e.table_name]) { + byTableName[e.table_name] = e; + } + }); + + return { versionEntries, byContent, byStructure, byTableName }; + } + + public async getVersionEntries(preAggregation): Promise { + if (this.tablePrefixes && !this.tablePrefixes.find(p => preAggregation.tableName.split('.')[1].startsWith(p))) { + throw new Error(`Load cache tries to load table ${preAggregation.tableName} outside of tablePrefixes filter: ${this.tablePrefixes.join(', ')}`); + } + const redisKey = this.tablesCachePrefixKey(preAggregation); + if (!this.versionEntries[redisKey]) { + this.versionEntries[redisKey] = this.calculateVersionEntries(preAggregation).catch(e => { + delete this.versionEntries[redisKey]; + throw e; + }); + } + return this.versionEntries[redisKey]; + } + + public async keyQueryResult(sqlQuery: QueryWithParams, waitForRenew: boolean, priority: number) { + const [query, values, queryOptions]: QueryTuple = Array.isArray(sqlQuery) ? sqlQuery : [sqlQuery, [], {}]; + + if (!this.queryResults[this.queryCache.queryRedisKey([query, values])]) { + this.queryResults[this.queryCache.queryRedisKey([query, values])] = await this.queryCache.cacheQueryResult( + query, + values, + [query, values], + 60 * 60, + { + renewalThreshold: this.queryCache.options.refreshKeyRenewalThreshold + || queryOptions?.renewalThreshold || 2 * 60, + renewalKey: [query, values], + waitForRenew, + priority, + requestId: this.requestId, + dataSource: this.dataSource, + useInMemory: true, + external: queryOptions?.external + } + ); + } + return this.queryResults[this.queryCache.queryRedisKey([query, values])]; + } + + public hasKeyQueryResult(keyQuery) { + return !!this.queryResults[this.queryCache.queryRedisKey(keyQuery)]; + } + + public async getQueryStage(stageQueryKey) { + const queue = await this.preAggregations.getQueue(this.dataSource); + await this.fetchQueryStageState(queue); + return queue.getQueryStage(stageQueryKey, undefined, this.queryStageState); + } + + protected async fetchQueryStageState(queue?) { + queue = queue || await this.preAggregations.getQueue(this.dataSource); + if (!this.queryStageState) { + this.queryStageState = await queue.fetchQueryStageState(); + } + return this.queryStageState; + } + + public async reset(preAggregation) { + await this.tablesFromCache(preAggregation, true); + this.tables = {}; + this.tableColumnTypes = {}; + this.queryStageState = undefined; + this.versionEntries = {}; + } +} diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts new file mode 100644 index 0000000000000..191694c1f1d63 --- /dev/null +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts @@ -0,0 +1,1045 @@ +import R from 'ramda'; +import { getEnv, MaybeCancelablePromise } from '@cubejs-backend/shared'; +import { + cancelCombinator, + DownloadQueryResultsResult, + DownloadTableData, + DriverCapabilities, + DriverInterface, + isDownloadTableCSVData, + SaveCancelFn, + StreamOptions, + UnloadOptions +} from '@cubejs-backend/base-driver'; +import { DriverFactory } from './DriverFactory'; +import { QueryCache, QueryTuple } from './QueryCache'; +import { ContinueWaitError } from './ContinueWaitError'; +import { LargeStreamWarning } from './StreamObjectsCounter'; +import { + getStructureVersion, + InvalidationKeys, LoadPreAggregationResult, + PreAggregations, + tablesToVersionEntries, + version, VersionEntriesObj, + VersionEntry +} from './PreAggregations'; +import { PreAggregationLoadCache } from './PreAggregationLoadCache'; + +type IndexesSql = { sql: [string, unknown[]], indexName: string }[]; + +type QueryKey = [QueryTuple, IndexesSql, InvalidationKeys] | [QueryTuple, InvalidationKeys]; + +type QueryOptions = { + queryKey: QueryKey; + newVersionEntry: VersionEntry; + query: string; + values: unknown[]; + requestId: string; + buildRangeEnd?: string; +}; + +// There are community developed and custom drivers which not always up-to-date with latest BaseDriver. +// Extra defence for drivers that don't expose now() yet. +function nowTimestamp(client: DriverInterface) { + return client.nowTimestamp?.() ?? new Date().getTime(); +} + +export class PreAggregationLoader { + private preAggregations: PreAggregations; + + public preAggregation: any; + + private preAggregationsTablesToTempTables: any; + + /** + * Determines whether current instance instantiated for a jobbed build query + * (initialized by the /cubejs-system/v1/pre-aggregations/jobs endpoint) or + * not. + */ + private isJob: boolean; + + private waitForRenew: boolean; + + private forceBuild: boolean; + + private orphanedTimeout: number; + + private externalDriverFactory: DriverFactory; + + private requestId: string; + + private metadata: any; + + private structureVersionPersistTime: any; + + private externalRefresh: boolean; + + public constructor( + private readonly driverFactory: DriverFactory, + private readonly logger: any, + private readonly queryCache: QueryCache, + preAggregations: PreAggregations, + preAggregation, + preAggregationsTablesToTempTables, + private readonly loadCache: PreAggregationLoadCache, + options: any = {} + ) { + this.preAggregations = preAggregations; + this.preAggregation = preAggregation; + this.preAggregationsTablesToTempTables = preAggregationsTablesToTempTables; + this.isJob = !!options.isJob; + this.waitForRenew = options.waitForRenew; + this.forceBuild = options.forceBuild; + this.orphanedTimeout = options.orphanedTimeout; + this.externalDriverFactory = preAggregations.externalDriverFactory; + this.requestId = options.requestId; + this.metadata = options.metadata; + this.structureVersionPersistTime = preAggregations.structureVersionPersistTime; + this.externalRefresh = options.externalRefresh; + + if (this.externalRefresh && this.waitForRenew) { + const message = 'Invalid configuration - when externalRefresh is true, it will not perform a renew, therefore you cannot wait for it using waitForRenew.'; + if (['production', 'test'].includes(getEnv('nodeEnv'))) { + throw new Error(message); + } else { + this.logger('Invalid Configuration', { + requestId: this.requestId, + warning: message, + }); + this.waitForRenew = false; + } + } + } + + public async loadPreAggregation( + throwOnMissingPartition: boolean, + ): Promise { + const notLoadedKey = (this.preAggregation.invalidateKeyQueries || []) + .find(keyQuery => !this.loadCache.hasKeyQueryResult(keyQuery)); + + if (this.isJob || !(notLoadedKey && !this.waitForRenew)) { + // Case 1: pre-agg build job processing. + // Case 2: either we have no data cached for this rollup or waitForRenew + // is true, either way, synchronously renew what data is needed so that + // the most current data will be returned fo the current request. + const result = await this.loadPreAggregationWithKeys(); + const refreshKeyValues = await this.getInvalidationKeyValues(); + return { + ...result, + refreshKeyValues, + queryKey: this.isJob + // We need to return a queryKey value for the jobed build query + // (initialized by the /cubejs-system/v1/pre-aggregations/jobs + // endpoint) as a part of the response to make it possible to get a + // query result from the cache by the other API call. + ? this.preAggregationQueryKey(refreshKeyValues) + : undefined, + }; + } else { + // Case 3: pre-agg exists + const structureVersion = getStructureVersion(this.preAggregation); + const getVersionsStarted = new Date(); + const { byStructure } = await this.loadCache.getVersionEntries(this.preAggregation); + this.logger('Load PreAggregations Tables', { + preAggregation: this.preAggregation, + requestId: this.requestId, + duration: (new Date().getTime() - getVersionsStarted.getTime()) + }); + + const versionEntryByStructureVersion = byStructure[`${this.preAggregation.tableName}_${structureVersion}`]; + if (this.externalRefresh) { + if (!versionEntryByStructureVersion && throwOnMissingPartition) { + // eslint-disable-next-line no-use-before-define + throw new Error(PreAggregations.noPreAggregationPartitionsBuiltMessage([this.preAggregation])); + } + if (!versionEntryByStructureVersion) { + return null; + } else { + // the rollups are being maintained independently of this instance of cube.js + // immediately return the latest rollup data that instance already has + return { + targetTableName: this.targetTableName(versionEntryByStructureVersion), + refreshKeyValues: [], + lastUpdatedAt: versionEntryByStructureVersion.last_updated_at, + buildRangeEnd: versionEntryByStructureVersion.build_range_end, + }; + } + } + + if (versionEntryByStructureVersion) { + // this triggers an asynchronous/background load of the pre-aggregation but immediately + // returns the latest data it already has + this.loadPreAggregationWithKeys().catch(e => { + if (!(e instanceof ContinueWaitError)) { + this.logger('Error loading pre-aggregation', { + error: (e.stack || e), + preAggregation: this.preAggregation, + requestId: this.requestId + }); + } + }); + return { + targetTableName: this.targetTableName(versionEntryByStructureVersion), + refreshKeyValues: [], + lastUpdatedAt: versionEntryByStructureVersion.last_updated_at, + buildRangeEnd: versionEntryByStructureVersion.build_range_end, + }; + } else { + // no rollup has been built yet - build it synchronously as part of responding to this request + return this.loadPreAggregationWithKeys(); + } + } + } + + protected async loadPreAggregationWithKeys(): Promise { + const invalidationKeys = await this.getPartitionInvalidationKeyValues(); + + const contentVersion = this.contentVersion(invalidationKeys); + const structureVersion = getStructureVersion(this.preAggregation); + + const versionEntries = await this.loadCache.getVersionEntries(this.preAggregation); + + const getVersionEntryByContentVersion = ({ byContent }: VersionEntriesObj) => byContent[`${this.preAggregation.tableName}_${contentVersion}`]; + + const versionEntryByContentVersion = getVersionEntryByContentVersion(versionEntries); + if (versionEntryByContentVersion && !this.forceBuild) { + const targetTableName = this.targetTableName(versionEntryByContentVersion); + // No need to block here + this.updateLastTouch(targetTableName); + return { + targetTableName, + refreshKeyValues: [], + lastUpdatedAt: versionEntryByContentVersion.last_updated_at, + buildRangeEnd: versionEntryByContentVersion.build_range_end, + }; + } + + if (!this.waitForRenew && !this.forceBuild) { + const versionEntryByStructureVersion = versionEntries.byStructure[`${this.preAggregation.tableName}_${structureVersion}`]; + if (versionEntryByStructureVersion) { + const targetTableName = this.targetTableName(versionEntryByStructureVersion); + // No need to block here + this.updateLastTouch(targetTableName); + return { + targetTableName, + refreshKeyValues: [], + lastUpdatedAt: versionEntryByStructureVersion.last_updated_at, + buildRangeEnd: versionEntryByStructureVersion.build_range_end, + }; + } + } + + const client = this.preAggregation.external ? + await this.externalDriverFactory() : + await this.driverFactory(); + + if (!versionEntries.versionEntries.length) { + await client.createSchemaIfNotExists(this.preAggregation.preAggregationsSchema); + } + + // ensure we find appropriate structure version before invalidating anything + const versionEntry = + versionEntries.byStructure[`${this.preAggregation.tableName}_${structureVersion}`] || + versionEntries.byTableName[this.preAggregation.tableName]; + + const newVersionEntry = { + table_name: this.preAggregation.tableName, + structure_version: structureVersion, + content_version: contentVersion, + last_updated_at: nowTimestamp(client), + naming_version: 2, + }; + + const mostRecentResult: () => Promise = async () => { + await this.loadCache.reset(this.preAggregation); + const lastVersion = getVersionEntryByContentVersion( + await this.loadCache.getVersionEntries(this.preAggregation) + ); + if (!lastVersion) { + throw new Error(`Pre-aggregation table is not found for ${this.preAggregation.tableName} after it was successfully created`); + } + const targetTableName = this.targetTableName(lastVersion); + this.updateLastTouch(targetTableName); + return { + targetTableName, + refreshKeyValues: [], + lastUpdatedAt: lastVersion.last_updated_at, + buildRangeEnd: lastVersion.build_range_end, + }; + }; + + if (this.forceBuild) { + this.logger('Force build pre-aggregation', { + preAggregation: this.preAggregation, + requestId: this.requestId, + metadata: this.metadata, + queryKey: this.preAggregationQueryKey(invalidationKeys), + newVersionEntry + }); + if (this.isJob) { + // We don't want to wait for the jobed build query result. So we run the + // executeInQueue method and immediately return the LoadPreAggregationResult object. + this + .executeInQueue(invalidationKeys, this.priority(10), newVersionEntry) + .catch((e: any) => { + this.logger('Pre-aggregations build job error', { + preAggregation: this.preAggregation, + requestId: this.requestId, + newVersionEntry, + error: (e.stack || e), + }); + }); + const targetTableName = this.targetTableName(newVersionEntry); + this.updateLastTouch(targetTableName); + return { + targetTableName, + refreshKeyValues: [], + lastUpdatedAt: newVersionEntry.last_updated_at, + buildRangeEnd: this.preAggregation.buildRangeEnd, + }; + } else { + await this.executeInQueue(invalidationKeys, this.priority(10), newVersionEntry); + return mostRecentResult(); + } + } + + if (versionEntry) { + if (versionEntry.structure_version !== newVersionEntry.structure_version) { + this.logger('Invalidating pre-aggregation structure', { + preAggregation: this.preAggregation, + requestId: this.requestId, + queryKey: this.preAggregationQueryKey(invalidationKeys), + newVersionEntry + }); + await this.executeInQueue(invalidationKeys, this.priority(10), newVersionEntry); + return mostRecentResult(); + } else if (versionEntry.content_version !== newVersionEntry.content_version) { + if (this.waitForRenew) { + this.logger('Waiting for pre-aggregation renew', { + preAggregation: this.preAggregation, + requestId: this.requestId, + queryKey: this.preAggregationQueryKey(invalidationKeys), + newVersionEntry + }); + await this.executeInQueue(invalidationKeys, this.priority(0), newVersionEntry); + return mostRecentResult(); + } else { + this.scheduleRefresh(invalidationKeys, newVersionEntry); + } + } + } else { + this.logger('Creating pre-aggregation from scratch', { + preAggregation: this.preAggregation, + requestId: this.requestId, + queryKey: this.preAggregationQueryKey(invalidationKeys), + newVersionEntry + }); + await this.executeInQueue(invalidationKeys, this.priority(10), newVersionEntry); + return mostRecentResult(); + } + const targetTableName = this.targetTableName(versionEntry); + this.updateLastTouch(targetTableName); + return { + targetTableName, + refreshKeyValues: [], + lastUpdatedAt: versionEntry.last_updated_at, + buildRangeEnd: versionEntry.build_range_end, + }; + } + + private updateLastTouch(tableName: string) { + this.preAggregations.updateLastTouch(tableName).catch(e => { + this.logger('Error on pre-aggregation touch', { + error: (e.stack || e), preAggregation: this.preAggregation, requestId: this.requestId, + }); + }); + } + + protected contentVersion(invalidationKeys) { + const versionArray = [this.preAggregation.structureVersionLoadSql || this.preAggregation.loadSql]; + if (this.preAggregation.indexesSql && this.preAggregation.indexesSql.length) { + versionArray.push(this.preAggregation.indexesSql); + } + if (this.preAggregation.streamOffset) { + versionArray.push(this.preAggregation.streamOffset); + } + if (this.preAggregation.outputColumnTypes) { + versionArray.push(this.preAggregation.outputColumnTypes); + } + versionArray.push(invalidationKeys); + return version(versionArray); + } + + protected priority(defaultValue: number): number { + return this.preAggregation.priority != null ? this.preAggregation.priority : defaultValue; + } + + protected getInvalidationKeyValues() { + return Promise.all( + (this.preAggregation.invalidateKeyQueries || []).map( + (sqlQuery) => this.loadCache.keyQueryResult(sqlQuery, this.waitForRenew, this.priority(10)) + ) + ); + } + + protected getPartitionInvalidationKeyValues() { + if (this.preAggregation.partitionInvalidateKeyQueries) { + return Promise.all( + (this.preAggregation.partitionInvalidateKeyQueries || []).map( + (sqlQuery) => this.loadCache.keyQueryResult(sqlQuery, this.waitForRenew, this.priority(10)) + ) + ); + } else { + return this.getInvalidationKeyValues(); + } + } + + protected scheduleRefresh(invalidationKeys, newVersionEntry) { + this.logger('Refreshing pre-aggregation content', { + preAggregation: this.preAggregation, + requestId: this.requestId, + queryKey: this.preAggregationQueryKey(invalidationKeys), + newVersionEntry + }); + this.executeInQueue(invalidationKeys, this.priority(0), newVersionEntry) + .catch(e => { + if (!(e instanceof ContinueWaitError)) { + this.logger('Error refreshing pre-aggregation', { + error: (e.stack || e), preAggregation: this.preAggregation, requestId: this.requestId + }); + } + }); + } + + protected async executeInQueue(invalidationKeys, priority, newVersionEntry) { + const queue = await this.preAggregations.getQueue(this.preAggregation.dataSource); + return queue.executeInQueue( + 'query', + this.preAggregationQueryKey(invalidationKeys), + { + preAggregation: this.preAggregation, + preAggregationsTablesToTempTables: this.preAggregationsTablesToTempTables, + newVersionEntry, + requestId: this.requestId, + invalidationKeys, + forceBuild: this.forceBuild, + isJob: this.isJob, + metadata: this.metadata, + orphanedTimeout: this.orphanedTimeout, + }, + priority, + // eslint-disable-next-line no-use-before-define + { stageQueryKey: PreAggregations.preAggregationQueryCacheKey(this.preAggregation), requestId: this.requestId } + ); + } + + protected preAggregationQueryKey(invalidationKeys: InvalidationKeys): QueryKey { + return this.preAggregation.indexesSql && this.preAggregation.indexesSql.length ? + [this.preAggregation.loadSql, this.preAggregation.indexesSql, invalidationKeys] : + [this.preAggregation.loadSql, invalidationKeys]; + } + + protected targetTableName(versionEntry: VersionEntry): string { + // eslint-disable-next-line no-use-before-define + return PreAggregations.targetTableName(versionEntry); + } + + public refresh(newVersionEntry: VersionEntry, invalidationKeys: InvalidationKeys, client) { + this.updateLastTouch(this.targetTableName(newVersionEntry)); + let refreshStrategy = this.refreshStoreInSourceStrategy; + if (this.preAggregation.external) { + const readOnly = + this.preAggregation.readOnly || + client.config && client.config.readOnly || + client.readOnly && (typeof client.readOnly === 'boolean' ? client.readOnly : client.readOnly()); + + if (readOnly) { + refreshStrategy = this.refreshReadOnlyExternalStrategy; + } else { + refreshStrategy = this.refreshWriteStrategy; + } + } + return cancelCombinator( + saveCancelFn => refreshStrategy.bind(this)( + client, + newVersionEntry, + saveCancelFn, + invalidationKeys + ) + ); + } + + protected logExecutingSql(payload) { + this.logger( + 'Executing Load Pre Aggregation SQL', + payload + ); + } + + protected queryOptions(invalidationKeys: InvalidationKeys, query: string, params: unknown[], targetTableName: string, newVersionEntry: VersionEntry) { + return { + queryKey: this.preAggregationQueryKey(invalidationKeys), + query, + values: params, + targetTableName, + requestId: this.requestId, + newVersionEntry, + buildRangeEnd: this.preAggregation.buildRangeEnd, + }; + } + + protected async refreshStoreInSourceStrategy( + client: DriverInterface, + newVersionEntry: VersionEntry, + saveCancelFn: SaveCancelFn, + invalidationKeys: InvalidationKeys + ) { + const [loadSql, params] = + Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []]; + const targetTableName = this.targetTableName(newVersionEntry); + const query = ( + QueryCache.replacePreAggregationTableNames( + loadSql, + this.preAggregationsTablesToTempTables, + ) + ).replace( + this.preAggregation.tableName, + targetTableName + ); + const queryOptions = this.queryOptions(invalidationKeys, query, params, targetTableName, newVersionEntry); + this.logExecutingSql(queryOptions); + + try { + // TODO move index creation to the driver + await saveCancelFn(client.loadPreAggregationIntoTable( + targetTableName, + query, + params, + { + streamOffset: this.preAggregation.streamOffset, + outputColumnTypes: this.preAggregation.outputColumnTypes, + ...queryOptions + } + )); + + await this.createIndexes(client, newVersionEntry, saveCancelFn, queryOptions); + await this.loadCache.fetchTables(this.preAggregation); + } finally { + // We must clean orphaned in any cases: success or exception + await this.dropOrphanedTables(client, targetTableName, saveCancelFn, false, queryOptions); + await this.loadCache.fetchTables(this.preAggregation); + } + } + + protected async refreshWriteStrategy( + client: DriverInterface, + newVersionEntry: VersionEntry, + saveCancelFn: SaveCancelFn, + invalidationKeys: InvalidationKeys, + ) { + const capabilities = client?.capabilities(); + + const withTempTable = !(capabilities?.unloadWithoutTempTable); + const dropSourceTempTable = !capabilities?.streamingSource; + + return this.runWriteStrategy( + client, + newVersionEntry, + saveCancelFn, + invalidationKeys, + withTempTable, + dropSourceTempTable + ); + } + + /** + * Runs export strategy with write access in data source + */ + protected async runWriteStrategy( + client: DriverInterface, + newVersionEntry: VersionEntry, + saveCancelFn: SaveCancelFn, + invalidationKeys: InvalidationKeys, + withTempTable: boolean, + dropSourceTempTable: boolean, + ) { + if (withTempTable) { + await client.createSchemaIfNotExists(this.preAggregation.preAggregationsSchema); + } + const targetTableName = this.targetTableName(newVersionEntry); + const queryOptions = await this.prepareWriteStrategy( + client, + targetTableName, + newVersionEntry, + saveCancelFn, + invalidationKeys, + withTempTable, + ); + + try { + const tableData = await this.downloadExternalPreAggregation( + client, + newVersionEntry, + saveCancelFn, + queryOptions, + withTempTable, + ); + + try { + await this.uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn, queryOptions); + } finally { + if (tableData && tableData.release) { + await tableData.release(); + } + } + } finally { + await this.cleanupWriteStrategy( + client, + targetTableName, + queryOptions, + saveCancelFn, + withTempTable, + dropSourceTempTable, + ); + } + } + + /** + * Cleanup tables after write strategy + */ + protected async cleanupWriteStrategy( + client: DriverInterface, + targetTableName: string, + queryOptions: QueryOptions, + saveCancelFn: SaveCancelFn, + withTempTable: boolean, + dropSourceTempTable: boolean, + ) { + if (withTempTable && dropSourceTempTable) { + await this.withDropLock(false, async () => { + this.logger('Dropping source temp table', queryOptions); + + const actualTables = await client.getTablesQuery(this.preAggregation.preAggregationsSchema); + const mappedActualTables = actualTables.map(t => `${this.preAggregation.preAggregationsSchema}.${t.table_name || t.TABLE_NAME}`); + if (mappedActualTables.includes(targetTableName)) { + await client.dropTable(targetTableName); + } + }); + } + + // We must clean orphaned in any cases: success or exception + await this.loadCache.fetchTables(this.preAggregation); + await this.dropOrphanedTables(client, targetTableName, saveCancelFn, false, queryOptions); + } + + /** + * Create table (if required) and prepares query options object + */ + protected async prepareWriteStrategy( + client: DriverInterface, + targetTableName: string, + newVersionEntry: VersionEntry, + saveCancelFn: SaveCancelFn, + invalidationKeys: InvalidationKeys, + withTempTable: boolean + ): Promise { + if (withTempTable) { + const [loadSql, params] = + Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []]; + + const query = ( + QueryCache.replacePreAggregationTableNames( + loadSql, + this.preAggregationsTablesToTempTables, + ) + ).replace( + this.preAggregation.tableName, + targetTableName + ); + const queryOptions = this.queryOptions(invalidationKeys, query, params, targetTableName, newVersionEntry); + this.logExecutingSql(queryOptions); + await saveCancelFn(client.loadPreAggregationIntoTable( + targetTableName, + query, + params, + { + streamOffset: this.preAggregation.streamOffset, + outputColumnTypes: this.preAggregation.outputColumnTypes, + ...queryOptions + } + )); + + return queryOptions; + } else { + const [sql, params] = + Array.isArray(this.preAggregation.sql) ? this.preAggregation.sql : [this.preAggregation.sql, []]; + const queryOptions = this.queryOptions(invalidationKeys, sql, params, targetTableName, newVersionEntry); + this.logExecutingSql(queryOptions); + return queryOptions; + } + } + + /** + * Strategy to copy pre-aggregation from source db (for read-only permissions) to external data + */ + protected async refreshReadOnlyExternalStrategy( + client: DriverInterface, + newVersionEntry: VersionEntry, + saveCancelFn: SaveCancelFn, + invalidationKeys: InvalidationKeys + ) { + const [sql, params] = + Array.isArray(this.preAggregation.sql) ? this.preAggregation.sql : [this.preAggregation.sql, []]; + + const queryOptions = this.queryOptions(invalidationKeys, sql, params, this.targetTableName(newVersionEntry), newVersionEntry); + this.logExecutingSql(queryOptions); + this.logger('Downloading external pre-aggregation via query', queryOptions); + const externalDriver = await this.externalDriverFactory(); + const capabilities = externalDriver.capabilities && externalDriver.capabilities(); + + let tableData: DownloadQueryResultsResult; + + if (capabilities.csvImport && client.unloadFromQuery && await client.isUnloadSupported(this.getUnloadOptions())) { + tableData = await saveCancelFn( + client.unloadFromQuery( + sql, + params, + this.getUnloadOptions(), + ) + ).catch((error: any) => { + this.logger('Downloading external pre-aggregation via query error', { + ...queryOptions, + error: error.stack || error.message + }); + throw error; + }); + } else { + tableData = await saveCancelFn(client.downloadQueryResults( + sql, + params, { + streamOffset: this.preAggregation.streamOffset, + outputColumnTypes: this.preAggregation.outputColumnTypes, + ...queryOptions, + ...capabilities, + ...this.getStreamingOptions(), + } + )).catch((error: any) => { + this.logger('Downloading external pre-aggregation via query error', { + ...queryOptions, + error: error.stack || error.message + }); + throw error; + }); + } + + this.logger('Downloading external pre-aggregation via query completed', { + ...queryOptions, + isUnloadSupported: isDownloadTableCSVData(tableData) + }); + + try { + await this.uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn, queryOptions); + } finally { + if (tableData.release) { + await tableData.release(); + } + } + + await this.loadCache.fetchTables(this.preAggregation); + } + + protected getUnloadOptions(): UnloadOptions { + return { + // Default: 16mb for Snowflake, Should be specified in MBs, because drivers convert it + maxFileSize: 64 + }; + } + + protected getStreamingOptions(): StreamOptions { + return { + // Default: 16384 (16KB), or 16 for objectMode streams. PostgreSQL/MySQL use object streams + highWaterMark: 10000 + }; + } + + /** + * prepares download data for future cube store usage + */ + protected async downloadExternalPreAggregation( + client: DriverInterface, + newVersionEntry: VersionEntry, + saveCancelFn: SaveCancelFn, + queryOptions: QueryOptions, + withTempTable: boolean + ) { + const table = this.targetTableName(newVersionEntry); + this.logger('Downloading external pre-aggregation', queryOptions); + + try { + const externalDriver = await this.externalDriverFactory(); + const capabilities = externalDriver.capabilities && externalDriver.capabilities(); + + let tableData: DownloadTableData; + if (withTempTable) { + tableData = await this.getTableDataWithTempTable(client, table, saveCancelFn, queryOptions, capabilities); + } else { + tableData = await this.getTableDataWithoutTempTable(client, table, saveCancelFn, queryOptions, capabilities); + } + + this.logger('Downloading external pre-aggregation completed', { + ...queryOptions, + isUnloadSupported: isDownloadTableCSVData(tableData) + }); + + return tableData; + } catch (error: any) { + this.logger('Downloading external pre-aggregation error', { + ...queryOptions, + error: error?.stack || error?.message + }); + throw error; + } + } + + /** + * prepares download data when temp table = true + */ + protected async getTableDataWithTempTable(client: DriverInterface, table: string, saveCancelFn: SaveCancelFn, queryOptions: QueryOptions, externalDriverCapabilities: DriverCapabilities) { + let tableData: DownloadTableData; + + if (externalDriverCapabilities.csvImport && client.unload && await client.isUnloadSupported(this.getUnloadOptions())) { + tableData = await saveCancelFn( + client.unload(table, this.getUnloadOptions()), + ); + } else if (externalDriverCapabilities.streamImport && client.stream) { + tableData = await saveCancelFn( + client.stream(`SELECT * FROM ${table}`, [], this.getStreamingOptions()) + ); + + if (client.unload) { + const stream = new LargeStreamWarning(this.preAggregation.preAggregationId, (msg) => { + this.logger('Downloading external pre-aggregation warning', { + ...queryOptions, + error: msg + }); + }); + tableData.rowStream.pipe(stream); + tableData.rowStream = stream; + } + } else { + tableData = await saveCancelFn(client.downloadTable(table, { + streamOffset: this.preAggregation.streamOffset, + outputColumnTypes: this.preAggregation.outputColumnTypes, + ...externalDriverCapabilities + })); + } + + if (!tableData.types) { + tableData.types = await saveCancelFn(client.tableColumnTypes(table)); + } + + return tableData; + } + + /** + * prepares download data when temp table = false + */ + protected async getTableDataWithoutTempTable(client: DriverInterface, table: string, saveCancelFn: SaveCancelFn, queryOptions: QueryOptions, externalDriverCapabilities: DriverCapabilities) { + const [sql, params] = + Array.isArray(this.preAggregation.sql) ? this.preAggregation.sql : [this.preAggregation.sql, []]; + + let tableData: DownloadTableData; + + if (externalDriverCapabilities.csvImport && client.unload && await client.isUnloadSupported(this.getUnloadOptions())) { + return saveCancelFn( + client.unload( + table, + { ...this.getUnloadOptions(), query: { sql, params } }, + ) + ); + } else if (externalDriverCapabilities.streamImport && client.stream) { + tableData = await saveCancelFn( + client.stream(sql, params, this.getStreamingOptions()) + ); + + if (client.unload) { + const stream = new LargeStreamWarning(this.preAggregation.preAggregationId, (msg) => { + this.logger('Downloading external pre-aggregation warning', { + ...queryOptions, + error: msg + }); + }); + tableData.rowStream.pipe(stream); + tableData.rowStream = stream; + } + } else { + tableData = { rows: await saveCancelFn(client.query(sql, params)) }; + } + + if (!tableData.types && client.queryColumnTypes) { + tableData.types = await saveCancelFn(client.queryColumnTypes(sql, params)); + } + + return tableData; + } + + protected async uploadExternalPreAggregation( + tableData: DownloadTableData, + newVersionEntry: VersionEntry, + saveCancelFn: SaveCancelFn, + queryOptions: QueryOptions + ) { + const externalDriver: DriverInterface = await this.externalDriverFactory(); + const table = this.targetTableName(newVersionEntry); + + this.logger('Uploading external pre-aggregation', queryOptions); + await saveCancelFn( + externalDriver.uploadTableWithIndexes( + table, + tableData.types, + tableData, + this.prepareIndexesSql(newVersionEntry, queryOptions), + this.preAggregation.uniqueKeyColumns, + queryOptions, + { + aggregationsColumns: this.preAggregation.aggregationsColumns, + createTableIndexes: this.prepareCreateTableIndexes(newVersionEntry), + sealAt: this.preAggregation.sealAt + } + ) + ).catch((error: any) => { + this.logger('Uploading external pre-aggregation error', { + ...queryOptions, + error: error?.stack || error?.message + }); + throw error; + }); + this.logger('Uploading external pre-aggregation completed', queryOptions); + + await this.loadCache.fetchTables(this.preAggregation); + await this.dropOrphanedTables(externalDriver, table, saveCancelFn, true, queryOptions); + } + + protected async createIndexes(driver, newVersionEntry: VersionEntry, saveCancelFn: SaveCancelFn, queryOptions: QueryOptions) { + const indexesSql = this.prepareIndexesSql(newVersionEntry, queryOptions); + for (let i = 0; i < indexesSql.length; i++) { + const [query, params] = indexesSql[i].sql; + await saveCancelFn(driver.query(query, params)); + } + } + + protected prepareIndexesSql(newVersionEntry: VersionEntry, queryOptions: QueryOptions) { + if (!this.preAggregation.indexesSql || !this.preAggregation.indexesSql.length) { + return []; + } + return this.preAggregation.indexesSql.map(({ sql, indexName }) => { + const [query, params] = sql; + const indexVersionEntry = { + ...newVersionEntry, + table_name: indexName + }; + this.logger('Creating pre-aggregation index', queryOptions); + const resultingSql = QueryCache.replacePreAggregationTableNames( + query, + this.preAggregationsTablesToTempTables.concat([ + [this.preAggregation.tableName, { targetTableName: this.targetTableName(newVersionEntry) }], + [indexName, { targetTableName: this.targetTableName(indexVersionEntry) }] + ]) + ); + return { sql: [resultingSql, params] }; + }); + } + + protected prepareCreateTableIndexes(newVersionEntry: VersionEntry) { + if (!this.preAggregation.createTableIndexes || !this.preAggregation.createTableIndexes.length) { + return []; + } + return this.preAggregation.createTableIndexes.map(({ indexName, type, columns }) => { + const indexVersionEntry = { + ...newVersionEntry, + table_name: indexName + }; + return { indexName: this.targetTableName(indexVersionEntry), type, columns }; + }); + } + + private async withDropLock(external: boolean, lockFn: () => MaybeCancelablePromise): Promise { + const lockKey = this.dropLockKey(external); + return this.queryCache.withLock(lockKey, 60 * 5, lockFn); + } + + protected async dropOrphanedTables( + client: DriverInterface, + justCreatedTable: string, + saveCancelFn: SaveCancelFn, + external: boolean, + queryOptions: QueryOptions + ) { + await this.preAggregations.addTableUsed(justCreatedTable); + + return this.withDropLock(external, async () => { + this.logger('Dropping orphaned tables', { ...queryOptions, external }); + const actualTables = await client.getTablesQuery( + this.preAggregation.preAggregationsSchema, + ); + const versionEntries = tablesToVersionEntries( + this.preAggregation.preAggregationsSchema, + actualTables, + ); + const versionEntriesToSave = R.pipe< + VersionEntry[], + { [index: string]: VersionEntry[] }, + Array<[string, VersionEntry[]]>, + VersionEntry[] + >( + R.groupBy(v => v.table_name), + R.toPairs, + R.map(p => p[1][0]) + )(versionEntries); + const structureVersionsToSave = R.pipe< + VersionEntry[], + VersionEntry[], + { [index: string]: VersionEntry[] }, + Array<[string, VersionEntry[]]>, + VersionEntry[] + >( + R.filter( + (v: VersionEntry) => ( + new Date().getTime() - v.last_updated_at < + this.structureVersionPersistTime * 1000 + ) + ), + R.groupBy(v => `${v.table_name}_${v.structure_version}`), + R.toPairs, + R.map(p => p[1][0]) + )(versionEntries); + + const refreshEndReached = await this.preAggregations.getRefreshEndReached(); + const toSave = + this.preAggregations.dropPreAggregationsWithoutTouch && refreshEndReached + ? (await this.preAggregations.tablesUsed()) + .concat(await this.preAggregations.tablesTouched()) + .concat([justCreatedTable]) + : (await this.preAggregations.tablesUsed()) + .concat(structureVersionsToSave.map(v => this.targetTableName(v))) + .concat(versionEntriesToSave.map(v => this.targetTableName(v))) + .concat([justCreatedTable]); + const toDrop = actualTables + .map(t => `${this.preAggregation.preAggregationsSchema}.${t.table_name || t.TABLE_NAME}`) + .filter(t => toSave.indexOf(t) === -1); + + await Promise.all(toDrop.map(table => saveCancelFn(client.dropTable(table)))); + this.logger('Dropping orphaned tables completed', { + ...queryOptions, + external, + tablesToDrop: JSON.stringify(toDrop), + }); + }); + } + + private dropLockKey(external: boolean) { + return external + ? 'drop-orphaned-tables-external' + : `drop-orphaned-tables:${this.preAggregation.dataSource}`; + } +} diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts new file mode 100644 index 0000000000000..23af066803360 --- /dev/null +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts @@ -0,0 +1,504 @@ +import { + addSecondsToLocalTimestamp, + BUILD_RANGE_END_LOCAL, + BUILD_RANGE_START_LOCAL, + FROM_PARTITION_RANGE, + TO_PARTITION_RANGE, + MAX_SOURCE_ROW_LIMIT, + reformatInIsoLocal, + utcToLocalTimeZone, + timeSeries, + inDbTimeZone, + extractDate +} from '@cubejs-backend/shared'; +import { InlineTable, TableStructure } from '@cubejs-backend/base-driver'; +import { DriverFactory } from './DriverFactory'; +import { QueryCache, QueryTuple, QueryWithParams } from './QueryCache'; +import { + getLastUpdatedAtTimestamp, + LAMBDA_TABLE_PREFIX, + LambdaQuery, + LoadPreAggregationResult, + PartitionRanges, + PreAggregationDescription, + PreAggregations, + QueryDateRange +} from './PreAggregations'; +import { PreAggregationLoader } from './PreAggregationLoader'; +import { PreAggregationLoadCache } from './PreAggregationLoadCache'; + +interface PreAggsPartitionRangeLoaderOpts { + maxPartitions: number; + maxSourceRowLimit: number; + waitForRenew?: boolean; + requestId?: string; + externalRefresh?: boolean; + forceBuild?: boolean; + metadata?: any; + orphanedTimeout?: number; + lambdaQuery?: LambdaQuery; + isJob?: boolean; + compilerCacheFn?: (subKey: string[], cacheFn: () => T) => T; +} + +export class PreAggregationPartitionRangeLoader { + /** + * Determines whether current instance instantiated for a jobbed build query + * (initialized by the /cubejs-system/v1/pre-aggregations/jobs endpoint) or + * not. + */ + protected isJob: boolean; + + protected waitForRenew: boolean; + + protected requestId: string; + + protected lambdaQuery: LambdaQuery; + + protected dataSource: string; + + protected compilerCacheFn: (subKey: string[], cacheFn: () => T) => T; + + public constructor( + private readonly driverFactory: DriverFactory, + private readonly logger: any, + private readonly queryCache: QueryCache, + // eslint-disable-next-line no-use-before-define + private readonly preAggregations: PreAggregations, + private readonly preAggregation: PreAggregationDescription, + private readonly preAggregationsTablesToTempTables: [string, LoadPreAggregationResult][], + private readonly loadCache: PreAggregationLoadCache, + private readonly options: PreAggsPartitionRangeLoaderOpts = { + maxPartitions: 10000, + maxSourceRowLimit: 10000, + }, + ) { + this.isJob = !!options.isJob; + this.waitForRenew = options.waitForRenew; + this.requestId = options.requestId; + this.lambdaQuery = options.lambdaQuery; + this.dataSource = preAggregation.dataSource; + this.compilerCacheFn = options.compilerCacheFn || ((subKey, cacheFn) => cacheFn()); + } + + private async loadRangeQuery(rangeQuery: QueryTuple, partitionRange?: QueryDateRange) { + const [query, values, queryOptions]: QueryTuple = rangeQuery; + const invalidate = + this.preAggregation.invalidateKeyQueries && + this.preAggregation.invalidateKeyQueries[0] + ? this.preAggregation.invalidateKeyQueries[0].slice(0, 2) + : false; + + return this.queryCache.cacheQueryResult( + query, + values, + QueryCache.queryCacheKey({ + query, + values: (values), + invalidate, + }), + 24 * 60 * 60, + { + renewalThreshold: this.queryCache.options.refreshKeyRenewalThreshold + || queryOptions?.renewalThreshold || 24 * 60 * 60, + waitForRenew: this.waitForRenew, + priority: this.priority(10), + requestId: this.requestId, + dataSource: this.dataSource, + useInMemory: true, + external: queryOptions?.external, + renewalKey: partitionRange ? await this.getInvalidationKeyValues(partitionRange) : null, + } + ); + } + + protected getInvalidationKeyValues(range) { + const partitionTableName = PreAggregationPartitionRangeLoader.partitionTableName( + this.preAggregation.tableName, this.preAggregation.partitionGranularity, range + ); + return Promise.all( + (this.preAggregation.invalidateKeyQueries || []).map( + (sqlQuery) => ( + this.loadCache.keyQueryResult( + this.replacePartitionSqlAndParams(sqlQuery, range, partitionTableName), this.waitForRenew, this.priority(10) + ) + ) + ) + ); + } + + protected priority(defaultValue) { + return this.preAggregation.priority != null ? this.preAggregation.priority : defaultValue; + } + + public async replaceQueryBuildRangeParams(queryValues: string[]): Promise { + if (queryValues?.find(p => p === BUILD_RANGE_START_LOCAL || p === BUILD_RANGE_END_LOCAL)) { + const [buildRangeStart, buildRangeEnd] = await this.loadBuildRange(); + return queryValues?.map( + param => { + if (param === BUILD_RANGE_START_LOCAL) { + return utcToLocalTimeZone(this.preAggregation.timezone, this.preAggregation.timestampFormat, buildRangeStart); + } else if (param === BUILD_RANGE_END_LOCAL) { + return utcToLocalTimeZone(this.preAggregation.timezone, this.preAggregation.timestampFormat, buildRangeEnd); + } else { + return param; + } + }, + ); + } + return null; + } + + private replacePartitionSqlAndParams( + query: QueryWithParams, + dateRange: QueryDateRange, + partitionTableName: string + ): QueryWithParams { + const [sql, params, options] = query; + const updateWindowToBoundary = options?.incremental && addSecondsToLocalTimestamp( + dateRange[1], this.preAggregation.timezone, options?.updateWindowSeconds || 0 + ); + return [sql.replace(this.preAggregation.tableName, partitionTableName), params?.map( + param => { + if (dateRange && param === FROM_PARTITION_RANGE) { + return PreAggregationPartitionRangeLoader.inDbTimeZone(this.preAggregation, dateRange[0]); + } else if (dateRange && param === TO_PARTITION_RANGE) { + return PreAggregationPartitionRangeLoader.inDbTimeZone(this.preAggregation, dateRange[1]); + } else { + return param; + } + }, + ), { + ...options, + renewalThreshold: + options?.incremental && updateWindowToBoundary < new Date() ? + // if updateWindowToBoundary passed just moments ago we want to renew it earlier in case + // of server and db clock don't match + Math.min( + Math.round((new Date().getTime() - updateWindowToBoundary.getTime()) / 1000), + options?.renewalThresholdOutsideUpdateWindow + ) : + options?.renewalThreshold + }]; + } + + private partitionPreAggregationDescription(range: QueryDateRange, buildRange: QueryDateRange): PreAggregationDescription { + const partitionTableName = PreAggregationPartitionRangeLoader.partitionTableName( + this.preAggregation.tableName, this.preAggregation.partitionGranularity, range + ); + const [_, buildRangeEnd] = buildRange; + const loadRange: [string, string] = [...range]; + const partitionInvalidateKeyQueries = this.preAggregation.partitionInvalidateKeyQueries || this.preAggregation.invalidateKeyQueries; + // `partitionInvalidateKeyQueries = []` in case of real time + if ((!partitionInvalidateKeyQueries || partitionInvalidateKeyQueries.length > 0) && buildRangeEnd < range[1]) { + loadRange[1] = buildRangeEnd; + } + const sealAt = addSecondsToLocalTimestamp( + loadRange[1], this.preAggregation.timezone, this.preAggregation.updateWindowSeconds || 0 + ).toISOString(); + return { + ...this.preAggregation, + tableName: partitionTableName, + structureVersionLoadSql: this.preAggregation.loadSql && + this.replacePartitionSqlAndParams(this.preAggregation.loadSql, range, partitionTableName), + loadSql: this.preAggregation.loadSql && + this.replacePartitionSqlAndParams(this.preAggregation.loadSql, loadRange, partitionTableName), + sql: this.preAggregation.sql && + this.replacePartitionSqlAndParams(this.preAggregation.sql, loadRange, partitionTableName), + invalidateKeyQueries: (this.preAggregation.invalidateKeyQueries || []) + .map(q => this.replacePartitionSqlAndParams(q, range, partitionTableName)), + partitionInvalidateKeyQueries: this.preAggregation.partitionInvalidateKeyQueries && + this.preAggregation.partitionInvalidateKeyQueries.map(q => this.replacePartitionSqlAndParams(q, range, partitionTableName)), + indexesSql: (this.preAggregation.indexesSql || []) + .map(q => ({ ...q, sql: this.replacePartitionSqlAndParams(q.sql, range, partitionTableName) })), + previewSql: this.preAggregation.previewSql && + this.replacePartitionSqlAndParams(this.preAggregation.previewSql, range, partitionTableName), + buildRangeStart: loadRange[0], + buildRangeEnd: loadRange[1], + sealAt, // Used only for kSql pre aggregations + }; + } + + public async loadPreAggregations(): Promise { + if (this.preAggregation.partitionGranularity && !this.preAggregation.expandedPartition) { + const loadPreAggregationsByPartitionRanges = async ({ buildRange, partitionRanges }: PartitionRanges) => { + const partitionLoaders = partitionRanges.map(range => new PreAggregationLoader( + this.driverFactory, + this.logger, + this.queryCache, + this.preAggregations, + this.partitionPreAggregationDescription(range, buildRange), + this.preAggregationsTablesToTempTables, + this.loadCache, + this.options, + )); + const resolveResults = await Promise.all(partitionLoaders.map(async (l, i) => { + const result = await l.loadPreAggregation(false); + return result && { + ...result, + partitionRange: partitionRanges[i] + }; + })); + return { loadResults: resolveResults.filter(res => res !== null), partitionLoaders }; + }; + + // eslint-disable-next-line prefer-const + let loadResultAndLoaders = await loadPreAggregationsByPartitionRanges(await this.partitionRanges()); + if (this.options.externalRefresh && loadResultAndLoaders.loadResults.length === 0) { + loadResultAndLoaders = await loadPreAggregationsByPartitionRanges(await this.partitionRanges(true)); + // In case there're no partitions ready at matched time dimension intersection then no data can be retrieved. + // We need to provide any table so query can just execute successfully. + if (loadResultAndLoaders.loadResults.length > 0) { + loadResultAndLoaders.loadResults = [loadResultAndLoaders.loadResults[loadResultAndLoaders.loadResults.length - 1]]; + } + } + if (this.options.externalRefresh && loadResultAndLoaders.loadResults.length === 0) { + throw new Error( + // eslint-disable-next-line no-use-before-define + PreAggregations.noPreAggregationPartitionsBuiltMessage(loadResultAndLoaders.partitionLoaders.map(p => p.preAggregation)) + ); + } + + let { loadResults } = loadResultAndLoaders; + + let lambdaTable: InlineTable; + let emptyResult = false; + + if (this.preAggregation.rollupLambdaId) { + if (this.lambdaQuery && loadResults.length > 0) { + const { buildRangeEnd, targetTableName } = loadResults[loadResults.length - 1]; + const lambdaTypes = await this.loadCache.getTableColumnTypes(this.preAggregation, targetTableName); + lambdaTable = await this.downloadLambdaTable(buildRangeEnd, lambdaTypes); + } + const rollupLambdaResults = this.preAggregationsTablesToTempTables.filter(tempTableResult => tempTableResult[1].rollupLambdaId === this.preAggregation.rollupLambdaId); + const filteredResults = loadResults.filter( + r => (this.preAggregation.lastRollupLambda || reformatInIsoLocal(r.buildRangeEnd) === reformatInIsoLocal(r.partitionRange[1])) && + rollupLambdaResults.every(result => !result[1].buildRangeEnd || reformatInIsoLocal(result[1].buildRangeEnd) < reformatInIsoLocal(r.partitionRange[0])) + ); + if (filteredResults.length === 0) { + emptyResult = true; + loadResults = [loadResults[loadResults.length - 1]]; + } else { + loadResults = filteredResults; + } + } + + const allTableTargetNames = loadResults.map(targetTableName => targetTableName.targetTableName); + let lastUpdatedAt = getLastUpdatedAtTimestamp(loadResults.map(r => r.lastUpdatedAt)); + + if (lambdaTable) { + allTableTargetNames.push(lambdaTable.name); + lastUpdatedAt = Date.now(); + } + + const unionTargetTableName = allTableTargetNames + .map(targetTableName => `SELECT * FROM ${targetTableName}${emptyResult ? ' WHERE 1 = 0' : ''}`) + .join(' UNION ALL '); + return { + targetTableName: allTableTargetNames.length === 1 && !emptyResult ? allTableTargetNames[0] : `(${unionTargetTableName})`, + refreshKeyValues: loadResults.map(t => t.refreshKeyValues), + lastUpdatedAt, + buildRangeEnd: !emptyResult && loadResults.length && loadResults[loadResults.length - 1].buildRangeEnd, + lambdaTable, + rollupLambdaId: this.preAggregation.rollupLambdaId, + }; + } else { + return new PreAggregationLoader( + this.driverFactory, + this.logger, + this.queryCache, + this.preAggregations, + this.preAggregation, + this.preAggregationsTablesToTempTables, + this.loadCache, + this.options + ).loadPreAggregation(true); + } + } + + /** + * Downloads the lambda table from the source DB. + */ + private async downloadLambdaTable(fromDate: string, lambdaTypes: TableStructure): Promise { + const { sqlAndParams, cacheKeyQueries } = this.lambdaQuery; + const [query, params] = sqlAndParams; + const values = params.map((p) => { + if (p === FROM_PARTITION_RANGE) { + return fromDate; + } + if (p === MAX_SOURCE_ROW_LIMIT) { + return this.options.maxSourceRowLimit; + } + return p; + }); + const { data } = await this.queryCache.renewQuery( + query, + values, + cacheKeyQueries, + 60 * 60, + [query, values], + undefined, + { + requestId: this.requestId, + skipRefreshKeyWaitForRenew: false, + dataSource: this.dataSource, + external: false, + useCsvQuery: true, + lambdaTypes, + } + ); + if (data.rowCount === this.options.maxSourceRowLimit) { + throw new Error(`The maximum number of source rows ${this.options.maxSourceRowLimit} was reached for ${this.preAggregation.preAggregationId}`); + } + return { + name: `${LAMBDA_TABLE_PREFIX}_${this.preAggregation.tableName.replace('.', '_')}`, + columns: data.types, + csvRows: data.csvRows, + }; + } + + public async partitionPreAggregations(): Promise { + if (this.preAggregation.partitionGranularity && !this.preAggregation.expandedPartition) { + const { buildRange, partitionRanges } = await this.partitionRanges(); + return this.compilerCacheFn(['partitions', JSON.stringify(buildRange)], () => partitionRanges.map(range => this.partitionPreAggregationDescription(range, buildRange))); + } else { + return [this.preAggregation]; + } + } + + private async partitionRanges(ignoreMatchedDateRange?: boolean): Promise { + const buildRange = await this.loadBuildRange(); + if (!buildRange[0] || !buildRange[1]) { + return { buildRange, partitionRanges: [] }; + } + let dateRange = PreAggregationPartitionRangeLoader.intersectDateRanges( + buildRange, + ignoreMatchedDateRange ? undefined : this.preAggregation.matchedTimeDimensionDateRange, + ); + if (!dateRange) { + // If there's no date range intersection between query data range and pre-aggregation build range + // use last partition so outer query can receive expected table structure. + dateRange = [buildRange[1], buildRange[1]]; + } + const partitionRanges = this.compilerCacheFn(['timeSeries', this.preAggregation.partitionGranularity, JSON.stringify(dateRange), `${this.preAggregation.timestampPrecision}`], () => PreAggregationPartitionRangeLoader.timeSeries( + this.preAggregation.partitionGranularity, + dateRange, + this.preAggregation.timestampPrecision + )); + if (partitionRanges.length > this.options.maxPartitions) { + throw new Error( + `Pre-aggregation '${this.preAggregation.tableName}' requested to build ${partitionRanges.length} partitions which exceeds the maximum number of partitions per pre-aggregation of ${this.options.maxPartitions}` + ); + } + return { buildRange: dateRange, partitionRanges }; + } + + public async loadBuildRange(): Promise { + const { preAggregationStartEndQueries } = this.preAggregation; + const [startDate, endDate] = await Promise.all( + preAggregationStartEndQueries.map( + async rangeQuery => PreAggregationPartitionRangeLoader.extractDate(await this.loadRangeQuery(rangeQuery)), + ), + ); + if (!this.preAggregation.partitionGranularity) { + return this.orNowIfEmpty([startDate, endDate]); + } + const wholeSeriesRanges = PreAggregationPartitionRangeLoader.timeSeries( + this.preAggregation.partitionGranularity, + this.orNowIfEmpty([startDate, endDate]), + this.preAggregation.timestampPrecision, + ); + const [rangeStart, rangeEnd] = await Promise.all( + preAggregationStartEndQueries.map( + async (rangeQuery, i) => PreAggregationPartitionRangeLoader.extractDate( + await this.loadRangeQuery( + rangeQuery, i === 0 ? wholeSeriesRanges[0] : wholeSeriesRanges[wholeSeriesRanges.length - 1], + ), + ), + ), + ); + return this.orNowIfEmpty([rangeStart, rangeEnd]); + } + + private now() { + return utcToLocalTimeZone(this.preAggregation.timezone, 'YYYY-MM-DDTHH:mm:ss.SSS', new Date().toJSON().substring(0, 23)); + } + + private orNowIfEmpty(dateRange: QueryDateRange): QueryDateRange { + if (!dateRange[0] && !dateRange[1]) { + const now = this.now(); + return [now, now]; + } + if (!dateRange[0]) { + return [dateRange[1], dateRange[1]]; + } + if (!dateRange[1]) { + return [dateRange[0], dateRange[0]]; + } + return dateRange; + } + + private static checkDataRangeType(range: QueryDateRange) { + if (!range) { + return; + } + + if (range.length !== 2) { + throw new Error(`Date range expected to be an array with 2 elements but ${range} found`); + } + + if (typeof range[0] !== 'string' || typeof range[1] !== 'string') { + throw new Error(`Date range expected to be a string array but ${range} found`); + } + + if ((range[0].length !== 23 && range[0].length !== 26) || (range[1].length !== 23 && range[0].length !== 26)) { + throw new Error(`Date range expected to be in YYYY-MM-DDTHH:mm:ss.SSS format but ${range} found`); + } + } + + public static intersectDateRanges(rangeA: QueryDateRange | null, rangeB: QueryDateRange | null): QueryDateRange { + PreAggregationPartitionRangeLoader.checkDataRangeType(rangeA); + PreAggregationPartitionRangeLoader.checkDataRangeType(rangeB); + if (!rangeB) { + return rangeA; + } + if (!rangeA) { + return rangeB; + } + const from = rangeA[0] > rangeB[0] ? rangeA[0] : rangeB[0]; + const to = rangeA[1] < rangeB[1] ? rangeA[1] : rangeB[1]; + if (from > to) { + return null; + } + return [ + from, + to, + ]; + } + + public static timeSeries(granularity: string, dateRange: QueryDateRange, timestampPrecision: number): QueryDateRange[] { + return timeSeries(granularity, dateRange, { + timestampPrecision + }); + } + + public static partitionTableName(tableName: string, partitionGranularity: string, dateRange: string[]) { + const partitionSuffix = dateRange[0].substring( + 0, + partitionGranularity === 'hour' ? 13 : 10 + ).replace(/[-T:]/g, ''); + return `${tableName}${partitionSuffix}`; + } + + public static inDbTimeZone(preAggregationDescription: any, timestamp: string): string { + return inDbTimeZone(preAggregationDescription.timezone, preAggregationDescription.timestampFormat, timestamp); + } + + public static extractDate(data: any): string { + return extractDate(data); + } + + public static FROM_PARTITION_RANGE = FROM_PARTITION_RANGE; + + public static TO_PARTITION_RANGE = TO_PARTITION_RANGE; +} diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/index.ts b/packages/cubejs-query-orchestrator/src/orchestrator/index.ts index d91d52f587230..e4a28e1cbc491 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/index.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/index.ts @@ -3,6 +3,9 @@ export * from './ContinueWaitError'; export * from './LocalCacheDriver'; export * from './LocalQueueDriver'; export * from './PreAggregations'; +export * from './PreAggregationPartitionRangeLoader'; +export * from './PreAggregationLoader'; +export * from './PreAggregationLoadCache'; export * from './QueryCache'; export * from './QueryOrchestrator'; export * from './QueryQueue'; From d88bc25fb9584cf25e407275d1c92d2cfb2fd4e0 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Thu, 27 Feb 2025 13:20:14 +0200 Subject: [PATCH 09/14] refactor PreAggregations: delete moved --- .../src/orchestrator/PreAggregations.ts | 1811 +---------------- 1 file changed, 32 insertions(+), 1779 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts index 3e9baa32ccfad..3d3ecaa13f7b1 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts @@ -1,44 +1,18 @@ -import crypto from 'crypto'; import R from 'ramda'; -import { - addSecondsToLocalTimestamp, - BUILD_RANGE_END_LOCAL, - BUILD_RANGE_START_LOCAL, - extractDate, - FROM_PARTITION_RANGE, - getEnv, - inDbTimeZone, - MAX_SOURCE_ROW_LIMIT, - MaybeCancelablePromise, - reformatInIsoLocal, - timeSeries, - TO_PARTITION_RANGE, - utcToLocalTimeZone, -} from '@cubejs-backend/shared'; - -import { - BaseDriver, - cancelCombinator, - DownloadQueryResultsResult, - DownloadTableData, - DriverCapabilities, - DriverInterface, - InlineTable, - isDownloadTableCSVData, - SaveCancelFn, - StreamOptions, - TableStructure, - UnloadOptions, -} from '@cubejs-backend/base-driver'; +import crypto from 'crypto'; +import { getEnv, } from '@cubejs-backend/shared'; + +import { BaseDriver, InlineTable, } from '@cubejs-backend/base-driver'; import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; import LRUCache from 'lru-cache'; import { PreAggTableToTempTable, Query, QueryBody, QueryCache, QueryTuple, QueryWithParams } from './QueryCache'; -import { ContinueWaitError } from './ContinueWaitError'; import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory'; import { QueryQueue } from './QueryQueue'; -import { LargeStreamWarning } from './StreamObjectsCounter'; import { CacheAndQueryDriverType } from './QueryOrchestrator'; +import { PreAggregationPartitionRangeLoader } from './PreAggregationPartitionRangeLoader'; +import { PreAggregationLoader } from './PreAggregationLoader'; +import { PreAggregationLoadCache } from './PreAggregationLoadCache'; /// Name of the inline table containing the lambda rows. export const LAMBDA_TABLE_PREFIX = 'lambda'; @@ -51,7 +25,7 @@ function decodeTimeStamp(time) { return parseInt(time, 32) * 1000; } -function version(cacheKey) { +export function version(cacheKey) { let result = ''; const hashCharset = 'abcdefghijklmnopqrstuvwxyz012345'; @@ -79,12 +53,6 @@ function version(cacheKey) { return result; } -// There’re community developed and custom drivers which not always up-to-date with latest BaseDriver. -// Extra defence for drivers that don't expose now() yet. -function nowTimestamp(client: DriverInterface) { - return client.nowTimestamp?.() ?? new Date().getTime(); -} - // Returns the oldest timestamp, if any. export function getLastUpdatedAtTimestamp( timestamps: (number | undefined)[] @@ -97,7 +65,7 @@ export function getLastUpdatedAtTimestamp( } } -function getStructureVersion(preAggregation) { +export function getStructureVersion(preAggregation) { const versionArray = [preAggregation.structureVersionLoadSql || preAggregation.loadSql]; if (preAggregation.indexesSql && preAggregation.indexesSql.length) { versionArray.push(preAggregation.indexesSql); @@ -112,7 +80,7 @@ function getStructureVersion(preAggregation) { return version(versionArray.length === 1 ? versionArray[0] : versionArray); } -type VersionEntry = { +export type VersionEntry = { 'table_name': string, 'content_version': string, 'structure_version': string, @@ -121,21 +89,16 @@ type VersionEntry = { 'naming_version'?: number }; -type IndexesSql = { sql: [string, unknown[]], indexName: string }[]; -type InvalidationKeys = unknown[]; - -type QueryKey = [QueryTuple, IndexesSql, InvalidationKeys] | [QueryTuple, InvalidationKeys]; - -type QueryOptions = { - queryKey: QueryKey; - newVersionEntry: VersionEntry; - query: string; - values: unknown[]; - requestId: string; - buildRangeEnd?: string; +export type VersionEntriesObj = { + versionEntries: VersionEntry[], + byStructure: { [key: string]: VersionEntry }, + byContent: { [key: string]: VersionEntry }, + byTableName: { [key: string]: VersionEntry }, }; -type TableCacheEntry = { +export type InvalidationKeys = unknown[]; + +export type TableCacheEntry = { // eslint-disable-next-line camelcase table_name?: string; TABLE_NAME?: string; @@ -143,9 +106,9 @@ type TableCacheEntry = { build_range_end?: string; }; -type QueryDateRange = [string, string]; +export type QueryDateRange = [string, string]; -type PartitionRanges = { +export type PartitionRanges = { buildRange: QueryDateRange, partitionRanges: QueryDateRange[], }; @@ -170,6 +133,17 @@ type PreAggJob = { dataSource: string, }; +export type LoadPreAggregationResult = { + targetTableName: string; + refreshKeyValues: any[]; + lastUpdatedAt: number; + buildRangeEnd: string; + lambdaTable?: InlineTable; + queryKey?: any[]; + rollupLambdaId?: string; + partitionRange?: QueryDateRange; +}; + export type LambdaOptions = { maxSourceRows: number }; @@ -211,7 +185,7 @@ export type PreAggregationDescription = { lastRollupLambda?: boolean; }; -const tablesToVersionEntries = (schema, tables: TableCacheEntry[]): VersionEntry[] => R.sortBy( +export const tablesToVersionEntries = (schema, tables: TableCacheEntry[]): VersionEntry[] => R.sortBy( table => -table.last_updated_at, tables.map(table => { const match = (table.table_name || table.TABLE_NAME).match(/(.+)_(.+)_(.+)_(.+)/); @@ -241,1727 +215,6 @@ const tablesToVersionEntries = (schema, tables: TableCacheEntry[]): VersionEntry }).filter(R.identity) ); -type PreAggregationLoadCacheOptions = { - requestId?: string, - dataSource: string, - tablePrefixes?: string[], -}; - -type VersionEntriesObj = { - versionEntries: VersionEntry[], - byStructure: { [key: string]: VersionEntry }, - byContent: { [key: string]: VersionEntry }, - byTableName: { [key: string]: VersionEntry }, -}; - -class PreAggregationLoadCache { - private driverFactory: DriverFactory; - - private queryCache: QueryCache; - - // eslint-disable-next-line no-use-before-define - private preAggregations: PreAggregations; - - private queryResults: any; - - private externalDriverFactory: any; - - private requestId: any; - - private versionEntries: { [redisKey: string]: Promise }; - - private tables: { [redisKey: string]: TableCacheEntry[] }; - - private tableColumnTypes: { [cacheKey: string]: { [tableName: string]: TableStructure } }; - - // TODO this is in memory cache structure as well however it depends on - // data source only and load cache is per data source for now. - // Make it per data source key in case load cache scope is broaden. - private queryStageState: any; - - private dataSource: string; - - private tablePrefixes: string[] | null; - - public constructor( - clientFactory: DriverFactory, - queryCache, - preAggregations, - options: PreAggregationLoadCacheOptions = { dataSource: 'default' } - ) { - this.dataSource = options.dataSource; - this.driverFactory = clientFactory; - this.queryCache = queryCache; - this.preAggregations = preAggregations; - this.queryResults = {}; - this.externalDriverFactory = preAggregations.externalDriverFactory; - this.requestId = options.requestId; - this.tablePrefixes = options.tablePrefixes; - this.versionEntries = {}; - this.tables = {}; - this.tableColumnTypes = {}; - } - - protected async tablesFromCache(preAggregation, forceRenew?) { - let tables = forceRenew ? null : await this.queryCache.getCacheDriver().get(this.tablesCachePrefixKey(preAggregation)); - if (!tables) { - tables = await this.preAggregations.getLoadCacheQueue(this.dataSource).executeInQueue( - 'query', - `Fetch tables for ${preAggregation.preAggregationsSchema}`, - { - preAggregation, requestId: this.requestId - }, - 0, - { requestId: this.requestId } - ); - } - return tables; - } - - public async fetchTables(preAggregation: PreAggregationDescription) { - if (preAggregation.external && !this.externalDriverFactory) { - throw new Error('externalDriverFactory is not provided. Please use CUBEJS_DEV_MODE=true or provide Cube Store connection env variables for production usage.'); - } - - const newTables = await this.fetchTablesNoCache(preAggregation); - await this.queryCache.getCacheDriver().set( - this.tablesCachePrefixKey(preAggregation), - newTables, - this.preAggregations.options.preAggregationsSchemaCacheExpire || 60 * 60 - ); - return newTables; - } - - private async fetchTablesNoCache(preAggregation: PreAggregationDescription) { - const client = preAggregation.external ? - await this.externalDriverFactory() : - await this.driverFactory(); - if (this.tablePrefixes && client.getPrefixTablesQuery && this.preAggregations.options.skipExternalCacheAndQueue) { - return client.getPrefixTablesQuery(preAggregation.preAggregationsSchema, this.tablePrefixes); - } - return client.getTablesQuery(preAggregation.preAggregationsSchema); - } - - public tablesCachePrefixKey(preAggregation: PreAggregationDescription) { - return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES', `${preAggregation.dataSource}${preAggregation.preAggregationsSchema}${preAggregation.external ? '_EXT' : ''}`); - } - - protected async getTablesQuery(preAggregation) { - const redisKey = this.tablesCachePrefixKey(preAggregation); - if (!this.tables[redisKey]) { - const tables = this.preAggregations.options.skipExternalCacheAndQueue && preAggregation.external ? - await this.fetchTablesNoCache(preAggregation) : - await this.tablesFromCache(preAggregation); - if (tables === undefined) { - throw new Error('Pre-aggregation tables are undefined.'); - } - this.tables[redisKey] = tables; - } - return this.tables[redisKey]; - } - - public async getTableColumnTypes(preAggregation: PreAggregationDescription, tableName: string): Promise { - const prefixKey = this.tablesCachePrefixKey(preAggregation); - if (!this.tableColumnTypes[prefixKey]?.[tableName]) { - if (!this.preAggregations.options.skipExternalCacheAndQueue && preAggregation.external) { - throw new Error(`Lambda union with source data feature is supported only by external rollups stored in Cube Store but was invoked for '${preAggregation.preAggregationId}'`); - } - const client = await this.externalDriverFactory(); - const columnTypes = await client.tableColumnTypes(tableName); - if (!this.tableColumnTypes[prefixKey]) { - this.tableColumnTypes[prefixKey] = {}; - } - this.tableColumnTypes[prefixKey][tableName] = columnTypes; - } - return this.tableColumnTypes[prefixKey][tableName]; - } - - private async calculateVersionEntries(preAggregation): Promise { - let versionEntries = tablesToVersionEntries( - preAggregation.preAggregationsSchema, - await this.getTablesQuery(preAggregation) - ); - // It presumes strong consistency guarantees for external pre-aggregation tables ingestion - if (!preAggregation.external) { - // eslint-disable-next-line - const [active, toProcess, queries] = await this.fetchQueryStageState(); - const targetTableNamesInQueue = (Object.keys(queries)) - // eslint-disable-next-line no-use-before-define - .map(q => PreAggregations.targetTableName(queries[q].query.newVersionEntry)); - - versionEntries = versionEntries.filter( - // eslint-disable-next-line no-use-before-define - e => targetTableNamesInQueue.indexOf(PreAggregations.targetTableName(e)) === -1 - ); - } - - const byContent: { [key: string]: VersionEntry } = {}; - const byStructure: { [key: string]: VersionEntry } = {}; - const byTableName: { [key: string]: VersionEntry } = {}; - - versionEntries.forEach(e => { - const contentKey = `${e.table_name}_${e.content_version}`; - if (!byContent[contentKey]) { - byContent[contentKey] = e; - } - const structureKey = `${e.table_name}_${e.structure_version}`; - if (!byStructure[structureKey]) { - byStructure[structureKey] = e; - } - if (!byTableName[e.table_name]) { - byTableName[e.table_name] = e; - } - }); - - return { versionEntries, byContent, byStructure, byTableName }; - } - - public async getVersionEntries(preAggregation): Promise { - if (this.tablePrefixes && !this.tablePrefixes.find(p => preAggregation.tableName.split('.')[1].startsWith(p))) { - throw new Error(`Load cache tries to load table ${preAggregation.tableName} outside of tablePrefixes filter: ${this.tablePrefixes.join(', ')}`); - } - const redisKey = this.tablesCachePrefixKey(preAggregation); - if (!this.versionEntries[redisKey]) { - this.versionEntries[redisKey] = this.calculateVersionEntries(preAggregation).catch(e => { - delete this.versionEntries[redisKey]; - throw e; - }); - } - return this.versionEntries[redisKey]; - } - - public async keyQueryResult(sqlQuery: QueryWithParams, waitForRenew: boolean, priority: number) { - const [query, values, queryOptions]: QueryTuple = Array.isArray(sqlQuery) ? sqlQuery : [sqlQuery, [], {}]; - - if (!this.queryResults[this.queryCache.queryRedisKey([query, values])]) { - this.queryResults[this.queryCache.queryRedisKey([query, values])] = await this.queryCache.cacheQueryResult( - query, - values, - [query, values], - 60 * 60, - { - renewalThreshold: this.queryCache.options.refreshKeyRenewalThreshold - || queryOptions?.renewalThreshold || 2 * 60, - renewalKey: [query, values], - waitForRenew, - priority, - requestId: this.requestId, - dataSource: this.dataSource, - useInMemory: true, - external: queryOptions?.external - } - ); - } - return this.queryResults[this.queryCache.queryRedisKey([query, values])]; - } - - public hasKeyQueryResult(keyQuery) { - return !!this.queryResults[this.queryCache.queryRedisKey(keyQuery)]; - } - - public async getQueryStage(stageQueryKey) { - const queue = await this.preAggregations.getQueue(this.dataSource); - await this.fetchQueryStageState(queue); - return queue.getQueryStage(stageQueryKey, undefined, this.queryStageState); - } - - protected async fetchQueryStageState(queue?) { - queue = queue || await this.preAggregations.getQueue(this.dataSource); - if (!this.queryStageState) { - this.queryStageState = await queue.fetchQueryStageState(); - } - return this.queryStageState; - } - - public async reset(preAggregation) { - await this.tablesFromCache(preAggregation, true); - this.tables = {}; - this.tableColumnTypes = {}; - this.queryStageState = undefined; - this.versionEntries = {}; - } -} - -type LoadPreAggregationResult = { - targetTableName: string; - refreshKeyValues: any[]; - lastUpdatedAt: number; - buildRangeEnd: string; - lambdaTable?: InlineTable; - queryKey?: any[]; - rollupLambdaId?: string; - partitionRange?: QueryDateRange; -}; - -export class PreAggregationLoader { - // eslint-disable-next-line no-use-before-define - private preAggregations: PreAggregations; - - public preAggregation: any; - - private preAggregationsTablesToTempTables: any; - - /** - * Determines whether current instance instantiated for a jobbed build query - * (initialized by the /cubejs-system/v1/pre-aggregations/jobs endpoint) or - * not. - */ - private isJob: boolean; - - private waitForRenew: boolean; - - private forceBuild: boolean; - - private orphanedTimeout: number; - - private externalDriverFactory: DriverFactory; - - private requestId: string; - - private metadata: any; - - private structureVersionPersistTime: any; - - private externalRefresh: boolean; - - public constructor( - private readonly driverFactory: DriverFactory, - private readonly logger: any, - private readonly queryCache: QueryCache, - // eslint-disable-next-line no-use-before-define - preAggregations: PreAggregations, - preAggregation, - preAggregationsTablesToTempTables, - private readonly loadCache: PreAggregationLoadCache, - options: any = {} - ) { - this.preAggregations = preAggregations; - this.preAggregation = preAggregation; - this.preAggregationsTablesToTempTables = preAggregationsTablesToTempTables; - this.isJob = !!options.isJob; - this.waitForRenew = options.waitForRenew; - this.forceBuild = options.forceBuild; - this.orphanedTimeout = options.orphanedTimeout; - this.externalDriverFactory = preAggregations.externalDriverFactory; - this.requestId = options.requestId; - this.metadata = options.metadata; - this.structureVersionPersistTime = preAggregations.structureVersionPersistTime; - this.externalRefresh = options.externalRefresh; - - if (this.externalRefresh && this.waitForRenew) { - const message = 'Invalid configuration - when externalRefresh is true, it will not perform a renew, therefore you cannot wait for it using waitForRenew.'; - if (['production', 'test'].includes(getEnv('nodeEnv'))) { - throw new Error(message); - } else { - this.logger('Invalid Configuration', { - requestId: this.requestId, - warning: message, - }); - this.waitForRenew = false; - } - } - } - - public async loadPreAggregation( - throwOnMissingPartition: boolean, - ): Promise { - const notLoadedKey = (this.preAggregation.invalidateKeyQueries || []) - .find(keyQuery => !this.loadCache.hasKeyQueryResult(keyQuery)); - - if (this.isJob || !(notLoadedKey && !this.waitForRenew)) { - // Case 1: pre-agg build job processing. - // Case 2: either we have no data cached for this rollup or waitForRenew - // is true, either way, synchronously renew what data is needed so that - // the most current data will be returned fo the current request. - const result = await this.loadPreAggregationWithKeys(); - const refreshKeyValues = await this.getInvalidationKeyValues(); - return { - ...result, - refreshKeyValues, - queryKey: this.isJob - // We need to return a queryKey value for the jobed build query - // (initialized by the /cubejs-system/v1/pre-aggregations/jobs - // endpoint) as a part of the response to make it possible to get a - // query result from the cache by the other API call. - ? this.preAggregationQueryKey(refreshKeyValues) - : undefined, - }; - } else { - // Case 3: pre-agg exists - const structureVersion = getStructureVersion(this.preAggregation); - const getVersionsStarted = new Date(); - const { byStructure } = await this.loadCache.getVersionEntries(this.preAggregation); - this.logger('Load PreAggregations Tables', { - preAggregation: this.preAggregation, - requestId: this.requestId, - duration: (new Date().getTime() - getVersionsStarted.getTime()) - }); - - const versionEntryByStructureVersion = byStructure[`${this.preAggregation.tableName}_${structureVersion}`]; - if (this.externalRefresh) { - if (!versionEntryByStructureVersion && throwOnMissingPartition) { - // eslint-disable-next-line no-use-before-define - throw new Error(PreAggregations.noPreAggregationPartitionsBuiltMessage([this.preAggregation])); - } - if (!versionEntryByStructureVersion) { - return null; - } else { - // the rollups are being maintained independently of this instance of cube.js - // immediately return the latest rollup data that instance already has - return { - targetTableName: this.targetTableName(versionEntryByStructureVersion), - refreshKeyValues: [], - lastUpdatedAt: versionEntryByStructureVersion.last_updated_at, - buildRangeEnd: versionEntryByStructureVersion.build_range_end, - }; - } - } - - if (versionEntryByStructureVersion) { - // this triggers an asynchronous/background load of the pre-aggregation but immediately - // returns the latest data it already has - this.loadPreAggregationWithKeys().catch(e => { - if (!(e instanceof ContinueWaitError)) { - this.logger('Error loading pre-aggregation', { - error: (e.stack || e), - preAggregation: this.preAggregation, - requestId: this.requestId - }); - } - }); - return { - targetTableName: this.targetTableName(versionEntryByStructureVersion), - refreshKeyValues: [], - lastUpdatedAt: versionEntryByStructureVersion.last_updated_at, - buildRangeEnd: versionEntryByStructureVersion.build_range_end, - }; - } else { - // no rollup has been built yet - build it synchronously as part of responding to this request - return this.loadPreAggregationWithKeys(); - } - } - } - - protected async loadPreAggregationWithKeys(): Promise { - const invalidationKeys = await this.getPartitionInvalidationKeyValues(); - - const contentVersion = this.contentVersion(invalidationKeys); - const structureVersion = getStructureVersion(this.preAggregation); - - const versionEntries = await this.loadCache.getVersionEntries(this.preAggregation); - - const getVersionEntryByContentVersion = ({ byContent }: VersionEntriesObj) => byContent[`${this.preAggregation.tableName}_${contentVersion}`]; - - const versionEntryByContentVersion = getVersionEntryByContentVersion(versionEntries); - if (versionEntryByContentVersion && !this.forceBuild) { - const targetTableName = this.targetTableName(versionEntryByContentVersion); - // No need to block here - this.updateLastTouch(targetTableName); - return { - targetTableName, - refreshKeyValues: [], - lastUpdatedAt: versionEntryByContentVersion.last_updated_at, - buildRangeEnd: versionEntryByContentVersion.build_range_end, - }; - } - - if (!this.waitForRenew && !this.forceBuild) { - const versionEntryByStructureVersion = versionEntries.byStructure[`${this.preAggregation.tableName}_${structureVersion}`]; - if (versionEntryByStructureVersion) { - const targetTableName = this.targetTableName(versionEntryByStructureVersion); - // No need to block here - this.updateLastTouch(targetTableName); - return { - targetTableName, - refreshKeyValues: [], - lastUpdatedAt: versionEntryByStructureVersion.last_updated_at, - buildRangeEnd: versionEntryByStructureVersion.build_range_end, - }; - } - } - - const client = this.preAggregation.external ? - await this.externalDriverFactory() : - await this.driverFactory(); - - if (!versionEntries.versionEntries.length) { - await client.createSchemaIfNotExists(this.preAggregation.preAggregationsSchema); - } - - // ensure we find appropriate structure version before invalidating anything - const versionEntry = - versionEntries.byStructure[`${this.preAggregation.tableName}_${structureVersion}`] || - versionEntries.byTableName[this.preAggregation.tableName]; - - const newVersionEntry = { - table_name: this.preAggregation.tableName, - structure_version: structureVersion, - content_version: contentVersion, - last_updated_at: nowTimestamp(client), - naming_version: 2, - }; - - const mostRecentResult: () => Promise = async () => { - await this.loadCache.reset(this.preAggregation); - const lastVersion = getVersionEntryByContentVersion( - await this.loadCache.getVersionEntries(this.preAggregation) - ); - if (!lastVersion) { - throw new Error(`Pre-aggregation table is not found for ${this.preAggregation.tableName} after it was successfully created`); - } - const targetTableName = this.targetTableName(lastVersion); - this.updateLastTouch(targetTableName); - return { - targetTableName, - refreshKeyValues: [], - lastUpdatedAt: lastVersion.last_updated_at, - buildRangeEnd: lastVersion.build_range_end, - }; - }; - - if (this.forceBuild) { - this.logger('Force build pre-aggregation', { - preAggregation: this.preAggregation, - requestId: this.requestId, - metadata: this.metadata, - queryKey: this.preAggregationQueryKey(invalidationKeys), - newVersionEntry - }); - if (this.isJob) { - // We don't want to wait for the jobed build query result. So we run the - // executeInQueue method and immediately return the LoadPreAggregationResult object. - this - .executeInQueue(invalidationKeys, this.priority(10), newVersionEntry) - .catch((e: any) => { - this.logger('Pre-aggregations build job error', { - preAggregation: this.preAggregation, - requestId: this.requestId, - newVersionEntry, - error: (e.stack || e), - }); - }); - const targetTableName = this.targetTableName(newVersionEntry); - this.updateLastTouch(targetTableName); - return { - targetTableName, - refreshKeyValues: [], - lastUpdatedAt: newVersionEntry.last_updated_at, - buildRangeEnd: this.preAggregation.buildRangeEnd, - }; - } else { - await this.executeInQueue(invalidationKeys, this.priority(10), newVersionEntry); - return mostRecentResult(); - } - } - - if (versionEntry) { - if (versionEntry.structure_version !== newVersionEntry.structure_version) { - this.logger('Invalidating pre-aggregation structure', { - preAggregation: this.preAggregation, - requestId: this.requestId, - queryKey: this.preAggregationQueryKey(invalidationKeys), - newVersionEntry - }); - await this.executeInQueue(invalidationKeys, this.priority(10), newVersionEntry); - return mostRecentResult(); - } else if (versionEntry.content_version !== newVersionEntry.content_version) { - if (this.waitForRenew) { - this.logger('Waiting for pre-aggregation renew', { - preAggregation: this.preAggregation, - requestId: this.requestId, - queryKey: this.preAggregationQueryKey(invalidationKeys), - newVersionEntry - }); - await this.executeInQueue(invalidationKeys, this.priority(0), newVersionEntry); - return mostRecentResult(); - } else { - this.scheduleRefresh(invalidationKeys, newVersionEntry); - } - } - } else { - this.logger('Creating pre-aggregation from scratch', { - preAggregation: this.preAggregation, - requestId: this.requestId, - queryKey: this.preAggregationQueryKey(invalidationKeys), - newVersionEntry - }); - await this.executeInQueue(invalidationKeys, this.priority(10), newVersionEntry); - return mostRecentResult(); - } - const targetTableName = this.targetTableName(versionEntry); - this.updateLastTouch(targetTableName); - return { - targetTableName, - refreshKeyValues: [], - lastUpdatedAt: versionEntry.last_updated_at, - buildRangeEnd: versionEntry.build_range_end, - }; - } - - private updateLastTouch(tableName: string) { - this.preAggregations.updateLastTouch(tableName).catch(e => { - this.logger('Error on pre-aggregation touch', { - error: (e.stack || e), preAggregation: this.preAggregation, requestId: this.requestId, - }); - }); - } - - protected contentVersion(invalidationKeys) { - const versionArray = [this.preAggregation.structureVersionLoadSql || this.preAggregation.loadSql]; - if (this.preAggregation.indexesSql && this.preAggregation.indexesSql.length) { - versionArray.push(this.preAggregation.indexesSql); - } - if (this.preAggregation.streamOffset) { - versionArray.push(this.preAggregation.streamOffset); - } - if (this.preAggregation.outputColumnTypes) { - versionArray.push(this.preAggregation.outputColumnTypes); - } - versionArray.push(invalidationKeys); - return version(versionArray); - } - - protected priority(defaultValue: number): number { - return this.preAggregation.priority != null ? this.preAggregation.priority : defaultValue; - } - - protected getInvalidationKeyValues() { - return Promise.all( - (this.preAggregation.invalidateKeyQueries || []).map( - (sqlQuery) => this.loadCache.keyQueryResult(sqlQuery, this.waitForRenew, this.priority(10)) - ) - ); - } - - protected getPartitionInvalidationKeyValues() { - if (this.preAggregation.partitionInvalidateKeyQueries) { - return Promise.all( - (this.preAggregation.partitionInvalidateKeyQueries || []).map( - (sqlQuery) => this.loadCache.keyQueryResult(sqlQuery, this.waitForRenew, this.priority(10)) - ) - ); - } else { - return this.getInvalidationKeyValues(); - } - } - - protected scheduleRefresh(invalidationKeys, newVersionEntry) { - this.logger('Refreshing pre-aggregation content', { - preAggregation: this.preAggregation, - requestId: this.requestId, - queryKey: this.preAggregationQueryKey(invalidationKeys), - newVersionEntry - }); - this.executeInQueue(invalidationKeys, this.priority(0), newVersionEntry) - .catch(e => { - if (!(e instanceof ContinueWaitError)) { - this.logger('Error refreshing pre-aggregation', { - error: (e.stack || e), preAggregation: this.preAggregation, requestId: this.requestId - }); - } - }); - } - - protected async executeInQueue(invalidationKeys, priority, newVersionEntry) { - const queue = await this.preAggregations.getQueue(this.preAggregation.dataSource); - return queue.executeInQueue( - 'query', - this.preAggregationQueryKey(invalidationKeys), - { - preAggregation: this.preAggregation, - preAggregationsTablesToTempTables: this.preAggregationsTablesToTempTables, - newVersionEntry, - requestId: this.requestId, - invalidationKeys, - forceBuild: this.forceBuild, - isJob: this.isJob, - metadata: this.metadata, - orphanedTimeout: this.orphanedTimeout, - }, - priority, - // eslint-disable-next-line no-use-before-define - { stageQueryKey: PreAggregations.preAggregationQueryCacheKey(this.preAggregation), requestId: this.requestId } - ); - } - - protected preAggregationQueryKey(invalidationKeys: InvalidationKeys): QueryKey { - return this.preAggregation.indexesSql && this.preAggregation.indexesSql.length ? - [this.preAggregation.loadSql, this.preAggregation.indexesSql, invalidationKeys] : - [this.preAggregation.loadSql, invalidationKeys]; - } - - protected targetTableName(versionEntry: VersionEntry): string { - // eslint-disable-next-line no-use-before-define - return PreAggregations.targetTableName(versionEntry); - } - - public refresh(newVersionEntry: VersionEntry, invalidationKeys: InvalidationKeys, client) { - this.updateLastTouch(this.targetTableName(newVersionEntry)); - let refreshStrategy = this.refreshStoreInSourceStrategy; - if (this.preAggregation.external) { - const readOnly = - this.preAggregation.readOnly || - client.config && client.config.readOnly || - client.readOnly && (typeof client.readOnly === 'boolean' ? client.readOnly : client.readOnly()); - - if (readOnly) { - refreshStrategy = this.refreshReadOnlyExternalStrategy; - } else { - refreshStrategy = this.refreshWriteStrategy; - } - } - return cancelCombinator( - saveCancelFn => refreshStrategy.bind(this)( - client, - newVersionEntry, - saveCancelFn, - invalidationKeys - ) - ); - } - - protected logExecutingSql(payload) { - this.logger( - 'Executing Load Pre Aggregation SQL', - payload - ); - } - - protected queryOptions(invalidationKeys: InvalidationKeys, query: string, params: unknown[], targetTableName: string, newVersionEntry: VersionEntry) { - return { - queryKey: this.preAggregationQueryKey(invalidationKeys), - query, - values: params, - targetTableName, - requestId: this.requestId, - newVersionEntry, - buildRangeEnd: this.preAggregation.buildRangeEnd, - }; - } - - protected async refreshStoreInSourceStrategy( - client: DriverInterface, - newVersionEntry: VersionEntry, - saveCancelFn: SaveCancelFn, - invalidationKeys: InvalidationKeys - ) { - const [loadSql, params] = - Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []]; - const targetTableName = this.targetTableName(newVersionEntry); - const query = ( - QueryCache.replacePreAggregationTableNames( - loadSql, - this.preAggregationsTablesToTempTables, - ) - ).replace( - this.preAggregation.tableName, - targetTableName - ); - const queryOptions = this.queryOptions(invalidationKeys, query, params, targetTableName, newVersionEntry); - this.logExecutingSql(queryOptions); - - try { - // TODO move index creation to the driver - await saveCancelFn(client.loadPreAggregationIntoTable( - targetTableName, - query, - params, - { - streamOffset: this.preAggregation.streamOffset, - outputColumnTypes: this.preAggregation.outputColumnTypes, - ...queryOptions - } - )); - - await this.createIndexes(client, newVersionEntry, saveCancelFn, queryOptions); - await this.loadCache.fetchTables(this.preAggregation); - } finally { - // We must clean orphaned in any cases: success or exception - await this.dropOrphanedTables(client, targetTableName, saveCancelFn, false, queryOptions); - await this.loadCache.fetchTables(this.preAggregation); - } - } - - protected async refreshWriteStrategy( - client: DriverInterface, - newVersionEntry: VersionEntry, - saveCancelFn: SaveCancelFn, - invalidationKeys: InvalidationKeys, - ) { - const capabilities = client?.capabilities(); - - const withTempTable = !(capabilities?.unloadWithoutTempTable); - const dropSourceTempTable = !capabilities?.streamingSource; - - return this.runWriteStrategy( - client, - newVersionEntry, - saveCancelFn, - invalidationKeys, - withTempTable, - dropSourceTempTable - ); - } - - /** - * Runs export strategy with write access in data source - */ - protected async runWriteStrategy( - client: DriverInterface, - newVersionEntry: VersionEntry, - saveCancelFn: SaveCancelFn, - invalidationKeys: InvalidationKeys, - withTempTable: boolean, - dropSourceTempTable: boolean, - ) { - if (withTempTable) { - await client.createSchemaIfNotExists(this.preAggregation.preAggregationsSchema); - } - const targetTableName = this.targetTableName(newVersionEntry); - const queryOptions = await this.prepareWriteStrategy( - client, - targetTableName, - newVersionEntry, - saveCancelFn, - invalidationKeys, - withTempTable, - ); - - try { - const tableData = await this.downloadExternalPreAggregation( - client, - newVersionEntry, - saveCancelFn, - queryOptions, - withTempTable, - ); - - try { - await this.uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn, queryOptions); - } finally { - if (tableData && tableData.release) { - await tableData.release(); - } - } - } finally { - await this.cleanupWriteStrategy( - client, - targetTableName, - queryOptions, - saveCancelFn, - withTempTable, - dropSourceTempTable, - ); - } - } - - /** - * Cleanup tables after write strategy - */ - protected async cleanupWriteStrategy( - client: DriverInterface, - targetTableName: string, - queryOptions: QueryOptions, - saveCancelFn: SaveCancelFn, - withTempTable: boolean, - dropSourceTempTable: boolean, - ) { - if (withTempTable && dropSourceTempTable) { - await this.withDropLock(false, async () => { - this.logger('Dropping source temp table', queryOptions); - - const actualTables = await client.getTablesQuery(this.preAggregation.preAggregationsSchema); - const mappedActualTables = actualTables.map(t => `${this.preAggregation.preAggregationsSchema}.${t.table_name || t.TABLE_NAME}`); - if (mappedActualTables.includes(targetTableName)) { - await client.dropTable(targetTableName); - } - }); - } - - // We must clean orphaned in any cases: success or exception - await this.loadCache.fetchTables(this.preAggregation); - await this.dropOrphanedTables(client, targetTableName, saveCancelFn, false, queryOptions); - } - - /** - * Create table (if required) and prepares query options object - */ - protected async prepareWriteStrategy( - client: DriverInterface, - targetTableName: string, - newVersionEntry: VersionEntry, - saveCancelFn: SaveCancelFn, - invalidationKeys: InvalidationKeys, - withTempTable: boolean - ): Promise { - if (withTempTable) { - const [loadSql, params] = - Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []]; - - const query = ( - QueryCache.replacePreAggregationTableNames( - loadSql, - this.preAggregationsTablesToTempTables, - ) - ).replace( - this.preAggregation.tableName, - targetTableName - ); - const queryOptions = this.queryOptions(invalidationKeys, query, params, targetTableName, newVersionEntry); - this.logExecutingSql(queryOptions); - await saveCancelFn(client.loadPreAggregationIntoTable( - targetTableName, - query, - params, - { - streamOffset: this.preAggregation.streamOffset, - outputColumnTypes: this.preAggregation.outputColumnTypes, - ...queryOptions - } - )); - - return queryOptions; - } else { - const [sql, params] = - Array.isArray(this.preAggregation.sql) ? this.preAggregation.sql : [this.preAggregation.sql, []]; - const queryOptions = this.queryOptions(invalidationKeys, sql, params, targetTableName, newVersionEntry); - this.logExecutingSql(queryOptions); - return queryOptions; - } - } - - /** - * Strategy to copy pre-aggregation from source db (for read-only permissions) to external data - */ - protected async refreshReadOnlyExternalStrategy( - client: DriverInterface, - newVersionEntry: VersionEntry, - saveCancelFn: SaveCancelFn, - invalidationKeys: InvalidationKeys - ) { - const [sql, params] = - Array.isArray(this.preAggregation.sql) ? this.preAggregation.sql : [this.preAggregation.sql, []]; - - const queryOptions = this.queryOptions(invalidationKeys, sql, params, this.targetTableName(newVersionEntry), newVersionEntry); - this.logExecutingSql(queryOptions); - this.logger('Downloading external pre-aggregation via query', queryOptions); - const externalDriver = await this.externalDriverFactory(); - const capabilities = externalDriver.capabilities && externalDriver.capabilities(); - - let tableData: DownloadQueryResultsResult; - - if (capabilities.csvImport && client.unloadFromQuery && await client.isUnloadSupported(this.getUnloadOptions())) { - tableData = await saveCancelFn( - client.unloadFromQuery( - sql, - params, - this.getUnloadOptions(), - ) - ).catch((error: any) => { - this.logger('Downloading external pre-aggregation via query error', { ...queryOptions, error: error.stack || error.message }); - throw error; - }); - } else { - tableData = await saveCancelFn(client.downloadQueryResults( - sql, - params, { - streamOffset: this.preAggregation.streamOffset, - outputColumnTypes: this.preAggregation.outputColumnTypes, - ...queryOptions, - ...capabilities, - ...this.getStreamingOptions(), - } - )).catch((error: any) => { - this.logger('Downloading external pre-aggregation via query error', { ...queryOptions, error: error.stack || error.message }); - throw error; - }); - } - - this.logger('Downloading external pre-aggregation via query completed', { - ...queryOptions, - isUnloadSupported: isDownloadTableCSVData(tableData) - }); - - try { - await this.uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn, queryOptions); - } finally { - if (tableData.release) { - await tableData.release(); - } - } - - await this.loadCache.fetchTables(this.preAggregation); - } - - protected getUnloadOptions(): UnloadOptions { - return { - // Default: 16mb for Snowflake, Should be specified in MBs, because drivers convert it - maxFileSize: 64 - }; - } - - protected getStreamingOptions(): StreamOptions { - return { - // Default: 16384 (16KB), or 16 for objectMode streams. PostgreSQL/MySQL use object streams - highWaterMark: 10000 - }; - } - - /** - * prepares download data for future cube store usage - */ - protected async downloadExternalPreAggregation( - client: DriverInterface, - newVersionEntry: VersionEntry, - saveCancelFn: SaveCancelFn, - queryOptions: QueryOptions, - withTempTable: boolean - ) { - const table = this.targetTableName(newVersionEntry); - this.logger('Downloading external pre-aggregation', queryOptions); - - try { - const externalDriver = await this.externalDriverFactory(); - const capabilities = externalDriver.capabilities && externalDriver.capabilities(); - - let tableData: DownloadTableData; - if (withTempTable) { - tableData = await this.getTableDataWithTempTable(client, table, saveCancelFn, queryOptions, capabilities); - } else { - tableData = await this.getTableDataWithoutTempTable(client, table, saveCancelFn, queryOptions, capabilities); - } - - this.logger('Downloading external pre-aggregation completed', { - ...queryOptions, - isUnloadSupported: isDownloadTableCSVData(tableData) - }); - - return tableData; - } catch (error: any) { - this.logger('Downloading external pre-aggregation error', { - ...queryOptions, - error: error?.stack || error?.message - }); - throw error; - } - } - - /** - * prepares download data when temp table = true - */ - protected async getTableDataWithTempTable(client: DriverInterface, table: string, saveCancelFn: SaveCancelFn, queryOptions: QueryOptions, externalDriverCapabilities: DriverCapabilities) { - let tableData: DownloadTableData; - - if (externalDriverCapabilities.csvImport && client.unload && await client.isUnloadSupported(this.getUnloadOptions())) { - tableData = await saveCancelFn( - client.unload(table, this.getUnloadOptions()), - ); - } else if (externalDriverCapabilities.streamImport && client.stream) { - tableData = await saveCancelFn( - client.stream(`SELECT * FROM ${table}`, [], this.getStreamingOptions()) - ); - - if (client.unload) { - const stream = new LargeStreamWarning(this.preAggregation.preAggregationId, (msg) => { - this.logger('Downloading external pre-aggregation warning', { - ...queryOptions, - error: msg - }); - }); - tableData.rowStream.pipe(stream); - tableData.rowStream = stream; - } - } else { - tableData = await saveCancelFn(client.downloadTable(table, { - streamOffset: this.preAggregation.streamOffset, - outputColumnTypes: this.preAggregation.outputColumnTypes, - ...externalDriverCapabilities - })); - } - - if (!tableData.types) { - tableData.types = await saveCancelFn(client.tableColumnTypes(table)); - } - - return tableData; - } - - /** - * prepares download data when temp table = false - */ - protected async getTableDataWithoutTempTable(client: DriverInterface, table: string, saveCancelFn: SaveCancelFn, queryOptions: QueryOptions, externalDriverCapabilities: DriverCapabilities) { - const [sql, params] = - Array.isArray(this.preAggregation.sql) ? this.preAggregation.sql : [this.preAggregation.sql, []]; - - let tableData: DownloadTableData; - - if (externalDriverCapabilities.csvImport && client.unload && await client.isUnloadSupported(this.getUnloadOptions())) { - return saveCancelFn( - client.unload( - table, - { ...this.getUnloadOptions(), query: { sql, params } }, - ) - ); - } else if (externalDriverCapabilities.streamImport && client.stream) { - tableData = await saveCancelFn( - client.stream(sql, params, this.getStreamingOptions()) - ); - - if (client.unload) { - const stream = new LargeStreamWarning(this.preAggregation.preAggregationId, (msg) => { - this.logger('Downloading external pre-aggregation warning', { - ...queryOptions, - error: msg - }); - }); - tableData.rowStream.pipe(stream); - tableData.rowStream = stream; - } - } else { - tableData = { rows: await saveCancelFn(client.query(sql, params)) }; - } - - if (!tableData.types && client.queryColumnTypes) { - tableData.types = await saveCancelFn(client.queryColumnTypes(sql, params)); - } - - return tableData; - } - - protected async uploadExternalPreAggregation( - tableData: DownloadTableData, - newVersionEntry: VersionEntry, - saveCancelFn: SaveCancelFn, - queryOptions: QueryOptions - ) { - const externalDriver: DriverInterface = await this.externalDriverFactory(); - const table = this.targetTableName(newVersionEntry); - - this.logger('Uploading external pre-aggregation', queryOptions); - await saveCancelFn( - externalDriver.uploadTableWithIndexes( - table, - tableData.types, - tableData, - this.prepareIndexesSql(newVersionEntry, queryOptions), - this.preAggregation.uniqueKeyColumns, - queryOptions, - { - aggregationsColumns: this.preAggregation.aggregationsColumns, - createTableIndexes: this.prepareCreateTableIndexes(newVersionEntry), - sealAt: this.preAggregation.sealAt - } - ) - ).catch((error: any) => { - this.logger('Uploading external pre-aggregation error', { ...queryOptions, error: error?.stack || error?.message }); - throw error; - }); - this.logger('Uploading external pre-aggregation completed', queryOptions); - - await this.loadCache.fetchTables(this.preAggregation); - await this.dropOrphanedTables(externalDriver, table, saveCancelFn, true, queryOptions); - } - - protected async createIndexes(driver, newVersionEntry: VersionEntry, saveCancelFn: SaveCancelFn, queryOptions: QueryOptions) { - const indexesSql = this.prepareIndexesSql(newVersionEntry, queryOptions); - for (let i = 0; i < indexesSql.length; i++) { - const [query, params] = indexesSql[i].sql; - await saveCancelFn(driver.query(query, params)); - } - } - - protected prepareIndexesSql(newVersionEntry: VersionEntry, queryOptions: QueryOptions) { - if (!this.preAggregation.indexesSql || !this.preAggregation.indexesSql.length) { - return []; - } - return this.preAggregation.indexesSql.map(({ sql, indexName }) => { - const [query, params] = sql; - const indexVersionEntry = { - ...newVersionEntry, - table_name: indexName - }; - this.logger('Creating pre-aggregation index', queryOptions); - const resultingSql = QueryCache.replacePreAggregationTableNames( - query, - this.preAggregationsTablesToTempTables.concat([ - [this.preAggregation.tableName, { targetTableName: this.targetTableName(newVersionEntry) }], - [indexName, { targetTableName: this.targetTableName(indexVersionEntry) }] - ]) - ); - return { sql: [resultingSql, params] }; - }); - } - - protected prepareCreateTableIndexes(newVersionEntry: VersionEntry) { - if (!this.preAggregation.createTableIndexes || !this.preAggregation.createTableIndexes.length) { - return []; - } - return this.preAggregation.createTableIndexes.map(({ indexName, type, columns }) => { - const indexVersionEntry = { - ...newVersionEntry, - table_name: indexName - }; - return { indexName: this.targetTableName(indexVersionEntry), type, columns }; - }); - } - - private async withDropLock(external: boolean, lockFn: () => MaybeCancelablePromise): Promise { - const lockKey = this.dropLockKey(external); - return this.queryCache.withLock(lockKey, 60 * 5, lockFn); - } - - protected async dropOrphanedTables( - client: DriverInterface, - justCreatedTable: string, - saveCancelFn: SaveCancelFn, - external: boolean, - queryOptions: QueryOptions - ) { - await this.preAggregations.addTableUsed(justCreatedTable); - - return this.withDropLock(external, async () => { - this.logger('Dropping orphaned tables', { ...queryOptions, external }); - const actualTables = await client.getTablesQuery( - this.preAggregation.preAggregationsSchema, - ); - const versionEntries = tablesToVersionEntries( - this.preAggregation.preAggregationsSchema, - actualTables, - ); - const versionEntriesToSave = R.pipe< - VersionEntry[], - { [index: string]: VersionEntry[] }, - Array<[string, VersionEntry[]]>, - VersionEntry[] - >( - R.groupBy(v => v.table_name), - R.toPairs, - R.map(p => p[1][0]) - )(versionEntries); - const structureVersionsToSave = R.pipe< - VersionEntry[], - VersionEntry[], - { [index: string]: VersionEntry[] }, - Array<[string, VersionEntry[]]>, - VersionEntry[] - >( - R.filter( - (v: VersionEntry) => ( - new Date().getTime() - v.last_updated_at < - this.structureVersionPersistTime * 1000 - ) - ), - R.groupBy(v => `${v.table_name}_${v.structure_version}`), - R.toPairs, - R.map(p => p[1][0]) - )(versionEntries); - - const refreshEndReached = await this.preAggregations.getRefreshEndReached(); - const toSave = - this.preAggregations.dropPreAggregationsWithoutTouch && refreshEndReached - ? (await this.preAggregations.tablesUsed()) - .concat(await this.preAggregations.tablesTouched()) - .concat([justCreatedTable]) - : (await this.preAggregations.tablesUsed()) - .concat(structureVersionsToSave.map(v => this.targetTableName(v))) - .concat(versionEntriesToSave.map(v => this.targetTableName(v))) - .concat([justCreatedTable]); - const toDrop = actualTables - .map(t => `${this.preAggregation.preAggregationsSchema}.${t.table_name || t.TABLE_NAME}`) - .filter(t => toSave.indexOf(t) === -1); - - await Promise.all(toDrop.map(table => saveCancelFn(client.dropTable(table)))); - this.logger('Dropping orphaned tables completed', { - ...queryOptions, - external, - tablesToDrop: JSON.stringify(toDrop), - }); - }); - } - - private dropLockKey(external: boolean) { - return external - ? 'drop-orphaned-tables-external' - : `drop-orphaned-tables:${this.preAggregation.dataSource}`; - } -} - -interface PreAggsPartitionRangeLoaderOpts { - maxPartitions: number; - maxSourceRowLimit: number; - waitForRenew?: boolean; - requestId?: string; - externalRefresh?: boolean; - forceBuild?: boolean; - metadata?: any; - orphanedTimeout?: number; - lambdaQuery?: LambdaQuery; - isJob?: boolean; - compilerCacheFn?: (subKey: string[], cacheFn: () => T) => T; -} - -export class PreAggregationPartitionRangeLoader { - /** - * Determines whether current instance instantiated for a jobbed build query - * (initialized by the /cubejs-system/v1/pre-aggregations/jobs endpoint) or - * not. - */ - protected isJob: boolean; - - protected waitForRenew: boolean; - - protected requestId: string; - - protected lambdaQuery: LambdaQuery; - - protected dataSource: string; - - protected compilerCacheFn: (subKey: string[], cacheFn: () => T) => T; - - public constructor( - private readonly driverFactory: DriverFactory, - private readonly logger: any, - private readonly queryCache: QueryCache, - // eslint-disable-next-line no-use-before-define - private readonly preAggregations: PreAggregations, - private readonly preAggregation: PreAggregationDescription, - private readonly preAggregationsTablesToTempTables: [string, LoadPreAggregationResult][], - private readonly loadCache: PreAggregationLoadCache, - private readonly options: PreAggsPartitionRangeLoaderOpts = { - maxPartitions: 10000, - maxSourceRowLimit: 10000, - }, - ) { - this.isJob = !!options.isJob; - this.waitForRenew = options.waitForRenew; - this.requestId = options.requestId; - this.lambdaQuery = options.lambdaQuery; - this.dataSource = preAggregation.dataSource; - this.compilerCacheFn = options.compilerCacheFn || ((subKey, cacheFn) => cacheFn()); - } - - private async loadRangeQuery(rangeQuery: QueryTuple, partitionRange?: QueryDateRange) { - const [query, values, queryOptions]: QueryTuple = rangeQuery; - const invalidate = - this.preAggregation.invalidateKeyQueries && - this.preAggregation.invalidateKeyQueries[0] - ? this.preAggregation.invalidateKeyQueries[0].slice(0, 2) - : false; - - return this.queryCache.cacheQueryResult( - query, - values, - QueryCache.queryCacheKey({ - query, - values: (values), - invalidate, - }), - 24 * 60 * 60, - { - renewalThreshold: this.queryCache.options.refreshKeyRenewalThreshold - || queryOptions?.renewalThreshold || 24 * 60 * 60, - waitForRenew: this.waitForRenew, - priority: this.priority(10), - requestId: this.requestId, - dataSource: this.dataSource, - useInMemory: true, - external: queryOptions?.external, - renewalKey: partitionRange ? await this.getInvalidationKeyValues(partitionRange) : null, - } - ); - } - - protected getInvalidationKeyValues(range) { - const partitionTableName = PreAggregationPartitionRangeLoader.partitionTableName( - this.preAggregation.tableName, this.preAggregation.partitionGranularity, range - ); - return Promise.all( - (this.preAggregation.invalidateKeyQueries || []).map( - (sqlQuery) => ( - this.loadCache.keyQueryResult( - this.replacePartitionSqlAndParams(sqlQuery, range, partitionTableName), this.waitForRenew, this.priority(10) - ) - ) - ) - ); - } - - protected priority(defaultValue) { - return this.preAggregation.priority != null ? this.preAggregation.priority : defaultValue; - } - - public async replaceQueryBuildRangeParams(queryValues: string[]): Promise { - if (queryValues?.find(p => p === BUILD_RANGE_START_LOCAL || p === BUILD_RANGE_END_LOCAL)) { - const [buildRangeStart, buildRangeEnd] = await this.loadBuildRange(); - return queryValues?.map( - param => { - if (param === BUILD_RANGE_START_LOCAL) { - return utcToLocalTimeZone(this.preAggregation.timezone, this.preAggregation.timestampFormat, buildRangeStart); - } else if (param === BUILD_RANGE_END_LOCAL) { - return utcToLocalTimeZone(this.preAggregation.timezone, this.preAggregation.timestampFormat, buildRangeEnd); - } else { - return param; - } - }, - ); - } - return null; - } - - private replacePartitionSqlAndParams( - query: QueryWithParams, - dateRange: QueryDateRange, - partitionTableName: string - ): QueryWithParams { - const [sql, params, options] = query; - const updateWindowToBoundary = options?.incremental && addSecondsToLocalTimestamp( - dateRange[1], this.preAggregation.timezone, options?.updateWindowSeconds || 0 - ); - return [sql.replace(this.preAggregation.tableName, partitionTableName), params?.map( - param => { - if (dateRange && param === FROM_PARTITION_RANGE) { - return PreAggregationPartitionRangeLoader.inDbTimeZone(this.preAggregation, dateRange[0]); - } else if (dateRange && param === TO_PARTITION_RANGE) { - return PreAggregationPartitionRangeLoader.inDbTimeZone(this.preAggregation, dateRange[1]); - } else { - return param; - } - }, - ), { - ...options, - renewalThreshold: - options?.incremental && updateWindowToBoundary < new Date() ? - // if updateWindowToBoundary passed just moments ago we want to renew it earlier in case - // of server and db clock don't match - Math.min( - Math.round((new Date().getTime() - updateWindowToBoundary.getTime()) / 1000), - options?.renewalThresholdOutsideUpdateWindow - ) : - options?.renewalThreshold - }]; - } - - private partitionPreAggregationDescription(range: QueryDateRange, buildRange: QueryDateRange): PreAggregationDescription { - const partitionTableName = PreAggregationPartitionRangeLoader.partitionTableName( - this.preAggregation.tableName, this.preAggregation.partitionGranularity, range - ); - const [_, buildRangeEnd] = buildRange; - const loadRange: [string, string] = [...range]; - const partitionInvalidateKeyQueries = this.preAggregation.partitionInvalidateKeyQueries || this.preAggregation.invalidateKeyQueries; - // `partitionInvalidateKeyQueries = []` in case of real time - if ((!partitionInvalidateKeyQueries || partitionInvalidateKeyQueries.length > 0) && buildRangeEnd < range[1]) { - loadRange[1] = buildRangeEnd; - } - const sealAt = addSecondsToLocalTimestamp( - loadRange[1], this.preAggregation.timezone, this.preAggregation.updateWindowSeconds || 0 - ).toISOString(); - return { - ...this.preAggregation, - tableName: partitionTableName, - structureVersionLoadSql: this.preAggregation.loadSql && - this.replacePartitionSqlAndParams(this.preAggregation.loadSql, range, partitionTableName), - loadSql: this.preAggregation.loadSql && - this.replacePartitionSqlAndParams(this.preAggregation.loadSql, loadRange, partitionTableName), - sql: this.preAggregation.sql && - this.replacePartitionSqlAndParams(this.preAggregation.sql, loadRange, partitionTableName), - invalidateKeyQueries: (this.preAggregation.invalidateKeyQueries || []) - .map(q => this.replacePartitionSqlAndParams(q, range, partitionTableName)), - partitionInvalidateKeyQueries: this.preAggregation.partitionInvalidateKeyQueries && - this.preAggregation.partitionInvalidateKeyQueries.map(q => this.replacePartitionSqlAndParams(q, range, partitionTableName)), - indexesSql: (this.preAggregation.indexesSql || []) - .map(q => ({ ...q, sql: this.replacePartitionSqlAndParams(q.sql, range, partitionTableName) })), - previewSql: this.preAggregation.previewSql && - this.replacePartitionSqlAndParams(this.preAggregation.previewSql, range, partitionTableName), - buildRangeStart: loadRange[0], - buildRangeEnd: loadRange[1], - sealAt, // Used only for kSql pre aggregations - }; - } - - public async loadPreAggregations(): Promise { - if (this.preAggregation.partitionGranularity && !this.preAggregation.expandedPartition) { - const loadPreAggregationsByPartitionRanges = async ({ buildRange, partitionRanges }: PartitionRanges) => { - const partitionLoaders = partitionRanges.map(range => new PreAggregationLoader( - this.driverFactory, - this.logger, - this.queryCache, - this.preAggregations, - this.partitionPreAggregationDescription(range, buildRange), - this.preAggregationsTablesToTempTables, - this.loadCache, - this.options, - )); - const resolveResults = await Promise.all(partitionLoaders.map(async (l, i) => { - const result = await l.loadPreAggregation(false); - return result && { - ...result, - partitionRange: partitionRanges[i] - }; - })); - return { loadResults: resolveResults.filter(res => res !== null), partitionLoaders }; - }; - - // eslint-disable-next-line prefer-const - let loadResultAndLoaders = await loadPreAggregationsByPartitionRanges(await this.partitionRanges()); - if (this.options.externalRefresh && loadResultAndLoaders.loadResults.length === 0) { - loadResultAndLoaders = await loadPreAggregationsByPartitionRanges(await this.partitionRanges(true)); - // In case there're no partitions ready at matched time dimension intersection then no data can be retrieved. - // We need to provide any table so query can just execute successfully. - if (loadResultAndLoaders.loadResults.length > 0) { - loadResultAndLoaders.loadResults = [loadResultAndLoaders.loadResults[loadResultAndLoaders.loadResults.length - 1]]; - } - } - if (this.options.externalRefresh && loadResultAndLoaders.loadResults.length === 0) { - throw new Error( - // eslint-disable-next-line no-use-before-define - PreAggregations.noPreAggregationPartitionsBuiltMessage(loadResultAndLoaders.partitionLoaders.map(p => p.preAggregation)) - ); - } - - let { loadResults } = loadResultAndLoaders; - - let lambdaTable: InlineTable; - let emptyResult = false; - - if (this.preAggregation.rollupLambdaId) { - if (this.lambdaQuery && loadResults.length > 0) { - const { buildRangeEnd, targetTableName } = loadResults[loadResults.length - 1]; - const lambdaTypes = await this.loadCache.getTableColumnTypes(this.preAggregation, targetTableName); - lambdaTable = await this.downloadLambdaTable(buildRangeEnd, lambdaTypes); - } - const rollupLambdaResults = this.preAggregationsTablesToTempTables.filter(tempTableResult => tempTableResult[1].rollupLambdaId === this.preAggregation.rollupLambdaId); - const filteredResults = loadResults.filter( - r => (this.preAggregation.lastRollupLambda || reformatInIsoLocal(r.buildRangeEnd) === reformatInIsoLocal(r.partitionRange[1])) && - rollupLambdaResults.every(result => !result[1].buildRangeEnd || reformatInIsoLocal(result[1].buildRangeEnd) < reformatInIsoLocal(r.partitionRange[0])) - ); - if (filteredResults.length === 0) { - emptyResult = true; - loadResults = [loadResults[loadResults.length - 1]]; - } else { - loadResults = filteredResults; - } - } - - const allTableTargetNames = loadResults.map(targetTableName => targetTableName.targetTableName); - let lastUpdatedAt = getLastUpdatedAtTimestamp(loadResults.map(r => r.lastUpdatedAt)); - - if (lambdaTable) { - allTableTargetNames.push(lambdaTable.name); - lastUpdatedAt = Date.now(); - } - - const unionTargetTableName = allTableTargetNames - .map(targetTableName => `SELECT * FROM ${targetTableName}${emptyResult ? ' WHERE 1 = 0' : ''}`) - .join(' UNION ALL '); - return { - targetTableName: allTableTargetNames.length === 1 && !emptyResult ? allTableTargetNames[0] : `(${unionTargetTableName})`, - refreshKeyValues: loadResults.map(t => t.refreshKeyValues), - lastUpdatedAt, - buildRangeEnd: !emptyResult && loadResults.length && loadResults[loadResults.length - 1].buildRangeEnd, - lambdaTable, - rollupLambdaId: this.preAggregation.rollupLambdaId, - }; - } else { - return new PreAggregationLoader( - this.driverFactory, - this.logger, - this.queryCache, - this.preAggregations, - this.preAggregation, - this.preAggregationsTablesToTempTables, - this.loadCache, - this.options - ).loadPreAggregation(true); - } - } - - /** - * Downloads the lambda table from the source DB. - */ - private async downloadLambdaTable(fromDate: string, lambdaTypes: TableStructure): Promise { - const { sqlAndParams, cacheKeyQueries } = this.lambdaQuery; - const [query, params] = sqlAndParams; - const values = params.map((p) => { - if (p === FROM_PARTITION_RANGE) { - return fromDate; - } - if (p === MAX_SOURCE_ROW_LIMIT) { - return this.options.maxSourceRowLimit; - } - return p; - }); - const { data } = await this.queryCache.renewQuery( - query, - values, - cacheKeyQueries, - 60 * 60, - [query, values], - undefined, - { - requestId: this.requestId, - skipRefreshKeyWaitForRenew: false, - dataSource: this.dataSource, - external: false, - useCsvQuery: true, - lambdaTypes, - } - ); - if (data.rowCount === this.options.maxSourceRowLimit) { - throw new Error(`The maximum number of source rows ${this.options.maxSourceRowLimit} was reached for ${this.preAggregation.preAggregationId}`); - } - return { - name: `${LAMBDA_TABLE_PREFIX}_${this.preAggregation.tableName.replace('.', '_')}`, - columns: data.types, - csvRows: data.csvRows, - }; - } - - public async partitionPreAggregations(): Promise { - if (this.preAggregation.partitionGranularity && !this.preAggregation.expandedPartition) { - const { buildRange, partitionRanges } = await this.partitionRanges(); - return this.compilerCacheFn(['partitions', JSON.stringify(buildRange)], () => partitionRanges.map(range => this.partitionPreAggregationDescription(range, buildRange))); - } else { - return [this.preAggregation]; - } - } - - private async partitionRanges(ignoreMatchedDateRange?: boolean): Promise { - const buildRange = await this.loadBuildRange(); - if (!buildRange[0] || !buildRange[1]) { - return { buildRange, partitionRanges: [] }; - } - let dateRange = PreAggregationPartitionRangeLoader.intersectDateRanges( - buildRange, - ignoreMatchedDateRange ? undefined : this.preAggregation.matchedTimeDimensionDateRange, - ); - if (!dateRange) { - // If there's no date range intersection between query data range and pre-aggregation build range - // use last partition so outer query can receive expected table structure. - dateRange = [buildRange[1], buildRange[1]]; - } - const partitionRanges = this.compilerCacheFn(['timeSeries', this.preAggregation.partitionGranularity, JSON.stringify(dateRange), `${this.preAggregation.timestampPrecision}`], () => PreAggregationPartitionRangeLoader.timeSeries( - this.preAggregation.partitionGranularity, - dateRange, - this.preAggregation.timestampPrecision - )); - if (partitionRanges.length > this.options.maxPartitions) { - throw new Error( - `Pre-aggregation '${this.preAggregation.tableName}' requested to build ${partitionRanges.length} partitions which exceeds the maximum number of partitions per pre-aggregation of ${this.options.maxPartitions}` - ); - } - return { buildRange: dateRange, partitionRanges }; - } - - public async loadBuildRange(): Promise { - const { preAggregationStartEndQueries } = this.preAggregation; - const [startDate, endDate] = await Promise.all( - preAggregationStartEndQueries.map( - async rangeQuery => PreAggregationPartitionRangeLoader.extractDate(await this.loadRangeQuery(rangeQuery)), - ), - ); - if (!this.preAggregation.partitionGranularity) { - return this.orNowIfEmpty([startDate, endDate]); - } - const wholeSeriesRanges = PreAggregationPartitionRangeLoader.timeSeries( - this.preAggregation.partitionGranularity, - this.orNowIfEmpty([startDate, endDate]), - this.preAggregation.timestampPrecision, - ); - const [rangeStart, rangeEnd] = await Promise.all( - preAggregationStartEndQueries.map( - async (rangeQuery, i) => PreAggregationPartitionRangeLoader.extractDate( - await this.loadRangeQuery( - rangeQuery, i === 0 ? wholeSeriesRanges[0] : wholeSeriesRanges[wholeSeriesRanges.length - 1], - ), - ), - ), - ); - return this.orNowIfEmpty([rangeStart, rangeEnd]); - } - - private now() { - return utcToLocalTimeZone(this.preAggregation.timezone, 'YYYY-MM-DDTHH:mm:ss.SSS', new Date().toJSON().substring(0, 23)); - } - - private orNowIfEmpty(dateRange: QueryDateRange): QueryDateRange { - if (!dateRange[0] && !dateRange[1]) { - const now = this.now(); - return [now, now]; - } - if (!dateRange[0]) { - return [dateRange[1], dateRange[1]]; - } - if (!dateRange[1]) { - return [dateRange[0], dateRange[0]]; - } - return dateRange; - } - - private static checkDataRangeType(range: QueryDateRange) { - if (!range) { - return; - } - - if (range.length !== 2) { - throw new Error(`Date range expected to be an array with 2 elements but ${range} found`); - } - - if (typeof range[0] !== 'string' || typeof range[1] !== 'string') { - throw new Error(`Date range expected to be a string array but ${range} found`); - } - - if ((range[0].length !== 23 && range[0].length !== 26) || (range[1].length !== 23 && range[0].length !== 26)) { - throw new Error(`Date range expected to be in YYYY-MM-DDTHH:mm:ss.SSS format but ${range} found`); - } - } - - public static intersectDateRanges(rangeA: QueryDateRange | null, rangeB: QueryDateRange | null): QueryDateRange { - PreAggregationPartitionRangeLoader.checkDataRangeType(rangeA); - PreAggregationPartitionRangeLoader.checkDataRangeType(rangeB); - if (!rangeB) { - return rangeA; - } - if (!rangeA) { - return rangeB; - } - const from = rangeA[0] > rangeB[0] ? rangeA[0] : rangeB[0]; - const to = rangeA[1] < rangeB[1] ? rangeA[1] : rangeB[1]; - if (from > to) { - return null; - } - return [ - from, - to, - ]; - } - - public static timeSeries(granularity: string, dateRange: QueryDateRange, timestampPrecision: number): QueryDateRange[] { - return timeSeries(granularity, dateRange, { - timestampPrecision - }); - } - - public static partitionTableName(tableName: string, partitionGranularity: string, dateRange: string[]) { - const partitionSuffix = dateRange[0].substring( - 0, - partitionGranularity === 'hour' ? 13 : 10 - ).replace(/[-T:]/g, ''); - return `${tableName}${partitionSuffix}`; - } - - public static inDbTimeZone(preAggregationDescription: any, timestamp: string): string { - return inDbTimeZone(preAggregationDescription.timezone, preAggregationDescription.timestampFormat, timestamp); - } - - public static extractDate(data: any): string { - return extractDate(data); - } - - public static FROM_PARTITION_RANGE = FROM_PARTITION_RANGE; - - public static TO_PARTITION_RANGE = TO_PARTITION_RANGE; -} - type PreAggregationsOptions = { maxPartitions: number; maxSourceRowLimit: number; From bef5950ca87d65b764e710bef4876332cb4bdb43 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Wed, 26 Feb 2025 12:39:44 +0200 Subject: [PATCH 10/14] code reformat --- .../cubejs-schema-compiler/src/adapter/BaseQuery.js | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js index bcd7efb6f3907..9415f397af2c7 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js @@ -8,14 +8,21 @@ import R from 'ramda'; import cronParser from 'cron-parser'; - import moment from 'moment-timezone'; import inflection from 'inflection'; -import { FROM_PARTITION_RANGE, inDbTimeZone, MAX_SOURCE_ROW_LIMIT, QueryAlias, getEnv, timeSeries as timeSeriesBase } from '@cubejs-backend/shared'; +import { + FROM_PARTITION_RANGE, + MAX_SOURCE_ROW_LIMIT, + inDbTimeZone, + QueryAlias, + getEnv, + timeSeries as timeSeriesBase +} from '@cubejs-backend/shared'; import { buildSqlAndParams as nativeBuildSqlAndParams, } from '@cubejs-backend/native'; + import { UserError } from '../compiler/UserError'; import { BaseMeasure } from './BaseMeasure'; import { BaseDimension } from './BaseDimension'; From f1724d5a26595c1368a358e8c25e8666e4e87418 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Wed, 26 Feb 2025 13:27:11 +0200 Subject: [PATCH 11/14] better naming in utcToLocalTimeZone() --- packages/cubejs-backend-shared/src/time.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/cubejs-backend-shared/src/time.ts b/packages/cubejs-backend-shared/src/time.ts index 8b239bc283bf9..f6277de9fae7b 100644 --- a/packages/cubejs-backend-shared/src/time.ts +++ b/packages/cubejs-backend-shared/src/time.ts @@ -236,11 +236,11 @@ export const utcToLocalTimeZone = (timezone: string, timestampFormat: string, ti const parsedTime = Date.parse(`${timestamp}Z`); // TODO parsedTime might be incorrect offset for conversion const offset = zone.utcOffset(parsedTime); - const inDbTimeZoneDate = new Date(parsedTime - offset * 60 * 1000); + const localTimeZoneDate = new Date(parsedTime - offset * 60 * 1000); if (timestampFormat === 'YYYY-MM-DD[T]HH:mm:ss.SSS[Z]' || timestampFormat === 'YYYY-MM-DDTHH:mm:ss.SSSZ') { - return inDbTimeZoneDate.toJSON(); + return localTimeZoneDate.toJSON(); } else if (timestampFormat === 'YYYY-MM-DDTHH:mm:ss.SSS') { - return inDbTimeZoneDate.toJSON().replace('Z', ''); + return localTimeZoneDate.toJSON().replace('Z', ''); } } From 0bfa767e59efcc364b247429fb220979220aae68 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Wed, 26 Feb 2025 19:08:54 +0200 Subject: [PATCH 12/14] code polishment in PreAggregationLoadCache class --- .../orchestrator/PreAggregationLoadCache.ts | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts index 0c0d0737757f6..5a11b9569a696 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoadCache.ts @@ -17,18 +17,17 @@ type PreAggregationLoadCacheOptions = { }; export class PreAggregationLoadCache { - private driverFactory: DriverFactory; + private readonly driverFactory: DriverFactory; private queryCache: QueryCache; - // eslint-disable-next-line no-use-before-define private preAggregations: PreAggregations; - private queryResults: any; + private readonly queryResults: any; - private externalDriverFactory: any; + private readonly externalDriverFactory: any; - private requestId: any; + private readonly requestId: any; private versionEntries: { [redisKey: string]: Promise }; @@ -41,9 +40,9 @@ export class PreAggregationLoadCache { // Make it per data source key in case load cache scope is broaden. private queryStageState: any; - private dataSource: string; + private readonly dataSource: string; - private tablePrefixes: string[] | null; + private readonly tablePrefixes: string[] | null; public constructor( clientFactory: DriverFactory, @@ -64,7 +63,7 @@ export class PreAggregationLoadCache { this.tableColumnTypes = {}; } - protected async tablesFromCache(preAggregation, forceRenew?) { + protected async tablesFromCache(preAggregation, forceRenew: boolean = false) { let tables = forceRenew ? null : await this.queryCache.getCacheDriver().get(this.tablesCachePrefixKey(preAggregation)); if (!tables) { tables = await this.preAggregations.getLoadCacheQueue(this.dataSource).executeInQueue( @@ -145,14 +144,11 @@ export class PreAggregationLoadCache { ); // It presumes strong consistency guarantees for external pre-aggregation tables ingestion if (!preAggregation.external) { - // eslint-disable-next-line - const [active, toProcess, queries] = await this.fetchQueryStageState(); + const [,, queries] = await this.fetchQueryStageState(); const targetTableNamesInQueue = (Object.keys(queries)) - // eslint-disable-next-line no-use-before-define .map(q => PreAggregations.targetTableName(queries[q].query.newVersionEntry)); versionEntries = versionEntries.filter( - // eslint-disable-next-line no-use-before-define e => targetTableNamesInQueue.indexOf(PreAggregations.targetTableName(e)) === -1 ); } From 4a4db22970e3740bfa2cc168186b869f7a657527 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Wed, 26 Feb 2025 19:16:24 +0200 Subject: [PATCH 13/14] a bit of code polishment in PreAggregationLoader class --- .../src/orchestrator/PreAggregationLoader.ts | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts index 191694c1f1d63..f559998a568e9 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts @@ -17,10 +17,12 @@ import { ContinueWaitError } from './ContinueWaitError'; import { LargeStreamWarning } from './StreamObjectsCounter'; import { getStructureVersion, - InvalidationKeys, LoadPreAggregationResult, + InvalidationKeys, + LoadPreAggregationResult, PreAggregations, tablesToVersionEntries, - version, VersionEntriesObj, + version, + VersionEntriesObj, VersionEntry } from './PreAggregations'; import { PreAggregationLoadCache } from './PreAggregationLoadCache'; @@ -56,23 +58,23 @@ export class PreAggregationLoader { * (initialized by the /cubejs-system/v1/pre-aggregations/jobs endpoint) or * not. */ - private isJob: boolean; + private readonly isJob: boolean; - private waitForRenew: boolean; + private readonly waitForRenew: boolean; - private forceBuild: boolean; + private readonly forceBuild: boolean; - private orphanedTimeout: number; + private readonly orphanedTimeout: number; - private externalDriverFactory: DriverFactory; + private readonly externalDriverFactory: DriverFactory; - private requestId: string; + private readonly requestId: string; - private metadata: any; + private readonly metadata: any; - private structureVersionPersistTime: any; + private readonly structureVersionPersistTime: any; - private externalRefresh: boolean; + private readonly externalRefresh: boolean; public constructor( private readonly driverFactory: DriverFactory, @@ -242,7 +244,7 @@ export class PreAggregationLoader { versionEntries.byStructure[`${this.preAggregation.tableName}_${structureVersion}`] || versionEntries.byTableName[this.preAggregation.tableName]; - const newVersionEntry = { + const newVersionEntry: VersionEntry = { table_name: this.preAggregation.tableName, structure_version: structureVersion, content_version: contentVersion, @@ -355,7 +357,7 @@ export class PreAggregationLoader { }); } - protected contentVersion(invalidationKeys) { + protected contentVersion(invalidationKeys: InvalidationKeys) { const versionArray = [this.preAggregation.structureVersionLoadSql || this.preAggregation.loadSql]; if (this.preAggregation.indexesSql && this.preAggregation.indexesSql.length) { versionArray.push(this.preAggregation.indexesSql); @@ -394,7 +396,7 @@ export class PreAggregationLoader { } } - protected scheduleRefresh(invalidationKeys, newVersionEntry) { + protected scheduleRefresh(invalidationKeys: InvalidationKeys, newVersionEntry: VersionEntry) { this.logger('Refreshing pre-aggregation content', { preAggregation: this.preAggregation, requestId: this.requestId, @@ -411,7 +413,7 @@ export class PreAggregationLoader { }); } - protected async executeInQueue(invalidationKeys, priority, newVersionEntry) { + protected async executeInQueue(invalidationKeys: InvalidationKeys, priority: number, newVersionEntry: VersionEntry) { const queue = await this.preAggregations.getQueue(this.preAggregation.dataSource); return queue.executeInQueue( 'query', From 626e41f9149db8622247a2a656f60445cd372291 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Thu, 27 Feb 2025 16:02:06 +0200 Subject: [PATCH 14/14] attempt to fix CacheKey type --- .../src/orchestrator/QueryCache.ts | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 63baa253cae4d..087f401106d7b 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -100,11 +100,12 @@ export type PreAggTableToTempTable = [ TempTable, ]; -export type CacheKey = Array< - | string - | string[] - | QueryTuple ->; +export type CacheKeyItem = string | string[] | QueryTuple | QueryTuple[] | undefined; + +export type CacheKey = + [CacheKeyItem, CacheKeyItem] | + [CacheKeyItem, CacheKeyItem, CacheKeyItem] | + [CacheKeyItem, CacheKeyItem, CacheKeyItem, CacheKeyItem]; type CacheEntry = { time: number; @@ -373,7 +374,7 @@ export class QueryCache { } public static queryCacheKey(queryBody: QueryBody): CacheKey { - const key = [ + const key: CacheKey = [ queryBody.query, queryBody.values, (queryBody.preAggregations || []).map(p => p.loadSql) @@ -383,7 +384,7 @@ export class QueryCache { } // @ts-ignore key.persistent = queryBody.persistent; - return key; + return key; } protected static replaceAll(replaceThis, withThis, inThis) {