Skip to content

Commit

Permalink
Merge pull request #1509 from cloudsufi/cherry-pick/5821daa4891c6ad39…
Browse files Browse the repository at this point in the history
…95b142aa3e474f606cb58a8

[🍒][PLUGIN-1849] Error Management for BigQuery Action plugin
  • Loading branch information
psainics authored Jan 31, 2025
2 parents ad2503e + e582a67 commit 338eb41
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.auth.Credentials;
import com.google.cloud.StringEnumValue;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.FieldValueList;
Expand All @@ -33,8 +34,12 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.action.ActionContext;
import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorUtil;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -70,34 +75,74 @@ public AbstractBigQueryActionConfig getConfig() {
}

@Override
public void run(ActionContext context) throws Exception {
public void run(ActionContext context) {
config.validate(context.getFailureCollector());

QueryJobConfiguration queryConfig = config.getQueryJobConfiguration(context.getFailureCollector());
JobId jobId = JobId.newBuilder().setRandomJob().build();

// API request - starts the query.
Credentials credentials = config.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
config.isServiceAccountFilePath());
Credentials credentials = null;
try {
credentials = config.getServiceAccount() == null ? null :
GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(), config.isServiceAccountFilePath());
} catch (Exception e) {
context.getFailureCollector().addFailure(
String.format("Failed to load service account credentials, %s: %s",
e.getClass().getName(), e.getMessage()), null).withStacktrace(e.getStackTrace());
context.getFailureCollector().getOrThrowException();
}
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

LOG.info("Executing SQL as job {}.", jobId.getJob());
LOG.debug("The BigQuery SQL {}", queryConfig.getQuery());

// Wait for the query to complete
queryJob.waitFor();
try {
queryJob.waitFor();
} catch (BigQueryException e) {
String errorMessage = String.format("The bigquery query job failed, %s: %s",
e.getClass().getName(), e.getMessage());
throw BigQueryErrorUtil.getProgramFailureException(errorMessage, (e).getReason(), e);
} catch (InterruptedException e) {
String errorMessage = String.format("The bigquery query job interrupted, %s: %s",
e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.UNKNOWN, true, e);
}

// Check for errors
if (queryJob.getStatus().getError() != null) {
throw new RuntimeException(queryJob.getStatus().getExecutionErrors().toString());
String errorReason = String.format(
"The bigquery job failed with reason: %s. For more details, see %s",
queryJob.getStatus().getError().getReason(), GCPUtils.BQ_SUPPORTED_DOC_URL);
ErrorType type = BigQueryErrorUtil.getErrorType(queryJob.getStatus().getError().getReason());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason,
queryJob.getStatus().getExecutionErrors().toString(), type, true, null, null,
GCPUtils.BQ_SUPPORTED_DOC_URL, null);
}
TableResult queryResults;
try {
queryResults = queryJob.getQueryResults();
} catch (BigQueryException e) {
String errorMessage = String.format("The bigquery query job failed, %s: %s",
e.getClass().getName(), e.getMessage());
throw BigQueryErrorUtil.getProgramFailureException(errorMessage, e.getReason(), e);
} catch (InterruptedException e) {
String errorMessage = String.format("The bigquery query job interrupted, %s: %s",
e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.UNKNOWN, false, e);
}

TableResult queryResults = queryJob.getQueryResults();
if (queryResults.getTotalRows() == 0 || queryResults.getTotalRows() > 1) {
throw new RuntimeException(String.format("The query result total rows should be \"1\" but is \"%d\"",
queryResults.getTotalRows()));
String error = String.format("The query result total rows should be \"1\" but is \"%d\"",
queryResults.getTotalRows());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}

Schema schema = queryResults.getSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.gcp.bigquery.source.BigQuerySource;
Expand Down Expand Up @@ -224,9 +227,11 @@ private void checkIfArgumentsColumnsExitsInSource(Map<String, String> argumentCo
String nonExistingColumnNames = argumentConditionMap.keySet().stream()
.filter(columnName -> !argumentConditionFields.containsKey(columnName))
.collect(Collectors.joining(" ,"));
throw new RuntimeException(String.format(
String error = String.format(
"Columns: \" %s \"do not exist in table. Argument selections columns must exist in table.",
nonExistingColumnNames));
nonExistingColumnNames);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, null);
}

