diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java index d88923192e5..9a8b901df7d 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java @@ -577,8 +577,9 @@ public boolean registerSubscriber(HttpBodySubscriberWrapper subscriber) { if (debug.on()) { debug.log("body subscriber registered: " + count); } + return true; } - return true; + return false; } } finally { selmgr.unlock(); diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java index 5ae9f707d7c..30015484c7b 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java @@ -191,7 +191,7 @@ private void schedule() { if (debug.on()) debug.log("subscribing user subscriber"); subscriber.onSubscribe(userSubscription); } - while (!inputQ.isEmpty()) { + while (!inputQ.isEmpty() && errorRef.get() == null) { Http2Frame frame = inputQ.peek(); if (frame instanceof ResetFrame rf) { inputQ.remove(); @@ -425,6 +425,10 @@ private void sendDataFrame(DataFrame frame) { // pushes entire response body into response subscriber // blocking when required by local or remote flow control CompletableFuture receiveData(BodySubscriber bodySubscriber, Executor executor) { + // ensure that the body subscriber will be subscribed and onError() is + // invoked + pendingResponseSubscriber = bodySubscriber; + // We want to allow the subscriber's getBody() method to block so it // can work with InputStreams. So, we offload execution. responseBodyCF = ResponseSubscribers.getBodyAsync(executor, bodySubscriber, @@ -435,9 +439,6 @@ CompletableFuture receiveData(BodySubscriber bodySubscriber, Executor exec responseBodyCF.completeExceptionally(t); } - // ensure that the body subscriber will be subscribed and onError() is - // invoked - pendingResponseSubscriber = bodySubscriber; sched.runOrSchedule(); // in case data waiting already to be processed, or error return responseBodyCF; diff --git a/test/jdk/java/net/httpclient/AbstractThrowingSubscribers.java b/test/jdk/java/net/httpclient/AbstractThrowingSubscribers.java index 077ae346462..dc7ca3fe9d5 100644 --- a/test/jdk/java/net/httpclient/AbstractThrowingSubscribers.java +++ b/test/jdk/java/net/httpclient/AbstractThrowingSubscribers.java @@ -474,6 +474,7 @@ private void testThrowing(String uri, boolean sameClient, if (response != null) { finisher.finish(where, response, thrower); } + var tracker = TRACKER.getTracker(client); if (!sameClient) { // Wait for the client to be garbage collected. // we use the ReferenceTracker API rather than HttpClient::close here, @@ -482,7 +483,6 @@ private void testThrowing(String uri, boolean sameClient, // By using the ReferenceTracker, we will get some diagnosis about what // is keeping the client alive if it doesn't get GC'ed within the // expected time frame. - var tracker = TRACKER.getTracker(client); client = null; System.gc(); System.out.println(now() + "waiting for client to shutdown: " + tracker.getName()); @@ -491,6 +491,14 @@ private void testThrowing(String uri, boolean sameClient, if (error != null) throw error; System.out.println(now() + "client shutdown normally: " + tracker.getName()); System.err.println(now() + "client shutdown normally: " + tracker.getName()); + } else { + System.out.println(now() + "waiting for operation to finish: " + tracker.getName()); + System.err.println(now() + "waiting for operation to finish: " + tracker.getName()); + var error = TRACKER.checkFinished(tracker, 10000); + if (error != null) throw error; + System.out.println(now() + "operation finished normally: " + tracker.getName()); + System.err.println(now() + "operation finished normally: " + tracker.getName()); + } } } @@ -800,7 +808,7 @@ public void teardown() throws Exception { sharedClient == null ? null : sharedClient.toString(); sharedClient = null; Thread.sleep(100); - AssertionError fail = TRACKER.check(500); + AssertionError fail = TRACKER.check(5000); try { httpTestServer.stop(); httpsTestServer.stop(); diff --git a/test/jdk/java/net/httpclient/ReferenceTracker.java b/test/jdk/java/net/httpclient/ReferenceTracker.java index dbc4a128675..d7e16d01201 100644 --- a/test/jdk/java/net/httpclient/ReferenceTracker.java +++ b/test/jdk/java/net/httpclient/ReferenceTracker.java @@ -115,6 +115,14 @@ public AssertionError check(Tracker tracker, long graceDelayMs) { "outstanding operations or unreleased resources", true); } + public AssertionError checkFinished(Tracker tracker, long graceDelayMs) { + Predicate hasOperations = (t) -> t.getOutstandingOperations() > 0; + Predicate hasSubscribers = (t) -> t.getOutstandingSubscribers() > 0; + return check(tracker, graceDelayMs, + hasOperations.or(hasSubscribers), + "outstanding operations or unreleased resources", false); + } + public AssertionError check(long graceDelayMs) { Predicate hasOperations = (t) -> t.getOutstandingOperations() > 0; Predicate hasSubscribers = (t) -> t.getOutstandingSubscribers() > 0;