Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EclipseStore to memory storage for persistence #1484

Merged
merged 8 commits into from
Apr 11, 2024
11 changes: 10 additions & 1 deletion src/server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
<docker.directory>src/main/docker</docker.directory>
<timestamp>${maven.build.timestamp}</timestamp>
<maven.build.timestamp.format>yyyy-MM-dd HH:mm:ss</maven.build.timestamp.format>
<management.api.version>0.1.74</management.api.version>
<eclipsestore.version>1.3.1</eclipsestore.version>

<!-- Cucumber Scenario Outline Examples listing released versions to test db migration against -->
<!-- anything added here must first be added as a artifact in build/plugins/plugins@maven-dependency-plugin -->
Expand Down Expand Up @@ -274,8 +276,15 @@
<dependency>
<groupId>io.k8ssandra</groupId>
<artifactId>datastax-mgmtapi-client-openapi</artifactId>
<version>0.1.70</version>
<version>${management.api.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.store</groupId>
<artifactId>storage-embedded</artifactId>
<version>${eclipsestore.version}</version>
</dependency>


<!--test scope -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ public final class ReaperApplicationConfiguration extends Configuration {
@Nullable
private CryptographFactory cryptograph;

@JsonProperty
@Nullable
private String persistenceStoragePath;

public HttpManagement getHttpManagement() {
return httpManagement;
}
Expand Down Expand Up @@ -508,6 +512,15 @@ public void setCryptograph(@Nullable CryptographFactory cryptograph) {
this.cryptograph = cryptograph;
}

public void setPersistenceStoragePath(@Nullable String persistenceStoragePath) {
this.persistenceStoragePath = persistenceStoragePath;
}

@Nullable
public String getPersistenceStoragePath() {
return persistenceStoragePath;
}

public enum DatacenterAvailability {
/* We require direct JMX access to all nodes across all datacenters */
ALL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import java.math.BigInteger;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;

@JsonDeserialize(builder = RepairSegment.Builder.class)
Expand All @@ -43,7 +43,7 @@ public final class RepairSegment {
private final String coordinatorHost;
private final DateTime startTime;
private final DateTime endTime;
private final Map<String, String> replicas;
private final Map<String, String> replicas = new ConcurrentHashMap<>();
// hostID field is only ever populated for incremental repairs. For full repairs it is always null.
private final UUID hostID;

Expand All @@ -58,9 +58,9 @@ private RepairSegment(Builder builder, @Nullable UUID id) {
this.coordinatorHost = builder.coordinatorHost;
this.startTime = builder.startTime;
this.endTime = builder.endTime;
this.replicas = builder.replicas != null
? ImmutableMap.copyOf(builder.replicas)
: null;
if (builder.replicas != null) {
this.replicas.putAll(builder.replicas);
}
this.hostID = builder.hostID;
}

Expand Down
10 changes: 5 additions & 5 deletions src/server/src/main/java/io/cassandrareaper/core/Segment.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.google.common.collect.ImmutableMap;

@JsonDeserialize(builder = Segment.Builder.class)
public final class Segment {
Expand All @@ -38,17 +38,17 @@ public final class Segment {

RingRange baseRange;
List<RingRange> tokenRanges;
Map<String, String> replicas;
Map<String, String> replicas = new ConcurrentHashMap<>();

private Segment(Builder builder) {
this.tokenRanges = builder.tokenRanges;
this.baseRange = builder.tokenRanges.get(0);
if (builder.baseRange != null) {
this.baseRange = builder.baseRange;
}
this.replicas = builder.replicas != null
? ImmutableMap.copyOf(builder.replicas)
: null;
if (builder.replicas != null) {
this.replicas.putAll(builder.replicas);
}
}

public RingRange getBaseRange() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import com.google.common.base.Splitter;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.cassandra.repair.RepairParallelism;
import org.slf4j.Logger;
Expand Down Expand Up @@ -146,7 +145,7 @@ static List<Segment> filterSegmentsByNodes(

@VisibleForTesting
static Map<String, List<RingRange>> buildEndpointToRangeMap(Map<List<String>, List<String>> rangeToEndpoint) {
Map<String, List<RingRange>> endpointToRange = Maps.newHashMap();
Map<String, List<RingRange>> endpointToRange = new ConcurrentHashMap<>();

for (Entry<List<String>, List<String>> entry : rangeToEndpoint.entrySet()) {
RingRange range = new RingRange(entry.getKey().toArray(new String[entry.getKey().size()]));
Expand All @@ -163,7 +162,7 @@ static Map<String, List<RingRange>> buildEndpointToRangeMap(Map<List<String>, Li
@VisibleForTesting
static Map<List<String>, List<RingRange>> buildReplicasToRangeMap(
Map<List<String>, List<String>> rangeToEndpoint) {
Map<List<String>, List<RingRange>> replicasToRange = Maps.newHashMap();
Map<List<String>, List<RingRange>> replicasToRange = new ConcurrentHashMap<>();

for (Entry<List<String>, List<String>> entry : rangeToEndpoint.entrySet()) {
RingRange range = new RingRange(entry.getKey().toArray(new String[entry.getKey().size()]));
Expand Down Expand Up @@ -369,7 +368,7 @@ Map<String, String> getDCsByNodeForRepairSegment(
ICassandraManagementProxy jmxConnection = clusterFacade.connect(cluster);
// when hosts are coming up or going down, this method can throw an UndeclaredThrowableException
Collection<String> nodes = clusterFacade.tokenRangeToEndpoint(cluster, keyspace, segment);
Map<String, String> dcByNode = Maps.newHashMap();
Map<String, String> dcByNode = new ConcurrentHashMap<>();
nodes.forEach(node -> dcByNode.put(node, EndpointSnitchInfoProxy.create(jmxConnection).getDataCenter(node)));
if (repairUnit.getDatacenters().isEmpty()) {
return dcByNode;
Expand All @@ -393,7 +392,7 @@ Map<String, String> getDCsByNodeForRepairSegment(
@VisibleForTesting
Map<String, RingRange> getClusterNodes(Cluster targetCluster, RepairUnit repairUnit) throws ReaperException {
ConcurrentHashMap<String, RingRange> nodesWithRanges = new ConcurrentHashMap<>();
Map<List<String>, List<String>> rangeToEndpoint = Maps.newHashMap();
Map<List<String>, List<String>> rangeToEndpoint = new ConcurrentHashMap<>();

try {
rangeToEndpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ public IStorageDao initializeStorageBackend()
LOG.info("Initializing the database and performing schema migrations");

if ("memory".equalsIgnoreCase(config.getStorageType())) {
storage = new MemoryStorageFacade();
Preconditions.checkArgument(config.getPersistenceStoragePath() != null,
"persistenceStoragePath is required for memory storage type");
storage = new MemoryStorageFacade(config.getPersistenceStoragePath());
} else if (Lists.newArrayList("cassandra", "astra").contains(config.getStorageType())) {
CassandraStorageFacade.CassandraMode mode = config.getStorageType().equals("cassandra")
? CassandraStorageFacade.CassandraMode.CASSANDRA
Expand Down
Loading
Loading