Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TE connection improvement #564

Merged
merged 1 commit into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ CompletableFuture<TaskExecutorID> getTaskExecutorFor(
*/
CompletableFuture<TaskExecutorGateway> getTaskExecutorGateway(TaskExecutorID taskExecutorID);

/**
 * Requests that the gateway connection to the given task executor be re-established.
 *
 * @param taskExecutorID identifier of the task executor whose gateway should be reconnected
 * @return a future that yields an {@code Ack} when the request is acknowledged; presumably it
 *     completes exceptionally when the request is rejected -- TODO confirm against implementations
 */
CompletableFuture<Ack> reconnectGateway(TaskExecutorID taskExecutorID);

CompletableFuture<TaskExecutorRegistration> getTaskExecutorInfo(String hostName);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.mantisrx.server.master.persistence.MantisJobStore;
import io.mantisrx.server.master.resourcecluster.ClusterID;
import io.mantisrx.server.master.resourcecluster.PagedActiveJobOverview;
import io.mantisrx.server.master.resourcecluster.ResourceCluster.ConnectionFailedException;
import io.mantisrx.server.master.resourcecluster.ResourceCluster.NoResourceAvailableException;
import io.mantisrx.server.master.resourcecluster.ResourceCluster.ResourceOverview;
import io.mantisrx.server.master.resourcecluster.ResourceCluster.TaskExecutorStatus;
Expand Down Expand Up @@ -209,7 +208,6 @@ public Receive createReceive() {
.match(ResourceOverviewRequest.class, this::onResourceOverviewRequest)
.match(TaskExecutorInfoRequest.class, this::onTaskExecutorInfoRequest)
.match(TaskExecutorGatewayRequest.class, this::onTaskExecutorGatewayRequest)
.match(TaskExecutorGatewayReconnectRequest.class, this::onTaskExecutorGatewayReconnectRequest)
.match(DisableTaskExecutorsRequest.class, this::onNewDisableTaskExecutorsRequest)
.match(CheckDisabledTaskExecutors.class, this::findAndMarkDisabledTaskExecutors)
.match(ExpireDisableTaskExecutorsRequest.class, this::onDisableTaskExecutorsRequestExpiry)
Expand Down Expand Up @@ -372,54 +370,6 @@ private void onTaskExecutorGatewayRequest(TaskExecutorGatewayRequest request) {
clusterID.getResourceID(),
"taskExecutor",
request.getTaskExecutorID().getResourceId())));
try {
// let's try one more time by reconnecting with the gateway.
sender().tell(state.reconnect().join(), self());
} catch (Exception e1) {
metrics.incrementCounter(
ResourceClusterActorMetrics.TE_RECONNECTION_FAILURE,
TagList.create(ImmutableMap.of(
"resourceCluster",
clusterID.getResourceID(),
"taskExecutor",
request.getTaskExecutorID().getResourceId())));
sender().tell(new Status.Failure(new ConnectionFailedException(e)), self());
}
}
}
}

/**
 * Handles a request to reconnect the gateway of a single task executor.
 *
 * <p>The sender receives exactly one reply:
 * <ul>
 *   <li>{@code Status.Failure(NullPointerException)} when no state is tracked for the executor;</li>
 *   <li>{@code Status.Failure(IllegalStateException)} when the executor is not registered;</li>
 *   <li>{@code Ack} once the reconnection has been started (the reply is sent without waiting
 *       for the reconnection future; a later failure of that future is only logged);</li>
 *   <li>{@code Status.Failure(ConnectionFailedException)} when starting the reconnection throws,
 *       in which case the TE_RECONNECTION_FAILURE counter is incremented as well.</li>
 * </ul>
 *
 * @param request identifies the task executor to reconnect
 */
