Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Andyz26 committed Nov 15, 2023
1 parent 65fe0bd commit 165a529
Showing 1 changed file with 12 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ class ResourceManagerGatewayCxn extends ExponentialBackoffAbstractScheduledServi
@Setter
private volatile ResourceClusterGateway gateway;

private final LongDynamicProperty heartBeatIntervalDp;
private final LongDynamicProperty heartBeatTimeoutDp;
private final LongDynamicProperty heartBeatIntervalInMsDp;
private final LongDynamicProperty heartBeatTimeoutInMsDp;

private final long registrationRetryInitialDelayMillis;
private final double registrationRetryMultiplier;
Expand All @@ -76,8 +76,8 @@ class ResourceManagerGatewayCxn extends ExponentialBackoffAbstractScheduledServi
int idx,
TaskExecutorRegistration taskExecutorRegistration,
ResourceClusterGateway gateway,
LongDynamicProperty heartBeatIntervalDp,
LongDynamicProperty heartBeatTimeoutDp,
LongDynamicProperty heartBeatIntervalInMsDp,
LongDynamicProperty heartBeatTimeoutInMsDp,
TaskExecutor taskExecutor,
int tolerableConsecutiveHeartbeatFailures,
long heartbeatRetryInitialDelayMillis,
Expand All @@ -92,8 +92,8 @@ class ResourceManagerGatewayCxn extends ExponentialBackoffAbstractScheduledServi
this.idx = idx;
this.taskExecutorRegistration = taskExecutorRegistration;
this.gateway = gateway;
this.heartBeatIntervalDp = heartBeatIntervalDp;
this.heartBeatTimeoutDp = heartBeatTimeoutDp;
this.heartBeatIntervalInMsDp = heartBeatIntervalInMsDp;
this.heartBeatTimeoutInMsDp = heartBeatTimeoutInMsDp;
this.taskExecutor = taskExecutor;
this.registrationRetryInitialDelayMillis = registrationRetryInitialDelayMillis;
this.registrationRetryMultiplier = registrationRetryMultiplier;
Expand Down Expand Up @@ -131,7 +131,7 @@ protected Scheduler scheduler() {
protected Schedule getNextSchedule() {
// no delay on first run.
return new Schedule(
hasRan ? ResourceManagerGatewayCxn.this.heartBeatIntervalDp.getValue() : 0,
hasRan ? ResourceManagerGatewayCxn.this.heartBeatIntervalInMsDp.getValue() : 0,
TimeUnit.MILLISECONDS);
}
};
Expand Down Expand Up @@ -168,7 +168,7 @@ private Ack registerTaskExecutor() throws ExecutionException, InterruptedExcepti
taskExecutorRegistrationCounter.increment();
return gateway
.registerTaskExecutor(taskExecutorRegistration)
.get(heartBeatTimeoutDp.getValue(), TimeUnit.MILLISECONDS);
.get(heartBeatTimeoutInMsDp.getValue(), TimeUnit.MILLISECONDS);
}

private void disconnectTaskExecutor() throws ExecutionException, InterruptedException, TimeoutException {
Expand All @@ -177,7 +177,7 @@ private void disconnectTaskExecutor() throws ExecutionException, InterruptedExce
gateway.disconnectTaskExecutor(
new TaskExecutorDisconnection(taskExecutorRegistration.getTaskExecutorID(),
taskExecutorRegistration.getClusterID()))
.get(2 * heartBeatTimeoutDp.getValue(), TimeUnit.MILLISECONDS);
.get(2 * heartBeatTimeoutInMsDp.getValue(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.error("Disconnection has failed", e);
taskExecutorDisconnectionFailureCounter.increment();
Expand All @@ -187,9 +187,7 @@ private void disconnectTaskExecutor() throws ExecutionException, InterruptedExce

@Override
protected void runIteration() throws Exception {
if (!hasRan) {
hasRan = true;
}
hasRan = true;

if (!registered && !alreadyRegistered.getState()) {
log.info("Trying to register with resource manager {}", gateway);
Expand All @@ -204,13 +202,13 @@ protected void runIteration() throws Exception {
}

try {
log.debug("Start TE HB with timeout: {}", heartBeatTimeoutDp.getValue());
log.debug("Start TE HB with timeout: {}", heartBeatTimeoutInMsDp.getValue());
taskExecutor.getCurrentReport()
.thenComposeAsync(report -> {
log.debug("Sending heartbeat to resource manager {} with report {}", gateway, report);
return gateway.heartBeatFromTaskExecutor(new TaskExecutorHeartbeat(taskExecutorRegistration.getTaskExecutorID(), taskExecutorRegistration.getClusterID(), report));
})
.get(heartBeatTimeoutDp.getValue(), TimeUnit.MILLISECONDS);
.get(heartBeatTimeoutInMsDp.getValue(), TimeUnit.MILLISECONDS);

// the heartbeat was successful, let's reset the counter and set the registered flag
registered = true;
Expand Down

0 comments on commit 165a529

Please sign in to comment.