Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add support for controlling the NettyTransport's byteBuf allocator type. #1707

Merged
merged 2 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions remote/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,14 @@ pekko {
# "off-for-windows" of course means that it's "on" for all other platforms
tcp-reuse-addr = off-for-windows

# Used to control the Netty 4's ByteBufAllocator. The default is "pooled".
# pooled : use a PooledByteBufAllocator.DEFAULT
# unpooled : use an UnpooledByteBufAllocator.DEFAULT
# unpooled-heap : use an UnpooledByteBufAllocator with prefer direct `false`
# adaptive : use an AdaptiveByteBufAllocator
# adaptive-heap : use an AdaptiveByteBufAllocator with prefer direct `false`
bytebuf-allocator-type = "pooled"

# Used to configure the number of I/O worker threads on server sockets
server-socket-worker-pool {
# Min number of threads to cap factor-based number to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.util.Try
import scala.util.control.{ NoStackTrace, NonFatal }

import com.typesafe.config.Config

import org.apache.pekko
import pekko.ConfigurationException
import pekko.OnlyCauseStackTrace
Expand All @@ -38,7 +39,13 @@ import pekko.util.Helpers.Requiring
import pekko.util.{ Helpers, OptionVal }

import io.netty.bootstrap.{ Bootstrap => ClientBootstrap, ServerBootstrap }
import io.netty.buffer.Unpooled
import io.netty.buffer.{
AdaptiveByteBufAllocator,
ByteBufAllocator,
PooledByteBufAllocator,
Unpooled,
UnpooledByteBufAllocator
}
import io.netty.channel.{
Channel,
ChannelFuture,
Expand Down Expand Up @@ -160,6 +167,8 @@ class NettyTransportSettings(config: Config) {
case _ => getBoolean("tcp-reuse-addr")
}

val ByteBufAllocator: ByteBufAllocator = NettyTransport.deriveByteBufAllocator(getString("bytebuf-allocator-type"))

val Hostname: String = getString("hostname") match {
case "" => InetAddress.getLocalHost.getHostAddress
case value => value
Expand Down Expand Up @@ -318,6 +327,17 @@ private[transport] object NettyTransport {
systemName: String,
hostName: Option[String]): Option[Address] =
addressFromSocketAddress(addr, schemeIdentifier, systemName, hostName, port = None)

def deriveByteBufAllocator(allocatorType: String): ByteBufAllocator = allocatorType match {
case "pooled" => PooledByteBufAllocator.DEFAULT
case "unpooled" => UnpooledByteBufAllocator.DEFAULT
case "unpooled-heap" => new UnpooledByteBufAllocator(false)
case "adaptive" => new AdaptiveByteBufAllocator()
case "adaptive-heap" => new AdaptiveByteBufAllocator(false)
case other => throw new IllegalArgumentException(
"Unknown 'bytebuf-allocator-type' [" + other + "]," +
" supported values are 'pooled', 'unpooled', 'unpooled-heap', 'adaptive', 'adaptive-heap'.")
}
}

@deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0")
Expand Down Expand Up @@ -442,6 +462,10 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
bootstrap.childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, settings.TcpNodelay)
bootstrap.childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, settings.TcpKeepalive)

// Use the same allocator for inbound and outbound buffers
bootstrap.option(ChannelOption.ALLOCATOR, settings.ByteBufAllocator)
bootstrap.childOption(ChannelOption.ALLOCATOR, settings.ByteBufAllocator)

settings.ReceiveBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_RCVBUF, sz))
settings.SendBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_SNDBUF, sz))
settings.WriteBufferHighWaterMark.filter(_ > 0).foreach(sz =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

package org.apache.pekko.remote

import scala.concurrent.duration._
import io.netty.buffer.PooledByteBufAllocator

import scala.concurrent.duration._
import scala.annotation.nowarn

import language.postfixOps

import org.apache.pekko
Expand Down Expand Up @@ -103,6 +105,7 @@ class RemoteConfigSpec extends PekkoSpec("""
TcpNodelay should ===(true)
TcpKeepalive should ===(true)
TcpReuseAddr should ===(!Helpers.isWindows)
ByteBufAllocator should ===(PooledByteBufAllocator.DEFAULT)
c.getString("hostname") should ===("")
c.getString("bind-hostname") should ===("")
c.getString("bind-port") should ===("")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.scalatest.wordspec.AnyWordSpec

import org.apache.pekko
import pekko.actor.{ ActorSystem, Address }
import pekko.remote.classic.transport.netty.NettyTransportSpec._
import pekko.remote.transport.NettyTransportSpec._
import pekko.testkit.SocketUtil

trait BindCanonicalAddressBehaviors {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,25 @@
* Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.remote.classic.transport.netty

import java.net.{ InetAddress, InetSocketAddress }
import java.nio.channels.ServerSocketChannel

import scala.concurrent.Await
import scala.concurrent.duration.Duration
package org.apache.pekko.remote.transport

import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import io.netty.buffer.{ AdaptiveByteBufAllocator, PooledByteBufAllocator, UnpooledByteBufAllocator }

import org.apache.pekko
import pekko.actor.{ ActorSystem, Address, ExtendedActorSystem }
import pekko.remote.BoundAddressesExtension
import pekko.remote.transport.netty.NettyTransport.deriveByteBufAllocator
import pekko.testkit.SocketUtil

import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import java.net.{ InetAddress, InetSocketAddress }
import java.nio.channels.ServerSocketChannel
import scala.concurrent.Await
import scala.concurrent.duration.Duration

object NettyTransportSpec {
val commonConfig = ConfigFactory.parseString("""
pekko.actor.provider = remote
Expand Down Expand Up @@ -132,6 +134,30 @@ class NettyTransportSpec extends AnyWordSpec with Matchers with BindBehavior {

Await.result(sys.terminate(), Duration.Inf)
}

"be able to specify byte buffer allocator" in {
deriveByteBufAllocator("pooled") should ===(PooledByteBufAllocator.DEFAULT)
deriveByteBufAllocator("unpooled") should ===(UnpooledByteBufAllocator.DEFAULT)

{
val allocator = deriveByteBufAllocator("unpooled-heap")
allocator shouldBe a[UnpooledByteBufAllocator]
allocator.toString.contains("directByDefault: false") should ===(true)
}

{
val allocator = deriveByteBufAllocator("adaptive")
allocator shouldBe a[AdaptiveByteBufAllocator]
allocator.toString.contains("directByDefault: true") should ===(true)
}

{
val allocator = deriveByteBufAllocator("adaptive-heap")
allocator shouldBe a[AdaptiveByteBufAllocator]
allocator.toString.contains("directByDefault: false") should ===(true)
}

}
}
}

Expand Down
Loading