From 7632e83eccdc65985cd65d0152708d5c05aecdbb Mon Sep 17 00:00:00 2001
From: Mirela Andreea Grigoras
Date: Thu, 21 Feb 2019 14:47:33 +0200
Subject: [PATCH 1/2] Make tap-hubspot compatible with Postgres, if needed.

---
 tap_hubspot/__init__.py | 264 ++++++++++++++++++++++++++++++----------
 1 file changed, 200 insertions(+), 64 deletions(-)

diff --git a/tap_hubspot/__init__.py b/tap_hubspot/__init__.py
index a5dccf2b..b2544b33 100644
--- a/tap_hubspot/__init__.py
+++ b/tap_hubspot/__init__.py
@@ -7,6 +7,7 @@
 import sys
 import json
+import argparse
 import attr
 import backoff
 import requests
@@ -93,6 +94,109 @@ class StateFields:
     "owners": "/owners/v2/owners",
 }
 
+def make_postgres_comp_if_needed(stream, STATE, ctx, postgres_compatible = False):
+    if postgres_compatible:
+        STATE = stream.sync(STATE, ctx, True) # pylint: disable=not-callable
+    else:
+        STATE = stream.sync(STATE, ctx, False)
+    return STATE
+
+def parse_args(required_config_keys):
+    '''Parse standard command-line args.
+    Parses the command-line arguments mentioned in the SPEC and the
+    BEST_PRACTICES documents:
+    -c,--config             Config file
+    -s,--state              State file
+    -d,--discover           Run in discover mode
+    -p,--properties         Properties file: DEPRECATED, please use --catalog instead
+    --catalog               Catalog file
+    --postgres_compatible   The generated schema is compatible with a Postgres database
+    Returns the parsed args object from argparse. For each argument that
+    points to JSON files (config, state, properties), we will automatically
+    load and parse the JSON file.
+    '''
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument(
+        '-c', '--config',
+        help='Config file',
+        required=True)
+
+    parser.add_argument(
+        '-s', '--state',
+        help='State file')
+
+    parser.add_argument(
+        '-p', '--properties',
+        help='Property selections: DEPRECATED, please use --catalog instead')
+
+    parser.add_argument(
+        '--catalog',
+        help='Catalog file')
+
+    parser.add_argument(
+        '-d', '--discover',
+        action='store_true',
+        help='Do schema discovery')
+
+    parser.add_argument(
+        '--postgres_compatible',
+        action='store_true',
+        help='Tap-hubspot is compatible with a Postgres target.')
+
+    args = parser.parse_args()
+    if args.config:
+        args.config = utils.load_json(args.config)
+    if args.state:
+        args.state = utils.load_json(args.state)
+    else:
+        args.state = {}
+    if args.properties:
+        args.properties = utils.load_json(args.properties)
+    if args.catalog:
+        args.catalog = Catalog.load(args.catalog)
+
+    utils.check_config(args.config, required_config_keys)
+
+    return args
+
+def replace_na_with_none(obj):
+    '''Given a certain object, the function will replace any 'N/A' values with None.
+    E.g: object = {
+        "key1" : [{"subkey1": "value1"}, {"subkey2": "N/A"}],
+        "key2" : "n/a",
+        "key3" : {
+            "subkey3" : "n/a",
+            "subkey4" : "value2"
+        }
+    }
+    self.replace_na_with_none(object) will return:
+    {
+        "key1" : [{"subkey1": "value1"}, {"subkey2": None}],
+        "key2" : None,
+        "key3" : {
+            "subkey3" : None,
+            "subkey4" : "value2"
+        }
+    }
+    '''
+    if isinstance(obj, dict):
+        new_dict = {}
+        for key, value in obj.items():
+            new_dict[key] = replace_na_with_none(value)
+        return new_dict
+
+    if isinstance(obj, list):
+        new_list = []
+        for value in obj:
+            new_list.append(replace_na_with_none(value))
+        return new_list
+
+    if isinstance(obj, str):
+        if obj.lower() == 'n/a':
+            obj = None
+    return obj
+
 def get_start(state, tap_stream_id, bookmark_key):
     current_bookmark = singer.get_bookmark(state, tap_stream_id, bookmark_key)
     if current_bookmark is None:
