diff --git a/tap_closeio/context.py b/tap_closeio/context.py
index 98d5b73..ee1d531 100644
--- a/tap_closeio/context.py
+++ b/tap_closeio/context.py
@@ -46,7 +46,12 @@ def set_bookmark(self, path, val):
 
     def get_offset(self, path):
         off = bks_.get_offset(self.state, path[0])
-        return (off or {}).get(path[1])
+        value = (off or {}).get(path[1])
+        # for activities stream, if existing state contains offset greater
+        # than 250K, then return 0, as API will raise max_skip error
+        if value and path[0] == "activities" and value > 250000:
+            return 0
+        return value
 
     def set_offset(self, path, val):
         bks_.set_offset(self.state, path[0], path[1], val)
diff --git a/tap_closeio/streams.py b/tap_closeio/streams.py
index 954761d..b8ece2a 100644
--- a/tap_closeio/streams.py
+++ b/tap_closeio/streams.py
@@ -12,6 +12,9 @@
 
 LOGGER = singer.get_logger()
 
+# default date window size in days
+DATE_WINDOW_SIZE = 15
+
 PATHS = {
     IDS.CUSTOM_FIELDS: "/custom_fields/lead/",
     IDS.LEADS: "/lead/",
@@ -115,15 +118,24 @@ def paginated_sync(tap_stream_id, ctx, request, start_date):
             # There may be streams other than `leads` that will run into
             # `max_skip` errors but YAGNI. We can make the tap more
             # complicated once we have an extant need for it.
-            if 'max_skip = ' in str(e) and tap_stream_id == IDS.LEADS:
-                LOGGER.info(("Hit max_skip error. "
-                             "Setting bookmark to `{}` and restarting pagination.".format(
-                                 max_bookmark)))
-                skip = 0
-                ctx.clear_offsets(tap_stream_id)
-                ctx.set_bookmark(bookmark(tap_stream_id), max_bookmark)
-                _request = create_leads_request(ctx)
-                ctx.write_state()
+            if 'max_skip = ' in str(e):
+                if tap_stream_id == IDS.ACTIVITIES:
+                    LOGGER.warning("Hit max_skip error so clearing skip offset, please reduce the date window size and try again.")
+                    # clear offset
+                    ctx.clear_offsets(tap_stream_id)
+                    ctx.write_state()
+                    raise Exception(str(e) + " So, clearing skip offset, please reduce the date window size and try again.") from None
+                elif tap_stream_id == IDS.LEADS:
+                    LOGGER.info(("Hit max_skip error. "
+                                 "Setting bookmark to `{}` and restarting pagination.".format(
+                                     max_bookmark)))
+                    skip = 0
+                    ctx.clear_offsets(tap_stream_id)
+                    ctx.set_bookmark(bookmark(tap_stream_id), max_bookmark)
+                    _request = create_leads_request(ctx)
+                    ctx.write_state()
+                else:
+                    raise
             else:
                 raise
     ctx.clear_offsets(tap_stream_id)
@@ -168,15 +180,15 @@ def sync_activities(ctx):
 
     try:
         # get date window from config
-        date_window = int(ctx.config.get("date_window", 15))
-        # if date_window is 0, '0' or None, then set default window size of 15 days
+        date_window = int(ctx.config.get("date_window", DATE_WINDOW_SIZE))
+        # if date_window is 0 or '0', then set the default window size to DATE_WINDOW_SIZE (15 days)
         if not date_window:
-            LOGGER.warning("Invalid value of date window is passed: \'{}\', using default window size of 15 days.".format(ctx.config.get("date_window")))
-            date_window = 15
+            LOGGER.warning("Invalid value of date window is passed: \'{}\', using default window size of {} days.".format(ctx.config.get("date_window"), DATE_WINDOW_SIZE))
+            date_window = DATE_WINDOW_SIZE
-    except ValueError:
-        LOGGER.warning("Invalid value of date window is passed: \'{}\', using default window size of 15 days.".format(ctx.config.get("date_window")))
-        # In case of empty string(''), use default window
-        date_window = 15
+    except (ValueError, TypeError):
+        LOGGER.warning("Invalid value of date window is passed: \'{}\', using default window size of {} days.".format(ctx.config.get("date_window"), DATE_WINDOW_SIZE))
+        # int() raises ValueError for an empty string('') and TypeError for None; use default window in both cases
+        date_window = DATE_WINDOW_SIZE
 
     LOGGER.info("Using offset seconds {}".format(offset_secs))
     start_date -= timedelta(seconds=offset_secs)
diff --git a/tests/unittests/test_activity_stream_date_window.py b/tests/unittests/test_activity_stream_date_window.py
index ec36952..819e1a2 100644
--- a/tests/unittests/test_activity_stream_date_window.py
+++ b/tests/unittests/test_activity_stream_date_window.py
@@ -14,7 +14,7 @@ def test_activity_stream_default_date_window(self, mocked_paginated_sync):
         # now date
         now_date = datetime.now()
         config = {
-            "start_date": (now_date - timedelta(days=40)).strftime("%Y-%m-%d/"), # set date 40 days later than now
+            "start_date": (now_date - timedelta(days=40)).strftime("%Y-%m-%d"), # set date 40 days earlier than now
             "api_key": "test_API_key"
         }
         state = {}
diff --git a/tests/unittests/test_activity_stream_offset.py b/tests/unittests/test_activity_stream_offset.py
new file mode 100644
index 0000000..73bb99a
--- /dev/null
+++ b/tests/unittests/test_activity_stream_offset.py
@@ -0,0 +1,119 @@
+from unittest import mock
+from tap_closeio.schemas import IDS
+from tap_closeio.context import Context
+from tap_closeio.streams import paginated_sync, create_request
+import unittest
+
+# mock Page class and set next_skip and records in the page
+class MockPage:
+    def __init__(self, records, next_skip):
+        self.records = records
+        self.next_skip = next_skip
+
+# mock paginate function and yield MockPage
+def mock_paginate(*args, **kwargs):
+    pages = [
+        MockPage([{"id": 1, "date_created": "2022-01-02"}, {"id": 1, "date_created": "2022-01-03"}], 2),
+        MockPage([{"id": 1, "date_created": "2022-01-04"}, {"id": 1, "date_created": "2022-01-04"}], 4)
+    ]
+    # yield 1st page, as a result after 1st page, the skip will be 2
+    yield pages[0]
+    # if there is param in the request to raise error, then raise max_skip error
+    if args[2].params.get("error"):
+        raise Exception("The skip you set is larger than the maximum skip for this resource (max_skip = 250000).")
+    # yield 2nd page
+    yield pages[1]
+
+# mock format_dts function and return 3rd argument, ie. record
+def mock_format_dts(*args, **kwargs):
+    return args[2]
+
+class TestExistingStateOffset(unittest.TestCase):
+    """
+    Test cases to verify we are returning 0 if skip > 250K else return the skip in the existing state file
+    """
+    def test_existing_state_offset_greater_than_250K(self):
+        """
+        Test case to verify we are returning 0 as skip is greater than 250K in existing state
+        """
+        # mock config
+        config = {
+            "start_date": "2022-01-01",
+            "api_key": "test_API_key"
+        }
+        # mock state with skip > 250K
+        state = {
+            "currently_syncing": "activities",
+            "bookmarks": {
+                "activities": {
+                    "date_created": "2022-04-01T00:00:00",
+                    "offset": {"skip": 259000}
+                }
+            }
+        }
+        # create Context with config and state
+        context = Context(config, state)
+        # function call
+        offset = context.get_offset(["activities", "skip"])
+        # verify we got 0 as we had skip > 250K
+        self.assertEqual(offset, 0)
+
+    def test_existing_state_offset_lesser_than_250K(self):
+        """
+        Test case to verify we are returning existing skip as it is lesser than 250K in existing state
+        """
+        # mock config
+        config = {
+            "start_date": "2022-01-01",
+            "api_key": "test_API_key"
+        }
+        # mock state with skip < 250K
+        state = {
+            "currently_syncing": "activities",
+            "bookmarks": {
+                "activities": {
+                    "date_created": "2022-04-01T00:00:00",
+                    "offset": {"skip": 1000}
+                }
+            }
+        }
+        # create Context with config and state
+        context = Context(config, state)
+        # function call
+        offset = context.get_offset(["activities", "skip"])
+        # verify we got existing skip from the state as skip < 250K
+        self.assertEqual(offset, 1000)
+
+@mock.patch("tap_closeio.streams.write_records")
+@mock.patch("tap_closeio.streams.LOGGER.warning")
+@mock.patch("tap_closeio.streams.paginate", side_effect = mock_paginate)
+@mock.patch("tap_closeio.streams.format_dts", side_effect = mock_format_dts)
+class TestOffsetClear(unittest.TestCase):
+    def test_clear_state_for_max_skip_error(self, mocked_format_dts, mocked_paginate, mocked_logger_warning, mocked_write_records):
+        """
+        Test case to verify we clear the 'skip' when we encounter 'max_skip' error for 'activity' stream
+        """
+        # mock config
+        config = {
+            "start_date": "2022-01-01",
+            "api_key": "test_API_key"
+        }
+        # mock state
+        state = {}
+        # mock param to raise 'max_skip' error
+        params = {"error": "true"}
+        # create Context with config and state
+        context = Context(config, state)
+        # create request for activity stream with params
+        request = create_request(IDS.ACTIVITIES, params)
+
+        # verify we raise Exception during function call
+        with self.assertRaises(Exception) as e:
+            paginated_sync(IDS.ACTIVITIES, context, request, "2022-01-01")
+
+        # verify the error message
+        self.assertEqual(str(e.exception), "The skip you set is larger than the maximum skip for this resource (max_skip = 250000). So, clearing skip offset, please reduce the date window size and try again.")
+        # verify we did not get 'skip' in the state file
+        self.assertIsNone(state.get("bookmarks").get("activities").get("offset").get("skip"))
+        # verify we log 'reduce date window' message
+        mocked_logger_warning.assert_called_with("Hit max_skip error so clearing skip offset, please reduce the date window size and try again.")