Skip to content

Commit

Permalink
Add SSLContext creation based on the provided mTLS jks Keystore/Trust…
Browse files Browse the repository at this point in the history
…store

Add two new config options to http, keystore and truststore. They must be mounted as jks keystore files.

If the keystore is set, Reaper tries to build HttpManagementProxy with mTLS enabled.
  • Loading branch information
burmanm committed Nov 23, 2023
1 parent 71440cc commit 8cb25f6
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 30 deletions.
4 changes: 0 additions & 4 deletions src/server/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,6 @@
<property name="format" value="[\t]"/>
<property name="message" value="Tabs in java file"/>
</module>
<module name="RegexpSinglelineJava">
<property name="format" value="[=+-]$"/>
<property name="message" value="Operators must carry"/>
</module>
<module name="AvoidStarImport"/>
<module name="OneTopLevelClass"/>
<module name="NoLineWrap"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
public final class ReaperApplicationConfiguration extends Configuration {

public static final int DEFAULT_MGMT_API_METRICS_PORT = 9000;
private static final int DEFAULT_MGMT_API_PORT = 8080;
private static final int DEFAULT_SEGMENT_COUNT_PER_NODE = 64;
private static final Integer DEFAULT_MAX_PENDING_COMPACTIONS = 20;

Expand Down Expand Up @@ -167,9 +168,6 @@ public final class ReaperApplicationConfiguration extends Configuration {
@Nullable
private CryptographFactory cryptograph;

@JsonProperty
private Integer mgmtApiMetricsPort;

public HttpManagement getHttpManagement() {
return httpManagement;
}
Expand Down Expand Up @@ -509,15 +507,6 @@ public void setCryptograph(@Nullable CryptographFactory cryptograph) {
this.cryptograph = cryptograph;
}

public int getMgmtApiMetricsPort() {
return mgmtApiMetricsPort == null ? DEFAULT_MGMT_API_METRICS_PORT : mgmtApiMetricsPort;
}

@JsonProperty("mgmtApiMetricsPort")
public void setMgmtApiMetricsPort(int mgmtApiMetricsPort) {
this.mgmtApiMetricsPort = mgmtApiMetricsPort;
}

public enum DatacenterAvailability {
/* We require direct JMX access to all nodes across all datacenters */
ALL,
Expand Down Expand Up @@ -721,9 +710,46 @@ public static final class HttpManagement {
@JsonProperty
private Boolean enabled = false;

@JsonProperty
private String keystore;

@JsonProperty
private String trustStore;

@JsonProperty
private Integer mgmtApiMetricsPort;

@JsonProperty
private Integer managementApiPort;

public Boolean isEnabled() {
return enabled;
}
// TODO: Add ports and root paths here.

public String getKeystore() {
return keystore;
}

public String getTrustStore() {
return trustStore;
}

public int getMgmtApiMetricsPort() {
return mgmtApiMetricsPort == null ? DEFAULT_MGMT_API_METRICS_PORT : mgmtApiMetricsPort;
}

public void setManagementApiPort(Integer managementApiPort) {
this.managementApiPort = managementApiPort;
}

public Integer getManagementApiPort() {
return managementApiPort == null ? DEFAULT_MGMT_API_PORT : managementApiPort;
}

@JsonProperty("mgmtApiMetricsPort")
public void setMgmtApiMetricsPort(int mgmtApiMetricsPort) {
this.mgmtApiMetricsPort = mgmtApiMetricsPort;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,65 @@
package io.cassandrareaper.management.http;

import io.cassandrareaper.AppContext;
import io.cassandrareaper.ReaperApplicationConfiguration;
import io.cassandrareaper.ReaperException;
import io.cassandrareaper.core.Node;
import io.cassandrareaper.management.HostConnectionCounters;
import io.cassandrareaper.management.ICassandraManagementProxy;
import io.cassandrareaper.management.IManagementConnectionFactory;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import javax.ws.rs.core.Response;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.InstrumentedScheduledExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.datastax.mgmtapi.client.api.DefaultApi;
import com.datastax.mgmtapi.client.invoker.ApiClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import okhttp3.OkHttpClient;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpManagementConnectionFactory implements IManagementConnectionFactory {
private static final char[] KEYSTORE_PASSWORD = "changeit".toCharArray();
private static final Logger LOG = LoggerFactory.getLogger(HttpManagementConnectionFactory.class);
private static final ConcurrentMap<String, HttpCassandraManagementProxy> HTTP_CONNECTIONS = Maps.newConcurrentMap();
private final MetricRegistry metricRegistry;
private final HostConnectionCounters hostConnectionCounters;
private final int metricsPort;

private final ScheduledExecutorService jobStatusPollerExecutor;
private ReaperApplicationConfiguration config;

private final Set<String> accessibleDatacenters = Sets.newHashSet();

Expand All @@ -64,7 +85,7 @@ public HttpManagementConnectionFactory(AppContext context, ScheduledExecutorServ
this.metricRegistry
= context.metricRegistry == null ? new MetricRegistry() : context.metricRegistry;
hostConnectionCounters = new HostConnectionCounters(metricRegistry);
this.metricsPort = context.config.getMgmtApiMetricsPort();
this.config = context.config;
registerConnectionsGauge();
this.jobStatusPollerExecutor = jobStatusPollerExecutor;
}
Expand Down Expand Up @@ -113,45 +134,106 @@ private void registerConnectionsGauge() {
.containsKey(MetricRegistry.name(HttpManagementConnectionFactory.class, "openHttpManagementConnections"))) {
this.metricRegistry.register(
MetricRegistry.name(HttpManagementConnectionFactory.class, "openHttpManagementConnections"),
(Gauge<Integer>) () -> HTTP_CONNECTIONS.size());
(Gauge<Integer>) HTTP_CONNECTIONS::size);
}
} catch (IllegalArgumentException e) {
LOG.warn("Cannot create openHttoManagementConnections metric gauge", e);
}
}

private ICassandraManagementProxy connectImpl(Node node)
private HttpCassandraManagementProxy connectImpl(Node node)
throws ReaperException, InterruptedException {
Integer managementPort = 8080; // TODO - get this from the config.
String rootPath = ""; // TODO - get this from the config.
Response pidResponse = getPid(node);
if (pidResponse.getStatus() != 200) {
throw new ReaperException("Could not get PID for node " + node.getHostname());
}
Integer managementPort = config.getHttpManagement().getManagementApiPort();
String rootPath = ""; // TODO - get this from the config.

String host = node.getHostname();

HTTP_CONNECTIONS.computeIfAbsent(host, new Function<String, HttpCassandraManagementProxy>() {
return HTTP_CONNECTIONS.computeIfAbsent(node.getHostname(), new Function<String, HttpCassandraManagementProxy>() {
@Nullable
@Override
public HttpCassandraManagementProxy apply(@Nullable String hostName) {
DefaultApi apiClient = new DefaultApi(
new ApiClient().setBasePath("http://" + hostName + ":" + managementPort + rootPath));
ReaperApplicationConfiguration.HttpManagement httpConfig = config.getHttpManagement();
boolean useMtls = httpConfig.getKeystore() != null && !httpConfig.getKeystore().isEmpty();

OkHttpClient.Builder clientBuilder = new OkHttpClient().newBuilder();

String protocol = "http";
if (useMtls) {
// We have to split TrustManagers to its own function to please OkHttpClient
TrustManager[] trustManagers;
SSLContext sslContext;
try {
trustManagers = getTrustManagers();
sslContext = createSslContext(trustManagers);
} catch (ReaperException e) {
throw new RuntimeException(e);
}
clientBuilder.sslSocketFactory(sslContext.getSocketFactory(), (X509TrustManager) trustManagers[0]);
protocol = "https";
}

OkHttpClient okHttpClient = clientBuilder
.build();

ApiClient apiClient = new ApiClient().setBasePath(
protocol + "://" + node.getHostname() + ":" + managementPort + rootPath)
.setHttpClient(okHttpClient);

DefaultApi mgmtApiClient = new DefaultApi(apiClient);

InstrumentedScheduledExecutorService statusTracker = new InstrumentedScheduledExecutorService(
jobStatusPollerExecutor, metricRegistry);

return new HttpCassandraManagementProxy(
metricRegistry,
rootPath,
new InetSocketAddress(node.getHostname(), managementPort),
statusTracker,
apiClient,
metricsPort,
mgmtApiClient,
config.getHttpManagement().getMgmtApiMetricsPort(),
node
);
}
});
return HTTP_CONNECTIONS.get(host);
}

@VisibleForTesting
SSLContext createSslContext(TrustManager[] tms) throws ReaperException {
Path keyStorePath = Paths.get(config.getHttpManagement().getKeystore());

try (InputStream ksIs = Files.newInputStream(keyStorePath, StandardOpenOption.READ)) {

KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
keyStore.load(ksIs, KEYSTORE_PASSWORD);

KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(keyStore, KEYSTORE_PASSWORD);

SSLContext sslCtx = SSLContext.getInstance("TLS");
sslCtx.init(kmf.getKeyManagers(), tms, null);
return sslCtx;
} catch (CertificateException | NoSuchAlgorithmException | KeyStoreException | KeyManagementException
| UnrecoverableKeyException | IOException e) {
throw new ReaperException(e);
}
}

private TrustManager[] getTrustManagers() throws ReaperException {
Path trustStorePath = Paths.get(config.getHttpManagement().getTrustStore());
try (InputStream tsIs = Files.newInputStream(trustStorePath, StandardOpenOption.READ)) {
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
trustStore.load(tsIs, KEYSTORE_PASSWORD);

TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(trustStore);

return tmf.getTrustManagers();
} catch (IOException | NoSuchAlgorithmException | KeyStoreException | CertificateException e) {
throw new ReaperException(e);
}

}

private Response getPid(Node node) {
Expand Down

0 comments on commit 8cb25f6

Please sign in to comment.