Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

💥 [Breaking] Asyncify slot suppliers #2433

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import io.temporal.worker.MetricsType;
import io.temporal.worker.tuning.*;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -96,18 +98,19 @@ public ActivityTask poll() {
PollActivityTaskQueueResponse response;
SlotPermit permit;
boolean isSuccessful = false;

CompletableFuture<SlotPermit> future =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reserveSlot is a user-defined function, so I would expect every call to it to be inside a try block so that exceptions don't leak out and crash the worker or cause undesirable behaviour.

slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
try {
permit =
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
permit = future.get();
} catch (InterruptedException e) {
future.cancel(true);
Thread.currentThread().interrupt();
return null;
} catch (Exception e) {
} catch (ExecutionException e) {
log.warn("Error while trying to reserve a slot for an activity", e.getCause());
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,22 @@ private void processQueue() {
QueuedLARequest request = null;
try {
request = requestQueue.take();

CompletableFuture<SlotPermit> future = slotSupplier.reserveSlot(request.data);
try {
slotPermit = slotSupplier.reserveSlot(request.data);
slotPermit = future.get();
} catch (InterruptedException e) {
future.cancel(true);
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
} catch (ExecutionException e) {
log.error(
"Error reserving local activity slot, dropped activity id {}",
request.task.getActivityId(),
e);
continue;
}

request.task.getExecutionContext().setPermit(slotPermit);
afterReservedCallback.apply(request.task);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.temporal.worker.MetricsType;
import io.temporal.worker.tuning.*;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -86,18 +88,19 @@ public NexusTask poll() {
PollNexusTaskQueueResponse response;
SlotPermit permit;
boolean isSuccessful = false;

CompletableFuture<SlotPermit> future =
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
try {
permit =
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
permit = future.get();
} catch (InterruptedException e) {
future.cancel(true);
Thread.currentThread().interrupt();
return null;
} catch (Exception e) {
} catch (ExecutionException e) {
log.warn("Error while trying to reserve a slot for a nexus task", e.getCause());
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -48,10 +49,15 @@ public TrackingSlotSupplier(SlotSupplier<SI> inner, Scope metricsScope) {
publishSlotsMetric();
}

public SlotPermit reserveSlot(SlotReservationData dat) throws InterruptedException {
SlotPermit p = inner.reserveSlot(createCtx(dat));
issuedSlots.incrementAndGet();
return p;
/**
 * Asks the wrapped supplier for a slot and counts the permit as issued once the returned future
 * completes successfully.
 *
 * <p>Failures raised synchronously by the wrapped supplier are reported through the returned
 * future rather than thrown, so callers that wait on the future observe them as the cause of an
 * {@link java.util.concurrent.ExecutionException} instead of an exception escaping this call.
 *
 * @param dat reservation data used to build the context passed to the wrapped supplier
 * @return the wrapped supplier's future, or an exceptionally completed future if the wrapped
 *     supplier threw or returned {@code null}
 */
public CompletableFuture<SlotPermit> reserveSlot(SlotReservationData dat) {
  CompletableFuture<SlotPermit> future;
  try {
    future = inner.reserveSlot(createCtx(dat));
    if (future == null) {
      // Handled by the catch below so a misbehaving supplier fails the reservation, not the caller.
      throw new NullPointerException("SlotSupplier.reserveSlot returned a null future");
    }
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // Keep the interrupt visible to the calling thread; it is not rethrown from here.
      Thread.currentThread().interrupt();
    }
    // Surface the original exception as the future's failure cause instead of throwing.
    CompletableFuture<SlotPermit> failed = new CompletableFuture<>();
    failed.completeExceptionally(e);
    return failed;
  }
  // Only successful completions count as issued; thenAccept is skipped on failure or cancel.
  future.thenAccept(permit -> issuedSlots.incrementAndGet());
  return future;
}

public Optional<SlotPermit> tryReserveSlot(SlotReservationData dat) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.MetricsType;
import io.temporal.worker.tuning.*;
import io.temporal.worker.tuning.SlotPermit;
import io.temporal.worker.tuning.SlotReleaseReason;
import io.temporal.worker.tuning.WorkflowSlotInfo;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -123,17 +127,19 @@ public WorkflowPollTask(
public WorkflowTask poll() {
boolean isSuccessful = false;
SlotPermit permit;
CompletableFuture<SlotPermit> future =
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
try {
permit =
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
permit = future.get();
} catch (InterruptedException e) {
future.cancel(true);
Thread.currentThread().interrupt();
return null;
} catch (Exception e) {
} catch (ExecutionException e) {
log.warn("Error while trying to reserve a slot for workflow task", e.getCause());
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
package io.temporal.worker.tuning;

import com.google.common.base.Preconditions;
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

/**
* This implementation of {@link SlotSupplier} provides a fixed number of slots backed by a
Expand All @@ -32,18 +35,83 @@
*/
public class FixedSizeSlotSupplier<SI extends SlotInfo> implements SlotSupplier<SI> {
private final int numSlots;
private final Semaphore executorSlotsSemaphore;
private final AsyncSemaphore executorSlotsSemaphore;

/**
 * A minimal asynchronous counting semaphore.
 *
 * <p>{@link #acquire()} never blocks: it returns a future that is already complete when a permit
 * is available, or one that is completed by a later {@link #release()}. Waiters are served in
 * the order they called {@link #acquire()}.
 *
 * <p>A waiter whose future was cancelled (or otherwise completed by someone else) before a permit
 * reached it is skipped by {@link #release()}, so the permit goes to the next waiter or back to
 * the pool instead of being lost.
 */
static class AsyncSemaphore {
  private final ReentrantLock lock = new ReentrantLock();
  private final Queue<CompletableFuture<Void>> waiters = new ArrayDeque<>();
  private int permits;

  /** @param initialPermits number of permits immediately available */
  AsyncSemaphore(int initialPermits) {
    this.permits = initialPermits;
  }

  /**
   * Acquires a permit asynchronously.
   *
   * @return a completed future if a permit was available, otherwise a pending future that is
   *     completed when a permit is released to this waiter
   */
  public CompletableFuture<Void> acquire() {
    lock.lock();
    try {
      if (permits > 0) {
        permits--;
        return CompletableFuture.completedFuture(null);
      }
      CompletableFuture<Void> waiter = new CompletableFuture<>();
      waiters.add(waiter);
      return waiter;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Takes a permit only if one is available right now.
   *
   * @return {@code true} if a permit was taken, {@code false} otherwise
   */
  public boolean tryAcquire() {
    lock.lock();
    try {
      if (permits > 0) {
        permits--;
        return true;
      }
      return false;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Releases a permit. The permit is handed to the oldest waiter that is still pending; if there
   * is no such waiter the available permit count is incremented.
   */
  public void release() {
    while (true) {
      CompletableFuture<Void> waiter;
      lock.lock();
      try {
        waiter = waiters.poll();
        if (waiter == null) {
          permits++;
          return;
        }
      } finally {
        lock.unlock();
      }
      // Complete outside the lock: completion runs the future's dependent callbacks
      // synchronously, and those must not execute while the semaphore lock is held.
      if (waiter.complete(null)) {
        return;
      }
      // complete() returned false: the waiter was already cancelled or completed, so it never
      // received the permit. Loop and offer it to the next waiter.
    }
  }
}

public FixedSizeSlotSupplier(int numSlots) {
Preconditions.checkArgument(numSlots > 0, "FixedSizeSlotSupplier must have at least one slot");
this.numSlots = numSlots;
executorSlotsSemaphore = new Semaphore(numSlots);
executorSlotsSemaphore = new AsyncSemaphore(numSlots);
}

@Override
public SlotPermit reserveSlot(SlotReserveContext<SI> ctx) throws InterruptedException {
executorSlotsSemaphore.acquire();
return new SlotPermit();
public CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception {
return executorSlotsSemaphore.acquire().thenApply(ignored -> new SlotPermit());
}

@Override
Expand Down
Loading
Loading