Skip to content

Commit

Permalink
Merge pull request #322 from monasca/feature/support_kafka_topic_upgr…
Browse files Browse the repository at this point in the history
…ades

Add support for upgrading kafka topic configs
  • Loading branch information
timothyb89 authored Nov 7, 2017
2 parents 484eb35 + f05668e commit cf882e2
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 12 deletions.
2 changes: 1 addition & 1 deletion kafka-init/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ARG MON_KAFKA_VERSION=0.9.0.1-2.11-1.1.0
ARG MON_KAFKA_VERSION=0.9.0.1-2.11-1.1.6
FROM monasca/kafka:${MON_KAFKA_VERSION}

ENV KAFKA_HOST=kafka:9092 \
Expand Down
4 changes: 2 additions & 2 deletions kafka-init/build.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
repository: monasca/kafka-init
variants:
- tag: 0.0.2
- tag: 0.0.3
aliases:
- ':latest'
args:
MON_KAFKA_VERSION: "0.9.0.1-2.11-1.1.1"
MON_KAFKA_VERSION: "0.9.0.1-2.11-1.1.6"
48 changes: 39 additions & 9 deletions kafka-init/create_topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,14 @@ def create_topics(default_config, existing_topics):
return

created = []
existing_topics_config = {}

for topic in TOPIC_STRING.split(','):
params = topic.split(':')
topic_name = params.pop(0)
configs = default_config.copy()
partitions = None
replicas = None

if topic_name in existing_topics:
logger.info('Topic already exists, will not create: %s',
topic_name)
continue

index = 0
for param in params:
if '=' in param:
Expand Down Expand Up @@ -161,13 +156,47 @@ def create_topics(default_config, existing_topics):
'partitions not set for topic %s, it will not be created!',
topic_name)
continue

if topic_name in existing_topics:
existing_topics_config[topic_name] = configs
continue
logger.info('Creating topic %s: partitions=%s, replicas=%s, config=%r',
topic_name, partitions, replicas, configs)
create_topic(topic_name, partitions, replicas, configs)
created.append(topic_name)

return created
return created, existing_topics_config


def update_topic_configs(existing_topics_config):
for topic, config in existing_topics_config.iteritems():
logger.info('Topic %s already exists, ensuring configuration options match up',
topic)
logger.info('Attempting to set configuration options to %s', config)
topic_describe_output, err = kafka_topics("describe", ["--topic", topic])
topic_describe = topic_describe_output.split('\n')[0]
current_config = {}
if "Configs" in topic_describe:
topic_config = topic_describe.split("Configs:")[1]
if topic_config:
for config_str in topic_config.split(","):
k, v = map(lambda s: s.strip(), config_str.split('=', 2))
current_config[k] = v

configs_to_delete = filter(lambda x: x not in config.keys(), current_config.keys())
if configs_to_delete:
logger.info('Removing topic configuration options %s from topic %s', configs_to_delete, topic)
config_input = []
for config_opt in configs_to_delete:
config_input.append("--delete-config")
config_input.append(config_opt)
kafka_topics("alter", ["--topic", topic] + config_input)

if config:
logger.info('Adding/Updating topic configuration options %s to topic %s', config.keys(), topic)
arg_pairs = map(lambda item: ['--config', '{0}={1}'.format(*item)],
config.items())
config_args = list(chain(*arg_pairs))
kafka_topics("alter", ["--topic", topic] + config_args)


def main():
Expand All @@ -177,10 +206,11 @@ def main():
existing_topics = list_topics()
logger.info('Kafka has topics: %r', existing_topics)

created_topics = create_topics(default_config, existing_topics)
created_topics, existing_topics_config = create_topics(default_config, existing_topics)
logger.info('Topic creation finished successfully. Created: %r',
created_topics)

update_topic_configs(existing_topics_config)

if __name__ == '__main__':
main()

0 comments on commit cf882e2

Please sign in to comment.