From 92481d65a0e77c209e1185f0790f67d1dc71915f Mon Sep 17 00:00:00 2001 From: david steinsland Date: Wed, 18 Dec 2024 11:53:01 +0100 Subject: [PATCH] =?UTF-8?q?returnerer=20ikke=20fra=20shutdownhook=20f?= =?UTF-8?q?=C3=B8r=20shutdownComplete?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Etter at shutdown-signalet er sendt så vil KafkaRapid lukke consumeren, som vil medføre "partitions revoked" som igjen vil medføre at vi forsøker å committe offsets. dette er avhengig av at vi kan få lov av brannmuren (linkerd) å ha utgående http connections. så snart pre-stop-endepunktet returnerer så vil linkerd-proxy-sidecar få SIGTERM. vi har derfor en hypotese om at vi må utsette å returnere fra pre-stop-endepunktet til ETTER at kafkarapid har avsluttet HELT. ellers kan det oppstå en situasjon hvor vi tror at vi ikke klarer å committe, fordi linkerd har skrudd seg av --- build.gradle.kts | 12 ++-- .../no/nav/helse/rapids_rivers/PreStopHook.kt | 4 +- .../helse/rapids_rivers/RapidApplication.kt | 57 ++++++++++++------- 3 files changed, 48 insertions(+), 25 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index b9bc6b1..06af270 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -8,7 +8,7 @@ val logbackClassicVersion = "1.5.12" val logbackEncoderVersion = "8.0" val awaitilityVersion = "4.2.2" val kafkaTestcontainerVersion = "1.20.4" -val tbdLibsVersion = "2024.11.29-15.07-105481e3" +val tbdLibsVersion = "2024.12.18-11.39-73f8eecb" group = "com.github.navikt" version = properties["version"] ?: "local-build" @@ -54,10 +54,12 @@ kotlin { tasks { jar { manifest { - attributes(mapOf( - "Implementation-Title" to project.name, - "Implementation-Version" to project.version - )) + attributes( + mapOf( + "Implementation-Title" to project.name, + "Implementation-Version" to project.version + ) + ) } } withType { diff --git a/src/main/kotlin/no/nav/helse/rapids_rivers/PreStopHook.kt b/src/main/kotlin/no/nav/helse/rapids_rivers/PreStopHook.kt index ad9bf53..0771929 100644 --- a/src/main/kotlin/no/nav/helse/rapids_rivers/PreStopHook.kt +++ b/src/main/kotlin/no/nav/helse/rapids_rivers/PreStopHook.kt @@ -6,6 +6,7 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel.Factory.CONFLATED import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS +import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeout @@ -17,6 +18,7 @@ class PreStopHook(private val rapid: KafkaRapid) : RapidsConnection.StatusListen private companion object { val log = LoggerFactory.getLogger(this::class.java) } + // bruker CONFLATED som er en channel med buffer på 1, hvor hver ny melding overskriver den forrige // i praksis vil dette bety at vi ikke blokkerer senderen av shutdown-signalet private val shutdownChannel = Channel(CONFLATED) @@ -25,7 +27,7 @@ class PreStopHook(private val rapid: KafkaRapid) : RapidsConnection.StatusListen rapid.register(this) } - override fun onShutdown(rapidsConnection: RapidsConnection) { + override fun onShutdownComplete(rapidsConnection: RapidsConnection) { runBlocking(Dispatchers.IO) { try { withTimeout(1.seconds) { diff --git a/src/main/kotlin/no/nav/helse/rapids_rivers/RapidApplication.kt b/src/main/kotlin/no/nav/helse/rapids_rivers/RapidApplication.kt index 7a436b0..3eef18d 100644 --- a/src/main/kotlin/no/nav/helse/rapids_rivers/RapidApplication.kt +++ b/src/main/kotlin/no/nav/helse/rapids_rivers/RapidApplication.kt @@ -43,7 +43,12 @@ class RapidApplication internal constructor( } } - override fun onMessage(message: String, context: MessageContext, metadata: MessageMetadata, meterRegistry: MeterRegistry) { + override fun onMessage( + message: String, + context: MessageContext, + metadata: MessageMetadata, + meterRegistry: MeterRegistry + ) { notifyMessage(message, context, metadata, meterRegistry) } @@ -54,11 +59,7 @@ class RapidApplication internal constructor( rapid.start() } finally { onKtorShutdown() - val gracePeriod = 5000L - val forcefulShutdownTimeout = 30000L - log.info("shutting down ktor, waiting $gracePeriod ms for workers to exit. Forcing shutdown after $forcefulShutdownTimeout ms") - ktor.stop(gracePeriod, forcefulShutdownTimeout) - log.info("ktor shutdown complete: end of life. goodbye.") + log.info("shutdown complete: end of life. goodbye.") } } @@ -113,16 +114,20 @@ class RapidApplication internal constructor( log.info("publishing $event event for app_name=$appName, instance_id=$instanceId") try { rapidsConnection.publish(it) - } catch (err: Exception) { log.info("failed to publish event: {}", err.message, err) } + } catch (err: Exception) { + log.info("failed to publish event: {}", err.message, err) + } } } private fun applicationEvent(event: String): String? { if (appName == null) return null - val packet = JsonMessage.newMessage(event, mapOf( - "app_name" to appName, - "instance_id" to instanceId - )) + val packet = JsonMessage.newMessage( + event, mapOf( + "app_name" to appName, + "instance_id" to instanceId + ) + ) return packet.toJson() } @@ -132,7 +137,11 @@ class RapidApplication internal constructor( fun create( env: Map, consumerProducerFactory: ConsumerProducerFactory = ConsumerProducerFactory(AivenConfig.default), - meterRegistry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT, PrometheusRegistry.defaultRegistry, Clock.SYSTEM), + meterRegistry: PrometheusMeterRegistry = PrometheusMeterRegistry( + PrometheusConfig.DEFAULT, + PrometheusRegistry.defaultRegistry, + Clock.SYSTEM + ), builder: Builder.() -> Unit = {}, configure: (EmbeddedServer, KafkaRapid) -> Unit = { _, _ -> } ): RapidsConnection { @@ -157,9 +166,15 @@ class RapidApplication internal constructor( } private fun generateAppName(env: Map): String? { - val appName = env["NAIS_APP_NAME"] ?: return log.info("not generating app name because NAIS_APP_NAME not set").let { null } - val namespace = env["NAIS_NAMESPACE"] ?: return log.info("not generating app name because NAIS_NAMESPACE not set").let { null } - val cluster = env["NAIS_CLUSTER_NAME"] ?: return log.info("not generating app name because NAIS_CLUSTER_NAME not set").let { null } + val appName = + env["NAIS_APP_NAME"] ?: return log.info("not generating app name because NAIS_APP_NAME not set") + .let { null } + val namespace = + env["NAIS_NAMESPACE"] ?: return log.info("not generating app name because NAIS_NAMESPACE not set") + .let { null } + val cluster = + env["NAIS_CLUSTER_NAME"] ?: return log.info("not generating app name because NAIS_CLUSTER_NAME not set") + .let { null } return "$appName-$cluster-$namespace" } } @@ -191,9 +206,10 @@ class RapidApplication internal constructor( this.ktor = ktor } - fun withKtor(ktor: (PreStopHook, KafkaRapid) -> EmbeddedServer) = apply { - this.ktor = ktor(stopHook, rapid) - } + fun withKtor(ktor: (PreStopHook, KafkaRapid) -> EmbeddedServer) = + apply { + this.ktor = ktor(stopHook, rapid) + } fun withKtorModule(module: Application.() -> Unit) = apply { this.modules.add(module) @@ -223,7 +239,10 @@ class RapidApplication internal constructor( naisEndpoints = naisEndpoints.copy(preStopEndpoint = preStopHookEndpoint) } - fun build(configure: (EmbeddedServer, KafkaRapid) -> Unit = { _, _ -> }, cioConfiguration: CIOApplicationEngine.Configuration.() -> Unit = { } ): RapidsConnection { + fun build( + configure: (EmbeddedServer, KafkaRapid) -> Unit = { _, _ -> }, + cioConfiguration: CIOApplicationEngine.Configuration.() -> Unit = { } + ): RapidsConnection { val app = ktor ?: defaultKtorApp(cioConfiguration) configure(app, rapid) with(meterRegistry) {