@@ -106,7 +210,7 @@ def get_url(endpoint, **kwargs):
     return BASE_URL + ENDPOINTS[endpoint].format(**kwargs)
 
 
-def get_field_type_schema(field_type):
+def get_field_type_schema(field_type, postgres_compatible = False):
     if field_type == "bool":
         return {"type": ["null", "boolean"]}
 
@@ -117,63 +221,64 @@ def get_field_type_schema(field_type):
     elif field_type == "number":
         # A value like 'N/A' can be returned for this type,
         # so we have to let this be a string sometimes
-        return {"type": ["null", "number", "string"]}
-
+        if postgres_compatible:
+            return {"type": ["null", "number"]}
+        else:
+            return {"type": ["null", "number", "string"]}
     else:
         return {"type": ["null", "string"]}
 
-def get_field_schema(field_type, extras=False):
+def get_field_schema(field_type, postgres_compatible, extras=False):
     if extras:
         return {
             "type": "object",
             "properties": {
-                "value": get_field_type_schema(field_type),
-                "timestamp": get_field_type_schema("datetime"),
-                "source": get_field_type_schema("string"),
-                "sourceId": get_field_type_schema("string"),
+                "value": get_field_type_schema(field_type, postgres_compatible),
+                "timestamp": get_field_type_schema("datetime", postgres_compatible),
+                "source": get_field_type_schema("string", postgres_compatible),
+                "sourceId": get_field_type_schema("string", postgres_compatible),
             }
         }
     else:
         return {
             "type": "object",
            "properties": {
-                "value": get_field_type_schema(field_type),
+                "value": get_field_type_schema(field_type, postgres_compatible),
            }
        }
 
-def parse_custom_schema(entity_name, data):
+def parse_custom_schema(entity_name, data, postgres_compatible = False):
     return {
         field['name']: get_field_schema(
-            field['type'], entity_name != "contacts")
+            field['type'], postgres_compatible, entity_name != "contacts")
         for field in data
     }
 
-def get_custom_schema(entity_name):
-    return parse_custom_schema(entity_name, request(get_url(entity_name + "_properties")).json())
-
+def get_custom_schema(entity_name, postgres_compatible = False):
+    return parse_custom_schema(entity_name, request(get_url(entity_name + "_properties")).json(), postgres_compatible)
 
 def get_abs_path(path):
     return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)
 
-def load_associated_company_schema():
-    associated_company_schema = load_schema("companies")
+def load_associated_company_schema(postgres_compatible = False):
+    associated_company_schema = load_schema("companies", postgres_compatible)
     #pylint: disable=line-too-long
     associated_company_schema['properties']['company-id'] = associated_company_schema['properties'].pop('companyId')
     associated_company_schema['properties']['portal-id'] = associated_company_schema['properties'].pop('portalId')
     return associated_company_schema
 
-def load_schema(entity_name):
+def load_schema(entity_name, postgres_compatible = False):
     schema = utils.load_json(get_abs_path('schemas/{}.json'.format(entity_name)))
     if entity_name in ["contacts", "companies", "deals"]:
-        custom_schema = get_custom_schema(entity_name)
+        custom_schema = get_custom_schema(entity_name, postgres_compatible)
         schema['properties']['properties'] = {
             "type": "object",
             "properties": custom_schema,
         }
 
     if entity_name == "contacts":
-        schema['properties']['associated-company'] = load_associated_company_schema()
+        schema['properties']['associated-company'] = load_associated_company_schema(postgres_compatible)
 
     return schema
 
