From 3f065010bc451e019d9d97876cc731fb825e5d43 Mon Sep 17 00:00:00 2001 From: Massimo Siani Date: Wed, 26 Feb 2020 13:06:42 +0100 Subject: [PATCH 1/3] small improvement on the open sky api --- .../services/ApiProviderFlow.scala | 11 +++-- .../aviationedge/services/JsonSupport.scala | 40 +++++++++++-------- .../aviationedge/services/MainFunctions.scala | 14 ++++--- .../services/ApiProviderFlowSpec.scala | 40 ++++++++++++------- 4 files changed, 65 insertions(+), 40 deletions(-) diff --git a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlow.scala b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlow.scala index 776187e..7f3f0e2 100644 --- a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlow.scala +++ b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlow.scala @@ -6,18 +6,19 @@ import akka.NotUsed import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.model._ -import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} import akka.stream.scaladsl.Flow import com.typesafe.scalalogging.LazyLogging import it.bitrock.dvs.producer.aviationedge.model.{ErrorMessageJson, MessageJson, Tick} -import it.bitrock.dvs.producer.aviationedge.services.JsonSupport._ import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} import scala.util.control.NonFatal class ApiProviderFlow()(implicit system: ActorSystem, ec: ExecutionContext) extends LazyLogging { - def flow(uri: Uri, apiTimeout: Int): Flow[Tick, List[Either[ErrorMessageJson, MessageJson]], NotUsed] = + def flow(uri: Uri, apiTimeout: Int)( + implicit um: Unmarshaller[String, List[Either[ErrorMessageJson, MessageJson]]] + ): Flow[Tick, List[Either[ErrorMessageJson, MessageJson]], NotUsed] = Flow .fromFunction(identity[Tick]) .mapAsync(1) { _ => @@ -39,7 +40,9 @@ class ApiProviderFlow()(implicit system: ActorSystem, ec: ExecutionContext) exte entity.toStrict(timeout.seconds).map(_.data.utf8String) } - def unmarshalBody(apiResponseBody: String, path: String): Future[List[Either[ErrorMessageJson, MessageJson]]] = + def unmarshalBody(apiResponseBody: String, path: String)( + implicit um: Unmarshaller[String, List[Either[ErrorMessageJson, MessageJson]]] + ): Future[List[Either[ErrorMessageJson, MessageJson]]] = Unmarshal(apiResponseBody) .to[List[Either[ErrorMessageJson, MessageJson]]] .map(list => addPathToLeft(list, path)) diff --git a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/JsonSupport.scala b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/JsonSupport.scala index 94830a9..ecd8ab7 100644 --- a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/JsonSupport.scala +++ b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/JsonSupport.scala @@ -3,6 +3,8 @@ package it.bitrock.dvs.producer.aviationedge.services import java.time.Instant import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import akka.http.scaladsl.unmarshalling.PredefinedFromStringUnmarshallers._fromStringUnmarshallerFromByteStringUnmarshaller +import akka.http.scaladsl.unmarshalling.Unmarshaller import it.bitrock.dvs.producer.aviationedge.model._ import spray.json._ @@ -24,16 +26,20 @@ object JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { implicit val cityMessageJsonFormat: RootJsonFormat[CityMessageJson] = jsonFormat6(CityMessageJson.apply) implicit val flightStatesJsonFormat: RootJsonFormat[FlightStatesJson] = jsonFormat2(FlightStatesJson.apply) - implicit val responsePayloadJsonFormat: RootJsonFormat[List[Either[ErrorMessageJson, MessageJson]]] = - new RootJsonFormat[List[Either[ErrorMessageJson, MessageJson]]] { - def write(obj: List[Either[ErrorMessageJson, MessageJson]]): JsValue = JsNull - def read(json: JsValue): List[Either[ErrorMessageJson, MessageJson]] = - json match { - case jsObject: JsObject => jsObjectToResponsePayload(jsObject) - case jsArray: JsArray => jsArrayToResponsePayload(jsArray) - case _ => List(Left(ErrorMessageJson("", "", json.compactPrint, Instant.now))) - } - } + implicit val responsePayloadJsonFormat: RootJsonReader[List[Either[ErrorMessageJson, MessageJson]]] = { + case jsArray: JsArray => jsArrayToResponsePayload(jsArray) + case json => List(Left(ErrorMessageJson("", "", json.compactPrint, Instant.now))) + } + + implicit val openSkyResponsePayloadJsonFormat: RootJsonReader[List[Either[ErrorMessageJson, MessageJson]]] = { + case jsObject: JsObject => jsObjectToResponsePayload(jsObject) + case json => List(Left(ErrorMessageJson("", "", json.compactPrint, Instant.now))) + } + + implicit def unmarshallerFrom[A]( + rf: RootJsonReader[A] + ): Unmarshaller[String, A] = + _fromStringUnmarshallerFromByteStringUnmarshaller(sprayJsonByteStringUnmarshaller(rf)) private def jsObjectToResponsePayload(json: JsObject): List[Either[ErrorMessageJson, FlightStateJson]] = Try(json.convertTo[FlightStatesJson]) match { @@ -42,13 +48,13 @@ object JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { flightStates.states.map { state => Try( FlightStateJson( - state(1).convertTo[String], - state(3).convertTo[Long], - state(5).convertTo[Double], - state(6).convertTo[Double], - state(9).convertTo[Double], - state(10).convertTo[Double], - state(13).convertTo[Double] + callsign = state(1).convertTo[String].trim.toUpperCase, + time_position = state(3).convertTo[Long], + longitude = state(5).convertTo[Double], + latitude = state(6).convertTo[Double], + velocity = state(9).convertTo[Double], + true_track = state(10).convertTo[Double], + geo_altitude = state(13).convertTo[Double] ) ).toEither.left.map(ex => ErrorMessageJson("", ex.getMessage, json.compactPrint, Instant.now)) } diff --git a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/MainFunctions.scala b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/MainFunctions.scala index a191bc2..340ae3b 100644 --- a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/MainFunctions.scala +++ b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/MainFunctions.scala @@ -13,6 +13,7 @@ import it.bitrock.dvs.producer.aviationedge.config.{ } import it.bitrock.dvs.producer.aviationedge.routes.Routes import it.bitrock.dvs.producer.aviationedge.services.Graphs._ +import it.bitrock.dvs.producer.aviationedge.services.JsonSupport._ import it.bitrock.dvs.producer.aviationedge.services.context.{ ApiProviderStreamContext, AviationStreamContext, @@ -41,8 +42,9 @@ object MainFunctions { ): (Cancellable, Future[Done], Future[Done], Future[Done]) = { val config = AviationStreamContext[A].config(apiProviderConfig) - val tickSource = new TickSource(config.pollingStart, config.pollingInterval, aviationConfig.tickSource).source - val aviationFlow = new ApiProviderFlow().flow(aviationConfig.getAviationUri(config.path), aviationConfig.apiTimeout) + val tickSource = new TickSource(config.pollingStart, config.pollingInterval, aviationConfig.tickSource).source + val aviationFlow = + new ApiProviderFlow().flow(aviationConfig.getAviationUri(config.path), aviationConfig.apiTimeout)(responsePayloadJsonFormat) val rawSink = AviationStreamContext[A].sink(kafkaConfig) val errorSink = SideStreamContext.errorSink(kafkaConfig) val invalidFlightSink = SideStreamContext.invalidSink(kafkaConfig) @@ -59,9 +61,11 @@ object MainFunctions { ): (Cancellable, Future[Done]) = { val config = OpenSkyStreamContext[A].config(apiProviderConfig) - val tickSource = new TickSource(config.pollingStart, config.pollingInterval, openSkyConfig.tickSource).source - val openSkyFlow = new ApiProviderFlow().flow(openSkyConfig.getOpenSkyUri(config.path), openSkyConfig.apiTimeout) - val rawSink = AviationStreamContext[A].sink(kafkaConfig) + val tickSource = new TickSource(config.pollingStart, config.pollingInterval, openSkyConfig.tickSource).source + val openSkyFlow = new ApiProviderFlow().flow(openSkyConfig.getOpenSkyUri(config.path), openSkyConfig.apiTimeout)( + openSkyResponsePayloadJsonFormat + ) + val rawSink = AviationStreamContext[A].sink(kafkaConfig) val jsonSource = tickSource.via(openSkyFlow).mapConcat(identity) diff --git a/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlowSpec.scala b/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlowSpec.scala index 8ddec42..4761c8b 100644 --- a/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlowSpec.scala +++ b/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlowSpec.scala @@ -6,6 +6,7 @@ import akka.stream.scaladsl.{Keep, Sink, Source} import akka.testkit.TestKit import it.bitrock.dvs.producer.aviationedge.TestValues import it.bitrock.dvs.producer.aviationedge.model._ +import it.bitrock.dvs.producer.aviationedge.services.JsonSupport._ import it.bitrock.testcommons.Suite import org.scalatest.EitherValues import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures} @@ -26,7 +27,7 @@ class ApiProviderFlowSpec "flow method" should { "recover http request failure" in { - val flow = apiProviderFlow.flow(Uri("invalid-url"), 1) + val flow = apiProviderFlow.flow(Uri("invalid-url"), 1)(responsePayloadJsonFormat) whenReady(Source.tick(0.seconds, 1.second, Tick()).via(flow).take(1).toMat(Sink.head)(Keep.right).run()) { result => result.head.left.value.errorSource shouldBe "invalid-url" } @@ -48,7 +49,7 @@ class ApiProviderFlowSpec "unmarshal method" should { "parse a flight JSON message into FlightMessageJson" in { - val futureResult = apiProviderFlow.unmarshalBody(readFixture("flight"), Path) + val futureResult = apiProviderFlow.unmarshalBody(readFixture("flight"), Path)(responsePayloadJsonFormat) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isRight shouldBe true @@ -56,7 +57,7 @@ class ApiProviderFlowSpec } } "parse a airplane JSON message into AirplaneMessageJson" in { - val futureResult = apiProviderFlow.unmarshalBody(readFixture("airplaneDatabase"), Path) + val futureResult = apiProviderFlow.unmarshalBody(readFixture("airplaneDatabase"), Path)(responsePayloadJsonFormat) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isRight shouldBe true @@ -64,7 +65,7 @@ class ApiProviderFlowSpec } } "parse a airport JSON message into AirportMessageJson" in { - val futureResult = apiProviderFlow.unmarshalBody(readFixture("airportDatabase"), Path) + val futureResult = apiProviderFlow.unmarshalBody(readFixture("airportDatabase"), Path)(responsePayloadJsonFormat) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isRight shouldBe true @@ -72,7 +73,7 @@ class ApiProviderFlowSpec } } "parse a airline JSON message into AirlineMessageJson" in { - val futureResult = apiProviderFlow.unmarshalBody(readFixture("airlineDatabase"), Path) + val futureResult = apiProviderFlow.unmarshalBody(readFixture("airlineDatabase"), Path)(responsePayloadJsonFormat) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isRight shouldBe true @@ -80,7 +81,7 @@ class ApiProviderFlowSpec } } "parse a city JSON message into CityMessageJson" in { - val futureResult = apiProviderFlow.unmarshalBody(readFixture("cityDatabase"), Path) + val futureResult = apiProviderFlow.unmarshalBody(readFixture("cityDatabase"), Path)(responsePayloadJsonFormat) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isRight shouldBe true @@ -88,23 +89,34 @@ class ApiProviderFlowSpec } } "parse a flight states JSON message into FlightStateJson" in { - val futureResult = apiProviderFlow.unmarshalBody(readFixture("flightStatesDatabase"), Path) + val futureResult = + apiProviderFlow.unmarshalBody(readFixture("flightStatesDatabase"), Path)(openSkyResponsePayloadJsonFormat) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isRight shouldBe true result.head.right.value shouldBe a[FlightStateJson] } } - "create an ErrorMessageJson with the field failedJson equals to the response body" in { - val futureResult = apiProviderFlow.unmarshalBody(ErrorResponse, Path) - whenReady(futureResult) { result => - result.size shouldBe 1 - result.head.isLeft shouldBe true - result.head.left.value.failedJson shouldBe ErrorResponse + "create an ErrorMessageJson with the field failedJson equals to the response body" when { + "the provider is aviation-edge" in { + val futureResult = apiProviderFlow.unmarshalBody(ErrorResponse, Path)(responsePayloadJsonFormat) + whenReady(futureResult) { result => + result.size shouldBe 1 + result.head.isLeft shouldBe true + result.head.left.value.failedJson shouldBe ErrorResponse + } + } + "the provider is open-sky" in { + val futureResult = apiProviderFlow.unmarshalBody(ErrorResponse, Path)(openSkyResponsePayloadJsonFormat) + whenReady(futureResult) { result => + result.size shouldBe 1 + result.head.isLeft shouldBe true + result.head.left.value.failedJson shouldBe ErrorResponse + } } } "create an ErrorMessageJson if at least one of the fields of the response is incorrect" in { - val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path) + val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path)(responsePayloadJsonFormat) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isLeft shouldBe true From 42185afc2811dd5f2210521ac7a647d5ed9a2637 Mon Sep 17 00:00:00 2001 From: Massimo Siani Date: Wed, 26 Feb 2020 13:23:38 +0100 Subject: [PATCH 2/3] rename to aviation edge --- .../aviationedge/services/JsonSupport.scala | 6 ++---- .../aviationedge/services/MainFunctions.scala | 4 +++- .../services/ApiProviderFlowSpec.scala | 16 ++++++++-------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/JsonSupport.scala b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/JsonSupport.scala index ecd8ab7..0b90041 100644 --- a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/JsonSupport.scala +++ b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/JsonSupport.scala @@ -26,7 +26,7 @@ object JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { implicit val cityMessageJsonFormat: RootJsonFormat[CityMessageJson] = jsonFormat6(CityMessageJson.apply) implicit val flightStatesJsonFormat: RootJsonFormat[FlightStatesJson] = jsonFormat2(FlightStatesJson.apply) - implicit val responsePayloadJsonFormat: RootJsonReader[List[Either[ErrorMessageJson, MessageJson]]] = { + implicit val aviationEdgePayloadJsonReader: RootJsonReader[List[Either[ErrorMessageJson, MessageJson]]] = { case jsArray: JsArray => jsArrayToResponsePayload(jsArray) case json => List(Left(ErrorMessageJson("", "", json.compactPrint, Instant.now))) } @@ -36,9 +36,7 @@ object JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { case json => List(Left(ErrorMessageJson("", "", json.compactPrint, Instant.now))) } - implicit def unmarshallerFrom[A]( - rf: RootJsonReader[A] - ): Unmarshaller[String, A] = + implicit def unmarshallerFrom[A](rf: RootJsonReader[A]): Unmarshaller[String, A] = _fromStringUnmarshallerFromByteStringUnmarshaller(sprayJsonByteStringUnmarshaller(rf)) private def jsObjectToResponsePayload(json: JsObject): List[Either[ErrorMessageJson, FlightStateJson]] = diff --git a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/MainFunctions.scala b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/MainFunctions.scala index 340ae3b..1ce2355 100644 --- a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/MainFunctions.scala +++ b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/MainFunctions.scala @@ -44,7 +44,9 @@ object MainFunctions { val tickSource = new TickSource(config.pollingStart, config.pollingInterval, aviationConfig.tickSource).source val aviationFlow = - new ApiProviderFlow().flow(aviationConfig.getAviationUri(config.path), aviationConfig.apiTimeout)(responsePayloadJsonFormat) + new ApiProviderFlow().flow(aviationConfig.getAviationUri(config.path), aviationConfig.apiTimeout)( + aviationEdgePayloadJsonReader + ) val rawSink = AviationStreamContext[A].sink(kafkaConfig) val errorSink = SideStreamContext.errorSink(kafkaConfig) val invalidFlightSink = SideStreamContext.invalidSink(kafkaConfig) diff --git a/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlowSpec.scala b/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlowSpec.scala index 4761c8b..0597bae 100644 --- a/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlowSpec.scala +++ b/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlowSpec.scala @@ -27,7 +27,7 @@ class ApiProviderFlowSpec "flow method" should { "recover http request failure" in { - val flow = apiProviderFlow.flow(Uri("invalid-url"), 1)(responsePayloadJsonFormat) + val flow = apiProviderFlow.flow(Uri("invalid-url"), 1)(aviationEdgePayloadJsonReader) whenReady(Source.tick(0.seconds, 1.second, Tick()).via(flow).take(1).toMat(Sink.head)(Keep.right).run()) { result => result.head.left.value.errorSource shouldBe "invalid-url" } @@ -49,7 +49,7 @@ class ApiProviderFlowSpec "unmarshal method" should { "parse a flight JSON message into FlightMessageJson" in { - val futureResult = apiProviderFlow.unmarshalBody(readFixture("flight"), Path)(responsePayloadJsonFormat) + val futureResult = apiProviderFlow.unmarshalBody(readFixture("flight"), Path)(aviationEdgePayloadJsonReader) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isRight shouldBe true @@ -57,7 +57,7 @@ class ApiProviderFlowSpec } } "parse a airplane JSON message into AirplaneMessageJson" in { - val futureResult = apiProviderFlow.unmarshalBody(readFixture("airplaneDatabase"), Path)(responsePayloadJsonFormat) + val futureResult = apiProviderFlow.unmarshalBody(readFixture("airplaneDatabase"), Path)(aviationEdgePayloadJsonReader) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isRight shouldBe true @@ -65,7 +65,7 @@ class ApiProviderFlowSpec } } "parse a airport JSON message into AirportMessageJson" in { - val futureResult = apiProviderFlow.unmarshalBody(readFixture("airportDatabase"), Path)(responsePayloadJsonFormat) + val futureResult = apiProviderFlow.unmarshalBody(readFixture("airportDatabase"), Path)(aviationEdgePayloadJsonReader) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isRight shouldBe true @@ -73,7 +73,7 @@ class ApiProviderFlowSpec } } "parse a airline JSON message into AirlineMessageJson" in { - val futureResult = apiProviderFlow.unmarshalBody(readFixture("airlineDatabase"), Path)(responsePayloadJsonFormat) + val futureResult = apiProviderFlow.unmarshalBody(readFixture("airlineDatabase"), Path)(aviationEdgePayloadJsonReader) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isRight shouldBe true @@ -81,7 +81,7 @@ class ApiProviderFlowSpec } } "parse a city JSON message into CityMessageJson" in { - val futureResult = apiProviderFlow.unmarshalBody(readFixture("cityDatabase"), Path)(responsePayloadJsonFormat) + val futureResult = apiProviderFlow.unmarshalBody(readFixture("cityDatabase"), Path)(aviationEdgePayloadJsonReader) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isRight shouldBe true @@ -99,7 +99,7 @@ class ApiProviderFlowSpec } "create an ErrorMessageJson with the field failedJson equals to the response body" when { "the provider is aviation-edge" in { - val futureResult = apiProviderFlow.unmarshalBody(ErrorResponse, Path)(responsePayloadJsonFormat) + val futureResult = apiProviderFlow.unmarshalBody(ErrorResponse, Path)(aviationEdgePayloadJsonReader) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isLeft shouldBe true @@ -116,7 +116,7 @@ class ApiProviderFlowSpec } } "create an ErrorMessageJson if at least one of the fields of the response is incorrect" in { - val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path)(responsePayloadJsonFormat) + val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path)(aviationEdgePayloadJsonReader) whenReady(futureResult) { result => result.size shouldBe 1 result.head.isLeft shouldBe true From f3c405fd6e19d50453f1630469332a407af38f3e Mon Sep 17 00:00:00 2001 From: Massimo Siani Date: Wed, 26 Feb 2020 15:19:50 +0100 Subject: [PATCH 3/3] add a test --- .../aviationedge/services/JsonSupport.scala | 4 ++-- .../services/ApiProviderFlowSpec.scala | 22 ++++++++++++++----- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/JsonSupport.scala b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/JsonSupport.scala index 0b90041..0ed8f98 100644 --- a/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/JsonSupport.scala +++ b/src/main/scala/it/bitrock/dvs/producer/aviationedge/services/JsonSupport.scala @@ -26,12 +26,12 @@ object JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { implicit val cityMessageJsonFormat: RootJsonFormat[CityMessageJson] = jsonFormat6(CityMessageJson.apply) implicit val flightStatesJsonFormat: RootJsonFormat[FlightStatesJson] = jsonFormat2(FlightStatesJson.apply) - implicit val aviationEdgePayloadJsonReader: RootJsonReader[List[Either[ErrorMessageJson, MessageJson]]] = { + val aviationEdgePayloadJsonReader: RootJsonReader[List[Either[ErrorMessageJson, MessageJson]]] = { case jsArray: JsArray => jsArrayToResponsePayload(jsArray) case json => List(Left(ErrorMessageJson("", "", json.compactPrint, Instant.now))) } - implicit val openSkyResponsePayloadJsonFormat: RootJsonReader[List[Either[ErrorMessageJson, MessageJson]]] = { + val openSkyResponsePayloadJsonFormat: RootJsonReader[List[Either[ErrorMessageJson, MessageJson]]] = { case jsObject: JsObject => jsObjectToResponsePayload(jsObject) case json => List(Left(ErrorMessageJson("", "", json.compactPrint, Instant.now))) } diff --git a/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlowSpec.scala b/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlowSpec.scala index 0597bae..91f2b7f 100644 --- a/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlowSpec.scala +++ b/src/test/scala/it/bitrock/dvs/producer/aviationedge/services/ApiProviderFlowSpec.scala @@ -115,12 +115,22 @@ class ApiProviderFlowSpec } } } - "create an ErrorMessageJson if at least one of the fields of the response is incorrect" in { - val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path)(aviationEdgePayloadJsonReader) - whenReady(futureResult) { result => - result.size shouldBe 1 - result.head.isLeft shouldBe true - result.head.left.value.errorSource shouldBe Path + "create an ErrorMessageJson if at least one of the fields of the response is incorrect" when { + "the provider is aviation-edge" in { + val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path)(aviationEdgePayloadJsonReader) + whenReady(futureResult) { result => + result.size shouldBe 1 + result.head.isLeft shouldBe true + result.head.left.value.errorSource shouldBe Path + } + } + "the provider is open-sky" in { + val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path)(openSkyResponsePayloadJsonFormat) + whenReady(futureResult) { result => + result.size shouldBe 1 + result.head.isLeft shouldBe true + result.head.left.value.errorSource shouldBe Path + } } } }