From b6f32b70d5837d8ce832bc5e8e9e8e2ea087950b Mon Sep 17 00:00:00 2001 From: Dieter De Paepe Date: Thu, 28 Feb 2019 11:06:38 +0100 Subject: [PATCH] Documentation clarifications. Remove unused pykafka.common.Message. --- pykafka/common.py | 14 +------------- pykafka/producer.py | 14 +++++++------- pykafka/protocol/message.py | 4 ++-- pykafka/simpleconsumer.py | 3 ++- pykafka/topic.py | 4 ++-- 5 files changed, 14 insertions(+), 25 deletions(-) diff --git a/pykafka/common.py b/pykafka/common.py index 0e55f3373..6db539f7e 100644 --- a/pykafka/common.py +++ b/pykafka/common.py @@ -17,7 +17,7 @@ See the License for the specific language governing permissions and limitations under the License. """ -__all__ = ["Message", "CompressionType", "OffsetType"] +__all__ = ["CompressionType", "OffsetType"] import datetime as dt import logging @@ -26,18 +26,6 @@ EPOCH = dt.datetime(1970, 1, 1) -class Message(object): - """Message class. - - :ivar response_code: Response code from Kafka - :ivar topic: Originating topic - :ivar payload: Message payload - :ivar key: (optional) Message key - :ivar offset: Message offset - """ - __slots__ = [] - - class CompressionType(object): """Enum for the various compressions supported. diff --git a/pykafka/producer.py b/pykafka/producer.py index 6ff2c6dd5..9d9350041 100644 --- a/pykafka/producer.py +++ b/pykafka/producer.py @@ -372,11 +372,11 @@ def _produce_has_timed_out(self, start_time): return False return time.time() * 1000 - start_time > self._pending_timeout_ms - def produce(self, message, partition_key=None, timestamp=None): + def produce(self, value, partition_key=None, timestamp=None): """Produce a message. - :param message: The message to produce (use None to send null) - :type message: bytes + :param value: The value of the message to produce (use None to send null) + :type value: bytes :param partition_key: The key to use when deciding which partition to send this message to. This key is passed to the `partitioner`, which may or may not use it in deciding the partition. The default `RandomPartitioner` does not @@ -395,24 +395,24 @@ def produce(self, message, partition_key=None, timestamp=None): "but it got '%s'", type(partition_key), ) - if message is not None and type(message) is not bytes: + if value is not None and type(value) is not bytes: raise TypeError( "Producer.produce accepts a bytes object as message, but it " "got '%s'", - type(message), + type(value), ) if timestamp is not None and self._protocol_version < 1: raise RuntimeError("Producer.produce got a timestamp with protocol 0") if not self._running: raise ProducerStoppedException() if self._serializer is not None: - message, partition_key = self._serializer(message, partition_key) + value, partition_key = self._serializer(value, partition_key) partitions = list(self._topic.partitions.values()) partition_id = self._partitioner(partitions, partition_key).id msg = Message( - value=message, + value=value, partition_key=partition_key, partition_id=partition_id, timestamp=timestamp, diff --git a/pykafka/protocol/message.py b/pykafka/protocol/message.py index 8b4724e42..9195f8de6 100644 --- a/pykafka/protocol/message.py +++ b/pykafka/protocol/message.py @@ -5,13 +5,13 @@ from six import integer_types from zlib import crc32 -from ..common import CompressionType, Message +from ..common import CompressionType from ..exceptions import MessageSetDecodeFailure from ..utils import Serializable, struct_helpers, compression from ..utils.compat import buffer -class Message(Message, Serializable): +class Message(Serializable): """Representation of a Kafka Message NOTE: Compression is handled in the protocol because of the way Kafka embeds diff --git a/pykafka/simpleconsumer.py b/pykafka/simpleconsumer.py index f2cf5e2fc..325cee537 100644 --- a/pykafka/simpleconsumer.py +++ b/pykafka/simpleconsumer.py @@ -53,7 +53,8 @@ class SimpleConsumer(object): """ - A non-balancing consumer for Kafka + A basic consumer for Kafka that does not automatically perform self-balancing (based on consumer_group). + A consumer can be used to retrieve :class:`pykafka.protocol.Message`s from a Kafka topic. """ def __init__(self, topic, diff --git a/pykafka/topic.py b/pykafka/topic.py index 996bf0be1..38be11765 100644 --- a/pykafka/topic.py +++ b/pykafka/topic.py @@ -249,7 +249,7 @@ def get_simple_consumer(self, """Return a SimpleConsumer of this topic :param consumer_group: The name of the consumer group to join - :type consumer_group: bytes + :type consumer_group: str :param use_rdkafka: Use librdkafka-backed consumer if available :type use_rdkafka: bool """ @@ -268,7 +268,7 @@ def get_balanced_consumer(self, consumer_group, managed=False, **kwargs): """Return a BalancedConsumer of this topic :param consumer_group: The name of the consumer group to join - :type consumer_group: bytes + :type consumer_group: str :param managed: If True, manage the consumer group with Kafka using the 0.9 group management api (requires Kafka >=0.9)) :type managed: bool