Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bidirectional streaming for pubsub (async client) #742

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 70 additions & 1 deletion dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from warnings import warn

from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any
from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any, Awaitable
from typing_extensions import Self

from google.protobuf.message import Message as GrpcMessage
Expand All @@ -39,12 +39,14 @@
AioRpcError,
)

from dapr.aio.clients.grpc.subscription import Subscription
from dapr.clients.exceptions import DaprInternalError, DaprGrpcError
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.clients.health import DaprHealth
from dapr.clients.retry import RetryPolicy
from dapr.common.pubsub.subscription import StreamInactiveError
from dapr.conf.helpers import GrpcEndpoint
from dapr.conf import settings
from dapr.proto import api_v1, api_service_v1, common_v1
Expand Down Expand Up @@ -94,6 +96,7 @@
UnlockResponse,
GetWorkflowResponse,
StartWorkflowResponse,
TopicEventResponse,
)


Expand Down Expand Up @@ -482,6 +485,72 @@

return DaprResponse(await call.initial_metadata())

async def subscribe(
self,
pubsub_name: str,
topic: str,
metadata: Optional[dict] = None,
dead_letter_topic: Optional[str] = None,
) -> Subscription:
"""
Subscribe to a topic with a bidirectional stream

Args:
pubsub_name (str): The name of the pubsub component.
topic (str): The name of the topic.
metadata (Optional[dict]): Additional metadata for the subscription.
dead_letter_topic (Optional[str]): Name of the dead-letter topic.

Returns:
Subscription: The Subscription object managing the stream.
"""
subscription = Subscription(self._stub, pubsub_name, topic, metadata, dead_letter_topic)
await subscription.start()
return subscription

async def subscribe_with_handler(
self,
pubsub_name: str,
topic: str,
handler_fn: Callable[..., TopicEventResponse],
metadata: Optional[dict] = None,
dead_letter_topic: Optional[str] = None,
) -> Callable[[], Awaitable[None]]:
"""
Subscribe to a topic with a bidirectional stream and a message handler function

Args:
pubsub_name (str): The name of the pubsub component.
topic (str): The name of the topic.
handler_fn (Callable[..., TopicEventResponse]): The function to call when a message is received.
metadata (Optional[dict]): Additional metadata for the subscription.
dead_letter_topic (Optional[str]): Name of the dead-letter topic.

Returns:
Callable[[], Awaitable[None]]: An async function to close the subscription.
"""
subscription = await self.subscribe(pubsub_name, topic, metadata, dead_letter_topic)

Check warning on line 532 in dapr/aio/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/client.py#L532

Added line #L532 was not covered by tests

async def stream_messages(sub: Subscription):
while True:
try:
message = await sub.next_message()
if message:
response = await handler_fn(message)
if response:
await subscription.respond(message, response.status)

Check warning on line 541 in dapr/aio/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/client.py#L534-L541

Added lines #L534 - L541 were not covered by tests
else:
continue
except StreamInactiveError:
break

Check warning on line 545 in dapr/aio/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/client.py#L543-L545

Added lines #L543 - L545 were not covered by tests

async def close_subscription():
await subscription.close()

Check warning on line 548 in dapr/aio/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/client.py#L547-L548

Added lines #L547 - L548 were not covered by tests

asyncio.create_task(stream_messages(subscription))

Check warning on line 550 in dapr/aio/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/client.py#L550

Added line #L550 was not covered by tests

return close_subscription

Check warning on line 552 in dapr/aio/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/client.py#L552

Added line #L552 was not covered by tests

