Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change the sync functionality so it lifts properties #87

Open
wants to merge 1 commit into
base: lift_properties
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions tap_hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,16 @@ def request(url, params=None):
# }
# }


def _lift_properties(record):
properties = record.pop('properties', {})
for key, value in properties.items():
if key in record:
raise ValueError('Conflict trying to lift key {}'.format(key))
record[key] = value
return record


#pylint: disable=line-too-long
def gen_request(STATE, tap_stream_id, url, params, path, more_key, offset_keys, offset_targets):
if len(offset_keys) != len(offset_targets):
Expand Down Expand Up @@ -303,7 +313,7 @@ def _sync_contact_vids(catalog, vids, schema, bumble_bee):
mdata = metadata.to_map(catalog.get('metadata'))

for _, record in data.items():
record = bumble_bee.transform(record, schema, mdata)
record = bumble_bee.transform(_lift_properties(record), schema, mdata)
singer.write_record("contacts", record, catalog.get('stream_alias'), time_extracted=time_extracted)

default_contact_params = {
Expand Down Expand Up @@ -374,7 +384,7 @@ def _sync_contacts_by_company(STATE, company_id):
counter.increment()
record = {'company-id' : company_id,
'contact-id' : row}
record = bumble_bee.transform(record, schema)
record = bumble_bee.transform(_lift_properties(record), schema)
singer.write_record("contacts_by_company", record, time_extracted=utils.now())

return STATE
Expand Down Expand Up @@ -417,7 +427,7 @@ def sync_companies(STATE, ctx):

if not modified_time or modified_time >= start:
record = request(get_url("companies_detail", company_id=row['companyId'])).json()
record = bumble_bee.transform(record, schema, mdata)
record = bumble_bee.transform(_lift_properties(record), schema, mdata)
singer.write_record("companies", record, catalog.get('stream_alias'), time_extracted=utils.now())
if CONTACTS_BY_COMPANY in ctx.selected_stream_ids:
STATE = _sync_contacts_by_company(STATE, record['companyId'])
Expand Down Expand Up @@ -470,7 +480,7 @@ def sync_deals(STATE, ctx):
max_bk_value = modified_time

if not modified_time or modified_time >= start:
record = bumble_bee.transform(row, schema, mdata)
record = bumble_bee.transform(_lift_properties(row), schema, mdata)
singer.write_record("deals", record, catalog.get('stream_alias'), time_extracted=utils.now())

STATE = singer.write_bookmark(STATE, 'deals', bookmark_key, utils.strftime(max_bk_value))
Expand All @@ -490,7 +500,7 @@ def sync_campaigns(STATE, ctx):
with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
for row in gen_request(STATE, 'campaigns', url, params, "campaigns", "hasMore", ["offset"], ["offset"]):
record = request(get_url("campaigns_detail", campaign_id=row['id'])).json()
record = bumble_bee.transform(record, schema, mdata)
record = bumble_bee.transform(_lift_properties(record), schema, mdata)
singer.write_record("campaigns", record, catalog.get('stream_alias'), time_extracted=utils.now())

return STATE
Expand Down Expand Up @@ -531,7 +541,7 @@ def sync_entity_chunked(STATE, catalog, entity_name, key_properties, path):

for row in data[path]:
counter.increment()
record = bumble_bee.transform(row, schema, mdata)
record = bumble_bee.transform(_lift_properties(row), schema, mdata)
singer.write_record(entity_name,
record,
catalog.get('stream_alias'),
Expand Down Expand Up @@ -578,7 +588,7 @@ def sync_contact_lists(STATE, ctx):
params = {'count': 250}
with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
for row in gen_request(STATE, 'contact_lists', url, params, "lists", "has-more", ["offset"], ["offset"]):
record = bumble_bee.transform(row, schema, mdata)
record = bumble_bee.transform(_lift_properties(row), schema, mdata)

if record[bookmark_key] >= start:
singer.write_record("contact_lists", record, catalog.get('stream_alias'), time_extracted=utils.now())
Expand Down Expand Up @@ -607,7 +617,7 @@ def sync_forms(STATE, ctx):

with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
for row in data:
record = bumble_bee.transform(row, schema, mdata)
record = bumble_bee.transform(_lift_properties(row), schema, mdata)

if record[bookmark_key] >= start:
singer.write_record("forms", record, catalog.get('stream_alias'), time_extracted=time_extracted)
Expand Down Expand Up @@ -638,7 +648,7 @@ def sync_workflows(STATE, ctx):

with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
for row in data['workflows']:
record = bumble_bee.transform(row, schema, mdata)
record = bumble_bee.transform(_lift_properties(row), schema, mdata)
if record[bookmark_key] >= start:
singer.write_record("workflows", record, catalog.get('stream_alias'), time_extracted=time_extracted)
if record[bookmark_key] >= max_bk_value:
Expand All @@ -664,7 +674,7 @@ def sync_owners(STATE, ctx):

with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
for row in data:
record = bumble_bee.transform(row, schema, mdata)
record = bumble_bee.transform(_lift_properties(row), schema, mdata)
if record[bookmark_key] >= max_bk_value:
max_bk_value = record[bookmark_key]

Expand Down Expand Up @@ -697,7 +707,7 @@ def sync_engagements(STATE, ctx):

with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
for engagement in engagements:
record = bumble_bee.transform(engagement, schema, mdata)
record = bumble_bee.transform(_lift_properties(engagement), schema, mdata)
if record['engagement'][bookmark_key] >= start:
# hoist PK and bookmark field to top-level record
record['engagement_id'] = record['engagement']['id']
Expand All @@ -719,7 +729,7 @@ def sync_deal_pipelines(STATE, ctx):
data = request(get_url('deal_pipelines')).json()
with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
for row in data:
record = bumble_bee.transform(row, schema, mdata)
record = bumble_bee.transform(_lift_properties(row), schema, mdata)
singer.write_record("deal_pipelines", record, catalog.get('stream_alias'), time_extracted=utils.now())
singer.write_state(STATE)
return STATE
Expand Down