Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add support for controlling the NettyTransport's byteBuf allocator type. #1707

Merged
merged 2 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions remote/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,14 @@ pekko {
# "off-for-windows" of course means that it's "on" for all other platforms
tcp-reuse-addr = off-for-windows

# Used to control the Netty 4's ByteBufAllocator. The default is "pooled".
# pooled : use a PooledByteBufAllocator.DEFAULT
# unpooled : use an UnpooledByteBufAllocator.DEFAULT
# unpooled-heap : use an UnpooledByteBufAllocator with prefer direct `false`
# adaptive : use an AdaptiveByteBufAllocator
# adaptive-heap : use an AdaptiveByteBufAllocator with prefer direct `false`
bytebuf-allocator-type = "pooled"

# Used to configure the number of I/O worker threads on server sockets
server-socket-worker-pool {
# Min number of threads to cap factor-based number to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.util.Try
import scala.util.control.{ NoStackTrace, NonFatal }

import com.typesafe.config.Config

import org.apache.pekko
import pekko.ConfigurationException
import pekko.OnlyCauseStackTrace
Expand All @@ -38,7 +39,13 @@ import pekko.util.Helpers.Requiring
import pekko.util.{ Helpers, OptionVal }

import io.netty.bootstrap.{ Bootstrap => ClientBootstrap, ServerBootstrap }
import io.netty.buffer.Unpooled
import io.netty.buffer.{
AdaptiveByteBufAllocator,
ByteBufAllocator,
PooledByteBufAllocator,
Unpooled,
UnpooledByteBufAllocator
}
import io.netty.channel.{
Channel,
ChannelFuture,
Expand Down Expand Up @@ -160,6 +167,17 @@ class NettyTransportSettings(config: Config) {
case _ => getBoolean("tcp-reuse-addr")
}

val ByteBufAllocator: ByteBufAllocator = getString("bytebuf-allocator-type") match {
case "pooled" => PooledByteBufAllocator.DEFAULT
case "unpooled" => UnpooledByteBufAllocator.DEFAULT
case "unpooled-heap" => new UnpooledByteBufAllocator(false)
case "adaptive" => new AdaptiveByteBufAllocator()
case "adaptive-heap" => new AdaptiveByteBufAllocator(false)
case other => throw new IllegalArgumentException(
"Unknown 'bytebuf-allocator-type' [" + other + "]," +
He-Pin marked this conversation as resolved.
Show resolved Hide resolved
" supported values are 'pooled', 'unpooled', 'unpooled-heap', 'adaptive', 'adaptive-heap'.")
}

val Hostname: String = getString("hostname") match {
case "" => InetAddress.getLocalHost.getHostAddress
case value => value
Expand Down Expand Up @@ -442,6 +460,10 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
bootstrap.childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, settings.TcpNodelay)
bootstrap.childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, settings.TcpKeepalive)

// Use the same allocator for inbound and outbound buffers
bootstrap.option(ChannelOption.ALLOCATOR, settings.ByteBufAllocator)
bootstrap.childOption(ChannelOption.ALLOCATOR, settings.ByteBufAllocator)

settings.ReceiveBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_RCVBUF, sz))
settings.SendBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_SNDBUF, sz))
settings.WriteBufferHighWaterMark.filter(_ > 0).foreach(sz =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

package org.apache.pekko.remote

import scala.concurrent.duration._
import io.netty.buffer.PooledByteBufAllocator

import scala.concurrent.duration._
import scala.annotation.nowarn

import language.postfixOps

import org.apache.pekko
Expand Down Expand Up @@ -103,6 +105,7 @@ class RemoteConfigSpec extends PekkoSpec("""
TcpNodelay should ===(true)
TcpKeepalive should ===(true)
TcpReuseAddr should ===(!Helpers.isWindows)
ByteBufAllocator should ===(PooledByteBufAllocator.DEFAULT)
c.getString("hostname") should ===("")
c.getString("bind-hostname") should ===("")
c.getString("bind-port") should ===("")
Expand Down
Loading