Skip to content

Commit

Permalink
Added Semaphore to limit concurrent task execution
Browse files Browse the repository at this point in the history
  • Loading branch information
mz1999 committed Oct 31, 2023
1 parent ea8aa6a commit 06a32b9
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 35 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ jobs:

strategy:
matrix:
java_version: [ 21-ea ]
java_version: [ 21 ]

steps:
- name: Checkout for build
uses: actions/checkout@v3
- name: Set up JDK
uses: actions/setup-java@v3
with:
distribution: 'zulu'
distribution: 'temurin'
java-version: ${{ matrix.java_version }}
- name: Maven Build
run: |
Expand All @@ -48,15 +48,15 @@ jobs:

strategy:
matrix:
java_version: [ 21-ea ]
java_version: [ 21 ]

steps:
- name: Checkout for build
uses: actions/checkout@v3
- name: Set up JDK
uses: actions/setup-java@v3
with:
distribution: 'zulu'
distribution: 'temurin'
java-version: ${{ matrix.java_version }}
- name: Maven Build
run: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
Expand All @@ -21,9 +23,10 @@ public class VirtualThreadExecutorService extends AbstractExecutorService implem
private static final Logger logger = Grizzly.logger(VirtualThreadExecutorService.class);

private final ExecutorService internalExecutorService;
private Semaphore poolSemaphore;

public static VirtualThreadExecutorService createInstance() {
return createInstance(ThreadPoolConfig.defaultConfig().setPoolName("Grizzly-virt-"));
return createInstance(ThreadPoolConfig.defaultConfig().setMaxPoolSize(-1).setPoolName("Grizzly-virt-"));
}

public static VirtualThreadExecutorService createInstance(ThreadPoolConfig cfg) {
Expand All @@ -33,28 +36,30 @@ public static VirtualThreadExecutorService createInstance(ThreadPoolConfig cfg)

protected VirtualThreadExecutorService(ThreadPoolConfig cfg) {
internalExecutorService = Executors.newThreadPerTaskExecutor(getThreadFactory(cfg));
if (cfg.getMaxPoolSize() > 0) {
poolSemaphore = new Semaphore(cfg.getMaxPoolSize());
} else {
poolSemaphore = new Semaphore(Integer.MAX_VALUE);
}
}

private ThreadFactory getThreadFactory(ThreadPoolConfig threadPoolConfig) {

var prefix = threadPoolConfig.getPoolName() + "-";

// virtual threads factory
final ThreadFactory factory = Thread.ofVirtual()
.name(prefix, 0L)
.uncaughtExceptionHandler(this)
.factory();

return new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = factory.newThread(r);
final ClassLoader initial = threadPoolConfig.getInitialClassLoader();
if (initial != null) {
thread.setContextClassLoader(initial);
}
return thread;
return r -> {
Thread thread = factory.newThread(r);
final ClassLoader initial = threadPoolConfig.getInitialClassLoader();
if (initial != null) {
thread.setContextClassLoader(initial);
}
return thread;
};
}

Expand Down Expand Up @@ -85,7 +90,17 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE

@Override
public void execute(Runnable command) {
internalExecutorService.execute(command);
if (poolSemaphore.tryAcquire()) {
internalExecutorService.execute(() -> {
try {
command.run();
} finally {
poolSemaphore.release();
}
});
} else {
throw new RejectedExecutionException("Too Many Concurrent Requests");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package org.glassfish.grizzly;

import org.glassfish.grizzly.threadpool.ThreadPoolConfig;
import org.glassfish.grizzly.threadpool.VirtualThreadExecutorService;
import org.junit.Assert;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class VirtualThreadExecutorServiceTest extends GrizzlyTestCase {

Expand All @@ -17,36 +22,51 @@ public void testCreateInstance() throws Exception {
public void testAwaitTermination() throws Exception {
VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance();
final int tasks = 2000;
runTasks(r, tasks);
doTest(r, tasks);
r.shutdown();
assertTrue(r.awaitTermination(10, TimeUnit.SECONDS));
assertTrue(r.isTerminated());
}

private void doTest(VirtualThreadExecutorService r, int tasks) throws Exception {
final CountDownLatch cl = new CountDownLatch(tasks);
while (tasks-- > 0) {
r.execute(new Runnable() {
@Override
public void run() {
cl.countDown();
public void testQueueLimit() throws Exception {
int poolSize = 10;
ThreadPoolConfig config = ThreadPoolConfig.defaultConfig().setMaxPoolSize(poolSize);
VirtualThreadExecutorService r = VirtualThreadExecutorService.createInstance(config);

CyclicBarrier start = new CyclicBarrier(poolSize + 1);
CyclicBarrier hold = new CyclicBarrier(poolSize + 1);
AtomicInteger result = new AtomicInteger();
for (int i = 0; i < poolSize; i++) {
int taskId = i;
r.execute(() -> {
try {
System.out.println("task " + taskId + " is running");
start.await();
hold.await();
result.getAndIncrement();
} catch (Exception e) {
}
});
}
assertTrue("latch timed out", cl.await(30, TimeUnit.SECONDS));
start.await();
// Too Many Concurrent Requests
Assert.assertThrows(RejectedExecutionException.class, () -> r.execute(() -> System.out.println("cannot be executed")));
hold.await();
while (true) {
if (result.intValue() == poolSize) {
System.out.println("All tasks have been completed.");
break;
}
}
// The executor can accept new tasks
doTest(r, poolSize);
}

private void runTasks(VirtualThreadExecutorService r, int tasks) throws Exception {
private void doTest(VirtualThreadExecutorService r, int tasks) throws Exception {
final CountDownLatch cl = new CountDownLatch(tasks);
while (tasks-- > 0) {
r.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(50);
} catch (Exception ignore) {
}
}
});
r.execute(() -> cl.countDown());
}
assertTrue("latch timed out", cl.await(30, TimeUnit.SECONDS));
}
}

0 comments on commit 06a32b9

Please sign in to comment.