Skip to content

Commit

Permalink
Added ErrorTypeMappingFile
Browse files Browse the repository at this point in the history
  • Loading branch information
Amit-CloudSufi committed Jan 23, 2025
1 parent 1a7ee03 commit 01eb986
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.auth.Credentials;
import com.google.cloud.StringEnumValue;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.FieldValueList;
Expand All @@ -38,6 +39,7 @@
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.action.ActionContext;
import io.cdap.plugin.gcp.common.BigQueryErrorCodes;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -86,8 +88,8 @@ public void run(ActionContext context) {
GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(), config.isServiceAccountFilePath());
} catch (Exception e) {
context.getFailureCollector().addFailure(
String.format("Failed to load service account credentials: %s", e.getMessage()), null)
.withStacktrace(e.getStackTrace());
String.format("Failed to load service account credentials, %s: %s",
e.getClass().getName(), e.getMessage()), null).withStacktrace(e.getStackTrace());
context.getFailureCollector().getOrThrowException();
}
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
Expand All @@ -99,10 +101,16 @@ public void run(ActionContext context) {
// Wait for the query to complete
try {
queryJob.waitFor();
} catch (InterruptedException e) {
String errorMessage = String.format("The query job was interrupted: %s", e.getMessage());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorMessage, errorMessage, ErrorType.SYSTEM, true, e);
} catch (BigQueryException | InterruptedException e) {
String errorMessage = String.format("The query job was interrupted, %s: %s",
e.getClass().getName(), e.getMessage());
if (e instanceof BigQueryException) {
throw BigQueryErrorCodes.getProgramFailureException(errorMessage,
((BigQueryException) e).getReason(), e);
}
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.UNKNOWN, true, e);
}

// Check for errors
Expand All @@ -114,10 +122,16 @@ public void run(ActionContext context) {
TableResult queryResults;
try {
queryResults = queryJob.getQueryResults();
} catch (InterruptedException e) {
String errorMessage = String.format("The query job was interrupted: %s", e.getMessage());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorMessage, errorMessage, ErrorType.UNKNOWN, false, e);
} catch (BigQueryException | InterruptedException e) {
String errorMessage = String.format("The query job was interrupted, %s: %s",
e.getClass().getName(), e.getMessage());
if (e instanceof BigQueryException) {
throw BigQueryErrorCodes.getProgramFailureException(errorMessage,
((BigQueryException) e).getReason(), e);
}
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.UNKNOWN, false, e);
}
if (queryResults.getTotalRows() == 0 || queryResults.getTotalRows() > 1) {
String error = String.format("The query result total rows should be \"1\" but is \"%d\"",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.plugin.gcp.bigquery.action;

import com.google.api.client.http.HttpResponseException;
import com.google.auth.Credentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
Expand Down Expand Up @@ -54,6 +55,7 @@
import io.cdap.plugin.gcp.bigquery.exception.BigQueryJobExecutionException;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.BigQueryErrorCodes;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil;
import io.cdap.plugin.gcp.common.GCPUtils;
Expand Down Expand Up @@ -135,8 +137,8 @@ public void run(ActionContext context) {
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
config.isServiceAccountFilePath());
} catch (IOException e) {
collector.addFailure(String.format("Failed to load service account credentials: %s", e.getMessage()), null)
.withStacktrace(e.getStackTrace());
collector.addFailure(String.format("Failed to load service account credentials, %s: %s",
e.getClass().getName(), e.getMessage()), null).withStacktrace(e.getStackTrace());
context.getFailureCollector().getOrThrowException();
}
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, config.getReadTimeout());
Expand All @@ -163,24 +165,30 @@ public void run(ActionContext context) {
try {
executeQueryWithExponentialBackoff(bigQuery, queryConfig, context);
} catch (Throwable e) {
String errorMessage = String.format(
"Failed to execute query with exponential backoff, %s: %s", e.getClass().getName(),
e.getMessage());
if (e instanceof BigQueryException) {
throw BigQueryErrorCodes.getProgramFailureException(errorMessage,
((BigQueryException) e).getReason(), (Exception) e);
}
if (e instanceof Exception) {
String error =
String.format("Failed to execute query with exponential backoff with message: %s", e.getMessage());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, error,
ErrorType.USER, true, GCPUtils.BQ_SUPPORTED_DOC_URL);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.UNKNOWN, true, e);
}
String errorMessage = String.format("Failed to execute query with exponential backoff with message: %s",
e.getMessage());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorMessage, errorMessage, ErrorType.UNKNOWN, true, null);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.UNKNOWN, true, null);
}
} else {
try {
executeQuery(bigQuery, queryConfig, context);
} catch (Exception e) {
String errorMessage = String.format("Failed to execute query with message: %s", e.getMessage());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorMessage, ErrorType.UNKNOWN,
true, GCPUtils.BQ_SUPPORTED_DOC_URL);
String errorMessage = String.format("Failed to execute query, %s: %s",
e.getClass().getName(), e.getMessage());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorMessage,
ErrorType.UNKNOWN, true, GCPUtils.BQ_SUPPORTED_DOC_URL);
}
}
}
Expand Down Expand Up @@ -212,7 +220,7 @@ private RetryPolicy<Object> getRetryPolicy() {
}

