Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EclipseStore to memory storage for persistence #1484

Merged
merged 8 commits into from
Apr 11, 2024
11 changes: 10 additions & 1 deletion src/server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
<docker.directory>src/main/docker</docker.directory>
<timestamp>${maven.build.timestamp}</timestamp>
<maven.build.timestamp.format>yyyy-MM-dd HH:mm:ss</maven.build.timestamp.format>
<management.api.version>0.1.74</management.api.version>
<eclipsestore.version>1.3.1</eclipsestore.version>

<!-- Cucumber Scenario Outline Examples listing released versions to test db migration against -->
<!-- anything added here must first be added as a artifact in build/plugins/plugins@maven-dependency-plugin -->
Expand Down Expand Up @@ -274,8 +276,15 @@
<dependency>
<groupId>io.k8ssandra</groupId>
<artifactId>datastax-mgmtapi-client-openapi</artifactId>
<version>0.1.70</version>
<version>${management.api.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.store</groupId>
<artifactId>storage-embedded</artifactId>
<version>${eclipsestore.version}</version>
</dependency>


<!--test scope -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ public final class ReaperApplicationConfiguration extends Configuration {
@Nullable
private CryptographFactory cryptograph;

@JsonProperty
@Nullable
private String persistenceStoragePath;

public HttpManagement getHttpManagement() {
return httpManagement;
}
Expand Down Expand Up @@ -508,6 +512,15 @@ public void setCryptograph(@Nullable CryptographFactory cryptograph) {
this.cryptograph = cryptograph;
}

public void setPersistenceStoragePath(@Nullable String persistenceStoragePath) {
this.persistenceStoragePath = persistenceStoragePath;
}

@Nullable
public String getPersistenceStoragePath() {
return persistenceStoragePath;
}

public enum DatacenterAvailability {
/* We require direct JMX access to all nodes across all datacenters */
ALL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ public IStorageDao initializeStorageBackend()
LOG.info("Initializing the database and performing schema migrations");

if ("memory".equalsIgnoreCase(config.getStorageType())) {
storage = new MemoryStorageFacade();
Preconditions.checkArgument(config.getPersistenceStoragePath() != null,
"persistenceStoragePath is required for memory storage type");
storage = new MemoryStorageFacade(config.getPersistenceStoragePath());
} else if (Lists.newArrayList("cassandra", "astra").contains(config.getStorageType())) {
CassandraStorageFacade.CassandraMode mode = config.getStorageType().equals("cassandra")
? CassandraStorageFacade.CassandraMode.CASSANDRA
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@
package io.cassandrareaper.storage;

import io.cassandrareaper.core.Cluster;
import io.cassandrareaper.core.DiagEventSubscription;
import io.cassandrareaper.core.PercentRepairedMetric;
import io.cassandrareaper.core.RepairRun;
import io.cassandrareaper.core.RepairSchedule;
import io.cassandrareaper.core.RepairSegment;
import io.cassandrareaper.core.RepairUnit;
import io.cassandrareaper.storage.cluster.IClusterDao;
import io.cassandrareaper.storage.cluster.MemoryClusterDao;
import io.cassandrareaper.storage.events.IEventsDao;
import io.cassandrareaper.storage.events.MemoryEventsDao;
import io.cassandrareaper.storage.memory.MemoryStorageRoot;
import io.cassandrareaper.storage.metrics.MemoryMetricsDao;
import io.cassandrareaper.storage.repairrun.IRepairRunDao;
import io.cassandrareaper.storage.repairrun.MemoryRepairRunDao;
Expand All @@ -35,9 +41,18 @@
import io.cassandrareaper.storage.snapshot.ISnapshotDao;
import io.cassandrareaper.storage.snapshot.MemorySnapshotDao;

import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import org.eclipse.serializer.persistence.types.PersistenceFieldEvaluator;
import org.eclipse.store.storage.embedded.types.EmbeddedStorage;
import org.eclipse.store.storage.embedded.types.EmbeddedStorageManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,12 +63,22 @@
public final class MemoryStorageFacade implements IStorageDao {

private static final Logger LOG = LoggerFactory.getLogger(MemoryStorageFacade.class);

/** Field evaluator to find transient attributes. This is needed to deal with persisting Guava collections objects
* that sometimes use the transient keyword for some of their implementation's backing stores**/
private static final PersistenceFieldEvaluator TRANSIENT_FIELD_EVALUATOR =
(clazz, field) -> !field.getName().startsWith("_");

private final EmbeddedStorageManager embeddedStorage;
private final MemoryStorageRoot memoryStorageRoot;
private final MemoryRepairSegmentDao memRepairSegment = new MemoryRepairSegmentDao(this);
private final MemoryRepairUnitDao memoryRepairUnitDao = new MemoryRepairUnitDao();
private final MemoryRepairRunDao memoryRepairRunDao = new MemoryRepairRunDao(memRepairSegment, memoryRepairUnitDao);
private final MemoryRepairScheduleDao memRepairScheduleDao = new MemoryRepairScheduleDao(memoryRepairUnitDao);
private final MemoryEventsDao memEventsDao = new MemoryEventsDao();
private final MemoryRepairUnitDao memoryRepairUnitDao = new MemoryRepairUnitDao(this);
private final MemoryRepairRunDao memoryRepairRunDao =
new MemoryRepairRunDao(this, memRepairSegment, memoryRepairUnitDao);
private final MemoryRepairScheduleDao memRepairScheduleDao = new MemoryRepairScheduleDao(this, memoryRepairUnitDao);
private final MemoryEventsDao memEventsDao = new MemoryEventsDao(this);
private final MemoryClusterDao memClusterDao = new MemoryClusterDao(
this,
memoryRepairUnitDao,
memoryRepairRunDao,
memRepairScheduleDao,
Expand All @@ -62,6 +87,33 @@ public final class MemoryStorageFacade implements IStorageDao {
private final MemorySnapshotDao memSnapshotDao = new MemorySnapshotDao();
private final MemoryMetricsDao memMetricsDao = new MemoryMetricsDao();

public MemoryStorageFacade(String persistenceStoragePath) {
LOG.info("Using memory storage backend. Persistence storage path: {}", persistenceStoragePath);
this.embeddedStorage = EmbeddedStorage.Foundation(Paths.get(persistenceStoragePath))
.onConnectionFoundation(
c -> {
c.setFieldEvaluatorPersistable(TRANSIENT_FIELD_EVALUATOR);
}
).createEmbeddedStorageManager();
this.embeddedStorage.start();
if (this.embeddedStorage.root() == null) {
LOG.info("Creating new data storage");
this.memoryStorageRoot = new MemoryStorageRoot();
this.embeddedStorage.setRoot(this.memoryStorageRoot);
} else {
LOG.info("Loading existing data from persistence storage");
this.memoryStorageRoot = (MemoryStorageRoot) this.embeddedStorage.root();
this.memoryStorageRoot.getClusters().entrySet().stream().forEach(entry -> {
Cluster cluster = entry.getValue();
LOG.info("Loaded cluster: {} / seeds: {}", cluster.getName(), cluster.getSeedHosts());
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Todo: these lines can be removed, they were added for debugging purposes.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do. I was thinking of leaving it in as it is a quick way to tell if the EclipseStore stuff is broken, but it is unnecessary noise in the logs at this point.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Todo: This still needs to be moved to debug level at least.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got caught up trying to resolve the serialization issues and forgot to address your earlier comments. I'll address them all in another commit

}
}

public MemoryStorageFacade() {
this("/tmp/" + UUID.randomUUID().toString());
}

@Override
public boolean isStorageConnected() {
// Just assuming the MemoryStorage is always functional when instantiated.
Expand Down Expand Up @@ -89,7 +141,7 @@ public void start() {

@Override
public void stop() {
// no-op
this.embeddedStorage.shutdown();
}

@Override
Expand Down Expand Up @@ -127,4 +179,122 @@ public IClusterDao getClusterDao() {
return this.memClusterDao;
}

}
private void persist(Object... objects) {
this.embeddedStorage.storeAll(objects);
this.embeddedStorage.storeRoot();
}

// Cluster operations
public Map<String, Cluster> getClusters() {
return this.memoryStorageRoot.getClusters();
}

public Cluster addCluster(Cluster cluster) {
Cluster newCluster = this.memoryStorageRoot.addCluster(cluster);
this.persist(memoryStorageRoot.getClusters());
return newCluster;
}

public Cluster removeCluster(String clusterName) {
Cluster cluster = this.memoryStorageRoot.removeCluster(clusterName);
this.persist(memoryStorageRoot.getClusters());
return cluster;
}

// RepairSchedule operations
public RepairSchedule addRepairSchedule(RepairSchedule schedule) {
RepairSchedule newSchedule = this.memoryStorageRoot.addRepairSchedule(schedule);
this.persist(this.memoryStorageRoot.getRepairSchedules());
return newSchedule;
}

public RepairSchedule removeRepairSchedule(UUID id) {
RepairSchedule schedule = this.memoryStorageRoot.removeRepairSchedule(id);
this.persist(this.memoryStorageRoot.getRepairSchedules());
return schedule;
}

public Optional<RepairSchedule> getRepairScheduleById(UUID id) {
return Optional.ofNullable(this.memoryStorageRoot.getRepairScheduleById(id));
}

public Collection<RepairSchedule> getRepairSchedules() {
return this.memoryStorageRoot.getRepairSchedules().values();
}

// RepairRun operations
public Collection<RepairRun> getRepairRuns() {
return this.memoryStorageRoot.getRepairRuns().values();
}

public RepairRun addRepairRun(RepairRun run) {
LOG.info("Adding RepairRun ID: {}", run.getId());
RepairRun newRun = this.memoryStorageRoot.addRepairRun(run);
this.persist(this.memoryStorageRoot.getRepairRuns());
return newRun;
}

public RepairRun removeRepairRun(UUID id) {
RepairRun run = this.memoryStorageRoot.removeRepairRun(id);
this.persist(this.memoryStorageRoot.getRepairRuns());
return run;
}

public Optional<RepairRun> getRepairRunById(UUID id) {
return Optional.ofNullable(this.memoryStorageRoot.getRepairRunById(id));
}

// RepairUnit operations
public Collection<RepairUnit> getRepairUnits() {
return this.memoryStorageRoot.getRepairUnits().values();
}

public RepairUnit addRepairUnit(Optional<RepairUnit.Builder> key, RepairUnit unit) {
RepairUnit newUnit = this.memoryStorageRoot.addRepairUnit(key.get(), unit);
this.persist(this.memoryStorageRoot.getRepairUnits(), this.memoryStorageRoot.getRepairUnitsByKey());
return newUnit;
}

public RepairUnit removeRepairUnit(Optional<RepairUnit.Builder> key, UUID id) {
RepairUnit unit = this.memoryStorageRoot.removeRepairUnit(key.get(), id);
this.persist(this.memoryStorageRoot.getRepairUnits(), this.memoryStorageRoot.getRepairUnitsByKey());
return unit;
}

public RepairUnit getRepairUnitById(UUID id) {
return this.memoryStorageRoot.getrRepairUnitById(id);
}

public RepairUnit getRepairUnitByKey(RepairUnit.Builder key) {
return this.memoryStorageRoot.getRepairUnitByKey(key);
}

// RepairSegment operations
public RepairSegment addRepairSegment(RepairSegment segment) {
LOG.info("Adding RepairSegment Token Ranges: {}",
Arrays.toString(segment.getTokenRange().getTokenRanges().toArray()));
final RepairSegment newSegment = this.memoryStorageRoot.addRepairSegment(segment);
this.persist(this.memoryStorageRoot.getRepairSegments());
return newSegment;
}

public RepairSegment removeRepairSegment(UUID id) {
RepairSegment segment = this.memoryStorageRoot.removeRepairSegment(id);
this.persist(this.memoryStorageRoot.getRepairSegments());
return segment;
}

public RepairSegment getRepairSegmentById(UUID id) {
return this.memoryStorageRoot.getRepairSegmentById(id);
}

public Collection<RepairSegment> getRepairSegmentsByRunId(UUID runId) {
return this.memoryStorageRoot.getRepairSegments().values().stream()
.filter(segment -> segment.getRunId().equals(runId)).collect(Collectors.toSet());
}

// RepairSubscription operations
public Map<UUID, DiagEventSubscription> getSubscriptionsById() {
return this.memoryStorageRoot.getSubscriptionsById();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package io.cassandrareaper.storage.cluster;

import io.cassandrareaper.core.Cluster;
import io.cassandrareaper.storage.MemoryStorageFacade;
import io.cassandrareaper.storage.events.MemoryEventsDao;
import io.cassandrareaper.storage.repairrun.MemoryRepairRunDao;
import io.cassandrareaper.storage.repairschedule.MemoryRepairScheduleDao;
Expand All @@ -27,39 +28,38 @@
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;

public class MemoryClusterDao implements IClusterDao {
public final ConcurrentMap<String, Cluster> clusters = Maps.newConcurrentMap();
private final MemoryRepairUnitDao memoryRepairUnitDao;

private final MemoryRepairRunDao memoryRepairRunDao;
private final MemoryRepairScheduleDao memRepairScheduleDao;
private final MemoryStorageFacade storage;

private final MemoryEventsDao memEventsDao;

public MemoryClusterDao(MemoryRepairUnitDao memoryRepairUnitDao,
public MemoryClusterDao(MemoryStorageFacade storage,
MemoryRepairUnitDao memoryRepairUnitDao,
MemoryRepairRunDao memoryRepairRunDao,
MemoryRepairScheduleDao memRepairScheduleDao,
MemoryEventsDao memEventsDao) {
this.memoryRepairUnitDao = memoryRepairUnitDao;
this.memoryRepairRunDao = memoryRepairRunDao;
this.memRepairScheduleDao = memRepairScheduleDao;
this.memEventsDao = memEventsDao;
this.storage = storage;
}

@Override
public Collection<Cluster> getClusters() {
return clusters.values();
return storage.getClusters().values();
}

@Override
public boolean addCluster(Cluster cluster) {
assert addClusterAssertions(cluster);
Cluster existing = clusters.put(cluster.getName(), cluster);
Cluster existing = storage.addCluster(cluster);
return existing == null;
}

Expand Down Expand Up @@ -97,8 +97,9 @@ public boolean addClusterAssertions(Cluster cluster) {

@Override
public Cluster getCluster(String clusterName) {
Preconditions.checkArgument(clusters.containsKey(clusterName), "no such cluster: %s", clusterName);
return clusters.get(clusterName);
Preconditions.checkArgument(
storage.getClusters().containsKey(clusterName), "no such cluster: %s", clusterName);
return storage.getClusters().get(clusterName);
}

@Override
Expand All @@ -114,16 +115,16 @@ public Cluster deleteCluster(String clusterName) {
.filter(subscription -> subscription.getId().isPresent())
.forEach(subscription -> memEventsDao.deleteEventSubscription(subscription.getId().get()));

memoryRepairUnitDao.repairUnits.values().stream()
storage.getRepairUnits().stream()
.filter((unit) -> unit.getClusterName().equals(clusterName))
.forEach((unit) -> {
assert memoryRepairRunDao.getRepairRunsForUnit(
unit.getId()).isEmpty() : StringUtils.join(memoryRepairRunDao.getRepairRunsForUnit(unit.getId())
);
memoryRepairUnitDao.repairUnits.remove(unit.getId());
memoryRepairUnitDao.repairUnitsByKey.remove(unit.with());
storage.removeRepairUnit(Optional.ofNullable(unit.with()), unit.getId());
});

return clusters.remove(clusterName);
Cluster removed = storage.removeCluster(clusterName);
return removed;
}
}
Loading
Loading