From ed3f91069208a22282691e754db6581aef945409 Mon Sep 17 00:00:00 2001 From: Simone Esposito Date: Wed, 19 Feb 2020 11:48:30 +0100 Subject: [PATCH 1/3] Add log on http call failure (#59) --- .../dvs/producer/aviationedge/services/AviationFlow.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/AviationFlow.scala b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/AviationFlow.scala index 447e907..f6e3c18 100644 --- a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/AviationFlow.scala +++ b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/AviationFlow.scala @@ -28,6 +28,7 @@ class AviationFlow()(implicit system: ActorSystem, ec: ExecutionContext) extends .flatMap(body => unmarshalBody(body, uri.path.toString)) .recover { case NonFatal(ex) => + logger.warn(s"Error on call: $uri, $ex") List(Left(ErrorMessageJson(uri.path.toString, ex.getMessage, "", Instant.now))) } } From 5b3106d287dfe96c0a3df850e58b99fe2fb607e6 Mon Sep 17 00:00:00 2001 From: Simone Esposito Date: Wed, 19 Feb 2020 12:29:59 +0100 Subject: [PATCH 2/3] Fix monitoringFlow --- project/Dependencies.scala | 2 +- .../dvs/producer/aviationedge/model/Definition.scala | 6 +++--- .../dvs/producer/aviationedge/services/Graphs.scala | 12 ++++++------ .../dvs/producer/aviationedge/TestValues.scala | 5 +++-- .../producer/aviationedge/services/GraphsSpec.scala | 11 ++++++----- 5 files changed, 19 insertions(+), 17 deletions(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ce2890a..5e4ccef 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,7 +17,7 @@ object Dependencies { lazy val JakartaWsRs = "2.1.6" lazy val Kafka = "2.4.0" lazy val KafkaCommons = "0.0.8" - lazy val KafkaDVS = "1.0.13" + lazy val KafkaDVS = "1.0.15" lazy val LogbackClassic = "1.2.3" lazy val PureConfig = "0.12.2" lazy val ScalaLogging = "3.9.2" diff --git a/src/main/scala/it/bitrock/dvs/producer/aviationedge/model/Definition.scala b/src/main/scala/it/bitrock/dvs/producer/aviationedge/model/Definition.scala index b09a49c..af983f8 100644 --- a/src/main/scala/it/bitrock/dvs/producer/aviationedge/model/Definition.scala +++ b/src/main/scala/it/bitrock/dvs/producer/aviationedge/model/Definition.scala @@ -20,9 +20,9 @@ case class ErrorMessageJson( case class MonitoringMessageJson( messageReceivedOn: Instant, - minUpdated: Instant, - maxUpdated: Instant, - averageUpdated: Instant, + minUpdated: Option[Instant], + maxUpdated: Option[Instant], + averageUpdated: Option[Instant], numErrors: Int, numValid: Int, numInvalid: Int, diff --git a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/Graphs.scala b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/Graphs.scala index 78f92cf..2a78b49 100644 --- a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/Graphs.scala +++ b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/Graphs.scala @@ -61,18 +61,18 @@ object Graphs { } val numValid = validFlights.size - val minUpdated = validFlights.minBy(_.system.updated).system.updated - val averageUpdated = validFlights.view.map(_.system.updated).sum / numValid - val maxUpdated = validFlights.maxBy(_.system.updated).system.updated + val minUpdated = validFlights.view.map(_.system.updated).reduceOption(_ min _) + val averageUpdated = validFlights.view.map(_.system.updated).reduceOption(_ + _).map(_ / numValid) + val maxUpdated = validFlights.view.map(_.system.updated).reduceOption(_ max _) val numErrors = flightMessages.count(_.isLeft) val total = flightMessages.size val numInvalid = total - numValid - numErrors MonitoringMessageJson( messageReceivedOn = Instant.now, - minUpdated = Instant.ofEpochSecond(minUpdated), - maxUpdated = Instant.ofEpochSecond(maxUpdated), - averageUpdated = Instant.ofEpochSecond(averageUpdated), + minUpdated = minUpdated.map(Instant.ofEpochSecond), + maxUpdated = maxUpdated.map(Instant.ofEpochSecond), + averageUpdated = averageUpdated.map(Instant.ofEpochSecond), numErrors = numErrors, numValid = numValid, numInvalid = numInvalid, diff --git a/src/test/scala/it/bitrock/dvs/producer/aviationedge/TestValues.scala b/src/test/scala/it/bitrock/dvs/producer/aviationedge/TestValues.scala index 52a0ebc..653f37e 100644 --- a/src/test/scala/it/bitrock/dvs/producer/aviationedge/TestValues.scala +++ b/src/test/scala/it/bitrock/dvs/producer/aviationedge/TestValues.scala @@ -42,8 +42,9 @@ trait TestValues { final val ErrorMessage = ErrorMessageJson(Path, "a message", "a failed json", Timestamp) final val ExpectedParserError = ParserError(Path, "a message", "a failed json", Timestamp) - final val MonitoringMessage = MonitoringMessageJson(Timestamp, Timestamp, Timestamp, Timestamp, 0, 0, 0, 0) - final val ExpectedMonitoringMessage = FlightRequestComputationStatus(Timestamp, Timestamp, Timestamp, Timestamp, 0, 0, 0, 0) + final val MonitoringMessage = MonitoringMessageJson(Timestamp, Some(Timestamp), Some(Timestamp), Some(Timestamp), 0, 0, 0, 0) + final val ExpectedMonitoringMessage = + FlightRequestComputationStatus(Timestamp, Some(Timestamp), Some(Timestamp), Some(Timestamp), 0, 0, 0, 0) final val ValidAirlineMessage = AirlineMessageJson(0, "", "", "", "", "active", 0, "", "") final val InvalidAirlineMessage = AirlineMessageJson(0, "", "", "", "", "invalid status", 0, "", "") diff --git a/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/GraphsSpec.scala b/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/GraphsSpec.scala index 2e0a5a3..53f85c0 100644 --- a/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/GraphsSpec.scala +++ b/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/GraphsSpec.scala @@ -11,7 +11,7 @@ import it.bitrock.dvs.producer.aviationedge.model.{ErrorMessageJson, MessageJson import it.bitrock.dvs.producer.aviationedge.services.Graphs._ import it.bitrock.testcommons.Suite import net.manub.embeddedkafka.schemaregistry._ -import org.scalatest.BeforeAndAfterAll +import org.scalatest.{BeforeAndAfterAll, OptionValues} import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.concurrent.ScalaFutures import org.scalatest.wordspec.AnyWordSpecLike @@ -27,7 +27,8 @@ class GraphsSpec with EmbeddedKafka with TestValues with ScalaFutures - with LazyLogging { + with LazyLogging + with OptionValues { private val timeout = Timeout(3.seconds) "graphs" should { @@ -74,9 +75,9 @@ class GraphsSpec whenReady(futureMonitoring, timeout) { m => m.size shouldBe 1 - m.head.minUpdated shouldBe Instant.ofEpochSecond(MinUpdated) - m.head.maxUpdated shouldBe Instant.ofEpochSecond(MaxUpdated) - m.head.averageUpdated shouldBe Instant.ofEpochSecond((MinUpdated + MaxUpdated + Updated) / 3) + m.head.minUpdated.value shouldBe Instant.ofEpochSecond(MinUpdated) + m.head.maxUpdated.value shouldBe Instant.ofEpochSecond(MaxUpdated) + m.head.averageUpdated.value shouldBe Instant.ofEpochSecond((MinUpdated + MaxUpdated + Updated) / 3) m.head.numErrors shouldBe 1 m.head.numValid shouldBe 4 m.head.numInvalid shouldBe 1 From 704c44088621eb7e37ef158c2f86ee8ec80f3973 Mon Sep 17 00:00:00 2001 From: Simone Esposito Date: Wed, 19 Feb 2020 12:38:45 +0100 Subject: [PATCH 3/3] Add test in GraphSpec for no valid messages --- .../aviationedge/services/GraphsSpec.scala | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/GraphsSpec.scala b/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/GraphsSpec.scala index 53f85c0..7e2a260 100644 --- a/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/GraphsSpec.scala +++ b/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/GraphsSpec.scala @@ -11,10 +11,10 @@ import it.bitrock.dvs.producer.aviationedge.model.{ErrorMessageJson, MessageJson import it.bitrock.dvs.producer.aviationedge.services.Graphs._ import it.bitrock.testcommons.Suite import net.manub.embeddedkafka.schemaregistry._ -import org.scalatest.{BeforeAndAfterAll, OptionValues} import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.concurrent.ScalaFutures import org.scalatest.wordspec.AnyWordSpecLike +import org.scalatest.{BeforeAndAfterAll, OptionValues} import scala.concurrent.Future import scala.concurrent.duration._ @@ -55,7 +55,7 @@ class GraphsSpec } } - "produce monitoring messages to monitoring sink" in { + "produce monitoring messages to monitoring sink when there are valid messages" in { val source = Source.single( List( Right(FlightMessage), @@ -84,5 +84,28 @@ class GraphsSpec m.head.total shouldBe 6 } } + + "produce monitoring messages to monitoring sink when there are no valid messages" in { + val source = Source.single( + List( + Left(ErrorMessage.copy(errorSource = "/v2/public/flights")) + ) + ) + val monitoringSink: Sink[MonitoringMessageJson, Future[List[MonitoringMessageJson]]] = + Sink.fold[List[MonitoringMessageJson], MonitoringMessageJson](Nil)(_ :+ _) + + val futureMonitoring = source.viaMat(monitoringGraph(monitoringSink))(Keep.right).to(Sink.ignore).run() + + whenReady(futureMonitoring, timeout) { m => + m.size shouldBe 1 + m.head.minUpdated shouldBe empty + m.head.maxUpdated shouldBe empty + m.head.averageUpdated shouldBe empty + m.head.numErrors shouldBe 1 + m.head.numValid shouldBe 0 + m.head.numInvalid shouldBe 0 + m.head.total shouldBe 1 + } + } } }