Skip to content

Commit

Permalink
chore: Add support for controlling the NettyTransport's byteBuf alloc…
Browse files Browse the repository at this point in the history
…ator type. (apache#1707)

* chore: Add support for controlling the NettyTransport's byteBuf allocator type.

* chore: extract deriveByteBufAllocator method

(cherry picked from commit dbc9ed3)
  • Loading branch information
He-Pin committed Jan 14, 2025
1 parent 9d8b12f commit f540b83
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 12 deletions.
8 changes: 8 additions & 0 deletions remote/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,14 @@ pekko {
# "off-for-windows" of course means that it's "on" for all other platforms
tcp-reuse-addr = off-for-windows

# Used to control the Netty 4's ByteBufAllocator. The default is "pooled".
# pooled : use a PooledByteBufAllocator.DEFAULT
# unpooled : use an UnpooledByteBufAllocator.DEFAULT
# unpooled-heap : use an UnpooledByteBufAllocator with prefer direct `false`
# adaptive : use an AdaptiveByteBufAllocator
# adaptive-heap : use an AdaptiveByteBufAllocator with prefer direct `false`
bytebuf-allocator-type = "pooled"

# Used to configure the number of I/O worker threads on server sockets
server-socket-worker-pool {
# Min number of threads to cap factor-based number to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.util.Try
import scala.util.control.{ NoStackTrace, NonFatal }

import com.typesafe.config.Config

import org.apache.pekko
import pekko.ConfigurationException
import pekko.OnlyCauseStackTrace
Expand All @@ -38,7 +39,13 @@ import pekko.util.Helpers.Requiring
import pekko.util.{ Helpers, OptionVal }

import io.netty.bootstrap.{ Bootstrap => ClientBootstrap, ServerBootstrap }
import io.netty.buffer.Unpooled
import io.netty.buffer.{
AdaptiveByteBufAllocator,
ByteBufAllocator,
PooledByteBufAllocator,
Unpooled,
UnpooledByteBufAllocator
}
import io.netty.channel.{
Channel,
ChannelFuture,
Expand Down Expand Up @@ -160,6 +167,8 @@ class NettyTransportSettings(config: Config) {
case _ => getBoolean("tcp-reuse-addr")
}

val ByteBufAllocator: ByteBufAllocator = NettyTransport.deriveByteBufAllocator(getString("bytebuf-allocator-type"))

val Hostname: String = getString("hostname") match {
case "" => InetAddress.getLocalHost.getHostAddress
case value => value
Expand Down Expand Up @@ -318,6 +327,17 @@ private[transport] object NettyTransport {
systemName: String,
hostName: Option[String]): Option[Address] =
addressFromSocketAddress(addr, schemeIdentifier, systemName, hostName, port = None)

def deriveByteBufAllocator(allocatorType: String): ByteBufAllocator = allocatorType match {
case "pooled" => PooledByteBufAllocator.DEFAULT
case "unpooled" => UnpooledByteBufAllocator.DEFAULT
case "unpooled-heap" => new UnpooledByteBufAllocator(false)
case "adaptive" => new AdaptiveByteBufAllocator()
case "adaptive-heap" => new AdaptiveByteBufAllocator(false)
case other => throw new IllegalArgumentException(
"Unknown 'bytebuf-allocator-type' [" + other + "]," +
" supported values are 'pooled', 'unpooled', 'unpooled-heap', 'adaptive', 'adaptive-heap'.")
}
}

@deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0")
Expand Down Expand Up @@ -442,6 +462,10 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
bootstrap.childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, settings.TcpNodelay)
bootstrap.childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, settings.TcpKeepalive)

// Use the same allocator for inbound and outbound buffers
bootstrap.option(ChannelOption.ALLOCATOR, settings.ByteBufAllocator)
bootstrap.childOption(ChannelOption.ALLOCATOR, settings.ByteBufAllocator)

settings.ReceiveBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_RCVBUF, sz))
settings.SendBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_SNDBUF, sz))
settings.WriteBufferHighWaterMark.filter(_ > 0).foreach(sz =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

package org.apache.pekko.remote

import scala.concurrent.duration._
import io.netty.buffer.PooledByteBufAllocator

import scala.concurrent.duration._
import scala.annotation.nowarn

import language.postfixOps

import org.apache.pekko
Expand Down Expand Up @@ -103,6 +105,7 @@ class RemoteConfigSpec extends PekkoSpec("""
TcpNodelay should ===(true)
TcpKeepalive should ===(true)
TcpReuseAddr should ===(!Helpers.isWindows)
ByteBufAllocator should ===(PooledByteBufAllocator.DEFAULT)
c.getString("hostname") should ===("")
c.getString("bind-hostname") should ===("")
c.getString("bind-port") should ===("")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.scalatest.wordspec.AnyWordSpec

import org.apache.pekko
import pekko.actor.{ ActorSystem, Address }
import pekko.remote.classic.transport.netty.NettyTransportSpec._
import pekko.remote.transport.NettyTransportSpec._
import pekko.testkit.SocketUtil

trait BindCanonicalAddressBehaviors {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,25 @@
* Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.remote.classic.transport.netty

import java.net.{ InetAddress, InetSocketAddress }
import java.nio.channels.ServerSocketChannel

import scala.concurrent.Await
import scala.concurrent.duration.Duration
package org.apache.pekko.remote.transport

import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import io.netty.buffer.{ AdaptiveByteBufAllocator, PooledByteBufAllocator, UnpooledByteBufAllocator }

import org.apache.pekko
import pekko.actor.{ ActorSystem, Address, ExtendedActorSystem }
import pekko.remote.BoundAddressesExtension
import pekko.remote.transport.netty.NettyTransport.deriveByteBufAllocator
import pekko.testkit.SocketUtil

import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import java.net.{ InetAddress, InetSocketAddress }
import java.nio.channels.ServerSocketChannel
import scala.concurrent.Await
import scala.concurrent.duration.Duration

object NettyTransportSpec {
val commonConfig = ConfigFactory.parseString("""
pekko.actor.provider = remote
Expand Down Expand Up @@ -132,6 +134,30 @@ class NettyTransportSpec extends AnyWordSpec with Matchers with BindBehavior {

Await.result(sys.terminate(), Duration.Inf)
}

"be able to specify byte buffer allocator" in {
deriveByteBufAllocator("pooled") should ===(PooledByteBufAllocator.DEFAULT)
deriveByteBufAllocator("unpooled") should ===(UnpooledByteBufAllocator.DEFAULT)

{
val allocator = deriveByteBufAllocator("unpooled-heap")
allocator shouldBe a[UnpooledByteBufAllocator]
allocator.toString.contains("directByDefault: false") should ===(true)
}

{
val allocator = deriveByteBufAllocator("adaptive")
allocator shouldBe a[AdaptiveByteBufAllocator]
allocator.toString.contains("directByDefault: true") should ===(true)
}

{
val allocator = deriveByteBufAllocator("adaptive-heap")
allocator shouldBe a[AdaptiveByteBufAllocator]
allocator.toString.contains("directByDefault: false") should ===(true)
}

}
}
}

Expand Down

0 comments on commit f540b83

Please sign in to comment.