Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add an outbound health check #641

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions dapr/aio/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from dapr.aio.clients.grpc.client import DaprGrpcClientAsync, MetadataTuple, InvokeMethodResponse
from dapr.clients.http.dapr_actor_http_client import DaprActorHttpClient
from dapr.clients.http.dapr_invocation_http_client import DaprInvocationHttpClient
from dapr.clients.http.helpers import DaprHealthClient
from dapr.conf import settings
from google.protobuf.message import Message as GrpcMessage

Expand Down Expand Up @@ -71,6 +72,7 @@
"""
super().__init__(address, interceptors, max_grpc_message_length)
self.invocation_client = None
self.health_client = DaprHealthClient(timeout=http_timeout_seconds)

invocation_protocol = settings.DAPR_API_METHOD_INVOCATION_PROTOCOL.upper()

Expand Down Expand Up @@ -131,3 +133,15 @@
http_querystring=http_querystring,
timeout=timeout
)

async def wait(self, timeout_s: int):
"""Wait for the client to become ready. If the client is already ready, this
method returns immediately.

Args:
timeout_s (float): The maximum time to wait in seconds.

Throws:
DaprInternalError: if the timeout expires.
"""
await self.health_client.wait_async(timeout_s)

Check warning on line 147 in dapr/aio/clients/__init__.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/__init__.py#L147

Added line #L147 was not covered by tests
26 changes: 26 additions & 0 deletions dapr/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from dapr.clients.base import DaprActorClientBase
from dapr.clients.exceptions import DaprInternalError, ERROR_CODE_UNKNOWN
from dapr.clients.grpc.client import DaprGrpcClient, MetadataTuple, InvokeMethodResponse
from dapr.clients.http.helpers import DaprHealthClient
from dapr.clients.http.dapr_actor_http_client import DaprActorHttpClient
from dapr.clients.http.dapr_invocation_http_client import DaprInvocationHttpClient
from dapr.conf import settings
Expand Down Expand Up @@ -72,6 +73,7 @@
"""
super().__init__(address, interceptors, max_grpc_message_length)
self.invocation_client = None
self.helath_client = DaprHealthClient(timeout=http_timeout_seconds)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.helath_client = DaprHealthClient(timeout=http_timeout_seconds)
self.health_client = DaprHealthClient(timeout=http_timeout_seconds)


invocation_protocol = settings.DAPR_API_METHOD_INVOCATION_PROTOCOL.upper()

Expand Down Expand Up @@ -173,3 +175,27 @@
else:
raise NotImplementedError(
'Please use `dapr.aio.clients.DaprClient` for async invocation')

def wait(self, timeout_s: float):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is float because DaprGrpcClient already has a method wait as float, I can remove that method if that would be better....

"""Wait for the client to become ready. If the client is already ready, this
method returns immediately.

Args:
timeout_s (float): The maximum time to wait in seconds.

Throws:
DaprInternalError: if the timeout expires.
"""
self.helath_client.wait(int(timeout_s))

Check warning on line 189 in dapr/clients/__init__.py

View check run for this annotation

Codecov / codecov/patch

dapr/clients/__init__.py#L189

Added line #L189 was not covered by tests

async def wait_async(self, timeout_s: float):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is float because DaprGrpcClient already has a method wait as float, I can remove that method if that would be better....

"""Wait for the client to become ready. If the client is already ready, this
method returns immediately.

Args:
timeout_s (float): The maximum time to wait in seconds.

Throws:
DaprInternalError: if the timeout expires.
"""
await self.helath_client.wait_async(int(timeout_s))

Check warning on line 201 in dapr/clients/__init__.py

View check run for this annotation

Codecov / codecov/patch

dapr/clients/__init__.py#L201

Added line #L201 was not covered by tests
81 changes: 81 additions & 0 deletions dapr/clients/http/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-

"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import asyncio
import time

from typing import Optional

from dapr.clients.http.client import DaprHttpClient, USER_AGENT_HEADER, DAPR_USER_AGENT
from dapr.serializers import DefaultJSONSerializer


class DaprHealthClient:
"""Dapr Health Client"""

def __init__(self, timeout: Optional[int] = 60):
self._client = DaprHttpClient(DefaultJSONSerializer(), timeout, None, None)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a need to pass in address from DaprClient initialization?
Both http and grpc might not have the same endpoint address.

Copy link
Member

@berndverst berndverst Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's unrelated to this PR IMO. We can open an issue for discussion in the future. I don't think we need to tackle that until someone has a concrete need for it. The HTTP Client is only used by Actors and by Service Invocation (and now the health client), though using the SDK for service invocation is no longer the recommended way. Perhaps you can take a look how the DotNet SDK handles this and compare.


async def wait_async(self, timeout_s: int):
"""Wait for the client to become ready. If the client is already ready, this
method returns immediately.

Args:
timeout_s (float): The maximum time to wait in seconds.

Throws:
DaprInternalError: if the timeout expires.
"""
async def make_request() -> bool:
_, r = await self._client.send_bytes(
method='GET',
headers={USER_AGENT_HEADER: DAPR_USER_AGENT},
url=f'{self._client.get_api_url()}/healthz/outbound',
data=None,
query_params=None,
timeout=timeout_s)

return r.status >= 200 and r.status < 300

start = time.time()
while True:
try:
healthy = await make_request()
if healthy:
return
except Exception as e:
remaining = (start + timeout_s) - time.time()
if remaining < 0:
raise e # This will be DaprInternalError as defined in http/client.py
time.sleep(min(1, remaining))

def wait(self, timeout_s: int):
"""Wait for the client to become ready. If the client is already ready, this
method returns immediately.

Args:
timeout_s (float): The maximum time to wait in seconds.

Throws:
DaprInternalError: if the timeout expires.
"""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

awaitable = self.wait_async(timeout_s)
loop.run_until_complete(awaitable)
4 changes: 3 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ dapr.serializers =
py.typed

[flake8]
exclude =
exclude =
.venv,
.env,
venv,
build,
dist,
Expand Down
45 changes: 45 additions & 0 deletions tests/clients/test_health_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-

"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import unittest

from .fake_http_server import FakeHttpServer

from dapr.clients.http.helpers import DaprHealthClient
from dapr.conf import settings


class DaprHealthClientTests(unittest.TestCase):

def setUp(self):
self.server = FakeHttpServer()
self.server_port = self.server.get_port()
self.server.start()
settings.DAPR_HTTP_PORT = self.server_port
settings.DAPR_API_METHOD_INVOCATION_PROTOCOL = 'http'
self.client = DaprHealthClient()
self.app_id = 'fakeapp'

def tearDown(self):
self.server.shutdown_server()
settings.DAPR_API_TOKEN = None
settings.DAPR_API_METHOD_INVOCATION_PROTOCOL = 'http'

def test_wait_ok(self):
self.client.wait(1)

def test_wait_timeout(self):
self.server.shutdown_server()
with self.assertRaises(Exception):
self.client.wait(1)
Loading