Skip to content

Commit

Permalink
Add recovery for http request failure + Stop application in case of s… (
Browse files Browse the repository at this point in the history
  • Loading branch information
simoexpo authored and anisini committed Feb 7, 2020
1 parent 0e73170 commit 24fedb5
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 9 deletions.
18 changes: 12 additions & 6 deletions src/main/scala/it/bitrock/dvs/producer/aviationedge/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import it.bitrock.dvs.producer.aviationedge.model._
import it.bitrock.dvs.producer.aviationedge.services.MainFunctions._

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.{Await, ExecutionContext, Future}

object Main extends App with LazyLogging {

Expand All @@ -21,11 +21,17 @@ object Main extends App with LazyLogging {
logger.info(s"Exposing to ${serverBinding.localAddress}")
}

val (cancellableFlight, _, _) = runStream[FlightStream.type]()
val (cancellableAirplane, _, _) = runStream[AirplaneStream.type]()
val (cancellableAirport, _, _) = runStream[AirportStream.type]()
val (cancellableAirline, _, _) = runStream[AirlineStream.type]()
val (cancellableCity, _, _) = runStream[CityStream.type]()
val (cancellableFlight, flightCompletion, _) = runStream[FlightStream.type]()
val (cancellableAirplane, airplaneCompletion, _) = runStream[AirplaneStream.type]()
val (cancellableAirport, airportCompletion, _) = runStream[AirportStream.type]()
val (cancellableAirline, airlineCompletion, _) = runStream[AirlineStream.type]()
val (cancellableCity, cityCompletion, _) = runStream[CityStream.type]()

val streamsCompletion = List(flightCompletion, airplaneCompletion, airportCompletion, airlineCompletion, cityCompletion)
Future.firstCompletedOf(streamsCompletion).foreach { _ =>
logger.error("An unexpected error caused a stream completion. Terminating the application...")
sys.exit(1)
}

sys.addShutdownHook {
logger.info("Shutting down")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import it.bitrock.dvs.producer.aviationedge.services.JsonSupport._

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

class AviationFlow()(implicit system: ActorSystem, ec: ExecutionContext) extends LazyLogging {

Expand All @@ -26,6 +27,10 @@ class AviationFlow()(implicit system: ActorSystem, ec: ExecutionContext) extends
.singleRequest(HttpRequest(HttpMethods.GET, uri))
.flatMap(response => extractBody(response.entity, response.status, apiTimeout))
.flatMap(body => unmarshalBody(body, uri.path.toString))
.recover {
case NonFatal(ex) =>
List(Left(ErrorMessageJson(uri.path.toString, ex.getMessage, "", Instant.now)))
}
}

def extractBody(entity: ResponseEntity, status: StatusCode, timeout: Int): Future[String] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,39 @@
package it.bitrock.dvs.producer.aviationedge.services

import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpEntity, HttpResponse, StatusCodes}
import akka.http.scaladsl.model.{HttpEntity, HttpResponse, StatusCodes, Uri}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.testkit.TestKit
import it.bitrock.dvs.producer.aviationedge.TestValues
import it.bitrock.dvs.producer.aviationedge.model._
import it.bitrock.testcommons.Suite
import org.scalatest.EitherValues
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
import org.scalatest.wordspec.AnyWordSpecLike

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

class AviationFlowSpec
extends TestKit(ActorSystem("AviationFlowSpec"))
with Suite
with AnyWordSpecLike
with TestValues
with EitherValues
with ScalaFutures {
with ScalaFutures
with IntegrationPatience {

private val aviationFlow = new AviationFlow()

"flow method" should {
"recover http request failure" in {
val flow = aviationFlow.flow(Uri("invalid-url"), 1)
whenReady(Source.tick(0.seconds, 1.second, Tick()).via(flow).take(1).toMat(Sink.head)(Keep.right).run()) { result =>
result.head.left.value.errorSource shouldBe "invalid-url"
}
}
}

"extract method" should {
"return the body for any correct response" in {
val response = HttpResponse(status = StatusCodes.OK, entity = HttpEntity(Content))
Expand Down

0 comments on commit 24fedb5

Please sign in to comment.