From bdafbf1c994e7af6a117bd138ea86a90ec01e566 Mon Sep 17 00:00:00 2001 From: Amit Kumar Singh Date: Fri, 6 Dec 2024 08:18:01 +0000 Subject: [PATCH] Added HTTP error details provider, refactored HTTP-sink and source package to handle error provider, fix sonar issues and added Validation error for linear retry duration --- .../plugin/http/common/BaseHttpConfig.java | 38 +++- .../http/common/HttpErrorDetailsProvider.java | 93 +++++++++ .../BaseHttpPaginationIterator.java | 86 +++++++-- .../pagination/CustomPaginationIterator.java | 2 +- .../http/common/pagination/page/JsonPage.java | 11 +- .../common/pagination/page/PageFactory.java | 65 ++++++- .../http/sink/batch/HTTPOutputFormat.java | 17 +- .../http/sink/batch/HTTPRecordWriter.java | 180 +++++++++++------- .../cdap/plugin/http/sink/batch/HTTPSink.java | 7 +- .../http/sink/batch/HTTPSinkConfig.java | 19 +- .../plugin/http/sink/batch/MessageBuffer.java | 67 ++++--- .../http/sink/batch/PlaceholderBean.java | 3 +- .../http/source/batch/HttpBatchSource.java | 21 +- .../source/batch/HttpBatchSourceConfig.java | 49 +++-- .../http/source/batch/HttpInputFormat.java | 1 - .../common/DelimitedSchemaDetector.java | 10 +- .../http/source/common/RawStringPerLine.java | 70 ++++++- .../common/HttpBatchSourceConfigTest.java | 6 +- .../pagination/PaginationIteratorTest.java | 3 +- 19 files changed, 549 insertions(+), 199 deletions(-) create mode 100644 src/main/java/io/cdap/plugin/http/common/HttpErrorDetailsProvider.java diff --git a/src/main/java/io/cdap/plugin/http/common/BaseHttpConfig.java b/src/main/java/io/cdap/plugin/http/common/BaseHttpConfig.java index c78b8249..fdf5c376 100644 --- a/src/main/java/io/cdap/plugin/http/common/BaseHttpConfig.java +++ b/src/main/java/io/cdap/plugin/http/common/BaseHttpConfig.java @@ -349,10 +349,16 @@ public void validate(FailureCollector failureCollector) { // Validate OAuth2 properties if (!containsMacro(PROPERTY_OAUTH2_ENABLED) && this.getOauth2Enabled()) { String reasonOauth2 = "OAuth2 is enabled"; - assertIsSet(getTokenUrl(), PROPERTY_TOKEN_URL, reasonOauth2); - assertIsSet(getClientId(), PROPERTY_CLIENT_ID, reasonOauth2); - assertIsSet(getClientSecret(), PROPERTY_CLIENT_SECRET, reasonOauth2); - assertIsSet(getRefreshToken(), PROPERTY_REFRESH_TOKEN, reasonOauth2); + assertIsSetWithFailureCollector(getTokenUrl(), PROPERTY_TOKEN_URL, reasonOauth2, failureCollector); + assertIsSetWithFailureCollector(getClientId(), PROPERTY_CLIENT_ID, reasonOauth2, failureCollector); + assertIsSetWithFailureCollector(getClientSecret(), PROPERTY_CLIENT_SECRET, reasonOauth2, failureCollector); + assertIsSetWithFailureCollector(getRefreshToken(), PROPERTY_REFRESH_TOKEN, reasonOauth2, failureCollector); + } + + if (!containsMacro(PROPERTY_WAIT_TIME_BETWEEN_PAGES) && waitTimeBetweenPages != null + && waitTimeBetweenPages < 0) { + failureCollector.addFailure("Wait Time Between Pages cannot be a negative number.", + null).withConfigProperty(PROPERTY_WAIT_TIME_BETWEEN_PAGES); } // Validate Authentication properties @@ -361,16 +367,18 @@ public void validate(FailureCollector failureCollector) { case OAUTH2: String reasonOauth2 = "OAuth2 is enabled"; if (!containsMacro(PROPERTY_TOKEN_URL)) { - assertIsSet(getTokenUrl(), PROPERTY_TOKEN_URL, reasonOauth2); + assertIsSetWithFailureCollector(getTokenUrl(), PROPERTY_TOKEN_URL, reasonOauth2, failureCollector); } if (!containsMacro(PROPERTY_CLIENT_ID)) { - assertIsSet(getClientId(), PROPERTY_CLIENT_ID, reasonOauth2); + assertIsSetWithFailureCollector(getClientId(), PROPERTY_CLIENT_ID, reasonOauth2, failureCollector); } if (!containsMacro((PROPERTY_CLIENT_SECRET))) { - assertIsSet(getClientSecret(), PROPERTY_CLIENT_SECRET, reasonOauth2); + assertIsSetWithFailureCollector(getClientSecret(), PROPERTY_CLIENT_SECRET, reasonOauth2, + failureCollector); } if (!containsMacro(PROPERTY_REFRESH_TOKEN)) { - assertIsSet(getRefreshToken(), PROPERTY_REFRESH_TOKEN, reasonOauth2); + assertIsSetWithFailureCollector(getRefreshToken(), PROPERTY_REFRESH_TOKEN, reasonOauth2, + failureCollector); } break; case SERVICE_ACCOUNT: @@ -390,10 +398,12 @@ public void validate(FailureCollector failureCollector) { case BASIC_AUTH: String reasonBasicAuth = "Basic Authentication is enabled"; if (!containsMacro(PROPERTY_USERNAME)) { - assertIsSet(getUsername(), PROPERTY_USERNAME, reasonBasicAuth); + assertIsSetWithFailureCollector(getUsername(), PROPERTY_USERNAME, reasonBasicAuth, + failureCollector); } if (!containsMacro(PROPERTY_PASSWORD)) { - assertIsSet(getPassword(), PROPERTY_PASSWORD, reasonBasicAuth); + assertIsSetWithFailureCollector(getPassword(), PROPERTY_PASSWORD, reasonBasicAuth, + failureCollector); } break; } @@ -405,4 +415,12 @@ public static void assertIsSet(Object propertyValue, String propertyName, String String.format("Property '%s' must be set, since %s", propertyName, reason), propertyName); } } + + public static void assertIsSetWithFailureCollector(Object propertyValue, String propertyName, String reason, + FailureCollector failureCollector) { + if (propertyValue == null) { + failureCollector.addFailure(String.format("Property '%s' must be set, since %s", propertyName, reason), + null).withConfigProperty(propertyName); + } + } } diff --git a/src/main/java/io/cdap/plugin/http/common/HttpErrorDetailsProvider.java b/src/main/java/io/cdap/plugin/http/common/HttpErrorDetailsProvider.java new file mode 100644 index 00000000..db1242be --- /dev/null +++ b/src/main/java/io/cdap/plugin/http/common/HttpErrorDetailsProvider.java @@ -0,0 +1,93 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.http.common; + +import com.google.common.base.Throwables; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.cdap.etl.api.exception.ErrorContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider; +import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException; + +import java.util.List; +import java.util.NoSuchElementException; + +/** + * Error details provided for the HTTP + **/ +public class HttpErrorDetailsProvider implements ErrorDetailsProvider { + + /** + * Supported document URL. + */ + private static final String SUPPORTED_DOCUMENT_URL = "https://datatracker.ietf.org/doc/html/rfc7231#section-6"; + + /** + * Retrieves the supported document URL. + * + * @return the supported document URL + */ + public static String getSupportedDocumentUrl() { + return SUPPORTED_DOCUMENT_URL; + } + + @Override + public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) { + List causalChain = Throwables.getCausalChain(e); + for (Throwable t : causalChain) { + if (t instanceof ProgramFailureException) { + // if causal chain already has program failure exception, return null to avoid double wrap. + return null; + } + if (t instanceof IllegalArgumentException) { + return getProgramFailureException((IllegalArgumentException) t, errorContext, + ErrorType.USER); + } + if (t instanceof IllegalStateException) { + return getProgramFailureException((IllegalStateException) t, errorContext, + ErrorType.SYSTEM); + } + if (t instanceof InvalidConfigPropertyException) { + return getProgramFailureException((InvalidConfigPropertyException) t, errorContext, + ErrorType.USER); + } + if (t instanceof NoSuchElementException) { + return getProgramFailureException((NoSuchElementException) t, errorContext, + ErrorType.SYSTEM); + } + } + return null; + } + + /** + * Get a ProgramFailureException with the given error information from {@link Exception}. + * + * @param exception The IllegalArgumentException to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private ProgramFailureException getProgramFailureException(Exception exception, + ErrorContext errorContext, ErrorType errorType) { + String errorMessage = exception.getMessage(); + String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; + return ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, + String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), errorType, false, + exception); + } +} diff --git a/src/main/java/io/cdap/plugin/http/common/pagination/BaseHttpPaginationIterator.java b/src/main/java/io/cdap/plugin/http/common/pagination/BaseHttpPaginationIterator.java index ba02fdde..d83267b9 100644 --- a/src/main/java/io/cdap/plugin/http/common/pagination/BaseHttpPaginationIterator.java +++ b/src/main/java/io/cdap/plugin/http/common/pagination/BaseHttpPaginationIterator.java @@ -15,6 +15,11 @@ */ package io.cdap.plugin.http.common.pagination; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCodeType; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.plugin.http.common.HttpErrorDetailsProvider; import io.cdap.plugin.http.common.RetryPolicy; import io.cdap.plugin.http.common.error.ErrorHandling; import io.cdap.plugin.http.common.error.HttpErrorHandler; @@ -80,12 +85,30 @@ public BaseHttpPaginationIterator(BaseHttpSourceConfig config, PaginationIterato protected abstract String getNextPageUrl(HttpResponse httpResponse, BasePage page); public abstract boolean supportsSkippingPages(); - protected boolean visitPageAndCheckStatusCode() throws IOException { + protected boolean visitPageAndCheckStatusCode() { if (response != null) { // close previous response - response.close(); + try { + response.close(); + } catch (IOException e) { + String errorReason = String.format("Failed to close response from '%s'", currentPageUrl); + String errorMessage = String.format("Failed to close response from '%s' with message: %s", + currentPageUrl, e.getMessage()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage, + ErrorType.SYSTEM, true, e); + } } - response = new HttpResponse(getHttpClient().executeHTTP(nextPageUrl)); + try { + response = new HttpResponse(getHttpClient().executeHTTP(nextPageUrl)); + } catch (IOException e) { + String errorMessage = String.format("Failed to execute request to '%s' with message: %s", + nextPageUrl, e.getMessage()); + String errorReason = String.format("Failed to execute request to '%s'", nextPageUrl); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage, + ErrorType.SYSTEM, true, e); + } currentPageUrl = nextPageUrl; httpStatusCode = response.getStatusCode(); RetryableErrorHandling errorHandlingStrategy = httpErrorHandler.getErrorHandlingStrategy(httpStatusCode); @@ -94,7 +117,7 @@ protected boolean visitPageAndCheckStatusCode() throws IOException { } @Nullable - protected BasePage getNextPage() throws IOException { + protected BasePage getNextPage() { // no more pages if (nextPageUrl == null) { return null; @@ -122,8 +145,18 @@ protected BasePage getNextPage() throws IOException { case SUCCESS: break; case STOP: - throw new IllegalStateException(String.format("Fetching from url '%s' returned status code '%d' and body '%s'", - nextPageUrl, httpStatusCode, response.getBody())); + ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(httpStatusCode); + String errorReason = String.format( + "Unable to read new page: %s. %s. For more details, see %s", httpStatusCode, + pair.getCorrectiveAction(), HttpErrorDetailsProvider.getSupportedDocumentUrl()); + String errorMessage = String.format( + "Retry failed! Unable to read new page and execute request. " + + "Fetching from '%s' returned http error status code '%s'.", config.getUrl(), + httpStatusCode); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage, + pair.getErrorType(), true, ErrorCodeType.HTTP, String.valueOf(httpStatusCode), + HttpErrorDetailsProvider.getSupportedDocumentUrl(), null); case SKIP: case SEND: if (!this.supportsSkippingPages()) { @@ -156,16 +189,11 @@ public String getCurrentPageUrl() { * False if no more pages to load or the page loaded has no elements. */ protected boolean ensurePageIterable() { - try { - if (currentPageReturned) { - page = getNextPage(); - currentPageReturned = false; - } - - return page != null && page.hasNext(); // check hasNext() to stop on first empty page. - } catch (IOException e) { - throw new IllegalStateException("Failed to the load page", e); + if (currentPageReturned) { + page = getNextPage(); + currentPageReturned = false; } + return page != null && page.hasNext(); // check hasNext() to stop on first empty page. } // for testing purposes @@ -175,9 +203,9 @@ public HttpClient getHttpClient() { // for testing purposes BasePage createPageInstance(BaseHttpSourceConfig config, HttpResponse httpResponse, - ErrorHandling postRetryStrategy) throws IOException { + ErrorHandling postRetryStrategy) { return PageFactory.createInstance(config, httpResponse, httpErrorHandler, - !postRetryStrategy.equals(ErrorHandling.SUCCESS)); + !postRetryStrategy.equals(ErrorHandling.SUCCESS)); } public PaginationIteratorState getCurrentState() { @@ -204,14 +232,34 @@ public boolean hasNext() { } @Override - public void close() throws IOException { + public void close() { try { if (getHttpClient() != null) { getHttpClient().close(); } + } catch (IOException e) { + ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(httpStatusCode); + String errorReason = String.format( + "Failed to close http client: %s. %s. For more details, see %s", httpStatusCode, + pair.getCorrectiveAction(), HttpErrorDetailsProvider.getSupportedDocumentUrl()); + String errorMessage = String.format( + "Failed to close http client with status code %s with message %s.", httpStatusCode, + e.getMessage()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage, + pair.getErrorType(), true, ErrorCodeType.HTTP, String.valueOf(httpStatusCode), + HttpErrorDetailsProvider.getSupportedDocumentUrl(), e); } finally { if (response != null) { - response.close(); + try { + response.close(); + } catch (IOException e) { + String errorMessage = String.format("Failed to close http response with message: %s.", + e.getMessage()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage, + ErrorType.SYSTEM, true, e); + } } } } diff --git a/src/main/java/io/cdap/plugin/http/common/pagination/CustomPaginationIterator.java b/src/main/java/io/cdap/plugin/http/common/pagination/CustomPaginationIterator.java index d39314f5..d2821942 100644 --- a/src/main/java/io/cdap/plugin/http/common/pagination/CustomPaginationIterator.java +++ b/src/main/java/io/cdap/plugin/http/common/pagination/CustomPaginationIterator.java @@ -55,7 +55,7 @@ public boolean supportsSkippingPages() { } @Override - public void close() throws IOException { + public void close() { try { super.close(); } finally { diff --git a/src/main/java/io/cdap/plugin/http/common/pagination/page/JsonPage.java b/src/main/java/io/cdap/plugin/http/common/pagination/page/JsonPage.java index 49e9fb6b..1eaa63c1 100644 --- a/src/main/java/io/cdap/plugin/http/common/pagination/page/JsonPage.java +++ b/src/main/java/io/cdap/plugin/http/common/pagination/page/JsonPage.java @@ -59,16 +59,15 @@ class JsonPage extends BasePage { } else { this.insideElementJsonPathPart = config.getResultPath() == null ? "" : config.getResultPath(); } - - if (jsonElement.isJsonArray()) { + if (jsonElement != null && jsonElement.isJsonArray()) { this.iterator = jsonElement.getAsJsonArray().iterator(); - } else if (jsonElement.isJsonObject()) { + } else if (jsonElement != null && jsonElement.isJsonObject()) { this.iterator = Collections.singleton(jsonElement).iterator(); } else { - throw new IllegalArgumentException(String.format("Element found by '%s' json path is expected to be an object " + - "or an array. Primitive found", config.getResultPath())); + throw new IllegalArgumentException( + String.format("Element found by '%s' json path is expected to be an object or an array. Primitive found", + config.getResultPath())); } - this.fieldsMapping = config.getFullFieldsMapping(); } diff --git a/src/main/java/io/cdap/plugin/http/common/pagination/page/PageFactory.java b/src/main/java/io/cdap/plugin/http/common/pagination/page/PageFactory.java index c7e97b6d..505fafa1 100644 --- a/src/main/java/io/cdap/plugin/http/common/pagination/page/PageFactory.java +++ b/src/main/java/io/cdap/plugin/http/common/pagination/page/PageFactory.java @@ -15,11 +15,18 @@ */ package io.cdap.plugin.http.common.pagination.page; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCodeType; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.plugin.http.common.HttpErrorDetailsProvider; import io.cdap.plugin.http.common.error.HttpErrorHandler; import io.cdap.plugin.http.common.http.HttpResponse; import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; import java.io.IOException; +import java.net.HttpURLConnection; /** * A factory which creates instance of {@BasePage} in accordance to format configured in input config. @@ -27,26 +34,72 @@ */ public class PageFactory { public static BasePage createInstance(BaseHttpSourceConfig config, HttpResponse httpResponse, - HttpErrorHandler httpErrorHandler, boolean isError) throws IOException { + HttpErrorHandler httpErrorHandler, boolean isError) { if (isError) { return new HttpErrorPage(config, httpResponse, httpErrorHandler); } switch(config.getFormat()) { case JSON: - return new JsonPage(config, httpResponse); + try { + return new JsonPage(config, httpResponse); + } catch (Exception e) { + throw getProgramFailureExceptionBasedOnStatusCode(httpResponse, "JSON", e); + } case XML: - return new XmlPage(config, httpResponse); + try { + return new XmlPage(config, httpResponse); + } catch (Exception e) { + throw getProgramFailureExceptionBasedOnStatusCode(httpResponse, "XML", e); + } case TSV: - return new DelimitedPage(config, httpResponse, "\t"); + try { + return new DelimitedPage(config, httpResponse, "\t"); + } catch (IOException e) { + throw getProgramFailureExceptionBasedOnStatusCode(httpResponse, "TSV", e); + } case CSV: - return new DelimitedPage(config, httpResponse, ","); + try { + return new DelimitedPage(config, httpResponse, ","); + } catch (IOException e) { + throw getProgramFailureExceptionBasedOnStatusCode(httpResponse, "CSV", e); + } case TEXT: - return new TextPage(config, httpResponse); + try { + return new TextPage(config, httpResponse); + } catch (IOException e) { + throw getProgramFailureExceptionBasedOnStatusCode(httpResponse, "TEXT", e); + } case BLOB: return new BlobPage(config, httpResponse); default: throw new IllegalArgumentException(String.format("Unsupported page format: '%s'", config.getFormat())); } } + + private static ProgramFailureException getProgramFailureExceptionBasedOnStatusCode( + HttpResponse httpResponse, String fileFormat, Exception e) { + if (httpResponse.getStatusCode() != HttpURLConnection.HTTP_OK) { + ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode( + httpResponse.getStatusCode()); + String errorReason = String.format( + "Failed to read %s page with status code: %s. %s. For more details, see %s", fileFormat, + httpResponse.getStatusCode(), pair.getCorrectiveAction(), + HttpErrorDetailsProvider.getSupportedDocumentUrl()); + String errorMessage = String.format( + "Failed to read %s page with status code: %s, message: %s", fileFormat, + httpResponse.getStatusCode(), e.getMessage()); + return ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage, + pair.getErrorType(), true, ErrorCodeType.HTTP, + String.valueOf(httpResponse.getStatusCode()), + HttpErrorDetailsProvider.getSupportedDocumentUrl(), e); + } else { + String errorReason = String.format("Failed to read %s page, %s: %s", fileFormat, + e.getClass().getName(), e.getMessage()); + return ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorReason, + ErrorType.SYSTEM, true, e); + } + } } diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPOutputFormat.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPOutputFormat.java index 867d46be..bf4d6fd9 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPOutputFormat.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPOutputFormat.java @@ -19,6 +19,9 @@ import com.google.gson.Gson; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; @@ -37,12 +40,18 @@ public class HTTPOutputFormat extends OutputFormat getRecordWriter(TaskAttemptContext context) - throws IOException { + public RecordWriter getRecordWriter(TaskAttemptContext context) { Configuration hConf = context.getConfiguration(); HTTPSinkConfig config = GSON.fromJson(hConf.get(CONFIG_KEY), HTTPSinkConfig.class); - Schema inputSchema = Schema.parseJson(hConf.get(INPUT_SCHEMA_KEY)); - return new HTTPRecordWriter(config, inputSchema); + Schema inputSchema; + try { + inputSchema = Schema.parseJson(hConf.get(INPUT_SCHEMA_KEY)); + return new HTTPRecordWriter(config, inputSchema); + } catch (IOException e) { + String errorReason = String.format("Failed to parse the input schema with message: %s", e.getMessage()); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorReason, ErrorType.USER, false, e); + } } @Override diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java index 2180737b..27b38e73 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java @@ -21,6 +21,12 @@ import com.google.common.base.Strings; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCodeType; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.plugin.http.common.HttpErrorDetailsProvider; import io.cdap.plugin.http.common.RetryPolicy; import io.cdap.plugin.http.common.error.ErrorHandling; import io.cdap.plugin.http.common.error.HttpErrorHandler; @@ -55,7 +61,6 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.MalformedURLException; -import java.net.ProtocolException; import java.net.URI; import java.net.URL; import java.net.URLEncoder; @@ -72,7 +77,6 @@ import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSession; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; @@ -90,9 +94,9 @@ public class HTTPRecordWriter extends RecordWriter placeHolderList; + private final List placeHolderList; private final Map headers; private AccessToken accessToken; @@ -122,7 +126,7 @@ public class HTTPRecordWriter extends RecordWriter true; HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid); } - private boolean executeHTTPServiceAndCheckStatusCode() throws IOException { + private boolean executeHTTPServiceAndCheckStatusCode() { LOG.debug("HTTP Request Attempt No. : {}", ++retryCount); - CloseableHttpClient httpClient = createHttpClient(configURL); + // Try-with-resources ensures proper resource management + try (CloseableHttpClient httpClient = createHttpClient(configURL); + CloseableHttpResponse response = executeHttpRequest(httpClient, new URL(configURL))) { + httpStatusCode = response.getStatusLine().getStatusCode(); + httpResponseBody = new HttpResponse(response).getBody(); + RetryableErrorHandling errorHandlingStrategy = httpErrorHandler.getErrorHandlingStrategy(httpStatusCode); + boolean shouldRetry = errorHandlingStrategy.shouldRetry(); + if (!shouldRetry) { + messageBuffer.clear(); + retryCount = 0; + } + return !shouldRetry; + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Invalid URL: " + configURL, e); + } catch (IOException e) { + LOG.warn("Error making {} request to URL {}.", config.getMethod(), config.getUrl()); + String errorMessage = String.format( + "Failed to execute %s request to %s with error code %s with message: %s. ", + config.getMethod(), config.getUrl(), httpStatusCode, e.getMessage()); + throw getProgramFailureException(errorMessage, + "Unable to write record and error clearing message buffer: %s. %s. " + + "For more details, see %s.", e); + } + } - CloseableHttpResponse response = null; + private CloseableHttpResponse executeHttpRequest(CloseableHttpClient httpClient, URL url) { try { - URL url = new URL(configURL); - HttpEntityEnclosingRequestBase request = new HttpRequest(URI.create(String.valueOf(url)), - config.getMethod()); - - if (url.getProtocol().equalsIgnoreCase("https")) { - // Disable SSLv3 - System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2"); - if (config.getDisableSSLValidation()) { - disableSSLValidation(); - } + HttpEntityEnclosingRequestBase request = new HttpRequest(URI.create(url.toString()), config.getMethod()); + if ("https".equalsIgnoreCase(url.getProtocol())) { + configureHttpsSettings(); } - if (!messageBuffer.isEmpty()) { String requestBodyString = messageBuffer.getMessage(); if (requestBodyString != null) { @@ -205,40 +221,43 @@ private boolean executeHTTPServiceAndCheckStatusCode() throws IOException { request.setHeaders(getRequestHeaders()); - response = httpClient.execute(request); - httpStatusCode = response.getStatusLine().getStatusCode(); - LOG.debug("Response HTTP Status code: {}", httpStatusCode); - httpResponseBody = new HttpResponse(response).getBody(); - - } catch (MalformedURLException | ProtocolException e) { - throw new IllegalStateException("Error opening url connection. Reason: " + e.getMessage(), e); + // Execute the request and return the response + return httpClient.execute(request); + } catch (UnsupportedEncodingException e) { + throw new IllegalStateException("Error encoding the request Reason: " + e.getMessage(), e); } catch (IOException e) { - LOG.warn("Error making {} request to url {}.", config.getMethod(), config.getUrl()); - } finally { - if (response != null) { - response.close(); - } - } - RetryableErrorHandling errorHandlingStrategy = httpErrorHandler.getErrorHandlingStrategy(httpStatusCode); - boolean shouldRetry = errorHandlingStrategy.shouldRetry(); - if (!shouldRetry) { - messageBuffer.clear(); - retryCount = 0; + throw getProgramFailureException(String.format("Unable to execute HTTP request to URL: %s. " + + "Failed to write record and error clearing message buffer with error code %s with " + + "message: %s.", url, httpStatusCode, e.getMessage()), + "Failed to write record and error clearing message buffer: %s. %s. For more details, " + + "see %s", e); + } catch (Exception e) { + throw getProgramFailureException(String.format( + "Unexpected error occurred, unable to write record and error " + + "clearing message buffer. Failed to execute HTTP request to %s with error code %s " + + "with message: %s.", url, httpStatusCode, e.getMessage()), + "Unexpected error occurred, unable to write record: %s. %s. For more details, " + + "see %s.", e); } - return !shouldRetry; } + private void configureHttpsSettings() { + System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2"); + if (Boolean.TRUE.equals(config.getDisableSSLValidation())) { + disableSSLValidation(); + } + } - public CloseableHttpClient createHttpClient(String pageUriStr) throws IOException { + public CloseableHttpClient createHttpClient(String pageUriStr) { HttpClientBuilder httpClientBuilder = HttpClientBuilder.create(); // set timeouts - Long connectTimeoutMillis = TimeUnit.SECONDS.toMillis(config.getConnectTimeout()); - Long readTimeoutMillis = TimeUnit.SECONDS.toMillis(config.getReadTimeout()); + long connectTimeoutMillis = TimeUnit.SECONDS.toMillis(config.getConnectTimeout()); + long readTimeoutMillis = TimeUnit.SECONDS.toMillis(config.getReadTimeout()); RequestConfig.Builder requestBuilder = RequestConfig.custom(); - requestBuilder.setSocketTimeout(readTimeoutMillis.intValue()); - requestBuilder.setConnectTimeout(connectTimeoutMillis.intValue()); - requestBuilder.setConnectionRequestTimeout(connectTimeoutMillis.intValue()); + requestBuilder.setSocketTimeout((int) readTimeoutMillis); + requestBuilder.setConnectTimeout((int) connectTimeoutMillis); + requestBuilder.setConnectionRequestTimeout((int) connectTimeoutMillis); httpClientBuilder.setDefaultRequestConfig(requestBuilder.build()); // basic auth @@ -265,30 +284,32 @@ public CloseableHttpClient createHttpClient(String pageUriStr) throws IOExceptio return httpClientBuilder.build(); } - private Header[] getRequestHeaders() throws IOException { + private Header[] getRequestHeaders() { ArrayList
clientHeaders = new ArrayList<>(); if (accessToken == null || OAuthUtil.tokenExpired(accessToken)) { - accessToken = OAuthUtil.getAccessToken(config); + try { + accessToken = OAuthUtil.getAccessToken(config); + } catch (IOException e) { + String errorReason = String.format("Failed to get access token with message: %s", e.getMessage()); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorReason, ErrorType.SYSTEM, true, e); + } } if (accessToken != null) { Header authorizationHeader = getAuthorizationHeader(accessToken); - if (authorizationHeader != null) { - clientHeaders.add(authorizationHeader); - } + clientHeaders.add(authorizationHeader); } headers.put("Request-Method", config.getMethod().toUpperCase()); headers.put("Instance-Follow-Redirects", String.valueOf(config.getFollowRedirects())); headers.put("charset", config.getCharset()); - if (config.getMethod().equals(REQUEST_METHOD_POST) + if ((config.getMethod().equals(REQUEST_METHOD_POST) || config.getMethod().equals(REQUEST_METHOD_PATCH) - || config.getMethod().equals(REQUEST_METHOD_PUT)) { - if (!headers.containsKey("Content-Type")) { - headers.put("Content-Type", contentType); - } + || config.getMethod().equals(REQUEST_METHOD_PUT)) && !headers.containsKey("Content-Type")) { + headers.put("Content-Type", contentType); } // set default headers @@ -336,7 +357,10 @@ private String updateURLWithPlaceholderValue(StructuredRecord inputRecord) { } return finalURLBuilder.toString(); } catch (UnsupportedEncodingException e) { - throw new IllegalStateException("Error encoding URL with placeholder value. Reason: " + e.getMessage(), e); + String errorReason = String.format("Failed to encode URL with placeholder value with message: %s", + e.getMessage()); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorReason, ErrorType.USER, false, e); } } @@ -348,17 +372,12 @@ private void flushMessageBuffer() { return; } contentType = messageBuffer.getContentType(); - try { Awaitility .await().with() .pollInterval(pollInterval) .pollDelay(config.getWaitTimeBetweenPages(), TimeUnit.MILLISECONDS) .timeout(config.getMaxRetryDuration(), TimeUnit.SECONDS) .until(this::executeHTTPServiceAndCheckStatusCode); - } catch (Exception e) { - throw new RuntimeException("Error while executing http request for remaining input messages " + - "after the batch execution. " + e); - } messageBuffer.clear(); ErrorHandling postRetryStrategy = httpErrorHandler.getErrorHandlingStrategy(httpStatusCode) @@ -368,8 +387,12 @@ private void flushMessageBuffer() { case SUCCESS: break; case STOP: - throw new IllegalStateException(String.format("Fetching from url '%s' returned status code '%d' and body '%s'", - config.getUrl(), httpStatusCode, httpResponseBody)); + throw getProgramFailureException(String.format( + "Retry failed! Unable to write and execute request. " + + "Fetching from '%s' returned http error status code '%d' with response '%s'.", + config.getUrl(), httpStatusCode, httpResponseBody), + "Unable to write and execute request: %s. %s. For more details, see %s.", + null); case SKIP: case SEND: LOG.warn(String.format("Fetching from url '%s' returned status code '%d' and body '%s'", @@ -378,7 +401,24 @@ private void flushMessageBuffer() { default: throw new IllegalArgumentException(String.format("Unexpected http error handling: '%s'", postRetryStrategy)); } - } + /** + * Return program failure exception + * + * @param errorMessage + * @param errorInfo + * @param e + * @return + */ + private ProgramFailureException getProgramFailureException(String errorMessage, String errorInfo, + Exception e) { + ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(httpStatusCode); + String errorReason = String.format(errorInfo, httpStatusCode, pair.getCorrectiveAction(), + HttpErrorDetailsProvider.getSupportedDocumentUrl()); + return ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage, + pair.getErrorType(), true, ErrorCodeType.HTTP, String.valueOf(httpStatusCode), + HttpErrorDetailsProvider.getSupportedDocumentUrl(), e); + } } diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java index f33036f5..f739144f 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSink.java @@ -30,8 +30,10 @@ import io.cdap.cdap.etl.api.StageConfigurer; import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.http.common.HttpErrorDetailsProvider; import java.util.Collections; import java.util.List; @@ -45,7 +47,7 @@ @Name("HTTP") @Description("Sink plugin to send the messages from the pipeline to an external http endpoint.") public class HTTPSink extends BatchSink { - private HTTPSinkConfig config; + private final HTTPSinkConfig config; public HTTPSink(HTTPSinkConfig config) { this.config = config; @@ -78,6 +80,8 @@ public void prepareRun(BatchSinkContext context) { inputSchema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()); lineageRecorder.recordWrite("Write", String.format("Wrote to HTTP '%s'", config.getUrl()), fields); + context.setErrorDetailsProvider( + new ErrorDetailsProviderSpec(HttpErrorDetailsProvider.class.getName())); context.addOutput(Output.of(config.getReferenceNameOrNormalizedFQN(), new HTTPSink.HTTPOutputFormatProvider(config, inputSchema))); } @@ -108,5 +112,4 @@ public Map getOutputFormatConfiguration() { inputSchema == null ? defaultValidSchema.toString() : inputSchema.toString()); } } - } diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java index cb3cc9ff..0be5a78c 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/HTTPSinkConfig.java @@ -200,13 +200,12 @@ public class HTTPSinkConfig extends BaseHttpConfig { private final Integer readTimeout; public HTTPSinkConfig(String referenceName, String url, String method, Integer batchSize, - @Nullable String delimiterForMessages, String messageFormat, @Nullable String body, - @Nullable String requestHeaders, String charset, - boolean followRedirects, boolean disableSSLValidation, @Nullable String httpErrorsHandling, - String errorHandling, String retryPolicy, @Nullable Long linearRetryInterval, - Long maxRetryDuration, @Nullable int readTimeout, @Nullable int connectTimeout, - String oauth2Enabled, String authType, @Nullable String jsonBatchKey, - Boolean writeJsonAsArray) { + @Nullable String delimiterForMessages, String messageFormat, @Nullable String body, + @Nullable String requestHeaders, String charset, boolean followRedirects, + boolean disableSSLValidation, @Nullable String httpErrorsHandling, String errorHandling, + String retryPolicy, @Nullable Long linearRetryInterval, Long maxRetryDuration, + int readTimeout, int connectTimeout, String oauth2Enabled, String authType, + @Nullable String jsonBatchKey, Boolean writeJsonAsArray) { super(referenceName); this.url = url; this.method = method; @@ -473,6 +472,12 @@ public void validate(FailureCollector collector) { .withConfigProperty(MESSAGE_FORMAT); } + if (!containsMacro(PROPERTY_LINEAR_RETRY_INTERVAL) && Objects.nonNull(linearRetryInterval) && + linearRetryInterval < 0) { + collector.addFailure("Linear Retry Interval cannot be a negative number.", null) + .withConfigProperty(PROPERTY_LINEAR_RETRY_INTERVAL); + } + if (!containsMacro(PROPERTY_MAX_RETRY_DURATION) && Objects.nonNull(maxRetryDuration) && maxRetryDuration < 0) { collector.addFailure("Max Retry Duration cannot be a negative number.", null) .withConfigProperty(PROPERTY_MAX_RETRY_DURATION); diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/MessageBuffer.java b/src/main/java/io/cdap/plugin/http/sink/batch/MessageBuffer.java index 63725d3f..7296950b 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/MessageBuffer.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/MessageBuffer.java @@ -19,6 +19,10 @@ import com.google.common.base.Strings; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.api.exception.ProgramFailureException; import io.cdap.cdap.format.StructuredRecordStringConverter; import io.cdap.plugin.http.common.http.MessageFormatType; @@ -134,26 +138,26 @@ public String getContentType() { /** * Converts the buffer to the appropriate format and returns the message. */ - public String getMessage() throws IOException { + public String getMessage() { return messageFormatter.apply(buffer); } private String formatAsJson(List buffer) { - try { - return formatAsJsonInternal(buffer); - } catch (IOException e) { - throw new IllegalStateException("Error formatting JSON message. Reason: " + e.getMessage(), e); - } + return formatAsJsonInternal(buffer); } - private String formatAsJsonInternal(List buffer) throws IOException { + private String formatAsJsonInternal(List buffer) { boolean useJsonBatchKey = !Strings.isNullOrEmpty(jsonBatchKey); - if (!shouldWriteJsonAsArray || !useJsonBatchKey) { + if (Boolean.TRUE.equals(!shouldWriteJsonAsArray) || !useJsonBatchKey) { return getBufferAsJsonList(); } StructuredRecord wrappedMessageRecord = StructuredRecord.builder(wrappedMessageSchema) .set(jsonBatchKey, buffer).build(); - return StructuredRecordStringConverter.toJsonString(wrappedMessageRecord); + try { + return StructuredRecordStringConverter.toJsonString(wrappedMessageRecord); + } catch (IOException e) { + throw getProgramFailureExceptionDue(e); + } } private String formatAsForm(List buffer) { @@ -168,38 +172,50 @@ private String formatAsCustom(List buffer) { .collect(Collectors.joining(delimiterForMessages)); } - private String getBufferAsJsonList() throws IOException { + private String getBufferAsJsonList() { StringBuilder sb = new StringBuilder(); - String delimiter = shouldWriteJsonAsArray ? "," : delimiterForMessages; - if (shouldWriteJsonAsArray) { + String delimiter = Boolean.TRUE.equals(shouldWriteJsonAsArray) ? "," : delimiterForMessages; + if (Boolean.TRUE.equals(shouldWriteJsonAsArray)) { sb.append("["); } for (StructuredRecord record : buffer) { - sb.append(StructuredRecordStringConverter.toJsonString(record)); + try { + sb.append(StructuredRecordStringConverter.toJsonString(record)); + } catch (IOException e) { + throw getProgramFailureExceptionDue(e); + } sb.append(delimiter); } if (!buffer.isEmpty()) { sb.setLength(sb.length() - delimiter.length()); } - if (shouldWriteJsonAsArray) { + if (Boolean.TRUE.equals(shouldWriteJsonAsArray)) { sb.append("]"); } return sb.toString(); } + private static ProgramFailureException getProgramFailureExceptionDue(IOException exception) { + String errorMessage = "Error formatting JSON message. Reason: " + exception.getMessage(); + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, errorMessage, ErrorType.SYSTEM, false, exception); + } + private String createFormMessage(StructuredRecord input) { boolean first = true; String formMessage = null; - StringBuilder sb = new StringBuilder(""); - for (Schema.Field field : input.getSchema().getFields()) { - if (first) { - first = false; - } else { - sb.append("&"); + StringBuilder sb = new StringBuilder(); + if (input != null && input.getSchema() != null) { + for (Schema.Field field : input.getSchema().getFields()) { + if (first) { + first = false; + } else { + sb.append("&"); + } + sb.append(field.getName()); + sb.append("="); + sb.append((String) input.get(field.getName())); } - sb.append(field.getName()); - sb.append("="); - sb.append((String) input.get(field.getName())); } try { formMessage = URLEncoder.encode(sb.toString(), charset); @@ -212,13 +228,13 @@ private String createFormMessage(StructuredRecord input) { private String createCustomMessage(StructuredRecord input) { String customMessage = customMessageBody; Matcher matcher = Pattern.compile(REGEX_HASHED_VAR).matcher(customMessage); - HashMap findReplaceMap = new HashMap(); + HashMap findReplaceMap = new HashMap<>(); while (matcher.find()) { if (input.get(matcher.group(1)) != null) { findReplaceMap.put(matcher.group(1), (String) input.get(matcher.group(1))); } else { throw new IllegalArgumentException(String.format( - "Field %s doesnt exist in the input schema.", matcher.group(1))); + "Field %s doesn't exist in the input schema.", matcher.group(1))); } } Matcher replaceMatcher = Pattern.compile(REGEX_HASHED_VAR).matcher(customMessage); @@ -228,5 +244,4 @@ private String createCustomMessage(StructuredRecord input) { } return customMessage; } - } diff --git a/src/main/java/io/cdap/plugin/http/sink/batch/PlaceholderBean.java b/src/main/java/io/cdap/plugin/http/sink/batch/PlaceholderBean.java index 1520f8e0..d219df1c 100644 --- a/src/main/java/io/cdap/plugin/http/sink/batch/PlaceholderBean.java +++ b/src/main/java/io/cdap/plugin/http/sink/batch/PlaceholderBean.java @@ -22,13 +22,12 @@ public class PlaceholderBean { private static final String PLACEHOLDER_FORMAT = "#%s"; private final String placeHolderKey; - private final String placeHolderKeyWithPrefix; private final int startIndex; private final int endIndex; public PlaceholderBean(String url, String placeHolderKey) { + String placeHolderKeyWithPrefix = String.format(PLACEHOLDER_FORMAT, placeHolderKey); this.placeHolderKey = placeHolderKey; - this.placeHolderKeyWithPrefix = String.format(PLACEHOLDER_FORMAT, placeHolderKey); this.startIndex = url.indexOf(placeHolderKeyWithPrefix); this.endIndex = startIndex + placeHolderKeyWithPrefix.length(); } diff --git a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java index 9fff34d5..943c800d 100644 --- a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java +++ b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSource.java @@ -24,6 +24,9 @@ import io.cdap.cdap.api.data.format.UnexpectedFormatException; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.InvalidEntry; @@ -31,9 +34,10 @@ import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.LineageRecorder; -import io.cdap.plugin.http.common.pagination.page.BasePage; +import io.cdap.plugin.http.common.HttpErrorDetailsProvider; import io.cdap.plugin.http.common.pagination.page.PageEntry; import org.apache.hadoop.io.NullWritable; import org.slf4j.Logger; @@ -88,11 +92,18 @@ public void prepareRun(BatchSourceContext context) { .setFqn(config.getUrl()).build(); LineageRecorder lineageRecorder = new LineageRecorder(context, asset); lineageRecorder.createExternalDataset(schema); - lineageRecorder.recordRead("Read", String.format("Read from HTTP '%s'", config.getUrl()), - Preconditions.checkNotNull(schema.getFields()).stream() - .map(Schema.Field::getName) - .collect(Collectors.toList())); + try { + lineageRecorder.recordRead("Read", String.format("Read from HTTP '%s'", config.getUrl()), + Preconditions.checkNotNull(schema.getFields()).stream().map(Schema.Field::getName) + .collect(Collectors.toList())); + } catch (NullPointerException e) { + String errorReason = "Schema is not set"; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorReason, ErrorType.USER, false, e); + } + // set error details provider + context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(HttpErrorDetailsProvider.class.getName())); context.setInput(Input.of(config.getReferenceNameOrNormalizedFQN(), new HttpInputFormatProvider(config))); } diff --git a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSourceConfig.java b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSourceConfig.java index 0f96f996..c0b32ef8 100644 --- a/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSourceConfig.java +++ b/src/main/java/io/cdap/plugin/http/source/batch/HttpBatchSourceConfig.java @@ -17,7 +17,6 @@ import com.google.common.base.Strings; import com.google.gson.JsonSyntaxException; -import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.plugin.http.common.http.AuthType; import io.cdap.plugin.http.common.http.HttpClient; @@ -59,19 +58,14 @@ public void validate(FailureCollector failureCollector) { } public void validateCredentials(FailureCollector collector) { - try { - if (getAuthType() == AuthType.OAUTH2) { - validateOAuth2Credentials(collector); - } else if (getAuthType() == AuthType.BASIC_AUTH) { - validateBasicAuthCredentials(collector); - } - } catch (IOException e) { - String errorMessage = "Unable to authenticate the given info : " + e.getMessage(); - collector.addFailure(errorMessage, null); + if (getAuthType() == AuthType.OAUTH2) { + validateOAuth2Credentials(collector); + } else if (getAuthType() == AuthType.BASIC_AUTH) { + validateBasicAuthCredentials(collector); } } - private void validateOAuth2Credentials(FailureCollector collector) throws IOException { + private void validateOAuth2Credentials(FailureCollector collector) { if (!containsMacro(PROPERTY_CLIENT_ID) && !containsMacro(PROPERTY_CLIENT_SECRET) && !containsMacro(PROPERTY_TOKEN_URL) && !containsMacro(PROPERTY_REFRESH_TOKEN) && !containsMacro(PROPERTY_PROXY_PASSWORD) && !containsMacro(PROPERTY_PROXY_USERNAME) && @@ -93,36 +87,39 @@ private void validateOAuth2Credentials(FailureCollector collector) throws IOExce } catch (JsonSyntaxException | HttpHostConnectException e) { String errorMessage = "Error occurred during credential validation : " + e.getMessage(); collector.addFailure(errorMessage, null); + } catch (IOException e) { + String errorMessage = String.format("Failed to validate OAuth for the request with message: %s", + e.getMessage()); + collector.addFailure(errorMessage, null); } } } - public void validateBasicAuthCredentials(FailureCollector collector) throws IOException { - try { - if (!containsMacro(PROPERTY_URL) && !containsMacro(PROPERTY_USERNAME) && !containsMacro(PROPERTY_PASSWORD) && - !containsMacro(PROPERTY_PROXY_USERNAME) && !containsMacro(PROPERTY_PROXY_PASSWORD) - && !containsMacro(PROPERTY_PROXY_URL)) { - HttpClient httpClient = new HttpClient(this); - validateBasicAuthResponse(collector, httpClient); - } - } catch (HttpHostConnectException e) { - String errorMessage = "Error occurred during credential validation : " + e.getMessage(); - collector.addFailure(errorMessage, "Please ensure that correct credentials are provided."); + public void validateBasicAuthCredentials(FailureCollector collector) { + if (!containsMacro(PROPERTY_URL) && !containsMacro(PROPERTY_USERNAME) && !containsMacro(PROPERTY_PASSWORD) && + !containsMacro(PROPERTY_PROXY_USERNAME) && !containsMacro(PROPERTY_PROXY_PASSWORD) + && !containsMacro(PROPERTY_PROXY_URL)) { + HttpClient httpClient = new HttpClient(this); + validateBasicAuthResponse(collector, httpClient); } } - public void validateBasicAuthResponse(FailureCollector collector, HttpClient httpClient) throws IOException { + public void validateBasicAuthResponse(FailureCollector collector, HttpClient httpClient) { try (CloseableHttpResponse response = httpClient.executeHTTP(getUrl())) { int statusCode = response.getStatusLine().getStatusCode(); if (statusCode != HttpStatus.SC_OK) { HttpEntity entity = response.getEntity(); if (entity != null) { String errorResponse = EntityUtils.toString(entity, "UTF-8"); - String errorMessage = String.format("Credential validation request failed with Http Status code: '%d', " + - "Response: '%s'", statusCode, errorResponse); + String errorMessage = + String.format("Credential validation request failed with status code: '%d', with response: '%s'", + statusCode, errorResponse); collector.addFailure(errorMessage, "Please ensure that correct credentials are provided."); } } + } catch (Exception e) { + String errorMessage = String.format("Error validating credentials for request with message: %s ", e.getMessage()); + collector.addFailure(errorMessage, "Please ensure that correct credentials are provided."); } } @@ -195,7 +192,7 @@ public static class HttpBatchSourceConfigBuilder { private String password; - public HttpBatchSourceConfigBuilder setReferenceName (String referenceName) { + public HttpBatchSourceConfigBuilder setReferenceName(String referenceName) { this.referenceName = referenceName; return this; } diff --git a/src/main/java/io/cdap/plugin/http/source/batch/HttpInputFormat.java b/src/main/java/io/cdap/plugin/http/source/batch/HttpInputFormat.java index 69cee410..9eb89a04 100644 --- a/src/main/java/io/cdap/plugin/http/source/batch/HttpInputFormat.java +++ b/src/main/java/io/cdap/plugin/http/source/batch/HttpInputFormat.java @@ -15,7 +15,6 @@ */ package io.cdap.plugin.http.source.batch; -import io.cdap.plugin.http.common.pagination.page.BasePage; import io.cdap.plugin.http.common.pagination.page.PageEntry; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputFormat; diff --git a/src/main/java/io/cdap/plugin/http/source/common/DelimitedSchemaDetector.java b/src/main/java/io/cdap/plugin/http/source/common/DelimitedSchemaDetector.java index d7bc3665..14261dad 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/DelimitedSchemaDetector.java +++ b/src/main/java/io/cdap/plugin/http/source/common/DelimitedSchemaDetector.java @@ -46,12 +46,12 @@ public static Schema detectSchema(HttpBatchSourceConfig config, String delimiter if (rowIndex == 0) { columnNames = DataTypeDetectorUtils.setColumnNames(line, config.getCsvSkipFirstRow(), config.getEnableQuotesValues(), delimiter); - if (config.getCsvSkipFirstRow()) { + if (Boolean.TRUE.equals(config.getCsvSkipFirstRow())) { continue; } } - DataTypeDetectorUtils.detectDataTypeOfRowValues(new HashMap<>(), dataTypeDetectorStatusKeeper, columnNames, - rowValue); + DataTypeDetectorUtils.detectDataTypeOfRowValues( + new HashMap<>(), dataTypeDetectorStatusKeeper, columnNames, rowValue); } dataTypeDetectorStatusKeeper.validateDataTypeDetector(); } catch (Exception e) { @@ -60,8 +60,8 @@ public static Schema detectSchema(HttpBatchSourceConfig config, String delimiter .withStacktrace(e.getStackTrace()); return null; } - List fields = DataTypeDetectorUtils.detectDataTypeOfEachDatasetColumn( - new HashMap<>(), columnNames, dataTypeDetectorStatusKeeper); + List fields = DataTypeDetectorUtils.detectDataTypeOfEachDatasetColumn(new HashMap<>(), + columnNames, dataTypeDetectorStatusKeeper); return Schema.recordOf("text", fields); } diff --git a/src/main/java/io/cdap/plugin/http/source/common/RawStringPerLine.java b/src/main/java/io/cdap/plugin/http/source/common/RawStringPerLine.java index 2943d0d1..8049442a 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/RawStringPerLine.java +++ b/src/main/java/io/cdap/plugin/http/source/common/RawStringPerLine.java @@ -16,14 +16,19 @@ package io.cdap.plugin.http.source.common; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCodeType; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.plugin.http.common.HttpErrorDetailsProvider; import io.cdap.plugin.http.common.http.HttpResponse; import java.io.BufferedReader; import java.io.Closeable; import java.io.IOException; import java.io.InputStreamReader; +import java.net.HttpURLConnection; import java.util.Iterator; -import java.util.NoSuchElementException; /** * Class that reads the raw string from the HTTP response and returns it line by line. @@ -38,17 +43,33 @@ public RawStringPerLine(HttpResponse httpResponse) { this.httpResponse = httpResponse; } - private BufferedReader getBufferedReader() throws IOException { + private BufferedReader getBufferedReader() { if (bufferedReader == null) { + try { this.bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getInputStream())); + } catch (IOException e) { + String errorMessage = "Unable to create a buffered reader for the http response."; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, errorMessage, ErrorType.SYSTEM, true, null); + } } return bufferedReader; } @Override - public void close() throws IOException { + public void close() { if (bufferedReader != null) { + try { bufferedReader.close(); + } catch (IOException e) { + String errorReason = "Unable to close the buffered reader for the http response."; + String errorMessage = String.format( + "Unable to close the buffered reader for the http response, %s: %s", + e.getClass().getName(), e.getMessage()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, + errorMessage, ErrorType.SYSTEM, false, e); + } } } @@ -61,14 +82,53 @@ public boolean hasNext() { isLineRead = true; return lastLine != null; } catch (IOException e) { // we need to catch this, since hasNext() does not have "throws" in parent - throw new RuntimeException("Failed to read line from http page buffer", e); + if (httpResponse.getStatusCode() != HttpURLConnection.HTTP_OK) { + ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode( + httpResponse.getStatusCode()); + String errorReason = String.format( + "Unable to read line from http page buffer: %s. %s. For more details, see %s", + httpResponse.getStatusCode(), pair.getCorrectiveAction(), + HttpErrorDetailsProvider.getSupportedDocumentUrl()); + String errorMessage = String.format( + "Unable to read line from http page buffer with code: %s, message: %s", + httpResponse.getStatusCode(), e.getMessage()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, + errorMessage, pair.getErrorType(), true, ErrorCodeType.HTTP, + String.valueOf(httpResponse.getStatusCode()), + HttpErrorDetailsProvider.getSupportedDocumentUrl(), e); + } else { + String errorReason = String.format( + "Unable to read line from http page buffer with message: %s", e.getMessage()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, + errorReason, ErrorType.SYSTEM, true, e); + } } } @Override public String next() { if (!hasNext()) { // calling hasNext will also read the line; - throw new NoSuchElementException(); + if (httpResponse.getStatusCode() != HttpURLConnection.HTTP_OK) { + ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(httpResponse.getStatusCode()); + String errorReason = String.format( + "Failed to read the next line with error code: %s. %s. For more details, see %s", + httpResponse.getStatusCode(), pair.getCorrectiveAction(), + HttpErrorDetailsProvider.getSupportedDocumentUrl()); + String errorMessage = String.format( + "Failed to read the next line with error code: %s. %s", + httpResponse.getStatusCode(), pair.getCorrectiveAction()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, + errorMessage, pair.getErrorType(), true, ErrorCodeType.HTTP, + String.valueOf(httpResponse.getStatusCode()), + HttpErrorDetailsProvider.getSupportedDocumentUrl(), null); + } else { + String errorReason = "Failed to read the next line."; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorReason, ErrorType.SYSTEM, true, null); + } } isLineRead = false; return lastLine; diff --git a/src/test/java/io/cdap/plugin/http/source/common/HttpBatchSourceConfigTest.java b/src/test/java/io/cdap/plugin/http/source/common/HttpBatchSourceConfigTest.java index d864086f..99493751 100644 --- a/src/test/java/io/cdap/plugin/http/source/common/HttpBatchSourceConfigTest.java +++ b/src/test/java/io/cdap/plugin/http/source/common/HttpBatchSourceConfigTest.java @@ -220,9 +220,9 @@ public void testValidConfigWithInvalidResponse() throws IOException { Mockito.when(response.getEntity()).thenReturn(entity); config.validateBasicAuthResponse(failureCollector, httpClient); Assert.assertEquals(1, failureCollector.getValidationFailures().size()); - Assert.assertEquals("Credential validation request failed with Http Status code: '400', Response: 'null'", - failureCollector - .getValidationFailures().get(0).getMessage()); + Assert.assertEquals( + "Credential validation request failed with status code: '400', with response: 'null'", + failureCollector.getValidationFailures().get(0).getMessage()); } } diff --git a/src/test/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorTest.java b/src/test/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorTest.java index e8cd81bc..e898f907 100644 --- a/src/test/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorTest.java +++ b/src/test/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorTest.java @@ -18,6 +18,7 @@ import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ProgramFailureException; import io.cdap.plugin.http.common.http.HttpClient; import io.cdap.plugin.http.common.pagination.BaseHttpPaginationIterator; import io.cdap.plugin.http.common.pagination.PaginationIteratorFactory; @@ -198,7 +199,7 @@ class TestConfig extends BaseTestConfig { assertResults(results, responses, config); } - @Test(expected = IllegalStateException.class) + @Test(expected = ProgramFailureException.class) public void testErrorHttpStatus() throws IOException { class TestConfig extends BaseTestConfig { TestConfig(String referenceName) {