Skip to content

Commit

Permalink
Backport 6273ab97dc1a0d3c1f51ba94694d9594dd7593d4
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeybakhtin committed Feb 21, 2025
1 parent 4803367 commit c0f83e9
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -577,8 +577,9 @@ public boolean registerSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
if (debug.on()) {
debug.log("body subscriber registered: " + count);
}
return true;
}
return true;
return false;
}
} finally {
selmgr.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ private void schedule() {
if (debug.on()) debug.log("subscribing user subscriber");
subscriber.onSubscribe(userSubscription);
}
while (!inputQ.isEmpty()) {
while (!inputQ.isEmpty() && errorRef.get() == null) {
Http2Frame frame = inputQ.peek();
if (frame instanceof ResetFrame rf) {
inputQ.remove();
Expand Down Expand Up @@ -425,6 +425,10 @@ private void sendDataFrame(DataFrame frame) {
// pushes entire response body into response subscriber
// blocking when required by local or remote flow control
CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
// ensure that the body subscriber will be subscribed and onError() is
// invoked
pendingResponseSubscriber = bodySubscriber;

// We want to allow the subscriber's getBody() method to block so it
// can work with InputStreams. So, we offload execution.
responseBodyCF = ResponseSubscribers.getBodyAsync(executor, bodySubscriber,
Expand All @@ -435,9 +439,6 @@ CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor exec
responseBodyCF.completeExceptionally(t);
}

// ensure that the body subscriber will be subscribed and onError() is
// invoked
pendingResponseSubscriber = bodySubscriber;
sched.runOrSchedule(); // in case data waiting already to be processed, or error

return responseBodyCF;
Expand Down
12 changes: 10 additions & 2 deletions test/jdk/java/net/httpclient/AbstractThrowingSubscribers.java
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ private <T,U> void testThrowing(String uri, boolean sameClient,
if (response != null) {
finisher.finish(where, response, thrower);
}
var tracker = TRACKER.getTracker(client);
if (!sameClient) {
// Wait for the client to be garbage collected.
// we use the ReferenceTracker API rather than HttpClient::close here,
Expand All @@ -482,7 +483,6 @@ private <T,U> void testThrowing(String uri, boolean sameClient,
// By using the ReferenceTracker, we will get some diagnosis about what
// is keeping the client alive if it doesn't get GC'ed within the
// expected time frame.
var tracker = TRACKER.getTracker(client);
client = null;
System.gc();
System.out.println(now() + "waiting for client to shutdown: " + tracker.getName());
Expand All @@ -491,6 +491,14 @@ private <T,U> void testThrowing(String uri, boolean sameClient,
if (error != null) throw error;
System.out.println(now() + "client shutdown normally: " + tracker.getName());
System.err.println(now() + "client shutdown normally: " + tracker.getName());
} else {
System.out.println(now() + "waiting for operation to finish: " + tracker.getName());
System.err.println(now() + "waiting for operation to finish: " + tracker.getName());
var error = TRACKER.checkFinished(tracker, 10000);
if (error != null) throw error;
System.out.println(now() + "operation finished normally: " + tracker.getName());
System.err.println(now() + "operation finished normally: " + tracker.getName());

}
}
}
Expand Down Expand Up @@ -800,7 +808,7 @@ public void teardown() throws Exception {
sharedClient == null ? null : sharedClient.toString();
sharedClient = null;
Thread.sleep(100);
AssertionError fail = TRACKER.check(500);
AssertionError fail = TRACKER.check(5000);
try {
httpTestServer.stop();
httpsTestServer.stop();
Expand Down
8 changes: 8 additions & 0 deletions test/jdk/java/net/httpclient/ReferenceTracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ public AssertionError check(Tracker tracker, long graceDelayMs) {
"outstanding operations or unreleased resources", true);
}

public AssertionError checkFinished(Tracker tracker, long graceDelayMs) {
Predicate<Tracker> hasOperations = (t) -> t.getOutstandingOperations() > 0;
Predicate<Tracker> hasSubscribers = (t) -> t.getOutstandingSubscribers() > 0;
return check(tracker, graceDelayMs,
hasOperations.or(hasSubscribers),
"outstanding operations or unreleased resources", false);
}

public AssertionError check(long graceDelayMs) {
Predicate<Tracker> hasOperations = (t) -> t.getOutstandingOperations() > 0;
Predicate<Tracker> hasSubscribers = (t) -> t.getOutstandingSubscribers() > 0;
Expand Down

0 comments on commit c0f83e9

Please sign in to comment.