feat(cubesql): WHERE SQL push down (#7808)
* feat(cubesql): `WHERE` SQL push down

* Remove TODO as we need to allocate names for segments
paveltiunov authored Feb 24, 2024
1 parent 60aad90 commit 98b5709
Showing 26 changed files with 558 additions and 56 deletions.
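
At a high level, the change threads `WHERE` conditions through every layer: the API gateway accepts member expressions in `segments`; `BaseSegment` can evaluate such an expression as segment SQL; the wrapped `SELECT` template gains an optional `WHERE {{ filter }}` slot; several query dialects gain a `timestamp_literal` expression template for rendering timestamp constants; and cubesql's `CubeScanWrapperNode` turns `filter_expr` into generated SQL that is attached to the load request as member-expression segments and joined with `AND` into that slot. A rough sketch of a load query carrying such a pushed-down condition (field names follow what `BaseSegment` reads; the exact serialization is internal to cubesql, and all values here are illustrative):

const loadQuery = {
  measures: ['orders.count'],
  dimensions: ['orders.status'],
  segments: [{
    cubeName: 'orders',
    name: 'pushed_where_0',   // hypothetical generated name
    expression: () => `"orders".created_at >= timestamptz '2024-01-01T00:00:00.000Z'`,  // hypothetical
  }],
};
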
1 change: 1 addition & 0 deletions packages/cubejs-api-gateway/src/gateway.ts
@@ -1233,6 +1233,7 @@ class ApiGateway {
...query,
measures: (query.measures || []).map(m => (typeof m === 'string' ? this.parseMemberExpression(m) : m)),
dimensions: (query.dimensions || []).map(m => (typeof m === 'string' ? this.parseMemberExpression(m) : m)),
segments: (query.segments || []).map(m => (typeof m === 'string' ? this.parseMemberExpression(m) : m)),
};
}

2 changes: 1 addition & 1 deletion packages/cubejs-api-gateway/src/query.js
@@ -105,7 +105,7 @@ const querySchema = Joi.object().keys({
Joi.object().pattern(id, Joi.valid('asc', 'desc')),
Joi.array().items(Joi.array().min(2).ordered(id, Joi.valid('asc', 'desc')))
),
segments: Joi.array().items(id),
segments: Joi.array().items(Joi.alternatives(id, memberExpression)),
timezone: Joi.string(),
limit: Joi.number().integer().min(1),
offset: Joi.number().integer().min(0),
2 changes: 1 addition & 1 deletion packages/cubejs-api-gateway/src/types/query.ts
@@ -64,7 +64,7 @@ interface Query {
dimensions?: (Member | TimeMember | MemberExpression)[];
filters?: (QueryFilter | LogicalAndFilter | LogicalOrFilter)[];
timeDimensions?: QueryTimeDimension[];
segments?: Member[];
segments?: (Member | MemberExpression)[];
limit?: null | number;
offset?: number;
total?: boolean;
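
Taken together, the three gateway changes widen `segments` the same way `measures` and `dimensions` already were: string segment entries are parsed as member expressions in the same spot where measures and dimensions already are, the Joi schema accepts either a member id or a `memberExpression`, and the `Query` type follows suit. A standalone sketch of the validation pattern, with deliberately simplified stand-ins for the real `id` and `memberExpression` schemas from query.js:

import Joi from 'joi';

// Simplified stand-ins; the real definitions in query.js are stricter.
const id = Joi.string();
const memberExpression = Joi.object().keys({
  expression: Joi.function().required(),
  cubeName: Joi.string().required(),
  name: Joi.string().required(),
  expressionName: Joi.string(),
  definition: Joi.string(),
});

const segments = Joi.array().items(Joi.alternatives(id, memberExpression));

segments.validate(['orders.completed']);                                              // plain segment path: ok
segments.validate([{ expression: () => '1 = 1', cubeName: 'orders', name: 'w0' }]);   // member expression: ok
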
4 changes: 2 additions & 2 deletions packages/cubejs-backend-native/src/transport.rs
@@ -297,8 +297,8 @@ impl TransportService for NodeBridgeTransport {
)
.await?;

if let Some(error) = response.get("error") {
if let Some(stack) = response.get("stack") {
if let Some(error) = response.get("error").and_then(|e| e.as_str()) {
if let Some(stack) = response.get("stack").and_then(|e| e.as_str()) {
return Err(CubeError::user(format!(
"Error during SQL generation: {}\n{}",
error, stack
@@ -104,6 +104,7 @@ export class DatabricksQuery extends BaseQuery {
templates.functions.LTRIM = 'LTRIM({{ args|reverse|join(", ") }})';
templates.functions.RTRIM = 'RTRIM({{ args|reverse|join(", ") }})';
templates.functions.DATEDIFF = 'DATEDIFF({{ date_part }}, DATE_TRUNC(\'{{ date_part }}\', {{ args[1] }}), DATE_TRUNC(\'{{ date_part }}\', {{ args[2] }}))';
templates.expressions.timestamp_literal = 'from_utc_timestamp(\'{{ value }}\', \'UTC\')';
return templates;
}
}
1 change: 1 addition & 0 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
@@ -2500,6 +2500,7 @@ export class BaseQuery {
statements: {
select: 'SELECT {{ select_concat | map(attribute=\'aliased\') | join(\', \') }} \n' +
'FROM (\n {{ from }}\n) AS {{ from_alias }} \n' +
'{% if filter %} WHERE {{ filter }}{% endif %}' +
'{% if group_by %} GROUP BY {{ group_by | map(attribute=\'index\') | join(\', \') }}{% endif %}' +
'{% if order_by %} ORDER BY {{ order_by | map(attribute=\'expr\') | join(\', \') }}{% endif %}' +
'{% if limit %}\nLIMIT {{ limit }}{% endif %}' +
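
For orientation, this is roughly what the new slot produces once the template is rendered. The real rendering is done by cubesql's Jinja-style engine; the sketch below only fakes the substitution to show clause ordering, and every identifier and slot value is illustrative:

// Illustrative slot values; the real ones come from wrapper.rs.
const selectConcat = '"orders".status, SUM("orders".amount) "total"';
const from = 'SELECT * FROM public.orders';          // hypothetical inner (pushed-down) query
const fromAlias = '"orders"';
const filter = `"orders".status = 'completed'`;      // the new WHERE slot
const groupBy = '1';

const sql =
  `SELECT ${selectConcat} \n` +
  `FROM (\n ${from}\n) AS ${fromAlias} \n` +
  (filter ? ` WHERE ${filter}` : '') +               // {% if filter %} WHERE {{ filter }}{% endif %}
  (groupBy ? ` GROUP BY ${groupBy}` : '');           // GROUP BY by column index, as in the template
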
44 changes: 38 additions & 6 deletions packages/cubejs-schema-compiler/src/adapter/BaseSegment.ts
@@ -1,12 +1,34 @@
import type { BaseQuery } from './BaseQuery';

export class BaseSegment {
public readonly expression: any;

public readonly expressionCubeName: any;

public readonly expressionName: any;

public readonly isMemberExpression: boolean = false;

public constructor(
protected readonly query: BaseQuery,
public readonly segment: string
) {}
public readonly segment: string | any
) {
if (segment.expression) {
this.expression = segment.expression;
this.expressionCubeName = segment.cubeName;
this.expressionName = segment.expressionName || `${segment.cubeName}.${segment.name}`;
this.isMemberExpression = !!segment.definition;
}
}

public filterToWhere() {
return this.segmentSql();
}

public segmentSql() {
if (this.expression) {
return this.query.evaluateSymbolSql(this.expressionCubeName, this.expressionName, this.definition(), 'segment');
}
return this.query.segmentSql(this);
}

@@ -19,6 +41,11 @@ export class BaseSegment {
}

public definition() {
if (this.expression) {
return {
sql: this.expression
};
}
return this.segmentDefinition();
}

@@ -27,6 +54,9 @@ export class BaseSegment {
}

public cube() {
if (this.expression) {
return this.query.cubeEvaluator.cubeFromPath(this.expressionCubeName);
}
return this.query.cubeEvaluator.cubeFromPath(this.segment);
}

@@ -35,14 +65,16 @@ export class BaseSegment {
}

public path() {
if (this.expression) {
return null;
}
return this.query.cubeEvaluator.parsePath('segments', this.segment);
}

public expressionPath() {
// TODO expression support
// if (this.expression) {
// return `expr:${this.expression.expressionName}`;
// }
if (this.expression) {
return `expr:${this.expression.expressionName}`;
}
return this.query.cubeEvaluator.pathFromArray(this.path());
}
}
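
A short usage sketch of the two constructor paths above. The member-expression object is hypothetical, but its field names are exactly the ones the constructor reads (`expression`, `cubeName`, `name`, `expressionName`, `definition`), and `query` stands for a `BaseQuery` instance:

declare const query: BaseQuery;          // assumed to exist for the sketch

// Plain schema segment: behaves as before.
const byPath = new BaseSegment(query, 'orders.completed');
byPath.isMemberExpression;   // false — a string has no `expression`, so the flag never flips
byPath.segmentSql();         // falls through to query.segmentSql(this)

// Expression segment, e.g. one generated for a pushed-down WHERE condition.
const byExpression = new BaseSegment(query, {
  cubeName: 'orders',
  name: 'pushed_where_0',                            // hypothetical generated name
  expressionName: 'orders.pushed_where_0',
  expression: () => "${orders.status} = 'completed'",  // hypothetical SQL-producing expression
  definition: "${orders.status} = 'completed'",         // hypothetical
});
byExpression.isMemberExpression; // true — `definition` is set
byExpression.path();             // null — no schema path to parse
byExpression.segmentSql();       // evaluateSymbolSql(expressionCubeName, expressionName, definition(), 'segment')
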
@@ -160,6 +160,7 @@ export class BigqueryQuery extends BaseQuery {
templates.expressions.binary = '{% if op == \'%\' %}MOD({{ left }}, {{ right }}){% else %}({{ left }} {{ op }} {{ right }}){% endif %}';
templates.expressions.interval = 'INTERVAL {{ interval }}';
templates.expressions.extract = 'EXTRACT({% if date_part == \'DOW\' %}DAYOFWEEK{% elif date_part == \'DOY\' %}DAYOFYEAR{% else %}{{ date_part }}{% endif %} FROM {{ expr }})';
templates.expressions.timestamp_literal = 'TIMESTAMP(\'{{ value }}\')';
return templates;
}
}
@@ -58,6 +58,7 @@ export class PostgresQuery extends BaseQuery {
templates.functions.DATEDIFF = 'CASE WHEN LOWER(\'{{ date_part }}\') IN (\'year\', \'quarter\', \'month\') THEN (EXTRACT(YEAR FROM AGE(DATE_TRUNC(\'{{ date_part }}\', {{ args[2] }}), DATE_TRUNC(\'{{ date_part }}\', {{ args[1] }}))) * 12 + EXTRACT(MONTH FROM AGE(DATE_TRUNC(\'{{ date_part }}\', {{ args[2] }}), DATE_TRUNC(\'{{ date_part }}\', {{ args[1] }})))) / CASE LOWER(\'{{ date_part }}\') WHEN \'year\' THEN 12 WHEN \'quarter\' THEN 3 WHEN \'month\' THEN 1 END ELSE EXTRACT(EPOCH FROM DATE_TRUNC(\'{{ date_part }}\', {{ args[2] }}) - DATE_TRUNC(\'{{ date_part }}\', {{ args[1] }})) / EXTRACT(EPOCH FROM \'1 {{ date_part }}\'::interval) END::bigint';
templates.expressions.interval = 'INTERVAL \'{{ interval }}\'';
templates.expressions.extract = 'EXTRACT({{ date_part }} FROM {{ expr }})';
templates.expressions.timestamp_literal = 'timestamptz \'{{ value }}\'';

return templates;
}
@@ -121,6 +121,7 @@ export class PrestodbQuery extends BaseQuery {
'{% if limit %}\nLIMIT {{ limit }}{% endif %}';
templates.expressions.extract = 'EXTRACT({{ date_part }} FROM {{ expr }})';
templates.expressions.interval = 'INTERVAL \'{{ num }}\' {{ date_part }}';
templates.expressions.timestamp_literal = 'from_iso8601_timestamp(\'{{ value }}\')';
return templates;
}
}
@@ -62,6 +62,7 @@ export class SnowflakeQuery extends BaseQuery {
templates.functions.BTRIM = 'TRIM({{ args_concat }})';
templates.expressions.extract = 'EXTRACT({{ date_part }} FROM {{ expr }})';
templates.expressions.interval = 'INTERVAL \'{{ interval }}\'';
templates.expressions.timestamp_literal = '\'{{ value }}\'::timestamp_tz';
return templates;
}
}
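
Each dialect change above registers the same `timestamp_literal` slot with dialect-specific syntax. A small sketch of what they expand to for one value; a plain string replace stands in for the real template engine, and the sample value matches the RFC 3339 millisecond formatting (`SecondsFormat::Millis`, UTC "Z") produced in wrapper.rs below:

// Templates copied from the diffs above; rendering is faked with a string replace.
const timestampLiteral: Record<string, string> = {
  databricks: "from_utc_timestamp('{{ value }}', 'UTC')",
  bigquery:   "TIMESTAMP('{{ value }}')",
  postgres:   "timestamptz '{{ value }}'",
  presto:     "from_iso8601_timestamp('{{ value }}')",
  snowflake:  "'{{ value }}'::timestamp_tz",
};

const value = '2024-02-24T12:34:56.000Z';
for (const [dialect, template] of Object.entries(timestampLiteral)) {
  console.log(dialect.padEnd(10), template.replace('{{ value }}', value));
}
// databricks from_utc_timestamp('2024-02-24T12:34:56.000Z', 'UTC')
// bigquery   TIMESTAMP('2024-02-24T12:34:56.000Z')
// postgres   timestamptz '2024-02-24T12:34:56.000Z'
// presto     from_iso8601_timestamp('2024-02-24T12:34:56.000Z')
// snowflake  '2024-02-24T12:34:56.000Z'::timestamp_tz
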
113 changes: 106 additions & 7 deletions rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs
@@ -10,7 +10,7 @@ use crate::{
},
CubeError,
};
use chrono::{Days, NaiveDate};
use chrono::{Days, NaiveDate, SecondsFormat, TimeZone, Utc};
use cubeclient::models::V1LoadRequestQuery;
use datafusion::{
error::{DataFusionError, Result},
@@ -165,6 +165,48 @@ lazy_static! {
static ref DATE_PART_REGEX: Regex = Regex::new("^[A-Za-z_ ]+$").unwrap();
}

macro_rules! generate_sql_for_timestamp {
(@generic $value:ident, $value_block:expr, $sql_generator:expr, $sql_query:expr) => {
if let Some($value) = $value {
let value = $value_block.to_rfc3339_opts(SecondsFormat::Millis, true);
(
$sql_generator
.get_sql_templates()
.timestamp_literal_expr(value)
.map_err(|e| {
DataFusionError::Internal(format!(
"Can't generate SQL for timestamp: {}",
e
))
})?,
$sql_query,
)
} else {
("NULL".to_string(), $sql_query)
}
};
($value:ident, timestamp, $sql_generator:expr, $sql_query:expr) => {
generate_sql_for_timestamp!(
@generic $value, { Utc.timestamp_opt($value as i64, 0).unwrap() }, $sql_generator, $sql_query
)
};
($value:ident, timestamp_millis_opt, $sql_generator:expr, $sql_query:expr) => {
generate_sql_for_timestamp!(
@generic $value, { Utc.timestamp_millis_opt($value as i64).unwrap() }, $sql_generator, $sql_query
)
};
($value:ident, timestamp_micros, $sql_generator:expr, $sql_query:expr) => {
generate_sql_for_timestamp!(
@generic $value, { Utc.timestamp_micros($value as i64).unwrap() }, $sql_generator, $sql_query
)
};
($value:ident, $method:ident, $sql_generator:expr, $sql_query:expr) => {
generate_sql_for_timestamp!(
@generic $value, { Utc.$method($value as i64) }, $sql_generator, $sql_query
)
};
}

impl CubeScanWrapperNode {
pub async fn generate_sql(
&self,
@@ -310,7 +352,7 @@ impl CubeScanWrapperNode {
window_expr,
from,
joins: _joins,
filter_expr: _filter_expr,
filter_expr,
having_expr: _having_expr,
limit,
offset,
@@ -443,6 +485,20 @@ impl CubeScanWrapperNode {
)
.await?;

let (filter, sql) = Self::generate_column_expr(
plan.clone(),
schema.clone(),
filter_expr.clone(),
sql,
generator.clone(),
&column_remapping,
&mut next_remapping,
alias.clone(),
can_rename_columns,
ungrouped_scan_node.clone(),
)
.await?;

let (window, sql) = Self::generate_column_expr(
plan.clone(),
schema.clone(),
@@ -548,6 +604,17 @@ impl CubeScanWrapperNode {
})
.collect::<Result<_>>()?,
);
load_request.segments = Some(
filter
.iter()
.map(|m| {
Self::ungrouped_member_def(
m,
&ungrouped_scan_node.used_cubes,
)
})
.collect::<Result<_>>()?,
);
if !order_expr.is_empty() {
load_request.order = Some(
order_expr
@@ -651,7 +718,16 @@ impl CubeScanWrapperNode {
aggregate,
// TODO
from_alias.unwrap_or("".to_string()),
None,
if !filter.is_empty() {
Some(
filter
.iter()
.map(|f| f.expr.to_string())
.join(" AND "),
)
} else {
None
},
None,
order,
limit,
@@ -1284,10 +1360,33 @@ impl CubeScanWrapperNode {
}
}
// ScalarValue::Date64(_) => {}
// ScalarValue::TimestampSecond(_, _) => {}
// ScalarValue::TimestampMillisecond(_, _) => {}
// ScalarValue::TimestampMicrosecond(_, _) => {}
// ScalarValue::TimestampNanosecond(_, _) => {}
ScalarValue::TimestampSecond(s, _) => {
generate_sql_for_timestamp!(s, timestamp, sql_generator, sql_query)
}
ScalarValue::TimestampMillisecond(ms, None) => {
generate_sql_for_timestamp!(
ms,
timestamp_millis_opt,
sql_generator,
sql_query
)
}
ScalarValue::TimestampMicrosecond(ms, None) => {
generate_sql_for_timestamp!(
ms,
timestamp_micros,
sql_generator,
sql_query
)
}
ScalarValue::TimestampNanosecond(nanoseconds, None) => {
generate_sql_for_timestamp!(
nanoseconds,
timestamp_nanos,
sql_generator,
sql_query
)
}
ScalarValue::IntervalYearMonth(x) => {
if let Some(x) = x {
let (num, date_part) = (x, "MONTH");
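
In summary, the wrapper.rs changes stop discarding `filter_expr`: each filter expression is run through `generate_column_expr`, registered on the load request as a member-expression segment via `ungrouped_member_def`, and its SQL joined with `AND` into the select template's new `filter` slot; the `generate_sql_for_timestamp!` macro and the new `ScalarValue::Timestamp*` arms exist so timestamp constants inside those filters render through the per-dialect `timestamp_literal` templates above. A conceptual TypeScript sketch of the joining step (names are illustrative, not the Rust API):

// Mirrors the Rust: filter.iter().map(|f| f.expr.to_string()).join(" AND ")
interface AliasedExpr { expr: string; alias: string }   // shape of the generated filter columns (illustrative)

function whereClause(filterExprs: AliasedExpr[]): string | null {
  if (filterExprs.length === 0) {
    return null;                        // leaves the template's {% if filter %} branch off
  }
  return filterExprs.map((f) => f.expr).join(' AND ');
}

// whereClause([{ expr: `"orders".status = 'completed'`, alias: 'a0' },
//              { expr: `"orders".created_at >= timestamptz '2024-02-24T00:00:00.000Z'`, alias: 'a1' }])
// => `"orders".status = 'completed' AND "orders".created_at >= timestamptz '2024-02-24T00:00:00.000Z'`
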