fix: remove stream_state from interpolation contexts #325

Status: Open. Wants to merge 44 commits into base: main. The diff below shows changes from 39 of the 44 commits.

Commits (44)
42490f1  fix: remove stream_state from interpolation contexts (devin-ai-integration[bot], Feb 7, 2025)
3cbdb9b  fix: add validation to prevent stream_state usage in interpolation (devin-ai-integration[bot], Feb 7, 2025)
804019e  fix: remove stream_state from request options (devin-ai-integration[bot], Feb 7, 2025)
b601c3c  test: add test for stream_state validation (devin-ai-integration[bot], Feb 7, 2025)
245aeed  fix: update AirbyteTracedException import path (devin-ai-integration[bot], Feb 7, 2025)
d308e31  fix: update AirbyteTracedException import path (devin-ai-integration[bot], Feb 7, 2025)
9c2d7c9  style: fix import sorting (devin-ai-integration[bot], Feb 7, 2025)
0d7d462  test: fix JWT token comparison in test (devin-ai-integration[bot], Feb 7, 2025)
7e7e193  style: fix formatting (devin-ai-integration[bot], Feb 7, 2025)
826c132  style: fix import order in jinja.py (devin-ai-integration[bot], Feb 7, 2025)
efe206b  style: fix import sorting with ruff (devin-ai-integration[bot], Feb 7, 2025)
47530d7  test: update min_max_datetime test to use stream_interval (devin-ai-integration[bot], Feb 7, 2025)
c924a04  test: update remaining stream_state references to stream_interval (devin-ai-integration[bot], Feb 7, 2025)
d34318c  fix: remove stream_interval from aliases to allow direct usage (devin-ai-integration[bot], Feb 7, 2025)
158534a  test: update test_max_newer_time_from_parameters to use stream_interval (devin-ai-integration[bot], Feb 7, 2025)
5219447  test: update record filter test to use stream_interval (devin-ai-integration[bot], Feb 7, 2025)
6c7c103  test: update record filter test to pass stream_interval in kwargs (devin-ai-integration[bot], Feb 7, 2025)
b214d38  feat: add stream_interval support to record filter (devin-ai-integration[bot], Feb 7, 2025)
c6932a8  test: update record selector test to use stream_interval (devin-ai-integration[bot], Feb 7, 2025)
3a9a323  style: fix formatting in test_record_selector.py (devin-ai-integration[bot], Feb 7, 2025)
be05918  feat: add stream_interval support to record selector (devin-ai-integration[bot], Feb 7, 2025)
6948f80  style: fix formatting in record_selector.py (devin-ai-integration[bot], Feb 7, 2025)
71f8db2  feat: add stream_interval support to record selector transform method (devin-ai-integration[bot], Feb 7, 2025)
b9fe92a  feat: add stream_interval support to record transformation (devin-ai-integration[bot], Feb 7, 2025)
788ebaa  feat: add stream_interval support to all transformations (devin-ai-integration[bot], Feb 7, 2025)
46aad70  fix: update transformation base class to use Mapping type (devin-ai-integration[bot], Feb 7, 2025)
517faad  fix: update transformation implementations to match base class (devin-ai-integration[bot], Feb 7, 2025)
0657aeb  fix: update record selector to use Dict type for stream_interval (devin-ai-integration[bot], Feb 7, 2025)
ed17c96  fix: update transformation implementations to use Dict type for strea… (devin-ai-integration[bot], Feb 7, 2025)
e7a6989  fix: remove stream_state from add_fields kwargs (devin-ai-integration[bot], Feb 7, 2025)
74e37b9  fix: remove stream_state from request options interpolation (devin-ai-integration[bot], Feb 7, 2025)
f3ab630  fix: update transformation imports and fix record selector name (devin-ai-integration[bot], Feb 7, 2025)
19f397e  fix: check parent stream state interpolation in stream classification (devin-ai-integration[bot], Feb 7, 2025)
ccd9c26  style: fix formatting issues (devin-ai-integration[bot], Feb 7, 2025)
bcdafc7  fix: add missing name parameter to RecordSelector in test (devin-ai-integration[bot], Feb 7, 2025)
dde539e  fix: add missing name parameter to RecordSelector in test_record_sele… (devin-ai-integration[bot], Feb 7, 2025)
a9e1f3e  fix: update Record assertions with correct stream name in test_record… (devin-ai-integration[bot], Feb 7, 2025)
ebb8e54  style: fix formatting in test_record_selector (devin-ai-integration[bot], Feb 7, 2025)
a4be945  fix: update stream_interval fallback in request input provider (devin-ai-integration[bot], Feb 7, 2025)
11269f2  fix: restore stream_state aliasing in jinja.py and remove stream_inte… (devin-ai-integration[bot], Feb 10, 2025)
ca7eef2  style: fix formatting in record_filter.py and record_selector.py (devin-ai-integration[bot], Feb 10, 2025)
d643b99  test: update test_record_selector to use stream_interval consistently (devin-ai-integration[bot], Feb 10, 2025)
540cc44  test: update test_record_filter to use stream_interval consistently (devin-ai-integration[bot], Feb 10, 2025)
7607cf9  fix: add stream_interval support to RecordFilter.filter_records (devin-ai-integration[bot], Feb 10, 2025)
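
Taken together, these commits retire `stream_state` as an interpolation context: the Jinja engine now rejects templates that reference it, the component schema no longer advertises it, and a new `stream_interval` argument is threaded through record filtering, selection, and transformation. A minimal before/after sketch of the migration (the record field names are invented for illustration):

```python
# Illustrative only: stream_state values are mutated by the cursor as
# partitions complete, so reading them from parallel partitions is a race.
# stream_interval is scoped to the slice being processed and is thread-safe.

# Before (now rejected at interpolation time):
old_condition = "{{ record['updated_at'] >= stream_state['updated_at'] }}"

# After (the supported replacement for incremental sync values):
new_condition = "{{ record['updated_at'] >= stream_interval['start_time'] }}"
```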
68 changes: 45 additions & 23 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
@@ -35,7 +35,10 @@
 from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
     ModelToComponentFactory,
 )
-from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
+from airbyte_cdk.sources.declarative.partition_routers import (
+    AsyncJobPartitionRouter,
+    SubstreamPartitionRouter,
+)
 from airbyte_cdk.sources.declarative.requesters import HttpRequester
 from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
 from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
@@ -397,36 +400,25 @@ def _is_datetime_incremental_without_partition_routing(
             )
         )
 
-    def _stream_supports_concurrent_partition_processing(
+    def _stream_uses_stream_state_interpolation(
         self, declarative_stream: DeclarativeStream
     ) -> bool:
-        """
-        Many connectors make use of stream_state during interpolation on a per-partition basis under the assumption that
-        state is updated sequentially. Because the concurrent CDK engine processes different partitions in parallel,
-        stream_state is no longer a thread-safe interpolation context. It would be a race condition because a cursor's
-        stream_state can be updated in any order depending on which stream partitions finish first.
-
-        We should start to move away from depending on the value of stream_state for low-code components that operate
-        per-partition, but we need to gate this, otherwise some connectors will be blocked from publishing. See
-        cdk-migrations.md for the full list of connectors.
-        """
-
         if isinstance(declarative_stream.retriever, SimpleRetriever) and isinstance(
             declarative_stream.retriever.requester, HttpRequester
         ):
             http_requester = declarative_stream.retriever.requester
             if "stream_state" in http_requester._path.string:
                 self.logger.warning(
-                    f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe. Defaulting to synchronous processing"
+                    f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe."
                 )
-                return False
+                return True
 
             request_options_provider = http_requester._request_options_provider
             if request_options_provider.request_options_contain_stream_state():
                 self.logger.warning(
-                    f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe. Defaulting to synchronous processing"
+                    f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe."
                 )
-                return False
+                return True
 
             record_selector = declarative_stream.retriever.record_selector
             if isinstance(record_selector, RecordSelector):
@@ -438,9 +430,9 @@ def _stream_supports_concurrent_partition_processing(
                     and "stream_state" in record_selector.record_filter.condition
                 ):
                     self.logger.warning(
-                        f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the RecordFilter which is not thread-safe. Defaulting to synchronous processing"
+                        f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the RecordFilter which is not thread-safe."
                     )
-                    return False
+                    return True
 
                 for add_fields in [
                     transformation
@@ -450,17 +442,47 @@
                     for field in add_fields.fields:
                         if isinstance(field.value, str) and "stream_state" in field.value:
                             self.logger.warning(
-                                f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the AddFields which is not thread-safe. Defaulting to synchronous processing"
+                                f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the AddFields which is not thread-safe."
                             )
-                            return False
+                            return True
                         if (
                             isinstance(field.value, InterpolatedString)
                             and "stream_state" in field.value.string
                         ):
                             self.logger.warning(
-                                f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the AddFields which is not thread-safe. Defaulting to synchronous processing"
+                                f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the AddFields which is not thread-safe."
                             )
-                            return False
+                            return True
+        return False
+
+    def _stream_supports_concurrent_partition_processing(
+        self, declarative_stream: DeclarativeStream
+    ) -> bool:
+        """
+        Many connectors make use of stream_state during interpolation on a per-partition basis under the assumption that
+        state is updated sequentially. Because the concurrent CDK engine processes different partitions in parallel,
+        stream_state is no longer a thread-safe interpolation context. It would be a race condition because a cursor's
+        stream_state can be updated in any order depending on which stream partitions finish first.
+
+        We should start to move away from depending on the value of stream_state for low-code components that operate
+        per-partition, but we need to gate this, otherwise some connectors will be blocked from publishing. See
+        cdk-migrations.md for the full list of connectors.
+        """
+        # Check if the stream uses stream_state interpolation in any of its components
+        if self._stream_uses_stream_state_interpolation(declarative_stream):
+            return False
+
+        # Check if any parent stream uses stream_state interpolation
+        if isinstance(declarative_stream.retriever, SimpleRetriever) and isinstance(
+            declarative_stream.retriever.stream_slicer, SubstreamPartitionRouter
+        ):
+            for parent_config in declarative_stream.retriever.stream_slicer.parent_stream_configs:
+                if self._stream_uses_stream_state_interpolation(parent_config.stream):
+                    self.logger.warning(
+                        f"Low-code stream '{declarative_stream.name}' has a parent stream that uses stream_state interpolation which is not thread-safe. Defaulting to synchronous processing"
+                    )
+                    return False
+
+        return True
 
     @staticmethod
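
The refactor above splits the old all-in-one check in two: `_stream_uses_stream_state_interpolation` reports whether any component of a stream interpolates `stream_state`, and `_stream_supports_concurrent_partition_processing` applies that predicate to the stream itself and, new in this PR, to parents wired in through a `SubstreamPartitionRouter`. A minimal sketch of how the two compose (simplified stand-in signature, not the CDK implementation):

```python
from typing import Callable, Iterable

def supports_concurrent_partition_processing(
    stream: object,
    uses_stream_state: Callable[[object], bool],
    parent_streams: Iterable[object] = (),
) -> bool:
    # The stream itself must not interpolate stream_state...
    if uses_stream_state(stream):
        return False
    # ...and, per this PR, neither may any of its parent streams.
    return not any(uses_stream_state(parent) for parent in parent_streams)
```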
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -80,7 +80,6 @@ definitions:
             - stream_interval
             - stream_partition
             - stream_slice
-            - stream_state
           examples:
             - "{{ record['updates'] }}"
             - "{{ record['MetaData']['LastUpdatedTime'] }}"
@@ -1611,7 +1610,6 @@ definitions:
             - stream_interval
             - stream_partition
             - stream_slice
-            - stream_state
           examples:
             - "/products"
             - "/quotes/{{ stream_partition['id'] }}/quote_line_groups"
@@ -1661,7 +1659,6 @@ definitions:
             - stream_interval
             - stream_partition
             - stream_slice
-            - stream_state
           examples:
             - |
               [{"clause": {"type": "timestamp", "operator": 10, "parameters":
@@ -1679,7 +1676,6 @@
             - stream_interval
             - stream_partition
             - stream_slice
-            - stream_state
           examples:
             - sort_order: "ASC"
               sort_field: "CREATED_AT"
@@ -1700,7 +1696,6 @@
             - stream_interval
             - stream_partition
             - stream_slice
-            - stream_state
           examples:
             - Output-Format: JSON
             - Version: "{{ config['version'] }}"
@@ -1717,7 +1712,6 @@
             - stream_interval
             - stream_partition
             - stream_slice
-            - stream_state
           examples:
             - unit: "day"
             - query: 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
@@ -2072,7 +2066,6 @@ definitions:
           interpolation_context:
             - config
             - record
-            - stream_state
             - stream_slice
         new:
           type: string
@@ -2086,7 +2079,6 @@
           interpolation_context:
             - config
             - record
-            - stream_state
             - stream_slice
       $parameters:
         type: object
@@ -2753,7 +2745,6 @@ definitions:
             - stream_interval
             - stream_partition
             - stream_slice
-            - stream_state
           examples:
             - "{{ record['created_at'] >= stream_interval['start_time'] }}"
             - "{{ record.status in ['active', 'expired'] }}"
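
The schema edits above drop `stream_state` from each component's advertised `interpolation_context`, leaving `stream_interval`, `stream_partition`, and `stream_slice` (plus `config` and `record` where applicable). A hedged sketch of what that means for a manifest's request parameters (the query and field names are invented for illustration):

```python
# No longer valid per the updated schema:
params_before = {
    "query": 'last_event_time > TIMESTAMP "{{ stream_state.last_event_time }}"',
}

# Valid replacement using the slice-scoped interval:
params_after = {
    "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}"'
             ' AND TIMESTAMP "{{ stream_interval.end_time }}"',
}
```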
4 changes: 4 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/record_filter.py
@@ -37,12 +37,14 @@ def filter_records(
         stream_state: StreamState,
         stream_slice: Optional[StreamSlice] = None,
         next_page_token: Optional[Mapping[str, Any]] = None,
+        stream_interval: Optional[Mapping[str, Any]] = None,
     ) -> Iterable[Mapping[str, Any]]:
         kwargs = {
             "stream_state": stream_state,
             "stream_slice": stream_slice,
             "next_page_token": next_page_token,
             "stream_slice.extra_fields": stream_slice.extra_fields if stream_slice else {},
+            "stream_interval": stream_interval or {},
         }
         for record in records:
             if self._filter_interpolator.eval(self.config, record=record, **kwargs):
@@ -71,6 +73,7 @@ def filter_records(
         stream_state: StreamState,
         stream_slice: Optional[StreamSlice] = None,
         next_page_token: Optional[Mapping[str, Any]] = None,
+        stream_interval: Optional[Mapping[str, Any]] = None,
     ) -> Iterable[Mapping[str, Any]]:
         records = (
             record
@@ -87,5 +90,6 @@
                 stream_state=stream_state,
                 stream_slice=stream_slice,
                 next_page_token=next_page_token,
+                stream_interval=stream_interval,
             )
         yield from records
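
A usage sketch against this branch, based on the dataclass fields visible in the diff (`config`, `parameters`, `condition`); the records and interval values are invented:

```python
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter

record_filter = RecordFilter(
    config={},
    parameters={},
    condition="{{ record['updated_at'] >= stream_interval['start_time'] }}",
)

records = [{"updated_at": "2025-02-01"}, {"updated_at": "2024-12-31"}]
kept = list(
    record_filter.filter_records(
        records,
        stream_state={},  # still accepted, but no longer referenced by the condition
        stream_slice=None,
        stream_interval={"start_time": "2025-01-01"},
    )
)
# kept should contain only the 2025-02-01 record.
```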
27 changes: 19 additions & 8 deletions airbyte_cdk/sources/declarative/extractors/record_selector.py
@@ -3,7 +3,7 @@
 #
 
 from dataclasses import InitVar, dataclass, field
-from typing import Any, Iterable, List, Mapping, Optional, Union
+from typing import Any, Dict, Iterable, List, Mapping, Optional, Union
 
 import requests
 
@@ -50,8 +50,8 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
             else self._name
         )
 
-    @property  # type: ignore
-    def name(self) -> str:
+    @property
+    def stream_name(self) -> str:
         """
         :return: Stream name
         """
@@ -61,8 +61,8 @@ def name(self) -> str:
             else self._name
         )
 
-    @name.setter
-    def name(self, value: str) -> None:
+    @stream_name.setter
+    def stream_name(self, value: str) -> None:
         if not isinstance(value, property):
             self._name = value
 
@@ -73,6 +73,7 @@ def select_records(
         records_schema: Mapping[str, Any],
         stream_slice: Optional[StreamSlice] = None,
         next_page_token: Optional[Mapping[str, Any]] = None,
+        stream_interval: Optional[Dict[str, Any]] = None,
     ) -> Iterable[Record]:
         """
         Selects records from the response
@@ -81,11 +82,12 @@
         :param records_schema: json schema of records to return
         :param stream_slice: The stream slice
        :param next_page_token: The paginator token
+        :param stream_interval: The stream interval for incremental sync values
         :return: List of Records selected from the response
         """
         all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response)
         yield from self.filter_and_transform(
-            all_data, stream_state, records_schema, stream_slice, next_page_token
+            all_data, stream_state, records_schema, stream_slice, next_page_token, stream_interval
         )
 
     def filter_and_transform(
@@ -95,6 +97,7 @@ def filter_and_transform(
         records_schema: Mapping[str, Any],
         stream_slice: Optional[StreamSlice] = None,
         next_page_token: Optional[Mapping[str, Any]] = None,
+        stream_interval: Optional[Dict[str, Any]] = None,
     ) -> Iterable[Record]:
         """
         There is an issue with the selector as of 2024-08-30: it does technology-agnostic processing like filtering, transformation and
@@ -104,8 +107,12 @@ def filter_and_transform(
         Until we decide to move this logic away from the selector, we made this method public so that users like AsyncJobRetriever could
         share the logic of doing transformations on a set of records.
         """
-        filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
-        transformed_data = self._transform(filtered_data, stream_state, stream_slice)
+        filtered_data = self._filter(
+            all_data, stream_state, stream_slice, next_page_token, stream_interval
+        )
+        transformed_data = self._transform(
+            filtered_data, stream_state, stream_slice, stream_interval
+        )
         normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema)
         for data in normalized_data:
             yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)
@@ -128,13 +135,15 @@ def _filter(
         stream_state: StreamState,
         stream_slice: Optional[StreamSlice],
         next_page_token: Optional[Mapping[str, Any]],
+        stream_interval: Optional[Dict[str, Any]] = None,
     ) -> Iterable[Mapping[str, Any]]:
         if self.record_filter:
             yield from self.record_filter.filter_records(
                 records,
                 stream_state=stream_state,
                 stream_slice=stream_slice,
                 next_page_token=next_page_token,
+                stream_interval=stream_interval,
             )
         else:
             yield from records
@@ -144,6 +153,7 @@ def _transform(
         records: Iterable[Mapping[str, Any]],
         stream_state: StreamState,
         stream_slice: Optional[StreamSlice] = None,
+        stream_interval: Optional[Dict[str, Any]] = None,
     ) -> Iterable[Mapping[str, Any]]:
         for record in records:
             for transformation in self.transformations:
@@ -152,5 +162,6 @@ def _transform(
                     config=self.config,
                     stream_state=stream_state,
                     stream_slice=stream_slice,
+                    stream_interval=stream_interval,
                 )
             yield record
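
Downstream of the selector, each transformation's `transform` method now receives the same `stream_interval` keyword. A hedged sketch of a transformation that consumes it (a standalone class whose signature mirrors the keyword pattern in this diff; the class and output field names are hypothetical):

```python
from typing import Any, Dict, Optional

class StampSyncWindow:
    """Writes the current slice's window onto each record (illustrative only)."""

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Dict[str, Any]] = None,
        stream_state: Optional[Dict[str, Any]] = None,
        stream_slice: Optional[Dict[str, Any]] = None,
        stream_interval: Optional[Dict[str, Any]] = None,
    ) -> None:
        # Records are mutated in place, matching how RecordSelector._transform
        # applies each transformation to a record before yielding it.
        if stream_interval:
            record["_synced_window"] = dict(stream_interval)
```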
10 changes: 9 additions & 1 deletion airbyte_cdk/sources/declarative/interpolation/jinja.py
@@ -15,6 +15,12 @@
 from airbyte_cdk.sources.declarative.interpolation.interpolation import Interpolation
 from airbyte_cdk.sources.declarative.interpolation.macros import macros
 from airbyte_cdk.sources.types import Config
+from airbyte_cdk.utils.traced_exception import AirbyteTracedException
+
+STREAM_STATE_DEPRECATION_MESSAGE = (
+    "Using 'stream_state' in interpolation is no longer supported as it is not thread-safe. "
+    "Please use 'stream_interval' for incremental sync values or 'stream_partition' for partition router values instead."
+)
 
 
 class StreamPartitionAccessEnvironment(SandboxedEnvironment):
@@ -32,7 +38,6 @@ def is_safe_attribute(self, obj: Any, attr: str, value: Any) -> bool:
 
 # These aliases are used to deprecate existing keywords without breaking all existing connectors.
 _ALIASES = {
-    "stream_interval": "stream_slice",  # Use stream_interval to access incremental_sync values
     "stream_partition": "stream_slice",  # Use stream_partition to access partition router's values
 }
 
@@ -84,6 +89,9 @@ def eval(
         valid_types: Optional[Tuple[Type[Any]]] = None,
         **additional_parameters: Any,
     ) -> Any:
+        if isinstance(input_str, str) and "stream_state" in input_str:
+            raise AirbyteTracedException(STREAM_STATE_DEPRECATION_MESSAGE)
+
         context = {"config": config, **additional_parameters}
 
         for alias, equivalent in _ALIASES.items():
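
With this guard in place, any template containing the substring `stream_state` fails fast instead of being interpolated, and `stream_interval` stops being a mere alias of `stream_slice`. A small demonstration against this branch (the template is illustrative):

```python
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

interpolation = JinjaInterpolation()
try:
    interpolation.eval("{{ stream_state['updated_at'] }}", config={})
except AirbyteTracedException as exc:
    # Surfaces STREAM_STATE_DEPRECATION_MESSAGE, which points users at
    # stream_interval and stream_partition.
    print(exc)
```

Note that the check is a plain substring match, so a template that merely mentions `stream_state` inside a literal string would also be rejected.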
airbyte_cdk/sources/declarative/requesters/request_options/interpolated_nested_request_input_provider.py
@@ -3,7 +3,7 @@
 #
 
 from dataclasses import InitVar, dataclass, field
-from typing import Any, Mapping, Optional, Union
+from typing import Any, Dict, Mapping, Optional, Union
 
 from airbyte_cdk.sources.declarative.interpolation.interpolated_nested_mapping import (
     InterpolatedNestedMapping,
@@ -45,18 +45,20 @@ def eval_request_inputs(
         stream_state: Optional[StreamState] = None,
         stream_slice: Optional[StreamSlice] = None,
         next_page_token: Optional[Mapping[str, Any]] = None,
+        stream_interval: Optional[Dict[str, Any]] = None,
     ) -> Mapping[str, Any]:
         """
         Returns the request inputs to set on an outgoing HTTP request
 
-        :param stream_state: The stream state
+        :param stream_state: The stream state (deprecated, use stream_interval instead)
         :param stream_slice: The stream slice
         :param next_page_token: The pagination token
+        :param stream_interval: The stream interval for incremental sync values
         :return: The request inputs to set on an outgoing HTTP request
         """
         kwargs = {
-            "stream_state": stream_state,
             "stream_slice": stream_slice,
+            "stream_interval": stream_state,  # Use stream_state as stream_interval for backward compatibility
            "next_page_token": next_page_token,
        }
        return self._interpolator.eval(self.config, **kwargs)  # type: ignore  # self._interpolator is always initialized with a value and will not be None
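
One detail worth noting in this last hunk: the provider accepts a new `stream_interval` argument, but the interpolation context it builds populates `stream_interval` from the caller-supplied `stream_state`, so request options written against the new name keep working even when callers still pass only `stream_state`. A sketch of the resulting context (a simplified stand-in for the kwargs above):

```python
from typing import Any, Dict, Optional

def build_interpolation_context(
    stream_state: Optional[Dict[str, Any]] = None,
    stream_slice: Optional[Dict[str, Any]] = None,
    next_page_token: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    return {
        "stream_slice": stream_slice,
        # Backward-compatible alias per this PR: old state values surface
        # under the new stream_interval name; stream_state itself is gone.
        "stream_interval": stream_state,
        "next_page_token": next_page_token,
    }
```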