Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-18879: Clear offset when max_skip error is encountered and return 0 if skip is greater than 250k in the current state. #29

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion tap_closeio/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ def set_bookmark(self, path, val):

def get_offset(self, path):
off = bks_.get_offset(self.state, path[0])
return (off or {}).get(path[1])
value = (off or {}).get(path[1])
# for activities stream, if existing state contains offset greater
# than 250K, then return 0, as API will raise max_skip error
if value and path[0] == "activities" and value > 250000:
return 0
return value

def set_offset(self, path, val):
bks_.set_offset(self.state, path[0], path[1], val)
Expand Down
42 changes: 27 additions & 15 deletions tap_closeio/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

LOGGER = singer.get_logger()

# default date window size in days
DATE_WINDOW_SIZE = 15

PATHS = {
IDS.CUSTOM_FIELDS: "/custom_fields/lead/",
IDS.LEADS: "/lead/",
Expand Down Expand Up @@ -115,15 +118,24 @@ def paginated_sync(tap_stream_id, ctx, request, start_date):
# There may be streams other than `leads` that will run into
# `max_skip` errors but YAGNI. We can make the tap more
# complicated once we have an extant need for it.
if 'max_skip = ' in str(e) and tap_stream_id == IDS.LEADS:
LOGGER.info(("Hit max_skip error. "
"Setting bookmark to `{}` and restarting pagination.".format(
max_bookmark)))
skip = 0
ctx.clear_offsets(tap_stream_id)
ctx.set_bookmark(bookmark(tap_stream_id), max_bookmark)
_request = create_leads_request(ctx)
ctx.write_state()
if 'max_skip = ' in str(e):
if tap_stream_id == IDS.ACTIVITIES:
LOGGER.warning("Hit max_skip error so clearing skip offset, please reduce the date window size and try again.")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be an error and must be raised with this message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

# clear offset
ctx.clear_offsets(tap_stream_id)
ctx.write_state()
raise

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add the message to reduce the date window.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated error message

elif tap_stream_id == IDS.LEADS:
LOGGER.info(("Hit max_skip error. "
"Setting bookmark to `{}` and restarting pagination.".format(
max_bookmark)))
skip = 0
ctx.clear_offsets(tap_stream_id)
ctx.set_bookmark(bookmark(tap_stream_id), max_bookmark)
_request = create_leads_request(ctx)
ctx.write_state()
else:
raise
else:
raise
ctx.clear_offsets(tap_stream_id)
Expand Down Expand Up @@ -168,15 +180,15 @@ def sync_activities(ctx):

try:
# get date window from config
date_window = int(ctx.config.get("date_window", 15))
# if date_window is 0, '0' or None, then set default window size of 15 days
date_window = int(ctx.config.get("date_window", DATE_WINDOW_SIZE))
# if date_window is 0, '0' or None, then set the default window size to DATE_WINDOW_SIZE (15 days)
if not date_window:
LOGGER.warning("Invalid value of date window is passed: \'{}\', using default window size of 15 days.".format(ctx.config.get("date_window")))
date_window = 15
LOGGER.warning("Invalid value of date window is passed: \'{}\', using default window size of {} days.".format(ctx.config.get("date_window"), DATE_WINDOW_SIZE))
date_window = DATE_WINDOW_SIZE
except ValueError:
LOGGER.warning("Invalid value of date window is passed: \'{}\', using default window size of 15 days.".format(ctx.config.get("date_window")))
LOGGER.warning("Invalid value of date window is passed: \'{}\', using default window size of {} days.".format(ctx.config.get("date_window"), DATE_WINDOW_SIZE))
# In case of empty string(''), use default window
date_window = 15
date_window = DATE_WINDOW_SIZE

LOGGER.info("Using offset seconds {}".format(offset_secs))
start_date -= timedelta(seconds=offset_secs)
Expand Down
2 changes: 1 addition & 1 deletion tests/unittests/test_activity_stream_date_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def test_activity_stream_default_date_window(self, mocked_paginated_sync):
# now date
now_date = datetime.now()
config = {
"start_date": (now_date - timedelta(days=40)).strftime("%Y-%m-%d/"), # set date 40 days earlier than now
"start_date": (now_date - timedelta(days=40)).strftime("%Y-%m-%d"), # set date 40 days earlier than now
"api_key": "test_API_key"
}
state = {}
Expand Down
41 changes: 41 additions & 0 deletions tests/unittests/test_activity_stream_offset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from tap_closeio.schemas import IDS
import unittest
from tap_closeio.context import Context
from tap_closeio.streams import paginated_sync, create_request

class TestExistingStateOffset(unittest.TestCase):
    """Check what ``Context.get_offset`` returns for a ``skip`` offset already stored in state."""

    @staticmethod
    def _context_with_skip(skip):
        """Build a Context whose saved ``activities`` bookmark carries the given ``skip`` offset."""
        config = {
            "start_date": "2022-01-01",
            "api_key": "test_API_key"
        }
        activities_bookmark = {
            "date_created": "2022-04-01T00:00:00",
            "offset": {"skip": skip}
        }
        state = {
            "currently_syncing": "activities",
            "bookmarks": {"activities": activities_bookmark}
        }
        return Context(config, state)

    def test_existing_state_existing_state_offset_greater_than_250K(self):
        # a stored skip above 250000 is expected to come back as 0
        context = self._context_with_skip(259000)
        self.assertEqual(context.get_offset(["activities", "skip"]), 0)

    def test_existing_state_existing_state_offset_lesser_than_250K(self):
        # a stored skip of 1000 is expected to be returned unchanged
        context = self._context_with_skip(1000)
        self.assertEqual(context.get_offset(["activities", "skip"]), 1000)