diff --git a/java/benchmarks/build.gradle b/java/benchmarks/build.gradle index 3f1f0f0608..1a481388d0 100644 --- a/java/benchmarks/build.gradle +++ b/java/benchmarks/build.gradle @@ -45,14 +45,14 @@ java { application { // Define the main class for the application. mainClass = 'javababushka.benchmarks.BenchmarkingApp' - applicationDefaultJvmArgs += "-Djava.library.path=${projectDir}/../target/release" + applicationDefaultJvmArgs += "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug" } -tasks.withType(Test) { +tasks.withType(Test) { testLogging { exceptionFormat "full" events "started", "skipped", "passed", "failed" showStandardStreams true } - jvmArgs "-Djava.library.path=${projectDir}/../target/debug" + jvmArgs "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug" } diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java index 0a98478fef..e9d192e9e2 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java @@ -35,7 +35,7 @@ public void connectToRedis() { @Override public void connectToRedis(ConnectionSettings connectionSettings) { -waitForResult(asyncConnectToRedis(connectionSettings)); + waitForResult(asyncConnectToRedis(connectionSettings)); } @Override diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java index b94b9544fa..ee14f7836c 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java @@ -148,15 +148,12 @@ public static void printResults( public static void testClientSetGet( Supplier clientCreator, BenchmarkingApp.RunConfiguration config, boolean async) { for (int concurrentNum : config.concurrentTasks) { - int iterations = 100000; - Math.min(Math.max(LATENCY_MIN, concurrentNum * LATENCY_MULTIPLIER), LATENCY_MAX); + int iterations = + Math.min(Math.max(LATENCY_MIN, concurrentNum * LATENCY_MULTIPLIER), LATENCY_MAX); for (int clientCount : config.clientCount) { for (int dataSize : config.dataSize) { - System.out.printf( - "%n =====> %s <===== %d clients %d concurrent %d data %n%n", - clientCreator.get().getName(), clientCount, concurrentNum, dataSize); AtomicInteger iterationCounter = new AtomicInteger(0); - // Collections.synchronizedList + Map> actionResults = Map.of( ChosenAction.GET_EXISTING, new ArrayList<>(), @@ -172,6 +169,12 @@ public static void testClientSetGet( clients.add(newClient); } + String clientName = clients.get(0).getName(); + + System.out.printf( + "%n =====> %s <===== %d clients %d concurrent %d data %n%n", + clientName, clientCount, concurrentNum, dataSize); + for (int taskNum = 0; taskNum < concurrentNum; taskNum++) { final int taskNumDebugging = taskNum; tasks.add( @@ -214,7 +217,7 @@ public static void testClientSetGet( }); } if (config.debugLogging) { - System.out.printf("%s client Benchmarking: %n", clientCreator.get().getName()); + System.out.printf("%s client Benchmarking: %n", clientName); System.out.printf( "===> concurrentNum = %d, clientNum = %d, tasks = %d%n", concurrentNum, clientCount, tasks.size()); @@ -257,7 +260,7 @@ public static void testClientSetGet( calculatedResults, config.resultsFile.get(), dataSize, - clientCreator.get().getName(), + clientName, clientCount, concurrentNum, iterations / ((after - before) / TPS_NORMALIZATION)); diff --git a/java/client/build.gradle b/java/client/build.gradle index b4a061ad13..8dd4e5b7c5 100644 --- a/java/client/build.gradle +++ b/java/client/build.gradle @@ -22,17 +22,23 @@ dependencies { tasks.register('protobuf', Exec) { doFirst { - project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/org/babushka/javababushka/generated').toString()) + project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/javababushka/generated').toString()) } commandLine 'protoc', '-Iprotobuf=babushka-core/src/protobuf/', - '--java_out=java/client/src/main/java/org/babushka/javababushka/generated', + '--java_out=java/client/src/main/java/javababushka/generated', 'babushka-core/src/protobuf/connection_request.proto', 'babushka-core/src/protobuf/redis_request.proto', 'babushka-core/src/protobuf/response.proto' workingDir Paths.get(project.rootDir.path, '..').toFile() } +tasks.register('cleanProtobuf') { + doFirst { + project.delete(Paths.get(project.projectDir.path, 'src/main/java/javababushka/generated').toString()) + } +} + tasks.register('buildRust', Exec) { commandLine 'cargo', 'build', '--release' workingDir project.rootDir @@ -54,7 +60,13 @@ tasks.register('buildAll') { } compileJava.dependsOn('protobuf') +clean.dependsOn('cleanProtobuf') -test { - systemProperty("java.library.path", "${projectDir}/../target/release") +tasks.withType(Test) { + testLogging { + exceptionFormat "full" + events "started", "skipped", "passed", "failed" + showStandardStreams true + } + jvmArgs "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug" } diff --git a/java/client/src/main/java/javababushka/Client.java b/java/client/src/main/java/javababushka/Client.java index 6cd4415d14..73bc8f613d 100644 --- a/java/client/src/main/java/javababushka/Client.java +++ b/java/client/src/main/java/javababushka/Client.java @@ -227,28 +227,40 @@ public void run() { } public void closeConnection() { - try { - channel.flush(); + // flush and close the channel + channel.flush(); + channel.close(); + // TODO: check that the channel is closed + + // shutdown the event loop group gracefully by waiting for the remaining response + // and then shutting down the connection + try { long waitStarted = System.nanoTime(); long waitUntil = waitStarted + PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS * 100_000; // in nanos - for (var future : responses) { - if (future == null || future.isDone()) { + for (var responseFuture : responses) { + if (responseFuture == null || responseFuture.isDone()) { continue; } try { - future.get(waitUntil - System.nanoTime(), TimeUnit.NANOSECONDS); + responseFuture.get(waitUntil - System.nanoTime(), TimeUnit.NANOSECONDS); } catch (InterruptedException | ExecutionException ignored) { + // TODO: print warning } catch (TimeoutException e) { - future.cancel(true); - // TODO cancel the rest + responseFuture.cancel(true); + // TODO: cancel the rest break; } } } finally { - // channel.closeFuture().sync() - group.shutdownGracefully(); + var shuttingDown = group.shutdownGracefully(); + try { + shuttingDown.get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + assert group.isShutdown() : "Redis connection did not shutdown gracefully"; } }