From 45d7f6e467333855d12f5ccc41147caf44a4054e Mon Sep 17 00:00:00 2001 From: acarbonetto Date: Thu, 9 Nov 2023 12:40:49 -0800 Subject: [PATCH] Upgrade async operations to complete all tasks Signed-off-by: acarbonetto --- .../benchmarks/utils/Benchmarking.java | 113 ++++++++++-------- 1 file changed, 60 insertions(+), 53 deletions(-) diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java index be03f95010..6bef1a3c05 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java @@ -1,7 +1,5 @@ package javababushka.benchmarks.utils; -import static java.util.concurrent.CompletableFuture.runAsync; - import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -9,12 +7,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; -import java.util.stream.Collectors; import javababushka.benchmarks.AsyncClient; import javababushka.benchmarks.BenchmarkingApp; import javababushka.benchmarks.Client; @@ -95,7 +91,11 @@ public static Map calculateResults( ArrayList latencies = entry.getValue(); double avgLatency = - SECONDS_IN_NANO * latencies.stream().mapToLong(Long::longValue).sum() / latencies.size(); + latencies.size() <= 0 + ? 0 + : SECONDS_IN_NANO + * latencies.stream().mapToLong(Long::longValue).sum() + / latencies.size(); Collections.sort(latencies); results.put( @@ -139,12 +139,6 @@ public static void testClientSetGet( "%n =====> %s <===== %d clients %d concurrent %n%n", clientCreator.get().getName(), clientCount, concurrentNum); AtomicInteger iterationCounter = new AtomicInteger(0); - Map> actionResults = - Map.of( - ChosenAction.GET_EXISTING, new ArrayList<>(), - ChosenAction.GET_NON_EXISTING, new ArrayList<>(), - ChosenAction.SET, new ArrayList<>()); - List tasks = new ArrayList<>(); // create clients List clients = new LinkedList<>(); @@ -154,63 +148,76 @@ public static void testClientSetGet( clients.add(newClient); } + long started = System.nanoTime(); + List>>> asyncTasks = + new ArrayList<>(); for (int taskNum = 0; taskNum < concurrentNum; taskNum++) { final int taskNumDebugging = taskNum; - tasks.add( - () -> { - int iterationIncrement = iterationCounter.getAndIncrement(); - int clientIndex = iterationIncrement % clients.size(); + asyncTasks.add( + CompletableFuture.supplyAsync( + () -> { + Map> taskActionResults = + Map.of( + ChosenAction.GET_EXISTING, new ArrayList<>(), + ChosenAction.GET_NON_EXISTING, new ArrayList<>(), + ChosenAction.SET, new ArrayList<>()); + int tasksCompleted = 0; + int iterationIncrement = iterationCounter.getAndIncrement(); + int clientIndex = iterationIncrement % clients.size(); - if (config.debugLogging) { - System.out.printf( - "%n concurrent = %d/%d, client# = %d/%d%n", - taskNumDebugging, concurrentNum, clientIndex + 1, clientCount); - } - while (iterationIncrement < iterations) { - if (config.debugLogging) { - System.out.printf( - "> iteration = %d/%d, client# = %d/%d%n", - iterationIncrement + 1, iterations, clientIndex + 1, clientCount); - } + if (config.debugLogging) { + System.out.printf( + "%n concurrent = %d/%d, client# = %d/%d%n", + taskNumDebugging, concurrentNum, clientIndex + 1, clientCount); + } + while (iterationIncrement < iterations) { + if (config.debugLogging) { + System.out.printf( + "> iteration = %d/%d, client# = %d/%d%n", + iterationIncrement + 1, iterations, clientIndex + 1, clientCount); + } - var actions = getActionMap(clients.get(clientIndex), dataSize, async); - // operate and calculate tik-tok - Pair result = measurePerformance(actions); - actionResults.get(result.getLeft()).add(result.getRight()); + var actions = getActionMap(clients.get(clientIndex), dataSize, async); + // operate and calculate tik-tok + Pair result = measurePerformance(actions); + taskActionResults.get(result.getLeft()).add(result.getRight()); - iterationIncrement = iterationCounter.getAndIncrement(); - } - } - ); + tasksCompleted++; + iterationIncrement = iterationCounter.getAndIncrement(); + } + System.out.println( + "Tasks " + taskNumDebugging + " completed " + tasksCompleted + " tasks"); + return taskActionResults; + })); } if (config.debugLogging) { System.out.printf("%s client Benchmarking: %n", clientCreator.get().getName()); System.out.printf( "===> concurrentNum = %d, clientNum = %d, tasks = %d%n", - concurrentNum, clientCount, tasks.size()); + concurrentNum, clientCount, asyncTasks.size()); } - ExecutorService threadPool = Executors.newFixedThreadPool(concurrentNum); - long started = System.nanoTime(); - // create threads and add them to the async pool. - // This will start execution of all the concurrent tasks. - List asyncTasks = - tasks.stream().map((runnable) -> runAsync(runnable, threadPool)).collect(Collectors.toList()); - // close pool and await for tasks to complete - threadPool.shutdown(); - while (!threadPool.isTerminated()) { - try { - // wait before waiting for threads to complete - Thread.sleep(100); - } catch (InterruptedException interruptedException) { - interruptedException.printStackTrace(); - } + // This will start execution of all the concurrent tasks asynchronously + CompletableFuture>>[] completableAsyncTaskArray = + asyncTasks.toArray(new CompletableFuture[asyncTasks.size()]); + try { + // wait for all futures to complete + CompletableFuture.allOf(completableAsyncTaskArray).get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + throw new RuntimeException(e); } - // wait for all futures to complete + Map> actionResults = + Map.of( + ChosenAction.GET_EXISTING, new ArrayList<>(), + ChosenAction.GET_NON_EXISTING, new ArrayList<>(), + ChosenAction.SET, new ArrayList<>()); asyncTasks.forEach( future -> { try { - future.get(); + var futureResult = future.get(); + futureResult.forEach( + (action, result) -> actionResults.get(action).addAll(result)); } catch (Exception e) { e.printStackTrace(); }