@@ -298,7 +403,7 @@ def gen_request(STATE, tap_stream_id, url, params, path, more_key, offset_keys,
 
     singer.write_state(STATE)
 
-def _sync_contact_vids(catalog, vids, schema, bumble_bee):
+def _sync_contact_vids(catalog, vids, schema, bumble_bee, postgres_compatible = False):
     if len(vids) == 0:
         return
 
@@ -307,6 +412,8 @@ def _sync_contact_vids(catalog, vids, schema, bumble_bee):
     mdata = metadata.to_map(catalog.get('metadata'))
 
     for record in data.values():
+        if postgres_compatible:
+            record = replace_na_with_none(record)
         record = bumble_bee.transform(record, schema, mdata)
         singer.write_record("contacts", record, catalog.get('stream_alias'), time_extracted=time_extracted)
 
@@ -316,14 +423,15 @@ def _sync_contact_vids(catalog, vids, schema, bumble_bee):
     'count': 100,
 }
 
-def sync_contacts(STATE, ctx):
+
+def sync_contacts(STATE, ctx, postgres_compatible = False):
     catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
     bookmark_key = 'versionTimestamp'
     start = utils.strptime_with_tz(get_start(STATE, "contacts", bookmark_key))
     LOGGER.info("sync_contacts from %s", start)
 
     max_bk_value = start
-    schema = load_schema("contacts")
+    schema = load_schema("contacts", postgres_compatible)
 
     singer.write_schema("contacts", schema, ["vid"], [bookmark_key], catalog.get('stream_alias'))
 
@@ -346,10 +454,10 @@ def sync_contacts(STATE, ctx):
                     max_bk_value = modified_time
 
                 if len(vids) == 100:
-                    _sync_contact_vids(catalog, vids, schema, bumble_bee)
+                    _sync_contact_vids(catalog, vids, schema, bumble_bee, postgres_compatible)
                     vids = []
 
-    _sync_contact_vids(catalog, vids, schema, bumble_bee)
+    _sync_contact_vids(catalog, vids, schema, bumble_bee, postgres_compatible)
 
     STATE = singer.write_bookmark(STATE, 'contacts', bookmark_key, utils.strftime(max_bk_value))
     singer.write_state(STATE)
@@ -366,8 +474,8 @@ def use_recent_companies_endpoint(response):
 default_contacts_by_company_params = {'count' : 100}
 
 # NB> to do: support stream aliasing and field selection
-def _sync_contacts_by_company(STATE, company_id):
-    schema = load_schema(CONTACTS_BY_COMPANY)
+def _sync_contacts_by_company(STATE, company_id, postgres_compatible = False):
+    schema = load_schema(CONTACTS_BY_COMPANY, postgres_compatible)
     url = get_url("contacts_by_company", company_id=company_id)
     path = 'vids'
 
@@ -378,6 +486,8 @@ def _sync_contacts_by_company(STATE, company_id):
                 counter.increment()
                 record = {'company-id' : company_id,
                           'contact-id' : row}
+                if postgres_compatible:
+                    record = replace_na_with_none(record)
                 record = bumble_bee.transform(record, schema)
                 singer.write_record("contacts_by_company", record, time_extracted=utils.now())
 
@@ -387,20 +497,20 @@ def _sync_contacts_by_company(STATE, company_id):
     'limit': 250,
     'properties': ["createdate", "hs_lastmodifieddate"]
 }
 
-def sync_companies(STATE, ctx):
+def sync_companies(STATE, ctx, postgres_compatible = False):
     catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
     mdata = metadata.to_map(catalog.get('metadata'))
     bumble_bee = Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING)
     bookmark_key = 'hs_lastmodifieddate'
     start = utils.strptime_with_tz(get_start(STATE, "companies", bookmark_key))
     LOGGER.info("sync_companies from %s", start)
