
Commit

fix: Merge streaming methods into one interface to let the SQL API use batching, plus a Databricks batching implementation (#7695)

* fix: Merge streaming methods into one interface to let the SQL API use the standard batching used for pre-aggregations

* Fix tests

* Try mssql without incremental schema loading

* Move incremental schema loading test inside main full suite

* Protection from unmatched JDBC types

* Fix linter

* Fix linter

* Add missing return on error

* Add missing rolling window snapshots

* Test out different startup message for MSSQL

* Databricks batching tests

* Databricks batching tests

* Linter

* Update snapshots

* Move bigdecimal type handling to Cube Store Driver
paveltiunov authored Jan 28, 2024
1 parent e21c8da commit 73ad72d
Showing 20 changed files with 8,173 additions and 169 deletions.
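
At a glance: the per-driver `streamQuery` methods are folded into the single `stream` method of the driver interface, which returns the row stream together with an explicit `release` callback and optional column `types`, so the SQL API and pre-aggregation batching share one code path. A minimal consumer sketch, assuming the result shape used in the `JDBCDriver` and `QueryCache` hunks below (the interface here is illustrative, not the exact `DriverInterface` typings):

```typescript
import { Readable } from 'stream';

// Assumed shape of the unified stream() result, per the diffs below.
interface StreamResult {
  rowStream: Readable;                        // rows, in objectMode
  release?: () => Promise<void>;              // returns the connection to the pool
  types?: { name: string; type: string }[];   // column metadata, when the driver provides it
}

// Hypothetical consumer: drain the rows, then release the connection exactly once.
async function drain(result: StreamResult): Promise<unknown[]> {
  const rows: unknown[] = [];
  try {
    for await (const row of result.rowStream) {
      rows.push(row);
    }
  } finally {
    await result.release?.();
  }
  return rows;
}
```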
1 change: 1 addition & 0 deletions .github/workflows/drivers-tests.yml
@@ -108,6 +108,7 @@ jobs:
- bigquery
- clickhouse
- databricks-jdbc
- databricks-jdbc-export-bucket
- mssql
- mysql
- postgres
2 changes: 2 additions & 0 deletions packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts
@@ -28,6 +28,8 @@ const GenericTypeToCubeStore: Record<string, string> = {
// Cube Store uses an old version of sql parser which doesn't support timestamp with custom precision, but
// athena driver (I believe old version) allowed to use it
'timestamp(3)': 'timestamp',
// TODO comes from JDBC. We might consider decimal96 here
bigdecimal: 'decimal'
};

type Column = {
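
The new `bigdecimal` entry matters because the mapping above normalizes generic column types before Cube Store tables are created; without it, a JDBC `BIGDECIMAL` column had no Cube Store counterpart. A tiny sketch of the lookup (the table is from the diff; the `toCubeStoreType` helper is hypothetical):

```typescript
const GenericTypeToCubeStore: Record<string, string> = {
  'timestamp(3)': 'timestamp',
  bigdecimal: 'decimal', // comes from JDBC; decimal96 is a possible future alternative
};

// Hypothetical helper: use the override when one exists, otherwise pass the type through.
function toCubeStoreType(genericType: string): string {
  return GenericTypeToCubeStore[genericType.toLowerCase()] ?? genericType;
}

console.log(toCubeStoreType('bigdecimal')); // 'decimal'
console.log(toCubeStoreType('varchar'));    // 'varchar' (unchanged)
```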
51 changes: 31 additions & 20 deletions packages/cubejs-jdbc-driver/src/JDBCDriver.ts
@@ -11,7 +11,12 @@ import {
assertDataSource,
CancelablePromise,
} from '@cubejs-backend/shared';
import { BaseDriver } from '@cubejs-backend/base-driver';
import {
BaseDriver,
DownloadQueryResultsOptions,
DownloadQueryResultsResult,
StreamOptions,
} from '@cubejs-backend/base-driver';
import * as SqlString from 'sqlstring';
import { promisify } from 'util';
import genericPool, { Factory, Pool } from 'generic-pool';
@@ -261,7 +266,7 @@ export class JDBCDriver extends BaseDriver {
}
}

public async streamQuery(sql: string, values: string[]): Promise<Readable> {
public async stream(sql: string, values: unknown[], { highWaterMark }: StreamOptions): Promise<DownloadQueryResultsResult> {
const conn = await this.pool.acquire();
const query = applyParams(sql, values);
const cancelObj: {cancel?: Function} = {};
@@ -275,7 +280,7 @@

const executeQuery = promisify(statement.execute.bind(statement));
const resultSet = await executeQuery(query);
return new Promise((resolve, reject) => {
return (await new Promise((resolve, reject) => {
resultSet.toObjectIter(
(
err: unknown,
@@ -285,25 +290,24 @@
rows: { next: nextFn },
},
) => {
if (err) reject(err);
const rowsStream = new QueryStream(res.rows.next);
let connectionReleased = false;
const cleanup = (e?: Error) => {
if (!connectionReleased) {
this.pool.release(conn);
connectionReleased = true;
}
if (!rowsStream.destroyed) {
rowsStream.destroy(e);
}
};
rowsStream.once('end', cleanup);
rowsStream.once('error', cleanup);
rowsStream.once('close', cleanup);
resolve(rowsStream);
if (err) {
reject(err);
return;
}
const rowStream = new QueryStream(res.rows.next, highWaterMark);
resolve({
rowStream,
release: () => this.pool.release(conn),
types: res.types.map(
(t, i) => ({
name: res.labels[i],
type: this.toGenericType(((t === -5 ? 'bigint' : resultSet._types[t]) || 'string').toLowerCase())
})
)
});
}
);
});
}));
} catch (ex: any) {
await this.pool.release(conn);
if (ex.cause) {
@@ -314,6 +318,13 @@
}
}

public async downloadQueryResults(query: string, values: unknown[], options: DownloadQueryResultsOptions): Promise<DownloadQueryResultsResult> {
if (options.streamImport) {
return this.stream(query, values, options);
}
return super.downloadQueryResults(query, values, options);
}

protected async executeStatement(conn: any, query: any, cancelObj?: any) {
const createStatementAsync = promisify(conn.createStatement.bind(conn));
const statement = await createStatementAsync();
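
Two details in the hunks above are easy to miss. First, `downloadQueryResults` now routes to `stream` whenever `streamImport` is set, so batch download and streaming share one entry point. Second, JDBC reports column types as `java.sql.Types` integer codes; `-5` is `java.sql.Types.BIGINT`, and any code the driver's type table cannot resolve falls back to `string`, which is the "protection from unmatched JDBC types" in the commit message. A standalone sketch of that resolution expression (`typesByCode` stands in for `resultSet._types`):

```typescript
// java.sql.Types.BIGINT is the integer code -5 in the JDBC spec.
const JDBC_BIGINT = -5;

// Standalone version of the type-resolution expression from the diff above.
function resolveJdbcType(code: number, typesByCode: Record<number, string | undefined>): string {
  const raw = code === JDBC_BIGINT ? 'bigint' : typesByCode[code];
  return (raw || 'string').toLowerCase();
}

console.log(resolveJdbcType(-5, {}));                 // 'bigint'
console.log(resolveJdbcType(12, { 12: 'varchar' }));  // 'varchar'
console.log(resolveJdbcType(999, {}));                // 'string' (unmatched JDBC type)
```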
4 changes: 2 additions & 2 deletions packages/cubejs-jdbc-driver/src/QueryStream.ts
@@ -17,10 +17,10 @@ export class QueryStream extends Readable {
/**
* @constructor
*/
public constructor(nextFn: nextFn) {
public constructor(nextFn: nextFn, highWaterMark: number) {
super({
objectMode: true,
highWaterMark: getEnv('dbQueryStreamHighWaterMark'),
highWaterMark,
});
this.next = nextFn;
}
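
Design note: `QueryStream` no longer reads `getEnv('dbQueryStreamHighWaterMark')` itself; the caller injects `highWaterMark` (see the `QueryCache` hunk below), which keeps the stream reusable with different buffer sizes. A simplified sketch of such an objectMode pull stream, with hypothetical signatures:

```typescript
import { Readable } from 'stream';

// Hypothetical row iterator: returns the next row, or null when exhausted.
type NextFn = () => { value: unknown } | null;

// Simplified QueryStream sketch: pulls one row per _read() call;
// the highWaterMark (row buffer size) is injected by the caller.
class RowStream extends Readable {
  public constructor(private nextFn: NextFn, highWaterMark: number) {
    super({ objectMode: true, highWaterMark });
  }

  public _read(): void {
    const row = this.nextFn();
    if (row === null) {
      this.push(null); // end of the result set
    } else {
      this.push(row.value);
    }
  }
}

// Usage: stream three rows, buffering at most 64 rows before backpressure.
const iter = [{ id: 1 }, { id: 2 }, { id: 3 }][Symbol.iterator]();
const next: NextFn = () => {
  const r = iter.next();
  return r.done ? null : { value: r.value };
};
new RowStream(next, 64).on('data', (row) => console.log(row));
```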
25 changes: 0 additions & 25 deletions packages/cubejs-postgres-driver/src/PostgresDriver.ts
@@ -256,31 +256,6 @@ export class PostgresDriver<Config extends PostgresDriverConfiguration = Postgre
});
}

public async streamQuery(sql: string, values: string[]): Promise<QueryStream> {
const conn = await this.pool.connect();
try {
await this.prepareConnection(conn);
const query: QueryStream = new QueryStream(sql, values, {
types: { getTypeParser: this.getTypeParser },
highWaterMark: getEnv('dbQueryStreamHighWaterMark'),
});
const rowsStream: QueryStream = await conn.query(query);
const cleanup = (err?: Error) => {
if (!rowsStream.destroyed) {
conn.release();
rowsStream.destroy(err);
}
};
rowsStream.once('end', cleanup);
rowsStream.once('error', cleanup);
rowsStream.once('close', cleanup);
return rowsStream;
} catch (e) {
await conn.release();
throw e;
}
}

public async stream(
query: string,
values: unknown[],
31 changes: 19 additions & 12 deletions packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts
@@ -2,9 +2,15 @@ import crypto from 'crypto';
import csvWriter from 'csv-write-stream';
import LRUCache from 'lru-cache';
import { pipeline } from 'stream';
import { MaybeCancelablePromise, streamToArray } from '@cubejs-backend/shared';
import { getEnv, MaybeCancelablePromise, streamToArray } from '@cubejs-backend/shared';
import { CubeStoreCacheDriver, CubeStoreDriver } from '@cubejs-backend/cubestore-driver';
import { BaseDriver, InlineTables, CacheDriverInterface, TableStructure } from '@cubejs-backend/base-driver';
import {
BaseDriver,
InlineTables,
CacheDriverInterface,
TableStructure,
DriverInterface,
} from '@cubejs-backend/base-driver';

import { QueryQueue } from './QueryQueue';
import { ContinueWaitError } from './ContinueWaitError';
@@ -605,17 +611,18 @@ export class QueryCache {
let logged = false;
Promise
.all([clientFactory()])
// TODO use stream method instead
.then(([client]) => client.streamQuery(req.query, req.values))
.then(([client]) => (<DriverInterface>client).stream(req.query, req.values, { highWaterMark: getEnv('dbQueryStreamHighWaterMark') }))
.then((source) => {
const cleanup = (error) => {
if (error && !source.destroyed) {
source.destroy(error);
const cleanup = async (error) => {
if (source.release) {
const toRelease = source.release;
delete source.release;
await toRelease();
}
if (error && !target.destroyed) {
target.destroy(error);
}
if (!logged && source.destroyed && target.destroyed) {
if (!logged && target.destroyed) {
logged = true;
if (error) {
queue.logger('Streaming done with error', {
@@ -633,15 +640,15 @@
}
};

source.once('end', () => cleanup(undefined));
source.once('error', cleanup);
source.once('close', () => cleanup(undefined));
source.rowStream.once('end', () => cleanup(undefined));
source.rowStream.once('error', cleanup);
source.rowStream.once('close', () => cleanup(undefined));

target.once('end', () => cleanup(undefined));
target.once('error', cleanup);
target.once('close', () => cleanup(undefined));

source.pipe(target);
source.rowStream.pipe(target);
})
.catch((reason) => {
target.emit('error', reason);
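
The reworked `cleanup` above must tolerate being called several times, since `end`, `error`, and `close` can each fire on either stream, yet it must release the driver connection only once. It does so by clearing `source.release` before awaiting it. A condensed sketch of that idempotent-release pattern (types are illustrative):

```typescript
interface StreamSource {
  release?: () => Promise<void>;
}

// Release the underlying connection at most once, however many
// stream events ('end', 'error', 'close') trigger cleanup.
async function releaseOnce(source: StreamSource): Promise<void> {
  if (source.release) {
    const toRelease = source.release;
    delete source.release; // clear first, so overlapping calls skip this branch
    await toRelease();
  }
}
```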
@@ -133,8 +133,10 @@ class MockDriver {
return {};
}

async streamQuery(sql) {
return Readable.from((await this.query(sql)).map(r => (typeof r === 'string' ? { query: r } : r)));
async stream(sql) {
return {
rowStream: Readable.from((await this.query(sql)).map(r => (typeof r === 'string' ? { query: r } : r)))
};
}
}

80 changes: 60 additions & 20 deletions packages/cubejs-testing-drivers/fixtures/databricks-jdbc.json
@@ -1,4 +1,17 @@
{
"extendedEnvs": {
"export-bucket": {
"cube": {
"environment": {
"CUBEJS_DB_EXPORT_BUCKET_TYPE": "s3",
"CUBEJS_DB_EXPORT_BUCKET": "s3://databricks-drivers-tests-preaggs",
"CUBEJS_DB_EXPORT_BUCKET_AWS_KEY": "${DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AWS_KEY}",
"CUBEJS_DB_EXPORT_BUCKET_AWS_SECRET": "${DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AWS_SECRET}",
"CUBEJS_DB_EXPORT_BUCKET_AWS_REGION": "us-east-1"
}
}
}
},
"cube": {
"environment": {
"CUBEJS_API_SECRET": "mysupersecret",
@@ -9,11 +22,6 @@
"CUBEJS_DB_DATABRICKS_TOKEN": "${DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_TOKEN}",
"CUBEJS_DB_DATABRICKS_URL": "${DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_URL}",
"CUBEJS_DB_DATABRICKS_ACCEPT_POLICY": "true",
"CUBEJS_DB_EXPORT_BUCKET_TYPE": "s3",
"CUBEJS_DB_EXPORT_BUCKET": "s3://databricks-drivers-tests-preaggs",
"CUBEJS_DB_EXPORT_BUCKET_AWS_KEY": "${DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AWS_KEY}",
"CUBEJS_DB_EXPORT_BUCKET_AWS_SECRET": "${DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AWS_SECRET}",
"CUBEJS_DB_EXPORT_BUCKET_AWS_REGION": "us-east-1",
"CUBEJS_PRE_AGGREGATIONS_SCHEMA": "drivers_tests_preaggs"
},
"ports" : ["4000"]
@@ -36,8 +44,52 @@
},
"preAggregations": {
"Products": [],
"Customers": [],
"ECommerce": []
"Customers": [
{
"name": "RA",
"measures": ["CUBE.count", "CUBE.runningTotal"]
}
],
"ECommerce": [
{
"name": "SA",
"dimensions": ["CUBE.productName"],
"measures": [
"CUBE.totalQuantity",
"CUBE.avgDiscount",
"CUBE.totalSales",
"CUBE.totalProfit"
]
},
{
"name": "TA",
"time_dimension": "CUBE.orderDate",
"granularity": "month",
"partition_granularity": "month",
"dimensions": ["CUBE.productName"],
"measures": [
"CUBE.totalQuantity",
"CUBE.avgDiscount",
"CUBE.totalSales",
"CUBE.totalProfit"
]
}
],
"BigECommerce": [
{
"name": "TA",
"time_dimension": "CUBE.orderDate",
"granularity": "month",
"partition_granularity": "year",
"dimensions": ["CUBE.productName", "CUBE.id"],
"measures": [
"CUBE.totalQuantity",
"CUBE.avgDiscount",
"CUBE.totalSales",
"CUBE.totalProfit"
]
}
]
},
"skip": [
"---------------------------------------",
@@ -56,24 +108,12 @@
"for the ECommerce.TimeAnalysisInternal",
"for the ECommerce.TimeAnalysisExternal",

"---------------------------------------",
"Full tests ",
"---------------------------------------",
"must built pre-aggregations",
"querying Customers: dimentions + order + total + offset",
"querying Customers: dimentions + order + limit + total + offset",
"querying ECommerce: dimentions + order + total + offset",
"querying ECommerce: dimentions + order + limit + total + offset",

"---------------------------------------",
"SKIPED FOR ALL ",
"---------------------------------------",
"querying Products: dimensions -- doesn't work wo ordering",
"querying ECommerce: total quantity, avg discount, total sales, total profit by product + order + total -- rounding in athena",
"querying ECommerce: total sales, total profit by month + order (date) + total -- doesn't work with the BigQuery",
"querying ECommerce: total quantity, avg discount, total sales, total profit by product + order + total -- noisy test",
"querying ECommerce: partitioned pre-agg",
"querying ECommerce: partitioned pre-agg higher granularity",
"querying BigECommerce: partitioned pre-agg"
"querying ECommerce: total quantity, avg discount, total sales, total profit by product + order + total -- noisy test"
]
}
1 change: 1 addition & 0 deletions packages/cubejs-testing-drivers/package.json
@@ -26,6 +26,7 @@
"databricks-jdbc-driver": "jest --forceExit --runInBand --verbose -i dist/test/databricks-jdbc-driver.test.js",
"databricks-jdbc-core": "jest --forceExit --runInBand --verbose -i dist/test/databricks-jdbc-core.test.js",
"databricks-jdbc-full": "jest --forceExit --runInBand --verbose -i dist/test/databricks-jdbc-full.test.js",
"databricks-jdbc-export-bucket-full": "jest --forceExit --runInBand --verbose -i dist/test/databricks-jdbc-export-bucket-full.test.js",
"mssql-driver": "jest --forceExit --runInBand --verbose -i dist/test/mssql-driver.test.js",
"mssql-core": "jest --forceExit --runInBand --verbose -i dist/test/mssql-core.test.js",
"mssql-full": "jest --forceExit --runInBand --verbose -i dist/test/mssql-full.test.js",
23 changes: 17 additions & 6 deletions packages/cubejs-testing-drivers/src/helpers/getFixtures.ts
@@ -2,16 +2,27 @@ import path from 'path';
import fs from 'fs-extra';
import { Fixture } from '../types/Fixture';

let fixtures: Fixture;
function deepMerge(a: any, b: any): any {
a = { ...a };
for (const k of Object.keys(b)) {
if (a[k] && typeof a[k] === 'object') {
a[k] = deepMerge(a[k], b[k]);
} else {
a[k] = b[k];
}
}
return a;
}

/**
* Returns fixture by data source type.
*/
export function getFixtures(type: string): Fixture {
if (!fixtures) {
const _path = path.resolve(process.cwd(), `./fixtures/${type}.json`);
const _content = fs.readFileSync(_path, 'utf-8');
fixtures = JSON.parse(_content);
export function getFixtures(type: string, extendedEnv?: string): Fixture {
const _path = path.resolve(process.cwd(), `./fixtures/${type}.json`);
const _content = fs.readFileSync(_path, 'utf-8');
let fixtures = JSON.parse(_content);
if (extendedEnv) {
fixtures = deepMerge(fixtures, fixtures.extendedEnvs[extendedEnv]);
}
return fixtures;
}
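
Combined with the `extendedEnvs` block added to `databricks-jdbc.json` above, a single fixture file can now describe variants of the same data source: passing `extendedEnv` deep-merges the named override into the base fixture, which is how the new `databricks-jdbc-export-bucket` suite gets its S3 export-bucket settings. An illustrative call, assuming the fixture shape from the JSON diff (the import path is hypothetical):

```typescript
import { getFixtures } from '../src/helpers/getFixtures'; // path is illustrative

// Base databricks-jdbc fixture with the 'export-bucket' overrides merged in.
const fixture = getFixtures('databricks-jdbc', 'export-bucket');

// The merged environment now carries the export-bucket settings.
console.log(fixture.cube.environment.CUBEJS_DB_EXPORT_BUCKET_TYPE); // 's3'
```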
