🎉Source Salesforce: add checkpointing (airbytehq#24888)
* Source Salesforce: add checkpointing

* Source-Iterable: fix integration tests

* Source Salesforce: fix integration test slices

* Source Salesforce: wait for latest record to be accessible

* Source Salesforce: retry up to 10 times for everything

* Source Salesforce: refactoring. Add checkpointing for all incremental streams

* Source Salesforce: small fixes

* auto-bump connector version

---------

Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 8c4db0b commit dd607dc
Showing 13 changed files with 186 additions and 77 deletions.
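The heart of this commit is checkpointing for the incremental Salesforce streams: instead of reading a whole stream with one query, each sync is split into fixed-size date windows (120 days, `STREAM_SLICE_STEP` in the diff below), so state can be emitted after every window. A minimal standalone sketch of that slicing logic, assuming pendulum; `generate_slices` and the default cursor field are illustrative — the connector implements the same loop inside `stream_slices()`:

```python
# Illustrative sketch of the 120-day slicing added in this commit; `generate_slices`
# is a hypothetical helper -- the connector does this inside stream_slices().
from typing import Any, Dict, Iterator, Mapping, Optional

import pendulum


def generate_slices(
    start_date: str,
    stream_state: Optional[Mapping[str, Any]] = None,
    cursor_field: str = "SystemModstamp",  # example cursor field
    step_days: int = 120,  # mirrors STREAM_SLICE_STEP
) -> Iterator[Dict[str, str]]:
    now = pendulum.now(tz="UTC")
    # Resume from the saved cursor value if there is one, otherwise from the configured start date.
    initial_date = pendulum.parse((stream_state or {}).get(cursor_field, start_date), tz="UTC")

    slice_number = 1
    end = None
    while end != now:
        start = initial_date.add(days=(slice_number - 1) * step_days)
        end = min(now, initial_date.add(days=slice_number * step_days))
        yield {
            "start_date": start.isoformat(timespec="milliseconds"),
            "end_date": end.isoformat(timespec="milliseconds"),
        }
        slice_number += 1


# Each yielded window becomes one query, and state is checkpointed between windows.
for window in generate_slices("2023-01-01T00:00:00Z"):
    print(window)
```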
@@ -23087,7 +23087,7 @@
"sourceDefinitionId": "b117307c-14b6-41aa-9422-947e34922962",
"name": "Salesforce",
"dockerRepository": "airbyte/source-salesforce",
"dockerImageTag": "2.0.9",
"dockerImageTag": "2.0.10",
"documentationUrl": "https://docs.airbyte.com/integrations/sources/salesforce",
"icon": "salesforce.svg",
"sourceType": "api",
@@ -1830,7 +1830,7 @@
- name: Salesforce
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
dockerRepository: airbyte/source-salesforce
dockerImageTag: 2.0.9
dockerImageTag: 2.0.10
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
@@ -13749,7 +13749,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-salesforce:2.0.9"
- dockerImage: "airbyte/source-salesforce:2.0.10"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
connectionSpecification:
@@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=2.0.9
LABEL io.airbyte.version=2.0.10
LABEL io.airbyte.name=airbyte/source-salesforce
@@ -33,14 +33,17 @@ acceptance_tests:
bypass_reason: "impossible to fill the stream with data because it is an organic traffic"
- name: "Describe"
bypass_reason: "Data is not permanent"
timeout_seconds: 3600
fail_on_extra_columns: false
incremental:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/incremental_catalog.json"
future_state:
future_state_path: "integration_tests/future_state.json"
timeout_seconds: 7200
full_refresh:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 3600
@@ -72,7 +72,11 @@ def test_not_queryable_stream(caplog, input_config):
)
def test_failed_jobs_with_successful_switching(caplog, input_sandbox_config, stream_name, log_messages):
stream = get_stream(input_sandbox_config, stream_name)
expected_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh))
stream_slice = {
"start_date": "2023-01-01T00:00:00.000+0000",
"end_date": "2023-02-01T00:00:00.000+0000"
}
expected_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))

create_query_matcher = re.compile(r"jobs/query$")
job_matcher = re.compile(r"jobs/query/fake_id$")
@@ -88,7 +92,7 @@ def test_failed_jobs_with_successful_switching(caplog, input_sandbox_config, str
m.register_uri("GET", job_matcher, json={"state": "Failed", "errorMessage": "unknown error"})
m.register_uri("DELETE", job_matcher, json={})
with caplog.at_level(logging.WARNING):
loaded_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh))
loaded_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))

caplog_rec_counter = len(caplog.records) - 1
for log_message in log_messages:
@@ -8,6 +8,7 @@
from datetime import datetime
from pathlib import Path

import pendulum
import pytest
import requests
from airbyte_cdk.models import SyncMode
@@ -66,8 +67,7 @@ def update_note(stream, note_id, headers):


def get_stream_state():
state_date = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
return {"LastModifiedDate": state_date}
return {"LastModifiedDate": pendulum.now(tz="UTC").add(days=-1).isoformat(timespec="milliseconds")}


def test_update_for_deleted_record(stream):
@@ -79,21 +79,54 @@ def test_update_for_deleted_record(stream):

created_note_id = response.json()["id"]

notes = set(record["Id"] for record in stream.read_records(sync_mode=None))
assert created_note_id in notes, "The stream didn't return the note we created"
# A record may not be accessible right after creation. This workaround makes a few attempts to read the latest record
notes = []
attempts = 10
while created_note_id not in notes:
now = pendulum.now(tz="UTC")
stream_slice = {
"start_date": now.add(days=-1).isoformat(timespec="milliseconds"),
"end_date": now.isoformat(timespec="milliseconds")
}
notes = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))
try:
assert created_note_id in notes, "The stream didn't return the note we created"
break
except Exception as e:
if attempts:
time.sleep(2)
else:
raise e
attempts = attempts - 1

response = delete_note(stream, created_note_id, headers)
assert response.status_code == 204, "Note was not deleted"

is_note_updated = False
is_deleted = False
for record in stream.read_records(sync_mode=SyncMode.incremental, stream_state=stream_state):
if created_note_id == record["Id"]:
is_note_updated = True
is_deleted = record["IsDeleted"]
# A record may still be accessible for some time right after deletion
attempts = 10
while True:
is_note_updated = False
is_deleted = False
now = pendulum.now(tz="UTC")
stream_slice = {
"start_date": now.add(days=-1).isoformat(timespec="milliseconds"),
"end_date": now.isoformat(timespec="milliseconds")
}
for record in stream.read_records(sync_mode=SyncMode.incremental, stream_state=stream_state, stream_slice=stream_slice):
if created_note_id == record["Id"]:
is_note_updated = True
is_deleted = record["IsDeleted"]
break
try:
assert is_note_updated, "No deleted note during the sync"
assert is_deleted, "Wrong field value for deleted note during the sync"
break
assert is_note_updated, "No deleted note during the sync"
assert is_deleted, "Wrong field value for deleted note during the sync"
except Exception as e:
if attempts:
time.sleep(2)
else:
raise e
attempts = attempts - 1

time.sleep(1)
response = update_note(stream, created_note_id, headers)
@@ -107,8 +140,25 @@ def test_deleted_record(stream):

created_note_id = response.json()["id"]

notes = set(record["Id"] for record in stream.read_records(sync_mode=None))
assert created_note_id in notes, "No created note during the sync"
# A record may not be accessible right after creation. This workaround makes a few attempts to read the latest record
notes = []
attempts = 10
while created_note_id not in notes:
now = pendulum.now(tz="UTC")
stream_slice = {
"start_date": now.add(days=-1).isoformat(timespec="milliseconds"),
"end_date": now.isoformat(timespec="milliseconds")
}
notes = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))
try:
assert created_note_id in notes, "No created note during the sync"
break
except Exception as e:
if attempts:
time.sleep(2)
else:
raise e
attempts = attempts - 1

response = update_note(stream, created_note_id, headers)
assert response.status_code == 204, "Note was not updated"
@@ -117,14 +167,29 @@ def test_deleted_record(stream):
response = delete_note(stream, created_note_id, headers)
assert response.status_code == 204, "Note was not deleted"

record = None
for record in stream.read_records(sync_mode=SyncMode.incremental, stream_state=stream_state):
if created_note_id == record["Id"]:
# Record updates take some time to become accessible
attempts = 10
while created_note_id not in notes:
now = pendulum.now(tz="UTC")
stream_slice = {
"start_date": now.add(days=-1).isoformat(timespec="milliseconds"),
"end_date": now.isoformat(timespec="milliseconds")
}
record = None
for record in stream.read_records(sync_mode=SyncMode.incremental, stream_state=stream_state, stream_slice=stream_slice):
if created_note_id == record["Id"]:
break
try:
assert record, "No updated note during the sync"
assert record["IsDeleted"], "Wrong field value for deleted note during the sync"
assert record["TextPreview"] == UPDATED_NOTE_CONTENT and record["TextPreview"] != NOTE_CONTENT, "Note Content was not updated"
break

assert record, "No updated note during the sync"
assert record["IsDeleted"], "Wrong field value for deleted note during the sync"
assert record["TextPreview"] == UPDATED_NOTE_CONTENT and record["TextPreview"] != NOTE_CONTENT, "Note Content was not updated"
except Exception as e:
if attempts:
time.sleep(2)
else:
raise e
attempts = attempts - 1


def test_parallel_discover(input_sandbox_config):
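The integration-test changes above all follow the same workaround for Salesforce's eventual consistency: after creating, updating, or deleting a note, the test re-reads a one-day slice up to ten times, sleeping between attempts, before failing. The same pattern as a small hedged helper (the tests inline this logic; `wait_for` and the usage names exist only for illustration):

```python
# Illustrative polling helper for eventually consistent reads; the tests above
# inline this pattern (up to 10 attempts with short sleeps) rather than call a helper.
import time
from typing import Callable, Optional


def wait_for(condition: Callable[[], bool], attempts: int = 10, delay_seconds: float = 2.0) -> None:
    last_error: Optional[BaseException] = None
    for _ in range(attempts):
        try:
            assert condition(), "condition not met yet"
            return
        except AssertionError as error:
            last_error = error
            time.sleep(delay_seconds)
    raise last_error if last_error else AssertionError("condition was never met")


# Usage sketch with hypothetical names:
# wait_for(lambda: created_note_id in read_note_ids(stream, one_day_slice()))
```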
@@ -147,7 +147,8 @@ def _read_stream(
except exceptions.HTTPError as error:
error_data = error.response.json()[0]
error_code = error_data.get("errorCode")
url = error.response.url
if error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED":
logger.warn(f"API Call limit is exceeded. Error message: '{error_data.get('message')}'")
logger.warning(f"API Call {url} limit is exceeded. Error message: '{error_data.get('message')}'")
raise AirbyteStopSync() # if got 403 rate limit response, finish the sync with success.
raise error
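The `_read_stream` hunk above adds the request URL to the warning logged when Salesforce reports its daily API-call limit, and keeps the existing behaviour of ending the sync successfully instead of failing it. A self-contained sketch of that handler; `AirbyteStopSync` is re-declared here only so the snippet runs on its own:

```python
# Sketch of the rate-limit handling shown above; AirbyteStopSync is the connector's
# own sentinel exception, redefined here only to keep the example self-contained.
import logging

import requests
from requests import codes

logger = logging.getLogger("airbyte")


class AirbyteStopSync(Exception):
    """Raised to finish the sync with success instead of failing the whole job."""


def handle_read_error(error: requests.exceptions.HTTPError) -> None:
    error_data = error.response.json()[0]
    error_code = error_data.get("errorCode")
    url = error.response.url
    if error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED":
        logger.warning(f"API Call {url} limit is exceeded. Error message: '{error_data.get('message')}'")
        raise AirbyteStopSync()  # a 403 rate-limit response ends the sync with success
    raise error
```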
@@ -579,6 +579,7 @@ def transform_empty_string_to_none(instance: Any, schema: Any):

class IncrementalRestSalesforceStream(RestSalesforceStream, ABC):
state_checkpoint_interval = 500
STREAM_SLICE_STEP = 120

def __init__(self, replication_key: str, start_date: Optional[str], **kwargs):
super().__init__(**kwargs)
@@ -592,6 +593,20 @@ def format_start_date(start_date: Optional[str]) -> Optional[str]:
return pendulum.parse(start_date).strftime("%Y-%m-%dT%H:%M:%SZ") # type: ignore[attr-defined,no-any-return]
return None

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
start, end = (None, None)
now = pendulum.now(tz="UTC")
initial_date = pendulum.parse((stream_state or {}).get(self.cursor_field, self.start_date), tz="UTC")

slice_number = 1
while not end == now:
start = initial_date.add(days=(slice_number - 1) * self.STREAM_SLICE_STEP)
end = min(now, initial_date.add(days=slice_number * self.STREAM_SLICE_STEP))
yield {"start_date": start.isoformat(timespec="milliseconds"), "end_date": end.isoformat(timespec="milliseconds")}
slice_number = slice_number + 1

def request_params(
self,
stream_state: Mapping[str, Any],
@@ -607,14 +622,28 @@ def request_params(

property_chunk = property_chunk or {}

stream_date = stream_state.get(self.cursor_field)
start_date = stream_date or self.start_date
start_date = max(
(stream_state or {}).get(self.cursor_field, self.start_date),
(stream_slice or {}).get("start_date", ""),
(next_page_token or {}).get("start_date", ""),
)
end_date = (stream_slice or {}).get("end_date", pendulum.now(tz="UTC").isoformat(timespec="milliseconds"))

select_fields = ",".join(property_chunk.keys())
table_name = self.name
where_conditions = []
order_by_clause = ""

query = f"SELECT {','.join(property_chunk.keys())} FROM {self.name} "
if start_date:
query += f"WHERE {self.cursor_field} >= {start_date} "
where_conditions.append(f"{self.cursor_field} >= {start_date}")
if end_date:
where_conditions.append(f"{self.cursor_field} < {end_date}")
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
query += f"ORDER BY {self.cursor_field} ASC"
order_by_clause = f"ORDER BY {self.cursor_field} ASC"

where_clause = f"WHERE {' AND '.join(where_conditions)}"
query = f"SELECT {select_fields} FROM {table_name} {where_clause} {order_by_clause}"

return {"q": query}

@property
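For every slice, `request_params` above now assembles a bounded SOQL query: the lower bound is the greater of the saved state and the slice start, the upper bound is the slice end. A worked example with hypothetical object, fields, and dates (the real field list comes from the stream's JSON schema):

```python
# Worked example of the query shape built in request_params above; the object name,
# field list, cursor field, and dates are hypothetical.
cursor_field = "SystemModstamp"
start_date = "2023-01-01T00:00:00.000+00:00"
end_date = "2023-05-01T00:00:00.000+00:00"
select_fields = "Id,Name,SystemModstamp"
table_name = "Account"

where_conditions = [f"{cursor_field} >= {start_date}", f"{cursor_field} < {end_date}"]
where_clause = f"WHERE {' AND '.join(where_conditions)}"
order_by_clause = f"ORDER BY {cursor_field} ASC"

query = f"SELECT {select_fields} FROM {table_name} {where_clause} {order_by_clause}"
print(query)
# Prints (wrapped here for readability):
# SELECT Id,Name,SystemModstamp FROM Account WHERE SystemModstamp >= 2023-01-01T00:00:00.000+00:00
#   AND SystemModstamp < 2023-05-01T00:00:00.000+00:00 ORDER BY SystemModstamp ASC
```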
@@ -635,34 +664,33 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalRestSalesforceStream):
def next_page_token(self, last_record: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
page_token: str = last_record[self.cursor_field]
res = {"next_token": page_token}
# use primary key as additional filtering param, if cursor_field is not increased from previous page
if self.primary_key and self.prev_start_date == page_token:
res["primary_key"] = last_record[self.primary_key]
return res
return {"next_token": last_record[self.cursor_field], "primary_key": last_record.get(self.primary_key)}
return None

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
selected_properties = self.get_json_schema().get("properties", {})
start_date = max(
(stream_state or {}).get(self.cursor_field, ""),
(stream_slice or {}).get("start_date", ""),
(next_page_token or {}).get("start_date", ""),
)
end_date = stream_slice["end_date"]

stream_date = stream_state.get(self.cursor_field)
next_token = (next_page_token or {}).get("next_token")
primary_key = (next_page_token or {}).get("primary_key")
start_date = next_token or stream_date or self.start_date
self.prev_start_date = start_date
select_fields = ", ".join(self.get_json_schema().get("properties", {}).keys())
table_name = self.name
where_conditions = [f"{self.cursor_field} >= {start_date}", f"{self.cursor_field} < {end_date}"]
order_by_clause = ""

query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} "
if start_date:
if primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
query += f"WHERE ({self.cursor_field} = {start_date} AND {self.primary_key} > '{primary_key}') OR ({self.cursor_field} > {start_date}) "
else:
query += f"WHERE {self.cursor_field} >= {start_date} "
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
order_by_fields = [self.cursor_field, self.primary_key] if self.primary_key else [self.cursor_field]
query += f"ORDER BY {','.join(order_by_fields)} ASC LIMIT {self.page_size}"
last_primary_key = (next_page_token or {}).get("primary_key", "")
if last_primary_key:
where_conditions.append(f"{self.primary_key} > '{last_primary_key}'")
order_by_fields = ", ".join([self.cursor_field, self.primary_key] if self.primary_key else [self.cursor_field])
order_by_clause = f"ORDER BY {order_by_fields} ASC LIMIT {self.page_size}"

where_clause = f"WHERE {' AND '.join(where_conditions)}"
query = f"SELECT {select_fields} FROM {table_name} {where_clause} {order_by_clause}"
return {"q": query}


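Within a slice, the bulk incremental stream pages through results with a keyset: rows are ordered by the cursor field and the primary key, and each subsequent page adds an `Id > last_seen_id` filter. A sketch of one page's query under the same hypothetical values as above (page size and Id value are illustrative):

```python
# Sketch of the keyset-paginated bulk query built above; all values are hypothetical.
cursor_field, primary_key, page_size = "SystemModstamp", "Id", 30000
start_date = "2023-01-01T00:00:00.000+00:00"
end_date = "2023-05-01T00:00:00.000+00:00"
last_primary_key = "0015e00000AbCdEAAX"  # Id of the last record of the previous page, "" for page one

where_conditions = [f"{cursor_field} >= {start_date}", f"{cursor_field} < {end_date}"]
if last_primary_key:
    where_conditions.append(f"{primary_key} > '{last_primary_key}'")
order_by_fields = ", ".join([cursor_field, primary_key])
order_by_clause = f"ORDER BY {order_by_fields} ASC LIMIT {page_size}"

where_clause = f"WHERE {' AND '.join(where_conditions)}"
query = f"SELECT Id, Name, {cursor_field} FROM Account {where_clause} {order_by_clause}"
print(query)
```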