private void onTaskExecutorGatewayReconnectRequest(TaskExecutorGatewayReconnectRequest request) {
    log.info("Requesting to reconnect to TaskExecutor: {}", request);

    final TaskExecutorState executorState = this.executorStateManager.get(request.getTaskExecutorID());
    if (executorState == null) {
        // Nothing is tracked for this executor: report it back to the asker and stop.
        sender().tell(
            new Status.Failure(new NullPointerException("Null TaskExecutor state: " + request.getTaskExecutorID())),
            self());
        return;
    }

    try {
        if (!executorState.isRegistered()) {
            sender().tell(
                new Status.Failure(
                    new IllegalStateException("Unregistered TaskExecutor: " + request.getTaskExecutorID())),
                self());
            return;
        }

        // Start the reconnection; its outcome is observed only for logging purposes.
        executorState.reconnect().whenComplete((ignoredGateway, failure) -> {
            if (failure != null) {
                log.error("failed to reconnect to {}", request.getTaskExecutorID(), failure);
            }
        });
        sender().tell(Ack.getInstance(), self());
    } catch (Exception e) {
        metrics.incrementCounter(
            ResourceClusterActorMetrics.TE_RECONNECTION_FAILURE,
            TagList.create(ImmutableMap.of(
                "resourceCluster",
                clusterID.getResourceID(),
                "taskExecutor",
                request.getTaskExecutorID().getResourceId())));
        sender().tell(new Status.Failure(new ConnectionFailedException(e)), self());
    }
}
Expand Down Expand Up @@ -921,13 +871,6 @@ static class TaskExecutorGatewayRequest {
ClusterID clusterID;
}

/**
 * Message asking the resource cluster actor to reconnect the gateway of one task executor.
 */
@Value
static class TaskExecutorGatewayReconnectRequest {
// Task executor whose tracked state is looked up and reconnected by the handler.
TaskExecutorID taskExecutorID;

// Cluster the request is addressed to; the manager actor uses it to pick the actor to forward to.
ClusterID clusterID;
}

@Value
static class GetRegisteredTaskExecutorsRequest implements HasAttributes {
ClusterID clusterID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.mantisrx.master.resourcecluster.ResourceClusterActor.RemoveJobArtifactsToCacheRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.ResourceOverviewRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayReconnectRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorInfoRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorsList;
Expand Down Expand Up @@ -212,17 +211,6 @@ public CompletableFuture<TaskExecutorGateway> getTaskExecutorGateway(
});
}

/**
 * Asks the resource cluster manager actor to reconnect the gateway of the given task executor.
 *
 * @param taskExecutorID identifier of the task executor to reconnect
 * @return a future holding the actor's reply cast to {@code Ack}; it fails if the ask fails or
 *     the reply is not an {@code Ack}
 */
@Override
public CompletableFuture<Ack> reconnectGateway(TaskExecutorID taskExecutorID) {
    final TaskExecutorGatewayReconnectRequest reconnectRequest =
        new TaskExecutorGatewayReconnectRequest(taskExecutorID, clusterID);

    return Patterns
        .ask(resourceClusterManagerActor, reconnectRequest, askTimeout)
        .thenApply(Ack.class::cast)
        .toCompletableFuture();
}

