Skip to content

Commit

Permalink
feat: Renest for dynamic properties support (#59)
Browse files Browse the repository at this point in the history
* Revert "feat: de-nest properties (#57)"

This reverts commit 67066c7.

* add all available properties

* properties key to generic object

* fix only call properties when available

* not all streams with properties to use dynamic prop logic

* revert dynamic props for a few streams

* add missing property

* dont skip untyped object contents

* dynamic properties

* syntax error

* dont type since the data is wrong sometimes
  • Loading branch information
pnadolny13 authored Dec 4, 2023
1 parent 67066c7 commit 578b9e9
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 220 deletions.
70 changes: 54 additions & 16 deletions tap_hubspot/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
from __future__ import annotations

import sys
from pathlib import Path
from typing import Any, Callable, Iterable
from typing import Any, Callable

import requests
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.pagination import BaseAPIPaginator
from singer_sdk.streams import RESTStream
from singer_sdk import typing as th

if sys.version_info >= (3, 8):
from functools import cached_property
Expand Down Expand Up @@ -124,22 +123,61 @@ def get_url_params(

return params

def post_process(
class DynamicHubspotStream(HubspotStream):
"""DynamicHubspotStream"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def _get_datatype(self, data_type: str) -> th.JSONTypeHelper:
# TODO: consider typing more precisely
return th.StringType()

@cached_property
def schema(self) -> dict:
"""Return a draft JSON schema for this stream."""
hs_props = []
self.hs_properties = self._get_available_properties()
for name, type in self.hs_properties.items():
hs_props.append(
th.Property(name, self._get_datatype(type))
)
return th.PropertiesList(
th.Property("id", th.StringType),
th.Property(
"properties", th.ObjectType(*hs_props),
),
th.Property("createdAt", th.StringType),
th.Property("updatedAt", th.StringType),
th.Property("archived", th.BooleanType),
).to_dict()

def _get_available_properties(self) -> dict[str, str]:
session = requests.Session()
session.auth = self.authenticator

resp = session.get(
f"https://api.hubapi.com/crm/v3/properties/{self.name}",
)
resp.raise_for_status()
results = resp.json().get("results", [])
return {prop["name"]: prop["type"] for prop in results}

def get_url_params(
self,
row: dict,
context: dict | None = None, # noqa: ARG002
) -> dict | None:
"""Post process row.
context: dict | None,
next_page_token: Any | None,
) -> dict[str, Any]:
"""Return a dictionary of values to be used in URL parameterization.
Args:
row: Row from response.
context: Stream sync context.
context: The stream context.
next_page_token: The next page index or value.
Returns:
Transformed row.
A dictionary of URL query parameters.
"""
if "properties" in row:
properties = row.pop("properties")
for key, value in properties.items():
row[key] = value
return row
params = super().get_url_params(context, next_page_token)
if self.hs_properties:
params["properties"] = ",".join(self.hs_properties)
return params
Loading

0 comments on commit 578b9e9

Please sign in to comment.