Implement new contact bookmark #1

Open: wants to merge 21 commits into base: master

Commits (21)
4708ecf  Run all streams when resuming from previous sync (dmosorast, May 16, 2019)
bece1a2  Fix unit tests (dmosorast, May 16, 2019)
99a4df6  Merge pull request #90 from singer-io/fix/run-all-streams-each-sync (dmosorast, May 16, 2019)
9064f12  Version 2.2.8 and changelog (dmosorast, May 16, 2019)
a25b421  First pass on lookback for engagements (dmosorast, May 17, 2019)
c86c2bd  Add log message about adjusting bookmark (dmosorast, May 17, 2019)
f076733  Remove datetime parse leftover from original attempt (dmosorast, May 17, 2019)
f1662a7  Keep duration positive by using end - start (dmosorast, May 17, 2019)
49d4052  Correct bookmark adjustment and log message (dmosorast, May 17, 2019)
c2bb3b0  Merge pull request #91 from singer-io/fix/lookback-on-engagements (dmosorast, May 20, 2019)
52acfe7  Version 2.3.0 and changelog (dmosorast, May 20, 2019)
9a88573  Change the request to take a optional parameter from the config (#92) (luandy64, Jun 5, 2019)
66381dc  Bump to v2.4.0, and changelog (luandy64, Jun 5, 2019)
ac72133  Change backoff to 10 second intervals to align with the API burst rat… (luandy64, Jun 19, 2019)
794a5f8  Bump to v2.4.1 (luandy64, Jun 19, 2019)
0eadd76  Feature/configurable window size (#94) (luandy64, Jul 23, 2019)
92b9644  Bump to v2.5.0 (luandy64, Jul 23, 2019)
5bf43f9  Explicitly cast these to integers (#95) (luandy64, Jul 24, 2019)
95ee316  Bump to v2.5.1 (luandy64, Jul 24, 2019)
643877e  Update contacts endpoint to accept a resume offset vid bookmark (Dec 6, 2019)
496ea3a  increased the time between retries and number of retries (Dec 9, 2019)
CHANGELOG.md (9 additions, 0 deletions)
@@ -1,5 +1,14 @@
# Changelog

+## 2.4.0
+* The owners stream can optionally fetch "inactive owners" [#92](https://github.com/singer-io/tap-hubspot/pull/92)
+
+## 2.3.0
+* Engagements will now track how long the stream takes to sync, and look back on the next run by that amount to cover potentially missed updates due to asynchronous updates during the previous sync [#91](https://github.com/singer-io/tap-hubspot/pull/91)
+
+## 2.2.8
+* When resuming an interrupted sync, will now attempt all streams before exiting [#90](https://github.com/singer-io/tap-hubspot/pull/90)
+
## 2.2.7
* Add `delivered`, `forward`, `print`, `reply`, `spamreport` to `campaigns.counters`

setup.py (1 addition, 1 deletion)

@@ -3,7 +3,7 @@
from setuptools import setup

setup(name='tap-hubspot',
-      version='2.2.7',
+      version='2.5.1',
      description='Singer.io tap for extracting data from the HubSpot API',
      author='Stitch',
      url='http://singer.io',
tap_hubspot/__init__.py (64 additions, 18 deletions; file mode 100644 → 100755)

@@ -38,29 +38,28 @@ class StateFields:
    offset = 'offset'
    this_stream = 'this_stream'

-CHUNK_SIZES = {
-    "email_events": 1000 * 60 * 60 * 24,
-    "subscription_changes": 1000 * 60 * 60 * 24,
-}

BASE_URL = "https://api.hubapi.com"

CONTACTS_BY_COMPANY = "contacts_by_company"

+DEFAULT_CHUNK_SIZE = 1000 * 60 * 60 * 24

CONFIG = {
    "access_token": None,
    "token_expires": None,
+    "email_chunk_size": DEFAULT_CHUNK_SIZE,
+    "subscription_chunk_size": DEFAULT_CHUNK_SIZE,

    # in config.json
    "redirect_uri": None,
    "client_id": None,
    "client_secret": None,
    "refresh_token": None,
    "start_date": None,
-    "hapikey": None
+    "hapikey": None,
+    "include_inactives": None,
}


ENDPOINTS = {
    "contacts_properties": "/properties/v1/contacts/properties",
    "contacts_all": "/contacts/v1/lists/all/contacts/all",
@@ -99,6 +98,16 @@ def get_start(state, tap_stream_id, bookmark_key):
        return CONFIG['start_date']
    return current_bookmark

+def has_bookmark(state, tap_stream_id, bookmark_key):
+    return singer.get_bookmark(state, tap_stream_id, bookmark_key) is not None
+
+def get_previous_time_window(state, tap_stream_id):
+    return singer.get_bookmark(state, tap_stream_id, "last_sync_duration")
+
+def write_stream_duration(state, tap_stream_id, start, end):
+    duration = (end - start).total_seconds()
+    return singer.write_bookmark(state, tap_stream_id, "last_sync_duration", duration)
+
def get_url(endpoint, **kwargs):
    if endpoint not in ENDPOINTS:
        raise ValueError("Invalid endpoint {}".format(endpoint))
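These three helpers store and recover a per-stream sync duration through the ordinary Singer bookmark mechanism. A minimal sketch of the round trip, assuming a state dict shaped the way singer-python's bookmark helpers expect (the stream and values here are made up):

```python
import datetime

import singer
from singer import utils

# Hypothetical state as it might look after a previous engagements sync
state = {'bookmarks': {'engagements': {'lastUpdated': '2019-05-20T00:00:00.000000Z'}}}

sync_start = utils.now()
# ... the stream syncs here ...
sync_end = sync_start + datetime.timedelta(minutes=7)  # pretend it took 7 minutes

# write_stream_duration stores (end - start).total_seconds() as a bookmark
state = singer.write_bookmark(state, 'engagements', 'last_sync_duration',
                              (sync_end - sync_start).total_seconds())

# On the next run, get_previous_time_window reads it back for the lookback
assert singer.get_bookmark(state, 'engagements', 'last_sync_duration') == 420.0
```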
@@ -226,13 +235,14 @@ def parse_source_from_url(url):
    return None


-@backoff.on_exception(backoff.expo,
-                      requests.exceptions.RequestException,
-                      max_tries=5,
+@backoff.on_exception(backoff.constant,
+                      (requests.exceptions.RequestException,
+                       requests.exceptions.HTTPError),
+                      max_tries=10,
+                      jitter=None,
                      giveup=giveup,
                      on_giveup=on_giveup,
-                      factor=2)
-@utils.ratelimit(9, 1)
+                      interval=30)
def request(url, params=None):

    params = params or {}
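With `backoff.constant`, `jitter=None`, and `interval=30`, a failing request is retried on a fixed 30-second cadence up to 10 total attempts, so the worst case is nine 30-second waits (4.5 minutes) before `on_giveup` fires. A standalone sketch of the same decorator shape (the function and URL are illustrative, not the tap's code):

```python
import backoff
import requests

@backoff.on_exception(backoff.constant,
                      requests.exceptions.RequestException,
                      max_tries=10,  # 1 initial attempt + up to 9 retries
                      jitter=None,   # keep the spacing exactly constant
                      interval=30)   # seconds between attempts
def flaky_get(url):
    resp = requests.get(url, timeout=10)
    # HTTPError subclasses RequestException, so raise_for_status() failures
    # are retried too; the tuple in the diff above just makes that explicit
    resp.raise_for_status()
    return resp

# flaky_get("https://example.com/")  # worst case: 9 waits * 30s before giving up
```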
@@ -319,8 +329,14 @@ def _sync_contact_vids(catalog, vids, schema, bumble_bee):
def sync_contacts(STATE, ctx):
    catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
    bookmark_key = 'versionTimestamp'
+    bookmark_resume_key = 'resume_from_vid'
    start = utils.strptime_with_tz(get_start(STATE, "contacts", bookmark_key))
-    LOGGER.info("sync_contacts from %s", start)
+    resume_from_vid = get_start(STATE, "contacts", bookmark_resume_key)
+    if resume_from_vid == CONFIG['start_date']:
+        resume_from_vid = '0'
+    else:
+        STATE = singer.set_offset(STATE, "contacts", 'vidOffset', resume_from_vid)
+    LOGGER.info("sync_contacts from %s starting from vid offset %s", start, resume_from_vid)

    max_bk_value = start
    schema = load_schema("contacts")
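For context on the resume bookmark: the HubSpot v1 contacts endpoint pages with a `vidOffset` request parameter and returns `has-more` / `vid-offset` fields. A rough sketch of a paging loop that persists the offset as a resumable bookmark (the helper shape and auth handling are illustrative, not the tap's actual loop):

```python
import requests
import singer

def page_all_contacts(state, session_params):
    """Walk the paged contacts endpoint, bookmarking the vid offset so an
    interrupted sync can resume from this page instead of vid 0."""
    url = "https://api.hubapi.com/contacts/v1/lists/all/contacts/all"
    params = dict(session_params)  # e.g. auth + count; copied so we can mutate
    while True:
        data = requests.get(url, params=params).json()
        for contact in data.get('contacts', []):
            yield contact
        if not data.get('has-more'):
            break
        params['vidOffset'] = data['vid-offset']
        # Persist the offset before fetching the next page
        state = singer.write_bookmark(state, 'contacts',
                                      'resume_from_vid', data['vid-offset'])
        singer.write_state(state)
```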
@@ -516,9 +532,15 @@ def sync_entity_chunked(STATE, catalog, entity_name, key_properties, path):
    url = get_url(entity_name)

    mdata = metadata.to_map(catalog.get('metadata'))
+
+    if entity_name == 'email_events':
+        window_size = int(CONFIG['email_chunk_size'])
+    elif entity_name == 'subscription_changes':
+        window_size = int(CONFIG['subscription_chunk_size'])
+
    with metrics.record_counter(entity_name) as counter:
        while start_ts < now_ts:
-            end_ts = start_ts + CHUNK_SIZES[entity_name]
+            end_ts = start_ts + window_size
            params = {
                'startTimestamp': start_ts,
                'endTimestamp': end_ts,
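The window sizes are epoch milliseconds, with a one-day default, so the `email_chunk_size` / `subscription_chunk_size` config keys shrink or grow how much of the timeline each request covers. The chunking arithmetic in isolation:

```python
# Self-contained sketch of the windowing loop above; timestamps are epoch ms.
DEFAULT_CHUNK_SIZE = 1000 * 60 * 60 * 24  # one day in milliseconds

def iter_windows(start_ts, now_ts, window_size=DEFAULT_CHUNK_SIZE):
    """Yield (startTimestamp, endTimestamp) request params covering the range."""
    while start_ts < now_ts:
        end_ts = start_ts + window_size
        yield {'startTimestamp': start_ts, 'endTimestamp': end_ts}
        start_ts = end_ts

# Three days of history in one-day windows -> three requests
assert len(list(iter_windows(0, 3 * DEFAULT_CHUNK_SIZE))) == 3
```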
@@ -663,7 +685,12 @@ def sync_owners(STATE, ctx):
    max_bk_value = start

    LOGGER.info("sync_owners from %s", start)
-    data = request(get_url("owners")).json()
+
+    params = {}
+    if CONFIG.get('include_inactives'):
+        params['includeInactives'] = "true"
+    data = request(get_url("owners"), params).json()
+
    time_extracted = utils.now()

    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
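So with `include_inactives` set truthy in the config, the owners call simply gains an `includeInactives=true` query parameter, which is why the new `isActive` / `userIdIncludingInactive` schema fields below become relevant. Roughly equivalent, assuming the v2 owners endpoint and with auth omitted for brevity:

```python
import requests

# Illustrative only: the tap's request() helper adds auth, retries, and metrics
resp = requests.get("https://api.hubapi.com/owners/v2/owners",
                    params={'includeInactives': 'true'})
# -> GET https://api.hubapi.com/owners/v2/owners?includeInactives=true
```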
@@ -686,6 +713,22 @@ def sync_engagements(STATE, ctx):
    bookmark_key = 'lastUpdated'
    singer.write_schema("engagements", schema, ["engagement_id"], [bookmark_key], catalog.get('stream_alias'))
    start = get_start(STATE, "engagements", bookmark_key)
+
+    # Because this stream doesn't query by `lastUpdated`, it cycles
+    # through the data set every time. The issue with this is that there
+    # is a race condition by which records may be updated between the
+    # start of this table's sync and the end, causing some updates to not
+    # be captured. To combat this, we save a lookback window covering the
+    # duration of this stream's previous sync, and look back by that
+    # amount on the next run.
+    last_sync_duration = get_previous_time_window(STATE, "engagements")
+    current_sync_start = utils.now()
+    if has_bookmark(STATE, "engagements", bookmark_key) and \
+       last_sync_duration is not None:
+        LOGGER.info(("Last sync of engagements lasted {} seconds. Adjusting bookmark by this "
+                     "amount to account for race conditions with record updates.").format(last_sync_duration))
+        start = utils.strptime_to_utc(start) - datetime.timedelta(seconds=last_sync_duration)
+        start = utils.strftime(start)
    max_bk_value = start
    LOGGER.info("sync_engagements from %s", start)

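Concretely, if the previous engagements sync ran for 10 minutes, the saved `lastUpdated` bookmark is rewound by those 10 minutes before this sync begins (values below are hypothetical):

```python
import datetime
from singer import utils

bookmark = '2019-05-20T12:00:00.000000Z'  # lastUpdated bookmark from state
last_sync_duration = 600.0                # previous sync took 600 seconds

start = utils.strptime_to_utc(bookmark) - datetime.timedelta(seconds=last_sync_duration)
print(utils.strftime(start))  # -> 2019-05-20T11:50:00.000000Z
```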
@@ -711,6 +754,8 @@ def sync_engagements(STATE, ctx):
                max_bk_value = record['engagement'][bookmark_key]

    STATE = singer.write_bookmark(STATE, 'engagements', bookmark_key, max_bk_value)
+    # Write duration for next sync's lookback window
+    STATE = write_stream_duration(STATE, 'engagements', current_sync_start, utils.now())
    singer.write_state(STATE)
    return STATE

@@ -758,13 +803,15 @@ def get_streams_to_sync(streams, state):
    target_stream = singer.get_currently_syncing(state)
    result = streams
    if target_stream:
-        result = list(itertools.dropwhile(
+        skipped = list(itertools.takewhile(
+            lambda x: x.tap_stream_id != target_stream, streams))
+        rest = list(itertools.dropwhile(
            lambda x: x.tap_stream_id != target_stream, streams))
+        result = rest + skipped # Move skipped streams to end
        if not result:
            raise Exception('Unknown stream {} in state'.format(target_stream))
    return result


def get_selected_streams(remaining_streams, annotated_schema):
    selected_streams = []
    for stream in remaining_streams:
@@ -776,7 +823,6 @@ def get_selected_streams(remaining_streams, annotated_schema):

    return selected_streams

-
def do_sync(STATE, catalogs):
    ctx = Context(catalogs)
    validate_dependencies(ctx)
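The `takewhile`/`dropwhile` pair rotates the stream list: a resumed sync starts at the interrupted stream and still circles back to the ones that ran before it, instead of dropping them as the old `dropwhile`-only version did. The rotation in miniature:

```python
import itertools

streams = ['a', 'b', 'c']
target_stream = 'b'  # currently_syncing from state

skipped = list(itertools.takewhile(lambda x: x != target_stream, streams))
rest = list(itertools.dropwhile(lambda x: x != target_stream, streams))

assert rest + skipped == ['b', 'c', 'a']  # resume at 'b', finish with 'a'
```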
tap_hubspot/schemas/owners.json (9 additions, 0 deletions)

@@ -33,6 +33,15 @@
"hasContactsAccess" : {
"type": ["null", "boolean"]
},
"isActive": {
"type": ["null", "boolean"]
},
"activeUserId" : {
"type": ["null", "integer"]
},
"userIdIncludingInactive" : {
"type": ["null", "integer"]
},
"remoteList": {
"type": "array",
"items": {
tap_hubspot/tests/test_get_streams_to_sync.py (27 additions, 10 deletions)

@@ -10,23 +10,40 @@

class TestGetStreamsToSync(unittest.TestCase):

-    def test_get_streams_to_sync_with_no_this_stream(self):
-        streams = [
+    def setUp(self):
+        self.streams = [
            Stream('a', 'a', [], None, None),
            Stream('b', 'b', [], None, None),
            Stream('c', 'c', [], None, None),
        ]

+    def test_get_streams_to_sync_with_no_this_stream(self):
        state = {'this_stream': None}
-        self.assertEqual(streams, get_streams_to_sync(streams, state))
+        self.assertEqual(self.streams, get_streams_to_sync(self.streams, state))

-    def test_get_streams_to_sync_with_this_stream(self):
-        streams = [
-            Stream('a', 'a', [], None, None),
-            Stream('b', 'b', [], None, None),
-            Stream('c', 'c', [], None, None),
-        ]
+    def test_get_streams_to_sync_with_first_stream(self):
+        state = {'currently_syncing': 'a'}
+
+        result = get_streams_to_sync(self.streams, state)
+
+        parsed_result = [s.tap_stream_id for s in result]
+        self.assertEqual(parsed_result, ['a', 'b', 'c'])
+
+    def test_get_streams_to_sync_with_middle_stream(self):
        state = {'currently_syncing': 'b'}
-        self.assertEqual(streams[1:], list(get_streams_to_sync(streams, state)))
+
+        result = get_streams_to_sync(self.streams, state)
+
+        parsed_result = [s.tap_stream_id for s in result]
+        self.assertEqual(parsed_result, ['b', 'c', 'a'])
+
+    def test_get_streams_to_sync_with_last_stream(self):
+        state = {'currently_syncing': 'c'}
+
+        result = get_streams_to_sync(self.streams, state)
+
+        parsed_result = [s.tap_stream_id for s in result]
+        self.assertEqual(parsed_result, ['c', 'a', 'b'])

    def test_parse_source_from_url_succeeds(self):
        url = "https://api.hubapi.com/companies/v2/companies/recent/modified"