Updates (#2)
* Remove stop of daemon thread option

* Make consumers and producers public to allow list topics for both of them

* Extend tests with List Topics keyword

* Temp possible fix of list topics

* Change return of reference in decode_data to new list

* Update test for debug

* Update producer flush

* Usage of OFFSET vars in robot tests

* Fix return only a copy of messages to not overwrite

* Remove unused args

* Update get_messages_from_thread args

* Update assign_to_topic_partition

* Add topic partition related keywords

* Add seek keyword

* PEP 8 updates

* Make decode_data private

* Remove remove_zero_bytes from _decode_data

* Allow poll full message object via only_value arg

* Temp tests for travis validation

* Bump requirements versions

* Bump requirements versions

* Revert Poll error handling

* Revert Poll error handling

* Add partition for produce

* Change only_value default to True

* Minor test updates

* Raise poll error

* Refactor is_assigned

* Introduce unassign

* Add get_position keyword

* Fix indent

* Update confluent-kafka-library

* Run enterprise kafka for testing

* Fix arg order in GetMessagesThread

* Add Create Topic Partition keyword

* Refactoring and bugfixing

* Update examples

* Update readme

* Bump version
robooo authored Apr 11, 2020
1 parent 98a3320 commit f6ff2a3
Showing 14 changed files with 538 additions and 161 deletions.
5 changes: 2 additions & 3 deletions .travis.yml
@@ -4,8 +4,7 @@ services:
- docker

before_install:
- docker pull johnnypark/kafka-zookeeper
- docker run -td -p 2181:2181 -p 9092:9092 -e ADVERTISED_HOST=127.0.0.1 -e NUM_PARTITIONS=10 johnnypark/kafka-zookeeper
- cd examples && docker-compose up -d && cd ..
- docker ps -a
- pip install -r requirements.txt
- pip install requests
@@ -14,4 +13,4 @@ before_install:
- pip install .

script:
- python3 -m robot examples/test.robot
- python3 -m robot examples/
18 changes: 1 addition & 17 deletions README.md
@@ -4,23 +4,7 @@

ConfluentKafkaLibrary is a wrapper for [confluent-kafka-python](https://github.com/confluentinc/confluent-kafka-python).

Still in development, right now supports:

* [ ] Consumer
* [X] Poll
* [X] Un/Subscribe
* [X] Create / Stop consumer
* [X] Assign
* [X] List topics
* [ ] commit
* [ ] offsets
* [X] Run in thread
* [X] Decode option of data from topic
* [X] Producer


ConfluentKafkaLibrary works with latest confluent-kafka-python 1.0.0.

ConfluentKafkaLibrary works with the latest confluent-kafka-python; tags match 1:1 (ConfluentKafkaLibrary 1.3.0 == confluent-kafka-python 1.3.0). Bugfixes and updates of the library itself are appended after a '-', e.g. `1.3.0-1`.

## Documentation

4 changes: 2 additions & 2 deletions docs/index.html

Large diffs are not rendered by default.

56 changes: 56 additions & 0 deletions examples/docker-compose.yml
@@ -0,0 +1,56 @@
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  broker:
    image: confluentinc/cp-enterprise-kafka:5.3.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  rest-proxy:
    image: confluentinc/cp-kafka-rest:5.3.1
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
11 changes: 11 additions & 0 deletions examples/schema/producer/KeySchema.avsc
@@ -0,0 +1,11 @@
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    }
  ]
}
18 changes: 18 additions & 0 deletions examples/schema/producer/ValueSchema.avsc
@@ -0,0 +1,18 @@
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "number",
      "type": [
        "int",
        "null"
      ]
    }
  ]
}
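The value schema above declares `number` as a union of `int` and `null`, so a record is valid with or without a numeric value. As a quick stdlib-only sketch (not part of this commit), parsing the schema JSON makes that union visible:

```python
import json

# The value schema from examples/schema/producer/ValueSchema.avsc, inlined here.
value_schema = """
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "number", "type": ["int", "null"]}
  ]
}
"""

schema = json.loads(value_schema)
fields = {f["name"]: f["type"] for f in schema["fields"]}

# 'name' is a required string; 'number' is a union type, so null is allowed.
print(fields["name"])    # string
print(fields["number"])  # ['int', 'null']
```

Any Avro-aware serializer (such as the one behind `schema_registry_url` in the tests below) enforces exactly these field types when producing.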
169 changes: 150 additions & 19 deletions examples/test.robot
@@ -6,32 +6,163 @@ Suite Setup    Starting Test


*** Test Cases ***
Verify Topics
    ${group_id}=    Create Consumer    auto_offset_reset=earliest
    ${topics}=    List Topics    ${group_id}
    Dictionary Should Contain Key    ${topics}    ${TEST_TOPIC}
    [Teardown]    Close Consumer    ${group_id}

Basic Consumer
    ${group_id}=    Create Consumer    auto_offset_reset=earliest
    Subscribe Topic    group_id=${group_id}    topics=test
    ${messages}=    Poll    group_id=${group_id}    max_records=3
    ${messages}=    Decode Data    ${messages}    decode_format=utf8
    Subscribe Topic    group_id=${group_id}    topics=${TEST_TOPIC}
    ${messages}=    Poll    group_id=${group_id}    max_records=3    decode_format=utf8
    ${data}=    Create List    Hello    World    {'test': 1}
    Lists Should Be Equal    ${messages}    ${data}
    Unsubscribe    ${group_id}
    Close Consumer    ${group_id}
    [Teardown]    Basic Teardown    ${group_id}

Verify Threaded Consumer
    ${thread_messages}=    Get Messages From Thread    ${MAIN_THREAD}

Produce Without Value
    ${topic_name}=    Set Variable    topicwithoutvaluee
    Produce    group_id=${PRODUCER_ID}    topic=${topic_name}
    Wait Until Keyword Succeeds    10x    0.5s    All Messages Are Delivered    ${PRODUCER_ID}
    ${group_id}=    Create Consumer    auto_offset_reset=earliest
    Subscribe Topic    group_id=${group_id}    topics=test
    ${messages}=    Poll    group_id=${group_id}    max_records=3
    List Should Contain Sub List    ${thread_messages}    ${messages}
    Unsubscribe    ${group_id}
    Close Consumer    ${group_id}
    Subscribe Topic    group_id=${group_id}    topics=${topic_name}
    ${messages}=    Poll    group_id=${group_id}    max_records=1
    Should Be Equal As Strings    ${messages}    [None]

Verify Position
    ${group_id}=    Create Consumer
    ${tp}=    Create Topic Partition    ${TEST_TOPIC}    ${P_ID}    ${OFFSET_END}
    Assign To Topic Partition    ${group_id}    ${tp}
    Sleep    5sec    # Need to wait for an assignment
    ${position}=    Get Position    group_id=${group_id}    topic_partitions=${tp}
    ${position_before}=    Set Variable    ${position[0].offset}

    Produce    group_id=${PRODUCER_ID}    topic=${TEST_TOPIC}    value=Dummy    partition=${P_ID}
    Wait Until Keyword Succeeds    10x    0.5s    All Messages Are Delivered    ${PRODUCER_ID}
    ${position}=    Get Position    group_id=${group_id}    topic_partitions=${tp}
    ${position_after_produce}=    Set Variable    ${position[0].offset}
    Should Be Equal As Integers    ${position_before}    ${position_after_produce}

    ${messages}=    Poll    group_id=${group_id}    max_records=1    decode_format=utf8
    ${position}=    Get Position    group_id=${group_id}    topic_partitions=${tp}
    ${position_after_poll_1}=    Set Variable    ${position[0].offset}
    Should Not Be Equal As Integers    ${position_after_poll_1}    ${position_after_produce}

    Produce    group_id=${PRODUCER_ID}    topic=${TEST_TOPIC}    value=Dummy    partition=${P_ID}
    Wait Until Keyword Succeeds    10x    0.5s    All Messages Are Delivered    ${PRODUCER_ID}
    ${messages}=    Poll    group_id=${group_id}    max_records=1    decode_format=utf8
    ${position}=    Get Position    group_id=${group_id}    topic_partitions=${tp}
    ${position_after_poll_2}=    Set Variable    ${position[0].offset}
    Should Be Equal As Integers    ${position_after_poll_1 + 1}    ${position_after_poll_2}
    [Teardown]    Basic Teardown    ${group_id}

Consumer With Assignment To Last Message After Get Of Watermark Offsets
    ${group_id}=    Create Consumer
    ${tp}=    Create Topic Partition    ${TEST_TOPIC}    ${P_ID}
    ${offset}=    Get Watermark Offsets    ${group_id}    ${tp}
    ${tp}=    Create Topic Partition    ${TEST_TOPIC}    ${P_ID}    ${offset[1]}
    Assign To Topic Partition    ${group_id}    ${tp}
    Prepare Data
    ${messages}=    Poll    group_id=${group_id}    max_records=6    decode_format=utf8
    Lists Should Be Equal    ${TEST_DATA}    ${messages}
    [Teardown]    Basic Teardown    ${group_id}

Consumer With Assignment To OFFSET_END
    ${group_id}=    Create Consumer
    ${tp}=    Create Topic Partition    ${TEST_TOPIC}    ${P_ID}    ${OFFSET_END}
    Assign To Topic Partition    ${group_id}    ${tp}
    # Need to wait for an async assignment; be aware Is Assigned could return True
    # but that doesn't mean the assignment is completed
    Sleep    5sec
    Prepare Data
    ${messages}=    Poll    group_id=${group_id}    poll_attempts=30    max_records=6    timeout=5    decode_format=utf8
    Lists Should Be Equal    ${TEST_DATA}    ${messages}
    [Teardown]    Unassign Teardown    ${group_id}

Verify Test And Threaded Consumer
    [Setup]    Clear Messages From Thread    ${MAIN_THREAD}
    ${group_id}=    Create Consumer
    Subscribe Topic    group_id=${group_id}    topics=${TEST_TOPIC}
    ${messages}=    Poll    group_id=${group_id}
    Prepare Data
    ${thread_messages}=    Get Messages From Thread    ${MAIN_THREAD}    decode_format=utf-8
    ${messages}=    Poll    group_id=${group_id}    max_records=6    decode_format=utf8
    Lists Should Be Equal    ${thread_messages}    ${messages}
    [Teardown]    Run Keywords    Basic Teardown    ${group_id}    AND
    ...    Clear Messages From Thread    ${MAIN_THREAD}

Verify Clean Of Threaded Consumer Messages
    [Setup]    Prepare Data
    ${thread_messages1}=    Get Messages From Thread    ${MAIN_THREAD}    decode_format=utf-8
    Clear Messages From Thread    ${MAIN_THREAD}
    ${thread_messages2}=    Get Messages From Thread    ${MAIN_THREAD}
    Lists Should Be Equal    ${TEST_DATA}    ${thread_messages1}
    Should Be Empty    ${thread_messages2}
    [Teardown]    Clear Messages From Thread    ${MAIN_THREAD}

Remove And Publish New Messages From Threaded Consumer
    [Setup]    Prepare Data
    ${thread_messages1}=    Get Messages From Thread    ${MAIN_THREAD}    decode_format=utf-8
    Clear Messages From Thread    ${MAIN_THREAD}
    Produce    group_id=${PRODUCER_ID}    topic=${TEST_TOPIC}    value=After    partition=${P_ID}
    Produce    group_id=${PRODUCER_ID}    topic=${TEST_TOPIC}    value=Clear    partition=${P_ID}
    Wait Until Keyword Succeeds    10x    0.5s    All Messages Are Delivered    ${PRODUCER_ID}
    Sleep    1sec    # if the next command polls messages in the thread we need to wait a second

    ${thread_messages2}=    Get Messages From Thread    ${MAIN_THREAD}    decode_format=utf-8
    ${data}=    Create List    After    Clear
    Should Be Equal    ${data}    ${thread_messages2}

    Produce    group_id=${PRODUCER_ID}    topic=${TEST_TOPIC}    value=LAST    partition=${P_ID}
    Wait Until Keyword Succeeds    10x    0.5s    All Messages Are Delivered    ${PRODUCER_ID}
    Sleep    1sec
    Append To List    ${data}    LAST
    ${thread_messages2}=    Get Messages From Thread    ${MAIN_THREAD}    decode_format=utf-8
    Should Be Equal    ${TEST_DATA}    ${thread_messages1}
    Should Be Equal    ${data}    ${thread_messages2}
    [Teardown]    Clear Messages From Thread    ${MAIN_THREAD}


*** Keywords ***
Starting Test
    ${thread}=    Start Consumer Threaded    topics=test    auto_offset_reset=earliest
    Set Global Variable    ${MAIN_THREAD}    ${thread}

    Set Suite Variable    ${TEST_TOPIC}    test
    ${thread}=    Start Consumer Threaded    topics=${TEST_TOPIC}
    Set Suite Variable    ${MAIN_THREAD}    ${thread}
    ${producer_group_id}=    Create Producer
    Produce    group_id=${producer_group_id}    topic=test    value=Hello
    Produce    group_id=${producer_group_id}    topic=test    value=World
    Produce    group_id=${producer_group_id}    topic=test    value={'test': 1}
    Flush    ${producer_group_id}
    Set Suite Variable    ${PRODUCER_ID}    ${producer_group_id}

    ${topics}=    List Topics    ${producer_group_id}
    ${partitions}=    Get Topic Partitions    ${topics['${TEST_TOPIC}']}
    ${partition_id}=    Set Variable    ${partitions[0].id}
    Set Suite Variable    ${P_ID}    ${partition_id}
    ${tp}=    Create Topic Partition    ${TEST_TOPIC}    ${partition_id}    ${OFFSET_BEGINNING}

    ${data}=    Create List    Hello    World    {'test': 1}    {'test': 2}    {'test': 3}    {'test': 4}
    Set Suite Variable    ${TEST_DATA}    ${data}
    Prepare Data

Prepare Data
    Produce    group_id=${PRODUCER_ID}    topic=${TEST_TOPIC}    value=Hello    partition=${P_ID}
    Produce    group_id=${PRODUCER_ID}    topic=${TEST_TOPIC}    value=World    partition=${P_ID}
    Produce    group_id=${PRODUCER_ID}    topic=${TEST_TOPIC}    value={'test': 1}    partition=${P_ID}
    Produce    group_id=${PRODUCER_ID}    topic=${TEST_TOPIC}    value={'test': 2}    partition=${P_ID}
    Produce    group_id=${PRODUCER_ID}    topic=${TEST_TOPIC}    value={'test': 3}    partition=${P_ID}
    Produce    group_id=${PRODUCER_ID}    topic=${TEST_TOPIC}    value={'test': 4}    partition=${P_ID}
    Wait Until Keyword Succeeds    10x    0.5s    All Messages Are Delivered    ${PRODUCER_ID}
    Sleep    1sec    # if the next command polls messages in the thread we need to wait a second

All Messages Are Delivered
    [Arguments]    ${producer_id}
    ${count}=    Flush    ${producer_id}
    Log    Remaining messages to be delivered: ${count}
    Should Be Equal As Integers    ${count}    0

Basic Teardown
    [Arguments]    ${group_id}
    Unsubscribe    ${group_id}
    Close Consumer    ${group_id}

Unassign Teardown
    [Arguments]    ${group_id}
    Unassign    ${group_id}
    Close Consumer    ${group_id}
74 changes: 74 additions & 0 deletions examples/test_avro.robot
@@ -0,0 +1,74 @@
*** Settings ***
Library    ConfluentKafkaLibrary
Library    Collections
Library    String

Suite Setup    Starting Test


*** Test Cases ***
Avro Producer With Schemas As String Argument
    [Setup]    Clear Messages From Thread    ${MAIN_THREAD}
    ${value_schema}=    Set Variable    {"namespace": "example.avro","type": "record","name": "User","fields": [{"name": "name","type": "string"},{"name": "number","type": ["int","null"]}]}
    ${key_schema}=    Set Variable    {"namespace": "example.avro","type": "record","name": "User","fields": [{"name": "name","type": "string"}]}
    ${producer_id}=    Create Producer    schema_registry_url=http://127.0.0.1:8081
    ...    value_schema=${value_schema}    key_schema=${key_schema}
    ${value}=    Create Dictionary    name=Robot    number=${10}
    Produce    group_id=${producer_id}    topic=avro_testing1    partition=${0}    value=${value}    key=${KEY}
    Wait Until Keyword Succeeds    10x    0.5s    All Messages Are Delivered    ${producer_id}
    Sleep    1s

    ${consumer_group_id}=    Create Consumer    auto_offset_reset=earliest    schema_registry_url=http://127.0.0.1:8081
    Subscribe Topic    group_id=${consumer_group_id}    topics=avro_testing1
    ${messages}=    Poll    group_id=${consumer_group_id}
    Should Be Equal    ${TEST_DATA}    ${messages}
    ${thread_messages}=    Get Messages From Thread    ${MAIN_THREAD}
    Should Be Equal    ${TEST_DATA}    ${thread_messages}
    [Teardown]    Basic Teardown    ${consumer_group_id}

Avro Producer With Path To Schemas
    [Setup]    Clear Messages From Thread    ${MAIN_THREAD}
    ${value_schema_file_path}=    Set Variable    examples/schema/producer/ValueSchema.avsc
    ${key_schema_file_path}=    Set Variable    examples/schema/producer/KeySchema.avsc
    ${producer_id}=    Create Producer    schema_registry_url=http://127.0.0.1:8081
    ...    value_schema=${value_schema_file_path}    key_schema=${key_schema_file_path}
    ${value}=    Create Dictionary    name=Robot    number=${10}
    Produce    group_id=${producer_id}    topic=avro_testing2    partition=${0}    value=${value}    key=${KEY}
    Wait Until Keyword Succeeds    10x    0.5s    All Messages Are Delivered    ${producer_id}
    Sleep    1s

    ${consumer_group_id}=    Create Consumer    auto_offset_reset=earliest    schema_registry_url=http://127.0.0.1:8081
    Subscribe Topic    group_id=${consumer_group_id}    topics=avro_testing2
    ${messages}=    Poll    group_id=${consumer_group_id}
    Should Be Equal    ${TEST_DATA}    ${messages}
    ${thread_messages}=    Get Messages From Thread    ${MAIN_THREAD}
    Should Be Equal    ${TEST_DATA}    ${thread_messages}
    [Teardown]    Basic Teardown    ${consumer_group_id}


*** Keywords ***
Starting Test
    Set Suite Variable    @{TEST_TOPIC}    avro_testing1    avro_testing2
    Set Suite Variable    &{KEY}    name=testkey
    ${value}=    Create Dictionary    name=Robot    number=${10}
    ${data}=    Create List    ${value}
    Set Suite Variable    ${TEST_DATA}    ${data}

    ${thread}=    Start Consumer Threaded    topics=${TEST_TOPIC}    schema_registry_url=http://127.0.0.1:8081    auto_offset_reset=latest
    Set Suite Variable    ${MAIN_THREAD}    ${thread}

All Messages Are Delivered
    [Arguments]    ${producer_id}
    ${count}=    Flush    ${producer_id}
    Log    Remaining messages to be delivered: ${count}
    Should Be Equal As Integers    ${count}    0

Basic Teardown
    [Arguments]    ${group_id}
    Unsubscribe    ${group_id}
    Close Consumer    ${group_id}

Unassign Teardown
    [Arguments]    ${group_id}
    Unassign    ${group_id}
    Close Consumer    ${group_id}
4 changes: 2 additions & 2 deletions requirements.txt
@@ -1,3 +1,3 @@
robotframework==3.1.1
confluent-kafka==1.0.0
robotframework==3.1.2
confluent-kafka==1.2.0
uuid==1.30