diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java index 14310381958..727b58d45a0 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java @@ -67,6 +67,7 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; +import java.io.EOFException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; @@ -557,6 +558,12 @@ void read(Object message) { } } + @Override + void discard() { + // note: this has to match RoutingInBoundHandler#IGNORABLE_ERROR_MESSAGE + handleUpstreamError(new EOFException("Connection closed before full body was received")); + } + @Override void handleUpstreamError(Throwable cause) { dest.error(cause); @@ -697,6 +704,12 @@ void readComplete() { void handleUpstreamError(Throwable cause) { delegate.handleUpstreamError(cause); } + + @Override + void discard() { + dispose(); + delegate.discard(); + } } /** @@ -820,6 +833,11 @@ private void write(OutboundHandler handler) { return; } + if (ctx.isRemoved()) { + handler.discardOutbound(); + return; + } + if (this.handler instanceof ContinueOutboundHandler cont) { cont.next = handler; writeSome(); diff --git a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/fuzzing/FlagAppender.java b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/fuzzing/FlagAppender.java index 76e78a47f88..df26d656a75 100644 --- a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/fuzzing/FlagAppender.java +++ b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/fuzzing/FlagAppender.java @@ -19,6 +19,10 @@ public static void checkTriggered() { @Override protected void append(ILoggingEvent eventObject) { + if (eventObject.getLoggerName().equals(BufferLeakDetection.class.getName())) { + // ignore 'Canary leak detection failed.' messages + return; + } triggered = true; } } diff --git a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/fuzzing/FuzzyInputSpec.groovy b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/fuzzing/FuzzyInputSpec.groovy index b4f9d564655..d286fd2d25c 100644 --- a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/fuzzing/FuzzyInputSpec.groovy +++ b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/fuzzing/FuzzyInputSpec.groovy @@ -2,6 +2,7 @@ package io.micronaut.http.server.netty.fuzzing import io.micronaut.context.ApplicationContext import io.micronaut.context.annotation.Requires +import io.micronaut.http.annotation.Body import io.micronaut.http.annotation.Controller import io.micronaut.http.annotation.Get import io.micronaut.http.annotation.Post @@ -84,8 +85,10 @@ class FuzzyInputSpec extends Specification { when: def embeddedChannel = embeddedServer.buildEmbeddedChannel(false) - embeddedChannel.writeInbound(Unpooled.wrappedBuffer(input)); - embeddedChannel.runPendingTasks(); + def b = embeddedChannel.alloc().buffer(input.length) + b.writeBytes(input) + embeddedChannel.writeInbound(b) + embeddedChannel.runPendingTasks() embeddedChannel.releaseOutbound() // don't release inbound, that doesn't happen normally either @@ -110,6 +113,7 @@ class FuzzyInputSpec extends Specification { Base64.decoder.decode("VCB4dCBQLzUuMQoKUDIg/CBIUFRQLzEuMgotdHlwZTo3ClRyYX5zZmVyLUVuRVRUbmc6ZGVmbGF0ZQoKL7lFUDIg/CBIUFRQLzEuMQotdHlwZTotdHlwZTo3ClRyYW5zZmVyeXBlOjf///////////////////////////////////////////////////////////////////////////////////////////8KVHJhbnNmZXItRW5jb2Rpbmc6ZGVmbGF0ZQpjb250ZW50LWxlbmd0aDo4CgoNSU9OUyAvILiqVFAvCgovuUVQMkdHR0d3AC07biE="), Base64.decoder.decode("UyAvIFAvMC4xMQpjb250ZW50LWxlbmd0aDo0ClRyYW5zZmVyLUVuY29kaW5nOmRlZmxhdGUKCi+5RVAyIPwgSC8xLjEK"), Base64.decoder.decode("cA1ACUhUVFAvOC4wCkhvc3Q6OgpPcmlnaW46Cgo="), + Base64.decoder.decode("SEVDc3QNQP/9P/8JSFRUUC8wLjEKZXB0OgoKcG9zdA1A/T/9Oi8v/y9lY2hvLXB1Ymxpc2hlcglIVFRQLzAuMQp0OgpDb250ZW50LUxlbmd0aDo1Cgr/"), ] } @@ -126,5 +130,10 @@ class FuzzyInputSpec extends Specification { public Publisher index(Publisher foo) { return foo } + + @Post("/echo-publisher") + public Publisher echo(@Body Publisher foo) { + return foo; + } } }