From d9222cfe0bb680c610ada12bbc2c273fcbfacaf1 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Fri, 4 Oct 2019 11:48:59 +0200 Subject: [PATCH 01/46] enable loading hosts from file during testing --- pykafka/test/utils.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pykafka/test/utils.py b/pykafka/test/utils.py index eafbcaee9..5f553edd5 100644 --- a/pykafka/test/utils.py +++ b/pykafka/test/utils.py @@ -19,6 +19,17 @@ def get_cluster(): os.environ['BROKERS'], os.environ['ZOOKEEPER'], os.environ.get('BROKERS_SSL', None)) + elif os.environ.get('HOSTS_FILE', None): + # Broker is already running. Use that. + hosts = {} + with open(os.environ['HOSTS_FILE'], 'r') as o: + for line in o: + name, host = line.split('=', 1) + hosts[name.strip()] = host.strip() + return KafkaConnection(os.environ['KAFKA_BIN'], + hosts['BROKERS'], + hosts['ZOOKEEPER'], + hosts.get('BROKERS_SSL', None)) else: return KafkaInstance(num_instances=3) From 12503c9df073cb66e0bffc769dd0afac409c63f3 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Fri, 4 Oct 2019 11:55:53 +0200 Subject: [PATCH 02/46] make _kafka_version a tuple to make comparisons easier --- pykafka/test/kafka_instance.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py index eefe8ed5c..1de16b8af 100644 --- a/pykafka/test/kafka_instance.py +++ b/pykafka/test/kafka_instance.py @@ -151,7 +151,7 @@ def __init__(self, use_gevent=False): """Start kafkainstace with given settings""" self._num_instances = num_instances - self._kafka_version = kafka_version + self._kafka_version = tuple(int(v) for v in kafka_version.split('.')) self._scala_version = scala_version self._bin_dir = bin_dir self._processes = [] @@ -201,7 +201,7 @@ def _download_kafka(self): url_fmt = 'https://archive.apache.org/dist/kafka/{kafka_version}/kafka_{scala_version}-{kafka_version}.tgz' url = url_fmt.format( scala_version=self._scala_version, - 
kafka_version=self._kafka_version + kafka_version='.'.join(str(v) for v in self._kafka_version) ) p1 = subprocess.Popen(['curl', '-vs', url], stdout=subprocess.PIPE) p2 = subprocess.Popen(['tar', 'xvz', '-C', self._bin_dir, @@ -248,7 +248,7 @@ def _gen_ssl_certs(self): :returns: :class:`CertManager` or None upon failure """ - if self._kafka_version >= "0.9": # no SSL support in earlier versions + if self._kafka_version >= (0, 9): # no SSL support in earlier versions try: return CertManager(self._bin_dir) except: # eg. because openssl or other tools not installed From ce10315530aae6fd42fc67f63f6a2a55cd0e61f0 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Fri, 4 Oct 2019 12:06:52 +0200 Subject: [PATCH 03/46] refactor server.properties generation for test brokers --- pykafka/test/kafka_instance.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py index 1de16b8af..8810e8cc4 100644 --- a/pykafka/test/kafka_instance.py +++ b/pykafka/test/kafka_instance.py @@ -38,7 +38,7 @@ _kafka_properties = """ # Configurable settings broker.id={broker_id} -{port_config} +listeners={listeners} zookeeper.connect={zk_connstr} log.dirs={data_dir} @@ -57,7 +57,6 @@ """ _kafka_ssl_properties = """ -listeners=PLAINTEXT://localhost:{port},SSL://localhost:{ssl_port} ssl.keystore.location={keystore_path} ssl.keystore.password={store_pass} ssl.key.password={store_pass} @@ -298,15 +297,9 @@ def _start_broker_proc(self, port, ssl_port=None): Returns a proc handler for the new broker. 
""" # make port config for new broker - if self.certs is not None: - port_config = _kafka_ssl_properties.format( - port=port, - ssl_port=ssl_port, - keystore_path=self.certs.keystore, - truststore_path=self.certs.truststore, - store_pass=self.certs.broker_pass) - else: - port_config = "port={}".format(port) + listeners = ['PLAINTEXT://localhost:' + str(port)] + if ssl_port is not None: + listeners.append('SSL://localhost:'+str(ssl_port)) self._brokers_started += 1 i = self._brokers_started @@ -314,13 +307,21 @@ def _start_broker_proc(self, port, ssl_port=None): # write conf file for the new broker conf = os.path.join(self._conf_dir, 'kafka_{instance}.properties'.format(instance=i)) + with open(conf, 'w') as f: f.write(_kafka_properties.format( broker_id=i, - port_config=port_config, + listeners=','.join(listeners), zk_connstr=self.zookeeper, data_dir=self._data_dir + '_{instance}'.format(instance=i), )) + if ssl_port: + f.write(_kafka_ssl_properties.format( + port=port, + keystore_path=self.certs.keystore, + truststore_path=self.certs.truststore, + store_pass=self.certs.broker_pass) + ) # start process and append to self._broker_procs binfile = os.path.join(self._bin_dir, 'bin/kafka-server-start.sh') @@ -339,7 +340,6 @@ def _start_broker_proc(self, port, ssl_port=None): return new_proc - def _start_brokers(self): """Start all brokers and return used ports.""" ports = self._port_generator(9092) From 0fe66e748b4c07e6b4b9109fc8a242ad121da073 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Fri, 4 Oct 2019 13:51:26 +0200 Subject: [PATCH 04/46] enable sasl for test broker --- pykafka/test/kafka_instance.py | 97 +++++++++++++++++++++++++++++----- 1 file changed, 85 insertions(+), 12 deletions(-) diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py index 8810e8cc4..ec91e058b 100644 --- a/pykafka/test/kafka_instance.py +++ b/pykafka/test/kafka_instance.py @@ -32,6 +32,8 @@ from testinstances.managed_instance import ManagedInstance from 
pykafka.utils.compat import range, get_bytes, get_string +SASL_USER = 'alice' +SASL_PASSWORD = 'alice-secret' log = logging.getLogger(__name__) @@ -70,25 +72,50 @@ maxClientCnxns=0 """ +_kafka_sasl_properties = """ +sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512 +security.inter.broker.protocol=SASL_PLAINTEXT +sasl.mechanism.inter.broker.protocol=PLAIN +""" + +_kafka_server_jaas_config = """ +KafkaServer {{ + org.apache.kafka.common.security.plain.PlainLoginModule required + username="{user}" + password="{password}" + user_{user}="{password}"; + + org.apache.kafka.common.security.scram.ScramLoginModule required + username="{user}" + password="{password}"; +}}; + +Client {{ +}}; +""" + + class KafkaConnection(object): """Connection to a Kafka cluster. Provides handy access to the shell scripts Kafka is bundled with. """ - def __init__(self, bin_dir, brokers, zookeeper, brokers_ssl=None): + def __init__(self, bin_dir, brokers, zookeeper, brokers_ssl=None, brokers_sasl=None): """Create a connection to the cluster. 
:param bin_dir: Location of downloaded kafka bin :param brokers: Comma-separated list of brokers - :param zookeeper: Connection straing for ZK + :param zookeeper: Connection string for ZK :param brokers_ssl: Comma-separated list of hosts with ssl-ports + :param brokers_sasl: Comma-separated list of hosts with sasl-ports """ self._bin_dir = bin_dir self.brokers = brokers self.zookeeper = zookeeper self.brokers_ssl = brokers_ssl + self.brokers_sasl = brokers_sasl self.certs = CertManager(bin_dir) if brokers_ssl is not None else None def _run_topics_sh(self, args): @@ -108,6 +135,22 @@ def create_topic(self, topic_name, num_partitions, replication_factor): '--replication-factor', replication_factor]) time.sleep(2) + def _run_configs_sh(self, args): + """Run kafka-config.sh with the provided list of arguments.""" + binfile = os.path.join(self._bin_dir, 'bin/kafka-configs.sh') + cmd = [binfile, '--zookeeper', self.zookeeper] + args + cmd = [get_string(c) for c in cmd] # execv needs only strings + log.debug('running: %s', ' '.join(cmd)) + return subprocess.check_output(cmd) + + def create_scram_user(self, user, password): + self._run_configs_sh([ + '--alter', + '--entity-type', 'users', + '--entity-name', user, + '--add-config', 'SCRAM-SHA-256=[password={pw}],SCRAM-SHA-512=[password={pw}]'.format(pw=password) + ]) + def delete_topic(self, topic_name): self._run_topics_sh(['--delete', '--topic', topic_name]) @@ -159,12 +202,21 @@ def __init__(self, self.zookeeper = None self.brokers = None self.brokers_ssl = None + self.brokers_sasl = None + self.sasl_enabled = self._kafka_version > (0, 10, 2) # SASL Scram only supported since 0.10.2 self.certs = self._gen_ssl_certs() # TODO: Need a better name so multiple can run at once. 
# other ManagedInstances use things like 'name-port' ManagedInstance.__init__(self, name, use_gevent=use_gevent) self.connection = KafkaConnection( bin_dir, self.brokers, self.zookeeper, self.brokers_ssl) + if self.sasl_enabled: + log.info("Creating scram user {}".format(SASL_USER)) + self.connection.create_scram_user(SASL_USER, SASL_PASSWORD) + log.info("Waiting for Cluster to fetch the userdata.") + # We could wait for the line "Processing override for entityPath: users/alice" but that's not completely + # reliable either + time.sleep(5) def _init_dirs(self): """Set up directories in the temp folder.""" @@ -242,6 +294,13 @@ def _add_ssl_broker(self, ssl_broker_port): brokers_ssl.append(new_broker_ssl) self.brokers_ssl = ",".join(brokers_ssl) + def _add_sasl_broker(self, sasl_broker_port): + if sasl_broker_port: + new_broker_sasl = "localhost:{}".format(sasl_broker_port) + brokers_sasl = self.brokers_sasl.split(",") if self.brokers_sasl else [] + brokers_sasl.append(new_broker_sasl) + self.brokers_sasl = ",".join(brokers_sasl) + def _gen_ssl_certs(self): """Attempt generating ssl certificates for testing @@ -262,10 +321,10 @@ def _start_process(self): zk_port = self._start_zookeeper() self.zookeeper = 'localhost:{port}'.format(port=zk_port) - broker_ports, broker_ssl_ports = self._start_brokers() + broker_ports, broker_ssl_ports, broker_sasl_ports = self._start_brokers() # Process is started when the port isn't free anymore - all_ports = [zk_port] + broker_ports + all_ports = [zk_port] + broker_ports + broker_ssl_ports + broker_sasl_ports for i in range(10): if all(not self._is_port_free(port) for port in all_ports): log.info('Kafka cluster started.') @@ -291,15 +350,17 @@ def _start_log_watcher(self): watch_thread.daemon = True watch_thread.start() - def _start_broker_proc(self, port, ssl_port=None): + def _start_broker_proc(self, port, ssl_port=None, sasl_port=None): """Start a broker proc and maintain handlers Returns a proc handler for the new broker. 
""" # make port config for new broker - listeners = ['PLAINTEXT://localhost:' + str(port)] + listeners = ['PLAINTEXT://localhost:{}'.format(port)] if ssl_port is not None: - listeners.append('SSL://localhost:'+str(ssl_port)) + listeners.append('SSL://localhost:{}'.format(ssl_port)) + if sasl_port is not None: + listeners.append('SASL_PLAINTEXT://localhost:{}'.format(sasl_port)) self._brokers_started += 1 i = self._brokers_started @@ -307,7 +368,7 @@ def _start_broker_proc(self, port, ssl_port=None): # write conf file for the new broker conf = os.path.join(self._conf_dir, 'kafka_{instance}.properties'.format(instance=i)) - + jaas_conf = os.path.join(self._conf_dir, 'kafka_{instance}_jaas.conf'.format(instance=i)) with open(conf, 'w') as f: f.write(_kafka_properties.format( broker_id=i, @@ -322,7 +383,10 @@ def _start_broker_proc(self, port, ssl_port=None): truststore_path=self.certs.truststore, store_pass=self.certs.broker_pass) ) - + if sasl_port: + f.write(_kafka_sasl_properties) + with open(jaas_conf, 'w') as o: + o.write(_kafka_server_jaas_config.format(user=SASL_USER, password=SASL_PASSWORD)) # start process and append to self._broker_procs binfile = os.path.join(self._bin_dir, 'bin/kafka-server-start.sh') logfile = os.path.join(self._log_dir, 'kafka_{instance}.log'.format(instance=i)) @@ -330,13 +394,15 @@ def _start_broker_proc(self, port, ssl_port=None): args=[binfile, conf], stderr=utils.STDOUT, stdout=open(logfile, 'w'), - use_gevent=self.use_gevent + use_gevent=self.use_gevent, + env={} if sasl_port is None else {'KAFKA_OPTS': '-Djava.security.auth.login.config={}'.format(jaas_conf)} )) self._broker_procs.append(new_proc) # add localhost:port to internal list of (ssl)brokers self._add_broker(port) self._add_ssl_broker(ssl_port) + self._add_sasl_broker(sasl_port) return new_proc @@ -345,7 +411,9 @@ def _start_brokers(self): ports = self._port_generator(9092) used_ports = [] used_ssl_ports = [] + used_sasl_ports = [] ssl_port = None + sasl_port = None for i 
in range(self._num_instances): port = next(ports) used_ports.append(port) @@ -354,9 +422,12 @@ def _start_brokers(self): if self.certs is not None: ssl_port = next(ports) used_ssl_ports.append(ssl_port) # to return at end - self._start_broker_proc(port, ssl_port) + if self.sasl_enabled: + sasl_port = next(ports) + used_sasl_ports.append(sasl_port) + self._start_broker_proc(port, ssl_port, sasl_port) - return used_ports, used_ssl_ports + return used_ports, used_ssl_ports, used_sasl_ports def _start_zookeeper(self): port = next(self._port_generator(2181)) @@ -578,6 +649,8 @@ def _catch_sigint(signum, frame): f.write('BROKERS={}\n'.format(cluster.brokers)) if cluster.brokers_ssl: f.write('BROKERS_SSL={}\n'.format(cluster.brokers_ssl)) + if cluster.brokers_sasl: + f.write('BROKERS_SASL={}\n'.format(cluster.brokers_sasl)) f.write('ZOOKEEPER={}\n'.format(cluster.zookeeper)) while True: From 5889a99efeb4cad6a2b802741f391c7bbdc5b1d8 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Fri, 4 Oct 2019 14:04:59 +0200 Subject: [PATCH 05/46] add tests for plain and scram sasl authentication --- tests/pykafka/rdkafka/test_sasl.py | 12 +++++++ tests/pykafka/test_sasl.py | 52 ++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) create mode 100644 tests/pykafka/rdkafka/test_sasl.py create mode 100644 tests/pykafka/test_sasl.py diff --git a/tests/pykafka/rdkafka/test_sasl.py b/tests/pykafka/rdkafka/test_sasl.py new file mode 100644 index 000000000..df9ba0582 --- /dev/null +++ b/tests/pykafka/rdkafka/test_sasl.py @@ -0,0 +1,12 @@ +import platform + +import pytest + +from tests.pykafka import test_sasl + + +@pytest.mark.skipif(platform.python_implementation() == "PyPy", + reason="We pass PyObject pointers as msg_opaques for " + "delivery callbacks, which is unsafe on PyPy.") +class TestRdKafkaSasl(test_sasl.SaslIntegrationTests): + USE_RDKAFKA = True diff --git a/tests/pykafka/test_sasl.py b/tests/pykafka/test_sasl.py new file mode 100644 index 000000000..f6e754690 --- 
/dev/null +++ b/tests/pykafka/test_sasl.py @@ -0,0 +1,52 @@ +import os +import unittest +from uuid import uuid4 + +import pytest + +from pykafka import KafkaClient, PlainAuthenticator, ScramAuthenticator +from pykafka.test.utils import get_cluster, stop_cluster +from pykafka.test.kafka_instance import SASL_USER, SASL_PASSWORD + + +kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0') + + +class SaslIntegrationTests(unittest.TestCase): + USE_RDKAFKA = False + + @classmethod + def setUpClass(cls): + cls.kafka = get_cluster() + if cls.kafka.brokers_sssl is None: + pytest.skip("Test-cluster doesn't advertise sasl ports.") + + @classmethod + def tearDownClass(cls): + stop_cluster(cls.kafka) + + @pytest.mark.parametrize('mechanism', ['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512']) + def test_roundtrip(self, mechanism): + """Test producing then consuming + + This is mostly important to test the pykafka.rdkafka classes, which + should be passed SASL settings during producer/consumer init. + """ + if mechanism.startswith('SCRAM'): + authenticator = ScramAuthenticator(mechanism, username=SASL_USER, password=SASL_PASSWORD) + else: + authenticator = PlainAuthenticator(username=SASL_USER, password=SASL_PASSWORD) + + client = KafkaClient(self.kafka.brokers_sasl, sasl_authenticator=authenticator, + broker_version=kafka_version) + + topic_name = uuid4().hex.encode() + payload = uuid4().hex.encode() + topic = client.topics[topic_name] + + producer = topic.get_producer(use_rdkafka=self.USE_RDKAFKA, sync=True) + producer.produce(payload) + + consumer = topic.get_simple_consumer(use_rdkafka=self.USE_RDKAFKA, + consumer_timeout_ms=5000) + self.assertEqual(consumer.consume().value, payload) From 5199db2feaef0b0f22d775faa87ebe5b38998370 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Fri, 4 Oct 2019 15:36:48 +0200 Subject: [PATCH 06/46] refactor test code --- tests/conftest.py | 51 ++++++++++++++++++++++++++ tests/pykafka/rdkafka/test_sasl.py | 22 +++++++----- tests/pykafka/test_sasl.py 
| 58 ++++++++---------------------- 3 files changed, 79 insertions(+), 52 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 5c136b325..d0742dffe 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,2 +1,53 @@ import logging +import os +import pytest +from pykafka.test.utils import get_cluster, stop_cluster +from pykafka import PlainAuthenticator, ScramAuthenticator +from pykafka.test.kafka_instance import SASL_USER, SASL_PASSWORD + logging.basicConfig(level=logging.DEBUG) + +KAFKA_VERSION = tuple(int(v) for v in os.environ.get('KAFKA_VERSION', '0.8.0').split('.')) + + +@pytest.fixture +def kafka_version(): + return KAFKA_VERSION + + +@pytest.fixture( + params=[ + pytest.param( + "PLAIN", marks=pytest.mark.skipif(KAFKA_VERSION < (0, 10), reason="Requires KAFKA_VERSION >= 0.10") + ), + pytest.param( + "SCRAM-SHA-256", + marks=pytest.mark.skipif(KAFKA_VERSION < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"), + ), + pytest.param( + "SCRAM-SHA-512", + marks=pytest.mark.skipif(KAFKA_VERSION < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"), + ), + ] +) +def authenticator(request): + sasl_mechanism = request.param + if sasl_mechanism.startswith('SCRAM'): + return ScramAuthenticator(sasl_mechanism, user=SASL_USER, password=SASL_PASSWORD) + else: + return PlainAuthenticator(user=SASL_USER, password=SASL_PASSWORD) + + +@pytest.fixture(scope='session') +def kafka(): + kafka = get_cluster() + yield kafka + stop_cluster(kafka) + + +@pytest.fixture +def sasl_kafka(kafka): + if not kafka.brokers_sasl: + pytest.skip("Cluster has no SASL endpoint.") + else: + yield kafka \ No newline at end of file diff --git a/tests/pykafka/rdkafka/test_sasl.py b/tests/pykafka/rdkafka/test_sasl.py index df9ba0582..7bd290e55 100644 --- a/tests/pykafka/rdkafka/test_sasl.py +++ b/tests/pykafka/rdkafka/test_sasl.py @@ -1,12 +1,18 @@ -import platform +from uuid import uuid4 -import pytest +from pykafka import KafkaClient -from tests.pykafka import test_sasl 
+def test_sasl_roundtrip_rdkafka(sasl_kafka, authenticator, kafka_version): + client = KafkaClient(sasl_kafka.brokers_sasl, sasl_authenticator=authenticator, + broker_version='.'.join(str(v) for v in kafka_version)) -@pytest.mark.skipif(platform.python_implementation() == "PyPy", - reason="We pass PyObject pointers as msg_opaques for " - "delivery callbacks, which is unsafe on PyPy.") -class TestRdKafkaSasl(test_sasl.SaslIntegrationTests): - USE_RDKAFKA = True + topic_name = uuid4().hex.encode() + payload = uuid4().hex.encode() + topic = client.topics[topic_name] + + producer = topic.get_producer(use_rdkafka=True, sync=True) + producer.produce(payload) + + consumer = topic.get_simple_consumer(use_rdkafka=True, consumer_timeout_ms=5000) + assert consumer.consume().value == payload \ No newline at end of file diff --git a/tests/pykafka/test_sasl.py b/tests/pykafka/test_sasl.py index f6e754690..c89388110 100644 --- a/tests/pykafka/test_sasl.py +++ b/tests/pykafka/test_sasl.py @@ -1,52 +1,22 @@ -import os -import unittest from uuid import uuid4 -import pytest +from pykafka import KafkaClient -from pykafka import KafkaClient, PlainAuthenticator, ScramAuthenticator -from pykafka.test.utils import get_cluster, stop_cluster -from pykafka.test.kafka_instance import SASL_USER, SASL_PASSWORD +def test_sasl_roundtrip(sasl_kafka, authenticator, kafka_version): + """Test producing then consuming -kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0') + This is mostly important to test the pykafka.rdkafka classes, which + should be passed SASL settings during producer/consumer init. 
+ """ + client = KafkaClient(sasl_kafka.brokers_sasl, sasl_authenticator=authenticator, broker_version=kafka_version) + topic_name = uuid4().hex.encode() + payload = uuid4().hex.encode() + topic = client.topics[topic_name] -class SaslIntegrationTests(unittest.TestCase): - USE_RDKAFKA = False + producer = topic.get_producer(use_rdkafka=False, sync=True) + producer.produce(payload) - @classmethod - def setUpClass(cls): - cls.kafka = get_cluster() - if cls.kafka.brokers_sssl is None: - pytest.skip("Test-cluster doesn't advertise sasl ports.") - - @classmethod - def tearDownClass(cls): - stop_cluster(cls.kafka) - - @pytest.mark.parametrize('mechanism', ['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512']) - def test_roundtrip(self, mechanism): - """Test producing then consuming - - This is mostly important to test the pykafka.rdkafka classes, which - should be passed SASL settings during producer/consumer init. - """ - if mechanism.startswith('SCRAM'): - authenticator = ScramAuthenticator(mechanism, username=SASL_USER, password=SASL_PASSWORD) - else: - authenticator = PlainAuthenticator(username=SASL_USER, password=SASL_PASSWORD) - - client = KafkaClient(self.kafka.brokers_sasl, sasl_authenticator=authenticator, - broker_version=kafka_version) - - topic_name = uuid4().hex.encode() - payload = uuid4().hex.encode() - topic = client.topics[topic_name] - - producer = topic.get_producer(use_rdkafka=self.USE_RDKAFKA, sync=True) - producer.produce(payload) - - consumer = topic.get_simple_consumer(use_rdkafka=self.USE_RDKAFKA, - consumer_timeout_ms=5000) - self.assertEqual(consumer.consume().value, payload) + consumer = topic.get_simple_consumer(use_rdkafka=False, consumer_timeout_ms=5000) + assert consumer.consume().value == payload \ No newline at end of file From 88a5bfc67f05c0bdbba86b65538670e05f40741a Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Fri, 4 Oct 2019 16:58:06 +0200 Subject: [PATCH 07/46] add sasl handshake to protocol --- pykafka/protocol/__init__.py | 20 +++++---- 
pykafka/protocol/sasl.py | 79 ++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 9 deletions(-) create mode 100644 pykafka/protocol/sasl.py diff --git a/pykafka/protocol/__init__.py b/pykafka/protocol/__init__.py index f6bd62d29..90ea9f1c5 100644 --- a/pykafka/protocol/__init__.py +++ b/pykafka/protocol/__init__.py @@ -26,6 +26,7 @@ OffsetFetchRequestV1, OffsetFetchResponseV1, OffsetFetchRequestV2, OffsetFetchResponseV2) from .produce import ProduceRequest, ProduceResponse, ProducePartitionResponse +from .sasl import SaslHandshakeRequest, SaslHandshakeRequestV1, SaslHandshakeResponse, SaslHandshakeResponseV1 """ Author: Keith Bourgoin, Emmett Butler @@ -44,18 +45,18 @@ Each message is encoded as either a Request or Response: RequestOrResponse => Size (RequestMessage | ResponseMessage) -  Size => int32 + Size => int32 RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage -  ApiKey => int16 -  ApiVersion => int16 -  CorrelationId => int32 -  ClientId => string -  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest + ApiKey => int16 + ApiVersion => int16 + CorrelationId => int32 + ClientId => string + RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest Response => CorrelationId ResponseMessage -  CorrelationId => int32 -  ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse + CorrelationId => int32 + ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse """ __all__ = ["MetadataRequest", "MetadataResponse", "ProduceRequest", "ProduceResponse", "PartitionFetchRequest", "FetchRequest", "FetchPartitionResponse", @@ -79,4 +80,5 @@ "OffsetFetchResponseV1", "OffsetFetchRequestV2", "OffsetFetchResponseV2", "MetadataRequestV2", 
"MetadataResponseV2", "MetadataRequestV3", "MetadataResponseV3", "MetadataRequestV4", "MetadataResponseV4", - "MetadataRequestV5", "MetadataResponseV5"] + "MetadataRequestV5", "MetadataResponseV5", "SaslHandshakeRequest", + "SaslHandshakeRequestV1", "SaslHandshakeResponse", "SaslHandshakeResponseV1"] diff --git a/pykafka/protocol/sasl.py b/pykafka/protocol/sasl.py new file mode 100644 index 000000000..39f965bf6 --- /dev/null +++ b/pykafka/protocol/sasl.py @@ -0,0 +1,79 @@ +import struct + +from pykafka.utils import struct_helpers +from .base import Request, Response + + +class SaslHandshakeRequest(Request): + """A SASL handshake request. + Specification:: + + SaslHandshake Request (Version: 0) => mechanism + mechanism => STRING + """ + API_KEY = 17 + + @classmethod + def get_versions(cls): + return {0: SaslHandshakeRequest, 1: SaslHandshakeRequestV1} + + def __init__(self, mechanism): + self.mechanism = mechanism.encode() + + def __len__(self): + return self.HEADER_LEN + 2 + len(self.mechanism) + + def get_bytes(self): + """Create new sasl handshake request""" + output = bytearray(len(self)) + self._write_header(output) + offset = self.HEADER_LEN + fmt = '!h%ds' % len(self.mechanism) + struct.pack_into(fmt, output, offset, len(self.mechanism), self.mechanism) + return output + + +class SaslHandshakeRequestV1(SaslHandshakeRequest): + """A SASL handshake request. + Specification:: + + SaslHandshake Request (Version: 1) => mechanism + mechanism => STRING + """ + + +class SaslHandshakeResponse(Response): + """A SASL handshake response. 
+ Specification:: + + SaslHandshake Response (Version: 0) => error_code [mechanisms] + error_code => INT16 + mechanisms => STRING + """ + API_KEY = 17 + + @classmethod + def get_versions(cls): + return {0: SaslHandshakeRequest, 1: SaslHandshakeRequestV1} + + def __init__(self, buff): + """Deserialize into a new Response + + :param buff: Serialized message + :type buff: :class:`bytearray` + """ + fmt = 'h [S]' + response = struct_helpers.unpack_from(fmt, buff, 0) + + self.error_code = response[0] + self.mechanisms = response[1] + + +class SaslHandshakeResponseV1(SaslHandshakeResponse): + """A SASL handshake response. + Specification:: + + SaslHandshake Response (Version: 1) => error_code [mechanisms] + error_code => INT16 + mechanisms => STRING + """ \ No newline at end of file From e74099cbe6e069c68e549eca4300e6829e168d9f Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 08:08:01 +0200 Subject: [PATCH 08/46] fix bug where queued.max.messages.kbytes is larger than its max value --- pykafka/rdkafka/simple_consumer.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pykafka/rdkafka/simple_consumer.py b/pykafka/rdkafka/simple_consumer.py index d3444463d..dd6006145 100644 --- a/pykafka/rdkafka/simple_consumer.py +++ b/pykafka/rdkafka/simple_consumer.py @@ -248,9 +248,10 @@ def _mk_rdkafka_config_lists(self): # queued.max.messages.kbytes so for now we infer the implied # maximum (which, with default settings, is ~2GB per partition): "queued.min.messages": self._queued_max_messages, - "queued.max.messages.kbytes": str( - self._queued_max_messages - * self._fetch_message_max_bytes // 1024), + "queued.max.messages.kbytes": min( + 2097151, + self._queued_max_messages * self._fetch_message_max_bytes // 1024 + ), "fetch.wait.max.ms": self._fetch_wait_max_ms, "fetch.message.max.bytes": self._fetch_message_max_bytes, From 97b277bde2353541a431252aa6bc166168d8f1c6 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 08:09:38 +0200 
Subject: [PATCH 09/46] Add more output to test kafka startup --- pykafka/test/kafka_instance.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py index ec91e058b..c71c2be1f 100644 --- a/pykafka/test/kafka_instance.py +++ b/pykafka/test/kafka_instance.py @@ -641,6 +641,9 @@ def _catch_sigint(signum, frame): bin_dir=args.download_dir) print('Cluster started.') print('Brokers: {brokers}'.format(brokers=cluster.brokers)) + print('SSL Brokers: {brokers}'.format(brokers=cluster.brokers_ssl)) + print('SASL Brokers: {brokers}'.format(brokers=cluster.brokers_sasl)) + print('Log dir: {log_dirs}'.format(log_dirs=cluster._log_dir)) print('Zookeeper: {zk}'.format(zk=cluster.zookeeper)) print('Waiting for SIGINT to exit.') From 7bec7be9944a6ffaed561a36b9c75c662ea59143 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 08:10:27 +0200 Subject: [PATCH 10/46] make sure get_cluster picks up new sasl endpoints --- pykafka/test/utils.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pykafka/test/utils.py b/pykafka/test/utils.py index 5f553edd5..eca6af474 100644 --- a/pykafka/test/utils.py +++ b/pykafka/test/utils.py @@ -18,7 +18,8 @@ def get_cluster(): return KafkaConnection(os.environ['KAFKA_BIN'], os.environ['BROKERS'], os.environ['ZOOKEEPER'], - os.environ.get('BROKERS_SSL', None)) + os.environ.get('BROKERS_SSL', None), + os.environ.get('BROKERS_SASL', None)) elif os.environ.get('HOSTS_FILE', None): # Broker is already running. Use that. 
hosts = {} @@ -29,7 +30,8 @@ def get_cluster(): return KafkaConnection(os.environ['KAFKA_BIN'], hosts['BROKERS'], hosts['ZOOKEEPER'], - hosts.get('BROKERS_SSL', None)) + hosts.get('BROKERS_SSL', None), + hosts.get('BROKERS_SASL', None)) else: return KafkaInstance(num_instances=3) From 1bce4a4f46fea00c7391ecc44de18c850150adfb Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 08:11:37 +0200 Subject: [PATCH 11/46] fix kafka_version in sasl test --- tests/pykafka/test_sasl.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/pykafka/test_sasl.py b/tests/pykafka/test_sasl.py index c89388110..f0266cd46 100644 --- a/tests/pykafka/test_sasl.py +++ b/tests/pykafka/test_sasl.py @@ -9,7 +9,11 @@ def test_sasl_roundtrip(sasl_kafka, authenticator, kafka_version): This is mostly important to test the pykafka.rdkafka classes, which should be passed SASL settings during producer/consumer init. """ - client = KafkaClient(sasl_kafka.brokers_sasl, sasl_authenticator=authenticator, broker_version=kafka_version) + client = KafkaClient( + sasl_kafka.brokers_sasl, + sasl_authenticator=authenticator, + broker_version=".".join(str(v) for v in kafka_version), + ) topic_name = uuid4().hex.encode() payload = uuid4().hex.encode() @@ -19,4 +23,4 @@ def test_sasl_roundtrip(sasl_kafka, authenticator, kafka_version): producer.produce(payload) consumer = topic.get_simple_consumer(use_rdkafka=False, consumer_timeout_ms=5000) - assert consumer.consume().value == payload \ No newline at end of file + assert consumer.consume().value == payload From 19c6e5be28c36f6758d2781dc9c6b3a96fd83c24 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 08:13:47 +0200 Subject: [PATCH 12/46] add new sasl authentication mechanism --- pykafka/__init__.py | 3 + pykafka/broker.py | 27 ++++- pykafka/client.py | 4 + pykafka/cluster.py | 6 + pykafka/connection.py | 21 +++- pykafka/rdkafka/producer.py | 5 + pykafka/rdkafka/simple_consumer.py | 5 + 
pykafka/sasl_authenticators.py | 184 +++++++++++++++++++++++++++++ 8 files changed, 251 insertions(+), 4 deletions(-) create mode 100644 pykafka/sasl_authenticators.py diff --git a/pykafka/__init__.py b/pykafka/__init__.py index d9d0a595e..7f28c8543 100644 --- a/pykafka/__init__.py +++ b/pykafka/__init__.py @@ -11,6 +11,7 @@ from .balancedconsumer import BalancedConsumer from .managedbalancedconsumer import ManagedBalancedConsumer from .membershipprotocol import RangeProtocol, RoundRobinProtocol +from .sasl_authenticators import PlainAuthenticator, ScramAuthenticator __version__ = "2.8.1-dev.2" @@ -28,6 +29,8 @@ "ManagedBalancedConsumer", "RangeProtocol", "RoundRobinProtocol", + "PlainAuthenticator", + "ScramAuthenticator" ] logging.getLogger(__name__).addHandler(logging.NullHandler()) diff --git a/pykafka/broker.py b/pykafka/broker.py index bfcdfd15c..93733ff5c 100644 --- a/pykafka/broker.py +++ b/pykafka/broker.py @@ -62,6 +62,7 @@ def __init__(self, source_host='', source_port=0, ssl_config=None, + sasl_authenticator=None, broker_version="0.9.0", api_versions=None): """Create a Broker instance. @@ -92,6 +93,8 @@ def __init__(self, :type source_port: int :param ssl_config: Config object for SSL connection :type ssl_config: :class:`pykafka.connection.SslConfig` + :param sasl_authenticator: Authenticator to use for authentication using sasl. + :type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator` :param broker_version: The protocol version of the cluster being connected to. If this parameter doesn't match the actual broker version, some pykafka features may not work properly. 
@@ -117,6 +120,7 @@ def __init__(self, self._req_handlers = {} self._broker_version = broker_version self._api_versions = api_versions + self._sasl_authenticator = sasl_authenticator try: self.connect() except SocketDisconnectedError: @@ -144,6 +148,7 @@ def from_metadata(cls, source_host='', source_port=0, ssl_config=None, + sasl_authenticator=None, broker_version="0.9.0", api_versions=None): """Create a Broker using BrokerMetadata @@ -169,6 +174,8 @@ def from_metadata(cls, :type source_port: int :param ssl_config: Config object for SSL connection :type ssl_config: :class:`pykafka.connection.SslConfig` + :param sasl_authenticator: Authenticator to use for authentication using sasl. + :type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator` :param broker_version: The protocol version of the cluster being connected to. If this parameter doesn't match the actual broker version, some pykafka features may not work properly. @@ -184,6 +191,7 @@ def from_metadata(cls, source_host=source_host, source_port=source_port, ssl_config=ssl_config, + sasl_authenticator=sasl_authenticator, broker_version=broker_version, api_versions=api_versions) @@ -203,6 +211,22 @@ def offsets_channel_connected(self): return self._offsets_channel_connection.connected return False + @property + def authenticated(self): + """Returns True if this object's main connection to the Kafka broker + is authenticated + """ + return self._connection.authenticated + + @property + def offsets_channel_authenticated(self): + """Returns True if this object's offsets channel connection to the + Kafka broker is authenticated + """ + if self._offsets_channel_connection: + return self._offsets_channel_connection.authenticated + return False + @property def id(self): """The broker's ID within the Kafka cluster""" @@ -246,7 +270,8 @@ def connect(self, attempts=3): buffer_size=self._buffer_size, source_host=self._source_host, source_port=self._source_port, - ssl_config=self._ssl_config) + 
ssl_config=self._ssl_config, + sasl_authenticator=self._sasl_authenticator) self._connection.connect(self._socket_timeout_ms, attempts=attempts) self._req_handler = RequestHandler(self._handler, self._connection) self._req_handler.start() diff --git a/pykafka/client.py b/pykafka/client.py index 193e00f79..5b66d5742 100644 --- a/pykafka/client.py +++ b/pykafka/client.py @@ -88,6 +88,7 @@ def __init__(self, exclude_internal_topics=True, source_address='', ssl_config=None, + sasl_authenticator=None, broker_version='0.9.0'): """Create a connection to a Kafka cluster. @@ -118,6 +119,8 @@ def __init__(self, :type source_address: str `'host:port'` :param ssl_config: Config object for SSL connection :type ssl_config: :class:`pykafka.connection.SslConfig` + :param sasl_authenticator: Authenticator to use for authentication using sasl. + :type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator` :param broker_version: The protocol version of the cluster being connected to. If this parameter doesn't match the actual broker version, some pykafka features may not work properly. @@ -139,6 +142,7 @@ def __init__(self, source_address=self._source_address, zookeeper_hosts=zookeeper_hosts, ssl_config=ssl_config, + sasl_authenticator=sasl_authenticator, broker_version=broker_version) self.brokers = self.cluster.brokers self.topics = self.cluster.topics diff --git a/pykafka/cluster.py b/pykafka/cluster.py index 28c3fdfc2..a43293c63 100644 --- a/pykafka/cluster.py +++ b/pykafka/cluster.py @@ -175,6 +175,7 @@ def __init__(self, source_address='', zookeeper_hosts=None, ssl_config=None, + sasl_authenticator=None, broker_version='0.9.0'): """Create a new Cluster instance. @@ -199,6 +200,8 @@ def __init__(self, :type source_address: str `'host:port'` :param ssl_config: Config object for SSL connection :type ssl_config: :class:`pykafka.connection.SslConfig` + :param sasl_authenticator: Authenticator to use for authentication using sasl. 
+ :type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator` :param broker_version: The protocol version of the cluster being connected to. If this parameter doesn't match the actual broker version, some pykafka features may not work properly. @@ -214,6 +217,7 @@ def __init__(self, self._source_host = self._source_address.split(':')[0] self._source_port = 0 self._ssl_config = ssl_config + self.sasl_authenticator = sasl_authenticator self._zookeeper_connect = zookeeper_hosts self._max_connection_retries = 3 self._max_connection_retries_offset_mgr = 8 @@ -284,6 +288,7 @@ def _request_random_broker(self, broker_connects, req_fn): source_host=self._source_host, source_port=self._source_port, ssl_config=self._ssl_config, + sasl_authenticator=self.sasl_authenticator, broker_version=self._broker_version, api_versions=self._api_versions) response = req_fn(broker) @@ -402,6 +407,7 @@ def _update_brokers(self, broker_metadata, controller_id): source_host=self._source_host, source_port=self._source_port, ssl_config=self._ssl_config, + sasl_authenticator=self.sasl_authenticator, broker_version=self._broker_version, api_versions=self._api_versions) elif not self._brokers[id_].connected: diff --git a/pykafka/connection.py b/pykafka/connection.py index 203fae024..5289c13d5 100644 --- a/pykafka/connection.py +++ b/pykafka/connection.py @@ -117,7 +117,8 @@ def __init__(self, buffer_size=1024 * 1024, source_host='', source_port=0, - ssl_config=None): + ssl_config=None, + sasl_authenticator=None): """Initialize a socket connection to Kafka. :param host: The host to which to connect @@ -139,6 +140,8 @@ def __init__(self, :type source_port: int :param ssl_config: Config object for SSL connection :type ssl_config: :class:`pykafka.connection.SslConfig` + :param sasl_authenticator: Authenticator to use for authentication using sasl. 
+ :type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator` """ self._buff = bytearray(buffer_size) self.host = host @@ -149,6 +152,8 @@ def __init__(self, self.source_port = source_port self._wrap_socket = ( ssl_config.wrap_socket if ssl_config else lambda x: x) + self._sasl_authenticator = sasl_authenticator + self.authenticated = sasl_authenticator is None def __del__(self): """Close this connection when the object is deleted.""" @@ -161,6 +166,7 @@ def connected(self): def connect(self, timeout, attempts=1): """Connect to the broker, retrying if specified.""" + self.authenticated = False log.debug("Connecting to %s:%s", self.host, self.port) for attempt in range(0, attempts): try: @@ -172,6 +178,9 @@ def connect(self, timeout, attempts=1): ) ) log.debug("Successfully connected to %s:%s", self.host, self.port) + if self._sasl_authenticator is not None: + self._sasl_authenticator.authenticate(self) + self.authenticated = True return except (self._handler.SockErr, self._handler.GaiError) as err: log.info("Attempt %s: failed to connect to %s:%s", attempt, self.host, self.port) @@ -193,6 +202,7 @@ def disconnect(self): pass finally: self._socket = None + self.authenticated = False def reconnect(self): """Disconnect from the broker, then reconnect""" @@ -203,6 +213,7 @@ def request(self, request): """Send a request over the socket connection""" bytes_ = request.get_bytes() if not self._socket: + self.authenticated = False raise SocketDisconnectedError("".format(self.host, self.port)) try: self._socket.sendall(bytes_) @@ -211,7 +222,7 @@ def request(self, request): self.disconnect() raise SocketDisconnectedError("".format(self.host, self.port)) - def response(self): + def response_raw(self): """Wait for a response from the broker""" size = bytes() expected_len = 4 # Size => int32 @@ -231,5 +242,9 @@ def response(self): except SocketDisconnectedError: self.disconnect() raise SocketDisconnectedError("".format(self.host, self.port)) + return 
self._buff[:size] + + def response(self): # Drop CorrelationId => int32 - return buffer(self._buff[4:4 + size]) + return buffer(self.response_raw()[4:]) + diff --git a/pykafka/rdkafka/producer.py b/pykafka/rdkafka/producer.py index cdb9b1e27..86ff1025c 100644 --- a/pykafka/rdkafka/producer.py +++ b/pykafka/rdkafka/producer.py @@ -57,6 +57,7 @@ def __init__(self, self._rdk_producer = None self._poller_thread = None self._stop_poller_thread = cluster.handler.Event() + self._sasl_conf = {} if cluster.sasl_authenticator is None else cluster.sasl_authenticator.get_rd_kafka_opts() # super() must come last because it calls start() super(RdKafkaProducer, self).__init__(**callargs) @@ -191,6 +192,10 @@ def _mk_rdkafka_config_lists(self): # "partitioner" # dealt with in pykafka # "opaque" } + + # append configurations necessary for sasl authentication + conf.update(self._sasl_conf) + # librdkafka expects all config values as strings: conf = [(key, str(conf[key])) for key in conf] topic_conf = [(key, str(topic_conf[key])) for key in topic_conf] diff --git a/pykafka/rdkafka/simple_consumer.py b/pykafka/rdkafka/simple_consumer.py index dd6006145..d286c89aa 100644 --- a/pykafka/rdkafka/simple_consumer.py +++ b/pykafka/rdkafka/simple_consumer.py @@ -64,6 +64,7 @@ def __init__(self, self._stop_poller_thread = cluster.handler.Event() self._broker_version = cluster._broker_version self._fetch_error_backoff_ms = valid_int(fetch_error_backoff_ms) + self._sasl_conf = {} if cluster.sasl_authenticator is None else cluster.sasl_authenticator.get_rd_kafka_opts() # super() must come last for the case where auto_start=True super(RdKafkaSimpleConsumer, self).__init__(**callargs) @@ -286,6 +287,10 @@ def _mk_rdkafka_config_lists(self): ##"offset.store.sync.interval.ms" ##"offset.store.method" } + + # append configurations necessary for sasl authentication + conf.update(self._sasl_conf) + # librdkafka expects all config values as strings: conf = [(key, str(conf[key])) for key in conf] 
topic_conf = [(key, str(topic_conf[key])) for key in topic_conf] diff --git a/pykafka/sasl_authenticators.py b/pykafka/sasl_authenticators.py new file mode 100644 index 000000000..47f2b7bc7 --- /dev/null +++ b/pykafka/sasl_authenticators.py @@ -0,0 +1,184 @@ +import base64 +import hashlib +import hmac +import logging +import struct +from uuid import uuid4 + +import six + +from pykafka.protocol import SaslHandshakeResponse +from .protocol import SaslHandshakeRequest + +log = logging.getLogger(__name__) + + +if six.PY2: + def xor_bytes(left, right): + return bytearray(ord(lb) ^ ord(rb) for lb, rb in zip(left, right)) +else: + def xor_bytes(left, right): + return bytes(lb ^ rb for lb, rb in zip(left, right)) + + +class FakeRequest: + def __init__(self, payload): + self.payload = payload + + def get_bytes(self): + return struct.pack("!i", len(self.payload)) + self.payload + + +class BaseAuthenticator: + def __init__(self, mechanism): + self.mechanism = mechanism + self.handshake_version = None + self.auth_version = None + self._broker_connection = None + + def get_rd_kafka_opts(self): + raise NotImplementedError() + + def authenticate(self, broker_connection): + self._broker_connection = broker_connection + if self.handshake_version is None: + self.fetch_api_versions() + log.debug("Authenticating to {}:{} using mechanism {}.".format( + self._broker_connection.host, + self._broker_connection.port, + self.mechanism) + ) + self.initialize_authentication() + self.exchange_tokens() + log.debug("Authentication successful.") + + def initialize_authentication(self): + self._broker_connection.request(SaslHandshakeRequest(self.mechanism)) + response = SaslHandshakeResponse(self._broker_connection.response()) + if response.error_code != 0: + # Todo: create correct exception here + msg = "Broker only supports sasl mechanisms {}, requested was {}" + raise RuntimeError(msg.format(",".join(response.mechanisms), self.mechanism)) + + def exchange_tokens(self): + raise 
NotImplementedError() + + def send_token(self, token): + self._broker_connection.request(FakeRequest(token)) + + def receive_token(self): + return self._broker_connection.response_raw() + + def fetch_api_versions(self): + self.handshake_version = 0 + + # try this later + # self._broker_connection.request(ApiVersionsRequest()) + # response = ApiVersionsResponse(self._broker_connection.response()) + # self.handshake_version = response.api_versions[17] + # self.auth_version = response.api_versions.get(36, None) + + +class ScramAuthenticator(BaseAuthenticator): + MECHANISMS = {"SCRAM-SHA-256": ("sha256", hashlib.sha256), "SCRAM-SHA-512": ("sha512", hashlib.sha512)} + + def __init__(self, mechanism, user, password): + super(ScramAuthenticator, self).__init__(mechanism) + self.nonce = None + self.auth_message = None + self.salted_password = None + self.user = user + self.password = password.encode() + self.hashname, self.hashfunc = self.MECHANISMS[mechanism] + self.mechanism = mechanism + self.stored_key = None + self.client_key = None + self.client_signature = None + self.client_proof = None + self.server_key = None + self.server_signature = None + + def first_message(self): + self.nonce = str(uuid4()).replace("-", "") + client_first_bare = "n={},r={}".format(self.user, self.nonce) + self.auth_message = client_first_bare + return "n,," + client_first_bare + + def process_server_first_message(self, server_first_message): + self.auth_message += "," + server_first_message + params = dict(pair.split("=", 1) for pair in server_first_message.split(",")) + server_nonce = params["r"] + if not server_nonce.startswith(self.nonce): + # Todo: create correct exception here + raise RuntimeError("Server nonce, did not start with client nonce!") + self.nonce = server_nonce + self.auth_message += ",c=biws,r=" + self.nonce + + salt = base64.b64decode(params["s"].encode()) + iterations = int(params["i"]) + self.create_salted_password(salt, iterations) + + self.client_key = 
self.hmac(self.salted_password, b"Client Key") + self.stored_key = self.hashfunc(self.client_key).digest() + self.client_signature = self.hmac(self.stored_key, self.auth_message.encode()) + self.client_proof = xor_bytes(self.client_key, self.client_signature) + self.server_key = self.hmac(self.salted_password, b"Server Key") + self.server_signature = self.hmac(self.server_key, self.auth_message.encode()) + + def hmac(self, key, msg): + return hmac.new(key, msg, digestmod=self.hashfunc).digest() + + def create_salted_password(self, salt, iterations): + self.salted_password = hashlib.pbkdf2_hmac(self.hashname, self.password, salt, iterations) + + def final_message(self): + return "c=biws,r={},p={}".format(self.nonce, base64.b64encode(self.client_proof).decode()) + + def process_server_final_message(self, server_final_message): + params = dict(pair.split("=", 1) for pair in server_final_message.split(",")) + if self.server_signature != base64.b64decode(params["v"].encode()): + # Todo: create correct exception here + raise RuntimeError("Server sent wrong signature!") + + def get_rd_kafka_opts(self): + return { + "sasl.mechanisms": self.mechanism, + "sasl.username": self.user, + "sasl.password": self.password.decode(), + 'security.protocol': 'SASL_PLAINTEXT', # TODO determine this properly + } + + def exchange_tokens(self): + client_first = self.first_message() + self.send_token(client_first.encode()) + + server_first = self.receive_token().decode() + self.process_server_first_message(server_first) + + client_final = self.final_message() + self.send_token(client_final.encode()) + + server_final = self.receive_token().decode() + self.process_server_final_message(server_final) + + +class PlainAuthenticator(BaseAuthenticator): + def __init__(self, user, password): + super(PlainAuthenticator, self).__init__("PLAIN") + self.user = user + self.password = password + + def get_rd_kafka_opts(self): + return { + "sasl.mechanisms": self.mechanism, + "sasl.username": self.user, + 
"sasl.password": self.password, + 'security.protocol': 'SASL_PLAINTEXT', # TODO determine this properly + } + + def exchange_tokens(self): + self.send_token("\0".join([self.user, self.user, self.password]).encode()) + response = self.receive_token() + if response != b"": + # Todo: create correct exception here + raise RuntimeError("Authentication Failed!") From 142c52f8897ad350071bb48c3038c805361c93b2 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 08:37:04 +0200 Subject: [PATCH 13/46] create and use new exceptions --- pykafka/exceptions.py | 31 ++++++++++++++++++++++++++++++- pykafka/sasl_authenticators.py | 17 ++++++++--------- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/pykafka/exceptions.py b/pykafka/exceptions.py index 70bc5dc9f..e5bf911a9 100644 --- a/pykafka/exceptions.py +++ b/pykafka/exceptions.py @@ -91,6 +91,11 @@ def __init__(self, partition, *args, **kwargs): self.partition = partition +class AuthenticationException(KafkaException): + """Indicates that something went wrong during Authentication.""" + pass + + """ Protocol Client Exceptions https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes @@ -254,6 +259,26 @@ class GroupAuthorizationFailed(ProtocolClientError): ERROR_CODE = 30 +class ClusteAuthorizationFailed(ProtocolClientError): + """Cluster authorization failed.""" + ERROR_CODE = 31 + + +class UnsupportedSaslMechanism(ProtocolClientError, AuthenticationException): + """The broker does not support the requested SASL mechanism.""" + ERROR_CODE = 33 + + +class IllegalSaslState(ProtocolClientError, AuthenticationException): + """Request is not valid given the current SASL state.""" + ERROR_CODE = 34 + + +class SaslAuthenticationFailed(ProtocolClientError, AuthenticationException): + """SASL authentication failed.""" + ERROR_CODE = 58 + + ERROR_CODES = dict( (exc.ERROR_CODE, exc) for exc in (UnknownError, @@ -276,7 +301,11 @@ class 
GroupAuthorizationFailed(ProtocolClientError): InvalidSessionTimeout, RebalanceInProgress, TopicAuthorizationFailed, - GroupAuthorizationFailed) + GroupAuthorizationFailed, + ClusteAuthorizationFailed, + UnsupportedSaslMechanism, + IllegalSaslState, + SaslAuthenticationFailed) ) diff --git a/pykafka/sasl_authenticators.py b/pykafka/sasl_authenticators.py index 47f2b7bc7..37e9f1495 100644 --- a/pykafka/sasl_authenticators.py +++ b/pykafka/sasl_authenticators.py @@ -9,6 +9,7 @@ from pykafka.protocol import SaslHandshakeResponse from .protocol import SaslHandshakeRequest +from .exceptions import AuthenticationException, UnsupportedSaslMechanism, ERROR_CODES log = logging.getLogger(__name__) @@ -56,9 +57,10 @@ def initialize_authentication(self): self._broker_connection.request(SaslHandshakeRequest(self.mechanism)) response = SaslHandshakeResponse(self._broker_connection.response()) if response.error_code != 0: - # Todo: create correct exception here - msg = "Broker only supports sasl mechanisms {}, requested was {}" - raise RuntimeError(msg.format(",".join(response.mechanisms), self.mechanism)) + if response.error_code == UnsupportedSaslMechanism.ERROR_CODE: + msg = "Broker only supports sasl mechanisms {}, requested was {}" + raise UnsupportedSaslMechanism(msg.format(",".join(response.mechanisms), self.mechanism)) + raise ERROR_CODES[response.error_code]("Authentication Handshake failed") def exchange_tokens(self): raise NotImplementedError() @@ -109,8 +111,7 @@ def process_server_first_message(self, server_first_message): params = dict(pair.split("=", 1) for pair in server_first_message.split(",")) server_nonce = params["r"] if not server_nonce.startswith(self.nonce): - # Todo: create correct exception here - raise RuntimeError("Server nonce, did not start with client nonce!") + raise AuthenticationException("Server nonce, did not start with client nonce!") self.nonce = server_nonce self.auth_message += ",c=biws,r=" + self.nonce @@ -137,8 +138,7 @@ def 
final_message(self): def process_server_final_message(self, server_final_message): params = dict(pair.split("=", 1) for pair in server_final_message.split(",")) if self.server_signature != base64.b64decode(params["v"].encode()): - # Todo: create correct exception here - raise RuntimeError("Server sent wrong signature!") + raise AuthenticationException("Server sent wrong signature!") def get_rd_kafka_opts(self): return { @@ -180,5 +180,4 @@ def exchange_tokens(self): self.send_token("\0".join([self.user, self.user, self.password]).encode()) response = self.receive_token() if response != b"": - # Todo: create correct exception here - raise RuntimeError("Authentication Failed!") + raise AuthenticationException("Server sent unexpected response!") From ec7e9b3c8e1d315d654a0a29d8a7932dc0951b0b Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 08:42:42 +0200 Subject: [PATCH 14/46] add security_protocol parameter to Authenticators --- pykafka/sasl_authenticators.py | 25 +++++++++++++------------ tests/conftest.py | 20 ++++++++++++-------- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/pykafka/sasl_authenticators.py b/pykafka/sasl_authenticators.py index 37e9f1495..60b4b5049 100644 --- a/pykafka/sasl_authenticators.py +++ b/pykafka/sasl_authenticators.py @@ -8,8 +8,8 @@ import six from pykafka.protocol import SaslHandshakeResponse +from .exceptions import AuthenticationException, ERROR_CODES, UnsupportedSaslMechanism from .protocol import SaslHandshakeRequest -from .exceptions import AuthenticationException, UnsupportedSaslMechanism, ERROR_CODES log = logging.getLogger(__name__) @@ -31,10 +31,11 @@ def get_bytes(self): class BaseAuthenticator: - def __init__(self, mechanism): + def __init__(self, mechanism, security_protocol=None): self.mechanism = mechanism self.handshake_version = None self.auth_version = None + self.security_protocol = security_protocol self._broker_connection = None def get_rd_kafka_opts(self): @@ -44,10 +45,10 @@ def 
authenticate(self, broker_connection): self._broker_connection = broker_connection if self.handshake_version is None: self.fetch_api_versions() - log.debug("Authenticating to {}:{} using mechanism {}.".format( - self._broker_connection.host, - self._broker_connection.port, - self.mechanism) + log.debug( + "Authenticating to {}:{} using mechanism {}.".format( + self._broker_connection.host, self._broker_connection.port, self.mechanism + ) ) self.initialize_authentication() self.exchange_tokens() @@ -84,8 +85,8 @@ def fetch_api_versions(self): class ScramAuthenticator(BaseAuthenticator): MECHANISMS = {"SCRAM-SHA-256": ("sha256", hashlib.sha256), "SCRAM-SHA-512": ("sha512", hashlib.sha512)} - def __init__(self, mechanism, user, password): - super(ScramAuthenticator, self).__init__(mechanism) + def __init__(self, mechanism, user, password, security_protocol=None): + super(ScramAuthenticator, self).__init__(mechanism, security_protocol) self.nonce = None self.auth_message = None self.salted_password = None @@ -145,7 +146,7 @@ def get_rd_kafka_opts(self): "sasl.mechanisms": self.mechanism, "sasl.username": self.user, "sasl.password": self.password.decode(), - 'security.protocol': 'SASL_PLAINTEXT', # TODO determine this properly + "security.protocol": self.security_protocol, } def exchange_tokens(self): @@ -163,8 +164,8 @@ def exchange_tokens(self): class PlainAuthenticator(BaseAuthenticator): - def __init__(self, user, password): - super(PlainAuthenticator, self).__init__("PLAIN") + def __init__(self, user, password, security_protocol=None): + super(PlainAuthenticator, self).__init__("PLAIN", security_protocol) self.user = user self.password = password @@ -173,7 +174,7 @@ def get_rd_kafka_opts(self): "sasl.mechanisms": self.mechanism, "sasl.username": self.user, "sasl.password": self.password, - 'security.protocol': 'SASL_PLAINTEXT', # TODO determine this properly + "security.protocol": self.security_protocol, } def exchange_tokens(self): diff --git a/tests/conftest.py 
b/tests/conftest.py index d0742dffe..815594883 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,13 +1,15 @@ import logging import os + import pytest -from pykafka.test.utils import get_cluster, stop_cluster + from pykafka import PlainAuthenticator, ScramAuthenticator -from pykafka.test.kafka_instance import SASL_USER, SASL_PASSWORD +from pykafka.test.kafka_instance import SASL_PASSWORD, SASL_USER +from pykafka.test.utils import get_cluster, stop_cluster logging.basicConfig(level=logging.DEBUG) -KAFKA_VERSION = tuple(int(v) for v in os.environ.get('KAFKA_VERSION', '0.8.0').split('.')) +KAFKA_VERSION = tuple(int(v) for v in os.environ.get("KAFKA_VERSION", "0.8.0").split(".")) @pytest.fixture @@ -32,13 +34,15 @@ def kafka_version(): ) def authenticator(request): sasl_mechanism = request.param - if sasl_mechanism.startswith('SCRAM'): - return ScramAuthenticator(sasl_mechanism, user=SASL_USER, password=SASL_PASSWORD) + if sasl_mechanism.startswith("SCRAM"): + return ScramAuthenticator( + sasl_mechanism, user=SASL_USER, password=SASL_PASSWORD, security_protocol="SASL_PLAINTEXT" + ) else: - return PlainAuthenticator(user=SASL_USER, password=SASL_PASSWORD) + return PlainAuthenticator(user=SASL_USER, password=SASL_PASSWORD, security_protocol="SASL_PLAINTEXT") -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def kafka(): kafka = get_cluster() yield kafka @@ -50,4 +54,4 @@ def sasl_kafka(kafka): if not kafka.brokers_sasl: pytest.skip("Cluster has no SASL endpoint.") else: - yield kafka \ No newline at end of file + yield kafka From 203f0b3b30ca40fe6993958a25d5dee3a516eedc Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 08:54:27 +0200 Subject: [PATCH 15/46] add docstrings to sasl authenticators --- pykafka/sasl_authenticators.py | 51 ++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/pykafka/sasl_authenticators.py b/pykafka/sasl_authenticators.py index 60b4b5049..26e2c9cd2 100644 --- 
a/pykafka/sasl_authenticators.py +++ b/pykafka/sasl_authenticators.py @@ -31,7 +31,26 @@ def get_bytes(self): class BaseAuthenticator: + """ + Base class for authentication mechanisms. + Subclasses are supposed to implement: + 1. :meth:`BaseAuthenticator.get_rd_kafka_opts` which should return a dictionary + whose items will be appended to the config given to librdkafka consumers and producers. + 2. :meth:`BaseAuthenticator.exchange_tokens` which is supposed to use :meth:`BaseAuthenticator.send_token` + and :meth:`BaseAuthenticator.receive_token` to send and receive the byte strings necessary to authenticate + with the broker. + """ def __init__(self, mechanism, security_protocol=None): + """ + Base class for SASL authentication mechanisms. + + :param mechanism: The mechanism this authenticator is supposed to use. + :type mechanism: str + :param security_protocol: The security protocol determining the broker endpoint this + authenticator is supposed to authenticate with. + Only used for rdkafka based consumers and producers. + """ + self.mechanism = mechanism self.handshake_version = None self.auth_version = None @@ -83,9 +102,26 @@ def fetch_api_versions(self): class ScramAuthenticator(BaseAuthenticator): + """ + Authenticates with Kafka using the salted challenge response authentication mechanism. + """ + MECHANISMS = {"SCRAM-SHA-256": ("sha256", hashlib.sha256), "SCRAM-SHA-512": ("sha512", hashlib.sha512)} def __init__(self, mechanism, user, password, security_protocol=None): + """ + Create new ScramAuthenticator + + :param mechanism: The mechanism this authenticator is supposed to use. + :type mechanism: str, one of 'SCRAM-SHA-256' or 'SCRAM-SHA-512' + :param user: The user to authenticate as. + :type user: str + :param password: The user's password. + :type password: str + :param security_protocol: The security protocol determining the broker endpoint this + authenticator is supposed to authenticate with. + Only used for rdkafka based consumers and producers. 
+ """ super(ScramAuthenticator, self).__init__(mechanism, security_protocol) self.nonce = None self.auth_message = None @@ -164,7 +200,22 @@ def exchange_tokens(self): class PlainAuthenticator(BaseAuthenticator): + """ + Authenticates with kafka using the Plain mechanism. I.e. sending user and password in plaintext. + """ + def __init__(self, user, password, security_protocol=None): + """ + Create new PlainAuthenticator. + + :param user: The user to authenticate as. + :type user: str + :param password: The user's password. + :type password: str + :param security_protocol: The security protocol determining the broker endpoint this + authenticator is supposed to authenticate with. + Only used for rdkafka based consumers and producers. + """ super(PlainAuthenticator, self).__init__("PLAIN", security_protocol) self.user = user self.password = password From 47b092721496864b7bfa2f618dca4be8b93cc7ff Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 09:17:52 +0200 Subject: [PATCH 16/46] add sasl config sections to docs --- README.rst | 13 +++++++++++-- doc/api/sasl_authenticators.rst | 5 +++++ 2 files changed, 16 insertions(+), 2 deletions(-) create mode 100644 doc/api/sasl_authenticators.rst diff --git a/README.rst b/README.rst index 244055d57..9767e1edd 100644 --- a/README.rst +++ b/README.rst @@ -65,6 +65,13 @@ for further details): >>> client = KafkaClient(hosts="127.0.0.1:,...", ... ssl_config=config) +Or, for SASL authenticated connection, you might write (and also see i.e. ``PlainAuthenticator`` dos for further details): + +.. sourcecode:: python + + >>> from pykafka import KafkaClient, PlainAuthenticator + >>> authenticator = PlainAuthenticator(user='alice', password='alice-secret') + >>> client = KafkaClient(hosts="127.0.0.1:,...", sasl_authenticator=authenticator) If the cluster you've connected to has any topics defined on it, you can list them with: @@ -178,8 +185,10 @@ of the librdkafka shared objects. 
You can find this location with `locate librdkafka.so`.

 After that, all that's needed is that you pass an extra parameter
 ``use_rdkafka=True`` to ``topic.get_producer()``,
