From 96066ca7a6c94fe571d0eaa2d4e748bcfb4df663 Mon Sep 17 00:00:00 2001 From: beineng Date: Wed, 24 Oct 2018 14:33:36 +0800 Subject: [PATCH] refactor logging --- admin/backend/src/main/resources/logback.xml | 28 ++++++++ core/src/main/resources/logback.xml | 66 ++++++++++++++----- .../thenetcircle/event_bus/AbstractApp.scala | 4 +- ...MissedEventHandler.scala => Logging.scala} | 19 ++++-- .../event_bus/misc/ZooKeeperManager.scala | 3 +- .../thenetcircle/event_bus/story/Story.scala | 7 +- .../story/StoryZooKeeperListener.scala | 7 +- .../tasks/cassandra/CassandraFallback.scala | 13 ++-- .../event_bus/tasks/http/HttpSink.scala | 39 +++++++---- .../event_bus/tasks/http/HttpSource.scala | 13 ++-- .../event_bus/tasks/kafka/KafkaSink.scala | 21 +++--- .../event_bus/tasks/kafka/KafkaSource.scala | 31 +++++---- .../tasks/tnc/TNCDinoEventsForwarder.scala | 6 +- .../tasks/tnc/TNCKafkaTopicResolver.scala | 15 ++--- .../com/thenetcircle/event_bus/TestBase.scala | 4 +- 15 files changed, 175 insertions(+), 101 deletions(-) create mode 100644 admin/backend/src/main/resources/logback.xml rename core/src/main/scala/com/thenetcircle/event_bus/misc/{MissedEventHandler.scala => Logging.scala} (58%) diff --git a/admin/backend/src/main/resources/logback.xml b/admin/backend/src/main/resources/logback.xml new file mode 100644 index 0000000..7eb5d32 --- /dev/null +++ b/admin/backend/src/main/resources/logback.xml @@ -0,0 +1,28 @@ + + + + + + + + ${EB_LOGPREFIX}.admin.log + true + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} %X{sourceThread} %X{akkaSource} - %msg%n + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} %X{sourceThread} %X{akkaSource} - %msg%n + + + + + + + + + + + \ No newline at end of file diff --git a/core/src/main/resources/logback.xml b/core/src/main/resources/logback.xml index 27c6b76..c133794 100644 --- a/core/src/main/resources/logback.xml +++ b/core/src/main/resources/logback.xml @@ -1,14 +1,16 @@ - + - - - ${EB_LOGFILE:-./log/default}.log + + + + + ${EB_LOGPREFIX}.system.log true - ${EB_LOGFILE:-./log/default}.%d{yyyy-MM-dd}.log + ${EB_LOGPREFIX}.system.%d{yyyy-MM-dd}.log 30 3GB @@ -17,28 +19,38 @@ - - + + - + + + ${EB_LOGPREFIX}.task.log + true + + ${EB_LOGPREFIX}.task.%d{yyyy-MM-dd}.log + 30 + 3GB + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} %X{sourceThread} %X{akkaSource} - %msg%n - - - WARN - + + - - ${EB_MISSED_EVENTS_FILE:-./log/missed_events} + + + ${EB_LOGPREFIX}.missed.log true true + + ${EB_LOGPREFIX}.missed.%d{yyyy-MM-dd_HH}.log + - %msg%n + %msg%n @@ -51,10 +63,23 @@ true + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} %X{sourceThread} %X{akkaSource} - %msg%n + + + + + + WARN + + + - + @@ -62,8 +87,13 @@ - - + + + + + + + \ No newline at end of file diff --git a/core/src/main/scala/com/thenetcircle/event_bus/AbstractApp.scala b/core/src/main/scala/com/thenetcircle/event_bus/AbstractApp.scala index 7302b77..de4476e 100644 --- a/core/src/main/scala/com/thenetcircle/event_bus/AbstractApp.scala +++ b/core/src/main/scala/com/thenetcircle/event_bus/AbstractApp.scala @@ -19,13 +19,13 @@ package com.thenetcircle.event_bus import akka.actor.ActorSystem import com.thenetcircle.event_bus.context.AppContext +import com.thenetcircle.event_bus.misc.Logging import com.typesafe.config.Config -import com.typesafe.scalalogging.StrictLogging import scala.concurrent.Await import scala.concurrent.duration._ -trait AbstractApp extends StrictLogging { +trait AbstractApp extends Logging { logger.info("Application is initializing.") diff --git a/core/src/main/scala/com/thenetcircle/event_bus/misc/MissedEventHandler.scala b/core/src/main/scala/com/thenetcircle/event_bus/misc/Logging.scala similarity index 58% rename from core/src/main/scala/com/thenetcircle/event_bus/misc/MissedEventHandler.scala rename to core/src/main/scala/com/thenetcircle/event_bus/misc/Logging.scala index 9778fbf..29ca467 100644 --- a/core/src/main/scala/com/thenetcircle/event_bus/misc/MissedEventHandler.scala +++ b/core/src/main/scala/com/thenetcircle/event_bus/misc/Logging.scala @@ -17,12 +17,21 @@ package com.thenetcircle.event_bus.misc -import com.thenetcircle.event_bus.interfaces.Event -import com.typesafe.scalalogging.StrictLogging +import com.typesafe.scalalogging.{LazyLogging, Logger} +import org.slf4j.LoggerFactory -object MissedEventHandler extends StrictLogging { +trait Logging extends LazyLogging { - def handle(event: Event): Unit = - logger.warn(event.body.data) + protected lazy val taskLogger: Logger = + Logger(LoggerFactory.getLogger(Logging.taskLoggerPrefix + "." + getClass.getName.split('.').last)) + +} + +object Logging { + + val missedLogger: Logger = + Logger(LoggerFactory.getLogger(s"com.thenetcircle.event_bus.misc.Logging.missed")) + + val taskLoggerPrefix = s"com.thenetcircle.event_bus.misc.Logging.task" } diff --git a/core/src/main/scala/com/thenetcircle/event_bus/misc/ZooKeeperManager.scala b/core/src/main/scala/com/thenetcircle/event_bus/misc/ZooKeeperManager.scala index 59810e9..f2aa333 100644 --- a/core/src/main/scala/com/thenetcircle/event_bus/misc/ZooKeeperManager.scala +++ b/core/src/main/scala/com/thenetcircle/event_bus/misc/ZooKeeperManager.scala @@ -18,7 +18,6 @@ package com.thenetcircle.event_bus.misc import com.thenetcircle.event_bus.context.AppContext -import com.typesafe.scalalogging.StrictLogging import org.apache.curator.framework.imps.CuratorFrameworkState import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode import org.apache.curator.framework.recipes.cache._ @@ -28,7 +27,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry import scala.collection.JavaConverters._ class ZooKeeperManager private (client: CuratorFramework, rootPath: String)(implicit appContext: AppContext) - extends StrictLogging { + extends Logging { val appRootPath: String = appContext.getSystemConfig().getString("app.zookeeper.rootpath") + s"/${appContext.getAppName()}" diff --git a/core/src/main/scala/com/thenetcircle/event_bus/story/Story.scala b/core/src/main/scala/com/thenetcircle/event_bus/story/Story.scala index 01a445d..f38b6ae 100644 --- a/core/src/main/scala/com/thenetcircle/event_bus/story/Story.scala +++ b/core/src/main/scala/com/thenetcircle/event_bus/story/Story.scala @@ -23,13 +23,12 @@ import akka.{Done, NotUsed} import com.thenetcircle.event_bus.context.TaskRunningContext import com.thenetcircle.event_bus.interfaces.EventStatus.{Norm, ToFB} import com.thenetcircle.event_bus.interfaces.{Event, _} -import com.thenetcircle.event_bus.misc.MonitoringHelp +import com.thenetcircle.event_bus.misc.{Logging, MonitoringHelp} import com.thenetcircle.event_bus.story.StoryStatus.StoryStatus -import com.typesafe.scalalogging.StrictLogging import scala.concurrent.Future -import scala.util.{Failure, Success} import scala.util.control.NonFatal +import scala.util.{Failure, Success} case class StorySettings(name: String, status: StoryStatus = StoryStatus.INIT) @@ -39,7 +38,7 @@ class Story( val sinkTask: SinkTask, val transformTasks: Option[List[TransformTask]] = None, val fallbackTask: Option[FallbackTask] = None -) extends StrictLogging +) extends Logging with MonitoringHelp { type Payload = (EventStatus, Event) // middle result type diff --git a/core/src/main/scala/com/thenetcircle/event_bus/story/StoryZooKeeperListener.scala b/core/src/main/scala/com/thenetcircle/event_bus/story/StoryZooKeeperListener.scala index f1ae71f..6e7c719 100644 --- a/core/src/main/scala/com/thenetcircle/event_bus/story/StoryZooKeeperListener.scala +++ b/core/src/main/scala/com/thenetcircle/event_bus/story/StoryZooKeeperListener.scala @@ -20,15 +20,14 @@ package com.thenetcircle.event_bus.story import akka.actor.{ActorRef, ActorSystem, Cancellable} import com.thenetcircle.event_bus.BuildInfo import com.thenetcircle.event_bus.context.AppContext -import com.thenetcircle.event_bus.misc.{Util, ZooKeeperManager} -import com.typesafe.scalalogging.StrictLogging +import com.thenetcircle.event_bus.misc.{Logging, Util, ZooKeeperManager} import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type._ import org.apache.curator.framework.recipes.cache.{PathChildrenCache, PathChildrenCacheEvent} import org.apache.curator.framework.recipes.leader.LeaderLatch +import scala.concurrent.duration._ import scala.util.Random import scala.util.control.NonFatal -import scala.concurrent.duration._ object StoryZooKeeperListener { def apply(runnerName: String, storyRunner: ActorRef, storyBuilder: StoryBuilder)( @@ -41,7 +40,7 @@ object StoryZooKeeperListener { class StoryZooKeeperListener(runnerName: String, storyRunner: ActorRef, storyBuilder: StoryBuilder)( implicit appContext: AppContext, system: ActorSystem -) extends StrictLogging { +) extends Logging { require( appContext.getZooKeeperManager().isDefined, diff --git a/core/src/main/scala/com/thenetcircle/event_bus/tasks/cassandra/CassandraFallback.scala b/core/src/main/scala/com/thenetcircle/event_bus/tasks/cassandra/CassandraFallback.scala index c12a824..bb47db8 100644 --- a/core/src/main/scala/com/thenetcircle/event_bus/tasks/cassandra/CassandraFallback.scala +++ b/core/src/main/scala/com/thenetcircle/event_bus/tasks/cassandra/CassandraFallback.scala @@ -28,8 +28,7 @@ import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFut import com.thenetcircle.event_bus.context.{TaskBuildingContext, TaskRunningContext} import com.thenetcircle.event_bus.interfaces.EventStatus.{Fail, InFB, ToFB} import com.thenetcircle.event_bus.interfaces.{Event, EventStatus, FallbackTask, FallbackTaskBuilder} -import com.thenetcircle.event_bus.misc.Util -import com.typesafe.scalalogging.StrictLogging +import com.thenetcircle.event_bus.misc.{Logging, Util} import net.ceedubs.ficus.Ficus._ import scala.concurrent.{ExecutionContext, Future, Promise} @@ -37,7 +36,7 @@ import scala.util.control.NonFatal case class CassandraSettings(contactPoints: List[String], port: Int = 9042, parallelism: Int = 2) -class CassandraFallback(val settings: CassandraSettings) extends FallbackTask with StrictLogging { +class CassandraFallback(val settings: CassandraSettings) extends FallbackTask with Logging { private var clusterOption: Option[Cluster] = None private var sessionOption: Option[Session] = None @@ -115,12 +114,16 @@ class CassandraFallback(val settings: CassandraSettings) extends FallbackTask wi .map[(EventStatus, Event)](result => (InFB, event)) .recover { case NonFatal(ex) => - logger.warn(s"sending to cassandra[1] fallback was failed with error $ex") + taskLogger.warn( + s"sending to cassandra[1] fallback was failed with error $ex" + ) (Fail(ex), event) } } catch { case NonFatal(ex) => - logger.debug(s"sending to cassandra[2] fallback failed with error $ex") + taskLogger.debug( + s"sending to cassandra[2] fallback failed with error $ex" + ) Future.successful((Fail(ex), event)) } } diff --git a/core/src/main/scala/com/thenetcircle/event_bus/tasks/http/HttpSink.scala b/core/src/main/scala/com/thenetcircle/event_bus/tasks/http/HttpSink.scala index a7ed8a8..c48b16d 100644 --- a/core/src/main/scala/com/thenetcircle/event_bus/tasks/http/HttpSink.scala +++ b/core/src/main/scala/com/thenetcircle/event_bus/tasks/http/HttpSink.scala @@ -30,12 +30,11 @@ import akka.stream._ import akka.stream.scaladsl.Flow import akka.util.Timeout import com.thenetcircle.event_bus.context.{TaskBuildingContext, TaskRunningContext} -import com.thenetcircle.event_bus.misc.Util import com.thenetcircle.event_bus.interfaces.EventStatus.{Fail, Norm, ToFB} import com.thenetcircle.event_bus.interfaces.{Event, EventStatus, SinkTask, SinkTaskBuilder} +import com.thenetcircle.event_bus.misc.{Logging, Util} import com.thenetcircle.event_bus.tasks.http.HttpSink.RetrySender import com.typesafe.config.Config -import com.typesafe.scalalogging.StrictLogging import net.ceedubs.ficus.Ficus._ import scala.concurrent.ExecutionContext @@ -54,7 +53,7 @@ case class HttpSinkSettings( poolSettings: Option[ConnectionPoolSettings] = None ) -class HttpSink(val settings: HttpSinkSettings) extends SinkTask with StrictLogging { +class HttpSink(val settings: HttpSinkSettings) extends SinkTask with Logging { def createRequest(event: Event): HttpRequest = { var _request = settings.defaultRequest.withEntity(HttpEntity(event.body.data)) @@ -100,15 +99,19 @@ class HttpSink(val settings: HttpSinkSettings) extends SinkTask with StrictLoggi .mapTo[Try[HttpResponse]] .map[(EventStatus, Event)] { case Success(resp) => - logger.info(s"sending event [$eventBrief] to [$endPoint] succeeded.") + taskLogger.info(s"sending event [$eventBrief] to [$endPoint] succeeded.") (Norm, event) case Failure(ex) => - logger.warn(s"sending event [$eventBrief] to [$endPoint] failed with error $ex") + taskLogger.warn( + s"sending event [$eventBrief] to [$endPoint] failed with error $ex" + ) (ToFB(Some(ex)), event) } .recover { case ex: AskTimeoutException => - logger.warn(s"sending event [$eventBrief] to [$endPoint] timeout, exceed [$retryTimeout]") + taskLogger.warn( + s"sending event [$eventBrief] to [$endPoint] timeout, exceed [$retryTimeout]" + ) (Fail(ex), event) } } @@ -124,7 +127,7 @@ class HttpSink(val settings: HttpSinkSettings) extends SinkTask with StrictLoggi } } -object HttpSink { +object HttpSink extends Logging { val RESPONSE_OK = "ok" val RESPONSE_EXPONENTIAL_BACKOFF = "exponential_backoff" @@ -170,7 +173,7 @@ object HttpSink { ) def replyToReceiver(result: Try[HttpResponse], receiver: ActorRef): Unit = { - log.debug(s"replying response to http-sink") + taskLogger.debug(s"replying response to http-sink") receiver ! result } @@ -195,9 +198,13 @@ object HttpSink { case Success(resp) => self.tell(CheckResp(resp, req), receiver) case Failure(ex) => if (retryTimes == 1) - log.warning(s"sending request to $requestUrl failed with error $ex, going to retry now.") + taskLogger.warn( + s"sending request to $requestUrl failed with error $ex, going to retry now." + ) else - log.info(s"resending request to $requestUrl failed with error $ex, retry-times is $retryTimes") + taskLogger.info( + s"resending request to $requestUrl failed with error $ex, retry-times is $retryTimes" + ) self.tell(Retry(req), receiver) } @@ -209,11 +216,15 @@ object HttpSink { .byteStringUnmarshaller(entity) .map { _body => val body = _body.utf8String - log.debug(s"get response from upstream with status code ${status.value} and body $body") + taskLogger.info( + s"get response from upstream with status code ${status.value} and body $body" + ) if (body == RESPONSE_OK) { replyToReceiver(Success(resp), receiver) } else if (body == RESPONSE_EXPONENTIAL_BACKOFF) { - log.debug(s"going to retry now since got retry signal from the endpoint") + taskLogger.info( + s"going to retry now since got retry signal from the endpoint" + ) self.tell(Retry(req), receiver) } else { // the response body was not expected @@ -225,7 +236,7 @@ object HttpSink { } } else { val errorMsg = s"get response from upstream with non-200 [$status] status code" - log.debug(errorMsg) + taskLogger.debug(errorMsg) resp.discardEntityBytes() replyToReceiver(Failure(new UnexpectedResponseException(errorMsg)), receiver) } @@ -236,7 +247,7 @@ object HttpSink { } } -class HttpSinkBuilder() extends SinkTaskBuilder with StrictLogging { +class HttpSinkBuilder() extends SinkTaskBuilder with Logging { override def build( configString: String diff --git a/core/src/main/scala/com/thenetcircle/event_bus/tasks/http/HttpSource.scala b/core/src/main/scala/com/thenetcircle/event_bus/tasks/http/HttpSource.scala index 77c45a2..669c4e6 100644 --- a/core/src/main/scala/com/thenetcircle/event_bus/tasks/http/HttpSource.scala +++ b/core/src/main/scala/com/thenetcircle/event_bus/tasks/http/HttpSource.scala @@ -31,9 +31,8 @@ import com.thenetcircle.event_bus.event.extractor.DataFormat.DataFormat import com.thenetcircle.event_bus.event.extractor.{DataFormat, EventExtractingException, EventExtractorFactory} import com.thenetcircle.event_bus.interfaces.EventStatus.{Fail, Norm, SuccStatus, ToFB} import com.thenetcircle.event_bus.interfaces.{Event, EventStatus, SourceTask, SourceTaskBuilder} -import com.thenetcircle.event_bus.misc.Util +import com.thenetcircle.event_bus.misc.{Logging, Util} import com.typesafe.config.Config -import com.typesafe.scalalogging.StrictLogging import net.ceedubs.ficus.Ficus._ import scala.concurrent.duration._ @@ -48,7 +47,7 @@ case class HttpSourceSettings( serverSettings: Option[ServerSettings] = None ) -class HttpSource(val settings: HttpSourceSettings) extends SourceTask with StrictLogging { +class HttpSource(val settings: HttpSourceSettings) extends SourceTask with Logging { def createResponse(result: (EventStatus, Event)): HttpResponse = result match { @@ -82,13 +81,13 @@ class HttpSource(val settings: HttpSourceSettings) extends SourceTask with Stric .mapAsync(1)(request => { unmarshaller(request.entity) .map[(EventStatus, Event)](event => { - logger.info("received a new event: " + Util.getBriefOfEvent(event)) - logger.debug(s"event content: $event") + taskLogger.info("received a new event: " + Util.getBriefOfEvent(event)) + taskLogger.debug(s"event content: $event") (Norm, event) }) .recover { case ex: EventExtractingException => - logger.warn(s"extract event from a http request failed with error $ex") + taskLogger.warn(s"extract event from a http request failed with error $ex") (Fail(ex), EventImpl.createFromFailure(ex)) } }) @@ -141,7 +140,7 @@ class HttpSource(val settings: HttpSourceSettings) extends SourceTask with Stric } } -class HttpSourceBuilder() extends SourceTaskBuilder with StrictLogging { +class HttpSourceBuilder() extends SourceTaskBuilder with Logging { override def build( configString: String diff --git a/core/src/main/scala/com/thenetcircle/event_bus/tasks/kafka/KafkaSink.scala b/core/src/main/scala/com/thenetcircle/event_bus/tasks/kafka/KafkaSink.scala index da34c01..24b9395 100644 --- a/core/src/main/scala/com/thenetcircle/event_bus/tasks/kafka/KafkaSink.scala +++ b/core/src/main/scala/com/thenetcircle/event_bus/tasks/kafka/KafkaSink.scala @@ -30,9 +30,8 @@ import akka.stream.stage._ import com.thenetcircle.event_bus.context.{TaskBuildingContext, TaskRunningContext} import com.thenetcircle.event_bus.interfaces.EventStatus.Norm import com.thenetcircle.event_bus.interfaces.{Event, EventStatus, SinkTask, SinkTaskBuilder} -import com.thenetcircle.event_bus.misc.{MissedEventHandler, Util} +import com.thenetcircle.event_bus.misc.{Logging, Util} import com.thenetcircle.event_bus.tasks.kafka.extended.{EventSerializer, KafkaKey, KafkaKeySerializer, KafkaPartitioner} -import com.typesafe.scalalogging.StrictLogging import net.ceedubs.ficus.Ficus._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.KafkaException @@ -51,7 +50,7 @@ case class KafkaSinkSettings( asyncBufferSize: Int = 100 ) -class KafkaSink(val settings: KafkaSinkSettings) extends SinkTask with StrictLogging { +class KafkaSink(val settings: KafkaSinkSettings) extends SinkTask with Logging { require(settings.bootstrapServers.nonEmpty, "bootstrap servers is required.") @@ -86,7 +85,7 @@ class KafkaSink(val settings: KafkaSinkSettings) extends SinkTask with StrictLog implicit runningContext: TaskRunningContext ): Envelope[ProducerKey, ProducerValue, Event] = { val record = createProducerRecord(event) - logger.debug(s"new kafka record $record is created") + taskLogger.debug(s"new kafka record $record is created") Message(record, event) } @@ -142,7 +141,7 @@ class KafkaSink(val settings: KafkaSinkSettings) extends SinkTask with StrictLog val kafkaBrief = s"topic: ${metadata.topic()}, partition: ${metadata.partition()}, offset: ${metadata .offset()}, key: ${Option(message.record.key()).map(_.rawData).getOrElse("")}" - logger.info(s"sending event [$eventBrief] to kafka [$kafkaBrief] succeeded.") + taskLogger.info(s"sending event [$eventBrief] to kafka [$kafkaBrief] succeeded.") (Norm, message.passThrough) } @@ -163,7 +162,7 @@ class KafkaSink(val settings: KafkaSinkSettings) extends SinkTask with StrictLog } } -object KafkaSink { +object KafkaSink extends Logging { def wrapAsyncBuffer(bufferSize: Int, producingFlow: Flow[Event, _, _]): Flow[Event, (EventStatus, Event), NotUsed] = { val decider: Supervision.Decider = { @@ -198,12 +197,12 @@ object KafkaSink { override def createLogic( inheritedAttributes: Attributes - ): GraphStageLogic = new GraphStageLogic(shape) with InHandler with StageLogging { + ): GraphStageLogic = new GraphStageLogic(shape) with InHandler { private val buffer: util.Queue[Event] = new LinkedBlockingQueue(bufferSize) private def flushBuffer(): Unit = while (!buffer.isEmpty) { - MissedEventHandler.handle(buffer.poll()) + Logging.missedLogger.warn(buffer.poll().body.data) } override def onPush(): Unit = { @@ -217,8 +216,10 @@ object KafkaSink { push(out1, event) } else { if (!buffer.offer(event)) { // if the buffer is full - log.warning("A event [" + Util.getBriefOfEvent(event) + "] is dropped since the AsyncBuffer is full.") - MissedEventHandler.handle(event) + taskLogger.warn( + "A event [" + Util.getBriefOfEvent(event) + "] is dropped since the AsyncBuffer is full." + ) + Logging.missedLogger.warn(event.body.data) } } } diff --git a/core/src/main/scala/com/thenetcircle/event_bus/tasks/kafka/KafkaSource.scala b/core/src/main/scala/com/thenetcircle/event_bus/tasks/kafka/KafkaSource.scala index ff425f4..4d79c65 100644 --- a/core/src/main/scala/com/thenetcircle/event_bus/tasks/kafka/KafkaSource.scala +++ b/core/src/main/scala/com/thenetcircle/event_bus/tasks/kafka/KafkaSource.scala @@ -26,20 +26,19 @@ import akka.{Done, NotUsed} import com.thenetcircle.event_bus.context.{TaskBuildingContext, TaskRunningContext} import com.thenetcircle.event_bus.event.EventImpl import com.thenetcircle.event_bus.event.extractor.{EventExtractingException, EventExtractorFactory} -import com.thenetcircle.event_bus.misc.Util import com.thenetcircle.event_bus.interfaces.EventStatus.{Fail, Norm, SuccStatus, ToFB} import com.thenetcircle.event_bus.interfaces._ +import com.thenetcircle.event_bus.misc.{Logging, Util} import com.thenetcircle.event_bus.tasks.kafka.extended.KafkaKeyDeserializer -import com.typesafe.scalalogging.StrictLogging import net.ceedubs.ficus.Ficus._ import org.apache.kafka.common.serialization.ByteArrayDeserializer import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success} import scala.util.control.NonFatal +import scala.util.{Failure, Success} -class KafkaSource(val settings: KafkaSourceSettings) extends SourceTask with StrictLogging { +class KafkaSource(val settings: KafkaSourceSettings) extends SourceTask with Logging { require(settings.bootstrapServers.nonEmpty, "bootstrap servers is required.") @@ -115,14 +114,14 @@ class KafkaSource(val settings: KafkaSourceSettings) extends SourceTask with Str val kafkaBrief = s"topic: $kafkaTopic, partition: ${message.record.partition()}, offset: ${message.record .offset()}, key: ${Option(message.record.key()).map(_.rawData).getOrElse("")}" - logger.info(s"extracted a new event: [$eventBrief] from kafka: [$kafkaBrief]") + taskLogger.info(s"extracted a new event: [$eventBrief] from kafka: [$kafkaBrief]") (Norm, eve) }) .recover { case ex: EventExtractingException => val eventFormat = eventExtractor.getFormat() - logger.warn( + taskLogger.warn( s"The event read from Kafka was extracting failed with format: $eventFormat and error: $ex" ) ( @@ -148,7 +147,7 @@ class KafkaSource(val settings: KafkaSourceSettings) extends SourceTask with Str val kafkaConsumerSettings = getConsumerSettings() val kafkaSubscription = getSubscription() - logger.info(s"going to subscribe kafka topics: $kafkaSubscription") + taskLogger.info(s"going to subscribe kafka topics: $kafkaSubscription") val (killSwitch, doneFuture) = Consumer @@ -157,7 +156,7 @@ class KafkaSource(val settings: KafkaSourceSettings) extends SourceTask with Str .mapAsyncUnordered(settings.maxConcurrentPartitions) { case (topicPartition, source) => try { - logger.info( + taskLogger.info( s"A new topicPartition $topicPartition was assigned to runner ${runningContext.getStoryRunnerName()}." ) @@ -168,30 +167,30 @@ class KafkaSource(val settings: KafkaSourceSettings) extends SourceTask with Str case (_: SuccStatus, event) => event.getPassThrough[CommittableOffset] match { case Some(co) => - logger.debug(s"The event ${event.uuid} is adding to the kafka batch committer") + taskLogger.debug(s"The event ${event.uuid} is adding to the kafka batch committer") // co.commitScaladsl() // the commit logic Success(co) case None => val errorMessage = s"The event ${event.uuid} missed PassThrough[CommittableOffset]" - logger.error(errorMessage) + taskLogger.error(errorMessage) throw new IllegalStateException(errorMessage) } case (ToFB(exOp), event) => - logger.error( + taskLogger.error( s"Event ${event.uuid} reaches the end with ToFB status" + exOp.map(e => s" and error ${e.getMessage}").getOrElse("") ) throw new RuntimeException("Non handled ToFB status") case (Fail(ex), event) => - logger.error(s"Event ${event.uuid} reaches the end with error $ex") + taskLogger.error(s"Event ${event.uuid} reaches the end with error $ex") // complete the stream if failure, before was using Future.successful(Done) throw ex } // TODO some test .recover { case NonFatal(ex) => - logger.info( + taskLogger.info( s"The substream listening on topicPartition $topicPartition was failed with error: $ex, " + s"Not it's recovered to be a Failure()" ) @@ -207,7 +206,7 @@ class KafkaSource(val settings: KafkaSourceSettings) extends SourceTask with Str .toMat(Sink.ignore)(Keep.right) .run() .map(done => { - logger + taskLogger .info( s"The substream listening on topicPartition $topicPartition was completed." ) @@ -215,14 +214,14 @@ class KafkaSource(val settings: KafkaSourceSettings) extends SourceTask with Str }) .recover { // recover after run, to recover the stream running status case NonFatal(ex) => - logger.error( + taskLogger.error( s"The substream listening on topicPartition $topicPartition was failed with error: $ex" ) Done } } catch { case NonFatal(ex) ⇒ - logger.error( + taskLogger.error( s"Could not materialize topic $topicPartition listening stream with error: $ex" ) throw ex diff --git a/core/src/main/scala/com/thenetcircle/event_bus/tasks/tnc/TNCDinoEventsForwarder.scala b/core/src/main/scala/com/thenetcircle/event_bus/tasks/tnc/TNCDinoEventsForwarder.scala index afefc8d..ebf3329 100644 --- a/core/src/main/scala/com/thenetcircle/event_bus/tasks/tnc/TNCDinoEventsForwarder.scala +++ b/core/src/main/scala/com/thenetcircle/event_bus/tasks/tnc/TNCDinoEventsForwarder.scala @@ -22,11 +22,11 @@ import akka.stream.scaladsl.Flow import com.thenetcircle.event_bus.context.{TaskBuildingContext, TaskRunningContext} import com.thenetcircle.event_bus.interfaces.EventStatus.Norm import com.thenetcircle.event_bus.interfaces.{Event, EventStatus, TransformTask, TransformTaskBuilder} -import com.typesafe.scalalogging.StrictLogging +import com.thenetcircle.event_bus.misc.Logging import scala.util.matching.Regex -class TNCDinoEventsForwarder() extends TransformTask with StrictLogging { +class TNCDinoEventsForwarder() extends TransformTask with Logging { def appendTitleField(event: Event): Event = { val verbOption = event.getExtra("verb") @@ -35,7 +35,7 @@ class TNCDinoEventsForwarder() extends TransformTask with StrictLogging { val newTitle = "dino." + shortGroup + verbOption.get val newBody = event.body.data.replaceFirst(Regex.quote("{"), s"""{"title": "$newTitle",""") - logger.debug(s"appending new group: $shortGroup, new title: $newTitle to the event ${event.uuid}") + taskLogger.debug(s"appending new group: $shortGroup, new title: $newTitle to the event ${event.uuid}") event.withNoTopic().withName(newTitle).withBody(newBody) } else { event diff --git a/core/src/main/scala/com/thenetcircle/event_bus/tasks/tnc/TNCKafkaTopicResolver.scala b/core/src/main/scala/com/thenetcircle/event_bus/tasks/tnc/TNCKafkaTopicResolver.scala index 589dd23..6292688 100644 --- a/core/src/main/scala/com/thenetcircle/event_bus/tasks/tnc/TNCKafkaTopicResolver.scala +++ b/core/src/main/scala/com/thenetcircle/event_bus/tasks/tnc/TNCKafkaTopicResolver.scala @@ -22,8 +22,7 @@ import akka.stream.scaladsl.Flow import com.thenetcircle.event_bus.context.{TaskBuildingContext, TaskRunningContext} import com.thenetcircle.event_bus.interfaces.EventStatus.{Fail, Norm} import com.thenetcircle.event_bus.interfaces.{Event, EventStatus, TransformTask, TransformTaskBuilder} -import com.thenetcircle.event_bus.misc.{Util, ZooKeeperManager} -import com.typesafe.scalalogging.StrictLogging +import com.thenetcircle.event_bus.misc.{Logging, Util, ZooKeeperManager} import org.apache.curator.framework.recipes.cache.NodeCache import spray.json._ @@ -37,9 +36,7 @@ object TopicInfoProtocol extends DefaultJsonProtocol { implicit val topicInfoFormat = jsonFormat3(TopicInfo) } -class TNCKafkaTopicResolver(zkManager: ZooKeeperManager, val _defaultTopic: String) - extends TransformTask - with StrictLogging { +class TNCKafkaTopicResolver(zkManager: ZooKeeperManager, val _defaultTopic: String) extends TransformTask with Logging { import TopicInfoProtocol._ @@ -118,7 +115,7 @@ class TNCKafkaTopicResolver(zkManager: ZooKeeperManager, val _defaultTopic: Stri case Success(newEvent) => (Norm, newEvent) case Failure(ex) => - logger.error(s"resolve kafka topic failed with error $ex") + taskLogger.error(s"resolve kafka topic failed with error $ex") (Fail(ex), event) } }) @@ -149,16 +146,16 @@ class TNCKafkaTopicResolver(zkManager: ZooKeeperManager, val _defaultTopic: Stri def resolveEvent(event: Event): Event = { if (event.metadata.topic.isDefined) { - logger.info(s"event ${event.uuid} has topic ${event.metadata.topic.get} already, will not resolve it.") + taskLogger.info(s"event ${event.uuid} has topic ${event.metadata.topic.get} already, will not resolve it.") return event } if (event.metadata.name.isEmpty && event.metadata.channel.isEmpty) { - logger.debug(s"event ${event.uuid} has no name and channel, will be send to default topic $defaultTopic.") + taskLogger.info(s"event ${event.uuid} has no name and channel, will be send to default topic $defaultTopic.") return event.withTopic(defaultTopic) } val newTopic = getTopicFromIndex(event).getOrElse(defaultTopic) - logger.debug(s"event ${event.uuid} has been resolved to new topic $newTopic") + taskLogger.info(s"event ${event.uuid} has been resolved to new topic $newTopic") event.withTopic(newTopic) } diff --git a/core/src/test/scala/com/thenetcircle/event_bus/TestBase.scala b/core/src/test/scala/com/thenetcircle/event_bus/TestBase.scala index e1af073..152c09d 100644 --- a/core/src/test/scala/com/thenetcircle/event_bus/TestBase.scala +++ b/core/src/test/scala/com/thenetcircle/event_bus/TestBase.scala @@ -24,9 +24,9 @@ import com.thenetcircle.event_bus.context.{AppContext, TaskBuildingContext, Task import com.thenetcircle.event_bus.event.EventImpl import com.thenetcircle.event_bus.event.extractor.DataFormat import com.thenetcircle.event_bus.interfaces.{Event, EventBody, EventMetaData} +import com.thenetcircle.event_bus.misc.Logging import com.thenetcircle.event_bus.story.{StoryBuilder, StorySettings, TaskBuilderFactory} import com.typesafe.config.ConfigFactory -import com.typesafe.scalalogging.StrictLogging import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers} import scala.concurrent.ExecutionContext @@ -38,7 +38,7 @@ abstract class TestBase(_appContext: AppContext) with FlatSpecLike with Matchers with BeforeAndAfterAll - with StrictLogging { + with Logging { implicit val defaultTimeOut: FiniteDuration = 3.seconds lazy implicit val appContext: AppContext = _appContext