async def get_state(
self,
store_name: str,
Expand Down
22 changes: 19 additions & 3 deletions dapr/aio/clients/grpc/interceptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from collections import namedtuple
from typing import List, Tuple

from grpc.aio import UnaryUnaryClientInterceptor, ClientCallDetails # type: ignore
from grpc.aio import UnaryUnaryClientInterceptor, StreamStreamClientInterceptor, ClientCallDetails # type: ignore

from dapr.conf import settings

Expand Down Expand Up @@ -51,7 +51,7 @@ def intercept_unary_unary(self, continuation, client_call_details, request):
return continuation(client_call_details, request)


class DaprClientInterceptorAsync(UnaryUnaryClientInterceptor):
class DaprClientInterceptorAsync(UnaryUnaryClientInterceptor, StreamStreamClientInterceptor):
"""The class implements a UnaryUnaryClientInterceptor from grpc to add an interceptor to add
additional headers to all calls as needed.

Expand Down Expand Up @@ -115,8 +115,24 @@ async def intercept_unary_unary(self, continuation, client_call_details, request
Returns:
A response object after invoking the continuation callable
"""
new_call_details = await self._intercept_call(client_call_details)
# Call continuation
response = await continuation(new_call_details, request)
return response

async def intercept_stream_stream(self, continuation, client_call_details, request):
"""This method intercepts a stream-stream gRPC call. This is the implementation of the
abstract method defined in StreamStreamClientInterceptor defined in grpc. This is invoked
automatically by grpc based on the order in which interceptors are added to the channel.

Args:
continuation: a callable to be invoked to continue with the RPC or next interceptor
client_call_details: a ClientCallDetails object describing the outgoing RPC
request: the request value for the RPC

# Pre-process or intercept call
Returns:
A response object after invoking the continuation callable
"""
new_call_details = await self._intercept_call(client_call_details)
# Call continuation
response = await continuation(new_call_details, request)
Expand Down
110 changes: 110 additions & 0 deletions dapr/aio/clients/grpc/subscription.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import asyncio
from grpc import StatusCode
from grpc.aio import AioRpcError

from dapr.clients.grpc._response import TopicEventResponse
from dapr.clients.health import DaprHealth
from dapr.common.pubsub.subscription import StreamInactiveError, SubscriptionMessage
from dapr.proto import api_v1, appcallback_v1


class Subscription:
def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=None):
self._stub = stub
self._pubsub_name = pubsub_name
self._topic = topic
self._metadata = metadata or {}
self._dead_letter_topic = dead_letter_topic or ''
self._stream = None
self._send_queue = asyncio.Queue()
self._stream_active = asyncio.Event()

async def start(self):
async def outgoing_request_iterator():
try:
initial_request = api_v1.SubscribeTopicEventsRequestAlpha1(
initial_request=api_v1.SubscribeTopicEventsRequestInitialAlpha1(
pubsub_name=self._pubsub_name,
topic=self._topic,
metadata=self._metadata,
dead_letter_topic=self._dead_letter_topic,
)
)
yield initial_request

while self._stream_active.is_set():
try:
response = await asyncio.wait_for(self._send_queue.get(), timeout=1.0)
yield response
except asyncio.TimeoutError:
continue

Check warning on line 40 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L40

Added line #L40 was not covered by tests
except Exception as e:
raise Exception(f'Error while writing to stream: {e}')

Check warning on line 42 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L42

Added line #L42 was not covered by tests

self._stream = self._stub.SubscribeTopicEventsAlpha1(outgoing_request_iterator())
self._stream_active.set()
await self._stream.read() # discard the initial message

async def reconnect_stream(self):
await self.close()
DaprHealth.wait_until_ready()
print('Attempting to reconnect...')
await self.start()

async def next_message(self):
if not self._stream_active.is_set():
raise StreamInactiveError('Stream is not active')

try:
if self._stream is not None:
message = await self._stream.read()
if message is None:
return None

Check warning on line 62 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L62

Added line #L62 was not covered by tests
return SubscriptionMessage(message.event_message)
except AioRpcError as e:
if e.code() == StatusCode.UNAVAILABLE:
print(
f'gRPC error while reading from stream: {e.details()}, '
f'Status Code: {e.code()}. '
f'Attempting to reconnect...'
)
await self.reconnect_stream()
elif e.code() != StatusCode.CANCELLED:
raise Exception(f'gRPC error while reading from subscription stream: {e} ')
except Exception as e:
raise Exception(f'Error while fetching message: {e}')

Check warning on line 75 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L72-L75

Added lines #L72 - L75 were not covered by tests

return None