private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig, ActionContext context)
throws InterruptedException, BigQueryJobExecutionException {
throws BigQueryJobExecutionException {
// Location must match that of the dataset(s) referenced in the query.
JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build();
Job queryJob;
Expand All @@ -225,14 +233,22 @@ private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig,

// Wait for the query to complete
queryJob = queryJob.waitFor();
} catch (BigQueryException e) {
LOG.error("The query job {} failed. Error: {}", jobId.getJob(), e.getError().getMessage());
if (RETRY_ON_REASON.contains(e.getError().getReason())) {
throw new BigQueryJobExecutionException(e.getError().getMessage(), e);
} catch (BigQueryException | InterruptedException e) {
String errorMessage = String.format("Failed to execute query, %s: %s", e.getClass().getName(),
e.getMessage());
if (e instanceof BigQueryException) {
LOG.error("The query job {} failed. Error: {}", jobId.getJob(),
((BigQueryException) e).getError().getMessage());
if (RETRY_ON_REASON.contains(((BigQueryException) e).getError().getReason())) {
throw new BigQueryJobExecutionException(((BigQueryException) e).getError().getMessage(),
e);
}
throw BigQueryErrorCodes.getProgramFailureException(errorMessage,
((BigQueryException) e).getReason(), e);
}
String error = String.format("Failed to execute query with message: %s", e.getMessage());
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, error, ErrorType.UNKNOWN,
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.UNKNOWN, true, e);
}

// Check for errors
Expand All @@ -249,7 +265,20 @@ private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig,
error, error, ErrorType.UNKNOWN, true, null);
}

TableResult queryResults = queryJob.getQueryResults();
TableResult queryResults;
try {
queryResults = queryJob.getQueryResults();
} catch (BigQueryException | InterruptedException e) {
String errorMessage = String.format("Failed to retrieve query result, %s: %s",
e.getClass().getName(), e.getMessage());
if (e instanceof BigQueryException) {
throw BigQueryErrorCodes.getProgramFailureException(errorMessage,
((BigQueryException) e).getReason(), e);
}
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.UNKNOWN, true, e);
}
long rows = queryResults.getTotalRows();

if (config.shouldSetAsArguments()) {
Expand Down Expand Up @@ -691,11 +720,12 @@ public void validateSQLSyntax(FailureCollector failureCollector, BigQuery bigQue
bigQuery.create(JobInfo.of(queryJobConfiguration));
} catch (BigQueryException e) {
final String errorMessage;
if (e.getCode() == ERROR_CODE_NOT_FOUND) {
errorMessage = String.format("Resource was not found. Please verify the resource name. If the resource " +
"will be created at runtime, then update to use a macro for the resource name. Error message received " +
"was: %s", e.getMessage());
} else {
if (e.getCode() == ERROR_CODE_NOT_FOUND) {
errorMessage = String.format(
"Resource was not found. Please verify the resource name. If the resource will be "
+ "created at runtime, then update to use a macro for the resource name. "
+ "Error message received was %s, %s", e.getClass().getName(), e.getMessage());
} else {
errorMessage = e.getMessage();
}
failureCollector.addFailure(String.format("%s. Error code: %s.", errorMessage, e.getCode()),
Expand Down

0 comments on commit 01eb986

Please sign in to comment.