Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(schema-compiler): Move transpiling to worker threads (under the flag) #9188

Merged
merged 9 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ jobs:
# Current docker version + next LTS
node-version: [20.x, 22.x]
python-version: [3.11]
transpile-worker-threads: [false, true]
fail-fast: false

env:
CUBEJS_TRANSPILATION_WORKER_THREADS: ${{ matrix.transpile-worker-threads }}
steps:
- id: get-tag-out
run: echo "$OUT"
Expand Down
6 changes: 6 additions & 0 deletions packages/cubejs-backend-shared/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ const variables: Record<string, (...args: any) => any> = {
nativeOrchestrator: () => get('CUBEJS_TESSERACT_ORCHESTRATOR')
.default('false')
.asBoolStrict(),
transpilationWorkerThreads: () => get('CUBEJS_TRANSPILATION_WORKER_THREADS')
.default('false')
.asBoolStrict(),
transpilationWorkerThreadsCount: () => get('CUBEJS_TRANSPILATION_WORKER_THREADS_COUNT')
.default('0')
.asInt(),

/** ****************************************************************
* Common db options *
Expand Down
3 changes: 2 additions & 1 deletion packages/cubejs-schema-compiler/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@
"node-dijkstra": "^2.5.0",
"ramda": "^0.27.2",
"syntax-error": "^1.3.0",
"uuid": "^8.3.2"
"uuid": "^8.3.2",
"workerpool": "^9.2.0"
},
"devDependencies": {
"@clickhouse/client": "^1.7.0",
Expand Down
4 changes: 2 additions & 2 deletions packages/cubejs-schema-compiler/src/compiler/CubeSymbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { camelizeCube } from './utils';
import { BaseQuery } from '../adapter';

const FunctionRegex = /function\s+\w+\(([A-Za-z0-9_,]*)|\(([\s\S]*?)\)\s*=>|\(?(\w+)\)?\s*=>/;
const CONTEXT_SYMBOLS = {
export const CONTEXT_SYMBOLS = {
SECURITY_CONTEXT: 'securityContext',
// SECURITY_CONTEXT has been deprecated, however security_context (lowecase)
// is allowed in RBAC policies for query-time attribute matching
Expand All @@ -19,7 +19,7 @@ const CONTEXT_SYMBOLS = {
SQL_UTILS: 'sqlUtils'
};

const CURRENT_CUBE_CONSTANTS = ['CUBE', 'TABLE'];
export const CURRENT_CUBE_CONSTANTS = ['CUBE', 'TABLE'];

export class CubeSymbols {
constructor(evaluateViews) {
Expand Down
101 changes: 78 additions & 23 deletions packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import { parse } from '@babel/parser';
import babelGenerator from '@babel/generator';
import babelTraverse from '@babel/traverse';
import R from 'ramda';
import workerpool from 'workerpool';

import { isNativeSupported } from '@cubejs-backend/shared';
import { getEnv, isNativeSupported } from '@cubejs-backend/shared';
import { UserError } from './UserError';
import { ErrorReporter } from './ErrorReporter';

Expand All @@ -28,6 +29,8 @@ export class DataSchemaCompiler {
this.viewCompilationGate = options.viewCompilationGate;
this.cubeNameCompilers = options.cubeNameCompilers || [];
this.extensions = options.extensions || {};
this.cubeDictionary = options.cubeDictionary;
this.cubeSymbols = options.cubeSymbols;
this.cubeFactory = options.cubeFactory;
this.filesToCompile = options.filesToCompile;
this.omitErrors = options.omitErrors;
Expand All @@ -40,6 +43,7 @@ export class DataSchemaCompiler {
this.yamlCompiler = options.yamlCompiler;
this.yamlCompiler.dataSchemaCompiler = this;
this.pythonContext = null;
this.workerPool = null;
}

compileObjects(compileServices, objects, errorsReport) {
Expand Down Expand Up @@ -89,10 +93,40 @@ export class DataSchemaCompiler {
const errorsReport = new ErrorReporter(null, [], this.errorReport);
this.errorsReport = errorsReport;

// TODO: required in order to get pre transpile compilation work
const transpile = () => toCompile.map(f => this.transpileFile(f, errorsReport)).filter(f => !!f);
if (getEnv('transpilationWorkerThreads')) {
const wc = getEnv('transpilationWorkerThreadsCount');
this.workerPool = workerpool.pool(
path.join(__dirname, 'transpilers/transpiler_worker'),
wc > 0 ? { maxWorkers: wc } : undefined,
);
}

const transpile = async () => {
let cubeNames;
let cubeSymbolsNames;

if (getEnv('transpilationWorkerThreads')) {
cubeNames = Object.keys(this.cubeDictionary.byId);
// We need only cubes and all its member names for transpiling.
// Cubes doesn't change during transpiling, but are changed during compilation phase,
// so we can prepare them once for every phase.
// Communication between main and worker threads uses
// The structured clone algorithm (@see https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm)
// which doesn't allow passing any function objects, so we need to sanitize the symbols.
cubeSymbolsNames = Object.fromEntries(
Object.entries(this.cubeSymbols.symbols)
.map(
([key, value]) => [key, Object.fromEntries(
Object.keys(value).map((k) => [k, true]),
)],
),
);
}
const results = await Promise.all(toCompile.map(f => this.transpileFile(f, errorsReport, { cubeNames, cubeSymbolsNames })));
return results.filter(f => !!f);
};

const compilePhase = (compilers) => this.compileCubeFiles(compilers, transpile(), errorsReport);
const compilePhase = async (compilers) => this.compileCubeFiles(compilers, await transpile(), errorsReport);

return compilePhase({ cubeCompilers: this.cubeNameCompilers })
.then(() => compilePhase({ cubeCompilers: this.preTranspileCubeCompilers.concat([this.viewCompilationGate]) }))
Expand All @@ -102,7 +136,12 @@ export class DataSchemaCompiler {
.then(() => compilePhase({
cubeCompilers: this.cubeCompilers,
contextCompilers: this.contextCompilers,
}));
}))
.then(() => {
if (this.workerPool) {
this.workerPool.terminate();
}
});
}

compile() {
Expand All @@ -118,7 +157,7 @@ export class DataSchemaCompiler {
return this.compilePromise;
}

transpileFile(file, errorsReport) {
async transpileFile(file, errorsReport, options) {
if (R.endsWith('.jinja', file.fileName) ||
(R.endsWith('.yml', file.fileName) || R.endsWith('.yaml', file.fileName))
// TODO do Jinja syntax check with jinja compiler
Expand All @@ -137,31 +176,47 @@ export class DataSchemaCompiler {
} else if (R.endsWith('.yml', file.fileName) || R.endsWith('.yaml', file.fileName)) {
return file;
} else if (R.endsWith('.js', file.fileName)) {
return this.transpileJsFile(file, errorsReport);
return this.transpileJsFile(file, errorsReport, options);
} else {
return file;
}
}

transpileJsFile(file, errorsReport) {
async transpileJsFile(file, errorsReport, { cubeNames, cubeSymbolsNames }) {
try {
const ast = parse(
file.content,
{
sourceFilename: file.fileName,
sourceType: 'module',
plugins: ['objectRestSpread']
},
);
if (getEnv('transpilationWorkerThreads')) {
const data = {
fileName: file.fileName,
content: file.content,
transpilers: this.transpilers.map(t => t.constructor.name),
cubeNames,
cubeSymbolsNames,
};

const res = await this.workerPool.exec('transpile', [data]);
errorsReport.addErrors(res.errors);
errorsReport.addWarnings(res.warnings);

return Object.assign({}, file, { content: res.content });
} else {
const ast = parse(
file.content,
{
sourceFilename: file.fileName,
sourceType: 'module',
plugins: ['objectRestSpread'],
},
);

this.transpilers.forEach((t) => {
errorsReport.inFile(file);
babelTraverse(ast, t.traverseObject(errorsReport));
errorsReport.exitFile();
});
this.transpilers.forEach((t) => {
errorsReport.inFile(file);
babelTraverse(ast, t.traverseObject(errorsReport));
errorsReport.exitFile();
});

const content = babelGenerator(ast, {}, file.content).code;
return Object.assign({}, file, { content });
const content = babelGenerator(ast, {}, file.content).code;
return Object.assign({}, file, { content });
}
} catch (e) {
if (e.toString().indexOf('SyntaxError') !== -1) {
const line = file.content.split('\n')[e.loc.line - 1];
Expand Down
16 changes: 16 additions & 0 deletions packages/cubejs-schema-compiler/src/compiler/ErrorReporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,22 @@ export class ErrorReporter {
}
}

public getErrors() {
return this.rootReporter().errors;
}

public addErrors(errors: CompilerErrorInterface[]) {
this.rootReporter().errors.push(...errors);
}

public getWarnings() {
return this.rootReporter().warnings;
}

public addWarnings(warnings: SyntaxErrorInterface[]) {
this.rootReporter().warnings.push(...warnings);
}

protected rootReporter(): ErrorReporter {
return this.parent ? this.parent.rootReporter() : this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ export const prepareCompiler = (repo: SchemaFileRepository, options: PrepareComp
contextCompilers: [contextEvaluator],
cubeFactory: cubeSymbols.createCube.bind(cubeSymbols),
compilerCache,
cubeDictionary,
cubeSymbols,
extensions: {
Funnels,
RefreshKeys,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ import * as t from '@babel/types';
import R from 'ramda';

import type { NodePath } from '@babel/traverse';
import type { TranspilerInterface, TraverseObject } from './transpiler.interface';
import {
TranspilerCubeResolver,
TranspilerInterface,
TranspilerSymbolResolver,
TraverseObject
} from './transpiler.interface';
import type { CubeSymbols } from '../CubeSymbols';
import type { CubeDictionary } from '../CubeDictionary';

Expand Down Expand Up @@ -39,9 +44,9 @@ transpiledFieldsPatterns?.forEach((r) => {

export class CubePropContextTranspiler implements TranspilerInterface {
public constructor(
protected readonly cubeSymbols: CubeSymbols,
protected readonly cubeDictionary: CubeDictionary,
protected readonly viewCompiler: CubeSymbols,
protected readonly cubeSymbols: TranspilerSymbolResolver,
protected readonly cubeDictionary: TranspilerCubeResolver,
protected readonly viewCompiler: TranspilerSymbolResolver,
) {
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { TranspilerCubeResolver } from './transpiler.interface';

export class LightweightNodeCubeDictionary implements TranspilerCubeResolver {
public constructor(private cubeNames: string[] = []) {
}

public resolveCube(name: string): boolean {
return this.cubeNames.includes(name);
}

public setCubeNames(cubeNames: string[]): void {
this.cubeNames = cubeNames;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { TranspilerSymbolResolver } from './transpiler.interface';
import { CONTEXT_SYMBOLS, CURRENT_CUBE_CONSTANTS } from '../CubeSymbols';

type CubeSymbols = Record<string, Record<string, boolean>>;

export class LightweightSymbolResolver implements TranspilerSymbolResolver {
public constructor(private symbols: CubeSymbols = {}) {
}

public setSymbols(symbols: CubeSymbols) {
this.symbols = symbols;
}

public isCurrentCube(name): boolean {
return CURRENT_CUBE_CONSTANTS.indexOf(name) >= 0;
}

public resolveSymbol(cubeName, name): any {
if (name === 'USER_CONTEXT') {
throw new Error('Support for USER_CONTEXT was removed, please migrate to SECURITY_CONTEXT.');
}

if (CONTEXT_SYMBOLS[name]) {
return true;
}

const cube = this.symbols[this.isCurrentCube(name) ? cubeName : name];
return cube || (this.symbols[cubeName] && this.symbols[cubeName][name]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,12 @@ export type TraverseObject = TraverseOptions;
export interface TranspilerInterface {
traverseObject(reporter: ErrorReporter): TraverseObject;
}

export interface TranspilerSymbolResolver {
resolveSymbol(cubeName, name): any;
isCurrentCube(name): boolean;
}

export interface TranspilerCubeResolver {
resolveCube(name): boolean;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import workerpool from 'workerpool';
import { parse } from '@babel/parser';
import babelGenerator from '@babel/generator';
import babelTraverse from '@babel/traverse';

import { ValidationTranspiler } from './ValidationTranspiler';
import { ImportExportTranspiler } from './ImportExportTranspiler';
import { CubeCheckDuplicatePropTranspiler } from './CubeCheckDuplicatePropTranspiler';
import { CubePropContextTranspiler } from './CubePropContextTranspiler';
import { ErrorReporter } from '../ErrorReporter';
import { LightweightSymbolResolver } from './LightweightSymbolResolver';
import { LightweightNodeCubeDictionary } from './LightweightNodeCubeDictionary';

type TransferContent = {
fileName: string;
content: string;
transpilers: string[];
cubeNames: string[];
cubeSymbolsNames: Record<string, Record<string, boolean>>;
};

const cubeDictionary = new LightweightNodeCubeDictionary();
const cubeSymbols = new LightweightSymbolResolver();
const errorsReport = new ErrorReporter(null, []);

const transpilers = {
ValidationTranspiler: new ValidationTranspiler(),
ImportExportTranspiler: new ImportExportTranspiler(),
CubeCheckDuplicatePropTranspiler: new CubeCheckDuplicatePropTranspiler(),
CubePropContextTranspiler: new CubePropContextTranspiler(cubeSymbols, cubeDictionary, cubeSymbols),
};

const transpile = (data: TransferContent) => {
cubeDictionary.setCubeNames(data.cubeNames);
cubeSymbols.setSymbols(data.cubeSymbolsNames);

const ast = parse(
data.content,
{
sourceFilename: data.fileName,
sourceType: 'module',
plugins: ['objectRestSpread']
},
);

data.transpilers.forEach(transpilerName => {
if (transpilers[transpilerName]) {
errorsReport.inFile(data);
babelTraverse(ast, transpilers[transpilerName].traverseObject(errorsReport));
errorsReport.exitFile();
} else {
throw new Error(`Transpiler ${transpilerName} not supported`);
}
});

const content = babelGenerator(ast, {}, data.content).code;

return {
content,
errors: errorsReport.getErrors(),
warnings: errorsReport.getWarnings()
};
};

workerpool.worker({
transpile,
});
Loading
Loading