From 5f9533a0fc6ae646de0d155f58dacc17cd729ade Mon Sep 17 00:00:00 2001 From: C Date: Tue, 11 Feb 2025 23:02:44 +0100 Subject: [PATCH] onClose callback for LogWatch --- .../kubernetes/client/dsl/LogWatch.java | 14 +++++++++++++ .../client/dsl/internal/LogWatchCallback.java | 21 ++++++++++++++----- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/LogWatch.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/LogWatch.java index 8faa137531e..cc662639bc4 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/LogWatch.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/LogWatch.java @@ -18,6 +18,8 @@ import java.io.Closeable; import java.io.InputStream; import java.io.OutputStream; +import java.util.concurrent.CompletionStage; + public interface LogWatch extends Closeable { @@ -29,9 +31,21 @@ public interface LogWatch extends Closeable { */ InputStream getOutput(); + + /** + * Returns a {@link CompletionStage} released when the log stream is closed. + * If the stream is closed due to an exception (cf onFailure), + * this exception will be passed as parameter, null otherwise + * + * @return a {@link CompletionStage} released when the log stream is closed + */ + CompletionStage onClose(); + /** * Close the Watch. */ @Override void close(); + + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java index b3a118a7e1c..f6fd0013536 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java @@ -32,6 +32,7 @@ import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicBoolean; public class LogWatchCallback implements LogWatch, AutoCloseable { @@ -41,9 +42,11 @@ public class LogWatchCallback implements LogWatch, AutoCloseable { private final OutputStream out; private WritableByteChannel outChannel; private volatile InputStream output; + private final AtomicBoolean closed = new AtomicBoolean(false); private final CompletableFuture asyncBody = new CompletableFuture<>(); + private final CompletableFuture onCloseFuture = new CompletableFuture<>(); private final SerialExecutor serialExecutor; public LogWatchCallback(OutputStream out, OperationContext context) { @@ -53,18 +56,26 @@ public LogWatchCallback(OutputStream out, OperationContext context) { } this.serialExecutor = new SerialExecutor(context.getExecutor()); } + + @Override + public CompletionStage onClose() { + return onCloseFuture.minimalCompletionStage(); + } @Override public void close() { - cleanUp(); + cleanUp(null); } - private void cleanUp() { + private void cleanUp(Throwable u) { if (!closed.compareAndSet(false, true)) { return; } + asyncBody.thenAccept(AsyncBody::cancel); + onCloseFuture.complete(u); serialExecutor.shutdownNow(); + } public LogWatchCallback callAndWait(HttpClient client, URL url) { @@ -111,7 +122,7 @@ public LogWatchCallback callAndWait(HttpClient client, URL url) { if (t != null) { onFailure(t); } else { - cleanUp(); + cleanUp(null); } }, serialExecutor)); } @@ -131,9 +142,9 @@ public void onFailure(Throwable u) { if (closed.get()) { return; } - + LOGGER.error("Log Callback Failure.", u); - cleanUp(); + cleanUp(u); } }