Skip to content

Commit

Permalink
Support Kafka SASL_SSL/SSL security protocol for self signed certific…
Browse files Browse the repository at this point in the history
…ate in Kafka Consumer
  • Loading branch information
dinujoh committed Feb 23, 2024
1 parent 1aede50 commit 6356bff
Show file tree
Hide file tree
Showing 14 changed files with 537 additions and 19 deletions.
32 changes: 32 additions & 0 deletions data-prepper-plugins/http-common/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

repositories {
mavenCentral()
}

dependencies {
implementation 'org.apache.httpcomponents:httpcore:4.4.16'
testImplementation testLibs.bundles.junit
}

test {
useJUnitPlatform()
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule { //in addition to core projects rule
limit {
minimum = 0.90
}
}
}
}
14 changes: 14 additions & 0 deletions data-prepper-plugins/http-common/data/certificate/test_cert.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-----BEGIN CERTIFICATE-----
MIICHTCCAYYCCQD4hqYeYDQZADANBgkqhkiG9w0BAQUFADBSMQswCQYDVQQGEwJV
UzELMAkGA1UECAwCVFgxDzANBgNVBAcMBkF1c3RpbjEPMA0GA1UECgwGQW1hem9u
MRQwEgYDVQQLDAtEYXRhcHJlcHBlcjAgFw0yMTA2MjUxOTIzMTBaGA8yMTIxMDYw
MTE5MjMxMFowUjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAlRYMQ8wDQYDVQQHDAZB
dXN0aW4xDzANBgNVBAoMBkFtYXpvbjEUMBIGA1UECwwLRGF0YXByZXBwZXIwgZ8w
DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAKrb3YhdKbQ5PtLHall10iLZC9ZdDVrq
HOvqVSM8NHlL8f82gJ3l0n9k7hYc5eKisutaS9eDTmJ+Dnn8xn/qPSKTIq9Wh+OZ
O+e9YEEpI/G4F9KpGULgMyRg9sJK0GlZdEt9o5GJNJIJUkptJU5eiLuE0IV+jyJo
Nvm8OE6EJPqxAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAjgnX5n/Tt7eo9uakIGAb
uBhvYdR8JqKXqF9rjFJ/MIK7FdQSF/gCdjnvBhzLlZFK/Nb6MGKoSKm5Lcr75LgC
FyhIwp3WlqQksiMFnOypYVY71vqDgj6UKdMaOBgthsYhngj8lC+wsVzWqQvkJ2Qg
/GAIzJwiZfXiaevQHRk79qI=
-----END CERTIFICATE-----
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.truststore;

import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.ssl.TrustStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;

