diff --git a/src/main/scala/redis/Redis.scala b/src/main/scala/redis/Redis.scala index 49530be9..5d48a867 100644 --- a/src/main/scala/redis/Redis.scala +++ b/src/main/scala/redis/Redis.scala @@ -10,6 +10,8 @@ import redis.api.pubsub._ import java.util.concurrent.atomic.AtomicLong import akka.event.Logging +import scala.concurrent.duration.FiniteDuration + trait RedisCommands extends Keys with Strings @@ -23,7 +25,7 @@ trait RedisCommands with Server with HyperLogLog -abstract class RedisClientActorLike(system: ActorSystem, redisDispatcher: RedisDispatcher) extends ActorRequest { +abstract class RedisClientActorLike(system: ActorSystem, redisDispatcher: RedisDispatcher, connectTimeout: Option[FiniteDuration] = None) extends ActorRequest { var host: String var port: Int val name: String @@ -31,9 +33,8 @@ abstract class RedisClientActorLike(system: ActorSystem, redisDispatcher: RedisD val db: Option[Int] = None implicit val executionContext = system.dispatchers.lookup(redisDispatcher.name) - val redisConnection: ActorRef = system.actorOf( - Props(classOf[RedisClientActor], new InetSocketAddress(host, port), getConnectOperations, - onConnectStatus, redisDispatcher.name) + val redisConnection: ActorRef = system.actorOf(RedisClientActor.props(new InetSocketAddress(host, port), + getConnectOperations, onConnectStatus, redisDispatcher.name, connectTimeout) .withDispatcher(redisDispatcher.name), name + '-' + Redis.tempName() ) @@ -51,7 +52,7 @@ abstract class RedisClientActorLike(system: ActorSystem, redisDispatcher: RedisD db.foreach(redis.select(_)) } - def onConnectStatus(): (Boolean) => Unit = (status: Boolean) => { + def onConnectStatus: (Boolean) => Unit = (status: Boolean) => { } @@ -76,10 +77,11 @@ case class RedisClient(var host: String = "localhost", var port: Int = 6379, override val password: Option[String] = None, override val db: Option[Int] = None, - name: String = "RedisClient") + name: String = "RedisClient", + connectTimeout: Option[FiniteDuration] = None) (implicit _system: ActorSystem, redisDispatcher: RedisDispatcher = Redis.dispatcher - ) extends RedisClientActorLike(_system, redisDispatcher) with RedisCommands with Transactions { + ) extends RedisClientActorLike(_system, redisDispatcher, connectTimeout) with RedisCommands with Transactions { } @@ -87,10 +89,11 @@ case class RedisBlockingClient(var host: String = "localhost", var port: Int = 6379, override val password: Option[String] = None, override val db: Option[Int] = None, - name: String = "RedisBlockingClient") + name: String = "RedisBlockingClient", + connectTimeout: Option[FiniteDuration] = None) (implicit _system: ActorSystem, redisDispatcher: RedisDispatcher = Redis.dispatcher - ) extends RedisClientActorLike(_system, redisDispatcher) with BLists { + ) extends RedisClientActorLike(_system, redisDispatcher, connectTimeout) with BLists { } case class RedisPubSub( diff --git a/src/main/scala/redis/RedisPool.scala b/src/main/scala/redis/RedisPool.scala index 00be014b..53b4e26b 100644 --- a/src/main/scala/redis/RedisPool.scala +++ b/src/main/scala/redis/RedisPool.scala @@ -77,10 +77,9 @@ abstract class RedisClientPoolLike(system: ActorSystem, redisDispatcher: RedisDi } def makeRedisClientActor(server: RedisServer, active: Ref[Boolean]): ActorRef = { - system.actorOf( - Props(classOf[RedisClientActor], new InetSocketAddress(server.host, server.port), - getConnectOperations(server), onConnectStatus(server, active), redisDispatcher.name) - .withDispatcher(redisDispatcher.name), + system.actorOf(RedisClientActor.props(new InetSocketAddress(server.host, server.port), + getConnectOperations(server), onConnectStatus(server, active), redisDispatcher.name) + .withDispatcher(redisDispatcher.name), name + '-' + Redis.tempName() ) } diff --git a/src/main/scala/redis/actors/RedisClientActor.scala b/src/main/scala/redis/actors/RedisClientActor.scala index bbaaeec8..5279714b 100644 --- a/src/main/scala/redis/actors/RedisClientActor.scala +++ b/src/main/scala/redis/actors/RedisClientActor.scala @@ -1,14 +1,26 @@ package redis.actors -import akka.util.{ByteString, ByteStringBuilder} import java.net.InetSocketAddress -import redis.{RedisDispatcher, Redis, Operation, Transaction} + +import akka.actor.SupervisorStrategy.Stop import akka.actor._ +import akka.util.{ByteString, ByteStringBuilder} +import redis.{Operation, Transaction} + import scala.collection.mutable -import akka.actor.SupervisorStrategy.Stop +import scala.concurrent.duration.FiniteDuration + +object RedisClientActor { + + def props( address: InetSocketAddress, getConnectOperations: () => Seq[Operation[_, _]], + onConnectStatus: Boolean => Unit, + dispatcherName: String, + connectTimeout: Option[FiniteDuration] = None) = + Props(new RedisClientActor(address, getConnectOperations, onConnectStatus, dispatcherName, connectTimeout)) +} class RedisClientActor(override val address: InetSocketAddress, getConnectOperations: () => - Seq[Operation[_, _]], onConnectStatus: Boolean => Unit, dispatcherName: String) extends RedisWorkerIO(address,onConnectStatus) { + Seq[Operation[_, _]], onConnectStatus: Boolean => Unit, dispatcherName: String, connectTimeout: Option[FiniteDuration] = None) extends RedisWorkerIO(address, onConnectStatus, connectTimeout) { import context._ @@ -24,7 +36,7 @@ class RedisClientActor(override val address: InetSocketAddress, getConnectOperat var queuePromises = mutable.Queue[Operation[_, _]]() def writing: Receive = { - case op : Operation[_,_] => + case op: Operation[_, _] => queuePromises enqueue op write(op.redisCommand.encodedRequest) case Transaction(commands) => { diff --git a/src/main/scala/redis/actors/RedisWorkerIO.scala b/src/main/scala/redis/actors/RedisWorkerIO.scala index 6ceb514e..32202217 100644 --- a/src/main/scala/redis/actors/RedisWorkerIO.scala +++ b/src/main/scala/redis/actors/RedisWorkerIO.scala @@ -10,8 +10,9 @@ import akka.io.Tcp.Register import akka.io.Tcp.Connect import akka.io.Tcp.CommandFailed import akka.io.Tcp.Received +import scala.concurrent.duration.FiniteDuration -abstract class RedisWorkerIO(val address: InetSocketAddress, onConnectStatus: Boolean => Unit ) extends Actor with ActorLogging { +abstract class RedisWorkerIO(val address: InetSocketAddress, onConnectStatus: Boolean => Unit, connectTimeout: Option[FiniteDuration] = None) extends Actor with ActorLogging { private var currAddress = address @@ -33,7 +34,7 @@ abstract class RedisWorkerIO(val address: InetSocketAddress, onConnectStatus: Bo log.info(s"Connect to $currAddress") // Create a new InetSocketAddress to clear the cached IP address. currAddress = new InetSocketAddress(currAddress.getHostName, currAddress.getPort) - tcp ! Connect(currAddress, options = SO.KeepAlive(on = true) :: Nil) + tcp ! Connect(remoteAddress = currAddress, options = SO.KeepAlive(on = true) :: Nil, timeout = connectTimeout) } def reconnect() = {