Skip to content

Commit

Permalink
Add OpenSky flight state producer (#67)
Browse files Browse the repository at this point in the history
* Add OpenSky flight state producer

* scalafix <3

* Add producers base url default value

* Extract collectRightMessagesGraph in a separate function

* Start polling from open sky immediatly
  • Loading branch information
simoexpo authored Feb 25, 2020
1 parent 1575874 commit 70edf49
Show file tree
Hide file tree
Showing 25 changed files with 526 additions and 190 deletions.
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object Dependencies {
lazy val JakartaWsRs = "2.1.6"
lazy val Kafka = "2.4.0"
lazy val KafkaCommons = "0.0.8"
lazy val KafkaDVS = "1.0.15"
lazy val KafkaDVS = "1.0.18"
lazy val LogbackClassic = "1.2.3"
lazy val PureConfig = "0.12.2"
lazy val ScalaLogging = "3.9.2"
Expand Down
105 changes: 65 additions & 40 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,54 +20,79 @@ kafka {
parser-error-topic = "parser_error"
invalid-flight-raw-topic = "flight_raw_invalid"
monitoring-topic = "monitoring_aviation_edge_producer_flight"
flight-open-sky-raw-topic = "flight_opensky_raw"
enable-interceptors = true
}

application-id = "kafka-dvs-aviation-edge-producer"

aviation {
host = ""
host = ${?AVIATION_EDGE.BASE_URL}
key = ""
key = ${?AVIATION_EDGE.KEY}
api-timeout = 20
api-timeout = ${?AVIATION_EDGE.TIMEOUT}
flight-speed-limit = 1200
flight-speed-limit = ${?AVIATION_EDGE.FLIGHT_SPEED_LIMIT}
api-provider {
aviation-edge {
host = "http://aviation-edge.com"
host = ${?AVIATION_EDGE.BASE_URL}
key = ""
key = ${?AVIATION_EDGE.KEY}
api-timeout = 20
api-timeout = ${?AVIATION_EDGE.TIMEOUT}
flight-speed-limit = 1200
flight-speed-limit = ${?AVIATION_EDGE.FLIGHT_SPEED_LIMIT}

tick-source {
poll-lower-hour-limit = 0
poll-lower-hour-limit = ${?AVIATION_EDGE.POLL_LOWER_HOUR_LIMIT_UTC}
poll-upper-hour-limit = 23
poll-upper-hour-limit = ${?AVIATION_EDGE.POLL_UPPER_HOUR_LIMIT_UTC}
poll-excluded-days = []
poll-excluded-days = ${?AVIATION_EDGE.POLL_EXCLUDED_DAYS}
}
tick-source {
poll-lower-hour-limit = 0
poll-lower-hour-limit = ${?AVIATION_EDGE.POLL_LOWER_HOUR_LIMIT_UTC}
poll-upper-hour-limit = 23
poll-upper-hour-limit = ${?AVIATION_EDGE.POLL_UPPER_HOUR_LIMIT_UTC}
poll-excluded-days = []
poll-excluded-days = ${?AVIATION_EDGE.POLL_EXCLUDED_DAYS}
}

flight-stream {
path = "/v2/public/flights"
polling-start = 60
polling-interval = 100
}
airplane-stream {
path = "/v2/public/airplaneDatabase"
polling-start = 30
polling-interval = 432000
}
airport-stream {
path = "/v2/public/airportDatabase"
polling-start = 30
polling-interval = 432000
}
airline-stream {
path = "/v2/public/airlineDatabase"
polling-start = 10
polling-interval = 432000
flight-stream {
path = "/v2/public/flights"
polling-start = 60
polling-interval = 100
}
airplane-stream {
path = "/v2/public/airplaneDatabase"
polling-start = 30
polling-interval = 432000
}
airport-stream {
path = "/v2/public/airportDatabase"
polling-start = 30
polling-interval = 432000
}
airline-stream {
path = "/v2/public/airlineDatabase"
polling-start = 10
polling-interval = 432000
}
city-stream {
path = "/v2/public/cityDatabase"
polling-start = 20
polling-interval = 432000
}
}
city-stream {
path = "/v2/public/cityDatabase"
polling-start = 20
polling-interval = 432000

open-sky {
host = "https://opensky-network.org/api"
host = ${?OPEN_SKY.BASE_URL}
api-timeout = 20
api-timeout = ${?OPEN_SKY.TIMEOUT}

tick-source {
poll-lower-hour-limit = 0
poll-lower-hour-limit = ${?OPEN_SKY.POLL_LOWER_HOUR_LIMIT_UTC}
poll-upper-hour-limit = 23
poll-upper-hour-limit = ${?OPEN_SKY.POLL_UPPER_HOUR_LIMIT_UTC}
poll-excluded-days = []
poll-excluded-days = ${?OPEN_SKY.POLL_EXCLUDED_DAYS}
}

flight-state-stream {
path = "/states/all"
polling-start = 0
polling-interval = 10
}
}
}

Expand Down
17 changes: 11 additions & 6 deletions src/main/scala/it/bitrock/dvs/producer/aviationedge/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import akka.actor.ActorSystem
import com.typesafe.scalalogging.LazyLogging
import it.bitrock.dvs.producer.aviationedge.model._
import it.bitrock.dvs.producer.aviationedge.services.MainFunctions._
import it.bitrock.dvs.producer.aviationedge.services.context.AviationStreamContext._
import it.bitrock.dvs.producer.aviationedge.services.context.OpenSkyStreamContext._

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
Expand All @@ -18,13 +20,15 @@ object Main extends App with LazyLogging {

bindingFuture.map(serverBinding => logger.info(s"Exposing to ${serverBinding.localAddress}"))

val (cancellableFlight, flightCompletion, _, _) = runStream[FlightStream.type]()
val (cancellableAirplane, airplaneCompletion, _, _) = runStream[AirplaneStream.type]()
val (cancellableAirport, airportCompletion, _, _) = runStream[AirportStream.type]()
val (cancellableAirline, airlineCompletion, _, _) = runStream[AirlineStream.type]()
val (cancellableCity, cityCompletion, _, _) = runStream[CityStream.type]()
val (cancellableFlight, flightCompletion, _, _) = runAviationEdgeStream[FlightStream.type]()
val (cancellableAirplane, airplaneCompletion, _, _) = runAviationEdgeStream[AirplaneStream.type]()
val (cancellableAirport, airportCompletion, _, _) = runAviationEdgeStream[AirportStream.type]()
val (cancellableAirline, airlineCompletion, _, _) = runAviationEdgeStream[AirlineStream.type]()
val (cancellableCity, cityCompletion, _, _) = runAviationEdgeStream[CityStream.type]()
val (cancellableFlightState, flightStateCompletion) = runOpenSkyStream[FlightStateStream.type]()

val streamsCompletion = List(flightCompletion, airplaneCompletion, airportCompletion, airlineCompletion, cityCompletion)
val streamsCompletion =
List(flightCompletion, airplaneCompletion, airportCompletion, airlineCompletion, cityCompletion, flightStateCompletion)
Future.firstCompletedOf(streamsCompletion).foreach { _ =>
logger.error("An unexpected error caused a stream completion. Terminating the application...")
sys.exit(1)
Expand All @@ -37,6 +41,7 @@ object Main extends App with LazyLogging {
cancellableAirport.cancel()
cancellableAirline.cancel()
cancellableCity.cancel()
cancellableFlightState.cancel()
val resourcesClosed = for {
binding <- bindingFuture
_ <- binding.terminate(hardDeadline = 3.seconds)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package it.bitrock.dvs.producer.aviationedge.config

import java.net.URI
import java.time.DayOfWeek

import akka.http.scaladsl.model.Uri
import pureconfig.ConfigSource
import pureconfig.generic.auto._

final case class ApiProviderConfig(aviationEdge: AviationConfig, openSky: OpenSkyConfig)

final case class AviationConfig(
host: URI,
key: String,
apiTimeout: Int,
flightSpeedLimit: Int,
tickSource: TickSourceConfig,
flightStream: ApiProviderStreamConfig,
airplaneStream: ApiProviderStreamConfig,
airportStream: ApiProviderStreamConfig,
airlineStream: ApiProviderStreamConfig,
cityStream: ApiProviderStreamConfig
) {
def getAviationUri(path: String): String = {
val query = Uri.Query("key" -> key)
Uri(host.resolve(path).toString)
.withQuery(query)
.toString
}
}

object AviationConfig {
def load: AviationConfig = ConfigSource.default.at("aviation").loadOrThrow[AviationConfig]
}

final case class OpenSkyConfig(
host: URI,
apiTimeout: Int,
tickSource: TickSourceConfig,
flightStateStream: ApiProviderStreamConfig
) {
def getOpenSkyUri(path: String): String =
host.resolve(path).toString
}

object OpenSkyConfig {
def load: OpenSkyConfig = ConfigSource.default.at("opensky").loadOrThrow[OpenSkyConfig]
}

final case class ApiProviderStreamConfig(
path: String,
pollingStart: Int,
pollingInterval: Int
)

final case class TickSourceConfig(pollLowerHourLimit: Int, pollUpperHourLimit: Int, pollExcludedDays: List[DayOfWeek])
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import pureconfig.generic.auto._
final case class AppConfig(
kafka: KafkaConfig,
server: ServerConfig,
aviation: AviationConfig
apiProvider: ApiProviderConfig
)

object AppConfig {
val config: AppConfig = ConfigSource.default.loadOrThrow[AppConfig]

def server: ServerConfig = config.server
def aviation: AviationConfig = config.aviation
def kafka: KafkaConfig = config.kafka
def server: ServerConfig = config.server
def apiProvider: ApiProviderConfig = config.apiProvider
def kafka: KafkaConfig = config.kafka
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ final case class KafkaConfig(
parserErrorTopic: String,
invalidFlightRawTopic: String,
monitoringTopic: String,
flightOpenSkyRawTopic: String,
enableInterceptors: Boolean
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package it.bitrock.dvs.producer.aviationedge.kafka

import it.bitrock.dvs.model.avro.monitoring.FlightRequestComputationStatus
import it.bitrock.dvs.model.avro.{AirlineRaw, AirplaneRaw, AirportRaw, CityRaw, FlightRaw, ParserError}
import it.bitrock.dvs.model.avro.{AirlineRaw, AirplaneRaw, AirportRaw, CityRaw, FlightRaw, FlightStateRaw, ParserError}

object KafkaTypes {
type Key = String
Expand All @@ -27,4 +27,7 @@ object KafkaTypes {
object Monitoring {
type Value = FlightRequestComputationStatus
}
object FlightState {
type Value = FlightStateRaw
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import it.bitrock.dvs.producer.aviationedge.model.{
CityMessageJson,
ErrorMessageJson,
FlightMessageJson,
FlightStateJson,
MessageJson,
MonitoringMessageJson
}
Expand All @@ -32,4 +33,7 @@ object ToValuePair {
implicit val parserErrorValuePair: ToValuePair[ErrorMessageJson, Key, Error.Value] = j => (null, j.toParserError)
implicit val monitoringErrorValuePair: ToValuePair[MonitoringMessageJson, Key, Monitoring.Value] = j =>
(null, j.toFlightRequestMetrics)
implicit val flightStateValuePair: ToValuePair[MessageJson, Key, FlightState.Value] = j =>
(j.asInstanceOf[FlightStateJson].callsign, j.asInstanceOf[FlightStateJson].toFlightStateRaw)

}
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,19 @@ object RawImplicitConversions {
mrse.total
)
}

implicit class FlightStateStreamEventOps(flightStateJson: FlightStateJson) {
def toFlightStateRaw: FlightStateRaw =
FlightStateRaw(
flightStateJson.callsign,
Instant.ofEpochSecond(flightStateJson.time_position),
Geography(
flightStateJson.latitude,
flightStateJson.longitude,
flightStateJson.geo_altitude,
flightStateJson.true_track
),
flightStateJson.velocity
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package it.bitrock.dvs.producer.aviationedge.model

import java.time.Instant

sealed trait AviationStream
case object FlightStream extends AviationStream
case object AirplaneStream extends AviationStream
case object AirportStream extends AviationStream
case object AirlineStream extends AviationStream
case object CityStream extends AviationStream
sealed trait ApiProviderStream

sealed trait AviationStream extends ApiProviderStream
case object FlightStream extends AviationStream
case object AirplaneStream extends AviationStream
case object AirportStream extends AviationStream
case object AirlineStream extends AviationStream
case object CityStream extends AviationStream

sealed trait OpenSkyStream extends ApiProviderStream
case object FlightStateStream extends OpenSkyStream

case class Tick()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package it.bitrock.dvs.producer.aviationedge.model

import spray.json.JsValue

sealed trait MessageJson extends Product

final case class FlightMessageJson(
Expand Down Expand Up @@ -105,3 +107,15 @@ final case class FlightJson(
final case class SystemJson(
updated: Long
)

final case class FlightStatesJson(time: Long, states: List[List[JsValue]])

final case class FlightStateJson(
callsign: String,
time_position: Long,
longitude: Double,
latitude: Double,
velocity: Double,
true_track: Double,
geo_altitude: Double
) extends MessageJson
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

class AviationFlow()(implicit system: ActorSystem, ec: ExecutionContext) extends LazyLogging {
class ApiProviderFlow()(implicit system: ActorSystem, ec: ExecutionContext) extends LazyLogging {
def flow(uri: Uri, apiTimeout: Int): Flow[Tick, List[Either[ErrorMessageJson, MessageJson]], NotUsed] =
Flow
.fromFunction(identity[Tick])
Expand Down
Loading

0 comments on commit 70edf49

Please sign in to comment.