Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(query-orchestrator): Extract Pre-aggregation-related classes to separate files #9274

Merged
merged 14 commits into from
Feb 27, 2025
Merged
2 changes: 1 addition & 1 deletion packages/cubejs-backend-shared/src/promises.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ export interface CancelableIntervalOptions {
}

/**
* It's helps to create an interval that can be canceled with awaiting latest execution
* It helps to create an interval that can be canceled with awaiting latest execution
*/
export function createCancelableInterval<T>(
fn: (token: CancelToken) => Promise<T>,
Expand Down
16 changes: 8 additions & 8 deletions packages/cubejs-backend-shared/src/time.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { DateRange, extendMoment } from 'moment-range';
import { unitOfTime } from 'moment-timezone';
import type { unitOfTime } from 'moment-timezone';
import type { DateRange } from 'moment-range';
import Moment from 'moment-timezone';
import { extendMoment } from 'moment-range';

const Moment = require('moment-timezone');

const moment = extendMoment(Moment);
const moment = extendMoment(Moment as any);

export type QueryDateRange = [string, string];
type SqlInterval = string;
Expand Down Expand Up @@ -236,11 +236,11 @@ export const utcToLocalTimeZone = (timezone: string, timestampFormat: string, ti
const parsedTime = Date.parse(`${timestamp}Z`);
// TODO parsedTime might be incorrect offset for conversion
const offset = zone.utcOffset(parsedTime);
const inDbTimeZoneDate = new Date(parsedTime - offset * 60 * 1000);
const localTimeZoneDate = new Date(parsedTime - offset * 60 * 1000);
if (timestampFormat === 'YYYY-MM-DD[T]HH:mm:ss.SSS[Z]' || timestampFormat === 'YYYY-MM-DDTHH:mm:ss.SSSZ') {
return inDbTimeZoneDate.toJSON();
return localTimeZoneDate.toJSON();
} else if (timestampFormat === 'YYYY-MM-DDTHH:mm:ss.SSS') {
return inDbTimeZoneDate.toJSON().replace('Z', '');
return localTimeZoneDate.toJSON().replace('Z', '');
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
import { TableStructure } from '@cubejs-backend/base-driver';
import { DriverFactory } from './DriverFactory';
import { QueryCache, QueryTuple, QueryWithParams } from './QueryCache';
import {
PreAggregationDescription,
PreAggregations,
TableCacheEntry,
tablesToVersionEntries,
VersionEntriesObj,
VersionEntry
} from './PreAggregations';

/**
 * Construction options for `PreAggregationLoadCache`.
 */
type PreAggregationLoadCacheOptions = {
  /** Identifier forwarded to queue executions and query-cache calls made by the load cache. */
  requestId?: string;
  /** Data source whose queues the load cache talks to. */
  dataSource: string;
  /**
   * When set, version entries may only be requested for tables whose name
   * (the part after the schema) starts with one of these prefixes.
   */
  tablePrefixes?: string[];
};

/**
 * In-memory memoization layer used while resolving pre-aggregation tables,
 * their version entries, column types, key-query results and the queue stage
 * state for one data source.
 *
 * Every lookup is first served from the instance's own maps; only on a miss
 * does it go to the shared cache driver, the load-cache queue or the driver.
 * Call `reset()` to drop everything that has been memoized so far.
 */
export class PreAggregationLoadCache {
  private readonly driverFactory: DriverFactory;

  private queryCache: QueryCache;

  private preAggregations: PreAggregations;

  /** Key-query results memoized by `queryCache.queryRedisKey([query, values])`. */
  private readonly queryResults: any;

  private readonly externalDriverFactory: any;

  private readonly requestId: any;

  /** In-flight or resolved version entries, keyed by `tablesCachePrefixKey()`. */
  private versionEntries: { [redisKey: string]: Promise<VersionEntriesObj> };

  /** Table listings keyed by `tablesCachePrefixKey()`. */
  private tables: { [redisKey: string]: TableCacheEntry[] };

  /** Column types keyed first by `tablesCachePrefixKey()` and then by table name. */
  private tableColumnTypes: { [cacheKey: string]: { [tableName: string]: TableStructure } };

  // TODO this is an in-memory cache structure as well, however it depends on
  // the data source only and the load cache is per data source for now.
  // Make it per data source key in case the load cache scope is broadened.
  private queryStageState: any;

  private readonly dataSource: string;

  private readonly tablePrefixes: string[] | null;

  /**
   * @param clientFactory Factory for the driver used with non-external pre-aggregations.
   * @param queryCache Query cache providing the cache driver, key helpers and `cacheQueryResult`.
   * @param preAggregations Owner object providing options, queues and `externalDriverFactory`.
   * @param options See `PreAggregationLoadCacheOptions`; defaults to the `'default'` data source.
   */
  public constructor(
    clientFactory: DriverFactory,
    queryCache,
    preAggregations,
    options: PreAggregationLoadCacheOptions = { dataSource: 'default' }
  ) {
    this.dataSource = options.dataSource;
    this.driverFactory = clientFactory;
    this.queryCache = queryCache;
    this.preAggregations = preAggregations;
    this.queryResults = {};
    this.externalDriverFactory = preAggregations.externalDriverFactory;
    this.requestId = options.requestId;
    // Normalise "not provided" to null so the field honours its declared type.
    this.tablePrefixes = options.tablePrefixes ?? null;
    this.versionEntries = {};
    this.tables = {};
    this.tableColumnTypes = {};
  }

  /**
   * Reads the table listing from the cache driver and, when it is missing or
   * `forceRenew` is set, obtains it by executing a `'query'` job in the
   * load-cache queue of this data source.
   */
  protected async tablesFromCache(preAggregation, forceRenew: boolean = false) {
    let tables = forceRenew ? null : await this.queryCache.getCacheDriver().get(this.tablesCachePrefixKey(preAggregation));
    if (!tables) {
      tables = await this.preAggregations.getLoadCacheQueue(this.dataSource).executeInQueue(
        'query',
        `Fetch tables for ${preAggregation.preAggregationsSchema}`,
        {
          preAggregation, requestId: this.requestId
        },
        0,
        { requestId: this.requestId }
      );
    }
    return tables;
  }

  /**
   * Fetches the table listing straight from the driver and stores it in the
   * cache driver under `tablesCachePrefixKey()`; the expiry comes from
   * `preAggregationsSchemaCacheExpire`, falling back to `60 * 60`.
   *
   * @throws Error when the pre-aggregation is external but no external driver factory is configured.
   */
  public async fetchTables(preAggregation: PreAggregationDescription) {
    if (preAggregation.external && !this.externalDriverFactory) {
      throw new Error('externalDriverFactory is not provided. Please use CUBEJS_DEV_MODE=true or provide Cube Store connection env variables for production usage.');
    }

    const newTables = await this.fetchTablesNoCache(preAggregation);
    await this.queryCache.getCacheDriver().set(
      this.tablesCachePrefixKey(preAggregation),
      newTables,
      this.preAggregations.options.preAggregationsSchemaCacheExpire || 60 * 60
    );
    return newTables;
  }

  /**
   * Asks the external or regular driver for the tables of the pre-aggregation
   * schema. The prefix-filtered query is used only when table prefixes are
   * configured, the driver implements `getPrefixTablesQuery` and
   * `skipExternalCacheAndQueue` is enabled.
   */
  private async fetchTablesNoCache(preAggregation: PreAggregationDescription) {
    const client = preAggregation.external ?
      await this.externalDriverFactory() :
      await this.driverFactory();
    if (this.tablePrefixes && client.getPrefixTablesQuery && this.preAggregations.options.skipExternalCacheAndQueue) {
      return client.getPrefixTablesQuery(preAggregation.preAggregationsSchema, this.tablePrefixes);
    }
    return client.getTablesQuery(preAggregation.preAggregationsSchema);
  }

  /**
   * Builds the cache key shared by the table listing, version entries and
   * column types of a pre-aggregation: data source + schema, suffixed with
   * `_EXT` for external pre-aggregations.
   */
  public tablesCachePrefixKey(preAggregation: PreAggregationDescription) {
    return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES', `${preAggregation.dataSource}${preAggregation.preAggregationsSchema}${preAggregation.external ? '_EXT' : ''}`);
  }

  /**
   * Returns the memoized table listing, loading it on first use. External
   * pre-aggregations bypass the cache and queue when
   * `skipExternalCacheAndQueue` is enabled.
   *
   * @throws Error when the loaded listing is `undefined`.
   */
  protected async getTablesQuery(preAggregation) {
    const redisKey = this.tablesCachePrefixKey(preAggregation);
    if (!this.tables[redisKey]) {
      const tables = this.preAggregations.options.skipExternalCacheAndQueue && preAggregation.external ?
        await this.fetchTablesNoCache(preAggregation) :
        await this.tablesFromCache(preAggregation);
      if (tables === undefined) {
        throw new Error('Pre-aggregation tables are undefined.');
      }
      this.tables[redisKey] = tables;
    }
    return this.tables[redisKey];
  }

  /**
   * Returns the memoized column types of `tableName`, querying the external
   * driver on first use.
   *
   * @throws Error when the pre-aggregation is external while `skipExternalCacheAndQueue` is not enabled.
   */
  public async getTableColumnTypes(preAggregation: PreAggregationDescription, tableName: string): Promise<TableStructure> {
    const prefixKey = this.tablesCachePrefixKey(preAggregation);
    if (!this.tableColumnTypes[prefixKey]?.[tableName]) {
      if (!this.preAggregations.options.skipExternalCacheAndQueue && preAggregation.external) {
        throw new Error(`Lambda union with source data feature is supported only by external rollups stored in Cube Store but was invoked for '${preAggregation.preAggregationId}'`);
      }
      const client = await this.externalDriverFactory();
      const columnTypes = await client.tableColumnTypes(tableName);
      if (!this.tableColumnTypes[prefixKey]) {
        this.tableColumnTypes[prefixKey] = {};
      }
      this.tableColumnTypes[prefixKey][tableName] = columnTypes;
    }
    return this.tableColumnTypes[prefixKey][tableName];
  }

  /**
   * Converts the table listing into version entries and indexes them by
   * content version, structure version and table name. For each index the
   * first entry encountered for a key wins.
   */
  private async calculateVersionEntries(preAggregation): Promise<VersionEntriesObj> {
    let versionEntries = tablesToVersionEntries(
      preAggregation.preAggregationsSchema,
      await this.getTablesQuery(preAggregation)
    );
    // It presumes strong consistency guarantees for external pre-aggregation tables ingestion
    if (!preAggregation.external) {
      const [,, queries] = await this.fetchQueryStageState();
      // Tables targeted by queries that are still in the queue are excluded;
      // a Set keeps the membership test constant-time per entry.
      const targetTableNamesInQueue = new Set(
        Object.keys(queries).map(q => PreAggregations.targetTableName(queries[q].query.newVersionEntry))
      );

      versionEntries = versionEntries.filter(
        e => !targetTableNamesInQueue.has(PreAggregations.targetTableName(e))
      );
    }

    const byContent: { [key: string]: VersionEntry } = {};
    const byStructure: { [key: string]: VersionEntry } = {};
    const byTableName: { [key: string]: VersionEntry } = {};

    versionEntries.forEach(e => {
      const contentKey = `${e.table_name}_${e.content_version}`;
      if (!byContent[contentKey]) {
        byContent[contentKey] = e;
      }
      const structureKey = `${e.table_name}_${e.structure_version}`;
      if (!byStructure[structureKey]) {
        byStructure[structureKey] = e;
      }
      if (!byTableName[e.table_name]) {
        byTableName[e.table_name] = e;
      }
    });

    return { versionEntries, byContent, byStructure, byTableName };
  }

  /**
   * Returns the version entries for the pre-aggregation's schema. The
   * computation is shared between concurrent callers through a memoized
   * promise; a failed computation is evicted so that a later call retries.
   *
   * @throws Error when table prefixes are configured and the table name (the
   *   segment after the first `.`) starts with none of them. A table name
   *   without a schema qualifier is treated as matching no prefix.
   */
  public async getVersionEntries(preAggregation): Promise<VersionEntriesObj> {
    if (this.tablePrefixes) {
      // `?.` guards against unqualified names, where split('.')[1] is undefined.
      const unqualifiedName: string | undefined = preAggregation.tableName.split('.')[1];
      const allowed = this.tablePrefixes.some(p => unqualifiedName?.startsWith(p));
      if (!allowed) {
        throw new Error(`Load cache tries to load table ${preAggregation.tableName} outside of tablePrefixes filter: ${this.tablePrefixes.join(', ')}`);
      }
    }
    const redisKey = this.tablesCachePrefixKey(preAggregation);
    if (!this.versionEntries[redisKey]) {
      this.versionEntries[redisKey] = this.calculateVersionEntries(preAggregation).catch(e => {
        delete this.versionEntries[redisKey];
        throw e;
      });
    }
    return this.versionEntries[redisKey];
  }

  /**
   * Returns the result of a key query, memoized per `[query, values]` pair.
   * On a miss the query goes through `queryCache.cacheQueryResult`. The
   * renewal threshold is `refreshKeyRenewalThreshold` from the query cache
   * options, then the query's own `renewalThreshold`, then `2 * 60`.
   *
   * @param sqlQuery Either a bare SQL string or a `[query, values, options]` tuple.
   * @param waitForRenew Forwarded to `cacheQueryResult`.
   * @param priority Forwarded to `cacheQueryResult`.
   */
  public async keyQueryResult(sqlQuery: QueryWithParams, waitForRenew: boolean, priority: number) {
    const [query, values, queryOptions]: QueryTuple = Array.isArray(sqlQuery) ? sqlQuery : [sqlQuery, [], {}];

    // The key is derived once and reused for the lookup, the store and the read-back.
    const resultKey = this.queryCache.queryRedisKey([query, values]);

    if (!this.queryResults[resultKey]) {
      this.queryResults[resultKey] = await this.queryCache.cacheQueryResult(
        query,
        <string[]>values,
        [query, <string[]>values],
        60 * 60,
        {
          renewalThreshold: this.queryCache.options.refreshKeyRenewalThreshold
            || queryOptions?.renewalThreshold || 2 * 60,
          renewalKey: [query, values],
          waitForRenew,
          priority,
          requestId: this.requestId,
          dataSource: this.dataSource,
          useInMemory: true,
          external: queryOptions?.external
        }
      );
    }
    return this.queryResults[resultKey];
  }

  /** Tells whether a truthy result for `keyQuery` is already memoized. */
  public hasKeyQueryResult(keyQuery) {
    return !!this.queryResults[this.queryCache.queryRedisKey(keyQuery)];
  }

  /**
   * Resolves the stage of `stageQueryKey` in this data source's queue, using
   * the memoized queue stage state.
   */
  public async getQueryStage(stageQueryKey) {
    const queue = await this.preAggregations.getQueue(this.dataSource);
    await this.fetchQueryStageState(queue);
    return queue.getQueryStage(stageQueryKey, undefined, this.queryStageState);
  }

  /**
   * Returns the queue stage state, fetching it from the queue only the first
   * time (or the first time after `reset()`).
   *
   * @param queue Queue to read from; defaults to the queue of this data source.
   */
  protected async fetchQueryStageState(queue?) {
    queue = queue || await this.preAggregations.getQueue(this.dataSource);
    if (!this.queryStageState) {
      this.queryStageState = await queue.fetchQueryStageState();
    }
    return this.queryStageState;
  }

  /**
   * Forces a renewal of the table listing through the load-cache queue and
   * then drops every in-memory structure except the key-query results.
   */
  public async reset(preAggregation) {
    await this.tablesFromCache(preAggregation, true);
    this.tables = {};
    this.tableColumnTypes = {};
    this.queryStageState = undefined;
    this.versionEntries = {};
  }
}
Loading
Loading