From d9aeb96418b62e403475ae3dba7865b60aea0a83 Mon Sep 17 00:00:00 2001 From: beineng Date: Mon, 22 Oct 2018 16:29:39 +0800 Subject: [PATCH] small improvement --- .../event_bus/tasks/kafka/KafkaSink.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/com/thenetcircle/event_bus/tasks/kafka/KafkaSink.scala b/core/src/main/scala/com/thenetcircle/event_bus/tasks/kafka/KafkaSink.scala index fc458c9..da34c01 100644 --- a/core/src/main/scala/com/thenetcircle/event_bus/tasks/kafka/KafkaSink.scala +++ b/core/src/main/scala/com/thenetcircle/event_bus/tasks/kafka/KafkaSink.scala @@ -35,6 +35,8 @@ import com.thenetcircle.event_bus.tasks.kafka.extended.{EventSerializer, KafkaKe import com.typesafe.scalalogging.StrictLogging import net.ceedubs.ficus.Ficus._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.errors.RetriableException import scala.concurrent.duration._ @@ -91,7 +93,7 @@ class KafkaSink(val settings: KafkaSinkSettings) extends SinkTask with StrictLog def createProducerRecord(event: Event)( implicit runningContext: TaskRunningContext ): ProducerRecord[ProducerKey, ProducerValue] = { - var topic: String = event.metadata.topic.getOrElse(settings.defaultTopic) + val topic: String = event.metadata.topic.getOrElse(settings.defaultTopic) val key: ProducerKey = KafkaKey(event) val value: ProducerValue = event // val timestamp: Long = event.createdAt.getTime @@ -164,8 +166,14 @@ class KafkaSink(val settings: KafkaSinkSettings) extends SinkTask with StrictLog object KafkaSink { def wrapAsyncBuffer(bufferSize: Int, producingFlow: Flow[Event, _, _]): Flow[Event, (EventStatus, Event), NotUsed] = { + val decider: Supervision.Decider = { + case _: RetriableException => Supervision.Resume + case _: KafkaException => Supervision.Stop + case _: Throwable => Supervision.Stop + } + val producingSink: Sink[Event, _] = producingFlow - .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .withAttributes(ActorAttributes.supervisionStrategy(decider)) .to(Sink.ignore) Flow