Skip to content

Commit

Permalink
feat(low-code): add use check availability flag to dynamic check (#293)
Browse files Browse the repository at this point in the history
Co-authored-by: octavia-squidington-iii <[email protected]>
Co-authored-by: Aaron ("AJ") Steers <[email protected]>
  • Loading branch information
3 people authored Jan 30, 2025
1 parent 3af96dc commit 4e7f94a
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 32 deletions.
28 changes: 19 additions & 9 deletions airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ class CheckDynamicStream(ConnectionChecker):
stream_count (int): numbers of streams to check
"""

# TODO: Add field stream_names to check_connection for static streams
# https://github.com/airbytehq/airbyte-python-cdk/pull/293#discussion_r1934933483

stream_count: int
parameters: InitVar[Mapping[str, Any]]
use_check_availability: bool = True

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters
Expand All @@ -31,21 +35,27 @@ def check_connection(
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
) -> Tuple[bool, Any]:
streams = source.streams(config=config)

if len(streams) == 0:
return False, f"No streams to connect to from source {source}"
if not self.use_check_availability:
return True, None

availability_strategy = HttpAvailabilityStrategy()

for stream_index in range(min(self.stream_count, len(streams))):
stream = streams[stream_index]
availability_strategy = HttpAvailabilityStrategy()
try:
try:
for stream in streams[: min(self.stream_count, len(streams))]:
stream_is_available, reason = availability_strategy.check_availability(
stream, logger
)
if not stream_is_available:
logger.warning(f"Stream {stream.name} is not available: {reason}")
return False, reason
except Exception as error:
logger.error(
f"Encountered an error trying to connect to stream {stream.name}. Error: \n {traceback.format_exc()}"
)
return False, f"Unable to connect to stream {stream.name} - {error}"
except Exception as error:
error_message = (
f"Encountered an error trying to connect to stream {stream.name}. Error: {error}"
)
logger.error(error_message, exc_info=True)
return False, error_message

return True, None
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,11 @@ definitions:
title: Stream Count
description: Numbers of the streams to try reading from when running a check operation.
type: integer
use_check_availability:
title: Use Check Availability
description: Enables stream check availability. This field is automatically set by the CDK.
type: boolean
default: true
CompositeErrorHandler:
title: Composite Error Handler
description: Error handler that sequentially iterates over a list of error handlers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ class CheckDynamicStream(BaseModel):
description="Numbers of the streams to try reading from when running a check operation.",
title="Stream Count",
)
use_check_availability: Optional[bool] = Field(
True,
description="Enables stream check availability. This field is automatically set by the CDK.",
title="Use Check Availability",
)


class ConcurrencyLevel(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,15 @@ def create_check_stream(model: CheckStreamModel, config: Config, **kwargs: Any)
def create_check_dynamic_stream(
model: CheckDynamicStreamModel, config: Config, **kwargs: Any
) -> CheckDynamicStream:
return CheckDynamicStream(stream_count=model.stream_count, parameters={})
assert model.use_check_availability is not None # for mypy

use_check_availability = model.use_check_availability

return CheckDynamicStream(
stream_count=model.stream_count,
use_check_availability=use_check_availability,
parameters={},
)

def create_composite_error_handler(
self, model: CompositeErrorHandlerModel, config: Config, **kwargs: Any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,16 @@ def _create_error_message(self, response: requests.Response) -> Optional[str]:
:param response: The HTTP response which can be used during interpolation
:return: The evaluated error message string to be emitted
"""
return self.error_message.eval( # type: ignore [no-any-return, union-attr]
return self.error_message.eval( # type: ignore[no-any-return, union-attr]
self.config, response=self._safe_response_json(response), headers=response.headers
)

def _response_matches_predicate(self, response: requests.Response) -> bool:
return (
bool(
self.predicate.condition # type: ignore [union-attr]
and self.predicate.eval( # type: ignore [union-attr]
None, # type: ignore [arg-type]
self.predicate.condition # type:ignore[union-attr]
and self.predicate.eval( # type:ignore[union-attr]
None, # type: ignore[arg-type]
response=self._safe_response_json(response),
headers=response.headers,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ def _resolve_complex_type(self, complex_type: ComplexFieldType) -> Mapping[str,
return self._get_airbyte_type(complex_type.field_type)

field_type = self._get_airbyte_type(complex_type.field_type)

field_type["items"] = (
self._get_airbyte_type(complex_type.items)
if isinstance(complex_type.items, str)
Expand Down
46 changes: 28 additions & 18 deletions unit_tests/sources/declarative/checks/test_check_dynamic_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import json
import logging
from copy import deepcopy

import pytest

Expand Down Expand Up @@ -104,56 +105,65 @@


@pytest.mark.parametrize(
"response_code, available_expectation, expected_messages",
"response_code, available_expectation, use_check_availability, expected_messages",
[
pytest.param(
404,
False,
True,
["Not found. The requested resource was not found on the server."],
id="test_stream_unavailable_unhandled_error",
),
pytest.param(
403,
False,
True,
["Forbidden. You don't have permission to access this resource."],
id="test_stream_unavailable_handled_error",
),
pytest.param(200, True, [], id="test_stream_available"),
pytest.param(200, True, True, [], id="test_stream_available"),
pytest.param(200, True, False, [], id="test_stream_available"),
pytest.param(
401,
False,
True,
["Unauthorized. Please ensure you are authenticated correctly."],
id="test_stream_unauthorized_error",
),
],
)
def test_check_dynamic_stream(response_code, available_expectation, expected_messages):
def test_check_dynamic_stream(
response_code, available_expectation, use_check_availability, expected_messages
):
manifest = deepcopy(_MANIFEST)

with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(url="https://api.test.com/items"),
HttpResponse(
body=json.dumps(
[
{"id": 1, "name": "item_1"},
{"id": 2, "name": "item_2"},
]
)
),
)
http_mocker.get(
HttpRequest(url="https://api.test.com/items/1"),
HttpResponse(body=json.dumps(expected_messages), status_code=response_code),
items_request = HttpRequest(url="https://api.test.com/items")
items_response = HttpResponse(
body=json.dumps([{"id": 1, "name": "item_1"}, {"id": 2, "name": "item_2"}])
)
http_mocker.get(items_request, items_response)

item_request = HttpRequest(url="https://api.test.com/items/1")
item_response = HttpResponse(body=json.dumps(expected_messages), status_code=response_code)
item_request_count = 1
http_mocker.get(item_request, item_response)

if not use_check_availability:
manifest["check"]["use_check_availability"] = False
item_request_count = 0 # stream only created and data request not called

source = ConcurrentDeclarativeSource(
source_config=_MANIFEST,
source_config=manifest,
config=_CONFIG,
catalog=None,
state=None,
)

stream_is_available, reason = source.check_connection(logger, _CONFIG)

http_mocker.assert_number_of_calls(item_request, item_request_count)

assert stream_is_available == available_expectation
for message in expected_messages:
assert message in reason

0 comments on commit 4e7f94a

Please sign in to comment.