Skip to content

Commit

Permalink
improve cache consistency of event api (again)
Browse files Browse the repository at this point in the history
  • Loading branch information
derklaro committed Dec 30, 2024
1 parent 90a8d54 commit 2486cb0
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,4 @@ class CacheConfiguration {
.expireAfterWrite(90, TimeUnit.MINUTES)
.build());
}

/**
* Cache for ids from the internal event bus that were marked as removed. This cache prevents updates from being
* applied to snapshots while a remove frame was received previously.
*/
@Bean
public @Nonnull Cache removedIdsCache() {
return new CaffeineCache(
"removed_ids_cache",
Caffeine.newBuilder()
.expireAfterWrite(2, TimeUnit.MINUTES)
.build(),
false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,21 @@

package tools.simrail.backend.api.eventbus.cache;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import tools.simrail.backend.api.eventbus.dto.EventbusDispatchPostSnapshotDto;
import tools.simrail.backend.api.eventbus.dto.EventbusJourneySnapshotDto;
import tools.simrail.backend.api.eventbus.dto.EventbusServerSnapshotDto;
Expand All @@ -60,11 +61,10 @@ public final class SitSnapshotCache {
private final Map<String, EventbusJourneySnapshotDto> journeySnapshots;
private final Map<String, EventbusDispatchPostSnapshotDto> dispatchPostSnapshots;

private final Cache removedIdsCache;
private final Cache<String, Boolean> removedIdsCache;

@Autowired
SitSnapshotCache(
@Nonnull CacheManager cacheManager,
@Nonnull EventbusServerRepository serverRepository,
@Nonnull EventbusJourneyRepository journeyRepository,
@Nonnull EventbusDispatchPostRepository dispatchPostRepository
Expand All @@ -73,9 +73,6 @@ public final class SitSnapshotCache {
this.journeyRepository = journeyRepository;
this.dispatchPostRepository = dispatchPostRepository;

this.removedIdsCache = cacheManager.getCache("removed_ids_cache");
Assert.notNull(this.removedIdsCache, "removed ids cache not registered");

// cache active servers
var activeServers = serverRepository.findSnapshotsOfAllActiveServers();
this.serverSnapshots = activeServers.stream().collect(Collectors.toMap(
Expand All @@ -99,6 +96,22 @@ public final class SitSnapshotCache {
Function.identity(),
(left, _) -> left,
ConcurrentHashMap::new));

// construct the cache for the ids that were removed to not apply any updates anymore
// once a remove frame for them was received. after 2 minutes in the cache they are removed
// from the cache and the caching maps are cleaned up one more time to ensure that the id
// is actually no longer cached due to some race condition
this.removedIdsCache = Caffeine.newBuilder()
.expireAfterWrite(2, TimeUnit.MINUTES)
.evictionListener((key, _, cause) -> {
if (cause == RemovalCause.EXPIRED) {
var removedId = (String) key;
this.serverSnapshots.remove(removedId);
this.journeySnapshots.remove(removedId);
this.dispatchPostSnapshots.remove(removedId);
}
})
.build();
}

/**
Expand All @@ -110,14 +123,23 @@ private void markIdAsRemoved(@Nonnull String id) {
this.removedIdsCache.put(id, Boolean.TRUE);
}

/**
* Removes the marking as removed from the given id.
*
* @param id the id to no longer mark as removed.
*/
private void unmarkIdAsRemoved(@Nonnull String id) {
this.removedIdsCache.invalidate(id);
}

/**
* Get if the given id has been marked as removed previously.
*
* @param id the id to check.
* @return true if the given id has been marked as removed, false otherwise.
*/
private boolean isIdMarkedAsRemoved(@Nonnull String id) {
return this.removedIdsCache.get(id) != null;
return this.removedIdsCache.getIfPresent(id) != null;
}

/**
Expand All @@ -134,6 +156,11 @@ private boolean isIdMarkedAsRemoved(@Nonnull String id) {
return this.serverSnapshots.remove(frame.getServerId());
}

// remove the removal marking for the id of the given server if the action is an add
if (updateType == UpdateType.ADD) {
this.unmarkIdAsRemoved(frame.getServerId());
}

// resolve the server with the provided server id and cache it
// apply the frame as an update in case the server was updated
var serverToUpdate = this.serverSnapshots.computeIfAbsent(frame.getServerId(), _ -> {
Expand Down Expand Up @@ -170,6 +197,11 @@ private boolean isIdMarkedAsRemoved(@Nonnull String id) {
return this.journeySnapshots.remove(frame.getJourneyId());
}

// remove the removal marking for the id of the given journey if the action is an add
if (updateType == UpdateType.ADD) {
this.unmarkIdAsRemoved(frame.getJourneyId());
}

// resolve the journey with the provided journey id and cache it
// apply the frame as an update in case the journey was updated
var journeyToUpdate = this.journeySnapshots.computeIfAbsent(frame.getJourneyId(), _ -> {
Expand Down Expand Up @@ -208,6 +240,11 @@ private boolean isIdMarkedAsRemoved(@Nonnull String id) {
return this.dispatchPostSnapshots.remove(frame.getPostId());
}

// remove the removal marking for the id of the given dispatch post if the action is an add
if (updateType == UpdateType.ADD) {
this.unmarkIdAsRemoved(frame.getPostId());
}

// resolve the dispatch post with the provided dispatch post id and cache it
// apply the frame as an update in case the dispatch post was updated
var postToUpdate = this.dispatchPostSnapshots.computeIfAbsent(frame.getPostId(), _ -> {
Expand Down

0 comments on commit 2486cb0

Please sign in to comment.