diff --git a/dapr/actor/client/proxy.py b/dapr/actor/client/proxy.py index e7baa90c..fd62d271 100644 --- a/dapr/actor/client/proxy.py +++ b/dapr/actor/client/proxy.py @@ -98,7 +98,7 @@ class ActorProxy: communication. """ - _default_proxy_factory = ActorProxyFactory() + _default_proxy_factory = None def __init__( self, @@ -127,6 +127,13 @@ def actor_type(self) -> str: """Returns actor type.""" return self._actor_type + @classmethod + def _get_default_factory_instance(cls): + """Lazily initializes and returns the default ActorProxyFactory instance.""" + if cls._default_proxy_factory is None: + cls._default_proxy_factory = ActorProxyFactory() + return cls._default_proxy_factory + @classmethod def create( cls, @@ -146,8 +153,11 @@ def create( Returns: :class:`ActorProxy': new Actor Proxy client. + @param actor_proxy_factory: """ - factory = cls._default_proxy_factory if not actor_proxy_factory else actor_proxy_factory + factory = ( + actor_proxy_factory if actor_proxy_factory else cls._get_default_factory_instance() + ) return factory.create(actor_type, actor_id, actor_interface) async def invoke_method(self, method: str, raw_body: Optional[bytes] = None) -> bytes: diff --git a/dapr/aio/clients/grpc/client.py b/dapr/aio/clients/grpc/client.py index 44578c73..89f473e2 100644 --- a/dapr/aio/clients/grpc/client.py +++ b/dapr/aio/clients/grpc/client.py @@ -42,6 +42,7 @@ from dapr.clients.exceptions import DaprInternalError, DaprGrpcError from dapr.clients.grpc._state import StateOptions, StateItem from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus +from dapr.clients.health import DaprHealth from dapr.conf.helpers import GrpcEndpoint from dapr.conf import settings from dapr.proto import api_v1, api_service_v1, common_v1 @@ -129,6 +130,8 @@ def __init__( max_grpc_messsage_length (int, optional): The maximum grpc send and receive message length in bytes. """ + DaprHealth.wait_until_ready() + useragent = f'dapr-sdk-python/{__version__}' if not max_grpc_message_length: options = [ diff --git a/dapr/clients/__init__.py b/dapr/clients/__init__.py index 042f0dd4..b39124b0 100644 --- a/dapr/clients/__init__.py +++ b/dapr/clients/__init__.py @@ -24,6 +24,7 @@ from dapr.conf import settings from google.protobuf.message import Message as GrpcMessage + __all__ = [ 'DaprClient', 'DaprActorClientBase', @@ -32,6 +33,7 @@ 'ERROR_CODE_UNKNOWN', ] + from grpc import ( # type: ignore UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, diff --git a/dapr/clients/grpc/client.py b/dapr/clients/grpc/client.py index 2fb69b71..55ee735d 100644 --- a/dapr/clients/grpc/client.py +++ b/dapr/clients/grpc/client.py @@ -40,6 +40,7 @@ from dapr.clients.exceptions import DaprInternalError, DaprGrpcError from dapr.clients.grpc._state import StateOptions, StateItem from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus +from dapr.clients.health import DaprHealth from dapr.conf import settings from dapr.proto import api_v1, api_service_v1, common_v1 from dapr.proto.runtime.v1.dapr_pb2 import UnsubscribeConfigurationResponse @@ -127,6 +128,8 @@ def __init__( max_grpc_messsage_length (int, optional): The maximum grpc send and receive message length in bytes. """ + DaprHealth.wait_until_ready() + useragent = f'dapr-sdk-python/{__version__}' if not max_grpc_message_length: options = [ diff --git a/dapr/clients/health.py b/dapr/clients/health.py new file mode 100644 index 00000000..120b5593 --- /dev/null +++ b/dapr/clients/health.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import urllib.request +import urllib.error +import time + +from dapr.clients.http.conf import DAPR_API_TOKEN_HEADER, USER_AGENT_HEADER, DAPR_USER_AGENT +from dapr.clients.http.helpers import get_api_url +from dapr.conf import settings + + +class DaprHealth: + @staticmethod + def wait_until_ready(): + health_url = f'{get_api_url()}/healthz/outbound' + headers = {USER_AGENT_HEADER: DAPR_USER_AGENT} + if settings.DAPR_API_TOKEN is not None: + headers[DAPR_API_TOKEN_HEADER] = settings.DAPR_API_TOKEN + timeout = settings.DAPR_HEALTH_TIMEOUT + + start = time.time() + while True: + try: + req = urllib.request.Request(health_url, headers=headers) + with urllib.request.urlopen(req, context=DaprHealth.get_ssl_context()) as response: + if 200 <= response.status < 300: + break + except urllib.error.URLError as e: + print(f'Health check on {health_url} failed: {e.reason}') + except Exception as e: + print(f'Unexpected error during health check: {e}') + + remaining = (start + timeout) - time.time() + if remaining <= 0: + raise TimeoutError(f'Dapr health check timed out, after {timeout}.') + time.sleep(min(1, remaining)) + + @staticmethod + def get_ssl_context(): + # This method is used (overwritten) from tests + # to return context for self-signed certificates + return None diff --git a/dapr/clients/http/client.py b/dapr/clients/http/client.py index cfde7380..0d591156 100644 --- a/dapr/clients/http/client.py +++ b/dapr/clients/http/client.py @@ -17,18 +17,20 @@ from typing import Callable, Mapping, Dict, Optional, Union, Tuple, TYPE_CHECKING +from dapr.clients.http.conf import ( + DAPR_API_TOKEN_HEADER, + USER_AGENT_HEADER, + DAPR_USER_AGENT, + CONTENT_TYPE_HEADER, +) +from dapr.clients.health import DaprHealth + if TYPE_CHECKING: from dapr.serializers import Serializer from dapr.conf import settings from dapr.clients.base import DEFAULT_JSON_CONTENT_TYPE from dapr.clients.exceptions import DaprInternalError, ERROR_CODE_DOES_NOT_EXIST, ERROR_CODE_UNKNOWN -from dapr.version import __version__ - -CONTENT_TYPE_HEADER = 'content-type' -DAPR_API_TOKEN_HEADER = 'dapr-api-token' -USER_AGENT_HEADER = 'User-Agent' -DAPR_USER_AGENT = f'dapr-sdk-python/{__version__}' class DaprHttpClient: @@ -47,18 +49,12 @@ def __init__( timeout (int, optional): Timeout in seconds, defaults to 60. headers_callback (lambda: Dict[str, str]], optional): Generates header for each request. """ + DaprHealth.wait_until_ready() + self._timeout = aiohttp.ClientTimeout(total=timeout) self._serializer = message_serializer self._headers_callback = headers_callback - def get_api_url(self) -> str: - if settings.DAPR_HTTP_ENDPOINT: - return '{}/{}'.format(settings.DAPR_HTTP_ENDPOINT, settings.DAPR_API_VERSION) - - return 'http://{}:{}/{}'.format( - settings.DAPR_RUNTIME_HOST, settings.DAPR_HTTP_PORT, settings.DAPR_API_VERSION - ) - async def send_bytes( self, method: str, diff --git a/dapr/clients/http/conf.py b/dapr/clients/http/conf.py new file mode 100644 index 00000000..2dce2834 --- /dev/null +++ b/dapr/clients/http/conf.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from dapr.version import __version__ + +CONTENT_TYPE_HEADER = 'content-type' +DAPR_API_TOKEN_HEADER = 'dapr-api-token' +USER_AGENT_HEADER = 'User-Agent' +DAPR_USER_AGENT = f'dapr-sdk-python/{__version__}' diff --git a/dapr/clients/http/dapr_actor_http_client.py b/dapr/clients/http/dapr_actor_http_client.py index 0a462587..a4fccfc1 100644 --- a/dapr/clients/http/dapr_actor_http_client.py +++ b/dapr/clients/http/dapr_actor_http_client.py @@ -15,6 +15,8 @@ from typing import Callable, Dict, Optional, Union, TYPE_CHECKING +from dapr.clients.http.helpers import get_api_url + if TYPE_CHECKING: from dapr.serializers import Serializer @@ -145,4 +147,4 @@ async def unregister_timer(self, actor_type: str, actor_id: str, name: str) -> N await self._client.send_bytes(method='DELETE', url=url, data=None) def _get_base_url(self, actor_type: str, actor_id: str) -> str: - return '{}/actors/{}/{}'.format(self._client.get_api_url(), actor_type, actor_id) + return '{}/actors/{}/{}'.format(get_api_url(), actor_type, actor_id) diff --git a/dapr/clients/http/dapr_invocation_http_client.py b/dapr/clients/http/dapr_invocation_http_client.py index 3e93d0a5..ca1a5dfa 100644 --- a/dapr/clients/http/dapr_invocation_http_client.py +++ b/dapr/clients/http/dapr_invocation_http_client.py @@ -18,9 +18,11 @@ from typing import Callable, Dict, Optional, Union from multidict import MultiDict -from dapr.clients.http.client import DaprHttpClient, CONTENT_TYPE_HEADER +from dapr.clients.http.client import DaprHttpClient from dapr.clients.grpc._helpers import MetadataTuple, GrpcMessage from dapr.clients.grpc._response import InvokeMethodResponse +from dapr.clients.http.conf import CONTENT_TYPE_HEADER +from dapr.clients.http.helpers import get_api_url from dapr.serializers import DefaultJSONSerializer from dapr.version import __version__ @@ -88,7 +90,7 @@ async def invoke_method_async( headers[USER_AGENT_HEADER] = DAPR_USER_AGENT - url = f'{self._client.get_api_url()}/invoke/{app_id}/method/{method_name}' + url = f'{get_api_url()}/invoke/{app_id}/method/{method_name}' if isinstance(data, GrpcMessage): body = data.SerializeToString() diff --git a/dapr/clients/http/helpers.py b/dapr/clients/http/helpers.py new file mode 100644 index 00000000..00d7a250 --- /dev/null +++ b/dapr/clients/http/helpers.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from dapr.conf import settings + + +def get_api_url() -> str: + if settings.DAPR_HTTP_ENDPOINT: + return '{}/{}'.format(settings.DAPR_HTTP_ENDPOINT, settings.DAPR_API_VERSION) + + return 'http://{}:{}/{}'.format( + settings.DAPR_RUNTIME_HOST, settings.DAPR_HTTP_PORT, settings.DAPR_API_VERSION + ) diff --git a/dapr/conf/global_settings.py b/dapr/conf/global_settings.py index 5fe5647f..b7cb885b 100644 --- a/dapr/conf/global_settings.py +++ b/dapr/conf/global_settings.py @@ -25,6 +25,7 @@ DAPR_HTTP_PORT = 3500 DAPR_GRPC_PORT = 50001 DAPR_API_VERSION = 'v1.0' +DAPR_HEALTH_TIMEOUT = 60 # seconds DAPR_API_METHOD_INVOCATION_PROTOCOL = 'http' diff --git a/daprdocs/content/en/python-sdk-docs/python-client.md b/daprdocs/content/en/python-sdk-docs/python-client.md index 47f76c83..3030f64a 100644 --- a/daprdocs/content/en/python-sdk-docs/python-client.md +++ b/daprdocs/content/en/python-sdk-docs/python-client.md @@ -74,6 +74,14 @@ If your Dapr instance is configured to require the `DAPR_API_TOKEN` environment set it in the environment and the client will use it automatically. You can read more about Dapr API token authentication [here](https://docs.dapr.io/operations/security/api-token/). +##### Health timeout +On client initialisation, a health check is performed against the Dapr sidecar (`/healthz/outboud`). +The client will wait for the sidecar to be up and running before proceeding. + +The default timeout is 60 seconds, but it can be overridden by setting the `DAPR_HEALTH_TIMEOUT` +environment variable. + + ## Error handling Initially, errors in Dapr followed the [Standard gRPC error model](https://grpc.io/docs/guides/error/#standard-error-model). However, to provide more detailed and informative error messages, in version 1.13 an enhanced error model has been introduced which aligns with the gRPC [Richer error model](https://grpc.io/docs/guides/error/#richer-error-model). In response, the Python SDK implemented `DaprGrpcError`, a custom exception class designed to improve the developer experience. It's important to note that the transition to using `DaprGrpcError` for all gRPC status exceptions is a work in progress. As of now, not every API call in the SDK has been updated to leverage this custom exception. We are actively working on this enhancement and welcome contributions from the community. diff --git a/examples/README.md b/examples/README.md index f2a26428..602c0069 100644 --- a/examples/README.md +++ b/examples/README.md @@ -2,18 +2,19 @@ These examples demonstrate how to use the Dapr Python SDK: -| Example | Description | -|---------|-------------| -| [Service invocation](./invoke-simple) | Invoke service by passing bytes data +| Example | Description | +|-------------------------------------------------------|-------------| +| [Service invocation](./invoke-simple) | Invoke service by passing bytes data | [Service invocation (advanced)](./invoke-custom-data) | Invoke service by using custom protobuf message -| [State management](./state_store) | Save and get state to/from the state store -| [Publish & subscribe](./pubsub-simple) | Publish and subscribe to events -| [Bindings](./invoke-binding) | Invoke an output binding to interact with external resources -| [Virtual actors](./demo_actor) | Try Dapr virtual actor features -| [Secrets](./secret_store) | Get secrets from a defined secret store -| [Distributed tracing](./w3c-tracing) | Leverage Dapr's built-in tracing support -| [Distributed lock](./distributed_lock) | Keep your application safe from race conditions by using distributed locks -| [Workflow](./demo_workflow) | Run a workflow to simulate an order processor +| [State management](./state_store) | Save and get state to/from the state store +| [Publish & subscribe](./pubsub-simple) | Publish and subscribe to events +| [Error handling](./error_handling) | Error handling +| [Bindings](./invoke-binding) | Invoke an output binding to interact with external resources +| [Virtual actors](./demo_actor) | Try Dapr virtual actor features +| [Secrets](./secret_store) | Get secrets from a defined secret store +| [Distributed tracing](./w3c-tracing) | Leverage Dapr's built-in tracing support +| [Distributed lock](./distributed_lock) | Keep your application safe from race conditions by using distributed locks +| [Workflow](./demo_workflow) | Run a workflow to simulate an order processor ## More information diff --git a/examples/configuration/configuration.py b/examples/configuration/configuration.py index c6e613fa..caf676e6 100644 --- a/examples/configuration/configuration.py +++ b/examples/configuration/configuration.py @@ -26,9 +26,6 @@ async def executeConfiguration(): keys = ['orderId1', 'orderId2'] - # Wait for sidecar to be up within 20 seconds. - d.wait(20) - global configuration # Get one configuration by key. diff --git a/examples/error_handling/error_handling.py b/examples/error_handling/error_handling.py index 94768d33..b75ebed9 100644 --- a/examples/error_handling/error_handling.py +++ b/examples/error_handling/error_handling.py @@ -1,15 +1,13 @@ from dapr.clients import DaprClient from dapr.clients.exceptions import DaprGrpcError + with DaprClient() as d: storeName = 'statestore' key = 'key||' value = 'value_1' - # Wait for sidecar to be up within 5 seconds. - d.wait(5) - # Save single state. try: d.save_state(store_name=storeName, key=key, value=value) diff --git a/examples/state_store/state_store.py b/examples/state_store/state_store.py index 411e809a..f87167f5 100644 --- a/examples/state_store/state_store.py +++ b/examples/state_store/state_store.py @@ -9,6 +9,7 @@ from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType from dapr.clients.grpc._state import StateItem + with DaprClient() as d: storeName = 'statestore' @@ -22,9 +23,6 @@ yet_another_key = 'key_3' yet_another_value = 'value_3' - # Wait for sidecar to be up within 5 seconds. - d.wait(5) - # Save single state. d.save_state(store_name=storeName, key=key, value=value) print(f'State store has successfully saved {value} with {key} as key') @@ -63,8 +61,7 @@ # StatusCode should be StatusCode.ABORTED. print(f'Cannot save bulk due to bad etags. ErrorCode={err.code()}') - # For detailed error messages from the dapr runtime: - # print(f"Details={err.details()}) + # For detailed error messages from the dapr runtime: # print(f"Details={err.details()}) # Get one state by key. state = d.get_state(store_name=storeName, key=key, state_metadata={'metakey': 'metavalue'}) diff --git a/examples/state_store_query/state_store_query.py b/examples/state_store_query/state_store_query.py index 90aa23be..f532f0eb 100644 --- a/examples/state_store_query/state_store_query.py +++ b/examples/state_store_query/state_store_query.py @@ -3,20 +3,17 @@ """ from dapr.clients import DaprClient -from dapr.clients.grpc._state import StateItem import json -with DaprClient() as d: - storeName = 'statestore' - # Wait for sidecar to be up within 5 seconds. - d.wait(5) +with DaprClient() as d: + store_name = 'statestore' # Query the state store query = open('query.json', 'r').read() - res = d.query_state(store_name=storeName, query=query) + res = d.query_state(store_name=store_name, query=query) for r in res.results: print(r.key, json.dumps(json.loads(str(r.value, 'UTF-8')), sort_keys=True)) print('Token:', res.token) @@ -24,7 +21,7 @@ # Get more results using a pagination token query = open('query-token.json', 'r').read() - res = d.query_state(store_name=storeName, query=query) + res = d.query_state(store_name=store_name, query=query) for r in res.results: print(r.key, json.dumps(json.loads(str(r.value, 'UTF-8')), sort_keys=True)) print('Token:', res.token) diff --git a/ext/dapr-ext-fastapi/tests/test_app.py b/ext/dapr-ext-fastapi/tests/test_app.py index 98da225b..831d55eb 100644 --- a/ext/dapr-ext-fastapi/tests/test_app.py +++ b/ext/dapr-ext-fastapi/tests/test_app.py @@ -1,9 +1,10 @@ +import unittest + from fastapi import FastAPI from fastapi.testclient import TestClient -from dapr.ext.fastapi import DaprApp from pydantic import BaseModel -import unittest +from dapr.ext.fastapi import DaprApp class Message(BaseModel): diff --git a/ext/flask_dapr/tests/test_app.py b/ext/flask_dapr/tests/test_app.py index 8fb764a5..7ddfa14f 100644 --- a/ext/flask_dapr/tests/test_app.py +++ b/ext/flask_dapr/tests/test_app.py @@ -1,9 +1,8 @@ -from flask import Flask -from flask_dapr import DaprApp - +import json import unittest -import json +from flask import Flask +from flask_dapr import DaprApp class DaprAppTest(unittest.TestCase): diff --git a/tests/actor/test_actor.py b/tests/actor/test_actor.py index d8b209c0..d9b602c9 100644 --- a/tests/actor/test_actor.py +++ b/tests/actor/test_actor.py @@ -23,6 +23,7 @@ from dapr.actor.runtime.context import ActorRuntimeContext from dapr.actor.runtime.runtime import ActorRuntime from dapr.actor.runtime._type_information import ActorTypeInformation +from dapr.conf import settings from dapr.serializers import DefaultJSONSerializer from tests.actor.fake_actor_classes import ( @@ -33,11 +34,21 @@ ) from tests.actor.fake_client import FakeDaprActorClient - from tests.actor.utils import _async_mock, _run +from tests.clients.fake_http_server import FakeHttpServer class ActorTests(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.server = FakeHttpServer(3500) + cls.server.start() + settings.DAPR_HTTP_PORT = 3500 + + @classmethod + def tearDownClass(cls): + cls.server.shutdown_server() + def setUp(self): ActorRuntime._actor_managers = {} ActorRuntime.set_actor_config(ActorRuntimeConfig()) diff --git a/tests/actor/test_actor_reentrancy.py b/tests/actor/test_actor_reentrancy.py index 40b948a5..834273f4 100644 --- a/tests/actor/test_actor_reentrancy.py +++ b/tests/actor/test_actor_reentrancy.py @@ -20,6 +20,7 @@ from dapr.actor.runtime.runtime import ActorRuntime from dapr.actor.runtime.config import ActorRuntimeConfig, ActorReentrancyConfig +from dapr.conf import settings from dapr.serializers import DefaultJSONSerializer from tests.actor.fake_actor_classes import ( @@ -29,9 +30,20 @@ ) from tests.actor.utils import _run +from tests.clients.fake_http_server import FakeHttpServer class ActorRuntimeTests(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.server = FakeHttpServer(3500) + cls.server.start() + settings.DAPR_HTTP_PORT = 3500 + + @classmethod + def tearDownClass(cls): + cls.server.shutdown_server() + def setUp(self): ActorRuntime._actor_managers = {} ActorRuntime.set_actor_config( diff --git a/tests/actor/test_actor_runtime.py b/tests/actor/test_actor_runtime.py index 3aa24289..f17f96cc 100644 --- a/tests/actor/test_actor_runtime.py +++ b/tests/actor/test_actor_runtime.py @@ -19,6 +19,7 @@ from dapr.actor.runtime.runtime import ActorRuntime from dapr.actor.runtime.config import ActorRuntimeConfig +from dapr.conf import settings from dapr.serializers import DefaultJSONSerializer from tests.actor.fake_actor_classes import ( @@ -28,9 +29,20 @@ ) from tests.actor.utils import _run +from tests.clients.fake_http_server import FakeHttpServer class ActorRuntimeTests(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.server = FakeHttpServer(3500) + cls.server.start() + settings.DAPR_HTTP_PORT = 3500 + + @classmethod + def tearDownClass(cls): + cls.server.shutdown_server() + def setUp(self): ActorRuntime._actor_managers = {} ActorRuntime.set_actor_config(ActorRuntimeConfig()) diff --git a/tests/actor/test_client_proxy.py b/tests/actor/test_client_proxy.py index 2e21b634..fe667d62 100644 --- a/tests/actor/test_client_proxy.py +++ b/tests/actor/test_client_proxy.py @@ -12,20 +12,20 @@ See the License for the specific language governing permissions and limitations under the License. """ - import unittest from unittest import mock + from dapr.actor.id import ActorId from dapr.actor.client.proxy import ActorProxy from dapr.serializers import DefaultJSONSerializer - from tests.actor.fake_actor_classes import ( FakeMultiInterfacesActor, FakeActorCls2Interface, ) + from tests.actor.fake_client import FakeDaprActorClient from tests.actor.utils import _async_mock, _run diff --git a/tests/clients/certs.py b/tests/clients/certs.py index 5fb9b8a6..a30b2531 100644 --- a/tests/clients/certs.py +++ b/tests/clients/certs.py @@ -1,42 +1,83 @@ import os +import ssl +import grpc from OpenSSL import crypto -PRIVATE_KEY_PATH = os.path.join(os.path.dirname(__file__), 'private.key') -CERTIFICATE_CHAIN_PATH = os.path.join(os.path.dirname(__file__), 'selfsigned.pem') +class Certs: + server_type = 'grpc' -def create_certificates(server_type='grpc'): - # create a key pair - k = crypto.PKey() - k.generate_key(crypto.TYPE_RSA, 4096) + @classmethod + def create_certificates(cls): + # create a key pair + k = crypto.PKey() + k.generate_key(crypto.TYPE_RSA, 4096) - # create a self-signed cert - cert = crypto.X509() - cert.get_subject().organizationName = 'Dapr' - cert.get_subject().commonName = 'localhost' - cert.gmtime_adj_notBefore(0) - cert.gmtime_adj_notAfter(24 * 60 * 60) - cert.set_issuer(cert.get_subject()) - cert.set_pubkey(k) + # create a self-signed cert + cert = crypto.X509() + cert.get_subject().organizationName = 'Dapr' + cert.get_subject().commonName = 'localhost' + cert.gmtime_adj_notBefore(0) + cert.gmtime_adj_notAfter(24 * 60 * 60) + cert.set_issuer(cert.get_subject()) + cert.set_pubkey(k) - if server_type == 'http': - cert.add_extensions([crypto.X509Extension(b'subjectAltName', False, b'DNS:localhost')]) + if cls.server_type == 'http': + cert.add_extensions([crypto.X509Extension(b'subjectAltName', False, b'DNS:localhost')]) - cert.sign(k, 'sha512') + cert.sign(k, 'sha512') - f_cert = open(CERTIFICATE_CHAIN_PATH, 'wt') - f_cert.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf-8')) - f_cert.close() + with open(cls.get_cert_path(), 'wt') as f_cert: + f_cert.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf-8')) - f_key = open(PRIVATE_KEY_PATH, 'wt') - f_key.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k).decode('utf-8')) - f_key.close() + with open(cls.get_pk_path(), 'wt') as f_key: + f_key.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k).decode('utf-8')) + @classmethod + def get_pk_path(cls): + return os.path.join(os.path.dirname(__file__), '{}_private.key').format(cls.server_type) -def delete_certificates(): - if os.path.exists(PRIVATE_KEY_PATH): - os.remove(PRIVATE_KEY_PATH) + @classmethod + def get_cert_path(cls): + return os.path.join(os.path.dirname(__file__), '{}_selfsigned.pem').format(cls.server_type) - if os.path.exists(CERTIFICATE_CHAIN_PATH): - os.remove(CERTIFICATE_CHAIN_PATH) + @classmethod + def delete_certificates(cls): + pk = cls.get_pk_path() + if os.path.exists(pk): + os.remove(pk) + + cert = cls.get_cert_path() + if os.path.exists(cert): + os.remove(cert) + + +class GrpcCerts(Certs): + server_type = 'grpc' + + +class HttpCerts(Certs): + server_type = 'http' + + +def replacement_get_credentials_func(a): + """ + Used temporarily, so we can trust self-signed certificates in unit tests + until they get their own environment variable + """ + with open(GrpcCerts.get_cert_path(), 'rb') as f: + creds = grpc.ssl_channel_credentials(f.read()) + return creds + + +def replacement_get_health_context(): + """ + This method is used (overwritten) from tests + to return context for self-signed certificates + """ + context = ssl.create_default_context() + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + + return context diff --git a/tests/clients/fake_dapr_server.py b/tests/clients/fake_dapr_server.py index f4e6bd81..2de14984 100644 --- a/tests/clients/fake_dapr_server.py +++ b/tests/clients/fake_dapr_server.py @@ -5,6 +5,7 @@ from google.protobuf.any_pb2 import Any as GrpcAny from google.protobuf import empty_pb2 from grpc_status import rpc_status + from dapr.clients.grpc._helpers import to_bytes from dapr.proto import api_service_v1, common_v1, api_v1 from dapr.proto.common.v1.common_pb2 import ConfigurationItem @@ -31,18 +32,17 @@ ) from typing import Dict -from tests.clients.certs import ( - create_certificates, - delete_certificates, - PRIVATE_KEY_PATH, - CERTIFICATE_CHAIN_PATH, -) +from tests.clients.certs import GrpcCerts +from tests.clients.fake_http_server import FakeHttpServer class FakeDaprSidecar(api_service_v1.DaprServicer): - def __init__(self): - self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - api_service_v1.add_DaprServicer_to_server(self, self._server) + def __init__(self, grpc_port: int = 50001, http_port: int = 8080): + self.grpc_port = grpc_port + self.http_port = http_port + self._grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + self._http_server = FakeHttpServer(self.http_port) # Needed for the healthcheck endpoint + api_service_v1.add_DaprServicer_to_server(self, self._grpc_server) self.store = {} self.shutdown_received = False self.locks_to_owner = {} # (store_name, resource_id) -> lock_owner @@ -51,35 +51,39 @@ def __init__(self): self.metadata: Dict[str, str] = {} self._next_exception = None - def start(self, port: int = 8080): - self._server.add_insecure_port(f'[::]:{port}') - self._server.start() + def start(self): + self._grpc_server.add_insecure_port(f'[::]:{self.grpc_port}') + self._grpc_server.start() + self._http_server.start() - def start_secure(self, port: int = 4443): - create_certificates() + def start_secure(self): + GrpcCerts.create_certificates() - private_key_file = open(PRIVATE_KEY_PATH, 'rb') + private_key_file = open(GrpcCerts.get_pk_path(), 'rb') private_key_content = private_key_file.read() private_key_file.close() - certificate_chain_file = open(CERTIFICATE_CHAIN_PATH, 'rb') + certificate_chain_file = open(GrpcCerts.get_cert_path(), 'rb') certificate_chain_content = certificate_chain_file.read() certificate_chain_file.close() - certificate_chain_file.close() credentials = grpc.ssl_server_credentials( [(private_key_content, certificate_chain_content)] ) - self._server.add_secure_port(f'[::]:{port}', credentials) - self._server.start() + self._grpc_server.add_secure_port(f'[::]:{self.grpc_port}', credentials) + self._grpc_server.start() + + self._http_server.start_secure() def stop(self): - self._server.stop(None) + self._http_server.shutdown_server() + self._grpc_server.stop(None) def stop_secure(self): - self._server.stop(None) - delete_certificates() + self._http_server.shutdown_server() + self._grpc_server.stop(None) + GrpcCerts.delete_certificates() def raise_exception_on_next_call(self, exception): """ diff --git a/tests/clients/fake_http_server.py b/tests/clients/fake_http_server.py index e019b3ff..e08e82d2 100644 --- a/tests/clients/fake_http_server.py +++ b/tests/clients/fake_http_server.py @@ -4,12 +4,7 @@ from threading import Thread from http.server import BaseHTTPRequestHandler, HTTPServer -from tests.clients.certs import ( - CERTIFICATE_CHAIN_PATH, - PRIVATE_KEY_PATH, - create_certificates, - delete_certificates, -) +from tests.clients.certs import HttpCerts class DaprHandler(BaseHTTPRequestHandler): @@ -20,6 +15,11 @@ def serve_forever(self): self.handle_request() def do_request(self, verb): + if self.path == '/v1.0/healthz/outbound': + self.send_response(200) + self.end_headers() + return + if self.server.sleep_time is not None: time.sleep(self.server.sleep_time) self.received_verb = verb @@ -53,19 +53,14 @@ def do_DELETE(self): class FakeHttpServer(Thread): - def __init__(self, secure=False): + secure = False + + def __init__(self, port: int = 8080): super().__init__() - self.secure = secure - self.port = 4443 if secure else 8080 + self.port = port self.server = HTTPServer(('localhost', self.port), DaprHandler) - if self.secure: - create_certificates('http') - ssl_context = SSLContext(PROTOCOL_TLS_SERVER) - ssl_context.load_cert_chain(CERTIFICATE_CHAIN_PATH, PRIVATE_KEY_PATH) - self.server.socket = ssl_context.wrap_socket(self.server.socket, server_side=True) - self.server.response_body = b'' self.server.response_code = 200 self.server.response_header_list = [] @@ -86,7 +81,7 @@ def shutdown_server(self): self.server.socket.close() self.join() if self.secure: - delete_certificates() + HttpCerts.delete_certificates() def request_path(self): return self.server.path @@ -101,5 +96,22 @@ def get_request_body(self): def set_server_delay(self, delay_seconds): self.server.sleep_time = delay_seconds + def start_secure(self): + self.secure = True + + HttpCerts.create_certificates() + ssl_context = SSLContext(PROTOCOL_TLS_SERVER) + ssl_context.load_cert_chain(HttpCerts.get_cert_path(), HttpCerts.get_pk_path()) + self.server.socket = ssl_context.wrap_socket(self.server.socket, server_side=True) + + self.start() + def run(self): self.server.serve_forever() + + def reset(self): + self.server.response_body = b'' + self.server.response_code = 200 + self.server.response_header_list = [] + self.server.request_body = b'' + self.server.sleep_time = None diff --git a/tests/clients/test_dapr_grpc_client.py b/tests/clients/test_dapr_grpc_client.py index 6480bcb5..21f99a58 100644 --- a/tests/clients/test_dapr_grpc_client.py +++ b/tests/clients/test_dapr_grpc_client.py @@ -42,19 +42,24 @@ class DaprGrpcClientTests(unittest.TestCase): - server_port = 8080 + grpc_port = 50001 + http_port = 3500 scheme = '' error = None - def setUp(self): - self._fake_dapr_server = FakeDaprSidecar() - self._fake_dapr_server.start(self.server_port) + @classmethod + def setUpClass(cls): + cls._fake_dapr_server = FakeDaprSidecar(grpc_port=cls.grpc_port, http_port=cls.http_port) + cls._fake_dapr_server.start() + settings.DAPR_HTTP_PORT = cls.http_port + settings.DAPR_HTTP_ENDPOINT = 'http://127.0.0.1:{}'.format(cls.http_port) - def tearDown(self): - self._fake_dapr_server.stop() + @classmethod + def tearDownClass(cls): + cls._fake_dapr_server.stop() def test_http_extension(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') # Test POST verb without querystring ext = dapr._get_http_extension('POST') @@ -76,7 +81,7 @@ def test_http_extension(self): self.assertEqual('query1=string1&query2=string2&query1=string+3', ext.querystring) def test_invoke_method_bytes_data(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') resp = dapr.invoke_method( app_id='targetId', method_name='bytes', @@ -95,7 +100,7 @@ def test_invoke_method_bytes_data(self): self.assertEqual(['value1'], resp.headers['hkey1']) def test_invoke_method_no_data(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') resp = dapr.invoke_method( app_id='targetId', method_name='bytes', @@ -113,7 +118,7 @@ def test_invoke_method_no_data(self): self.assertEqual(['value1'], resp.headers['hkey1']) def test_invoke_method_async(self): - dapr = DaprClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprClient(f'{self.scheme}localhost:{self.grpc_port}') dapr.invocation_client = None # force to use grpc client with self.assertRaises(NotImplementedError): @@ -133,7 +138,7 @@ def test_invoke_method_async(self): ) def test_invoke_method_proto_data(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') req = common_v1.StateItem(key='test') resp = dapr.invoke_method( app_id='targetId', @@ -155,7 +160,7 @@ def test_invoke_method_proto_data(self): self.assertEqual('test', new_resp.key) def test_invoke_binding_bytes_data(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') resp = dapr.invoke_binding( binding_name='binding', operation='create', @@ -172,7 +177,7 @@ def test_invoke_binding_bytes_data(self): self.assertEqual(['value1'], resp.headers['hkey1']) def test_invoke_binding_no_metadata(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') resp = dapr.invoke_binding( binding_name='binding', operation='create', @@ -184,7 +189,7 @@ def test_invoke_binding_no_metadata(self): self.assertEqual(0, len(resp.headers)) def test_invoke_binding_no_data(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') resp = dapr.invoke_binding( binding_name='binding', operation='create', @@ -195,7 +200,7 @@ def test_invoke_binding_no_data(self): self.assertEqual(0, len(resp.headers)) def test_invoke_binding_no_create(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') resp = dapr.invoke_binding( binding_name='binding', operation='delete', @@ -207,7 +212,7 @@ def test_invoke_binding_no_create(self): self.assertEqual(0, len(resp.headers)) def test_publish_event(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') resp = dapr.publish_event(pubsub_name='pubsub', topic_name='example', data=b'test_data') self.assertEqual(2, len(resp.headers)) @@ -220,7 +225,7 @@ def test_publish_event(self): dapr.publish_event(pubsub_name='pubsub', topic_name='example', data=b'test_data') def test_publish_event_with_content_type(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') resp = dapr.publish_event( pubsub_name='pubsub', topic_name='example', @@ -233,7 +238,7 @@ def test_publish_event_with_content_type(self): self.assertEqual(['application/json'], resp.headers['data_content_type']) def test_publish_event_with_metadata(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') resp = dapr.publish_event( pubsub_name='pubsub', topic_name='example', @@ -247,7 +252,7 @@ def test_publish_event_with_metadata(self): self.assertEqual(['100'], resp.headers['metadata_ttl_in_seconds']) def test_publish_error(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') with self.assertRaisesRegex(ValueError, "invalid type for data "): dapr.publish_event( pubsub_name='pubsub', @@ -257,7 +262,7 @@ def test_publish_error(self): @patch.object(settings, 'DAPR_API_TOKEN', 'test-token') def test_dapr_api_token_insertion(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') resp = dapr.invoke_method( app_id='targetId', method_name='bytes', @@ -276,7 +281,7 @@ def test_dapr_api_token_insertion(self): self.assertEqual(['test-token'], resp.headers['hdapr-api-token']) def test_get_save_delete_state(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') key = 'key_1' value = 'value_1' options = StateOptions( @@ -322,7 +327,7 @@ def test_get_save_delete_state(self): self.assertTrue('delete failed' in str(context.exception)) def test_get_save_state_etag_none(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') value = 'test' no_etag_key = 'no_etag' @@ -344,7 +349,7 @@ def test_get_save_state_etag_none(self): self.assertEqual(resp.etag, '') def test_transaction_then_get_states(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') key = str(uuid.uuid4()) value = str(uuid.uuid4()) @@ -390,7 +395,7 @@ def test_transaction_then_get_states(self): ) def test_bulk_save_then_get_states(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') key = str(uuid.uuid4()) value = str(uuid.uuid4()) @@ -446,7 +451,7 @@ def test_bulk_save_then_get_states(self): ) def test_get_secret(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') key1 = 'key_1' resp = dapr.get_secret( store_name='store_1', @@ -462,7 +467,7 @@ def test_get_secret(self): self.assertEqual({key1: 'val'}, resp._secret) def test_get_secret_metadata_absent(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') key1 = 'key_1' resp = dapr.get_secret( store_name='store_1', @@ -474,7 +479,7 @@ def test_get_secret_metadata_absent(self): self.assertEqual({key1: 'val'}, resp._secret) def test_get_bulk_secret(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') resp = dapr.get_bulk_secret( store_name='store_1', metadata=( @@ -488,7 +493,7 @@ def test_get_bulk_secret(self): self.assertEqual({'keya': {'keyb': 'val'}}, resp._secrets) def test_get_bulk_secret_metadata_absent(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') resp = dapr.get_bulk_secret(store_name='store_1') self.assertEqual(1, len(resp.headers)) @@ -496,7 +501,7 @@ def test_get_bulk_secret_metadata_absent(self): self.assertEqual({'keya': {'keyb': 'val'}}, resp._secrets) def test_get_configuration(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') keys = ['k', 'k1'] value = 'value' version = '1.5.0' @@ -521,7 +526,7 @@ def test_get_configuration(self): self.assertEqual(item.metadata, metadata) def test_subscribe_configuration(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') def mock_watch(self, stub, store_name, keys, handler, config_metadata): handler( @@ -542,12 +547,12 @@ def handler(id: str, resp: ConfigurationResponse): ) def test_unsubscribe_configuration(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') res = dapr.unsubscribe_configuration(store_name='configurationstore', id='k') self.assertTrue(res) def test_query_state(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') resp = dapr.query_state( store_name='statestore', @@ -573,12 +578,12 @@ def test_query_state(self): ) def test_shutdown(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') dapr.shutdown() self.assertTrue(self._fake_dapr_server.shutdown_received) def test_wait_ok(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') dapr.wait(0.1) def test_wait_timeout(self): @@ -593,7 +598,7 @@ def test_wait_timeout(self): self.assertTrue('Connection refused' in str(context.exception)) def test_lock_acquire_success(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') # Lock parameters store_name = 'lockstore' resource_id = str(uuid.uuid4()) @@ -606,7 +611,7 @@ def test_lock_acquire_success(self): self.assertEqual(UnlockResponseStatus.success, unlock_response.status) def test_lock_release_twice_fails(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') # Lock parameters store_name = 'lockstore' resource_id = str(uuid.uuid4()) @@ -622,7 +627,7 @@ def test_lock_release_twice_fails(self): self.assertEqual(UnlockResponseStatus.lock_does_not_exist, unlock_response.status) def test_lock_conflict(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') # Lock parameters store_name = 'lockstore' resource_id = str(uuid.uuid4()) @@ -644,14 +649,14 @@ def test_lock_conflict(self): self.assertEqual(UnlockResponseStatus.success, unlock_response.status) def test_lock_not_previously_acquired(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') unlock_response = dapr.unlock( store_name='lockstore', resource_id=str(uuid.uuid4()), lock_owner=str(uuid.uuid4()) ) self.assertEqual(UnlockResponseStatus.lock_does_not_exist, unlock_response.status) def test_lock_release_twice_fails_with_context_manager(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') # Lock parameters store_name = 'lockstore' resource_id = str(uuid.uuid4()) @@ -670,7 +675,7 @@ def test_lock_release_twice_fails_with_context_manager(self): self.assertEqual(UnlockResponseStatus.lock_does_not_exist, unlock_response.status) def test_lock_are_not_reentrant(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') # Lock parameters store_name = 'lockstore' resource_id = str(uuid.uuid4()) @@ -684,7 +689,7 @@ def test_lock_are_not_reentrant(self): self.assertFalse(second_attempt.success) def test_lock_input_validation(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') # Sane parameters store_name = 'lockstore' resource_id = str(uuid.uuid4()) @@ -711,7 +716,7 @@ def test_lock_input_validation(self): self.assertTrue(res.success) def test_unlock_input_validation(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') # Sane parameters store_name = 'lockstore' resource_id = str(uuid.uuid4()) @@ -733,7 +738,7 @@ def test_unlock_input_validation(self): # def test_workflow(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') # Sane parameters workflow_name = 'test_workflow' event_name = 'eventName' @@ -798,7 +803,7 @@ def test_workflow(self): # def test_get_metadata(self): - with DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') as dapr: + with DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') as dapr: response = dapr.get_metadata() self.assertIsNotNone(response) @@ -828,7 +833,7 @@ def test_get_metadata(self): def test_set_metadata(self): metadata_key = 'test_set_metadata_attempt' - with DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') as dapr: + with DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') as dapr: for metadata_value in [str(i) for i in range(10)]: dapr.set_metadata(attributeName=metadata_key, attributeValue=metadata_value) response = dapr.get_metadata() @@ -845,11 +850,11 @@ def test_set_metadata(self): self.assertEqual(response.extended_metadata[metadata_key], metadata_value) def test_set_metadata_input_validation(self): - dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') valid_attr_name = 'attribute name' valid_attr_value = 'attribute value' # Invalid inputs for string arguments - with DaprGrpcClient(f'{self.scheme}localhost:{self.server_port}') as dapr: + with DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') as dapr: for invalid_attr_name in [None, '', ' ']: with self.assertRaises(ValueError): dapr.set_metadata(invalid_attr_name, valid_attr_value) diff --git a/tests/clients/test_dapr_async_grpc_client.py b/tests/clients/test_dapr_grpc_client_async.py similarity index 96% rename from tests/clients/test_dapr_async_grpc_client.py rename to tests/clients/test_dapr_grpc_client_async.py index dbef2fb8..95d7e405 100644 --- a/tests/clients/test_dapr_async_grpc_client.py +++ b/tests/clients/test_dapr_grpc_client_async.py @@ -40,18 +40,24 @@ class DaprGrpcClientAsyncTests(unittest.IsolatedAsyncioTestCase): - server_port = 8080 + grpc_port = 50001 + http_port = 3500 scheme = '' - def setUp(self): - self._fake_dapr_server = FakeDaprSidecar() - self._fake_dapr_server.start(self.server_port) + @classmethod + def setUpClass(cls): + cls._fake_dapr_server = FakeDaprSidecar(grpc_port=cls.grpc_port, http_port=cls.http_port) + cls._fake_dapr_server.start() - def tearDown(self): - self._fake_dapr_server.stop() + settings.DAPR_HTTP_PORT = cls.http_port + settings.DAPR_HTTP_ENDPOINT = 'http://127.0.0.1:{}'.format(cls.http_port) + + @classmethod + def tearDownClass(cls): + cls._fake_dapr_server.stop() async def test_http_extension(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') # Test POST verb without querystring ext = dapr._get_http_extension('POST') @@ -73,7 +79,7 @@ async def test_http_extension(self): self.assertEqual('query1=string1&query2=string2&query1=string+3', ext.querystring) async def test_invoke_method_bytes_data(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') resp = await dapr.invoke_method( app_id='targetId', method_name='bytes', @@ -92,7 +98,7 @@ async def test_invoke_method_bytes_data(self): self.assertEqual(['value1'], resp.headers['hkey1']) async def test_invoke_method_no_data(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') resp = await dapr.invoke_method( app_id='targetId', method_name='bytes', @@ -110,7 +116,7 @@ async def test_invoke_method_no_data(self): self.assertEqual(['value1'], resp.headers['hkey1']) async def test_invoke_method_with_dapr_client(self): - dapr = DaprClient(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprClient(f'{self.scheme}localhost:{self.grpc_port}') dapr.invocation_client = None # force to use grpc client resp = await dapr.invoke_method( @@ -130,7 +136,7 @@ async def test_invoke_method_with_dapr_client(self): self.assertEqual(['value1'], resp.headers['hkey1']) async def test_invoke_method_proto_data(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') req = common_v1.StateItem(key='test') resp = await dapr.invoke_method( app_id='targetId', @@ -152,7 +158,7 @@ async def test_invoke_method_proto_data(self): self.assertEqual('test', new_resp.key) async def test_invoke_binding_bytes_data(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') resp = await dapr.invoke_binding( binding_name='binding', operation='create', @@ -169,7 +175,7 @@ async def test_invoke_binding_bytes_data(self): self.assertEqual(['value1'], resp.headers['hkey1']) async def test_invoke_binding_no_metadata(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') resp = await dapr.invoke_binding( binding_name='binding', operation='create', @@ -181,7 +187,7 @@ async def test_invoke_binding_no_metadata(self): self.assertEqual(0, len(resp.headers)) async def test_invoke_binding_no_data(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') resp = await dapr.invoke_binding( binding_name='binding', operation='create', @@ -192,7 +198,7 @@ async def test_invoke_binding_no_data(self): self.assertEqual(0, len(resp.headers)) async def test_invoke_binding_no_create(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') resp = await dapr.invoke_binding( binding_name='binding', operation='delete', @@ -204,7 +210,7 @@ async def test_invoke_binding_no_create(self): self.assertEqual(0, len(resp.headers)) async def test_publish_event(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') resp = await dapr.publish_event( pubsub_name='pubsub', topic_name='example', data=b'test_data' ) @@ -219,7 +225,7 @@ async def test_publish_event(self): await dapr.publish_event(pubsub_name='pubsub', topic_name='example', data=b'test_data') async def test_publish_event_with_content_type(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') resp = await dapr.publish_event( pubsub_name='pubsub', topic_name='example', @@ -232,7 +238,7 @@ async def test_publish_event_with_content_type(self): self.assertEqual(['application/json'], resp.headers['data_content_type']) async def test_publish_event_with_metadata(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') resp = await dapr.publish_event( pubsub_name='pubsub', topic_name='example', @@ -246,7 +252,7 @@ async def test_publish_event_with_metadata(self): self.assertEqual(['100'], resp.headers['metadata_ttl_in_seconds']) async def test_publish_error(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') with self.assertRaisesRegex(ValueError, "invalid type for data "): await dapr.publish_event( pubsub_name='pubsub', @@ -256,7 +262,7 @@ async def test_publish_error(self): @patch.object(settings, 'DAPR_API_TOKEN', 'test-token') async def test_dapr_api_token_insertion(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') resp = await dapr.invoke_method( app_id='targetId', method_name='bytes', @@ -275,7 +281,7 @@ async def test_dapr_api_token_insertion(self): self.assertEqual(['test-token'], resp.headers['hdapr-api-token']) async def test_get_save_delete_state(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') key = 'key_1' value = 'value_1' options = StateOptions( @@ -323,7 +329,7 @@ async def test_get_save_delete_state(self): self.assertTrue('delete failed' in str(context.exception)) async def test_get_save_state_etag_none(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') value = 'test' no_etag_key = 'no_etag' @@ -345,7 +351,7 @@ async def test_get_save_state_etag_none(self): self.assertEqual(resp.etag, '') async def test_transaction_then_get_states(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') key = str(uuid.uuid4()) value = str(uuid.uuid4()) @@ -391,7 +397,7 @@ async def test_transaction_then_get_states(self): ) async def test_bulk_save_then_get_states(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') key = str(uuid.uuid4()) value = str(uuid.uuid4()) @@ -447,7 +453,7 @@ async def test_bulk_save_then_get_states(self): ) async def test_get_secret(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') key1 = 'key_1' resp = await dapr.get_secret( store_name='store_1', @@ -463,7 +469,7 @@ async def test_get_secret(self): self.assertEqual({key1: 'val'}, resp._secret) async def test_get_secret_metadata_absent(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') key1 = 'key_1' resp = await dapr.get_secret( store_name='store_1', @@ -475,7 +481,7 @@ async def test_get_secret_metadata_absent(self): self.assertEqual({key1: 'val'}, resp._secret) async def test_get_bulk_secret(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') resp = await dapr.get_bulk_secret( store_name='store_1', metadata=( @@ -489,7 +495,7 @@ async def test_get_bulk_secret(self): self.assertEqual({'keya': {'keyb': 'val'}}, resp._secrets) async def test_get_bulk_secret_metadata_absent(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') resp = await dapr.get_bulk_secret(store_name='store_1') self.assertEqual(1, len(resp.headers)) @@ -497,7 +503,7 @@ async def test_get_bulk_secret_metadata_absent(self): self.assertEqual({'keya': {'keyb': 'val'}}, resp._secrets) async def test_get_configuration(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') keys = ['k', 'k1'] value = 'value' version = '1.5.0' @@ -522,7 +528,7 @@ async def test_get_configuration(self): self.assertEqual(item.metadata, metadata) async def test_subscribe_configuration(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') def mock_watch(self, stub, store_name, keys, handler, config_metadata): handler( @@ -543,12 +549,12 @@ def handler(id: str, resp: ConfigurationResponse): ) async def test_unsubscribe_configuration(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') res = await dapr.unsubscribe_configuration(store_name='configurationstore', id='k') self.assertTrue(res) async def test_query_state(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') resp = await dapr.query_state( store_name='statestore', @@ -574,12 +580,12 @@ async def test_query_state(self): ) async def test_shutdown(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') await dapr.shutdown() self.assertTrue(self._fake_dapr_server.shutdown_received) async def test_wait_ok(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') await dapr.wait(0.1) async def test_wait_timeout(self): @@ -594,7 +600,7 @@ async def test_wait_timeout(self): self.assertTrue('Connection refused' in str(context.exception)) async def test_lock_acquire_success(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') # Lock parameters store_name = 'lockstore' resource_id = str(uuid.uuid4()) @@ -607,7 +613,7 @@ async def test_lock_acquire_success(self): self.assertEqual(UnlockResponseStatus.success, unlock_response.status) async def test_lock_release_twice_fails(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') # Lock parameters store_name = 'lockstore' resource_id = str(uuid.uuid4()) @@ -623,7 +629,7 @@ async def test_lock_release_twice_fails(self): self.assertEqual(UnlockResponseStatus.lock_does_not_exist, unlock_response.status) async def test_lock_conflict(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') # Lock parameters store_name = 'lockstore' resource_id = str(uuid.uuid4()) @@ -645,14 +651,14 @@ async def test_lock_conflict(self): self.assertEqual(UnlockResponseStatus.success, unlock_response.status) async def test_lock_not_previously_acquired(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') unlock_response = await dapr.unlock( store_name='lockstore', resource_id=str(uuid.uuid4()), lock_owner=str(uuid.uuid4()) ) self.assertEqual(UnlockResponseStatus.lock_does_not_exist, unlock_response.status) async def test_lock_release_twice_fails_with_context_manager(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') # Lock parameters store_name = 'lockstore' resource_id = str(uuid.uuid4()) @@ -675,7 +681,7 @@ async def test_lock_release_twice_fails_with_context_manager(self): self.assertEqual(UnlockResponseStatus.lock_does_not_exist, unlock_response.status) async def test_lock_are_not_reentrant(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') # Lock parameters store_name = 'lockstore' resource_id = str(uuid.uuid4()) @@ -693,7 +699,7 @@ async def test_lock_are_not_reentrant(self): self.assertFalse(second_attempt.success) async def test_lock_input_validation(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') # Sane parameters store_name = 'lockstore' resource_id = str(uuid.uuid4()) @@ -728,7 +734,7 @@ async def test_lock_input_validation(self): self.assertTrue(res.success) async def test_unlock_input_validation(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') # Sane parameters store_name = 'lockstore' resource_id = str(uuid.uuid4()) @@ -750,7 +756,7 @@ async def test_unlock_input_validation(self): # async def test_get_metadata(self): - async with DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') as dapr: + async with DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') as dapr: response = await dapr.get_metadata() self.assertIsNotNone(response) @@ -780,7 +786,7 @@ async def test_get_metadata(self): async def test_set_metadata(self): metadata_key = 'test_set_metadata_attempt' - async with DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') as dapr: + async with DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') as dapr: for metadata_value in [str(i) for i in range(10)]: await dapr.set_metadata(attributeName=metadata_key, attributeValue=metadata_value) response = await dapr.get_metadata() @@ -797,11 +803,11 @@ async def test_set_metadata(self): self.assertEqual(response.extended_metadata[metadata_key], metadata_value) async def test_set_metadata_input_validation(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') + dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') valid_attr_name = 'attribute name' valid_attr_value = 'attribute value' # Invalid inputs for string arguments - async with DaprGrpcClientAsync(f'{self.scheme}localhost:{self.server_port}') as dapr: + async with DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') as dapr: for invalid_attr_name in [None, '', ' ']: with self.assertRaises(ValueError): await dapr.set_metadata(invalid_attr_name, valid_attr_value) diff --git a/tests/clients/test_secure_dapr_async_grpc_client.py b/tests/clients/test_dapr_grpc_client_async_secure.py similarity index 63% rename from tests/clients/test_secure_dapr_async_grpc_client.py rename to tests/clients/test_dapr_grpc_client_async_secure.py index 37a4e7c0..652feac2 100644 --- a/tests/clients/test_secure_dapr_async_grpc_client.py +++ b/tests/clients/test_dapr_grpc_client_async_secure.py @@ -13,54 +13,49 @@ limitations under the License. """ -import os import unittest from unittest.mock import patch -import grpc - from dapr.aio.clients.grpc.client import DaprGrpcClientAsync -from tests.clients.test_dapr_async_grpc_client import DaprGrpcClientAsyncTests +from dapr.clients.health import DaprHealth +from tests.clients.certs import replacement_get_credentials_func, replacement_get_health_context +from tests.clients.test_dapr_grpc_client_async import DaprGrpcClientAsyncTests from .fake_dapr_server import FakeDaprSidecar from dapr.conf import settings -# Used temporarily, so we can trust self-signed certificates in unit tests -# until they get their own environment variable -def replacement_get_credentials_func(a): - f = open(os.path.join(os.path.dirname(__file__), 'selfsigned.pem'), 'rb') - creds = grpc.ssl_channel_credentials(f.read()) - f.close() - - return creds - - DaprGrpcClientAsync.get_credentials = replacement_get_credentials_func +DaprHealth.get_ssl_context = replacement_get_health_context class DaprSecureGrpcClientAsyncTests(DaprGrpcClientAsyncTests): - server_port = 4443 + grpc_port = 50001 + http_port = 4443 # The http server is used for health checks only, and doesn't need TLS scheme = 'https://' - def setUp(self): - self._fake_dapr_server = FakeDaprSidecar() - self._fake_dapr_server.start_secure(self.server_port) + @classmethod + def setUpClass(cls): + cls._fake_dapr_server = FakeDaprSidecar(grpc_port=cls.grpc_port, http_port=cls.http_port) + cls._fake_dapr_server.start_secure() + settings.DAPR_HTTP_PORT = cls.http_port + settings.DAPR_HTTP_ENDPOINT = 'https://127.0.0.1:{}'.format(cls.http_port) - def tearDown(self): - self._fake_dapr_server.stop_secure() + @classmethod + def tearDownClass(cls): + cls._fake_dapr_server.stop_secure() - @patch.object(settings, 'DAPR_GRPC_ENDPOINT', 'https://domain1.com:5000') + @patch.object(settings, 'DAPR_GRPC_ENDPOINT', 'dns:domain1.com:5000') def test_init_with_DAPR_GRPC_ENDPOINT(self): dapr = DaprGrpcClientAsync() self.assertEqual('dns:domain1.com:5000', dapr._uri.endpoint) - @patch.object(settings, 'DAPR_GRPC_ENDPOINT', 'https://domain1.com:5000') + @patch.object(settings, 'DAPR_GRPC_ENDPOINT', 'dns:domain1.com:5000') def test_init_with_DAPR_GRPC_ENDPOINT_and_argument(self): - dapr = DaprGrpcClientAsync('https://domain2.com:5002') + dapr = DaprGrpcClientAsync('dns:domain2.com:5002') self.assertEqual('dns:domain2.com:5002', dapr._uri.endpoint) - @patch.object(settings, 'DAPR_GRPC_ENDPOINT', 'https://domain1.com:5000') + @patch.object(settings, 'DAPR_GRPC_ENDPOINT', 'dns:domain1.com:5000') @patch.object(settings, 'DAPR_RUNTIME_HOST', 'domain2.com') @patch.object(settings, 'DAPR_GRPC_PORT', '5002') def test_init_with_DAPR_GRPC_ENDPOINT_and_DAPR_RUNTIME_HOST(self): @@ -70,7 +65,7 @@ def test_init_with_DAPR_GRPC_ENDPOINT_and_DAPR_RUNTIME_HOST(self): @patch.object(settings, 'DAPR_RUNTIME_HOST', 'domain1.com') @patch.object(settings, 'DAPR_GRPC_PORT', '5000') def test_init_with_argument_and_DAPR_GRPC_ENDPOINT_and_DAPR_RUNTIME_HOST(self): - dapr = DaprGrpcClientAsync('https://domain2.com:5002') + dapr = DaprGrpcClientAsync('dns:domain2.com:5002') self.assertEqual('dns:domain2.com:5002', dapr._uri.endpoint) async def test_dapr_api_token_insertion(self): diff --git a/tests/clients/test_secure_dapr_grpc_client.py b/tests/clients/test_dapr_grpc_client_secure.py similarity index 73% rename from tests/clients/test_secure_dapr_grpc_client.py rename to tests/clients/test_dapr_grpc_client_secure.py index 07a128ea..41dedca1 100644 --- a/tests/clients/test_secure_dapr_grpc_client.py +++ b/tests/clients/test_dapr_grpc_client_secure.py @@ -12,41 +12,36 @@ See the License for the specific language governing permissions and limitations under the License. """ -import os import unittest from unittest.mock import patch -import grpc # type: ignore from dapr.clients.grpc.client import DaprGrpcClient +from dapr.clients.health import DaprHealth from dapr.conf import settings +from tests.clients.certs import replacement_get_credentials_func, replacement_get_health_context from tests.clients.test_dapr_grpc_client import DaprGrpcClientTests from .fake_dapr_server import FakeDaprSidecar -# Used temporarily, so we can trust self-signed certificates in unit tests -# until they get their own environment variable -def replacement_get_credentials_func(a): - f = open(os.path.join(os.path.dirname(__file__), 'selfsigned.pem'), 'rb') - creds = grpc.ssl_channel_credentials(f.read()) - f.close() - - return creds - - -DaprGrpcClient.get_credentials = replacement_get_credentials_func - - class DaprSecureGrpcClientTests(DaprGrpcClientTests): - server_port = 4443 + grpc_port = 50001 + http_port = 4443 # The http server is used for health checks only, and doesn't need TLS scheme = 'https://' - def setUp(self): - self._fake_dapr_server = FakeDaprSidecar() - self._fake_dapr_server.start_secure(self.server_port) + DaprGrpcClient.get_credentials = replacement_get_credentials_func + DaprHealth.get_ssl_context = replacement_get_health_context + + @classmethod + def setUpClass(cls): + cls._fake_dapr_server = FakeDaprSidecar(grpc_port=cls.grpc_port, http_port=cls.http_port) + cls._fake_dapr_server.start_secure() + settings.DAPR_HTTP_PORT = cls.http_port + settings.DAPR_HTTP_ENDPOINT = 'https://127.0.0.1:{}'.format(cls.http_port) - def tearDown(self): - self._fake_dapr_server.stop_secure() + @classmethod + def tearDownClass(cls): + cls._fake_dapr_server.stop_secure() @patch.object(settings, 'DAPR_GRPC_ENDPOINT', 'https://domain1.com:5000') def test_init_with_DAPR_GRPC_ENDPOINT(self): diff --git a/tests/clients/test_exceptions.py b/tests/clients/test_exceptions.py index d6f2890a..fb349b09 100644 --- a/tests/clients/test_exceptions.py +++ b/tests/clients/test_exceptions.py @@ -7,6 +7,7 @@ from dapr.clients import DaprGrpcClient from dapr.clients.exceptions import DaprGrpcError +from dapr.conf import settings from .fake_dapr_server import FakeDaprSidecar @@ -89,17 +90,23 @@ def create_expected_status(): class DaprExceptionsTestCase(unittest.TestCase): - def setUp(self): - self._server_port = 8080 - self._fake_dapr_server = FakeDaprSidecar() - self._fake_dapr_server.start(self._server_port) - self._expected_status = create_expected_status() + _grpc_port = 50001 + _http_port = 3500 - def tearDown(self): - self._fake_dapr_server.stop() + @classmethod + def setUpClass(cls): + cls._fake_dapr_server = FakeDaprSidecar(grpc_port=cls._grpc_port, http_port=cls._http_port) + settings.DAPR_HTTP_PORT = cls._http_port + settings.DAPR_HTTP_ENDPOINT = 'http://127.0.0.1:{}'.format(cls._http_port) + cls._fake_dapr_server.start() + cls._expected_status = create_expected_status() + + @classmethod + def tearDownClass(cls): + cls._fake_dapr_server.stop() def test_exception_status_parsing(self): - dapr = DaprGrpcClient(f'localhost:{self._server_port}') + dapr = DaprGrpcClient(f'localhost:{self._grpc_port}') self._fake_dapr_server.raise_exception_on_next_call(self._expected_status) with self.assertRaises(DaprGrpcError) as context: @@ -186,7 +193,7 @@ def test_exception_status_parsing(self): ) def test_error_code(self): - dapr = DaprGrpcClient(f'localhost:{self._server_port}') + dapr = DaprGrpcClient(f'localhost:{self._grpc_port}') expected_status = create_expected_status() diff --git a/tests/clients/test_http_helpers.py b/tests/clients/test_http_helpers.py new file mode 100644 index 00000000..ab173cd7 --- /dev/null +++ b/tests/clients/test_http_helpers.py @@ -0,0 +1,22 @@ +import unittest +from unittest.mock import patch + +from dapr.conf import settings +from dapr.clients.http.helpers import get_api_url + + +class DaprHttpClientHelpersTests(unittest.TestCase): + def test_get_api_url_default(self, dapr=None): + self.assertEqual( + 'http://{}:{}/{}'.format( + settings.DAPR_RUNTIME_HOST, settings.DAPR_HTTP_PORT, settings.DAPR_API_VERSION + ), + get_api_url(), + ) + + @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'https://domain1.com:5000') + def test_get_api_url_endpoint_as_env_variable(self): + self.assertEqual( + 'https://domain1.com:5000/{}'.format(settings.DAPR_API_VERSION), + get_api_url(), + ) diff --git a/tests/clients/test_http_service_invocation_client.py b/tests/clients/test_http_service_invocation_client.py index a977d9ca..d45c530b 100644 --- a/tests/clients/test_http_service_invocation_client.py +++ b/tests/clients/test_http_service_invocation_client.py @@ -17,7 +17,6 @@ import typing import unittest from asyncio import TimeoutError -from unittest.mock import patch from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider @@ -25,55 +24,39 @@ from opentelemetry.sdk.trace.sampling import ALWAYS_ON from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator -from dapr.clients import DaprClient + from dapr.clients.exceptions import DaprInternalError from dapr.conf import settings from dapr.proto import common_v1 from .fake_http_server import FakeHttpServer +from dapr.clients import DaprClient class DaprInvocationHttpClientTests(unittest.TestCase): - def setUp(self): - self.server = FakeHttpServer() - self.server_port = self.server.get_port() - self.server.start() - settings.DAPR_HTTP_PORT = self.server_port - settings.DAPR_API_METHOD_INVOCATION_PROTOCOL = 'http' - self.client = DaprClient() - self.app_id = 'fakeapp' - self.method_name = 'fakemethod' - self.invoke_url = f'/v1.0/invoke/{self.app_id}/method/{self.method_name}' + server_port = 3500 - def tearDown(self): - self.server.shutdown_server() - settings.DAPR_API_TOKEN = None - settings.DAPR_API_METHOD_INVOCATION_PROTOCOL = 'http' + @classmethod + def setUpClass(cls): + cls.server = FakeHttpServer(cls.server_port) + cls.server.start() - def test_get_api_url_default(self): - client = DaprClient() - self.assertEqual( - 'http://{}:{}/{}'.format( - settings.DAPR_RUNTIME_HOST, settings.DAPR_HTTP_PORT, settings.DAPR_API_VERSION - ), - client.invocation_client._client.get_api_url(), - ) + cls.app_id = 'fakeapp' + cls.method_name = 'fakemethod' + cls.invoke_url = f'/v1.0/invoke/{cls.app_id}/method/{cls.method_name}' - @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'https://domain1.com:5000') - def test_dont_get_api_url_endpoint_as_argument(self): - client = DaprClient('http://localhost:5000') - self.assertEqual( - 'https://domain1.com:5000/{}'.format(settings.DAPR_API_VERSION), - client.invocation_client._client.get_api_url(), - ) + @classmethod + def tearDownClass(cls): + cls.server.shutdown_server() - @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'https://domain1.com:5000') - def test_get_api_url_endpoint_as_env_variable(self): - client = DaprClient() - self.assertEqual( - 'https://domain1.com:5000/{}'.format(settings.DAPR_API_VERSION), - client.invocation_client._client.get_api_url(), - ) + def setUp(self): + settings.DAPR_API_TOKEN = None + settings.DAPR_HTTP_PORT = self.server_port + settings.DAPR_API_METHOD_INVOCATION_PROTOCOL = 'http' + settings.DAPR_HTTP_ENDPOINT = 'http://127.0.0.1:{}'.format(self.server_port) + + self.server.reset() + self.client = DaprClient() def test_basic_invoke(self): self.server.set_response(b'STRING_BODY') diff --git a/tests/clients/test_secure_http_service_invocation_client.py b/tests/clients/test_secure_http_service_invocation_client.py index 160f3521..f23bc11c 100644 --- a/tests/clients/test_secure_http_service_invocation_client.py +++ b/tests/clients/test_secure_http_service_invocation_client.py @@ -15,7 +15,6 @@ import ssl import typing from asyncio import TimeoutError -from unittest.mock import patch from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider @@ -23,48 +22,71 @@ from opentelemetry.sdk.trace.sampling import ALWAYS_ON from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator -from dapr.clients import DaprClient +from dapr.clients import DaprClient, DaprGrpcClient +from dapr.clients.health import DaprHealth from dapr.clients.http.client import DaprHttpClient from dapr.conf import settings from dapr.proto import common_v1 -from .certs import CERTIFICATE_CHAIN_PATH +from .certs import replacement_get_health_context, replacement_get_credentials_func, GrpcCerts from .fake_http_server import FakeHttpServer from .test_http_service_invocation_client import DaprInvocationHttpClientTests -def replacement_get_ssl_context(a): - ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - ssl_context.load_verify_locations(CERTIFICATE_CHAIN_PATH) +def replacement_get_client_ssl_context(a): + """ + This method is used (overwritten) from tests + to return context for self-signed certificates + """ + context = ssl.create_default_context() + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE - return ssl_context + return context + + +DaprHttpClient.get_ssl_context = replacement_get_client_ssl_context +DaprGrpcClient.get_credentials = replacement_get_credentials_func +DaprHealth.get_ssl_context = replacement_get_health_context class DaprSecureInvocationHttpClientTests(DaprInvocationHttpClientTests): - def setUp(self): - DaprHttpClient.get_ssl_context = replacement_get_ssl_context + server_port = 4443 - self.server = FakeHttpServer(secure=True) - self.server_port = self.server.get_port() - self.server.start() - settings.DAPR_HTTP_PORT = self.server_port - settings.DAPR_API_METHOD_INVOCATION_PROTOCOL = 'http' - settings.DAPR_HTTP_ENDPOINT = 'https://localhost:{}'.format(self.server_port) - self.client = DaprClient() - self.app_id = 'fakeapp' - self.method_name = 'fakemethod' - self.invoke_url = f'/v1.0/invoke/{self.app_id}/method/{self.method_name}' + @classmethod + def setUpClass(cls): + cls.server = FakeHttpServer(cls.server_port) + cls.server.start_secure() + + cls.app_id = 'fakeapp' + cls.method_name = 'fakemethod' + cls.invoke_url = f'/v1.0/invoke/{cls.app_id}/method/{cls.method_name}' + + # We need to set up the certificates for the gRPC server + # because the DaprGrpcClient will try to create a connection + GrpcCerts.create_certificates() + + @classmethod + def tearDownClass(cls): + GrpcCerts.delete_certificates() + cls.server.shutdown_server() - def tearDown(self): - self.server.shutdown_server() + def setUp(self): settings.DAPR_API_TOKEN = None + settings.DAPR_HTTP_PORT = self.server_port settings.DAPR_API_METHOD_INVOCATION_PROTOCOL = 'http' + settings.DAPR_HTTP_ENDPOINT = 'https://127.0.0.1:{}'.format(self.server_port) + + self.server.reset() + self.client = DaprClient() def test_global_timeout_setting_is_honored(self): previous_timeout = settings.DAPR_HTTP_TIMEOUT_SECONDS settings.DAPR_HTTP_TIMEOUT_SECONDS = 1 + new_client = DaprClient(f'https://localhost:{self.server_port}') + self.server.set_server_delay(1.5) with self.assertRaises(TimeoutError): new_client.invoke_method(self.app_id, self.method_name, '') @@ -117,13 +139,3 @@ def test_timeout_exception_thrown_when_timeout_reached(self): self.server.set_server_delay(1.5) with self.assertRaises(TimeoutError): new_client.invoke_method(self.app_id, self.method_name, '') - - @patch.object(settings, 'DAPR_HTTP_ENDPOINT', None) - def test_get_api_url_default(self): - client = DaprClient() - self.assertEqual( - 'http://{}:{}/{}'.format( - settings.DAPR_RUNTIME_HOST, settings.DAPR_HTTP_PORT, settings.DAPR_API_VERSION - ), - client.invocation_client._client.get_api_url(), - )