Skip to content

Commit

Permalink
initial filtering commit
Browse files Browse the repository at this point in the history
  • Loading branch information
agatav committed Sep 9, 2024
1 parent 043ceee commit 262e325
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,15 @@
"client_id": "123456789_client_id_hubspot",
"client_secret": "123456789_client_secret_hubspot",
"refresh_token": "123456789_some_refresh_token"
}
}
},
"stream_filters": [
{
"stream_name": "contacts",
"filter_value": {
"propertyName": "city",
"operator": "EQ",
"value": "Madrid"
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ def get_common_params(self, config) -> Mapping[str, Any]:
api = self.get_api(config=config)
# Additional configuration is necessary for testing certain streams due to their specific restrictions.
acceptance_test_config = config.get("acceptance_test_config", {})

stream_filters = "stream_filters" in config and config["stream_filters"]
if stream_filters:
return dict(api=api, start_date=start_date, credentials=credentials, acceptance_test_config=acceptance_test_config, stream_filters=stream_filters)

return dict(api=api, start_date=start_date, credentials=credentials, acceptance_test_config=acceptance_test_config)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,29 @@ connectionSpecification:
description: If enabled then experimental streams become available for sync.
type: boolean
default: false
stream_filters:
title: Filters
description: Filters to apply to the data streams
type: array
items:
type: object
properties:
stream_name:
type: string
title: Stream Name
filter_value:
type: object
title: Filter expression
properties:
propertyName:
type: string
title: Property Name
operator:
type: string
title: Operator
value:
type: string
title: Value
advanced_auth:
auth_flow_type: oauth2.0
predicate_key:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1106,11 +1106,20 @@ def url(self):
def __init__(
self,
include_archived_only: bool = False,
stream_filters: Mapping[str, Any] = None,
**kwargs,
):
super().__init__(**kwargs)
self._state = None
self._include_archived_only = include_archived_only
self._stream_filter = None

# Filter for records
if stream_filters is None:
stream_filters = {}
for filter in stream_filters:
if filter["stream_name"] == self.name:
self._stream_filter = filter["filter_value"]

@retry_connection_handler(max_tries=5, factor=5)
@retry_after_handler(fixed_retry_after=1, max_tries=3)
Expand Down Expand Up @@ -1141,6 +1150,18 @@ def _process_search(
if self.state
else {}
)

if self._stream_filter:
for filter_item in self._stream_filter:
filter_value = filter_item.get("filter_value", {})
if "propertyName" in filter_value and "operator" in filter_value and "value" in filter_value:
payload["filters"].append({
"propertyName": filter_value["propertyName"],
"operator": filter_value["operator"],
"value": filter_value["value"],
})
logger.info(f"I've got PAYLOAD: {payload}")

if next_page_token:
payload.update(next_page_token["payload"])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ def config_eperimantal_fixture():
}


@pytest.fixture(name="config_with_filters")
def config_with_filters_fixture():
return {
"start_date": "2021-01-10T00:00:00Z",
"credentials": {"credentials_title": "Private App Credentials", "access_token": "test_access_token"},
"enable_experimental_streams": True,
"stream_filters": [{"stream_name": "contacts", "filter_value": {"propertyName": "city","operator": "EQ","value": "Madrid"}}],
}


@pytest.fixture(name="config_invalid_date")
def config_invalid_date_fixture():
return {
Expand Down

0 comments on commit 262e325

Please sign in to comment.