Skip to content

Commit

Permalink
Dev to master for 0.0.46 release (#292)
Browse files Browse the repository at this point in the history
* Add doc on cp that is not implemented

* Fix response not none

* Add per request timeout capability

* Concat shouldn't retry

* Update history and version
  • Loading branch information
akharit authored Jun 27, 2019
1 parent 3130ccd commit 33d96ba
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 11 deletions.
6 changes: 6 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
Release History
===============

0.0.46 (2019-06-25)
+++++++++++++++++++
* Expose a per-request timeout. Defaults to 60 seconds.
* Concat will not retry by default.
* Bug fixes.

0.0.45 (2019-05-10)
+++++++++++++++++++
* Update open and close ADLFile semantics
Expand Down
2 changes: 1 addition & 1 deletion azure/datalake/store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# license information.
# --------------------------------------------------------------------------

__version__ = "0.0.45"
__version__ = "0.0.46"

from .core import AzureDLFileSystem
from .multithread import ADLDownloader
Expand Down
17 changes: 11 additions & 6 deletions azure/datalake/store/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from .lib import DatalakeRESTInterface
from .utils import ensure_writable, read_block
from .enums import ExpiryOptionType
from .retry import ExponentialRetryPolicy
from .retry import ExponentialRetryPolicy, NoRetryPolicy
from .multiprocessor import multi_processor_change_acl

if sys.version_info >= (3, 4):
Expand All @@ -39,6 +39,7 @@
logger = logging.getLogger(__name__)
valid_expire_types = [x.value for x in ExpiryOptionType]


class AzureDLFileSystem(object):
"""
Access Azure DataLake Store as if it were a file-system
Expand All @@ -57,16 +58,18 @@ class AzureDLFileSystem(object):
The API version to target with requests. Changing this value will
change the behavior of the requests, and can cause unexpected behavior or
breaking changes. Changes to this value should be undertaken with caution.
per_call_timeout_seconds : float(60)
Timeout, in seconds, applied to each call made through the requests library.
kwargs: optional key/values
See ``lib.auth()``; full list: tenant_id, username, password, client_id,
client_secret, resource
"""
_singleton = [None]

def __init__(self, token=None, **kwargs):
# store instance vars
def __init__(self, token=None, per_call_timeout_seconds=60, **kwargs):
self.token = token
self.kwargs = kwargs
self.per_call_timeout_seconds = per_call_timeout_seconds
self.connect()
self.dirs = {}
self._emptyDirs = []
Expand All @@ -85,7 +88,7 @@ def connect(self):
"""
Establish connection object.
"""
self.azure = DatalakeRESTInterface(token=self.token, **self.kwargs)
self.azure = DatalakeRESTInterface(token=self.token, req_timeout_s= self.per_call_timeout_seconds, **self.kwargs)
self.token = self.azure.token

def __setstate__(self, state):
Expand Down Expand Up @@ -775,16 +778,18 @@ def concat(self, outfile, filelist, delete_source=False):
sourceList = [AzureDLPath(f).as_posix() for f in filelist]
sources = {}
sources["sources"] = sourceList

self.azure.call('MSCONCAT', outfile.as_posix(),
data=bytearray(json.dumps(sources,separators=(',', ':')), encoding="utf-8"),
deleteSourceDirectory=delete,
headers={'Content-Type': "application/json"})
headers={'Content-Type': "application/json"},
retry_policy=NoRetryPolicy())
self.invalidate_cache(outfile)

merge = concat

def cp(self, path1, path2):
""" Copy file between locations on ADL """
""" Not implemented. Copy file between locations on ADL """
# TODO: any implementation for this without download?
raise NotImplementedError

Expand Down
7 changes: 5 additions & 2 deletions azure/datalake/store/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ class DatalakeRESTInterface:
The API version to target with requests. Changing this value will
change the behavior of the requests, and can cause unexpected behavior or
breaking changes. Changes to this value should be undertaken with caution.
req_timeout_s : float(60)
Timeout, in seconds, applied to each call made through the requests library.
kwargs: optional arguments to auth
See ``auth()``. Includes, e.g., username, password, tenant; will pull
values from environment variables if not provided.
Expand Down Expand Up @@ -256,7 +258,7 @@ class DatalakeRESTInterface:
}

def __init__(self, store_name=default_store, token=None,
url_suffix=default_adls_suffix, api_version='2018-09-01', **kwargs):
url_suffix=default_adls_suffix, api_version='2018-09-01', req_timeout_s=60, **kwargs):
# in the case where an empty string is passed for the url suffix, it must be replaced with the default.
url_suffix = url_suffix or default_adls_suffix
self.local = threading.local()
Expand All @@ -278,6 +280,7 @@ def __init__(self, store_name=default_store, token=None,
platform.platform(),
__name__,
__version__)
self.req_timeout_s = req_timeout_s

@property
def session(self):
Expand Down Expand Up @@ -474,7 +477,7 @@ def __call_once(self, method, url, params, data, stream, request_id, retry_count
req_headers['User-Agent'] = self.user_agent
req_headers.update(headers)
self._log_request(method, url, op, urllib.quote(path), kwargs, req_headers, retry_count)
return func(url, params=params, headers=req_headers, data=data, stream=stream)
return func(url, params=params, headers=req_headers, data=data, stream=stream, timeout=self.req_timeout_s)

def __getstate__(self):
state = self.__dict__.copy()
Expand Down
4 changes: 2 additions & 2 deletions azure/datalake/store/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ def f_retry(*args, **kwargs):
response = response_from_adal_exception(last_exception)
if hasattr(last_exception, 'response'): # HTTP exception i.e 429
response = last_exception.response

request_successful = last_exception is None or response.status_code == 401 # 401 = Invalid credentials
request_successful = last_exception is None or (response is not None and response.status_code == 401) # 401 = Invalid credentials
if request_successful or not retry_policy.should_retry(response, last_exception, retry_count):
break
if last_exception is not None:
Expand Down

0 comments on commit 33d96ba

Please sign in to comment.