Skip to content

Commit

Permalink
[Low code CDK] add support for iterating over parent stream slices (a…
Browse files Browse the repository at this point in the history
…irbytehq#13287)

* checkout from alex/cac

* checkout from alex/cac

* checkout from alex/cac

* Add missing tests

* Add missing files

* Add missing tests

* add missing file

* missing file

* missing file

* rename

* doc

* doc

* remove broken test

* rename

* jinja dependency

* Add comment

* comment

* comment

* pyjq dependency

* rename file

* delete unused file

* Revert "delete unused file"

This reverts commit 758e939.

* fix

* rename

* abstract property

* delete unused field

* delete unused field

* rename

* pass kwargs directly

* isort

* Revert "isort"

This reverts commit 4a79223.

* isort

* update state

* fix imports

* update dependency

* format

* rename file

* decoder

* Use decoder

* Update comment

* dict_state is actually backed by a dict

* Add a comment

* update state takes kwargs

* move state out of offset paginator

* update jq parameter order

* update

* remove incremental mixin

* delete comment

* update comments

* update comments

* remove no_state

* rename package

* checkout from alex/cac

* Add missing tests

* Add missing files

* missing file

* rename

* jinja dependency

* Add comment

* comment

* comment

* Revert "delete unused file"

This reverts commit 758e939.

* delete unused field

* delete unused field

* rename

* pass kwargs directly

* isort

* Revert "isort"

This reverts commit 4a79223.

* format

* decoder

* better error handling

* remove nostate

* isort

* remove print

* move test

* delete duplicates

* delete dead code

* Update mapping type to [str, Any]

* add comment

* Add comment

* pass parameters through kwargs

* pass parameters through kwargs

* update interface to pass source in interface

* update interface to pass source in interface

* rename to stream_slicer

* Allow passing a string or an enum

* Define StateType enum

* convert state_type if not of type type

* convert state_type if not of type type

* Low code connectors: string interpolation with jinja (airbytehq#12852)

* checkout from alex/cac

* Add missing tests

* Add missing files

* missing file

* rename

* jinja dependency

* Add comment

* comment

* comment

* Revert "delete unused file"

This reverts commit 758e939.

* delete unused field

* delete unused field

* rename

* pass kwargs directly

* isort

* Revert "isort"

This reverts commit 4a79223.

* format

* decoder

* better error handling

* remove nostate

* isort

* delete dead code

* Update mapping type to [str, Any]

* add comment

* Add comment

* pass parameters through kwargs

* move test to right module

* Add missing test

* Use authbase instead of deprecated class

* leverage generator

* Delete dead code

* rename methods

* rename to declarative

* rename the classes too

* Try to install packages to build jq

* isort

* only automake

* Revert "only automake"

This reverts commit c8fe154.

* remove git

* format

* substream slicer

* specify everything in the slice_definition

* Add jq dependency

* support multiple parent streams

* support getting the parent stream name

* cleanup

* Add comment

* format

* delete duplicate file
  • Loading branch information
girarda authored Jun 2, 2022
1 parent d0d74ec commit 7fbe6da
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, Iterable, List, Mapping

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
from airbyte_cdk.sources.declarative.states.dict_state import DictState
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.streams.core import Stream


class SubstreamSlicer(StreamSlicer):
"""
Stream slicer that iterates over the parent's stream slices and records and emits slices by interpolating the slice_definition mapping
Will populate the state with `parent_stream_slice` and `parent_record` so they can be accessed by other components
"""

def __init__(self, parent_streams: List[Stream], state: DictState, slice_definition: Mapping[str, Any]):
self._parent_streams = parent_streams
self._state = state
self._interpolation = InterpolatedMapping(slice_definition, JinjaInterpolation())

def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
"""
Iterate over each parent stream.
For each stream, iterate over its stream_slices.
For each stream slice, iterate over each records.
yield a stream slice for each such records.
If a parent slice contains no record, emit a slice with parent_record=None.
The template string can interpolate the following values:
- parent_stream_slice: mapping representing the parent's stream slice
- parent_record: mapping representing the parent record
- parent_stream_name: string representing the parent stream name
"""
if not self._parent_streams:
yield from []
else:
for parent_stream in self._parent_streams:
for parent_stream_slice in parent_stream.stream_slices(sync_mode=sync_mode, cursor_field=None, stream_state=stream_state):
self._state.update_state(parent_stream_slice=parent_stream_slice)
self._state.update_state(parent_record=None)
empty_parent_slice = True

for parent_record in parent_stream.read_records(
sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None
):
empty_parent_slice = False
slice_definition = self._get_slice_definition(parent_stream_slice, parent_record, parent_stream.name)
self._state.update_state(parent_record=parent_record)
yield slice_definition
# If the parent slice contains no records,
# yield a slice definition with parent_record==None
if empty_parent_slice:
slice_definition = self._get_slice_definition(parent_stream_slice, None, parent_stream.name)
yield slice_definition

def _get_slice_definition(self, parent_stream_slice, parent_record, parent_stream_name):
return self._interpolation.eval(
None, parent_stream_slice=parent_stream_slice, parent_record=parent_record, parent_stream_name=parent_stream_name
)
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ def test():
"field": "value",
"field_to_interpolate_from_config": "{{ config['c'] }}",
"field_to_interpolate_from_kwargs": "{{ kwargs['a'] }}",
"a_field": "{{ value_passed_directly }}",
}
config = {"c": "VALUE_FROM_CONFIG"}
kwargs = {"a": "VALUE_FROM_KWARGS"}
mapping = InterpolatedMapping(d)

interpolated = mapping.eval(config, **{"kwargs": kwargs})
value_passed_directly = "ABC"
interpolated = mapping.eval(config, **{"kwargs": kwargs}, value_passed_directly=value_passed_directly)

assert interpolated["field"] == "value"
assert interpolated["field_to_interpolate_from_config"] == "VALUE_FROM_CONFIG"
assert interpolated["field_to_interpolate_from_kwargs"] == "VALUE_FROM_KWARGS"
assert interpolated["a_field"] == value_passed_directly
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, Iterable, List, Mapping, Optional, Union

import pytest as pytest
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.states.dict_state import DictState
from airbyte_cdk.sources.declarative.stream_slicers.substream_slicer import SubstreamSlicer
from airbyte_cdk.sources.streams.core import Stream

parent_records = [{"id": 1, "data": "data1"}, {"id": 2, "data": "data2"}]
more_records = [{"id": 10, "data": "data10", "slice": "second_parent"}, {"id": 20, "data": "data20", "slice": "second_parent"}]

data_first_parent_slice = [{"id": 0, "slice": "first", "data": "A"}, {"id": 1, "slice": "first", "data": "B"}]
data_second_parent_slice = [{"id": 2, "slice": "second", "data": "C"}]
data_third_parent_slice = []
all_parent_data = data_first_parent_slice + data_second_parent_slice + data_third_parent_slice
parent_slices = [{"slice": "first"}, {"slice": "second"}, {"slice": "third"}]
second_parent_stream_slice = [{"slice": "second_parent"}]

slice_definition = {"{{ parent_stream_name }}_id": "{{ parent_record['id'] }}", "parent_slice": "{{ parent_stream_slice['slice'] }}"}


class MockStream(Stream):
def __init__(self, slices, records, name):
self._slices = slices
self._records = records
self._name = name

@property
def name(self) -> str:
return self._name

@property
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
return "id"

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
yield from self._slices

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
if not stream_slice:
yield from self._records
else:
yield from [r for r in self._records if r["slice"] == stream_slice["slice"]]


@pytest.mark.parametrize(
"test_name, parent_streams, slice_definition, expected_slices",
[
(
"test_single_parent_slices_no_records",
[MockStream([{}], [], "first_stream")],
slice_definition,
[{"first_stream_id": None, "parent_slice": None}],
),
(
"test_single_parent_slices_with_records",
[MockStream([{}], parent_records, "first_stream")],
slice_definition,
[{"first_stream_id": "1", "parent_slice": None}, {"first_stream_id": "2", "parent_slice": None}],
),
(
"test_with_parent_slices_and_records",
[MockStream(parent_slices, all_parent_data, "first_stream")],
slice_definition,
[
{"parent_slice": "first", "first_stream_id": "0"},
{"parent_slice": "first", "first_stream_id": "1"},
{"parent_slice": "second", "first_stream_id": "2"},
{"parent_slice": "third", "first_stream_id": None},
],
),
(
"test_multiple_parent_streams",
[
MockStream(parent_slices, data_first_parent_slice + data_second_parent_slice, "first_stream"),
MockStream(second_parent_stream_slice, more_records, "second_stream"),
],
slice_definition,
[
{"parent_slice": "first", "first_stream_id": "0"},
{"parent_slice": "first", "first_stream_id": "1"},
{"parent_slice": "second", "first_stream_id": "2"},
{"parent_slice": "third", "first_stream_id": None},
{"parent_slice": "second_parent", "second_stream_id": "10"},
{"parent_slice": "second_parent", "second_stream_id": "20"},
],
),
],
)
def test_substream_slicer(test_name, parent_streams, slice_definition, expected_slices):
state = DictState()
slicer = SubstreamSlicer(parent_streams, state, slice_definition)
slices = [s for s in slicer.stream_slices(SyncMode.incremental, stream_state=None)]
assert slices == expected_slices

0 comments on commit 7fbe6da

Please sign in to comment.