forked from miguno/kafka-storm-starter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathKafkaEmbedded.scala
97 lines (84 loc) · 3.76 KB
/
KafkaEmbedded.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package com.miguno.kafkastorm.kafka
import java.io.File
import java.util.Properties
import com.miguno.kafkastorm.logging.LazyLogging
import kafka.admin.AdminUtils
import kafka.server.{KafkaConfig, KafkaServerStartable}
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient
import org.apache.commons.io.FileUtils
import scala.concurrent.duration._
/**
* Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by default.
*
* Requires a running ZooKeeper instance to connect to. By default, it expects a ZooKeeper instance running at
* `127.0.0.1:2181`. You can specify a different ZooKeeper instance by setting the `zookeeper.connect` parameter in the
* broker's configuration.
*
* @param config Broker configuration settings. Used to modify, for example, on which port the broker should listen to.
* Note that you cannot change the `log.dirs` setting currently.
*/
class KafkaEmbedded(config: Properties = new Properties) extends LazyLogging {
private val defaultZkConnect = "127.0.0.1:2181"
private val logDir = {
val random = (new scala.util.Random).nextInt()
val path = Seq(System.getProperty("java.io.tmpdir"), "kafka-test", "logs-" + random).mkString(File.separator)
new File(path)
}
private val effectiveConfig = {
val c = new Properties
c.load(this.getClass.getResourceAsStream("/broker-defaults.properties"))
c.putAll(config)
c.setProperty("log.dirs", logDir.getAbsolutePath)
c
}
private val kafkaConfig = new KafkaConfig(effectiveConfig)
private val kafka = new KafkaServerStartable(kafkaConfig)
/**
* This broker's `metadata.broker.list` value. Example: `127.0.0.1:9092`.
*
* You can use this to tell Kafka producers and consumers how to connect to this instance.
*/
val brokerList = kafka.serverConfig.hostName + ":" + kafka.serverConfig.port
/**
* The ZooKeeper connection string aka `zookeeper.connect`.
*/
val zookeeperConnect = {
val zkConnectLookup = Option(effectiveConfig.getProperty("zookeeper.connect"))
zkConnectLookup match {
case Some(zkConnect) => zkConnect
case _ =>
logger.warn(s"zookeeper.connect is not configured -- falling back to default setting $defaultZkConnect")
defaultZkConnect
}
}
/**
* Start the broker.
*/
def start() {
logger.debug(s"Starting embedded Kafka broker at $brokerList (with ZK server at $zookeeperConnect) ...")
kafka.startup()
logger.debug(s"Startup of embedded Kafka broker at $brokerList completed (with ZK server at $zookeeperConnect)")
}
/**
* Stop the broker.
*/
def stop() {
logger.debug(s"Shutting down embedded Kafka broker at $brokerList (with ZK server at $zookeeperConnect)...")
kafka.shutdown()
FileUtils.deleteQuietly(logDir)
logger.debug(s"Shutdown of embedded Kafka broker at $brokerList completed (with ZK server at $zookeeperConnect)")
}
def createTopic(topic: String, partitions: Int = 1, replicationFactor: Int = 1, config: Properties = new Properties): Unit = {
logger.debug(s"Creating topic { name: $topic, partitions: $partitions, replicationFactor: $replicationFactor, config: $config }")
val sessionTimeout = 10.seconds
val connectionTimeout = 8.seconds
// Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then createTopic() will only
// seem to work (it will return without error). Topic will exist in only ZooKeeper, and will be returned when
// listing topics, but Kafka itself does not create the topic.
val zkClient = new ZkClient(zookeeperConnect, sessionTimeout.toMillis.toInt, connectionTimeout.toMillis.toInt,
ZKStringSerializer)
AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, config)
zkClient.close()
}
}