Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SimpleOffsetPaginator fix and Json Selector #59

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ plugins:
kind: string
- name: pagination_initial_offset
kind: integer
- name: offset_records_jsonpath
kind: string
- name: streams
kind: array
- name: name
Expand Down Expand Up @@ -158,6 +160,7 @@ provided at the top-level will be the default values for each stream.:
- `pagination_limit_per_page_param`: optional: The name of the param that indicates the limit/per_page. Defaults to None.
- `pagination_total_limit_param`: optional: The name of the param that indicates the total limit e.g. total, count. Defaults to total
- `pagination_initial_offset`: optional: The initial offset for the first request. Defaults to 1.
- `offset_records_jsonpath`: optional: a jsonpath string representing the path to the records. Defaults to `None`.
- `next_page_token_path`: optional: a jsonpath string representing the path to the "next page" token. Defaults to `'$.next_page'` for the `jsonpath_paginator` paginator only otherwise None.
- `streams`: required: a list of objects that contain the configuration of each stream. See stream-level params below.
- `path`: optional: see stream-level params below.
Expand Down Expand Up @@ -186,6 +189,7 @@ provided at the top-level will be the default values for each stream.:
- `oauth_extras`: optional: see authentication params below.
- `oauth_expiration_secs`: optional: see authentication params below.
- `aws_credentials`: optional: see authentication params below.
- `offset_records_jsonpath`: optional: see pagination params below.

#### Stream level config options.
Parameters that appear at the stream-level
Expand Down Expand Up @@ -327,6 +331,22 @@ There are additional request styles supported as follows for pagination.
- `next_page_token_path` - Use to locate an appropriate link in the response. Default `"hasMore"`.
- `simple_offset_paginator` - A paginator that uses `offset` and `limit` parameters to page through a collection of resources. Unlike `offset_paginator`, this paginator does not rely on any headers to determine whether it should keep paginating. Instead, it will continue paginating (by sending requests with increasing `offset`) until the API returns 0 results. You can use this paginator if the API returns a JSON array of records rather than a top-level object.
- `pagination_page_size` - Sets a limit to number of records per page / response. Default `25` records.
- `offset_records_jsonpath` - The JSONPath to the records in the response. Defaults to `None`. In the example below we would select the contacts array with `"offset_records_jsonpath": "$.contacts"`. Once the number of records doe not equal `pagination_page_size` the tap will stop paginating.

```json
{
"contacts": [
{
"id": 52,
"emailBlacklisted": false,
"smsBlacklisted": false,
"createdAt": "2024-09-24T01:00:00.000-00:00",
"modifiedAt": "2024-09-25T01:00:00.000-00:00",
}
],
"count": 256
}
```

### Additional Response Styles
There are additional response styles supported as follows.
Expand Down
17 changes: 16 additions & 1 deletion tap_rest_api_msdk/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,15 @@ def has_more(self, response: requests.Response):
class SimpleOffsetPaginator(BaseOffsetPaginator):
"""Simple Offset Paginator."""

def __init__(self, *args, pagination_page_size: int = 25, **kwargs):
def __init__(
self,
*args,
offset_records_jsonpath=None,
pagination_page_size: int = 25,
**kwargs
):
super().__init__(*args, **kwargs)
self._offset_records_jsonpath = offset_records_jsonpath
self._pagination_page_size = pagination_page_size

def has_more(self, response: requests.Response):
Expand All @@ -94,6 +101,14 @@ def has_more(self, response: requests.Response):
Whether there are more pages to fetch.

"""
if self._offset_records_jsonpath:
records_left = len(
next(
extract_jsonpath(self._offset_records_jsonpath, response.json()), 0
)
) # type: ignore
return records_left == self._pagination_page_size

return len(response.json()) == self._pagination_page_size


Expand Down
4 changes: 4 additions & 0 deletions tap_rest_api_msdk/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(
pagination_limit_per_page_param: Optional[str] = None,
pagination_total_limit_param: Optional[str] = None,
pagination_initial_offset: int = 1,
offset_records_jsonpath: Optional[str] = None,
start_date: Optional[datetime] = None,
source_search_field: Optional[str] = None,
source_search_query: Optional[str] = None,
Expand Down Expand Up @@ -169,6 +170,7 @@ def __init__(
self.source_search_query = source_search_query
self.pagination_page_size: Optional[int]
self.pagination_initial_offset = pagination_initial_offset
self.offset_records_jsonpath = offset_records_jsonpath

# Setting Pagination Limits
if self.pagination_request_style == "restapi_header_link_paginator":
Expand Down Expand Up @@ -329,6 +331,8 @@ def get_new_paginator(self):
return SimpleOffsetPaginator(
start_value=self.pagination_initial_offset,
page_size=self.pagination_page_size,
offset_records_jsonpath=self.offset_records_jsonpath,
pagination_page_size=self.pagination_page_size,
)
else:
self.logger.error(
Expand Down
13 changes: 13 additions & 0 deletions tap_rest_api_msdk/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,14 @@ class TapRestApiMsdk(Tap):
required=False,
description="The initial offset to start pagination from. Defaults to 1",
),
th.Property(
"offset_records_jsonpath",
th.StringType,
default=None,
required=False,
description="Optional jsonpath string representing the path in the results "
"Defaults to `None`.",
),
)

# add common properties to top-level properties
Expand Down Expand Up @@ -463,6 +471,10 @@ def discover_streams(self) -> List[DynamicStream]: # type: ignore
source_search_query = stream.get(
"source_search_query", self.config.get("source_search_query", "")
)
offset_records_jsonpath = stream.get(
"offset_records_jsonpath",
self.config.get("offset_records_jsonpath", None),
)

schema = {}
schema_config = stream.get("schema")
Expand Down Expand Up @@ -524,6 +536,7 @@ def discover_streams(self) -> List[DynamicStream]: # type: ignore
"pagination_initial_offset",
1,
),
offset_records_jsonpath=offset_records_jsonpath,
schema=schema,
start_date=start_date,
source_search_field=source_search_field,
Expand Down