Skip to content

Commit

Permalink
small improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
beineng committed Oct 22, 2018
1 parent ff4efec commit d9aeb96
Showing 1 changed file with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import com.thenetcircle.event_bus.tasks.kafka.extended.{EventSerializer, KafkaKe
import com.typesafe.scalalogging.StrictLogging
import net.ceedubs.ficus.Ficus._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.RetriableException

import scala.concurrent.duration._

Expand Down Expand Up @@ -91,7 +93,7 @@ class KafkaSink(val settings: KafkaSinkSettings) extends SinkTask with StrictLog
def createProducerRecord(event: Event)(
implicit runningContext: TaskRunningContext
): ProducerRecord[ProducerKey, ProducerValue] = {
var topic: String = event.metadata.topic.getOrElse(settings.defaultTopic)
val topic: String = event.metadata.topic.getOrElse(settings.defaultTopic)
val key: ProducerKey = KafkaKey(event)
val value: ProducerValue = event
// val timestamp: Long = event.createdAt.getTime
Expand Down Expand Up @@ -164,8 +166,14 @@ class KafkaSink(val settings: KafkaSinkSettings) extends SinkTask with StrictLog
object KafkaSink {

def wrapAsyncBuffer(bufferSize: Int, producingFlow: Flow[Event, _, _]): Flow[Event, (EventStatus, Event), NotUsed] = {
val decider: Supervision.Decider = {
case _: RetriableException => Supervision.Resume
case _: KafkaException => Supervision.Stop
case _: Throwable => Supervision.Stop
}

val producingSink: Sink[Event, _] = producingFlow
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.to(Sink.ignore)

Flow
Expand Down

0 comments on commit d9aeb96

Please sign in to comment.