Skip to content

Commit

Permalink
onClose callback for LogWatch
Browse files Browse the repository at this point in the history
  • Loading branch information
C committed Feb 20, 2025
1 parent 364992b commit 097fced
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.io.Closeable;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.CompletionStage;


public interface LogWatch extends Closeable {

Expand All @@ -29,9 +31,21 @@ public interface LogWatch extends Closeable {
*/
InputStream getOutput();


/**
* Returns a {@link CompletionStage} released when the log stream is closed.
* If the stream is closed due to an exception (cf onFailure),
* this exception will be passed as parameter, null otherwise
*
* @return a {@link CompletionStage} released when the log stream is closed
*/
CompletionStage<Throwable> onClose();

/**
* Close the Watch.
*/
@Override
void close();


}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;

public class LogWatchCallback implements LogWatch, AutoCloseable {
Expand All @@ -41,9 +42,11 @@ public class LogWatchCallback implements LogWatch, AutoCloseable {
private final OutputStream out;
private WritableByteChannel outChannel;
private volatile InputStream output;


private final AtomicBoolean closed = new AtomicBoolean(false);
private final CompletableFuture<AsyncBody> asyncBody = new CompletableFuture<>();
private final CompletableFuture<Throwable> onCloseFuture = new CompletableFuture<>();
private final SerialExecutor serialExecutor;

public LogWatchCallback(OutputStream out, OperationContext context) {
Expand All @@ -53,18 +56,26 @@ public LogWatchCallback(OutputStream out, OperationContext context) {
}
this.serialExecutor = new SerialExecutor(context.getExecutor());
}

@Override
public CompletionStage<Throwable> onClose() {
return onCloseFuture.minimalCompletionStage();
}

@Override
public void close() {
cleanUp();
cleanUp(null);
}

private void cleanUp() {
private void cleanUp(Throwable u) {
if (!closed.compareAndSet(false, true)) {
return;
}

asyncBody.thenAccept(AsyncBody::cancel);
onCloseFuture.complete(u);
serialExecutor.shutdownNow();

}

public LogWatchCallback callAndWait(HttpClient client, URL url) {
Expand Down Expand Up @@ -111,7 +122,7 @@ public LogWatchCallback callAndWait(HttpClient client, URL url) {
if (t != null) {
onFailure(t);
} else {
cleanUp();
cleanUp(null);
}
}, serialExecutor));
}
Expand All @@ -131,9 +142,9 @@ public void onFailure(Throwable u) {
if (closed.get()) {
return;
}

LOGGER.error("Log Callback Failure.", u);
cleanUp();
cleanUp(u);
}

}

0 comments on commit 097fced

Please sign in to comment.