From 23d0476d00a96cd107f9d7b90e56c0ca3e7e38bf Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Thu, 22 Feb 2024 17:17:55 -0600 Subject: [PATCH] Support Kafka SASL_SSL/SSL security protocol for self signed certificate in Kafka Consumer Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../kafka/util/KafkaSecurityConfigurer.java | 1 - .../util/KafkaSecurityConfigurerTest.java | 68 ++++++++++++++++--- .../kafka-pipeline-auth-insecure.yaml | 19 ++++++ .../kafka-pipeline-no-auth-ssl-none.yaml | 13 ++++ .../resources/kafka-pipeline-no-auth-ssl.yaml | 13 ++++ ...pipeline-sasl-ssl-certificate-content.yaml | 18 +++++ .../kafka-pipeline-sasl-ssl-truststore.yaml | 19 ++++++ 7 files changed, 140 insertions(+), 11 deletions(-) create mode 100644 data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-auth-insecure.yaml create mode 100644 data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-no-auth-ssl-none.yaml create mode 100644 data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-no-auth-ssl.yaml create mode 100644 data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-sasl-ssl-certificate-content.yaml create mode 100644 data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-sasl-ssl-truststore.yaml diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java index 7b4619ec85..7357fca2e1 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java @@ -135,7 +135,6 @@ private static void setSecurityProtocolSSLProperties(final Properties properties } private static void setCustomSslProperties(final Properties properties, final String certificateContent) { - properties.put("enable.ssl.certificate.verification", "true"); properties.put(CERTIFICATE_CONTENT, certificateContent); properties.put(SSL_ENGINE_FACTORY_CLASS, CustomClientSslEngineFactory.class); } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java index d52a5c7d1f..949f920f66 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java @@ -1,7 +1,6 @@ package org.opensearch.dataprepper.plugins.kafka.util; import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.plugins.kafka.source.KafkaSourceConfig; import org.slf4j.Logger; @@ -16,24 +15,73 @@ import java.util.Objects; import java.util.Properties; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.CoreMatchers.is; + public class KafkaSecurityConfigurerTest { private static final Logger LOG = LoggerFactory.getLogger(KafkaSecurityConfigurerTest.class); @Test - public void testSetAuthProperties() throws Exception { + public void testSetAuthPropertiesWithSaslPlainCertificate() throws Exception { + final Properties props = new Properties(); + final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig("kafka-pipeline-sasl-ssl-certificate-content.yaml"); + KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG); + assertThat(props.getProperty("sasl.mechanism"), is("PLAIN")); + assertThat(props.getProperty("security.protocol"), is("SASL_SSL")); + assertThat(props.getProperty("certificateContent"), is("CERTIFICATE_DATA")); + assertThat(props.getProperty("ssl.truststore.location"), is(nullValue())); + assertThat(props.getProperty("ssl.truststore.password"), is(nullValue())); + assertThat(props.get("ssl.engine.factory.class"), is(CustomClientSslEngineFactory.class)); + } + + @Test + public void testSetAuthPropertiesWithNoAuthSsl() throws Exception { + final Properties props = new Properties(); + final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig("kafka-pipeline-no-auth-ssl.yaml"); + KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG); + assertThat(props.getProperty("sasl.mechanism"), is(nullValue())); + assertThat(props.getProperty("security.protocol"), is("SSL")); + assertThat(props.getProperty("certificateContent"), is("CERTIFICATE_DATA")); + assertThat(props.get("ssl.engine.factory.class"), is(CustomClientSslEngineFactory.class)); + } + @Test + public void testSetAuthPropertiesWithNoAuthSslNone() throws Exception { + final Properties props = new Properties(); + final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig("kafka-pipeline-no-auth-ssl-none.yaml"); + KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG); + assertThat(props.getProperty("sasl.mechanism"), is(nullValue())); + assertThat(props.getProperty("security.protocol"), is(nullValue())); + assertThat(props.getProperty("certificateContent"), is(nullValue())); + assertThat(props.get("ssl.engine.factory.class"), is(nullValue())); + } + + @Test + public void testSetAuthPropertiesWithNoAuthInsecure() throws Exception { + final Properties props = new Properties(); + final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig("kafka-pipeline-auth-insecure.yaml"); + KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG); + assertThat(props.getProperty("sasl.mechanism"), is("PLAIN")); + assertThat(props.getProperty("security.protocol"), is("SASL_PLAINTEXT")); + assertThat(props.getProperty("certificateContent"), is(nullValue())); + assertThat(props.get("ssl.engine.factory.class"), is(InsecureSslEngineFactory.class)); + } + @Test + public void testSetAuthPropertiesAuthSslWithTrustStore() throws Exception { final Properties props = new Properties(); - final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig(); + final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig("kafka-pipeline-sasl-ssl-truststore.yaml"); KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG); - Assertions.assertEquals("PLAIN", props.getProperty("sasl.mechanism")); - Assertions.assertEquals("true", props.getProperty("enable.ssl.certificate.verification")); - Assertions.assertEquals("SASL_SSL", props.getProperty("security.protocol")); - Assertions.assertEquals("CERTIFICATE_DATA", props.getProperty("certificateContent")); - Assertions.assertEquals(CustomClientSslEngineFactory.class, props.get("ssl.engine.factory.class")); + assertThat(props.getProperty("sasl.mechanism"), is("PLAIN")); + assertThat(props.getProperty("security.protocol"), is("SASL_SSL")); + assertThat(props.getProperty("certificateContent"), is(nullValue())); + assertThat(props.getProperty("ssl.truststore.location"), is("some-file-path")); + assertThat(props.getProperty("ssl.truststore.password"), is("some-password")); + assertThat(props.get("ssl.engine.factory.class"), is(nullValue())); } - private KafkaSourceConfig createKafkaSinkConfig() throws IOException { + private KafkaSourceConfig createKafkaSinkConfig(final String fileName) throws IOException { final Yaml yaml = new Yaml(); final FileReader fileReader = new FileReader(Objects.requireNonNull(getClass().getClassLoader() - .getResource("kafka-pipeline-sasl-ssl.yaml")).getFile()); + .getResource(fileName)).getFile()); final Map>>> data = yaml.load(fileReader); final Map>> logPipelineMap = data.get("log-pipeline"); final Map> sourceMap = logPipelineMap.get("source"); diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-auth-insecure.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-auth-insecure.yaml new file mode 100644 index 0000000000..0092fac3b4 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-auth-insecure.yaml @@ -0,0 +1,19 @@ +log-pipeline : + source: + kafka: + bootstrap_servers: + - "localhost:9092" + encryption: + type: "NONE" + certificateContent: "CERTIFICATE_DATA" + insecure: "true" + authentication: + sasl: + plaintext: + username: username + password: password + topics: + - name: "quickstart-events" + group_id: "groupdID1" + sink: + stdout: \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-no-auth-ssl-none.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-no-auth-ssl-none.yaml new file mode 100644 index 0000000000..fef3ceb423 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-no-auth-ssl-none.yaml @@ -0,0 +1,13 @@ +log-pipeline : + source: + kafka: + bootstrap_servers: + - "localhost:9092" + encryption: + type: "NONE" + certificateContent: "CERTIFICATE_DATA" + topics: + - name: "quickstart-events" + group_id: "groupdID1" + sink: + stdout: \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-no-auth-ssl.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-no-auth-ssl.yaml new file mode 100644 index 0000000000..7c526e3723 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-no-auth-ssl.yaml @@ -0,0 +1,13 @@ +log-pipeline : + source: + kafka: + bootstrap_servers: + - "localhost:9092" + encryption: + type: "SSL" + certificateContent: "CERTIFICATE_DATA" + topics: + - name: "quickstart-events" + group_id: "groupdID1" + sink: + stdout: \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-sasl-ssl-certificate-content.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-sasl-ssl-certificate-content.yaml new file mode 100644 index 0000000000..16511fbdc8 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-sasl-ssl-certificate-content.yaml @@ -0,0 +1,18 @@ +log-pipeline : + source: + kafka: + bootstrap_servers: + - "localhost:9092" + encryption: + type: "SSL" + certificateContent: "CERTIFICATE_DATA" + authentication: + sasl: + plaintext: + username: username + password: password + topics: + - name: "quickstart-events" + group_id: "groupdID1" + sink: + stdout: \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-sasl-ssl-truststore.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-sasl-ssl-truststore.yaml new file mode 100644 index 0000000000..ad010746e5 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-sasl-ssl-truststore.yaml @@ -0,0 +1,19 @@ +log-pipeline : + source: + kafka: + bootstrap_servers: + - "localhost:9092" + encryption: + type: "SSL" + trustStoreFilePath: "some-file-path" + trustStorePassword: "some-password" + authentication: + sasl: + plaintext: + username: username + password: password + topics: + - name: "quickstart-events" + group_id: "groupdID1" + sink: + stdout: \ No newline at end of file