Skip to content

Commit

Permalink
chore: Add support for controlling the NettyTransport's byteBuf alloc…
Browse files Browse the repository at this point in the history
…ator type.
  • Loading branch information
He-Pin committed Jan 13, 2025
1 parent 9844a1b commit 7230af8
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 2 deletions.
8 changes: 8 additions & 0 deletions remote/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,14 @@ pekko {
# "off-for-windows" of course means that it's "on" for all other platforms
tcp-reuse-addr = off-for-windows

# Used to control the Netty 4's ByteBufAllocator. The default is "pooled".
# pooled : use a PooledByteBufAllocator.DEFAULT
# unpooled : use an UnpooledByteBufAllocator.DEFAULT
# unpooled-heap : use an UnpooledByteBufAllocator with prefer direct `false`
# adaptive : use an AdaptiveByteBufAllocator with prefer direct `true`
# adaptive-heap : use an AdaptiveByteBufAllocator with prefer direct `false`
bytebuf-allocator-type = "pooled"

# Used to configure the number of I/O worker threads on server sockets
server-socket-worker-pool {
# Min number of threads to cap factor-based number to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.util.Try
import scala.util.control.{ NoStackTrace, NonFatal }

import com.typesafe.config.Config

import org.apache.pekko
import pekko.ConfigurationException
import pekko.OnlyCauseStackTrace
Expand All @@ -38,7 +39,13 @@ import pekko.util.Helpers.Requiring
import pekko.util.{ Helpers, OptionVal }

import io.netty.bootstrap.{ Bootstrap => ClientBootstrap, ServerBootstrap }
import io.netty.buffer.Unpooled
import io.netty.buffer.{
AdaptiveByteBufAllocator,
ByteBufAllocator,
PooledByteBufAllocator,
Unpooled,
UnpooledByteBufAllocator
}
import io.netty.channel.{
Channel,
ChannelFuture,
Expand Down Expand Up @@ -160,6 +167,17 @@ class NettyTransportSettings(config: Config) {
case _ => getBoolean("tcp-reuse-addr")
}

val ByteBufAllocator: ByteBufAllocator = getString("bytebuf-allocator-type") match {
case "pooled" => PooledByteBufAllocator.DEFAULT
case "unpooled" => UnpooledByteBufAllocator.DEFAULT
case "unpooled-heap" => new UnpooledByteBufAllocator(false)
case "adaptive" => new AdaptiveByteBufAllocator()
case "adaptive-heap" => new AdaptiveByteBufAllocator(false)
case other => throw new IllegalArgumentException(
"Unknown 'bytebuf-allocator-type' [" + other + "]," +
" supported values are 'pooled', 'unpooled', 'unpooled-heap', 'adaptive', 'adaptive-heap'.")
}

val Hostname: String = getString("hostname") match {
case "" => InetAddress.getLocalHost.getHostAddress
case value => value
Expand Down Expand Up @@ -442,6 +460,10 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
bootstrap.childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, settings.TcpNodelay)
bootstrap.childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, settings.TcpKeepalive)

// Use the same allocator for inbound and outbound buffers
bootstrap.option(ChannelOption.ALLOCATOR, settings.ByteBufAllocator)
bootstrap.childOption(ChannelOption.ALLOCATOR, settings.ByteBufAllocator)

settings.ReceiveBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_RCVBUF, sz))
settings.SendBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_SNDBUF, sz))
settings.WriteBufferHighWaterMark.filter(_ > 0).foreach(sz =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

package org.apache.pekko.remote

import scala.concurrent.duration._
import io.netty.buffer.PooledByteBufAllocator

import scala.concurrent.duration._
import scala.annotation.nowarn

import language.postfixOps

import org.apache.pekko
Expand Down Expand Up @@ -103,6 +105,7 @@ class RemoteConfigSpec extends PekkoSpec("""
TcpNodelay should ===(true)
TcpKeepalive should ===(true)
TcpReuseAddr should ===(!Helpers.isWindows)
ByteBufAllocator should ===(PooledByteBufAllocator.DEFAULT)
c.getString("hostname") should ===("")
c.getString("bind-hostname") should ===("")
c.getString("bind-port") should ===("")
Expand Down

0 comments on commit 7230af8

Please sign in to comment.