public class TrustStoreProvider {
private static final Logger LOG = LoggerFactory.getLogger(TrustStoreProvider.class);

public TrustManager[] createTrustManager(final Path certificatePath) {
LOG.info("Using the certificate path {} to create trust manager.", certificatePath.toString());
try {
final KeyStore keyStore = createKeyStore(certificatePath);
final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509");
trustManagerFactory.init(keyStore);
return trustManagerFactory.getTrustManagers();
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}

public TrustManager[] createTrustManager(final String certificateContent) {
LOG.info("Using the certificate content to create trust manager.");
try (InputStream certificateInputStream = new ByteArrayInputStream(certificateContent.getBytes())) {
final KeyStore keyStore = createKeyStore(certificateInputStream);
final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509");
trustManagerFactory.init(keyStore);
return trustManagerFactory.getTrustManagers();
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}

public TrustManager[] createTrustAllManager() {
LOG.info("Using the trust all manager to create trust manager.");
return new TrustManager[]{
new X509TrustAllManager()
};
}

private KeyStore createKeyStore(final Path certificatePath) throws Exception {
try (InputStream certificateInputStream = Files.newInputStream(certificatePath)) {
return createKeyStore(certificateInputStream);
}
}

private KeyStore createKeyStore(final InputStream certificateInputStream) throws Exception {
final CertificateFactory factory = CertificateFactory.getInstance("X.509");
final Certificate trustedCa = factory.generateCertificate(certificateInputStream);
final KeyStore trustStore = KeyStore.getInstance("pkcs12");
trustStore.load(null, null);
trustStore.setCertificateEntry("ca", trustedCa);
return trustStore;
}

public SSLContext createSSLContext(final Path certificatePath) {
LOG.info("Using the certificate path to create SSL context.");
try (InputStream is = Files.newInputStream(certificatePath)) {
return createSSLContext(is);
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}

public SSLContext createSSLContext(final String certificateContent) {
LOG.info("Using the certificate content to create SSL context.");
try (InputStream certificateInputStream = new ByteArrayInputStream(certificateContent.getBytes())) {
return createSSLContext(certificateInputStream);
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}

private SSLContext createSSLContext(final InputStream certificateInputStream) throws Exception {
KeyStore trustStore = createKeyStore(certificateInputStream);
SSLContextBuilder sslContextBuilder = SSLContexts.custom()
.loadTrustMaterial(trustStore, null);
return sslContextBuilder.build();
}

public SSLContext createSSLContextWithTrustAllStrategy() {
LOG.info("Using the trust all strategy to create SSL context.");
try {
return SSLContexts.custom().loadTrustMaterial(null, new TrustStrategy() {
@Override
public boolean isTrusted(X509Certificate[] chain, String authType) {
return true;
}
}).build();
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.opensearch.dataprepper.plugins.truststore;

import javax.net.ssl.X509TrustManager;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;

public class X509TrustAllManager implements X509TrustManager {
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {

}

@Override
public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {

}

@Override
public X509Certificate[] getAcceptedIssuers() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package org.opensearch.dataprepper.plugins.truststore;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;

@ExtendWith(MockitoExtension.class)
class TrustStoreProviderTest {

private TrustStoreProvider trustStoreProvider;

@BeforeEach
void setUp() {
trustStoreProvider = new TrustStoreProvider();
}

@Test
void createTrustManagerWithCertificatePath() {
final Path certFilePath = Path.of("data/certificate/test_cert.crt");
final TrustManager[] trustManagers = trustStoreProvider.createTrustManager(certFilePath);
assertThat(trustManagers, is(notNullValue()));
}

@Test
void createTrustManagerWithInvalidCertificatePath() {
final Path certFilePath = Path.of("data/certificate/cert_doesnt_exist.crt");
assertThrows(RuntimeException.class, () -> trustStoreProvider.createTrustManager(certFilePath));
}

@Test
void createTrustManagerWithCertificateContent() throws IOException {
final Path certFilePath = Path.of("data/certificate/test_cert.crt");
final String certificateContent = Files.readString(certFilePath);
final TrustManager[] trustManagers = trustStoreProvider.createTrustManager(certificateContent);
assertThat(trustManagers, is(notNullValue()));
}

@Test
void createTrustManagerWithInvalidCertificateContent() {
assertThrows(RuntimeException.class, () -> trustStoreProvider.createTrustManager("invalid certificate content"));
}

@Test
void createTrustAllManager() {
final TrustManager[] trustManagers = trustStoreProvider.createTrustAllManager();
assertThat(trustManagers, is(notNullValue()));
assertThat(trustManagers, is(arrayWithSize(1)));
assertThat(trustManagers[0], is(instanceOf(X509TrustAllManager.class)));
}

@Test
void createSSLContextWithCertificatePath() {
final Path certFilePath = Path.of("data/certificate/test_cert.crt");
final SSLContext sslContext = trustStoreProvider.createSSLContext(certFilePath);
assertThat(sslContext, is(notNullValue()));
}

@Test
void createSSLContextWithCertificateContent() throws IOException {
final Path certFilePath = Path.of("data/certificate/test_cert.crt");
final String certificateContent = Files.readString(certFilePath);
final SSLContext sslContext = trustStoreProvider.createSSLContext(certificateContent);
assertThat(sslContext, is(notNullValue()));
}

@Test
void createSSLContextWithTrustAllStrategy() {
final SSLContext sslContext = trustStoreProvider.createSSLContextWithTrustAllStrategy();
assertThat(sslContext, is(notNullValue()));
}
}
1 change: 1 addition & 0 deletions data-prepper-plugins/kafka-plugins/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'org.apache.kafka:kafka-clients:3.6.1'
implementation 'org.apache.kafka:connect-json:3.6.1'
implementation project(':data-prepper-plugins:http-common')
implementation libs.avro.core
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'io.micrometer:micrometer-core'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,34 @@ public class EncryptionConfig {
@JsonProperty("type")
private EncryptionType type = EncryptionType.SSL;

@JsonProperty("certificateContent")
private String certificateContent;

@JsonProperty("trustStoreFilePath")
private String trustStoreFilePath;

@JsonProperty("trustStorePassword")
private String trustStorePassword;

@JsonProperty("insecure")
private boolean insecure = false;

public EncryptionType getType() {
return type;
}

public String getCertificateContent() {
return certificateContent;
}

public String getTrustStoreFilePath() {
return trustStoreFilePath;
}

public String getTrustStorePassword() {
return trustStorePassword;
}

public boolean getInsecure() {
return insecure;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.util;

import org.apache.kafka.common.security.auth.SslEngineFactory;
import org.opensearch.dataprepper.plugins.truststore.TrustStoreProvider;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManager;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class CustomClientSslEngineFactory implements SslEngineFactory {
String certificateContent = null;

@Override
public void configure(Map<String, ?> configs) {
certificateContent = configs.get("certificateContent").toString();
}

private TrustManager[] getTrustManager() {
final TrustStoreProvider trustStoreProvider = new TrustStoreProvider();
final TrustManager[] trustManagers;
if (Objects.nonNull(certificateContent)) {
trustManagers = trustStoreProvider.createTrustManager(certificateContent);
} else {
trustManagers = trustStoreProvider.createTrustAllManager();
}
return trustManagers;
}

@Override
public SSLEngine createClientSslEngine(final String peerHost, final int peerPort, final String endpointIdentification) {
try {
final SSLContext sslContext = SSLContext.getInstance("SSL");
sslContext.init(null, getTrustManager(), new SecureRandom());
SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
sslEngine.setUseClientMode(true);
return sslEngine;
} catch (NoSuchAlgorithmException | KeyManagementException e) {
throw new RuntimeException(e);
}
}

@Override
public SSLEngine createServerSslEngine(String peerHost, int peerPort) {
return null;
}

@Override
public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
return false;
}

@Override
public Set<String> reconfigurableConfigs() {
return null;
}

@Override
public KeyStore keystore() {
return null;
}

@Override
public KeyStore truststore() {
return null;
}

@Override
public void close() {

}
}
Loading

0 comments on commit 6356bff

Please sign in to comment.