Skip to content

Commit

Permalink
Decouple flags for debug messages from connector builder log messages (
Browse files Browse the repository at this point in the history
…airbytehq#24881)

* decouple debug message flags from connector builder grouping messages

* Automated Commit - Formatting Changes

* pr feedback simplifying configs a bit

---------

Co-authored-by: brianjlai <[email protected]>
  • Loading branch information
brianjlai and brianjlai authored Apr 6, 2023
1 parent 204e6d8 commit 3ba15b5
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 14 deletions.
2 changes: 1 addition & 1 deletion airbyte-cdk/python/airbyte_cdk/connector_builder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Note:
{
"config": <normal config>,
"__injected_declarative_manifest": {...},
"__command": <"resolve_manifest" | "list_streams" | "stream_read">
"__command": <"resolve_manifest" | "list_streams" | "test_read">
}
```
*See [ConnectionSpecification](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#actor-specification) for details on the `"config"` key if needed.
Expand Down
5 changes: 4 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ModelToComponentFactory
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


def create_source(config: Mapping[str, Any]) -> ManifestDeclarativeSource:
    """Build the declarative source used by the connector builder.

    The manifest is read from the ``__injected_declarative_manifest`` key of
    ``config`` and passed to ``ManifestDeclarativeSource`` together with a
    component factory created with ``emit_connector_builder_messages=True``.

    :param config: connector configuration carrying the injected manifest
    :return: the source built from the injected manifest
    """
    manifest = config.get("__injected_declarative_manifest")
    # Single return path: connector-builder message emission is requested on the
    # component factory instead of through a positional flag on the source.
    return ManifestDeclarativeSource(
        source_config=manifest,
        component_factory=ModelToComponentFactory(emit_connector_builder_messages=True),
    )


def get_config_and_catalog_from_args(args: List[str]) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,13 @@


class ModelToComponentFactory:
def __init__(self, limit_pages_fetched_per_slice: int = None, limit_slices_fetched: int = None):
def __init__(
self, limit_pages_fetched_per_slice: int = None, limit_slices_fetched: int = None, emit_connector_builder_messages: bool = False
):
self._init_mappings()
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
self._limit_slices_fetched = limit_slices_fetched
self._emit_connector_builder_messages = emit_connector_builder_messages

def _init_mappings(self):
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: [Type[BaseModel], Callable] = {
Expand Down Expand Up @@ -759,7 +762,7 @@ def create_simple_retriever(
else NoPagination(parameters={})
)

if self._limit_slices_fetched:
if self._limit_slices_fetched or self._emit_connector_builder_messages:
return SimpleRetrieverTestReadDecorator(
name=name,
paginator=paginator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#

import json
import logging
from dataclasses import InitVar, dataclass, field
from itertools import islice
from json import JSONDecodeError
Expand Down Expand Up @@ -61,6 +60,7 @@ class SimpleRetriever(Retriever, HttpStream):
_primary_key: str = field(init=False, repr=False, default="")
paginator: Optional[Paginator] = None
stream_slicer: Optional[StreamSlicer] = SinglePartitionRouter(parameters={})
emit_connector_builder_messages: bool = False

def __post_init__(self, parameters: Mapping[str, Any]):
self.paginator = self.paginator or NoPagination(parameters=parameters)
Expand Down Expand Up @@ -368,7 +368,7 @@ def read_records(
stream_slice = stream_slice or {} # None-check
self.paginator.reset()
records_generator = self._read_pages(
self._parse_records_and_emit_request_and_responses,
self.parse_records,
stream_slice,
stream_state,
)
Expand Down Expand Up @@ -408,13 +408,13 @@ def state(self, value: StreamState):
"""State setter, accept state serialized by state getter."""
self.stream_slicer.update_cursor(value)

def _parse_records_and_emit_request_and_responses(self, request, response, stream_state, stream_slice) -> Iterable[StreamData]:
# Only emit requests and responses when running in debug mode
if self.logger.isEnabledFor(logging.DEBUG):
yield _prepared_request_to_airbyte_message(request)
yield _response_to_airbyte_message(response)
# Not great to need to call _read_pages which is a private method
# A better approach would be to extract the HTTP client from the HttpStream and call it directly from the HttpRequester
def parse_records(
    self,
    request: requests.PreparedRequest,
    response: requests.Response,
    stream_state: Mapping[str, Any],
    stream_slice: Mapping[str, Any],
) -> Iterable[StreamData]:
    """Yield everything ``parse_response`` produces for ``response``.

    ``request`` is part of the signature but is not used by this implementation.
    """
    parsed = self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
    yield from parsed


Expand All @@ -439,6 +439,17 @@ def stream_slices(
) -> Iterable[Optional[Mapping[str, Any]]]:
return islice(super().stream_slices(sync_mode=sync_mode, stream_state=stream_state), self.maximum_number_of_slices)

def parse_records(
    self,
    request: requests.PreparedRequest,
    response: requests.Response,
    stream_state: Mapping[str, Any],
    stream_slice: Mapping[str, Any],
) -> Iterable[StreamData]:
    """Yield a message for the request, a message for the response, then the parsed records."""
    # Each message is built right before it is yielded so evaluation stays lazy and ordered.
    request_message = _prepared_request_to_airbyte_message(request)
    yield request_message
    response_message = _response_to_airbyte_message(response)
    yield response_message
    parsed = self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
    yield from parsed


def _prepared_request_to_airbyte_message(request: requests.PreparedRequest) -> AirbyteMessage:
# FIXME: this should return some sort of trace message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from airbyte_cdk.sources.declarative.models import ListPartitionRouter as ListPartitionRouterModel
from airbyte_cdk.sources.declarative.models import OAuthAuthenticator as OAuthAuthenticatorModel
from airbyte_cdk.sources.declarative.models import RecordSelector as RecordSelectorModel
from airbyte_cdk.sources.declarative.models import SimpleRetriever as SimpleRetrieverModel
from airbyte_cdk.sources.declarative.models import Spec as SpecModel
from airbyte_cdk.sources.declarative.models import SubstreamPartitionRouter as SubstreamPartitionRouterModel
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import ManifestComponentTransformer
Expand All @@ -46,7 +47,7 @@
from airbyte_cdk.sources.declarative.requesters.request_options import InterpolatedRequestOptionsProvider
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever, SimpleRetrieverTestReadDecorator
from airbyte_cdk.sources.declarative.schema import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.spec import Spec
from airbyte_cdk.sources.declarative.stream_slicers import CartesianProductStreamSlicer
Expand Down Expand Up @@ -1296,3 +1297,29 @@ def test_merge_incremental_and_partition_router(incremental, partition_router, e
if expected_slicer_count > 1:
assert isinstance(stream.retriever.stream_slicer, CartesianProductStreamSlicer)
assert len(stream.retriever.stream_slicer.stream_slicers) == expected_slicer_count


def test_simple_retriever_emit_log_messages():
    """A factory created with emit_connector_builder_messages=True builds a SimpleRetrieverTestReadDecorator."""
    extractor_definition = {"type": "DpathExtractor", "field_path": []}
    record_selector_definition = {"type": "RecordSelector", "extractor": extractor_definition}
    requester_definition = {"type": "HttpRequester", "name": "list", "url_base": "orange.com", "path": "/v1/api"}
    retriever_definition = {
        "type": "SimpleRetriever",
        "record_selector": record_selector_definition,
        "requester": requester_definition,
    }

    factory = ModelToComponentFactory(emit_connector_builder_messages=True)
    built_component = factory.create_component(
        model_type=SimpleRetrieverModel,
        component_definition=retriever_definition,
        config={},
        name="Test",
        primary_key="id",
        stream_slicer=None,
    )

    assert isinstance(built_component, SimpleRetrieverTestReadDecorator)
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Mapping
from unittest.mock import MagicMock, patch

import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status
Expand All @@ -10,6 +11,7 @@
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode, Type
from airbyte_cdk.sources.declarative.exceptions import ReadException
from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
Expand Down Expand Up @@ -701,3 +703,42 @@ def test_read_records_updates_stream_slicer_once_if_no_records(test_name, last_r

def _generate_slices(number_of_slices):
return [{"date": f"2022-01-0{day + 1}"} for day in range(number_of_slices)]


def test_emit_log_request_response_messages():
    """parse_records on the test-read decorator yields request and response log messages, then the records."""
    record_selector = MagicMock()
    record_selector.select_records.return_value = records

    request = requests.PreparedRequest()
    request.headers = {"header": "value"}
    request.url = "http://byrde.enterprises.com/casinos"

    response = requests.Response()
    response.request = request
    response.status_code = 200

    retriever = SimpleRetrieverTestReadDecorator(
        name="stream_name",
        primary_key=primary_key,
        requester=MagicMock(),
        paginator=MagicMock(),
        record_selector=record_selector,
        stream_slicer=SinglePartitionRouter(parameters={}),
        parameters={},
        config={},
    )

    # Unpacking into exactly four names also verifies that nothing else is emitted.
    request_log_message, response_log_message, record_1, record_2 = retriever.parse_records(
        request=request, response=response, stream_slice={}, stream_state={}
    )

    assert isinstance(request_log_message, AirbyteMessage)
    assert request_log_message.type == Type.LOG
    assert "request:" in request_log_message.log.message
    assert isinstance(response_log_message, AirbyteMessage)
    assert response_log_message.type == Type.LOG
    assert "response:" in response_log_message.log.message
    assert isinstance(record_1, Mapping)
    assert record_1 == records[0]
    # The second record's type is checked here; the original re-checked record_1.
    assert isinstance(record_2, Mapping)
    assert record_2 == records[1]

0 comments on commit 3ba15b5

Please sign in to comment.