Skip to content

Commit

Permalink
Revert changes on pipeline connection initialization (#154)
Browse files Browse the repository at this point in the history
* Revert "Instantiate Jedis client lazily and only once per JVM process (#152)"

This reverts commit 58ebe0f.

Signed-off-by: khorshuheng <[email protected]>

* Revert "Retry connection failure while instantiating connection provider (#144)"

This reverts commit fcb80a0.

Signed-off-by: khorshuheng <[email protected]>

Co-authored-by: khorshuheng <[email protected]>
  • Loading branch information
khorshuheng and khorshuheng authored Jun 29, 2022
1 parent 66ace80 commit 929a7d1
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 126 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,10 @@
*/
package feast.ingestion.stores.redis

import feast.ingestion.errorhanders.RetryStrategy
import redis.clients.jedis.exceptions.JedisClusterOperationException
import redis.clients.jedis.{ClusterPipeline, DefaultJedisClientConfig, HostAndPort}
import redis.clients.jedis.providers.ClusterConnectionProvider

import scala.collection.JavaConverters._
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success, Try}

/**
* Provide pipeline for Redis cluster.
Expand All @@ -35,17 +31,7 @@ case class ClusterPipelineProvider(endpoint: RedisEndpoint) extends PipelineProv
.builder()
.password(endpoint.password)
.build()
val MAX_RECONNECTION_ATTEMPT = 2
val RETRY_INTERVAL = 2.seconds
val provider = RetryStrategy.fixedBackOff(RETRY_INTERVAL, MAX_RECONNECTION_ATTEMPT)(getProvider)

def getProvider: Either[JedisClusterOperationException, ClusterConnectionProvider] = {
Try { new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG) } match {
case Success(provider) => Right(provider)
case Failure(e: JedisClusterOperationException) => Left(e)
case Failure(e) => throw e
}
}
val provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG)

/**
* @return a cluster pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC

lazy val isClusterMode: Boolean = checkIfInClusterMode(endpoint)

lazy val pipelineProvider: PipelineProvider = if (isClusterMode) {
ClusterPipelineProvider(endpoint)
} else {
SingleNodePipelineProvider(newJedisClient(endpoint))
}

def newJedisClient(endpoint: RedisEndpoint): Jedis = {
val jedis = new Jedis(endpoint.host, endpoint.port)
if (endpoint.password.nonEmpty) {
Expand Down Expand Up @@ -101,6 +95,12 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
java.security.Security.setProperty("networkaddress.cache.ttl", "3");
java.security.Security.setProperty("networkaddress.cache.negative.ttl", "0");

val pipelineProvider = if (isClusterMode) {
ClusterPipelineProvider(endpoint)
} else {
SingleNodePipelineProvider(newJedisClient(endpoint))
}

// grouped iterator to only allocate memory for a portion of rows
partition.grouped(properties.pipelineSize).foreach { batch =>
// group by key and keep only latest row per each key
Expand Down Expand Up @@ -146,6 +146,7 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
}
writePipeline.close()
}
pipelineProvider.close()
}
dataToStore.unpersist()
}
Expand Down

This file was deleted.

0 comments on commit 929a7d1

Please sign in to comment.