Skip to content

Commit

Permalink
DynamoDBMasterMonitor: Don't publish MASTER_NULL on Dynamo failure (#696)
Browse files Browse the repository at this point in the history
  • Loading branch information
crioux-stripe authored Jul 30, 2024
1 parent a61cf1d commit d265bfb
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 11 deletions.
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
org.gradle.parallel=true
org.gradle.caching=false
org.gradle.configureondemand=true
org.gradle.jvmargs=-Xmx1G "-XX:MaxMetaspaceSize=384m"
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
import com.amazonaws.services.dynamodbv2.LockItem;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.server.core.BaseService;
import io.mantisrx.server.core.json.DefaultObjectMapper;
import io.mantisrx.server.core.master.MasterDescription;
Expand Down Expand Up @@ -68,28 +71,44 @@ public class DynamoDBMasterMonitor extends BaseService implements MasterMonitor

private final ObjectMapper jsonMapper = DefaultObjectMapper.getInstance();

private final Metrics metrics;

private final Counter noLockPresentCounter;
private final Counter lockDecodeFailedCounter;
private final Counter nullNextLeaderCounter;

/**
* Creates a MasterMonitor backed by DynamoDB. This should be used if you are using a {@link DynamoDBLeaderElector}
*/
public DynamoDBMasterMonitor() {
masterSubject = BehaviorSubject.create(MASTER_NULL);
final DynamoDBConfig conf = DynamoDBClientSingleton.getDynamoDBConf();
pollInterval = Duration.parse(conf.getDynamoDBLeaderHeartbeatDuration());
gracefulShutdown = Duration.parse(conf.getDynamoDBMonitorGracefulShutdownDuration());
lockClient = DynamoDBClientSingleton.getLockClient();
partitionKey = DynamoDBClientSingleton.getPartitionKey();
this(DynamoDBClientSingleton.getLockClient(),
DynamoDBClientSingleton.getPartitionKey(),
Duration.parse(DynamoDBClientSingleton.getDynamoDBConf().getDynamoDBLeaderHeartbeatDuration()),
Duration.parse(DynamoDBClientSingleton.getDynamoDBConf().getDynamoDBMonitorGracefulShutdownDuration()));
}

public DynamoDBMasterMonitor(
AmazonDynamoDBLockClient lockClient,
String partitionKey,
Duration pollInterval,
Duration gracefulShutdown) {
masterSubject = BehaviorSubject.create();
masterSubject = BehaviorSubject.create(MASTER_NULL);
this.lockClient = lockClient;
this.partitionKey = partitionKey;
this.pollInterval = pollInterval;
this.gracefulShutdown = gracefulShutdown;

Metrics m = new Metrics.Builder()
.id("DynamoDBMasterMonitor")
.addCounter("no_lock_present")
.addCounter("lock_decode_failed")
.addCounter("null_next_leader")
.build();
this.metrics = MetricsRegistry.getInstance().registerAndGet(m);

this.noLockPresentCounter = metrics.getCounter("no_lock_present");
this.lockDecodeFailedCounter = metrics.getCounter("lock_decode_failed");
this.nullNextLeaderCounter = metrics.getCounter("null_next_leader");
}

@Override
Expand Down Expand Up @@ -128,11 +147,19 @@ private void getCurrentLeader() {
if (optionalLock.isPresent()) {
final LockItem lock = optionalLock.get();
nextDescription = lock.getData().map(this::bytesToMaster).orElse(null);
logger.warn("failed to decode leader bytes");
this.lockDecodeFailedCounter.increment();
} else {
nextDescription = null;
logger.warn("no leader found");
this.noLockPresentCounter.increment();
}

if (nextDescription != null) {
updateLeader(nextDescription);
} else {
this.nullNextLeaderCounter.increment();
}
updateLeader(nextDescription);
}

private void updateLeader(@Nullable MasterDescription nextDescription) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.mantisrx.extensions.dynamodb;

import static io.mantisrx.extensions.dynamodb.DynamoDBMasterMonitor.MASTER_NULL;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.*;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -78,7 +79,7 @@ public void getCurrentLeader() throws JsonProcessingException, InterruptedExcept
TestSubscriber<MasterDescription> testSubscriber = new TestSubscriber<>();
m.getMasterObservable().subscribe(testSubscriber);
m.start();
assertEquals(m.getLatestMaster(), DynamoDBMasterMonitor.MASTER_NULL);
assertEquals(MASTER_NULL, m.getLatestMaster());
lockSupport.takeLock(lockKey, otherMaster);
await()
.atLeast(DynamoDBLockSupportRule.heartbeatDuration)
Expand All @@ -92,7 +93,7 @@ public void getCurrentLeader() throws JsonProcessingException, InterruptedExcept
.pollDelay(DynamoDBLockSupportRule.heartbeatDuration)
.atMost(Duration.ofMillis(DynamoDBLockSupportRule.heartbeatDuration.toMillis()*2))
.untilAsserted(() -> assertEquals(m.getLatestMaster(), thatMaster));
testSubscriber.assertValues(otherMaster, thatMaster);
testSubscriber.assertValues(MASTER_NULL, otherMaster, thatMaster);
m.shutdown();
}

Expand Down Expand Up @@ -133,7 +134,7 @@ public void monitorDoesNotReturnNull() throws IOException, InterruptedException
.atLeast(DynamoDBLockSupportRule.heartbeatDuration)
.pollDelay(DynamoDBLockSupportRule.heartbeatDuration)
.atMost(Duration.ofMillis(DynamoDBLockSupportRule.heartbeatDuration.toMillis()*2))
.untilAsserted(() -> assertEquals(DynamoDBMasterMonitor.MASTER_NULL, m.getLatestMaster()));
.untilAsserted(() -> assertEquals(MASTER_NULL, m.getLatestMaster()));
lockSupport.releaseLock(lockKey);

m.shutdown();
Expand Down

0 comments on commit d265bfb

Please sign in to comment.