diff --git a/remote/src/main/resources/reference.conf b/remote/src/main/resources/reference.conf index 4c4edff858e..4dfb6fedbf7 100644 --- a/remote/src/main/resources/reference.conf +++ b/remote/src/main/resources/reference.conf @@ -621,6 +621,14 @@ pekko { # "off-for-windows" of course means that it's "on" for all other platforms tcp-reuse-addr = off-for-windows + # Used to control the Netty 4's ByteBufAllocator. The default is "pooled". + # pooled : use a PooledByteBufAllocator.DEFAULT + # unpooled : use an UnpooledByteBufAllocator.DEFAULT + # unpooled-heap : use an UnpooledByteBufAllocator with prefer direct `false` + # adaptive : use an AdaptiveByteBufAllocator + # adaptive-heap : use an AdaptiveByteBufAllocator with prefer direct `false` + bytebuf-allocator-type = "pooled" + # Used to configure the number of I/O worker threads on server sockets server-socket-worker-pool { # Min number of threads to cap factor-based number to diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala index 0956548fdfe..4016c99bcc3 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala @@ -24,6 +24,7 @@ import scala.util.Try import scala.util.control.{ NoStackTrace, NonFatal } import com.typesafe.config.Config + import org.apache.pekko import pekko.ConfigurationException import pekko.OnlyCauseStackTrace @@ -38,7 +39,13 @@ import pekko.util.Helpers.Requiring import pekko.util.{ Helpers, OptionVal } import io.netty.bootstrap.{ Bootstrap => ClientBootstrap, ServerBootstrap } -import io.netty.buffer.Unpooled +import io.netty.buffer.{ + AdaptiveByteBufAllocator, + ByteBufAllocator, + PooledByteBufAllocator, + Unpooled, + UnpooledByteBufAllocator +} import io.netty.channel.{ Channel, ChannelFuture, @@ -160,6 +167,8 @@ class NettyTransportSettings(config: Config) { case _ => getBoolean("tcp-reuse-addr") } + val ByteBufAllocator: ByteBufAllocator = NettyTransport.deriveByteBufAllocator(getString("bytebuf-allocator-type")) + val Hostname: String = getString("hostname") match { case "" => InetAddress.getLocalHost.getHostAddress case value => value @@ -318,6 +327,17 @@ private[transport] object NettyTransport { systemName: String, hostName: Option[String]): Option[Address] = addressFromSocketAddress(addr, schemeIdentifier, systemName, hostName, port = None) + + def deriveByteBufAllocator(allocatorType: String): ByteBufAllocator = allocatorType match { + case "pooled" => PooledByteBufAllocator.DEFAULT + case "unpooled" => UnpooledByteBufAllocator.DEFAULT + case "unpooled-heap" => new UnpooledByteBufAllocator(false) + case "adaptive" => new AdaptiveByteBufAllocator() + case "adaptive-heap" => new AdaptiveByteBufAllocator(false) + case other => throw new IllegalArgumentException( + "Unknown 'bytebuf-allocator-type' [" + other + "]," + + " supported values are 'pooled', 'unpooled', 'unpooled-heap', 'adaptive', 'adaptive-heap'.") + } } @deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0") @@ -442,6 +462,10 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA bootstrap.childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, settings.TcpNodelay) bootstrap.childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, settings.TcpKeepalive) + // Use the same allocator for inbound and outbound buffers + bootstrap.option(ChannelOption.ALLOCATOR, settings.ByteBufAllocator) + bootstrap.childOption(ChannelOption.ALLOCATOR, settings.ByteBufAllocator) + settings.ReceiveBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_RCVBUF, sz)) settings.SendBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_SNDBUF, sz)) settings.WriteBufferHighWaterMark.filter(_ > 0).foreach(sz => diff --git a/remote/src/test/scala/org/apache/pekko/remote/RemoteConfigSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/RemoteConfigSpec.scala index cff4fad3af9..62434d809ef 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/RemoteConfigSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/RemoteConfigSpec.scala @@ -13,9 +13,11 @@ package org.apache.pekko.remote -import scala.concurrent.duration._ +import io.netty.buffer.PooledByteBufAllocator +import scala.concurrent.duration._ import scala.annotation.nowarn + import language.postfixOps import org.apache.pekko @@ -103,6 +105,7 @@ class RemoteConfigSpec extends PekkoSpec(""" TcpNodelay should ===(true) TcpKeepalive should ===(true) TcpReuseAddr should ===(!Helpers.isWindows) + ByteBufAllocator should ===(PooledByteBufAllocator.DEFAULT) c.getString("hostname") should ===("") c.getString("bind-hostname") should ===("") c.getString("bind-port") should ===("") diff --git a/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala index 5f44b956521..58a6c222b7d 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala @@ -24,7 +24,7 @@ import org.scalatest.wordspec.AnyWordSpec import org.apache.pekko import pekko.actor.{ ActorSystem, Address } -import pekko.remote.classic.transport.netty.NettyTransportSpec._ +import pekko.remote.transport.NettyTransportSpec._ import pekko.testkit.SocketUtil trait BindCanonicalAddressBehaviors { diff --git a/remote/src/test/scala/org/apache/pekko/remote/classic/transport/netty/NettyTransportSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/transport/NettyTransportSpec.scala similarity index 85% rename from remote/src/test/scala/org/apache/pekko/remote/classic/transport/netty/NettyTransportSpec.scala rename to remote/src/test/scala/org/apache/pekko/remote/transport/NettyTransportSpec.scala index 3f84936b8d0..436994de582 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/classic/transport/netty/NettyTransportSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/transport/NettyTransportSpec.scala @@ -11,23 +11,25 @@ * Copyright (C) 2018-2022 Lightbend Inc. */ -package org.apache.pekko.remote.classic.transport.netty - -import java.net.{ InetAddress, InetSocketAddress } -import java.nio.channels.ServerSocketChannel - -import scala.concurrent.Await -import scala.concurrent.duration.Duration +package org.apache.pekko.remote.transport import com.typesafe.config.ConfigFactory -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec +import io.netty.buffer.{ AdaptiveByteBufAllocator, PooledByteBufAllocator, UnpooledByteBufAllocator } import org.apache.pekko import pekko.actor.{ ActorSystem, Address, ExtendedActorSystem } import pekko.remote.BoundAddressesExtension +import pekko.remote.transport.netty.NettyTransport.deriveByteBufAllocator import pekko.testkit.SocketUtil +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +import java.net.{ InetAddress, InetSocketAddress } +import java.nio.channels.ServerSocketChannel +import scala.concurrent.Await +import scala.concurrent.duration.Duration + object NettyTransportSpec { val commonConfig = ConfigFactory.parseString(""" pekko.actor.provider = remote @@ -132,6 +134,30 @@ class NettyTransportSpec extends AnyWordSpec with Matchers with BindBehavior { Await.result(sys.terminate(), Duration.Inf) } + + "be able to specify byte buffer allocator" in { + deriveByteBufAllocator("pooled") should ===(PooledByteBufAllocator.DEFAULT) + deriveByteBufAllocator("unpooled") should ===(UnpooledByteBufAllocator.DEFAULT) + + { + val allocator = deriveByteBufAllocator("unpooled-heap") + allocator shouldBe a[UnpooledByteBufAllocator] + allocator.toString.contains("directByDefault: false") should ===(true) + } + + { + val allocator = deriveByteBufAllocator("adaptive") + allocator shouldBe a[AdaptiveByteBufAllocator] + allocator.toString.contains("directByDefault: true") should ===(true) + } + + { + val allocator = deriveByteBufAllocator("adaptive-heap") + allocator shouldBe a[AdaptiveByteBufAllocator] + allocator.toString.contains("directByDefault: false") should ===(true) + } + + } } }