Commit

Add use_check_availability to CheckDynamicStream
lazebnyi committed Jan 29, 2025
1 parent f59cd42 commit 7b148ce
Showing 6 changed files with 119 additions and 68 deletions.
27 changes: 15 additions & 12 deletions airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py
@@ -23,6 +23,7 @@ class CheckDynamicStream(ConnectionChecker):

     stream_count: int
     parameters: InitVar[Mapping[str, Any]]
+    use_check_availability: bool = True
 
     def __post_init__(self, parameters: Mapping[str, Any]) -> None:
         self._parameters = parameters
@@ -31,21 +32,23 @@ def check_connection(
         self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
     ) -> Tuple[bool, Any]:
         streams = source.streams(config=config)
+
         if len(streams) == 0:
             return False, f"No streams to connect to from source {source}"
+        if not self.use_check_availability:
+            return True, None
+
+        availability_strategy = HttpAvailabilityStrategy()
+
-        for stream_index in range(min(self.stream_count, len(streams))):
-            stream = streams[stream_index]
-            availability_strategy = HttpAvailabilityStrategy()
-            try:
-                stream_is_available, reason = availability_strategy.check_availability(
-                    stream, logger
-                )
+        try:
+            for stream in streams[:min(self.stream_count, len(streams))]:
+                stream_is_available, reason = availability_strategy.check_availability(stream, logger)
                 if not stream_is_available:
                     logger.warning(f"Stream {stream.name} is not available: {reason}")
                     return False, reason
-            except Exception as error:
-                logger.error(
-                    f"Encountered an error trying to connect to stream {stream.name}. Error: \n {traceback.format_exc()}"
-                )
-                return False, f"Unable to connect to stream {stream.name} - {error}"
+        except Exception as error:
+            error_message = f"Encountered an error trying to connect to stream {stream.name}. Error: {error}"
+            logger.error(error_message, exc_info=True)
+            return False, error_message
+
         return True, None
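
For illustration only (not part of the commit), here is a minimal sketch of how the new flag short-circuits the check: with use_check_availability disabled, check_connection returns success as soon as the source resolves at least one stream and never invokes HttpAvailabilityStrategy. The mocked source below is a hypothetical stand-in.

import logging
from unittest.mock import MagicMock

from airbyte_cdk.sources.declarative.checks.check_dynamic_stream import CheckDynamicStream

logger = logging.getLogger("airbyte")

# Hypothetical source stub: only the streams() call used by the checker is mocked.
source = MagicMock()
source.streams.return_value = [MagicMock(), MagicMock()]

# use_check_availability=False skips the per-stream HTTP availability probe entirely.
checker = CheckDynamicStream(stream_count=1, parameters={}, use_check_availability=False)
print(checker.check_connection(source, logger, config={}))  # expected: (True, None)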
@@ -320,6 +320,11 @@ definitions:
         title: Stream Count
         description: Numbers of the streams to try reading from when running a check operation.
         type: integer
+      use_check_availability:
+        title: Use Check Availability
+        description: Enables stream check availability. This field is automatically set by the CDK.
+        type: boolean
+        default: true
   CompositeErrorHandler:
     title: Composite Error Handler
     description: Error handler that sequentially iterates over a list of error handlers.
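
As a usage illustration (not part of the diff), a manifest check section described by this schema could now carry the flag. The snippet below sketches that shape as a Python dict; the values are hypothetical.

# Hypothetical manifest fragment in Python dict form, mirroring the schema above.
check_block = {
    "type": "CheckDynamicStream",
    "stream_count": 1,
    # Only verify that streams resolve; skip the HTTP availability probe.
    "use_check_availability": False,
}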
@@ -59,6 +59,11 @@ class CheckDynamicStream(BaseModel):
description="Numbers of the streams to try reading from when running a check operation.",
title="Stream Count",
)
use_check_availability: Optional[bool] = Field(
True,
description="Enables stream check availability. This field is automatically set by the CDK.",
title="Use Check Availability",
)


class ConcurrencyLevel(BaseModel):
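
A brief, hedged sketch of how the regenerated model might behave, assuming the CDK's generated-models module path; the default of True keeps existing manifests on the previous behavior.

# Illustrative only; the import path is assumed, not part of this diff.
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    CheckDynamicStream as CheckDynamicStreamModel,
)

model = CheckDynamicStreamModel(type="CheckDynamicStream", stream_count=1)
print(model.use_check_availability)  # True, so existing manifests are unaffected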
@@ -604,7 +609,9 @@ class OAuthAuthenticator(BaseModel):
     scopes: Optional[List[str]] = Field(
         None,
         description="List of scopes that should be granted to the access token.",
-        examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]],
+        examples=[
+            ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]
+        ],
         title="Scopes",
     )
     token_expiry_date: Optional[str] = Field(
@@ -1040,24 +1047,28 @@ class OAuthConfigSpecification(BaseModel):
     class Config:
         extra = Extra.allow
 
-    oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field(
-        None,
-        description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n  oauth_user_input_from_connector_config_specification={\n    app_id: {\n      type: string\n      path_in_connector_config: ['app_id']\n    }\n  }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n  oauth_user_input_from_connector_config_specification={\n    app_id: {\n      type: string\n      path_in_connector_config: ['info', 'app_id']\n    }\n  }",
-        examples=[
-            {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
-            {
-                "app_id": {
-                    "type": "string",
-                    "path_in_connector_config": ["info", "app_id"],
-                }
-            },
-        ],
-        title="OAuth user input",
+    oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = (
+        Field(
+            None,
+            description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n  oauth_user_input_from_connector_config_specification={\n    app_id: {\n      type: string\n      path_in_connector_config: ['app_id']\n    }\n  }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n  oauth_user_input_from_connector_config_specification={\n    app_id: {\n      type: string\n      path_in_connector_config: ['info', 'app_id']\n    }\n  }",
+            examples=[
+                {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
+                {
+                    "app_id": {
+                        "type": "string",
+                        "path_in_connector_config": ["info", "app_id"],
+                    }
+                },
+            ],
+            title="OAuth user input",
+        )
     )
-    oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field(
-        None,
-        description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n  + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n  + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n  + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n  + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n  + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n    - The TikTok Marketing DeclarativeOAuth spec:\n    {\n      "oauth_connector_input_specification": {\n        "type": "object",\n        "additionalProperties": false,\n        "properties": {\n          "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n          "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n          "access_token_params": {\n            "{{ auth_code_key }}": "{{ auth_code_value }}",\n            "{{ client_id_key }}": "{{ client_id_value }}",\n            "{{ client_secret_key }}": "{{ client_secret_value }}"\n          },\n          "access_token_headers": {\n            "Content-Type": "application/json",\n            "Accept": "application/json"\n          },\n          "extract_output": ["data.access_token"],\n          "client_id_key": "app_id",\n          "client_secret_key": "secret",\n          "auth_code_key": "auth_code"\n        }\n      }\n    }',
-        title="DeclarativeOAuth Connector Specification",
+    oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = (
+        Field(
+            None,
+            description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n  + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n  + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n  + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n  + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n  + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n    - The TikTok Marketing DeclarativeOAuth spec:\n    {\n      "oauth_connector_input_specification": {\n        "type": "object",\n        "additionalProperties": false,\n        "properties": {\n          "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n          "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n          "access_token_params": {\n            "{{ auth_code_key }}": "{{ auth_code_value }}",\n            "{{ client_id_key }}": "{{ client_id_value }}",\n            "{{ client_secret_key }}": "{{ client_secret_value }}"\n          },\n          "access_token_headers": {\n            "Content-Type": "application/json",\n            "Accept": "application/json"\n          },\n          "extract_output": ["data.access_token"],\n          "client_id_key": "app_id",\n          "client_secret_key": "secret",\n          "auth_code_key": "auth_code"\n        }\n      }\n    }',
+            title="DeclarativeOAuth Connector Specification",
+        )
     )
     complete_oauth_output_specification: Optional[Dict[str, Any]] = Field(
         None,
@@ -1075,7 +1086,9 @@ class Config:
     complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field(
         None,
         description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n    complete_oauth_server_input_specification={\n      client_id: {\n        type: string\n      },\n      client_secret: {\n        type: string\n      }\n    }",
-        examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}],
+        examples=[
+            {"client_id": {"type": "string"}, "client_secret": {"type": "string"}}
+        ],
         title="OAuth input specification",
     )
     complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field(
@@ -1661,7 +1674,9 @@ class RecordSelector(BaseModel):
description="Responsible for filtering records to be emitted by the Source.",
title="Record Filter",
)
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
schema_normalization: Optional[
Union[SchemaNormalization, CustomSchemaNormalization]
] = Field(
SchemaNormalization.None_,
description="Responsible for normalization according to the schema.",
title="Schema Normalization",
@@ -1835,12 +1850,16 @@ class Config:
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field(
None,
description="Component used to fetch data incrementally based on a time field in the data.",
title="Incremental Sync",
incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = (
Field(
None,
description="Component used to fetch data incrementally based on a time field in the data.",
title="Incremental Sync",
)
)
name: Optional[str] = Field(
"", description="The stream name.", example=["Users"], title="Name"
)
name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name")
primary_key: Optional[PrimaryKey] = Field(
"", description="The primary key of the stream.", title="Primary Key"
)
@@ -2112,7 +2131,11 @@ class SimpleRetriever(BaseModel):
             CustomPartitionRouter,
             ListPartitionRouter,
             SubstreamPartitionRouter,
-            List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
+            List[
+                Union[
+                    CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
+                ]
+            ],
         ]
     ] = Field(
         [],
@@ -2156,7 +2179,9 @@ class AsyncRetriever(BaseModel):
     )
     download_extractor: Optional[
         Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor]
-    ] = Field(None, description="Responsible for fetching the records from provided urls.")
+    ] = Field(
+        None, description="Responsible for fetching the records from provided urls."
+    )
     creation_requester: Union[CustomRequester, HttpRequester] = Field(
         ...,
         description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
@@ -2190,7 +2215,11 @@ class Config:
             CustomPartitionRouter,
             ListPartitionRouter,
             SubstreamPartitionRouter,
-            List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
+            List[
+                Union[
+                    CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
+                ]
+            ],
         ]
     ] = Field(
         [],
@@ -2258,10 +2287,12 @@ class DynamicDeclarativeStream(BaseModel):
     stream_template: DeclarativeStream = Field(
         ..., description="Reference to the stream template.", title="Stream Template"
     )
-    components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field(
-        ...,
-        description="Component resolve and populates stream templates with components values.",
-        title="Components Resolver",
+    components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = (
+        Field(
+            ...,
+            description="Component resolve and populates stream templates with components values.",
+            title="Components Resolver",
+        )
     )


@@ -901,7 +901,11 @@ def create_check_stream(model: CheckStreamModel, config: Config, **kwargs: Any)
     def create_check_dynamic_stream(
         model: CheckDynamicStreamModel, config: Config, **kwargs: Any
     ) -> CheckDynamicStream:
-        return CheckDynamicStream(stream_count=model.stream_count, parameters={})
+        assert model.use_check_availability is not None  # for mypy
+
+        use_check_availability = model.use_check_availability
+
+        return CheckDynamicStream(stream_count=model.stream_count, use_check_availability=use_check_availability, parameters={})
 
     def create_composite_error_handler(
         self, model: CompositeErrorHandlerModel, config: Config, **kwargs: Any
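
For context, a hedged sketch of the model-to-component path end to end. The import paths and the direct call on a factory instance are assumptions, not something this diff demonstrates.

# Illustrative only; import paths and direct method invocation are assumptions.
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    CheckDynamicStream as CheckDynamicStreamModel,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)

factory = ModelToComponentFactory()
model = CheckDynamicStreamModel(
    type="CheckDynamicStream", stream_count=2, use_check_availability=False
)
component = factory.create_check_dynamic_stream(model, config={})
print(component.use_check_availability)  # False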
@@ -151,16 +151,16 @@ def _create_error_message(self, response: requests.Response) -> Optional[str]:
         :param response: The HTTP response which can be used during interpolation
         :return: The evaluated error message string to be emitted
         """
-        return self.error_message.eval(  # type: ignore [no-any-return, union-attr]
+        return self.error_message.eval(  # type: ignore[no-any-return, union-attr]
             self.config, response=self._safe_response_json(response), headers=response.headers
         )
 
     def _response_matches_predicate(self, response: requests.Response) -> bool:
         return (
             bool(
-                self.predicate.condition  # type: ignore [union-attr]
-                and self.predicate.eval(  # type: ignore [union-attr]
-                    None,  # type: ignore [arg-type]
+                self.predicate.condition  # type:ignore[union-attr]
+                and self.predicate.eval(  # type:ignore[union-attr]
+                    None,  # type: ignore[arg-type]
                     response=self._safe_response_json(response),
                     headers=response.headers,
                 )
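
For context (not part of the diff), the two helpers above evaluate an interpolated error_message and predicate with the decoded response and its headers in scope. A hypothetical response-filter configuration that would exercise them might look like this.

# Hypothetical manifest fragment; field values are invented for illustration.
response_filter = {
    "type": "HttpResponseFilter",
    "action": "FAIL",
    # Evaluated by _response_matches_predicate with `response` and `headers` in scope.
    "predicate": "{{ response.errors is defined and response.errors | length > 0 }}",
    # Evaluated by _create_error_message with the same interpolation context.
    "error_message": "Request rejected: {{ response.errors | join(', ') }}",
}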