Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8342075: HttpClient: improve HTTP/2 flow control checks #1427

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -58,6 +58,10 @@ abstract class ExchangeImpl<T> {

final Exchange<T> exchange;

// this will be set to true only when the peer explicitly states (through a GOAWAY frame or
// a relevant error code in reset frame) that the corresponding stream (id) wasn't processed
private volatile boolean unprocessedByPeer;

ExchangeImpl(Exchange<T> e) {
// e == null means a http/2 pushed stream
this.exchange = e;
Expand Down Expand Up @@ -264,4 +268,13 @@ void upgraded() { }
// Called when server returns non 100 response to
// an Expect-Continue
void expectContinueFailed(int rcode) { }

final boolean isUnprocessedByPeer() {
return this.unprocessedByPeer;
}

// Marks the exchange as unprocessed by the peer
final void markUnprocessedByPeer() {
this.unprocessedByPeer = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.Utils;
import jdk.internal.net.http.frame.SettingsFrame;

import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_CONNECTION_WINDOW_SIZE;
import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_WINDOW_SIZE;
import static jdk.internal.net.http.frame.SettingsFrame.ENABLE_PUSH;
import static jdk.internal.net.http.frame.SettingsFrame.HEADER_TABLE_SIZE;
Expand Down Expand Up @@ -289,9 +291,13 @@ int getConnectionWindowSize(SettingsFrame clientSettings) {
// and the connection window size.
int defaultValue = Math.max(streamWindow, K*K*32);

// The min value is the max between the streamWindow and
// the initial connection window size
int minValue = Math.max(INITIAL_CONNECTION_WINDOW_SIZE, streamWindow);

return getParameter(
"jdk.httpclient.connectionWindowSize",
streamWindow, Integer.MAX_VALUE, defaultValue);
minValue, Integer.MAX_VALUE, defaultValue);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -396,6 +397,7 @@ private record PushContinuationState(PushPromiseDecoder pushContDecoder, PushPro
private final String key; // for HttpClientImpl.connections map
private final FramesDecoder framesDecoder;
private final FramesEncoder framesEncoder = new FramesEncoder();
private final AtomicLong lastProcessedStreamInGoAway = new AtomicLong(-1);

/**
* Send Window controller for both connection and stream windows.
Expand Down Expand Up @@ -802,7 +804,9 @@ final int maxConcurrentServerInitiatedStreams() {

void close() {
if (markHalfClosedLocal()) {
if (connection.channel().isOpen()) {
// we send a GOAWAY frame only if the remote side hasn't already indicated
// the intention to close the connection by previously sending a GOAWAY of its own
if (connection.channel().isOpen() && !isMarked(closedState, HALF_CLOSED_REMOTE)) {
Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
GoAwayFrame f = new GoAwayFrame(0,
ErrorFrame.NO_ERROR,
Expand Down Expand Up @@ -1064,6 +1068,34 @@ private String checkMaxOrphanedHeadersExceeded(HeaderFrame hf) {
return null;
}

// This method is called when a DataFrame that was added
// to a Stream::inputQ is later dropped from the queue
// without being consumed.
//
// Before adding a frame to the queue, the Stream calls
// connection.windowUpdater.canBufferUnprocessedBytes(), which
// increases the count of unprocessed bytes in the connection.
// After consuming the frame, it calls connection.windowUpdater::processed,
// which decrements the count of unprocessed bytes, and possibly
// sends a window update to the peer.
//
// This method is called when connection.windowUpdater::processed
// will not be called, which can happen when consuming the frame
// fails, or when an empty DataFrame terminates the stream,
// or when the stream is cancelled while data is still
// sitting in its inputQ. In the later case, it is called for
// each frame that is dropped from the queue.
final void releaseUnconsumed(DataFrame df) {
windowUpdater.released(df.payloadLength());
dropDataFrame(df);
}

// This method can be called directly when a DataFrame is dropped
// before/without having been added to any Stream::inputQ.
// In that case, the number of unprocessed bytes hasn't been incremented
// by the stream, and does not need to be decremented.
// Otherwise, if the frame is dropped after having been added to the
// inputQ, releaseUnconsumed above should be called.
final void dropDataFrame(DataFrame df) {
if (isMarked(closedState, SHUTDOWN_REQUESTED)) return;
if (debug.on()) {
Expand Down Expand Up @@ -1354,13 +1386,46 @@ private void handlePing(PingFrame frame)
sendUnorderedFrame(frame);
}

private void handleGoAway(GoAwayFrame frame)
throws IOException
{
if (markHalfClosedLRemote()) {
shutdown(new IOException(
connection.channel().getLocalAddress()
+ ": GOAWAY received"));
private void handleGoAway(final GoAwayFrame frame) {
final long lastProcessedStream = frame.getLastStream();
assert lastProcessedStream >= 0 : "unexpected last stream id: "
+ lastProcessedStream + " in GOAWAY frame";

markHalfClosedRemote();
setFinalStream(); // don't allow any new streams on this connection
if (debug.on()) {
debug.log("processing incoming GOAWAY with last processed stream id:%s in frame %s",
lastProcessedStream, frame);
}
// see if this connection has previously received a GOAWAY from the peer and if yes
// then check if this new last processed stream id is lesser than the previous
// known last processed stream id. Only update the last processed stream id if the new
// one is lesser than the previous one.
long prevLastProcessed = lastProcessedStreamInGoAway.get();
while (prevLastProcessed == -1 || lastProcessedStream < prevLastProcessed) {
if (lastProcessedStreamInGoAway.compareAndSet(prevLastProcessed,
lastProcessedStream)) {
break;
}
prevLastProcessed = lastProcessedStreamInGoAway.get();
}
handlePeerUnprocessedStreams(lastProcessedStreamInGoAway.get());
}

private void handlePeerUnprocessedStreams(final long lastProcessedStream) {
final AtomicInteger numClosed = new AtomicInteger(); // atomic merely to allow usage within lambda
streams.forEach((id, exchange) -> {
if (id > lastProcessedStream) {
// any streams with an stream id higher than the last processed stream
// can be retried (on a new connection). we close the exchange as unprocessed
// to facilitate the retrying.
client2.client().theExecutor().ensureExecutedAsync(exchange::closeAsUnprocessed);
numClosed.incrementAndGet();
}
});
if (debug.on()) {
debug.log(numClosed.get() + " stream(s), with id greater than " + lastProcessedStream
+ ", will be closed as unprocessed");
}
}

Expand Down Expand Up @@ -1416,11 +1481,12 @@ private void sendConnectionPreface() throws IOException {
// Note that the default initial window size, not to be confused
// with the initial window size, is defined by RFC 7540 as
// 64K -1.
final int len = windowUpdater.initialWindowSize - DEFAULT_INITIAL_WINDOW_SIZE;
if (len != 0) {
final int len = windowUpdater.initialWindowSize - INITIAL_CONNECTION_WINDOW_SIZE;
assert len >= 0;
if (len > 0) {
if (Log.channel()) {
Log.logChannel("Sending initial connection window update frame: {0} ({1} - {2})",
len, windowUpdater.initialWindowSize, DEFAULT_INITIAL_WINDOW_SIZE);
len, windowUpdater.initialWindowSize, INITIAL_CONNECTION_WINDOW_SIZE);
}
windowUpdater.sendWindowUpdate(len);
}
Expand Down Expand Up @@ -1874,6 +1940,19 @@ public ConnectionWindowUpdateSender(Http2Connection connection,
int getStreamId() {
return 0;
}

@Override
protected boolean windowSizeExceeded(long received) {
if (connection.isOpen()) {
try {
connection.protocolError(ErrorFrame.FLOW_CONTROL_ERROR,
"connection window exceeded");
} catch (IOException io) {
connection.shutdown(io);
}
}
return true;
}
}

/**
Expand Down Expand Up @@ -1911,7 +1990,7 @@ private boolean markHalfClosedLocal() {
return markClosedState(HALF_CLOSED_LOCAL);
}

private boolean markHalfClosedLRemote() {
private boolean markHalfClosedRemote() {
return markClosedState(HALF_CLOSED_REMOTE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,23 +282,25 @@ public String toString() {
}
}

static void registerPending(PendingRequest pending) {
static <T> CompletableFuture<T> registerPending(PendingRequest pending, CompletableFuture<T> res) {
// shortcut if cf is already completed: no need to go through the trouble of
// registering it
if (pending.cf.isDone()) return;
if (pending.cf.isDone()) return res;

var client = pending.client;
var cf = pending.cf;
var id = pending.id;
boolean added = client.pendingRequests.add(pending);
// this may immediately remove `pending` from the set is the cf is already completed
pending.ref = cf.whenComplete((r,t) -> client.pendingRequests.remove(pending));
var ref = res.whenComplete((r,t) -> client.pendingRequests.remove(pending));
pending.ref = ref;
assert added : "request %d was already added".formatted(id);
// should not happen, unless the selector manager has already
// exited abnormally
if (client.selmgr.isClosed()) {
pending.abort(client.selmgr.selectorClosedException());
}
return ref;
}

static void abortPendingRequests(HttpClientImpl client, Throwable reason) {
Expand Down Expand Up @@ -575,8 +577,9 @@ public boolean registerSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
if (debug.on()) {
debug.log("body subscriber registered: " + count);
}
return true;
}
return true;
return false;
}
} finally {
selmgr.unlock();
Expand Down Expand Up @@ -930,8 +933,9 @@ private void debugCompleted(String tag, long startNanos, HttpRequest req) {
cf = sendAsync(req, responseHandler, null, null);
return cf.get();
} catch (InterruptedException ie) {
if (cf != null )
if (cf != null) {
cf.cancel(true);
}
throw ie;
} catch (ExecutionException e) {
final Throwable throwable = e.getCause();
Expand Down Expand Up @@ -1053,19 +1057,23 @@ private void debugCompleted(String tag, long startNanos, HttpRequest req) {
(b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
}

// makes sure that any dependent actions happen in the CF default
// executor. This is only needed for sendAsync(...), when
// exchangeExecutor is non-null.
if (exchangeExecutor != null) {
res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
}

// The mexCf is the Cf we need to abort if the SelectorManager thread
// is aborted.
PendingRequest pending = new PendingRequest(id, requestImpl, mexCf, mex, this);
registerPending(pending);
return res;
} catch(Throwable t) {
res = registerPending(pending, res);

if (exchangeExecutor != null) {
// makes sure that any dependent actions happen in the CF default
// executor. This is only needed for sendAsync(...), when
// exchangeExecutor is non-null.
return res.isDone() ? res
: res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
} else {
// make a defensive copy that can be safely canceled
// by the caller
return res.isDone() ? res : res.copy();
}
} catch (Throwable t) {
requestUnreference();
debugCompleted("ClientImpl (async)", start, userRequest);
throw t;
Expand Down
Loading