From 2180a69327e2a63fc8e8c84f375b1e4aeb457827 Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Tue, 4 Jun 2024 19:49:07 -0400 Subject: [PATCH] Refactor http source functionality for supporting a new OpenSearch API source in DataPrepper (#4570) Refactor http source configuration to a separate http source common package. Signed-off-by: Souvik Bose --- .../http-source-common/build.gradle | 29 ++ .../http/BaseHttpServerConfig.java | 260 +++++++++++++ .../dataprepper/http/HttpServerConfig.java | 72 ++++ .../http}/LogThrottlingRejectHandler.java | 2 +- .../http}/LogThrottlingStrategy.java | 2 +- .../CertificateProviderFactory.java | 32 +- .../dataprepper/http}/codec/Codec.java | 2 +- .../dataprepper/http}/codec/JsonCodec.java | 2 +- .../http/BaseHttpServerConfigTest.java | 368 ++++++++++++++++++ .../http}/LogThrottlingRejectHandlerTest.java | 2 +- .../http}/LogThrottlingStrategyTest.java | 2 +- .../CertificateProviderFactoryTest.java | 34 +- .../http}/codec/JsonCodecTest.java | 2 +- .../org.mockito.plugins.MockMaker | 3 + .../src/test/resources/test_cert.crt | 14 + .../src/test/resources/test_decrypted_key.key | 15 + data-prepper-plugins/http-source/build.gradle | 4 +- .../plugins/source/loghttp/HTTPSource.java | 7 +- .../source/loghttp/HTTPSourceConfig.java | 222 +---------- .../source/loghttp/LogHTTPService.java | 2 +- .../source/loghttp/HTTPSourceConfigTest.java | 272 +------------ .../source/loghttp/HTTPSourceTest.java | 1 + settings.gradle | 1 + 23 files changed, 823 insertions(+), 527 deletions(-) create mode 100644 data-prepper-plugins/http-source-common/build.gradle create mode 100644 data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/BaseHttpServerConfig.java create mode 100644 data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/HttpServerConfig.java rename data-prepper-plugins/{http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp => http-source-common/src/main/java/org/opensearch/dataprepper/http}/LogThrottlingRejectHandler.java (96%) rename data-prepper-plugins/{http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp => http-source-common/src/main/java/org/opensearch/dataprepper/http}/LogThrottlingStrategy.java (97%) rename data-prepper-plugins/{http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp => http-source-common/src/main/java/org/opensearch/dataprepper/http}/certificate/CertificateProviderFactory.java (81%) rename data-prepper-plugins/{http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp => http-source-common/src/main/java/org/opensearch/dataprepper/http}/codec/Codec.java (90%) rename data-prepper-plugins/{http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp => http-source-common/src/main/java/org/opensearch/dataprepper/http}/codec/JsonCodec.java (97%) create mode 100644 data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/BaseHttpServerConfigTest.java rename data-prepper-plugins/{http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp => http-source-common/src/test/java/org/opensearch/dataprepper/http}/LogThrottlingRejectHandlerTest.java (97%) rename data-prepper-plugins/{http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp => http-source-common/src/test/java/org/opensearch/dataprepper/http}/LogThrottlingStrategyTest.java (97%) rename data-prepper-plugins/{http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp => http-source-common/src/test/java/org/opensearch/dataprepper/http}/certificate/CertificateProviderFactoryTest.java (70%) rename data-prepper-plugins/{http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp => http-source-common/src/test/java/org/opensearch/dataprepper/http}/codec/JsonCodecTest.java (97%) create mode 100644 data-prepper-plugins/http-source-common/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker create mode 100644 data-prepper-plugins/http-source-common/src/test/resources/test_cert.crt create mode 100644 data-prepper-plugins/http-source-common/src/test/resources/test_decrypted_key.key diff --git a/data-prepper-plugins/http-source-common/build.gradle b/data-prepper-plugins/http-source-common/build.gradle new file mode 100644 index 0000000000..49b282a1f2 --- /dev/null +++ b/data-prepper-plugins/http-source-common/build.gradle @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +dependencies { + implementation project(':data-prepper-plugins:common') + implementation libs.armeria.core + implementation 'software.amazon.awssdk:acm' + implementation 'software.amazon.awssdk:s3' + implementation 'software.amazon.awssdk:apache-client' + testImplementation 'org.assertj:assertj-core:3.25.3' + testImplementation project(':data-prepper-test-common') +} + +jacocoTestCoverageVerification { + dependsOn jacocoTestReport + violationRules { + rule { //in addition to core projects rule + limit { + minimum = 0.90 + } + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/BaseHttpServerConfig.java b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/BaseHttpServerConfig.java new file mode 100644 index 0000000000..8ad972379a --- /dev/null +++ b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/BaseHttpServerConfig.java @@ -0,0 +1,260 @@ +package org.opensearch.dataprepper.http; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.micrometer.core.instrument.util.StringUtils; +import jakarta.validation.constraints.AssertTrue; +import jakarta.validation.constraints.Max; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.Size; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; + +/** + * BaseHttpServerConfig class holds the common Http related configurations defined in the customer's source configuration along with default set of configuration values. +*/ +public class BaseHttpServerConfig implements HttpServerConfig { + static final String COMPRESSION = "compression"; + static final String SSL = "ssl"; + static final String SSL_CERTIFICATE_FILE = "ssl_certificate_file"; + static final String SSL_KEY_FILE = "ssl_key_file"; + static final boolean DEFAULT_USE_ACM_CERTIFICATE_FOR_SSL = false; + static final int DEFAULT_ACM_CERTIFICATE_TIMEOUT_MILLIS = 120000; + static final int DEFAULT_REQUEST_TIMEOUT_MS = 10000; + static final double BUFFER_TIMEOUT_FRACTION = 0.8; + static final int DEFAULT_THREAD_COUNT = 200; + static final int DEFAULT_MAX_CONNECTION_COUNT = 500; + static final int DEFAULT_MAX_PENDING_REQUESTS = 1024; + static final boolean DEFAULT_HEALTH_CHECK = false; + static final String HEALTH_CHECK_SERVICE = "health_check_service"; + static final String UNAUTHENTICATED_HEALTH_CHECK = "unauthenticated_health_check"; + static final String S3_PREFIX = "s3://"; + + @JsonProperty("port") + @Min(0) + @Max(65535) + private int port = getDefaultPort(); + + @Override + public int getDefaultPort() { + return 0; + } + + @JsonProperty("path") + @Size(min = 1, message = "path length should be at least 1") + private String path = getDefaultPath(); + + @Override + public String getDefaultPath() { + return ""; + } + + @JsonProperty("request_timeout") + @Min(2) + private int requestTimeoutInMillis = DEFAULT_REQUEST_TIMEOUT_MS; + + @JsonProperty("thread_count") + @Min(0) + private int threadCount = DEFAULT_THREAD_COUNT; + + @JsonProperty("max_connection_count") + @Min(0) + private int maxConnectionCount = DEFAULT_MAX_CONNECTION_COUNT; + + @JsonProperty("max_pending_requests") + @Min(0) + private int maxPendingRequests = DEFAULT_MAX_PENDING_REQUESTS; + + @JsonProperty(SSL) + private boolean ssl; + + @JsonProperty(SSL_CERTIFICATE_FILE) + private String sslCertificateFile; + + @JsonProperty(SSL_KEY_FILE) + private String sslKeyFile; + + @JsonProperty("use_acm_certificate_for_ssl") + private boolean useAcmCertificateForSsl = DEFAULT_USE_ACM_CERTIFICATE_FOR_SSL; + + @JsonProperty("acm_certificate_arn") + private String acmCertificateArn; + + @JsonProperty("acm_private_key_password") + private String acmPrivateKeyPassword; + + @JsonProperty("acm_certificate_timeout_millis") + @Min(0) + private Integer acmCertificateTimeoutMillis = DEFAULT_ACM_CERTIFICATE_TIMEOUT_MILLIS; + + @JsonProperty("aws_region") + private String awsRegion; + + @JsonProperty(HEALTH_CHECK_SERVICE) + private boolean healthCheckService = DEFAULT_HEALTH_CHECK; + + @JsonProperty(UNAUTHENTICATED_HEALTH_CHECK) + private boolean unauthenticatedHealthCheck = false; + + @JsonProperty("max_request_length") + private ByteCount maxRequestLength; + + private PluginModel authentication; + + @JsonProperty(COMPRESSION) + private CompressionOption compression = CompressionOption.NONE; + + @Override + @AssertTrue(message = "path should start with /") + public boolean isPathValid() { + return path.startsWith("/"); + } + + @Override + public int getPort() { + return port; + } + + @Override + public String getPath() { + return path; + } + + @Override + public CompressionOption getCompression() { + return compression; + } + + @Override + public boolean isSslCertAndKeyFileInS3() { + return ssl && sslCertificateFile.toLowerCase().startsWith(S3_PREFIX) && + sslKeyFile.toLowerCase().startsWith(S3_PREFIX); + } + + @AssertTrue(message = "ssl_certificate_file cannot be a empty or null when ssl is enabled") + @Override + public boolean isSslCertificateFileValid() { + if (ssl && !useAcmCertificateForSsl) { + return StringUtils.isNotEmpty(sslCertificateFile); + } + else { + return true; + } + } + + @AssertTrue(message = "ssl_key_file cannot be a empty or null when ssl is enabled") + @Override + public boolean isSslKeyFileValid() { + if (ssl && !useAcmCertificateForSsl) { + return StringUtils.isNotEmpty(sslKeyFile); + } + else { + return true; + } + } + + @AssertTrue(message = "acm_certificate_arn cannot be a empty or null when ACM is used for ssl") + @Override + public boolean isAcmCertificateArnValid() { + if (ssl && useAcmCertificateForSsl) { + return StringUtils.isNotEmpty(acmCertificateArn); + } + else { + return true; + } + } + + @AssertTrue(message = "aws_region cannot be a empty or null when ACM / S3 is used for ssl") + @Override + public boolean isAwsRegionValid() { + if (ssl && (useAcmCertificateForSsl || isSslCertAndKeyFileInS3())) { + return StringUtils.isNotEmpty(awsRegion); + } + return true; + } + + @Override + public int getRequestTimeoutInMillis() { + return requestTimeoutInMillis; + } + + @Override + public int getBufferTimeoutInMillis() { + return (int)(BUFFER_TIMEOUT_FRACTION * requestTimeoutInMillis); + } + + @Override + public int getThreadCount() { + return threadCount; + } + + @Override + public int getMaxConnectionCount() { + return maxConnectionCount; + } + + @Override + public int getMaxPendingRequests() { + return maxPendingRequests; + } + + @Override + public boolean isSsl() { + return ssl; + } + + @Override + public String getSslCertificateFile() { + return sslCertificateFile; + } + + @Override + public String getSslKeyFile() { + return sslKeyFile; + } + + @Override + public boolean isUseAcmCertificateForSsl() { + return useAcmCertificateForSsl; + } + + @Override + public String getAcmCertificateArn() { + return acmCertificateArn; + } + + @Override + public String getAcmPrivateKeyPassword() { + return acmPrivateKeyPassword; + } + + @Override + public int getAcmCertificateTimeoutMillis() { + return acmCertificateTimeoutMillis; + } + + @Override + public String getAwsRegion() { + return awsRegion; + } + + @Override + public PluginModel getAuthentication() { + return authentication; + } + + @Override + public boolean hasHealthCheckService() { + return healthCheckService; + } + + @Override + public boolean isUnauthenticatedHealthCheck() { + return unauthenticatedHealthCheck; + } + + @Override + public ByteCount getMaxRequestLength() { + return maxRequestLength; + } +} diff --git a/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/HttpServerConfig.java b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/HttpServerConfig.java new file mode 100644 index 0000000000..f62c116631 --- /dev/null +++ b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/HttpServerConfig.java @@ -0,0 +1,72 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.http; + +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; + +/** + * HttpServerConfig is an interface for the Http based source configurations to be shared across different types of Http based sources +*/ +public interface HttpServerConfig { + + int getDefaultPort(); + + String getDefaultPath(); + + boolean isPathValid(); + + int getPort(); + + String getPath(); + + CompressionOption getCompression(); + + boolean isSslCertAndKeyFileInS3(); + + boolean isSslCertificateFileValid(); + + boolean isSslKeyFileValid(); + + boolean isAcmCertificateArnValid(); + + boolean isAwsRegionValid(); + + int getRequestTimeoutInMillis(); + + int getBufferTimeoutInMillis(); + + int getThreadCount(); + + int getMaxConnectionCount(); + + int getMaxPendingRequests(); + + boolean isSsl(); + + String getSslCertificateFile(); + + String getSslKeyFile(); + + boolean isUseAcmCertificateForSsl(); + + String getAcmCertificateArn(); + + String getAcmPrivateKeyPassword(); + + int getAcmCertificateTimeoutMillis(); + + String getAwsRegion(); + + PluginModel getAuthentication(); + + boolean hasHealthCheckService(); + + boolean isUnauthenticatedHealthCheck(); + + ByteCount getMaxRequestLength(); +} diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogThrottlingRejectHandler.java b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/LogThrottlingRejectHandler.java similarity index 96% rename from data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogThrottlingRejectHandler.java rename to data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/LogThrottlingRejectHandler.java index 2ae46a837f..2df60da3d2 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogThrottlingRejectHandler.java +++ b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/LogThrottlingRejectHandler.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.loghttp; +package org.opensearch.dataprepper.http; import org.opensearch.dataprepper.metrics.PluginMetrics; import com.linecorp.armeria.common.HttpRequest; diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogThrottlingStrategy.java b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/LogThrottlingStrategy.java similarity index 97% rename from data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogThrottlingStrategy.java rename to data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/LogThrottlingStrategy.java index bceb7c41a3..32782034a4 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogThrottlingStrategy.java +++ b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/LogThrottlingStrategy.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.loghttp; +package org.opensearch.dataprepper.http; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.util.UnmodifiableFuture; diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/certificate/CertificateProviderFactory.java b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/certificate/CertificateProviderFactory.java similarity index 81% rename from data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/certificate/CertificateProviderFactory.java rename to data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/certificate/CertificateProviderFactory.java index a546766a49..49adaca1ac 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/certificate/CertificateProviderFactory.java +++ b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/certificate/CertificateProviderFactory.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.loghttp.certificate; +package org.opensearch.dataprepper.http.certificate; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; @@ -11,7 +11,7 @@ import org.opensearch.dataprepper.plugins.certificate.file.FileCertificateProvider; import org.opensearch.dataprepper.plugins.certificate.s3.S3CertificateProvider; import org.opensearch.dataprepper.plugins.metricpublisher.MicrometerMetricPublisher; -import org.opensearch.dataprepper.plugins.source.loghttp.HTTPSourceConfig; +import org.opensearch.dataprepper.http.HttpServerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -36,13 +36,13 @@ public class CertificateProviderFactory { private static final long ACM_CLIENT_BASE_BACKOFF_MILLIS = 1000l; private static final long ACM_CLIENT_MAX_BACKOFF_MILLIS = 60000l; - final HTTPSourceConfig httpSourceConfig; - public CertificateProviderFactory(final HTTPSourceConfig httpSourceConfig) { - this.httpSourceConfig = httpSourceConfig; + final HttpServerConfig httpServerConfig; + public CertificateProviderFactory(final HttpServerConfig httpServerConfig) { + this.httpServerConfig = httpServerConfig; } public CertificateProvider getCertificateProvider() { - if (httpSourceConfig.isUseAcmCertificateForSsl()) { + if (httpServerConfig.isUseAcmCertificateForSsl()) { LOG.info("Using ACM certificate and private key for SSL/TLS."); final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder() .addCredentialsProvider(DefaultCredentialsProvider.create()) @@ -65,7 +65,7 @@ public CertificateProvider getCertificateProvider() { final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws"); final AcmClient awsCertificateManager = AcmClient.builder() - .region(Region.of(httpSourceConfig.getAwsRegion())) + .region(Region.of(httpServerConfig.getAwsRegion())) .credentialsProvider(credentialsProvider) .overrideConfiguration(clientConfig) .httpClientBuilder(ApacheHttpClient.builder()) @@ -73,30 +73,30 @@ public CertificateProvider getCertificateProvider() { .build(); return new ACMCertificateProvider(awsCertificateManager, - httpSourceConfig.getAcmCertificateArn(), - httpSourceConfig.getAcmCertificateTimeoutMillis(), - httpSourceConfig.getAcmPrivateKeyPassword()); - } else if (httpSourceConfig.isSslCertAndKeyFileInS3()) { + httpServerConfig.getAcmCertificateArn(), + httpServerConfig.getAcmCertificateTimeoutMillis(), + httpServerConfig.getAcmPrivateKeyPassword()); + } else if (httpServerConfig.isSslCertAndKeyFileInS3()) { LOG.info("Using S3 to fetch certificate and private key for SSL/TLS."); final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder() .addCredentialsProvider(DefaultCredentialsProvider.create()).build(); final S3Client s3Client = S3Client.builder() - .region(Region.of(httpSourceConfig.getAwsRegion())) + .region(Region.of(httpServerConfig.getAwsRegion())) .credentialsProvider(credentialsProvider) .httpClientBuilder(ApacheHttpClient.builder()) .build(); return new S3CertificateProvider( s3Client, - httpSourceConfig.getSslCertificateFile(), - httpSourceConfig.getSslKeyFile() + httpServerConfig.getSslCertificateFile(), + httpServerConfig.getSslKeyFile() ); } else { LOG.info("Using local file system to get certificate and private key for SSL/TLS."); return new FileCertificateProvider( - httpSourceConfig.getSslCertificateFile(), - httpSourceConfig.getSslKeyFile() + httpServerConfig.getSslCertificateFile(), + httpServerConfig.getSslKeyFile() ); } } diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/codec/Codec.java b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/codec/Codec.java similarity index 90% rename from data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/codec/Codec.java rename to data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/codec/Codec.java index 709727bada..3a71abbd3d 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/codec/Codec.java +++ b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/codec/Codec.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.loghttp.codec; +package org.opensearch.dataprepper.http.codec; import com.linecorp.armeria.common.HttpData; diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/codec/JsonCodec.java b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/codec/JsonCodec.java similarity index 97% rename from data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/codec/JsonCodec.java rename to data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/codec/JsonCodec.java index b00fc121a2..fc25193a9d 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/codec/JsonCodec.java +++ b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/codec/JsonCodec.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.loghttp.codec; +package org.opensearch.dataprepper.http.codec; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/BaseHttpServerConfigTest.java b/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/BaseHttpServerConfigTest.java new file mode 100644 index 0000000000..d80f107807 --- /dev/null +++ b/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/BaseHttpServerConfigTest.java @@ -0,0 +1,368 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.http; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.opensearch.dataprepper.test.helper.ReflectivelySetField; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.opensearch.dataprepper.http.BaseHttpServerConfig.S3_PREFIX; + +public class BaseHttpServerConfigTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String PLUGIN_NAME = "http"; + private static final String USERNAME = "test_user"; + private static final String PASSWORD = "test_password"; + + private static Stream provideCompressionOption() { + return Stream.of(Arguments.of(CompressionOption.GZIP)); + } + + @Test + void testDefault() { + // Prepare + final HttpServerConfig sourceConfig = new BaseHttpServerConfig(); + + // When/Then + assertEquals(BaseHttpServerConfig.DEFAULT_REQUEST_TIMEOUT_MS, sourceConfig.getRequestTimeoutInMillis()); + assertEquals(BaseHttpServerConfig.DEFAULT_THREAD_COUNT, sourceConfig.getThreadCount()); + assertEquals(BaseHttpServerConfig.DEFAULT_MAX_CONNECTION_COUNT, sourceConfig.getMaxConnectionCount()); + assertEquals(BaseHttpServerConfig.DEFAULT_MAX_PENDING_REQUESTS, sourceConfig.getMaxPendingRequests()); + assertEquals(BaseHttpServerConfig.DEFAULT_USE_ACM_CERTIFICATE_FOR_SSL, sourceConfig.isUseAcmCertificateForSsl()); + assertEquals(BaseHttpServerConfig.DEFAULT_ACM_CERTIFICATE_TIMEOUT_MILLIS, sourceConfig.getAcmCertificateTimeoutMillis()); + assertEquals((int)(BaseHttpServerConfig.DEFAULT_REQUEST_TIMEOUT_MS * BaseHttpServerConfig.BUFFER_TIMEOUT_FRACTION), + sourceConfig.getBufferTimeoutInMillis()); + } + + @Test + void getPath_should_return_correct_path() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "path", "/my/custom/path"); + + assertThat(objectUnderTest.isPathValid(), equalTo(true)); + assertThat(objectUnderTest.getPath(), equalTo("/my/custom/path")); + } + + @Test + void isPathValid_should_return_false_for_invalid_path() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "path", "my/custom/path"); + + assertThat(objectUnderTest.isPathValid(), equalTo(false)); + } + + @Test + void testValidPort() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "port", 2021); + + assertThat(objectUnderTest.getPort(), equalTo(2021)); + } + + @Test + void testValidAWSRegion() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "awsRegion", "us-east-1"); + + assertThat(objectUnderTest.getAwsRegion(), equalTo("us-east-1")); + } + + @Test + void testMaxRequestLength() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "maxRequestLength", ByteCount.ofBytes(4)); + + assertThat(objectUnderTest.getMaxRequestLength(), equalTo(ByteCount.ofBytes(4))); + } + + @Test + void testHealthCheckService() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "healthCheckService", true); + + assertEquals(objectUnderTest.hasHealthCheckService(), true); + } + + @Test + void testUnauthenticatedHealthCheck() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "unauthenticatedHealthCheck", true); + + assertThat(objectUnderTest.isUnauthenticatedHealthCheck(), equalTo(true)); + } + + @ParameterizedTest + @MethodSource("provideCompressionOption") + void testValidCompression(final CompressionOption compressionOption) { + // Prepare + final Map settings = new HashMap<>(); + settings.put(BaseHttpServerConfig.COMPRESSION, compressionOption.name()); + + final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, settings); + final BaseHttpServerConfig httpSourceConfig = OBJECT_MAPPER.convertValue( + pluginSetting.getSettings(), BaseHttpServerConfig.class); + + // When/Then + assertEquals(compressionOption, httpSourceConfig.getCompression()); + } + + @Test + void testAuthentication() throws NoSuchFieldException, IllegalAccessException { + PluginModel authentication = new PluginModel("http_basic", + Map.of( + "username", USERNAME, + "password", PASSWORD + )); + + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "authentication", authentication); + + assertThat(objectUnderTest.getAuthentication(), equalTo(authentication)); + } + + @Nested + class SslValidationWithFile { + @Test + void isSslCertificateFileValidation_should_return_true_if_ssl_is_false() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "ssl", false); + + assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(true)); + assertThat(objectUnderTest.isSsl(), equalTo(false)); + } + + @Test + void isSslCertificateFileValidation_should_return_false_if_ssl_is_true_and_sslCertificateFile_is_null() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "ssl", true); + + assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(false)); + assertThat(objectUnderTest.isSsl(), equalTo(true)); + } + + @Test + void isSslCertificateFileValidation_should_return_true_if_ssl_is_true_and_sslCertificateFile_is_a_valid_file() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "ssl", true); + + final String sslCertificateFile = UUID.randomUUID().toString(); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "sslCertificateFile", sslCertificateFile); + + assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(true)); + assertThat(objectUnderTest.getSslCertificateFile(), equalTo(sslCertificateFile)); + } + + @Test + void isSslKeyFileValidation_should_return_true_if_ssl_is_false() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "ssl", false); + + assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(true)); + } + + @Test + void isSslKeyFileValidation_should_return_false_if_ssl_is_true_and_sslKeyFile_is_null() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "ssl", true); + + assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(false)); + } + + @Test + void isSslKeyFileValidation_should_return_true_if_ssl_is_true_and_sslKeyFile_is_a_valid_file() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "ssl", true); + + final String sslKeyFile = UUID.randomUUID().toString(); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "sslKeyFile", sslKeyFile); + + assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(true)); + assertThat(objectUnderTest.getSslKeyFile(), equalTo(sslKeyFile)); + } + + } + + @Nested + class SslValidationWithS3 { + @Test + void isSslCertAndKeyFileInS3_should_return_true_if_ssl_is_true_and_KeyFile_and_certFile_are_s3_paths() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "ssl", true); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "sslCertificateFile", getS3FilePath()); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "sslKeyFile", getS3FilePath()); + + assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(true)); + assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(true)); + assertThat(objectUnderTest.isSslCertAndKeyFileInS3(), equalTo(true)); + } + + @Test + void isSslCertAndKeyFileInS3_should_return_false_if_ssl_is_true_and_KeyFile_and_certFile_are_not_s3_paths() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "ssl", true); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "sslCertificateFile", UUID.randomUUID().toString()); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "sslKeyFile", UUID.randomUUID().toString()); + + assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(true)); + assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(true)); + assertThat(objectUnderTest.isSslCertAndKeyFileInS3(), equalTo(false)); + } + + @Test + void isAwsRegionValid_should_return_true_if_ssl_is_true_and_aws_region_is_null_without_s3_paths() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "ssl", true); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "sslCertificateFile", UUID.randomUUID().toString()); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "sslKeyFile", UUID.randomUUID().toString()); + + assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(true)); + assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(true)); + assertThat(objectUnderTest.isSslCertAndKeyFileInS3(), equalTo(false)); + assertThat(objectUnderTest.isAwsRegionValid(), equalTo(true)); + } + + @Test + void isAwsRegionValid_should_return_false_if_ssl_is_true_and_aws_region_is_null_with_s3_paths() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "ssl", true); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "sslCertificateFile", getS3FilePath()); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "sslKeyFile", getS3FilePath()); + + assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(true)); + assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(true)); + assertThat(objectUnderTest.isSslCertAndKeyFileInS3(), equalTo(true)); + assertThat(objectUnderTest.isAwsRegionValid(), equalTo(false)); + } + + @Test + void isAwsRegionValid_should_return_true_if_ssl_is_true_and_aws_region_is_not_null_with_s3_paths() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "ssl", true); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "sslCertificateFile", getS3FilePath()); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "sslKeyFile", getS3FilePath()); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "awsRegion", UUID.randomUUID().toString()); + + assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(true)); + assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(true)); + assertThat(objectUnderTest.isSslCertAndKeyFileInS3(), equalTo(true)); + assertThat(objectUnderTest.isAwsRegionValid(), equalTo(true)); + } + + @Test + void isAwsRegionValid_should_return_false_if_ssl_is_true_and_aws_region_is_not_null_with_s3_paths() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "ssl", true); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "sslCertificateFile", getS3FilePath()); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "sslKeyFile", getS3FilePath()); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "awsRegion", UUID.randomUUID().toString()); + + assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(true)); + assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(true)); + assertThat(objectUnderTest.isSslCertAndKeyFileInS3(), equalTo(true)); + assertThat(objectUnderTest.isAwsRegionValid(), equalTo(true)); + } + } + + @Nested + class SslValidationWithAcm { + @Test + void isAwsRegionValid_should_return_false_if_ssl_is_true_and_aws_region_is_null_with_acm() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "ssl", true); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "useAcmCertificateForSsl", true); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "acmCertificateArn", "acm-certificate-arn"); + + assertThat(objectUnderTest.isAwsRegionValid(), equalTo(false)); + assertThat(objectUnderTest.getAcmCertificateArn(), equalTo("acm-certificate-arn")); + } + + @Test + void isAwsRegionValid_should_return_true_if_ssl_is_true_and_aws_region_is_not_null_with_acm() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "ssl", true); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "useAcmCertificateForSsl", true); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "awsRegion", UUID.randomUUID().toString()); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "acmCertificateArn", "acm-certificate-arn"); + + assertThat(objectUnderTest.isAwsRegionValid(), equalTo(true)); + assertThat(objectUnderTest.getAcmCertificateArn(), equalTo("acm-certificate-arn")); + } + + @Test + void isAcmCertificateArnValid_should_return_false_if_ssl_is_true_and_acm_is_true_and_arn_is_null() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "ssl", true); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "useAcmCertificateForSsl", true); + + assertThat(objectUnderTest.isAcmCertificateArnValid(), equalTo(false)); + } + + @Test + void isAcmCertificateArnValid_should_return_true_if_ssl_is_false_and_acm_is_true_and_arn_is_null() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "ssl", false); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "useAcmCertificateForSsl", true); + + assertThat(objectUnderTest.isAcmCertificateArnValid(), equalTo(true)); + } + + @Test + void isAcmCertificateArnValid_should_return_true_if_ssl_is_true_and_acm_is_true_and_arn_is_not_null() throws NoSuchFieldException, IllegalAccessException { + final BaseHttpServerConfig objectUnderTest = new BaseHttpServerConfig(); + + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "ssl", true); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "useAcmCertificateForSsl", true); + ReflectivelySetField.setField(BaseHttpServerConfig.class, objectUnderTest, "acmCertificateArn", UUID.randomUUID().toString()); + + assertThat(objectUnderTest.isAcmCertificateArnValid(), equalTo(true)); + } + } + + private String getS3FilePath() { + return S3_PREFIX.concat(UUID.randomUUID().toString()); + } +} diff --git a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogThrottlingRejectHandlerTest.java b/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/LogThrottlingRejectHandlerTest.java similarity index 97% rename from data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogThrottlingRejectHandlerTest.java rename to data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/LogThrottlingRejectHandlerTest.java index d0b7823fd3..cc6cbd8b28 100644 --- a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogThrottlingRejectHandlerTest.java +++ b/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/LogThrottlingRejectHandlerTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.loghttp; +package org.opensearch.dataprepper.http; import org.opensearch.dataprepper.metrics.PluginMetrics; import com.linecorp.armeria.common.AggregatedHttpResponse; diff --git a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogThrottlingStrategyTest.java b/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/LogThrottlingStrategyTest.java similarity index 97% rename from data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogThrottlingStrategyTest.java rename to data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/LogThrottlingStrategyTest.java index 0a1d3028ef..2edbedaeec 100644 --- a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogThrottlingStrategyTest.java +++ b/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/LogThrottlingStrategyTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.loghttp; +package org.opensearch.dataprepper.http; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.util.UnmodifiableFuture; diff --git a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/certificate/CertificateProviderFactoryTest.java b/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/certificate/CertificateProviderFactoryTest.java similarity index 70% rename from data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/certificate/CertificateProviderFactoryTest.java rename to data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/certificate/CertificateProviderFactoryTest.java index 8b1122aa07..f309cec2b7 100644 --- a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/certificate/CertificateProviderFactoryTest.java +++ b/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/certificate/CertificateProviderFactoryTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.loghttp.certificate; +package org.opensearch.dataprepper.http.certificate; import org.hamcrest.core.IsInstanceOf; import org.junit.jupiter.api.BeforeEach; @@ -12,7 +12,7 @@ import org.opensearch.dataprepper.plugins.certificate.acm.ACMCertificateProvider; import org.opensearch.dataprepper.plugins.certificate.file.FileCertificateProvider; import org.opensearch.dataprepper.plugins.certificate.s3.S3CertificateProvider; -import org.opensearch.dataprepper.plugins.source.loghttp.HTTPSourceConfig; +import org.opensearch.dataprepper.http.HttpServerConfig; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Mockito.mock; @@ -22,21 +22,21 @@ class CertificateProviderFactoryTest { private final String TEST_SSL_CERTIFICATE_FILE = getClass().getClassLoader().getResource("test_cert.crt").getFile(); private final String TEST_SSL_KEY_FILE = getClass().getClassLoader().getResource("test_decrypted_key.key").getFile(); - private HTTPSourceConfig httpSourceConfig; + private HttpServerConfig httpServerConfig; private CertificateProviderFactory certificateProviderFactory; @BeforeEach void setUp() { - httpSourceConfig = mock(HTTPSourceConfig.class); + httpServerConfig = mock(HttpServerConfig.class); } @Test void getCertificateProviderFileCertificateProviderSuccess() { - when(httpSourceConfig.isSsl()).thenReturn(true); - when(httpSourceConfig.getSslCertificateFile()).thenReturn(TEST_SSL_CERTIFICATE_FILE); - when(httpSourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE); + when(httpServerConfig.isSsl()).thenReturn(true); + when(httpServerConfig.getSslCertificateFile()).thenReturn(TEST_SSL_CERTIFICATE_FILE); + when(httpServerConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE); - certificateProviderFactory = new CertificateProviderFactory(httpSourceConfig); + certificateProviderFactory = new CertificateProviderFactory(httpServerConfig); final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider(); assertThat(certificateProvider, IsInstanceOf.instanceOf(FileCertificateProvider.class)); @@ -44,12 +44,12 @@ void getCertificateProviderFileCertificateProviderSuccess() { @Test void getCertificateProviderS3ProviderSuccess() { - when(httpSourceConfig.isSslCertAndKeyFileInS3()).thenReturn(true); - when(httpSourceConfig.getAwsRegion()).thenReturn("us-east-1"); - when(httpSourceConfig.getSslCertificateFile()).thenReturn("s3://data/certificate/test_cert.crt"); - when(httpSourceConfig.getSslKeyFile()).thenReturn("s3://data/certificate/test_decrypted_key.key"); + when(httpServerConfig.isSslCertAndKeyFileInS3()).thenReturn(true); + when(httpServerConfig.getAwsRegion()).thenReturn("us-east-1"); + when(httpServerConfig.getSslCertificateFile()).thenReturn("s3://data/certificate/test_cert.crt"); + when(httpServerConfig.getSslKeyFile()).thenReturn("s3://data/certificate/test_decrypted_key.key"); - certificateProviderFactory = new CertificateProviderFactory(httpSourceConfig); + certificateProviderFactory = new CertificateProviderFactory(httpServerConfig); final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider(); assertThat(certificateProvider, IsInstanceOf.instanceOf(S3CertificateProvider.class)); @@ -57,11 +57,11 @@ void getCertificateProviderS3ProviderSuccess() { @Test void getCertificateProviderAcmProviderSuccess() { - when(httpSourceConfig.isUseAcmCertificateForSsl()).thenReturn(true); - when(httpSourceConfig.getAwsRegion()).thenReturn("us-east-1"); - when(httpSourceConfig.getAcmCertificateArn()).thenReturn("arn:aws:acm:us-east-1:account:certificate/1234-567-856456"); + when(httpServerConfig.isUseAcmCertificateForSsl()).thenReturn(true); + when(httpServerConfig.getAwsRegion()).thenReturn("us-east-1"); + when(httpServerConfig.getAcmCertificateArn()).thenReturn("arn:aws:acm:us-east-1:account:certificate/1234-567-856456"); - certificateProviderFactory = new CertificateProviderFactory(httpSourceConfig); + certificateProviderFactory = new CertificateProviderFactory(httpServerConfig); final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider(); assertThat(certificateProvider, IsInstanceOf.instanceOf(ACMCertificateProvider.class)); diff --git a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/codec/JsonCodecTest.java b/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/codec/JsonCodecTest.java similarity index 97% rename from data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/codec/JsonCodecTest.java rename to data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/codec/JsonCodecTest.java index 12b3400906..4863667bc0 100644 --- a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/codec/JsonCodecTest.java +++ b/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/codec/JsonCodecTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.loghttp.codec; +package org.opensearch.dataprepper.http.codec; import com.linecorp.armeria.common.HttpData; import org.junit.jupiter.api.Test; diff --git a/data-prepper-plugins/http-source-common/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/data-prepper-plugins/http-source-common/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000000..78ccc25012 --- /dev/null +++ b/data-prepper-plugins/http-source-common/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1,3 @@ +# To enable mocking of final classes with vanilla Mockito +# https://github.com/mockito/mockito/wiki/What%27s-new-in-Mockito-2#mock-the-unmockable-opt-in-mocking-of-final-classesmethods +mock-maker-inline \ No newline at end of file diff --git a/data-prepper-plugins/http-source-common/src/test/resources/test_cert.crt b/data-prepper-plugins/http-source-common/src/test/resources/test_cert.crt new file mode 100644 index 0000000000..26c78d1411 --- /dev/null +++ b/data-prepper-plugins/http-source-common/src/test/resources/test_cert.crt @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICHTCCAYYCCQD4hqYeYDQZADANBgkqhkiG9w0BAQUFADBSMQswCQYDVQQGEwJV +UzELMAkGA1UECAwCVFgxDzANBgNVBAcMBkF1c3RpbjEPMA0GA1UECgwGQW1hem9u +MRQwEgYDVQQLDAtEYXRhcHJlcHBlcjAgFw0yMTA2MjUxOTIzMTBaGA8yMTIxMDYw +MTE5MjMxMFowUjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAlRYMQ8wDQYDVQQHDAZB +dXN0aW4xDzANBgNVBAoMBkFtYXpvbjEUMBIGA1UECwwLRGF0YXByZXBwZXIwgZ8w +DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAKrb3YhdKbQ5PtLHall10iLZC9ZdDVrq +HOvqVSM8NHlL8f82gJ3l0n9k7hYc5eKisutaS9eDTmJ+Dnn8xn/qPSKTIq9Wh+OZ +O+e9YEEpI/G4F9KpGULgMyRg9sJK0GlZdEt9o5GJNJIJUkptJU5eiLuE0IV+jyJo +Nvm8OE6EJPqxAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAjgnX5n/Tt7eo9uakIGAb +uBhvYdR8JqKXqF9rjFJ/MIK7FdQSF/gCdjnvBhzLlZFK/Nb6MGKoSKm5Lcr75LgC +FyhIwp3WlqQksiMFnOypYVY71vqDgj6UKdMaOBgthsYhngj8lC+wsVzWqQvkJ2Qg +/GAIzJwiZfXiaevQHRk79qI= +-----END CERTIFICATE----- diff --git a/data-prepper-plugins/http-source-common/src/test/resources/test_decrypted_key.key b/data-prepper-plugins/http-source-common/src/test/resources/test_decrypted_key.key new file mode 100644 index 0000000000..479b877131 --- /dev/null +++ b/data-prepper-plugins/http-source-common/src/test/resources/test_decrypted_key.key @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXAIBAAKBgQCq292IXSm0OT7Sx2pZddIi2QvWXQ1a6hzr6lUjPDR5S/H/NoCd +5dJ/ZO4WHOXiorLrWkvXg05ifg55/MZ/6j0ikyKvVofjmTvnvWBBKSPxuBfSqRlC +4DMkYPbCStBpWXRLfaORiTSSCVJKbSVOXoi7hNCFfo8iaDb5vDhOhCT6sQIDAQAB +AoGANrrhFqpJDpr7vcb1ER0Fp/YArbT27zVo+EUC6puBb41dQlQyFOImcHpjLaAq +H1PgnjU5cBp2hGQ+vOK0rwrYc/HNl6vfh6N3NbDptMiuoBafRJA9JzYourAM09BU +zmXyr61Yn3KHzx1PRwWe37icX93oXP3P0qHb3dI1ZF4jG0ECQQDU5N/a7ogoz2zn +ZssD6FvUOUQDsdBWdXmhUvg+YdZrV44e4xk+FVzwEONoRktEYKz9MFXlsgNHr445 +KRguHWcJAkEAzXQkwOkN8WID1wrwoobUIMbZSGAZzofwkKXgTTnllnT1qOQXuRbS +aCMejFEymBBef4aXP6N4+va2FKW/MF34aQJAO2oMl1sOoOUSrZngepy0VAwPUUCk +thxe74jqQu6nGpn6zd/vQYZQw6bS8Fz90H1yic6dilcd1znFZWp0lxoZkQJBALeI +xoBycRsuFQIYasi1q3AwUtBd0Q/3zkZZeBtk2hzjFMUwJaUZpxKSNOrialD/ZnuD +jz+xWBTRKe0d98JMX+kCQCmsJEj/HYQAC1GamZ7JQWogRSRF2KTgTWRaDXDxy0d4 +yUQgwHB+HZLFcbi1JEK6eIixCsX8iifrrkteh+1npJ0= +-----END RSA PRIVATE KEY----- diff --git a/data-prepper-plugins/http-source/build.gradle b/data-prepper-plugins/http-source/build.gradle index 459513f6c7..7d54d5f177 100644 --- a/data-prepper-plugins/http-source/build.gradle +++ b/data-prepper-plugins/http-source/build.gradle @@ -10,6 +10,7 @@ plugins { dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:blocking-buffer') + implementation project(':data-prepper-plugins:http-source-common') implementation project(':data-prepper-plugins:common') implementation project(':data-prepper-plugins:armeria-common') implementation libs.armeria.core @@ -17,8 +18,9 @@ dependencies { implementation 'software.amazon.awssdk:acm' implementation 'software.amazon.awssdk:s3' implementation 'software.amazon.awssdk:apache-client' - testImplementation project(':data-prepper-api').sourceSets.test.output testImplementation 'org.assertj:assertj-core:3.25.3' + testImplementation project(':data-prepper-api').sourceSets.test.output + testImplementation project(':data-prepper-test-common') } jacocoTestCoverageVerification { diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java index 5f7ad0ecfa..cea9e252f6 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java @@ -13,6 +13,9 @@ import com.linecorp.armeria.server.throttling.ThrottlingService; import org.opensearch.dataprepper.HttpRequestExceptionHandler; import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider; +import org.opensearch.dataprepper.http.HttpServerConfig; +import org.opensearch.dataprepper.http.LogThrottlingRejectHandler; +import org.opensearch.dataprepper.http.LogThrottlingStrategy; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; @@ -29,7 +32,7 @@ import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import org.opensearch.dataprepper.plugins.codec.CompressionOption; -import org.opensearch.dataprepper.plugins.source.loghttp.certificate.CertificateProviderFactory; +import org.opensearch.dataprepper.http.certificate.CertificateProviderFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +52,7 @@ public class HTTPSource implements Source> { public static final String REGEX_HEALTH = "regex:^/(?!health$).*$"; static final String SERVER_CONNECTIONS = "serverConnections"; - private final HTTPSourceConfig sourceConfig; + private final HttpServerConfig sourceConfig; private final CertificateProviderFactory certificateProviderFactory; private final ArmeriaHttpAuthenticationProvider authenticationProvider; private final HttpRequestExceptionHandler httpRequestExceptionHandler; diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java index c8ad8397d0..9ab52afce0 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java @@ -5,224 +5,20 @@ package org.opensearch.dataprepper.plugins.source.loghttp; -import jakarta.validation.constraints.Size; -import org.opensearch.dataprepper.model.types.ByteCount; -import org.opensearch.dataprepper.plugins.codec.CompressionOption; -import org.opensearch.dataprepper.model.configuration.PluginModel; -import com.fasterxml.jackson.annotation.JsonProperty; -import io.micrometer.core.instrument.util.StringUtils; -import jakarta.validation.constraints.AssertTrue; -import jakarta.validation.constraints.Max; -import jakarta.validation.constraints.Min; +import org.opensearch.dataprepper.http.BaseHttpServerConfig; + +public class HTTPSourceConfig extends BaseHttpServerConfig { -public class HTTPSourceConfig { static final String DEFAULT_LOG_INGEST_URI = "/log/ingest"; - static final String SSL = "ssl"; - static final String SSL_CERTIFICATE_FILE = "ssl_certificate_file"; - static final String SSL_KEY_FILE = "ssl_key_file"; - static final String COMPRESSION = "compression"; - static final boolean DEFAULT_USE_ACM_CERTIFICATE_FOR_SSL = false; - static final int DEFAULT_ACM_CERTIFICATE_TIMEOUT_MILLIS = 120000; static final int DEFAULT_PORT = 2021; - static final int DEFAULT_REQUEST_TIMEOUT_MS = 10000; - static final double BUFFER_TIMEOUT_FRACTION = 0.8; - static final int DEFAULT_THREAD_COUNT = 200; - static final int DEFAULT_MAX_CONNECTION_COUNT = 500; - static final int DEFAULT_MAX_PENDING_REQUESTS = 1024; - static final boolean DEFAULT_HEALTH_CHECK = false; - static final String HEALTH_CHECK_SERVICE = "health_check_service"; - static final String UNAUTHENTICATED_HEALTH_CHECK = "unauthenticated_health_check"; - static final String S3_PREFIX = "s3://"; - - @JsonProperty("port") - @Min(0) - @Max(65535) - private int port = DEFAULT_PORT; - - @JsonProperty("path") - @Size(min = 1, message = "path length should be at least 1") - private String path = DEFAULT_LOG_INGEST_URI; - - @JsonProperty("request_timeout") - @Min(2) - private int requestTimeoutInMillis = DEFAULT_REQUEST_TIMEOUT_MS; - - @JsonProperty("thread_count") - @Min(0) - private int threadCount = DEFAULT_THREAD_COUNT; - - @JsonProperty("max_connection_count") - @Min(0) - private int maxConnectionCount = DEFAULT_MAX_CONNECTION_COUNT; - - @JsonProperty("max_pending_requests") - @Min(0) - private int maxPendingRequests = DEFAULT_MAX_PENDING_REQUESTS; - - @JsonProperty(SSL) - private boolean ssl; - - @JsonProperty(SSL_CERTIFICATE_FILE) - private String sslCertificateFile; - - @JsonProperty(SSL_KEY_FILE) - private String sslKeyFile; - - @JsonProperty("use_acm_certificate_for_ssl") - private boolean useAcmCertificateForSsl = DEFAULT_USE_ACM_CERTIFICATE_FOR_SSL; - - @JsonProperty("acm_certificate_arn") - private String acmCertificateArn; - - @JsonProperty("acm_private_key_password") - private String acmPrivateKeyPassword; - - @JsonProperty("acm_certificate_timeout_millis") - @Min(0) - private Integer acmCertificateTimeoutMillis = DEFAULT_ACM_CERTIFICATE_TIMEOUT_MILLIS; - - @JsonProperty("aws_region") - private String awsRegion; - - @JsonProperty(HEALTH_CHECK_SERVICE) - private boolean healthCheckService = DEFAULT_HEALTH_CHECK; - - @JsonProperty(UNAUTHENTICATED_HEALTH_CHECK) - private boolean unauthenticatedHealthCheck = false; - - @JsonProperty(COMPRESSION) - private CompressionOption compression = CompressionOption.NONE; - - @JsonProperty("max_request_length") - private ByteCount maxRequestLength; - - private PluginModel authentication; - - public boolean isSslCertAndKeyFileInS3() { - return ssl && sslCertificateFile.toLowerCase().startsWith(S3_PREFIX) && - sslKeyFile.toLowerCase().startsWith(S3_PREFIX); - } - - @AssertTrue(message = "path should start with /") - boolean isPathValid() { - return path.startsWith("/"); - } - - @AssertTrue(message = "ssl_certificate_file cannot be a empty or null when ssl is enabled") - boolean isSslCertificateFileValid() { - if (ssl && !useAcmCertificateForSsl) { - return StringUtils.isNotEmpty(sslCertificateFile); - } - else { - return true; - } - } - - @AssertTrue(message = "ssl_key_file cannot be a empty or null when ssl is enabled") - boolean isSslKeyFileValid() { - if (ssl && !useAcmCertificateForSsl) { - return StringUtils.isNotEmpty(sslKeyFile); - } - else { - return true; - } - } - - @AssertTrue(message = "acm_certificate_arn cannot be a empty or null when ACM is used for ssl") - boolean isAcmCertificateArnValid() { - if (ssl && useAcmCertificateForSsl) { - return StringUtils.isNotEmpty(acmCertificateArn); - } - else { - return true; - } - } - - @AssertTrue(message = "aws_region cannot be a empty or null when ACM / S3 is used for ssl") - boolean isAwsRegionValid() { - if (ssl && (useAcmCertificateForSsl || isSslCertAndKeyFileInS3())) { - return StringUtils.isNotEmpty(awsRegion); - } - return true; - } - - public int getPort() { - return port; - } - - public String getPath() { - return path; - } - - public int getRequestTimeoutInMillis() { - return requestTimeoutInMillis; - } - - public int getBufferTimeoutInMillis() { - return (int)(BUFFER_TIMEOUT_FRACTION * requestTimeoutInMillis); - } - - public int getThreadCount() { - return threadCount; - } - - public int getMaxConnectionCount() { - return maxConnectionCount; - } - - public int getMaxPendingRequests() { - return maxPendingRequests; - } - - public boolean isSsl() { - return ssl; - } - - public String getSslCertificateFile() { - return sslCertificateFile; - } - - public String getSslKeyFile() { - return sslKeyFile; - } - - public boolean isUseAcmCertificateForSsl() { - return useAcmCertificateForSsl; - } - - public String getAcmCertificateArn() { - return acmCertificateArn; - } - - public String getAcmPrivateKeyPassword() { - return acmPrivateKeyPassword; - } - - public int getAcmCertificateTimeoutMillis() { - return acmCertificateTimeoutMillis; - } - - public String getAwsRegion() { - return awsRegion; - } - - public PluginModel getAuthentication() { - return authentication; - } - - public boolean hasHealthCheckService() { - return healthCheckService; - } - - public boolean isUnauthenticatedHealthCheck() { - return unauthenticatedHealthCheck; - } - public CompressionOption getCompression() { - return compression; + @Override + public int getDefaultPort() { + return DEFAULT_PORT; } - public ByteCount getMaxRequestLength() { - return maxRequestLength; + @Override + public String getDefaultPath() { + return DEFAULT_LOG_INGEST_URI; } } diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java index 5b7ec1d152..8384315aa4 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java @@ -21,7 +21,7 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; -import org.opensearch.dataprepper.plugins.source.loghttp.codec.JsonCodec; +import org.opensearch.dataprepper.http.codec.JsonCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java index c1671f5ac0..bf05e6b6b2 100644 --- a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java +++ b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java @@ -5,33 +5,11 @@ package org.opensearch.dataprepper.plugins.source.loghttp; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import org.opensearch.dataprepper.plugins.codec.CompressionOption; -import org.opensearch.dataprepper.model.configuration.PluginSetting; -import java.lang.reflect.Field; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import java.util.stream.Stream; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.opensearch.dataprepper.plugins.source.loghttp.HTTPSourceConfig.S3_PREFIX; public class HTTPSourceConfigTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String PLUGIN_NAME = "http"; - - private static Stream provideCompressionOption() { - return Stream.of(Arguments.of(CompressionOption.GZIP)); - } @Test void testDefault() { @@ -41,254 +19,8 @@ void testDefault() { // When/Then assertEquals(HTTPSourceConfig.DEFAULT_PORT, sourceConfig.getPort()); assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getPath()); - assertEquals(HTTPSourceConfig.DEFAULT_REQUEST_TIMEOUT_MS, sourceConfig.getRequestTimeoutInMillis()); - assertEquals(HTTPSourceConfig.DEFAULT_THREAD_COUNT, sourceConfig.getThreadCount()); - assertEquals(HTTPSourceConfig.DEFAULT_MAX_CONNECTION_COUNT, sourceConfig.getMaxConnectionCount()); - assertEquals(HTTPSourceConfig.DEFAULT_MAX_PENDING_REQUESTS, sourceConfig.getMaxPendingRequests()); - assertEquals(HTTPSourceConfig.DEFAULT_USE_ACM_CERTIFICATE_FOR_SSL, sourceConfig.isUseAcmCertificateForSsl()); - assertEquals(HTTPSourceConfig.DEFAULT_ACM_CERTIFICATE_TIMEOUT_MILLIS, sourceConfig.getAcmCertificateTimeoutMillis()); - assertEquals((int)(HTTPSourceConfig.DEFAULT_REQUEST_TIMEOUT_MS * HTTPSourceConfig.BUFFER_TIMEOUT_FRACTION), - sourceConfig.getBufferTimeoutInMillis()); - assertEquals(CompressionOption.NONE, sourceConfig.getCompression()); - } - - @Nested - class SslValidationWithFile { - @Test - void isSslCertificateFileValidation_should_return_true_if_ssl_is_false() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "ssl", false); - - assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(true)); - } - - @Test - void isSslCertificateFileValidation_should_return_false_if_ssl_is_true_and_sslCertificateFile_is_null() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "ssl", true); - - assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(false)); - } - - @Test - void isSslCertificateFileValidation_should_return_true_if_ssl_is_true_and_sslCertificateFile_is_a_valid_file() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "ssl", true); - reflectivelySetField(objectUnderTest, "sslCertificateFile", UUID.randomUUID().toString()); - - assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(true)); - } - - @Test - void isSslKeyFileValidation_should_return_true_if_ssl_is_false() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "ssl", false); - - assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(true)); - } - - @Test - void isSslKeyFileValidation_should_return_false_if_ssl_is_true_and_sslKeyFile_is_null() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "ssl", true); - - assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(false)); - } - - @Test - void isSslKeyFileValidation_should_return_true_if_ssl_is_true_and_sslKeyFile_is_a_valid_file() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "ssl", true); - reflectivelySetField(objectUnderTest, "sslKeyFile", UUID.randomUUID().toString()); - - assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(true)); - } - - } - - @Nested - class SslValidationWithS3 { - @Test - void isSslCertAndKeyFileInS3_should_return_true_if_ssl_is_true_and_KeyFile_and_certFile_are_s3_paths() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "ssl", true); - reflectivelySetField(objectUnderTest, "sslCertificateFile", getS3FilePath()); - reflectivelySetField(objectUnderTest, "sslKeyFile", getS3FilePath()); - - assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(true)); - assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(true)); - assertThat(objectUnderTest.isSslCertAndKeyFileInS3(), equalTo(true)); - } - - @Test - void isSslCertAndKeyFileInS3_should_return_false_if_ssl_is_true_and_KeyFile_and_certFile_are_not_s3_paths() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "ssl", true); - reflectivelySetField(objectUnderTest, "sslCertificateFile", UUID.randomUUID().toString()); - reflectivelySetField(objectUnderTest, "sslKeyFile", UUID.randomUUID().toString()); - - assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(true)); - assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(true)); - assertThat(objectUnderTest.isSslCertAndKeyFileInS3(), equalTo(false)); - } - - @Test - void isAwsRegionValid_should_return_true_if_ssl_is_true_and_aws_region_is_null_without_s3_paths() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "ssl", true); - reflectivelySetField(objectUnderTest, "sslCertificateFile", UUID.randomUUID().toString()); - reflectivelySetField(objectUnderTest, "sslKeyFile", UUID.randomUUID().toString()); - - assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(true)); - assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(true)); - assertThat(objectUnderTest.isSslCertAndKeyFileInS3(), equalTo(false)); - assertThat(objectUnderTest.isAwsRegionValid(), equalTo(true)); - } - - @Test - void isAwsRegionValid_should_return_false_if_ssl_is_true_and_aws_region_is_null_with_s3_paths() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "ssl", true); - reflectivelySetField(objectUnderTest, "sslCertificateFile", getS3FilePath()); - reflectivelySetField(objectUnderTest, "sslKeyFile", getS3FilePath()); - - assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(true)); - assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(true)); - assertThat(objectUnderTest.isSslCertAndKeyFileInS3(), equalTo(true)); - assertThat(objectUnderTest.isAwsRegionValid(), equalTo(false)); - } + assertEquals(HTTPSourceConfig.DEFAULT_PORT, sourceConfig.getDefaultPort()); + assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getDefaultPath()); - @Test - void isAwsRegionValid_should_return_true_if_ssl_is_true_and_aws_region_is_not_null_with_s3_paths() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "ssl", true); - reflectivelySetField(objectUnderTest, "sslCertificateFile", getS3FilePath()); - reflectivelySetField(objectUnderTest, "sslKeyFile", getS3FilePath()); - reflectivelySetField(objectUnderTest, "awsRegion", UUID.randomUUID().toString()); - - assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(true)); - assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(true)); - assertThat(objectUnderTest.isSslCertAndKeyFileInS3(), equalTo(true)); - assertThat(objectUnderTest.isAwsRegionValid(), equalTo(true)); - } - - @Test - void isAwsRegionValid_should_return_false_if_ssl_is_true_and_aws_region_is_not_null_with_s3_paths() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "ssl", true); - reflectivelySetField(objectUnderTest, "sslCertificateFile", getS3FilePath()); - reflectivelySetField(objectUnderTest, "sslKeyFile", getS3FilePath()); - reflectivelySetField(objectUnderTest, "awsRegion", UUID.randomUUID().toString()); - - assertThat(objectUnderTest.isSslKeyFileValid(), equalTo(true)); - assertThat(objectUnderTest.isSslCertificateFileValid(), equalTo(true)); - assertThat(objectUnderTest.isSslCertAndKeyFileInS3(), equalTo(true)); - assertThat(objectUnderTest.isAwsRegionValid(), equalTo(true)); - } - } - - @Nested - class SslValidationWithAcm { - @Test - void isAwsRegionValid_should_return_false_if_ssl_is_true_and_aws_region_is_null_with_acm() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "ssl", true); - reflectivelySetField(objectUnderTest, "useAcmCertificateForSsl", true); - - assertThat(objectUnderTest.isAwsRegionValid(), equalTo(false)); - } - - @Test - void isAwsRegionValid_should_return_true_if_ssl_is_true_and_aws_region_is_not_null_with_acm() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "ssl", true); - reflectivelySetField(objectUnderTest, "useAcmCertificateForSsl", true); - reflectivelySetField(objectUnderTest, "awsRegion", UUID.randomUUID().toString()); - - assertThat(objectUnderTest.isAwsRegionValid(), equalTo(true)); - } - - @Test - void isAcmCertificateArnValid_should_return_false_if_ssl_is_true_and_acm_is_true_and_arn_is_null() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "ssl", true); - reflectivelySetField(objectUnderTest, "useAcmCertificateForSsl", true); - - assertThat(objectUnderTest.isAcmCertificateArnValid(), equalTo(false)); - } - - @Test - void isAcmCertificateArnValid_should_return_true_if_ssl_is_true_and_acm_is_true_and_arn_is_not_null() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "ssl", true); - reflectivelySetField(objectUnderTest, "useAcmCertificateForSsl", true); - reflectivelySetField(objectUnderTest, "acmCertificateArn", UUID.randomUUID().toString()); - - assertThat(objectUnderTest.isAcmCertificateArnValid(), equalTo(true)); - } } - - @ParameterizedTest - @MethodSource("provideCompressionOption") - void testValidCompression(final CompressionOption compressionOption) { - // Prepare - final Map settings = new HashMap<>(); - settings.put(HTTPSourceConfig.COMPRESSION, compressionOption.name()); - - final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, settings); - final HTTPSourceConfig httpSourceConfig = OBJECT_MAPPER.convertValue( - pluginSetting.getSettings(), HTTPSourceConfig.class); - - // When/Then - assertEquals(compressionOption, httpSourceConfig.getCompression()); - } - - @Test - void getPath_should_return_correct_path() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "path", "/my/custom/path"); - - assertThat(objectUnderTest.isPathValid(), equalTo(true)); - assertThat(objectUnderTest.getPath(), equalTo("/my/custom/path")); - } - - @Test - void isPathValid_should_return_false_for_invalid_path() throws NoSuchFieldException, IllegalAccessException { - final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); - - reflectivelySetField(objectUnderTest, "path", "my/custom/path"); - - assertThat(objectUnderTest.isPathValid(), equalTo(false)); - } - - private void reflectivelySetField(final HTTPSourceConfig httpSourceConfig, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException { - final Field field = HTTPSourceConfig.class.getDeclaredField(fieldName); - try { - field.setAccessible(true); - field.set(httpSourceConfig, value); - } finally { - field.setAccessible(false); - } - } - - private String getS3FilePath() { - return S3_PREFIX.concat(UUID.randomUUID().toString()); - } } diff --git a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java index 29cac75de7..c6078c4095 100644 --- a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java +++ b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java @@ -35,6 +35,7 @@ import org.opensearch.dataprepper.HttpRequestExceptionHandler; import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider; import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig; +import org.opensearch.dataprepper.http.LogThrottlingRejectHandler; import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.metrics.MetricsTestUtil; import org.opensearch.dataprepper.metrics.PluginMetrics; diff --git a/settings.gradle b/settings.gradle index 3d07f3114b..a2495d9ffc 100644 --- a/settings.gradle +++ b/settings.gradle @@ -173,3 +173,4 @@ include 'data-prepper-plugins:http-common' include 'data-prepper-plugins:flatten-processor' include 'data-prepper-plugins:mongodb' include 'data-prepper-plugins:rds-source' +include 'data-prepper-plugins:http-source-common' \ No newline at end of file