Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

💥 [Breaking] Asyncify slot suppliers #2433

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import io.temporal.worker.MetricsType;
import io.temporal.worker.tuning.*;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -96,18 +98,19 @@ public ActivityTask poll() {
PollActivityTaskQueueResponse response;
SlotPermit permit;
boolean isSuccessful = false;

CompletableFuture<SlotPermit> future =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reserveSlot is a user defined function so I would except any use case to be under a try block so exceptions don't leek out and crash the worker or cause undesirable behaviour

slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
try {
permit =
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
permit = future.get();
} catch (InterruptedException e) {
future.cancel(true);
Thread.currentThread().interrupt();
return null;
} catch (Exception e) {
} catch (ExecutionException e) {
log.warn("Error while trying to reserve a slot for an activity", e.getCause());
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,22 @@ private void processQueue() {
QueuedLARequest request = null;
try {
request = requestQueue.take();

CompletableFuture<SlotPermit> future = slotSupplier.reserveSlot(request.data);
try {
slotPermit = slotSupplier.reserveSlot(request.data);
slotPermit = future.get();
} catch (InterruptedException e) {
future.cancel(true);
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
} catch (ExecutionException e) {
log.error(
"Error reserving local activity slot, dropped activity id {}",
request.task.getActivityId(),
e);
continue;
}

request.task.getExecutionContext().setPermit(slotPermit);
afterReservedCallback.apply(request.task);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.temporal.worker.MetricsType;
import io.temporal.worker.tuning.*;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -86,18 +88,19 @@ public NexusTask poll() {
PollNexusTaskQueueResponse response;
SlotPermit permit;
boolean isSuccessful = false;

CompletableFuture<SlotPermit> future =
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
try {
permit =
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
permit = future.get();
} catch (InterruptedException e) {
future.cancel(true);
Thread.currentThread().interrupt();
return null;
} catch (Exception e) {
} catch (ExecutionException e) {
log.warn("Error while trying to reserve a slot for a nexus task", e.getCause());
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -48,10 +49,10 @@ public TrackingSlotSupplier(SlotSupplier<SI> inner, Scope metricsScope) {
publishSlotsMetric();
}

public SlotPermit reserveSlot(SlotReservationData dat) throws InterruptedException {
SlotPermit p = inner.reserveSlot(createCtx(dat));
issuedSlots.incrementAndGet();
return p;
public CompletableFuture<SlotPermit> reserveSlot(SlotReservationData dat) {
CompletableFuture<SlotPermit> future = inner.reserveSlot(createCtx(dat));
future.thenAccept(permit -> issuedSlots.incrementAndGet());
return future;
}

public Optional<SlotPermit> tryReserveSlot(SlotReservationData dat) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.MetricsType;
import io.temporal.worker.tuning.*;
import io.temporal.worker.tuning.SlotPermit;
import io.temporal.worker.tuning.SlotReleaseReason;
import io.temporal.worker.tuning.WorkflowSlotInfo;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -123,17 +127,19 @@ public WorkflowPollTask(
public WorkflowTask poll() {
boolean isSuccessful = false;
SlotPermit permit;
CompletableFuture<SlotPermit> future =
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
try {
permit =
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
permit = future.get();
} catch (InterruptedException e) {
future.cancel(true);
Thread.currentThread().interrupt();
return null;
} catch (Exception e) {
} catch (ExecutionException e) {
log.warn("Error while trying to reserve a slot for workflow task", e.getCause());
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
package io.temporal.worker.tuning;

import com.google.common.base.Preconditions;
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

/**
* This implementation of {@link SlotSupplier} provides a fixed number of slots backed by a
Expand All @@ -32,18 +35,83 @@
*/
public class FixedSizeSlotSupplier<SI extends SlotInfo> implements SlotSupplier<SI> {
private final int numSlots;
private final Semaphore executorSlotsSemaphore;
private final AsyncSemaphore executorSlotsSemaphore;

/**
* A simple version of an async semaphore. Unfortunately there's not any readily available
* properly licensed library I could find for this which is a bit shocking, but this
* implementation should be suitable for our needs
*/
static class AsyncSemaphore {
private final ReentrantLock lock = new ReentrantLock();
private final Queue<CompletableFuture<Void>> waiters = new ArrayDeque<>();
private int permits;

AsyncSemaphore(int initialPermits) {
this.permits = initialPermits;
}

/**
* Acquire a permit asynchronously. If a permit is available, returns a completed future,
* otherwise returns a future that will be completed when a permit is released.
*/
public CompletableFuture<Void> acquire() {
lock.lock();
try {
if (permits > 0) {
permits--;
return CompletableFuture.completedFuture(null);
} else {
CompletableFuture<Void> waiter = new CompletableFuture<>();
waiters.add(waiter);
return waiter;
}
} finally {
lock.unlock();
}
}

public boolean tryAcquire() {
lock.lock();
try {
if (permits > 0) {
permits--;
return true;
}
return false;
} finally {
lock.unlock();
}
}

/**
* Release a permit. If there are waiting futures, completes the next one instead of
* incrementing the permit count.
*/
public void release() {
lock.lock();
try {
CompletableFuture<Void> waiter = waiters.poll();
if (waiter != null) {
waiter.complete(null);
} else {
permits++;
}
} finally {
lock.unlock();
}
}
}

public FixedSizeSlotSupplier(int numSlots) {
Preconditions.checkArgument(numSlots > 0, "FixedSizeSlotSupplier must have at least one slot");
this.numSlots = numSlots;
executorSlotsSemaphore = new Semaphore(numSlots);
executorSlotsSemaphore = new AsyncSemaphore(numSlots);
}

@Override
public SlotPermit reserveSlot(SlotReserveContext<SI> ctx) throws InterruptedException {
executorSlotsSemaphore.acquire();
return new SlotPermit();
public CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) {
return executorSlotsSemaphore.acquire().thenApply(ignored -> new SlotPermit());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.*;

/** Implements a {@link SlotSupplier} based on resource usage for a particular slot type. */
@Experimental
Expand All @@ -32,6 +33,18 @@ public class ResourceBasedSlotSupplier<SI extends SlotInfo> implements SlotSuppl
private final ResourceBasedController resourceController;
private final ResourceBasedSlotOptions options;
private Instant lastSlotIssuedAt = Instant.EPOCH;
// For slot reservations that are waiting to re-check resource usage
private static final ScheduledExecutorService scheduler =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should consider accepting this in the constructor. Can have a default, but we should allow people to control thread creation if they want.

Executors.newScheduledThreadPool(
// Two threads seem needed here, so that reading PID decisions doesn't interfere overly
// with firing off scheduled tasks or one another.
2,
r -> {
Thread t = new Thread(r);
t.setName("ResourceBasedSlotSupplier.scheduler");
t.setDaemon(true);
return t;
});

/**
* Construct a slot supplier for workflow tasks with the given resource controller and options.
Expand Down Expand Up @@ -139,29 +152,43 @@ private ResourceBasedSlotSupplier(
}

@Override
public SlotPermit reserveSlot(SlotReserveContext<SI> ctx) throws InterruptedException {
while (true) {
if (ctx.getNumIssuedSlots() < options.getMinimumSlots()) {
return new SlotPermit();
} else {
Duration mustWaitFor;
try {
mustWaitFor = options.getRampThrottle().minus(timeSinceLastSlotIssued());
} catch (ArithmeticException e) {
mustWaitFor = Duration.ZERO;
}
if (mustWaitFor.compareTo(Duration.ZERO) > 0) {
Thread.sleep(mustWaitFor.toMillis());
}

Optional<SlotPermit> permit = tryReserveSlot(ctx);
if (permit.isPresent()) {
return permit.get();
} else {
Thread.sleep(10);
}
}
public CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) {
if (ctx.getNumIssuedSlots() < options.getMinimumSlots()) {
return CompletableFuture.completedFuture(new SlotPermit());
}
return tryReserveSlot(ctx)
.map(CompletableFuture::completedFuture)
.orElseGet(() -> scheduleSlotAcquisition(ctx));
}

private CompletableFuture<SlotPermit> scheduleSlotAcquisition(SlotReserveContext<SI> ctx) {
Duration mustWaitFor;
try {
mustWaitFor = options.getRampThrottle().minus(timeSinceLastSlotIssued());
} catch (ArithmeticException e) {
mustWaitFor = Duration.ZERO;
}

CompletableFuture<Void> permitFuture;
if (mustWaitFor.compareTo(Duration.ZERO) > 0) {
permitFuture =
CompletableFuture.supplyAsync(() -> null, delayedExecutor(mustWaitFor.toMillis()));
} else {
permitFuture = CompletableFuture.completedFuture(null);
}

// After the delay, try to reserve the slot
return permitFuture.thenCompose(
ignored -> {
Optional<SlotPermit> permit = tryReserveSlot(ctx);
// If we couldn't get a slot this time, delay for a short period and try again
return permit
.map(CompletableFuture::completedFuture)
.orElseGet(
() ->
CompletableFuture.supplyAsync(() -> null, delayedExecutor(10))
.thenCompose(ig -> scheduleSlotAcquisition(ctx)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ChatGPT is telling me that cancellation does not propagate across orElseGet or thenCompose operators, but that doesn't seem right and I can't find this in the docs at https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html. May need a test somehow confirming cancel after this orElseGet does actually prevent the delayed call from being invoked. If it doesn't work, I can make some suggestions I think.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this works fine because the returned future is permitFuture which is thenComposed with the future from here. Added a test that confirms.

Copy link
Member

@cretz cretz Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is that ChatGPT says that a cancel of the outer future does not propagate the cancel to the thenComposed future. Not sure I believe it because I haven't tested, but it does concern me. I would have to test Java behavior of cancel when the thenCompose has already run to create a new future. I may set aside some time to do this.

But I fear reading https://stackoverflow.com/questions/25417881/canceling-a-completablefuture-chain and poking around, there may need to be some other mechanism to make sure the outer completable future cancel propagates to cancelling the delayed executor. I couldn't tell if the test was doing that. I wouldn't be surprised if cancel is not hierarchical.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Canceling a completable future is not a good way to tell the producer the result is no longer needed, it is to tell the consumer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 So I do not know of what a good way in Java to tell the implementer of something async that it is cancelled? I am guessing we use thread interruption for synchronous activity code today and disallow async/future-returning activity code? Any ideas here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the standard library would use Future for this like here https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledFuture.html. The other option is to return a special interface like SlotSupplierFuture interface that implements the CompletableFuture interface and another different "cancel" method that actually does what you want

});
}

@Override
Expand Down Expand Up @@ -190,4 +217,9 @@ public ResourceBasedController getResourceController() {
private Duration timeSinceLastSlotIssued() {
return Duration.between(lastSlotIssuedAt, Instant.now());
}

// Polyfill for Java 9 delayedExecutor
private static Executor delayedExecutor(long delay) {
return r -> scheduler.schedule(() -> scheduler.execute(r), delay, TimeUnit.MILLISECONDS);
}
}
Loading
Loading