@Override
public CompletableFuture<TaskExecutorRegistration> getTaskExecutorInfo(String hostName) {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.mantisrx.master.resourcecluster.ResourceClusterActor.RemoveJobArtifactsToCacheRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.ResourceOverviewRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayReconnectRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorInfoRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterScalerActor.TriggerClusterRuleRefreshRequest;
Expand Down Expand Up @@ -144,8 +143,6 @@ public Receive createReceive() {
getRCActor(req.getClusterID()).forward(req, context()))
.match(TaskExecutorGatewayRequest.class, req ->
getRCActor(req.getClusterID()).forward(req, context()))
.match(TaskExecutorGatewayReconnectRequest.class, req ->
getRCActor(req.getClusterID()).forward(req, context()))
.match(DisableTaskExecutorsRequest.class, req ->
getRCActor(req.getClusterID()).forward(req, context()))
.match(AddNewJobArtifactsToCacheRequest.class, req ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,19 +252,24 @@ TaskExecutorRegistration getRegistration() {
}

/**
 * Returns the gateway future currently held for this task executor.
 *
 * @return the stored gateway future
 * @throws IllegalStateException if there is no registration, the state is
 *     {@code RegistrationState.Unregistered}, or no gateway future has been stored
 */
protected CompletableFuture<TaskExecutorGateway> getGatewayAsync() {
// A gateway is only meaningful for a registered task executor.
if (this.registration == null || this.state == RegistrationState.Unregistered) {
throw new IllegalStateException("TE is unregistered");
}

if (this.gateway == null) {
throw new IllegalStateException("gateway is null");
}
return this.gateway;
}

/**
 * Opens a connection to the registered task executor address and stores the resulting future
 * in {@code gateway}; connection failures are logged.
 *
 * NOTE(review): this span comes from a diff view and appears to interleave the pre-change and
 * post-change lines (two consecutive connect calls) -- confirm against the actual file.
 *
 * @return the gateway future stored by this call
 */
protected CompletableFuture<TaskExecutorGateway> reconnect() {
this.gateway = rpcService.connect(registration.getTaskExecutorAddress(), TaskExecutorGateway.class)
.whenComplete((gateway, throwable) -> {
if (throwable != null) {
log.error("Failed to connect to the gateway", throwable);
}
});
// Only a future that has already completed exceptionally triggers a second connection attempt.
if (this.gateway.isCompletedExceptionally()) {
log.warn("gateway connection encountered error, reconnect: {}.", registration.getTaskExecutorAddress());
this.gateway = rpcService.connect(registration.getTaskExecutorAddress(), TaskExecutorGateway.class)
.whenComplete((gateway, throwable) -> {
if (throwable != null) {
log.error("Failed to connect to the gateway", throwable);
}
});
}

return this.gateway;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,12 @@ private void onAssignedScheduleRequestEvent(AssignedScheduleRequestEvent event)
event.getScheduleRequestEvent(),
event.getTaskExecutorID()))
.exceptionally(
throwable -> new FailedToSubmitScheduleRequestEvent(
event.getScheduleRequestEvent(),
event.getTaskExecutorID(), throwable))
throwable ->
new FailedToSubmitScheduleRequestEvent(
event.getScheduleRequestEvent(),
event.getTaskExecutorID(),
ExceptionUtils.stripCompletionException(throwable))
)
.whenCompleteAsync((res, err) ->
{
if (err == null) {
Expand All @@ -177,9 +180,15 @@ private void onAssignedScheduleRequestEvent(AssignedScheduleRequestEvent event)

)
.exceptionally(
throwable -> new FailedToSubmitScheduleRequestEvent(
event.getScheduleRequestEvent(),
event.getTaskExecutorID(), throwable));
// Note: throwable is the wrapped completable error (inside is akka rpc actor selection
// error).
// On this error, we want to:
// 1) trigger rpc service reconnection (to fix the missing action).
// 2) re-schedule worker node with delay (to avoid a fast loop to exhaust idle TE pool).
throwable ->
event.getScheduleRequestEvent()
.onFailure(ExceptionUtils.stripCompletionException(throwable))
);
pipe(ackFuture, getContext().getDispatcher()).to(self());
}
} catch (Exception e) {
Expand All @@ -197,11 +206,17 @@ private void onFailedScheduleRequestEvent(FailedToScheduleRequestEvent event) {
if (event.getAttempt() >= this.maxScheduleRetries) {
log.error("Failed to submit the request {} because of ", event.getScheduleRequestEvent(), event.getThrowable());
} else {
log.error("Failed to submit the request {}; Retrying in {} because of ", event.getScheduleRequestEvent(), intervalBetweenRetries, event.getThrowable());
// honor the readyAt attribute from schedule request's rate limiter.
Duration timeout = Duration.ofMillis(
Math.max(
event.getScheduleRequestEvent().getRequest().getReadyAt() - Instant.now().toEpochMilli(),
intervalBetweenRetries.toMillis()));
log.error("Failed to submit the request {}; Retrying in {} because of ",
event.getScheduleRequestEvent(), timeout, event.getThrowable());
getTimers().startSingleTimer(
getSchedulingQueueKeyFor(event.getScheduleRequestEvent().getRequest().getWorkerId()),
event.onRetry(),
intervalBetweenRetries);
timeout);
}
}

Expand Down Expand Up @@ -240,23 +255,6 @@ private void onFailedToSubmitScheduleRequestEvent(FailedToSubmitScheduleRequestE
event.getScheduleRequestEvent().getRequest().getWorkerId(),
event.getScheduleRequestEvent().getRequest().getStageNum(),
Throwables.getStackTraceAsString(event.throwable)));

try {
resourceCluster.reconnectGateway(event.getTaskExecutorID())
.whenComplete((res, throwable) -> {
if (throwable != null) {
log.error("Failed to request reconnect to gateway for {}", event.getTaskExecutorID(), throwable);
}
else {
log.debug("Acked from reconnection request for {}", event.getTaskExecutorID());
}
});
} catch (Exception e) {
log.warn(
"Failed to establish re-connection with the task executor {} on failed schedule request",
event.getTaskExecutorID(), e);
connectionFailures.increment();
}
}

private void onCancelRequestEvent(CancelRequestEvent event) {
Expand Down
Loading