Skip to content

Commit

Permalink
chore: extract deriveByteBufAllocator method
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 14, 2025
1 parent 5a4a9a0 commit f0b1852
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,7 @@ class NettyTransportSettings(config: Config) {
case _ => getBoolean("tcp-reuse-addr")
}

val ByteBufAllocator: ByteBufAllocator = getString("bytebuf-allocator-type") match {
case "pooled" => PooledByteBufAllocator.DEFAULT
case "unpooled" => UnpooledByteBufAllocator.DEFAULT
case "unpooled-heap" => new UnpooledByteBufAllocator(false)
case "adaptive" => new AdaptiveByteBufAllocator()
case "adaptive-heap" => new AdaptiveByteBufAllocator(false)
case other => throw new IllegalArgumentException(
"Unknown 'bytebuf-allocator-type' [" + other + "]," +
" supported values are 'pooled', 'unpooled', 'unpooled-heap', 'adaptive', 'adaptive-heap'.")
}
val ByteBufAllocator: ByteBufAllocator = NettyTransport.deriveByteBufAllocator(getString("bytebuf-allocator-type"))

val Hostname: String = getString("hostname") match {
case "" => InetAddress.getLocalHost.getHostAddress
Expand Down Expand Up @@ -336,6 +327,17 @@ private[transport] object NettyTransport {
systemName: String,
hostName: Option[String]): Option[Address] =
addressFromSocketAddress(addr, schemeIdentifier, systemName, hostName, port = None)

def deriveByteBufAllocator(allocatorType: String): ByteBufAllocator = allocatorType match {
case "pooled" => PooledByteBufAllocator.DEFAULT
case "unpooled" => UnpooledByteBufAllocator.DEFAULT
case "unpooled-heap" => new UnpooledByteBufAllocator(false)
case "adaptive" => new AdaptiveByteBufAllocator()
case "adaptive-heap" => new AdaptiveByteBufAllocator(false)
case other => throw new IllegalArgumentException(
"Unknown 'bytebuf-allocator-type' [" + other + "]," +
" supported values are 'pooled', 'unpooled', 'unpooled-heap', 'adaptive', 'adaptive-heap'.")
}
}

@deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.scalatest.wordspec.AnyWordSpec

import org.apache.pekko
import pekko.actor.{ ActorSystem, Address }
import pekko.remote.classic.transport.netty.NettyTransportSpec._
import pekko.remote.transport.NettyTransportSpec._
import pekko.testkit.SocketUtil

trait BindCanonicalAddressBehaviors {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,25 @@
* Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.remote.classic.transport.netty

import java.net.{ InetAddress, InetSocketAddress }
import java.nio.channels.ServerSocketChannel

import scala.concurrent.Await
import scala.concurrent.duration.Duration
package org.apache.pekko.remote.transport

import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import io.netty.buffer.{ AdaptiveByteBufAllocator, PooledByteBufAllocator, UnpooledByteBufAllocator }

import org.apache.pekko
import pekko.actor.{ ActorSystem, Address, ExtendedActorSystem }
import pekko.remote.BoundAddressesExtension
import pekko.remote.transport.netty.NettyTransport.deriveByteBufAllocator
import pekko.testkit.SocketUtil

import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import java.net.{ InetAddress, InetSocketAddress }
import java.nio.channels.ServerSocketChannel
import scala.concurrent.Await
import scala.concurrent.duration.Duration

object NettyTransportSpec {
val commonConfig = ConfigFactory.parseString("""
pekko.actor.provider = remote
Expand Down Expand Up @@ -132,6 +134,30 @@ class NettyTransportSpec extends AnyWordSpec with Matchers with BindBehavior {

Await.result(sys.terminate(), Duration.Inf)
}

"be able to specify byte buffer allocator" in {
deriveByteBufAllocator("pooled") should ===(PooledByteBufAllocator.DEFAULT)
deriveByteBufAllocator("unpooled") should ===(UnpooledByteBufAllocator.DEFAULT)

{
val allocator = deriveByteBufAllocator("unpooled-heap")
allocator shouldBe a[UnpooledByteBufAllocator]
allocator.toString.contains("directByDefault: false") should ===(true)
}

{
val allocator = deriveByteBufAllocator("adaptive")
allocator shouldBe a[AdaptiveByteBufAllocator]
allocator.toString.contains("directByDefault: true") should ===(true)
}

{
val allocator = deriveByteBufAllocator("adaptive-heap")
allocator shouldBe a[AdaptiveByteBufAllocator]
allocator.toString.contains("directByDefault: false") should ===(true)
}

}
}
}

Expand Down

0 comments on commit f0b1852

Please sign in to comment.