From 42d5662b84389f207a8eb822287964366f56fdb0 Mon Sep 17 00:00:00 2001 From: Utkarsha Kapoor Date: Mon, 8 Feb 2021 12:42:26 +0530 Subject: [PATCH] Issue #SB-21144: KafkaDispatcher changes to ingest message greater than 1MB --- .../analytics/framework/dispatcher/KafkaDispatcher.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala index 094a99db..114f11b9 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala @@ -45,6 +45,7 @@ object KafkaDispatcher extends IDispatcher { val topic = config.getOrElse("topic", null).asInstanceOf[String] val batchSize = config.getOrElse("batchSize", 100).asInstanceOf[Integer]; val lingerMs = config.getOrElse("lingerMs", 10).asInstanceOf[Integer]; + val maxRequestSize = config.getOrElse("max_request_size", 1000000).asInstanceOf[Integer] if (null == brokerList) { throw new DispatcherException("brokerList parameter is required to send output to kafka") } @@ -53,7 +54,7 @@ object KafkaDispatcher extends IDispatcher { } events.foreachPartition((partitions: Iterator[String]) => { - val kafkaSink = KafkaSink(_getKafkaProducerConfig(brokerList, batchSize, lingerMs)); + val kafkaSink = KafkaSink(_getKafkaProducerConfig(brokerList, batchSize, lingerMs, maxRequestSize)); partitions.foreach { message => try { kafkaSink.send(topic, message, new Callback { @@ -80,7 +81,7 @@ object KafkaDispatcher extends IDispatcher { } - private def _getKafkaProducerConfig(brokerList: String, batchSize: Integer, lingerMs: Integer): HashMap[String, Object] = { + private def _getKafkaProducerConfig(brokerList: String, batchSize: Integer, lingerMs: Integer, maxRequestSize: Integer): HashMap[String, Object] = { val props = new HashMap[String, Object]() props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000.asInstanceOf[Integer]); @@ -89,6 +90,7 @@ object KafkaDispatcher extends IDispatcher { props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy") props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs) + props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize) props }