Skip to content

Commit

Permalink
Merge pull request #35 from dataiku/release/1.2.0
Browse files Browse the repository at this point in the history
Release/1.2.0
  • Loading branch information
alexbourret authored Oct 12, 2023
2 parents cc74416 + 44b3d40 commit cde8a66
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 30 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# Changelog

## [Version 1.2.0](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.2.0) - Feature and bugfix release - 2023-05-31

- Add Brotli compression
- Faster recurring calls
- dku_error column kept at all times in API-Connect recipe output schema
- Updated code-env descriptor for DSS 12

## [Version 1.1.4](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.1.4) - Feature and bugfix release - 2023-02-28

- Add Brotli compression
- Faster recurring calls
- dku_error column kept at all times in API-Connect recipe output schema

## [Version 1.1.3](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.1.3) - Bugfix release - 2023-04-18

- Updated code-env descriptor for DSS 12
Expand Down
2 changes: 2 additions & 0 deletions code-env/python/spec/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
jsonpath-ng==1.5.3
requests_ntlm==1.1.0
requests==2.26.0
Brotli==1.0.9
13 changes: 13 additions & 0 deletions custom-recipes/api-connect/recipe.json
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,19 @@
"type": "SEPARATOR",
"label": "Advanced"
},
{
"name": "behaviour_when_error",
"label": "Error behaviour",
"description": "Decide how the recipe should react when an input line results in an error",
"type": "SELECT",
"defaultValue": "keep-error-column",
"selectChoices":[
{"value": "ignore", "label": "Ignore the error"},
{"value": "add-error-column", "label": "Add an error column"},
{"value": "keep-error-column", "label": "Error column always in schema"},
{"value": "raise", "label": "Fail the job"}
]
},
{
"name": "ignore_ssl_check",
"label": "Ignore SSL check",
Expand Down
9 changes: 6 additions & 3 deletions custom-recipes/api-connect/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from safe_logger import SafeLogger
from dku_utils import get_dku_key_values, get_endpoint_parameters
from rest_api_recipe_session import RestApiRecipeSession
from dku_constants import DKUConstants


logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"])
logger = SafeLogger("api-connect plugin", forbidden_keys=DKUConstants.FORBIDDEN_KEYS)


def get_partitioning_keys(id_list, dku_flow_variables):
Expand All @@ -25,7 +26,7 @@ def get_partitioning_keys(id_list, dku_flow_variables):
return partitioning_keys


logger.info('API-Connect plugin recipe v1.1.3')
logger.info('API-Connect plugin recipe v{}'.format(DKUConstants.PLUGIN_VERSION))

input_A_names = get_input_names_for_role('input_A_role')
config = get_recipe_config()
Expand All @@ -34,6 +35,7 @@ def get_partitioning_keys(id_list, dku_flow_variables):
logger.info("config={}".format(logger.filter_secrets(config)))

credential_parameters = config.get("credential", {})
behaviour_when_error = config.get("behaviour_when_error", "add-error-column")
endpoint_parameters = get_endpoint_parameters(config)
extraction_key = endpoint_parameters.get("extraction_key", "")
is_raw_output = endpoint_parameters.get("raw_output", True)
Expand All @@ -57,7 +59,8 @@ def get_partitioning_keys(id_list, dku_flow_variables):
parameter_columns,
parameter_renamings,
display_metadata,
maximum_number_rows=maximum_number_rows
maximum_number_rows=maximum_number_rows,
behaviour_when_error=behaviour_when_error
)
results = recipe_session.process_dataframe(input_parameters_dataframe, is_raw_output)

