Skip to content

Commit

Permalink
Merge pull request #71 from bitrockteam/hotfix/icao-match
Browse files Browse the repository at this point in the history
small improvement on the open sky api
  • Loading branch information
massimosiani authored Feb 26, 2020
2 parents db0ed0a + f3c405f commit a59593e
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
import akka.stream.scaladsl.Flow
import com.typesafe.scalalogging.LazyLogging
import it.bitrock.dvs.producer.aviationedge.model.{ErrorMessageJson, MessageJson, Tick}
import it.bitrock.dvs.producer.aviationedge.services.JsonSupport._

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

class ApiProviderFlow()(implicit system: ActorSystem, ec: ExecutionContext) extends LazyLogging {
def flow(uri: Uri, apiTimeout: Int): Flow[Tick, List[Either[ErrorMessageJson, MessageJson]], NotUsed] =
def flow(uri: Uri, apiTimeout: Int)(
implicit um: Unmarshaller[String, List[Either[ErrorMessageJson, MessageJson]]]
): Flow[Tick, List[Either[ErrorMessageJson, MessageJson]], NotUsed] =
Flow
.fromFunction(identity[Tick])
.mapAsync(1) { _ =>
Expand All @@ -39,7 +40,9 @@ class ApiProviderFlow()(implicit system: ActorSystem, ec: ExecutionContext) exte
entity.toStrict(timeout.seconds).map(_.data.utf8String)
}

def unmarshalBody(apiResponseBody: String, path: String): Future[List[Either[ErrorMessageJson, MessageJson]]] =
def unmarshalBody(apiResponseBody: String, path: String)(
implicit um: Unmarshaller[String, List[Either[ErrorMessageJson, MessageJson]]]
): Future[List[Either[ErrorMessageJson, MessageJson]]] =
Unmarshal(apiResponseBody)
.to[List[Either[ErrorMessageJson, MessageJson]]]
.map(list => addPathToLeft(list, path))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package it.bitrock.dvs.producer.aviationedge.services
import java.time.Instant

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.unmarshalling.PredefinedFromStringUnmarshallers._fromStringUnmarshallerFromByteStringUnmarshaller
import akka.http.scaladsl.unmarshalling.Unmarshaller
import it.bitrock.dvs.producer.aviationedge.model._
import spray.json._

Expand All @@ -24,16 +26,18 @@ object JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
implicit val cityMessageJsonFormat: RootJsonFormat[CityMessageJson] = jsonFormat6(CityMessageJson.apply)
implicit val flightStatesJsonFormat: RootJsonFormat[FlightStatesJson] = jsonFormat2(FlightStatesJson.apply)

implicit val responsePayloadJsonFormat: RootJsonFormat[List[Either[ErrorMessageJson, MessageJson]]] =
new RootJsonFormat[List[Either[ErrorMessageJson, MessageJson]]] {
def write(obj: List[Either[ErrorMessageJson, MessageJson]]): JsValue = JsNull
def read(json: JsValue): List[Either[ErrorMessageJson, MessageJson]] =
json match {
case jsObject: JsObject => jsObjectToResponsePayload(jsObject)
case jsArray: JsArray => jsArrayToResponsePayload(jsArray)
case _ => List(Left(ErrorMessageJson("", "", json.compactPrint, Instant.now)))
}
}
val aviationEdgePayloadJsonReader: RootJsonReader[List[Either[ErrorMessageJson, MessageJson]]] = {
case jsArray: JsArray => jsArrayToResponsePayload(jsArray)
case json => List(Left(ErrorMessageJson("", "", json.compactPrint, Instant.now)))
}

val openSkyResponsePayloadJsonFormat: RootJsonReader[List[Either[ErrorMessageJson, MessageJson]]] = {
case jsObject: JsObject => jsObjectToResponsePayload(jsObject)
case json => List(Left(ErrorMessageJson("", "", json.compactPrint, Instant.now)))
}

implicit def unmarshallerFrom[A](rf: RootJsonReader[A]): Unmarshaller[String, A] =
_fromStringUnmarshallerFromByteStringUnmarshaller(sprayJsonByteStringUnmarshaller(rf))

private def jsObjectToResponsePayload(json: JsObject): List[Either[ErrorMessageJson, FlightStateJson]] =
Try(json.convertTo[FlightStatesJson]) match {
Expand All @@ -42,13 +46,13 @@ object JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
flightStates.states.map { state =>
Try(
FlightStateJson(
state(1).convertTo[String],
state(3).convertTo[Long],
state(5).convertTo[Double],
state(6).convertTo[Double],
state(9).convertTo[Double],
state(10).convertTo[Double],
state(13).convertTo[Double]
callsign = state(1).convertTo[String].trim.toUpperCase,
time_position = state(3).convertTo[Long],
longitude = state(5).convertTo[Double],
latitude = state(6).convertTo[Double],
velocity = state(9).convertTo[Double],
true_track = state(10).convertTo[Double],
geo_altitude = state(13).convertTo[Double]
)
).toEither.left.map(ex => ErrorMessageJson("", ex.getMessage, json.compactPrint, Instant.now))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import it.bitrock.dvs.producer.aviationedge.config.{
}
import it.bitrock.dvs.producer.aviationedge.routes.Routes
import it.bitrock.dvs.producer.aviationedge.services.Graphs._
import it.bitrock.dvs.producer.aviationedge.services.JsonSupport._
import it.bitrock.dvs.producer.aviationedge.services.context.{
ApiProviderStreamContext,
AviationStreamContext,
Expand Down Expand Up @@ -41,8 +42,11 @@ object MainFunctions {
): (Cancellable, Future[Done], Future[Done], Future[Done]) = {
val config = AviationStreamContext[A].config(apiProviderConfig)

val tickSource = new TickSource(config.pollingStart, config.pollingInterval, aviationConfig.tickSource).source
val aviationFlow = new ApiProviderFlow().flow(aviationConfig.getAviationUri(config.path), aviationConfig.apiTimeout)
val tickSource = new TickSource(config.pollingStart, config.pollingInterval, aviationConfig.tickSource).source
val aviationFlow =
new ApiProviderFlow().flow(aviationConfig.getAviationUri(config.path), aviationConfig.apiTimeout)(
aviationEdgePayloadJsonReader
)
val rawSink = AviationStreamContext[A].sink(kafkaConfig)
val errorSink = SideStreamContext.errorSink(kafkaConfig)
val invalidFlightSink = SideStreamContext.invalidSink(kafkaConfig)
Expand All @@ -59,9 +63,11 @@ object MainFunctions {
): (Cancellable, Future[Done]) = {
val config = OpenSkyStreamContext[A].config(apiProviderConfig)

val tickSource = new TickSource(config.pollingStart, config.pollingInterval, openSkyConfig.tickSource).source
val openSkyFlow = new ApiProviderFlow().flow(openSkyConfig.getOpenSkyUri(config.path), openSkyConfig.apiTimeout)
val rawSink = AviationStreamContext[A].sink(kafkaConfig)
val tickSource = new TickSource(config.pollingStart, config.pollingInterval, openSkyConfig.tickSource).source
val openSkyFlow = new ApiProviderFlow().flow(openSkyConfig.getOpenSkyUri(config.path), openSkyConfig.apiTimeout)(
openSkyResponsePayloadJsonFormat
)
val rawSink = AviationStreamContext[A].sink(kafkaConfig)

val jsonSource = tickSource.via(openSkyFlow).mapConcat(identity)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.testkit.TestKit
import it.bitrock.dvs.producer.aviationedge.TestValues
import it.bitrock.dvs.producer.aviationedge.model._
import it.bitrock.dvs.producer.aviationedge.services.JsonSupport._
import it.bitrock.testcommons.Suite
import org.scalatest.EitherValues
import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
Expand All @@ -26,7 +27,7 @@ class ApiProviderFlowSpec

"flow method" should {
"recover http request failure" in {
val flow = apiProviderFlow.flow(Uri("invalid-url"), 1)
val flow = apiProviderFlow.flow(Uri("invalid-url"), 1)(aviationEdgePayloadJsonReader)
whenReady(Source.tick(0.seconds, 1.second, Tick()).via(flow).take(1).toMat(Sink.head)(Keep.right).run()) { result =>
result.head.left.value.errorSource shouldBe "invalid-url"
}
Expand All @@ -48,67 +49,88 @@ class ApiProviderFlowSpec

"unmarshal method" should {
"parse a flight JSON message into FlightMessageJson" in {
val futureResult = apiProviderFlow.unmarshalBody(readFixture("flight"), Path)
val futureResult = apiProviderFlow.unmarshalBody(readFixture("flight"), Path)(aviationEdgePayloadJsonReader)
whenReady(futureResult) { result =>
result.size shouldBe 1
result.head.isRight shouldBe true
result.head.right.value shouldBe a[FlightMessageJson]
}
}
"parse a airplane JSON message into AirplaneMessageJson" in {
val futureResult = apiProviderFlow.unmarshalBody(readFixture("airplaneDatabase"), Path)
val futureResult = apiProviderFlow.unmarshalBody(readFixture("airplaneDatabase"), Path)(aviationEdgePayloadJsonReader)
whenReady(futureResult) { result =>
result.size shouldBe 1
result.head.isRight shouldBe true
result.head.right.value shouldBe a[AirplaneMessageJson]
}
}
"parse a airport JSON message into AirportMessageJson" in {
val futureResult = apiProviderFlow.unmarshalBody(readFixture("airportDatabase"), Path)
val futureResult = apiProviderFlow.unmarshalBody(readFixture("airportDatabase"), Path)(aviationEdgePayloadJsonReader)
whenReady(futureResult) { result =>
result.size shouldBe 1
result.head.isRight shouldBe true
result.head.right.value shouldBe a[AirportMessageJson]
}
}
"parse a airline JSON message into AirlineMessageJson" in {
val futureResult = apiProviderFlow.unmarshalBody(readFixture("airlineDatabase"), Path)
val futureResult = apiProviderFlow.unmarshalBody(readFixture("airlineDatabase"), Path)(aviationEdgePayloadJsonReader)
whenReady(futureResult) { result =>
result.size shouldBe 1
result.head.isRight shouldBe true
result.head.right.value shouldBe a[AirlineMessageJson]
}
}
"parse a city JSON message into CityMessageJson" in {
val futureResult = apiProviderFlow.unmarshalBody(readFixture("cityDatabase"), Path)
val futureResult = apiProviderFlow.unmarshalBody(readFixture("cityDatabase"), Path)(aviationEdgePayloadJsonReader)
whenReady(futureResult) { result =>
result.size shouldBe 1
result.head.isRight shouldBe true
result.head.right.value shouldBe a[CityMessageJson]
}
}
"parse a flight states JSON message into FlightStateJson" in {
val futureResult = apiProviderFlow.unmarshalBody(readFixture("flightStatesDatabase"), Path)
val futureResult =
apiProviderFlow.unmarshalBody(readFixture("flightStatesDatabase"), Path)(openSkyResponsePayloadJsonFormat)
whenReady(futureResult) { result =>
result.size shouldBe 1
result.head.isRight shouldBe true
result.head.right.value shouldBe a[FlightStateJson]
}
}
"create an ErrorMessageJson with the field failedJson equals to the response body" in {
val futureResult = apiProviderFlow.unmarshalBody(ErrorResponse, Path)
whenReady(futureResult) { result =>
result.size shouldBe 1
result.head.isLeft shouldBe true
result.head.left.value.failedJson shouldBe ErrorResponse
"create an ErrorMessageJson with the field failedJson equals to the response body" when {
"the provider is aviation-edge" in {
val futureResult = apiProviderFlow.unmarshalBody(ErrorResponse, Path)(aviationEdgePayloadJsonReader)
whenReady(futureResult) { result =>
result.size shouldBe 1
result.head.isLeft shouldBe true
result.head.left.value.failedJson shouldBe ErrorResponse
}
}
"the provider is open-sky" in {
val futureResult = apiProviderFlow.unmarshalBody(ErrorResponse, Path)(openSkyResponsePayloadJsonFormat)
whenReady(futureResult) { result =>
result.size shouldBe 1
result.head.isLeft shouldBe true
result.head.left.value.failedJson shouldBe ErrorResponse
}
}
}
"create an ErrorMessageJson if at least one of the fields of the response is incorrect" in {
val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path)
whenReady(futureResult) { result =>
result.size shouldBe 1
result.head.isLeft shouldBe true
result.head.left.value.errorSource shouldBe Path
"create an ErrorMessageJson if at least one of the fields of the response is incorrect" when {
"the provider is aviation-edge" in {
val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path)(aviationEdgePayloadJsonReader)
whenReady(futureResult) { result =>
result.size shouldBe 1
result.head.isLeft shouldBe true
result.head.left.value.errorSource shouldBe Path
}
}
"the provider is open-sky" in {
val futureResult = apiProviderFlow.unmarshalBody(IncorrectJsonAirline, Path)(openSkyResponsePayloadJsonFormat)
whenReady(futureResult) { result =>
result.size shouldBe 1
result.head.isLeft shouldBe true
result.head.left.value.errorSource shouldBe Path
}
}
}
}
Expand Down

0 comments on commit a59593e

Please sign in to comment.