Skip to content

Commit

Permalink
remove usage of prometheus client, replace with micrometer
Browse files Browse the repository at this point in the history
  • Loading branch information
tuantrannav committed Jan 5, 2024
1 parent c1c3c6d commit 9af4926
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class KafkaRapid(
// metric definitions
private val consumerMetric = KafkaClientMetrics(consumer)
private val producerMetric = KafkaClientMetrics(producer)
private val rapidMetric = KafkaRapidMetrics(this)
private val rapidMetric = RapidMetrics(this)
private val riverMetrics = RiverMetrics()

private val topics = listOf(rapidTopic) + extraTopics

Expand Down Expand Up @@ -251,7 +252,7 @@ class KafkaRapid(
}
}

fun getMetrics() = listOf(consumerMetric, producerMetric, rapidMetric)
fun getMetrics() = listOf(consumerMetric, producerMetric, rapidMetric, riverMetrics)

fun getConsumerMetric() = consumerMetric
fun getProducerMetric() = producerMetric
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package no.nav.helse.rapids_rivers

import com.fasterxml.jackson.databind.JsonNode
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import org.slf4j.LoggerFactory
import java.time.Duration
import java.time.LocalDateTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import io.micrometer.core.instrument.Gauge
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.binder.MeterBinder

class KafkaRapidMetrics(private val rapid: KafkaRapid): MeterBinder {
class RapidMetrics(private val rapid: KafkaRapid): MeterBinder {

override fun bindTo(registry: MeterRegistry) {
Gauge.builder("rapids_rivers_consumer_active", this) {consumerActive()}.register(registry)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package no.nav.helse.rapids_rivers

import io.micrometer.core.instrument.Clock
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Timer
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import io.prometheus.client.CollectorRegistry
import no.nav.helse.rapids_rivers.River.PacketListener.Companion.Name
import java.util.*

Expand All @@ -10,7 +18,21 @@ fun interface RandomIdGenerator {
fun generateId(): String
}

class River(rapidsConnection: RapidsConnection, private val randomIdGenerator: RandomIdGenerator = RandomIdGenerator.Default) : RapidsConnection.MessageListener {
class DefaultMeterRegistry {
companion object {
val collectorRegistry = CollectorRegistry.defaultRegistry
val Default = PrometheusMeterRegistry(
PrometheusConfig.DEFAULT,
collectorRegistry,
Clock.SYSTEM
)
}


}

class River(rapidsConnection: RapidsConnection, private val riverMetrics: RiverMetrics = RiverMetrics(),
private val randomIdGenerator: RandomIdGenerator = RandomIdGenerator.Default) : RapidsConnection.MessageListener{
private val validations = mutableListOf<PacketValidation>()

private val listeners = mutableListOf<PacketListener>()
Expand Down Expand Up @@ -57,27 +79,23 @@ class River(rapidsConnection: RapidsConnection, private val randomIdGenerator: R
packet.interestedIn("@event_name")
listeners.forEach {
val eventName = packet["@event_name"].textValue() ?: "ukjent"
Metrics.onPacketHistorgram.labels(
context.rapidName(),
it.name(),
eventName
).time {
riverMetrics.timer(context.rapidName(), it.name(), eventName) {
it.onPacket(packet, context)
}
Metrics.onMessageCounter.labels(context.rapidName(), it.name(), "ok").inc()
riverMetrics.messageCounter(context.rapidName(), it.name(), "success")
}
}

private fun onSevere(error: MessageProblems.MessageException, context: MessageContext) {
listeners.forEach {
Metrics.onMessageCounter.labels(context.rapidName(), it.name(), "severe").inc()
riverMetrics.messageCounter(context.rapidName(), it.name(), "severe")
it.onSevere(error, context)
}
}

private fun onError(problems: MessageProblems, context: MessageContext) {
listeners.forEach {
Metrics.onMessageCounter.labels(context.rapidName(), it.name(), "error").inc()
riverMetrics.messageCounter(context.rapidName(), it.name(), "error")
it.onError(problems, context)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package no.nav.helse.rapids_rivers

import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Gauge
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Timer
import io.micrometer.core.instrument.binder.MeterBinder
import org.slf4j.LoggerFactory

class RiverMetrics: MeterBinder {

var meterRegistry: MeterRegistry = DefaultMeterRegistry.Default

companion object {
private val LOG = LoggerFactory.getLogger(RiverMetrics::class.java)
}
override fun bindTo(registry: MeterRegistry) {
LOG.debug("Binding to meter registry {}", registry)
this.meterRegistry = registry
}

fun messageCounter(rapidName:String, riverName: String, status: String) {
LOG.debug("Incrementing message counter for rapid {} and river {} with status {}", rapidName, riverName, status)
Counter.builder("message_counter").tags("rapid",rapidName, "river", riverName, "status", "severe")
.register(meterRegistry)
.increment()
}

fun timer(rapidName: String, riverName: String, eventName: String, runnable: Runnable) {
LOG.debug("Recording timer for rapid {} and river {} and event {}", rapidName, riverName, eventName)
Timer.builder("on_packet_seconds")
.tags("rapid", rapidName, "river", riverName, "event_name", eventName)
.register(meterRegistry)
.record(runnable)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.util.UUID

class PingPongTest {
val rapid = TestRapid()
val river = PingPong(rapid, "pingerino", "pongaroonie")
val river = PingPong(rapidsConnection = rapid, appName = "pingerino", instanceId = "pongaroonie")

@BeforeEach
fun reset() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import org.slf4j.Logger

class KtorBuilder {
private val builder = ApplicationEngineEnvironmentBuilder()
private var collectorRegistry = CollectorRegistry.defaultRegistry
private val extraMeterBinders = mutableListOf<MeterBinder>()

fun port(port: Int) = apply {
Expand All @@ -54,11 +53,7 @@ class KtorBuilder {
fun <TEngine : ApplicationEngine, TConfiguration : ApplicationEngine.Configuration> build(factory: ApplicationEngineFactory<TEngine, TConfiguration>): ApplicationEngine = embeddedServer(factory, applicationEngineEnvironment {
module {
install(MicrometerMetrics) {
registry = PrometheusMeterRegistry(
PrometheusConfig.DEFAULT,
collectorRegistry,
Clock.SYSTEM
)
registry = DefaultMeterRegistry.Default
meterBinders = listOf(
ClassLoaderMetrics(),
JvmMemoryMetrics(),
Expand All @@ -74,7 +69,7 @@ class KtorBuilder {
val names = call.request.queryParameters.getAll("name[]")?.toSet() ?: emptySet()

call.respondTextWriter(ContentType.parse(TextFormat.CONTENT_TYPE_004)) {
TextFormat.write004(this, collectorRegistry.filteredMetricFamilySamples(names))
TextFormat.write004(this, DefaultMeterRegistry.collectorRegistry.filteredMetricFamilySamples(names))
}
}
}
Expand Down Expand Up @@ -129,10 +124,6 @@ class KtorBuilder {
}
}

fun withCollectorRegistry(registry: CollectorRegistry) = apply {
this.collectorRegistry = registry
}

fun metrics(metrics: List<MeterBinder>) = apply {
extraMeterBinders.addAll(metrics)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package no.nav.helse.rapids_rivers
import io.ktor.server.application.Application
import io.ktor.server.cio.CIO
import io.ktor.server.engine.*
import io.micrometer.core.instrument.MeterRegistry
import io.prometheus.client.CollectorRegistry
import kotlinx.coroutines.delay
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -147,10 +148,6 @@ class RapidApplication internal constructor(
.readiness(rapid::isReady)
.metrics(rapid.getMetrics())

fun withCollectorRegistry(registry: CollectorRegistry = CollectorRegistry.defaultRegistry) = apply {
ktor.withCollectorRegistry(registry)
}

fun withKtorModule(module: Application.() -> Unit) = apply {
ktor.module(module)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ internal class RapidApplicationComponentTest {
@DelicateCoroutinesApi
@Test
fun `metric values`() {
withRapid(collectorRegistry = CollectorRegistry.defaultRegistry) { rapid ->
withRapid { rapid ->
waitForEvent("application_ready")
rapid.publish("""{"@event_name":"ping","@id":"${UUID.randomUUID()}","ping_time":"${LocalDateTime.now()}"}""")
waitForEvent("ping")
Expand Down Expand Up @@ -230,12 +230,10 @@ internal class RapidApplicationComponentTest {
@DelicateCoroutinesApi
private fun withRapid(
builder: RapidApplication.Builder? = null,
collectorRegistry: CollectorRegistry = CollectorRegistry(),
block: (RapidsConnection) -> Unit
) {
val rapidsConnection =
(builder ?: RapidApplication.Builder(RapidApplication.RapidApplicationConfig.fromEnv(createConfig())))
.withCollectorRegistry(collectorRegistry)
.build()
val job = GlobalScope.launch { rapidsConnection.start() }
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@
package no.nav.hm.rapids_rivers.micronaut

import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics
import io.micronaut.context.annotation.Bean
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires
import io.micronaut.context.env.Environment
import jakarta.inject.Named
import jakarta.inject.Singleton
import no.nav.helse.rapids_rivers.KafkaConfig
import no.nav.helse.rapids_rivers.KafkaRapid
import no.nav.helse.rapids_rivers.KafkaRapidMetrics
import no.nav.helse.rapids_rivers.RapidMetrics
import no.nav.helse.rapids_rivers.RiverMetrics
import org.slf4j.LoggerFactory

@Factory
Expand All @@ -36,7 +34,7 @@ class RapidsRiversFactory {
}

@Singleton
fun rapidMetrics(kafkaRapid: KafkaRapid): KafkaRapidMetrics = kafkaRapid.getRapidMetric()
fun rapidMetrics(kafkaRapid: KafkaRapid): RapidMetrics = kafkaRapid.getRapidMetric()

@Singleton
@Named("ConsumerMetric")
Expand All @@ -46,5 +44,8 @@ class RapidsRiversFactory {
@Named("ProducerMetric")
fun producerMetric(kafkaRapid: KafkaRapid): KafkaClientMetrics = kafkaRapid.getProducerMetric()

@Singleton
fun riverMetrics(): RiverMetrics = RiverMetrics()


}
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package no.nav.hm.rapids_rivers.micronaut


import io.micrometer.core.instrument.MeterRegistry
import io.micronaut.context.annotation.Prototype
import no.nav.helse.rapids_rivers.RapidsConnection
import no.nav.helse.rapids_rivers.River
import no.nav.helse.rapids_rivers.RiverMetrics
import org.slf4j.LoggerFactory

@Prototype
class RiverHead(rapidsConnection: RapidsConnection) {
class RiverHead(rapidsConnection: RapidsConnection, riverMetrics: RiverMetrics) {

private val river = River(rapidsConnection)
private val river = River(rapidsConnection, riverMetrics)

companion object {
private val LOG = LoggerFactory.getLogger(River::class.java)
Expand Down

0 comments on commit 9af4926

Please sign in to comment.