From a59356cf8191d09bcb4eed3e251ab6f7c57580d9 Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Wed, 7 Feb 2024 23:19:23 +0000 Subject: [PATCH 1/3] Adds error handling for the async client Signed-off-by: Elena Kolevska --- dapr/aio/clients/grpc/client.py | 128 ++++++++++++------- tests/clients/test_dapr_async_grpc_client.py | 69 +++++++++- 2 files changed, 147 insertions(+), 50 deletions(-) diff --git a/dapr/aio/clients/grpc/client.py b/dapr/aio/clients/grpc/client.py index 498bed9a..bbbb51cb 100644 --- a/dapr/aio/clients/grpc/client.py +++ b/dapr/aio/clients/grpc/client.py @@ -36,9 +36,10 @@ UnaryStreamClientInterceptor, StreamUnaryClientInterceptor, StreamStreamClientInterceptor, + AioRpcError, ) -from dapr.clients.exceptions import DaprInternalError +from dapr.clients.exceptions import DaprInternalError, DaprGrpcError from dapr.clients.grpc._state import StateOptions, StateItem from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus from dapr.conf.helpers import GrpcEndpoint @@ -446,11 +447,13 @@ async def publish_event( metadata=publish_metadata, ) - call = self._stub.PublishEvent(req, metadata=metadata) - # response is google.protobuf.Empty - await call - - return DaprResponse(await call.initial_metadata()) + try: + call = self._stub.PublishEvent(req, metadata=metadata) + # response is google.protobuf.Empty + await call + return DaprResponse(await call.initial_metadata()) + except AioRpcError as err: + raise DaprGrpcError(err) from err async def get_state( self, @@ -491,12 +494,17 @@ async def get_state( if not store_name or len(store_name) == 0 or len(store_name.strip()) == 0: raise ValueError('State store name cannot be empty') + req = api_v1.GetStateRequest(store_name=store_name, key=key, metadata=state_metadata) - call = self._stub.GetState(req, metadata=metadata) - response = await call - return StateResponse( - data=response.data, etag=response.etag, headers=await call.initial_metadata() - ) + + try: + call = self._stub.GetState(req, metadata=metadata) + response = await call + return StateResponse( + data=response.data, etag=response.etag, headers=await call.initial_metadata() + ) + except AioRpcError as err: + raise DaprGrpcError(err) from err async def get_bulk_state( self, @@ -542,15 +550,18 @@ async def get_bulk_state( req = api_v1.GetBulkStateRequest( store_name=store_name, keys=keys, parallelism=parallelism, metadata=states_metadata ) - call = self._stub.GetBulkState(req, metadata=metadata) - response = await call - items = [] - for item in response.items: - items.append( - BulkStateItem(key=item.key, data=item.data, etag=item.etag, error=item.error) - ) - return BulkStatesResponse(items=items, headers=await call.initial_metadata()) + try: + call = self._stub.GetBulkState(req, metadata=metadata) + response = await call + items = [] + for item in response.items: + items.append( + BulkStateItem(key=item.key, data=item.data, etag=item.etag, error=item.error) + ) + return BulkStatesResponse(items=items, headers=await call.initial_metadata()) + except AioRpcError as err: + raise DaprGrpcError(err) from err async def query_state( self, store_name: str, query: str, states_metadata: Optional[Dict[str, str]] = dict() @@ -601,21 +612,26 @@ async def query_state( if not store_name or len(store_name) == 0 or len(store_name.strip()) == 0: raise ValueError('State store name cannot be empty') req = api_v1.QueryStateRequest(store_name=store_name, query=query, metadata=states_metadata) - call = self._stub.QueryStateAlpha1(req) - response = await call - results = [] - for item in response.results: - results.append( - QueryResponseItem(key=item.key, value=item.data, etag=item.etag, error=item.error) - ) + try: + call = self._stub.QueryStateAlpha1(req) + response = await call + results = [] + for item in response.results: + results.append( + QueryResponseItem( + key=item.key, value=item.data, etag=item.etag, error=item.error + ) + ) - return QueryResponse( - token=response.token, - results=results, - metadata=response.metadata, - headers=await call.initial_metadata(), - ) + return QueryResponse( + token=response.token, + results=results, + metadata=response.metadata, + headers=await call.initial_metadata(), + ) + except AioRpcError as err: + raise DaprGrpcError(err) from err async def save_state( self, @@ -691,9 +707,12 @@ async def save_state( ) req = api_v1.SaveStateRequest(store_name=store_name, states=[state]) - call = self._stub.SaveState(req, metadata=metadata) - await call - return DaprResponse(headers=await call.initial_metadata()) + try: + call = self._stub.SaveState(req, metadata=metadata) + await call + return DaprResponse(headers=await call.initial_metadata()) + except AioRpcError as e: + raise DaprInternalError(e.details()) from e async def save_bulk_state( self, store_name: str, states: List[StateItem], metadata: Optional[MetadataTuple] = None @@ -749,9 +768,13 @@ async def save_bulk_state( ] req = api_v1.SaveStateRequest(store_name=store_name, states=req_states) - call = self._stub.SaveState(req, metadata=metadata) - await call - return DaprResponse(headers=await call.initial_metadata()) + + try: + call = self._stub.SaveState(req, metadata=metadata) + await call + return DaprResponse(headers=await call.initial_metadata()) + except AioRpcError as err: + raise DaprGrpcError(err) from err async def execute_state_transaction( self, @@ -815,9 +838,13 @@ async def execute_state_transaction( req = api_v1.ExecuteStateTransactionRequest( storeName=store_name, operations=req_ops, metadata=transactional_metadata ) - call = self._stub.ExecuteStateTransaction(req, metadata=metadata) - await call - return DaprResponse(headers=await call.initial_metadata()) + + try: + call = self._stub.ExecuteStateTransaction(req, metadata=metadata) + await call + return DaprResponse(headers=await call.initial_metadata()) + except AioRpcError as err: + raise DaprGrpcError(err) from err async def delete_state( self, @@ -880,9 +907,13 @@ async def delete_state( options=state_options, metadata=state_metadata, ) - call = self._stub.DeleteState(req, metadata=metadata) - await call - return DaprResponse(headers=await call.initial_metadata()) + + try: + call = self._stub.DeleteState(req, metadata=metadata) + await call + return DaprResponse(headers=await call.initial_metadata()) + except AioRpcError as err: + raise DaprGrpcError(err) from err async def get_secret( self, @@ -1522,8 +1553,13 @@ async def get_metadata(self) -> GetMetadataResponse: information about supported features in the form of component capabilities. """ - call = self._stub.GetMetadata(GrpcEmpty()) - _resp = await call + + try: + call = self._stub.GetMetadata(GrpcEmpty()) + _resp = await call + except AioRpcError as err: + raise DaprGrpcError(err) from err + response: api_v1.GetMetadataResponse = _resp # type alias # Convert to more pythonic formats active_actors_count = { diff --git a/tests/clients/test_dapr_async_grpc_client.py b/tests/clients/test_dapr_async_grpc_client.py index f0539f76..dbef2fb8 100644 --- a/tests/clients/test_dapr_async_grpc_client.py +++ b/tests/clients/test_dapr_async_grpc_client.py @@ -20,8 +20,11 @@ from unittest.mock import patch +from google.rpc import status_pb2, code_pb2 + from dapr.aio.clients.grpc.client import DaprGrpcClientAsync from dapr.aio.clients import DaprClient +from dapr.clients.exceptions import DaprGrpcError from dapr.proto import common_v1 from .fake_dapr_server import FakeDaprSidecar from dapr.conf import settings @@ -202,10 +205,18 @@ async def test_invoke_binding_no_create(self): async def test_publish_event(self): dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') - resp = await dapr.publish_event(pubsub_name='pubsub', topic_name='example', data=b'haha') + resp = await dapr.publish_event( + pubsub_name='pubsub', topic_name='example', data=b'test_data' + ) self.assertEqual(2, len(resp.headers)) - self.assertEqual(['haha'], resp.headers['hdata']) + self.assertEqual(['test_data'], resp.headers['hdata']) + + self._fake_dapr_server.raise_exception_on_next_call( + status_pb2.Status(code=code_pb2.INVALID_ARGUMENT, message='my invalid argument message') + ) + with self.assertRaises(DaprGrpcError): + await dapr.publish_event(pubsub_name='pubsub', topic_name='example', data=b'test_data') async def test_publish_event_with_content_type(self): dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') @@ -292,12 +303,19 @@ async def test_get_save_delete_state(self): self.assertEqual(resp.data, b'') self.assertEqual(resp.etag, '') + # Check a DaprGrpcError is raised + self._fake_dapr_server.raise_exception_on_next_call( + status_pb2.Status(code=code_pb2.INVALID_ARGUMENT, message='my invalid argument message') + ) + with self.assertRaises(DaprGrpcError) as context: + await dapr.get_state(store_name='my_statestore', key='key||') + await dapr.delete_state(store_name='statestore', key=key) resp = await dapr.get_state(store_name='statestore', key=key) self.assertEqual(resp.data, b'') self.assertEqual(resp.etag, '') - with self.assertRaises(Exception) as context: + with self.assertRaises(DaprGrpcError) as context: await dapr.delete_state( store_name='statestore', key=key, state_metadata={'must_delete': '1'} ) @@ -359,7 +377,20 @@ async def test_transaction_then_get_states(self): self.assertEqual(resp.items[1].key, another_key) self.assertEqual(resp.items[1].data, to_bytes(another_value.upper())) - async def test_save_then_get_states(self): + self._fake_dapr_server.raise_exception_on_next_call( + status_pb2.Status(code=code_pb2.INVALID_ARGUMENT, message='my invalid argument message') + ) + with self.assertRaises(DaprGrpcError): + await dapr.execute_state_transaction( + store_name='statestore', + operations=[ + TransactionalStateOperation(key=key, data=value, etag='foo'), + TransactionalStateOperation(key=another_key, data=another_value), + ], + transactional_metadata={'metakey': 'metavalue'}, + ) + + async def test_bulk_save_then_get_states(self): dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') key = str(uuid.uuid4()) @@ -394,6 +425,27 @@ async def test_save_then_get_states(self): self.assertEqual(resp.items[1].etag, '1') self.assertEqual(resp.items[1].data, to_bytes(another_value.upper())) + self._fake_dapr_server.raise_exception_on_next_call( + status_pb2.Status(code=code_pb2.INVALID_ARGUMENT, message='my invalid argument message') + ) + with self.assertRaises(DaprGrpcError): + await dapr.save_bulk_state( + store_name='statestore', + states=[ + StateItem(key=key, value=value, metadata={'capitalize': '1'}), + StateItem(key=another_key, value=another_value, etag='1'), + ], + metadata=(('metakey', 'metavalue'),), + ) + + self._fake_dapr_server.raise_exception_on_next_call( + status_pb2.Status(code=code_pb2.INVALID_ARGUMENT, message='my invalid argument message') + ) + with self.assertRaises(DaprGrpcError): + await dapr.get_bulk_state( + store_name='statestore', keys=[key, another_key], states_metadata={'upper': '1'} + ) + async def test_get_secret(self): dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') key1 = 'key_1' @@ -512,6 +564,15 @@ async def test_query_state(self): self.assertEqual(resp.results[0].key, '3') self.assertEqual(len(resp.results), 3) + self._fake_dapr_server.raise_exception_on_next_call( + status_pb2.Status(code=code_pb2.INVALID_ARGUMENT, message='my invalid argument message') + ) + with self.assertRaises(DaprGrpcError): + await dapr.query_state( + store_name='statestore', + query=json.dumps({'filter': {}, 'page': {'limit': 2}}), + ) + async def test_shutdown(self): dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') await dapr.shutdown() From 7161cbb7e4a67bc30aa2213921750662e8647882 Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Thu, 8 Feb 2024 00:09:31 +0000 Subject: [PATCH 2/3] Clean up Signed-off-by: Elena Kolevska --- dapr/aio/clients/grpc/client.py | 52 ++++++++++++++++----------------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/dapr/aio/clients/grpc/client.py b/dapr/aio/clients/grpc/client.py index bbbb51cb..84ded392 100644 --- a/dapr/aio/clients/grpc/client.py +++ b/dapr/aio/clients/grpc/client.py @@ -451,10 +451,11 @@ async def publish_event( call = self._stub.PublishEvent(req, metadata=metadata) # response is google.protobuf.Empty await call - return DaprResponse(await call.initial_metadata()) except AioRpcError as err: raise DaprGrpcError(err) from err + return DaprResponse(await call.initial_metadata()) + async def get_state( self, store_name: str, @@ -500,12 +501,12 @@ async def get_state( try: call = self._stub.GetState(req, metadata=metadata) response = await call - return StateResponse( - data=response.data, etag=response.etag, headers=await call.initial_metadata() - ) except AioRpcError as err: raise DaprGrpcError(err) from err + return StateResponse(data=response.data, etag=response.etag, + headers=await call.initial_metadata()) + async def get_bulk_state( self, store_name: str, @@ -554,15 +555,15 @@ async def get_bulk_state( try: call = self._stub.GetBulkState(req, metadata=metadata) response = await call - items = [] - for item in response.items: - items.append( - BulkStateItem(key=item.key, data=item.data, etag=item.etag, error=item.error) - ) - return BulkStatesResponse(items=items, headers=await call.initial_metadata()) except AioRpcError as err: raise DaprGrpcError(err) from err + items = [] + for item in response.items: + items.append( + BulkStateItem(key=item.key, data=item.data, etag=item.etag, error=item.error)) + return BulkStatesResponse(items=items, headers=await call.initial_metadata()) + async def query_state( self, store_name: str, query: str, states_metadata: Optional[Dict[str, str]] = dict() ) -> QueryResponse: @@ -616,23 +617,17 @@ async def query_state( try: call = self._stub.QueryStateAlpha1(req) response = await call - results = [] - for item in response.results: - results.append( - QueryResponseItem( - key=item.key, value=item.data, etag=item.etag, error=item.error - ) - ) - - return QueryResponse( - token=response.token, - results=results, - metadata=response.metadata, - headers=await call.initial_metadata(), - ) except AioRpcError as err: raise DaprGrpcError(err) from err + results = [] + for item in response.results: + results.append( + QueryResponseItem(key=item.key, value=item.data, etag=item.etag, error=item.error)) + + return QueryResponse(token=response.token, results=results, metadata=response.metadata, + headers=await call.initial_metadata(), ) + async def save_state( self, store_name: str, @@ -772,10 +767,11 @@ async def save_bulk_state( try: call = self._stub.SaveState(req, metadata=metadata) await call - return DaprResponse(headers=await call.initial_metadata()) except AioRpcError as err: raise DaprGrpcError(err) from err + return DaprResponse(headers=await call.initial_metadata()) + async def execute_state_transaction( self, store_name: str, @@ -842,10 +838,11 @@ async def execute_state_transaction( try: call = self._stub.ExecuteStateTransaction(req, metadata=metadata) await call - return DaprResponse(headers=await call.initial_metadata()) except AioRpcError as err: raise DaprGrpcError(err) from err + return DaprResponse(headers=await call.initial_metadata()) + async def delete_state( self, store_name: str, @@ -911,10 +908,11 @@ async def delete_state( try: call = self._stub.DeleteState(req, metadata=metadata) await call - return DaprResponse(headers=await call.initial_metadata()) except AioRpcError as err: raise DaprGrpcError(err) from err + return DaprResponse(headers=await call.initial_metadata()) + async def get_secret( self, store_name: str, From cbc54ea42cf960aebd5e220630398a93d79261e6 Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Thu, 8 Feb 2024 00:11:54 +0000 Subject: [PATCH 3/3] Ruff Signed-off-by: Elena Kolevska --- dapr/aio/clients/grpc/client.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/dapr/aio/clients/grpc/client.py b/dapr/aio/clients/grpc/client.py index 84ded392..44578c73 100644 --- a/dapr/aio/clients/grpc/client.py +++ b/dapr/aio/clients/grpc/client.py @@ -504,8 +504,9 @@ async def get_state( except AioRpcError as err: raise DaprGrpcError(err) from err - return StateResponse(data=response.data, etag=response.etag, - headers=await call.initial_metadata()) + return StateResponse( + data=response.data, etag=response.etag, headers=await call.initial_metadata() + ) async def get_bulk_state( self, @@ -561,7 +562,8 @@ async def get_bulk_state( items = [] for item in response.items: items.append( - BulkStateItem(key=item.key, data=item.data, etag=item.etag, error=item.error)) + BulkStateItem(key=item.key, data=item.data, etag=item.etag, error=item.error) + ) return BulkStatesResponse(items=items, headers=await call.initial_metadata()) async def query_state( @@ -623,10 +625,15 @@ async def query_state( results = [] for item in response.results: results.append( - QueryResponseItem(key=item.key, value=item.data, etag=item.etag, error=item.error)) + QueryResponseItem(key=item.key, value=item.data, etag=item.etag, error=item.error) + ) - return QueryResponse(token=response.token, results=results, metadata=response.metadata, - headers=await call.initial_metadata(), ) + return QueryResponse( + token=response.token, + results=results, + metadata=response.metadata, + headers=await call.initial_metadata(), + ) async def save_state( self,