Skip to content

Commit

Permalink
Fix Memory storage persistence so objects are reinstantiated correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
emerkle826 committed Mar 20, 2024
1 parent 0db0cf5 commit 57a03f1
Show file tree
Hide file tree
Showing 11 changed files with 367 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.base.Preconditions;

Expand Down Expand Up @@ -167,7 +168,7 @@ public Builder withPartitioner(String partitioner) {

public Builder withSeedHosts(Set<String> seedHosts) {
Preconditions.checkArgument(!seedHosts.isEmpty());
this.seedHosts = seedHosts;
this.seedHosts = seedHosts.stream().collect(Collectors.toSet());
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package io.cassandrareaper.core;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
Expand Down Expand Up @@ -115,6 +116,21 @@ public String toString() {
return String.format("%s[%s]", getClass().getSimpleName(), id.toString());
}

public String toPrintableString() {
StringBuilder buf = new StringBuilder();
buf.append("ID: ").append(this.id)
.append(", Repair Unit ID: ").append(this.repairUnitId)
.append(", State: ").append(this.state)
.append(", Run History: ").append(Arrays.toString(this.runHistory.toArray()))
.append(", Creation Time: ").append(this.creationTime)
.append(", Pause Time: ").append(this.pauseTime)
.append(", Next Activation: ").append(this.nextActivation)
.append(", Last Run: ").append(this.lastRun)
.append(", Owner: ").append(this.owner)
.append(", Days between: ").append(this.daysBetween);
return buf.toString();
}

public enum State {
ACTIVE,
PAUSED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ public int getTimeout() {
return timeout;
}

@Override
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append("ID: ").append(this.id)
.append(", Cluster name: ").append(this.clusterName)
.append(", Keyspace name: ").append(this.keyspaceName);
return buf.toString();
}

public Builder with() {
return new Builder(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
package io.cassandrareaper.storage;

import io.cassandrareaper.core.Cluster;
import io.cassandrareaper.core.DiagEventSubscription;
import io.cassandrareaper.core.PercentRepairedMetric;
import io.cassandrareaper.core.RepairRun;
import io.cassandrareaper.core.RepairSchedule;
import io.cassandrareaper.core.RepairSegment;
import io.cassandrareaper.core.RepairUnit;
import io.cassandrareaper.storage.cluster.IClusterDao;
import io.cassandrareaper.storage.cluster.MemoryClusterDao;
import io.cassandrareaper.storage.events.IEventsDao;
Expand All @@ -37,7 +42,12 @@
import io.cassandrareaper.storage.snapshot.MemorySnapshotDao;

import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import org.eclipse.store.storage.embedded.types.EmbeddedStorage;
Expand All @@ -52,8 +62,8 @@
public final class MemoryStorageFacade implements IStorageDao {

private static final Logger LOG = LoggerFactory.getLogger(MemoryStorageFacade.class);
public final MemoryStorageRoot memoryStorageRoot;
public final EmbeddedStorageManager embeddedStorage;
private final EmbeddedStorageManager embeddedStorage;
private final MemoryStorageRoot memoryStorageRoot;
private final MemoryRepairSegmentDao memRepairSegment = new MemoryRepairSegmentDao(this);
private final MemoryRepairUnitDao memoryRepairUnitDao = new MemoryRepairUnitDao(this);
private final MemoryRepairRunDao memoryRepairRunDao =
Expand All @@ -73,19 +83,20 @@ public final class MemoryStorageFacade implements IStorageDao {
public MemoryStorageFacade(String persistenceStoragePath) {
LOG.info("Using memory storage backend. Persistence storage path: {}", persistenceStoragePath);
this.embeddedStorage = EmbeddedStorage.start(Paths.get(persistenceStoragePath));
if (embeddedStorage.root() == null) {
if (this.embeddedStorage.root() == null) {
LOG.info("Creating new data storage");
this.memoryStorageRoot = new MemoryStorageRoot();
embeddedStorage.setRoot(this.memoryStorageRoot);
this.embeddedStorage.setRoot(this.memoryStorageRoot);
} else {
LOG.info("Loading existing data from persistence storage");
this.memoryStorageRoot = (MemoryStorageRoot) embeddedStorage.root();
this.memoryStorageRoot = (MemoryStorageRoot) this.embeddedStorage.root();
LOG.info("Loaded {} clusters: {}",
memoryStorageRoot.getClusters().size(), memoryStorageRoot.getClusters().keySet());
memoryStorageRoot.getClusters().entrySet().stream().forEach(entry -> {
this.memoryStorageRoot.getClusters().size(), this.memoryStorageRoot.getClusters().keySet());
this.memoryStorageRoot.getClusters().entrySet().stream().forEach(entry -> {
Cluster cluster = entry.getValue();
LOG.info("Loaded cluster: {} / seeds: {}", cluster.getName(), cluster.getSeedHosts());
});
LOG.info("MemoryStorageRoot: {}", this.memoryStorageRoot);
}
}

Expand All @@ -100,17 +111,17 @@ public boolean isStorageConnected() {
}

private boolean addClusterAssertions(Cluster cluster) {
return memClusterDao.addClusterAssertions(cluster);
return this.memClusterDao.addClusterAssertions(cluster);
}

@Override
public List<PercentRepairedMetric> getPercentRepairedMetrics(String clusterName, UUID repairScheduleId, Long since) {
return memMetricsDao.getPercentRepairedMetrics(clusterName, repairScheduleId, since);
return this.memMetricsDao.getPercentRepairedMetrics(clusterName, repairScheduleId, since);
}

@Override
public void storePercentRepairedMetric(PercentRepairedMetric metric) {
memMetricsDao.storePercentRepairedMetric(metric);
this.memMetricsDao.storePercentRepairedMetric(metric);
}

@Override
Expand All @@ -120,7 +131,7 @@ public void start() {

@Override
public void stop() {
embeddedStorage.shutdown();
this.embeddedStorage.shutdown();
}

@Override
Expand Down Expand Up @@ -158,11 +169,137 @@ public IClusterDao getClusterDao() {
return this.memClusterDao;
}

public synchronized void persistChanges() {
LOG.info("Persisting changes to storage");
MemoryStorageRoot root = (MemoryStorageRoot) embeddedStorage.root();
this.embeddedStorage.setRoot(root);
embeddedStorage.storeRoot();
private void persist(Object... objects) {
this.embeddedStorage.storeAll(objects);
this.embeddedStorage.storeRoot();
}

// Cluster operations
public Map<String, Cluster> getClusters() {
return this.memoryStorageRoot.getClusters();
}

public Cluster addCluster(Cluster cluster) {
Cluster newCluster = this.memoryStorageRoot.addCluster(cluster);
this.persist(memoryStorageRoot.getClusters());
return newCluster;
}

public Cluster removeCluster(String clusterName) {
Cluster cluster = this.memoryStorageRoot.removeCluster(clusterName);
this.persist(memoryStorageRoot.getClusters());
return cluster;
}

// RepairSchedule operations
public RepairSchedule addRepairSchedule(RepairSchedule schedule) {
RepairSchedule newSchedule = this.memoryStorageRoot.addRepairSchedule(schedule);
this.persist(this.memoryStorageRoot.getRepairSchedules());
return newSchedule;
}

public RepairSchedule removeRepairSchedule(UUID id) {
RepairSchedule schedule = this.memoryStorageRoot.removeRepairSchedule(id);
this.persist(this.memoryStorageRoot.getRepairSchedules());
return schedule;
}

public Optional<RepairSchedule> getRepairScheduleById(UUID id) {
return Optional.ofNullable(this.memoryStorageRoot.getRepairScheduleById(id));
}

public Collection<RepairSchedule> getRepairSchedules() {
return this.memoryStorageRoot.getRepairSchedules().values();
}

// RepairRun operations
public Collection<RepairRun> getRepairRuns() {
return this.memoryStorageRoot.getRepairRuns().values();
}

public RepairRun addRepairRun(RepairRun run) {
RepairRun newRun = this.memoryStorageRoot.addRepairRun(run);
this.persist(this.memoryStorageRoot.getRepairRuns());
return newRun;
}

public RepairRun removeRepairRun(UUID id) {
RepairRun run = this.memoryStorageRoot.removeRepairRun(id);
this.persist(this.memoryStorageRoot.getRepairRuns());
return run;
}

public Optional<RepairRun> getRepairRunById(UUID id) {
return Optional.ofNullable(this.memoryStorageRoot.getRepairRunById(id));
}
}

// RepairUnit operations
public Collection<RepairUnit> getRepairUnits() {
return this.memoryStorageRoot.getRepairUnits().values();
}

public RepairUnit addRepairUnit(Optional<RepairUnit.Builder> key, RepairUnit unit) {
RepairUnit newUnit = this.memoryStorageRoot.addRepairUnit(key.get(), unit);
this.persist(this.memoryStorageRoot.getRepairUnits(), this.memoryStorageRoot.getRepairUnitsByKey());
return newUnit;
}

public RepairUnit removeRepairUnit(Optional<RepairUnit.Builder> key, UUID id) {
RepairUnit unit = this.memoryStorageRoot.removeRepairUnit(key.get(), id);
this.persist(this.memoryStorageRoot.getRepairUnits(), this.memoryStorageRoot.getRepairUnitsByKey());
return unit;
}

public RepairUnit getRepairUnitById(UUID id) {
return this.memoryStorageRoot.getrRepairUnitById(id);
}

public RepairUnit getRepairUnitByKey(RepairUnit.Builder key) {
return this.memoryStorageRoot.getRepairUnitByKey(key);
}

// RepairSegment operations
public RepairSegment addRepairSegment(RepairSegment segment) {
final RepairSegment newSegment = this.memoryStorageRoot.addRepairSegment(segment);
// also add the segment by RunId
UUID repairSegmentRunId = segment.getRunId();
LinkedHashMap<UUID, RepairSegment> segmentsByRunId =
this.memoryStorageRoot.getRepairSegmentsByRunId().get(repairSegmentRunId);
if (segmentsByRunId == null) {
segmentsByRunId = new LinkedHashMap<UUID, RepairSegment>();
this.memoryStorageRoot.getRepairSegmentsByRunId().put(repairSegmentRunId, segmentsByRunId);
}
segmentsByRunId.put(segment.getId(), segment);
this.persist(this.memoryStorageRoot.getRepairSegments(), this.memoryStorageRoot.getRepairSegmentsByRunId());
return newSegment;
}

public RepairSegment removeRepairSegment(UUID id) {
RepairSegment segment = this.memoryStorageRoot.removeRepairSegment(id);
// also remove the segment from the byRunId map
UUID repairSegmentRunId = segment.getRunId();
LinkedHashMap<UUID, RepairSegment> segmentsByRunId =
this.memoryStorageRoot.getRepairSegmentsByRunId().get(repairSegmentRunId);
if (segmentsByRunId != null) {
segmentsByRunId.remove(segment.getId());
}
this.persist(this.memoryStorageRoot.getRepairSegments(), this.memoryStorageRoot.getRepairSegmentsByRunId());
return segment;
}

public RepairSegment getRepairSegmentById(UUID id) {
return this.memoryStorageRoot.getRepairSegmentById(id);
}

public Collection<RepairSegment> getRepairSegmentsByRunId(UUID runId) {
if (this.memoryStorageRoot.getRepairSegmentsByRunId().containsKey(runId)) {
return this.memoryStorageRoot.getRepairSegmentsByRunId().get(runId).values();
}
return Collections.EMPTY_LIST;
}

// RepairSubscription operations
public Map<UUID, DiagEventSubscription> getSubscriptionsById() {
return this.memoryStorageRoot.getSubscriptionsById();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,13 @@ public MemoryClusterDao(MemoryStorageFacade storage,

@Override
public Collection<Cluster> getClusters() {
return storage.memoryStorageRoot.getClusters().values();
return storage.getClusters().values();
}

@Override
public boolean addCluster(Cluster cluster) {
assert addClusterAssertions(cluster);
Cluster existing = storage.memoryStorageRoot.addCluster(cluster);
storage.embeddedStorage.storeAll(storage.memoryStorageRoot.getClusters());
storage.persistChanges();
Cluster existing = storage.addCluster(cluster);
return existing == null;
}

Expand Down Expand Up @@ -100,8 +98,8 @@ public boolean addClusterAssertions(Cluster cluster) {
@Override
public Cluster getCluster(String clusterName) {
Preconditions.checkArgument(
storage.memoryStorageRoot.getClusters().containsKey(clusterName), "no such cluster: %s", clusterName);
return storage.memoryStorageRoot.getClusters().get(clusterName);
storage.getClusters().containsKey(clusterName), "no such cluster: %s", clusterName);
return storage.getClusters().get(clusterName);
}

@Override
Expand All @@ -117,18 +115,16 @@ public Cluster deleteCluster(String clusterName) {
.filter(subscription -> subscription.getId().isPresent())
.forEach(subscription -> memEventsDao.deleteEventSubscription(subscription.getId().get()));

storage.memoryStorageRoot.repairUnits.values().stream()
storage.getRepairUnits().stream()
.filter((unit) -> unit.getClusterName().equals(clusterName))
.forEach((unit) -> {
assert memoryRepairRunDao.getRepairRunsForUnit(
unit.getId()).isEmpty() : StringUtils.join(memoryRepairRunDao.getRepairRunsForUnit(unit.getId())
);
storage.memoryStorageRoot.repairUnits.remove(unit.getId());
storage.memoryStorageRoot.repairUnitsByKey.remove(unit.with());
storage.removeRepairUnit(Optional.ofNullable(unit.with()), unit.getId());
});

Cluster removed = storage.memoryStorageRoot.removeCluster(clusterName);
storage.persistChanges();
Cluster removed = storage.removeCluster(clusterName);
return removed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ public MemoryEventsDao(MemoryStorageFacade storage) {

@Override
public Collection<DiagEventSubscription> getEventSubscriptions() {
return storage.memoryStorageRoot.subscriptionsById.values();
return storage.getSubscriptionsById().values();
}

@Override
public Collection<DiagEventSubscription> getEventSubscriptions(String clusterName) {
Preconditions.checkNotNull(clusterName);
Collection<DiagEventSubscription> ret = new ArrayList<DiagEventSubscription>();
for (DiagEventSubscription sub : storage.memoryStorageRoot.subscriptionsById.values()) {
for (DiagEventSubscription sub : storage.getSubscriptionsById().values()) {
if (sub.getCluster().equals(clusterName)) {
ret.add(sub);
}
Expand All @@ -52,24 +52,22 @@ public Collection<DiagEventSubscription> getEventSubscriptions(String clusterNam

@Override
public DiagEventSubscription getEventSubscription(UUID id) {
if (storage.memoryStorageRoot.subscriptionsById.containsKey(id)) {
return storage.memoryStorageRoot.subscriptionsById.get(id);
if (storage.getSubscriptionsById().containsKey(id)) {
return storage.getSubscriptionsById().get(id);
}
throw new IllegalArgumentException("No event subscription with id " + id);
}

@Override
public DiagEventSubscription addEventSubscription(DiagEventSubscription subscription) {
Preconditions.checkArgument(subscription.getId().isPresent());
storage.memoryStorageRoot.subscriptionsById.put(subscription.getId().get(), subscription);
storage.persistChanges();
storage.getSubscriptionsById().put(subscription.getId().get(), subscription);
return subscription;
}

@Override
public boolean deleteEventSubscription(UUID id) {
boolean result = storage.memoryStorageRoot.subscriptionsById.remove(id) != null;
storage.persistChanges();
boolean result = storage.getSubscriptionsById().remove(id) != null;
return result;
}
}
Loading

0 comments on commit 57a03f1

Please sign in to comment.