async def respond(self, message, status):
try:
status = appcallback_v1.TopicEventResponse(status=status.value)
response = api_v1.SubscribeTopicEventsRequestProcessedAlpha1(
id=message.id(), status=status
)
msg = api_v1.SubscribeTopicEventsRequestAlpha1(event_processed=response)
if not self._stream_active.is_set():
raise StreamInactiveError('Stream is not active')

Check warning on line 87 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L87

Added line #L87 was not covered by tests
await self._send_queue.put(msg)
except Exception as e:
print(f"Can't send message: {e}")

Check warning on line 90 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L89-L90

Added lines #L89 - L90 were not covered by tests

async def respond_success(self, message):
await self.respond(message, TopicEventResponse('success').status)

async def respond_retry(self, message):
await self.respond(message, TopicEventResponse('retry').status)

Check warning on line 96 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L96

Added line #L96 was not covered by tests

async def respond_drop(self, message):
await self.respond(message, TopicEventResponse('drop').status)

Check warning on line 99 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L99

Added line #L99 was not covered by tests

async def close(self):
if self._stream:
try:
self._stream.cancel()
self._stream_active.clear()
except AioRpcError as e:
if e.code() != StatusCode.CANCELLED:
raise Exception(f'Error while closing stream: {e}')
except Exception as e:
raise Exception(f'Error while closing stream: {e}')

Check warning on line 110 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L106-L110

Added lines #L106 - L110 were not covered by tests
76 changes: 75 additions & 1 deletion dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
See the License for the specific language governing permissions and
limitations under the License.
"""

import threading
import time
import socket
import json
Expand Down Expand Up @@ -41,6 +41,7 @@
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
from dapr.clients.grpc.subscription import Subscription, StreamInactiveError
from dapr.clients.grpc.interceptors import DaprClientInterceptor, DaprClientTimeoutInterceptor
from dapr.clients.health import DaprHealth
from dapr.clients.retry import RetryPolicy
Expand Down Expand Up @@ -85,6 +86,7 @@
StartWorkflowResponse,
EncryptResponse,
DecryptResponse,
TopicEventResponse,
)


Expand Down Expand Up @@ -481,6 +483,78 @@ def publish_event(

return DaprResponse(call.initial_metadata())

def subscribe(
self,
pubsub_name: str,
topic: str,
metadata: Optional[MetadataTuple] = None,
dead_letter_topic: Optional[str] = None,
) -> Subscription:
"""
Subscribe to a topic with a bidirectional stream

Args:
pubsub_name (str): The name of the pubsub component.
topic (str): The name of the topic.
metadata (Optional[MetadataTuple]): Additional metadata for the subscription.
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
timeout (Optional[int]): The time in seconds to wait for a message before returning None
If not set, the `next_message` method will block indefinitely
until a message is received.

Returns:
Subscription: The Subscription object managing the stream.
"""
subscription = Subscription(self._stub, pubsub_name, topic, metadata, dead_letter_topic)
subscription.start()
return subscription

def subscribe_with_handler(
self,
pubsub_name: str,
topic: str,
handler_fn: Callable[..., TopicEventResponse],
metadata: Optional[MetadataTuple] = None,
dead_letter_topic: Optional[str] = None,
) -> Callable:
"""
Subscribe to a topic with a bidirectional stream and a message handler function

Args:
pubsub_name (str): The name of the pubsub component.
topic (str): The name of the topic.
handler_fn (Callable[..., TopicEventResponse]): The function to call when a message is received.
metadata (Optional[MetadataTuple]): Additional metadata for the subscription.
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
timeout (Optional[int]): The time in seconds to wait for a message before returning None
If not set, the `next_message` method will block indefinitely
until a message is received.
"""
subscription = self.subscribe(pubsub_name, topic, metadata, dead_letter_topic)

def stream_messages(sub):
while True:
try:
message = sub.next_message()
if message:
# Process the message
response = handler_fn(message)
if response:
subscription.respond(message, response.status)
else:
# No message received
continue
except StreamInactiveError:
break

def close_subscription():
subscription.close()

streaming_thread = threading.Thread(target=stream_messages, args=(subscription,))
streaming_thread.start()

return close_subscription

def get_state(
self,
store_name: str,
Expand Down
Loading
Loading