Skip to content

Commit

Permalink
Bump Akka to 2.6, Alpakka to 2.0.0 and Confluent Kafka to 5.4.0 (#39)
Browse files Browse the repository at this point in the history
Co-authored-by: Massimo Siani <[email protected]>
  • Loading branch information
simoexpo and massimosiani committed Jan 16, 2020
1 parent 33a2511 commit 01381ef
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 19 deletions.
10 changes: 5 additions & 5 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ object Dependencies {
object Versions {

lazy val Scala = "2.12.10"
lazy val Akka = "2.5.22"
lazy val AkkaHttp = "10.1.8"
lazy val Alpakka = "1.0"
lazy val ConfluentPlatform = "5.3.2"
lazy val Akka = "2.6.1"
lazy val AkkaHttp = "10.1.11"
lazy val Alpakka = "2.0.0"
lazy val ConfluentPlatform = "5.4.0"
lazy val JakartaWsRs = "2.1.4"
lazy val Kafka = "2.3.0"
lazy val Kafka = "2.4.0"
lazy val KafkaCommons = "0.0.5"
lazy val KafkaDVS = "0.1.14"
lazy val LogbackClassic = "1.2.3"
Expand Down
6 changes: 2 additions & 4 deletions src/main/scala/it/bitrock/dvs/producer/Main.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package it.bitrock.dvs.producer

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.typesafe.scalalogging.LazyLogging
import it.bitrock.dvs.producer.model._
import it.bitrock.dvs.producer.services.MainFunctions._
Expand All @@ -11,9 +10,8 @@ import scala.concurrent.{Await, ExecutionContext}

object Main extends App with LazyLogging {

implicit val system: ActorSystem = ActorSystem("KafkaDVSProducer")
implicit val mat: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = system.dispatcher
implicit val system: ActorSystem = ActorSystem("KafkaDVSProducer")
implicit val ec: ExecutionContext = system.dispatcher

logger.info("Starting up")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import com.typesafe.scalalogging.LazyLogging
import it.bitrock.dvs.producer.model.{MessageJson, Tick}
Expand All @@ -16,7 +15,7 @@ import JsonSupport._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

class AviationFlow()(implicit system: ActorSystem, mat: ActorMaterializer, ec: ExecutionContext) extends LazyLogging {
class AviationFlow()(implicit system: ActorSystem, ec: ExecutionContext) extends LazyLogging {

def flow(uri: Uri, apiTimeout: Int): Flow[Tick, List[MessageJson], NotUsed] = flow { () =>
// Only for development purpose (2 hours shift: 4 -> 6 and 16 -> 18)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package it.bitrock.dvs.producer.services

import akka.actor.{ActorSystem, Cancellable}
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import it.bitrock.dvs.producer.config.{AppConfig, AviationConfig, KafkaConfig, ServerConfig}
import it.bitrock.dvs.producer.model._
import it.bitrock.dvs.producer.routes.Routes
Expand All @@ -15,14 +14,14 @@ object MainFunctions {
val aviationConfig: AviationConfig = AppConfig.aviation
val kafkaConfig: KafkaConfig = AppConfig.kafka

def bindRoutes()(implicit system: ActorSystem, mat: ActorMaterializer): Future[Http.ServerBinding] = {
def bindRoutes()(implicit system: ActorSystem): Future[Http.ServerBinding] = {
val host = serverConfig.host
val port = serverConfig.port
val routes = new Routes(serverConfig)
Http().bindAndHandle(routes.routes, host, port)
}

def runStream[A: AviationStreamContext]()(implicit system: ActorSystem, mat: ActorMaterializer, ec: ExecutionContext): Cancellable = {
def runStream[A: AviationStreamContext]()(implicit system: ActorSystem, ec: ExecutionContext): Cancellable = {
val config = AviationStreamContext[A].config(aviationConfig)
val source = new TickSource(config.pollingStart, config.pollingInterval).source
val flow = new AviationFlow().flow(aviationConfig.getAviationUri(config.path), aviationConfig.apiTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package it.bitrock.dvs.producer.kafka

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.testkit.TestKit
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
Expand All @@ -24,8 +23,6 @@ class KafkaFlightSinkFactorySpec
with TestValues {
import KafkaFlightSinkFactorySpec._

implicit val mat: ActorMaterializer = ActorMaterializer()

"sink method" should {

"convert a domain model to Kafka model and push it to a topic" in ResourceLoaner.withFixture {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package it.bitrock.dvs.producer.services

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.testkit.TestSubscriber.Probe
import akka.stream.testkit.scaladsl.TestSink
Expand All @@ -16,7 +15,6 @@ import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

class AviationFlowSpec extends TestKit(ActorSystem("AviationFlowSpec")) with Suite with WordSpecLike {
implicit val mat: ActorMaterializer = ActorMaterializer()

"flow method" should {

Expand Down

0 comments on commit 01381ef

Please sign in to comment.