Skip to content

Commit

Permalink
m5l2 - Postgresql
Browse files Browse the repository at this point in the history
  • Loading branch information
evgnep authored and phaeton03 committed Dec 11, 2023
1 parent 45fbb19 commit 18f4008
Show file tree
Hide file tree
Showing 111 changed files with 2,682 additions and 917 deletions.
37 changes: 37 additions & 0 deletions deploy/docker-compose-spring-postgres.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
version: '3'
services:
psql:
image: postgres
container_name: postgresql
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
expose:
- "5432"
environment:
POSTGRES_PASSWORD: marketplace-pass
POSTGRES_USER: postgres
POSTGRES_DB: marketplace
healthcheck:
test: [ "CMD-SHELL", "pg_isready" ]
interval: 10s
timeout: 5s
retries: 5

app:
image: ok-marketplace-app-spring
container_name: app-spring
ports:
- "8080:8080"
expose:
- "8080"
environment:
SQL_URL: jdbc:postgresql://psql:5432/marketplace
depends_on:
psql:
condition: service_healthy

volumes:
postgres_data:

3 changes: 3 additions & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ testContainersVersion=1.17.4
cache4kVersion=0.11.0
pluginShadow=7.1.2
yandexCloudSdkVersion=2.5.1
# PSQL
postgresDriverVersion=42.6.0
exposedVersion=0.41.1

# Kafka
kafkaVersion=3.4.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package ru.otus.otuskotlin.marketplace.app.common

import kotlinx.datetime.Clock
import ru.otus.otuskotlin.marketplace.api.logs.mapper.toLog
import ru.otus.otuskotlin.marketplace.common.MkplContext
import ru.otus.otuskotlin.marketplace.common.helpers.asMkplError
import ru.otus.otuskotlin.marketplace.common.models.MkplState
import kotlin.reflect.KClass

suspend inline fun <T> IMkplAppSettings.controllerHelper(
crossinline getRequest: suspend MkplContext.() -> Unit,
crossinline toResponse: suspend MkplContext.() -> T,
clazz: KClass<*>,
logId: String,
): T {
val logger = corSettings.loggerProvider.logger(clazz)
val ctx = MkplContext(
timeStart = Clock.System.now(),
)
return try {
logger.doWithLogging(logId) {
ctx.getRequest()
processor.exec(ctx)
logger.info(
msg = "Request $logId processed for ${clazz.simpleName}",
marker = "BIZ",
data = ctx.toLog(logId)
)
ctx.toResponse()
}
} catch (e: Throwable) {
logger.doWithLogging("$logId-failure") {
ctx.state = MkplState.FAILING
ctx.errors.add(e.asMkplError())
processor.exec(ctx)
ctx.toResponse()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package ru.otus.otuskotlin.marketplace.app.common

import ru.otus.otuskotlin.marketplace.biz.MkplAdProcessor
import ru.otus.otuskotlin.marketplace.common.MkplCorSettings

interface IMkplAppSettings {
val processor: MkplAdProcessor
val corSettings: MkplCorSettings
}
12 changes: 0 additions & 12 deletions ok-marketplace-app-common/src/commonMain/kotlin/MkplAppSettings.kt

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package ru.otus.otuskotlin.marketplace.app.common

import kotlinx.coroutines.test.runTest
import ru.otus.otuskotlin.marketplace.api.v2.models.*
import ru.otus.otuskotlin.marketplace.biz.MkplAdProcessor
import ru.otus.otuskotlin.marketplace.common.MkplCorSettings
import ru.otus.otuskotlin.marketplace.mappers.v2.fromTransport
import ru.otus.otuskotlin.marketplace.mappers.v2.toTransportAd
import kotlin.test.Test
import kotlin.test.assertEquals

class ControllerV2Test {

private val request = AdCreateRequest(
requestType = "create",
requestId = "1234",
ad = AdCreateObject(
title = "some ad",
description = "some description of some ad",
adType = DealSide.DEMAND,
visibility = AdVisibility.PUBLIC,
productId = "some product id",
),
debug = AdDebug(mode = AdRequestDebugMode.STUB, stub = AdRequestDebugStubs.SUCCESS)
)

private val appSettings: IMkplAppSettings = object : IMkplAppSettings {
override val corSettings: MkplCorSettings = MkplCorSettings()
override val processor: MkplAdProcessor = MkplAdProcessor(corSettings)
}

private suspend fun createAdSpring(request: AdCreateRequest): AdCreateResponse =
appSettings.controllerHelper(
{ fromTransport(request) },
{ toTransportAd() as AdCreateResponse },
this::class,
"createAdSpring"
)

class TestApplicationCall(private val request: IRequest) {
var res: IResponse? = null

@Suppress("UNCHECKED_CAST")
fun <T : IRequest> receive(): T = request as T
fun respond(res: IResponse) {
this.res = res
}
}

private suspend fun TestApplicationCall.createAdKtor(appSettings: IMkplAppSettings) {
val resp = appSettings.controllerHelper(
{ fromTransport(receive<AdCreateRequest>()) },
{ toTransportAd() },
this::class,
"createAdKtor",
)
respond(resp)
}

@Test
fun springHelperTest() = runTest {
val res = createAdSpring(request)
assertEquals(ResponseResult.SUCCESS, res.result)
}

@Test
fun ktorHelperTest() = runTest {
val testApp = TestApplicationCall(request).apply { createAdKtor(appSettings) }
val res = testApp.res as AdCreateResponse
assertEquals(ResponseResult.SUCCESS, res.result)
}
}
1 change: 1 addition & 0 deletions ok-marketplace-app-common/src/commonTest/kotlin/package.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package ru.otus.otuskotlin.marketplace.app.common
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package ru.otus.otuskotlin.marketplace.app.kafka

import ru.otus.otuskotlin.marketplace.app.common.IMkplAppSettings
import ru.otus.otuskotlin.marketplace.biz.MkplAdProcessor
import ru.otus.otuskotlin.marketplace.common.MkplCorSettings

class AppKafkaConfig(
val kafkaHosts: List<String> = KAFKA_HOSTS,
val kafkaGroupId: String = KAFKA_GROUP_ID,
val kafkaTopicInV1: String = KAFKA_TOPIC_IN_V1,
val kafkaTopicOutV1: String = KAFKA_TOPIC_OUT_V1,
val kafkaTopicInV2: String = KAFKA_TOPIC_IN_V2,
val kafkaTopicOutV2: String = KAFKA_TOPIC_OUT_V2
) {
val kafkaTopicOutV2: String = KAFKA_TOPIC_OUT_V2,
override val corSettings: MkplCorSettings = MkplCorSettings(),
override val processor: MkplAdProcessor = MkplAdProcessor(corSettings),
): IMkplAppSettings {
companion object {
const val KAFKA_HOST_VAR = "KAFKA_HOSTS"
const val KAFKA_TOPIC_IN_V1_VAR = "KAFKA_TOPIC_IN_V1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,15 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.datetime.Clock
import mu.KotlinLogging
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.errors.WakeupException
import ru.otus.otuskotlin.marketplace.api.logs.mapper.toLog
import ru.otus.otuskotlin.marketplace.app.common.MkplAppSettings
import ru.otus.otuskotlin.marketplace.app.common.process
import ru.otus.otuskotlin.marketplace.biz.MkplAdProcessor
import ru.otus.otuskotlin.marketplace.app.common.controllerHelper
import ru.otus.otuskotlin.marketplace.common.MkplContext
import ru.otus.otuskotlin.marketplace.common.models.MkplCommand
import java.time.Duration
import java.util.*

Expand All @@ -35,13 +30,10 @@ interface ConsumerStrategy {
class AppKafkaConsumer(
private val config: AppKafkaConfig,
consumerStrategies: List<ConsumerStrategy>,
setting: MkplAppSettings = corSettings,
private val consumer: Consumer<String, String> = config.createKafkaConsumer(),
private val producer: Producer<String, String> = config.createKafkaProducer()
) {
private val logger = setting.logger.logger(AppKafkaConsumer::class)
private val process = atomic(true) // пояснить
private val processor = setting.processor
private val topicsAndStrategyByInputTopic: Map<String, TopicsAndStrategy> = consumerStrategies.associate {
val topics = it.topics(config)
topics.input to TopicsAndStrategy(topics.input, topics.output, it)
Expand All @@ -58,12 +50,20 @@ class AppKafkaConsumer(
log.info { "Receive ${records.count()} messages" }

records.forEach { record: ConsumerRecord<String, String> ->
log.info { "process ${record.key()} from ${record.topic()}:\n${record.value()}" }
val (_, outputTopic, strategy) = topicsAndStrategyByInputTopic[record.topic()] ?: throw RuntimeException("Receive message from unknown topic ${record.topic()}")
try {
val (_, outputTopic, strategy) = topicsAndStrategyByInputTopic[record.topic()]
?: throw RuntimeException("Receive message from unknown topic ${record.topic()}")

processor.process(logger, "kafka",
{ strategy.deserialize(record.value(), this) },
{ sendResponse(this, strategy, outputTopic) })
val resp = config.controllerHelper(
{ strategy.deserialize(record.value(), this) },
{ strategy.serialize(this) },
this::class,
"AppKafkaConsumer",
)
sendResponse(resp, outputTopic)
} catch (ex: Exception) {
log.error(ex) { "error" }
}
}
}
} catch (ex: WakeupException) {
Expand All @@ -80,8 +80,7 @@ class AppKafkaConsumer(
}
}

private fun sendResponse(context: MkplContext, strategy: ConsumerStrategy, outputTopic: String) {
val json = strategy.serialize(context)
private fun sendResponse(json: String, outputTopic: String) {
val resRecord = ProducerRecord(
outputTopic,
UUID.randomUUID().toString(),
Expand All @@ -95,5 +94,9 @@ class AppKafkaConsumer(
process.value = false
}

private data class TopicsAndStrategy(val inputTopic: String, val outputTopic: String, val strategy: ConsumerStrategy)
private data class TopicsAndStrategy(
val inputTopic: String,
val outputTopic: String,
val strategy: ConsumerStrategy
)
}

This file was deleted.

22 changes: 22 additions & 0 deletions ok-marketplace-app-ktor/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import org.jetbrains.kotlin.util.suffixIfNot
val ktorVersion: String by project
val logbackVersion: String by project
val serializationVersion: String by project
val testContainersVersion: String by project
val kmpUUIDVersion: String by project

// ex: Converts to "io.ktor:ktor-ktor-server-netty:2.0.1" with only ktor("netty")
fun ktor(module: String, prefix: String = "server-", version: String? = this@Build_gradle.ktorVersion): Any =
Expand All @@ -19,6 +21,8 @@ plugins {
dependencies {
implementation("io.ktor:ktor-server-core-jvm:2.2.4")
implementation("io.ktor:ktor-server-websockets-jvm:2.2.4")
implementation(project(mapOf("path" to ":ok-marketplace-repo-postgresql")))
implementation(project(mapOf("path" to ":ok-marketplace-repo-postgresql")))


}
Expand Down Expand Up @@ -94,6 +98,7 @@ kotlin {
implementation("io.ktor:ktor-serialization-kotlinx-json:$ktorVersion")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-core:$serializationVersion")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:$serializationVersion")

}
}

Expand Down Expand Up @@ -143,6 +148,8 @@ kotlin {
implementation("com.sndyuk:logback-more-appenders:1.8.8")
implementation("org.fluentd:fluent-logger:0.3.4")

implementation(project(":ok-marketplace-repo-postgresql"))

}
}

Expand All @@ -152,8 +159,23 @@ kotlin {
implementation(ktor("test-host")) // "io.ktor:ktor-server-test-host:$ktorVersion"
implementation(ktor("content-negotiation", prefix = "client-"))
implementation(ktor("websockets", prefix = "client-"))

implementation("org.testcontainers:postgresql:$testContainersVersion")
implementation("com.benasher44:uuid:$kmpUUIDVersion")
}
}
val nativeMain by creating {
dependsOn(commonMain)
}
val linuxX64Main by getting {
dependsOn(nativeMain)
}
val macosX64Main by getting {
dependsOn(nativeMain)
}
val macosArm64Main by getting {
dependsOn(nativeMain)
}
}
}

Expand Down
Loading

0 comments on commit 18f4008

Please sign in to comment.