Skip to content

Commit

Permalink
Retry connection failure while instantiating connection provider (#144)
Browse files Browse the repository at this point in the history
Signed-off-by: Khor Shu Heng <[email protected]>

Co-authored-by: Khor Shu Heng <[email protected]>
  • Loading branch information
khorshuheng and khorshuheng authored May 16, 2022
1 parent 2871ba6 commit fcb80a0
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2022 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.ingestion.errorhanders

import scala.annotation.tailrec
import scala.concurrent.duration.Duration

object RetryStrategy {

@tailrec
def fixedBackOff[T <: Exception, U](retryInterval: Duration, maxRetries: Int)(
fn: => Either[T, U]
): U = {
fn match {
case Right(x) => x
case Left(_) if maxRetries > 0 => {
Thread.sleep(retryInterval.toMillis)
fixedBackOff(retryInterval, maxRetries - 1)(fn)
}
case Left(e) => throw (e)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
*/
package feast.ingestion.stores.redis

import feast.ingestion.errorhanders.RetryStrategy
import redis.clients.jedis.exceptions.JedisClusterOperationException
import redis.clients.jedis.{ClusterPipeline, DefaultJedisClientConfig, HostAndPort}
import redis.clients.jedis.providers.ClusterConnectionProvider

import scala.collection.JavaConverters._
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success, Try}

/**
* Provide pipeline for Redis cluster.
Expand All @@ -31,7 +35,17 @@ case class ClusterPipelineProvider(endpoint: RedisEndpoint) extends PipelineProv
.builder()
.password(endpoint.password)
.build()
val provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG)
val MAX_RECONNECTION_ATTEMPT = 2
val RETRY_INTERVAL = 2.seconds
val provider = RetryStrategy.fixedBackOff(RETRY_INTERVAL, MAX_RECONNECTION_ATTEMPT)(getProvider)

def getProvider: Either[JedisClusterOperationException, ClusterConnectionProvider] = {
Try { new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG) } match {
case Success(provider) => Right(provider)
case Failure(e: JedisClusterOperationException) => Left(e)
case Failure(e) => throw e
}
}

/**
* @return a cluster pipeline
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2022 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.ingestion.errorhandlers

import feast.ingestion.UnitSpec
import feast.ingestion.errorhanders.RetryStrategy

import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Try}

class RetryStrategySpec extends UnitSpec {

case class SomeException(message: String) extends Exception

"function that always succeed" should "not be retried" in {
var i = 0

def alwaysSucceedFunc: Either[SomeException, Int] = {
i += 1
Right(0)
}
val result = RetryStrategy.fixedBackOff(1.second, 2)(alwaysSucceedFunc)
i should be(1)
result should be(0)
}

"function that always failed" should "be retried up to the maximum attempt and throw exception" in {
var i = 0

def alwaysFailFunc: Either[SomeException, Int] = {
i += 1
Left(SomeException("error"))
}
val result = Try { RetryStrategy.fixedBackOff(10.milli, 2)(alwaysFailFunc) }
i should be(3)
result should matchPattern { case Failure(_: SomeException) => }
}

"function that succeeds when retried" should "immediately return value when succeeded" in {
var i = 0

def succeedWhenRetriedFunc: Either[SomeException, Int] = {
i += 1
if (i < 2) {
Left(SomeException("error"))
} else {
Right(0)
}
}
val result = RetryStrategy.fixedBackOff(1.second, 2)(succeedWhenRetriedFunc)
i should be(2)
result should be(0)
}
}

0 comments on commit fcb80a0

Please sign in to comment.