-``topic.get_simple_consumer()``, or ``topic.get_balanced_consumer()``. Note
-that some configuration options may have different optimal values; it may be
+``topic.get_simple_consumer()``, or ``topic.get_balanced_consumer()``.
+If you're using SASL authenticated connections, make sure to pass the ``security_protocol``
+parameter to your authenticator so librdkafka knows which endpoint to authenticate with.
+Note that some configuration options may have different optimal values; it may be
 worthwhile to consult librdkafka's `configuration notes`_ for this.

 .. _0.9.1: https://github.com/edenhill/librdkafka/releases/tag/0.9.1
diff --git a/doc/api/sasl_authenticators.rst b/doc/api/sasl_authenticators.rst
new file mode 100644
index 000000000..3e676e2c1
--- /dev/null
+++ b/doc/api/sasl_authenticators.rst
@@ -0,0 +1,5 @@
+pykafka.sasl_authenticators
+===========================
+
+..
automodule:: pykafka.sasl_authenticators + :members: From 60468f6dfbd06bbbcab49a146e54a0348d08a0e3 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 10:20:06 +0200 Subject: [PATCH 17/46] implement SaslAuthenticate request and response --- pykafka/protocol/__init__.py | 7 ++- pykafka/protocol/sasl.py | 101 ++++++++++++++++++++++++++++++++++- 2 files changed, 105 insertions(+), 3 deletions(-) diff --git a/pykafka/protocol/__init__.py b/pykafka/protocol/__init__.py index 90ea9f1c5..6722a0828 100644 --- a/pykafka/protocol/__init__.py +++ b/pykafka/protocol/__init__.py @@ -26,7 +26,8 @@ OffsetFetchRequestV1, OffsetFetchResponseV1, OffsetFetchRequestV2, OffsetFetchResponseV2) from .produce import ProduceRequest, ProduceResponse, ProducePartitionResponse -from .sasl import SaslHandshakeRequest, SaslHandshakeRequestV1, SaslHandshakeResponse, SaslHandshakeResponseV1 +from .sasl import (SaslHandshakeRequest, SaslHandshakeRequestV1, SaslHandshakeResponse, SaslHandshakeResponseV1, + SaslAuthenticateRequest, SaslAuthenticateRquestV1, SaslAuthenticateResponse, SaslAuthenticateResponseV1) """ Author: Keith Bourgoin, Emmett Butler @@ -81,4 +82,6 @@ "MetadataRequestV2", "MetadataResponseV2", "MetadataRequestV3", "MetadataResponseV3", "MetadataRequestV4", "MetadataResponseV4", "MetadataRequestV5", "MetadataResponseV5", "SaslHandshakeRequest", - "SaslHandshakeRequestV1", "SaslHandshakeResponse", "SaslHandshakeResponseV1"] + "SaslHandshakeRequestV1", "SaslHandshakeResponse", "SaslHandshakeResponseV1", + "SaslAuthenticateRequest", "SaslAuthenticateRquestV1", "SaslAuthenticateResponse", + "SaslAuthenticateResponseV1"] diff --git a/pykafka/protocol/sasl.py b/pykafka/protocol/sasl.py index 39f965bf6..efbd7a770 100644 --- a/pykafka/protocol/sasl.py +++ b/pykafka/protocol/sasl.py @@ -76,4 +76,103 @@ class SaslHandshakeResponseV1(SaslHandshakeResponse): SaslHandshake Response (Version: 1) => error_code [mechanisms] error_code => INT16 mechanisms => STRING - """ \ No newline at 
end of file + """ + + +class SaslAuthenticateRequest(Request): + """A SASL authenticate request + Specification:: + + SaslAuthenticate Request (Version: 0) => auth_bytes + auth_bytes => BYTES + """ + API_KEY = 36 + + @classmethod + def get_versions(cls): + return {0: SaslAuthenticateRequest, 1: SaslAuthenticateRquestV1} + + def __init__(self, auth_bytes): + self.auth_bytes = auth_bytes + + def __len__(self): + if self.auth_bytes is not None: + return self.HEADER_LEN + 4 + len(self.auth_bytes) + return self.HEADER_LEN + 4 + + def get_bytes(self): + """Create new sasl authenticate request""" + output = bytearray(len(self)) + self._write_header(output) + offset = self.HEADER_LEN + if self.auth_bytes is not None: + fmt = '!i%ds' % len(self.auth_bytes) + struct.pack_into(fmt, output, offset, len(self.auth_bytes), self.auth_bytes) + else: + fmt = '!i' + struct.pack_into(fmt, output, offset, -1) + return output + + +class SaslAuthenticateRquestV1(SaslAuthenticateRequest): + """A SASL authenticate request + Specification:: + + SaslAuthenticate Request (Version: 1) => auth_bytes + auth_bytes => BYTES + """ + + +class SaslAuthenticateResponse(Response): + """A SASL authenticate response + Specification:: + + SaslAuthenticate Response (Version: 0) => error_code error_message auth_bytes + error_code => INT16 + error_message => NULLABLE_STRING + auth_bytes => BYTES + """ + API_KEY = 36 + + @classmethod + def get_versions(cls): + return {0: SaslAuthenticateResponse, 1: SaslAuthenticateResponseV1} + + def __init__(self, buff): + """Deserialize into a new Response + + :param buff: Serialized message + :type buff: :class:`bytearray` + """ + fmt = 'h S Y' + response = struct_helpers.unpack_from(fmt, buff, 0) + + self.error_code = response[0] + self.error_message = response[1].decode() + self.auth_bytes = response[2] + + +class SaslAuthenticateResponseV1(SaslAuthenticateResponse): + """A SASL authenticate response + Specification:: + + SaslAuthenticate Response (Version: 1) => 
error_code error_message auth_bytes session_lifetime_ms + error_code => INT16 + error_message => NULLABLE_STRING + auth_bytes => BYTES + session_lifetime_ms => INT64 + """ + + def __init__(self, buff): + """Deserialize into a new Response + + :param buff: Serialized message + :type buff: :class:`bytearray` + """ + fmt = 'h S Y q' + response = struct_helpers.unpack_from(fmt, buff, 0) + + self.error_code = response[0] + self.error_message = response[1].decode() + self.auth_bytes = response[2] + self.session_lifetime_ms = response[3] From d0784672a4996b25603b504a461e29d88ff9aed0 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 10:49:04 +0200 Subject: [PATCH 18/46] add advertisted listeners to test kafka config --- pykafka/test/kafka_instance.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py index c71c2be1f..d97f2080c 100644 --- a/pykafka/test/kafka_instance.py +++ b/pykafka/test/kafka_instance.py @@ -41,6 +41,7 @@ # Configurable settings broker.id={broker_id} listeners={listeners} +advertised.listeners={listeners} zookeeper.connect={zk_connstr} log.dirs={data_dir} From 4d1db540d7c72cd71c217b1a7a03b3d6070645b3 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 10:50:28 +0200 Subject: [PATCH 19/46] fix wrong class names in protocol.sasl --- pykafka/protocol/sasl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pykafka/protocol/sasl.py b/pykafka/protocol/sasl.py index efbd7a770..a64aa433b 100644 --- a/pykafka/protocol/sasl.py +++ b/pykafka/protocol/sasl.py @@ -54,7 +54,7 @@ class SaslHandshakeResponse(Response): @classmethod def get_versions(cls): - return {0: SaslHandshakeRequest, 1: SaslHandshakeRequestV1} + return {0: SaslHandshakeResponse, 1: SaslHandshakeResponseV1} def __init__(self, buff): """Deserialize into a new Response From f3c0d57b03aaa64a1ed221ed42ecc8b6a2c24ef5 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 10:51:03 +0200 
Subject: [PATCH 20/46] implement advanced authentication using new SASL API --- pykafka/sasl_authenticators.py | 39 ++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/pykafka/sasl_authenticators.py b/pykafka/sasl_authenticators.py index 26e2c9cd2..645fd2c73 100644 --- a/pykafka/sasl_authenticators.py +++ b/pykafka/sasl_authenticators.py @@ -7,9 +7,9 @@ import six -from pykafka.protocol import SaslHandshakeResponse from .exceptions import AuthenticationException, ERROR_CODES, UnsupportedSaslMechanism -from .protocol import SaslHandshakeRequest +from .protocol import (SaslHandshakeRequest, SaslHandshakeResponse, ApiVersionsRequest, ApiVersionsResponse, + SaslAuthenticateRequest, SaslAuthenticateResponse) log = logging.getLogger(__name__) @@ -40,6 +40,9 @@ class BaseAuthenticator: and :meth:`BaseAuthenticator.receive_token` to send and receive the byte strings necessary to authenticate with the broker. """ + MAX_AUTH_VERSION = 1 + MAX_HANDSHAKE_VERSION = 1 + def __init__(self, mechanism, security_protocol=None): """ Base class for SASL authentication mechanisms. 
@@ -74,8 +77,8 @@ def authenticate(self, broker_connection): log.debug("Authentication successful.") def initialize_authentication(self): - self._broker_connection.request(SaslHandshakeRequest(self.mechanism)) - response = SaslHandshakeResponse(self._broker_connection.response()) + self._broker_connection.request(SaslHandshakeRequest.get_versions()[self.handshake_version](self.mechanism)) + response = SaslHandshakeResponse.get_versions()[self.handshake_version](self._broker_connection.response()) if response.error_code != 0: if response.error_code == UnsupportedSaslMechanism.ERROR_CODE: msg = "Broker only supports sasl mechanisms {}, requested was {}" @@ -86,19 +89,29 @@ def exchange_tokens(self): raise NotImplementedError() def send_token(self, token): - self._broker_connection.request(FakeRequest(token)) + if self.handshake_version == 0: + req = FakeRequest(token) + else: + req = SaslAuthenticateRequest.get_versions()[self.auth_version](token) + self._broker_connection.request(req) def receive_token(self): - return self._broker_connection.response_raw() + if self.handshake_version == 0: + return self._broker_connection.response_raw() - def fetch_api_versions(self): - self.handshake_version = 0 + response = SaslAuthenticateResponse.get_versions()[self.auth_version](self._broker_connection.response()) + if response.error_code != 0: + raise ERROR_CODES[response.error_code](response.error_message) + return response.auth_bytes - # try this later - # self._broker_connection.request(ApiVersionsRequest()) - # response = ApiVersionsResponse(self._broker_connection.response()) - # self.handshake_version = response.api_versions[17] - # self.auth_version = response.api_versions.get(36, None) + def fetch_api_versions(self): + self._broker_connection.request(ApiVersionsRequest()) + response = ApiVersionsResponse(self._broker_connection.response()) + self.handshake_version = response.api_versions[SaslHandshakeRequest.API_KEY].max + self.auth_version = 
response.api_versions.get(SaslAuthenticateRequest.API_KEY, None) + self.handshake_version = min(self.MAX_HANDSHAKE_VERSION, self.handshake_version) + if self.auth_version is not None: + self.auth_version = min(self.auth_version.max, self.MAX_AUTH_VERSION) class ScramAuthenticator(BaseAuthenticator): From ed889e0e2cab14091f2bceaa46c14eebedbfb431 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 11:37:32 +0200 Subject: [PATCH 21/46] add BROKERS_SASL to ci scripts --- .travis.yml | 1 + tox.ini | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index c4e2f1306..e5509a450 100644 --- a/.travis.yml +++ b/.travis.yml @@ -83,6 +83,7 @@ before_script: - while true; do sleep 1; echo "Waiting for cluster..."; if [[ `grep ZOOKEEPER $TEMPFILE` ]]; then break; fi; done - export `grep BROKERS $TEMPFILE` - export `grep BROKERS_SSL $TEMPFILE` + - export `grep BROKERS_SASL $TEMPFILE` - export `grep ZOOKEEPER $TEMPFILE` script: diff --git a/tox.ini b/tox.ini index 853c680e0..26ce880f6 100644 --- a/tox.ini +++ b/tox.ini @@ -8,4 +8,4 @@ deps = gevent: gevent==1.3.6 commands = py.test {posargs} -passenv = BROKERS BROKERS_SSL ZOOKEEPER KAFKA_BIN KAFKA_VERSION C_INCLUDE_PATH LIBRARY_PATH LD_LIBRARY_PATH CFLAGS +passenv = BROKERS BROKERS_SSL BROKERS_SASL ZOOKEEPER KAFKA_BIN KAFKA_VERSION C_INCLUDE_PATH LIBRARY_PATH LD_LIBRARY_PATH CFLAGS From cc8d9b78739df44c68b10a02d8d4bdfc013eebeb Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 12:59:24 +0200 Subject: [PATCH 22/46] fix bug in protocol.sasl --- pykafka/protocol/sasl.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pykafka/protocol/sasl.py b/pykafka/protocol/sasl.py index a64aa433b..1dfa5808a 100644 --- a/pykafka/protocol/sasl.py +++ b/pykafka/protocol/sasl.py @@ -12,6 +12,7 @@ class SaslHandshakeRequest(Request): mechanism => STRING """ API_KEY = 17 + API_VERSION = 0 @classmethod def get_versions(cls): @@ -26,7 +27,7 @@ def 
__len__(self): def get_bytes(self): """Create new sasl handshake request""" output = bytearray(len(self)) - self._write_header(output) + self._write_header(output, api_version=self.API_VERSION) offset = self.HEADER_LEN fmt = '!h%ds' % len(self.mechanism) struct.pack_into(fmt, output, offset, len(self.mechanism), self.mechanism) @@ -40,6 +41,7 @@ class SaslHandshakeRequestV1(SaslHandshakeRequest): SaslHandshake Request (Version: 1) => mechanism mechanism => STRING """ + API_VERSION = 1 class SaslHandshakeResponse(Response): @@ -77,6 +79,7 @@ class SaslHandshakeResponseV1(SaslHandshakeResponse): error_code => INT16 mechanisms => STRING """ + API_VERSION = 1 class SaslAuthenticateRequest(Request): @@ -87,6 +90,7 @@ class SaslAuthenticateRequest(Request): auth_bytes => BYTES """ API_KEY = 36 + API_VERSION = 0 @classmethod def get_versions(cls): @@ -103,7 +107,7 @@ def __len__(self): def get_bytes(self): """Create new sasl authenticate request""" output = bytearray(len(self)) - self._write_header(output) + self._write_header(output, api_version=self.API_VERSION) offset = self.HEADER_LEN if self.auth_bytes is not None: fmt = '!i%ds' % len(self.auth_bytes) @@ -121,6 +125,7 @@ class SaslAuthenticateRquestV1(SaslAuthenticateRequest): SaslAuthenticate Request (Version: 1) => auth_bytes auth_bytes => BYTES """ + API_VERSION = 1 class SaslAuthenticateResponse(Response): @@ -148,7 +153,7 @@ def __init__(self, buff): response = struct_helpers.unpack_from(fmt, buff, 0) self.error_code = response[0] - self.error_message = response[1].decode() + self.error_message = response[1].decode() if response[1] is not None else None self.auth_bytes = response[2] From 6595497a8a972e0e9a9783e3a9798a204aff74ec Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 12:59:51 +0200 Subject: [PATCH 23/46] add more output to authenticator --- pykafka/sasl_authenticators.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pykafka/sasl_authenticators.py 
b/pykafka/sasl_authenticators.py index 645fd2c73..a0c067f08 100644 --- a/pykafka/sasl_authenticators.py +++ b/pykafka/sasl_authenticators.py @@ -89,6 +89,7 @@ def exchange_tokens(self): raise NotImplementedError() def send_token(self, token): + log.debug("Seding auth token") if self.handshake_version == 0: req = FakeRequest(token) else: @@ -96,15 +97,18 @@ def send_token(self, token): self._broker_connection.request(req) def receive_token(self): + log.debug("Receiving auth token") if self.handshake_version == 0: return self._broker_connection.response_raw() - response = SaslAuthenticateResponse.get_versions()[self.auth_version](self._broker_connection.response()) + data = self._broker_connection.response() + response = SaslAuthenticateResponse.get_versions()[self.auth_version](data) if response.error_code != 0: raise ERROR_CODES[response.error_code](response.error_message) return response.auth_bytes def fetch_api_versions(self): + log.debug("Fetch SASL authentication api versions.") self._broker_connection.request(ApiVersionsRequest()) response = ApiVersionsResponse(self._broker_connection.response()) self.handshake_version = response.api_versions[SaslHandshakeRequest.API_KEY].max @@ -112,7 +116,9 @@ def fetch_api_versions(self): self.handshake_version = min(self.MAX_HANDSHAKE_VERSION, self.handshake_version) if self.auth_version is not None: self.auth_version = min(self.auth_version.max, self.MAX_AUTH_VERSION) - + log.debug("Determinded handshake api version {} and authenticate api version {}".format( + self.handshake_version, self.auth_version + )) class ScramAuthenticator(BaseAuthenticator): """ From 1ea009e666fdc9dfe16109d33d081dee911818e9 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 13:03:29 +0200 Subject: [PATCH 24/46] tie authenticators send and receive methods together --- pykafka/sasl_authenticators.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pykafka/sasl_authenticators.py 
b/pykafka/sasl_authenticators.py index a0c067f08..42c87bc0c 100644 --- a/pykafka/sasl_authenticators.py +++ b/pykafka/sasl_authenticators.py @@ -36,8 +36,8 @@ class BaseAuthenticator: Subclasses are supposed to implement: 1. :meth:`BaseAuthenticator.get_rd_kafka_opts` which should return a dictionary whose items will be appended to the config given to librdkafka consumers and producers. - 2. :meth:`BaseAuthenticator.exchange_tokens` which is supposed to use :meth:`BaseAuthenticator.send_token` - and :meth:`BaseAuthenticator.receive_token` to send and receive the byte strings necessary to authenticate + 2. :meth:`BaseAuthenticator.exchange_tokens` which is supposed to use + :meth:`BaseAuthenticator.send_and_receive` to send and receive the byte strings necessary to authenticate with the broker. """ MAX_AUTH_VERSION = 1 @@ -88,7 +88,11 @@ def initialize_authentication(self): def exchange_tokens(self): raise NotImplementedError() - def send_token(self, token): + def send_and_receive(self, token): + self._send_token(token) + return self._receive_token() + + def _send_token(self, token): log.debug("Seding auth token") if self.handshake_version == 0: req = FakeRequest(token) @@ -96,7 +100,7 @@ def send_token(self, token): req = SaslAuthenticateRequest.get_versions()[self.auth_version](token) self._broker_connection.request(req) - def receive_token(self): + def _receive_token(self): log.debug("Receiving auth token") if self.handshake_version == 0: return self._broker_connection.response_raw() @@ -206,15 +210,11 @@ def get_rd_kafka_opts(self): def exchange_tokens(self): client_first = self.first_message() - self.send_token(client_first.encode()) - - server_first = self.receive_token().decode() + server_first = self.send_and_receive(client_first.encode()).decode() self.process_server_first_message(server_first) client_final = self.final_message() - self.send_token(client_final.encode()) - - server_final = self.receive_token().decode() + server_final = 
self.send_and_receive(client_final.encode()).decode() self.process_server_final_message(server_final) @@ -248,7 +248,7 @@ def get_rd_kafka_opts(self): } def exchange_tokens(self): - self.send_token("\0".join([self.user, self.user, self.password]).encode()) - response = self.receive_token() + token = "\0".join([self.user, self.user, self.password]).encode() + response = self.send_and_receive(token) if response != b"": raise AuthenticationException("Server sent unexpected response!") From beaa6b8cbdce2601da7dbdb281e7ec2a534b1491 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 13:42:19 +0200 Subject: [PATCH 25/46] improve docs and refactor pykafka.sasl_authenticators --- pykafka/sasl_authenticators.py | 133 +++++++++++++++++++++++++++------ 1 file changed, 111 insertions(+), 22 deletions(-) diff --git a/pykafka/sasl_authenticators.py b/pykafka/sasl_authenticators.py index 42c87bc0c..d31531c65 100644 --- a/pykafka/sasl_authenticators.py +++ b/pykafka/sasl_authenticators.py @@ -8,8 +8,14 @@ import six from .exceptions import AuthenticationException, ERROR_CODES, UnsupportedSaslMechanism -from .protocol import (SaslHandshakeRequest, SaslHandshakeResponse, ApiVersionsRequest, ApiVersionsResponse, - SaslAuthenticateRequest, SaslAuthenticateResponse) +from .protocol import ( + SaslHandshakeRequest, + SaslHandshakeResponse, + ApiVersionsRequest, + ApiVersionsResponse, + SaslAuthenticateRequest, + SaslAuthenticateResponse, +) log = logging.getLogger(__name__) @@ -22,8 +28,19 @@ def xor_bytes(left, right): return bytes(lb ^ rb for lb, rb in zip(left, right)) -class FakeRequest: +class BytesWrapper: + """ + Class that implements :meth:`get_bytes` and wraps some payload so it can be used for + :meth:`connection.BrokerConnection.request` during legacy sasl authentication sequence. + """ + def __init__(self, payload): + """ + Create a new FakeRequest. 
+ + :param payload: The payload to wrap + :type payload: bytes + """ self.payload = payload def get_bytes(self): @@ -40,6 +57,7 @@ class BaseAuthenticator: :meth:`BaseAuthenticator.send_and_receive` to send and receive the byte strings necessary to authenticate with the broker. """ + MAX_AUTH_VERSION = 1 MAX_HANDSHAKE_VERSION = 1 @@ -61,22 +79,39 @@ def __init__(self, mechanism, security_protocol=None): self._broker_connection = None def get_rd_kafka_opts(self): + """ + Creates the config entries necessary for librdkafka to successfully authenticate with the broker. + + :return: Dictionary to enrich config for librdkafka based consumers and producers. + """ raise NotImplementedError() def authenticate(self, broker_connection): + """ + Runs the authentication sequence on the given broker connection. + + .. warning:: + This is not thread safe! + + :param broker_connection: The broker connection to authenticate with. + :type broker_connection: :class:`pykafka.connection.BrokerConnection` + """ self._broker_connection = broker_connection if self.handshake_version is None: - self.fetch_api_versions() + self._fetch_api_versions() log.debug( "Authenticating to {}:{} using mechanism {}.".format( self._broker_connection.host, self._broker_connection.port, self.mechanism ) ) - self.initialize_authentication() + self._initialize_authentication() self.exchange_tokens() log.debug("Authentication successful.") - def initialize_authentication(self): + def _initialize_authentication(self): + """ + Initializes the authentication sequence. 
+ """ self._broker_connection.request(SaslHandshakeRequest.get_versions()[self.handshake_version](self.mechanism)) response = SaslHandshakeResponse.get_versions()[self.handshake_version](self._broker_connection.response()) if response.error_code != 0: @@ -86,16 +121,30 @@ def initialize_authentication(self): raise ERROR_CODES[response.error_code]("Authentication Handshake failed") def exchange_tokens(self): + """ + Runs the authentication sequence. Implementation varies among SASL mechanism and has to be supplied by + subclasses. See also :meth:`PlainAuthenticator.exchange_tokens` or :meth:`ScramAuthenticator.exchange_tokens` + for exemplary implementations. + """ raise NotImplementedError() def send_and_receive(self, token): + """ + Sends the given token to the broker and receives the brokers response. + This will automatically use the appropriate mechanism to do so. + I.e. use SaslAuthenticateRequest if the server supports it or just send the bytes directly if it doesn't. + + :param token: The token to be sent to the broker. + :type token: bytes + :return: bytes, the servers response + """ self._send_token(token) return self._receive_token() def _send_token(self, token): log.debug("Seding auth token") if self.handshake_version == 0: - req = FakeRequest(token) + req = BytesWrapper(token) else: req = SaslAuthenticateRequest.get_versions()[self.auth_version](token) self._broker_connection.request(req) @@ -111,18 +160,27 @@ def _receive_token(self): raise ERROR_CODES[response.error_code](response.error_message) return response.auth_bytes - def fetch_api_versions(self): + def _fetch_api_versions(self): + """ + The api version request can be run without authentication in order to determine which authentication api + versions to use. That's what this method does. 
+ """ log.debug("Fetch SASL authentication api versions.") self._broker_connection.request(ApiVersionsRequest()) response = ApiVersionsResponse(self._broker_connection.response()) + self.handshake_version = response.api_versions[SaslHandshakeRequest.API_KEY].max self.auth_version = response.api_versions.get(SaslAuthenticateRequest.API_KEY, None) + self.handshake_version = min(self.MAX_HANDSHAKE_VERSION, self.handshake_version) if self.auth_version is not None: self.auth_version = min(self.auth_version.max, self.MAX_AUTH_VERSION) - log.debug("Determinded handshake api version {} and authenticate api version {}".format( - self.handshake_version, self.auth_version - )) + log.debug( + "Determinded handshake api version {} and authenticate api version {}".format( + self.handshake_version, self.auth_version + ) + ) + class ScramAuthenticator(BaseAuthenticator): """ @@ -160,13 +218,24 @@ def __init__(self, mechanism, user, password, security_protocol=None): self.server_key = None self.server_signature = None - def first_message(self): + def client_first_message(self): + """ + Create and return the client first message. This will also reset all internal variables. + :return: str, the client first message + """ self.nonce = str(uuid4()).replace("-", "") client_first_bare = "n={},r={}".format(self.user, self.nonce) self.auth_message = client_first_bare return "n,," + client_first_bare def process_server_first_message(self, server_first_message): + """ + Parse and process server first message, this will extract all necessary information from the server's first + response such as iteration count or salt and use it to prepare the client final message. 
+ + :param server_first_message: The first message sent by the server + :type server_first_message: str + """ self.auth_message += "," + server_first_message params = dict(pair.split("=", 1) for pair in server_first_message.split(",")) server_nonce = params["r"] @@ -177,25 +246,45 @@ def process_server_first_message(self, server_first_message): salt = base64.b64decode(params["s"].encode()) iterations = int(params["i"]) - self.create_salted_password(salt, iterations) + self._create_salted_password(salt, iterations) - self.client_key = self.hmac(self.salted_password, b"Client Key") + self.client_key = self._hmac(self.salted_password, b"Client Key") self.stored_key = self.hashfunc(self.client_key).digest() - self.client_signature = self.hmac(self.stored_key, self.auth_message.encode()) + self.client_signature = self._hmac(self.stored_key, self.auth_message.encode()) self.client_proof = xor_bytes(self.client_key, self.client_signature) - self.server_key = self.hmac(self.salted_password, b"Server Key") - self.server_signature = self.hmac(self.server_key, self.auth_message.encode()) + self.server_key = self._hmac(self.salted_password, b"Server Key") + self.server_signature = self._hmac(self.server_key, self.auth_message.encode()) - def hmac(self, key, msg): + def _hmac(self, key, msg): + """ + Run the hmac algorithm on `key` and `msg` using the appropriate digest method for the configures scram + mechanism. 
+ :param key: The key for the hmac algorithm + :type key: bytes + :param msg: The message for the hmac algorithm + :type msg: bytes + :return: bytes, the result of applying hmac on `key` and `msg` + """ return hmac.new(key, msg, digestmod=self.hashfunc).digest() - def create_salted_password(self, salt, iterations): + def _create_salted_password(self, salt, iterations): self.salted_password = hashlib.pbkdf2_hmac(self.hashname, self.password, salt, iterations) - def final_message(self): + def client_final_message(self): + """ + Create and return the client final message. + :return: str, the client final message + """ return "c=biws,r={},p={}".format(self.nonce, base64.b64encode(self.client_proof).decode()) def process_server_final_message(self, server_final_message): + """ + Parse and process server final message. This will run validation on the server's response to make sure that + everything is all right. + + :param server_final_message: The first message sent by the server + :type server_final_message: str + """ params = dict(pair.split("=", 1) for pair in server_final_message.split(",")) if self.server_signature != base64.b64decode(params["v"].encode()): raise AuthenticationException("Server sent wrong signature!") @@ -209,11 +298,11 @@ def get_rd_kafka_opts(self): } def exchange_tokens(self): - client_first = self.first_message() + client_first = self.client_first_message() server_first = self.send_and_receive(client_first.encode()).decode() self.process_server_first_message(server_first) - client_final = self.final_message() + client_final = self.client_final_message() server_final = self.send_and_receive(client_final.encode()).decode() self.process_server_final_message(server_final) From 823da3dfd62c1570e5332827124384582c9631e6 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 7 Oct 2019 17:16:48 +0200 Subject: [PATCH 26/46] add -keyalg RSA to keystore generation in order to support newer ciphers --- pykafka/test/kafka_instance.py | 1 + 1 file changed, 1 
insertion(+) diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py index d97f2080c..a1d57ae35 100644 --- a/pykafka/test/kafka_instance.py +++ b/pykafka/test/kafka_instance.py @@ -558,6 +558,7 @@ def _gen_broker_keystore(self): '-keystore', self.keystore, '-storepass:env', 'BROKER_PASS', '-keypass:env', 'BROKER_PASS', + '-keyalg', 'RSA', '-noprompt'] subprocess.check_call(cmd, env=env) cmd = ['keytool', '-certreq', '-alias', 'broker', From 55dbc5ba80bbc9606cef075390e4bd9daa5dbee3 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 8 Oct 2019 14:47:18 +0200 Subject: [PATCH 27/46] add timeout when waiting for kafka cluster --- .travis.yml | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index e5509a450..077f54930 100644 --- a/.travis.yml +++ b/.travis.yml @@ -80,7 +80,16 @@ before_script: - export CFLAGS="-coverage" - TEMPFILE=`tempfile` - python -m pykafka.test.kafka_instance 3 --kafka-version $KAFKA_VERSION --download-dir $KAFKA_BIN --export-hosts $TEMPFILE & - - while true; do sleep 1; echo "Waiting for cluster..."; if [[ `grep ZOOKEEPER $TEMPFILE` ]]; then break; fi; done + - | + start=$(date +%s) + until grep ZOOKEEPER $TEMPFILE 1>/dev/null 2>/dev/null; do + sleep 1 + echo "Waiting for cluster..." + if [[ $(($(date +%s) - start)) -gt 30 ]]; then + echo "Timeout waiting for cluster!" 
+ exit 1 + fi; + done - export `grep BROKERS $TEMPFILE` - export `grep BROKERS_SSL $TEMPFILE` - export `grep BROKERS_SASL $TEMPFILE` From 46e981f98fb4e1a57989dedaa7a1cd047583abdd Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 8 Oct 2019 15:05:16 +0200 Subject: [PATCH 28/46] add log-level parameter to test kafka instance --- pykafka/test/kafka_instance.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py index a1d57ae35..35f0ed606 100644 --- a/pykafka/test/kafka_instance.py +++ b/pykafka/test/kafka_instance.py @@ -332,7 +332,6 @@ def _start_process(self): return # hooray! success log.info('Waiting for cluster to start....') time.sleep(6) # Waits 60s total - # If it got this far, it's an error raise ProcessNotStartingError('Unable to start Kafka cluster.') @@ -628,8 +627,27 @@ def _gen_client_cert(self): help='Scala version for kafka build') parser.add_argument('--export-hosts', type=str, help='Write host strings to given file path') + parser.add_argument( + "--log-level", + choices=["NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + help='Set log level to given value, if "NOTSET" (default) no logging is active.', + default="NOTSET", + ) args = parser.parse_args() + if args.log_level != "NOTSET": + base_logger = logging.getLogger() + log_level = getattr(logging, args.log_level) + + formatter = logging.Formatter("%(levelname)-8s - %(name)-12s - %(message)s") + + stream_handler = logging.StreamHandler() + stream_handler.setLevel(log_level) + stream_handler.setFormatter(formatter) + + base_logger.setLevel(log_level) + base_logger.addHandler(stream_handler) + _exiting = False def _catch_sigint(signum, frame): global _exiting From 9a57cacece00d329612448d69dd8522ef9e94b39 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 8 Oct 2019 15:12:59 +0200 Subject: [PATCH 29/46] print logs on failed kafka startup --- .travis.yml | 4 +++- 1 file changed, 3 
insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 077f54930..39b571a4e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -79,7 +79,8 @@ before_script: - export LD_LIBRARY_PATH=$HOME/lib:$LD_LIBRARY_PATH - export CFLAGS="-coverage" - TEMPFILE=`tempfile` - - python -m pykafka.test.kafka_instance 3 --kafka-version $KAFKA_VERSION --download-dir $KAFKA_BIN --export-hosts $TEMPFILE & + - KAFKA_LOGS=`tempfile` + - python -m pykafka.test.kafka_instance 3 --kafka-version $KAFKA_VERSION --download-dir $KAFKA_BIN --export-hosts $TEMPFILE --log-level INFO 2>$KAFKA_LOGS & - | start=$(date +%s) until grep ZOOKEEPER $TEMPFILE 1>/dev/null 2>/dev/null; do @@ -87,6 +88,7 @@ before_script: echo "Waiting for cluster..." if [[ $(($(date +%s) - start)) -gt 30 ]]; then echo "Timeout waiting for cluster!" + cat $KAFKA_LOGS exit 1 fi; done From ffe575ce29c5627c3954f3cfceb0f3f999322a0d Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 8 Oct 2019 15:57:09 +0200 Subject: [PATCH 30/46] increase timeout for kafka cluster and fail if cluster dies --- .travis.yml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 39b571a4e..02aeb735a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -82,15 +82,21 @@ before_script: - KAFKA_LOGS=`tempfile` - python -m pykafka.test.kafka_instance 3 --kafka-version $KAFKA_VERSION --download-dir $KAFKA_BIN --export-hosts $TEMPFILE --log-level INFO 2>$KAFKA_LOGS & - | + kafka_pid=$! start=$(date +%s) until grep ZOOKEEPER $TEMPFILE 1>/dev/null 2>/dev/null; do sleep 1 echo "Waiting for cluster..." - if [[ $(($(date +%s) - start)) -gt 30 ]]; then + if [[ $(($(date +%s) - start)) -gt 300 ]]; then echo "Timeout waiting for cluster!" cat $KAFKA_LOGS exit 1 fi; + if ! kill -0 "$kafka_pid" >/dev/null 2>&1 ; then + echo "Kafka test cluster died during startup!" 
+ cat $KAFKA_LOGS + exit 2 + fi done - export `grep BROKERS $TEMPFILE` - export `grep BROKERS_SSL $TEMPFILE` From 5fc341800994e4a30d094f0fbba5e24b2438d261 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 8 Oct 2019 16:35:30 +0200 Subject: [PATCH 31/46] increase timeout for kafka test instance startup --- pykafka/test/kafka_instance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py index 35f0ed606..b89dacceb 100644 --- a/pykafka/test/kafka_instance.py +++ b/pykafka/test/kafka_instance.py @@ -326,7 +326,7 @@ def _start_process(self): # Process is started when the port isn't free anymore all_ports = [zk_port] + broker_ports + broker_ssl_ports + broker_sasl_ports - for i in range(10): + for i in range(180): if all(not self._is_port_free(port) for port in all_ports): log.info('Kafka cluster started.') return # hooray! success From 835be5113a5a6a6c7023fcfcae26aa92c009a909 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 8 Oct 2019 16:51:28 +0200 Subject: [PATCH 32/46] check if subprocesses are still running during test kafka startup --- pykafka/test/kafka_instance.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py index b89dacceb..fcc810bd6 100644 --- a/pykafka/test/kafka_instance.py +++ b/pykafka/test/kafka_instance.py @@ -326,10 +326,23 @@ def _start_process(self): # Process is started when the port isn't free anymore all_ports = [zk_port] + broker_ports + broker_ssl_ports + broker_sasl_ports - for i in range(180): + all_procs = [('zookeeper', self._zk_proc)] + for i, broker_proc in enumerate(self._broker_procs): + all_procs.append(("broker_{}".format(i), broker_proc)) + + for _ in range(180): if all(not self._is_port_free(port) for port in all_ports): log.info('Kafka cluster started.') return # hooray! 
success + if any(proc.poll() is not None for _, proc in all_procs): + msg = "One or more processes terminated already!" + for name, proc in all_procs: + returncode = proc.poll() + if returncode is None: + msg += "\n {} is still running".format(name) + else: + msg += "\n {} exited with {}".format(name, returncode) + raise ProcessNotStartingError(msg) log.info('Waiting for cluster to start....') time.sleep(6) # Waits 60s total # If it got this far, it's an error From 05824ce9ba3a6ee98e2b5e1e1226f85c1b6a00c7 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 8 Oct 2019 17:01:55 +0200 Subject: [PATCH 33/46] add environment variable PYTHONUNBUFFERED=1 to travis.yaml --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 02aeb735a..1e890607d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -38,6 +38,7 @@ env: global: - PATH="/usr/lib/ccache:$PATH" - KAFKA_BIN="$HOME/kafka-bin" + - PYTHONUNBUFFERED=1 addons: apt: From 8e0b7bb8de4a9579cb5845e3f8d12079ab7d6552 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 8 Oct 2019 17:09:00 +0200 Subject: [PATCH 34/46] set line buffering for test kafka logfiles --- pykafka/test/kafka_instance.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py index fcc810bd6..20ff548f1 100644 --- a/pykafka/test/kafka_instance.py +++ b/pykafka/test/kafka_instance.py @@ -406,7 +406,7 @@ def _start_broker_proc(self, port, ssl_port=None, sasl_port=None): new_proc = (utils.Popen( args=[binfile, conf], stderr=utils.STDOUT, - stdout=open(logfile, 'w'), + stdout=open(logfile, 'w', buffering=1), use_gevent=self.use_gevent, env={} if sasl_port is None else {'KAFKA_OPTS': '-Djava.security.auth.login.config={}'.format(jaas_conf)} )) @@ -456,7 +456,7 @@ def _start_zookeeper(self): self._zk_proc = utils.Popen( args=[binfile, conf], stderr=utils.STDOUT, - stdout=open(logfile, 'w'), + stdout=open(logfile, 'w', buffering=1), 
use_gevent=self.use_gevent ) return port From 77b5a8bb955ee1425b9dddc569712c1025eb1e01 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 8 Oct 2019 18:06:01 +0200 Subject: [PATCH 35/46] use logging.basicConfig --- pykafka/test/kafka_instance.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py index 20ff548f1..a7b322f56 100644 --- a/pykafka/test/kafka_instance.py +++ b/pykafka/test/kafka_instance.py @@ -342,6 +342,7 @@ def _start_process(self): msg += "\n {} is still running".format(name) else: msg += "\n {} exited with {}".format(name, returncode) + raise ProcessNotStartingError(msg) log.info('Waiting for cluster to start....') time.sleep(6) # Waits 60s total @@ -649,17 +650,8 @@ def _gen_client_cert(self): args = parser.parse_args() if args.log_level != "NOTSET": - base_logger = logging.getLogger() - log_level = getattr(logging, args.log_level) - - formatter = logging.Formatter("%(levelname)-8s - %(name)-12s - %(message)s") - - stream_handler = logging.StreamHandler() - stream_handler.setLevel(log_level) - stream_handler.setFormatter(formatter) - - base_logger.setLevel(log_level) - base_logger.addHandler(stream_handler) + logging.basicConfig(level=getattr(logging, args.log_level), + format='%(threadName)10s - %(levelname)-8s - %(name)-12s - %(message)s') _exiting = False def _catch_sigint(signum, frame): From 962aa3168aa95efa5cb8e415e74605928192186a Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 8 Oct 2019 18:13:49 +0200 Subject: [PATCH 36/46] keep track of all logs and procs and output logs if they fail --- pykafka/test/kafka_instance.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py index a7b322f56..d8648f21a 100644 --- a/pykafka/test/kafka_instance.py +++ b/pykafka/test/kafka_instance.py @@ -200,6 +200,8 @@ def __init__(self, self._processes = [] 
self._broker_procs = [] self._brokers_started = 0 # incremented by _start_broker + self._all_log_files = {} + self._all_procs = {} self.zookeeper = None self.brokers = None self.brokers_ssl = None @@ -326,22 +328,24 @@ def _start_process(self): # Process is started when the port isn't free anymore all_ports = [zk_port] + broker_ports + broker_ssl_ports + broker_sasl_ports - all_procs = [('zookeeper', self._zk_proc)] - for i, broker_proc in enumerate(self._broker_procs): - all_procs.append(("broker_{}".format(i), broker_proc)) for _ in range(180): if all(not self._is_port_free(port) for port in all_ports): log.info('Kafka cluster started.') return # hooray! success - if any(proc.poll() is not None for _, proc in all_procs): + if any(proc.poll() is not None for proc in self._all_procs.values()): msg = "One or more processes terminated already!" - for name, proc in all_procs: + for name, proc in self._all_procs.items(): returncode = proc.poll() if returncode is None: msg += "\n {} is still running".format(name) else: msg += "\n {} exited with {}".format(name, returncode) + log.error("{} exited with {}".format(name, returncode)) + log.error("{} logs:".format(name)) + with open(self._all_log_files[name], 'r') as o: + for line in o: + log.error(line) raise ProcessNotStartingError(msg) log.info('Waiting for cluster to start....') @@ -412,6 +416,8 @@ def _start_broker_proc(self, port, ssl_port=None, sasl_port=None): env={} if sasl_port is None else {'KAFKA_OPTS': '-Djava.security.auth.login.config={}'.format(jaas_conf)} )) self._broker_procs.append(new_proc) + self._all_procs["kafka_{}".format(i)] = new_proc + self._all_log_files["kafka_{}".format(i)] = logfile # add localhost:port to internal list of (ssl)brokers self._add_broker(port) @@ -460,6 +466,9 @@ def _start_zookeeper(self): stdout=open(logfile, 'w', buffering=1), use_gevent=self.use_gevent ) + + self._all_procs["zookeeper"] = self._zk_proc + self._all_log_files["zookeeper"] = logfile return port def 
terminate(self): From 4f57c0d4c29ab9000f7e29899e08da4ced36ad67 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 9 Oct 2019 08:13:34 +0200 Subject: [PATCH 37/46] add repair java version parser in kafka-run-class.sh for certain kafka versions --- pykafka/test/kafka_instance.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py index d8648f21a..7ea085980 100644 --- a/pykafka/test/kafka_instance.py +++ b/pykafka/test/kafka_instance.py @@ -236,8 +236,7 @@ def _init_dirs(self): def _download_kafka(self): """Make sure the Kafka code has been downloaded to the right dir.""" - binfile = os.path.join(self._bin_dir, - 'bin/kafka-server-start.sh') + binfile = os.path.join(self._bin_dir, 'bin/kafka-server-start.sh') if os.path.exists(binfile): return # already there @@ -264,9 +263,24 @@ def _download_kafka(self): p1.stdout.close() output, err = p2.communicate() os.chdir(curr_dir) - + if (1, 0, 0) <= self._kafka_version < (1, 1, 1): + # java version parsing is broken for some java versions in this kafka version range + log.info("Fixing java version parser in kafka-run-class.sh") + self._fix_run_class_sh() log.info('Downloaded Kafka to %s', self._bin_dir) + def _fix_run_class_sh(self): + run_class_sh = os.path.join(self._bin_dir, 'bin/kafka-run-class.sh') + parser_line = " JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version \"([0-9]*).*$/\\1/p')\n" + with open(run_class_sh, 'r') as o: + script_lines = o.readlines() + with open(run_class_sh, 'w') as o: + for line in script_lines: + if line.strip().startswith("JAVA_MAJOR_VERSION="): + o.write(parser_line) + else: + o.write(line) + def _is_port_free(self, port): """Check to see if a port is open""" try: @@ -345,7 +359,7 @@ def _start_process(self): log.error("{} logs:".format(name)) with open(self._all_log_files[name], 'r') as o: for line in o: - log.error(line) + log.error(line.strip()) raise 
ProcessNotStartingError(msg) log.info('Waiting for cluster to start....') From c8e554f9da4575c475f86178e32c145d7897c255 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 9 Oct 2019 13:55:32 +0200 Subject: [PATCH 38/46] bump librdkafka version --- .travis.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1e890607d..f38946edd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -69,9 +69,9 @@ install: fi - pip install -U pip setuptools - pip install codecov kazoo tox testinstances - - wget https://github.com/edenhill/librdkafka/archive/v0.9.5.tar.gz - - tar -xzf v0.9.5.tar.gz - - cd librdkafka-0.9.5/ && ./configure --prefix=$HOME + - wget https://github.com/edenhill/librdkafka/archive/v1.1.0.tar.gz + - tar -xzf v1.1.0.tar.gz + - cd librdkafka-1.1.0/ && ./configure --prefix=$HOME - make -j 2 && make -j 2 install && cd - before_script: From 0a75ce04fe00e395eeb76574281b2bf0804b7881 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 9 Oct 2019 17:51:40 +0200 Subject: [PATCH 39/46] switch tests from kafka 0.8 to kafka 2.0 --- .travis.yml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/.travis.yml b/.travis.yml index f38946edd..3f2c77b73 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,31 +6,31 @@ cache: - $HOME/.ccache matrix: include: - - env: TOX_ENV=py27 KAFKA_VERSION=0.8.2.2 + - env: TOX_ENV=py27 KAFKA_VERSION=2.0.0 python: 2.7 - env: TOX_ENV=py27 KAFKA_VERSION=1.0.1 python: 2.7 - - env: TOX_ENV=py34 KAFKA_VERSION=0.8.2.2 + - env: TOX_ENV=py34 KAFKA_VERSION=2.0.0 python: 3.4 - env: TOX_ENV=py34 KAFKA_VERSION=1.0.1 python: 3.4 - - env: TOX_ENV=py35 KAFKA_VERSION=0.8.2.2 + - env: TOX_ENV=py35 KAFKA_VERSION=2.0.0 python: 3.5 - env: TOX_ENV=py35 KAFKA_VERSION=1.0.1 python: 3.5 - - env: TOX_ENV=py36 KAFKA_VERSION=0.8.2.2 + - env: TOX_ENV=py36 KAFKA_VERSION=2.0.0 python: 3.6 - env: TOX_ENV=py36 KAFKA_VERSION=1.0.1 python: 3.6 - - env: TOX_ENV=pypy KAFKA_VERSION=0.8.2.2 + - env: 
TOX_ENV=pypy KAFKA_VERSION=2.0.0 python: pypy - env: TOX_ENV=pypy KAFKA_VERSION=1.0.1 python: pypy - - env: TOX_ENV=py27-gevent KAFKA_VERSION=0.8.2.2 + - env: TOX_ENV=py27-gevent KAFKA_VERSION=2.0.0 python: 2.7 - env: TOX_ENV=py27-gevent KAFKA_VERSION=1.0.1 python: 2.7 - - env: TOX_ENV=py36-gevent KAFKA_VERSION=0.8.2.2 + - env: TOX_ENV=py36-gevent KAFKA_VERSION=2.0.0 python: 3.6 - env: TOX_ENV=py36-gevent KAFKA_VERSION=1.0.1 python: 3.6 From 7b55fab45b2ac7548cd7720b154fd2e19059ffba Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 9 Oct 2019 18:19:49 +0200 Subject: [PATCH 40/46] remove pytest from test-requirements again --- test-requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/test-requirements.txt b/test-requirements.txt index 793ff30e3..2e02f79b8 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,6 +1,5 @@ lz4==2.1.10 lz4tools==1.3.1.2 -pytest==4.6.2 pytest-cov python-snappy mock From 8b42a20c186e4d2084b0870b91cf041ad4485b48 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Thu, 10 Oct 2019 08:16:27 +0200 Subject: [PATCH 41/46] downgrade librdkafka to 0.11.3 --- .travis.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 3f2c77b73..99b966979 100644 --- a/.travis.yml +++ b/.travis.yml @@ -69,9 +69,9 @@ install: fi - pip install -U pip setuptools - pip install codecov kazoo tox testinstances - - wget https://github.com/edenhill/librdkafka/archive/v1.1.0.tar.gz - - tar -xzf v1.1.0.tar.gz - - cd librdkafka-1.1.0/ && ./configure --prefix=$HOME + - wget https://github.com/edenhill/librdkafka/archive/v0.11.3.tar.gz + - tar -xzf v0.11.3.tar.gz + - cd librdkafka-0.11.3/ && ./configure --prefix=$HOME - make -j 2 && make -j 2 install && cd - before_script: From 55e2791cf6a4658ffc0073803952d9c548bf0fd2 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Thu, 10 Oct 2019 09:33:51 +0200 Subject: [PATCH 42/46] renamed test module to avoid naming conflict with pykafka --- tests/{pykafka 
=> pykafka_tests}/__init__.py | 0 tests/{pykafka => pykafka_tests}/rdkafka/__init__.py | 0 .../rdkafka/test_rd_kafka_consumer.py | 0 tests/{pykafka => pykafka_tests}/rdkafka/test_sasl.py | 0 .../{pykafka => pykafka_tests}/rdkafka/test_simple_consumer.py | 2 +- tests/{pykafka => pykafka_tests}/rdkafka/test_ssl.py | 2 +- tests/{pykafka => pykafka_tests}/test_balancedconsumer.py | 0 tests/{pykafka => pykafka_tests}/test_cluster.py | 0 tests/{pykafka => pykafka_tests}/test_connection.py | 0 tests/{pykafka => pykafka_tests}/test_partition.py | 0 tests/{pykafka => pykafka_tests}/test_partitioners.py | 0 tests/{pykafka => pykafka_tests}/test_producer.py | 0 tests/{pykafka => pykafka_tests}/test_protocol.py | 0 tests/{pykafka => pykafka_tests}/test_sasl.py | 0 tests/{pykafka => pykafka_tests}/test_simpleconsumer.py | 0 tests/{pykafka => pykafka_tests}/test_ssl.py | 0 tests/{pykafka => pykafka_tests}/utils/__init__.py | 0 tests/{pykafka => pykafka_tests}/utils/test_compression.py | 0 tests/{pykafka => pykafka_tests}/utils/test_struct_helpers.py | 0 19 files changed, 2 insertions(+), 2 deletions(-) rename tests/{pykafka => pykafka_tests}/__init__.py (100%) rename tests/{pykafka => pykafka_tests}/rdkafka/__init__.py (100%) rename tests/{pykafka => pykafka_tests}/rdkafka/test_rd_kafka_consumer.py (100%) rename tests/{pykafka => pykafka_tests}/rdkafka/test_sasl.py (100%) rename tests/{pykafka => pykafka_tests}/rdkafka/test_simple_consumer.py (97%) rename tests/{pykafka => pykafka_tests}/rdkafka/test_ssl.py (88%) rename tests/{pykafka => pykafka_tests}/test_balancedconsumer.py (100%) rename tests/{pykafka => pykafka_tests}/test_cluster.py (100%) rename tests/{pykafka => pykafka_tests}/test_connection.py (100%) rename tests/{pykafka => pykafka_tests}/test_partition.py (100%) rename tests/{pykafka => pykafka_tests}/test_partitioners.py (100%) rename tests/{pykafka => pykafka_tests}/test_producer.py (100%) rename tests/{pykafka => pykafka_tests}/test_protocol.py (100%) rename 
tests/{pykafka => pykafka_tests}/test_sasl.py (100%) rename tests/{pykafka => pykafka_tests}/test_simpleconsumer.py (100%) rename tests/{pykafka => pykafka_tests}/test_ssl.py (100%) rename tests/{pykafka => pykafka_tests}/utils/__init__.py (100%) rename tests/{pykafka => pykafka_tests}/utils/test_compression.py (100%) rename tests/{pykafka => pykafka_tests}/utils/test_struct_helpers.py (100%) diff --git a/tests/pykafka/__init__.py b/tests/pykafka_tests/__init__.py similarity index 100% rename from tests/pykafka/__init__.py rename to tests/pykafka_tests/__init__.py diff --git a/tests/pykafka/rdkafka/__init__.py b/tests/pykafka_tests/rdkafka/__init__.py similarity index 100% rename from tests/pykafka/rdkafka/__init__.py rename to tests/pykafka_tests/rdkafka/__init__.py diff --git a/tests/pykafka/rdkafka/test_rd_kafka_consumer.py b/tests/pykafka_tests/rdkafka/test_rd_kafka_consumer.py similarity index 100% rename from tests/pykafka/rdkafka/test_rd_kafka_consumer.py rename to tests/pykafka_tests/rdkafka/test_rd_kafka_consumer.py diff --git a/tests/pykafka/rdkafka/test_sasl.py b/tests/pykafka_tests/rdkafka/test_sasl.py similarity index 100% rename from tests/pykafka/rdkafka/test_sasl.py rename to tests/pykafka_tests/rdkafka/test_sasl.py diff --git a/tests/pykafka/rdkafka/test_simple_consumer.py b/tests/pykafka_tests/rdkafka/test_simple_consumer.py similarity index 97% rename from tests/pykafka/rdkafka/test_simple_consumer.py rename to tests/pykafka_tests/rdkafka/test_simple_consumer.py index ed3489468..d53a80d34 100644 --- a/tests/pykafka/rdkafka/test_simple_consumer.py +++ b/tests/pykafka_tests/rdkafka/test_simple_consumer.py @@ -1,6 +1,6 @@ import pytest -from tests.pykafka import test_simpleconsumer, test_balancedconsumer +from tests.pykafka_tests import test_simpleconsumer, test_balancedconsumer from pykafka.utils.compat import range try: from pykafka.rdkafka import _rd_kafka # noqa diff --git a/tests/pykafka/rdkafka/test_ssl.py 
b/tests/pykafka_tests/rdkafka/test_ssl.py similarity index 88% rename from tests/pykafka/rdkafka/test_ssl.py rename to tests/pykafka_tests/rdkafka/test_ssl.py index 9e501d7f1..3145b0aff 100644 --- a/tests/pykafka/rdkafka/test_ssl.py +++ b/tests/pykafka_tests/rdkafka/test_ssl.py @@ -2,7 +2,7 @@ import pytest -from tests.pykafka import test_ssl +from tests.pykafka_tests import test_ssl @pytest.mark.skipif(platform.python_implementation() == "PyPy", diff --git a/tests/pykafka/test_balancedconsumer.py b/tests/pykafka_tests/test_balancedconsumer.py similarity index 100% rename from tests/pykafka/test_balancedconsumer.py rename to tests/pykafka_tests/test_balancedconsumer.py diff --git a/tests/pykafka/test_cluster.py b/tests/pykafka_tests/test_cluster.py similarity index 100% rename from tests/pykafka/test_cluster.py rename to tests/pykafka_tests/test_cluster.py diff --git a/tests/pykafka/test_connection.py b/tests/pykafka_tests/test_connection.py similarity index 100% rename from tests/pykafka/test_connection.py rename to tests/pykafka_tests/test_connection.py diff --git a/tests/pykafka/test_partition.py b/tests/pykafka_tests/test_partition.py similarity index 100% rename from tests/pykafka/test_partition.py rename to tests/pykafka_tests/test_partition.py diff --git a/tests/pykafka/test_partitioners.py b/tests/pykafka_tests/test_partitioners.py similarity index 100% rename from tests/pykafka/test_partitioners.py rename to tests/pykafka_tests/test_partitioners.py diff --git a/tests/pykafka/test_producer.py b/tests/pykafka_tests/test_producer.py similarity index 100% rename from tests/pykafka/test_producer.py rename to tests/pykafka_tests/test_producer.py diff --git a/tests/pykafka/test_protocol.py b/tests/pykafka_tests/test_protocol.py similarity index 100% rename from tests/pykafka/test_protocol.py rename to tests/pykafka_tests/test_protocol.py diff --git a/tests/pykafka/test_sasl.py b/tests/pykafka_tests/test_sasl.py similarity index 100% rename from 
tests/pykafka/test_sasl.py rename to tests/pykafka_tests/test_sasl.py diff --git a/tests/pykafka/test_simpleconsumer.py b/tests/pykafka_tests/test_simpleconsumer.py similarity index 100% rename from tests/pykafka/test_simpleconsumer.py rename to tests/pykafka_tests/test_simpleconsumer.py diff --git a/tests/pykafka/test_ssl.py b/tests/pykafka_tests/test_ssl.py similarity index 100% rename from tests/pykafka/test_ssl.py rename to tests/pykafka_tests/test_ssl.py diff --git a/tests/pykafka/utils/__init__.py b/tests/pykafka_tests/utils/__init__.py similarity index 100% rename from tests/pykafka/utils/__init__.py rename to tests/pykafka_tests/utils/__init__.py diff --git a/tests/pykafka/utils/test_compression.py b/tests/pykafka_tests/utils/test_compression.py similarity index 100% rename from tests/pykafka/utils/test_compression.py rename to tests/pykafka_tests/utils/test_compression.py diff --git a/tests/pykafka/utils/test_struct_helpers.py b/tests/pykafka_tests/utils/test_struct_helpers.py similarity index 100% rename from tests/pykafka/utils/test_struct_helpers.py rename to tests/pykafka_tests/utils/test_struct_helpers.py From a11b7cea979cbf76633a7a0bc06a5c630e75027c Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Thu, 10 Oct 2019 10:01:21 +0200 Subject: [PATCH 43/46] add inheritance from object to sasl_authenticators classes --- pykafka/sasl_authenticators.py | 4 ++-- tox.ini | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pykafka/sasl_authenticators.py b/pykafka/sasl_authenticators.py index d31531c65..7f573ef07 100644 --- a/pykafka/sasl_authenticators.py +++ b/pykafka/sasl_authenticators.py @@ -28,7 +28,7 @@ def xor_bytes(left, right): return bytes(lb ^ rb for lb, rb in zip(left, right)) -class BytesWrapper: +class BytesWrapper(object): """ Class that implements :meth:`get_bytes` and wraps some payload so it can be used for :meth:`connection.BrokerConnection.request` during legacy sasl authentication sequence. 
@@ -47,7 +47,7 @@ def get_bytes(self): return struct.pack("!i", len(self.payload)) + self.payload -class BaseAuthenticator: +class BaseAuthenticator(object): """ Base class for authentication mechanisms. Subclasses are supposed to implement: diff --git a/tox.ini b/tox.ini index 26ce880f6..f2afe3a01 100644 --- a/tox.ini +++ b/tox.ini @@ -8,4 +8,4 @@ deps = gevent: gevent==1.3.6 commands = py.test {posargs} -passenv = BROKERS BROKERS_SSL BROKERS_SASL ZOOKEEPER KAFKA_BIN KAFKA_VERSION C_INCLUDE_PATH LIBRARY_PATH LD_LIBRARY_PATH CFLAGS +passenv = BROKERS BROKERS_SSL BROKERS_SASL ZOOKEEPER KAFKA_BIN KAFKA_VERSION C_INCLUDE_PATH LIBRARY_PATH LD_LIBRARY_PATH CFLAGS HOSTS_FILE From 12399ce96031ae16e351b6bc31636d451ef298b6 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Thu, 10 Oct 2019 10:57:37 +0200 Subject: [PATCH 44/46] skip sasl rdkafka tests if it's not installed --- tests/pykafka_tests/rdkafka/test_sasl.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/pykafka_tests/rdkafka/test_sasl.py b/tests/pykafka_tests/rdkafka/test_sasl.py index 7bd290e55..72b208951 100644 --- a/tests/pykafka_tests/rdkafka/test_sasl.py +++ b/tests/pykafka_tests/rdkafka/test_sasl.py @@ -1,8 +1,15 @@ +import pytest from uuid import uuid4 from pykafka import KafkaClient +try: + from pykafka.rdkafka import _rd_kafka + RDKAFKA = True +except ImportError: + RDKAFKA = False # C extension not built +@pytest.mark.skipif(not RDKAFKA, reason="C extension for librdkafka not built.") def test_sasl_roundtrip_rdkafka(sasl_kafka, authenticator, kafka_version): client = KafkaClient(sasl_kafka.brokers_sasl, sasl_authenticator=authenticator, broker_version='.'.join(str(v) for v in kafka_version)) From 19d87f8286eb24fba98df2453f2b65db1ae996c8 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 16 Oct 2019 13:21:07 +0200 Subject: [PATCH 45/46] fix bug in SaslAuthenticateResponseV1 --- pykafka/protocol/sasl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/pykafka/protocol/sasl.py b/pykafka/protocol/sasl.py index 1dfa5808a..a0634e01a 100644 --- a/pykafka/protocol/sasl.py +++ b/pykafka/protocol/sasl.py @@ -178,6 +178,6 @@ def __init__(self, buff): response = struct_helpers.unpack_from(fmt, buff, 0) self.error_code = response[0] - self.error_message = response[1].decode() + self.error_message = response[1].decode() if response[1] is not None else None self.auth_bytes = response[2] self.session_lifetime_ms = response[3] From 73f8e8ef624efba72188b5c4b506944e6fac7235 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Thu, 16 Jan 2020 15:55:31 +0100 Subject: [PATCH 46/46] use sasl_authenticator also for offset channel --- pykafka/broker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pykafka/broker.py b/pykafka/broker.py index 93733ff5c..fc644c0da 100644 --- a/pykafka/broker.py +++ b/pykafka/broker.py @@ -287,7 +287,8 @@ def connect_offsets_channel(self, attempts=3): self.host, self.port, self._handler, buffer_size=self._buffer_size, source_host=self._source_host, source_port=self._source_port, - ssl_config=self._ssl_config) + ssl_config=self._ssl_config, + sasl_authenticator=self._sasl_authenticator) self._offsets_channel_connection.connect(self._offsets_channel_socket_timeout_ms, attempts=attempts) self._offsets_channel_req_handler = RequestHandler(