-    schema = load_schema('companies')
+    schema = load_schema('companies', postgres_compatible)
     singer.write_schema("companies", schema, ["companyId"], [bookmark_key], catalog.get('stream_alias'))
     url = get_url("companies_all")
     max_bk_value = start
     if CONTACTS_BY_COMPANY in ctx.selected_stream_ids:
-        contacts_by_company_schema = load_schema(CONTACTS_BY_COMPANY)
+        contacts_by_company_schema = load_schema(CONTACTS_BY_COMPANY, postgres_compatible)
         singer.write_schema("contacts_by_company", contacts_by_company_schema, ["company-id", "contact-id"])
 
     with bumble_bee:
@@ -421,6 +531,8 @@ def sync_companies(STATE, ctx):
 
             if not modified_time or modified_time >= start:
                 record = request(get_url("companies_detail", company_id=row['companyId'])).json()
+                if postgres_compatible:
+                    record = replace_na_with_none(record)
                 record = bumble_bee.transform(record, schema, mdata)
                 singer.write_record("companies", record, catalog.get('stream_alias'), time_extracted=utils.now())
             if CONTACTS_BY_COMPANY in ctx.selected_stream_ids:
@@ -430,7 +542,7 @@ def sync_companies(STATE, ctx):
     singer.write_state(STATE)
     return STATE
 
-def sync_deals(STATE, ctx):
+def sync_deals(STATE, ctx, postgres_compatible = False):
     catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
     mdata = metadata.to_map(catalog.get('metadata'))
     bookmark_key = 'hs_lastmodifieddate'
@@ -442,7 +554,7 @@ def sync_deals(STATE, ctx):
               'includeAssociations': False,
              'properties' : []}
 
-    schema = load_schema("deals")
+    schema = load_schema("deals", postgres_compatible)
     singer.write_schema("deals", schema, ["dealId"], [bookmark_key], catalog.get('stream_alias'))
 
     # Check if we should include associations
@@ -474,6 +586,8 @@ def sync_deals(STATE, ctx):
                     max_bk_value = modified_time
 
                 if not modified_time or modified_time >= start:
+                    if postgres_compatible:
+                        row = replace_na_with_none(row)
                     record = bumble_bee.transform(row, schema, mdata)
                     singer.write_record("deals", record, catalog.get('stream_alias'), time_extracted=utils.now())
 
@@ -482,10 +596,10 @@ def sync_deals(STATE, ctx):
     return STATE
 
 #NB> no suitable bookmark is available: https://developers.hubspot.com/docs/methods/email/get_campaigns_by_id
-def sync_campaigns(STATE, ctx):
+def sync_campaigns(STATE, ctx, postgres_compatible = False):
     catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
     mdata = metadata.to_map(catalog.get('metadata'))
-    schema = load_schema("campaigns")
+    schema = load_schema("campaigns", postgres_compatible)
     singer.write_schema("campaigns", schema, ["id"], catalog.get('stream_alias'))
     LOGGER.info("sync_campaigns(NO bookmarks)")
     url = get_url("campaigns_all")
@@ -494,14 +608,16 @@ def sync_campaigns(STATE, ctx):
     with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
         for row in gen_request(STATE, 'campaigns', url, params, "campaigns", "hasMore", ["offset"], ["offset"]):
             record = request(get_url("campaigns_detail", campaign_id=row['id'])).json()
+            if postgres_compatible:
+                record = replace_na_with_none(record)
             record = bumble_bee.transform(record, schema, mdata)
             singer.write_record("campaigns", record, catalog.get('stream_alias'), time_extracted=utils.now())
 
     return STATE
 
-def sync_entity_chunked(STATE, catalog, entity_name, key_properties, path):
-    schema = load_schema(entity_name)
+def sync_entity_chunked(STATE, catalog, entity_name, key_properties, path, postgres_compatible = False):
+    schema = load_schema(entity_name, postgres_compatible)
     bookmark_key = 'startTimestamp'
 
     singer.write_schema(entity_name, schema, key_properties, [bookmark_key], catalog.get('stream_alias'))
