Skip to content

Commit

Permalink
Upgrade async operations to complete all tasks
Browse files Browse the repository at this point in the history
Signed-off-by: acarbonetto <[email protected]>
  • Loading branch information
acarbonetto committed Nov 9, 2023
1 parent 4ac9b33 commit 45d7f6e
Showing 1 changed file with 60 additions and 53 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
package javababushka.benchmarks.utils;

import static java.util.concurrent.CompletableFuture.runAsync;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javababushka.benchmarks.AsyncClient;
import javababushka.benchmarks.BenchmarkingApp;
import javababushka.benchmarks.Client;
Expand Down Expand Up @@ -95,7 +91,11 @@ public static Map<ChosenAction, LatencyResults> calculateResults(
ArrayList<Long> latencies = entry.getValue();

double avgLatency =
SECONDS_IN_NANO * latencies.stream().mapToLong(Long::longValue).sum() / latencies.size();
latencies.size() <= 0
? 0
: SECONDS_IN_NANO
* latencies.stream().mapToLong(Long::longValue).sum()
/ latencies.size();

Collections.sort(latencies);
results.put(
Expand Down Expand Up @@ -139,12 +139,6 @@ public static void testClientSetGet(
"%n =====> %s <===== %d clients %d concurrent %n%n",
clientCreator.get().getName(), clientCount, concurrentNum);
AtomicInteger iterationCounter = new AtomicInteger(0);
Map<ChosenAction, ArrayList<Long>> actionResults =
Map.of(
ChosenAction.GET_EXISTING, new ArrayList<>(),
ChosenAction.GET_NON_EXISTING, new ArrayList<>(),
ChosenAction.SET, new ArrayList<>());
List<Runnable> tasks = new ArrayList<>();

// create clients
List<Client> clients = new LinkedList<>();
Expand All @@ -154,63 +148,76 @@ public static void testClientSetGet(
clients.add(newClient);
}

long started = System.nanoTime();
List<CompletableFuture<Map<ChosenAction, ArrayList<Long>>>> asyncTasks =
new ArrayList<>();
for (int taskNum = 0; taskNum < concurrentNum; taskNum++) {
final int taskNumDebugging = taskNum;
tasks.add(
() -> {
int iterationIncrement = iterationCounter.getAndIncrement();
int clientIndex = iterationIncrement % clients.size();
asyncTasks.add(
CompletableFuture.supplyAsync(
() -> {
Map<ChosenAction, ArrayList<Long>> taskActionResults =
Map.of(
ChosenAction.GET_EXISTING, new ArrayList<>(),
ChosenAction.GET_NON_EXISTING, new ArrayList<>(),
ChosenAction.SET, new ArrayList<>());
int tasksCompleted = 0;
int iterationIncrement = iterationCounter.getAndIncrement();
int clientIndex = iterationIncrement % clients.size();

if (config.debugLogging) {
System.out.printf(
"%n concurrent = %d/%d, client# = %d/%d%n",
taskNumDebugging, concurrentNum, clientIndex + 1, clientCount);
}
while (iterationIncrement < iterations) {
if (config.debugLogging) {
System.out.printf(
"> iteration = %d/%d, client# = %d/%d%n",
iterationIncrement + 1, iterations, clientIndex + 1, clientCount);
}
if (config.debugLogging) {
System.out.printf(
"%n concurrent = %d/%d, client# = %d/%d%n",
taskNumDebugging, concurrentNum, clientIndex + 1, clientCount);
}
while (iterationIncrement < iterations) {
if (config.debugLogging) {
System.out.printf(
"> iteration = %d/%d, client# = %d/%d%n",
iterationIncrement + 1, iterations, clientIndex + 1, clientCount);
}

var actions = getActionMap(clients.get(clientIndex), dataSize, async);
// operate and calculate tik-tok
Pair<ChosenAction, Long> result = measurePerformance(actions);
actionResults.get(result.getLeft()).add(result.getRight());
var actions = getActionMap(clients.get(clientIndex), dataSize, async);
// operate and calculate tik-tok
Pair<ChosenAction, Long> result = measurePerformance(actions);
taskActionResults.get(result.getLeft()).add(result.getRight());

iterationIncrement = iterationCounter.getAndIncrement();
}
}
);
tasksCompleted++;
iterationIncrement = iterationCounter.getAndIncrement();
}
System.out.println(
"Tasks " + taskNumDebugging + " completed " + tasksCompleted + " tasks");
return taskActionResults;
}));
}
if (config.debugLogging) {
System.out.printf("%s client Benchmarking: %n", clientCreator.get().getName());
System.out.printf(
"===> concurrentNum = %d, clientNum = %d, tasks = %d%n",
concurrentNum, clientCount, tasks.size());
concurrentNum, clientCount, asyncTasks.size());
}

ExecutorService threadPool = Executors.newFixedThreadPool(concurrentNum);
long started = System.nanoTime();
// create threads and add them to the async pool.
// This will start execution of all the concurrent tasks.
List<CompletableFuture> asyncTasks =
tasks.stream().map((runnable) -> runAsync(runnable, threadPool)).collect(Collectors.toList());
// close pool and await for tasks to complete
threadPool.shutdown();
while (!threadPool.isTerminated()) {
try {
// wait before waiting for threads to complete
Thread.sleep(100);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
// This will start execution of all the concurrent tasks asynchronously
CompletableFuture<Map<ChosenAction, ArrayList<Long>>>[] completableAsyncTaskArray =
asyncTasks.toArray(new CompletableFuture[asyncTasks.size()]);
try {
// wait for all futures to complete
CompletableFuture.allOf(completableAsyncTaskArray).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
// wait for all futures to complete
Map<ChosenAction, ArrayList<Long>> actionResults =
Map.of(
ChosenAction.GET_EXISTING, new ArrayList<>(),
ChosenAction.GET_NON_EXISTING, new ArrayList<>(),
ChosenAction.SET, new ArrayList<>());
asyncTasks.forEach(
future -> {
try {
future.get();
var futureResult = future.get();
futureResult.forEach(
(action, result) -> actionResults.get(action).addAll(result));
} catch (Exception e) {
e.printStackTrace();
}
Expand Down

0 comments on commit 45d7f6e

Please sign in to comment.