Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

💥 [Breaking] Asyncify slot suppliers #2433

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

Sushisource
Copy link
Member

What was changed

Made the reserveSlot method on SlotSupplier interface async.

Note that, as it stands, this can actually slightly increase the number of threads used because the slot suppliers have been changed in a way that they will not block the caller (which is the whole point) but in order to do that, the resource based one at least needs a couple threads that it didn't before.

For the fixed size supplier, I think I've managed to come up with something that should always be non-blocking.

Why?

This is to support #1456 where the pollers themselves will be made async, and thus will be able to take advantage of async slot reservation as well.

Checklist

  1. Closes

  2. How was this tested:
    Existing tests

  3. Any docs updates needed?

@Sushisource Sushisource requested a review from a team as a code owner February 28, 2025 03:03
Comment on lines 45 to 46
* your implementation. You may want to catch it to perform any necessary cleanup, and then you
* should rethrow the exception.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and then you should rethrow the exception

They can't rethrow without wrapping because the checked exception was removed from this interface. We should add back throws InterruptedException, though could consider throws Exception as a way of saying we have code to handle any exception.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added Exception

@@ -32,6 +33,18 @@ public class ResourceBasedSlotSupplier<SI extends SlotInfo> implements SlotSuppl
private final ResourceBasedController resourceController;
private final ResourceBasedSlotOptions options;
private Instant lastSlotIssuedAt = Instant.EPOCH;
// For slot reservations that are waiting to re-check resource usage
private static final ScheduledExecutorService scheduler =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should consider accepting this in the constructor. Can have a default, but we should allow people to control thread creation if they want.

.orElseGet(
() ->
CompletableFuture.supplyAsync(() -> null, delayedExecutor(10))
.thenCompose(ig -> scheduleSlotAcquisition(ctx)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ChatGPT is telling me that cancellation does not propagate across orElseGet or thenCompose operators, but that doesn't seem right and I can't find this in the docs at https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html. May need a test somehow confirming cancel after this orElseGet does actually prevent the delayed call from being invoked. If it doesn't work, I can make some suggestions I think.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this works fine because the returned future is permitFuture which is thenComposed with the future from here. Added a test that confirms.

Copy link
Member

@cretz cretz Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is that ChatGPT says that a cancel of the outer future does not propagate the cancel to the thenComposed future. Not sure I believe it because I haven't tested, but it does concern me. I would have to test Java behavior of cancel when the thenCompose has already run to create a new future. I may set aside some time to do this.

But I fear reading https://stackoverflow.com/questions/25417881/canceling-a-completablefuture-chain and poking around, there may need to be some other mechanism to make sure the outer completable future cancel propagates to cancelling the delayed executor. I couldn't tell if the test was doing that. I wouldn't be surprised if cancel is not hierarchical.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Canceling a completable future is not a good way to tell the producer the result is no longer needed, it is to tell the consumer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 So I do not know of what a good way in Java to tell the implementer of something async that it is cancelled? I am guessing we use thread interruption for synchronous activity code today and disallow async/future-returning activity code? Any ideas here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the standard library would use Future for this like here https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledFuture.html. The other option is to return a special interface like SlotSupplierFuture interface that implements the CompletableFuture interface and another different "cancel" method that actually does what you want

*/
SlotPermit reserveSlot(SlotReserveContext<SI> ctx) throws InterruptedException;
CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception;
Copy link
Member

@cretz cretz Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception;
Future<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception;

I wonder if this is more flexible. That some implementations happen to use CompletableFuture doesn't affect us right? Or do we need CompletableFuture utilities besides tests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like it's less flexible. You can't compose raw Futures or really do much of anything with them besides get them

Copy link
Member

@cretz cretz Mar 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean more flexible for users. Obviously it is less flexible for the caller (us) of these to get a more abstract type, but if a user only has a Future, forcing a CompletableFuture on them is less flexible. But it's not a big deal if we need this response as a completable future.

* This function is called before polling for new tasks. Your implementation should return a
* Promise that is completed with a {@link SlotPermit} when one becomes available.
*
* <p>These futures may be cancelled if the worker is shutting down or otherwise abandons the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These futures may be cancelled if the worker is shutting down or otherwise abandons the

You are aware CompletableFuture cancellation does not propagate upstream correct?

@@ -96,18 +98,19 @@ public ActivityTask poll() {
PollActivityTaskQueueResponse response;
SlotPermit permit;
boolean isSuccessful = false;

CompletableFuture<SlotPermit> future =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reserveSlot is a user defined function so I would except any use case to be under a try block so exceptions don't leek out and crash the worker or cause undesirable behaviour

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants