diff --git a/build.gradle.kts b/build.gradle.kts index 60cd520..5bc2834 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -26,6 +26,7 @@ dependencies { implementation("io.quarkus:quarkus-config-yaml") implementation("io.quarkus:quarkus-scheduler") implementation("io.quarkus:quarkus-resteasy-reactive-jackson") + implementation("io.quarkus:quarkus-smallrye-reactive-messaging-rabbitmq") implementation("io.quarkus:quarkus-container-image-docker") implementation("io.quarkus:quarkus-hibernate-orm-panache-kotlin") implementation("io.quarkus:quarkus-jdbc-mariadb") diff --git a/src/main/kotlin/com/faforever/icebreaker/service/SessionService.kt b/src/main/kotlin/com/faforever/icebreaker/service/SessionService.kt index a2f6629..0204824 100644 --- a/src/main/kotlin/com/faforever/icebreaker/service/SessionService.kt +++ b/src/main/kotlin/com/faforever/icebreaker/service/SessionService.kt @@ -5,6 +5,8 @@ import com.faforever.icebreaker.persistence.IceSessionEntity import com.faforever.icebreaker.persistence.IceSessionRepository import com.faforever.icebreaker.security.CurrentUserService import com.faforever.icebreaker.util.AsyncRunner +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.convertValue import io.quarkus.scheduler.Scheduled import io.quarkus.security.ForbiddenException import io.quarkus.security.UnauthorizedException @@ -12,10 +14,14 @@ import io.quarkus.security.identity.SecurityIdentity import io.smallrye.jwt.build.Jwt import io.smallrye.mutiny.Multi import io.smallrye.mutiny.helpers.MultiEmitterProcessor +import io.vertx.core.json.JsonObject import jakarta.enterprise.inject.Instance import jakarta.inject.Singleton import jakarta.transaction.Transactional import org.eclipse.microprofile.jwt.JsonWebToken +import org.eclipse.microprofile.reactive.messaging.Channel +import org.eclipse.microprofile.reactive.messaging.Emitter +import org.eclipse.microprofile.reactive.messaging.Incoming import org.slf4j.Logger import org.slf4j.LoggerFactory import java.time.Instant @@ -30,10 +36,13 @@ class SessionService( private val iceSessionRepository: IceSessionRepository, private val securityIdentity: SecurityIdentity, private val currentUserService: CurrentUserService, + private val objectMapper: ObjectMapper, + @Channel("events-out") + private val rabbitmqEventEmitter: Emitter, ) { private val activeSessionHandlers = sessionHandlers.filter { it.active } - private val eventEmitter = MultiEmitterProcessor.create() - private val eventBroadcast: Multi = eventEmitter.toMulti().broadcast().toAllSubscribers() + private val localEventEmitter = MultiEmitterProcessor.create() + private val localEventBroadcast: Multi = localEventEmitter.toMulti().broadcast().toAllSubscribers() fun buildToken(gameId: Long): String { val userId = @@ -126,9 +135,9 @@ class SessionService( fun listenForEventMessages(gameId: Long): Multi { val userId = currentUserService.getCurrentUserId() - eventEmitter.emit(ConnectedMessage(gameId = gameId, senderId = currentUserService.getCurrentUserId()!!)) + rabbitmqEventEmitter.send(ConnectedMessage(gameId = gameId, senderId = currentUserService.getCurrentUserId()!!)) - return eventBroadcast.filter { + return localEventBroadcast.filter { it.gameId == gameId && (it.recipientId == userId || (it.recipientId == null && it.senderId != userId)) } } @@ -144,6 +153,12 @@ class SessionService( "current user id $currentUserId from endpoint does not match sourceId ${candidatesMessage.senderId} in candidateMessage" } - eventEmitter.emit(candidatesMessage) + rabbitmqEventEmitter.send(candidatesMessage) + } + + @Incoming("events-in") + fun onEventMessage(eventMessage: JsonObject) { + val parsedMessage = objectMapper.convertValue(eventMessage.map) + localEventEmitter.emit(parsedMessage) } } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 887849f..5a5c7e0 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -53,6 +53,30 @@ smallrye: jwt: sign: key: ${JWT_PRIVATE_KEY_PATH} +mp: + messaging: + incoming: + events-in: + connector: smallrye-rabbitmq + virtual-host: ${RABBITMQ_VHOST:/faf-core} + queue: + name: events.${HOSTNAME:local} + auto-delete: true + exclusive: true + exchange: + name: ice + outgoing: + events-out: + connector: smallrye-rabbitmq + virtual-host: ${RABBITMQ_VHOST:/faf-core} + exchange: + name: ice + +rabbitmq-host: ${RABBITMQ_HOST:localhost} +rabbitmq-port: ${RABBITMQ_PORT:5672} +rabbitmq-username: ${RABBITMQ_USER:admin} +rabbitmq-password: ${RABBITMQ_PASSWORD:banana} + "%dev": smallrye: jwt: