Skip to content

Commit

Permalink
Merge pull request #11 from dataiku/feature/sc-68121-implement-rfc5988-pagination
Browse files Browse the repository at this point in the history

[sc-68121] implement rfc5988 pagination
  • Loading branch information
alexbourret authored Nov 22, 2021
2 parents 0a785c3 + 518eefa commit bd4953c
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 37 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog


## [Version 1.0.3](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.0.3) - Bugfix and feature release - 2021-11-23

- Fixes error raised on HTTP 204 status codes
- Adds requests performance indicator to output datasets
- Data extraction key is replaced by a path
- Fixes JSON formatting issues
- Implements RFC5988 for pagination

## [Version 1.0.2](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.0.2) - Bugfix release - 2021-05-25

- Fixed recipe ignoring the selected http_method
Expand Down
14 changes: 14 additions & 0 deletions custom-recipes/api-connect/recipe.json
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,13 @@
"type": "BOOLEAN",
"defaultValue": false
},
{
"name": "display_metadata",
"label": "Display metadata",
"description": "Status code, request time...",
"type": "BOOLEAN",
"defaultValue": false
},
{
"name": "timeout",
"label": "Timeout (s)",
Expand All @@ -239,6 +246,13 @@
"description": "-1 for no limit",
"type": "INT",
"defaultValue": -1
},
{
"name": "maximum_number_rows",
"label": "Maximum number of rows",
"description": "-1 for no limit",
"type": "INT",
"defaultValue": -1
}
],
"resourceKeys": []
Expand Down
6 changes: 5 additions & 1 deletion custom-recipes/api-connect/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def get_partitioning_keys(id_list, dku_flow_variables):
raise ValueError("There is no parameter column selected.")
parameter_renamings = get_dku_key_values(config.get("parameter_renamings", {}))
custom_key_values = get_dku_key_values(config.get("custom_key_values", {}))
display_metadata = config.get("display_metadata", False)
maximum_number_rows = config.get("maximum_number_rows", -1)
input_parameters_dataset = dataiku.Dataset(input_A_names[0])
partitioning_keys = get_partitioning_keys(input_parameters_dataset, dku_flow_variables)
custom_key_values.update(partitioning_keys)
Expand All @@ -51,7 +53,9 @@ def get_partitioning_keys(id_list, dku_flow_variables):
endpoint_parameters,
extraction_key,
parameter_columns,
parameter_renamings
parameter_renamings,
display_metadata,
maximum_number_rows=maximum_number_rows
)
results = recipe_session.process_dataframe(input_parameters_dataframe, is_raw_output)

Expand Down
2 changes: 1 addition & 1 deletion plugin.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"id": "api-connect",
"version": "1.0.2",
"version": "1.0.3",
"meta": {
"label": "API Connect",
"description": "Retrieve data from any REST API",
Expand Down
18 changes: 16 additions & 2 deletions python-connectors/api-connect_dataset/connector.json
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@
},
{
"name": "extraction_key",
"label": "Key to data array (optional)",
"description": "",
"label": "Path to data array (optional)",
"description": "Dot separated key path",
"defaultValue": null,
"type": "STRING"
},
Expand Down Expand Up @@ -186,6 +186,13 @@
"type": "BOOLEAN",
"defaultValue": false
},
{
"name": "display_metadata",
"label": "Display metadata",
"description": "Status code, request time...",
"type": "BOOLEAN",
"defaultValue": false
},
{
"name": "timeout",
"label": "Timeout (s)",
Expand All @@ -199,6 +206,13 @@
"description": "-1 for no limit",
"type": "INT",
"defaultValue": -1
},
{
"name": "maximum_number_rows",
"label": "Maximum number of rows",
"description": "-1 for no limit",
"type": "INT",
"defaultValue": -1
}
]
}
50 changes: 31 additions & 19 deletions python-connectors/api-connect_dataset/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
from dataikuapi.utils import DataikuException
from safe_logger import SafeLogger
from rest_api_client import RestAPIClient
from dku_utils import get_dku_key_values, get_endpoint_parameters
from dku_utils import get_dku_key_values, get_endpoint_parameters, parse_keys_for_json, get_value_from_path
from dku_constants import DKUConstants
import json


logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"])

Expand All @@ -18,10 +21,11 @@ def __init__(self, config, plugin_config):
custom_key_values = get_dku_key_values(config.get("custom_key_values", {}))
self.client = RestAPIClient(credential, endpoint_parameters, custom_key_values)
extraction_key = endpoint_parameters.get("extraction_key", None)
if extraction_key == '':
extraction_key = None
self.extraction_key = extraction_key
self.extraction_key = extraction_key or ''
self.extraction_path = self.extraction_key.split('.')
self.raw_output = endpoint_parameters.get("raw_output", None)
self.maximum_number_rows = config.get("maximum_number_rows", -1)
self.display_metadata = config.get("display_metadata", False)

def get_read_schema(self):
# In this example, we don't specify a schema here, so DSS will infer the schema
Expand All @@ -30,29 +34,37 @@ def get_read_schema(self):

def generate_rows(self, dataset_schema=None, dataset_partitioning=None,
partition_id=None, records_limit=-1):
is_records_limit = records_limit > 0
is_records_limit = (records_limit > 0) or (self.maximum_number_rows > 0)
if self.maximum_number_rows > 0:
records_limit = self.maximum_number_rows
record_count = 0
while self.client.has_more_data():
json_response = self.client.paginated_api_call()
if self.extraction_key is None:
# Todo: check api_response key is free and add something overwise
if isinstance(json_response, list):
record_count += len(json_response)
for row in json_response:
yield {"api_response": row}
else:
record_count += 1
yield {"api_response": json_response}
metadata = self.client.get_metadata() if self.display_metadata else None
if self.extraction_key:
data = get_value_from_path(json_response, self.extraction_path)
else:
data = json_response.get(self.extraction_key, None)
if data is None:
raise DataikuException("Extraction key '{}' was not found in the incoming data".format(self.extraction_key))
data = json_response
if isinstance(data, list):
record_count += len(data)
for result in data:
yield {"api_response": result} if self.raw_output else result
for row in data:
yield self.format_output(row, metadata)
else:
record_count += 1
yield self.format_output(data, metadata)
if is_records_limit and record_count >= records_limit:
break

def format_output(self, item, metadata=None):
output = metadata or {}
if self.raw_output:
output.update({
DKUConstants.API_RESPONSE_KEY: json.dumps(item)
})
else:
output.update(parse_keys_for_json(item))
return output

def get_writer(self, dataset_schema=None, dataset_partitioning=None,
partition_id=None):
"""
Expand Down
1 change: 1 addition & 0 deletions python-lib/dku_constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
class DKUConstants(object):
    # Shared constants used across the plugin's recipes and connectors.
    # Column name under which the raw API response is stored in output rows.
    API_RESPONSE_KEY = "api_response"
    # Request body-format options (raw string body vs key/value form data),
    # consumed by rest_api_client when building POST/PUT requests.
    RAW_BODY_FORMAT = "RAW"
    FORM_DATA_BODY_FORMAT = "FORM_DATA"
29 changes: 28 additions & 1 deletion python-lib/dku_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import json
import copy


def get_dku_key_values(endpoint_query_string):
    """Convert a DSS key/value map (list of ``{"from": ..., "to": ...}``
    dicts) into a plain dict, skipping entries whose "from" key is empty
    or missing.
    """
    key_values = {}
    for key_value in endpoint_query_string:
        key = key_value.get("from")
        if key:
            key_values[key] = key_value.get("to")
    return key_values

Expand All @@ -19,7 +23,30 @@ def get_endpoint_parameters(configuration):
"requests_per_minute",
"pagination_type",
"next_page_url_key",
"top_key", "skip_key"
"top_key", "skip_key", "maximum_number_rows"
]
parameters = {endpoint_parameter: configuration.get(endpoint_parameter) for endpoint_parameter in endpoint_parameters if configuration.get(endpoint_parameter) is not None}
return parameters


def parse_keys_for_json(items):
    """Prepare a record dict for output as a flat row.

    Values that are dicts or lists are serialized to JSON strings, keys
    whose value is None are dropped, and all other values are kept as-is.

    :param items: dict of column name -> value
    :return: new dict with JSON-safe scalar values
    """
    ret = {}
    for key, value in items.items():
        if value is None:
            # Drop empty values instead of emitting null columns
            continue
        if isinstance(value, (dict, list)):
            ret[key] = json.dumps(value)
        else:
            ret[key] = value
    return ret


def get_value_from_path(dictionary, path):
    """Walk *path* (a list of keys) down into *dictionary* and return the value.

    The structure is deep-copied first so callers can freely mutate the
    returned value without affecting the original response.

    :param dictionary: nested dict to descend into
    :param path: list of keys, applied in order
    :return: the value found at the end of the path
    :raises ValueError: if any key along the path is missing, or an
        intermediate node is not a dict
    """
    ret = copy.deepcopy(dictionary)
    for key in path:
        # Check the node type *before* the membership test: `key in ret` on a
        # non-dict node (e.g. a string) can succeed via substring/element
        # containment, and the subsequent .get() would then raise
        # AttributeError instead of the intended ValueError.
        if isinstance(ret, dict) and key in ret:
            ret = ret.get(key)
        else:
            raise ValueError("The extraction path {} was not found in the incoming data".format(path))
    return ret
7 changes: 6 additions & 1 deletion python-lib/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,15 @@ def reset_paging(self, counting_key=None, url=None):
    def set_counting_key(self, counting_key):
        """Store *counting_key* for later pagination bookkeeping.

        NOTE(review): presumably the JSON key whose items are counted for
        skip/offset-style pagination — confirm against the pagination loop.
        """
        self.counting_key = counting_key

def update_next_page(self, data):
def update_next_page(self, data, response_links=None):
response_links = response_links or {}
next_link = response_links.get('next', {})
next_page_url = next_link.get("url")
self.is_first_batch = False
self.counter += 1
self.next_page_number = self.next_page_number + 1
if next_page_url:
self.next_page_url = next_page_url
if isinstance(data, list):
batch_size = len(data)
self.records_to_skip = self.records_to_skip + batch_size
Expand Down
20 changes: 19 additions & 1 deletion python-lib/rest_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def __init__(self, credential, endpoint, custom_key_values={}):
elif body_format in [DKUConstants.FORM_DATA_BODY_FORMAT]:
key_value_body = endpoint.get("key_value_body", {})
self.requests_kwargs.update({"json": get_dku_key_values(key_value_body)})
self.metadata = {}

def set_login(self, credential):
login_type = credential.get("login_type", "no_auth")
Expand Down Expand Up @@ -140,24 +141,32 @@ def request(self, method, url, can_raise_exeption=True, **kwargs):
if self.loop_detector.is_stuck_in_loop(url, kwargs.get("params", {}), kwargs.get("headers", {})):
raise RestAPIClientError("The api-connect plugin is stuck in a loop. Please check the pagination parameters.")
try:
request_start_time = time.time()
response = requests.request(method, url, **kwargs)
request_finish_time = time.time()
except Exception as err:
self.pagination.is_last_batch_empty = True
error_message = "Error: {}".format(err)
if can_raise_exeption:
raise RestAPIClientError(error_message)
else:
return {"error": error_message}
self.set_metadata("request_duration", request_finish_time - request_start_time)
self.time_last_request = time.time()
self.set_metadata("status_code", response.status_code)
self.set_metadata("response_headers", "{}".format(response.headers))
if response.status_code >= 400:
error_message = "Error {}: {}".format(response.status_code, response.content)
self.pagination.is_last_batch_empty = True
if can_raise_exeption:
raise RestAPIClientError(error_message)
else:
return {"error": error_message}
if response.status_code in [204]:
self.pagination.update_next_page({}, response.links)
return self.empty_json_response()
json_response = response.json()
self.pagination.update_next_page(json_response)
self.pagination.update_next_page(json_response, response.links)
return json_response

def paginated_api_call(self, can_raise_exeption=True):
Expand All @@ -167,6 +176,12 @@ def paginated_api_call(self, can_raise_exeption=True):
self.requests_kwargs.update({"params": params})
return self.request(self.http_method, self.pagination.get_next_page_url(), can_raise_exeption, **self.requests_kwargs)

def empty_json_response(self):
return {self.extraction_key: {}} if self.extraction_key else {}

def set_metadata(self, metadata_name, value):
self.metadata["dku_{}".format(metadata_name)] = value

@staticmethod
def get_params(endpoint_query_string, keywords):
templated_query_string = get_dku_key_values(endpoint_query_string)
Expand All @@ -191,3 +206,6 @@ def enforce_throttling(self):
if time_since_last_resquests < self.time_between_requests:
logger.info("Enforcing {}s throttling".format(self.time_between_requests - time_since_last_resquests))
time.sleep(self.time_between_requests - time_since_last_resquests)

    def get_metadata(self):
        """Return the ``dku_``-prefixed request metadata recorded by set_metadata.

        NOTE(review): returns the live dict, not a copy — callers that
        mutate the returned dict will affect this client's state.
        """
        return self.metadata
Loading

0 comments on commit bd4953c

Please sign in to comment.