Skip to content

Commit

Permalink
WiP
Browse files Browse the repository at this point in the history
Signed-off-by: Maxim Nesen <[email protected]>
  • Loading branch information
senivam committed Feb 4, 2025
1 parent 814dd35 commit ab15ad8
Show file tree
Hide file tree
Showing 4 changed files with 373 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public void channelInactive(ChannelHandlerContext ctx) {
}

protected void notifyResponse() {
System.out.println("Notify response, " + jerseyResponse);
if (jerseyResponse != null) {
ClientResponse cr = jerseyResponse;
jerseyResponse = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,15 @@

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import org.glassfish.jersey.client.ClientRequest;

import javax.ws.rs.ProcessingException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;

public class JerseyExpectContinueHandler extends ChannelInboundHandlerAdapter {
Expand All @@ -47,8 +42,13 @@ public class JerseyExpectContinueHandler extends ChannelInboundHandlerAdapter {

private HttpResponseStatus status = null;

private CountDownLatch latch = null;

private Boolean chunked = null;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Message received: " + msg);
if (checkExpectResponse(msg)) {
currentState = ExpectationState.AWAITING;
}
Expand All @@ -57,9 +57,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
final HttpResponse response = (HttpResponse) msg;
status = response.status();

boolean handshakeDone = HttpUtil.isContentLengthSet(response) || msg instanceof FullHttpMessage;
boolean handshakeDone = Boolean.FALSE.equals(chunked) || msg instanceof FullHttpMessage;
currentState = (handshakeDone) ? ExpectationState.IDLE : ExpectationState.FINISHING;

processLatch();
return;
case FINISHING:
if (msg instanceof LastHttpContent) {
Expand All @@ -72,34 +72,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}

private boolean checkExpectResponse(Object msg) {
if (msg instanceof HttpResponse) {
return expectationStatuses.contains(((HttpResponse) msg).status());
if (latch != null && msg instanceof HttpResponse) {
return statusesToBeConsidered.contains(((HttpResponse) msg).status());
}
return false;
}

HttpRequest prepare100ContinueRequest(HttpRequest nettyRequest,
ClientRequest jerseyRequest)
throws InterruptedException, ExecutionException, TimeoutException {
//check for 100-Continue presence/availability
final Expect100ContinueConnectorExtension expect100ContinueExtension
= new Expect100ContinueConnectorExtension();
final DefaultFullHttpRequest preparedNettyHeadersRequest =
new DefaultFullHttpRequest(nettyRequest.protocolVersion(), nettyRequest.method(), nettyRequest.uri());
preparedNettyHeadersRequest.headers().setAll(nettyRequest.headers());

if (!preparedNettyHeadersRequest.headers().contains(HttpHeaderNames.HOST)) {
preparedNettyHeadersRequest.headers().add(HttpHeaderNames.HOST, jerseyRequest.getUri().getHost());
}

//If Expect:100-continue feature is enabled and client supports it, the nettyRequestHeaders will be
//enriched with the 'Expect:100-continue' header.
expect100ContinueExtension.invoke(jerseyRequest, preparedNettyHeadersRequest);

return preparedNettyHeadersRequest;
}
void processExpectationStatus()
boolean processExpectationStatus()
throws TimeoutException {
System.out.println(status);
if (status == null) {
throw new TimeoutException(); // continue without expectations
}
Expand All @@ -111,13 +92,39 @@ void processExpectationStatus()
throw new ProcessingException(LocalizationMessages
.REQUEST_ENTITY_TOO_LARGE(), null);
}

if (HttpResponseStatus.METHOD_NOT_ALLOWED.equals(status)) {
throw new TimeoutException(HttpResponseStatus
.METHOD_NOT_ALLOWED.reasonPhrase()); // Re-send request without expectations
}
if (HttpResponseStatus.EXPECTATION_FAILED.equals(status)) {
throw new TimeoutException(); // continue without expectations
throw new TimeoutException(HttpResponseStatus
.EXPECTATION_FAILED.reasonPhrase()); // continue without expectations
}
if (!HttpResponseStatus.CONTINUE.equals(status)) {
throw new ProcessingException(LocalizationMessages
.UNEXPECTED_VALUE_FOR_EXPECT_100_CONTINUE_STATUSES(status.code()), null);
}

return true;
}

void attachCountDownLatch(CountDownLatch latch) {
this.latch = latch;
}

private void processLatch() {
if (latch != null) {
latch.countDown();
}
}

void removeLatch() {
latch = null;
}

void setChunked(Boolean chunk) {
this.chunked = chunk;
}

private enum ExpectationState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -448,23 +447,6 @@ public void operationComplete(io.netty.util.concurrent.Future<? super Void> futu
final CountDownLatch headersSet = new CountDownLatch(1);
final CountDownLatch contentLengthSet = new CountDownLatch(1);

try {
final HttpRequest expect100ContinueRequest =
expect100ContinueHandler.prepare100ContinueRequest(nettyRequest, jerseyRequest);
if (HttpUtil.is100ContinueExpected(expect100ContinueRequest)) {
//send expect request, sync and wait till either response or timeout received
boolean completed = ch.writeAndFlush(expect100ContinueRequest)
.sync().await(expect100ContinueTimeout, TimeUnit.MILLISECONDS);
if (completed) {
expect100ContinueHandler.processExpectationStatus();
} //if not we just continue without expectations
}
} catch (ExecutionException e) {
responseDone.completeExceptionally(e);
} catch (TimeoutException e) {
//Expect:100-continue allows timeouts by the spec
}

jerseyRequest.setStreamProvider(new OutboundMessageContext.StreamProvider() {
@Override
public OutputStream getOutputStream(int contentLength) throws IOException {
Expand All @@ -488,24 +470,60 @@ public void run() {
if (entityWriter.getType() == NettyEntityWriter.Type.DELAYED) {
nettyRequest.headers().set(HttpHeaderNames.CONTENT_LENGTH, entityWriter.getLength());
contentLengthSet.countDown();
System.out.println("Content-length triggered");
}

} catch (IOException e) {
e.printStackTrace();
responseDone.completeExceptionally(e);
}
}
});


new Expect100ContinueConnectorExtension().invoke(jerseyRequest, nettyRequest);

headersSet.await();
entityWriter.writeAndFlush(nettyRequest);

if (HttpUtil.is100ContinueExpected(nettyRequest)) {
System.out.println(nettyRequest);
final CountDownLatch expect100ContinueLatch = new CountDownLatch(1);
expect100ContinueHandler.attachCountDownLatch(expect100ContinueLatch);
expect100ContinueHandler.setChunked(NettyEntityWriter.Type.CHUNKED.equals(entityWriter.getType()));
//send expect request, sync and wait till either response or timeout received
ch.writeAndFlush(nettyRequest)
.sync().await(expect100ContinueTimeout, TimeUnit.MILLISECONDS);
expect100ContinueLatch.await();
try {
System.out.println("Processing expectation status");
expect100ContinueHandler.processExpectationStatus();
} catch (TimeoutException e) {
//Expect:100-continue allows timeouts by the spec
//so, send request directly without Expect header.
HttpUtil.set100ContinueExpected(nettyRequest, false);
expect100ContinueHandler.removeLatch();
entityWriter.writeAndFlush(nettyRequest);
System.out.println("request re-sent w/o expectations: " + nettyRequest);
} finally {
//restore request and handler to the original state.
HttpUtil.set100ContinueExpected(nettyRequest, false);
expect100ContinueHandler.removeLatch();
expect100ContinueHandler.setChunked(null);
}
} else {
entityWriter.writeAndFlush(nettyRequest);
}

System.out.println("entity processing");
if (HttpUtil.isTransferEncodingChunked(nettyRequest)) {
entityWriter.write(new HttpChunkedInput(entityWriter.getChunkedInput()));
} else {
System.out.println("write it");
entityWriter.write(entityWriter.getChunkedInput());
}

if (entityWriter.getType() == NettyEntityWriter.Type.DELAYED) {
System.out.println("count down latch for content length " + contentLengthSet);
contentLengthSet.await();
}
entityWriter.flush();
Expand Down
Loading

0 comments on commit ab15ad8

Please sign in to comment.