Skip to content

Commit

Permalink
DynamoDB Leader Monitor: On Next Should Always Provide MasterDescript…
Browse files Browse the repository at this point in the history
…ion (#692)

* DynamoDB Leader Monitor: On Next Should Always Provide MasterDescription
---------

Co-authored-by: Kevin Greenan <[email protected]>
  • Loading branch information
crioux-stripe and kmg-stripe authored Jul 17, 2024
1 parent 10c1dd8 commit 3431246
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
Expand All @@ -43,7 +42,7 @@ public class DynamoDBMasterMonitor extends BaseService implements MasterMonitor

private static final Logger logger = LoggerFactory.getLogger(DynamoDBMasterMonitor.class);

private static final MasterDescription MASTER_NULL =
public static final MasterDescription MASTER_NULL =
new MasterDescription("NONE", "localhost", -1, -1, -1, "uri://", -1, -1L);
private final ThreadFactory monitorThreadFactory = r -> {
Thread thread = new Thread(r);
Expand All @@ -66,15 +65,14 @@ public class DynamoDBMasterMonitor extends BaseService implements MasterMonitor
private final Duration gracefulShutdown;

private final BehaviorSubject<MasterDescription> masterSubject;
private final AtomicReference<MasterDescription> latestMaster = new AtomicReference<>();

private final ObjectMapper jsonMapper = DefaultObjectMapper.getInstance();

/**
* Creates a MasterMonitor backed by DynamoDB. This should be used if you are using a {@link DynamoDBLeaderElector}
*/
public DynamoDBMasterMonitor() {
masterSubject = BehaviorSubject.create();
masterSubject = BehaviorSubject.create(MASTER_NULL);
final DynamoDBConfig conf = DynamoDBClientSingleton.getDynamoDBConf();
pollInterval = Duration.parse(conf.getDynamoDBLeaderHeartbeatDuration());
gracefulShutdown = Duration.parse(conf.getDynamoDBMonitorGracefulShutdownDuration());
Expand Down Expand Up @@ -138,13 +136,11 @@ private void getCurrentLeader() {
}

private void updateLeader(@Nullable MasterDescription nextDescription) {
final MasterDescription previousDescription = latestMaster.getAndSet(nextDescription);
final MasterDescription prev = Optional.ofNullable(masterSubject.getValue()).orElse(MASTER_NULL);
final MasterDescription next = (nextDescription == null) ? MASTER_NULL : nextDescription;
final MasterDescription prev =
(previousDescription == null) ? MASTER_NULL : previousDescription;
if (!prev.equals(next)) {
logger.info("leader changer information previous {} and next {}", prev.getHostname(), next.getHostname());
masterSubject.onNext(nextDescription);
masterSubject.onNext(next);
}
}

Expand Down Expand Up @@ -178,6 +174,6 @@ public Observable<MasterDescription> getMasterObservable() {
@Override
@Nullable
public MasterDescription getLatestMaster() {
return latestMaster.get();
return Optional.ofNullable(masterSubject.getValue()).orElse(MASTER_NULL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
Expand Down Expand Up @@ -128,7 +129,17 @@ public void highAvailabilityServices() throws InterruptedException, IOException
.atMost(Duration.ofSeconds(3L))
.pollDelay(Duration.ofSeconds(1L))
.untilAsserted(() -> assertEquals(leaders[2], monitor.getLatestMaster()));
testSubscriber.assertValues(leaders);

// We can, depending on timing, sometimes get a MASTER_NULL value which is safe to ignore.
MasterDescription[] actualLeaders = testSubscriber.getOnNextEvents().stream()
.filter(md -> md != DynamoDBMasterMonitor.MASTER_NULL)
.collect(Collectors.toList())
.toArray(new MasterDescription[]{});

assertEquals(leaders.length, actualLeaders.length);
assertEquals(leaders[0], actualLeaders[0]);
assertEquals(leaders[1], actualLeaders[1]);
assertEquals(leaders[2], actualLeaders[2]);
monitor.shutdown();

dynamoDb.createKVTable(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
package io.mantisrx.extensions.dynamodb;

import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.*;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -28,15 +27,12 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.*;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import rx.Observable;
import rx.observers.TestSubscriber;

@RunWith(MockitoJUnitRunner.class)
Expand Down Expand Up @@ -82,7 +78,7 @@ public void getCurrentLeader() throws JsonProcessingException, InterruptedExcept
TestSubscriber<MasterDescription> testSubscriber = new TestSubscriber<>();
m.getMasterObservable().subscribe(testSubscriber);
m.start();
assertNull(m.getLatestMaster());
assertEquals(m.getLatestMaster(), DynamoDBMasterMonitor.MASTER_NULL);
lockSupport.takeLock(lockKey, otherMaster);
await()
.atLeast(DynamoDBLockSupportRule.heartbeatDuration)
Expand Down Expand Up @@ -117,4 +113,35 @@ public void runShutdown() throws IOException {
verify(mockLockClient, times(1)).close();

}

@Test
public void monitorDoesNotReturnNull() throws IOException, InterruptedException {
final String lockKey = "mantis-leader";
final DynamoDBMasterMonitor m = new DynamoDBMasterMonitor(
lockSupport.getLockClient(),
lockKey,
DynamoDBLockSupportRule.heartbeatDuration,
GRACEFUL
);
TestSubscriber<MasterDescription> testSubscriber = new TestSubscriber<>();
m.getMasterObservable().subscribe(testSubscriber);
m.start();

// Write Null
lockSupport.takeLock(lockKey, null);
await()
.atLeast(DynamoDBLockSupportRule.heartbeatDuration)
.pollDelay(DynamoDBLockSupportRule.heartbeatDuration)
.atMost(Duration.ofMillis(DynamoDBLockSupportRule.heartbeatDuration.toMillis()*2))
.untilAsserted(() -> assertEquals(DynamoDBMasterMonitor.MASTER_NULL, m.getLatestMaster()));
lockSupport.releaseLock(lockKey);

m.shutdown();

testSubscriber.assertNoTerminalEvent();
testSubscriber.assertNotCompleted();
testSubscriber.assertNoErrors();
Observable.from(testSubscriber.getOnNextEvents())
.forEach(Assert::assertNotNull);
}
}

0 comments on commit 3431246

Please sign in to comment.