Skip to content

Commit

Permalink
Fix dangling request when connection is closed without error (#11528)
Browse files Browse the repository at this point in the history
* Fix dangling request when connection is closed without error
When a connection is closed without error, a streaming body would never finish, no response would be written, and thus the request would never be released. This patch changes PipeliningServerHandler so that:

- a normal closure is treated as an EOF by the streaming inbound handler
- the outbound handler can properly discard a response written after the PipeliningServerHandler is already being removed

Found by fuzzing.

* make FuzzyInputSpec less flaky
  • Loading branch information
yawkat authored Jan 22, 2025
1 parent d3ae72c commit cff9539
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

import java.io.EOFException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -557,6 +558,12 @@ void read(Object message) {
}
}

@Override
void discard() {
// note: this has to match RoutingInBoundHandler#IGNORABLE_ERROR_MESSAGE
handleUpstreamError(new EOFException("Connection closed before full body was received"));
}

@Override
void handleUpstreamError(Throwable cause) {
dest.error(cause);
Expand Down Expand Up @@ -697,6 +704,12 @@ void readComplete() {
void handleUpstreamError(Throwable cause) {
delegate.handleUpstreamError(cause);
}

@Override
void discard() {
dispose();
delegate.discard();
}
}

/**
Expand Down Expand Up @@ -820,6 +833,11 @@ private void write(OutboundHandler handler) {
return;
}

if (ctx.isRemoved()) {
handler.discardOutbound();
return;
}

if (this.handler instanceof ContinueOutboundHandler cont) {
cont.next = handler;
writeSome();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ public static void checkTriggered() {

@Override
protected void append(ILoggingEvent eventObject) {
if (eventObject.getLoggerName().equals(BufferLeakDetection.class.getName())) {
// ignore 'Canary leak detection failed.' messages
return;
}
triggered = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.micronaut.http.server.netty.fuzzing

import io.micronaut.context.ApplicationContext
import io.micronaut.context.annotation.Requires
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.Post
Expand Down Expand Up @@ -84,8 +85,10 @@ class FuzzyInputSpec extends Specification {
when:
def embeddedChannel = embeddedServer.buildEmbeddedChannel(false)

embeddedChannel.writeInbound(Unpooled.wrappedBuffer(input));
embeddedChannel.runPendingTasks();
def b = embeddedChannel.alloc().buffer(input.length)
b.writeBytes(input)
embeddedChannel.writeInbound(b)
embeddedChannel.runPendingTasks()

embeddedChannel.releaseOutbound()
// don't release inbound, that doesn't happen normally either
Expand All @@ -110,6 +113,7 @@ class FuzzyInputSpec extends Specification {
Base64.decoder.decode("VCB4dCBQLzUuMQoKUDIg/CBIUFRQLzEuMgotdHlwZTo3ClRyYX5zZmVyLUVuRVRUbmc6ZGVmbGF0ZQoKL7lFUDIg/CBIUFRQLzEuMQotdHlwZTotdHlwZTo3ClRyYW5zZmVyeXBlOjf///////////////////////////////////////////////////////////////////////////////////////////8KVHJhbnNmZXItRW5jb2Rpbmc6ZGVmbGF0ZQpjb250ZW50LWxlbmd0aDo4CgoNSU9OUyAvILiqVFAvCgovuUVQMkdHR0d3AC07biE="),
Base64.decoder.decode("UyAvIFAvMC4xMQpjb250ZW50LWxlbmd0aDo0ClRyYW5zZmVyLUVuY29kaW5nOmRlZmxhdGUKCi+5RVAyIPwgSC8xLjEK"),
Base64.decoder.decode("cA1ACUhUVFAvOC4wCkhvc3Q6OgpPcmlnaW46Cgo="),
Base64.decoder.decode("SEVDc3QNQP/9P/8JSFRUUC8wLjEKZXB0OgoKcG9zdA1A/T/9Oi8v/y9lY2hvLXB1Ymxpc2hlcglIVFRQLzAuMQp0OgpDb250ZW50LUxlbmd0aDo1Cgr/"),
]
}

Expand All @@ -126,5 +130,10 @@ class FuzzyInputSpec extends Specification {
public Publisher<String> index(Publisher<String> foo) {
return foo
}

@Post("/echo-publisher")
public Publisher<byte[]> echo(@Body Publisher<byte[]> foo) {
return foo;
}
}
}

0 comments on commit cff9539

Please sign in to comment.