Expand Down
2 changes: 1 addition & 1 deletion plugin.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"id": "api-connect",
"version": "1.1.3",
"version": "1.2.0",
"meta": {
"label": "API Connect",
"description": "Retrieve data from any REST API",
Expand Down
4 changes: 2 additions & 2 deletions python-connectors/api-connect_dataset/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
import json


logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"])
logger = SafeLogger("api-connect plugin", forbidden_keys=DKUConstants.FORBIDDEN_KEYS)


class RestAPIConnector(Connector):

def __init__(self, config, plugin_config):
Connector.__init__(self, config, plugin_config) # pass the parameters to the base class
logger.info('API-Connect plugin connector v1.1.3')
logger.info('API-Connect plugin connector v{}'.format(DKUConstants.PLUGIN_VERSION))
logger.info("config={}".format(logger.filter_secrets(config)))
endpoint_parameters = get_endpoint_parameters(config)
credential = config.get("credential", {})
Expand Down
5 changes: 4 additions & 1 deletion python-lib/dku_constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
class DKUConstants(object):
API_RESPONSE_KEY = "api_response"
RAW_BODY_FORMAT = "RAW"
FORBIDDEN_KEYS = ["token", "password", "api_key_value"]
FORM_DATA_BODY_FORMAT = "FORM_DATA"
PLUGIN_VERSION = "1.2.0"
RAW_BODY_FORMAT = "RAW"
REPONSE_ERROR_KEY = "dku_error"
34 changes: 23 additions & 11 deletions python-lib/rest_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dku_constants import DKUConstants


logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"])
logger = SafeLogger("api-connect plugin", forbidden_keys=DKUConstants.FORBIDDEN_KEYS)


class RestAPIClientError(ValueError):
Expand All @@ -17,7 +17,7 @@ class RestAPIClientError(ValueError):

class RestAPIClient(object):

def __init__(self, credential, endpoint, custom_key_values={}):
def __init__(self, credential, endpoint, custom_key_values={}, session=None, behaviour_when_error=None):
logger.info("Initialising RestAPIClient, credential={}, endpoint={}".format(logger.filter_secrets(credential), endpoint))

# presets_variables contains all variables available in templates using the {{variable_name}} notation
Expand Down Expand Up @@ -57,6 +57,7 @@ def __init__(self, credential, endpoint, custom_key_values={}):
self.timeout = endpoint.get("timeout", -1)
if self.timeout > 0:
self.requests_kwargs.update({"timeout": self.timeout})
self.behaviour_when_error = behaviour_when_error or "add-error-column"

self.requests_kwargs.update({"params": self.params})
self.pagination = Pagination()
Expand Down Expand Up @@ -90,7 +91,10 @@ def __init__(self, credential, endpoint, custom_key_values={}):
key_value_body = endpoint.get("key_value_body", {})
self.requests_kwargs.update({"json": get_dku_key_values(key_value_body)})
self.metadata = {}
if self.behaviour_when_error == "keep-error-column":
self.metadata = {DKUConstants.REPONSE_ERROR_KEY: None}
self.call_number = 0
self.session = session or requests.Session()

def set_login(self, credential):
login_type = credential.get("login_type", "no_auth")
Expand Down Expand Up @@ -131,26 +135,34 @@ def request(self, method, url, can_raise_exeption=True, **kwargs):
raise RestAPIClientError("The api-connect plugin is stuck in a loop. Please check the pagination parameters.")
request_start_time = time.time()
self.time_last_request = request_start_time
error_message = None
status_code = None
response_headers = None
try:
response = self.request_with_redirect_retry(method, url, **kwargs)
request_finish_time = time.time()
status_code = response.status_code
response_headers = response.headers
except Exception as err:
self.pagination.is_last_batch_empty = True
error_message = "Error: {}".format(err)
if can_raise_exeption:
raise RestAPIClientError(error_message)
else:
return {"error": error_message}

request_finish_time = time.time()
self.set_metadata("request_duration", request_finish_time - request_start_time)
self.set_metadata("status_code", response.status_code)
self.set_metadata("response_headers", "{}".format(response.headers))
self.set_metadata("status_code", status_code)
self.set_metadata("response_headers", "{}".format(response_headers))

if error_message:
return {} if self.behaviour_when_error=="ignore" else {DKUConstants.REPONSE_ERROR_KEY: error_message}

if response.status_code >= 400:
error_message = "Error {}: {}".format(response.status_code, response.content)
self.pagination.is_last_batch_empty = True
if can_raise_exeption:
raise RestAPIClientError(error_message)
else:
return {"error": error_message}
return {} if self.behaviour_when_error=="ignore" else {DKUConstants.REPONSE_ERROR_KEY: error_message}
if response.status_code in [204]:
self.pagination.update_next_page({}, response.links)
return self.empty_json_response()
Expand All @@ -163,20 +175,20 @@ def request(self, method, url, can_raise_exeption=True, **kwargs):
logger.error("response.content={}".format(response.content))
if can_raise_exeption:
raise RestAPIClientError("The API did not return JSON as expected. {}".format(error_message))
return {"error": error_message}
return {} if self.behaviour_when_error=="ignore" else {DKUConstants.REPONSE_ERROR_KEY: error_message}

self.pagination.update_next_page(json_response, response.links)
return json_response

def request_with_redirect_retry(self, method, url, **kwargs):
# In case of redirection to another domain, the authorization header is not kept
# If redirect_auth_header is true, another attempt is made with initial headers to the redirected url
response = requests.request(method, url, **kwargs)
response = self.session.request(method, url, **kwargs)
if self.redirect_auth_header and not response.url.startswith(url):
redirection_kwargs = copy.deepcopy(kwargs)
redirection_kwargs.pop("params", None) # params are contained in the redirected url
logger.warning("Redirection ! Accessing endpoint {} with initial authorization headers".format(response.url))
response = requests.request(method, response.url, **redirection_kwargs)
response = self.session.request(method, response.url, **redirection_kwargs)
return response

def paginated_api_call(self, can_raise_exeption=True):
Expand Down
38 changes: 29 additions & 9 deletions python-lib/rest_api_recipe_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
from dku_constants import DKUConstants
import copy
import json
import requests

logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"])

logger = SafeLogger("api-connect plugin", forbidden_keys=DKUConstants.FORBIDDEN_KEYS)


class RestApiRecipeSession:
def __init__(self, custom_key_values, credential_parameters, endpoint_parameters, extraction_key, parameter_columns, parameter_renamings,
display_metadata=False,
maximum_number_rows=-1):
maximum_number_rows=-1, behaviour_when_error=None):
self.custom_key_values = custom_key_values
self.credential_parameters = credential_parameters
self.endpoint_parameters = endpoint_parameters
Expand All @@ -23,7 +25,8 @@ def __init__(self, custom_key_values, credential_parameters, endpoint_parameters
self.display_metadata = display_metadata
self.maximum_number_rows = maximum_number_rows
self.is_row_limit = (self.maximum_number_rows > 0)
self.can_raise = False
self.behaviour_when_error = behaviour_when_error or "add-error-column"
self.can_raise = self.behaviour_when_error == "raise"

@staticmethod
def get_column_to_parameter_dict(parameter_columns, parameter_renamings):
Expand All @@ -38,6 +41,7 @@ def get_column_to_parameter_dict(parameter_columns, parameter_renamings):
def process_dataframe(self, input_parameters_dataframe, is_raw_output):
results = []
time_last_request = None
session = requests.Session()
for index, input_parameters_row in input_parameters_dataframe.iterrows():
rows_count = 0
self.initial_parameter_columns = {}
Expand All @@ -52,7 +56,13 @@ def process_dataframe(self, input_parameters_dataframe, is_raw_output):
updated_endpoint_parameters,
self.custom_key_values
))
self.client = RestAPIClient(self.credential_parameters, updated_endpoint_parameters, custom_key_values=self.custom_key_values)
self.client = RestAPIClient(
self.credential_parameters,
updated_endpoint_parameters,
custom_key_values=self.custom_key_values,
session=session,
behaviour_when_error=self.behaviour_when_error
)
self.client.time_last_request = time_last_request
while self.client.has_more_data():
page_results = self.retrieve_next_page(is_raw_output)
Expand All @@ -66,17 +76,24 @@ def process_dataframe(self, input_parameters_dataframe, is_raw_output):
def retrieve_next_page(self, is_raw_output):
page_rows = []
logger.info("retrieve_next_page: Calling next page")
json_response = self.client.paginated_api_call(can_raise_exeption=False)
metadata = self.client.get_metadata() if self.display_metadata else {}
json_response = self.client.paginated_api_call(can_raise_exeption=self.can_raise)
default_dict = {
DKUConstants.REPONSE_ERROR_KEY: json_response.get(DKUConstants.REPONSE_ERROR_KEY, None)
} if self.behaviour_when_error == "keep-error-column" else {}
metadata = self.client.get_metadata() if self.display_metadata else default_dict
is_api_returning_dict = True
if self.extraction_key:
data_rows = get_value_from_path(json_response, self.extraction_key.split("."), can_raise=False)
if data_rows is None:
if data_rows is None or type(data_rows) != list:
if self.behaviour_when_error == "ignore":
return []
error_message = "Extraction key '{}' was not found in the incoming data".format(self.extraction_key)
if self.can_raise:
raise DataikuException(error_message)
elif DKUConstants.REPONSE_ERROR_KEY in metadata:
return [metadata]
else:
return [{"error": error_message}]
return self.format_page_rows([{DKUConstants.REPONSE_ERROR_KEY: error_message}], is_raw_output, metadata)
page_rows.extend(self.format_page_rows(data_rows, is_raw_output, metadata))
else:
# TODO: check that the api_response key is free, and add something otherwise
Expand Down Expand Up @@ -112,6 +129,9 @@ def format_page_rows(self, data_rows, is_raw_output, metadata=None):
base_row.update(metadata)
if is_raw_output:
if is_error_message(data_row):
base_row.update({
DKUConstants.API_RESPONSE_KEY: None
})
base_row.update(parse_keys_for_json(data_row))
else:
base_row.update({
Expand All @@ -126,7 +146,7 @@ def format_page_rows(self, data_rows, is_raw_output, metadata=None):
def is_error_message(jsons_response):
if type(jsons_response) not in [dict, list]:
return False
if "error" in jsons_response and len(jsons_response) == 1:
if DKUConstants.REPONSE_ERROR_KEY in jsons_response and len(jsons_response) == 1:
return True
else:
return False
6 changes: 3 additions & 3 deletions python-lib/safe_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@


class SafeLogger(object):
def __init__(self, name, forbiden_keys=None):
def __init__(self, name, forbidden_keys=None):
self.name = name
self.logger = logging.getLogger(self.name)
logging.basicConfig(
level=logging.INFO,
format='{} %(levelname)s - %(message)s'.format(self.name)
)
self.forbiden_keys = forbiden_keys
self.forbidden_keys = forbidden_keys

def info(self, message):
self.logger.info(message)
Expand All @@ -33,7 +33,7 @@ def dig_secrets(self, dictionary):
for key in dictionary:
if isinstance(dictionary[key], dict):
dictionary[key] = self.filter_secrets(dictionary[key])
if key in self.forbiden_keys:
if key in self.forbidden_keys:
dictionary[key] = hash(dictionary[key])
return dictionary

Expand Down

0 comments on commit cde8a66

Please sign in to comment.