diff --git a/data-prepper-pipeline-parser/src/main/resources/rules/opensearch_api-rule.yaml b/data-prepper-pipeline-parser/src/main/resources/rules/opensearch_api-rule.yaml deleted file mode 100644 index 3d8d5bcbf3..0000000000 --- a/data-prepper-pipeline-parser/src/main/resources/rules/opensearch_api-rule.yaml +++ /dev/null @@ -1,3 +0,0 @@ -apply_when: - - "$..source.opensearch_api" - - "$..source.opensearch_api.s3_bucket" \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/main/resources/templates/opensearch_api-template.yaml b/data-prepper-pipeline-parser/src/main/resources/templates/opensearch_api-template.yaml deleted file mode 100644 index 14b9699bd3..0000000000 --- a/data-prepper-pipeline-parser/src/main/resources/templates/opensearch_api-template.yaml +++ /dev/null @@ -1,50 +0,0 @@ -"<>": - workers: "<<$.<>.workers>>" - delay: "<<$.<>.delay>>" - buffer: "<<$.<>.buffer>>" - source: - opensearch_api: "<<$.<>.source.opensearch_api>>" - acknowledgments: "<<$.<>.source.opensearch_api.aws.acknowledgments>>" - delete_s3_objects_on_read: "<<$.<>.source.opensearch_api.aws.delete_s3_objects_on_read>>" - sink: - - s3: - aws: - region: "<<$.<>.source.opensearch_api.s3_region>>" - sts_role_arn: "<<$.<>.source.opensearch_api.aws.sts_role_arn>>" -# sts_external_id: "<<$.<>.source.documentdb.aws.sts_external_id>>" -# sts_header_overrides: "<<$.<>.source.documentdb.aws.sts_header_overrides>>" - bucket: "<<$.<>.source.opensearch_api.s3_bucket>>" - threshold: - event_collect_timeout: "120s" - maximum_size: "2mb" - aggregate_threshold: - maximum_size: "256kb" - flush_capacity_ratio: 0 - object_key: - path_prefix: "api/" - codec: - event_json: -"<>-s3": - workers: "<<$.<>.workers>>" - delay: "<<$.<>.delay>>" - buffer: "<<$.<>.buffer>>" - source: - s3: - codec: - event_json: - aws: - region: "<<$.<>.source.opensearch_api.s3_region>>" - sts_role_arn: "<<$.<>.source.opensearch_api.aws.sts_role_arn>>" -# sts_external_id: "<<$.<>.source.opensearch_api.aws.sts_external_id>>" -# sts_header_overrides: "<<$.<>.source.opensearch_api.aws.sts_header_overrides>>" - acknowledgments: "<<$.<>.source.opensearch_api.aws.acknowledgments>>" - delete_s3_objects_on_read: "<<$.<>.source.opensearch_api.aws.delete_s3_objects_on_read>>" - scan: - buckets: - - bucket: - name: "<<$.<>.source.opensearch_api.s3_bucket>>" - scheduling: - interval: "60s" - processor: "<<$.<>.processor>>" - sink: "<<$.<>.sink>>" - routes: "<<$.<>.routes>>" \ No newline at end of file diff --git a/data-prepper-plugins/opensearch-api-source/src/main/resources/test_cert.crt b/data-prepper-plugins/http-source/src/test/resources/test_cert.crt similarity index 100% rename from data-prepper-plugins/opensearch-api-source/src/main/resources/test_cert.crt rename to data-prepper-plugins/http-source/src/test/resources/test_cert.crt diff --git a/data-prepper-plugins/opensearch-api-source/src/main/resources/test_decrypted_key.key b/data-prepper-plugins/http-source/src/test/resources/test_decrypted_key.key similarity index 100% rename from data-prepper-plugins/opensearch-api-source/src/main/resources/test_decrypted_key.key rename to data-prepper-plugins/http-source/src/test/resources/test_decrypted_key.key diff --git a/data-prepper-plugins/opensearch-api-source/README.md b/data-prepper-plugins/opensearch-api-source/README.md deleted file mode 100644 index 19d9a5a543..0000000000 --- a/data-prepper-plugins/opensearch-api-source/README.md +++ /dev/null @@ -1,122 +0,0 @@ -# Log HTTP Source - -This is a source plugin that supports HTTP protocol. Currently ONLY support Json UTF-8 codec for incoming request, e.g. -`[{"key1": "value1"}, {"key2": "value2"}]`. - - -## Usages -To get started with HTTP source, create the following `pipeline.yaml` configuration: -```yaml -source: - http: -``` - -### Response status - -* `200`: the request data has been successfully written into the buffer. -* `400`: the request data is either in mal-format or unsupported codec. -* `408`: the request data fails to be written into the buffer within the timeout. -* `413`: the request data size is larger than the configured capacity. -* `429`: the request has been rejected due to the HTTP source executor being in full capacity. - -## Configurations - -* port (Optional) => An `int` between 0 and 65535 represents the port source is running on. Default is ```2021```. -* path (Optional) => A `string` which represents the URI path for log ingestion. It should start with `/` and length should be at least 1. Path can contain `${pipelineName}` placeholder which will be replaced with pipeline name. Default value is `/log/ingest`. -* health_check_service (Optional) => A `boolean` that determines if a `/health` endpoint on the defined port will be home to a health check. Default is `false` -* unauthenticated_health_check (Optional) => A `boolean` that determines if the health endpoint will require authentication. This option is ignored if no authentication is defined. Default is `false` -* request_timeout (Optional) => An `int` larger than 0 represents request timeout in millis. Default is ```10_000```. -* thread_count (Optional) => An `int` larger than 0 represents the number of threads to keep in the ScheduledThreadPool. Default is `200`. -* max_connection_count (Optional) => An `int` larger than 0 represents the maximum allowed number of open connections. Default is `500`. -* max_pending_requests (Optional) => An `int` larger than 0 represents the maximum allowed number of tasks in the ScheduledThreadPool work queue. Default is `1024`. -* authentication (Optional) => An authentication configuration. By default, this runs an unauthenticated server. See below for more information. -* compression (Optional) : The compression type applied on the client request payload. Defaults to `none`. Supported values are: - * `none`: no compression - * `gzip`: apply GZip de-compression on the incoming request. - -### Authentication Configurations - -By default, the HTTP source input is unauthenticated. - -The following is an example of how to run the server with HTTP Basic authentication: - -```yaml -source: - http: - authentication: - http_basic: - username: my-user - password: my_s3cr3t -``` - -You can also explicitly disable authentication with: - -```yaml -source: - http: - authentication: - unauthenticated: -``` - -This plugin uses pluggable authentication for HTTP servers. To provide custom authentication, -create a plugin which implements [`ArmeriaHttpAuthenticationProvider`](../armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/ArmeriaHttpAuthenticationProvider.java) - - -### SSL - -* ssl(Optional) => A `boolean` that enables TLS/SSL. Default is ```false```. -* ssl_certificate_file(Optional) => A `String` that represents the SSL certificate chain file path or AWS S3 path. S3 path example `s3:///`. Required if `ssl` is set to `true` and `use_acm_certificate_for_ssl` is set to `false`. -* ssl_key_file(Optional) => A `String` that represents the SSL key file path or AWS S3 path. S3 path example `s3:///`. Only decrypted key file is supported. Required if `ssl` is set to `true` and `use_acm_certificate_for_ssl` is set to `false`. -* use_acm_certificate_for_ssl(Optional) : A `boolean` that enables TLS/SSL using certificate and private key from AWS Certificate Manager (ACM). Default is `false`. -* acm_certificate_arn(Optional) : A `String` that represents the ACM certificate ARN. ACM certificate take preference over S3 or local file system certificate. Required if `use_acm_certificate_for_ssl` is set to `true`. -* acm_private_key_password(Optional): A `String` that represents the ACM private key password which that will be used to decrypt the private key. If it's not provided, a random password will be generated. -* acm_certificate_timeout_millis(Optional) : An `int` that represents the timeout in milliseconds for ACM to get certificates. Default value is `120000`. -* aws_region(Optional) : A `String` that represents the AWS region to use `ACM`, `S3`. Required if `use_acm_certificate_for_ssl` is set to `true` or `ssl_certificate_file` and `ssl_key_file` is `AWS S3`. - -### Example to enable SSL using OpenSSL - -Create the following http source configuration in your `pipeline.yaml`. - -```yaml -source: - http: - ssl: true - ssl_certificate_file: "/full/path/to/certfile.crt" - ssl_key_file: "/full/path/to/keyfile.key" -``` - -Generate a private key named `keyfile.key`, along with a self-signed certificate file named `certfile.crt`. - -``` -openssl req -nodes -new -x509 -keyout keyfile.key -out certfile.crt -subj "/L=test/O=Example Com Inc./OU=Example Com Inc. Root CA/CN=Example Com Inc. Root CA" -``` - -Make sure to replace the paths for the `ssl_certificate_file` and `ssl_key_file` for the http source configuration with the actual paths of the files. - -Send a sample log with the following https curl command - -``` -curl -k -XPOST -H "Content-Type: application/json" -d '[{"log": "sample log"}]' https://localhost:2021/log/ingest -``` - -# Metrics - -### Counter -- `requestsReceived`: measures total number of requests received by `/log/ingest` endpoint. -- `requestsRejected`: measures total number of requests rejected (429 response status code) by HTTP source plugin. -- `successRequests`: measures total number of requests successfully processed (200 response status code) by HTTP source plugin. -- `badRequests`: measures total number of requests with invalid content type or format processed by HTTP source plugin (400 response status code). -- `requestTimeouts`: measures total number of requests that time out in the HTTP source server (415 response status code). -- `requestsTooLarge`: measures total number of requests of which the events size in the content is larger than the buffer capacity (413 response status code). -- `internalServerError`: measures total number of requests processed by the HTTP source with custom exception type (500 response status code). - -### Timer -- `requestProcessDuration`: measures latency of requests processed by the HTTP source plugin in seconds. - -### Distribution Summary -- `payloadSize`: measures the distribution of incoming requests payload sizes in bytes. - -## Developer Guide -This plugin is compatible with Java 14. See -- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) -- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md) diff --git a/data-prepper-plugins/opensearch-api-source/build.gradle b/data-prepper-plugins/opensearch-api-source/build.gradle deleted file mode 100644 index 72720add49..0000000000 --- a/data-prepper-plugins/opensearch-api-source/build.gradle +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -plugins { - id 'java' -} - -dependencies { - implementation project(':data-prepper-api') - implementation project(':data-prepper-plugins:blocking-buffer') - implementation project(':data-prepper-plugins:http-source-common') - implementation project(':data-prepper-plugins:common') - implementation project(':data-prepper-plugins:armeria-common') - implementation libs.armeria.core - implementation libs.commons.io - implementation 'software.amazon.awssdk:acm' - implementation 'software.amazon.awssdk:s3' - implementation 'software.amazon.awssdk:apache-client' - testImplementation project(':data-prepper-api').sourceSets.test.output - testImplementation 'org.assertj:assertj-core:3.25.3' -} - -jacocoTestCoverageVerification { - dependsOn jacocoTestReport - violationRules { - rule { //in addition to core projects rule - limit { - minimum = 0.90 - } - } - } -} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/AwsAuthenticationOptions.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/AwsAuthenticationOptions.java deleted file mode 100644 index e19c8438b6..0000000000 --- a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/AwsAuthenticationOptions.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.source.opensearch_api; - -import com.fasterxml.jackson.annotation.JsonProperty; -import jakarta.validation.constraints.Size; -import software.amazon.awssdk.regions.Region; - -import java.util.Map; - -public class AwsAuthenticationOptions { - @JsonProperty("region") - @Size(min = 1, message = "Region cannot be empty string") - private String awsRegion; - - @JsonProperty("sts_role_arn") - @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") - private String awsStsRoleArn; - -// @JsonProperty("sts_external_id") -// @Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters") -// private String awsStsExternalId; -// -// @JsonProperty("sts_header_overrides") -// @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override") -// private Map awsStsHeaderOverrides; - - public Region getAwsRegion() { - return awsRegion != null ? Region.of(awsRegion) : null; - } - - public String getAwsStsRoleArn() { - return awsStsRoleArn; - } - -// public String getAwsStsExternalId() { -// return awsStsExternalId; -// } -// -// public Map getAwsStsHeaderOverrides() { -// return awsStsHeaderOverrides; -// } -} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/OpenSearchAPIService.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/OpenSearchAPIService.java deleted file mode 100644 index 162551a4e0..0000000000 --- a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/OpenSearchAPIService.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.source.opensearch_api; - -import com.linecorp.armeria.server.ServiceRequestContext; -import com.linecorp.armeria.server.annotation.Put; -import org.opensearch.dataprepper.http.common.codec.JsonCodec; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.event.*; -import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; -import org.opensearch.dataprepper.model.record.Record; -import com.linecorp.armeria.common.AggregatedHttpRequest; -import com.linecorp.armeria.common.HttpData; -import com.linecorp.armeria.common.HttpResponse; -import com.linecorp.armeria.common.HttpStatus; -import com.linecorp.armeria.server.annotation.Blocking; -import com.linecorp.armeria.server.annotation.Post; -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.DistributionSummary; -import io.micrometer.core.instrument.Timer; -import org.opensearch.dataprepper.plugins.source.opensearch_api.codec.MultiLineJsonCodec; -import org.opensearch.dataprepper.plugins.source.opensearch_api.model.BulkActionRequest; -import org.opensearch.dataprepper.plugins.source.opensearch_api.model.MetadataKeyAttributes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/* -* A OpenSearch API Service. -*/ -@Blocking -public class OpenSearchAPIService { - public static final String REQUESTS_RECEIVED = "requestsReceived"; - public static final String SUCCESS_REQUESTS = "successRequests"; - public static final String PAYLOAD_SIZE = "payloadSize"; - public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration"; - - private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAPIService.class); - - // TODO: support other data-types as request body, e.g. json_lines, msgpack - private final MultiLineJsonCodec jsonCodec = new MultiLineJsonCodec(); - private final Buffer> buffer; - private final int bufferWriteTimeoutInMillis; - private final Counter requestsReceivedCounter; - private final Counter successRequestsCounter; - private final DistributionSummary payloadSizeSummary; - private final Timer requestProcessDuration; - - public OpenSearchAPIService(final int bufferWriteTimeoutInMillis, - final Buffer> buffer, - final PluginMetrics pluginMetrics) { - this.buffer = buffer; - this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis; - - requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED); - successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS); - payloadSizeSummary = pluginMetrics.summary(PAYLOAD_SIZE); - requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION); - } - - @Post("/_bulk") - @Put - public HttpResponse doPostBulk(final ServiceRequestContext serviceRequestContext, final AggregatedHttpRequest aggregatedHttpRequest) throws Exception { - requestsReceivedCounter.increment(); - payloadSizeSummary.record(aggregatedHttpRequest.content().length()); - - if(serviceRequestContext.isTimedOut()) { - return HttpResponse.of(HttpStatus.REQUEST_TIMEOUT); - } - - return requestProcessDuration.recordCallable(() -> processBulkRequest(aggregatedHttpRequest)); - } - - private HttpResponse processBulkRequest(final AggregatedHttpRequest aggregatedHttpRequest) throws Exception { - final HttpData content = aggregatedHttpRequest.content(); - List> jsonList; - - try { - jsonList = jsonCodec.parse(content); - } catch (IOException e) { - LOG.error("Failed to parse the request of size {} due to: {}", content.length(), e.getMessage()); - throw new IOException("Bad request data format. Needs to be json array.", e.getCause()); - } - try { - if (buffer.isByteBuffer()) { - // jsonList is ignored in this path but parse() was done to make - // sure that the data is in the expected json format - buffer.writeBytes(content.array(), null, bufferWriteTimeoutInMillis); - } else { - - List> records = new ArrayList<>(); - int idx = 0; - for (; idx jsonEntry = jsonList.get(idx); - BulkActionRequest request = new BulkActionRequest(jsonEntry); - boolean isValidBulkAction = Arrays.stream(OpenSearchBulkActions.values()) - .anyMatch(bulkAction -> bulkAction == OpenSearchBulkActions.fromOptionValue(request.getAction())); - if (isValidBulkAction) { - - final boolean isDeleteAction = request.getAction().equals(OpenSearchBulkActions.DELETE.toString()); - final JacksonEvent event = isDeleteAction ? - JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).build() : - JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonList.get(idx + 1)).build(); - event.getMetadata().setAttribute(MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE, request.getAction()); - event.getMetadata().setAttribute(MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE_INDEX, request.getIndex()); - String docId = request.getId(); - if (docId != null && !docId.isEmpty() && !docId.isBlank()) { - event.getMetadata().setAttribute(MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE_ID, request.getId()); - } - - // Skip processing next line - if (isDeleteAction) idx++; - records.add(new Record<>(event)); - } - } - buffer.writeAll(records, bufferWriteTimeoutInMillis); - } - } catch (Exception e) { - LOG.error("Failed to write the request of size {} due to: {}", content.length(), e.getMessage()); - throw e; - } - successRequestsCounter.increment(); - return HttpResponse.of(HttpStatus.OK); - } - - private Record buildRecordLog(String json) { - final JacksonEvent log = JacksonEvent.builder() - .withData(json) - .getThis() - .build(); - - return new Record<>(log); - } - - private boolean isValidBulkAction(String action) { - return Arrays.stream(OpenSearchBulkActions.values()) - .anyMatch(bulkAction -> bulkAction == OpenSearchBulkActions.fromOptionValue(action)); - } -} diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/OpenSearchAPISource.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/OpenSearchAPISource.java deleted file mode 100644 index c54081f2bc..0000000000 --- a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/OpenSearchAPISource.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.source.opensearch_api; - -import com.linecorp.armeria.server.HttpService; -import com.linecorp.armeria.server.Server; -import com.linecorp.armeria.server.ServerBuilder; -import com.linecorp.armeria.server.encoding.DecodingService; -import com.linecorp.armeria.server.healthcheck.HealthCheckService; -import com.linecorp.armeria.server.throttling.ThrottlingService; -import org.opensearch.dataprepper.HttpRequestExceptionHandler; -import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider; -import org.opensearch.dataprepper.http.common.LogThrottlingRejectHandler; -import org.opensearch.dataprepper.http.common.LogThrottlingStrategy; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; -import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.codec.ByteDecoder; -import org.opensearch.dataprepper.model.codec.JsonDecoder; -import org.opensearch.dataprepper.model.configuration.PipelineDescription; -import org.opensearch.dataprepper.model.configuration.PluginModel; -import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.plugin.PluginFactory; -import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.source.Source; -import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; -import org.opensearch.dataprepper.plugins.certificate.model.Certificate; -import org.opensearch.dataprepper.plugins.codec.CompressionOption; -import org.opensearch.dataprepper.http.common.certificate.CertificateProviderFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.Collections; -import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.function.Function; - -@DataPrepperPlugin(name = "opensearch_api", pluginType = Source.class, pluginConfigurationType = OpenSearchAPISourceConfig.class) -public class OpenSearchAPISource implements Source> { - private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAPISource.class); - private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}"; - public static final String REGEX_HEALTH = "regex:^/(?!health$).*$"; - static final String SERVER_CONNECTIONS = "serverConnections"; - - private final OpenSearchAPISourceConfig sourceConfig; - private final CertificateProviderFactory certificateProviderFactory; - private final ArmeriaHttpAuthenticationProvider authenticationProvider; - private final HttpRequestExceptionHandler httpRequestExceptionHandler; - private final String pipelineName; - private Server server; - private final PluginMetrics pluginMetrics; - private static final String HTTP_HEALTH_CHECK_PATH = "/health"; - private ByteDecoder byteDecoder; - - @DataPrepperPluginConstructor - public OpenSearchAPISource(final OpenSearchAPISourceConfig sourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, - final PipelineDescription pipelineDescription) { - this.sourceConfig = sourceConfig; - this.pluginMetrics = pluginMetrics; - this.pipelineName = pipelineDescription.getPipelineName(); - this.byteDecoder = new JsonDecoder(); - this.certificateProviderFactory = new CertificateProviderFactory(sourceConfig); - final PluginModel authenticationConfiguration = sourceConfig.getAuthentication(); - final PluginSetting authenticationPluginSetting; - - if (authenticationConfiguration == null || authenticationConfiguration.getPluginName().equals(ArmeriaHttpAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME)) { - LOG.warn("Creating http source without authentication. This is not secure."); - LOG.warn("In order to set up Http Basic authentication for the http source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#authentication-configurations"); - } - - if(authenticationConfiguration != null) { - authenticationPluginSetting = - new PluginSetting(authenticationConfiguration.getPluginName(), authenticationConfiguration.getPluginSettings()); - } else { - authenticationPluginSetting = - new PluginSetting(ArmeriaHttpAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME, Collections.emptyMap()); - } - authenticationPluginSetting.setPipelineName(pipelineName); - authenticationProvider = pluginFactory.loadPlugin(ArmeriaHttpAuthenticationProvider.class, authenticationPluginSetting); - httpRequestExceptionHandler = new HttpRequestExceptionHandler(pluginMetrics); - } - - @Override - public void start(final Buffer> buffer) { - if (buffer == null) { - throw new IllegalStateException("Buffer provided is null"); - } - if (server == null) { - final ServerBuilder sb = Server.builder(); - - sb.disableServerHeader(); - - if (sourceConfig.isSsl()) { - LOG.info("Creating http source with SSL/TLS enabled."); - final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider(); - final Certificate certificate = certificateProvider.getCertificate(); - // TODO: enable encrypted key with password - sb.https(sourceConfig.getPort()).tls( - new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)), - new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8) - ) - ); - } else { - LOG.warn("Creating http source without SSL/TLS. This is not secure."); - LOG.warn("In order to set up TLS for the http source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#ssl"); - sb.http(sourceConfig.getPort()); - } - - if(sourceConfig.getAuthentication() != null) { - final Optional> optionalAuthDecorator = authenticationProvider.getAuthenticationDecorator(); - - if (sourceConfig.isUnauthenticatedHealthCheck()) { - optionalAuthDecorator.ifPresent(authDecorator -> sb.decorator(REGEX_HEALTH, authDecorator)); - } else { - optionalAuthDecorator.ifPresent(sb::decorator); - } - } - - sb.maxNumConnections(sourceConfig.getMaxConnectionCount()); - sb.requestTimeout(Duration.ofMillis(sourceConfig.getRequestTimeoutInMillis())); - if(sourceConfig.getMaxRequestLength() != null) { - sb.maxRequestLength(sourceConfig.getMaxRequestLength().getBytes()); - } - final int threads = sourceConfig.getThreadCount(); - final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads); - sb.blockingTaskExecutor(blockingTaskExecutor, true); - final int maxPendingRequests = sourceConfig.getMaxPendingRequests(); - final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy( - maxPendingRequests, blockingTaskExecutor.getQueue()); - final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics); - - final String httpSourcePath = sourceConfig.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName); - sb.decorator(httpSourcePath, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler)); - final OpenSearchAPIService openSearchAPIService = new OpenSearchAPIService(sourceConfig.getBufferTimeoutInMillis(), buffer, pluginMetrics); - - if (CompressionOption.NONE.equals(sourceConfig.getCompression())) { - sb.annotatedService(httpSourcePath, openSearchAPIService, httpRequestExceptionHandler); - } else { - sb.annotatedService(httpSourcePath, openSearchAPIService, DecodingService.newDecorator(), httpRequestExceptionHandler); - } - - if (sourceConfig.hasHealthCheckService()) { - LOG.info("HTTP source health check is enabled"); - sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build()); - } - - server = sb.build(); - pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections); - } - - try { - server.start().get(); - } catch (ExecutionException ex) { - if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) { - throw (RuntimeException) ex.getCause(); - } else { - throw new RuntimeException(ex); - } - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new RuntimeException(ex); - } - LOG.info("Started http source on port " + sourceConfig.getPort() + "..."); - } - - @Override - public ByteDecoder getDecoder() { - return byteDecoder; - } - - @Override - public void stop() { - if (server != null) { - try { - server.stop().get(); - } catch (ExecutionException ex) { - if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) { - throw (RuntimeException) ex.getCause(); - } else { - throw new RuntimeException(ex); - } - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new RuntimeException(ex); - } - } - LOG.info("Stopped http source."); - } -} diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/OpenSearchAPISourceConfig.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/OpenSearchAPISourceConfig.java deleted file mode 100644 index deed02007e..0000000000 --- a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/OpenSearchAPISourceConfig.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.source.opensearch_api; - -import jakarta.validation.Valid; -import jakarta.validation.constraints.*; -import org.opensearch.dataprepper.http.common.HttpServerConfig; -import com.fasterxml.jackson.annotation.JsonProperty; - -public class OpenSearchAPISourceConfig extends HttpServerConfig { - static final String DEFAULT_LOG_INGEST_URI = "/"; - static final int DEFAULT_PORT = 9202; - - @JsonProperty("port") - @Min(0) - @Max(65535) - private int port = DEFAULT_PORT; - - @JsonProperty("path_prefix") - @Size(min = 1, message = "path length should be at least 1") - private String path_prefix = DEFAULT_LOG_INGEST_URI; - - @AssertTrue(message = "path should start with /") - boolean isPathValid() { - return path_prefix.startsWith("/"); - } - - @JsonProperty("acknowledgments") - private Boolean acknowledgments = false; - - @JsonProperty("s3_bucket") - private String s3Bucket; - - @JsonProperty("s3_prefix") - private String s3Prefix; - - @JsonProperty("s3_region") - private String s3Region; - - @JsonProperty("aws") - @NotNull - @Valid - private AwsAuthenticationOptions awsConfig; - - @JsonProperty("delete_s3_objects_on_read") - private boolean deleteS3ObjectsOnRead = false; - - public int getPort() { - return port; - } - - public String getPath() { - return path_prefix; - } -} diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/codec/MultiLineJsonCodec.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/codec/MultiLineJsonCodec.java deleted file mode 100644 index 25f1cf7e1b..0000000000 --- a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/codec/MultiLineJsonCodec.java +++ /dev/null @@ -1,32 +0,0 @@ -package org.opensearch.dataprepper.plugins.source.opensearch_api.codec; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.linecorp.armeria.common.HttpData; -import org.opensearch.dataprepper.http.common.codec.Codec; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -public class MultiLineJsonCodec implements Codec>> { - private static final ObjectMapper objectMapper = new ObjectMapper(); - private static final TypeReference> MAP_TYPE_REFERENCE = - new TypeReference>() {}; - @Override - public List> parse(HttpData httpData) throws IOException { - List> jsonList = new ArrayList<>(); - final BufferedReader reader = new BufferedReader(new InputStreamReader(httpData.toInputStream())); - - while(reader.ready()) { - final String jsonLine = reader.readLine(); - if (jsonLine.isBlank() || jsonLine.isEmpty()) continue; - jsonList.add(objectMapper.readValue(jsonLine, MAP_TYPE_REFERENCE)); - } - - return jsonList; - } -} diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/model/BulkActionRequest.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/model/BulkActionRequest.java deleted file mode 100644 index 04f1600f73..0000000000 --- a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/model/BulkActionRequest.java +++ /dev/null @@ -1,30 +0,0 @@ -package org.opensearch.dataprepper.plugins.source.opensearch_api.model; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.util.Map; - -public class BulkActionRequest { - - static final ObjectMapper objectMapper = new ObjectMapper(); - - private String action; - private Map requestModel; - - public BulkActionRequest(Map json) throws JsonProcessingException { - this.requestModel = json; - this.action = (String) requestModel.keySet().stream().findFirst().orElse(""); - } - - public String getId() { - return ((Map) this.requestModel.get(this.action)).get("_id"); - } - public String getIndex() { - return ((Map) this.requestModel.get(this.action)).get("_index"); - } - - public String getAction() { - return this.action; - } -} diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/model/MetadataKeyAttributes.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/model/MetadataKeyAttributes.java deleted file mode 100644 index 674f988e85..0000000000 --- a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch_api/model/MetadataKeyAttributes.java +++ /dev/null @@ -1,11 +0,0 @@ -package org.opensearch.dataprepper.plugins.source.opensearch_api.model; - -public class MetadataKeyAttributes { - - public static final String EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE = "opensearch_action"; - - public static final String EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE_INDEX = "opensearch_index"; - - public static final String EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE_ID = "opensearch_id"; - -} diff --git a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch_api/OpenSearchAPIServiceTest.java b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch_api/OpenSearchAPIServiceTest.java deleted file mode 100644 index b4a7c84868..0000000000 --- a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch_api/OpenSearchAPIServiceTest.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.source.opensearch_api; - -import com.linecorp.armeria.server.ServiceRequestContext; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.buffer.SizeOverflowException; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.record.Record; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.linecorp.armeria.common.AggregatedHttpRequest; -import com.linecorp.armeria.common.AggregatedHttpResponse; -import com.linecorp.armeria.common.HttpData; -import com.linecorp.armeria.common.HttpMethod; -import com.linecorp.armeria.common.HttpRequest; -import com.linecorp.armeria.common.HttpResponse; -import com.linecorp.armeria.common.HttpStatus; -import com.linecorp.armeria.common.MediaType; -import com.linecorp.armeria.common.RequestHeaders; -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.DistributionSummary; -import io.micrometer.core.instrument.Timer; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatchers; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.stubbing.Answer; -import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -class OpenSearchAPIServiceTest { - private static final ObjectMapper mapper = new ObjectMapper(); - private static final int TEST_BUFFER_CAPACITY = 3; - private static final int TEST_TIMEOUT_IN_MILLIS = 500; - - @Mock - private PluginMetrics pluginMetrics; - - @Mock - private Counter requestsReceivedCounter; - - @Mock - private Counter successRequestsCounter; - - @Mock - private DistributionSummary payloadSizeSummary; - - @Mock - private Timer requestProcessDuration; - - @Mock - private ServiceRequestContext serviceRequestContext; - - private OpenSearchAPIService openSearchAPIService; - - @BeforeEach - public void setUp() throws Exception { - when(pluginMetrics.counter(openSearchAPIService.REQUESTS_RECEIVED)).thenReturn(requestsReceivedCounter); - when(pluginMetrics.counter(openSearchAPIService.SUCCESS_REQUESTS)).thenReturn(successRequestsCounter); - when(pluginMetrics.summary(openSearchAPIService.PAYLOAD_SIZE)).thenReturn(payloadSizeSummary); - when(pluginMetrics.timer(openSearchAPIService.REQUEST_PROCESS_DURATION)).thenReturn(requestProcessDuration); - when(serviceRequestContext.isTimedOut()).thenReturn(false); - when(requestProcessDuration.recordCallable(ArgumentMatchers.>any())).thenAnswer( - (Answer) invocation -> { - final Object[] args = invocation.getArguments(); - @SuppressWarnings("unchecked") - final Callable callable = (Callable) args[0]; - return callable.call(); - } - ); - - Buffer> blockingBuffer = new BlockingBuffer<>(TEST_BUFFER_CAPACITY, 8, "test-pipeline"); - openSearchAPIService = new OpenSearchAPIService(TEST_TIMEOUT_IN_MILLIS, blockingBuffer, pluginMetrics); - } - - @Test - public void testHTTPRequestSuccess() throws Exception { - // Prepare - AggregatedHttpRequest testRequest = generateRandomValidHTTPRequest(2); - - // When - AggregatedHttpResponse postResponse = openSearchAPIService.doPostBulk(serviceRequestContext, testRequest).aggregate().get(); - - // Then - assertEquals(HttpStatus.OK, postResponse.status()); - verify(requestsReceivedCounter, times(1)).increment(); - verify(successRequestsCounter, times(1)).increment(); - final ArgumentCaptor payloadLengthCaptor = ArgumentCaptor.forClass(Double.class); - verify(payloadSizeSummary, times(1)).record(payloadLengthCaptor.capture()); - assertEquals(testRequest.content().length(), Math.round(payloadLengthCaptor.getValue())); - verify(requestProcessDuration, times(1)).recordCallable(ArgumentMatchers.>any()); - } - - @Test - public void testHTTPRequestBadRequest() throws Exception { - // Prepare - AggregatedHttpRequest testBadRequest = generateBadHTTPRequest(); - - // When - assertThrows(IOException.class, () -> openSearchAPIService.doPostBulk(serviceRequestContext, testBadRequest).aggregate().get()); - - // Then - verify(requestsReceivedCounter, times(1)).increment(); - verify(successRequestsCounter, never()).increment(); - final ArgumentCaptor payloadLengthCaptor = ArgumentCaptor.forClass(Double.class); - verify(payloadSizeSummary, times(1)).record(payloadLengthCaptor.capture()); - assertEquals(testBadRequest.content().length(), Math.round(payloadLengthCaptor.getValue())); - verify(requestProcessDuration, times(1)).recordCallable(ArgumentMatchers.>any()); - } - - @Test - public void testHTTPRequestEntityTooLarge() throws Exception { - // Prepare - AggregatedHttpRequest testTooLargeRequest = generateRandomValidHTTPRequest(TEST_BUFFER_CAPACITY + 1); - - // When - assertThrows(SizeOverflowException.class, () -> openSearchAPIService.doPostBulk(serviceRequestContext, testTooLargeRequest).aggregate().get()); - - // Then - verify(requestsReceivedCounter, times(1)).increment(); - verify(successRequestsCounter, never()).increment(); - final ArgumentCaptor payloadLengthCaptor = ArgumentCaptor.forClass(Double.class); - verify(payloadSizeSummary, times(1)).record(payloadLengthCaptor.capture()); - assertEquals(testTooLargeRequest.content().length(), Math.round(payloadLengthCaptor.getValue())); - verify(requestProcessDuration, times(1)).recordCallable(ArgumentMatchers.>any()); - } - - @Test - public void testHTTPRequestTimeout() throws Exception { - // Prepare - AggregatedHttpRequest populateDataRequest = generateRandomValidHTTPRequest(3); - AggregatedHttpResponse goodResponse = openSearchAPIService.doPostBulk(serviceRequestContext, populateDataRequest).aggregate().get(); - assertEquals(HttpStatus.OK, goodResponse.status()); - AggregatedHttpRequest timeoutRequest = generateRandomValidHTTPRequest(2); - - // When - assertThrows(TimeoutException.class, () -> openSearchAPIService.doPostBulk(serviceRequestContext, timeoutRequest).aggregate().get()); - - // Then - verify(requestsReceivedCounter, times(2)).increment(); - verify(successRequestsCounter, times(1)).increment(); - final ArgumentCaptor payloadLengthCaptor = ArgumentCaptor.forClass(Double.class); - verify(payloadSizeSummary, times(2)).record(payloadLengthCaptor.capture()); - assertEquals(timeoutRequest.content().length(), Math.round(payloadLengthCaptor.getValue())); - verify(requestProcessDuration, times(2)).recordCallable(ArgumentMatchers.>any()); - } - - private AggregatedHttpRequest generateRandomValidHTTPRequest(int numJson) throws JsonProcessingException, - ExecutionException, InterruptedException { - RequestHeaders requestHeaders = RequestHeaders.builder() - .contentType(MediaType.JSON) - .method(HttpMethod.POST) - .path("/log/ingest") - .build(); - List> jsonList = new ArrayList<>(); - for (int i = 0; i < numJson; i++) { - jsonList.add(Collections.singletonMap("log", UUID.randomUUID().toString())); - } - String content = mapper.writeValueAsString(jsonList); - HttpData httpData = HttpData.ofUtf8(content); - return HttpRequest.of(requestHeaders, httpData).aggregate().get(); - } - - private AggregatedHttpRequest generateBadHTTPRequest() throws ExecutionException, InterruptedException { - RequestHeaders requestHeaders = RequestHeaders.builder() - .contentType(MediaType.JSON) - .method(HttpMethod.POST) - .path("/log/ingest") - .build(); - HttpData httpData = HttpData.ofUtf8("{"); - return HttpRequest.of(requestHeaders, httpData).aggregate().get(); - } -} diff --git a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch_api/OpenSearchAPISourceTest.java b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch_api/OpenSearchAPISourceTest.java deleted file mode 100644 index f98964dbf8..0000000000 --- a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch_api/OpenSearchAPISourceTest.java +++ /dev/null @@ -1,809 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.source.opensearch_api; - -import com.linecorp.armeria.client.ClientFactory; -import com.linecorp.armeria.client.ResponseTimeoutException; -import com.linecorp.armeria.client.WebClient; -import com.linecorp.armeria.common.AggregatedHttpResponse; -import com.linecorp.armeria.common.HttpData; -import com.linecorp.armeria.common.HttpHeaderNames; -import com.linecorp.armeria.common.HttpMethod; -import com.linecorp.armeria.common.HttpStatus; -import com.linecorp.armeria.common.MediaType; -import com.linecorp.armeria.common.RequestHeaders; -import com.linecorp.armeria.common.SessionProtocol; -import com.linecorp.armeria.server.Server; -import com.linecorp.armeria.server.ServerBuilder; -import io.micrometer.core.instrument.Measurement; -import io.micrometer.core.instrument.Statistic; -import io.netty.util.AsciiString; -import org.apache.commons.io.IOUtils; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Mock; -import org.mockito.MockedStatic; -import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.HttpRequestExceptionHandler; -import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider; -import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig; -import org.opensearch.dataprepper.http.common.LogThrottlingRejectHandler; -import org.opensearch.dataprepper.metrics.MetricNames; -import org.opensearch.dataprepper.metrics.MetricsTestUtil; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.CheckpointState; -import org.opensearch.dataprepper.model.configuration.PipelineDescription; -import org.opensearch.dataprepper.model.configuration.PluginModel; -import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.log.Log; -import org.opensearch.dataprepper.model.plugin.PluginFactory; -import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.types.ByteCount; -import org.opensearch.dataprepper.plugins.HttpBasicArmeriaHttpAuthenticationProvider; -import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; -import org.opensearch.dataprepper.plugins.codec.CompressionOption; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.StringJoiner; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; -import java.util.zip.GZIPOutputStream; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -class OpenSearchAPISourceTest { - /** - * TODO: according to the new coding guideline, consider refactoring the following test cases into OpenSearchAPISourceIT. - * - testHTTPJsonResponse200() - * - testHTTPJsonResponse400() - * - testHTTPJsonResponse413() - * - testHTTPJsonResponse415() - * - testHTTPJsonResponse429() - * - testHTTPSJsonResponse() - */ - private final String PLUGIN_NAME = "http"; - private final String TEST_PIPELINE_NAME = "test_pipeline"; - private final String TEST_SSL_CERTIFICATE_FILE = getClass().getClassLoader().getResource("test_cert.crt").getFile(); - private final String TEST_SSL_KEY_FILE = getClass().getClassLoader().getResource("test_decrypted_key.key").getFile(); - - @Mock - private ServerBuilder serverBuilder; - - @Mock - private Server server; - - @Mock - private CompletableFuture completableFuture; - - private BlockingBuffer> testBuffer; - private OpenSearchAPISource OpenSearchAPISourceUnderTest; - private List requestsReceivedMeasurements; - private List successRequestsMeasurements; - private List requestTimeoutsMeasurements; - private List badRequestsMeasurements; - private List requestsTooLargeMeasurements; - private List rejectedRequestsMeasurements; - private List requestProcessDurationMeasurements; - private List payloadSizeSummaryMeasurements; - private List serverConnectionsMeasurements; - private OpenSearchAPISourceConfig sourceConfig; - private PluginMetrics pluginMetrics; - private PluginFactory pluginFactory; - private PipelineDescription pipelineDescription; - - private BlockingBuffer> getBuffer() { - final HashMap integerHashMap = new HashMap<>(); - integerHashMap.put("buffer_size", 1); - integerHashMap.put("batch_size", 1); - final PluginSetting pluginSetting = new PluginSetting("blocking_buffer", integerHashMap); - pluginSetting.setPipelineName(TEST_PIPELINE_NAME); - return new BlockingBuffer<>(pluginSetting); - } - - /** - * This method should be invoked after {@link OpenSearchAPISource::start(Buffer buffer)} to scrape metrics - */ - private void refreshMeasurements() { - final String metricNamePrefix = new StringJoiner(MetricNames.DELIMITER) - .add(TEST_PIPELINE_NAME).add(PLUGIN_NAME).toString(); - requestsReceivedMeasurements = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) - .add(OpenSearchAPIService.REQUESTS_RECEIVED).toString()); - successRequestsMeasurements = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) - .add(OpenSearchAPIService.SUCCESS_REQUESTS).toString()); - requestTimeoutsMeasurements = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) - .add(HttpRequestExceptionHandler.REQUEST_TIMEOUTS).toString()); - badRequestsMeasurements = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) - .add(HttpRequestExceptionHandler.BAD_REQUESTS).toString()); - requestsTooLargeMeasurements = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) - .add(HttpRequestExceptionHandler.REQUESTS_TOO_LARGE).toString()); - rejectedRequestsMeasurements = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) - .add(LogThrottlingRejectHandler.REQUESTS_REJECTED).toString()); - requestProcessDurationMeasurements = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) - .add(OpenSearchAPIService.REQUEST_PROCESS_DURATION).toString()); - payloadSizeSummaryMeasurements = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) - .add(OpenSearchAPIService.PAYLOAD_SIZE).toString()); - serverConnectionsMeasurements = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) - .add(OpenSearchAPISource.SERVER_CONNECTIONS).toString()); - } - - private byte[] createGZipCompressedPayload(final String payload) throws IOException { - // Create a GZip compressed request body - final ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); - try (final GZIPOutputStream gzipStream = new GZIPOutputStream(byteStream)) { - gzipStream.write(payload.getBytes(StandardCharsets.UTF_8)); - } - return byteStream.toByteArray(); - } - - @BeforeEach - public void setUp() { - lenient().when(serverBuilder.annotatedService(any())).thenReturn(serverBuilder); - lenient().when(serverBuilder.http(anyInt())).thenReturn(serverBuilder); - lenient().when(serverBuilder.https(anyInt())).thenReturn(serverBuilder); - lenient().when(serverBuilder.build()).thenReturn(server); - lenient().when(server.start()).thenReturn(completableFuture); - - sourceConfig = mock(OpenSearchAPISourceConfig.class); - lenient().when(sourceConfig.getPort()).thenReturn(2021); - lenient().when(sourceConfig.getPath()).thenReturn(OpenSearchAPISourceConfig.DEFAULT_LOG_INGEST_URI); - lenient().when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(10_000); - lenient().when(sourceConfig.getThreadCount()).thenReturn(200); - lenient().when(sourceConfig.getMaxConnectionCount()).thenReturn(500); - lenient().when(sourceConfig.getMaxPendingRequests()).thenReturn(1024); - lenient().when(sourceConfig.hasHealthCheckService()).thenReturn(true); - lenient().when(sourceConfig.getCompression()).thenReturn(CompressionOption.NONE); - - MetricsTestUtil.initMetrics(); - pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, TEST_PIPELINE_NAME); - - pluginFactory = mock(PluginFactory.class); - final ArmeriaHttpAuthenticationProvider authenticationProvider = new HttpBasicArmeriaHttpAuthenticationProvider(new HttpBasicAuthenticationConfig("test", "test")); - when(pluginFactory.loadPlugin(eq(ArmeriaHttpAuthenticationProvider.class), any(PluginSetting.class))) - .thenReturn(authenticationProvider); - - testBuffer = getBuffer(); - pipelineDescription = mock(PipelineDescription.class); - when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); - OpenSearchAPISourceUnderTest = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - } - - @AfterEach - public void cleanUp() { - if (OpenSearchAPISourceUnderTest != null) { - OpenSearchAPISourceUnderTest.stop(); - } - } - - private void assertSecureResponseWithStatusCode(final AggregatedHttpResponse response, final HttpStatus expectedStatus) { - assertThat("Http Status", response.status(), equalTo(expectedStatus)); - - final List headerKeys = response.headers() - .stream() - .map(Map.Entry::getKey) - .map(AsciiString::toString) - .collect(Collectors.toList()); - assertThat("Response Header Keys", headerKeys, not(contains("server"))); - } - - @Test - public void testHTTPJsonResponse200() { - // Prepare - final String testData = "[{\"log\": \"somelog\"}]"; - final int testPayloadSize = testData.getBytes().length; - OpenSearchAPISourceUnderTest.start(testBuffer); - refreshMeasurements(); - - // When - WebClient.of().execute(RequestHeaders.builder() - .scheme(SessionProtocol.HTTP) - .authority("127.0.0.1:2021") - .method(HttpMethod.POST) - .path("/log/ingest") - .contentType(MediaType.JSON_UTF_8) - .build(), - HttpData.ofUtf8(testData)) - .aggregate() - .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); - - // Then - Assertions.assertFalse(testBuffer.isEmpty()); - - final Map.Entry>, CheckpointState> result = testBuffer.read(100); - List> records = new ArrayList<>(result.getKey()); - Assertions.assertEquals(1, records.size()); - final Record record = records.get(0); - Assertions.assertEquals("somelog", record.getData().get("log", String.class)); - // Verify metrics - final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( - requestsReceivedMeasurements, Statistic.COUNT); - Assertions.assertEquals(1.0, requestReceivedCount.getValue()); - final Measurement successRequestsCount = MetricsTestUtil.getMeasurementFromList( - successRequestsMeasurements, Statistic.COUNT); - Assertions.assertEquals(1.0, successRequestsCount.getValue()); - final Measurement requestProcessDurationCount = MetricsTestUtil.getMeasurementFromList( - requestProcessDurationMeasurements, Statistic.COUNT); - Assertions.assertEquals(1.0, requestProcessDurationCount.getValue()); - final Measurement requestProcessDurationMax = MetricsTestUtil.getMeasurementFromList( - requestProcessDurationMeasurements, Statistic.MAX); - Assertions.assertTrue(requestProcessDurationMax.getValue() > 0); - final Measurement payloadSizeMax = MetricsTestUtil.getMeasurementFromList( - payloadSizeSummaryMeasurements, Statistic.MAX); - Assertions.assertEquals(testPayloadSize, payloadSizeMax.getValue()); - } - - @Test - public void testHttpCompressionResponse200() throws IOException { - // Prepare - final String testData = "[{\"log\": \"somelog\"}]"; - when(sourceConfig.getCompression()).thenReturn(CompressionOption.GZIP); - OpenSearchAPISourceUnderTest = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - OpenSearchAPISourceUnderTest.start(testBuffer); - refreshMeasurements(); - - // When - WebClient.of().execute(RequestHeaders.builder() - .scheme(SessionProtocol.HTTP) - .authority("127.0.0.1:2021") - .method(HttpMethod.POST) - .path("/log/ingest") - .add(HttpHeaderNames.CONTENT_ENCODING, "gzip") - .build(), - createGZipCompressedPayload(testData)) - .aggregate() - .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); - - // Then - Assertions.assertFalse(testBuffer.isEmpty()); - - final Map.Entry>, CheckpointState> result = testBuffer.read(100); - List> records = new ArrayList<>(result.getKey()); - Assertions.assertEquals(1, records.size()); - final Record record = records.get(0); - Assertions.assertEquals("somelog", record.getData().get("log", String.class)); - // Verify metrics - final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( - requestsReceivedMeasurements, Statistic.COUNT); - Assertions.assertEquals(1.0, requestReceivedCount.getValue()); - final Measurement successRequestsCount = MetricsTestUtil.getMeasurementFromList( - successRequestsMeasurements, Statistic.COUNT); - Assertions.assertEquals(1.0, successRequestsCount.getValue()); - final Measurement requestProcessDurationCount = MetricsTestUtil.getMeasurementFromList( - requestProcessDurationMeasurements, Statistic.COUNT); - Assertions.assertEquals(1.0, requestProcessDurationCount.getValue()); - final Measurement requestProcessDurationMax = MetricsTestUtil.getMeasurementFromList( - requestProcessDurationMeasurements, Statistic.MAX); - Assertions.assertTrue(requestProcessDurationMax.getValue() > 0); - } - - @Test - public void testHealthCheck() { - // Prepare - OpenSearchAPISourceUnderTest.start(testBuffer); - - // When - WebClient.of().execute(RequestHeaders.builder() - .scheme(SessionProtocol.HTTP) - .authority("127.0.0.1:2021") - .method(HttpMethod.GET) - .path("/health") - .build()) - .aggregate() - .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); - } - - @Test - public void testHealthCheckUnauthenticatedDisabled() { - // Prepare - when(sourceConfig.isUnauthenticatedHealthCheck()).thenReturn(false); - when(sourceConfig.getAuthentication()).thenReturn(new PluginModel("http_basic", - Map.of( - "username", "test", - "password", "test" - ))); - pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, TEST_PIPELINE_NAME); - OpenSearchAPISourceUnderTest = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - - OpenSearchAPISourceUnderTest.start(testBuffer); - - // When - WebClient.of().execute(RequestHeaders.builder() - .scheme(SessionProtocol.HTTP) - .authority("127.0.0.1:2021") - .method(HttpMethod.GET) - .path("/health") - .build()) - .aggregate() - .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.UNAUTHORIZED)).join(); - } - - @Test - public void testHTTPJsonResponse400() { - // Prepare - final String testBadData = "}"; - final int testPayloadSize = testBadData.getBytes().length; - OpenSearchAPISourceUnderTest.start(testBuffer); - refreshMeasurements(); - - // When - WebClient.of().execute(RequestHeaders.builder() - .scheme(SessionProtocol.HTTP) - .authority("127.0.0.1:2021") - .method(HttpMethod.POST) - .path("/log/ingest") - .contentType(MediaType.JSON_UTF_8) - .build(), - HttpData.ofUtf8(testBadData)) - .aggregate() - .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.BAD_REQUEST)).join(); - - // Then - Assertions.assertTrue(testBuffer.isEmpty()); - // Verify metrics - final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( - requestsReceivedMeasurements, Statistic.COUNT); - Assertions.assertEquals(1.0, requestReceivedCount.getValue()); - final Measurement badRequestsCount = MetricsTestUtil.getMeasurementFromList( - badRequestsMeasurements, Statistic.COUNT); - Assertions.assertEquals(1.0, badRequestsCount.getValue()); - } - - @Test - public void testHTTPJsonResponse413() throws InterruptedException { - // Prepare - final String testData = "[{\"log\": \"test log 1\"}, {\"log\": \"test log 2\"}]"; - final int testPayloadSize = testData.getBytes().length; - OpenSearchAPISourceUnderTest.start(testBuffer); - refreshMeasurements(); - - // When - WebClient.of().execute(RequestHeaders.builder() - .scheme(SessionProtocol.HTTP) - .authority("127.0.0.1:2021") - .method(HttpMethod.POST) - .path("/log/ingest") - .contentType(MediaType.JSON_UTF_8) - .build(), - HttpData.ofUtf8(testData)) - .aggregate() - .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.REQUEST_ENTITY_TOO_LARGE)).join(); - - // Then - Assertions.assertTrue(testBuffer.isEmpty()); - // Verify metrics - final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( - requestsReceivedMeasurements, Statistic.COUNT); - Assertions.assertEquals(1.0, requestReceivedCount.getValue()); - final Measurement successRequestsCount = MetricsTestUtil.getMeasurementFromList( - successRequestsMeasurements, Statistic.COUNT); - Assertions.assertEquals(0.0, successRequestsCount.getValue()); - final Measurement requestsTooLargeCount = MetricsTestUtil.getMeasurementFromList( - requestsTooLargeMeasurements, Statistic.COUNT); - Assertions.assertEquals(1.0, requestsTooLargeCount.getValue()); - final Measurement requestProcessDurationCount = MetricsTestUtil.getMeasurementFromList( - requestProcessDurationMeasurements, Statistic.COUNT); - Assertions.assertEquals(1.0, requestProcessDurationCount.getValue()); - final Measurement requestProcessDurationMax = MetricsTestUtil.getMeasurementFromList( - requestProcessDurationMeasurements, Statistic.MAX); - Assertions.assertTrue(requestProcessDurationMax.getValue() > 0); - final Measurement payloadSizeMax = MetricsTestUtil.getMeasurementFromList( - payloadSizeSummaryMeasurements, Statistic.MAX); - Assertions.assertEquals(testPayloadSize, payloadSizeMax.getValue()); - } - - @Test - public void testHTTPJsonResponse408() { - // Prepare - final int testMaxPendingRequests = 1; - final int testThreadCount = 1; - final int serverTimeoutInMillis = 500; - final int bufferTimeoutInMillis = 400; - when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(serverTimeoutInMillis); - when(sourceConfig.getBufferTimeoutInMillis()).thenReturn(bufferTimeoutInMillis); - when(sourceConfig.getMaxPendingRequests()).thenReturn(testMaxPendingRequests); - when(sourceConfig.getThreadCount()).thenReturn(testThreadCount); - OpenSearchAPISourceUnderTest = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - // Start the source - OpenSearchAPISourceUnderTest.start(testBuffer); - refreshMeasurements(); - final RequestHeaders testRequestHeaders = RequestHeaders.builder().scheme(SessionProtocol.HTTP) - .authority("127.0.0.1:2021") - .method(HttpMethod.POST) - .path("/log/ingest") - .contentType(MediaType.JSON_UTF_8) - .build(); - final HttpData testHttpData = HttpData.ofUtf8("[{\"log\": \"somelog\"}]"); - - // Fill in the buffer - WebClient.of().execute(testRequestHeaders, testHttpData).aggregate() - .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); - - // Disable client timeout - WebClient testWebClient = WebClient.builder().responseTimeoutMillis(0).build(); - - // When/Then - testWebClient.execute(testRequestHeaders, testHttpData) - .aggregate() - .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.REQUEST_TIMEOUT)).join(); - // verify metrics - final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( - requestsReceivedMeasurements, Statistic.COUNT); - Assertions.assertEquals(2.0, requestReceivedCount.getValue()); - final Measurement successRequestsCount = MetricsTestUtil.getMeasurementFromList( - successRequestsMeasurements, Statistic.COUNT); - Assertions.assertEquals(1.0, successRequestsCount.getValue()); - final Measurement requestTimeoutsCount = MetricsTestUtil.getMeasurementFromList( - requestTimeoutsMeasurements, Statistic.COUNT); - Assertions.assertEquals(1.0, requestTimeoutsCount.getValue()); - final Measurement requestProcessDurationMax = MetricsTestUtil.getMeasurementFromList( - requestProcessDurationMeasurements, Statistic.MAX); - final double maxDurationInMillis = 1000 * requestProcessDurationMax.getValue(); - Assertions.assertTrue(maxDurationInMillis > bufferTimeoutInMillis); - } - - @Test - public void testHTTPJsonResponse429() throws InterruptedException { - // Prepare - final int testMaxPendingRequests = 1; - final int testThreadCount = 1; - final int clientTimeoutInMillis = 100; - final int serverTimeoutInMillis = (testMaxPendingRequests + testThreadCount + 1) * clientTimeoutInMillis; - final Random rand = new Random(); - final double randomFactor = rand.nextDouble() + 1.5; - final int requestTimeoutInMillis = (int)(serverTimeoutInMillis * randomFactor); - when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(requestTimeoutInMillis); - when(sourceConfig.getBufferTimeoutInMillis()).thenReturn(serverTimeoutInMillis); - when(sourceConfig.getMaxPendingRequests()).thenReturn(testMaxPendingRequests); - when(sourceConfig.getThreadCount()).thenReturn(testThreadCount); - OpenSearchAPISourceUnderTest = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - // Start the source - OpenSearchAPISourceUnderTest.start(testBuffer); - refreshMeasurements(); - final RequestHeaders testRequestHeaders = RequestHeaders.builder().scheme(SessionProtocol.HTTP) - .authority("127.0.0.1:2021") - .method(HttpMethod.POST) - .path("/log/ingest") - .contentType(MediaType.JSON_UTF_8) - .build(); - final HttpData testHttpData = HttpData.ofUtf8("[{\"log\": \"somelog\"}]"); - - // Fill in the buffer - WebClient.of().execute(testRequestHeaders, testHttpData).aggregate() - .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); - - // Send requests to throttle the server when buffer is full - // Set the client timeout to be less than source serverTimeoutInMillis / (testMaxPendingRequests + testThreadCount) - WebClient testWebClient = WebClient.builder().responseTimeoutMillis(clientTimeoutInMillis).build(); - for (int i = 0; i < testMaxPendingRequests + testThreadCount; i++) { - CompletionException actualException = Assertions.assertThrows( - CompletionException.class, () -> testWebClient.execute(testRequestHeaders, testHttpData).aggregate().join()); - assertThat(actualException.getCause(), instanceOf(ResponseTimeoutException.class)); - } - - // When/Then - testWebClient.execute(testRequestHeaders, testHttpData).aggregate() - .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.TOO_MANY_REQUESTS)).join(); - - // Wait until source server timeout a request processing thread - Thread.sleep(serverTimeoutInMillis); - // New request should timeout instead of being rejected - CompletionException actualException = Assertions.assertThrows( - CompletionException.class, () -> testWebClient.execute(testRequestHeaders, testHttpData).aggregate().join()); - assertThat(actualException.getCause(), instanceOf(ResponseTimeoutException.class)); - // verify metrics - final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( - requestsReceivedMeasurements, Statistic.COUNT); - Assertions.assertEquals(testMaxPendingRequests + testThreadCount + 2, requestReceivedCount.getValue()); - final Measurement successRequestsCount = MetricsTestUtil.getMeasurementFromList( - successRequestsMeasurements, Statistic.COUNT); - Assertions.assertEquals(1.0, successRequestsCount.getValue()); - final Measurement rejectedRequestsCount = MetricsTestUtil.getMeasurementFromList( - rejectedRequestsMeasurements, Statistic.COUNT); - Assertions.assertEquals(1.0, rejectedRequestsCount.getValue()); - } - - @Test - public void testServerConnectionsMetric() throws InterruptedException { - // Prepare - OpenSearchAPISourceUnderTest = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - OpenSearchAPISourceUnderTest.start(testBuffer); - refreshMeasurements(); - - // Verify connections metric value is 0 - Measurement serverConnectionsMeasurement = MetricsTestUtil.getMeasurementFromList(serverConnectionsMeasurements, Statistic.VALUE); - Assertions.assertEquals(0, serverConnectionsMeasurement.getValue()); - - final RequestHeaders testRequestHeaders = RequestHeaders.builder().scheme(SessionProtocol.HTTP) - .authority("127.0.0.1:2021") - .method(HttpMethod.POST) - .path("/log/ingest") - .contentType(MediaType.JSON_UTF_8) - .build(); - final HttpData testHttpData = HttpData.ofUtf8("[{\"log\": \"somelog\"}]"); - - // Send request - WebClient.of().execute(testRequestHeaders, testHttpData).aggregate() - .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); - - // Verify connections metric value is 1 - serverConnectionsMeasurement = MetricsTestUtil.getMeasurementFromList(serverConnectionsMeasurements, Statistic.VALUE); - Assertions.assertEquals(1.0, serverConnectionsMeasurement.getValue()); - } - - @Test - public void testServerStartCertFileSuccess() throws IOException { - try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { - armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); - when(server.stop()).thenReturn(completableFuture); - - final Path certFilePath = new File(TEST_SSL_CERTIFICATE_FILE).toPath(); - final Path keyFilePath = new File(TEST_SSL_KEY_FILE).toPath(); - final String certAsString = Files.readString(certFilePath); - final String keyAsString = Files.readString(keyFilePath); - - when(sourceConfig.isSsl()).thenReturn(true); - when(sourceConfig.getSslCertificateFile()).thenReturn(TEST_SSL_CERTIFICATE_FILE); - when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE); - OpenSearchAPISourceUnderTest = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - OpenSearchAPISourceUnderTest.start(testBuffer); - OpenSearchAPISourceUnderTest.stop(); - - final ArgumentCaptor certificateIs = ArgumentCaptor.forClass(InputStream.class); - final ArgumentCaptor privateKeyIs = ArgumentCaptor.forClass(InputStream.class); - verify(serverBuilder).tls(certificateIs.capture(), privateKeyIs.capture()); - final String actualCertificate = IOUtils.toString(certificateIs.getValue(), StandardCharsets.UTF_8.name()); - final String actualPrivateKey = IOUtils.toString(privateKeyIs.getValue(), StandardCharsets.UTF_8.name()); - assertThat(actualCertificate, is(certAsString)); - assertThat(actualPrivateKey, is(keyAsString)); - } - } - - @Test - void testHTTPSJsonResponse() { - reset(sourceConfig); - when(sourceConfig.getPort()).thenReturn(2021); - when(sourceConfig.getPath()).thenReturn(OpenSearchAPISourceConfig.DEFAULT_LOG_INGEST_URI); - when(sourceConfig.getThreadCount()).thenReturn(200); - when(sourceConfig.getMaxConnectionCount()).thenReturn(500); - when(sourceConfig.getMaxPendingRequests()).thenReturn(1024); - when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(200); - when(sourceConfig.isSsl()).thenReturn(true); - when(sourceConfig.getSslCertificateFile()).thenReturn(TEST_SSL_CERTIFICATE_FILE); - when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE); - OpenSearchAPISourceUnderTest = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - - testBuffer = getBuffer(); - OpenSearchAPISourceUnderTest.start(testBuffer); - - WebClient.builder().factory(ClientFactory.insecure()).build().execute(RequestHeaders.builder() - .scheme(SessionProtocol.HTTPS) - .authority("127.0.0.1:2021") - .method(HttpMethod.POST) - .path("/log/ingest") - .contentType(MediaType.JSON_UTF_8) - .build(), - HttpData.ofUtf8("[{\"log\": \"somelog\"}]")) - .aggregate() - .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); - } - - @Test - void testHTTPSJsonResponse_with_custom_path_along_with_placeholder() { - reset(sourceConfig); - when(sourceConfig.getPort()).thenReturn(2021); - when(sourceConfig.getPath()).thenReturn("/${pipelineName}/test"); - when(sourceConfig.getThreadCount()).thenReturn(200); - when(sourceConfig.getMaxConnectionCount()).thenReturn(500); - when(sourceConfig.getMaxPendingRequests()).thenReturn(1024); - when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(200); - when(sourceConfig.isSsl()).thenReturn(true); - - when(sourceConfig.getSslCertificateFile()).thenReturn(TEST_SSL_CERTIFICATE_FILE); - when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE); - OpenSearchAPISourceUnderTest = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - - testBuffer = getBuffer(); - OpenSearchAPISourceUnderTest.start(testBuffer); - - final String path = "/" + TEST_PIPELINE_NAME + "/test"; - - WebClient.builder().factory(ClientFactory.insecure()).build().execute(RequestHeaders.builder() - .scheme(SessionProtocol.HTTPS) - .authority("127.0.0.1:2021") - .method(HttpMethod.POST) - .path(path) - .contentType(MediaType.JSON_UTF_8) - .build(), - HttpData.ofUtf8("[{\"log\": \"somelog\"}]")) - .aggregate() - .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); - } - - @Test - public void testDoubleStart() { - // starting server - OpenSearchAPISourceUnderTest.start(testBuffer); - // double start server - Assertions.assertThrows(IllegalStateException.class, () -> OpenSearchAPISourceUnderTest.start(testBuffer)); - } - - @Test - public void testStartWithEmptyBuffer() { - final OpenSearchAPISource source = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - Assertions.assertThrows(IllegalStateException.class, () -> source.start(null)); - } - - @Test - public void testStartWithServerExecutionExceptionNoCause() throws ExecutionException, InterruptedException { - // Prepare - final OpenSearchAPISource source = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { - armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); - when(completableFuture.get()).thenThrow(new ExecutionException("", null)); - - // When/Then - Assertions.assertThrows(RuntimeException.class, () -> source.start(testBuffer)); - } - } - - @Test - public void testStartWithServerExecutionExceptionWithCause() throws ExecutionException, InterruptedException { - // Prepare - final OpenSearchAPISource source = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { - armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); - final NullPointerException expCause = new NullPointerException(); - when(completableFuture.get()).thenThrow(new ExecutionException("", expCause)); - - // When/Then - final RuntimeException ex = Assertions.assertThrows(RuntimeException.class, () -> source.start(testBuffer)); - Assertions.assertEquals(expCause, ex); - } - } - - @Test - public void testStartWithInterruptedException() throws ExecutionException, InterruptedException { - // Prepare - final OpenSearchAPISource source = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { - armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); - when(completableFuture.get()).thenThrow(new InterruptedException()); - - // When/Then - Assertions.assertThrows(RuntimeException.class, () -> source.start(testBuffer)); - Assertions.assertTrue(Thread.interrupted()); - } - } - - @Test - public void testStopWithServerExecutionExceptionNoCause() throws ExecutionException, InterruptedException { - // Prepare - final OpenSearchAPISource source = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { - armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); - source.start(testBuffer); - when(server.stop()).thenReturn(completableFuture); - - // When/Then - when(completableFuture.get()).thenThrow(new ExecutionException("", null)); - Assertions.assertThrows(RuntimeException.class, source::stop); - } - } - - @Test - public void testStopWithServerExecutionExceptionWithCause() throws ExecutionException, InterruptedException { - // Prepare - final OpenSearchAPISource source = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { - armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); - source.start(testBuffer); - when(server.stop()).thenReturn(completableFuture); - final NullPointerException expCause = new NullPointerException(); - when(completableFuture.get()).thenThrow(new ExecutionException("", expCause)); - - // When/Then - final RuntimeException ex = Assertions.assertThrows(RuntimeException.class, source::stop); - Assertions.assertEquals(expCause, ex); - } - } - - @Test - public void testStopWithInterruptedException() throws ExecutionException, InterruptedException { - // Prepare - final OpenSearchAPISource source = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { - armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); - source.start(testBuffer); - when(server.stop()).thenReturn(completableFuture); - when(completableFuture.get()).thenThrow(new InterruptedException()); - - // When/Then - Assertions.assertThrows(RuntimeException.class, source::stop); - Assertions.assertTrue(Thread.interrupted()); - } - } - - @Test - public void testRunAnotherSourceWithSamePort() { - // starting server - OpenSearchAPISourceUnderTest.start(testBuffer); - - final OpenSearchAPISource secondSource = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - //Expect RuntimeException because when port is already in use, BindException is thrown which is not RuntimeException - Assertions.assertThrows(RuntimeException.class, () -> secondSource.start(testBuffer)); - } - - @Test - public void request_that_exceeds_maxRequestLength_returns_413() { - lenient().when(sourceConfig.getMaxRequestLength()).thenReturn(ByteCount.ofBytes(4)); - OpenSearchAPISourceUnderTest = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - // Prepare - final String testData = "[{\"log\": \"somelog\"}]"; - - assertThat((long) testData.getBytes().length, greaterThan(sourceConfig.getMaxRequestLength().getBytes())); - OpenSearchAPISourceUnderTest.start(testBuffer); - refreshMeasurements(); - - // When - WebClient.of().execute(RequestHeaders.builder() - .scheme(SessionProtocol.HTTP) - .authority("127.0.0.1:2021") - .method(HttpMethod.POST) - .path("/log/ingest") - .contentType(MediaType.JSON_UTF_8) - .build(), - HttpData.ofUtf8(testData)) - .aggregate() - .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.REQUEST_ENTITY_TOO_LARGE)).join(); - - // Then - Assertions.assertTrue(testBuffer.isEmpty()); - } - -}