Skip to content

Commit

Permalink
add handling of issues in dataform-co#1805
Browse files Browse the repository at this point in the history
1. only set metadata when dryRun is false
2. add marker to row count query part of an assertion. check for marker in create query job and don't execute query if dryRun is true
  • Loading branch information
mkamysz committed Jan 21, 2025
1 parent 377c0c7 commit 3859441
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 64 deletions.
2 changes: 2 additions & 0 deletions cli/api/commands/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ export class Runner {
// (i.e. it must still be RUNNING, and not FAILED).
actionResult.status === dataform.ActionResult.ExecutionStatus.RUNNING &&
!(this.graph.runConfig && this.graph.runConfig.disableSetMetadata) &&
// Only set metadata if not using BigQuery dry run
!this.executionOptions.bigquery?.dryRun &&
action.type === "table"
) {
try {
Expand Down
131 changes: 69 additions & 62 deletions cli/api/dbadapters/bigquery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,72 +330,79 @@ export class BigQueryDbAdapter implements IDbAdapter {
return retry(
async () => {
try {
const job = await this.getClient().createQueryJob({
useLegacySql: false,
jobPrefix: "dataform-" + (jobPrefix ? `${jobPrefix}-` : ""),
query,
params,
labels,
location,
dryRun
});
const resultStream = job[0].getQueryResultsStream();
return new Promise<IExecutionResult>((resolve, reject) => {
if (isCancelled) {
resultStream.end();
reject(new Error("Query cancelled."));
return;
}
onCancel?.(() => {
resultStream.end();
reject(new Error("Query cancelled."));
if (dryRun && query.includes('-- @dry-run:skip-results-check')) {
// If the above evaluated to true, this means we would be running the row count query of an assertion (see execution_sql.ts).
// This must be stopped during a dry run, as the view is not actually created. Therefore we skip the creation of a query job
// and instead directly return.
return { rows: [], metadata: {} };
} else {
const job = await this.getClient().createQueryJob({
useLegacySql: false,
jobPrefix: "dataform-" + (jobPrefix ? `${jobPrefix}-` : ""),
query,
params,
labels,
location,
dryRun
});
const resultStream = job[0].getQueryResultsStream();
return new Promise<IExecutionResult>((resolve, reject) => {
if (isCancelled) {
resultStream.end();
reject(new Error("Query cancelled."));
return;
}
onCancel?.(() => {
resultStream.end();
reject(new Error("Query cancelled."));
});

const results = new LimitedResultSet({
rowLimit,
byteLimit
});
resultStream
.on("error", e => {
// Dry run queries against BigQuery done by this package eagerly fail with
// "Not found: job". This is a workaround to avoid that.
// Example: https://github.com/googleapis/python-bigquery/issues/118.
if (dryRun && e.message?.includes("Not found: Job")) {
resolve({ rows: [], metadata: {} });
}
reject(e);
})
.on("data", row => {
if (!results.push(row)) {
resultStream.end();
}
})
.on("end", async () => {
try {
const [jobMetadata] = await job[0].getMetadata();
if (!!jobMetadata.status?.errorResult) {
reject(new Error(jobMetadata.status.errorResult.message));
return;
const results = new LimitedResultSet({
rowLimit,
byteLimit
});
resultStream
.on("error", e => {
// Dry run queries against BigQuery done by this package eagerly fail with
// "Not found: job". This is a workaround to avoid that.
// Example: https://github.com/googleapis/python-bigquery/issues/118.
if (dryRun && e.message?.includes("Not found: Job")) {
resolve({ rows: [], metadata: {} });
}
resolve({
rows: results.rows,
metadata: {
bigquery: {
jobId: jobMetadata.jobReference.jobId,
totalBytesBilled: jobMetadata.statistics.query.totalBytesBilled
? Long.fromString(jobMetadata.statistics.query.totalBytesBilled)
: Long.ZERO,
totalBytesProcessed: jobMetadata.statistics.query.totalBytesProcessed
? Long.fromString(jobMetadata.statistics.query.totalBytesProcessed)
: Long.ZERO
}
}
});
} catch (e) {
reject(e);
}
});
});
})
.on("data", row => {
if (!results.push(row)) {
resultStream.end();
}
})
.on("end", async () => {
try {
const [jobMetadata] = await job[0].getMetadata();
if (!!jobMetadata.status?.errorResult) {
reject(new Error(jobMetadata.status.errorResult.message));
return;
}
resolve({
rows: results.rows,
metadata: {
bigquery: {
jobId: jobMetadata.jobReference.jobId,
totalBytesBilled: jobMetadata.statistics.query.totalBytesBilled
? Long.fromString(jobMetadata.statistics.query.totalBytesBilled)
: Long.ZERO,
totalBytesProcessed: jobMetadata.statistics.query.totalBytesProcessed
? Long.fromString(jobMetadata.statistics.query.totalBytesProcessed)
: Long.ZERO
}
}
});
} catch (e) {
reject(e);
}
});
});
}
} catch (e) {
throw coerceAsError(e);
}
Expand Down
10 changes: 8 additions & 2 deletions cli/api/dbadapters/execution_sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,18 @@ from (${query}) as insertions`;

public assertTasks(
assertion: dataform.IAssertion,
projectConfig: dataform.IProjectConfig
projectConfig: dataform.IProjectConfig,
): Tasks {
const tasks = new Tasks();
const target = assertion.target;
// Create the view to check syntax of assertion
tasks.add(Task.statement(this.createOrReplaceView(target, assertion.query)));
tasks.add(Task.assertion(`select sum(1) as row_count from ${this.resolveTarget(target)}`));

// Add assertion check with special placeholder that will be recognized during execution
tasks.add(Task.assertion(`
-- @dry-run:skip-results-check
select sum(1) as row_count from ${this.resolveTarget(target)}
`));
return tasks;
}

Expand Down

0 comments on commit 3859441

Please sign in to comment.