@@ -535,6 +651,8 @@ def sync_entity_chunked(STATE, catalog, entity_name, key_properties, path):
 
                 for row in data[path]:
                     counter.increment()
+                    if postgres_compatible:
+                        row = replace_na_with_none(row)
                     record = bumble_bee.transform(row, schema, mdata)
                     singer.write_record(entity_name,
                                         record,
@@ -555,21 +673,21 @@ def sync_entity_chunked(STATE, catalog, entity_name, key_properties, path):
     singer.write_state(STATE)
     return STATE
 
-def sync_subscription_changes(STATE, ctx):
+def sync_subscription_changes(STATE, ctx, postgres_compatible = False):
     catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
     STATE = sync_entity_chunked(STATE, catalog, "subscription_changes", ["timestamp", "portalId", "recipient"],
-                                "timeline")
+                                "timeline", postgres_compatible)
     return STATE
 
-def sync_email_events(STATE, ctx):
+def sync_email_events(STATE, ctx, postgres_compatible = False):
     catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
-    STATE = sync_entity_chunked(STATE, catalog, "email_events", ["id"], "events")
+    STATE = sync_entity_chunked(STATE, catalog, "email_events", ["id"], "events", postgres_compatible)
     return STATE
 
-def sync_contact_lists(STATE, ctx):
+def sync_contact_lists(STATE, ctx, postgres_compatible = False):
     catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
     mdata = metadata.to_map(catalog.get('metadata'))
-    schema = load_schema("contact_lists")
+    schema = load_schema("contact_lists", postgres_compatible)
     bookmark_key = 'updatedAt'
 
     singer.write_schema("contact_lists", schema, ["listId"], [bookmark_key], catalog.get('stream_alias'))
@@ -582,8 +700,9 @@ def sync_contact_lists(STATE, ctx):
     params = {'count': 250}
     with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
         for row in gen_request(STATE, 'contact_lists', url, params, "lists", "has-more", ["offset"], ["offset"]):
+            if postgres_compatible:
+                row = replace_na_with_none(row)
             record = bumble_bee.transform(row, schema, mdata)
-
             if record[bookmark_key] >= start:
                 singer.write_record("contact_lists", record, catalog.get('stream_alias'), time_extracted=utils.now())
             if record[bookmark_key] >= max_bk_value:
@@ -594,10 +713,10 @@ def sync_contact_lists(STATE, ctx):
 
     return STATE
 
-def sync_forms(STATE, ctx):
+def sync_forms(STATE, ctx, postgres_compatible = False):
     catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
     mdata = metadata.to_map(catalog.get('metadata'))
-    schema = load_schema("forms")
+    schema = load_schema("forms", postgres_compatible)
     bookmark_key = 'updatedAt'
 
     singer.write_schema("forms", schema, ["guid"], [bookmark_key], catalog.get('stream_alias'))
@@ -611,6 +730,8 @@ def sync_forms(STATE, ctx):
 
     with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
         for row in data:
+            if postgres_compatible:
+                row = replace_na_with_none(row)
             record = bumble_bee.transform(row, schema, mdata)
 
             if record[bookmark_key] >= start:
@@ -623,10 +744,10 @@ def sync_forms(STATE, ctx):
 
     return STATE
 
-def sync_workflows(STATE, ctx):
+def sync_workflows(STATE, ctx, postgres_compatible = False):
     catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
     mdata = metadata.to_map(catalog.get('metadata'))
-    schema = load_schema("workflows")
+    schema = load_schema("workflows", postgres_compatible)
     bookmark_key = 'updatedAt'
     singer.write_schema("workflows", schema, ["id"], [bookmark_key], catalog.get('stream_alias'))
     start = get_start(STATE, "workflows", bookmark_key)
@@ -642,6 +763,8 @@ def sync_workflows(STATE, ctx):
 
     with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
         for row in data['workflows']:
