Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REAP-10 Allow setting up per-cluster TLS connections #1524

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/server/src/main/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ ENV REAPER_SEGMENT_COUNT_PER_NODE=64 \
REAPER_HTTP_MANAGEMENT_ENABLE="false" \
REAPER_HTTP_MANAGEMENT_KEYSTORE_PATH="" \
REAPER_HTTP_MANAGEMENT_TRUSTSTORE_PATH="" \
REAPER_HTTP_MANAGEMENT_TRUSTSTORES_DIR="" \
REAPER_TMP_DIRECTORY="/var/tmp/cassandra-reaper" \
REAPER_MEMORY_STORAGE_DIRECTORY="/var/lib/cassandra-reaper/storage"

Expand Down
1 change: 1 addition & 0 deletions src/server/src/main/docker/cassandra-reaper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,5 @@ httpManagement:
mgmtApiMetricsPort: ${REAPER_MGMT_API_METRICS_PORT}
keystore: ${REAPER_HTTP_MANAGEMENT_KEYSTORE_PATH}
truststore: ${REAPER_HTTP_MANAGEMENT_TRUSTSTORE_PATH}
truststoresDir: ${REAPER_HTTP_MANAGEMENT_TRUSTSTORES_DIR}

20 changes: 18 additions & 2 deletions src/server/src/main/java/io/cassandrareaper/ReaperApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import io.cassandrareaper.storage.IDistributedStorage;
import io.cassandrareaper.storage.InitializeStorage;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -341,8 +343,9 @@ private void maybeInitializeSidecarMode(ClusterResource addClusterResource) thro

