
Commit

Adding support for Redis lookup tables (#333)
* Adding support for Redis lookup tables

* overrides

* tick

* test cleanup

* cleanup

* Removing retention parameter for redis lookup

* Fixing error on scalafmt

* Fix failing test for "table retention"

* Simplify redis key name

Co-authored-by: Sean Glover <[email protected]>
Guillaume Roland and seglo authored Sep 24, 2022
1 parent 443bc06 commit b2431cb
Showing 17 changed files with 1,063 additions and 132 deletions.
31 changes: 29 additions & 2 deletions README.md
@@ -200,6 +200,18 @@ helm install kafka-lag-exporter/kafka-lag-exporter \
--set serviceAccount.create=true
```

Install with Redis persistence enabled

```
helm install kafka-lag-exporter/kafka-lag-exporter \
--name kafka-lag-exporter \
--namespace myproject \
--set redis.enabled=true \
--set redis.host=myredisserver \
--set clusters\[0\].name=my-cluster \
--set clusters\[0\].bootstrapBrokers=my-cluster-kafka-bootstrap.myproject:9092
```

Run a debug install (`DEBUG` logging, debug helm chart install, force docker pull policy to `Always`).

```
@@ -270,7 +282,8 @@ General Configuration (`kafka-lag-exporter{}`)
| `reporters.influxdb.async` | `true` | Flag to enable influxdb async **non-blocking** write mode to send metrics
| `sinks` | `["PrometheusEndpointSink"]` | Specify which reporters must be used to send metrics. Possible values are: `PrometheusEndpointSink`, `InfluxDBPusherSink`, `GraphiteEndpointSink`. (if not set, only Prometheus is activated)
| `poll-interval` | `30 seconds` | How often to poll Kafka for latest and group offsets |
| `lookup-table-size` | `60` | The maximum window size of the look up table **per partition** |
| `lookup-table.memory.size` | `60` | The maximum window size of the in-memory lookup table **per partition** |
| `lookup-table.redis` | `{}` | Configuration for Redis persistence. This section is optional; if defined, it overrides the in-memory lookup table |
| `client-group-id` | `kafkalagexporter` | Consumer group id of kafka-lag-exporter's client connections |
| `kafka-client-timeout` | `10 seconds` | Connection timeout when making API calls to Kafka |
| `clusters` | `[]` | A statically defined list of Kafka connection details. This list is optional if you choose to use the Strimzi auto-discovery feature |
@@ -291,6 +304,20 @@ Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`)
| `admin-client-properties` | `{}` | No | A map of key value pairs used to configure the `AdminClient`. See the [Admin Config](https://kafka.apache.org/documentation/#adminclientconfigs) section of the Kafka documentation for options. |
| `labels` | `{}` | No | A map of key value pairs will be set as additional custom labels per cluster for all the metrics in prometheus. |

Redis Details (`kafka-lag-exporter.lookup-table.redis{}`)

| Key | Default | Required | Description |
|--------------|------------------------|----------|---------------------------------------------------------------------------------------------------------------------|
| `database` | `0` | No | Redis database number. |
| `host` | `"localhost"` | No | Redis server to use. |
| `port` | `6379` | No | Redis port to use. |
| `timeout` | `60` | No | Redis connection timeout. |
| `prefix` | `"kafka-lag-exporter"` | No | Prefix used by all the keys. |
| `separator` | `":"` | No | Separator used to build the keys. |
| `retention` | `"1 day"` | No | Retention period of the lookup table. Points older than this are removed from the table. |
| `expiration` | `"1 day"` | No | Expiration (TTL) of all the keys. |

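For reference, an `application.conf` fragment that switches to the Redis-backed lookup table might look like the sketch below; the host is a placeholder, and any keys left unset are assumed to take the defaults documented above.

```
kafka-lag-exporter {
  lookup-table {
    redis {
      host = "my-redis.example.com"   # placeholder host
      port = 6379
      prefix = "kafka-lag-exporter"
      separator = ":"
      retention = 1 day
      expiration = 1 day
    }
  }
}
```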

Watchers (`kafka-lag-exporter.watchers{}`)

| Key | Default | Description |
@@ -308,7 +335,7 @@ kafka-lag-exporter {
port = 9999
}
}
lookup-table-size = 120
lookup-table.memory.size = 120
clusters = [
{
name = "a-cluster"
4 changes: 3 additions & 1 deletion build.sbt
@@ -44,6 +44,7 @@ lazy val kafkaLagExporter =
PrometheusHttpServer,
ScalaJava8Compat,
AkkaHttp,
ScalaRedis,
Logback,
IAMAuthLib,
ScalaTest,
@@ -52,7 +53,8 @@ lazy val kafkaLagExporter =
AkkaStreamsTestKit,
AlpakkaKafkaTestKit,
TestcontainersKafka,
TestcontainersInfluxDb
TestcontainersInfluxDb,
TestcontainersRedis
),
dockerApiVersion := Some(DockerApiVersion(1, 41)),
dockerRepository := Option(System.getenv("DOCKER_REPOSITORY"))
19 changes: 18 additions & 1 deletion charts/kafka-lag-exporter/templates/030-ConfigMap.yaml
@@ -9,7 +9,6 @@ data:
kafka-lag-exporter {
port = {{ .Values.service.port }}
poll-interval = {{ .Values.pollIntervalSeconds }} seconds
lookup-table-size = {{ .Values.lookupTableSize }}
client-group-id = "{{ .Values.clientGroupId }}"
kafka-client-timeout = {{ .Values.kafkaClientTimeoutSeconds }} seconds
clusters = [
@@ -73,6 +72,24 @@ data:
}
{{- end }}
]
{{- if .Values.lookup.redis.enabled }}
{{- with .Values.lookup.redis }}
lookup-table.redis = {
host = {{ .host | quote }}
database = {{ .database | quote }}
port = {{ .port | quote }}
timeout = {{ .timeout }}
prefix = {{ .prefix | quote }}
separator = {{ .separator | quote }}
retention = {{ .retention | quote }}
expiration = {{ .expiration | quote }}
}
{{- end }}
{{- else }}
lookup-table.memory = {
size = {{ .Values.lookupTableSize | default .Values.lookup.memory.size }}
}
{{- end }}
{{- $sinks := list -}}
{{- if .Values.reporters.prometheus.enabled }}
18 changes: 16 additions & 2 deletions charts/kafka-lag-exporter/values.yaml
@@ -30,10 +30,24 @@ clusters: {}
# location: ny
# zone: "us-east"

# Lookup Table
lookup:
memory:
## Size of the sliding window of offsets to keep in each partition's lookup table
size: 60
redis:
enabled: false
# host: localhost
# database: 0
# port: 6379
# timeout: 60
# prefix: kafka-lag-exporter
# separator: ":"
# retention: 1 day
# expiration: 7 days

## The interval between refreshing metrics
pollIntervalSeconds: 30
## Size of the sliding window of offsets to keep in each partition's lookup table
lookupTableSize: 60
## The Consumer Group `group.id` to use when connecting to Kafka
clientGroupId: "kafkalagexporter"
## The timeout when communicating with Kafka clusters
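
Putting the new values together, a user-supplied override file that enables the Redis lookup table might look like the sketch below (the host is a placeholder); when `lookup.redis.enabled` is true, the `030-ConfigMap.yaml` template above renders these values into the `lookup-table.redis` block.

```
# values-redis.yaml -- illustrative override for `helm install ... -f values-redis.yaml`
lookup:
  redis:
    enabled: true
    host: myredisserver   # placeholder host
    database: 0
    port: 6379
    timeout: 60
    prefix: kafka-lag-exporter
    separator: ":"
    retention: 1 day
    expiration: 7 days
```
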
4 changes: 4 additions & 0 deletions docker/docker-compose.yml
@@ -4,6 +4,10 @@
# https://hub.docker.com/r/wurstmeister/zookeeper/
version: '2'
services:
redis:
image: redis:6.2.6
ports:
- "6379:6379"
zookeeper:
image: wurstmeister/zookeeper:3.4.6
ports:
4 changes: 4 additions & 0 deletions project/Dependencies.scala
@@ -8,6 +8,7 @@ object Version {
val Kafka = "3.2.2"
val Testcontainers = "1.17.3"
val IAMAuth = "1.1.4"
val Redis = "3.42"
}

object Dependencies {
@@ -46,6 +47,7 @@ object Dependencies {
"org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2"
val AkkaHttp = "com.typesafe.akka" %% "akka-http" % "10.2.10"
val IAMAuthLib = "software.amazon.msk" % "aws-msk-iam-auth" % Version.IAMAuth
val ScalaRedis = "net.debasishg" %% "redisclient" % Version.Redis

/* Test */
val AkkaTypedTestKit =
@@ -60,4 +62,6 @@
"org.testcontainers" % "kafka" % Version.Testcontainers % Test
val TestcontainersInfluxDb =
"org.testcontainers" % "influxdb" % Version.Testcontainers % Test
val TestcontainersRedis =
"org.testcontainers" % "spock" % Version.Testcontainers % Test
}
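
The `redisclient` library added above exposes a simple blocking Scala API. A rough sketch of the kind of calls a Redis-backed lookup table builds on is shown below; the key layout and stored value are illustrative, not the exporter's actual scheme.

```
import com.redis.RedisClient

object RedisLookupSketch extends App {
  // Connection settings mirror the new `lookup-table.redis` keys; values are illustrative.
  val client = new RedisClient("localhost", 6379)

  // A key built from a prefix and separator, e.g. "kafka-lag-exporter:my-cluster:my-topic:0".
  val key = List("kafka-lag-exporter", "my-cluster", "my-topic", "0").mkString(":")

  client.set(key, "offset=42,timestamp=1663969025000") // store a lookup point
  client.expire(key, 24 * 60 * 60)                     // apply a TTL, like the `expiration` setting
  println(client.get(key))                             // Option[String]
}
```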
26 changes: 20 additions & 6 deletions src/main/resources/reference.conf
@@ -3,18 +3,32 @@ kafka-lag-exporter {
reporters.prometheus.port = ${?KAFKA_LAG_EXPORTER_PORT}
poll-interval = 30 seconds
poll-interval = ${?KAFKA_LAG_EXPORTER_POLL_INTERVAL_SECONDS}
lookup-table-size = 60
lookup-table-size = ${?KAFKA_LAG_EXPORTER_LOOKUP_TABLE_SIZE}
# Deprecated
lookup-table-size = ${kafka-lag-exporter.lookup-table.memory.size}
lookup-table = {
memory = {
size = 60
size = ${?KAFKA_LAG_EXPORTER_LOOKUP_TABLE_SIZE}
}
# redis = {
# database = 0
# host = "localhost"
# port = 6379
# timeout = 60 seconds
# prefix = "kafkalagexporter"
# separator = ":"
# retention = 1 day
# expiration = 1 day
# }
}
client-group-id = "kafkalagexporter"
client-group-id = ${?KAFKA_LAG_EXPORTER_CLIENT_GROUP_ID}
kafka-client-timeout = 10 seconds
kafka-client-timeout = ${?KAFKA_LAG_EXPORTER_KAFKA_CLIENT_TIMEOUT_SECONDS}
clusters = []
clusters = ${?KAFKA_LAG_EXPORTER_CLUSTERS}
watchers = {
strimzi = "false"
strimzi = ${?KAFKA_LAG_EXPORTER_STRIMZI}
}
watchers.strimzi = "false"
watchers.strimzi = ${?KAFKA_LAG_EXPORTER_STRIMZI}
metric-whitelist = [".*"]
}

9 changes: 5 additions & 4 deletions src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
@@ -20,7 +20,6 @@ object AppConfig {
def apply(config: Config): AppConfig = {
val c = config.getConfig("kafka-lag-exporter")
val pollInterval = c.getDuration("poll-interval").toScala
val lookupTableSize = c.getInt("lookup-table-size")

val metricWhitelist = c.getStringList("metric-whitelist").asScala.toList

@@ -42,6 +41,7 @@
}
}

val lookupTable = LookupTableConfig(c)
val clientGroupId = c.getString("client-group-id")
val kafkaClientTimeout = c.getDuration("kafka-client-timeout").toScala
val clusters =
@@ -106,7 +106,7 @@

AppConfig(
pollInterval,
lookupTableSize,
lookupTable,
sinkConfigs,
clientGroupId,
kafkaClientTimeout,
@@ -195,7 +195,7 @@ final case class KafkaCluster(

final case class AppConfig(
pollInterval: FiniteDuration,
lookupTableSize: Int,
lookupTable: LookupTableConfig,
sinkConfigs: List[SinkConfig],
clientGroupId: String,
clientTimeout: FiniteDuration,
@@ -208,9 +208,10 @@ final case class AppConfig(
" (none)"
else clusters.map(_.toString).mkString("\n")
val sinksString = sinkConfigs.mkString("")
val lookupTableString = lookupTable.toString()
s"""
|Poll interval: $pollInterval
|Lookup table size: $lookupTableSize
|$lookupTableString
|Metrics whitelist: [${sinkConfigs.head.metricWhitelist.mkString(", ")}]
|Admin client consumer group id: $clientGroupId
|Kafka client timeout: $clientTimeout
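
The `LookupTableConfig` class referenced above is not included in this excerpt. A hypothetical sketch of how it might select between the in-memory and Redis back ends, based only on the `lookup-table` block defined in `reference.conf`:

```
import com.typesafe.config.Config

// Hypothetical sketch only -- the real LookupTableConfig is not shown in this diff.
sealed trait LookupTableConfig

object LookupTableConfig {
  final case class Memory(size: Int) extends LookupTableConfig {
    override def toString: String = s"Lookup table: in memory, size $size"
  }

  final case class Redis(host: String, port: Int, database: Int, prefix: String)
      extends LookupTableConfig {
    override def toString: String =
      s"Lookup table: Redis at $host:$port/$database (prefix '$prefix')"
  }

  def apply(c: Config): LookupTableConfig = {
    val lt = c.getConfig("lookup-table")
    // Presence of the optional redis block overrides the in-memory table.
    if (lt.hasPath("redis"))
      Redis(
        lt.getString("redis.host"),
        lt.getInt("redis.port"),
        lt.getInt("redis.database"),
        lt.getString("redis.prefix")
      )
    else Memory(lt.getInt("memory.size"))
  }
}
```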