static void checkIfArgumentsColumnsListExistsInSource(
Expand Down
114 changes: 93 additions & 21 deletions src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,20 @@
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCodeType;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.action.ActionContext;
import io.cdap.cdap.etl.common.Constants;
import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorUtil;
import io.cdap.plugin.gcp.bigquery.exception.BigQueryJobExecutionException;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -93,7 +99,7 @@ public final class BigQueryExecute extends AbstractBigQueryAction {
}

@Override
public void run(ActionContext context) throws Exception {
public void run(ActionContext context) {
FailureCollector collector = context.getFailureCollector();
config.validate(collector, context.getArguments().asMap());
QueryJobConfiguration.Builder builder = QueryJobConfiguration.newBuilder(config.getSql());
Expand Down Expand Up @@ -125,9 +131,16 @@ public void run(ActionContext context) throws Exception {
builder.setUseLegacySql(config.isLegacySQL());

// API request - starts the query.
Credentials credentials = config.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
config.isServiceAccountFilePath());
Credentials credentials = null;
try {
credentials = config.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
config.isServiceAccountFilePath());
} catch (IOException e) {
collector.addFailure(String.format("Failed to load service account credentials, %s: %s",
e.getClass().getName(), e.getMessage()), null).withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, config.getReadTimeout());
//create dataset to store the results if not exists
if (config.getStoreResults() && !Strings.isNullOrEmpty(datasetName) &&
Expand All @@ -152,23 +165,46 @@ public void run(ActionContext context) throws Exception {
try {
executeQueryWithExponentialBackoff(bigQuery, queryConfig, context);
} catch (Throwable e) {
throw new RuntimeException(e);
String errorMessage = String.format(
"Failed to execute query with exponential backoff, %s: %s", e.getClass().getName(),
e.getMessage());
if (e instanceof BigQueryException) {
throw BigQueryErrorUtil.getProgramFailureException(errorMessage,
((BigQueryException) e).getReason(), (Exception) e);
}
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.UNKNOWN, true, e);
}
} else {
executeQuery(bigQuery, queryConfig, context);
try {
executeQuery(bigQuery, queryConfig, context);
} catch (Exception e) {
String errorMessage = String.format("The bigquery query execution failed, %s: %s",
e.getClass().getName(), e.getMessage());
String errorReason = null;
if (e instanceof BigQueryException) {
errorReason = ((BigQueryException) e).getReason();
}
throw BigQueryErrorUtil.getProgramFailureException(errorMessage, errorReason, e);
}
}
}

protected void executeQueryWithExponentialBackoff(BigQuery bigQuery,
QueryJobConfiguration queryConfig, ActionContext context)
throws Throwable {
QueryJobConfiguration queryConfig, ActionContext context) {
try {
Failsafe.with(getRetryPolicy()).run(() -> executeQuery(bigQuery, queryConfig, context));
} catch (FailsafeException e) {
String errorReason = String.format("The bigquery query execution failed with message: %s",
e.getMessage());
if (e.getCause() != null) {
throw e.getCause();
errorReason = String.format("The bigquery query execution failed with message: %s",
e.getCause().getMessage());
}
throw e;
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(
e.getCause() == null ? e : e.getCause(), errorReason, ErrorType.UNKNOWN, true,
GCPUtils.BQ_SUPPORTED_DOC_URL);
}
}

Expand All @@ -185,7 +221,7 @@ private RetryPolicy<Object> getRetryPolicy() {
}

private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig, ActionContext context)
throws InterruptedException, BigQueryJobExecutionException {
throws BigQueryJobExecutionException {
// Location must match that of the dataset(s) referenced in the query.
JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build();
Job queryJob;
Expand All @@ -199,25 +235,60 @@ private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig,
// Wait for the query to complete
queryJob = queryJob.waitFor();
} catch (BigQueryException e) {
String errorMessage = String.format("The bigquery query execution failed, %s: %s",
e.getClass().getName(), e.getMessage());
LOG.error("The query job {} failed. Error: {}", jobId.getJob(), e.getError().getMessage());
if (RETRY_ON_REASON.contains(e.getError().getReason())) {
throw new BigQueryJobExecutionException(e.getError().getMessage(), e);
}
throw new RuntimeException(e);
throw BigQueryErrorUtil.getProgramFailureException(errorMessage, e.getReason(), e);
} catch (InterruptedException e) {
String errorMessage = String.format("The bigquery query execution interrupted, %s: %s",
e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.UNKNOWN, true, e);
}

// Check for errors
if (queryJob.getStatus().getError() != null) {
// You can also look at queryJob.getStatus().getExecutionErrors() for all
// errors, not just the latest one.
LOG.error("The query job {} failed. Error: {}", jobId.getJob(), queryJob.getStatus().getError());
LOG.error("The query job {} failed with reason: {} and error: {}.", jobId.getJob(),
queryJob.getStatus().getError().getReason(),
queryJob.getStatus().getExecutionErrors().toString());
if (RETRY_ON_REASON.contains(queryJob.getStatus().getError().getReason())) {
throw new BigQueryJobExecutionException(queryJob.getStatus().getError().getMessage());
}
throw new RuntimeException(queryJob.getStatus().getError().getMessage());
String errorReason = String.format(
"The bigquery query execution failed due to reason: %s and error: %s. "
+ "For more details, see %s", queryJob.getStatus().getError().getReason(),
queryJob.getStatus().getExecutionErrors().toString(), GCPUtils.BQ_SUPPORTED_DOC_URL);
String errorMessage = String.format(
"The bigquery query execution failed due to reason: %s , error: %s and message: %s",
queryJob.getStatus().getError().getReason(),
queryJob.getStatus().getExecutionErrors().toString(),
queryJob.getStatus().getError().getMessage());
ErrorType type = BigQueryErrorUtil.getErrorType(queryJob.getStatus().getError().getReason());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage,
type, true, null, null, GCPUtils.BQ_SUPPORTED_DOC_URL, null);
}

