Skip to content

Commit

Permalink
Allow setting up per-cluster TLS connections
Browse files Browse the repository at this point in the history
  • Loading branch information
rzvoncek committed Sep 26, 2024
1 parent 572132d commit b0ef67f
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 70 deletions.
1 change: 1 addition & 0 deletions src/server/src/main/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ ENV REAPER_SEGMENT_COUNT_PER_NODE=64 \
REAPER_HTTP_MANAGEMENT_ENABLE="false" \
REAPER_HTTP_MANAGEMENT_KEYSTORE_PATH="" \
REAPER_HTTP_MANAGEMENT_TRUSTSTORE_PATH="" \
REAPER_HTTP_MANAGEMENT_TRUSTSTORES_DIR="" \
REAPER_TMP_DIRECTORY="/var/tmp/cassandra-reaper" \
REAPER_MEMORY_STORAGE_DIRECTORY="/var/lib/cassandra-reaper/storage"

Expand Down
1 change: 1 addition & 0 deletions src/server/src/main/docker/cassandra-reaper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,5 @@ httpManagement:
mgmtApiMetricsPort: ${REAPER_MGMT_API_METRICS_PORT}
keystore: ${REAPER_HTTP_MANAGEMENT_KEYSTORE_PATH}
truststore: ${REAPER_HTTP_MANAGEMENT_TRUSTSTORE_PATH}
truststoreDir: ${REAPER_HTTP_MANAGEMENT_TRUSTSTORES_DIR}

20 changes: 18 additions & 2 deletions src/server/src/main/java/io/cassandrareaper/ReaperApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import io.cassandrareaper.storage.IDistributedStorage;
import io.cassandrareaper.storage.InitializeStorage;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -341,8 +343,9 @@ private void maybeInitializeSidecarMode(ClusterResource addClusterResource) thro

