From ae9a8a5a872cf6343d0a51a7cc2bb3ce0e957141 Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Wed, 13 Oct 2021 18:19:28 +0200 Subject: [PATCH 01/13] [sc-68121] implement rfc5988 pagination --- python-lib/pagination.py | 7 ++++++- python-lib/rest_api_client.py | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/python-lib/pagination.py b/python-lib/pagination.py index c2513d5..1d296f7 100644 --- a/python-lib/pagination.py +++ b/python-lib/pagination.py @@ -50,10 +50,15 @@ def reset_paging(self, counting_key=None, url=None): def set_counting_key(self, counting_key): self.counting_key = counting_key - def update_next_page(self, data): + def update_next_page(self, data, response_links=None): + response_links = response_links or {} + next_link = response_links.get('next', {}) + next_page_url = next_link.get("url") self.is_first_batch = False self.counter += 1 self.next_page_number = self.next_page_number + 1 + if next_page_url: + self.next_page_url = next_page_url if isinstance(data, list): batch_size = len(data) self.records_to_skip = self.records_to_skip + batch_size diff --git a/python-lib/rest_api_client.py b/python-lib/rest_api_client.py index 72f16a0..9b17d15 100644 --- a/python-lib/rest_api_client.py +++ b/python-lib/rest_api_client.py @@ -157,7 +157,7 @@ def request(self, method, url, can_raise_exeption=True, **kwargs): else: return {"error": error_message} json_response = response.json() - self.pagination.update_next_page(json_response) + self.pagination.update_next_page(json_response, response.links) return json_response def paginated_api_call(self, can_raise_exeption=True): From ee875e5c4ef708ddc0dcdc9990a0ee50d3bf2293 Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Thu, 14 Oct 2021 15:20:24 +0200 Subject: [PATCH 02/13] Add option for row nb limitation --- python-connectors/api-connect_dataset/connector.json | 7 +++++++ python-connectors/api-connect_dataset/connector.py | 5 ++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/python-connectors/api-connect_dataset/connector.json b/python-connectors/api-connect_dataset/connector.json index 48359c4..b12b577 100644 --- a/python-connectors/api-connect_dataset/connector.json +++ b/python-connectors/api-connect_dataset/connector.json @@ -199,6 +199,13 @@ "description": "-1 for no limit", "type": "INT", "defaultValue": -1 + }, + { + "name": "maximum_number_rows", + "label": "Maximum number of rows", + "description": "-1 for no limit", + "type": "INT", + "defaultValue": -1 } ] } diff --git a/python-connectors/api-connect_dataset/connector.py b/python-connectors/api-connect_dataset/connector.py index 76ebac5..e9e0fda 100644 --- a/python-connectors/api-connect_dataset/connector.py +++ b/python-connectors/api-connect_dataset/connector.py @@ -22,6 +22,7 @@ def __init__(self, config, plugin_config): extraction_key = None self.extraction_key = extraction_key self.raw_output = endpoint_parameters.get("raw_output", None) + self.maximum_number_rows = config.get("maximum_number_rows", -1) def get_read_schema(self): # In this example, we don't specify a schema here, so DSS will infer the schema @@ -30,7 +31,9 @@ def get_read_schema(self): def generate_rows(self, dataset_schema=None, dataset_partitioning=None, partition_id=None, records_limit=-1): - is_records_limit = records_limit > 0 + is_records_limit = (records_limit > 0) or (self.maximum_number_rows > 0) + if self.maximum_number_rows > 0: + records_limit = self.maximum_number_rows record_count = 0 while self.client.has_more_data(): json_response = self.client.paginated_api_call() From 9510c3473c15516f17fb1914c33e3151a7f746d2 Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Thu, 14 Oct 2021 15:37:06 +0200 Subject: [PATCH 03/13] Fix for [sc-72912] --- .../api-connect_dataset/connector.py | 29 +++++++++++++++---- python-lib/dku_constants.py | 1 + python-lib/dku_utils.py | 18 +++++++++++- 3 files changed, 41 insertions(+), 7 deletions(-) diff --git a/python-connectors/api-connect_dataset/connector.py b/python-connectors/api-connect_dataset/connector.py index e9e0fda..60b1915 100644 --- a/python-connectors/api-connect_dataset/connector.py +++ b/python-connectors/api-connect_dataset/connector.py @@ -2,7 +2,10 @@ from dataikuapi.utils import DataikuException from safe_logger import SafeLogger from rest_api_client import RestAPIClient -from dku_utils import get_dku_key_values, get_endpoint_parameters +from dku_utils import get_dku_key_values, get_endpoint_parameters, parse_keys_for_json +from dku_constants import DKUConstants +import json + logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"]) @@ -41,18 +44,32 @@ def generate_rows(self, dataset_schema=None, dataset_partitioning=None, # Todo: check api_response key is free and add something overwise if isinstance(json_response, list): record_count += len(json_response) - for row in json_response: - yield {"api_response": row} + if self.raw_output: + for row in json_response: + yield { + DKUConstants.API_RESPONSE_KEY: json.dumps(row) + } + else: + for row in json_response: + yield parse_keys_for_json(row) else: record_count += 1 - yield {"api_response": json_response} + yield { + DKUConstants.API_RESPONSE_KEY: json.dumps(json_response) + } else: data = json_response.get(self.extraction_key, None) if data is None: raise DataikuException("Extraction key '{}' was not found in the incoming data".format(self.extraction_key)) record_count += len(data) - for result in data: - yield {"api_response": result} if self.raw_output else result + if self.raw_output: + for result in data: + yield { + DKUConstants.API_RESPONSE_KEY: json.dumps(result) + } + else: + for result in data: + yield parse_keys_for_json(result) if is_records_limit and record_count >= records_limit: break diff --git a/python-lib/dku_constants.py b/python-lib/dku_constants.py index 7597cbd..7d62cf5 100644 --- a/python-lib/dku_constants.py +++ b/python-lib/dku_constants.py @@ -1,3 +1,4 @@ class DKUConstants(object): + API_RESPONSE_KEY = "api_response" RAW_BODY_FORMAT = "RAW" FORM_DATA_BODY_FORMAT = "FORM_DATA" diff --git a/python-lib/dku_utils.py b/python-lib/dku_utils.py index c7fee62..b85ae37 100644 --- a/python-lib/dku_utils.py +++ b/python-lib/dku_utils.py @@ -1,3 +1,6 @@ +import json + + def get_dku_key_values(endpoint_query_string): return {key_value.get("from"): key_value.get("to") for key_value in endpoint_query_string if key_value.get("from")} @@ -19,7 +22,20 @@ def get_endpoint_parameters(configuration): "requests_per_minute", "pagination_type", "next_page_url_key", - "top_key", "skip_key" + "top_key", "skip_key", "maximum_number_rows" ] parameters = {endpoint_parameter: configuration.get(endpoint_parameter) for endpoint_parameter in endpoint_parameters if configuration.get(endpoint_parameter) is not None} return parameters + + +def parse_keys_for_json(items): + ret = {} + for key in items: + value = items.get(key) + if isinstance(value, dict) or isinstance(value, list): + ret.update({key: json.dumps(value)}) + elif value is None: + continue + else: + ret.update({key: value}) + return ret From 9cfc4fb5d99e44f530925e88176ac18eccd2906a Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Tue, 19 Oct 2021 11:03:02 +0200 Subject: [PATCH 04/13] Fix for recipe and [sc-73058] [sc-69389] [sc-72911] --- custom-recipes/api-connect/recipe.json | 7 +++ custom-recipes/api-connect/recipe.py | 4 +- .../api-connect_dataset/connector.json | 7 +++ .../api-connect_dataset/connector.py | 56 ++++++++----------- python-lib/dku_utils.py | 10 ++++ python-lib/rest_api_client.py | 17 ++++++ python-lib/rest_api_recipe_session.py | 32 +++++++---- 7 files changed, 90 insertions(+), 43 deletions(-) diff --git a/custom-recipes/api-connect/recipe.json b/custom-recipes/api-connect/recipe.json index 1f08dcc..c5edc4b 100644 --- a/custom-recipes/api-connect/recipe.json +++ b/custom-recipes/api-connect/recipe.json @@ -226,6 +226,13 @@ "type": "BOOLEAN", "defaultValue": false }, + { + "name": "display_metadata", + "label": "Display metadata", + "description": "Status code, request time...", + "type": "BOOLEAN", + "defaultValue": false + }, { "name": "timeout", "label": "Timeout (s)", diff --git a/custom-recipes/api-connect/recipe.py b/custom-recipes/api-connect/recipe.py index 2c8d503..04756d7 100644 --- a/custom-recipes/api-connect/recipe.py +++ b/custom-recipes/api-connect/recipe.py @@ -40,6 +40,7 @@ def get_partitioning_keys(id_list, dku_flow_variables): raise ValueError("There is no parameter column selected.") parameter_renamings = get_dku_key_values(config.get("parameter_renamings", {})) custom_key_values = get_dku_key_values(config.get("custom_key_values", {})) +display_metadata = config.get("display_metadata", False) input_parameters_dataset = dataiku.Dataset(input_A_names[0]) partitioning_keys = get_partitioning_keys(input_parameters_dataset, dku_flow_variables) custom_key_values.update(partitioning_keys) @@ -51,7 +52,8 @@ def get_partitioning_keys(id_list, dku_flow_variables): endpoint_parameters, extraction_key, parameter_columns, - parameter_renamings + parameter_renamings, + display_metadata ) results = recipe_session.process_dataframe(input_parameters_dataframe, is_raw_output) diff --git a/python-connectors/api-connect_dataset/connector.json b/python-connectors/api-connect_dataset/connector.json index b12b577..ba8c7a0 100644 --- a/python-connectors/api-connect_dataset/connector.json +++ b/python-connectors/api-connect_dataset/connector.json @@ -186,6 +186,13 @@ "type": "BOOLEAN", "defaultValue": false }, + { + "name": "display_metadata", + "label": "Display metadata", + "description": "Status code, request time...", + "type": "BOOLEAN", + "defaultValue": false + }, { "name": "timeout", "label": "Timeout (s)", diff --git a/python-connectors/api-connect_dataset/connector.py b/python-connectors/api-connect_dataset/connector.py index 60b1915..a9fac21 100644 --- a/python-connectors/api-connect_dataset/connector.py +++ b/python-connectors/api-connect_dataset/connector.py @@ -2,7 +2,7 @@ from dataikuapi.utils import DataikuException from safe_logger import SafeLogger from rest_api_client import RestAPIClient -from dku_utils import get_dku_key_values, get_endpoint_parameters, parse_keys_for_json +from dku_utils import get_dku_key_values, get_endpoint_parameters, parse_keys_for_json, get_value_from_path from dku_constants import DKUConstants import json @@ -21,11 +21,11 @@ def __init__(self, config, plugin_config): custom_key_values = get_dku_key_values(config.get("custom_key_values", {})) self.client = RestAPIClient(credential, endpoint_parameters, custom_key_values) extraction_key = endpoint_parameters.get("extraction_key", None) - if extraction_key == '': - extraction_key = None - self.extraction_key = extraction_key + self.extraction_key = extraction_key or '' + self.extraction_path = self.extraction_key.split('.') self.raw_output = endpoint_parameters.get("raw_output", None) self.maximum_number_rows = config.get("maximum_number_rows", -1) + self.display_metadata = config.get("display_metadata", False) def get_read_schema(self): # In this example, we don't specify a schema here, so DSS will infer the schema @@ -40,39 +40,31 @@ def generate_rows(self, dataset_schema=None, dataset_partitioning=None, record_count = 0 while self.client.has_more_data(): json_response = self.client.paginated_api_call() - if self.extraction_key is None: - # Todo: check api_response key is free and add something overwise - if isinstance(json_response, list): - record_count += len(json_response) - if self.raw_output: - for row in json_response: - yield { - DKUConstants.API_RESPONSE_KEY: json.dumps(row) - } - else: - for row in json_response: - yield parse_keys_for_json(row) - else: - record_count += 1 - yield { - DKUConstants.API_RESPONSE_KEY: json.dumps(json_response) - } + metadata = self.client.get_metadata() if self.display_metadata else None + if self.extraction_key: + data = get_value_from_path(json_response, self.extraction_path) else: - data = json_response.get(self.extraction_key, None) - if data is None: - raise DataikuException("Extraction key '{}' was not found in the incoming data".format(self.extraction_key)) + data = json_response + if isinstance(data, list): record_count += len(data) - if self.raw_output: - for result in data: - yield { - DKUConstants.API_RESPONSE_KEY: json.dumps(result) - } - else: - for result in data: - yield parse_keys_for_json(result) + for row in data: + yield self.format_output(row, metadata) + else: + record_count += 1 + yield self.format_output(data, metadata) if is_records_limit and record_count >= records_limit: break + def format_output(self, item, metadata=None): + output = metadata or {} + if self.raw_output: + output.update({ + DKUConstants.API_RESPONSE_KEY: json.dumps(item) + }) + else: + output.update(parse_keys_for_json(item)) + return output + def get_writer(self, dataset_schema=None, dataset_partitioning=None, partition_id=None): """ diff --git a/python-lib/dku_utils.py b/python-lib/dku_utils.py index b85ae37..b165b3b 100644 --- a/python-lib/dku_utils.py +++ b/python-lib/dku_utils.py @@ -39,3 +39,13 @@ def parse_keys_for_json(items): else: ret.update({key: value}) return ret + + +def get_value_from_path(dictionary, path, default_reply=None): + ret = dictionary + for key in path: + if key in ret: + ret = ret.get(key) + else: + return default_reply + return ret diff --git a/python-lib/rest_api_client.py b/python-lib/rest_api_client.py index 9b17d15..a6bb785 100644 --- a/python-lib/rest_api_client.py +++ b/python-lib/rest_api_client.py @@ -107,6 +107,7 @@ def __init__(self, credential, endpoint, custom_key_values={}): elif body_format in [DKUConstants.FORM_DATA_BODY_FORMAT]: key_value_body = endpoint.get("key_value_body", {}) self.requests_kwargs.update({"json": get_dku_key_values(key_value_body)}) + self.metadata = {} def set_login(self, credential): login_type = credential.get("login_type", "no_auth") @@ -140,7 +141,10 @@ def request(self, method, url, can_raise_exeption=True, **kwargs): if self.loop_detector.is_stuck_in_loop(url, kwargs.get("params", {}), kwargs.get("headers", {})): raise RestAPIClientError("The api-connect plugin is stuck in a loop. Please check the pagination parameters.") try: + request_start_time = time.time() response = requests.request(method, url, **kwargs) + request_finish_time = time.time() + self.set_metadata("request_duration", request_finish_time - request_start_time) except Exception as err: self.pagination.is_last_batch_empty = True error_message = "Error: {}".format(err) @@ -149,6 +153,7 @@ def request(self, method, url, can_raise_exeption=True, **kwargs): else: return {"error": error_message} self.time_last_request = time.time() + self.set_metadata("status_code", response.status_code) if response.status_code >= 400: error_message = "Error {}: {}".format(response.status_code, response.content) self.pagination.is_last_batch_empty = True @@ -156,6 +161,9 @@ def request(self, method, url, can_raise_exeption=True, **kwargs): raise RestAPIClientError(error_message) else: return {"error": error_message} + if response.status_code in [204]: + self.pagination.update_next_page({}, response.links) + return self.empty_json_response() json_response = response.json() self.pagination.update_next_page(json_response, response.links) return json_response @@ -167,6 +175,12 @@ def paginated_api_call(self, can_raise_exeption=True): self.requests_kwargs.update({"params": params}) return self.request(self.http_method, self.pagination.get_next_page_url(), can_raise_exeption, **self.requests_kwargs) + def empty_json_response(self): + return {self.extraction_key: {}} if self.extraction_key else {} + + def set_metadata(self, metadata_name, value): + self.metadata["dku_{}".format(metadata_name)] = value + @staticmethod def get_params(endpoint_query_string, keywords): templated_query_string = get_dku_key_values(endpoint_query_string) @@ -191,3 +205,6 @@ def enforce_throttling(self): if time_since_last_resquests < self.time_between_requests: logger.info("Enforcing {}s throttling".format(self.time_between_requests - time_since_last_resquests)) time.sleep(self.time_between_requests - time_since_last_resquests) + + def get_metadata(self): + return self.metadata diff --git a/python-lib/rest_api_recipe_session.py b/python-lib/rest_api_recipe_session.py index 505d0a5..ad3d34f 100644 --- a/python-lib/rest_api_recipe_session.py +++ b/python-lib/rest_api_recipe_session.py @@ -1,13 +1,16 @@ from dataikuapi.utils import DataikuException from rest_api_client import RestAPIClient from safe_logger import SafeLogger +from dku_utils import parse_keys_for_json +from dku_constants import DKUConstants import copy +import json logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"]) class RestApiRecipeSession: - def __init__(self, custom_key_values, credential_parameters, endpoint_parameters, extraction_key, parameter_columns, parameter_renamings): + def __init__(self, custom_key_values, credential_parameters, endpoint_parameters, extraction_key, parameter_columns, parameter_renamings, display_metadata=False): self.custom_key_values = custom_key_values self.credential_parameters = credential_parameters self.endpoint_parameters = endpoint_parameters @@ -15,6 +18,7 @@ def __init__(self, custom_key_values, credential_parameters, endpoint_parameters self.client = None self.initial_parameter_columns = None self.column_to_parameter_dict = self.get_column_to_parameter_dict(parameter_columns, parameter_renamings) + self.display_metadata = display_metadata @staticmethod def get_column_to_parameter_dict(parameter_columns, parameter_renamings): @@ -51,37 +55,45 @@ def process_dataframe(self, input_parameters_dataframe, is_raw_output): def retrieve_next_page(self, is_raw_output): page_rows = [] - base_row = copy.deepcopy(self.initial_parameter_columns) logger.info("retrieve_next_page: Calling next page") json_response = self.client.paginated_api_call(can_raise_exeption=False) + metadata = self.client.get_metadata() if self.display_metadata else {} if self.extraction_key: data_rows = json_response.get(self.extraction_key, [json_response]) if data_rows is None: raise DataikuException("Extraction key '{}' was not found in the incoming data".format(self.extraction_key)) - page_rows.extend(self.format_page_rows(data_rows, is_raw_output)) + page_rows.extend(self.format_page_rows(data_rows, is_raw_output, metadata)) else: # Todo: check api_response key is free and add something overwise + base_row = metadata if is_raw_output: if is_error_message(json_response): - base_row.update(json_response) + base_row.update(parse_keys_for_json(json_response)) else: - base_row.update({"api_response": json_response}) + base_row.update({ + DKUConstants.API_RESPONSE_KEY: json.dumps(json_response) + }) else: - base_row.update(json_response) + base_row.update(parse_keys_for_json(json_response)) + base_row.update(self.initial_parameter_columns) page_rows.append(base_row) return page_rows - def format_page_rows(self, data_rows, is_raw_output): + def format_page_rows(self, data_rows, is_raw_output, metadata=None): page_rows = [] + metadata = metadata or {} for data_row in data_rows: base_row = copy.deepcopy(self.initial_parameter_columns) + base_row.update(metadata) if is_raw_output: if is_error_message(data_row): - base_row.update(data_row) + base_row.update(parse_keys_for_json(data_row)) else: - base_row.update({"api_response": data_row}) + base_row.update({ + DKUConstants.API_RESPONSE_KEY: json.dumps(data_row) + }) else: - base_row.update(data_row) + base_row.update(parse_keys_for_json(data_row)) page_rows.append(base_row) return page_rows From 46d58da8d17a86b737b6bccc8d55e628f035dc28 Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Wed, 20 Oct 2021 13:56:20 +0200 Subject: [PATCH 05/13] Add integration tests for json output, rfc5988, auth, recipe --- tests/python/integration/test_scenario.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/python/integration/test_scenario.py b/tests/python/integration/test_scenario.py index 836e2f9..2b299f0 100644 --- a/tests/python/integration/test_scenario.py +++ b/tests/python/integration/test_scenario.py @@ -9,3 +9,11 @@ def test_run_api_connect_authentication_modes(user_dss_clients): def test_run_api_connect_pagination_modes(user_dss_clients): dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id="PAGINATION") + + +def test_run_api_connect_recipes(user_dss_clients): + dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id="Recipes") + + +def test_run_api_connect_using_global_variable(user_dss_clients): + dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id="UsingGlobalVariable") From 69924dc5cc7e78ec6d5cda7b9f5b8b2f59e9c520 Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Wed, 3 Nov 2021 14:12:21 +0100 Subject: [PATCH 06/13] Fix for response_headers in metadata --- python-lib/rest_api_client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python-lib/rest_api_client.py b/python-lib/rest_api_client.py index a6bb785..8f68566 100644 --- a/python-lib/rest_api_client.py +++ b/python-lib/rest_api_client.py @@ -5,6 +5,7 @@ from loop_detector import LoopDetector from dku_utils import get_dku_key_values from dku_constants import DKUConstants +import json logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"]) @@ -144,7 +145,6 @@ def request(self, method, url, can_raise_exeption=True, **kwargs): request_start_time = time.time() response = requests.request(method, url, **kwargs) request_finish_time = time.time() - self.set_metadata("request_duration", request_finish_time - request_start_time) except Exception as err: self.pagination.is_last_batch_empty = True error_message = "Error: {}".format(err) @@ -152,8 +152,10 @@ def request(self, method, url, can_raise_exeption=True, **kwargs): raise RestAPIClientError(error_message) else: return {"error": error_message} + self.set_metadata("request_duration", request_finish_time - request_start_time) self.time_last_request = time.time() self.set_metadata("status_code", response.status_code) + self.set_metadata("response_headers", "{}".format(response.headers)) if response.status_code >= 400: error_message = "Error {}: {}".format(response.status_code, response.content) self.pagination.is_last_batch_empty = True From dd1770f60ef29746f2b470f1e13fd4a85077f2d5 Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Wed, 3 Nov 2021 17:46:03 +0100 Subject: [PATCH 07/13] Add row count to recipe, fix recipe when API returns arrays --- custom-recipes/api-connect/recipe.json | 7 +++++++ custom-recipes/api-connect/recipe.py | 4 +++- python-lib/rest_api_client.py | 1 - python-lib/rest_api_recipe_session.py | 29 +++++++++++++++++++++----- 4 files changed, 34 insertions(+), 7 deletions(-) diff --git a/custom-recipes/api-connect/recipe.json b/custom-recipes/api-connect/recipe.json index c5edc4b..febf6a7 100644 --- a/custom-recipes/api-connect/recipe.json +++ b/custom-recipes/api-connect/recipe.json @@ -246,6 +246,13 @@ "description": "-1 for no limit", "type": "INT", "defaultValue": -1 + }, + { + "name": "maximum_number_rows", + "label": "Maximum number of rows", + "description": "-1 for no limit", + "type": "INT", + "defaultValue": -1 } ], "resourceKeys": [] diff --git a/custom-recipes/api-connect/recipe.py b/custom-recipes/api-connect/recipe.py index 04756d7..49890c9 100644 --- a/custom-recipes/api-connect/recipe.py +++ b/custom-recipes/api-connect/recipe.py @@ -41,6 +41,7 @@ def get_partitioning_keys(id_list, dku_flow_variables): parameter_renamings = get_dku_key_values(config.get("parameter_renamings", {})) custom_key_values = get_dku_key_values(config.get("custom_key_values", {})) display_metadata = config.get("display_metadata", False) +maximum_number_rows = config.get("maximum_number_rows", -1) input_parameters_dataset = dataiku.Dataset(input_A_names[0]) partitioning_keys = get_partitioning_keys(input_parameters_dataset, dku_flow_variables) custom_key_values.update(partitioning_keys) @@ -53,7 +54,8 @@ def get_partitioning_keys(id_list, dku_flow_variables): extraction_key, parameter_columns, parameter_renamings, - display_metadata + display_metadata, + maximum_number_rows=maximum_number_rows ) results = recipe_session.process_dataframe(input_parameters_dataframe, is_raw_output) diff --git a/python-lib/rest_api_client.py b/python-lib/rest_api_client.py index 8f68566..de1d728 100644 --- a/python-lib/rest_api_client.py +++ b/python-lib/rest_api_client.py @@ -5,7 +5,6 @@ from loop_detector import LoopDetector from dku_utils import get_dku_key_values from dku_constants import DKUConstants -import json logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"]) diff --git a/python-lib/rest_api_recipe_session.py b/python-lib/rest_api_recipe_session.py index ad3d34f..af79151 100644 --- a/python-lib/rest_api_recipe_session.py +++ b/python-lib/rest_api_recipe_session.py @@ -10,7 +10,9 @@ class RestApiRecipeSession: - def __init__(self, custom_key_values, credential_parameters, endpoint_parameters, extraction_key, parameter_columns, parameter_renamings, display_metadata=False): + def __init__(self, custom_key_values, credential_parameters, endpoint_parameters, extraction_key, parameter_columns, parameter_renamings, + display_metadata=False, + maximum_number_rows=-1): self.custom_key_values = custom_key_values self.credential_parameters = credential_parameters self.endpoint_parameters = endpoint_parameters @@ -19,6 +21,8 @@ def __init__(self, custom_key_values, credential_parameters, endpoint_parameters self.initial_parameter_columns = None self.column_to_parameter_dict = self.get_column_to_parameter_dict(parameter_columns, parameter_renamings) self.display_metadata = display_metadata + self.maximum_number_rows = maximum_number_rows + self.is_row_limit = (self.maximum_number_rows > 0) @staticmethod def get_column_to_parameter_dict(parameter_columns, parameter_renamings): @@ -34,6 +38,7 @@ def process_dataframe(self, input_parameters_dataframe, is_raw_output): results = [] time_last_request = None for index, input_parameters_row in input_parameters_dataframe.iterrows(): + rows_count = 0 self.initial_parameter_columns = {} for column_name in self.column_to_parameter_dict: parameter_name = self.column_to_parameter_dict[column_name] @@ -50,6 +55,9 @@ def process_dataframe(self, input_parameters_dataframe, is_raw_output): while self.client.has_more_data(): page_results = self.retrieve_next_page(is_raw_output) results.extend(page_results) + rows_count += len(page_results) + if self.is_row_limit and rows_count >= self.maximum_number_rows: + break time_last_request = self.client.time_last_request return results @@ -58,6 +66,7 @@ def retrieve_next_page(self, is_raw_output): logger.info("retrieve_next_page: Calling next page") json_response = self.client.paginated_api_call(can_raise_exeption=False) metadata = self.client.get_metadata() if self.display_metadata else {} + is_api_returning_dict = True if self.extraction_key: data_rows = json_response.get(self.extraction_key, [json_response]) if data_rows is None: @@ -65,7 +74,7 @@ def retrieve_next_page(self, is_raw_output): page_rows.extend(self.format_page_rows(data_rows, is_raw_output, metadata)) else: # Todo: check api_response key is free and add something overwise - base_row = metadata + base_row = copy.deepcopy(metadata) if is_raw_output: if is_error_message(json_response): base_row.update(parse_keys_for_json(json_response)) @@ -74,9 +83,19 @@ def retrieve_next_page(self, is_raw_output): DKUConstants.API_RESPONSE_KEY: json.dumps(json_response) }) else: - base_row.update(parse_keys_for_json(json_response)) - base_row.update(self.initial_parameter_columns) - page_rows.append(base_row) + if isinstance(json_response, dict): + base_row.update(parse_keys_for_json(json_response)) + elif isinstance(json_response, list): + is_api_returning_dict = False + for row in json_response: + base_row = copy.deepcopy(metadata) + base_row.update(parse_keys_for_json(row)) + base_row.update(self.initial_parameter_columns) + page_rows.append(base_row) + + if is_api_returning_dict: + base_row.update(self.initial_parameter_columns) + page_rows.append(base_row) return page_rows def format_page_rows(self, data_rows, is_raw_output, metadata=None): From 42b97f28bde2a0e28551531dbf4346ff457edfd9 Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Tue, 9 Nov 2021 13:38:45 +0100 Subject: [PATCH 08/13] Adding test for APIs returning arrays --- tests/python/integration/test_scenario.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/python/integration/test_scenario.py b/tests/python/integration/test_scenario.py index 2b299f0..4e20be3 100644 --- a/tests/python/integration/test_scenario.py +++ b/tests/python/integration/test_scenario.py @@ -17,3 +17,7 @@ def test_run_api_connect_recipes(user_dss_clients): def test_run_api_connect_using_global_variable(user_dss_clients): dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id="UsingGlobalVariable") + + +def test_run_api_connect_array_api(user_dss_clients): + dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id="ArrayAPI") From 32b4394d90bb3400883a6909578cf50dbcbbcd23 Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Fri, 12 Nov 2021 11:39:27 +0100 Subject: [PATCH 09/13] Fix get_value_from_path --- python-lib/dku_utils.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python-lib/dku_utils.py b/python-lib/dku_utils.py index b165b3b..85884f3 100644 --- a/python-lib/dku_utils.py +++ b/python-lib/dku_utils.py @@ -1,4 +1,5 @@ import json +import copy def get_dku_key_values(endpoint_query_string): @@ -41,11 +42,11 @@ def parse_keys_for_json(items): return ret -def get_value_from_path(dictionary, path, default_reply=None): - ret = dictionary +def get_value_from_path(dictionary, path): + ret = copy.deepcopy(dictionary) for key in path: - if key in ret: + if key in ret and isinstance(ret, dict): ret = ret.get(key) else: - return default_reply + raise ValueError("The extraction path {} was not found in the incoming data".format(path)) return ret From 1ade91db217bae581c6caa81cd3109baf9a2a1b6 Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Fri, 12 Nov 2021 16:21:37 +0100 Subject: [PATCH 10/13] Change label key -> path --- python-connectors/api-connect_dataset/connector.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python-connectors/api-connect_dataset/connector.json b/python-connectors/api-connect_dataset/connector.json index ba8c7a0..d7b3f57 100644 --- a/python-connectors/api-connect_dataset/connector.json +++ b/python-connectors/api-connect_dataset/connector.json @@ -110,8 +110,8 @@ }, { "name": "extraction_key", - "label": "Key to data array (optional)", - "description": "", + "label": "Path to data array (optional)", + "description": "Dot separated key path", "defaultValue": null, "type": "STRING" }, From 79f8eb86774aca303b756a89f1bb22edfab4cc07 Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Fri, 12 Nov 2021 17:08:28 +0100 Subject: [PATCH 11/13] Add integration test for path search --- tests/python/integration/test_scenario.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/python/integration/test_scenario.py b/tests/python/integration/test_scenario.py index 4e20be3..4164da9 100644 --- a/tests/python/integration/test_scenario.py +++ b/tests/python/integration/test_scenario.py @@ -21,3 +21,7 @@ def test_run_api_connect_using_global_variable(user_dss_clients): def test_run_api_connect_array_api(user_dss_clients): dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id="ArrayAPI") + + +def test_run_api_connect_search_path(user_dss_clients): + dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id="SearchPath") From bdc67dd68ea4d4b4b4018ac230a867615559ea5c Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Mon, 22 Nov 2021 11:04:35 +0100 Subject: [PATCH 12/13] Version update --- plugin.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin.json b/plugin.json index 773faf6..40c088e 100644 --- a/plugin.json +++ b/plugin.json @@ -1,6 +1,6 @@ { "id": "api-connect", - "version": "1.0.2", + "version": "1.0.3", "meta": { "label": "API Connect", "description": "Retrieve data from any REST API", From 518eefa325d1949d4f69aface73afbf015ff5e6b Mon Sep 17 00:00:00 2001 From: Alexandre Bourret Date: Mon, 22 Nov 2021 11:05:02 +0100 Subject: [PATCH 13/13] Changelog update --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d6ac6b..12223d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,14 @@ # Changelog + +## [Version 1.0.3](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.0.3) - Bugfix and feature release - 2021-11-23 + +- Fixes error raised on HTTP 204 status codes +- Adds requests performance indicator to output datasets +- Data extraction key is replaced by a path +- Fixes JSON formatting issues +- Implements RFC5988 for pagination + ## [Version 1.0.2](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.0.2) - Bugfix release - 2021-05-25 - Fixed recipe ignoring the selected http_method