TableResult queryResults;
try {
queryResults = queryJob.getQueryResults();
} catch (BigQueryException e) {
String errorMessage = String.format("Failed to retrieve query result, %s: %s",
e.getClass().getName(), e.getMessage());
throw BigQueryErrorUtil.getProgramFailureException(errorMessage, e.getReason(), e);
} catch (InterruptedException e) {
String errorMessage = String.format("Query result retrieval was interrupted, %s: %s",
e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.UNKNOWN, true, e);
}

TableResult queryResults = queryJob.getQueryResults();
long rows = queryResults.getTotalRows();

if (config.shouldSetAsArguments()) {
Expand Down Expand Up @@ -659,11 +730,12 @@ public void validateSQLSyntax(FailureCollector failureCollector, BigQuery bigQue
bigQuery.create(JobInfo.of(queryJobConfiguration));
} catch (BigQueryException e) {
final String errorMessage;
if (e.getCode() == ERROR_CODE_NOT_FOUND) {
errorMessage = String.format("Resource was not found. Please verify the resource name. If the resource " +
"will be created at runtime, then update to use a macro for the resource name. Error message received " +
"was: %s", e.getMessage());
} else {
if (e.getCode() == ERROR_CODE_NOT_FOUND) {
errorMessage = String.format(
"Resource was not found. Please verify the resource name. If the resource will be "
+ "created at runtime, then update to use a macro for the resource name. "
+ "Error message received was %s: %s", e.getClass().getName(), e.getMessage());
} else {
errorMessage = e.getMessage();
}
failureCollector.addFailure(String.format("%s. Error code: %s.", errorMessage, e.getCode()),
Expand Down
Loading

0 comments on commit 338eb41

Please sign in to comment.