Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor #78

Open
wants to merge 3 commits into
base: feature/dvs-247-refactor-api-provider
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package it.bitrock.dvs.producer.aviationedge
import akka.actor.ActorSystem
import com.typesafe.scalalogging.LazyLogging
import it.bitrock.dvs.producer.aviationedge.model._
import it.bitrock.dvs.producer.aviationedge.services.JsonSupport._
import it.bitrock.dvs.producer.aviationedge.services.MainFunctions._
import it.bitrock.dvs.producer.aviationedge.services.context.AviationStreamContext._
import it.bitrock.dvs.producer.aviationedge.services.context.OpenSkyStreamContext._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ trait ToValuePair[J, K, V] {

@SuppressWarnings(Array("scalafix:DisableSyntax.null"))
object ToValuePair {
implicit def messageJsonValuePair[A]: ToValuePair[A, Key, Flight.Value] =
j => (j.asInstanceOf[FlightMessageJson].flight.icaoNumber, j.asInstanceOf[FlightMessageJson].toFlightRaw)
implicit val flightValuePair: ToValuePair[FlightMessageJson, Key, Flight.Value] = j => (j.flight.icaoNumber, j.toFlightRaw)
implicit val airplaneValuePair: ToValuePair[AirplaneMessageJson, Key, Airplane.Value] = j =>
(j.numberRegistration, j.toAirplaneRaw)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object Graphs {
jsonSource: Source[Either[ErrorMessageJson, Message], SourceMat],
rawSink: Sink[Message, SinkMat],
errorSink: Sink[ErrorMessageJson, SinkMat],
invalidFlightSink: Sink[Message, SinkMat]
invalidFlightSink: Sink[FlightMessageJson, SinkMat]
): RunnableGraph[(SourceMat, SinkMat, SinkMat, SinkMat)] = RunnableGraph.fromGraph(
GraphDSL.create(jsonSource, rawSink, errorSink, invalidFlightSink)((a, b, c, d) => (a, b, c, d)) {
implicit builder => (source, raw, error, invalidFlight) =>
Expand All @@ -24,12 +24,19 @@ object Graphs {
val collectErrors = builder.add(Flow[Either[ErrorMessageJson, Message]].collect { case Left(x) => x })
val collectRaws =
builder.add(Flow[Either[ErrorMessageJson, Message]].collect { case Right(x) => x }.filter(filterFunction))
val collectInvalid = builder.add(Flow[Either[ErrorMessageJson, Message]].collect { case Right(x) => x })
val collectInvalidFlight = builder.add(
Flow
.fromFunction[Either[ErrorMessageJson, Message], Option[FlightMessageJson]] {
case Right(x: FlightMessageJson) => Some(x)
case _ => None
}
.collect { case Some(x) => x }
)

source ~> partition
partition.out(ErrorPort) ~> collectErrors ~> error
partition.out(RawPort) ~> collectRaws ~> raw
partition.out(InvalidPort) ~> collectInvalid ~> invalidFlight
partition.out(InvalidPort) ~> collectInvalidFlight ~> invalidFlight

ClosedShape
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,52 +26,55 @@ object JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
implicit val cityMessageJsonFormat: RootJsonFormat[CityMessageJson] = jsonFormat6(CityMessageJson.apply)
implicit val flightStatesJsonFormat: RootJsonFormat[FlightStatesJson] = jsonFormat2(FlightStatesJson.apply)

def aviationEdgePayloadJsonReader[A]: RootJsonReader[List[Either[ErrorMessageJson, A]]] = {
case jsArray: JsArray => jsArrayToResponsePayload(jsArray)
implicit val flightStatesJsonReader: JsonReader[FlightStateJson] = new JsonReader[FlightStateJson] {
override def read(json: JsValue): FlightStateJson =
json match {
case jsArray: JsArray =>
Try(
FlightStateJson(
callsign = jsArray.elements(1).convertTo[String].trim.toUpperCase,
time_position = jsArray.elements(3).convertTo[Long],
longitude = jsArray.elements(5).convertTo[Double],
latitude = jsArray.elements(6).convertTo[Double],
velocity = jsArray.elements(9).convertTo[Double],
true_track = jsArray.elements(10).convertTo[Double],
geo_altitude = jsArray.elements(13).convertTo[Double]
)
).getOrElse(deserializationError("Invalid JsArray for FlightStateJson, got " + jsArray))
case jsValue => deserializationError("Expected FlightStateJson as JsArray, but got " + jsValue)
}
}

def aviationEdgePayloadJsonReader[A: JsonReader]: RootJsonReader[List[Either[ErrorMessageJson, A]]] = {
case jsArray: JsArray => jsArrayToResponsePayload[A](jsArray)
case json => List(Left(ErrorMessageJson("", "", json.compactPrint, Instant.now)))
}

def openSkyResponsePayloadJsonFormat[A]: RootJsonReader[List[Either[ErrorMessageJson, A]]] = {
case jsObject: JsObject => jsObjectToResponsePayload(jsObject)
def openSkyResponsePayloadJsonFormat[A: JsonReader]: RootJsonReader[List[Either[ErrorMessageJson, A]]] = {
case jsObject: JsObject => jsObjectToResponsePayload[A](jsObject)
case json => List(Left(ErrorMessageJson("", "", json.compactPrint, Instant.now)))
}

implicit def unmarshallerFrom[A](rf: RootJsonReader[A]): Unmarshaller[String, A] =
_fromStringUnmarshallerFromByteStringUnmarshaller(sprayJsonByteStringUnmarshaller(rf))

private def jsObjectToResponsePayload[A <: FlightStateJson](json: JsObject): List[Either[ErrorMessageJson, A]] =
private def jsObjectToResponsePayload[A: JsonReader](json: JsObject): List[Either[ErrorMessageJson, A]] =
Try(json.convertTo[FlightStatesJson]) match {
case Failure(ex) => List(Left(ErrorMessageJson("", ex.getMessage, json.compactPrint, Instant.now)))
case Success(flightStates) =>
flightStates.states.map { state =>
Try(
FlightStateJson(
callsign = state(1).convertTo[String].trim.toUpperCase,
time_position = state(3).convertTo[Long],
longitude = state(5).convertTo[Double],
latitude = state(6).convertTo[Double],
velocity = state(9).convertTo[Double],
true_track = state(10).convertTo[Double],
geo_altitude = state(13).convertTo[Double]
).asInstanceOf[A]
JsArray(state.toVector).convertTo[A]
).toEither.left.map(ex => ErrorMessageJson("", ex.getMessage, json.compactPrint, Instant.now))
}
}

private def jsArrayToResponsePayload[A <: MessageJson](json: JsArray): List[Either[ErrorMessageJson, A]] =
private def jsArrayToResponsePayload[A: JsonReader](json: JsArray): List[Either[ErrorMessageJson, A]] =
json
.asInstanceOf[JsArray]
.elements
.map(json =>
Try(
json.asJsObject match {
case j: JsObject if j.getFields("flight") != Seq() => json.convertTo[FlightMessageJson].asInstanceOf[A]
case j: JsObject if j.getFields("airplaneId") != Seq() => json.convertTo[AirplaneMessageJson].asInstanceOf[A]
case j: JsObject if j.getFields("airportId") != Seq() => json.convertTo[AirportMessageJson].asInstanceOf[A]
case j: JsObject if j.getFields("airlineId") != Seq() => json.convertTo[AirlineMessageJson].asInstanceOf[A]
case j: JsObject if j.getFields("cityId") != Seq() => json.convertTo[CityMessageJson].asInstanceOf[A]
}
).toEither.left.map(ex => ErrorMessageJson("", ex.getMessage, json.compactPrint, Instant.now))
Try(json.convertTo[A]).toEither.left.map(ex => ErrorMessageJson("", ex.getMessage, json.compactPrint, Instant.now))
)
.toList
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import it.bitrock.dvs.producer.aviationedge.services.context.{
AviationStreamContext,
OpenSkyStreamContext
}
import spray.json.JsonReader

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -29,7 +30,7 @@ object MainFunctions {
Http().bindAndHandle(routes.routes, host, port)
}

def runAviationEdgeStream[A: ApiProviderStreamContext]()(
def runAviationEdgeStream[A: ApiProviderStreamContext: JsonReader]()(
implicit system: ActorSystem,
ec: ExecutionContext
): (Cancellable, Future[Done], Future[Done], Future[Done]) = {
Expand All @@ -43,15 +44,15 @@ object MainFunctions {
)
val rawSink = AviationStreamContext[A].sink(kafkaConfig)
val errorSink = SideStreamContext.errorSink(kafkaConfig)
val invalidFlightSink = SideStreamContext.invalidSink(kafkaConfig)
val invalidFlightSink = SideStreamContext.invalidFlightSink(kafkaConfig)
val monitoringSink = SideStreamContext.monitoringSink(kafkaConfig)

val jsonSource = tickSource.via(aviationFlow).via(monitoringGraph(monitoringSink)).mapConcat(identity)

mainGraph(jsonSource, rawSink, errorSink, invalidFlightSink).run()
}

def runOpenSkyStream[A: ApiProviderStreamContext]()(
def runOpenSkyStream[A: ApiProviderStreamContext: JsonReader]()(
implicit system: ActorSystem,
ec: ExecutionContext
): (Cancellable, Future[Done]) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.stream.scaladsl.Sink
import it.bitrock.dvs.producer.aviationedge.config.KafkaConfig
import it.bitrock.dvs.producer.aviationedge.kafka.KafkaTypes.{Error, Flight, Key, Monitoring}
import it.bitrock.dvs.producer.aviationedge.kafka.{KafkaSinkFactory, ProducerSettingsFactory}
import it.bitrock.dvs.producer.aviationedge.model.{ErrorMessageJson, MonitoringMessageJson}
import it.bitrock.dvs.producer.aviationedge.model.{ErrorMessageJson, FlightMessageJson, MonitoringMessageJson}

import scala.concurrent.Future

Expand All @@ -21,8 +21,8 @@ object SideStreamContext {
new KafkaSinkFactory[MonitoringMessageJson, Key, Monitoring.Value](kafkaConfig.monitoringTopic, producerSettings).sink
}

def invalidSink[A](kafkaConfig: KafkaConfig)(implicit system: ActorSystem): Sink[A, Future[Done]] = {
def invalidFlightSink(kafkaConfig: KafkaConfig)(implicit system: ActorSystem): Sink[FlightMessageJson, Future[Done]] = {
val producerSettings = ProducerSettingsFactory.from[Flight.Value](kafkaConfig)
new KafkaSinkFactory[A, Key, Flight.Value](kafkaConfig.invalidFlightRawTopic, producerSettings).sink
new KafkaSinkFactory[FlightMessageJson, Key, Flight.Value](kafkaConfig.invalidFlightRawTopic, producerSettings).sink
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import akka.testkit.TestKit
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
import it.bitrock.dvs.producer.aviationedge.TestValues
import it.bitrock.dvs.producer.aviationedge.kafka.KafkaTypes.{Flight, Key}
import it.bitrock.dvs.producer.aviationedge.model.MessageJson
import it.bitrock.dvs.producer.aviationedge.model.FlightMessageJson
import it.bitrock.kafkacommons.serialization.ImplicitConversions._
import it.bitrock.testcommons.{FixtureLoanerAnyResult, Suite}
import net.manub.embeddedkafka.schemaregistry._
Expand Down Expand Up @@ -51,7 +51,7 @@ class KafkaFlightSinkFactorySpec
s"http://localhost:${embeddedKafkaConfig.schemaRegistryPort}"
)

val factory = new KafkaSinkFactory[MessageJson, Key, Flight.Value](
val factory = new KafkaSinkFactory[FlightMessageJson, Key, Flight.Value](
outputTopic,
producerSettings
)
Expand All @@ -76,6 +76,6 @@ object KafkaFlightSinkFactorySpec {
final case class Resource(
embeddedKafkaConfig: EmbeddedKafkaConfig,
keySerde: Serde[Key],
factory: KafkaSinkFactory[MessageJson, Key, Flight.Value]
factory: KafkaSinkFactory[FlightMessageJson, Key, Flight.Value]
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class ApiProviderFlowSpec

"flow method" should {
"recover http request failure" in {
val flow = apiProviderFlow.flow(Uri("invalid-url"), 1)(aviationEdgePayloadJsonReader[Nothing])
val flow = apiProviderFlow.flow(Uri("invalid-url"), 1)(aviationEdgePayloadJsonReader[String])
whenReady(Source.tick(0.seconds, 1.second, Tick()).via(flow).take(1).toMat(Sink.head)(Keep.right).run()) { result =>
result.head.left.value.errorSource shouldBe "invalid-url"
}
Expand Down Expand Up @@ -106,15 +106,15 @@ class ApiProviderFlowSpec
}
"create an ErrorMessageJson with the field failedJson equals to the response body" when {
"the provider is aviation-edge" in {
val futureResult = apiProviderFlow.unmarshalBody(ErrorResponse, Path)(aviationEdgePayloadJsonReader[Nothing])
val futureResult = apiProviderFlow.unmarshalBody(ErrorResponse, Path)(aviationEdgePayloadJsonReader[String])
whenReady(futureResult) { result =>
result.size shouldBe 1
result.head.isLeft shouldBe true
result.head.left.value.failedJson shouldBe ErrorResponse
}
}
"the provider is open-sky" in {
val futureResult = apiProviderFlow.unmarshalBody(ErrorResponse, Path)(openSkyResponsePayloadJsonFormat[Nothing])
val futureResult = apiProviderFlow.unmarshalBody(ErrorResponse, Path)(openSkyResponsePayloadJsonFormat[String])
whenReady(futureResult) { result =>
result.size shouldBe 1
result.head.isLeft shouldBe true
Expand All @@ -124,15 +124,15 @@ class ApiProviderFlowSpec
}
"create an ErrorMessageJson if at least one of the fields of the response is incorrect" when {
"the provider is aviation-edge" in {
val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path)(aviationEdgePayloadJsonReader[Nothing])
val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path)(aviationEdgePayloadJsonReader[String])
whenReady(futureResult) { result =>
result.size shouldBe 1
result.head.isLeft shouldBe true
result.head.left.value.errorSource shouldBe Path
}
}
"the provider is open-sky" in {
val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path)(openSkyResponsePayloadJsonFormat[Nothing])
val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path)(openSkyResponsePayloadJsonFormat[String])
whenReady(futureResult) { result =>
result.size shouldBe 1
result.head.isLeft shouldBe true
Expand Down