From 05590b0703811d8d273954f414030bcfb96a086e Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Tue, 14 Nov 2023 11:41:51 -0800 Subject: [PATCH] Java benchmarks clusters (#34) * Add lettuce cluster client when cluster mode enabled --------- Signed-off-by: Andrew Carbonetto --- benchmarks/install_and_test.sh | 9 +- .../javababushka/benchmarks/AsyncClient.java | 2 +- .../benchmarks/BenchmarkingApp.java | 43 ++++-- .../lettuce/LettuceAsyncClient.java | 6 +- .../lettuce/LettuceAsyncClusterClient.java | 57 +++++++ .../benchmarks/utils/Benchmarking.java | 139 +++++++++++------- .../benchmarks/utils/JsonWriter.java | 3 +- .../benchmarks/utils/LatencyResults.java | 5 +- .../lettuce/LettuceAsyncClientIT.java | 10 +- .../benchmarks/lettuce/LettuceClientIT.java | 2 +- 10 files changed, 197 insertions(+), 79 deletions(-) create mode 100644 java/benchmarks/src/main/java/javababushka/benchmarks/lettuce/LettuceAsyncClusterClient.java diff --git a/benchmarks/install_and_test.sh b/benchmarks/install_and_test.sh index 748c77d26a..de70794752 100755 --- a/benchmarks/install_and_test.sh +++ b/benchmarks/install_and_test.sh @@ -33,6 +33,7 @@ chosenClients="all" host="localhost" port=6379 tlsFlag="--tls" +javaTlsFlag="-tls" function runPythonBenchmark(){ # generate protobuf files @@ -72,9 +73,8 @@ function runCSharpBenchmark(){ function runJavaBenchmark(){ cd ${BENCH_FOLDER}/../java - echo "./gradlew run --args=\"-resultsFile ${BENCH_FOLDER}/$1 -dataSize $2 -concurrentTasks $concurrentTasks -clientCount $clientCount -clients $chosenClients -host $host $portFlag $tlsFlag $clusterFlag\"" - ./gradlew run --args="-resultsFile \"${BENCH_FOLDER}/$1\" -dataSize $2 -concurrentTasks $concurrentTasks -clients $chosenClients -host $host -clientCount $clientCount $tlsFlag $clusterFlag $portFlag" - cd ${BENCH_FOLDER}/java + echo "./gradlew run --args=\"-resultsFile ${BENCH_FOLDER}/$1 -dataSize \"$2\" -concurrentTasks \"$concurrentTasks\" -clientCount \"$clientCount\" -clients $chosenClients -host $host $javaPortFlag $javaTlsFlag $javaClusterFlag\"" + ./gradlew run --args="-resultsFile \"${BENCH_FOLDER}/$1\" -dataSize \"$2\" -concurrentTasks \"$concurrentTasks\" -clients \"$chosenClients\" -host $host $javaPortFlag -clientCount \"$clientCount\" $javaTlsFlag $javaClusterFlag" } function runRustBenchmark(){ @@ -229,12 +229,15 @@ do -no-csv) writeResultsCSV=0 ;; -no-tls) tlsFlag= + javaTlsFlag= ;; -is-cluster) clusterFlag="--clusterModeEnabled" + javaClusterFlag="-clusterModeEnabled" ;; -port) portFlag="--port "$2 + javaPortFlag="-port "$2 shift ;; esac diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/AsyncClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/AsyncClient.java index a688775084..92d10ac4a0 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/AsyncClient.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/AsyncClient.java @@ -4,7 +4,7 @@ public interface AsyncClient extends Client { - long DEFAULT_TIMEOUT = 1000; + long DEFAULT_TIMEOUT = 1000; // Milliseconds Future asyncSet(String key, String value); diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java b/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java index 1e3c9ac0e0..0647b0ae11 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java @@ -8,6 +8,7 @@ import javababushka.benchmarks.jedis.JedisClient; import javababushka.benchmarks.jedis.JedisPseudoAsyncClient; import javababushka.benchmarks.lettuce.LettuceAsyncClient; +import javababushka.benchmarks.lettuce.LettuceAsyncClusterClient; import javababushka.benchmarks.lettuce.LettuceClient; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -17,7 +18,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; -/** Benchmarking app for reporting performance of various redis-rs Java-clients */ +/** Benchmarking app for reporting performance of various Redis Java-clients */ public class BenchmarkingApp { // main application entrypoint @@ -56,10 +57,14 @@ public static void main(String[] args) { testClientSetGet(LettuceClient::new, runConfiguration, false); break; case LETTUCE_ASYNC: - testClientSetGet(LettuceAsyncClient::new, runConfiguration, true); + if (runConfiguration.clusterModeEnabled) { + testClientSetGet(LettuceAsyncClusterClient::new, runConfiguration, true); + } else { + testClientSetGet(LettuceAsyncClient::new, runConfiguration, true); + } break; - case BABUSHKA: - System.out.println("Babushka not yet configured"); + case BABUSHKA_ASYNC: + System.out.println("Babushka async not yet configured"); break; } } @@ -96,6 +101,13 @@ private static Options getOptions() { options.addOption( Option.builder("clientCount").hasArg(true).desc("Number of clients to run [1]").build()); options.addOption(Option.builder("tls").hasArg(false).desc("TLS [false]").build()); + options.addOption( + Option.builder("clusterModeEnabled") + .hasArg(false) + .desc("Is cluster-mode enabled, other standalone mode is used [false]") + .build()); + options.addOption( + Option.builder("debugLogging").hasArg(false).desc("Verbose logs [false]").build()); return options; } @@ -137,20 +149,16 @@ private static RunConfiguration verifyOptions(CommandLine line) throws ParseExce return Stream.of( ClientName.JEDIS, ClientName.JEDIS_ASYNC, - ClientName.BABUSHKA, - // ClientName.BABUSHKA_ASYNC, + ClientName.BABUSHKA_ASYNC, ClientName.LETTUCE, ClientName.LETTUCE_ASYNC); case ALL_ASYNC: return Stream.of( ClientName.JEDIS_ASYNC, - // ClientName.BABUSHKA_ASYNC, + ClientName.BABUSHKA_ASYNC, ClientName.LETTUCE_ASYNC); case ALL_SYNC: - return Stream.of( - ClientName.JEDIS, - // ClientName.BABUSHKA, - ClientName.LETTUCE); + return Stream.of(ClientName.JEDIS, ClientName.LETTUCE); default: return Stream.of(e); } @@ -171,6 +179,8 @@ private static RunConfiguration verifyOptions(CommandLine line) throws ParseExce } runConfiguration.tls = line.hasOption("tls"); + runConfiguration.clusterModeEnabled = line.hasOption("clusterModeEnabled"); + runConfiguration.debugLogging = line.hasOption("debugLogging"); return runConfiguration; } @@ -182,8 +192,12 @@ private static int[] parseIntListOption(String line) throws ParseException { if (lineValue.startsWith("[") && lineValue.endsWith("]")) { lineValue = lineValue.substring(1, lineValue.length() - 1); } + + // trim whitespace + lineValue = lineValue.trim(); + // check if it's the correct format - if (!lineValue.matches("\\d+(\\s+\\d+)?")) { + if (!lineValue.matches("\\d+(\\s+\\d+)*")) { throw new ParseException("Invalid option: " + line); } // split the string into a list of integers @@ -195,7 +209,6 @@ public enum ClientName { JEDIS_ASYNC("Jedis async"), LETTUCE("Lettuce"), LETTUCE_ASYNC("Lettuce async"), - BABUSHKA("Babushka"), BABUSHKA_ASYNC("Babushka async"), ALL("All"), ALL_SYNC("All sync"), @@ -227,6 +240,7 @@ public static class RunConfiguration { public int port; public int[] clientCount; public boolean tls; + public boolean clusterModeEnabled; public boolean debugLogging = false; public RunConfiguration() { @@ -237,12 +251,13 @@ public RunConfiguration() { clients = new ClientName[] { // ClientName.BABUSHKA_ASYNC, - ClientName.JEDIS, ClientName.JEDIS_ASYNC, ClientName.LETTUCE, ClientName.LETTUCE_ASYNC + ClientName.LETTUCE_ASYNC }; host = "localhost"; port = 6379; clientCount = new int[] {1, 2}; tls = false; + clusterModeEnabled = false; } } } diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/lettuce/LettuceAsyncClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/lettuce/LettuceAsyncClient.java index 1b8c4ba9b7..583f5e488f 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/lettuce/LettuceAsyncClient.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/lettuce/LettuceAsyncClient.java @@ -14,9 +14,9 @@ public class LettuceAsyncClient implements AsyncClient { - RedisClient client; - RedisAsyncCommands asyncCommands; - StatefulRedisConnection connection; + private RedisClient client; + private RedisAsyncCommands asyncCommands; + private StatefulRedisConnection connection; @Override public void connectToRedis() { diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/lettuce/LettuceAsyncClusterClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/lettuce/LettuceAsyncClusterClient.java new file mode 100644 index 0000000000..84e48eb691 --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/lettuce/LettuceAsyncClusterClient.java @@ -0,0 +1,57 @@ +/* + * This Java source file was generated by the Gradle 'init' task. + */ +package javababushka.benchmarks.lettuce; + +import io.lettuce.core.RedisFuture; +import io.lettuce.core.RedisURI; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; +import javababushka.benchmarks.utils.ConnectionSettings; + +public class LettuceAsyncClusterClient extends LettuceAsyncClient { + + private RedisClusterClient clusterClient; + private RedisAdvancedClusterAsyncCommands clusterAsyncCommands; + private StatefulRedisClusterConnection clusterConnection; + + @Override + public void connectToRedis() { + connectToRedis(new ConnectionSettings("localhost", 6379, false)); + } + + @Override + public void connectToRedis(ConnectionSettings connectionSettings) { + RedisURI uri = + RedisURI.builder() + .withHost(connectionSettings.host) + .withPort(connectionSettings.port) + .withSsl(connectionSettings.useSsl) + .build(); + clusterClient = RedisClusterClient.create(uri); + clusterConnection = clusterClient.connect(); + clusterAsyncCommands = clusterConnection.async(); + } + + @Override + public RedisFuture asyncSet(String key, String value) { + return clusterAsyncCommands.set(key, value); + } + + @Override + public RedisFuture asyncGet(String key) { + return clusterAsyncCommands.get(key); + } + + @Override + public void closeConnection() { + clusterConnection.close(); + clusterClient.shutdown(); + } + + @Override + public String getName() { + return "Lettuce Cluster Async"; + } +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java index d3d7baadb1..d52f051ead 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java @@ -7,6 +7,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -14,7 +15,6 @@ import javababushka.benchmarks.BenchmarkingApp; import javababushka.benchmarks.Client; import javababushka.benchmarks.SyncClient; -import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.tuple.Pair; public class Benchmarking { @@ -23,8 +23,9 @@ public class Benchmarking { static final int SIZE_GET_KEYSPACE = 3750000; static final int SIZE_SET_KEYSPACE = 3000000; static final int ASYNC_OPERATION_TIMEOUT_SEC = 1; - // measurements are done in nano-seconds, but it should be converted to seconds later - static final double SECONDS_IN_NANO = 1e-9; + // measurements are done in nanoseconds, but it should be converted to seconds later + public static final double SECONDS_IN_NANO = 1e-9; + public static final double NANO_TO_SECONDS = 1e9; private static ChosenAction randomAction() { if (Math.random() > PROB_GET) { @@ -90,7 +91,11 @@ public static Map calculateResults( ArrayList latencies = entry.getValue(); double avgLatency = - SECONDS_IN_NANO * latencies.stream().mapToLong(Long::longValue).sum() / latencies.size(); + latencies.size() == 0 + ? 0 + : SECONDS_IN_NANO + * latencies.stream().mapToLong(Long::longValue).sum() + / latencies.size(); Collections.sort(latencies); results.put( @@ -100,13 +105,15 @@ public static Map calculateResults( SECONDS_IN_NANO * percentile(latencies, 50), SECONDS_IN_NANO * percentile(latencies, 90), SECONDS_IN_NANO * percentile(latencies, 99), - SECONDS_IN_NANO * stdDeviation(latencies, avgLatency))); + SECONDS_IN_NANO * stdDeviation(latencies, avgLatency), + latencies.size())); } return results; } public static void printResults(Map resultsMap) { + int totalHits = 0; for (Map.Entry entry : resultsMap.entrySet()) { ChosenAction action = entry.getKey(); LatencyResults results = entry.getValue(); @@ -116,7 +123,10 @@ public static void printResults(Map resultsMap) { System.out.println(action + " p90 latency in ms: " + results.p90Latency / 1000000.0); System.out.println(action + " p99 latency in ms: " + results.p99Latency / 1000000.0); System.out.println(action + " std dev in ms: " + results.stdDeviation / 1000000.0); + System.out.println(action + " total hits: " + results.totalHits); + totalHits += results.totalHits; } + System.out.println("Total hits: " + totalHits); } public static void testClientSetGet( @@ -129,12 +139,6 @@ public static void testClientSetGet( "%n =====> %s <===== %d clients %d concurrent %n%n", clientCreator.get().getName(), clientCount, concurrentNum); AtomicInteger iterationCounter = new AtomicInteger(0); - Map> actionResults = - Map.of( - ChosenAction.GET_EXISTING, new ArrayList<>(), - ChosenAction.GET_NON_EXISTING, new ArrayList<>(), - ChosenAction.SET, new ArrayList<>()); - List tasks = new ArrayList<>(); // create clients List clients = new LinkedList<>(); @@ -144,62 +148,97 @@ public static void testClientSetGet( clients.add(newClient); } + long started = System.nanoTime(); + List>>> asyncTasks = + new ArrayList<>(); for (int taskNum = 0; taskNum < concurrentNum; taskNum++) { final int taskNumDebugging = taskNum; - tasks.add( - () -> { - int iterationIncrement = iterationCounter.getAndIncrement(); - int clientIndex = iterationIncrement % clients.size(); - - if (config.debugLogging) { - System.out.printf( - "%n concurrent = %d/%d, client# = %d/%d%n", - taskNumDebugging, concurrentNum, clientIndex + 1, clientCount); - } - while (iterationIncrement < iterations) { - if (config.debugLogging) { - System.out.printf( - "> iteration = %d/%d, client# = %d/%d%n", - iterationIncrement + 1, iterations, clientIndex + 1, clientCount); - } - - var actions = getActionMap(clients.get(clientIndex), dataSize, async); - // operate and calculate tik-tok - Pair result = measurePerformance(actions); - actionResults.get(result.getLeft()).add(result.getRight()); - - iterationIncrement = iterationCounter.getAndIncrement(); - } - }); + asyncTasks.add( + CompletableFuture.supplyAsync( + () -> { + Map> taskActionResults = + Map.of( + ChosenAction.GET_EXISTING, new ArrayList<>(), + ChosenAction.GET_NON_EXISTING, new ArrayList<>(), + ChosenAction.SET, new ArrayList<>()); + int tasksCompleted = 0; + int iterationIncrement = iterationCounter.getAndIncrement(); + int clientIndex = iterationIncrement % clients.size(); + + if (config.debugLogging) { + System.out.printf( + "%n concurrent = %d/%d, client# = %d/%d%n", + taskNumDebugging, concurrentNum, clientIndex + 1, clientCount); + } + while (iterationIncrement < iterations) { + if (config.debugLogging) { + System.out.printf( + "> iteration = %d/%d, client# = %d/%d%n", + iterationIncrement + 1, iterations, clientIndex + 1, clientCount); + } + + var actions = getActionMap(clients.get(clientIndex), dataSize, async); + // operate and calculate tik-tok + Pair result = measurePerformance(actions); + taskActionResults.get(result.getLeft()).add(result.getRight()); + + tasksCompleted++; + iterationIncrement = iterationCounter.getAndIncrement(); + clientIndex = iterationIncrement % clients.size(); + } + return taskActionResults; + })); } if (config.debugLogging) { System.out.printf("%s client Benchmarking: %n", clientCreator.get().getName()); System.out.printf( "===> concurrentNum = %d, clientNum = %d, tasks = %d%n", - concurrentNum, clientCount, tasks.size()); + concurrentNum, clientCount, asyncTasks.size()); } - long started = System.nanoTime(); - tasks.stream() - .map(CompletableFuture::runAsync) - .forEach( - f -> { - try { - f.get(); - } catch (Exception e) { - e.printStackTrace(); - } - }); + // This will start execution of all the concurrent tasks asynchronously + CompletableFuture>>[] completableAsyncTaskArray = + asyncTasks.toArray(new CompletableFuture[asyncTasks.size()]); + try { + // wait for all futures to complete + CompletableFuture.allOf(completableAsyncTaskArray).get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + + // Map to save latency results separately for each action + Map> actionResults = + Map.of( + ChosenAction.GET_EXISTING, new ArrayList<>(), + ChosenAction.GET_NON_EXISTING, new ArrayList<>(), + ChosenAction.SET, new ArrayList<>()); + + // for each task, call future.get() to retrieve & save the result in the map + asyncTasks.forEach( + future -> { + try { + var futureResult = future.get(); + futureResult.forEach( + (action, result) -> actionResults.get(action).addAll(result)); + } catch (Exception e) { + e.printStackTrace(); + } + }); + long after = System.nanoTime(); var calculatedResults = calculateResults(actionResults); + if (config.resultsFile.isPresent()) { + double tps = iterationCounter.get() * NANO_TO_SECONDS / (after - started); JsonWriter.Write( calculatedResults, config.resultsFile.get(), + config.clusterModeEnabled, dataSize, clientCreator.get().getName(), clientCount, concurrentNum, - iterationCounter.get() * 1e9 / (System.nanoTime() - started)); + tps); } printResults(calculatedResults); } @@ -212,7 +251,7 @@ public static void testClientSetGet( public static Map getActionMap( Client client, int dataSize, boolean async) { - String value = RandomStringUtils.randomAlphanumeric(dataSize); + String value = "0".repeat(dataSize); Map actions = new HashMap<>(); actions.put( ChosenAction.GET_EXISTING, diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/JsonWriter.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/JsonWriter.java index 064148de51..24e658f56b 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/JsonWriter.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/JsonWriter.java @@ -17,6 +17,7 @@ public class JsonWriter { public static void Write( Map calculatedResults, String resultsFile, + boolean isCluster, int dataSize, String client, int clientCount, @@ -35,7 +36,7 @@ public static void Write( } var data = new Measurements.MeasurementsBuilder() - // TODO: is_cluster + .is_cluster(isCluster) .data_size(dataSize) .client(client) .client_count(clientCount) diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/LatencyResults.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/LatencyResults.java index 1ae5f1fd2d..7c9ee65232 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/LatencyResults.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/LatencyResults.java @@ -7,17 +7,20 @@ public class LatencyResults { public final double p90Latency; public final double p99Latency; public final double stdDeviation; + public final int totalHits; public LatencyResults( double avgLatency, double p50Latency, double p90Latency, double p99Latency, - double stdDeviation) { + double stdDeviation, + int totalHits) { this.avgLatency = avgLatency; this.p50Latency = p50Latency; this.p90Latency = p90Latency; this.p99Latency = p99Latency; this.stdDeviation = stdDeviation; + this.totalHits = totalHits; } } diff --git a/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceAsyncClientIT.java b/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceAsyncClientIT.java index c051c1b2a4..2c1f4eb93f 100644 --- a/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceAsyncClientIT.java +++ b/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceAsyncClientIT.java @@ -18,7 +18,7 @@ public class LettuceAsyncClientIT { private static LettuceAsyncClient otherLettuceClient; @BeforeAll - static void initializeJedisClient() { + static void initializeLettuceClient() { lettuceClient = new LettuceAsyncClient(); lettuceClient.connectToRedis(); @@ -47,12 +47,12 @@ public void testResourceSetGet() { try { lettuceClient.waitForResult(setResult); } catch (Exception e) { - fail("Can SET redis result without Exception"); + fail("SET result failed with exception " + e); } try { otherLettuceClient.waitForResult(otherSetResult); } catch (Exception e) { - fail("Can SET other redis result without Exception"); + fail("SET result on other client failed with exception " + e); } RedisFuture getResult = lettuceClient.asyncGet(key); @@ -62,13 +62,13 @@ public void testResourceSetGet() { try { result = (String) lettuceClient.waitForResult(getResult); } catch (Exception e) { - fail("Can GET redis result without Exception"); + fail("GET result failed with exception " + e); } try { otherResult = (String) otherLettuceClient.waitForResult(otherGetResult); } catch (Exception e) { - fail("Can GET other redis result without Exception"); + fail("GET result on other client failed with exception " + e); } assertEquals(value, result); diff --git a/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceClientIT.java b/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceClientIT.java index 445cc5b584..fd325a3414 100644 --- a/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceClientIT.java +++ b/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceClientIT.java @@ -15,7 +15,7 @@ public class LettuceClientIT { private static LettuceClient lettuceClient; @BeforeAll - static void initializeJedisClient() { + static void initializeLettuceClient() { lettuceClient = new LettuceClient(); lettuceClient.connectToRedis(); }