Skip to content

Commit

Permalink
feat: share Kafka producers among threads (#1460)
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Voiturier <[email protected]>
  • Loading branch information
bvoiturier authored Nov 21, 2024
1 parent 8781d3d commit 5ece0dd
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 109 deletions.
16 changes: 8 additions & 8 deletions cloud-agent/service/server/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -250,34 +250,34 @@ agent {
}
messagingService {
connectFlow {
consumerCount = 5
consumerCount = 2
retryStrategy {
maxRetries = 4
maxRetries = 2
initialDelay = 5.seconds
maxDelay = 40.seconds
}
}
issueFlow {
consumerCount = 5
consumerCount = 2
retryStrategy {
maxRetries = 4
maxRetries = 2
initialDelay = 5.seconds
maxDelay = 40.seconds
}
}
presentFlow {
consumerCount = 5
consumerCount = 2
retryStrategy {
maxRetries = 4
maxRetries = 2
initialDelay = 5.seconds
maxDelay = 40.seconds
}
}
didStateSync {
consumerCount = 5
consumerCount = 1
}
statusListSync {
consumerCount = 5
consumerCount = 1
}
inMemoryQueueCapacity = 1000
kafkaEnabled = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object OIDCCredentialIssuerServiceSpec
LinkSecretServiceImpl.layer,
CredentialServiceImpl.layer,
(MessagingServiceConfig.inMemoryLayer >>> MessagingService.serviceLayer >>>
MessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie,
(zio.Scope.default >>> MessagingService.producerLayer[UUID, WalletIdAndRecordId])).orDie,
OIDCCredentialIssuerServiceImpl.layer
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ object ConnectionServiceImplSpec extends ZIOSpecDefault {
messaging.MessagingService.serviceLayer,
messaging.MessagingService.producerLayer[UUID, WalletIdAndRecordId],
ZLayer.succeed(WalletAccessContext(WalletId.random)),
zio.Scope.default
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ object ConnectionServiceNotifierSpec extends ZIOSpecDefault {
ZLayer.succeed(WalletAccessContext(WalletId.random)),
messaging.MessagingServiceConfig.inMemoryLayer,
messaging.MessagingService.serviceLayer,
messaging.MessagingService.producerLayer[UUID, WalletIdAndRecordId]
messaging.MessagingService.producerLayer[UUID, WalletIdAndRecordId],
zio.Scope.default
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ object MessagingServiceTest extends ZIOAppDefault {
.fork
_ <- ZIO.never
} yield ()
effect.provide(
effect.provideSome(
messaging.MessagingServiceConfig.inMemoryLayer,
messaging.MessagingService.serviceLayer,
ZLayer.succeed("Sample 'R' passed to handler")
ZLayer.succeed("Sample 'R' passed to handler"),
zio.Scope.default
)
}

Expand Down
32 changes: 13 additions & 19 deletions infrastructure/shared/docker-compose-with-kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -203,36 +203,30 @@ services:
echo -e 'Creating kafka topics'
# Connect
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect --replication-factor 1 --partitions 20
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-1 --replication-factor 1 --partitions 20
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-2 --replication-factor 1 --partitions 20
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-3 --replication-factor 1 --partitions 20
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-4 --replication-factor 1 --partitions 20
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-1 --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-2 --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-DLQ --replication-factor 1 --partitions 1
# Issue
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-1 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-2 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-3 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-4 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-1 --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-2 --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-DLQ --replication-factor 1 --partitions 1
# Present
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-1 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-2 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-3 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-4 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-1 --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-2 --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-DLQ --replication-factor 1 --partitions 1
# DID Publication State Sync
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-did-state --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-did-state-DLQ --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-did-state --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-did-state-DLQ --replication-factor 1 --partitions 4
# Status List Sync
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-status-list --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-status-list-DLQ --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-status-list --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-status-list-DLQ --replication-factor 1 --partitions 4
tail -f /dev/null
"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ trait CredentialServiceSpecHelper {
GenericSecretStorageInMemory.layer,
LinkSecretServiceImpl.layer,
(MessagingServiceConfig.inMemoryLayer >>> MessagingService.serviceLayer >>>
MessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie,
(zio.Scope.default >>> MessagingService.producerLayer[UUID, WalletIdAndRecordId])).orDie,
CredentialServiceImpl.layer
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ trait PresentationServiceSpecHelper {
PresentationRepositoryInMemory.layer,
CredentialRepositoryInMemory.layer,
(MessagingServiceConfig.inMemoryLayer >>> MessagingService.serviceLayer >>>
MessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie,
(zio.Scope.default >>> MessagingService.producerLayer[UUID, WalletIdAndRecordId])).orDie,
) ++ defaultWalletLayer

def createIssuer(did: String): Issuer = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package org.hyperledger.identus.shared.messaging

import org.hyperledger.identus.shared.messaging.kafka.{InMemoryMessagingService, ZKafkaMessagingServiceImpl}
import zio.{durationInt, Cause, Duration, EnvironmentTag, RIO, RLayer, Task, URIO, URLayer, ZIO, ZLayer}
import zio.{durationInt, Cause, Duration, EnvironmentTag, RIO, RLayer, Scope, Task, URIO, URLayer, ZIO, ZLayer}

import java.time.Instant
trait MessagingService {
def makeConsumer[K, V](groupId: String)(implicit kSerde: Serde[K], vSerde: Serde[V]): Task[Consumer[K, V]]
def makeProducer[K, V]()(implicit kSerde: Serde[K], vSerde: Serde[V]): Task[Producer[K, V]]
def makeConsumer[K, V](groupId: String)(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Consumer[K, V]]
def makeProducer[K, V]()(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Producer[K, V]]
}

object MessagingService {
Expand All @@ -22,7 +22,7 @@ object MessagingService {
groupId: String,
handler: Message[K, V] => RIO[HR, Unit],
steps: Seq[RetryStep]
)(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[HR & Producer[K, V] & MessagingService, Unit] = {
)(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[HR & Producer[K, V] & Scope & MessagingService, Unit] = {
for {
messagingService <- ZIO.service[MessagingService]
messageProducer <- ZIO.service[Producer[K, V]]
Expand Down Expand Up @@ -67,7 +67,7 @@ object MessagingService {
topicName: String,
consumerCount: Int,
handler: Message[K, V] => RIO[HR, Unit]
)(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[HR & Producer[K, V] & MessagingService, Unit] =
)(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[HR & Producer[K, V] & Scope & MessagingService, Unit] =
consumeWithRetryStrategy(groupId, handler, Seq(RetryStep(topicName, consumerCount, 0.seconds, None)))

val serviceLayer: URLayer[MessagingServiceConfig, MessagingService] =
Expand All @@ -81,15 +81,16 @@ object MessagingService {
def producerLayer[K: EnvironmentTag, V: EnvironmentTag](implicit
kSerde: Serde[K],
vSerde: Serde[V]
): RLayer[MessagingService, Producer[K, V]] = ZLayer.fromZIO(for {
): RLayer[Scope & MessagingService, Producer[K, V]] = ZLayer.fromZIO(for {
messagingService <- ZIO.service[MessagingService]
_ <- ZIO.logInfo("Producer layer invoked!!")
producer <- messagingService.makeProducer[K, V]()
} yield producer)

def consumerLayer[K: EnvironmentTag, V: EnvironmentTag](groupId: String)(implicit
kSerde: Serde[K],
vSerde: Serde[V]
): RLayer[MessagingService, Consumer[K, V]] = ZLayer.fromZIO(for {
): RLayer[Scope & MessagingService, Consumer[K, V]] = ZLayer.fromZIO(for {
messagingService <- ZIO.service[MessagingService]
consumer <- messagingService.makeConsumer[K, V](groupId)
} yield consumer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ class InMemoryMessagingService(
]
) extends MessagingService {

override def makeConsumer[K, V](groupId: String)(using kSerde: Serde[K], vSerde: Serde[V]): Task[Consumer[K, V]] = {
override def makeConsumer[K, V](
groupId: String
)(using kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Consumer[K, V]] = {
ZIO.succeed(new InMemoryConsumer[K, V](groupId, topicQueues, processedMessagesMap))
}

override def makeProducer[K, V]()(using kSerde: Serde[K], vSerde: Serde[V]): Task[Producer[K, V]] =
override def makeProducer[K, V]()(using kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Producer[K, V]] =
ZIO.succeed(new InMemoryProducer[K, V](topicQueues, queueCapacity))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package org.hyperledger.identus.shared.messaging.kafka
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.header.Headers
import org.hyperledger.identus.shared.messaging.*
import zio.{Duration, RIO, Task, URIO, URLayer, ZIO, ZLayer}
import zio.{Duration, RIO, Scope, Task, URIO, URLayer, ZIO, ZLayer}
import zio.kafka.consumer.{
Consumer as ZKConsumer,
ConsumerSettings as ZKConsumerSettings,
Expand All @@ -21,23 +21,34 @@ class ZKafkaMessagingServiceImpl(
pollTimeout: Duration,
rebalanceSafeCommits: Boolean
) extends MessagingService {
override def makeConsumer[K, V](groupId: String)(implicit kSerde: Serde[K], vSerde: Serde[V]): Task[Consumer[K, V]] =
ZIO.succeed(
new ZKafkaConsumerImpl[K, V](
bootstrapServers,
groupId,
kSerde,
vSerde,
autoCreateTopics,
maxPollRecords,
maxPollInterval,
pollTimeout,
rebalanceSafeCommits
override def makeConsumer[K, V](
groupId: String
)(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Consumer[K, V]] =
for {
zkConsumer <- ZKConsumer.make(
ZKConsumerSettings(bootstrapServers)
.withProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, autoCreateTopics.toString)
.withGroupId(groupId)
// 'max.poll.records' default is 500. This is a Kafka property.
.withMaxPollRecords(maxPollRecords)
// 'max.poll.interval.ms' default is 5 minutes. This is a Kafka property.
.withMaxPollInterval(maxPollInterval) // Should be max.poll.records x 'max processing time per record'
// 'pollTimeout' default is 50 millis. This is a ZIO Kafka property.
.withPollTimeout(pollTimeout)
.withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
.withRebalanceSafeCommits(rebalanceSafeCommits)
// .withMaxRebalanceDuration(30.seconds)
)
} yield new ZKafkaConsumerImpl[K, V](
zkConsumer,
kSerde,
vSerde
)

override def makeProducer[K, V]()(implicit kSerde: Serde[K], vSerde: Serde[V]): Task[Producer[K, V]] =
ZIO.succeed(new ZKafkaProducerImpl[K, V](bootstrapServers, kSerde, vSerde))
override def makeProducer[K, V]()(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Producer[K, V]] =
for {
zkProducer <- ZKProducer.make(ZKProducerSettings(bootstrapServers))
} yield new ZKafkaProducerImpl[K, V](zkProducer, kSerde, vSerde)
}

object ZKafkaMessagingServiceImpl {
Expand All @@ -60,32 +71,10 @@ object ZKafkaMessagingServiceImpl {
}

class ZKafkaConsumerImpl[K, V](
bootstrapServers: List[String],
groupId: String,
zkConsumer: ZKConsumer,
kSerde: Serde[K],
vSerde: Serde[V],
autoCreateTopics: Boolean,
maxPollRecords: Int,
maxPollInterval: Duration,
pollTimeout: Duration,
rebalanceSafeCommits: Boolean
vSerde: Serde[V]
) extends Consumer[K, V] {
private val zkConsumer = ZLayer.scoped(
ZKConsumer.make(
ZKConsumerSettings(bootstrapServers)
.withProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, autoCreateTopics.toString)
.withGroupId(groupId)
// 'max.poll.records' default is 500. This is a Kafka property.
.withMaxPollRecords(maxPollRecords)
// 'max.poll.interval.ms' default is 5 minutes. This is a Kafka property.
.withMaxPollInterval(maxPollInterval) // Should be max.poll.records x 'max processing time per record'
// 'pollTimeout' default is 50 millis. This is a ZIO Kafka property.
.withPollTimeout(pollTimeout)
.withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
.withRebalanceSafeCommits(rebalanceSafeCommits)
// .withMaxRebalanceDuration(30.seconds)
)
)

private val zkKeyDeserializer = new ZKDeserializer[Any, K] {
override def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[Any, K] =
Expand All @@ -98,9 +87,8 @@ class ZKafkaConsumerImpl[K, V](
}

override def consume[HR](topic: String, topics: String*)(handler: Message[K, V] => URIO[HR, Unit]): RIO[HR, Unit] =
ZKConsumer
zkConsumer
.plainStream(ZKSubscription.topics(topic, topics*), zkKeyDeserializer, zkValueDeserializer)
.provideSomeLayer(zkConsumer)
.mapZIO(record =>
handler(Message(record.key, record.value, record.offset.offset, record.timestamp)).as(record.offset)
)
Expand All @@ -109,13 +97,7 @@ class ZKafkaConsumerImpl[K, V](
.runDrain
}

class ZKafkaProducerImpl[K, V](bootstrapServers: List[String], kSerde: Serde[K], vSerde: Serde[V])
extends Producer[K, V] {
private val zkProducer = ZLayer.scoped(
ZKProducer.make(
ZKProducerSettings(bootstrapServers)
)
)
class ZKafkaProducerImpl[K, V](zkProducer: ZKProducer, kSerde: Serde[K], vSerde: Serde[V]) extends Producer[K, V] {

private val zkKeySerializer = new ZKSerializer[Any, K] {
override def serialize(topic: String, headers: Headers, value: K): RIO[Any, Array[Byte]] =
Expand All @@ -128,10 +110,9 @@ class ZKafkaProducerImpl[K, V](bootstrapServers: List[String], kSerde: Serde[K],
}

override def produce(topic: String, key: K, value: V): Task[Unit] =
ZKProducer
zkProducer
.produce(topic, key, value, zkKeySerializer, zkValueSerializer)
.tap(metadata => ZIO.logInfo(s"Message produced: ${metadata.offset()}"))
.map(_ => ())
.provideSome(zkProducer)

}
Loading

0 comments on commit 5ece0dd

Please sign in to comment.