Skip to content

Commit

Permalink
Improve message body handlers (#11002)
Browse files Browse the repository at this point in the history
- Previous raw handlers didn't allow to define a custom `String` handler of a different media type
- Raw handlers replaced by `Type*` reader/writer that include an argument.
- Extracted raw handlers to a separate writer / reader bean (no need to hide them with `@Bean(typed=`)
- Improved how dynamic body writer works, it will be always included from the registry for object types
- Introduced a separate writer/reader for a String and a JSON type. That is simply threads it a plain type.
- Small corrections

---------

Co-authored-by: Jonas Konrad <[email protected]>
  • Loading branch information
dstepanov and yawkat authored Jul 23, 2024
1 parent bf860b1 commit f4a7f9d
Show file tree
Hide file tree
Showing 38 changed files with 1,161 additions and 832 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.micronaut.context.annotation.BootstrapContextCompatible;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.Primary;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.naming.NameUtils;

import java.nio.charset.Charset;
Expand Down Expand Up @@ -59,7 +60,7 @@ public class ApplicationConfiguration {
/**
* @return The default charset to use.
*/
@SuppressWarnings("unchecked")
@NonNull
public Charset getDefaultCharset() {
return defaultCharset;
}
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/java/io/micronaut/core/type/Argument.java
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,19 @@ static <T> Argument<T> of(
return new DefaultArgument<>(type, null, AnnotationMetadata.EMPTY_METADATA, Collections.emptyMap(), Argument.ZERO_ARGUMENTS);
}

/**
* Creates a new argument for the type of the given instance.
*
* @param instance The argument instance
* @param <T> The generic type
* @return The argument instance
* @since 4.6
*/
@NonNull
static <T> Argument<T> ofInstance(@NonNull T instance) {
return Argument.of((Class<T>) instance.getClass());
}

/**
* Creates a new argument for the given type and name.
*
Expand Down
21 changes: 20 additions & 1 deletion core/src/main/java/io/micronaut/core/type/MutableHeaders.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.micronaut.core.type;


import io.micronaut.core.annotation.NonNull;

/**
* Common interface for all mutable header types.
*
Expand Down Expand Up @@ -48,9 +50,26 @@ public interface MutableHeaders extends Headers {
* @return This headers
* @since 1.3.3
*/
default MutableHeaders set(CharSequence header, CharSequence value) {
@NonNull
default MutableHeaders set(@NonNull CharSequence header, @NonNull CharSequence value) {
remove(header);
add(header, value);
return this;
}

/**
* Sets an HTTP header if missing.
*
* @param header The header
* @param value The value
* @return This headers
* @since 4.6
*/
@NonNull
default MutableHeaders setIfMissing(@NonNull CharSequence header, @NonNull CharSequence value) {
if (!contains(header.toString())) {
add(header, value);
}
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,8 @@ public void bind(
String headerValue = headerAnnotation.stringValue().orElse(null);
MutableHttpHeaders headers = request.getHeaders();

if (StringUtils.isNotEmpty(headerName) &&
StringUtils.isNotEmpty(headerValue) &&
!headers.contains(headerName)
) {
headers.set(headerName, headerValue);
if (StringUtils.isNotEmpty(headerName) && StringUtils.isNotEmpty(headerValue)) {
headers.setIfMissing(headerName, headerValue);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import java.lang.annotation.Annotation;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ private SslContext buildWebsocketSslContext(DefaultHttpClient.RequestKey request
}
}
}
} else if (configuration.getProxyAddress().isEmpty()){
} else if (configuration.getProxyAddress().isEmpty()) {
throw decorate(new HttpClientException("Cannot send WSS request. SSL is disabled"));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import io.micronaut.http.bind.RequestBinderRegistry;
import io.micronaut.http.body.ChunkedMessageBodyReader;
import io.micronaut.http.body.ContextlessMessageBodyHandlerRegistry;
import io.micronaut.http.body.DynamicMessageBodyWriter;
import io.micronaut.http.body.MessageBodyHandlerRegistry;
import io.micronaut.http.body.MessageBodyReader;
import io.micronaut.http.client.BlockingHttpClient;
Expand Down Expand Up @@ -92,7 +91,8 @@
import io.micronaut.http.netty.NettyHttpHeaders;
import io.micronaut.http.netty.NettyHttpRequestBuilder;
import io.micronaut.http.netty.NettyHttpResponseBuilder;
import io.micronaut.http.netty.body.ByteBufRawMessageBodyHandler;
import io.micronaut.http.netty.body.NettyByteBufMessageBodyHandler;
import io.micronaut.http.netty.body.NettyCharSequenceBodyWriter;
import io.micronaut.http.netty.body.NettyJsonHandler;
import io.micronaut.http.netty.body.NettyJsonStreamHandler;
import io.micronaut.http.netty.body.NettyWritableBodyWriter;
Expand Down Expand Up @@ -1292,7 +1292,7 @@ protected NettyRequestWriter buildNettyRequest(
}
}

HttpPostRequestEncoder postRequestEncoder = null;
HttpPostRequestEncoder postRequestEncoder;
if (permitsBody) {
Optional<?> body = request.getBody();
boolean hasBody = body.isPresent();
Expand All @@ -1315,16 +1315,17 @@ protected NettyRequestWriter buildNettyRequest(
ByteBuf bodyContent;
if (hasBody) {
Object bodyValue = body.get();
DynamicMessageBodyWriter dynamicWriter = new DynamicMessageBodyWriter(handlerRegistry, List.of(requestContentType));
if (Publishers.isConvertibleToPublisher(bodyValue)) {
boolean isSingle = Publishers.isSingle(bodyValue.getClass());

Publisher<?> publisher = conversionService.convert(bodyValue, Publisher.class).orElseThrow(() ->
new IllegalArgumentException("Unconvertible reactive type: " + bodyValue)
);

Flux<HttpContent> requestBodyPublisher = Flux.from(publisher).map(o -> {
ByteBuffer<?> buffer = dynamicWriter.writeTo(Argument.OBJECT_ARGUMENT, requestContentType, o, request.getHeaders(), byteBufferFactory);
Flux<HttpContent> requestBodyPublisher = Flux.from(publisher).map(value -> {
Argument<Object> type = Argument.ofInstance(value);
ByteBuffer<?> buffer = handlerRegistry.getWriter(type, List.of(requestContentType))
.writeTo(type, requestContentType, value, request.getHeaders(), byteBufferFactory);
return new DefaultHttpContent(((ByteBuf) buffer.asNativeBuffer()));
});

Expand All @@ -1340,7 +1341,9 @@ protected NettyRequestWriter buildNettyRequest(
} else if (bodyValue instanceof CharSequence sequence) {
bodyContent = charSequenceToByteBuf(sequence, requestContentType);
} else {
ByteBuffer<?> buffer = dynamicWriter.writeTo(Argument.OBJECT_ARGUMENT, requestContentType, bodyValue, request.getHeaders(), byteBufferFactory);
Argument<Object> type = Argument.ofInstance(bodyValue);
ByteBuffer<?> buffer = handlerRegistry.getWriter(type, List.of(requestContentType))
.writeTo(type, requestContentType, bodyValue, request.getHeaders(), byteBufferFactory);
bodyContent = (ByteBuf) buffer.asNativeBuffer();
}
if (bodyContent == null) {
Expand Down Expand Up @@ -1792,9 +1795,15 @@ private static MediaTypeCodecRegistry createDefaultMediaTypeRegistry() {

private static MessageBodyHandlerRegistry createDefaultMessageBodyHandlerRegistry() {
ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration();
ContextlessMessageBodyHandlerRegistry registry = new ContextlessMessageBodyHandlerRegistry(applicationConfiguration, NettyByteBufferFactory.DEFAULT, new ByteBufRawMessageBodyHandler(), new NettyWritableBodyWriter(applicationConfiguration));
ContextlessMessageBodyHandlerRegistry registry = new ContextlessMessageBodyHandlerRegistry(
applicationConfiguration,
NettyByteBufferFactory.DEFAULT,
new NettyByteBufMessageBodyHandler(),
new NettyWritableBodyWriter(applicationConfiguration)
);
JsonMapper mapper = JsonMapper.createDefault();
registry.add(MediaType.APPLICATION_JSON_TYPE, new NettyJsonHandler<>(mapper));
registry.add(MediaType.APPLICATION_JSON_TYPE, new NettyCharSequenceBodyWriter());
registry.add(MediaType.APPLICATION_JSON_STREAM_TYPE, new NettyJsonStreamHandler<>(mapper));
return registry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public <T> Optional<T> getBody(Argument<T> type) {
}

);
if (LOG.isTraceEnabled() && !result.isPresent()) {
if (LOG.isTraceEnabled() && result.isEmpty()) {
LOG.trace("Unable to convert response body to target type {}", type.getType());
}
return result;
Expand All @@ -213,21 +213,22 @@ private <T> Optional<T> convertByteBuf(Argument<T> type) {
}
return Optional.empty();
}
// All content operation should call slice to prevent reading the buffer completely
Optional<MediaType> contentType = getContentType();
if (contentType.isPresent()) {
Optional<MessageBodyReader<T>> reader = handlerRegistry.findReader(type, List.of(contentType.get()));
if (reader.isPresent()) {
MessageBodyReader<T> r = reader.get();
MediaType ct = contentType.get();
if (r.isReadable(type, ct)) {
return Optional.of(r.read(type, ct, headers, NettyByteBufferFactory.DEFAULT.wrap(unpooledContent)));
return Optional.of(r.read(type, ct, headers, NettyByteBufferFactory.DEFAULT.wrap(unpooledContent.slice())));
}
}
} else if (LOG.isTraceEnabled()) {
LOG.trace("Missing or unknown Content-Type received from server.");
}
// last chance, try type conversion
return conversionService.convert(unpooledContent, ByteBuf.class, type);
return conversionService.convert(unpooledContent.slice(), ByteBuf.class, type);
}

@NonNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.micronaut.http.netty.body;

import io.micronaut.buffer.netty.NettyByteBufferFactory;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.BootstrapContextCompatible;
import io.micronaut.core.annotation.Experimental;
import io.micronaut.core.annotation.Internal;
Expand All @@ -26,7 +25,8 @@
import io.micronaut.core.type.Headers;
import io.micronaut.core.type.MutableHeaders;
import io.micronaut.http.MediaType;
import io.micronaut.http.body.RawMessageBodyHandler;
import io.micronaut.http.body.ChunkedMessageBodyReader;
import io.micronaut.http.body.TypedMessageBodyHandler;
import io.micronaut.http.codec.CodecException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
Expand All @@ -38,8 +38,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.List;

/**
* Handler for netty {@link ByteBuf}.
Expand All @@ -51,8 +49,13 @@
@Singleton
@Experimental
@BootstrapContextCompatible
@Bean(typed = RawMessageBodyHandler.class)
public final class ByteBufRawMessageBodyHandler implements RawMessageBodyHandler<ByteBuf> {
public final class NettyByteBufMessageBodyHandler implements TypedMessageBodyHandler<ByteBuf>, ChunkedMessageBodyReader<ByteBuf> {

@Override
public Argument<ByteBuf> getType() {
return Argument.of(ByteBuf.class);
}

@Override
public Publisher<ByteBuf> readChunked(Argument<ByteBuf> type, MediaType mediaType, Headers httpHeaders, Publisher<ByteBuffer<?>> input) {
return Flux.from(input).map(bb -> (ByteBuf) bb.asNativeBuffer());
Expand All @@ -76,6 +79,7 @@ public ByteBuf read(Argument<ByteBuf> type, MediaType mediaType, Headers httpHea
public void writeTo(Argument<ByteBuf> type, MediaType mediaType, ByteBuf object, MutableHeaders outgoingHeaders, OutputStream outputStream) throws CodecException {
try {
new ByteBufInputStream(object).transferTo(outputStream);
// ByteBufInputStream#close doesn't release properly
object.release();
} catch (IOException e) {
throw new CodecException("Failed to transfer byte buffer", e);
Expand All @@ -87,8 +91,4 @@ public ByteBuffer<?> writeTo(Argument<ByteBuf> type, MediaType mediaType, ByteBu
return NettyByteBufferFactory.DEFAULT.wrap(object);
}

@Override
public Collection<Class<ByteBuf>> getTypes() {
return List.of(ByteBuf.class);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 original authors
* Copyright 2017-2024 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,18 +18,14 @@
import io.micronaut.context.annotation.Replaces;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.type.Argument;
import io.micronaut.core.type.Headers;
import io.micronaut.core.type.MutableHeaders;
import io.micronaut.http.HttpHeaders;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpHeaders;
import io.micronaut.http.MutableHttpResponse;
import io.micronaut.http.annotation.Consumes;
import io.micronaut.http.annotation.Produces;
import io.micronaut.http.body.MessageBodyHandler;
import io.micronaut.http.body.CharSequenceBodyWriter;
import io.micronaut.http.body.MessageBodyWriter;
import io.micronaut.http.body.TextPlainHandler;
import io.micronaut.http.codec.CodecException;
import io.micronaut.http.netty.NettyHttpHeaders;
import io.netty.buffer.ByteBuf;
Expand All @@ -42,21 +38,25 @@
import io.netty.handler.codec.http.HttpVersion;
import jakarta.inject.Singleton;

import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/**
* A JSON body should not be escaped or parsed as a JSON value.
*
* @author Denis Stepanov
* @since 4.6
*/
@Singleton
@Replaces(TextPlainHandler.class)
@Produces(MediaType.TEXT_PLAIN)
@Consumes(MediaType.TEXT_PLAIN)
@Replaces(CharSequenceBodyWriter.class)
@Internal
final class NettyTextPlainHandler implements MessageBodyHandler<CharSequence>, NettyBodyWriter<CharSequence> {
private final TextPlainHandler defaultHandler = new TextPlainHandler();
public final class NettyCharSequenceBodyWriter implements MessageBodyWriter<CharSequence>, NettyBodyWriter<CharSequence> {
private final CharSequenceBodyWriter defaultHandler = new CharSequenceBodyWriter(StandardCharsets.UTF_8);

@Override
public void writeTo(HttpRequest<?> request, MutableHttpResponse<CharSequence> outgoingResponse, Argument<CharSequence> type, MediaType mediaType, CharSequence object, NettyWriteContext nettyContext) throws CodecException {
MutableHttpHeaders headers = outgoingResponse.getHeaders();
ByteBuf byteBuf = Unpooled.wrappedBuffer(object.toString().getBytes(MessageBodyWriter.getCharset(headers)));
ByteBuf byteBuf = Unpooled.copiedBuffer(object.toString(), MessageBodyWriter.getCharset(mediaType, headers));
NettyHttpHeaders nettyHttpHeaders = (NettyHttpHeaders) headers;
io.netty.handler.codec.http.HttpHeaders nettyHeaders = nettyHttpHeaders.getNettyHeaders();
if (!nettyHttpHeaders.contains(HttpHeaders.CONTENT_TYPE)) {
Expand All @@ -78,8 +78,4 @@ public void writeTo(Argument<CharSequence> type, MediaType mediaType, CharSequen
defaultHandler.writeTo(type, mediaType, object, outgoingHeaders, outputStream);
}

@Override
public String read(Argument<CharSequence> type, MediaType mediaType, Headers httpHeaders, InputStream inputStream) throws CodecException {
return defaultHandler.read(type, mediaType, httpHeaders, inputStream);
}
}
Loading

0 comments on commit f4a7f9d

Please sign in to comment.