Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

M5l4 rabbit #21

Merged
merged 1 commit into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ coroutines-reactive = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-react

jackson-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" }
jackson-datatype = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" }
jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" }

# Logging
logback = { module = "ch.qos.logback:logback-classic", version.ref = "logback" }
Expand Down Expand Up @@ -73,6 +74,9 @@ spring-webflux = { module = "org.springframework.boot:spring-boot-starter-webflu
spring-webflux-ui = { module = "org.springdoc:springdoc-openapi-starter-webflux-ui", version = "2.3.0" }
spring-test = { module = "org.springframework.boot:spring-boot-starter-test" }

# Message Queues
rabbitmq-client = { module = "com.rabbitmq:amqp-client", version = "5.20.0" }

# Testing
kotest-junit5 = { module = "io.kotest:kotest-runner-junit5", version.ref = "kotest" }
kotest-core = { module = "io.kotest:kotest-assertions-core", version.ref = "kotest" }
Expand All @@ -81,6 +85,7 @@ kotest-property = { module = "io.kotest:kotest-property", version.ref = "kotest"
mockito-kotlin = { module = "org.mockito.kotlin:mockito-kotlin", version = "5.2.1" }

testcontainers-core = { module = "org.testcontainers:testcontainers", version.ref = "testcontainers" }
testcontainers-rabbitmq = { module = "org.testcontainers:rabbitmq", version.ref = "testcontainers" }

[bundles]
kotest = ["kotest-junit5", "kotest-core", "kotest-datatest", "kotest-property"]
Expand All @@ -91,6 +96,7 @@ kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }
openapi-generator = { id = "org.openapi.generator", version.ref = "openapi-generator" }
crowdproj-generator = { id = "com.crowdproj.generator", version = "0.2.0" }
kotlinx-serialization = { id = "org.jetbrains.kotlin.plugin.serialization", version.ref = "kotlin" }
shadowJar = { id = "com.github.johnrengelman.shadow", version = "8.1.1" }

# Spring
spring-boot = { id = "org.springframework.boot", version.ref = "spring-boot" }
Expand All @@ -102,3 +108,4 @@ ktor = { id = "io.ktor.plugin", version.ref = "ktor" }

# Docker
muschko-remote = { id = "com.bmuschko.docker-remote-api", version.ref = "muschko" }
muschko-java = { id = "com.bmuschko.docker-java-application", version.ref = "muschko" }
2 changes: 2 additions & 0 deletions ok-marketplace-be/ok-marketplace-app-ktor/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ kotlin {

// Stubs
implementation(project(":ok-marketplace-stubs"))
// RabbitMQ
// implementation(project(":ok-marketplace-app-rabbit"))

implementation(libs.kotlinx.serialization.core)
implementation(libs.kotlinx.serialization.json)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ fun Application.moduleJvm(
}
module(appSettings)

// Неофициальное задание. Попробуйте сделать этот код работающим
// val rabbitServer = RabbitApp(appSettings, this@moduleJvm)
// rabbitServer?.start()

routing {
route("v1") {
install(ContentNegotiation) {
Expand All @@ -45,3 +49,4 @@ fun Application.moduleJvm(
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,18 @@ ktor:
# logger: socket
# socketLogger:
# port: 24224

# Пример конфига для RabbitMQ
#rabbit:
# enable: false
# host: localhost
# port: 5672
## username: guest
## password: guest
# v1:
# keyIn: mkpl-ads-v1-in
# keyOut: mkpl-ads-v1-out
# exchange: mkpl-ads-v1-exchange
# queue: mkpl-ads-v1-queue
# consumerTag: "mkpl-ads-v1-consumer"
# exchangeType: direct
37 changes: 37 additions & 0 deletions ok-marketplace-be/ok-marketplace-app-rabbit/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
plugins {
id("build-jvm")
application
alias(libs.plugins.shadowJar)
alias(libs.plugins.muschko.java)
}

application {
mainClass.set("ru.otus.otuskotlin.marketplace.app.rabbit.ApplicationKt")
}

dependencies {

implementation(kotlin("stdlib"))

implementation(libs.rabbitmq.client)
implementation(libs.jackson.databind)
implementation(libs.logback)
implementation(libs.coroutines.core)

implementation(project(":ok-marketplace-common"))
implementation(project(":ok-marketplace-app-common"))
implementation("ru.otus.otuskotlin.marketplace.libs:ok-marketplace-lib-logging-logback")

// v1 api
implementation(project(":ok-marketplace-api-v1-jackson"))
implementation(project(":ok-marketplace-api-v1-mappers"))

// v2 api
implementation(project(":ok-marketplace-api-v2-kmp"))

implementation(project(":ok-marketplace-biz"))
implementation(project(":ok-marketplace-stubs"))

testImplementation(libs.testcontainers.rabbitmq)
testImplementation(kotlin("test"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package ru.otus.otuskotlin.marketplace.app.rabbit

import kotlinx.coroutines.runBlocking
import ru.otus.otuskotlin.marketplace.app.rabbit.config.MkplAppSettings
import ru.otus.otuskotlin.marketplace.app.rabbit.config.RabbitConfig
import ru.otus.otuskotlin.marketplace.app.rabbit.mappers.fromArgs
import ru.otus.otuskotlin.marketplace.common.MkplCorSettings
import ru.otus.otuskotlin.marketplace.logging.common.MpLoggerProvider
import ru.otus.otuskotlin.marketplace.logging.jvm.mpLoggerLogback

fun main(vararg args: String) = runBlocking {
val appSettings = MkplAppSettings(
rabbit = RabbitConfig.fromArgs(*args),
corSettings = MkplCorSettings(
loggerProvider = MpLoggerProvider { mpLoggerLogback(it) }
)
)
val app = RabbitApp(appSettings = appSettings, this)
app.start()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package ru.otus.otuskotlin.marketplace.app.rabbit

import kotlinx.coroutines.*
import ru.otus.otuskotlin.marketplace.app.rabbit.config.MkplAppSettings
import ru.otus.otuskotlin.marketplace.app.rabbit.controllers.IRabbitMqController
import ru.otus.otuskotlin.marketplace.app.rabbit.controllers.RabbitDirectControllerV1
import ru.otus.otuskotlin.marketplace.app.rabbit.controllers.RabbitDirectControllerV2
import java.util.concurrent.atomic.AtomicBoolean

// Класс запускает все контроллеры
@OptIn(ExperimentalCoroutinesApi::class)
class RabbitApp(
appSettings: MkplAppSettings,
private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
) : AutoCloseable {
private val logger = appSettings.corSettings.loggerProvider.logger(this::class)
private val controllers: List<IRabbitMqController> = listOf(
RabbitDirectControllerV1(appSettings),
RabbitDirectControllerV2(appSettings),
)
private val runFlag = AtomicBoolean(true)

fun start() {
runFlag.set(true)
controllers.forEach {
scope.launch(
Dispatchers.IO.limitedParallelism(1) + CoroutineName("thread-${it.exchangeConfig.consumerTag}")
) {
while (runFlag.get()) {
try {
logger.info("Process...${it.exchangeConfig.consumerTag}")
it.process()
} catch (e: RuntimeException) {
// логируем, что-то делаем
logger.error("Обработка завалена, возможно из-за потери соединения с RabbitMQ. Рестартуем")
e.printStackTrace()
}
}
}
}
}

override fun close() {
runFlag.set(false)
controllers.forEach { it.close() }
scope.cancel()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package ru.otus.otuskotlin.marketplace.app.rabbit.config

import ru.otus.otuskotlin.marketplace.app.common.IMkplAppSettings

interface IMkplAppRabbitSettings: IMkplAppSettings {
val rabbit: RabbitConfig
val controllersConfigV1: RabbitExchangeConfiguration
val controllersConfigV2: RabbitExchangeConfiguration
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ru.otus.otuskotlin.marketplace.app.rabbit.config

import ru.otus.otuskotlin.marketplace.app.common.IMkplAppSettings
import ru.otus.otuskotlin.marketplace.biz.MkplAdProcessor
import ru.otus.otuskotlin.marketplace.common.MkplCorSettings

data class MkplAppSettings(
override val corSettings: MkplCorSettings = MkplCorSettings(),
override val processor: MkplAdProcessor = MkplAdProcessor(corSettings),
override val rabbit: RabbitConfig = RabbitConfig(),
override val controllersConfigV1: RabbitExchangeConfiguration = RabbitExchangeConfiguration.NONE,
override val controllersConfigV2: RabbitExchangeConfiguration = RabbitExchangeConfiguration.NONE,
): IMkplAppSettings, IMkplAppRabbitSettings
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package ru.otus.otuskotlin.marketplace.app.rabbit.config

data class RabbitConfig(
val host: String = HOST,
val port: Int = PORT,
val user: String = USER,
val password: String = PASSWORD
) {
companion object {
const val HOST = "localhost"
const val PORT = 5672
const val USER = "guest"
const val PASSWORD = "guest"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package ru.otus.otuskotlin.marketplace.app.rabbit.config

// наш класс настроек взаимодействия с RMQ
data class RabbitExchangeConfiguration(
val keyIn: String = "",
val keyOut: String = "",
// Отправляем сообщение в обменник
val exchange: String = "",
// Подписываемся на очередь
val queue: String = "",
val consumerTag: String = "",
val exchangeType: String = "direct" // Объявляем обменник типа "direct" (сообщения передаются в те очереди, где ключ совпадает)
) {
companion object {
val NONE = RabbitExchangeConfiguration()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ru.otus.otuskotlin.marketplace.app.rabbit.controllers

import ru.otus.otuskotlin.marketplace.app.rabbit.config.RabbitExchangeConfiguration

interface IRabbitMqController {
val exchangeConfig: RabbitExchangeConfiguration
suspend fun process()
fun close()
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package ru.otus.otuskotlin.marketplace.app.rabbit.controllers

import com.rabbitmq.client.Channel
import com.rabbitmq.client.Delivery
import ru.otus.otuskotlin.marketplace.api.v1.apiV1Mapper
import ru.otus.otuskotlin.marketplace.api.v1.models.IRequest
import ru.otus.otuskotlin.marketplace.app.common.controllerHelper
import ru.otus.otuskotlin.marketplace.app.rabbit.config.MkplAppSettings
import ru.otus.otuskotlin.marketplace.common.MkplContext
import ru.otus.otuskotlin.marketplace.common.helpers.asMkplError
import ru.otus.otuskotlin.marketplace.common.models.MkplState
import ru.otus.otuskotlin.marketplace.mappers.v1.fromTransport
import ru.otus.otuskotlin.marketplace.mappers.v1.toTransportAd

// наследник RabbitProcessorBase, увязывает транспортную и бизнес-части
class RabbitDirectControllerV1(
private val appSettings: MkplAppSettings,
) : RabbitProcessorBase(
rabbitConfig = appSettings.rabbit,
exchangeConfig = appSettings.controllersConfigV1,
loggerProvider = appSettings.corSettings.loggerProvider,
) {
override suspend fun Channel.processMessage(message: Delivery) {
appSettings.controllerHelper(
{
val req = apiV1Mapper.readValue(message.body, IRequest::class.java)
fromTransport(req)
},
{
val res = toTransportAd()
apiV1Mapper.writeValueAsBytes(res).also {
basicPublish(exchangeConfig.exchange, exchangeConfig.keyOut, null, it)
}
},
this@RabbitDirectControllerV1::class,
"rabbitmq-v1-processor"
)
}

override fun Channel.onError(e: Throwable, delivery: Delivery) {
val context = MkplContext()
e.printStackTrace()
context.state = MkplState.FAILING
context.errors.add(e.asMkplError())
val response = context.toTransportAd()
apiV1Mapper.writeValueAsBytes(response).also {
basicPublish(exchangeConfig.exchange, exchangeConfig.keyOut, null, it)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package ru.otus.otuskotlin.marketplace.app.rabbit.controllers

import com.rabbitmq.client.Channel
import com.rabbitmq.client.Delivery
import ru.otus.otuskotlin.marketplace.api.v2.apiV2RequestDeserialize
import ru.otus.otuskotlin.marketplace.api.v2.apiV2ResponseSerialize
import ru.otus.otuskotlin.marketplace.api.v2.mappers.fromTransport
import ru.otus.otuskotlin.marketplace.api.v2.mappers.toTransportAd
import ru.otus.otuskotlin.marketplace.api.v2.models.IRequest
import ru.otus.otuskotlin.marketplace.app.common.controllerHelper
import ru.otus.otuskotlin.marketplace.app.rabbit.config.MkplAppSettings
import ru.otus.otuskotlin.marketplace.common.MkplContext
import ru.otus.otuskotlin.marketplace.common.helpers.asMkplError
import ru.otus.otuskotlin.marketplace.common.models.MkplState

class RabbitDirectControllerV2(
private val appSettings: MkplAppSettings,
) : RabbitProcessorBase(
rabbitConfig = appSettings.rabbit,
exchangeConfig = appSettings.controllersConfigV2,
loggerProvider = appSettings.corSettings.loggerProvider,
) {

override suspend fun Channel.processMessage(message: Delivery) {
appSettings.controllerHelper(
{
val req = apiV2RequestDeserialize<IRequest>(String(message.body))
fromTransport(req)
},
{
val res = toTransportAd()
apiV2ResponseSerialize(res).also {
basicPublish(exchangeConfig.exchange, exchangeConfig.keyOut, null, it.toByteArray())
}
},
RabbitDirectControllerV2::class,
"rabbitmq-v2-processor"
)
}

override fun Channel.onError(e: Throwable, delivery: Delivery) {
val context = MkplContext()
e.printStackTrace()
context.state = MkplState.FAILING
context.errors.add(e.asMkplError())
val response = context.toTransportAd()
apiV2ResponseSerialize(response).also {
basicPublish(exchangeConfig.exchange, exchangeConfig.keyOut, null, it.toByteArray())
}
}
}
Loading
Loading