diff --git a/remote/src/main/resources/reference.conf b/remote/src/main/resources/reference.conf index 4c4edff858..4dfb6fedbf 100644 --- a/remote/src/main/resources/reference.conf +++ b/remote/src/main/resources/reference.conf @@ -621,6 +621,14 @@ pekko { # "off-for-windows" of course means that it's "on" for all other platforms tcp-reuse-addr = off-for-windows + # Used to control the Netty 4's ByteBufAllocator. The default is "pooled". + # pooled : use a PooledByteBufAllocator.DEFAULT + # unpooled : use an UnpooledByteBufAllocator.DEFAULT + # unpooled-heap : use an UnpooledByteBufAllocator with prefer direct `false` + # adaptive : use an AdaptiveByteBufAllocator + # adaptive-heap : use an AdaptiveByteBufAllocator with prefer direct `false` + bytebuf-allocator-type = "pooled" + # Used to configure the number of I/O worker threads on server sockets server-socket-worker-pool { # Min number of threads to cap factor-based number to diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala index 0956548fdf..c45d2d671b 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala @@ -24,6 +24,7 @@ import scala.util.Try import scala.util.control.{ NoStackTrace, NonFatal } import com.typesafe.config.Config + import org.apache.pekko import pekko.ConfigurationException import pekko.OnlyCauseStackTrace @@ -38,7 +39,13 @@ import pekko.util.Helpers.Requiring import pekko.util.{ Helpers, OptionVal } import io.netty.bootstrap.{ Bootstrap => ClientBootstrap, ServerBootstrap } -import io.netty.buffer.Unpooled +import io.netty.buffer.{ + AdaptiveByteBufAllocator, + ByteBufAllocator, + PooledByteBufAllocator, + Unpooled, + UnpooledByteBufAllocator +} import io.netty.channel.{ Channel, ChannelFuture, @@ -160,6 +167,17 @@ class NettyTransportSettings(config: Config) { case _ => getBoolean("tcp-reuse-addr") } + val ByteBufAllocator: ByteBufAllocator = getString("bytebuf-allocator-type") match { + case "pooled" => PooledByteBufAllocator.DEFAULT + case "unpooled" => UnpooledByteBufAllocator.DEFAULT + case "unpooled-heap" => new UnpooledByteBufAllocator(false) + case "adaptive" => new AdaptiveByteBufAllocator() + case "adaptive-heap" => new AdaptiveByteBufAllocator(false) + case other => throw new IllegalArgumentException( + "Unknown 'bytebuf-allocator-type' [" + other + "]," + + " supported values are 'pooled', 'unpooled', 'unpooled-heap', 'adaptive', 'adaptive-heap'.") + } + val Hostname: String = getString("hostname") match { case "" => InetAddress.getLocalHost.getHostAddress case value => value @@ -442,6 +460,10 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA bootstrap.childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, settings.TcpNodelay) bootstrap.childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, settings.TcpKeepalive) + // Use the same allocator for inbound and outbound buffers + bootstrap.option(ChannelOption.ALLOCATOR, settings.ByteBufAllocator) + bootstrap.childOption(ChannelOption.ALLOCATOR, settings.ByteBufAllocator) + settings.ReceiveBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_RCVBUF, sz)) settings.SendBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_SNDBUF, sz)) settings.WriteBufferHighWaterMark.filter(_ > 0).foreach(sz => diff --git a/remote/src/test/scala/org/apache/pekko/remote/RemoteConfigSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/RemoteConfigSpec.scala index cff4fad3af..62434d809e 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/RemoteConfigSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/RemoteConfigSpec.scala @@ -13,9 +13,11 @@ package org.apache.pekko.remote -import scala.concurrent.duration._ +import io.netty.buffer.PooledByteBufAllocator +import scala.concurrent.duration._ import scala.annotation.nowarn + import language.postfixOps import org.apache.pekko @@ -103,6 +105,7 @@ class RemoteConfigSpec extends PekkoSpec(""" TcpNodelay should ===(true) TcpKeepalive should ===(true) TcpReuseAddr should ===(!Helpers.isWindows) + ByteBufAllocator should ===(PooledByteBufAllocator.DEFAULT) c.getString("hostname") should ===("") c.getString("bind-hostname") should ===("") c.getString("bind-port") should ===("")