From 384e2c6b45a6f821e2d16c9b35ac4046b88f5bc2 Mon Sep 17 00:00:00 2001 From: mhoppal Date: Mon, 6 Nov 2017 16:19:42 -0700 Subject: [PATCH 1/2] Add the support for upgrading kafka topic configs --- kafka-init/Dockerfile | 2 +- kafka-init/build.yml | 4 ++-- kafka-init/create_topics.py | 48 ++++++++++++++++++++++++++++++------- 3 files changed, 42 insertions(+), 12 deletions(-) diff --git a/kafka-init/Dockerfile b/kafka-init/Dockerfile index fdd217395..f68f9bfc4 100644 --- a/kafka-init/Dockerfile +++ b/kafka-init/Dockerfile @@ -1,4 +1,4 @@ -ARG MON_KAFKA_VERSION=0.9.0.1-2.11-1.1.0 +ARG MON_KAFKA_VERSION=0.9.0.1-2.11-1.1.6 FROM monasca/kafka:${MON_KAFKA_VERSION} ENV KAFKA_HOST=kafka:9092 \ diff --git a/kafka-init/build.yml b/kafka-init/build.yml index a0c167806..55f6070a7 100644 --- a/kafka-init/build.yml +++ b/kafka-init/build.yml @@ -1,7 +1,7 @@ repository: monasca/kafka-init variants: - - tag: 0.0.2 + - tag: 0.0.3 aliases: - ':latest' args: - MON_KAFKA_VERSION: "0.9.0.1-2.11-1.1.1" + MON_KAFKA_VERSION: "0.9.0.1-2.11-1.1.6" diff --git a/kafka-init/create_topics.py b/kafka-init/create_topics.py index 527fa3f40..3a1e80789 100644 --- a/kafka-init/create_topics.py +++ b/kafka-init/create_topics.py @@ -114,6 +114,7 @@ def create_topics(default_config, existing_topics): return created = [] + existing_topics_config = {} for topic in TOPIC_STRING.split(','): params = topic.split(':') @@ -121,12 +122,6 @@ def create_topics(default_config, existing_topics): configs = default_config.copy() partitions = None replicas = None - - if topic_name in existing_topics: - logger.info('Topic already exists, will not create: %s', - topic_name) - continue - index = 0 for param in params: if '=' in param: @@ -161,13 +156,46 @@ def create_topics(default_config, existing_topics): 'partitions not set for topic %s, it will not be created!', topic_name) continue - + if topic_name in existing_topics: + logger.info('Topic %s already exists, ensuring configuration options match up', + topic_name) + existing_topics_config[topic_name] = configs + continue logger.info('Creating topic %s: partitions=%s, replicas=%s, config=%r', topic_name, partitions, replicas, configs) create_topic(topic_name, partitions, replicas, configs) created.append(topic_name) - return created + return created, existing_topics_config + + +def update_topic_configs(existing_topics_config): + for topic, config in existing_topics_config.iteritems(): + topic_describe_output, err = kafka_topics("describe", ["--topic", topic]) + topic_describe = topic_describe_output.split('\n')[0] + current_config = {} + if "Configs" in topic_describe: + topic_config = topic_describe.split("Configs:")[1] + if topic_config: + for config_str in topic_config.split(","): + k, v = map(lambda s: s.strip(), config_str.split('=', 2)) + current_config[k] = v + + configs_to_delete = filter(lambda x: x not in config.keys(), current_config.keys()) + if configs_to_delete: + logger.info('Removing topic configuration options %s from topic %s', configs_to_delete, topic) + config_input = [] + for config_opt in configs_to_delete: + config_input.append("--delete-config") + config_input.append(config_opt) + kafka_topics("alter", ["--topic", topic] + config_input) + + if config: + logger.info('Adding/Updating topic configuration options %s to topic %s', config.keys(), topic) + arg_pairs = map(lambda item: ['--config', '{0}={1}'.format(*item)], + config.items()) + config_args = list(chain(*arg_pairs)) + kafka_topics("alter", ["--topic", topic] + config_args) def main(): @@ -177,10 +205,12 @@ def main(): existing_topics = list_topics() logger.info('Kafka has topics: %r', existing_topics) - created_topics = create_topics(default_config, existing_topics) + created_topics, existing_topics_config = create_topics(default_config, existing_topics) logger.info('Topic creation finished successfully. Created: %r', created_topics) + logger.info(existing_topics_config) + update_topic_configs(existing_topics_config) if __name__ == '__main__': main() From f05668e2d667f501775098eed8e48c274af665de Mon Sep 17 00:00:00 2001 From: mhoppal Date: Tue, 7 Nov 2017 09:28:52 -0700 Subject: [PATCH 2/2] Move around log statements --- kafka-init/create_topics.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka-init/create_topics.py b/kafka-init/create_topics.py index 3a1e80789..e0324e1c6 100644 --- a/kafka-init/create_topics.py +++ b/kafka-init/create_topics.py @@ -157,8 +157,6 @@ def create_topics(default_config, existing_topics): topic_name) continue if topic_name in existing_topics: - logger.info('Topic %s already exists, ensuring configuration options match up', - topic_name) existing_topics_config[topic_name] = configs continue logger.info('Creating topic %s: partitions=%s, replicas=%s, config=%r', @@ -171,6 +169,9 @@ def create_topics(default_config, existing_topics): def update_topic_configs(existing_topics_config): for topic, config in existing_topics_config.iteritems(): + logger.info('Topic %s already exists, ensuring configuration options match up', + topic) + logger.info('Attempting to set configuration options to %s', config) topic_describe_output, err = kafka_topics("describe", ["--topic", topic]) topic_describe = topic_describe_output.split('\n')[0] current_config = {} @@ -209,7 +210,6 @@ def main(): logger.info('Topic creation finished successfully. Created: %r', created_topics) - logger.info(existing_topics_config) update_topic_configs(existing_topics_config) if __name__ == '__main__':