From 6356bfffd511f8a473a0ee896cad12803f5e431e Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Thu, 22 Feb 2024 17:17:55 -0600 Subject: [PATCH] Support Kafka SASL_SSL/SSL security protocol for self signed certificate in Kafka Consumer --- data-prepper-plugins/http-common/build.gradle | 32 +++++ .../data/certificate/test_cert.crt | 14 +++ .../truststore/TrustStoreProvider.java | 113 ++++++++++++++++++ .../truststore/X509TrustAllManager.java | 22 ++++ .../truststore/TrustStoreProviderTest.java | 85 +++++++++++++ .../kafka-plugins/build.gradle | 1 + .../kafka/configuration/EncryptionConfig.java | 21 ++++ .../util/CustomClientSslEngineFactory.java | 83 +++++++++++++ .../kafka/util/KafkaSecurityConfigurer.java | 67 ++++++++--- .../CustomClientSslEngineFactoryTest.java | 39 ++++++ .../util/KafkaSecurityConfigurerTest.java | 46 +++++++ .../resources/kafka-pipeline-sasl-ssl.yaml | 18 +++ .../src/test/resources/test_cert.crt | 14 +++ settings.gradle | 1 + 14 files changed, 537 insertions(+), 19 deletions(-) create mode 100644 data-prepper-plugins/http-common/build.gradle create mode 100644 data-prepper-plugins/http-common/data/certificate/test_cert.crt create mode 100644 data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/truststore/TrustStoreProvider.java create mode 100644 data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/truststore/X509TrustAllManager.java create mode 100644 data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/truststore/TrustStoreProviderTest.java create mode 100644 data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/CustomClientSslEngineFactory.java create mode 100644 data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/CustomClientSslEngineFactoryTest.java create mode 100644 data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java create mode 100644 data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-sasl-ssl.yaml create mode 100644 data-prepper-plugins/kafka-plugins/src/test/resources/test_cert.crt diff --git a/data-prepper-plugins/http-common/build.gradle b/data-prepper-plugins/http-common/build.gradle new file mode 100644 index 0000000000..a4b3f202b6 --- /dev/null +++ b/data-prepper-plugins/http-common/build.gradle @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +repositories { + mavenCentral() +} + +dependencies { + implementation 'org.apache.httpcomponents:httpcore:4.4.16' + testImplementation testLibs.bundles.junit +} + +test { + useJUnitPlatform() +} + +jacocoTestCoverageVerification { + dependsOn jacocoTestReport + violationRules { + rule { //in addition to core projects rule + limit { + minimum = 0.90 + } + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/http-common/data/certificate/test_cert.crt b/data-prepper-plugins/http-common/data/certificate/test_cert.crt new file mode 100644 index 0000000000..26c78d1411 --- /dev/null +++ b/data-prepper-plugins/http-common/data/certificate/test_cert.crt @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICHTCCAYYCCQD4hqYeYDQZADANBgkqhkiG9w0BAQUFADBSMQswCQYDVQQGEwJV +UzELMAkGA1UECAwCVFgxDzANBgNVBAcMBkF1c3RpbjEPMA0GA1UECgwGQW1hem9u +MRQwEgYDVQQLDAtEYXRhcHJlcHBlcjAgFw0yMTA2MjUxOTIzMTBaGA8yMTIxMDYw +MTE5MjMxMFowUjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAlRYMQ8wDQYDVQQHDAZB +dXN0aW4xDzANBgNVBAoMBkFtYXpvbjEUMBIGA1UECwwLRGF0YXByZXBwZXIwgZ8w +DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAKrb3YhdKbQ5PtLHall10iLZC9ZdDVrq +HOvqVSM8NHlL8f82gJ3l0n9k7hYc5eKisutaS9eDTmJ+Dnn8xn/qPSKTIq9Wh+OZ +O+e9YEEpI/G4F9KpGULgMyRg9sJK0GlZdEt9o5GJNJIJUkptJU5eiLuE0IV+jyJo +Nvm8OE6EJPqxAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAjgnX5n/Tt7eo9uakIGAb +uBhvYdR8JqKXqF9rjFJ/MIK7FdQSF/gCdjnvBhzLlZFK/Nb6MGKoSKm5Lcr75LgC +FyhIwp3WlqQksiMFnOypYVY71vqDgj6UKdMaOBgthsYhngj8lC+wsVzWqQvkJ2Qg +/GAIzJwiZfXiaevQHRk79qI= +-----END CERTIFICATE----- diff --git a/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/truststore/TrustStoreProvider.java b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/truststore/TrustStoreProvider.java new file mode 100644 index 0000000000..c628822260 --- /dev/null +++ b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/truststore/TrustStoreProvider.java @@ -0,0 +1,113 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.truststore; + +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.ssl.SSLContexts; +import org.apache.http.ssl.TrustStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.KeyStore; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; + +public class TrustStoreProvider { + private static final Logger LOG = LoggerFactory.getLogger(TrustStoreProvider.class); + + public TrustManager[] createTrustManager(final Path certificatePath) { + LOG.info("Using the certificate path {} to create trust manager.", certificatePath.toString()); + try { + final KeyStore keyStore = createKeyStore(certificatePath); + final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509"); + trustManagerFactory.init(keyStore); + return trustManagerFactory.getTrustManagers(); + } catch (Exception ex) { + throw new RuntimeException(ex.getMessage(), ex); + } + } + + public TrustManager[] createTrustManager(final String certificateContent) { + LOG.info("Using the certificate content to create trust manager."); + try (InputStream certificateInputStream = new ByteArrayInputStream(certificateContent.getBytes())) { + final KeyStore keyStore = createKeyStore(certificateInputStream); + final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509"); + trustManagerFactory.init(keyStore); + return trustManagerFactory.getTrustManagers(); + } catch (Exception ex) { + throw new RuntimeException(ex.getMessage(), ex); + } + } + + public TrustManager[] createTrustAllManager() { + LOG.info("Using the trust all manager to create trust manager."); + return new TrustManager[]{ + new X509TrustAllManager() + }; + } + + private KeyStore createKeyStore(final Path certificatePath) throws Exception { + try (InputStream certificateInputStream = Files.newInputStream(certificatePath)) { + return createKeyStore(certificateInputStream); + } + } + + private KeyStore createKeyStore(final InputStream certificateInputStream) throws Exception { + final CertificateFactory factory = CertificateFactory.getInstance("X.509"); + final Certificate trustedCa = factory.generateCertificate(certificateInputStream); + final KeyStore trustStore = KeyStore.getInstance("pkcs12"); + trustStore.load(null, null); + trustStore.setCertificateEntry("ca", trustedCa); + return trustStore; + } + + public SSLContext createSSLContext(final Path certificatePath) { + LOG.info("Using the certificate path to create SSL context."); + try (InputStream is = Files.newInputStream(certificatePath)) { + return createSSLContext(is); + } catch (Exception ex) { + throw new RuntimeException(ex.getMessage(), ex); + } + } + + public SSLContext createSSLContext(final String certificateContent) { + LOG.info("Using the certificate content to create SSL context."); + try (InputStream certificateInputStream = new ByteArrayInputStream(certificateContent.getBytes())) { + return createSSLContext(certificateInputStream); + } catch (Exception ex) { + throw new RuntimeException(ex.getMessage(), ex); + } + } + + private SSLContext createSSLContext(final InputStream certificateInputStream) throws Exception { + KeyStore trustStore = createKeyStore(certificateInputStream); + SSLContextBuilder sslContextBuilder = SSLContexts.custom() + .loadTrustMaterial(trustStore, null); + return sslContextBuilder.build(); + } + + public SSLContext createSSLContextWithTrustAllStrategy() { + LOG.info("Using the trust all strategy to create SSL context."); + try { + return SSLContexts.custom().loadTrustMaterial(null, new TrustStrategy() { + @Override + public boolean isTrusted(X509Certificate[] chain, String authType) { + return true; + } + }).build(); + } catch (Exception ex) { + throw new RuntimeException(ex.getMessage(), ex); + } + } +} diff --git a/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/truststore/X509TrustAllManager.java b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/truststore/X509TrustAllManager.java new file mode 100644 index 0000000000..378c29dcb8 --- /dev/null +++ b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/truststore/X509TrustAllManager.java @@ -0,0 +1,22 @@ +package org.opensearch.dataprepper.plugins.truststore; + +import javax.net.ssl.X509TrustManager; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; + +public class X509TrustAllManager implements X509TrustManager { + @Override + public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { + + } + + @Override + public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { + + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return null; + } +} diff --git a/data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/truststore/TrustStoreProviderTest.java b/data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/truststore/TrustStoreProviderTest.java new file mode 100644 index 0000000000..c01b36c6a8 --- /dev/null +++ b/data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/truststore/TrustStoreProviderTest.java @@ -0,0 +1,85 @@ +package org.opensearch.dataprepper.plugins.truststore; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@ExtendWith(MockitoExtension.class) +class TrustStoreProviderTest { + + private TrustStoreProvider trustStoreProvider; + + @BeforeEach + void setUp() { + trustStoreProvider = new TrustStoreProvider(); + } + + @Test + void createTrustManagerWithCertificatePath() { + final Path certFilePath = Path.of("data/certificate/test_cert.crt"); + final TrustManager[] trustManagers = trustStoreProvider.createTrustManager(certFilePath); + assertThat(trustManagers, is(notNullValue())); + } + + @Test + void createTrustManagerWithInvalidCertificatePath() { + final Path certFilePath = Path.of("data/certificate/cert_doesnt_exist.crt"); + assertThrows(RuntimeException.class, () -> trustStoreProvider.createTrustManager(certFilePath)); + } + + @Test + void createTrustManagerWithCertificateContent() throws IOException { + final Path certFilePath = Path.of("data/certificate/test_cert.crt"); + final String certificateContent = Files.readString(certFilePath); + final TrustManager[] trustManagers = trustStoreProvider.createTrustManager(certificateContent); + assertThat(trustManagers, is(notNullValue())); + } + + @Test + void createTrustManagerWithInvalidCertificateContent() { + assertThrows(RuntimeException.class, () -> trustStoreProvider.createTrustManager("invalid certificate content")); + } + + @Test + void createTrustAllManager() { + final TrustManager[] trustManagers = trustStoreProvider.createTrustAllManager(); + assertThat(trustManagers, is(notNullValue())); + assertThat(trustManagers, is(arrayWithSize(1))); + assertThat(trustManagers[0], is(instanceOf(X509TrustAllManager.class))); + } + + @Test + void createSSLContextWithCertificatePath() { + final Path certFilePath = Path.of("data/certificate/test_cert.crt"); + final SSLContext sslContext = trustStoreProvider.createSSLContext(certFilePath); + assertThat(sslContext, is(notNullValue())); + } + + @Test + void createSSLContextWithCertificateContent() throws IOException { + final Path certFilePath = Path.of("data/certificate/test_cert.crt"); + final String certificateContent = Files.readString(certFilePath); + final SSLContext sslContext = trustStoreProvider.createSSLContext(certificateContent); + assertThat(sslContext, is(notNullValue())); + } + + @Test + void createSSLContextWithTrustAllStrategy() { + final SSLContext sslContext = trustStoreProvider.createSSLContextWithTrustAllStrategy(); + assertThat(sslContext, is(notNullValue())); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/build.gradle b/data-prepper-plugins/kafka-plugins/build.gradle index 7ed03e1d58..0ef1eb572d 100644 --- a/data-prepper-plugins/kafka-plugins/build.gradle +++ b/data-prepper-plugins/kafka-plugins/build.gradle @@ -30,6 +30,7 @@ dependencies { implementation project(':data-prepper-plugins:aws-plugin-api') implementation 'org.apache.kafka:kafka-clients:3.6.1' implementation 'org.apache.kafka:connect-json:3.6.1' + implementation project(':data-prepper-plugins:http-common') implementation libs.avro.core implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'io.micrometer:micrometer-core' diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/EncryptionConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/EncryptionConfig.java index 826c2edadf..9ee67e92a4 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/EncryptionConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/EncryptionConfig.java @@ -11,6 +11,15 @@ public class EncryptionConfig { @JsonProperty("type") private EncryptionType type = EncryptionType.SSL; + @JsonProperty("certificateContent") + private String certificateContent; + + @JsonProperty("trustStoreFilePath") + private String trustStoreFilePath; + + @JsonProperty("trustStorePassword") + private String trustStorePassword; + @JsonProperty("insecure") private boolean insecure = false; @@ -18,6 +27,18 @@ public EncryptionType getType() { return type; } + public String getCertificateContent() { + return certificateContent; + } + + public String getTrustStoreFilePath() { + return trustStoreFilePath; + } + + public String getTrustStorePassword() { + return trustStorePassword; + } + public boolean getInsecure() { return insecure; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/CustomClientSslEngineFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/CustomClientSslEngineFactory.java new file mode 100644 index 0000000000..932b818618 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/CustomClientSslEngineFactory.java @@ -0,0 +1,83 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.util; + +import org.apache.kafka.common.security.auth.SslEngineFactory; +import org.opensearch.dataprepper.plugins.truststore.TrustStoreProvider; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.TrustManager; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public class CustomClientSslEngineFactory implements SslEngineFactory { + String certificateContent = null; + + @Override + public void configure(Map configs) { + certificateContent = configs.get("certificateContent").toString(); + } + + private TrustManager[] getTrustManager() { + final TrustStoreProvider trustStoreProvider = new TrustStoreProvider(); + final TrustManager[] trustManagers; + if (Objects.nonNull(certificateContent)) { + trustManagers = trustStoreProvider.createTrustManager(certificateContent); + } else { + trustManagers = trustStoreProvider.createTrustAllManager(); + } + return trustManagers; + } + + @Override + public SSLEngine createClientSslEngine(final String peerHost, final int peerPort, final String endpointIdentification) { + try { + final SSLContext sslContext = SSLContext.getInstance("SSL"); + sslContext.init(null, getTrustManager(), new SecureRandom()); + SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort); + sslEngine.setUseClientMode(true); + return sslEngine; + } catch (NoSuchAlgorithmException | KeyManagementException e) { + throw new RuntimeException(e); + } + } + + @Override + public SSLEngine createServerSslEngine(String peerHost, int peerPort) { + return null; + } + + @Override + public boolean shouldBeRebuilt(Map nextConfigs) { + return false; + } + + @Override + public Set reconfigurableConfigs() { + return null; + } + + @Override + public KeyStore keystore() { + return null; + } + + @Override + public KeyStore truststore() { + return null; + } + + @Override + public void close() { + + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java index 779aefcab0..7b4619ec85 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java @@ -83,6 +83,10 @@ public class KafkaSecurityConfigurer { private static final String REGISTRY_BASIC_AUTH_USER_INFO = "schema.registry.basic.auth.user.info"; private static final int MAX_KAFKA_CLIENT_RETRIES = 360; // for one hour every 10 seconds + private static final String SSL_ENGINE_FACTORY_CLASS = "ssl.engine.factory.class"; + private static final String CERTIFICATE_CONTENT = "certificateContent"; + private static final String SSL_TRUSTSTORE_LOCATION = "ssl.truststore.location"; + private static final String SSL_TRUSTSTORE_PASSWORD = "ssl.truststore.password"; private static AwsCredentialsProvider credentialsProvider; private static GlueSchemaRegistryKafkaDeserializer glueDeserializer; @@ -108,18 +112,39 @@ public class KafkaSecurityConfigurer { properties.put(SASL_JAAS_CONFIG, String.format(PLAINTEXT_JAASCONFIG, username, password)); }*/ - private static void setPlainTextAuthProperties(Properties properties, final PlainTextAuthConfig plainTextAuthConfig, EncryptionType encryptionType) { - String username = plainTextAuthConfig.getUsername(); - String password = plainTextAuthConfig.getPassword(); + private static void setPlainTextAuthProperties(final Properties properties, final PlainTextAuthConfig plainTextAuthConfig, + final EncryptionConfig encryptionConfig) { + final String username = plainTextAuthConfig.getUsername(); + final String password = plainTextAuthConfig.getPassword(); properties.put(SASL_MECHANISM, "PLAIN"); properties.put(SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";"); - if (encryptionType == EncryptionType.NONE) { - properties.put(SECURITY_PROTOCOL, "SASL_PLAINTEXT"); - } else { // EncryptionType.SSL + if (checkEncryptionType(encryptionConfig, EncryptionType.SSL)) { properties.put(SECURITY_PROTOCOL, "SASL_SSL"); + setSecurityProtocolSSLProperties(properties, encryptionConfig); + } else { // EncryptionType.NONE + properties.put(SECURITY_PROTOCOL, "SASL_PLAINTEXT"); } } + private static void setSecurityProtocolSSLProperties(final Properties properties, final EncryptionConfig encryptionConfig) { + if (Objects.nonNull(encryptionConfig.getCertificateContent())) { + setCustomSslProperties(properties, encryptionConfig.getCertificateContent()); + } else { + setTruststoreProperties(properties, encryptionConfig); + } + } + + private static void setCustomSslProperties(final Properties properties, final String certificateContent) { + properties.put("enable.ssl.certificate.verification", "true"); + properties.put(CERTIFICATE_CONTENT, certificateContent); + properties.put(SSL_ENGINE_FACTORY_CLASS, CustomClientSslEngineFactory.class); + } + + private static void setTruststoreProperties(final Properties properties, final EncryptionConfig encryptionConfig) { + properties.put(SSL_TRUSTSTORE_LOCATION, encryptionConfig.getTrustStoreFilePath()); + properties.put(SSL_TRUSTSTORE_PASSWORD, encryptionConfig.getTrustStorePassword()); + } + public static void setOauthProperties(final KafkaClusterAuthConfig kafkaClusterAuthConfig, final Properties properties) { final OAuthConfig oAuthConfig = kafkaClusterAuthConfig.getAuthConfig().getSaslAuthConfig().getOAuthConfig(); @@ -258,27 +283,25 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth } } - public static void setAuthProperties(Properties properties, final KafkaClusterAuthConfig kafkaClusterAuthConfig, final Logger LOG) { + public static void setAuthProperties(final Properties properties, final KafkaClusterAuthConfig kafkaClusterAuthConfig, final Logger LOG) { final AwsConfig awsConfig = kafkaClusterAuthConfig.getAwsConfig(); final AuthConfig authConfig = kafkaClusterAuthConfig.getAuthConfig(); final EncryptionConfig encryptionConfig = kafkaClusterAuthConfig.getEncryptionConfig(); - final EncryptionType encryptionType = encryptionConfig.getType(); - credentialsProvider = DefaultCredentialsProvider.create(); String bootstrapServers = ""; if (Objects.nonNull(kafkaClusterAuthConfig.getBootstrapServers())) { bootstrapServers = String.join(",", kafkaClusterAuthConfig.getBootstrapServers()); } - AwsIamAuthConfig awsIamAuthConfig = null; + if (Objects.nonNull(authConfig)) { - AuthConfig.SaslAuthConfig saslAuthConfig = authConfig.getSaslAuthConfig(); + final AuthConfig.SaslAuthConfig saslAuthConfig = authConfig.getSaslAuthConfig(); if (Objects.nonNull(saslAuthConfig)) { - awsIamAuthConfig = saslAuthConfig.getAwsIamAuthConfig(); - PlainTextAuthConfig plainTextAuthConfig = saslAuthConfig.getPlainTextAuthConfig(); + final AwsIamAuthConfig awsIamAuthConfig = saslAuthConfig.getAwsIamAuthConfig(); + final PlainTextAuthConfig plainTextAuthConfig = saslAuthConfig.getPlainTextAuthConfig(); if (Objects.nonNull(awsIamAuthConfig)) { - if (encryptionType == EncryptionType.NONE) { + if (checkEncryptionType(encryptionConfig, EncryptionType.NONE)) { throw new RuntimeException("Encryption Config must be SSL to use IAM authentication mechanism"); } if (Objects.isNull(awsConfig)) { @@ -288,33 +311,39 @@ public static void setAuthProperties(Properties properties, final KafkaClusterAu bootstrapServers = getBootStrapServersForMsk(awsIamAuthConfig, awsConfig, LOG); } else if (Objects.nonNull(saslAuthConfig.getOAuthConfig())) { setOauthProperties(kafkaClusterAuthConfig, properties); - } else if (Objects.nonNull(plainTextAuthConfig)) { - setPlainTextAuthProperties(properties, plainTextAuthConfig, encryptionType); + } else if (Objects.nonNull(plainTextAuthConfig) && Objects.nonNull(kafkaClusterAuthConfig.getEncryptionConfig())) { + setPlainTextAuthProperties(properties, plainTextAuthConfig, kafkaClusterAuthConfig.getEncryptionConfig()); } else { throw new RuntimeException("No SASL auth config specified"); } } if (encryptionConfig.getInsecure()) { - properties.put("ssl.engine.factory.class", InsecureSslEngineFactory.class); + properties.put(SSL_ENGINE_FACTORY_CLASS, InsecureSslEngineFactory.class); } } if (Objects.isNull(authConfig) || Objects.isNull(authConfig.getSaslAuthConfig())) { - if (encryptionType == EncryptionType.SSL) { + if (checkEncryptionType(encryptionConfig, EncryptionType.SSL)) { properties.put(SECURITY_PROTOCOL, "SSL"); + setSecurityProtocolSSLProperties(properties, encryptionConfig); } } if (Objects.isNull(bootstrapServers) || bootstrapServers.isEmpty()) { throw new RuntimeException("Bootstrap servers are not specified"); } + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); } + private static boolean checkEncryptionType(final EncryptionConfig encryptionConfig, final EncryptionType encryptionType) { + return Objects.nonNull(encryptionConfig) && encryptionConfig.getType() == encryptionType; + } + public static GlueSchemaRegistryKafkaDeserializer getGlueSerializer(final KafkaConsumerConfig kafkaConsumerConfig) { SchemaConfig schemaConfig = kafkaConsumerConfig.getSchemaConfig(); if (Objects.isNull(schemaConfig) || schemaConfig.getType() != SchemaRegistryType.AWS_GLUE) { return null; } - Map configs = new HashMap(); + Map configs = new HashMap<>(); configs.put(AWSSchemaRegistryConstants.AWS_REGION, kafkaConsumerConfig.getAwsConfig().getRegion()); configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); configs.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/CustomClientSslEngineFactoryTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/CustomClientSslEngineFactoryTest.java new file mode 100644 index 0000000000..6056f4c35e --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/CustomClientSslEngineFactoryTest.java @@ -0,0 +1,39 @@ +package org.opensearch.dataprepper.plugins.kafka.util; + +import org.junit.jupiter.api.Test; +import javax.net.ssl.SSLEngine; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.Objects; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; + +public class CustomClientSslEngineFactoryTest { + @Test + public void createClientSslEngine() { + try (final CustomClientSslEngineFactory customClientSslEngineFactory = new CustomClientSslEngineFactory()) { + final SSLEngine sslEngine = customClientSslEngineFactory.createClientSslEngine(anyString(), anyInt(), anyString()); + assertThat(sslEngine, is(notNullValue())); + } + } + + @Test + public void createClientSslEngineWithConfig() throws IOException { + try (final CustomClientSslEngineFactory customClientSslEngineFactory = new CustomClientSslEngineFactory()) { + final Path certFilePath = Path.of(Objects.requireNonNull(getClass().getClassLoader() + .getResource("test_cert.crt")).getPath()); + + final String certificateContent = Files.readString(certFilePath); + customClientSslEngineFactory.configure(Collections.singletonMap("certificateContent", certificateContent)); + final SSLEngine sslEngine = customClientSslEngineFactory.createClientSslEngine(anyString(), anyInt(), anyString()); + assertThat(sslEngine, is(notNullValue())); + } + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java new file mode 100644 index 0000000000..d52a5c7d1f --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java @@ -0,0 +1,46 @@ +package org.opensearch.dataprepper.plugins.kafka.util; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.kafka.source.KafkaSourceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.FileReader; +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +public class KafkaSecurityConfigurerTest { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSecurityConfigurerTest.class); + @Test + public void testSetAuthProperties() throws Exception { + final Properties props = new Properties(); + final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig(); + KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG); + Assertions.assertEquals("PLAIN", props.getProperty("sasl.mechanism")); + Assertions.assertEquals("true", props.getProperty("enable.ssl.certificate.verification")); + Assertions.assertEquals("SASL_SSL", props.getProperty("security.protocol")); + Assertions.assertEquals("CERTIFICATE_DATA", props.getProperty("certificateContent")); + Assertions.assertEquals(CustomClientSslEngineFactory.class, props.get("ssl.engine.factory.class")); + } + + private KafkaSourceConfig createKafkaSinkConfig() throws IOException { + final Yaml yaml = new Yaml(); + final FileReader fileReader = new FileReader(Objects.requireNonNull(getClass().getClassLoader() + .getResource("kafka-pipeline-sasl-ssl.yaml")).getFile()); + final Map>>> data = yaml.load(fileReader); + final Map>> logPipelineMap = data.get("log-pipeline"); + final Map> sourceMap = logPipelineMap.get("source"); + final Map kafkaConfigMap = sourceMap.get("kafka"); + final ObjectMapper mapper = new ObjectMapper(); + final String json = mapper.writeValueAsString(kafkaConfigMap); + final Reader reader = new StringReader(json); + return mapper.readValue(reader, KafkaSourceConfig.class); + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-sasl-ssl.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-sasl-ssl.yaml new file mode 100644 index 0000000000..16511fbdc8 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-sasl-ssl.yaml @@ -0,0 +1,18 @@ +log-pipeline : + source: + kafka: + bootstrap_servers: + - "localhost:9092" + encryption: + type: "SSL" + certificateContent: "CERTIFICATE_DATA" + authentication: + sasl: + plaintext: + username: username + password: password + topics: + - name: "quickstart-events" + group_id: "groupdID1" + sink: + stdout: \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/test_cert.crt b/data-prepper-plugins/kafka-plugins/src/test/resources/test_cert.crt new file mode 100644 index 0000000000..26c78d1411 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/test_cert.crt @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICHTCCAYYCCQD4hqYeYDQZADANBgkqhkiG9w0BAQUFADBSMQswCQYDVQQGEwJV +UzELMAkGA1UECAwCVFgxDzANBgNVBAcMBkF1c3RpbjEPMA0GA1UECgwGQW1hem9u +MRQwEgYDVQQLDAtEYXRhcHJlcHBlcjAgFw0yMTA2MjUxOTIzMTBaGA8yMTIxMDYw +MTE5MjMxMFowUjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAlRYMQ8wDQYDVQQHDAZB +dXN0aW4xDzANBgNVBAoMBkFtYXpvbjEUMBIGA1UECwwLRGF0YXByZXBwZXIwgZ8w +DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAKrb3YhdKbQ5PtLHall10iLZC9ZdDVrq +HOvqVSM8NHlL8f82gJ3l0n9k7hYc5eKisutaS9eDTmJ+Dnn8xn/qPSKTIq9Wh+OZ +O+e9YEEpI/G4F9KpGULgMyRg9sJK0GlZdEt9o5GJNJIJUkptJU5eiLuE0IV+jyJo +Nvm8OE6EJPqxAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAjgnX5n/Tt7eo9uakIGAb +uBhvYdR8JqKXqF9rjFJ/MIK7FdQSF/gCdjnvBhzLlZFK/Nb6MGKoSKm5Lcr75LgC +FyhIwp3WlqQksiMFnOypYVY71vqDgj6UKdMaOBgthsYhngj8lC+wsVzWqQvkJ2Qg +/GAIzJwiZfXiaevQHRk79qI= +-----END CERTIFICATE----- diff --git a/settings.gradle b/settings.gradle index d1d77074ee..b9f089c654 100644 --- a/settings.gradle +++ b/settings.gradle @@ -163,3 +163,4 @@ include 'data-prepper-plugins:buffer-common' include 'data-prepper-plugins:dissect-processor' include 'data-prepper-plugins:dynamodb-source' include 'data-prepper-plugins:decompress-processor' +include 'data-prepper-plugins:http-common' \ No newline at end of file