Skip to content

Commit

Permalink
Merge pull request #60 from bitrockteam/release/1.1.6
Browse files Browse the repository at this point in the history
Release/1.1.6
  • Loading branch information
simoexpo authored Feb 19, 2020
2 parents a7a5aee + 704c440 commit 713b72f
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 18 deletions.
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object Dependencies {
lazy val JakartaWsRs = "2.1.6"
lazy val Kafka = "2.4.0"
lazy val KafkaCommons = "0.0.8"
lazy val KafkaDVS = "1.0.13"
lazy val KafkaDVS = "1.0.15"
lazy val LogbackClassic = "1.2.3"
lazy val PureConfig = "0.12.2"
lazy val ScalaLogging = "3.9.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ case class ErrorMessageJson(

case class MonitoringMessageJson(
messageReceivedOn: Instant,
minUpdated: Instant,
maxUpdated: Instant,
averageUpdated: Instant,
minUpdated: Option[Instant],
maxUpdated: Option[Instant],
averageUpdated: Option[Instant],
numErrors: Int,
numValid: Int,
numInvalid: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class AviationFlow()(implicit system: ActorSystem, ec: ExecutionContext) extends
.flatMap(body => unmarshalBody(body, uri.path.toString))
.recover {
case NonFatal(ex) =>
logger.warn(s"Error on call: $uri, $ex")
List(Left(ErrorMessageJson(uri.path.toString, ex.getMessage, "", Instant.now)))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,18 @@ object Graphs {
}

val numValid = validFlights.size
val minUpdated = validFlights.minBy(_.system.updated).system.updated
val averageUpdated = validFlights.view.map(_.system.updated).sum / numValid
val maxUpdated = validFlights.maxBy(_.system.updated).system.updated
val minUpdated = validFlights.view.map(_.system.updated).reduceOption(_ min _)
val averageUpdated = validFlights.view.map(_.system.updated).reduceOption(_ + _).map(_ / numValid)
val maxUpdated = validFlights.view.map(_.system.updated).reduceOption(_ max _)
val numErrors = flightMessages.count(_.isLeft)
val total = flightMessages.size
val numInvalid = total - numValid - numErrors

MonitoringMessageJson(
messageReceivedOn = Instant.now,
minUpdated = Instant.ofEpochSecond(minUpdated),
maxUpdated = Instant.ofEpochSecond(maxUpdated),
averageUpdated = Instant.ofEpochSecond(averageUpdated),
minUpdated = minUpdated.map(Instant.ofEpochSecond),
maxUpdated = maxUpdated.map(Instant.ofEpochSecond),
averageUpdated = averageUpdated.map(Instant.ofEpochSecond),
numErrors = numErrors,
numValid = numValid,
numInvalid = numInvalid,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ trait TestValues {
final val ErrorMessage = ErrorMessageJson(Path, "a message", "a failed json", Timestamp)
final val ExpectedParserError = ParserError(Path, "a message", "a failed json", Timestamp)

final val MonitoringMessage = MonitoringMessageJson(Timestamp, Timestamp, Timestamp, Timestamp, 0, 0, 0, 0)
final val ExpectedMonitoringMessage = FlightRequestComputationStatus(Timestamp, Timestamp, Timestamp, Timestamp, 0, 0, 0, 0)
final val MonitoringMessage = MonitoringMessageJson(Timestamp, Some(Timestamp), Some(Timestamp), Some(Timestamp), 0, 0, 0, 0)
final val ExpectedMonitoringMessage =
FlightRequestComputationStatus(Timestamp, Some(Timestamp), Some(Timestamp), Some(Timestamp), 0, 0, 0, 0)

final val ValidAirlineMessage = AirlineMessageJson(0, "", "", "", "", "active", 0, "", "")
final val InvalidAirlineMessage = AirlineMessageJson(0, "", "", "", "", "invalid status", 0, "", "")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import it.bitrock.dvs.producer.aviationedge.model.{ErrorMessageJson, MessageJson
import it.bitrock.dvs.producer.aviationedge.services.Graphs._
import it.bitrock.testcommons.Suite
import net.manub.embeddedkafka.schemaregistry._
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.{BeforeAndAfterAll, OptionValues}

import scala.concurrent.Future
import scala.concurrent.duration._
Expand All @@ -27,7 +27,8 @@ class GraphsSpec
with EmbeddedKafka
with TestValues
with ScalaFutures
with LazyLogging {
with LazyLogging
with OptionValues {
private val timeout = Timeout(3.seconds)

"graphs" should {
Expand All @@ -54,7 +55,7 @@ class GraphsSpec
}
}

"produce monitoring messages to monitoring sink" in {
"produce monitoring messages to monitoring sink when there are valid messages" in {
val source = Source.single(
List(
Right(FlightMessage),
Expand All @@ -74,14 +75,37 @@ class GraphsSpec

whenReady(futureMonitoring, timeout) { m =>
m.size shouldBe 1
m.head.minUpdated shouldBe Instant.ofEpochSecond(MinUpdated)
m.head.maxUpdated shouldBe Instant.ofEpochSecond(MaxUpdated)
m.head.averageUpdated shouldBe Instant.ofEpochSecond((MinUpdated + MaxUpdated + Updated) / 3)
m.head.minUpdated.value shouldBe Instant.ofEpochSecond(MinUpdated)
m.head.maxUpdated.value shouldBe Instant.ofEpochSecond(MaxUpdated)
m.head.averageUpdated.value shouldBe Instant.ofEpochSecond((MinUpdated + MaxUpdated + Updated) / 3)
m.head.numErrors shouldBe 1
m.head.numValid shouldBe 4
m.head.numInvalid shouldBe 1
m.head.total shouldBe 6
}
}

"produce monitoring messages to monitoring sink when there are no valid messages" in {
val source = Source.single(
List(
Left(ErrorMessage.copy(errorSource = "/v2/public/flights"))
)
)
val monitoringSink: Sink[MonitoringMessageJson, Future[List[MonitoringMessageJson]]] =
Sink.fold[List[MonitoringMessageJson], MonitoringMessageJson](Nil)(_ :+ _)

val futureMonitoring = source.viaMat(monitoringGraph(monitoringSink))(Keep.right).to(Sink.ignore).run()

whenReady(futureMonitoring, timeout) { m =>
m.size shouldBe 1
m.head.minUpdated shouldBe empty
m.head.maxUpdated shouldBe empty
m.head.averageUpdated shouldBe empty
m.head.numErrors shouldBe 1
m.head.numValid shouldBe 0
m.head.numInvalid shouldBe 0
m.head.total shouldBe 1
}
}
}
}

0 comments on commit 713b72f

Please sign in to comment.