Skip to content

Commit

Permalink
Reset the shared state even earlier when signaling completion in asyn…
Browse files Browse the repository at this point in the history
…c_rw_mutex

Reset the shared state before updating the head of the queue. Once the head of the queue is updated,
there's a small time window where continuations could be run inline, and resetting the shared state
in `done` could release the last reference to the shared state. Since we want to ensure that the
last reference is always released in a continuation, we move the resetting of the shared state to
happen before calling `done`. It's safe to do because if no continuations have been added, the
shared state is still kept alive by senders, and if continuations have been added, they'll also hold
references to the shared state.
  • Loading branch information
msimberg committed Jan 16, 2025
1 parent 5957ee8 commit 0af4fda
Showing 1 changed file with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,22 @@ namespace pika::execution::experimental {
{
if (next_state)
{
// We pass the ownership of the intrusive_ptr of the next state to the next
// state itself, so that it can choose when to release it. If we can avoid it,
// we don't want this shared state to to hold on to the reference longer than
// necessary.
// We are also not accessing this shared state directly anymore, so we reset
// the next_state before calling done to avoid continuations being triggered by
// this reference being the last reference (if done after swapping the head of
// the queue that happens in done). When resetting before the swap of the head
// of the queue, we also know this can't be the last reference since senders
// that reference the shared state can't be used without adding a continuation
// to the queue (a continuation will hold another reference to the shared
// state). Continuations can run inline, but that can only happen after the head
// of the queue has been swapped. In summary, there must be at least two
// references to the shared state at this point, so we can safely reset it early.
async_rw_mutex_shared_state_base* p = next_state.get();
p->done(std::move(next_state));

PIKA_ASSERT(next_state.use_count() > 1);
next_state.reset();

p->done();
}
}

Expand Down Expand Up @@ -180,7 +190,7 @@ namespace pika::execution::experimental {
current->continuation();
}

void done(shared_state_ptr_type p) noexcept
void done() noexcept
{
// `this` is not an async_rw_mutex_operation_state_base*, but is a known value to
// signal that the queue has been processed
Expand All @@ -190,10 +200,6 @@ namespace pika::execution::experimental {
// We have now successfully acquired the head of the queue, and signaled to other
// threads that they can't add any more items to the queue. We can now process the
// queue without further synchronization.
//
// We are also not accessing this shared state directly anymore, so we can
// reset p early.
p.reset();

// Because of the way operation states are linked together, they will be accessed in
// LIFO order (op_state_head points to the last operation state to be added, or
Expand Down Expand Up @@ -446,7 +452,7 @@ namespace pika::execution::experimental {
// value can be passed from the previous state to the next
// state.
if (PIKA_LIKELY(prev_state)) { prev_state->set_next_state(state); }
else { state->done(nullptr); }
else { state->done(); }
}

return {state};
Expand All @@ -462,7 +468,7 @@ namespace pika::execution::experimental {
// a previous state we set the next state so that the value can be
// passed from the previous state to the next state.
if (PIKA_LIKELY(prev_state)) { prev_state->set_next_state(state); }
else { state->done(nullptr); }
else { state->done(); }

return {state};
}
Expand Down Expand Up @@ -618,7 +624,7 @@ namespace pika::execution::experimental {

// Only the first access has no previous shared state.
if (PIKA_LIKELY(prev_state)) { prev_state->set_next_state(state); }
else { state->done(nullptr); }
else { state->done(); }
}

return {state};
Expand All @@ -634,7 +640,7 @@ namespace pika::execution::experimental {

// Only the first access has no previous shared state.
if (PIKA_LIKELY(prev_state)) { prev_state->set_next_state(state); }
else { state->done(nullptr); }
else { state->done(); }

return {state};
}
Expand Down

0 comments on commit 0af4fda

Please sign in to comment.