Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MODPUBSUB-199 #205

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.folio.rest.util.MessagingModuleFilter;

import java.util.List;
import java.util.Set;

/**
* Messaging module data access object
Expand Down Expand Up @@ -41,5 +42,5 @@ public interface MessagingModuleDao {
*
* @return list of Messaging Modules
*/
Future<List<MessagingModule>> getAll();
Future<Set<MessagingModule>> getAll();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -110,11 +111,11 @@ public Future<Void> delete(MessagingModuleFilter filter) {
}

@Override
public Future<List<MessagingModule>> getAll() {
public Future<Set<MessagingModule>> getAll() {
Promise<RowSet<Row>> promise = Promise.promise();
String preparedQuery = format(GET_ALL_SQL, MODULE_SCHEMA, TABLE_NAME);
pgClientFactory.getInstance().select(preparedQuery, promise);
return promise.future().map(this::mapResultSetToMessagingModuleList);
return promise.future().map(this::mapResultSetToMessagingModuleSet);
}

private Future<Boolean> delete(MessagingModuleFilter filter, AsyncResult<SQLConnection> sqlConnection) {
Expand Down Expand Up @@ -142,6 +143,14 @@ private List<MessagingModule> mapResultSetToMessagingModuleList(RowSet<Row> resu
.collect(Collectors.toList());
}

private Set<MessagingModule> mapResultSetToMessagingModuleSet(RowSet<Row> resultSet) {
return Stream.generate(resultSet.iterator()::next)
.limit(resultSet.size())
.map(this::mapRowJsonToMessagingModule)
.collect(Collectors.toSet());
}


private String buildWhereClause(MessagingModuleFilter filter) {
StringBuilder conditionBuilder = new StringBuilder("WHERE TRUE");
if (filter.getEventType() != null) {
Expand Down
80 changes: 62 additions & 18 deletions mod-pubsub-server/src/main/java/org/folio/services/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,102 @@

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.dao.MessagingModuleDao;
import org.folio.rest.jaxrs.model.MessagingModule;
import org.folio.services.util.ClockUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.commons.collections4.IterableUtils.isEmpty;
import static org.folio.services.util.ClockUtil.*;

/**
* In-memory storage for messaging modules
*/
@Component
public class Cache {
private static final String MESSAGING_MODULES_CACHE_KEY = "messaging_modules";
private static final Logger LOGGER = LogManager.getLogger();

private AsyncLoadingCache<String, Set<MessagingModule>> loadingCache;
@Value("${CACHE_EXPIRATION_TIMING}")
private long cacheExpirationTiming;
private Vertx vertx;
private LocalDateTime lastTimeLoaded;
private com.github.benmanes.caffeine.cache.Cache<String, String> subscriptions;
private com.github.benmanes.caffeine.cache.Cache<String, String> tenantToken;
private MessagingModuleDao messagingModuleDao;
private final ReentrantLock mutex = new ReentrantLock();

public Cache(@Autowired Vertx vertx, @Autowired MessagingModuleDao messagingModuleDao) {
this.messagingModuleDao = messagingModuleDao;
this.vertx = vertx;
this.loadingCache = Caffeine.newBuilder()
.executor(serviceExecutor -> vertx.runOnContext(ar -> serviceExecutor.run()))
.buildAsync(k -> new HashSet<>());
this.subscriptions = Caffeine.newBuilder().build();
this.tenantToken = Caffeine.newBuilder().build();
}

public Future<Set<MessagingModule>> getMessagingModules() {
Promise<Set<MessagingModule>> promise = Promise.promise();
loadingCache
.get(MESSAGING_MODULES_CACHE_KEY)
.whenComplete((messagingModules, throwable) -> {
if (throwable == null) {
if (isEmpty(messagingModules)) {
messagingModuleDao.getAll()
.map(messagingModules::addAll)
.onComplete(ar -> promise.complete(messagingModules));
} else {
promise.complete(messagingModules);
}
} else {
promise.fail(throwable);
}
});
return promise.future();
public Future<Set<MessagingModule>> getMessagingModules() {
Promise<Set<MessagingModule>> promise = Promise.promise();
loadingCache
.get(MESSAGING_MODULES_CACHE_KEY)
.whenComplete((messagingModules, throwable) -> {
if (throwable == null) {
if (isEmpty(messagingModules)) {
loadModulesToCache(messagingModules)
.onComplete(ar -> {
lastTimeLoaded = getLocalDateTime();
promise.complete(messagingModules);
});
} else {
HashSet<MessagingModule> modulesBeforeUpdate = new HashSet<>(messagingModules);
promise.complete(modulesBeforeUpdate);
if (mutex.tryLock() && (Duration.between(lastTimeLoaded, getLocalDateTime())
.toMillis() >= cacheExpirationTiming)) {
vertx.runOnContext(ignored ->
loadModulesToCache(messagingModules)
.onSuccess(ar ->
lastTimeLoaded = getLocalDateTime()
//TODO Test successful case(after some amount of time we publish the event with 201)
//TODO and on failure(add incorrect data to the db)
//TODO Question - how do we know in the tests that this piece of logic got executed and what's the result?
)
.onComplete(ar -> mutex.unlock())
.onFailure(ar -> {
LOGGER.error("Failed to load messaging modules asynchronously");
messagingModules.clear();
messagingModules.addAll(modulesBeforeUpdate);
})
);
}
}
} else {
promise.fail(throwable);
}
});
return promise.future();
}


private Future<Boolean> loadModulesToCache(Set<MessagingModule> messagingModules) {
return messagingModuleDao.getAll()
.map(messagingModules::addAll);
}

public boolean containsSubscription(String topic) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.folio.services.util;

import java.time.Clock;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

/**
* A clock manager for safely getting and setting the time.
* <p>
* Provides management of the clock that is then exposed and used as the
* default clock for all methods within this utility.
* <p>
* Use these methods rather than using the now() methods for any given date and
* time related class.
* Failure to do so may result in the inability to properly perform tests.
*/
public class ClockUtil {
private static Clock clock = Clock.systemUTC();

private ClockUtil() {
throw new UnsupportedOperationException("Do not instantiate");
}

/**
* Set the clock assigned to the clock manager to a given clock.
*/
public static void setClock(Clock clock) {
if (clock == null) {
throw new IllegalArgumentException("clock cannot be null");
}

ClockUtil.clock = clock;
}

public static Clock getClock() {
return clock;
}

/**
* Get the current system time according to the clock manager.
*
* @return
* A LocalDateTime as if now() is called.
* Time is truncated to milliseconds.
*/
public static LocalDateTime getLocalDateTime() {
return LocalDateTime.now(clock).truncatedTo(ChronoUnit.MILLIS);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public abstract class AbstractRestTest {
private static final String KAFKA_HOST = "KAFKA_HOST";
private static final String KAFKA_PORT = "KAFKA_PORT";
private static final String OKAPI_URL_ENV = "OKAPI_URL";
public static final String CACHE_EXPIRATION_TIMING = "CACHE_EXPIRATION_TIMING";

private static final int PORT = NetworkUtils.nextFreePort();
protected static final String OKAPI_URL = "http://localhost:" + PORT;
Expand All @@ -74,6 +75,7 @@ public static void setUpClass(final TestContext context) throws Exception {
System.setProperty(OKAPI_URL_ENV, OKAPI_URL);
System.setProperty(SYSTEM_USER_NAME_ENV, SYSTEM_USER_NAME);
System.setProperty(SYSTEM_USER_PASSWORD_ENV, SYSTEM_USER_PASSWORD);
System.setProperty(CACHE_EXPIRATION_TIMING, String.valueOf(4000L));
deployVerticle(context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,22 @@
import org.folio.rest.jaxrs.model.PublisherDescriptor;
import org.folio.rest.jaxrs.model.SubscriberDescriptor;
import org.folio.rest.jaxrs.model.SubscriptionDefinition;
import org.folio.services.util.ClockUtil;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.testcontainers.shaded.org.awaitility.Awaitility;

import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.is;

@RunWith(VertxUnitRunner.class)
Expand Down Expand Up @@ -91,6 +99,37 @@ public void shouldPublishEventWithPayload() {
.statusCode(HttpStatus.SC_NO_CONTENT);
}

@Test
public void shouldSuccessfullyReloadCache() {
Instant instant = Instant.parse("2022-08-19T16:02:42.00Z");

ZoneId zoneId = ZoneId.of("Europe/Paris");

ClockUtil.setClock(Clock.fixed(instant, zoneId));
EventDescriptor eventDescriptor = postEventDescriptor(EVENT_DESCRIPTOR);
registerPublisher(eventDescriptor);
registerSubscriber(eventDescriptor);

RestAssured.given()
.spec(spec)
.body(EVENT.put("eventPayload", "something very important").encode())
.when()
.post(PUBLISH_PATH)
.then()
.statusCode(HttpStatus.SC_NO_CONTENT);

ClockUtil.setClock(Clock.fixed(Instant.ofEpochMilli(ClockUtil.getClock().instant().toEpochMilli()
+ Long.parseLong(System.getProperty(CACHE_EXPIRATION_TIMING))), zoneId));

RestAssured.given()
.spec(spec)
.body(EVENT.put("eventPayload", "something very important").encode())
.when()
.post(PUBLISH_PATH)
.then()
.statusCode(HttpStatus.SC_NO_CONTENT);
}

private void registerPublisher(EventDescriptor eventDescriptor) {
PublisherDescriptor publisherDescriptor = new PublisherDescriptor()
.withEventDescriptors(Collections.singletonList(eventDescriptor))
Expand Down