Skip to content

Commit

Permalink
Improved Timeout & Retries [minor] (#99)
Browse files (browse the repository at this point in the history)
Co-authored-by: github-actions <[email protected]>
  • Loading branch information
ric-evans and github-actions authored Jul 11, 2023
1 parent 0aabd9b commit 9982036
Show file tree
Hide file tree
Showing 20 changed files with 821 additions and 308 deletions.
36 changes: 27 additions & 9 deletions mqclient/broker_client_interface.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -7,10 +7,6 @@

MessageID = Union[int, str, bytes]

TIMEOUT_MILLIS_DEFAULT = 1000 # milliseconds
RETRY_DELAY = 1 # seconds
TRY_ATTEMPTS = 3 # ex: 3 means 1 initial try and 2 retries


class ConnectingFailedException(Exception):
"""Raised when a `connect()` fails."""
Expand Down Expand Up @@ -122,7 +118,12 @@ async def close(self) -> None:
class Pub(RawQueue):
"""Publisher queue."""

async def send_message(self, msg: bytes) -> None:
async def send_message(
self,
msg: bytes,
retries: int,
retry_delay: int,
) -> None:
"""Send a message on a queue."""
raise NotImplementedError()

Expand All @@ -142,21 +143,38 @@ def _to_message(*args: Any) -> Optional[Message]:
raise NotImplementedError()

async def get_message(
self, timeout_millis: Optional[int] = TIMEOUT_MILLIS_DEFAULT
self,
timeout_millis: Optional[int],
retries: int,
retry_delay: int,
) -> Optional[Message]:
"""Get a single message from a queue."""
raise NotImplementedError()

async def ack_message(self, msg: Message) -> None:
async def ack_message(
self,
msg: Message,
retries: int,
retry_delay: int,
) -> None:
"""Ack a message from the queue."""
raise NotImplementedError()

async def reject_message(self, msg: Message) -> None:
async def reject_message(
self,
msg: Message,
retries: int,
retry_delay: int,
) -> None:
"""Reject (nack) a message from the queue."""
raise NotImplementedError()

def message_generator( # NOTE: no `async` b/c it's abstract; overriding methods will need `async`
self, timeout: int = 60, propagate_error: bool = True
self,
timeout: int,
propagate_error: bool,
retries: int,
retry_delay: int,
) -> AsyncGenerator[Optional[Message], None]:
"""Yield Messages.
Expand Down
146 changes: 97 additions & 49 deletions mqclient/broker_clients/apachepulsar.py
Original file line number | Diff line number | Diff line change
@@ -1,24 +1,22 @@
"""Back-end using Apache Pulsar."""

import asyncio
import functools
import logging
import time
from typing import AsyncGenerator, Optional

import pulsar # type: ignore

from .. import broker_client_interface, log_msgs
from ..broker_client_interface import (
RETRY_DELAY,
TIMEOUT_MILLIS_DEFAULT,
TRY_ATTEMPTS,
AlreadyClosedException,
ClosingFailedException,
Message,
Pub,
RawQueue,
Sub,
)
from . import utils

LOGGER = logging.getLogger("mqclient.pulsar")

Expand Down Expand Up @@ -123,13 +121,28 @@ async def close(self) -> None:
raise ClosingFailedException("No producer to sub.")
LOGGER.debug(log_msgs.CLOSED_PUB)

async def send_message(self, msg: bytes) -> None:
async def send_message(
self,
msg: bytes,
retries: int,
retry_delay: int,
) -> None:
"""Send a message on a queue."""
LOGGER.debug(log_msgs.SENDING_MESSAGE)
if not self.producer:
raise RuntimeError("queue is not connected")

self.producer.send(msg)
await utils.auto_retry_call(
func=functools.partial(
self.producer.send,
msg,
),
retries=retries,
retry_delay=retry_delay,
close=self.close,
connect=self.connect,
logger=LOGGER,
)
LOGGER.debug(log_msgs.SENT_MESSAGE)


Expand Down Expand Up @@ -204,7 +217,10 @@ def _to_message( # type: ignore[override] # noqa: F821 # pylint: disable=W0221
return Message(id_, data)

async def get_message(
self, timeout_millis: Optional[int] = TIMEOUT_MILLIS_DEFAULT
self,
timeout_millis: Optional[int],
retries: int,
retry_delay: int,
) -> Optional[Message]:
"""Get a single message from a queue.
Expand All @@ -215,69 +231,97 @@ async def get_message(
if not self.consumer:
raise RuntimeError("queue is not connected")

for i in range(TRY_ATTEMPTS):
if i > 0:
LOGGER.debug(
f"{log_msgs.GETMSG_CONNECTION_ERROR_TRY_AGAIN} (attempt #{i+1})..."
)

try:
recvd = self.consumer.receive(timeout_millis=timeout_millis)
msg = PulsarSub._to_message(recvd)
if msg:
LOGGER.debug(f"{log_msgs.GETMSG_RECEIVED_MESSAGE} ({msg}).")
return msg
else:
LOGGER.debug(log_msgs.GETMSG_NO_MESSAGE)
return None

except Exception as e:
# https://github.com/apache/pulsar/issues/3127
if str(e) == "Pulsar error: TimeOut":
LOGGER.debug(log_msgs.GETMSG_TIMEOUT_ERROR)
return None
# https://github.com/apache/pulsar/issues/3127
if str(e) == "Pulsar error: AlreadyClosed":
await self.close()
time.sleep(RETRY_DELAY)
await self.connect()
continue
LOGGER.debug(
f"{log_msgs.GETMSG_RAISE_OTHER_ERROR} ({e.__class__.__name__})."
)
raise

LOGGER.debug(log_msgs.GETMSG_CONNECTION_ERROR_MAX_RETRIES)
raise Exception("Pulsar connection error")
try:
pulsar_msg = await utils.auto_retry_call(
func=functools.partial(
self.consumer.receive,
timeout_millis=timeout_millis,
),
retries=retries,
retry_delay=retry_delay,
close=self.close,
connect=self.connect,
logger=LOGGER,
nonretriable_conditions=lambda e: str(e) == "Pulsar error: TimeOut",
)
except Exception as e:
# https://github.com/apache/pulsar/issues/3127
# consumer timed out so there's nothing left in the tube
if str(e) == "Pulsar error: TimeOut":
LOGGER.debug(log_msgs.GETMSG_TIMEOUT_ERROR)
return None
raise
if msg := PulsarSub._to_message(pulsar_msg):
LOGGER.debug(f"{log_msgs.GETMSG_RECEIVED_MESSAGE} ({msg}).")
return msg
else:
LOGGER.debug(log_msgs.GETMSG_NO_MESSAGE)
return None

async def ack_message(self, msg: Message) -> None:
async def ack_message(
self,
msg: Message,
retries: int,
retry_delay: int,
) -> None:
"""Ack a message from the queue."""
LOGGER.debug(log_msgs.ACKING_MESSAGE)
if not self.consumer:
raise RuntimeError("queue is not connected")

if isinstance(msg.msg_id, bytes):
self.consumer.acknowledge(pulsar.MessageId.deserialize(msg.msg_id))
pulsar_msg = pulsar.MessageId.deserialize(msg.msg_id)
else:
self.consumer.acknowledge(msg.msg_id)
pulsar_msg = msg.msg_id

await utils.auto_retry_call(
func=functools.partial(
self.consumer.acknowledge,
pulsar_msg,
),
retries=retries,
retry_delay=retry_delay,
close=self.close,
connect=self.connect,
logger=LOGGER,
)
LOGGER.debug(f"{log_msgs.ACKED_MESSAGE} ({msg}).")

async def reject_message(self, msg: Message) -> None:
async def reject_message(
self,
msg: Message,
retries: int,
retry_delay: int,
) -> None:
"""Reject (nack) a message from the queue."""
LOGGER.debug(log_msgs.NACKING_MESSAGE)
if not self.consumer:
raise RuntimeError("queue is not connected")

if isinstance(msg.msg_id, bytes):
self.consumer.negative_acknowledge(pulsar.MessageId.deserialize(msg.msg_id))
pulsar_msg = pulsar.MessageId.deserialize(msg.msg_id)
else:
self.consumer.negative_acknowledge(msg.msg_id)
pulsar_msg = msg.msg_id

await utils.auto_retry_call(
func=functools.partial(
self.consumer.negative_acknowledge,
pulsar_msg,
),
retries=retries,
retry_delay=retry_delay,
close=self.close,
connect=self.connect,
logger=LOGGER,
)
LOGGER.debug(f"{log_msgs.NACKED_MESSAGE} ({msg}).")

async def message_generator(
self, timeout: int = 60, propagate_error: bool = True
self,
timeout: int,
propagate_error: bool,
retries: int,
retry_delay: int,
) -> AsyncGenerator[Optional[Message], None]:
"""Yield Messages.
Expand All @@ -297,7 +341,11 @@ async def message_generator(
while True:
# get message
LOGGER.debug(log_msgs.MSGGEN_GET_NEW_MESSAGE)
msg = await self.get_message(timeout_millis=timeout * 1000)
msg = await self.get_message(
timeout_millis=timeout * 1000,
retries=retries,
retry_delay=retry_delay,
)
if msg is None:
LOGGER.info(log_msgs.MSGGEN_NO_MESSAGE_LOOK_BACK_IN_QUEUE)
break
Expand Down
Loading

0 comments on commit 9982036

Please sign in to comment.