+            if postgres_compatible:
+                row = replace_na_with_none(row)
             record = bumble_bee.transform(row, schema, mdata)
             if record[bookmark_key] >= start:
                 singer.write_record("workflows", record, catalog.get('stream_alias'), time_extracted=time_extracted)
@@ -652,10 +775,10 @@ def sync_workflows(STATE, ctx):
     singer.write_state(STATE)
     return STATE
 
-def sync_owners(STATE, ctx):
+def sync_owners(STATE, ctx, postgres_compatible = False):
     catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
     mdata = metadata.to_map(catalog.get('metadata'))
-    schema = load_schema("owners")
+    schema = load_schema("owners", postgres_compatible)
     bookmark_key = 'updatedAt'
 
     singer.write_schema("owners", schema, ["ownerId"], [bookmark_key], catalog.get('stream_alias'))
@@ -668,6 +791,8 @@ def sync_owners(STATE, ctx):
 
     with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
         for row in data:
+            if postgres_compatible:
+                row = replace_na_with_none(row)
             record = bumble_bee.transform(row, schema, mdata)
             if record[bookmark_key] >= max_bk_value:
                 max_bk_value = record[bookmark_key]
@@ -679,10 +804,10 @@ def sync_owners(STATE, ctx):
     singer.write_state(STATE)
     return STATE
 
-def sync_engagements(STATE, ctx):
+def sync_engagements(STATE, ctx, postgres_compatible = False):
     catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
     mdata = metadata.to_map(catalog.get('metadata'))
-    schema = load_schema("engagements")
+    schema = load_schema("engagements", postgres_compatible)
     bookmark_key = 'lastUpdated'
     singer.write_schema("engagements", schema, ["engagement_id"], [bookmark_key], catalog.get('stream_alias'))
     start = get_start(STATE, "engagements", bookmark_key)
@@ -701,6 +826,8 @@ def sync_engagements(STATE, ctx):
 
     with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
         for engagement in engagements:
+            if postgres_compatible:
+                engagement = replace_na_with_none(engagement)
             record = bumble_bee.transform(engagement, schema, mdata)
             if record['engagement'][bookmark_key] >= start:
                 # hoist PK and bookmark field to top-level record
@@ -714,15 +841,17 @@ def sync_engagements(STATE, ctx):
     singer.write_state(STATE)
     return STATE
 
-def sync_deal_pipelines(STATE, ctx):
+def sync_deal_pipelines(STATE, ctx, postgres_compatible = False):
     catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
     mdata = metadata.to_map(catalog.get('metadata'))
-    schema = load_schema('deal_pipelines')
+    schema = load_schema('deal_pipelines', postgres_compatible)
     singer.write_schema('deal_pipelines', schema, ['pipelineId'], catalog.get('stream_alias'))
     LOGGER.info('sync_deal_pipelines')
     data = request(get_url('deal_pipelines')).json()
     with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
         for row in data:
+            if postgres_compatible:
+                row = replace_na_with_none(row)
             record = bumble_bee.transform(row, schema, mdata)
             singer.write_record("deal_pipelines", record, catalog.get('stream_alias'), time_extracted=utils.now())
     singer.write_state(STATE)
@@ -777,7 +906,7 @@ def get_selected_streams(remaining_streams, annotated_schema):
     return selected_streams
 
 
-def do_sync(STATE, catalogs):
+def do_sync(STATE, catalogs, postgres_compatible = False):
     ctx = Context(catalogs)
     validate_dependencies(ctx)
 
@@ -791,7 +920,8 @@ def do_sync(STATE, catalogs):
         singer.write_state(STATE)
 
         try:
-            STATE = stream.sync(STATE, ctx) # pylint: disable=not-callable
+            STATE = make_postgres_comp_if_needed(stream, STATE, ctx, postgres_comp)
+            # stream.sync(STATE, ctx) # pylint: disable=not-callable
         except SourceUnavailableException as ex:
             error_message = str(ex).replace(CONFIG['access_token'], 10 * '*')
             LOGGER.error(error_message)
