Skip to content

Commit

Permalink
fix: throw reject when SingleThreadExecutor drainTo in progress and q…
Browse files Browse the repository at this point in the history
…ueue is empty
  • Loading branch information
nodece committed Aug 25, 2024
1 parent 7c41204 commit cd0eeb9
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.bookkeeper.common.util;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -29,6 +30,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -54,6 +56,11 @@ public class SingleThreadExecutor extends AbstractExecutorService implements Exe
private final LongAdder tasksRejected = new LongAdder();
private final LongAdder tasksFailed = new LongAdder();

private final int maxQueueCapacity;
private static final AtomicIntegerFieldUpdater<SingleThreadExecutor> waiterCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(SingleThreadExecutor.class, "waiterCount");
private volatile int waiterCount = 0;

enum State {
Running,
Shutdown,
Expand All @@ -80,6 +87,8 @@ public SingleThreadExecutor(ThreadFactory tf, int maxQueueCapacity, boolean reje
} else {
this.queue = new GrowableMpScArrayConsumerBlockingQueue<>();
}
this.maxQueueCapacity = maxQueueCapacity;

this.runner = tf.newThread(this);
this.state = State.Running;
this.rejectExecution = rejectExecution;
Expand Down Expand Up @@ -134,6 +143,9 @@ public void run() {

private boolean safeRunTask(Runnable r) {
try {
if (maxQueueCapacity > 0) {
waiterCountUpdater.decrementAndGet(this);
}
r.run();
tasksCompleted.increment();
} catch (Throwable t) {
Expand Down Expand Up @@ -162,7 +174,10 @@ public List<Runnable> shutdownNow() {
this.state = State.Shutdown;
this.runner.interrupt();
List<Runnable> remainingTasks = new ArrayList<>();
queue.drainTo(remainingTasks);
int n = queue.drainTo(remainingTasks);
if (maxQueueCapacity > 0) {
waiterCountUpdater.addAndGet(this, -n);
}
return remainingTasks;
}

Expand Down Expand Up @@ -204,6 +219,11 @@ public long getFailedTasksCount() {

@Override
public void execute(Runnable r) {
execute(r, null);
}

@VisibleForTesting
void execute(Runnable r, List<Runnable> runnableList) {
if (state != State.Running) {
throw new RejectedExecutionException("Executor is shutting down");
}
Expand All @@ -213,18 +233,30 @@ public void execute(Runnable r) {
queue.put(r);
tasksCount.increment();
} else {
if (queue.offer(r)) {
tasksCount.increment();
int delta = r != null ? 1 : runnableList.size();
validateQueueCapacity(delta);
if (r != null ? queue.offer(r) : queue.addAll(runnableList)) {
tasksCount.add(delta);
} else {
tasksRejected.increment();
throw new ExecutorRejectedException("Executor queue is full");
reject();
}
}
} catch (InterruptedException e) {
throw new RejectedExecutionException("Executor thread was interrupted", e);
}
}

private void validateQueueCapacity(int delta) {
if (maxQueueCapacity > 0 && waiterCountUpdater.addAndGet(this, delta) > maxQueueCapacity) {
reject();
}
}

private void reject() {
tasksRejected.increment();
throw new ExecutorRejectedException("Executor queue is full");
}

public void registerMetrics(StatsLogger statsLogger) {
// Register gauges
statsLogger.scopeLabel("thread", runner.getName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,61 @@ public void testRejectWhenQueueIsFull() throws Exception {
assertEquals(0, ste.getFailedTasksCount());
}

@Test
public void testRejectWhenDrainToInProgressAndQueueIsEmpty() throws Exception {
@Cleanup("shutdownNow")
SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY, 10, true);

CyclicBarrier barrier = new CyclicBarrier(10);
CountDownLatch startedLatch = new CountDownLatch(1);
List<Runnable> tasks = new ArrayList<>();

for (int i = 0; i < 10; i++) {
int n = i;
tasks.add(() -> {
if (n == 0) {
startedLatch.countDown();
} else {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
// ignore
}
}
});
}
ste.execute(null, tasks);

// Wait until the first task is done.
try {
startedLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

// Next task should go through, because the runner thread has already pulled out the first and second items
// from the queue.
List<Runnable> nextTasks = new ArrayList<>();
nextTasks.add(() -> {
});
nextTasks.add(() -> {
});
ste.execute(null, nextTasks);

// Now the queue is really full and should reject tasks
try {
ste.execute(() -> {
});
fail("should have rejected the task");
} catch (RejectedExecutionException e) {
// Expected
}

assertEquals(12, ste.getSubmittedTasksCount());
assertEquals(1, ste.getRejectedTasksCount());
assertEquals(0, ste.getFailedTasksCount());
}

@Test
public void testBlockWhenQueueIsFull() throws Exception {
@Cleanup("shutdown")
Expand Down

0 comments on commit cd0eeb9

Please sign in to comment.