private boolean selfRegisterClusterForSidecar(ClusterResource addClusterResource, String seedHost)
throws ReaperException {
final Optional<Cluster> cluster = addClusterResource.findClusterWithSeedHost(seedHost, Optional.empty(),
Optional.empty());
final Optional<Cluster> cluster = addClusterResource.findClusterWithSeedHost(
seedHost, Optional.empty(),Optional.empty()
);
if (!cluster.isPresent()) {
return false;
}
Expand Down Expand Up @@ -427,6 +430,19 @@ private void checkConfiguration(ReaperApplicationConfiguration config) {
LOG.debug("repairParallelism: {}", config.getRepairParallelism());
LOG.debug("hangingRepairTimeoutMins: {}", config.getHangingRepairTimeoutMins());
LOG.debug("jmxPorts: {}", config.getJmxPorts());

if (config.getHttpManagement() != null) {
if (config.getHttpManagement().isEnabled()) {
if (config.getHttpManagement().getTruststoreDir() != null) {
if (!Files.exists(Paths.get(config.getHttpManagement().getTruststoreDir()))) {
throw new RuntimeException(String.format(
"HttpManagement truststore is configured as %s but it does not exist",
config.getHttpManagement().getTruststoreDir()
));
}
}
}
}
}

private void tryInitializeStorage(ReaperApplicationConfiguration config, Environment environment)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,9 @@ public static final class HttpManagement {
@JsonProperty
private String truststore;

@JsonProperty
private String truststoreDir;

@JsonProperty
private Integer mgmtApiMetricsPort;

Expand All @@ -772,6 +775,10 @@ public String getTruststore() {
return truststore;
}

public String getTruststoreDir() {
return truststoreDir;
}

@VisibleForTesting
public void setEnabled(Boolean enabled) {
this.enabled = enabled;
Expand All @@ -787,6 +794,11 @@ public void setTruststore(String truststore) {
this.truststore = truststore;
}

@VisibleForTesting
public void setTruststoreDir(String truststoreDir) {
this.truststoreDir = truststoreDir;
}

public int getMgmtApiMetricsPort() {
return mgmtApiMetricsPort == null ? DEFAULT_MGMT_API_METRICS_PORT : mgmtApiMetricsPort;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -893,12 +893,10 @@ public ICassandraManagementProxy connect(Node node, Collection<String> endpoints
private ICassandraManagementProxy connectImpl(Cluster cluster, Collection<String> endpoints)
throws ReaperException {
try {
ICassandraManagementProxy proxy = context.managementConnectionFactory
.connectAny(
endpoints
.stream()
.map(host -> Node.builder().withCluster(cluster).withHostname(host).build())
.collect(Collectors.toList()));
ICassandraManagementProxy proxy = context.managementConnectionFactory.connectAny(endpoints.stream()
.map(host -> Node.builder().withCluster(cluster).withHostname(host).build())
.collect(Collectors.toList())
);

Async.markClusterActive(cluster, context);
return proxy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@

public class HttpManagementConnectionFactory implements IManagementConnectionFactory {
private static final char[] KEYSTORE_PASSWORD = "changeit".toCharArray();

private static final String KEYSTORE_COMPONENT_NAME = "keystore.jks";

private static final String TRUSTSTORE_COMPONENT_NAME = "truststore.jks";

private static final Logger LOG = LoggerFactory.getLogger(HttpManagementConnectionFactory.class);
private static final ConcurrentMap<String, HttpCassandraManagementProxy> HTTP_CONNECTIONS = Maps.newConcurrentMap();
private final MetricRegistry metricRegistry;
Expand All @@ -95,13 +100,18 @@ public HttpManagementConnectionFactory(AppContext context, ScheduledExecutorServ
this.config = context.config;
registerConnectionsGauge();
this.jobStatusPollerExecutor = jobStatusPollerExecutor;
if (context.config.getHttpManagement().getKeystore() != null && !context.config.getHttpManagement().getKeystore()
.isEmpty()) {
try {
createSslWatcher();
} catch (IOException e) {
throw new RuntimeException(e);
}

String ts = context.config.getHttpManagement().getTruststore();
boolean watchTruststore = ts != null && !ts.isEmpty();
String ks = context.config.getHttpManagement().getKeystore();
boolean watchKeystore = ks != null && !ks.isEmpty();
String tsd = context.config.getHttpManagement().getTruststoreDir();
boolean watchTruststoreDir = tsd != null && !tsd.isEmpty() && Files.isDirectory(Paths.get(tsd));

try {
createSslWatcher(watchTruststore, watchKeystore, watchTruststoreDir);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

Expand Down Expand Up @@ -170,19 +180,25 @@ private HttpCassandraManagementProxy connectImpl(Node node)
@Override
public HttpCassandraManagementProxy apply(@Nullable String hostName) {
ReaperApplicationConfiguration.HttpManagement httpConfig = config.getHttpManagement();
boolean useMtls = httpConfig.getKeystore() != null && !httpConfig.getKeystore().isEmpty();

boolean useMtls = (httpConfig.getKeystore() != null && !httpConfig.getKeystore().isEmpty())
|| (httpConfig.getTruststoreDir() != null && !httpConfig.getTruststoreDir().isEmpty());

OkHttpClient.Builder clientBuilder = new OkHttpClient().newBuilder();

String protocol = "http";

Path truststoreName = getTruststoreComponentPath(node, TRUSTSTORE_COMPONENT_NAME);
Path keystoreName = getTruststoreComponentPath(node, KEYSTORE_COMPONENT_NAME);

if (useMtls) {
LOG.debug("Using TLS connection to " + node.getHostname());
// We have to split TrustManagers to its own function to please OkHttpClient
TrustManager[] trustManagers;
SSLContext sslContext;
try {
trustManagers = getTrustManagers();
sslContext = createSslContext(trustManagers);
trustManagers = getTrustManagers(truststoreName);
sslContext = createSslContext(trustManagers, keystoreName);
} catch (ReaperException e) {
LOG.error("Failed to create SSLContext: " + e.getLocalizedMessage(), e);
throw new RuntimeException(e);
Expand Down Expand Up @@ -218,8 +234,7 @@ public HttpCassandraManagementProxy apply(@Nullable String hostName) {
}

@VisibleForTesting
SSLContext createSslContext(TrustManager[] tms) throws ReaperException {
Path keyStorePath = Paths.get(config.getHttpManagement().getKeystore());
SSLContext createSslContext(TrustManager[] tms, Path keyStorePath) throws ReaperException {

try (InputStream ksIs = Files.newInputStream(keyStorePath, StandardOpenOption.READ)) {

Expand All @@ -238,8 +253,11 @@ SSLContext createSslContext(TrustManager[] tms) throws ReaperException {
}
}

private TrustManager[] getTrustManagers() throws ReaperException {
Path trustStorePath = Paths.get(config.getHttpManagement().getTruststore());
@VisibleForTesting
TrustManager[] getTrustManagers(Path trustStorePath) throws ReaperException {

LOG.trace(String.format("Calling getSingleTrustManager with %s", trustStorePath));

try (InputStream tsIs = Files.newInputStream(trustStorePath, StandardOpenOption.READ)) {
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
trustStore.load(tsIs, KEYSTORE_PASSWORD);
Expand All @@ -249,30 +267,65 @@ private TrustManager[] getTrustManagers() throws ReaperException {

return tmf.getTrustManagers();
} catch (IOException | NoSuchAlgorithmException | KeyStoreException | CertificateException e) {
throw new ReaperException(e);
throw new ReaperException("Error loading trust managers");
}
}

@VisibleForTesting
Path getTruststoreComponentPath(Node node, String truststoreComponentName) {
Path trustStorePath;

String clusterName = node.getClusterName();
if (clusterName.equals("")) {
// the cluster name is not available, so we will try to use the global certs
trustStorePath = truststoreComponentName.equals(TRUSTSTORE_COMPONENT_NAME)
? Paths.get(config.getHttpManagement().getTruststore()).toAbsolutePath()
: Paths.get(config.getHttpManagement().getKeystore()).toAbsolutePath();
} else {
// load a cluster-specific trust store otherwise
Path storesRootPath = Paths.get(config.getHttpManagement().getTruststoreDir());
trustStorePath = storesRootPath
.resolve(String.format("%s-%s", clusterName, truststoreComponentName))
.toAbsolutePath();
}

return trustStorePath;
}

@VisibleForTesting
void createSslWatcher() throws IOException {
void createSslWatcher(boolean watchTruststore, boolean watchKeystore, boolean watchTruststoreDir) throws IOException {

WatchService watchService = FileSystems.getDefault().newWatchService();
Path trustStorePath = Paths.get(config.getHttpManagement().getTruststore());
Path keyStorePath = Paths.get(config.getHttpManagement().getKeystore());
Path keystoreParent = trustStorePath.getParent();
Path trustStoreParent = keyStorePath.getParent();

keystoreParent.register(
watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);

if (!keystoreParent.equals(trustStoreParent)) {
trustStoreParent.register(

Path trustStorePath = watchTruststore ? Paths.get(config.getHttpManagement().getTruststore()) : null;
Path keyStorePath = watchKeystore ? Paths.get(config.getHttpManagement().getKeystore()) : null ;
Path truststoreDirPath = watchTruststoreDir ? Paths.get(config.getHttpManagement().getTruststoreDir()) : null ;

if (watchKeystore) {
keyStorePath.getParent().register(
watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY
);
}
if (watchTruststore && watchKeystore) {
if (!trustStorePath.getParent().equals(keyStorePath.getParent())) {
trustStorePath.getParent().register(
watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY
);
}
}
if (watchTruststoreDir) {
truststoreDirPath.register(
watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);
StandardWatchEventKinds.ENTRY_MODIFY
);
}

ExecutorService executorService = Executors.newSingleThreadExecutor();
Expand All @@ -290,11 +343,23 @@ void createSslWatcher() throws IOException {
WatchEvent<java.nio.file.Path> ev = (WatchEvent<Path>) event;
Path eventFilename = ev.context();

if (keystoreParent.resolve(eventFilename).equals(keyStorePath)
|| trustStoreParent.resolve(eventFilename).equals(trustStorePath)) {
// Something in the TLS has been modified.. recreate HTTP connections
reloadNeeded = true;
if (watchKeystore) {
if (keyStorePath.getParent().resolve(eventFilename).equals(keyStorePath)) {
reloadNeeded = true;
}
}
if (watchTruststore) {
if (trustStorePath.getParent().resolve(eventFilename).equals(trustStorePath)) {
// Something in the TLS has been modified.. recreate HTTP connections
reloadNeeded = true;
}
}
if (watchTruststoreDir) {
if (eventFilename.toString().endsWith(".jks")) {
reloadNeeded = true;
}
}

}
if (!key.reset()) {
// The watched directories have disappeared..
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@ public Response addOrUpdateCluster(
@QueryParam("jmxPort") Optional<Integer> jmxPort) {

LOG.info("POST addOrUpdateCluster called with seedHost: {}", seedHost.orElse(null));
return addOrUpdateCluster(uriInfo, Optional.empty(), seedHost, jmxPort, Optional.empty(), Optional.empty());
return addOrUpdateCluster(
uriInfo, Optional.empty(), seedHost, jmxPort, Optional.empty(), Optional.empty()
);
}

@POST
Expand Down Expand Up @@ -248,7 +250,9 @@ public Response addOrUpdateCluster(
"PUT addOrUpdateCluster called with: cluster_name = {}, seedHost = {}",
clusterName, seedHost.orElse(null));

return addOrUpdateCluster(uriInfo, Optional.of(clusterName), seedHost, jmxPort, Optional.empty(), Optional.empty());
return addOrUpdateCluster(
uriInfo, Optional.of(clusterName), seedHost, jmxPort, Optional.empty(), Optional.empty()
);
}

@PUT
Expand Down Expand Up @@ -297,8 +301,10 @@ private Response addOrUpdateCluster(
}
}

final Optional<Cluster> cluster = findClusterWithSeedHost(seedHost.get(), jmxPort,
Optional.ofNullable(jmxCredentials));
final Optional<Cluster> cluster = findClusterWithSeedHost(
seedHost.get(), jmxPort, Optional.ofNullable(jmxCredentials)
);

if (!cluster.isPresent()) {
return Response
.status(Response.Status.BAD_REQUEST)
Expand Down Expand Up @@ -361,9 +367,11 @@ private Response addOrUpdateCluster(
return Response.created(location).build();
}

public Optional<Cluster> findClusterWithSeedHost(String seedHost,
Optional<Integer> jmxPort,
Optional<JmxCredentials> jmxCredentials) {
public Optional<Cluster> findClusterWithSeedHost(
String seedHost,
Optional<Integer> jmxPort,
Optional<JmxCredentials> jmxCredentials
) {
Set<String> seedHosts = parseSeedHosts(seedHost);
try {
Cluster.Builder clusterBuilder = Cluster.builder()
Expand Down
Loading

0 comments on commit b0ef67f

Please sign in to comment.