Skip to content

Commit

Permalink
Remove ack_timeout [minor] (#107)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions <[email protected]>
  • Loading branch information
ric-evans and github-actions authored Jul 26, 2023
1 parent 197fb05 commit c4b9491
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 138 deletions.
22 changes: 4 additions & 18 deletions dependencies-integration.log
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,24 @@
#
aio-pika==9.1.5
# via wipac-keycloak-rest-services
aiormq==6.7.6
aiormq==6.7.7
# via aio-pika
anyio==3.7.1
# via httpcore
cachetools==5.3.1
# via wipac-rest-tools
certifi==2023.7.22
# via
# httpcore
# requests
# via requests
cffi==1.15.1
# via cryptography
charset-normalizer==3.2.0
# via requests
cryptography==41.0.2
# via pyjwt
dnspython==2.4.0
dnspython==2.4.1
# via pymongo
execnet==2.0.2
# via pytest-xdist
h11==0.14.0
# via httpcore
httpcore==0.17.3
# via dnspython
idna==3.4
# via
# anyio
# requests
# yarl
iniconfig==2.0.0
Expand Down Expand Up @@ -73,11 +64,6 @@ requests==2.31.0
# wipac-rest-tools
requests-futures==1.0.1
# via wipac-rest-tools
sniffio==1.3.0
# via
# anyio
# dnspython
# httpcore
tornado==6.3.2
# via wipac-rest-tools
typing-extensions==4.7.1
Expand All @@ -91,7 +77,7 @@ wipac-dev-tools==1.6.16
# oms-mqclient (setup.py)
# wipac-keycloak-rest-services
# wipac-rest-tools
wipac-keycloak-rest-services==1.4.16
wipac-keycloak-rest-services==1.4.18
# via oms-mqclient (setup.py)
wipac-rest-tools==1.4.20
# via
Expand Down
19 changes: 3 additions & 16 deletions dependencies-mypy.log
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
#
aio-pika==9.1.5
# via wipac-keycloak-rest-services
aiormq==6.7.6
aiormq==6.7.7
# via aio-pika
anyio==3.7.1
# via httpcore
asyncstdlib==3.10.8
# via oms-mqclient (setup.py)
backoff==2.2.1
Expand All @@ -18,7 +16,6 @@ cachetools==5.3.1
# via wipac-rest-tools
certifi==2023.7.22
# via
# httpcore
# pulsar-client
# requests
cffi==1.15.1
Expand All @@ -35,7 +32,7 @@ deprecated==1.2.14
# via
# opentelemetry-api
# opentelemetry-exporter-otlp-proto-http
dnspython==2.4.0
dnspython==2.4.1
# via pymongo
ed25519==1.5
# via nkeys
Expand All @@ -47,15 +44,10 @@ googleapis-common-protos==1.56.2
# opentelemetry-exporter-otlp-proto-http
grpcio==1.56.2
# via opentelemetry-exporter-jaeger-proto-grpc
h11==0.14.0
# via httpcore
httpcore==0.17.3
# via dnspython
humanfriendly==10.0
# via coloredlogs
idna==3.4
# via
# anyio
# requests
# yarl
importlib-metadata==6.8.0
Expand Down Expand Up @@ -157,11 +149,6 @@ requests-futures==1.0.1
# via wipac-rest-tools
six==1.16.0
# via thrift
sniffio==1.3.0
# via
# anyio
# dnspython
# httpcore
thrift==0.16.0
# via opentelemetry-exporter-jaeger-thrift
tornado==6.3.2
Expand All @@ -181,7 +168,7 @@ wipac-dev-tools==1.6.16
# wipac-keycloak-rest-services
# wipac-rest-tools
# wipac-telemetry
wipac-keycloak-rest-services==1.4.16
wipac-keycloak-rest-services==1.4.18
# via oms-mqclient (setup.py)
wipac-rest-tools==1.4.20
# via
Expand Down
2 changes: 0 additions & 2 deletions mqclient/broker_client_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ async def create_pub_queue(
address: str,
name: str,
auth_token: str,
ack_timeout: Optional[int],
) -> Pub:
"""Create a publishing queue."""
raise NotImplementedError()
Expand All @@ -224,7 +223,6 @@ async def create_sub_queue(
name: str,
prefetch: int,
auth_token: str,
ack_timeout: Optional[int],
) -> Sub:
"""Create a subscription queue."""
raise NotImplementedError()
28 changes: 12 additions & 16 deletions mqclient/broker_clients/apachepulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import functools
import logging
import os
from typing import AsyncGenerator, Optional

import pulsar # type: ignore
Expand Down Expand Up @@ -33,7 +34,6 @@ def __init__(
address: str,
topic: str,
auth_token: str,
ack_timeout: Optional[int],
) -> None:
"""Set address, topic, and client.
Expand All @@ -52,7 +52,6 @@ def __init__(
self.client: pulsar.Client = None
self.auth = pulsar.AuthenticationToken(auth_token) if auth_token else None
self._auth_token = auth_token
self.ack_timeout = ack_timeout

async def connect(self) -> None:
"""Set up client."""
Expand Down Expand Up @@ -86,10 +85,9 @@ def __init__(
address: str,
topic: str,
auth_token: str,
ack_timeout: Optional[int],
) -> None:
LOGGER.debug(f"{log_msgs.INIT_PUB} ({address}; {topic})")
super().__init__(address, topic, auth_token, ack_timeout)
super().__init__(address, topic, auth_token)
self.producer: pulsar.Producer = None

async def connect(self) -> None:
Expand All @@ -104,7 +102,6 @@ async def connect(self) -> None:
self.topic,
BrokerClient.SUBSCRIPTION_NAME,
self._auth_token,
self.ack_timeout,
prefetch=1,
)
await inner_sub.connect()
Expand Down Expand Up @@ -172,11 +169,10 @@ def __init__(
topic: str,
subscription_name: str,
auth_token: str,
ack_timeout: Optional[int],
prefetch: int,
) -> None:
LOGGER.debug(f"{log_msgs.INIT_SUB} ({address}; {topic})")
super().__init__(address, topic, auth_token, ack_timeout)
super().__init__(address, topic, auth_token)
self.consumer: pulsar.Consumer = None
self.subscription_name = subscription_name
self.prefetch = prefetch
Expand All @@ -186,16 +182,20 @@ async def connect(self) -> None:
LOGGER.debug(log_msgs.CONNECTING_SUB)
await super().connect()

ack_timeout: Optional[int] # for mypy
try:
ack_timeout = int(os.environ["PULSAR_UNACKED_MESSAGES_TIMEOUT_SEC"])
if ack_timeout < 10:
ack_timeout = None
except: # noqa: E722
ack_timeout = None

self.consumer = self.client.subscribe(
self.topic,
self.subscription_name,
# Neither receive with timeout nor partitioned topics can be used if the consumer queue size is zero.
receiver_queue_size=max(self.prefetch, 1),
unacked_messages_timeout_ms=(
self.ack_timeout * 1000
if self.ack_timeout and self.ack_timeout > 10
else None
),
unacked_messages_timeout_ms=ack_timeout,
consumer_type=pulsar.ConsumerType.Shared,
initial_position=pulsar.InitialPosition.Earliest,
negative_ack_redelivery_delay_ms=0,
Expand Down Expand Up @@ -419,14 +419,12 @@ async def create_pub_queue(
address: str,
name: str,
auth_token: str,
ack_timeout: Optional[int],
) -> PulsarPub:
"""Create a publishing queue."""
q = PulsarPub( # pylint: disable=invalid-name
address,
name,
auth_token,
ack_timeout,
)
await q.connect()
return q
Expand All @@ -437,15 +435,13 @@ async def create_sub_queue(
name: str,
prefetch: int,
auth_token: str,
ack_timeout: Optional[int],
) -> PulsarSub:
"""Create a subscription queue."""
q = PulsarSub( # pylint: disable=invalid-name
address,
name,
BrokerClient.SUBSCRIPTION_NAME,
auth_token,
ack_timeout,
prefetch,
)
await q.connect()
Expand Down
8 changes: 6 additions & 2 deletions mqclient/broker_clients/nats.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,12 +415,14 @@ async def create_pub_queue(
address: str,
name: str,
auth_token: str,
ack_timeout: Optional[int],
) -> NATSPub:
"""Create a publishing queue.
# NOTE - `auth_token` is not used currently
"""
if auth_token:
LOGGER.warning("NATS broker client does not use 'auth_token'")

q = NATSPub( # pylint: disable=invalid-name
address,
name + "-stream",
Expand All @@ -435,12 +437,14 @@ async def create_sub_queue(
name: str,
prefetch: int,
auth_token: str,
ack_timeout: Optional[int],
) -> NATSSub:
"""Create a subscription queue.
# NOTE - `auth_token` is not used currently
"""
if auth_token:
LOGGER.warning("NATS broker client does not use 'auth_token'")

q = NATSSub( # pylint: disable=invalid-name
address,
name + "-stream",
Expand Down
20 changes: 7 additions & 13 deletions mqclient/broker_clients/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,15 @@ def __init__(
address: str,
queue: str,
auth_token: str,
ack_timeout: Optional[int],
) -> None:
super().__init__()
LOGGER.info(f"Requested MQClient for queue '{queue}' @ {address}")
cp_args, username, password = _parse_url(address)
cp_args, _user, _pass = _parse_url(address)

# set up connection parameters
if creds := _get_credentials(username, password, auth_token):
if creds := _get_credentials(_user, _pass, auth_token):
cp_args["credentials"] = creds
if ack_timeout:
cp_args["heartbeat"] = ack_timeout

self.parameters = pika.connection.ConnectionParameters(**cp_args)

self.queue = queue
Expand Down Expand Up @@ -147,10 +145,9 @@ def __init__(
address: str,
name: str,
auth_token: str,
ack_timeout: Optional[int],
) -> None:
LOGGER.debug(f"{log_msgs.INIT_PUB} ({address}; {name})")
super().__init__(address, name, auth_token, ack_timeout)
super().__init__(address, name, auth_token)

async def connect(self) -> None:
"""Set up connection, channel, and queue.
Expand Down Expand Up @@ -237,11 +234,10 @@ def __init__(
address: str,
name: str,
auth_token: str,
ack_timeout: Optional[int],
prefetch: int,
) -> None:
LOGGER.debug(f"{log_msgs.INIT_SUB} ({address}; {name})")
super().__init__(address, name, auth_token, ack_timeout)
super().__init__(address, name, auth_token)
self.consumer_id = None
self.prefetch = prefetch

Expand Down Expand Up @@ -501,7 +497,6 @@ async def create_pub_queue(
address: str,
name: str,
auth_token: str,
ack_timeout: Optional[int],
) -> RabbitMQPub:
"""Create a publishing queue.
Expand All @@ -513,7 +508,7 @@ async def create_pub_queue(
RawQueue: queue
"""
# pylint: disable=invalid-name
q = RabbitMQPub(address, name, auth_token, ack_timeout)
q = RabbitMQPub(address, name, auth_token)
await q.connect()
return q

Expand All @@ -523,7 +518,6 @@ async def create_sub_queue(
name: str,
prefetch: int,
auth_token: str,
ack_timeout: Optional[int],
) -> RabbitMQSub:
"""Create a subscription queue.
Expand All @@ -535,6 +529,6 @@ async def create_sub_queue(
RawQueue: queue
"""
# pylint: disable=invalid-name
q = RabbitMQSub(address, name, auth_token, ack_timeout, prefetch=prefetch)
q = RabbitMQSub(address, name, auth_token, prefetch)
await q.connect()
return q
Loading

0 comments on commit c4b9491

Please sign in to comment.