Skip to content

Commit

Permalink
returnerer ikke fra shutdownhook før shutdownComplete
Browse files Browse the repository at this point in the history
Etter at shutdown-signalet er sendt så vil KafkaRapid lukke consumeren, som vil medføre "partitions revoked" som igjen vil medføre at vi forsøker å committe offsets.

Dette er avhengig av at vi får lov av brannmuren (linkerd) til å ha utgående HTTP-forbindelser.

Så snart pre-stop-endepunktet returnerer, vil linkerd-proxy-sidecaren få SIGTERM.

Vi har derfor en hypotese om at vi må utsette å returnere fra pre-stop-endepunktet til ETTER at KafkaRapid har avsluttet HELT. Ellers kan det oppstå en situasjon hvor vi tror at vi ikke klarer å committe, fordi linkerd har skrudd seg av.
  • Loading branch information
davidsteinsland committed Dec 18, 2024
1 parent fc0a3d5 commit 92481d6
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 25 deletions.
12 changes: 7 additions & 5 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ val logbackClassicVersion = "1.5.12"
val logbackEncoderVersion = "8.0"
val awaitilityVersion = "4.2.2"
val kafkaTestcontainerVersion = "1.20.4"
val tbdLibsVersion = "2024.11.29-15.07-105481e3"
val tbdLibsVersion = "2024.12.18-11.39-73f8eecb"

