Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Kafka SASL_SSL/SSL security protocol for self signed certificate in Kafka Consumer #4181

Merged
merged 5 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions data-prepper-plugins/http-common/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

dependencies {
implementation 'org.apache.httpcomponents:httpcore:4.4.16'
testImplementation testLibs.bundles.junit
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule { //in addition to core projects rule
limit {
minimum = 0.90
}
}
}
}
14 changes: 14 additions & 0 deletions data-prepper-plugins/http-common/data/certificate/test_cert.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-----BEGIN CERTIFICATE-----
MIICHTCCAYYCCQD4hqYeYDQZADANBgkqhkiG9w0BAQUFADBSMQswCQYDVQQGEwJV
UzELMAkGA1UECAwCVFgxDzANBgNVBAcMBkF1c3RpbjEPMA0GA1UECgwGQW1hem9u
MRQwEgYDVQQLDAtEYXRhcHJlcHBlcjAgFw0yMTA2MjUxOTIzMTBaGA8yMTIxMDYw
MTE5MjMxMFowUjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAlRYMQ8wDQYDVQQHDAZB
dXN0aW4xDzANBgNVBAoMBkFtYXpvbjEUMBIGA1UECwwLRGF0YXByZXBwZXIwgZ8w
DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAKrb3YhdKbQ5PtLHall10iLZC9ZdDVrq
HOvqVSM8NHlL8f82gJ3l0n9k7hYc5eKisutaS9eDTmJ+Dnn8xn/qPSKTIq9Wh+OZ
O+e9YEEpI/G4F9KpGULgMyRg9sJK0GlZdEt9o5GJNJIJUkptJU5eiLuE0IV+jyJo
Nvm8OE6EJPqxAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAjgnX5n/Tt7eo9uakIGAb
uBhvYdR8JqKXqF9rjFJ/MIK7FdQSF/gCdjnvBhzLlZFK/Nb6MGKoSKm5Lcr75LgC
FyhIwp3WlqQksiMFnOypYVY71vqDgj6UKdMaOBgthsYhngj8lC+wsVzWqQvkJ2Qg
/GAIzJwiZfXiaevQHRk79qI=
-----END CERTIFICATE-----
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.truststore;

import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.ssl.TrustStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;

