diff --git a/CHANGES.rst b/CHANGES.rst index f4b5ceb..616f1d2 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,11 +1,13 @@ Changelog ========= -1.5 (unreleased) + +2.0 (unreleased) ---------------- - Use `pyproject.toml` instead of `setup.py` [tomgross] - Document `eapi`-endpoint for fs.opener [tomgross] +- Implement binary-protocol [tomgross] 1.4 (2024-12-29) ---------------- diff --git a/src/pcloud/api.py b/src/pcloud/api.py index 534e3f9..abef669 100644 --- a/src/pcloud/api.py +++ b/src/pcloud/api.py @@ -1,28 +1,26 @@ +import os +import requests +import zipfile + from hashlib import sha1 from io import BytesIO + +from pcloud.protocols import JsonAPIProtocol +from pcloud.protocols import JsonEAPIProtocol +from pcloud.protocols import BinAPIProtocol +from pcloud.protocols import BinEAPIProtocol +from pcloud.protocols import TestProtocol +from pcloud.protocols import NearestProtocol +from pcloud.jsonprotocol import PCloudJSONConnection from pcloud.oauth2 import TokenHandler +from pcloud.utils import log +from pcloud.utils import to_api_datetime from pcloud.validate import MODE_AND from pcloud.validate import RequiredParameterCheck -from requests_toolbelt.multipart.encoder import MultipartEncoder + from urllib.parse import urlparse from urllib.parse import urlunsplit -import datetime -import logging -import os.path -import requests -import sys -import zipfile - - -log = logging.getLogger("pcloud") -log.setLevel(logging.INFO) - -handler = logging.StreamHandler(sys.stderr) -handler.setLevel(logging.INFO) -formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") -handler.setFormatter(formatter) -log.addHandler(handler) # File open flags https://docs.pcloud.com/methods/fileops/file_open.html O_WRITE = int("0x0002", 16) @@ -47,40 +45,35 @@ class InvalidFileModeError(Exception): """File mode not supported""" -# Helpers -def to_api_datetime(dt): - """Converter to a datetime structure the pCloud API understands - - See https://docs.pcloud.com/structures/datetime.html - """ - if isinstance(dt, datetime.datetime): - return dt.isoformat() - return dt - - class PyCloud(object): endpoints = { - "api": "https://api.pcloud.com/", - "eapi": "https://eapi.pcloud.com/", - "test": "http://localhost:5023/", - "nearest": "", + "api": JsonAPIProtocol, + "eapi": JsonEAPIProtocol, + "test": TestProtocol, + "binapi": BinAPIProtocol, + "bineapi": BinEAPIProtocol, + "nearest": NearestProtocol, } def __init__( self, username, password, endpoint="api", token_expire=31536000, oauth2=False ): - self.session = requests.Session() if endpoint not in self.endpoints: log.error( "Endpoint (%s) not found. Use one of: %s", endpoint, - ",".join(self.endpoints.keys()), + ", ".join(self.endpoints.keys()), ) return elif endpoint == "nearest": self.endpoint = self.getnearestendpoint() + conn = PCloudJSONConnection(self) else: - self.endpoint = self.endpoints.get(endpoint) + protocol = self.endpoints.get(endpoint) + self.endpoint = protocol.endpoint + conn = protocol.connection(self) + self.connection = conn.connect() + log.info(f"Using pCloud API endpoint: {self.endpoint}") self.username = username.lower().encode("utf-8") self.password = password.encode("utf-8") @@ -108,38 +101,26 @@ def oauth2_authorize( See https://docs.pcloud.com/methods/oauth_2.0/authorize.html Per default the Python webbrowser library, which opens - a reals browser is used for URL redirection. + a real browser used for URL redirection. You can provide your own token handler (i.e. headless selenium), if needed. """ - ep = {urlparse(y).netloc: x for x, y in PyCloud.endpoints.items()} + ep = { + urlparse(protocol.endpoint).netloc: key + for key, protocol in PyCloud.endpoints.items() + } code, hostname = tokenhandler(client_id).get_access_token() params = {"client_id": client_id, "client_secret": client_secret, "code": code} endpoint = ep.get(hostname) - endpoint_url = PyCloud.endpoints.get(endpoint) + endpoint_url = PyCloud.endpoints.get(endpoint).endpoint resp = requests.get(endpoint_url + "oauth2_token", params=params).json() access_token = resp.get("access_token") return cls("", access_token, endpoint, token_expire, oauth2=True) def _do_request(self, method, authenticate=True, json=True, endpoint=None, **kw): - if authenticate and self.auth_token: # Password authentication - params = {"auth": self.auth_token} - elif authenticate and self.access_token: # OAuth2 authentication - params = {"access_token": self.access_token} - else: - params = {} - if endpoint is None: - endpoint = self.endpoint - params.update(kw) - log.debug("Doing request to %s%s", endpoint, method) - log.debug("Params: %s", params) - resp = self.session.get(endpoint + method, params=params) - if json: - result = resp.json() - else: - result = resp.content - log.debug("Response: %s", result) - return result + return self.connection.do_get_request( + method, authenticate, json, endpoint, **kw + ) # Authentication def getdigest(self): @@ -176,6 +157,7 @@ def getnearestendpoint(self): resp = self._do_request( "getapiserver", authenticate=False, endpoint=default_api ) + api = resp.get("api") if len(api): return urlunsplit(["https", api[0], "/", "", ""]) @@ -234,24 +216,11 @@ def copyfolder(self, **kwargs): raise NotImplementedError # File - def _upload(self, method, files, **kwargs): - if self.auth_token: # Password authentication - kwargs["auth"] = self.auth_token - elif self.access_token: # OAuth2 authentication - kwargs["access_token"] = self.access_token - fields = list(kwargs.items()) - fields.extend(files) - m = MultipartEncoder(fields=fields) - resp = requests.post( - self.endpoint + method, data=m, headers={"Content-Type": m.content_type} - ) - return resp.json() - @RequiredParameterCheck(("files", "data")) def uploadfile(self, **kwargs): """upload a file to pCloud - 1) You can specify a list of filenames to read + 1) You can specify a list of filenames to upload files=['/home/pcloud/foo.txt', '/home/pcloud/bar.txt'] 2) you can specify binary data via the data parameter and @@ -276,7 +245,7 @@ def uploadfile(self, **kwargs): if "folderid" in kwargs: # cast folderid to string, since API allows this but requests not kwargs["folderid"] = str(kwargs["folderid"]) - return self._upload("uploadfile", files, **kwargs) + return self.connection.upload("uploadfile", files, **kwargs) @RequiredParameterCheck(("progresshash",)) def uploadprogress(self, **kwargs): @@ -365,53 +334,55 @@ def gettextfile(self, **kwargs): # File API methods @RequiredParameterCheck(("flags",)) def file_open(self, **kwargs): - return self._do_request("file_open", **kwargs) + return self._do_request("file_open", use_session=True, **kwargs) @RequiredParameterCheck(("fd", "count")) def file_read(self, **kwargs): - return self._do_request("file_read", json=False, **kwargs) + return self._do_request("file_read", json=False, use_session=True, **kwargs) @RequiredParameterCheck(("fd",)) def file_pread(self, **kwargs): - return self._do_request("file_pread", json=False, **kwargs) + return self._do_request("file_pread", json=False, use_session=True, **kwargs) @RequiredParameterCheck(("fd", "data")) def file_pread_ifmod(self, **kwargs): - return self._do_request("file_pread_ifmod", json=False, **kwargs) + return self._do_request( + "file_pread_ifmod", json=False, use_session=True, **kwargs + ) @RequiredParameterCheck(("fd",)) def file_size(self, **kwargs): - return self._do_request("file_size", **kwargs) + return self._do_request("file_size", use_session=True, **kwargs) @RequiredParameterCheck(("fd",)) def file_truncate(self, **kwargs): - return self._do_request("file_truncate", **kwargs) + return self._do_request("file_truncate", use_session=True, **kwargs) @RequiredParameterCheck(("fd", "data")) def file_write(self, **kwargs): files = [("file", ("upload-file.io", BytesIO(kwargs.pop("data"))))] kwargs["fd"] = str(kwargs["fd"]) - return self._upload("file_write", files, **kwargs) + return self.connection.upload("file_write", files, **kwargs) @RequiredParameterCheck(("fd",)) def file_pwrite(self, **kwargs): - return self._do_request("file_pwrite", **kwargs) + return self._do_request("file_pwrite", use_session=True, **kwargs) @RequiredParameterCheck(("fd",)) def file_checksum(self, **kwargs): - return self._do_request("file_checksum", **kwargs) + return self._do_request("file_checksum", use_session=True, **kwargs) @RequiredParameterCheck(("fd",)) def file_seek(self, **kwargs): - return self._do_request("file_seek", **kwargs) + return self._do_request("file_seek", use_session=True, **kwargs) @RequiredParameterCheck(("fd",)) def file_close(self, **kwargs): - return self._do_request("file_close", **kwargs) + return self._do_request("file_close", use_session=True, **kwargs) @RequiredParameterCheck(("fd",)) def file_lock(self, **kwargs): - return self._do_request("file_lock", **kwargs) + return self._do_request("file_lock", use_session=True, **kwargs) # Archiving @RequiredParameterCheck(("path", "fileid")) @@ -529,5 +500,19 @@ def trash_restorepath(self, **kwargs): def trash_restore(self, **kwargs): raise NotImplementedError + # convenience methods + @RequiredParameterCheck(("path",)) + def file_exists(self, **kwargs): + path = kwargs["path"] + resp = self.file_open(path=path, flags=O_APPEND) + result = resp.get("result") + if result == 0: + self.file_close(fd=resp["fd"]) + return True + elif result == 2009: + return False + else: + raise OSError(f"pCloud error occured ({result}) - {resp['error']}: {path}") + # EOF diff --git a/src/pcloud/binaryprotocol.py b/src/pcloud/binaryprotocol.py new file mode 100644 index 0000000..5609f59 --- /dev/null +++ b/src/pcloud/binaryprotocol.py @@ -0,0 +1,279 @@ +import io +import socket +import ssl + +from urllib.parse import urlparse + + +class PCloudBuffer(io.BufferedRWPair): + """Buffer that raises IOError on insufficient bytes for read.""" + + def read(self, size=-1): + result = super().read(size) + if size != -1 and len(result) != size: + raise IOError(f"Requested {size} bytes, got {len(result)}") + return result + + +class PCloudBinaryConnection(object): + """Connection to pcloud.com based on their binary protocol. + + NOTE: .connect() must be called to establish network communication. + """ + + allowed_endpoints = frozenset(["binapi", "bineapi"]) + + def __init__(self, api, persistent_params=None): + """Initializes the binary API. + NOTE: .connect() must be called to establish network communication. + """ + self.api = api + self.server = urlparse(api.endpoint).netloc + self.timeout = 30 + self.socket = None + self.fp = None + if persistent_params is None: + self.persistent_params = {} + else: + self.persistent_params = persistent_params + + def do_get_request(self, method, authenticate=True, json=True, endpoint=None, **kw): + """Sends command and returns result. Blocks if result is needed. + + If '_data' is in params it is the file data + :param method: the pcloud method to call + :param **params: parameters to be passed to the api, except: + - _data is the file data + - _data_progress_callback is the upload callback + - _noresult - if no result should be returned (you must call + .get_result manually) + :returns dictionary returned by the api or None if _noresult is set + """ + data = kw.pop("_data", None) + data_progress_callback = kw.pop("_data_progress_callback", None) + noresult = kw.pop("_noresult", None) + self.send_command_nb( + method, kw, data=data, data_progress_callback=data_progress_callback + ) + if not noresult: + return self.get_result() + + def upload(self, method, files, **kwargs): + if self.api.auth_token: # Password authentication + kwargs["auth"] = self.api.auth_token + elif self.api.access_token: # OAuth2 authentication + kwargs["access_token"] = self.api.access_token + + progress_callback = kwargs.pop("progress_callback", None) + for entry in files: + filename, fd = entry[1] + response = self.do_get_request( + method, + _data=fd, + filename=filename, + _data_progress_callback=progress_callback, + **kwargs, + ) + return response + + def connect(self): + """Establish connection and return self.""" + if self.socket: + raise ValueError("maybe connect called twice?") + context = ssl.create_default_context() + sock = socket.create_connection((self.server, 443), self.timeout) + self.socket = context.wrap_socket(sock, server_hostname=self.server) + raw = socket.SocketIO(self.socket, "rwb") + self.socket._io_refs += 1 + self.fp = PCloudBuffer(raw, raw, 8192) + return self + + def _prepare_send_request(self, method, params, data_len): + req = bytearray() + # actually preallocating would be more efficient but... + + method_name = method.encode("utf-8") + method_len = len(method_name) + assert method_len < 128 + + if data_len is not None: + method_len |= 0x80 + + req.extend(method_len.to_bytes(1, "little")) + if data_len is not None: + req.extend(data_len.to_bytes(8, "little")) + + req.extend(method_name) + req.extend(len(params).to_bytes(1, "little")) + + for key, value in params.items(): + key = key.encode("utf-8") + key_len = len(key) + assert key_len < 64, "Parameter name too long" + + if isinstance(value, int) and value < 0: + # negative numbers are converted to string + value = str(value) + + if isinstance(value, list): + # lists (usually ints) are joined with , + value = ",".join(map(str, value)) + + if isinstance(value, str): + value = value.encode("utf-8") + + if isinstance(value, bytes): + req.extend(key_len.to_bytes(1, "little")) + req.extend(key) + req.extend(len(value).to_bytes(4, "little")) + req.extend(value) + elif isinstance(value, int): + req.extend((key_len | 0x40).to_bytes(1, "little")) + req.extend(key) + req.extend(value.to_bytes(8, "little")) + elif isinstance(value, bool): + req.extend((key_len | 0x80).to_bytes(1, "little")) + req.extend(key) + req.extend(value.to_bytes(1, "little")) + else: + raise ValueError("Unknown value type {0}".format(type(value))) + + return req + + def _send_raw_data(self, data, data_len, progress_callback): + """Sends data at the end of send_command.""" + if isinstance(data, io.IOBase): + written_bytes, to_write = 0, data_len + while data_len > 0: + to_write = min(data_len, 8192) + if to_write != self.fp.write(data.read(to_write)): + raise IOError( + "Mismatch between bytes written and supplied data length" + ) + data_len -= to_write + if progress_callback: + progress_callback(to_write) + else: + written_bytes = self.fp.write(data) + if written_bytes != data_len: + raise IOError("Mismatch between bytes written and supplied data length") + + def _determine_data_len(self, data, data_len=None): + if data is None: + data_len = None + elif data_len is None: # and data is not None + data_len = getattr(data, "__len__", lambda: None)() + if data_len is None: + if isinstance(data, io.IOBase) and data.seekable(): + pos = data.tell() + data_len = data.seek(0, io.SEEK_END) - pos + data.seek(pos, io.SEEK_SET) + if data_len is None: + raise ValueError("Unable to determine data length") + return data_len + + def send_command_nb( + self, method, params, data=None, data_len=None, data_progress_callback=None + ): + """Send command without blocking. + + NOTE: params is updated with self.persistent_params + + :param data_len: if not None should be consistent with data. + :param data_progress_callback: called only for data which is io.IOBase + """ + data_len = self._determine_data_len(data, data_len) + + params.update(self.persistent_params) + req = self._prepare_send_request(method, params, data_len) + assert len(req) < 65536, "Request too long {0}".format(len(req)) + self.fp.write(len(req).to_bytes(2, "little")) + self.fp.write(req) + + if data is not None: + self._send_raw_data(data, data_len, data_progress_callback) + + self.fp.flush() + + def get_result(self): + """ Return the result from a call to the pcloud API. + """ + self.fp.read(4) + return self._read_object(strings=dict()) + + def _read_object(self, strings): + obj_type = self.fp.read(1)[0] + if (obj_type <= 3) or (100 <= obj_type <= 149): + # new string + if 100 <= obj_type: + str_len = obj_type - 100 + else: + str_len = int.from_bytes(self.fp.read(obj_type + 1), "little") + string = self.fp.read(str_len).decode("utf-8") + strings[len(strings)] = string + return string + elif 4 <= obj_type <= 7: + # existing string, long index + return strings[int.from_bytes(self.fp.read(obj_type - 3), "little")] + elif 8 <= obj_type <= 15: + # int + return int.from_bytes(self.fp.read(obj_type - 7), "little") + elif obj_type == 16: + # hash + result = {} + while self.fp.peek(1)[0] != 255: + key = self._read_object(strings) + result[key] = self._read_object(strings) + self.fp.read(1) # consume byte 255 + if "data" in result: + return self.read_data(result.get("data")) + return result + elif obj_type == 17: + # list + result = [] + while self.fp.peek(1)[0] != 255: + result.append(self._read_object(strings)) + self.fp.read(1) # consume byte 255 + return result + elif obj_type == 18: + return False + elif obj_type == 19: + return True + elif obj_type == 20: + # data, return data_length + # be sure to consume the data + return int.from_bytes(self.fp.read(8), "little") + elif 150 <= obj_type <= 199: + # existing string, short index + return strings[obj_type - 150] + elif 200 <= obj_type <= 219: + # int, inline + return obj_type - 200 + # nothing matched + raise ValueError("Unknown value returned: {0}".format(obj_type)) + + def read_data(self, data_len): + return self.fp.read(data_len) + + def get_data_stream(self): + """Returns raw stream, from the socket. + + NOTE: Be careful with this file + NOTE: Be sure to consume exactly data_len bytes. + """ + return self.fp + + def write_data(self, writer, data_len, progress_callback=None): + """Write data from response. + + NOTE: Be sure to consume all of it. + """ + while data_len > 0: + to_write = min(8192, data_len) + assert to_write == writer.write(self.fp.read(to_write)) + data_len -= to_write + if progress_callback: + progress_callback(to_write) + + def close(self): + self.socket.close() diff --git a/src/pcloud/dummyprotocol.py b/src/pcloud/dummyprotocol.py new file mode 100644 index 0000000..6b05afc --- /dev/null +++ b/src/pcloud/dummyprotocol.py @@ -0,0 +1,36 @@ +import requests + +from pcloud.utils import log +from requests_toolbelt.multipart.encoder import MultipartEncoder +from pcloud.jsonprotocol import PCloudJSONConnection + + +class NoOpSession(object): + kwargs = {} + + def get(self, url, **kwargs): + self.kwargs = kwargs + self.kwargs["url"] = url + return self + + def json(self): + return self.kwargs + + +class PCloudDummyConnection(PCloudJSONConnection): + """Connection to pcloud.com based on their JSON protocol.""" + + allowed_endpoints = frozenset(["test"]) + + def __init__(self, api): + """Connect to pcloud API based on their JSON protocol.""" + self.session = NoOpSession() + self.api = api + + def do_get_request(self, method, authenticate=True, json=True, endpoint=None, **kw): + if "noop" in kw: + kw.pop("noop") + params = {"params": kw, "url": self.api.endpoint + method} + return params + else: + return super().do_get_request(method, authenticate, json, endpoint, **kw) diff --git a/src/pcloud/jsonprotocol.py b/src/pcloud/jsonprotocol.py new file mode 100644 index 0000000..663a712 --- /dev/null +++ b/src/pcloud/jsonprotocol.py @@ -0,0 +1,56 @@ +import requests + +from pcloud.utils import log +from requests_toolbelt.multipart.encoder import MultipartEncoder + + +class PCloudJSONConnection(object): + """Connection to pcloud.com based on their JSON protocol.""" + + allowed_endpoints = frozenset(["api", "eapi", "nearest"]) + + def __init__(self, api): + """Connect to pcloud API based on their JSON protocol.""" + self.session = requests.Session() + self.api = api + + def connect(self): + return self + + def do_get_request(self, method, authenticate=True, json=True, endpoint=None, **kw): + if authenticate and self.api.auth_token: # Password authentication + params = {"auth": self.api.auth_token} + elif authenticate and self.api.access_token: # OAuth2 authentication + params = {"access_token": self.api.access_token} + else: + params = {} + if endpoint is None: + endpoint = self.api.endpoint + params.update(kw) + log.debug("Doing request to %s%s", endpoint, method) + log.debug("Params: %s", params) + if "use_session" in kw: + get_method = self.session.get + else: + get_method = requests.get + resp = get_method(endpoint + method, params=params, allow_redirects=False) + resp.raise_for_status() + if json: + result = resp.json() + else: + result = resp.content + log.debug("Response: %s", result) + return result + + def upload(self, method, files, **kwargs): + if self.api.auth_token: # Password authentication + kwargs["auth"] = self.api.auth_token + elif self.api.access_token: # OAuth2 authentication + kwargs["access_token"] = self.api.access_token + fields = list(kwargs.items()) + fields.extend(files) + m = MultipartEncoder(fields=fields) + resp = requests.post( + self.api.endpoint + method, data=m, headers={"Content-Type": m.content_type} + ) + return resp.json() diff --git a/src/pcloud/protocols.py b/src/pcloud/protocols.py new file mode 100644 index 0000000..b872770 --- /dev/null +++ b/src/pcloud/protocols.py @@ -0,0 +1,39 @@ +from pcloud.dummyprotocol import PCloudDummyConnection +from pcloud.jsonprotocol import PCloudJSONConnection +from pcloud.binaryprotocol import PCloudBinaryConnection + + +class TestProtocol(object): + name = "test" + endpoint = "http://localhost:5023/" + connection = PCloudDummyConnection + + +class JsonAPIProtocol(object): + name = "api" + endpoint = "https://api.pcloud.com/" + connection = PCloudJSONConnection + + +class JsonEAPIProtocol(object): + name = "eapi" + endpoint = "https://eapi.pcloud.com/" + connection = PCloudJSONConnection + + +class BinAPIProtocol(object): + name = "binapi" + endpoint = "https://binapi.pcloud.com/" + connection = PCloudBinaryConnection + + +class BinEAPIProtocol(object): + name = "bineapi" + endpoint = "https://bineapi.pcloud.com/" + connection = PCloudBinaryConnection + + +class NearestProtocol(object): + name = "nearest" + endpoint = "" + connection = PCloudJSONConnection diff --git a/src/pcloud/tests/test_api.py b/src/pcloud/tests/test_api.py index 0e9ccaf..411a15e 100644 --- a/src/pcloud/tests/test_api.py +++ b/src/pcloud/tests/test_api.py @@ -6,6 +6,7 @@ import json import os.path import pytest +import requests class NoOpSession(object): @@ -28,24 +29,31 @@ def get_auth_token(self): self.auth_token = None self.access_token = None else: - return super(DummyPyCloud, self).get_auth_token() + return super().get_auth_token() def __init__(self, username, password, noop=False): if noop: self.noop = True - super(DummyPyCloud, self).__init__(username, password, endpoint="test") + super().__init__(username, password, endpoint="test") if noop: self.session = NoOpSession() + def _do_request(self, method, authenticate=True, json=True, endpoint=None, **kw): + if self.noop: + kw["noop"] = True + return self.connection.do_get_request( + method, authenticate, json, endpoint, **kw + ) + class DummyPCloudFS(PCloudFS): factory = DummyPyCloud def test_getfolderpublink(): - api = DummyPyCloud("john", "doe", noop=True) + pcapi = DummyPyCloud("john", "doe", noop=True) dt = datetime.datetime(2023, 10, 5, 12, 3, 12) - assert api.getfolderpublink(folderid=20, expire=dt) == { + assert pcapi.getfolderpublink(folderid=20, expire=dt) == { "params": {"expire": "2023-10-05T12:03:12", "folderid": 20}, "url": "http://localhost:5023/getfolderpublink", } @@ -114,7 +122,7 @@ def test_getpublinkdownload(self): papi.getpublinkdownload(file=self.noop_dummy_file) def test_server_security(self): - api = DummyPyCloud("", "") - resp = api.session.get(api.endpoint + "../../bogus.sh", params={}) + papi = DummyPyCloud("", "") + resp = requests.get(papi.endpoint + "../../bogus.sh", params={}) assert resp.content == b'{"Error": "Path not found or not accessible!"}' assert resp.status_code == 404 diff --git a/src/pcloud/tests/test_helpers.py b/src/pcloud/tests/test_helpers.py index 625bc43..4ecb3c8 100644 --- a/src/pcloud/tests/test_helpers.py +++ b/src/pcloud/tests/test_helpers.py @@ -1,4 +1,4 @@ -from pcloud.api import to_api_datetime +from pcloud.utils import to_api_datetime import datetime diff --git a/src/pcloud/tests/test_integration.py b/src/pcloud/tests/test_integration.py index f41872d..1ce3a35 100644 --- a/src/pcloud/tests/test_integration.py +++ b/src/pcloud/tests/test_integration.py @@ -3,17 +3,21 @@ import time import zipfile +from fs import opener from io import BytesIO from pathlib import Path from pcloud.api import PyCloud from pcloud.api import O_CREAT +from urllib.parse import quote -@pytest.fixture -def pycloud(): + + +@pytest.fixture(scope="module", params=["eapi", "bineapi"]) +def pycloud(request): username = os.environ.get("PCLOUD_USERNAME") password = os.environ.get("PCLOUD_PASSWORD") - return PyCloud(username, password, endpoint="eapi") + return PyCloud(username, password, endpoint=request.param) folder_for_tests = "integration-test" @@ -81,3 +85,13 @@ def test_listtokens(pycloud): result = pycloud.listtokens() assert result["result"] == 0 assert len(result["tokens"]) > 1 + + +# def testpyfsopener(pycloud): +# username = quote(os.environ.get("PCLOUD_USERNAME")) +# password = quote(os.environ.get("PCLOUD_PASSWORD")) +# pcloud_url = f'pcloud://{username}:{password}/' +# pcloud_url = 'pcloud://itconsense+pytest%40gmail.com:eXOtICf4TH3r/' +# # import pdb; pdb.set_trace() +# with opener.open_fs(pcloud_url) as pcloud_fs: +# assert pcloud_fs.listdir('/') == {} diff --git a/src/pcloud/utils.py b/src/pcloud/utils.py new file mode 100644 index 0000000..636fd85 --- /dev/null +++ b/src/pcloud/utils.py @@ -0,0 +1,23 @@ +import datetime +import logging +import sys + +log = logging.getLogger("pcloud") +log.setLevel(logging.INFO) + +handler = logging.StreamHandler(sys.stderr) +handler.setLevel(logging.INFO) +formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") +handler.setFormatter(formatter) +log.addHandler(handler) + + +# Helpers +def to_api_datetime(dt): + """Converter to a datetime structure the pCloud API understands + + See https://docs.pcloud.com/structures/datetime.html + """ + if isinstance(dt, datetime.datetime): + return dt.isoformat() + return dt