group = "com.github.navikt"
version = properties["version"] ?: "local-build"
Expand Down Expand Up @@ -54,10 +54,12 @@ kotlin {
tasks {
jar {
manifest {
attributes(mapOf(
"Implementation-Title" to project.name,
"Implementation-Version" to project.version
))
attributes(
mapOf(
"Implementation-Title" to project.name,
"Implementation-Version" to project.version
)
)
}
}
withType<Test> {
Expand Down
4 changes: 3 additions & 1 deletion src/main/kotlin/no/nav/helse/rapids_rivers/PreStopHook.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
Expand All @@ -17,6 +18,7 @@ class PreStopHook(private val rapid: KafkaRapid) : RapidsConnection.StatusListen
private companion object {
val log = LoggerFactory.getLogger(this::class.java)
}

// bruker CONFLATED som er en channel med buffer på 1, hvor hver ny melding overskriver den forrige
// i praksis vil dette bety at vi ikke blokkerer senderen av shutdown-signalet
private val shutdownChannel = Channel<Boolean>(CONFLATED)
Expand All @@ -25,7 +27,7 @@ class PreStopHook(private val rapid: KafkaRapid) : RapidsConnection.StatusListen
rapid.register(this)
}

override fun onShutdown(rapidsConnection: RapidsConnection) {
override fun onShutdownComplete(rapidsConnection: RapidsConnection) {
runBlocking(Dispatchers.IO) {
try {
withTimeout(1.seconds) {
Expand Down
57 changes: 38 additions & 19 deletions src/main/kotlin/no/nav/helse/rapids_rivers/RapidApplication.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ class RapidApplication internal constructor(
}
}

/**
 * Message callback that hands the incoming message straight to [notifyMessage],
 * passing along the same context, metadata and meter registry it was given.
 */
override fun onMessage(message: String, context: MessageContext, metadata: MessageMetadata, meterRegistry: MeterRegistry) {
override fun onMessage(
message: String,
context: MessageContext,
metadata: MessageMetadata,
meterRegistry: MeterRegistry
) {
notifyMessage(message, context, metadata, meterRegistry)
}

Expand All @@ -54,11 +59,7 @@ class RapidApplication internal constructor(
rapid.start()
} finally {
onKtorShutdown()
val gracePeriod = 5000L
val forcefulShutdownTimeout = 30000L
log.info("shutting down ktor, waiting $gracePeriod ms for workers to exit. Forcing shutdown after $forcefulShutdownTimeout ms")
ktor.stop(gracePeriod, forcefulShutdownTimeout)
log.info("ktor shutdown complete: end of life. goodbye.")
log.info("shutdown complete: end of life. goodbye.")
}
}

Expand Down Expand Up @@ -113,16 +114,20 @@ class RapidApplication internal constructor(
log.info("publishing $event event for app_name=$appName, instance_id=$instanceId")
try {
rapidsConnection.publish(it)
} catch (err: Exception) { log.info("failed to publish event: {}", err.message, err) }
} catch (err: Exception) {
log.info("failed to publish event: {}", err.message, err)
}
}
}

/**
 * Builds the JSON text for an application event.
 *
 * @param event event name, passed as the first argument to `JsonMessage.newMessage`.
 * @return the result of `toJson()` on a message carrying `app_name` and `instance_id`,
 * or `null` when `appName` is `null` (no message is built in that case).
 */
private fun applicationEvent(event: String): String? {
// without an app name there is nothing to identify the sender with, so no event is produced
if (appName == null) return null
val packet = JsonMessage.newMessage(event, mapOf(
"app_name" to appName,
"instance_id" to instanceId
))
val packet = JsonMessage.newMessage(
event, mapOf(
"app_name" to appName,
"instance_id" to instanceId
)
)
return packet.toJson()
}

Expand All @@ -132,7 +137,11 @@ class RapidApplication internal constructor(
fun create(
env: Map<String, String>,
consumerProducerFactory: ConsumerProducerFactory = ConsumerProducerFactory(AivenConfig.default),
meterRegistry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT, PrometheusRegistry.defaultRegistry, Clock.SYSTEM),
meterRegistry: PrometheusMeterRegistry = PrometheusMeterRegistry(
PrometheusConfig.DEFAULT,
PrometheusRegistry.defaultRegistry,
Clock.SYSTEM
),
builder: Builder.() -> Unit = {},
configure: (EmbeddedServer<CIOApplicationEngine, CIOApplicationEngine.Configuration>, KafkaRapid) -> Unit = { _, _ -> }
): RapidsConnection {
Expand All @@ -157,9 +166,15 @@ class RapidApplication internal constructor(
}

/**
 * Derives an application name from the `NAIS_APP_NAME`, `NAIS_NAMESPACE` and
 * `NAIS_CLUSTER_NAME` entries of [env].
 *
 * @param env map the three keys are looked up in.
 * @return `"<app name>-<cluster>-<namespace>"`, or `null` as soon as one of the three
 * keys is missing; an info line naming the missing key is logged before returning.
 */
private fun generateAppName(env: Map<String, String>): String? {
// each lookup returns early on a missing key: the log call runs first, then `.let { null }`
// turns the expression into the null that is returned
val appName = env["NAIS_APP_NAME"] ?: return log.info("not generating app name because NAIS_APP_NAME not set").let { null }
val namespace = env["NAIS_NAMESPACE"] ?: return log.info("not generating app name because NAIS_NAMESPACE not set").let { null }
val cluster = env["NAIS_CLUSTER_NAME"] ?: return log.info("not generating app name because NAIS_CLUSTER_NAME not set").let { null }
val appName =
env["NAIS_APP_NAME"] ?: return log.info("not generating app name because NAIS_APP_NAME not set")
.let { null }
val namespace =
env["NAIS_NAMESPACE"] ?: return log.info("not generating app name because NAIS_NAMESPACE not set")
.let { null }
val cluster =
env["NAIS_CLUSTER_NAME"] ?: return log.info("not generating app name because NAIS_CLUSTER_NAME not set")
.let { null }
// note the order: cluster comes before namespace in the generated name
return "$appName-$cluster-$namespace"
}
}
Expand Down Expand Up @@ -191,9 +206,10 @@ class RapidApplication internal constructor(
this.ktor = ktor
}

/**
 * Sets the ktor server from a factory function.
 *
 * The factory is invoked immediately with `stopHook` and `rapid`, and the server it
 * returns is stored in `ktor`.
 *
 * @param ktor factory that receives the [PreStopHook] and [KafkaRapid] and returns the server to use.
 * @return the receiver (via `apply`), so calls can be chained.
 */
fun withKtor(ktor: (PreStopHook, KafkaRapid) -> EmbeddedServer<CIOApplicationEngine, CIOApplicationEngine.Configuration>) = apply {
this.ktor = ktor(stopHook, rapid)
}
fun withKtor(ktor: (PreStopHook, KafkaRapid) -> EmbeddedServer<CIOApplicationEngine, CIOApplicationEngine.Configuration>) =
apply {
this.ktor = ktor(stopHook, rapid)
}

fun withKtorModule(module: Application.() -> Unit) = apply {
this.modules.add(module)
Expand Down Expand Up @@ -223,7 +239,10 @@ class RapidApplication internal constructor(
naisEndpoints = naisEndpoints.copy(preStopEndpoint = preStopHookEndpoint)
}

fun build(configure: (EmbeddedServer<CIOApplicationEngine, CIOApplicationEngine.Configuration>, KafkaRapid) -> Unit = { _, _ -> }, cioConfiguration: CIOApplicationEngine.Configuration.() -> Unit = { } ): RapidsConnection {
fun build(
configure: (EmbeddedServer<CIOApplicationEngine, CIOApplicationEngine.Configuration>, KafkaRapid) -> Unit = { _, _ -> },
cioConfiguration: CIOApplicationEngine.Configuration.() -> Unit = { }
): RapidsConnection {
val app = ktor ?: defaultKtorApp(cioConfiguration)
configure(app, rapid)
with(meterRegistry) {
Expand Down

0 comments on commit 92481d6

Please sign in to comment.