Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Documentation clarifications. Remove unused pykafka.common.Message. #922

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions pykafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
__all__ = ["Message", "CompressionType", "OffsetType"]
__all__ = ["CompressionType", "OffsetType"]
import datetime as dt
import logging

Expand All @@ -26,18 +26,6 @@
EPOCH = dt.datetime(1970, 1, 1)


class Message(object):
"""Message class.

:ivar response_code: Response code from Kafka
:ivar topic: Originating topic
:ivar payload: Message payload
:ivar key: (optional) Message key
:ivar offset: Message offset
"""
__slots__ = []


class CompressionType(object):
"""Enum for the various compressions supported.

Expand Down
14 changes: 7 additions & 7 deletions pykafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,11 @@ def _produce_has_timed_out(self, start_time):
return False
return time.time() * 1000 - start_time > self._pending_timeout_ms

def produce(self, message, partition_key=None, timestamp=None):
def produce(self, value, partition_key=None, timestamp=None):
"""Produce a message.

:param message: The message to produce (use None to send null)
:type message: bytes
:param value: The value of the message to produce (use None to send null)
:type value: bytes
:param partition_key: The key to use when deciding which partition to send this
message to. This key is passed to the `partitioner`, which may or may not
use it in deciding the partition. The default `RandomPartitioner` does not
Expand All @@ -395,24 +395,24 @@ def produce(self, message, partition_key=None, timestamp=None):
"but it got '%s'",
type(partition_key),
)
if message is not None and type(message) is not bytes:
if value is not None and type(value) is not bytes:
raise TypeError(
"Producer.produce accepts a bytes object as message, but it "
"got '%s'",
type(message),
type(value),
)
if timestamp is not None and self._protocol_version < 1:
raise RuntimeError("Producer.produce got a timestamp with protocol 0")
if not self._running:
raise ProducerStoppedException()
if self._serializer is not None:
message, partition_key = self._serializer(message, partition_key)
value, partition_key = self._serializer(value, partition_key)

partitions = list(self._topic.partitions.values())
partition_id = self._partitioner(partitions, partition_key).id

msg = Message(
value=message,
value=value,
partition_key=partition_key,
partition_id=partition_id,
timestamp=timestamp,
Expand Down
4 changes: 2 additions & 2 deletions pykafka/protocol/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
from six import integer_types
from zlib import crc32

from ..common import CompressionType, Message
from ..common import CompressionType
from ..exceptions import MessageSetDecodeFailure
from ..utils import Serializable, struct_helpers, compression
from ..utils.compat import buffer


class Message(Message, Serializable):
class Message(Serializable):
"""Representation of a Kafka Message

NOTE: Compression is handled in the protocol because of the way Kafka embeds
Expand Down
3 changes: 2 additions & 1 deletion pykafka/simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@

class SimpleConsumer(object):
"""
A non-balancing consumer for Kafka
A basic consumer for Kafka that does not automatically perform self-balancing (based on consumer_group).
A consumer can be used to retrieve :class:`pykafka.protocol.Message`s from a Kafka topic.
"""
def __init__(self,
topic,
Expand Down
4 changes: 2 additions & 2 deletions pykafka/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def get_simple_consumer(self,
"""Return a SimpleConsumer of this topic

:param consumer_group: The name of the consumer group to join
:type consumer_group: bytes
:type consumer_group: str
:param use_rdkafka: Use librdkafka-backed consumer if available
:type use_rdkafka: bool
"""
Expand All @@ -268,7 +268,7 @@ def get_balanced_consumer(self, consumer_group, managed=False, **kwargs):
"""Return a BalancedConsumer of this topic

:param consumer_group: The name of the consumer group to join
:type consumer_group: bytes
:type consumer_group: str
        :param managed: If True, manage the consumer group with Kafka using the 0.9
            group management api (requires Kafka >=0.9)
:type managed: bool
Expand Down