diff --git a/src/main/scala/it/bitrock/dvs/producer/aviationedge/Main.scala b/src/main/scala/it/bitrock/dvs/producer/aviationedge/Main.scala index 888601b..121cb58 100644 --- a/src/main/scala/it/bitrock/dvs/producer/aviationedge/Main.scala +++ b/src/main/scala/it/bitrock/dvs/producer/aviationedge/Main.scala @@ -3,6 +3,7 @@ package it.bitrock.dvs.producer.aviationedge import akka.actor.ActorSystem import com.typesafe.scalalogging.LazyLogging import it.bitrock.dvs.producer.aviationedge.model._ +import it.bitrock.dvs.producer.aviationedge.services.JsonSupport._ import it.bitrock.dvs.producer.aviationedge.services.MainFunctions._ import it.bitrock.dvs.producer.aviationedge.services.context.AviationStreamContext._ import it.bitrock.dvs.producer.aviationedge.services.context.OpenSkyStreamContext._ diff --git a/src/main/scala/it/bitrock/dvs/producer/aviationedge/kafka/ToValuePair.scala b/src/main/scala/it/bitrock/dvs/producer/aviationedge/kafka/ToValuePair.scala index 8e78b54..6b5eb19 100644 --- a/src/main/scala/it/bitrock/dvs/producer/aviationedge/kafka/ToValuePair.scala +++ b/src/main/scala/it/bitrock/dvs/producer/aviationedge/kafka/ToValuePair.scala @@ -19,8 +19,6 @@ trait ToValuePair[J, K, V] { @SuppressWarnings(Array("scalafix:DisableSyntax.null")) object ToValuePair { - implicit def messageJsonValuePair[A]: ToValuePair[A, Key, Flight.Value] = - j => (j.asInstanceOf[FlightMessageJson].flight.icaoNumber, j.asInstanceOf[FlightMessageJson].toFlightRaw) implicit val flightValuePair: ToValuePair[FlightMessageJson, Key, Flight.Value] = j => (j.flight.icaoNumber, j.toFlightRaw) implicit val airplaneValuePair: ToValuePair[AirplaneMessageJson, Key, Airplane.Value] = j => (j.numberRegistration, j.toAirplaneRaw) diff --git a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/Graphs.scala b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/Graphs.scala index e01ace8..99dea3d 100644 --- a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/Graphs.scala +++ b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/Graphs.scala @@ -14,7 +14,7 @@ object Graphs { jsonSource: Source[Either[ErrorMessageJson, Message], SourceMat], rawSink: Sink[Message, SinkMat], errorSink: Sink[ErrorMessageJson, SinkMat], - invalidFlightSink: Sink[Message, SinkMat] + invalidFlightSink: Sink[FlightMessageJson, SinkMat] ): RunnableGraph[(SourceMat, SinkMat, SinkMat, SinkMat)] = RunnableGraph.fromGraph( GraphDSL.create(jsonSource, rawSink, errorSink, invalidFlightSink)((a, b, c, d) => (a, b, c, d)) { implicit builder => (source, raw, error, invalidFlight) => @@ -24,12 +24,19 @@ object Graphs { val collectErrors = builder.add(Flow[Either[ErrorMessageJson, Message]].collect { case Left(x) => x }) val collectRaws = builder.add(Flow[Either[ErrorMessageJson, Message]].collect { case Right(x) => x }.filter(filterFunction)) - val collectInvalid = builder.add(Flow[Either[ErrorMessageJson, Message]].collect { case Right(x) => x }) + val collectInvalidFlight = builder.add( + Flow + .fromFunction[Either[ErrorMessageJson, Message], Option[FlightMessageJson]] { + case Right(x: FlightMessageJson) => Some(x) + case _ => None + } + .collect { case Some(x) => x } + ) source ~> partition partition.out(ErrorPort) ~> collectErrors ~> error partition.out(RawPort) ~> collectRaws ~> raw - partition.out(InvalidPort) ~> collectInvalid ~> invalidFlight + partition.out(InvalidPort) ~> collectInvalidFlight ~> invalidFlight ClosedShape } diff --git a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/JsonSupport.scala b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/JsonSupport.scala index 3d30ee3..bf2c052 100644 --- a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/JsonSupport.scala +++ b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/JsonSupport.scala @@ -26,52 +26,55 @@ object JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { implicit val cityMessageJsonFormat: RootJsonFormat[CityMessageJson] = jsonFormat6(CityMessageJson.apply) implicit val flightStatesJsonFormat: RootJsonFormat[FlightStatesJson] = jsonFormat2(FlightStatesJson.apply) - def aviationEdgePayloadJsonReader[A]: RootJsonReader[List[Either[ErrorMessageJson, A]]] = { - case jsArray: JsArray => jsArrayToResponsePayload(jsArray) + implicit val flightStatesJsonReader: JsonReader[FlightStateJson] = new JsonReader[FlightStateJson] { + override def read(json: JsValue): FlightStateJson = + json match { + case jsArray: JsArray => + Try( + FlightStateJson( + callsign = jsArray.elements(1).convertTo[String].trim.toUpperCase, + time_position = jsArray.elements(3).convertTo[Long], + longitude = jsArray.elements(5).convertTo[Double], + latitude = jsArray.elements(6).convertTo[Double], + velocity = jsArray.elements(9).convertTo[Double], + true_track = jsArray.elements(10).convertTo[Double], + geo_altitude = jsArray.elements(13).convertTo[Double] + ) + ).getOrElse(deserializationError("Invalid JsArray for FlightStateJson, got " + jsArray)) + case jsValue => deserializationError("Expected FlightStateJson as JsArray, but got " + jsValue) + } + } + + def aviationEdgePayloadJsonReader[A: JsonReader]: RootJsonReader[List[Either[ErrorMessageJson, A]]] = { + case jsArray: JsArray => jsArrayToResponsePayload[A](jsArray) case json => List(Left(ErrorMessageJson("", "", json.compactPrint, Instant.now))) } - def openSkyResponsePayloadJsonFormat[A]: RootJsonReader[List[Either[ErrorMessageJson, A]]] = { - case jsObject: JsObject => jsObjectToResponsePayload(jsObject) + def openSkyResponsePayloadJsonFormat[A: JsonReader]: RootJsonReader[List[Either[ErrorMessageJson, A]]] = { + case jsObject: JsObject => jsObjectToResponsePayload[A](jsObject) case json => List(Left(ErrorMessageJson("", "", json.compactPrint, Instant.now))) } implicit def unmarshallerFrom[A](rf: RootJsonReader[A]): Unmarshaller[String, A] = _fromStringUnmarshallerFromByteStringUnmarshaller(sprayJsonByteStringUnmarshaller(rf)) - private def jsObjectToResponsePayload[A <: FlightStateJson](json: JsObject): List[Either[ErrorMessageJson, A]] = + private def jsObjectToResponsePayload[A: JsonReader](json: JsObject): List[Either[ErrorMessageJson, A]] = Try(json.convertTo[FlightStatesJson]) match { case Failure(ex) => List(Left(ErrorMessageJson("", ex.getMessage, json.compactPrint, Instant.now))) case Success(flightStates) => flightStates.states.map { state => Try( - FlightStateJson( - callsign = state(1).convertTo[String].trim.toUpperCase, - time_position = state(3).convertTo[Long], - longitude = state(5).convertTo[Double], - latitude = state(6).convertTo[Double], - velocity = state(9).convertTo[Double], - true_track = state(10).convertTo[Double], - geo_altitude = state(13).convertTo[Double] - ).asInstanceOf[A] + JsArray(state.toVector).convertTo[A] ).toEither.left.map(ex => ErrorMessageJson("", ex.getMessage, json.compactPrint, Instant.now)) } } - private def jsArrayToResponsePayload[A <: MessageJson](json: JsArray): List[Either[ErrorMessageJson, A]] = + private def jsArrayToResponsePayload[A: JsonReader](json: JsArray): List[Either[ErrorMessageJson, A]] = json .asInstanceOf[JsArray] .elements .map(json => - Try( - json.asJsObject match { - case j: JsObject if j.getFields("flight") != Seq() => json.convertTo[FlightMessageJson].asInstanceOf[A] - case j: JsObject if j.getFields("airplaneId") != Seq() => json.convertTo[AirplaneMessageJson].asInstanceOf[A] - case j: JsObject if j.getFields("airportId") != Seq() => json.convertTo[AirportMessageJson].asInstanceOf[A] - case j: JsObject if j.getFields("airlineId") != Seq() => json.convertTo[AirlineMessageJson].asInstanceOf[A] - case j: JsObject if j.getFields("cityId") != Seq() => json.convertTo[CityMessageJson].asInstanceOf[A] - } - ).toEither.left.map(ex => ErrorMessageJson("", ex.getMessage, json.compactPrint, Instant.now)) + Try(json.convertTo[A]).toEither.left.map(ex => ErrorMessageJson("", ex.getMessage, json.compactPrint, Instant.now)) ) .toList } diff --git a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/MainFunctions.scala b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/MainFunctions.scala index fc60aba..ef73d6e 100644 --- a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/MainFunctions.scala +++ b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/MainFunctions.scala @@ -12,6 +12,7 @@ import it.bitrock.dvs.producer.aviationedge.services.context.{ AviationStreamContext, OpenSkyStreamContext } +import spray.json.JsonReader import scala.concurrent.{ExecutionContext, Future} @@ -29,7 +30,7 @@ object MainFunctions { Http().bindAndHandle(routes.routes, host, port) } - def runAviationEdgeStream[A: ApiProviderStreamContext]()( + def runAviationEdgeStream[A: ApiProviderStreamContext: JsonReader]()( implicit system: ActorSystem, ec: ExecutionContext ): (Cancellable, Future[Done], Future[Done], Future[Done]) = { @@ -43,7 +44,7 @@ object MainFunctions { ) val rawSink = AviationStreamContext[A].sink(kafkaConfig) val errorSink = SideStreamContext.errorSink(kafkaConfig) - val invalidFlightSink = SideStreamContext.invalidSink(kafkaConfig) + val invalidFlightSink = SideStreamContext.invalidFlightSink(kafkaConfig) val monitoringSink = SideStreamContext.monitoringSink(kafkaConfig) val jsonSource = tickSource.via(aviationFlow).via(monitoringGraph(monitoringSink)).mapConcat(identity) @@ -51,7 +52,7 @@ object MainFunctions { mainGraph(jsonSource, rawSink, errorSink, invalidFlightSink).run() } - def runOpenSkyStream[A: ApiProviderStreamContext]()( + def runOpenSkyStream[A: ApiProviderStreamContext: JsonReader]()( implicit system: ActorSystem, ec: ExecutionContext ): (Cancellable, Future[Done]) = { diff --git a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/SideStreamContext.scala b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/SideStreamContext.scala index 4f1a5d8..4cba022 100644 --- a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/SideStreamContext.scala +++ b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/SideStreamContext.scala @@ -6,7 +6,7 @@ import akka.stream.scaladsl.Sink import it.bitrock.dvs.producer.aviationedge.config.KafkaConfig import it.bitrock.dvs.producer.aviationedge.kafka.KafkaTypes.{Error, Flight, Key, Monitoring} import it.bitrock.dvs.producer.aviationedge.kafka.{KafkaSinkFactory, ProducerSettingsFactory} -import it.bitrock.dvs.producer.aviationedge.model.{ErrorMessageJson, MonitoringMessageJson} +import it.bitrock.dvs.producer.aviationedge.model.{ErrorMessageJson, FlightMessageJson, MonitoringMessageJson} import scala.concurrent.Future @@ -21,8 +21,8 @@ object SideStreamContext { new KafkaSinkFactory[MonitoringMessageJson, Key, Monitoring.Value](kafkaConfig.monitoringTopic, producerSettings).sink } - def invalidSink[A](kafkaConfig: KafkaConfig)(implicit system: ActorSystem): Sink[A, Future[Done]] = { + def invalidFlightSink(kafkaConfig: KafkaConfig)(implicit system: ActorSystem): Sink[FlightMessageJson, Future[Done]] = { val producerSettings = ProducerSettingsFactory.from[Flight.Value](kafkaConfig) - new KafkaSinkFactory[A, Key, Flight.Value](kafkaConfig.invalidFlightRawTopic, producerSettings).sink + new KafkaSinkFactory[FlightMessageJson, Key, Flight.Value](kafkaConfig.invalidFlightRawTopic, producerSettings).sink } } diff --git a/src/test/scala/it/bitrock/dvs/producer/aviationedge/kafka/KafkaFlightSinkFactorySpec.scala b/src/test/scala/it/bitrock/dvs/producer/aviationedge/kafka/KafkaFlightSinkFactorySpec.scala index 082c931..2a716db 100644 --- a/src/test/scala/it/bitrock/dvs/producer/aviationedge/kafka/KafkaFlightSinkFactorySpec.scala +++ b/src/test/scala/it/bitrock/dvs/producer/aviationedge/kafka/KafkaFlightSinkFactorySpec.scala @@ -7,7 +7,7 @@ import akka.testkit.TestKit import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig import it.bitrock.dvs.producer.aviationedge.TestValues import it.bitrock.dvs.producer.aviationedge.kafka.KafkaTypes.{Flight, Key} -import it.bitrock.dvs.producer.aviationedge.model.MessageJson +import it.bitrock.dvs.producer.aviationedge.model.FlightMessageJson import it.bitrock.kafkacommons.serialization.ImplicitConversions._ import it.bitrock.testcommons.{FixtureLoanerAnyResult, Suite} import net.manub.embeddedkafka.schemaregistry._ @@ -51,7 +51,7 @@ class KafkaFlightSinkFactorySpec s"http://localhost:${embeddedKafkaConfig.schemaRegistryPort}" ) - val factory = new KafkaSinkFactory[MessageJson, Key, Flight.Value]( + val factory = new KafkaSinkFactory[FlightMessageJson, Key, Flight.Value]( outputTopic, producerSettings ) @@ -76,6 +76,6 @@ object KafkaFlightSinkFactorySpec { final case class Resource( embeddedKafkaConfig: EmbeddedKafkaConfig, keySerde: Serde[Key], - factory: KafkaSinkFactory[MessageJson, Key, Flight.Value] + factory: KafkaSinkFactory[FlightMessageJson, Key, Flight.Value] ) } diff --git a/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlowSpec.scala b/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlowSpec.scala index 8a85ab8..0613241 100644 --- a/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlowSpec.scala +++ b/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlowSpec.scala @@ -27,7 +27,7 @@ class ApiProviderFlowSpec "flow method" should { "recover http request failure" in { - val flow = apiProviderFlow.flow(Uri("invalid-url"), 1)(aviationEdgePayloadJsonReader[Nothing]) + val flow = apiProviderFlow.flow(Uri("invalid-url"), 1)(aviationEdgePayloadJsonReader[String]) whenReady(Source.tick(0.seconds, 1.second, Tick()).via(flow).take(1).toMat(Sink.head)(Keep.right).run()) { result => result.head.left.value.errorSource shouldBe "invalid-url" } @@ -106,7 +106,7 @@ class ApiProviderFlowSpec } "create an ErrorMessageJson with the field failedJson equals to the response body" when { "the provider is aviation-edge" in { - val futureResult = apiProviderFlow.unmarshalBody(ErrorResponse, Path)(aviationEdgePayloadJsonReader[Nothing]) + val futureResult = apiProviderFlow.unmarshalBody(ErrorResponse, Path)(aviationEdgePayloadJsonReader[String]) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isLeft shouldBe true @@ -114,7 +114,7 @@ class ApiProviderFlowSpec } } "the provider is open-sky" in { - val futureResult = apiProviderFlow.unmarshalBody(ErrorResponse, Path)(openSkyResponsePayloadJsonFormat[Nothing]) + val futureResult = apiProviderFlow.unmarshalBody(ErrorResponse, Path)(openSkyResponsePayloadJsonFormat[String]) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isLeft shouldBe true @@ -124,7 +124,7 @@ class ApiProviderFlowSpec } "create an ErrorMessageJson if at least one of the fields of the response is incorrect" when { "the provider is aviation-edge" in { - val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path)(aviationEdgePayloadJsonReader[Nothing]) + val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path)(aviationEdgePayloadJsonReader[String]) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isLeft shouldBe true @@ -132,7 +132,7 @@ class ApiProviderFlowSpec } } "the provider is open-sky" in { - val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path)(openSkyResponsePayloadJsonFormat[Nothing]) + val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path)(openSkyResponsePayloadJsonFormat[String]) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isLeft shouldBe true