public class TrustStoreProvider {
private static final Logger LOG = LoggerFactory.getLogger(TrustStoreProvider.class);

public TrustManager[] createTrustManager(final Path certificatePath) {
LOG.info("Using the certificate path {} to create trust manager.", certificatePath.toString());
try {
final KeyStore keyStore = createKeyStore(certificatePath);
final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509");
trustManagerFactory.init(keyStore);
return trustManagerFactory.getTrustManagers();
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}

public TrustManager[] createTrustManager(final String certificateContent) {
LOG.info("Using the certificate content to create trust manager.");
try (InputStream certificateInputStream = new ByteArrayInputStream(certificateContent.getBytes())) {
final KeyStore keyStore = createKeyStore(certificateInputStream);
final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509");
trustManagerFactory.init(keyStore);
return trustManagerFactory.getTrustManagers();
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}

public TrustManager[] createTrustAllManager() {
LOG.info("Using the trust all manager to create trust manager.");
return new TrustManager[]{
new X509TrustAllManager()
};
}

private KeyStore createKeyStore(final Path certificatePath) throws Exception {
try (InputStream certificateInputStream = Files.newInputStream(certificatePath)) {
return createKeyStore(certificateInputStream);
}
}

private KeyStore createKeyStore(final InputStream certificateInputStream) throws Exception {
final CertificateFactory factory = CertificateFactory.getInstance("X.509");
final Certificate trustedCa = factory.generateCertificate(certificateInputStream);
final KeyStore trustStore = KeyStore.getInstance("pkcs12");
trustStore.load(null, null);
trustStore.setCertificateEntry("ca", trustedCa);
return trustStore;
}

public SSLContext createSSLContext(final Path certificatePath) {
LOG.info("Using the certificate path to create SSL context.");
try (InputStream is = Files.newInputStream(certificatePath)) {
return createSSLContext(is);
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}

public SSLContext createSSLContext(final String certificateContent) {
LOG.info("Using the certificate content to create SSL context.");
try (InputStream certificateInputStream = new ByteArrayInputStream(certificateContent.getBytes())) {
return createSSLContext(certificateInputStream);
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}

private SSLContext createSSLContext(final InputStream certificateInputStream) throws Exception {
KeyStore trustStore = createKeyStore(certificateInputStream);
SSLContextBuilder sslContextBuilder = SSLContexts.custom()
.loadTrustMaterial(trustStore, null);
return sslContextBuilder.build();
}

public SSLContext createSSLContextWithTrustAllStrategy() {
LOG.info("Using the trust all strategy to create SSL context.");
try {
return SSLContexts.custom().loadTrustMaterial(null, new TrustStrategy() {
@Override
public boolean isTrusted(X509Certificate[] chain, String authType) {
return true;
}
}).build();
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.opensearch.dataprepper.plugins.truststore;

import javax.net.ssl.X509TrustManager;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;

public class X509TrustAllManager implements X509TrustManager {
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {

}

@Override
public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {

}

@Override
public X509Certificate[] getAcceptedIssuers() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package org.opensearch.dataprepper.plugins.truststore;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;

@ExtendWith(MockitoExtension.class)
class TrustStoreProviderTest {

private TrustStoreProvider trustStoreProvider;

@BeforeEach
void setUp() {
trustStoreProvider = new TrustStoreProvider();
}

@Test
void createTrustManagerWithCertificatePath() {
final Path certFilePath = Path.of("data/certificate/test_cert.crt");
final TrustManager[] trustManagers = trustStoreProvider.createTrustManager(certFilePath);
assertThat(trustManagers, is(notNullValue()));
}

@Test
void createTrustManagerWithInvalidCertificatePath() {
final Path certFilePath = Path.of("data/certificate/cert_doesnt_exist.crt");
assertThrows(RuntimeException.class, () -> trustStoreProvider.createTrustManager(certFilePath));
}

@Test
void createTrustManagerWithCertificateContent() throws IOException {
final Path certFilePath = Path.of("data/certificate/test_cert.crt");
final String certificateContent = Files.readString(certFilePath);
final TrustManager[] trustManagers = trustStoreProvider.createTrustManager(certificateContent);
assertThat(trustManagers, is(notNullValue()));
}

@Test
void createTrustManagerWithInvalidCertificateContent() {
assertThrows(RuntimeException.class, () -> trustStoreProvider.createTrustManager("invalid certificate content"));
}

@Test
void createTrustAllManager() {
final TrustManager[] trustManagers = trustStoreProvider.createTrustAllManager();
assertThat(trustManagers, is(notNullValue()));
assertThat(trustManagers, is(arrayWithSize(1)));
assertThat(trustManagers[0], is(instanceOf(X509TrustAllManager.class)));
}

@Test
void createSSLContextWithCertificatePath() {
final Path certFilePath = Path.of("data/certificate/test_cert.crt");
final SSLContext sslContext = trustStoreProvider.createSSLContext(certFilePath);
assertThat(sslContext, is(notNullValue()));
}

@Test
void createSSLContextWithCertificateContent() throws IOException {
final Path certFilePath = Path.of("data/certificate/test_cert.crt");
final String certificateContent = Files.readString(certFilePath);
final SSLContext sslContext = trustStoreProvider.createSSLContext(certificateContent);
assertThat(sslContext, is(notNullValue()));
}

@Test
void createSSLContextWithTrustAllStrategy() {
final SSLContext sslContext = trustStoreProvider.createSSLContextWithTrustAllStrategy();
assertThat(sslContext, is(notNullValue()));
}
}
1 change: 1 addition & 0 deletions data-prepper-plugins/kafka-plugins/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'org.apache.kafka:kafka-clients:3.6.1'
implementation 'org.apache.kafka:connect-json:3.6.1'
implementation project(':data-prepper-plugins:http-common')
implementation libs.avro.core
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'io.micrometer:micrometer-core'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,34 @@ public class EncryptionConfig {
@JsonProperty("type")
private EncryptionType type = EncryptionType.SSL;

@JsonProperty("certificate_content")
private String certificateContent;

@JsonProperty("trust_store_file_path")
private String trustStoreFilePath;

@JsonProperty("trust_store_password")
private String trustStorePassword;

@JsonProperty("insecure")
private boolean insecure = false;

public EncryptionType getType() {
return type;
}

public String getCertificateContent() {
return certificateContent;
}

public String getTrustStoreFilePath() {
return trustStoreFilePath;
}

public String getTrustStorePassword() {
return trustStorePassword;
}

public boolean getInsecure() {
return insecure;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.util;

import org.apache.kafka.common.security.auth.SslEngineFactory;
import org.opensearch.dataprepper.plugins.truststore.TrustStoreProvider;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManager;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class CustomClientSslEngineFactory implements SslEngineFactory {
String certificateContent = null;

@Override
public void configure(Map<String, ?> configs) {
certificateContent = configs.get("certificateContent").toString();
}

private TrustManager[] getTrustManager() {
final TrustStoreProvider trustStoreProvider = new TrustStoreProvider();
final TrustManager[] trustManagers;
if (Objects.nonNull(certificateContent)) {
trustManagers = trustStoreProvider.createTrustManager(certificateContent);
} else {
trustManagers = trustStoreProvider.createTrustAllManager();
}
return trustManagers;
}

@Override
public SSLEngine createClientSslEngine(final String peerHost, final int peerPort, final String endpointIdentification) {
try {
final SSLContext sslContext = SSLContext.getInstance("SSL");
sslContext.init(null, getTrustManager(), new SecureRandom());
SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
sslEngine.setUseClientMode(true);
return sslEngine;
} catch (NoSuchAlgorithmException | KeyManagementException e) {
throw new RuntimeException(e);
}
}

@Override
public SSLEngine createServerSslEngine(String peerHost, int peerPort) {
return null;
}

@Override
public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
return false;
}

@Override
public Set<String> reconfigurableConfigs() {
return null;
}

@Override
public KeyStore keystore() {
return null;
}

@Override
public KeyStore truststore() {
return null;
}

@Override
public void close() {

}
}
Loading
Loading