This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Add InfluxDb2 Pusher Sink #455

Open
wants to merge 4 commits into master

49 changes: 28 additions & 21 deletions README.md
@@ -268,27 +268,34 @@ See section below for more information.

General Configuration (`kafka-lag-exporter{}`)

| Key | Default | Description |
|-------------------------------|----------------------|---------------------------------------------------------------------------------------------------------------------------------------|
| `reporters.prometheus.port` | `8000` | The port to run the Prometheus endpoint on |
| `reporters.graphite.host` | None | The graphite host to send metrics to (if not set, will not output to graphite) |
| `reporters.graphite.port` | None | The graphite port to send metrics to (if not set, will not output to graphite) |
| `reporters.graphite.prefix` | None | The graphite metric prefix (if not set, prefix will be empty) |
| `reporters.influxdb.endpoint` | None | The influxdb host to send metrics to (if not set, will not output to influxdb) |
| `reporters.influxdb.port` | None | The influxdb port to send metrics to (if not set, will not output to influxdb) |
| `reporters.influxdb.database` | `kafka_lag_exporter` | The influxdb database to send metrics to |
| `reporters.influxdb.username` | None | The influxdb username to connect (if not set, username will be empty) |
| `reporters.influxdb.password` | None | The influxdb password to connect (if not set, password will be empty) |
| `reporters.influxdb.async` | `true` | Flag to enable influxdb async **non-blocking** write mode to send metrics
| `sinks` | `["PrometheusEndpointSink"]` | Specify which reporters must be used to send metrics. Possible values are: `PrometheusEndpointSink`, `InfluxDBPusherSink`, `GraphiteEndpointSink`. (if not set, only Prometheus is activated)
| `poll-interval` | `30 seconds` | How often to poll Kafka for latest and group offsets |
| `lookup-table.memory.size` | `60` | The maximum window size of the in memory look up table **per partition** |
| `lookup-table.redis` | `{}` | Configuration for the Redis persistence. This category is optional and will override use of the in memory lookup table if defined |
| `client-group-id` | `kafkalagexporter` | Consumer group id of kafka-lag-exporter's client connections |
| `kafka-client-timeout` | `10 seconds` | Connection timeout when making API calls to Kafka |
| `clusters` | `[]` | A statically defined list of Kafka connection details. This list is optional if you choose to use the Strimzi auto-discovery feature |
| `watchers` | `{}` | Settings for Kafka cluster "watchers" used for auto-discovery. |
| `metric-whitelist` | `[".*"]` | Regex of metrics to be exposed via Prometheus endpoint. Eg. `[".*_max_lag.*", "kafka_partition_latest_offset"]` |
| Key | Default | Description |
|-----------------------------------------|------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `reporters.prometheus.port` | `8000` | The port to run the Prometheus endpoint on |
| `reporters.graphite.host` | None | The graphite host to send metrics to (if not set, will not output to graphite) |
| `reporters.graphite.port` | None | The graphite port to send metrics to (if not set, will not output to graphite) |
| `reporters.graphite.prefix` | None | The graphite metric prefix (if not set, prefix will be empty) |
| `reporters.influxdb.endpoint` | None | The influxdb V1 host to send metrics to (if not set, will not output to influxdb) |
| `reporters.influxdb.port` | None | The influxdb V1 port to send metrics to (if not set, will not output to influxdb) |
| `reporters.influxdb.database` | `kafka_lag_exporter` | The influxdb V1 database to send metrics to |
| `reporters.influxdb.username` | None | The influxdb V1 username to connect (if not set, username will be empty) |
| `reporters.influxdb.password` | None | The influxdb V1 password to connect (if not set, password will be empty) |
| `reporters.influxdb.async` | `true` | Flag to enable influxdb async **non-blocking** write mode to send metrics |
| `reporters.influxdb2.endpoint` | None | The influxdb V2 host to send metrics to (if not set, will not output to influxdb) |
| `reporters.influxdb2.port` | None | The influxdb V2 port to send metrics to (if not set, will not output to influxdb) |
| `reporters.influxdb2.bucket` | `kafka_lag_exporter` | The influxdb V2 bucket name to send metrics to |
| `reporters.influxdb2.retention-seconds` | `604800`                     | The influxdb V2 bucket retention period in seconds (the default of 604800 seconds is 7 days)                                                                                                                           |
| `reporters.influxdb2.org-id`            | None                         | The influxdb V2 organization ID (used when creating the bucket)                                                                                                                                                        |
| `reporters.influxdb2.org-name`          | None                         | The influxdb V2 organization name (used when connecting the client)                                                                                                                                                    |
| `reporters.influxdb2.token`             | None                         | The influxdb V2 API token with access to the bucket                                                                                                                                                                    |
| `sinks` | `["PrometheusEndpointSink"]` | Specify which reporters must be used to send metrics. Possible values are: `PrometheusEndpointSink`, `InfluxDBPusherSink`, `InfluxDB2PusherSink`, `GraphiteEndpointSink`. (if not set, only Prometheus is activated) |
| `poll-interval` | `30 seconds` | How often to poll Kafka for latest and group offsets |
| `lookup-table.memory.size` | `60` | The maximum window size of the in memory look up table **per partition** |
| `lookup-table.redis` | `{}` | Configuration for the Redis persistence. This category is optional and will override use of the in memory lookup table if defined |
| `client-group-id` | `kafkalagexporter` | Consumer group id of kafka-lag-exporter's client connections |
| `kafka-client-timeout` | `10 seconds` | Connection timeout when making API calls to Kafka |
| `clusters` | `[]` | A statically defined list of Kafka connection details. This list is optional if you choose to use the Strimzi auto-discovery feature |
| `watchers` | `{}` | Settings for Kafka cluster "watchers" used for auto-discovery. |
| `metric-whitelist` | `[".*"]` | Regex of metrics to be exposed via Prometheus endpoint. Eg. `[".*_max_lag.*", "kafka_partition_latest_offset"]` |
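
A minimal `application.conf` sketch enabling the new sink, based on the keys above (the host, org, and token values below are placeholders, not defaults shipped with this change; the endpoint carries the scheme because the sink builds the URL as `endpoint:port`):

```
kafka-lag-exporter {
  sinks = ["InfluxDB2PusherSink"]
  reporters.influxdb2 {
    endpoint = "http://influxdb.example.com"  # placeholder host
    port = 8086
    bucket = "kafka_lag_exporter"
    retention-seconds = 604800                # 7 days
    org-id = "0f1e2d3c4b5a6978"               # placeholder org id
    org-name = "example-org"                  # placeholder org name
    token = "example-token"                   # placeholder API token
  }
}
```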

Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`)

15 changes: 7 additions & 8 deletions build.sbt
@@ -1,15 +1,13 @@
import Dependencies._
import ReleasePlugin.autoImport._
import ReleaseKeys._
import ReleaseTransformations._
import com.typesafe.sbt.packager.docker.{Cmd, DockerPermissionStrategy, ExecCmd}
import com.typesafe.sbt.packager.docker.DockerPlugin.autoImport._
import com.typesafe.sbt.packager.docker.DockerApiVersion
import com.typesafe.sbt.packager.docker.DockerPlugin.UnixSeparatorChar
import sbt.IO
import com.typesafe.sbt.packager.docker.DockerPlugin.autoImport._
import com.typesafe.sbt.packager.docker.{Cmd, DockerApiVersion, DockerPermissionStrategy, ExecCmd}
import sbtrelease.ReleasePlugin.autoImport.ReleaseKeys._
import sbtrelease.ReleasePlugin.autoImport.ReleaseTransformations._
import sbtrelease.ReleasePlugin.autoImport._

import java.time.format.DateTimeFormatter
import java.time.Instant
import java.time.format.DateTimeFormatter
import scala.sys.process._

lazy val kafkaLagExporter =
@@ -37,6 +35,7 @@ lazy val kafkaLagExporter =
AkkaStreams,
AkkaStreamsProtobuf,
AkkaInfluxDB,
InfluxDB2,
Fabric8Model,
Fabric8Client,
Prometheus,
1 change: 1 addition & 0 deletions project/Dependencies.scala
@@ -35,6 +35,7 @@ object Dependencies {
"com.typesafe.akka" %% "akka-protobuf" % Version.Akka
val AkkaInfluxDB =
"com.lightbend.akka" %% "akka-stream-alpakka-influxdb" % "3.0.4"
val InfluxDB2 = "com.influxdb" %% "influxdb-client-scala" % "6.7.0"
val Logback = "ch.qos.logback" % "logback-classic" % "1.4.4"
val Prometheus = "io.prometheus" % "simpleclient" % Version.Prometheus
val PrometheusHotSpot =
2 changes: 2 additions & 0 deletions src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
@@ -35,6 +35,8 @@ object AppConfig {
Some(new PrometheusEndpointSinkConfig(sink, metricWhitelist, c))
case "InfluxDBPusherSink" =>
Some(new InfluxDBPusherSinkConfig(sink, metricWhitelist, c))
case "InfluxDB2PusherSink" =>
Some(new InfluxDB2PusherSinkConfig(sink, metricWhitelist, c))
case "GraphiteEndpointSink" =>
Some(new GraphiteEndpointConfig(sink, metricWhitelist, c))
case _ => None
193 changes: 193 additions & 0 deletions src/main/scala/com/lightbend/kafkalagexporter/InfluxDB2PusherSink.scala
@@ -0,0 +1,193 @@
/*
* Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
* Copyright (C) 2023 Sean Glover <https://seanglover.com>
*/

package com.lightbend.kafkalagexporter

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Source}

import com.influxdb.client.InfluxDBClientOptions
import com.influxdb.client.domain.WritePrecision
import com.influxdb.client.scala.{InfluxDBClientScala, InfluxDBClientScalaFactory}
import com.influxdb.client.write.Point

import com.lightbend.kafkalagexporter.EndpointSink.ClusterGlobalLabels
import com.lightbend.kafkalagexporter.MetricsSink.{MetricValue, RemoveMetric}

import com.typesafe.scalalogging.Logger

import okhttp3.OkHttpClient

import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClientBuilder

import java.io.IOException
import java.time.Instant
import java.util.concurrent.TimeUnit
import java.util.function.{BiConsumer, Consumer}

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Try

object InfluxDB2PusherSink {

def apply(
sinkConfig: InfluxDB2PusherSinkConfig,
clusterGlobalLabels: ClusterGlobalLabels
): MetricsSink = {
Try(new InfluxDB2PusherSink(sinkConfig, clusterGlobalLabels))
.fold(
t => throw new IOException("Could not create Influx DB 2 Pusher Sink", t),
sink => sink
)
}
}

class InfluxDB2PusherSink private (
sinkConfig: InfluxDB2PusherSinkConfig,
clusterGlobalLabels: ClusterGlobalLabels
) extends EndpointSink(clusterGlobalLabels) {

implicit val system: ActorSystem = ActorSystem("InfluxDB2PusherSink")
val logger: Logger = Logger("InfluxDB2PusherSink")
private val influxDbUrl = sinkConfig.endpoint + ":" + sinkConfig.port
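// Make sure the target bucket exists in InfluxDB before the write client is created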
createBucket()
val influxDB: InfluxDBClientScala = connect()

override def report(m: MetricValue): Unit = {
if (sinkConfig.metricWhitelist.exists(m.definition.name.matches) && !m.value.isNaN && !m.value.isInfinite) {
write(m)
}
}

def write(m: MetricValue): Unit = {
try {
val point = buildPoint(m)
writeAsync(point)
} catch {
case t: Throwable =>
handlingFailure(t)
}
}

def writeAsync(point: Point): Unit = {
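// Wrap the point in a single-element stream, push it through the client's Scala write API, and block until the write completes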
val sourcePoint = Source.single(point)
val sinkPoint = influxDB.getWriteScalaApi.writePoint(bucket = Some(sinkConfig.bucket))
val materializedPoint = sourcePoint.toMat(sinkPoint)(Keep.right)
Await.result(materializedPoint.run(), Duration.Inf)
}

def buildPoint(m: MetricValue): Point = {
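// The metric name becomes the measurement; cluster-global labels and the metric's own labels become tags, and the value is stored in a single "value" field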
val point = Point
.measurement(m.definition.name)
for (
globalLabels <- clusterGlobalLabels.get(m.clusterName);
(tagName, tagValue) <- globalLabels
)
{
point.addTag(tagName, tagValue)
}

val fields = m.definition.labels zip m.labels

fields.foreach { field => point.addTag(field._1, field._2) }

point
.addField("value", m.value)
.time(Instant.now(), WritePrecision.NS)
}

override def remove(m: RemoveMetric): Unit = {
if (sinkConfig.metricWhitelist.exists(m.definition.name.matches))
logger.warn("Remove is not supported by InfluxDBPusherSink")
}

def connect(): InfluxDBClientScala = {
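// Build an InfluxDB 2.x client with token authentication, the configured org and bucket, and 30 second HTTP timeouts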
val builder = new OkHttpClient.Builder()
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.connectTimeout(30, TimeUnit.SECONDS)

val options = InfluxDBClientOptions.builder()
.url(influxDbUrl)
.okHttpClient(builder)
.authenticateToken(sinkConfig.token.toCharArray)
.org(sinkConfig.orgName)
.bucket(sinkConfig.bucket)
.build()

InfluxDBClientScalaFactory
.create(options)
}

private def createBucket(): Unit = {
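// Create the bucket through the InfluxDB 2.x HTTP API; a 422 response means the bucket already exists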
val httpClient = HttpClientBuilder.create().build()
val url = influxDbUrl + "/api/v2/buckets"

val bucketCreateJson =
s"""{
"orgID": "${sinkConfig.orgId}",
"name": "${sinkConfig.bucket}",
"description": "Kafka Lag exporter bucket",
"rp": "string",
"retentionRules": [
{
"type": "expire",
"everySeconds": ${sinkConfig.retentionSeconds},
"shardGroupDurationSeconds": 0
}
],
"schemaType": "implicit"
}"""

val httpPost = new HttpPost(url)
httpPost.setEntity(new StringEntity(bucketCreateJson))

httpPost.addHeader("Content-Type", "application/json")
httpPost.addHeader("Authorization", s"Token ${sinkConfig.token}")

val response = httpClient.execute(httpPost)

if (response.getStatusLine.getStatusCode == 201) {
logger.info(s"Bucket ${sinkConfig.bucket} created successfully")
} else if (response.getStatusLine.getStatusCode == 422) {
logger.info(s"Bucket ${sinkConfig.bucket} already exists")
} else {
logger.error(s"Bucket ${sinkConfig.bucket} couldn't be created: ${response.getStatusLine.getReasonPhrase}")
}

httpClient.close()
}


def failQueryHandler(): Consumer[Throwable] = {
(throwable: Throwable) => {
handlingFailure(throwable)
}
}

def createExceptionHandler(): BiConsumer[Iterable[Point], Throwable] = {
(_: Iterable[Point], throwable: Throwable) => {
handlingFailure(throwable)
}
}

def handlingFailure(t: Throwable): Unit = {
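// Treat any failure as unrecoverable: log it, stop the sink, and rethrow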
logger.error("Unrecoverable exception, will stop ", t)
stop()
throw t
}

override def stop(): Unit = {
influxDB.close()
}
}

