This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Feature SASL SCRAM support #972

Open
wants to merge 46 commits into base: master
46 commits
d9222cf
enable loading hosts from file during testing
Oct 4, 2019
12503c9
make _kafka_version a tuple to make comparisons easier
Oct 4, 2019
ce10315
refactor server.properties generation for test brokers
Oct 4, 2019
0fe66e7
enable sasl for test broker
Oct 4, 2019
5889a99
add tests for plain and scram sasl authentication
Oct 4, 2019
5199db2
refactor test code
Oct 4, 2019
88a5bfc
add sasl handshake to protocol
Oct 4, 2019
e74099c
fix bug where queued.max.messages.kbytes is larger than its max value
Oct 7, 2019
97b277b
Add more output to test kafka startup
Oct 7, 2019
7bec7be
make sure get_cluster picks up new sasl endpoints
Oct 7, 2019
1bce4a4
fix kafka_version in sasl test
Oct 7, 2019
19c6e5b
add new sasl authentication mechanism
Oct 7, 2019
142c52f
create and use new exceptions
Oct 7, 2019
ec7e9b3
add security_protocol parameter to Authenticators
Oct 7, 2019
203f0b3
add docstrings to sasl authenticators
Oct 7, 2019
47b0927
add sasl config sections to docs
Oct 7, 2019
60468f6
implement SaslAuthenticate request and response
Oct 7, 2019
d078467
add advertisted listeners to test kafka config
Oct 7, 2019
4d1db54
fix wrong class names in protocol.sasl
Oct 7, 2019
f3c0d57
implement advanced authentication using new SASL API
Oct 7, 2019
ed889e0
add BROKERS_SASL to ci scripts
Oct 7, 2019
cc8d9b7
fix bug in protocol.sasl
Oct 7, 2019
6595497
add more output to authenticator
Oct 7, 2019
1ea009e
tie authenticators send and receive methods together
Oct 7, 2019
beaa6b8
improve docs and refactor pykafka.sasl_authenticators
Oct 7, 2019
823da3d
add -keyalg RSA to keystore generation in order to support newer ciphers
Oct 7, 2019
55dbc5b
add timeout when waiting for kafka cluster
Oct 8, 2019
46e981f
add log-level parameter to test kafka instance
Oct 8, 2019
9a57cac
print logs on failed kafka startup
Oct 8, 2019
ffe575c
increase timeout for kafka cluster and fail if cluster dies
Oct 8, 2019
5fc3418
increase timeout for kafka test instance startup
Oct 8, 2019
835be51
check if subprocesses are still running during test kafka startup
Oct 8, 2019
05824ce
add environment variable PYTHONUNBUFFERED=1 to travis.yaml
Oct 8, 2019
8e0b7bb
set line buffering for test kafka logfiles
Oct 8, 2019
77b5a8b
use logging.basicConfig
Oct 8, 2019
962aa31
keep track of all logs and procs and output logs if they fail
Oct 8, 2019
4f57c0d
add repair java version parser in kafka-run-class.sh for certain kafk…
Oct 9, 2019
c8e554f
bump librdkafka version
Oct 9, 2019
0a75ce0
switch tests from kafka 0.8 to kafka 2.0
Oct 9, 2019
7b55fab
remove pytest from test-requirements again
Oct 9, 2019
8b42a20
downgrade librdkafka to 0.11.3
Oct 10, 2019
55e2791
renamed test module to avoid naming conflict with pykafka
Oct 10, 2019
a11b7ce
add inheritance from object to sasl_authenticators classes
Oct 10, 2019
12399ce
skip sasl rdkafka tests if it's not installed
Oct 10, 2019
19d87f8
fix bug in SaslAuthenticateResponseV1
Oct 16, 2019
73f8e8e
use sasl_authenticator also for offset channel
Jan 16, 2020
43 changes: 31 additions & 12 deletions .travis.yml
@@ -6,38 +6,39 @@ cache:
- $HOME/.ccache
matrix:
include:
- env: TOX_ENV=py27 KAFKA_VERSION=0.8.2.2
- env: TOX_ENV=py27 KAFKA_VERSION=2.0.0
python: 2.7
- env: TOX_ENV=py27 KAFKA_VERSION=1.0.1
python: 2.7
- env: TOX_ENV=py34 KAFKA_VERSION=0.8.2.2
- env: TOX_ENV=py34 KAFKA_VERSION=2.0.0
python: 3.4
- env: TOX_ENV=py34 KAFKA_VERSION=1.0.1
python: 3.4
- env: TOX_ENV=py35 KAFKA_VERSION=0.8.2.2
- env: TOX_ENV=py35 KAFKA_VERSION=2.0.0
python: 3.5
- env: TOX_ENV=py35 KAFKA_VERSION=1.0.1
python: 3.5
- env: TOX_ENV=py36 KAFKA_VERSION=0.8.2.2
- env: TOX_ENV=py36 KAFKA_VERSION=2.0.0
python: 3.6
- env: TOX_ENV=py36 KAFKA_VERSION=1.0.1
python: 3.6
- env: TOX_ENV=pypy KAFKA_VERSION=0.8.2.2
- env: TOX_ENV=pypy KAFKA_VERSION=2.0.0
python: pypy
- env: TOX_ENV=pypy KAFKA_VERSION=1.0.1
python: pypy
- env: TOX_ENV=py27-gevent KAFKA_VERSION=0.8.2.2
- env: TOX_ENV=py27-gevent KAFKA_VERSION=2.0.0
python: 2.7
- env: TOX_ENV=py27-gevent KAFKA_VERSION=1.0.1
python: 2.7
- env: TOX_ENV=py36-gevent KAFKA_VERSION=0.8.2.2
- env: TOX_ENV=py36-gevent KAFKA_VERSION=2.0.0
python: 3.6
- env: TOX_ENV=py36-gevent KAFKA_VERSION=1.0.1
python: 3.6
env:
global:
- PATH="/usr/lib/ccache:$PATH"
- KAFKA_BIN="$HOME/kafka-bin"
- PYTHONUNBUFFERED=1

addons:
apt:
@@ -68,9 +69,9 @@ install:
fi
- pip install -U pip setuptools
- pip install codecov kazoo tox testinstances
- wget https://github.com/edenhill/librdkafka/archive/v0.9.5.tar.gz
- tar -xzf v0.9.5.tar.gz
- cd librdkafka-0.9.5/ && ./configure --prefix=$HOME
- wget https://github.com/edenhill/librdkafka/archive/v0.11.3.tar.gz
- tar -xzf v0.11.3.tar.gz
- cd librdkafka-0.11.3/ && ./configure --prefix=$HOME
- make -j 2 && make -j 2 install && cd -

before_script:
@@ -79,10 +80,28 @@ before_script:
- export LD_LIBRARY_PATH=$HOME/lib:$LD_LIBRARY_PATH
- export CFLAGS="-coverage"
- TEMPFILE=`tempfile`
- python -m pykafka.test.kafka_instance 3 --kafka-version $KAFKA_VERSION --download-dir $KAFKA_BIN --export-hosts $TEMPFILE &
- while true; do sleep 1; echo "Waiting for cluster..."; if [[ `grep ZOOKEEPER $TEMPFILE` ]]; then break; fi; done
- KAFKA_LOGS=`tempfile`
- python -m pykafka.test.kafka_instance 3 --kafka-version $KAFKA_VERSION --download-dir $KAFKA_BIN --export-hosts $TEMPFILE --log-level INFO 2>$KAFKA_LOGS &
- |
kafka_pid=$!
start=$(date +%s)
until grep ZOOKEEPER $TEMPFILE 1>/dev/null 2>/dev/null; do
sleep 1
echo "Waiting for cluster..."
if [[ $(($(date +%s) - start)) -gt 300 ]]; then
echo "Timeout waiting for cluster!"
cat $KAFKA_LOGS
exit 1
fi;
if ! kill -0 "$kafka_pid" >/dev/null 2>&1 ; then
echo "Kafka test cluster died during startup!"
cat $KAFKA_LOGS
exit 2
fi
done
- export `grep BROKERS $TEMPFILE`
- export `grep BROKERS_SSL $TEMPFILE`
- export `grep BROKERS_SASL $TEMPFILE`
- export `grep ZOOKEEPER $TEMPFILE`

script:
13 changes: 11 additions & 2 deletions README.rst
@@ -65,6 +65,13 @@ for further details):
>>> client = KafkaClient(hosts="127.0.0.1:<ssl-port>,...",
... ssl_config=config)

Or, for a SASL-authenticated connection, you might write (see e.g. the ``PlainAuthenticator`` docs for further details):

.. sourcecode:: python

>>> from pykafka import KafkaClient, PlainAuthenticator
>>> authenticator = PlainAuthenticator(user='alice', password='alice-secret')
>>> client = KafkaClient(hosts="127.0.0.1:<sasl-port>,...", sasl_authenticator=authenticator)
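
Under the hood, the SCRAM mechanisms this PR adds boil down to the client-proof computation from RFC 5802. The following standard-library sketch shows that computation in isolation; the helper name and its inputs are illustrative, not pykafka's API:

```python
import hashlib
import hmac
import os

def scram_client_proof(password, salt, iterations, auth_message, algo="sha256"):
    """Compute the SCRAM ClientProof sent in the client-final message (RFC 5802)."""
    # SaltedPassword := Hi(password, salt, iterations), which is PBKDF2
    salted = hashlib.pbkdf2_hmac(algo, password.encode("utf-8"), salt, iterations)
    client_key = hmac.new(salted, b"Client Key", algo).digest()
    stored_key = hashlib.new(algo, client_key).digest()
    client_signature = hmac.new(stored_key, auth_message, algo).digest()
    # ClientProof := ClientKey XOR ClientSignature
    return bytes(a ^ b for a, b in zip(client_key, client_signature))

# Illustrative call; in a real exchange salt and iterations come from the server
# and auth_message is built from the handshake messages.
proof = scram_client_proof("alice-secret", os.urandom(16), 4096,
                           b"n=alice,r=clientnonce,...")
```

The server can verify the proof without ever storing the plaintext password, which is the point of SCRAM over PLAIN.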

If the cluster you've connected to has any topics defined on it, you can list
them with:

@@ -178,8 +185,10 @@ of the librdkafka shared objects. You can find this location with `locate librdk

After that, all that's needed is that you pass an extra parameter
``use_rdkafka=True`` to ``topic.get_producer()``,
``topic.get_simple_consumer()``, or ``topic.get_balanced_consumer()``. Note
that some configuration options may have different optimal values; it may be
``topic.get_simple_consumer()``, or ``topic.get_balanced_consumer()``.
If you're using SASL-authenticated connections, make sure to pass the ``security_protocol``
parameter to your authenticator so librdkafka knows which endpoint to authenticate with.
Note that some configuration options may have different optimal values; it may be
worthwhile to consult librdkafka's `configuration notes`_ for this.

.. _0.9.1: https://github.com/edenhill/librdkafka/releases/tag/0.9.1
5 changes: 5 additions & 0 deletions doc/api/sasl_authenticators.rst
@@ -0,0 +1,5 @@
pykafka.sasl_authenticators
===========================

.. automodule:: pykafka.sasl_authenticators
:members:
3 changes: 3 additions & 0 deletions pykafka/__init__.py
@@ -11,6 +11,7 @@
from .balancedconsumer import BalancedConsumer
from .managedbalancedconsumer import ManagedBalancedConsumer
from .membershipprotocol import RangeProtocol, RoundRobinProtocol
from .sasl_authenticators import PlainAuthenticator, ScramAuthenticator

__version__ = "2.8.1-dev.2"

@@ -28,6 +29,8 @@
"ManagedBalancedConsumer",
"RangeProtocol",
"RoundRobinProtocol",
"PlainAuthenticator",
"ScramAuthenticator"
]

logging.getLogger(__name__).addHandler(logging.NullHandler())
30 changes: 28 additions & 2 deletions pykafka/broker.py
@@ -62,6 +62,7 @@ def __init__(self,
source_host='',
source_port=0,
ssl_config=None,
sasl_authenticator=None,
broker_version="0.9.0",
api_versions=None):
"""Create a Broker instance.
@@ -92,6 +93,8 @@ def __init__(self,
:type source_port: int
:param ssl_config: Config object for SSL connection
:type ssl_config: :class:`pykafka.connection.SslConfig`
:param sasl_authenticator: Authenticator to use for SASL authentication.
:type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator`
:param broker_version: The protocol version of the cluster being connected to.
If this parameter doesn't match the actual broker version, some pykafka
features may not work properly.
@@ -117,6 +120,7 @@
self._req_handlers = {}
self._broker_version = broker_version
self._api_versions = api_versions
self._sasl_authenticator = sasl_authenticator
try:
self.connect()
except SocketDisconnectedError:
@@ -144,6 +148,7 @@ def from_metadata(cls,
source_host='',
source_port=0,
ssl_config=None,
sasl_authenticator=None,
broker_version="0.9.0",
api_versions=None):
"""Create a Broker using BrokerMetadata
@@ -169,6 +174,8 @@ def from_metadata(cls,
:type source_port: int
:param ssl_config: Config object for SSL connection
:type ssl_config: :class:`pykafka.connection.SslConfig`
:param sasl_authenticator: Authenticator to use for SASL authentication.
:type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator`
:param broker_version: The protocol version of the cluster being connected to.
If this parameter doesn't match the actual broker version, some pykafka
features may not work properly.
@@ -184,6 +191,7 @@
source_host=source_host,
source_port=source_port,
ssl_config=ssl_config,
sasl_authenticator=sasl_authenticator,
broker_version=broker_version,
api_versions=api_versions)

@@ -203,6 +211,22 @@ def offsets_channel_connected(self):
return self._offsets_channel_connection.connected
return False

@property
def authenticated(self):
"""Returns True if this object's main connection to the Kafka broker
is authenticated
"""
return self._connection.authenticated

@property
def offsets_channel_authenticated(self):
"""Returns True if this object's offsets channel connection to the
Kafka broker is authenticated
"""
if self._offsets_channel_connection:
return self._offsets_channel_connection.authenticated
return False

@property
def id(self):
"""The broker's ID within the Kafka cluster"""
@@ -246,7 +270,8 @@ def connect(self, attempts=3):
buffer_size=self._buffer_size,
source_host=self._source_host,
source_port=self._source_port,
ssl_config=self._ssl_config)
ssl_config=self._ssl_config,
sasl_authenticator=self._sasl_authenticator)
self._connection.connect(self._socket_timeout_ms, attempts=attempts)
self._req_handler = RequestHandler(self._handler, self._connection)
self._req_handler.start()
@@ -262,7 +287,8 @@ def connect_offsets_channel(self, attempts=3):
self.host, self.port, self._handler,
buffer_size=self._buffer_size,
source_host=self._source_host, source_port=self._source_port,
ssl_config=self._ssl_config)
ssl_config=self._ssl_config,
sasl_authenticator=self._sasl_authenticator)
self._offsets_channel_connection.connect(self._offsets_channel_socket_timeout_ms,
attempts=attempts)
self._offsets_channel_req_handler = RequestHandler(
4 changes: 4 additions & 0 deletions pykafka/client.py
@@ -88,6 +88,7 @@ def __init__(self,
exclude_internal_topics=True,
source_address='',
ssl_config=None,
sasl_authenticator=None,
broker_version='0.9.0'):
"""Create a connection to a Kafka cluster.

@@ -118,6 +119,8 @@
:type source_address: str `'host:port'`
:param ssl_config: Config object for SSL connection
:type ssl_config: :class:`pykafka.connection.SslConfig`
:param sasl_authenticator: Authenticator to use for SASL authentication.
:type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator`
:param broker_version: The protocol version of the cluster being connected to.
If this parameter doesn't match the actual broker version, some pykafka
features may not work properly.
@@ -139,6 +142,7 @@
source_address=self._source_address,
zookeeper_hosts=zookeeper_hosts,
ssl_config=ssl_config,
sasl_authenticator=sasl_authenticator,
broker_version=broker_version)
self.brokers = self.cluster.brokers
self.topics = self.cluster.topics
6 changes: 6 additions & 0 deletions pykafka/cluster.py
@@ -175,6 +175,7 @@ def __init__(self,
source_address='',
zookeeper_hosts=None,
ssl_config=None,
sasl_authenticator=None,
broker_version='0.9.0'):
"""Create a new Cluster instance.

@@ -199,6 +200,8 @@
:type source_address: str `'host:port'`
:param ssl_config: Config object for SSL connection
:type ssl_config: :class:`pykafka.connection.SslConfig`
:param sasl_authenticator: Authenticator to use for SASL authentication.
:type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator`
:param broker_version: The protocol version of the cluster being connected to.
If this parameter doesn't match the actual broker version, some pykafka
features may not work properly.
@@ -214,6 +217,7 @@
self._source_host = self._source_address.split(':')[0]
self._source_port = 0
self._ssl_config = ssl_config
self.sasl_authenticator = sasl_authenticator
self._zookeeper_connect = zookeeper_hosts
self._max_connection_retries = 3
self._max_connection_retries_offset_mgr = 8
@@ -284,6 +288,7 @@ def _request_random_broker(self, broker_connects, req_fn):
source_host=self._source_host,
source_port=self._source_port,
ssl_config=self._ssl_config,
sasl_authenticator=self.sasl_authenticator,
broker_version=self._broker_version,
api_versions=self._api_versions)
response = req_fn(broker)
@@ -402,6 +407,7 @@ def _update_brokers(self, broker_metadata, controller_id):
source_host=self._source_host,
source_port=self._source_port,
ssl_config=self._ssl_config,
sasl_authenticator=self.sasl_authenticator,
broker_version=self._broker_version,
api_versions=self._api_versions)
elif not self._brokers[id_].connected:
21 changes: 18 additions & 3 deletions pykafka/connection.py
@@ -117,7 +117,8 @@ def __init__(self,
buffer_size=1024 * 1024,
source_host='',
source_port=0,
ssl_config=None):
ssl_config=None,
sasl_authenticator=None):
"""Initialize a socket connection to Kafka.

:param host: The host to which to connect
@@ -139,6 +140,8 @@
:type source_port: int
:param ssl_config: Config object for SSL connection
:type ssl_config: :class:`pykafka.connection.SslConfig`
:param sasl_authenticator: Authenticator to use for SASL authentication.
:type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator`
"""
self._buff = bytearray(buffer_size)
self.host = host
@@ -149,6 +152,8 @@
self.source_port = source_port
self._wrap_socket = (
ssl_config.wrap_socket if ssl_config else lambda x: x)
self._sasl_authenticator = sasl_authenticator
self.authenticated = sasl_authenticator is None

def __del__(self):
"""Close this connection when the object is deleted."""
Expand All @@ -161,6 +166,7 @@ def connected(self):

def connect(self, timeout, attempts=1):
"""Connect to the broker, retrying if specified."""
self.authenticated = False
log.debug("Connecting to %s:%s", self.host, self.port)
for attempt in range(0, attempts):
try:
@@ -172,6 +178,9 @@
)
)
log.debug("Successfully connected to %s:%s", self.host, self.port)
if self._sasl_authenticator is not None:
self._sasl_authenticator.authenticate(self)
self.authenticated = True
return
except (self._handler.SockErr, self._handler.GaiError) as err:
log.info("Attempt %s: failed to connect to %s:%s", attempt, self.host, self.port)
@@ -193,6 +202,7 @@ def disconnect(self):
pass
finally:
self._socket = None
self.authenticated = False
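
The lifecycle of the new ``authenticated`` flag can be summarized outside pykafka with a toy connection. Names are simplified and ``DummyAuthenticator`` is hypothetical, not part of pykafka:

```python
class DummyAuthenticator:
    """Stands in for a SASL authenticator; records that an exchange ran."""
    def authenticate(self, conn):
        conn.exchanges.append("sasl-exchange")

class Connection(object):
    def __init__(self, sasl_authenticator=None):
        self._sasl_authenticator = sasl_authenticator
        # No authenticator means there is nothing to authenticate: start as True.
        self.authenticated = sasl_authenticator is None
        self.exchanges = []

    def connect(self):
        self.authenticated = False
        # ... real code would open the socket here ...
        if self._sasl_authenticator is not None:
            self._sasl_authenticator.authenticate(self)
        self.authenticated = True

    def disconnect(self):
        # ... real code would close the socket here ...
        self.authenticated = False

conn = Connection(DummyAuthenticator())
conn.connect()  # runs the SASL exchange before marking the connection authenticated
```

The key invariant mirrored from the diff: ``authenticated`` is only True while a live, fully authenticated socket exists, and any disconnect resets it.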

def reconnect(self):
"""Disconnect from the broker, then reconnect"""
@@ -203,6 +213,7 @@
"""Send a request over the socket connection"""
bytes_ = request.get_bytes()
if not self._socket:
self.authenticated = False
raise SocketDisconnectedError("<broker {}:{}>".format(self.host, self.port))
try:
self._socket.sendall(bytes_)
@@ -211,7 +222,7 @@
self.disconnect()
raise SocketDisconnectedError("<broker {}:{}>".format(self.host, self.port))

def response(self):
def response_raw(self):
"""Wait for a response from the broker"""
size = bytes()
expected_len = 4 # Size => int32
@@ -231,5 +242,9 @@
except SocketDisconnectedError:
self.disconnect()
raise SocketDisconnectedError("<broker {}:{}>".format(self.host, self.port))
return self._buff[:size]

def response(self):
# Drop CorrelationId => int32
return buffer(self._buff[4:4 + size])
return buffer(self.response_raw()[4:])
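
The ``response_raw``/``response`` split above exists because the SASL authenticator needs the whole payload, while normal request handling first strips the four-byte CorrelationId. A self-contained sketch of that framing (plain Python, independent of pykafka):

```python
import struct

def parse_frame(buf):
    """Split a Kafka-style frame: a Size => int32 length prefix, then the payload."""
    (size,) = struct.unpack(">i", bytes(buf[:4]))
    return buf[4:4 + size]

def strip_correlation_id(payload):
    """Drop the leading CorrelationId => int32, as Connection.response() does."""
    return payload[4:]

# A frame carrying correlation id 42 and a 4-byte body
frame = struct.pack(">i", 8) + struct.pack(">i", 42) + b"\x00\x01\x02\x03"
payload = parse_frame(frame)          # roughly what response_raw() yields
body = strip_correlation_id(payload)  # roughly what response() hands to callers
```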
