Skip to content

Commit

Permalink
Support Kafka SASL_SSL/SSL security protocol for self signed certific…
Browse files Browse the repository at this point in the history
…ate in Kafka Consumer

Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh committed Feb 23, 2024
1 parent 1144035 commit 23d0476
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ private static void setSecurityProtocolSSLProperties(final Properties properties
}

private static void setCustomSslProperties(final Properties properties, final String certificateContent) {
properties.put("enable.ssl.certificate.verification", "true");
properties.put(CERTIFICATE_CONTENT, certificateContent);
properties.put(SSL_ENGINE_FACTORY_CLASS, CustomClientSslEngineFactory.class);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.opensearch.dataprepper.plugins.kafka.util;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.kafka.source.KafkaSourceConfig;
import org.slf4j.Logger;
Expand All @@ -16,24 +15,73 @@
import java.util.Objects;
import java.util.Properties;

import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;

public class KafkaSecurityConfigurerTest {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSecurityConfigurerTest.class);
@Test
public void testSetAuthProperties() throws Exception {
public void testSetAuthPropertiesWithSaslPlainCertificate() throws Exception {
final Properties props = new Properties();
final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig("kafka-pipeline-sasl-ssl-certificate-content.yaml");
KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG);
assertThat(props.getProperty("sasl.mechanism"), is("PLAIN"));
assertThat(props.getProperty("security.protocol"), is("SASL_SSL"));
assertThat(props.getProperty("certificateContent"), is("CERTIFICATE_DATA"));
assertThat(props.getProperty("ssl.truststore.location"), is(nullValue()));
assertThat(props.getProperty("ssl.truststore.password"), is(nullValue()));
assertThat(props.get("ssl.engine.factory.class"), is(CustomClientSslEngineFactory.class));
}

@Test
public void testSetAuthPropertiesWithNoAuthSsl() throws Exception {
final Properties props = new Properties();
final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig("kafka-pipeline-no-auth-ssl.yaml");
KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG);
assertThat(props.getProperty("sasl.mechanism"), is(nullValue()));
assertThat(props.getProperty("security.protocol"), is("SSL"));
assertThat(props.getProperty("certificateContent"), is("CERTIFICATE_DATA"));
assertThat(props.get("ssl.engine.factory.class"), is(CustomClientSslEngineFactory.class));
}
@Test
public void testSetAuthPropertiesWithNoAuthSslNone() throws Exception {
final Properties props = new Properties();
final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig("kafka-pipeline-no-auth-ssl-none.yaml");
KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG);
assertThat(props.getProperty("sasl.mechanism"), is(nullValue()));
assertThat(props.getProperty("security.protocol"), is(nullValue()));
assertThat(props.getProperty("certificateContent"), is(nullValue()));
assertThat(props.get("ssl.engine.factory.class"), is(nullValue()));
}

@Test
public void testSetAuthPropertiesWithNoAuthInsecure() throws Exception {
final Properties props = new Properties();
final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig("kafka-pipeline-auth-insecure.yaml");
KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG);
assertThat(props.getProperty("sasl.mechanism"), is("PLAIN"));
assertThat(props.getProperty("security.protocol"), is("SASL_PLAINTEXT"));
assertThat(props.getProperty("certificateContent"), is(nullValue()));
assertThat(props.get("ssl.engine.factory.class"), is(InsecureSslEngineFactory.class));
}
@Test
public void testSetAuthPropertiesAuthSslWithTrustStore() throws Exception {
final Properties props = new Properties();
final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig();
final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig("kafka-pipeline-sasl-ssl-truststore.yaml");
KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG);
Assertions.assertEquals("PLAIN", props.getProperty("sasl.mechanism"));
Assertions.assertEquals("true", props.getProperty("enable.ssl.certificate.verification"));
Assertions.assertEquals("SASL_SSL", props.getProperty("security.protocol"));
Assertions.assertEquals("CERTIFICATE_DATA", props.getProperty("certificateContent"));
Assertions.assertEquals(CustomClientSslEngineFactory.class, props.get("ssl.engine.factory.class"));
assertThat(props.getProperty("sasl.mechanism"), is("PLAIN"));
assertThat(props.getProperty("security.protocol"), is("SASL_SSL"));
assertThat(props.getProperty("certificateContent"), is(nullValue()));
assertThat(props.getProperty("ssl.truststore.location"), is("some-file-path"));
assertThat(props.getProperty("ssl.truststore.password"), is("some-password"));
assertThat(props.get("ssl.engine.factory.class"), is(nullValue()));
}

private KafkaSourceConfig createKafkaSinkConfig() throws IOException {
private KafkaSourceConfig createKafkaSinkConfig(final String fileName) throws IOException {
final Yaml yaml = new Yaml();
final FileReader fileReader = new FileReader(Objects.requireNonNull(getClass().getClassLoader()
.getResource("kafka-pipeline-sasl-ssl.yaml")).getFile());
.getResource(fileName)).getFile());
final Map<String, Map<String, Map<String, Map<String, Object>>>> data = yaml.load(fileReader);
final Map<String, Map<String, Map<String, Object>>> logPipelineMap = data.get("log-pipeline");
final Map<String, Map<String, Object>> sourceMap = logPipelineMap.get("source");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
log-pipeline :
source:
kafka:
bootstrap_servers:
- "localhost:9092"
encryption:
type: "NONE"
certificateContent: "CERTIFICATE_DATA"
insecure: "true"
authentication:
sasl:
plaintext:
username: username
password: password
topics:
- name: "quickstart-events"
group_id: "groupdID1"
sink:
stdout:
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
log-pipeline :
source:
kafka:
bootstrap_servers:
- "localhost:9092"
encryption:
type: "NONE"
certificateContent: "CERTIFICATE_DATA"
topics:
- name: "quickstart-events"
group_id: "groupdID1"
sink:
stdout:
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
log-pipeline :
source:
kafka:
bootstrap_servers:
- "localhost:9092"
encryption:
type: "SSL"
certificateContent: "CERTIFICATE_DATA"
topics:
- name: "quickstart-events"
group_id: "groupdID1"
sink:
stdout:
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
log-pipeline :
source:
kafka:
bootstrap_servers:
- "localhost:9092"
encryption:
type: "SSL"
certificateContent: "CERTIFICATE_DATA"
authentication:
sasl:
plaintext:
username: username
password: password
topics:
- name: "quickstart-events"
group_id: "groupdID1"
sink:
stdout:
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
log-pipeline :
source:
kafka:
bootstrap_servers:
- "localhost:9092"
encryption:
type: "SSL"
trustStoreFilePath: "some-file-path"
trustStorePassword: "some-password"
authentication:
sasl:
plaintext:
username: username
password: password
topics:
- name: "quickstart-events"
group_id: "groupdID1"
sink:
stdout:

0 comments on commit 23d0476

Please sign in to comment.