Skip to content

Commit

Permalink
V2.3.0 (#47)
Browse files Browse the repository at this point in the history
* Update version to 2.3.0

* describe_topics and describe_cluster added

* Set avro-python3 as legacy package and install new one with [avro] tag
  • Loading branch information
robooo authored Apr 20, 2024
1 parent 0add858 commit 22403f6
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 6 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ pip install robotframework-confluentkafkalibrary
```

Extra packages:
* [avro] = ['fastavro >= 1.3.2', 'avro-python3 >= 1.10.1']
* [avro] = ['fastavro >= 1.3.2', 'avro >= 1.11.1']
* [legacyavro] = ['fastavro >= 1.3.2', 'avro-python3 >= 1.10.1']
* [json] = ['jsonschema >= 3.2.0']
* [protobuf] = ['protobuf >= 4.22.0']

Expand Down
24 changes: 24 additions & 0 deletions examples/test_adminclient.robot
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,30 @@ AdminClient Alter Configs
${config}= Describe Configs ${admin_client_id} ${resource}
Should Be Equal As Integers ${54321} ${config["${resource.name}"]['log.retention.ms'].value}

AdminClient Describe Topics
${topic_names}= Create List admintesting1 admintesting2 admintesting3
${topics}= Create List
FOR ${topic} IN @{topic_names}
${topic}= New Topic ${topic} num_partitions=${1} replication_factor=${1}
Append To List ${topics} ${topic}
END

${admin_client_id}= Create Admin Client
${results}= Create Topics group_id=${admin_client_id} new_topics=${topics}
Log ${results}

${results}= Describe Topics ${admin_client_id} ${topic_names}
Log ${results}
FOR ${topic} IN @{topic_names}
${status}= Evaluate len("${results["${topic}"].topic_id}") > 0
Should Be True ${status}
END
[Teardown] Delete Topics ${admin_client_id} ${topic_names}

AdminClient Describe Cluster
${admin_client_id}= Create Admin Client
${cluster}= Describe Cluster ${admin_client_id}
Should Not Be Empty ${cluster.cluster_id}

*** Keywords ***
All Messages Are Delivered
Expand Down
8 changes: 5 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
Confluent Kafka wrapped in Robot Framework.
"""[1:-1]

AVRO_REQUIRES = ['fastavro >= 1.3.2', 'avro-python3 >= 1.10.1']
AVRO_REQUIRES = ['fastavro >= 1.3.2', 'avro >= 1.11.1']
LEGACYAVRO_REQUIRES = ['fastavro >= 1.3.2', 'avro-python3 >= 1.10.1']
JSON_REQUIRES = ['jsonschema >= 3.2.0']
PROTO_REQUIRES = ['protobuf >= 4.22.0']
ALL = AVRO_REQUIRES + JSON_REQUIRES + PROTO_REQUIRES
ALL = AVRO_REQUIRES + LEGACYAVRO_REQUIRES + JSON_REQUIRES + PROTO_REQUIRES
setup(name = 'robotframework-confluentkafkalibrary',
version = VERSION,
description = 'Confluent Kafka library for Robot Framework',
Expand All @@ -32,12 +33,13 @@
],
install_requires = [
'robotframework >= 3.2.1',
'confluent-kafka == 2.2.0',
'confluent-kafka == 2.3.0',
'requests >= 2.25.1',
],
extras_require={
'all': ALL,
'avro': AVRO_REQUIRES,
'legacyavro': LEGACYAVRO_REQUIRES,
'json': JSON_REQUIRES,
'protobuf': PROTO_REQUIRES,
},
Expand Down
40 changes: 39 additions & 1 deletion src/ConfluentKafkaLibrary/admin_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import uuid
from confluent_kafka.admin import AdminClient
from confluent_kafka import KafkaException
from confluent_kafka import KafkaException, TopicCollection


class KafkaAdminClient():
Expand Down Expand Up @@ -170,6 +170,44 @@ def describe_configs(self, group_id, resources, **kwargs):
return f"Invalid input: {e}"
return config_results

def describe_topics(self, group_id, topics, **kwargs):
"""Describe topics.
- ``topics`` (list(str) or str): List of topic names or only topic name to describe.
"""
if isinstance(topics, list):
topics = TopicCollection(topics)
else:
topics = TopicCollection([topics])

topics = self.admin_clients[group_id].describe_topics(topics, **kwargs)
topics_results={}
for topic, f in topics.items():
try:
if f.exception() is None:
topics_results[topic] = f.result()
else:
topics_results[topic] = f.exception()
except KafkaException as e:
return f"Failed to describe topic {topic.name}: {e}"
except (TypeError, ValueError ) as e:
return f"Invalid input: {e}"
return topics_results

def describe_cluster(self, group_id, **kwargs):
"""Describe cluster.
"""
cluster = self.admin_clients[group_id].describe_cluster(**kwargs)
try:
if cluster.exception() is None:
cluster = cluster.result()
else:
cluster = cluster.exception()
except KafkaException as e:
return f"Failed to describe cluster: {e}"
except (TypeError, ValueError ) as e:
return f"Invalid input: {e}"
return cluster

def alter_configs(self, group_id, resources, **kwargs):
"""Update configuration properties for the specified resources.
- ``resources`` (list(ConfigResource) or ConfigResource): Resources to update configuration of.
Expand Down
2 changes: 1 addition & 1 deletion src/ConfluentKafkaLibrary/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = '2.2.0-1'
VERSION = '2.3.0-1'

0 comments on commit 22403f6

Please sign in to comment.