Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-18879: Clear offset when max_skip error is encountered and return 0 if skip is greater than 250k in the current state. #29

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion tap_closeio/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ def set_bookmark(self, path, val):

def get_offset(self, path):
off = bks_.get_offset(self.state, path[0])
return (off or {}).get(path[1])
value = (off or {}).get(path[1])
if path[0] == "activities" and value > 250000:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment here describing the specific scenario this handles, e.g. a customer's state having reached this condition under the previous version, and noting that this resets the skip to 0 for date windowing and the other cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment

return 0
return value

def set_offset(self, path, val):
bks_.set_offset(self.state, path[0], path[1], val)
Expand Down
42 changes: 27 additions & 15 deletions tap_closeio/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

LOGGER = singer.get_logger()

# default date window size in days
DATE_WINDOW_SIZE = 15

PATHS = {
IDS.CUSTOM_FIELDS: "/custom_fields/lead/",
IDS.LEADS: "/lead/",
Expand Down Expand Up @@ -115,15 +118,24 @@ def paginated_sync(tap_stream_id, ctx, request, start_date):
# There may be streams other than `leads` that will run into
# `max_skip` errors but YAGNI. We can make the tap more
# complicated once we have an extant need for it.
if 'max_skip = ' in str(e) and tap_stream_id == IDS.LEADS:
LOGGER.info(("Hit max_skip error. "
"Setting bookmark to `{}` and restarting pagination.".format(
max_bookmark)))
skip = 0
ctx.clear_offsets(tap_stream_id)
ctx.set_bookmark(bookmark(tap_stream_id), max_bookmark)
_request = create_leads_request(ctx)
ctx.write_state()
if 'max_skip = ' in str(e):
if tap_stream_id == IDS.ACTIVITIES:
LOGGER.warning("The skip cannot be greater than 250000, please reduce the date window size and try again.")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOGGER.warning("The skip cannot be greater than 250000, please reduce the date window size and try again.")
LOGGER.warning("Hit max_skip error so clearing skip offset, please reduce the date window size and try again.")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

# clear offset
ctx.clear_offsets(tap_stream_id)
ctx.write_state()
raise

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add the message to reduce the date window.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated error message

elif tap_stream_id == IDS.LEADS:
LOGGER.info(("Hit max_skip error. "
"Setting bookmark to `{}` and restarting pagination.".format(
max_bookmark)))
skip = 0
ctx.clear_offsets(tap_stream_id)
ctx.set_bookmark(bookmark(tap_stream_id), max_bookmark)
_request = create_leads_request(ctx)
ctx.write_state()
else:
raise
else:
raise
ctx.clear_offsets(tap_stream_id)
Expand Down Expand Up @@ -168,15 +180,15 @@ def sync_activities(ctx):

try:
# get date window from config
date_window = int(ctx.config.get("date_window", 15))
# if date_window is 0, '0' or None, then set default window size of 15 days
date_window = int(ctx.config.get("date_window", DATE_WINDOW_SIZE))
# if date_window is 0, '0' or None, then set the default window size to DATE_WINDOW_SIZE (15 days)
if not date_window:
LOGGER.warning("Invalid value of date window is passed: \'{}\', using default window size of 15 days.".format(ctx.config.get("date_window")))
date_window = 15
LOGGER.warning("Invalid value of date window is passed: \'{}\', using default window size of {} days.".format(ctx.config.get("date_window"), DATE_WINDOW_SIZE))
date_window = DATE_WINDOW_SIZE
except ValueError:
LOGGER.warning("Invalid value of date window is passed: \'{}\', using default window size of 15 days.".format(ctx.config.get("date_window")))
LOGGER.warning("Invalid value of date window is passed: \'{}\', using default window size of {} days.".format(ctx.config.get("date_window"), DATE_WINDOW_SIZE))
# In case of empty string(''), use default window
date_window = 15
date_window = DATE_WINDOW_SIZE

LOGGER.info("Using offset seconds {}".format(offset_secs))
start_date -= timedelta(seconds=offset_secs)
Expand Down
2 changes: 1 addition & 1 deletion tests/unittests/test_activity_stream_date_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def test_activity_stream_default_date_window(self, mocked_paginated_sync):
# now date
now_date = datetime.now()
config = {
"start_date": (now_date - timedelta(days=40)).strftime("%Y-%m-%d/"), # set date 40 days later than now
"start_date": (now_date - timedelta(days=40)).strftime("%Y-%m-%d"), # set date 40 days earlier than now
"api_key": "test_API_key"
}
state = {}
Expand Down