Skip to content

Commit

Permalink
Merge pull request #189 from clehene/master
Browse files Browse the repository at this point in the history
Fixed Multi chunk transfer hangs as merging chunks fails (#187)
  • Loading branch information
chdevala authored Sep 8, 2017
2 parents 4ef1e75 + 4c6c152 commit 359c2e4
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 3 deletions.
6 changes: 5 additions & 1 deletion HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
Release History
===============
unreleased
----------
* Fixed multi-chunk transfers hanging when merging chunks fails #187

0.0.15 (2017-07-26)
-------------------
* Enable Data Lake Store progress controller callback #174
Expand Down Expand Up @@ -87,4 +91,4 @@ Release History
0.0.1 (2016-11-21)
------------------
* Initial preview release. Based on API version 2016-11-01.
* Includes initial ADLS filesystem functionality and extended upload and download support.
* Includes initial ADLS filesystem functionality and extended upload and download support.
6 changes: 4 additions & 2 deletions azure/datalake/store/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,12 +431,14 @@ def _update(self, future):
if self._merge and len(cstates.objects) > 1:
logger.debug("Merging file: %s", self._fstates[parent])
self._fstates[parent] = 'merging'
merge_future = self._submit(
merge_future = self._pool.submit(
self._merge, self._adlfs, dst,
[chunk for chunk, _ in sorted(cstates.objects,
key=operator.itemgetter(1))],
overwrite=self._parent._overwrite)
overwrite=self._parent._overwrite,
shutdown_event=self._shutdown_event)
self._ffutures[merge_future] = parent
merge_future.add_done_callback(self._update)
else:
if not self._chunked and str(dst).endswith('.inprogress'):
logger.debug("Renaming file to remove .inprogress: %s", self._fstates[parent])
Expand Down
24 changes: 24 additions & 0 deletions tests/test_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,30 @@ def transfer(adlfs, src, dst, offset, size, blocksize, buffersize, shutdown_even
assert calls == [(8, 32), (16, 32), (24, 32), (32, 32)]


def test_merge(azure):
    """Check that a chunked transfer hands every chunk to the merge callable.

    A 32-byte file is submitted with an 8-byte chunk size, so the client
    should split it into four chunks and, once all of them have been
    transferred, invoke ``merge`` with a list containing those four chunks.
    """
    calls = []

    def merge(adlfs, outfile, files, shutdown_event=None, overwrite=False):
        # Record the chunk list the client asked to merge; no real merging.
        calls.append(files)

    def transfer(adlfs, src, dst, offset, size, blocksize, buffersize, shutdown_event=None):
        # Pretend the whole chunk was transferred without error.
        return size, None

    class XLoaderMock(object):
        # Minimal stand-in for the parent loader; the client reads
        # ``_overwrite`` from it when it submits the merge.
        _overwrite = False

    file_size = 32
    chunk_size = 8
    client = ADLTransferClient(azure, parent=XLoaderMock(), transfer=transfer, merge=merge,
                               chunksize=chunk_size, chunked=True)

    client.submit('foo', AzureDLPath('bar'), file_size)
    client.run()

    # Fail with a readable message (instead of an IndexError on calls[0])
    # if the merge step was never invoked.
    assert calls, "merge was never invoked"
    # Floor division keeps the expected chunk count an integer.
    assert len(calls[0]) == file_size // chunk_size


def test_temporary_path(azure):
def transfer(adlfs, src, dst, offset, size, blocksize, buffersize):
return size, None
Expand Down

0 comments on commit 359c2e4

Please sign in to comment.