Release 0.0.45 (#284)
* Remove multiple calls to info() while opening a file. Also ensure that the server-side file is created on open (see the usage sketch below).
* Ensure that both the append-mode and write-mode buffers are no larger than 4 MB.
* Add test to check creation on open
* Update test recordings
* Update version to 0.0.45
akharit authored May 10, 2019
1 parent f325c5d commit 3130ccd
Showing 110 changed files with 68,346 additions and 97 deletions.
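For context, a minimal usage sketch of the code path this release changes. The tenant, client, secret, and store names below are placeholders; the calls follow the library's documented auth/open flow:

from azure.datalake.store import core, lib

# Placeholder service-principal credentials.
token = lib.auth(tenant_id='<tenant>', client_id='<client-id>',
                 client_secret='<secret>')
adl = core.AzureDLFileSystem(token, store_name='<store>')

# As of 0.0.45, opening in 'wb' (or 'ab' on a missing path) issues a CREATE
# immediately, so the remote file exists even before the first flush.
with adl.open('/tmp/example.csv', 'wb', blocksize=2**22) as f:
    f.write(b'hello,adl\n')  # buffered locally, flushed in chunks of at most 4 MB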
5 changes: 5 additions & 0 deletions HISTORY.rst
@@ -3,6 +3,11 @@
Release History
===============

0.0.45 (2019-05-10)
+++++++++++++++++++
* Update open and close ADLFile semantics
* Refactor code and improve performance for opening a file

0.0.44 (2019-03-05)
+++++++++++++++++++
* Add continuation token to LISTSTATUS api call
2 changes: 1 addition & 1 deletion azure/datalake/store/__init__.py
@@ -6,7 +6,7 @@
# license information.
# --------------------------------------------------------------------------

__version__ = "0.0.44"
__version__ = "0.0.45"

from .core import AzureDLFileSystem
from .multithread import ADLDownloader
145 changes: 53 additions & 92 deletions azure/datalake/store/core.py
@@ -210,7 +210,6 @@ def info(self, path, invalidate_cache=True, expected_error_code=None):
for f in self.dirs[root_as_posix]:
    if f['name'] == path_as_posix:
        found = True
        f = to_return
        break
if not found:
    self.dirs[root_as_posix].append(to_return)
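A note on the removed `f = to_return` line above: rebinding a loop variable never mutates the underlying list, so the assignment was a no-op and dropping it preserves behavior. A standalone illustration, not library code:

cache = [{'name': 'a', 'length': 1}]
for f in cache:
    f = {'name': 'a', 'length': 2}  # rebinds the local name f only
assert cache[0]['length'] == 1      # the cached entry is unchanged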
@@ -916,7 +915,7 @@ class AzureDLFile(object):
    location of file
mode : str {'wb', 'rb', 'ab'}
blocksize : int
    Size of the write or read-ahead buffer. For writing, will be
    Size of the write or read-ahead buffer. For writing (and appending), will be
    truncated to 4MB (2**22).
delimiter : bytes or None
    If specified and in write mode, each flush will send data terminating
@@ -949,27 +948,45 @@ def __init__(self, azure, path, mode='rb', blocksize=2**25,
self.trim = True
self.buffer = io.BytesIO()
self.blocksize = blocksize
self.first_write = True
uniqueid = str(uuid.uuid4())
self.filesessionid = uniqueid
self.leaseid = uniqueid

# always invalidate the cache when checking for existence of a file
# that may be created or written to (for the first time).
exists = self.azure.exists(path, invalidate_cache=True)
try:
file_data = self.azure.info(path, invalidate_cache=True, expected_error_code=404)
exists = True
except FileNotFoundError:
exists = False

# cannot create a new file object out of a directory
if exists and self.info()['type'] == 'DIRECTORY':
if exists and file_data['type'] == 'DIRECTORY':
raise IOError('path: {} is a directory, not a file, and cannot be opened for reading or writing'.format(path))

if mode == 'ab' and exists:
self.loc = self.info()['length']
self.first_write = False
elif mode == 'rb':
self.size = self.info()['length']
else:
if mode == 'ab' or mode == 'wb':
self.blocksize = min(2**22, blocksize)

if mode == 'ab' and exists:
self.loc = file_data['length']
elif (mode == 'ab' and not exists) or (mode == 'wb'):
# Create the file
_put_data_with_retry(
rest=self.azure.azure,
op='CREATE',
path=self.path.as_posix(),
data=None,
overwrite='true',
write='true',
syncFlag='DATA',
leaseid=self.leaseid,
filesessionid=self.filesessionid)
logger.debug('Created file %s ' % self.path)
else: # mode == 'rb':
if not exists:
raise FileNotFoundError(path.as_posix())
self.size = file_data['length']

def info(self):
""" File information about this path """
return self.azure.info(self.path)
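To summarize the new branch logic in __init__ above, a condensed sketch of what each (mode, exists) combination does; illustrative only, not library code:

def open_action(mode, exists):
    # For 'wb'/'ab' the buffer is first capped: blocksize = min(2**22, blocksize)
    if mode == 'ab' and exists:
        return 'append'  # start self.loc at the current file length
    if mode in ('ab', 'wb'):
        return 'create'  # CREATE is issued now; the file exists before any flush
    if not exists:       # mode == 'rb' on a missing path
        raise FileNotFoundError('no such file')
    return 'read'        # self.size is taken from the file length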
@@ -1154,7 +1171,6 @@ def write(self, data):
self.flush(syncFlag='DATA')
return out


def flush(self, syncFlag='METADATA', force=False):
"""
Write buffered data to ADL.
@@ -1172,94 +1188,39 @@ def flush(self, syncFlag='METADATA', force=False):
return

if not (syncFlag == 'METADATA' or syncFlag == 'DATA' or syncFlag == 'CLOSE'):
raise ValueError('syncFlag must be one of these: METADAT, DATA or CLOSE')


if self.buffer.tell() == 0:
if force and self.first_write:
_put_data_with_retry(
self.azure.azure,
'CREATE',
path=self.path.as_posix(),
data=None,
overwrite='true',
write='true',
syncFlag=syncFlag,
leaseid=self.leaseid,
filesessionid=self.filesessionid)
self.first_write = False
return

self.buffer.seek(0)
raise ValueError('syncFlag must be one of these: METADATA, DATA or CLOSE')

common_args_append = {
'rest': self.azure.azure,
'op': 'APPEND',
'path': self.path.as_posix(),
'append': 'true',
'leaseid': self.leaseid,
'filesessionid': self.filesessionid
}
self.buffer.seek(0) # Go to start of buffer
data = self.buffer.read()

syncFlagLocal = 'DATA'
while len(data) > self.blocksize:
data_to_write_limit = self.blocksize
if self.delimiter:
place = data[:self.blocksize].rfind(self.delimiter)
else:
place = -1
if place < 0:
# not found - write whole block
limit = self.blocksize
else:
limit = place + len(self.delimiter)
if self.first_write:
_put_data_with_retry(
self.azure.azure,
'CREATE',
path=self.path.as_posix(),
data=data[:limit],
overwrite='true',
write='true',
syncFlag=syncFlagLocal,
leaseid=self.leaseid,
filesessionid=self.filesessionid)
self.first_write = False
else:
_put_data_with_retry(
self.azure.azure,
'APPEND',
path=self.path.as_posix(),
data=data[:limit],
append='true',
syncFlag=syncFlagLocal,
leaseid=self.leaseid,
filesessionid=self.filesessionid)
logger.debug('Wrote %d bytes to %s' % (limit, self))
data = data[limit:]

delimiter_index = data.rfind(self.delimiter, 0, self.blocksize)
if delimiter_index != -1: # delimiter found
data_to_write_limit = delimiter_index + len(self.delimiter)

self.buffer = io.BytesIO(data)
self.buffer.seek(0, 2)
offset = self.tell() - len(data)
_put_data_with_retry(syncFlag='DATA', data=data[:data_to_write_limit], offset=offset, **common_args_append)
logger.debug('Wrote %d bytes to %s' % (data_to_write_limit, self))
data = data[data_to_write_limit:]

if force:
zero_offset = self.tell() - len(data)
if self.first_write:
_put_data_with_retry(
self.azure.azure,
'CREATE',
path=self.path.as_posix(),
data=data,
overwrite='true',
write='true',
syncFlag=syncFlag,
leaseid=self.leaseid,
filesessionid=self.filesessionid)
self.first_write = False
else:
_put_data_with_retry(
self.azure.azure,
'APPEND',
path=self.path.as_posix(),
data=data,
offset=zero_offset,
append='true',
syncFlag=syncFlag,
leaseid=self.leaseid,
filesessionid=self.filesessionid)
offset = self.tell() - len(data)
_put_data_with_retry(syncFlag=syncFlag, data=data, offset=offset, **common_args_append)
logger.debug('Wrote %d bytes to %s' % (len(data), self))
self.buffer = io.BytesIO()
data = b''

self.buffer = io.BytesIO(data)
self.buffer.seek(0, 2) # seek to end for other writes to buffer

def close(self):
""" Close file
(Diffs for the remaining 107 changed files are not shown.)