private boolean selfRegisterClusterForSidecar(ClusterResource addClusterResource, String seedHost)
throws ReaperException {
final Optional<Cluster> cluster = addClusterResource.findClusterWithSeedHost(seedHost, Optional.empty(),
Optional.empty());
final Optional<Cluster> cluster = addClusterResource.findClusterWithSeedHost(
seedHost, Optional.empty(),Optional.empty()
);
if (!cluster.isPresent()) {
return false;
}
Expand Down Expand Up @@ -427,6 +430,19 @@ private void checkConfiguration(ReaperApplicationConfiguration config) {
LOG.debug("repairParallelism: {}", config.getRepairParallelism());
LOG.debug("hangingRepairTimeoutMins: {}", config.getHangingRepairTimeoutMins());
LOG.debug("jmxPorts: {}", config.getJmxPorts());

if (config.getHttpManagement() != null) {
if (config.getHttpManagement().isEnabled()) {
if (config.getHttpManagement().getTruststoresDir() != null) {
if (!Files.exists(Paths.get(config.getHttpManagement().getTruststoresDir()))) {
throw new RuntimeException(String.format(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: There's odd logic here. If getTrustStoresDir() is null, then we won't have any RuntimeException.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or do you rely on the truststore value to cause the RuntimeException somewhere else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I've added it here, in a checkConfiguration(), so that it fails early. Then the stacktrace is smaller and more obvious.

Otherwise, the RuntimeException would happen later, when it's trying to setup the watchers for changes, which in turn happens when actually adding a cluster. The stack traces at this point are incredibly long and nested.

"HttpManagement truststores directory is configured as %s but it does not exist",
config.getHttpManagement().getTruststoresDir()
));
}
}
}
}
}

private void tryInitializeStorage(ReaperApplicationConfiguration config, Environment environment)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,9 @@ public static final class HttpManagement {
@JsonProperty
private String truststore;

@JsonProperty
private String truststoresDir;

@JsonProperty
private Integer mgmtApiMetricsPort;

Expand All @@ -772,6 +775,10 @@ public String getTruststore() {
return truststore;
}

public String getTruststoresDir() {
return truststoresDir;
}

@VisibleForTesting
public void setEnabled(Boolean enabled) {
this.enabled = enabled;
Expand All @@ -787,6 +794,11 @@ public void setTruststore(String truststore) {
this.truststore = truststore;
}

@VisibleForTesting
public void setTruststoresDir(String truststoresDir) {
this.truststoresDir = truststoresDir;
}

public int getMgmtApiMetricsPort() {
return mgmtApiMetricsPort == null ? DEFAULT_MGMT_API_METRICS_PORT : mgmtApiMetricsPort;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -893,12 +893,10 @@ public ICassandraManagementProxy connect(Node node, Collection<String> endpoints
private ICassandraManagementProxy connectImpl(Cluster cluster, Collection<String> endpoints)
throws ReaperException {
try {
ICassandraManagementProxy proxy = context.managementConnectionFactory
.connectAny(
endpoints
.stream()
.map(host -> Node.builder().withCluster(cluster).withHostname(host).build())
.collect(Collectors.toList()));
ICassandraManagementProxy proxy = context.managementConnectionFactory.connectAny(endpoints.stream()
.map(host -> Node.builder().withCluster(cluster).withHostname(host).build())
.collect(Collectors.toList())
);

Async.markClusterActive(cluster, context);
return proxy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@

public class HttpManagementConnectionFactory implements IManagementConnectionFactory {
private static final char[] KEYSTORE_PASSWORD = "changeit".toCharArray();

private static final String KEYSTORE_COMPONENT_NAME = "keystore.jks";

private static final String TRUSTSTORE_COMPONENT_NAME = "truststore.jks";

private static final Logger LOG = LoggerFactory.getLogger(HttpManagementConnectionFactory.class);
private static final ConcurrentMap<String, HttpCassandraManagementProxy> HTTP_CONNECTIONS = Maps.newConcurrentMap();
private final MetricRegistry metricRegistry;
Expand All @@ -95,13 +100,22 @@ public HttpManagementConnectionFactory(AppContext context, ScheduledExecutorServ
this.config = context.config;
registerConnectionsGauge();
this.jobStatusPollerExecutor = jobStatusPollerExecutor;
if (context.config.getHttpManagement().getKeystore() != null && !context.config.getHttpManagement().getKeystore()
.isEmpty()) {
try {
createSslWatcher();
} catch (IOException e) {
throw new RuntimeException(e);

String ts = context.config.getHttpManagement().getTruststore();
boolean watchTruststore = ts != null && !ts.isEmpty();
String ks = context.config.getHttpManagement().getKeystore();
boolean watchKeystore = ks != null && !ks.isEmpty();
String tsd = context.config.getHttpManagement().getTruststoresDir();
boolean watchTruststoreDir = tsd != null && !tsd.isEmpty() && Files.isDirectory(Paths.get(tsd));

try {
if (watchKeystore || watchTruststore || watchTruststoreDir) {
createSslWatcher(watchTruststore, watchKeystore, watchTruststoreDir);
} else {
LOG.debug("Not setting up any SSL watchers");
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

Expand Down Expand Up @@ -170,19 +184,26 @@ private HttpCassandraManagementProxy connectImpl(Node node)
@Override
public HttpCassandraManagementProxy apply(@Nullable String hostName) {
ReaperApplicationConfiguration.HttpManagement httpConfig = config.getHttpManagement();
boolean useMtls = httpConfig.getKeystore() != null && !httpConfig.getKeystore().isEmpty();

boolean useMtls = (httpConfig.getKeystore() != null && !httpConfig.getKeystore().isEmpty())
|| (httpConfig.getTruststoresDir() != null && !httpConfig.getTruststoresDir().isEmpty());

OkHttpClient.Builder clientBuilder = new OkHttpClient().newBuilder();

String protocol = "http";

if (useMtls) {

Path truststoreName = getTruststoreComponentPath(node, TRUSTSTORE_COMPONENT_NAME);
Path keystoreName = getTruststoreComponentPath(node, KEYSTORE_COMPONENT_NAME);

LOG.debug("Using TLS connection to " + node.getHostname());
// We have to split TrustManagers to its own function to please OkHttpClient
TrustManager[] trustManagers;
SSLContext sslContext;
try {
trustManagers = getTrustManagers();
sslContext = createSslContext(trustManagers);
trustManagers = getTrustManagers(truststoreName);
sslContext = createSslContext(trustManagers, keystoreName);
} catch (ReaperException e) {
LOG.error("Failed to create SSLContext: " + e.getLocalizedMessage(), e);
throw new RuntimeException(e);
Expand Down Expand Up @@ -218,8 +239,7 @@ public HttpCassandraManagementProxy apply(@Nullable String hostName) {
}

@VisibleForTesting
SSLContext createSslContext(TrustManager[] tms) throws ReaperException {
Path keyStorePath = Paths.get(config.getHttpManagement().getKeystore());
SSLContext createSslContext(TrustManager[] tms, Path keyStorePath) throws ReaperException {

try (InputStream ksIs = Files.newInputStream(keyStorePath, StandardOpenOption.READ)) {

Expand All @@ -238,8 +258,11 @@ SSLContext createSslContext(TrustManager[] tms) throws ReaperException {
}
}

private TrustManager[] getTrustManagers() throws ReaperException {
Path trustStorePath = Paths.get(config.getHttpManagement().getTruststore());
@VisibleForTesting
TrustManager[] getTrustManagers(Path trustStorePath) throws ReaperException {

LOG.trace(String.format("Calling getSingleTrustManager with %s", trustStorePath));

try (InputStream tsIs = Files.newInputStream(trustStorePath, StandardOpenOption.READ)) {
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
trustStore.load(tsIs, KEYSTORE_PASSWORD);
Expand All @@ -249,30 +272,67 @@ private TrustManager[] getTrustManagers() throws ReaperException {

return tmf.getTrustManagers();
} catch (IOException | NoSuchAlgorithmException | KeyStoreException | CertificateException e) {
throw new ReaperException(e);
throw new ReaperException("Error loading trust managers");
}
}

@VisibleForTesting
Path getTruststoreComponentPath(Node node, String truststoreComponentName) {
Path trustStorePath;

String clusterName = node.getClusterName();
// the cluster name is not available, or we don't have the per-cluster truststores
// we fall back to the global trust stores
if (clusterName.equals("") || config.getHttpManagement().getTruststoresDir() == null) {

trustStorePath = truststoreComponentName.equals(TRUSTSTORE_COMPONENT_NAME)
? Paths.get(config.getHttpManagement().getTruststore()).toAbsolutePath()
: Paths.get(config.getHttpManagement().getKeystore()).toAbsolutePath();
} else {
// load a cluster-specific trust store otherwise
Path storesRootPath = Paths.get(config.getHttpManagement().getTruststoresDir());
trustStorePath = storesRootPath
.resolve(String.format("%s-%s", clusterName, truststoreComponentName))
.toAbsolutePath();
}

return trustStorePath;
}

@VisibleForTesting
void createSslWatcher() throws IOException {
void createSslWatcher(boolean watchTruststore, boolean watchKeystore, boolean watchTruststoreDir) throws IOException {

WatchService watchService = FileSystems.getDefault().newWatchService();
Path trustStorePath = Paths.get(config.getHttpManagement().getTruststore());
Path keyStorePath = Paths.get(config.getHttpManagement().getKeystore());
Path keystoreParent = trustStorePath.getParent();
Path trustStoreParent = keyStorePath.getParent();

keystoreParent.register(
watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);

if (!keystoreParent.equals(trustStoreParent)) {
trustStoreParent.register(

Path trustStorePath = watchTruststore ? Paths.get(config.getHttpManagement().getTruststore()) : null;
Path keyStorePath = watchKeystore ? Paths.get(config.getHttpManagement().getKeystore()) : null ;
Path truststoreDirPath = watchTruststoreDir ? Paths.get(config.getHttpManagement().getTruststoresDir()) : null ;

if (watchKeystore) {
keyStorePath.getParent().register(
watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY
);
}
if (watchTruststore && watchKeystore) {
if (!trustStorePath.getParent().equals(keyStorePath.getParent())) {
trustStorePath.getParent().register(
watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY
);
}
}
if (watchTruststoreDir) {
truststoreDirPath.register(
watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);
StandardWatchEventKinds.ENTRY_MODIFY
);
}

ExecutorService executorService = Executors.newSingleThreadExecutor();
Expand All @@ -290,11 +350,23 @@ void createSslWatcher() throws IOException {
WatchEvent<java.nio.file.Path> ev = (WatchEvent<Path>) event;
Path eventFilename = ev.context();

if (keystoreParent.resolve(eventFilename).equals(keyStorePath)
|| trustStoreParent.resolve(eventFilename).equals(trustStorePath)) {
// Something in the TLS has been modified.. recreate HTTP connections
reloadNeeded = true;
if (watchKeystore) {
if (keyStorePath.getParent().resolve(eventFilename).equals(keyStorePath)) {
reloadNeeded = true;
}
}
if (watchTruststore) {
if (trustStorePath.getParent().resolve(eventFilename).equals(trustStorePath)) {
// Something in the TLS has been modified.. recreate HTTP connections
reloadNeeded = true;
}
}
if (watchTruststoreDir) {
if (eventFilename.toString().endsWith(".jks")) {
reloadNeeded = true;
}
}

}
if (!key.reset()) {
// The watched directories have disappeared..
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@ public Response addOrUpdateCluster(
@QueryParam("jmxPort") Optional<Integer> jmxPort) {

LOG.info("POST addOrUpdateCluster called with seedHost: {}", seedHost.orElse(null));
return addOrUpdateCluster(uriInfo, Optional.empty(), seedHost, jmxPort, Optional.empty(), Optional.empty());
return addOrUpdateCluster(
uriInfo, Optional.empty(), seedHost, jmxPort, Optional.empty(), Optional.empty()
);
}

@POST
Expand Down Expand Up @@ -248,7 +250,9 @@ public Response addOrUpdateCluster(
"PUT addOrUpdateCluster called with: cluster_name = {}, seedHost = {}",
clusterName, seedHost.orElse(null));

return addOrUpdateCluster(uriInfo, Optional.of(clusterName), seedHost, jmxPort, Optional.empty(), Optional.empty());
return addOrUpdateCluster(
uriInfo, Optional.of(clusterName), seedHost, jmxPort, Optional.empty(), Optional.empty()
);
}

@PUT
Expand Down Expand Up @@ -297,8 +301,10 @@ private Response addOrUpdateCluster(
}
}

final Optional<Cluster> cluster = findClusterWithSeedHost(seedHost.get(), jmxPort,
Optional.ofNullable(jmxCredentials));
final Optional<Cluster> cluster = findClusterWithSeedHost(
seedHost.get(), jmxPort, Optional.ofNullable(jmxCredentials)
);

if (!cluster.isPresent()) {
return Response
.status(Response.Status.BAD_REQUEST)
Expand Down Expand Up @@ -361,9 +367,11 @@ private Response addOrUpdateCluster(
return Response.created(location).build();
}

public Optional<Cluster> findClusterWithSeedHost(String seedHost,
Optional<Integer> jmxPort,
Optional<JmxCredentials> jmxCredentials) {
public Optional<Cluster> findClusterWithSeedHost(
String seedHost,
Optional<Integer> jmxPort,
Optional<JmxCredentials> jmxCredentials
) {
Set<String> seedHosts = parseSeedHosts(seedHost);
try {
Cluster.Builder clusterBuilder = Cluster.builder()
Expand Down
Loading
Loading