Skip to content

Commit

Permalink
prevent invalid cache state due to bad update frame order
Browse files Browse the repository at this point in the history
in case an update frame is received after a remove frame, the update frame will re-add the data into the cache after it was removed. this can lead to data being in the cache which is actually no longer present, so update frames received a short period after the remove frame are ignored
  • Loading branch information
derklaro committed Dec 29, 2024
1 parent 3a05396 commit 323da2f
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,18 @@ class CacheConfiguration {
.expireAfterWrite(90, TimeUnit.MINUTES)
.build());
}

/**
* Cache for ids from the internal event bus that were marked as removed. This cache prevents updates from being
* applied to snapshots while a remove frame was received previously.
*/
@Bean
public @Nonnull Cache removedIdsCache() {
return new CaffeineCache(
"removed_ids_cache",
Caffeine.newBuilder()
.expireAfterWrite(2, TimeUnit.MINUTES)
.build(),
false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import tools.simrail.backend.api.eventbus.dto.EventbusDispatchPostSnapshotDto;
import tools.simrail.backend.api.eventbus.dto.EventbusJourneySnapshotDto;
import tools.simrail.backend.api.eventbus.dto.EventbusServerSnapshotDto;
Expand All @@ -57,8 +60,11 @@ public final class SitSnapshotCache {
private final Map<String, EventbusJourneySnapshotDto> journeySnapshots;
private final Map<String, EventbusDispatchPostSnapshotDto> dispatchPostSnapshots;

private final Cache removedIdsCache;

@Autowired
SitSnapshotCache(
@Nonnull CacheManager cacheManager,
@Nonnull EventbusServerRepository serverRepository,
@Nonnull EventbusJourneyRepository journeyRepository,
@Nonnull EventbusDispatchPostRepository dispatchPostRepository
Expand All @@ -67,6 +73,9 @@ public final class SitSnapshotCache {
this.journeyRepository = journeyRepository;
this.dispatchPostRepository = dispatchPostRepository;

this.removedIdsCache = cacheManager.getCache("removed_ids_cache");
Assert.notNull(this.removedIdsCache, "removed ids cache not registered");

// cache active servers
var activeServers = serverRepository.findSnapshotsOfAllActiveServers();
this.serverSnapshots = activeServers.stream().collect(Collectors.toMap(
Expand All @@ -92,6 +101,25 @@ public final class SitSnapshotCache {
ConcurrentHashMap::new));
}

/**
* Marks the given id as removed for a short amount of time to prevent further updates from being applied.
*
* @param id the id to mark as removed.
*/
private void markIdAsRemoved(@Nonnull String id) {
this.removedIdsCache.put(id, Boolean.TRUE);
}

/**
* Get if the given id has been marked as removed previously.
*
* @param id the id to check.
* @return true if the given id has been marked as removed, false otherwise.
*/
private boolean isIdMarkedAsRemoved(@Nonnull String id) {
return this.removedIdsCache.get(id) != null;
}

/**
* Handles the update of the server represented with the given update frame.
*
Expand All @@ -102,19 +130,29 @@ public final class SitSnapshotCache {
// handle the remove of a server
var updateType = frame.getUpdateType();
if (updateType == UpdateType.REMOVE) {
this.markIdAsRemoved(frame.getServerId());
return this.serverSnapshots.remove(frame.getServerId());
}

// resolve the server with the provided server id and cache it
// apply the frame as an update in case the server was updated
var serverToUpdate = this.serverSnapshots.computeIfAbsent(frame.getServerId(), _ -> {
var serverId = UUID.fromString(frame.getServerId());
return this.serverRepository.findServerSnapshotById(serverId).orElse(null);
return this.serverRepository.findServerSnapshotById(serverId)
.filter(_ -> !this.isIdMarkedAsRemoved(frame.getServerId()))
.orElse(null);
});
if (serverToUpdate != null) {
serverToUpdate.applyUpdateFrame(frame);
}

// check if the id was marked as removed while the update/add was applied
// to prevent caching something that will never receive an update again
if (serverToUpdate != null && this.isIdMarkedAsRemoved(frame.getServerId())) {
this.serverSnapshots.remove(frame.getServerId());
return null;
}

return serverToUpdate;
}

Expand All @@ -128,19 +166,29 @@ public final class SitSnapshotCache {
// handle the remove of a journey
var updateType = frame.getUpdateType();
if (updateType == UpdateType.REMOVE) {
this.markIdAsRemoved(frame.getJourneyId());
return this.journeySnapshots.remove(frame.getJourneyId());
}

// resolve the journey with the provided journey id and cache it
// apply the frame as an update in case the journey was updated
var journeyToUpdate = this.journeySnapshots.computeIfAbsent(frame.getJourneyId(), _ -> {
var journeyId = UUID.fromString(frame.getJourneyId());
return this.journeyRepository.findJourneySnapshotById(journeyId).orElse(null);
return this.journeyRepository.findJourneySnapshotById(journeyId)
.filter(_ -> !this.isIdMarkedAsRemoved(frame.getJourneyId()))
.orElse(null);
});
if (journeyToUpdate != null) {
journeyToUpdate.applyUpdateFrame(frame);
}

// check if the id was marked as removed while the update/add was applied
// to prevent caching something that will never receive an update again
if (journeyToUpdate != null && this.isIdMarkedAsRemoved(frame.getJourneyId())) {
this.journeySnapshots.remove(frame.getJourneyId());
return null;
}

return journeyToUpdate;
}

Expand All @@ -156,19 +204,29 @@ public final class SitSnapshotCache {
// handle the remove of a dispatch post
var updateType = frame.getUpdateType();
if (updateType == UpdateType.REMOVE) {
this.markIdAsRemoved(frame.getPostId());
return this.dispatchPostSnapshots.remove(frame.getPostId());
}

// resolve the dispatch post with the provided dispatch post id and cache it
// apply the frame as an update in case the dispatch post was updated
var postToUpdate = this.dispatchPostSnapshots.computeIfAbsent(frame.getPostId(), _ -> {
var postId = UUID.fromString(frame.getPostId());
return this.dispatchPostRepository.findDispatchPostSnapshotById(postId).orElse(null);
return this.dispatchPostRepository.findDispatchPostSnapshotById(postId)
.filter(_ -> !this.isIdMarkedAsRemoved(frame.getPostId()))
.orElse(null);
});
if (postToUpdate != null) {
postToUpdate.applyUpdateFrame(frame);
}

// check if the id was marked as removed while the update/add was applied
// to prevent caching something that will never receive an update again
if (postToUpdate != null && this.isIdMarkedAsRemoved(frame.getPostId())) {
this.dispatchPostSnapshots.remove(frame.getPostId());
return null;
}

return postToUpdate;
}

Expand Down

0 comments on commit 323da2f

Please sign in to comment.