@@ -828,8 +958,8 @@ def validate_dependencies(ctx):
     if errs:
         raise DependencyException(" ".join(errs))
 
-def load_discovered_schema(stream):
-    schema = load_schema(stream.tap_stream_id)
+def load_discovered_schema(stream, postgres_compatible = False):
+    schema = load_schema(stream.tap_stream_id, postgres_compatible)
     mdata = metadata.new()
     mdata = metadata.write(mdata, (), 'table-key-properties', stream.key_properties)
 
@@ -850,11 +980,11 @@ def load_discovered_schema(stream):
 
     return schema, metadata.to_list(mdata)
 
-def discover_schemas():
+def discover_schemas(postgres_compatible = False):
     result = {'streams': []}
     for stream in STREAMS:
         LOGGER.info('Loading schema for %s', stream.tap_stream_id)
-        schema, mdata = load_discovered_schema(stream)
+        schema, mdata = load_discovered_schema(stream, postgres_compatible)
         result['streams'].append({'stream': stream.tap_stream_id,
                                   'tap_stream_id': stream.tap_stream_id,
                                   'schema': schema,
@@ -862,7 +992,7 @@ def discover_schemas():
     # Load the contacts_by_company schema
     LOGGER.info('Loading schema for contacts_by_company')
     contacts_by_company = Stream('contacts_by_company', _sync_contacts_by_company, ['company-id', 'contact-id'], None, 'FULL_TABLE')
-    schema, mdata = load_discovered_schema(contacts_by_company)
+    schema, mdata = load_discovered_schema(contacts_by_company, postgres_compatible)
 
     result['streams'].append({'stream': CONTACTS_BY_COMPANY,
                               'tap_stream_id': CONTACTS_BY_COMPANY,
@@ -871,12 +1001,12 @@ def discover_schemas():
 
     return result
 
-def do_discover():
+def do_discover(postgres_compatible = False):
     LOGGER.info('Loading schemas')
-    json.dump(discover_schemas(), sys.stdout, indent=4)
+    json.dump(discover_schemas(postgres_compatible = postgres_compatible), sys.stdout, indent=4)
 
 def main_impl():
-    args = utils.parse_args(
+    args = parse_args(
         ["redirect_uri",
          "client_id",
          "client_secret",
@@ -890,9 +1020,15 @@ def main_impl():
         STATE.update(args.state)
 
     if args.discover:
-        do_discover()
+        if args.postgres_compatible:
+            do_discover(True)
+        else:
+            do_discover()
     elif args.properties:
-        do_sync(STATE, args.properties)
+        if args.postgres_compatible:
+            do_sync(STATE, args.properties, True)
+        else:
+            do_sync(STATE, args.properties)
     else:
         LOGGER.info("No properties were selected")
 

From 3c675e1e02ad183c05e1251bc338cbf9c47f8aba Mon Sep 17 00:00:00 2001
From: Mirela Andreea Grigoras
Date: Thu, 21 Feb 2019 15:42:04 +0200
Subject: [PATCH 2/2] Fix name.

---
 tap_hubspot/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tap_hubspot/__init__.py b/tap_hubspot/__init__.py
index b2544b33..0d125522 100644
--- a/tap_hubspot/__init__.py
+++ b/tap_hubspot/__init__.py
@@ -920,7 +920,7 @@ def do_sync(STATE, catalogs, postgres_compatible = False):
         singer.write_state(STATE)
 
         try:
-            STATE = make_postgres_comp_if_needed(stream, STATE, ctx, postgres_comp)
+            STATE = make_postgres_comp_if_needed(stream, STATE, ctx, postgres_compatible)
             # stream.sync(STATE, ctx) # pylint: disable=not-callable
         except SourceUnavailableException as ex:
             error_message = str(ex).replace(CONFIG['access_token'], 10 * '*')