Add CutoffStream to Dropbox chunked upload and update doc string
cslzchen committed Jul 18, 2018
1 parent 0413169 commit 685f111
Showing 1 changed file with 75 additions and 43 deletions.
118 changes: 75 additions & 43 deletions waterbutler/providers/dropbox/provider.py
@@ -1,5 +1,6 @@
import json
import typing
import logging
from http import HTTPStatus

from waterbutler.core import provider, streams
@@ -14,6 +15,8 @@
DropboxFolderMetadata,
)

logger = logging.getLogger(__name__)


class DropboxProvider(provider.BaseProvider):
"""Provider for the Dropbox.com cloud storage service.
@@ -294,30 +297,42 @@ async def _chunked_upload(self,
stream: streams.BaseStream,
path: WaterButlerPath
) -> dict:
"""Chunked uploading is a 3 step process.
"""Chunked uploading is a 3-step process using Dropbox's "Upload Session".
First a session is created, this session connects each of the chunks as
they are uploaded.
First, start a new upload session and receive an upload session ID.
API Docs: https://www.dropbox.com/developers/documentation/http/documentation#files-upload_session-start
The file is split into multiple parts, and uploaded across multiple requests.
Then, split the file into multiple chunks and upload them across multiple requests.
API Docs: https://www.dropbox.com/developers/documentation/http/documentation#files-upload_session-append
When all of the parts have finished uploading, the session that was
created is completed, and dropbox rearranges all the chunks that were
uploaded to reform the complete file.
Finally, when all of the parts have finished uploading, send a complete session request to
let Dropbox combine the uploaded data and save it to the given file path.
API Docs: https://www.dropbox.com/developers/documentation/http/documentation#files-upload_session-finish
https://www.dropbox.com/developers-v1/core/docs#chunked-upload
Quirks:
1. A single request should not upload more than 150 MB.
2. The maximum size of a file one can upload to an upload session is 350 GB.
3. An upload session can be used for a maximum of 48 hours.
"""

# 1. creates a upload session and retrieves session id to upload parts.
# 1. Create an upload session and retrieve the session ID for uploading parts.
session_id = await self._create_upload_session()

# 2. uploads parts to session.
# 2. Upload all parts in the session.
await self._upload_parts(stream, session_id)

# 3. completes session and returns the uploaded files' metadata.
# 3. Complete the session and return the uploaded file's metadata.
return await self._complete_session(stream, session_id, path)
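For orientation, the three steps in the docstring map onto three plain Dropbox HTTP calls. The sketch below shows the same flow with aiohttp against the documented endpoints; the token, chunk size, and path values are hypothetical placeholders, and this is an illustration of the protocol rather than WaterButler's implementation.

import json
import aiohttp

DROPBOX_CONTENT_URL = 'https://content.dropboxapi.com/2/files'
ACCESS_TOKEN = '<access-token>'  # hypothetical placeholder
CHUNK_SIZE = 4 * 1024 * 1024     # hypothetical 4 MB chunks, well under the 150 MB cap

async def chunked_upload_sketch(data: bytes, dropbox_path: str) -> dict:
    """Upload ``data`` to ``dropbox_path`` via an upload session (illustration only)."""
    base_headers = {
        'Authorization': 'Bearer {}'.format(ACCESS_TOKEN),
        'Content-Type': 'application/octet-stream',
    }
    async with aiohttp.ClientSession() as session:
        # 1. Start a session and receive its session id.
        resp = await session.post(
            DROPBOX_CONTENT_URL + '/upload_session/start',
            headers=dict(base_headers, **{'Dropbox-API-Arg': json.dumps({'close': False})}),
        )
        session_id = (await resp.json())['session_id']

        # 2. Append one chunk per request; ``offset`` is the number of bytes sent so far.
        for offset in range(0, len(data), CHUNK_SIZE):
            arg = {'cursor': {'session_id': session_id, 'offset': offset}, 'close': False}
            await session.post(
                DROPBOX_CONTENT_URL + '/upload_session/append_v2',
                headers=dict(base_headers, **{'Dropbox-API-Arg': json.dumps(arg)}),
                data=data[offset:offset + CHUNK_SIZE],
            )

        # 3. Finish the session: commit all uploaded bytes to the given path.
        arg = {
            'cursor': {'session_id': session_id, 'offset': len(data)},
            'commit': {'path': dropbox_path},
        }
        resp = await session.post(
            DROPBOX_CONTENT_URL + '/upload_session/finish',
            headers=dict(base_headers, **{'Dropbox-API-Arg': json.dumps(arg)}),
        )
        return await resp.json()  # metadata of the uploaded file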

async def _create_upload_session(self) -> str:
"""Create an upload session for chunked upload.
"Upload sessions allow you to upload a single file in one or more requests, for example
where the size of the file is greater than 150 MB. This call starts a new upload session
with the given data."
Ref: https://www.dropbox.com/developers/documentation/http/documentation#files-upload_session-start
"""

resp = await self.make_request(
'POST',
@@ -326,61 +341,78 @@ async def _create_upload_session(self) -> str:
'Content-Type': 'application/octet-stream',
'Dropbox-API-Arg': json.dumps({'close': False}),
},
expects={200},
expects=(200, ),
throws=core_exceptions.UploadError
)
return (await resp.json())['session_id']
return (await resp.json()).get('session_id', None)
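For reference, the start call sends no request body here; a hedged sketch of its wire format (WaterButler attaches the Authorization header elsewhere in its request machinery, and the response value below is only a made-up shape):

# Illustration only.
headers = {
    'Content-Type': 'application/octet-stream',
    'Dropbox-API-Arg': json.dumps({'close': False}),
}
# A 200 response returns the session id as JSON:
# {"session_id": "<session-id>"}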

async def _upload_parts(self,
stream: streams.BaseStream,
session_id: str) -> None:
"""Uploads all of the parts that compose the file to Dropbox.
async def _upload_parts(self, stream: streams.BaseStream, session_id: str) -> None:
"""Repeatedly upload all parts/chunks of the given stream to Dropbox one by one.
"""

upload_args = {
'close': False,
'cursor': {
'session_id': session_id,
'offset': None
}
'cursor': {'session_id': session_id, 'offset': 0, }
}
for i in range(0, stream.size, self.CHUNK_SIZE):
upload_args['cursor']['offset'] = i
await self._upload_part(
upload_args,
stream
)

async def _upload_part(self,
upload_args: dict,
stream: streams.BaseStream) -> None:
"""Uploads a single part for a Dropbox chunked upload session.
parts = [self.CHUNK_SIZE for _ in range(0, stream.size // self.CHUNK_SIZE)]
if stream.size % self.CHUNK_SIZE:
parts.append(stream.size - (len(parts) * self.CHUNK_SIZE))
logger.debug('Chunked upload segment sizes: {}'.format(parts))

last_chunk_size = 0
for chunk_id, chunk_size in enumerate(parts):
# Calculate the ``offset`` required by ``upload_session/append_v2``; it represents
# the number of bytes transferred so far. If the offset does not match the offset
# expected by the server, the server will ignore the request and respond with a
# 400 error that includes the current offset.
upload_args['cursor']['offset'] += last_chunk_size
logger.debug(' uploading part {} with size {} starting at offset '
'{}'.format(chunk_id + 1, chunk_size, upload_args['cursor']['offset']))
await self._upload_part(stream, chunk_size, upload_args)
last_chunk_size = chunk_size
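A quick worked example of the partition and offset arithmetic above, with hypothetical sizes:

# Hypothetical sizes: a 10 MB stream uploaded in 4 MB chunks.
CHUNK_SIZE = 4 * 1024 * 1024
stream_size = 10 * 1024 * 1024

parts = [CHUNK_SIZE for _ in range(0, stream_size // CHUNK_SIZE)]
if stream_size % CHUNK_SIZE:
    parts.append(stream_size - (len(parts) * CHUNK_SIZE))
assert parts == [4 * 1024 * 1024, 4 * 1024 * 1024, 2 * 1024 * 1024]

# Offsets advance by the previous chunk's size, exactly as in ``_upload_parts``.
offsets, offset, last_chunk_size = [], 0, 0
for chunk_size in parts:
    offset += last_chunk_size
    offsets.append(offset)
    last_chunk_size = chunk_size
assert offsets == [0, 4 * 1024 * 1024, 8 * 1024 * 1024]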

async def _upload_part(self, stream: streams.BaseStream,
chunk_size: int, upload_args: dict) -> None:
"""Upload one part/chunk of the given stream to Dropbox
"Append more data to an upload session. When the parameter close is set, this call will
close the session. A single request should not upload more than 150 MB. ..."
Ref: https://www.dropbox.com/developers/documentation/http/documentation#files-upload_session-append
"""

cutoff_stream = streams.CutoffStream(stream, cutoff=chunk_size)

resp = await self.make_request(
'POST',
self._build_content_url('files', 'upload_session', 'append_v2'),
headers={
# ``Content-Length`` is required for ``asyncio`` to read the inner chunked stream
'Content-Length': str(chunk_size),
'Content-Type': 'application/octet-stream',
'Dropbox-API-Arg': json.dumps(upload_args),
},
data=stream.read(self.CHUNK_SIZE),
data=cutoff_stream,
expects=(200, ),
throws=core_exceptions.UploadError,
throws=core_exceptions.UploadError
)

await resp.release()
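The role of CutoffStream here: it wraps the shared base stream and exposes at most ``cutoff`` bytes of it, so each append_v2 request sends exactly one chunk and honors its fixed ``Content-Length``, while the underlying stream keeps its read position across requests. A simplified stand-in for the idea (not WaterButler's actual class):

class CutoffSketch:
    """Expose at most ``cutoff`` bytes of an underlying async stream (illustration only)."""

    def __init__(self, stream, cutoff: int) -> None:
        self.stream = stream
        self.remaining = cutoff

    async def read(self, n: int = -1) -> bytes:
        if self.remaining <= 0:
            return b''  # pretend EOF once the cutoff is reached
        if n < 0 or n > self.remaining:
            n = self.remaining
        data = await self.stream.read(n)
        self.remaining -= len(data)
        return data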

async def _complete_session(self,
stream: streams.BaseStream,
session_id: str,
async def _complete_session(self, stream: streams.BaseStream, session_id: str,
path: WaterButlerPath) -> dict:
"""Complete the chunked upload session.
"Finish an upload session and save the uploaded data to the given file path. ... The maximum
size of a file one can upload to an upload session is 350 GB."
Ref: https://www.dropbox.com/developers/documentation/http/documentation#files-upload_session-finish
"""

upload_args = {
'cursor': {
'session_id': session_id,
'offset': stream.size,
},
'commit': {
"path": path.full_path,
},
'cursor': {'session_id': session_id, 'offset': stream.size, },
'commit': {"path": path.full_path, },
}

resp = await self.make_request(
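For completeness, the ``Dropbox-API-Arg`` payload that the finish request sends ends up shaped like this (values hypothetical; ``offset`` must equal the total number of bytes uploaded):

json.dumps({
    'cursor': {'session_id': '<session-id>', 'offset': 10 * 1024 * 1024},
    'commit': {'path': '/folder/file.ext'},
})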
