-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
Copy pathpanapi.py
210 lines (182 loc) · 6.9 KB
/
panapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
#!/usr/bin/env python
# encoding: utf-8
# PYTHON_ARGCOMPLETE_OK
# from __future__ imports must occur at the beginning of the file
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import division
import os
import io
from . import const
from . import gvar
from .util import (
pr, perr, formatex,
ls_type, ls_time, get_pcs_path)
from .bypy import ByPy
# Put all of xslidian's BDUSS extensions here; I have never tried them out, though.
class PanAPI(ByPy):
IEBDUSSExpired = -6
# Does https work? if so, we should always use https
#PanAPIUrl = 'http://pan.baidu.com/api/'
PanAPIUrl = 'https://pan.baidu.com/api/'
def __init__(self, **kwargs):
super(PanAPI, self).__init__(**kwargs)
self._bdusspath= self._configdir + os.sep + 'bypy.bduss'
self._bduss = ''
if not self._load_local_bduss():
self.pv("BDUSS not found at '{}'.".format(self._bdusspath))
def _load_local_bduss(self):
try:
with io.open(self._bdusspath, 'rb') as infile:
self._bduss = infile.readline().strip()
self.pd("BDUSS loaded: {}".format(self._bduss))
self._cookies = {'BDUSS': self._bduss}
return True
except IOError as ex:
self.pd("Error loading BDUSS:")
self.pd(formatex(ex))
return False
# overriding
def _handle_more_response_error(self, r, sc, ec, act, actargs):
result = const.ERequestFailed
# user not exists
if ec == 31045: # and sc == 403:
self.pd("BDUSS has expired")
result = PanAPI.IEBDUSSExpired
# topath already exists
elif ec == 31196: # and sc == 403:
self.pd("UnzipCopy destination already exists.")
result = act(r, actargs)
# file copy failed
elif ec == 31197: # and sc == 503:
self.pd("File copy failed")
result = act(r, actargs)
# file size exceeds limit
elif ec == 31199: # and sc == 403:
result = act(r, actargs)
return result
def unzip(self, remotepath, subpath = '/', start = 0, limit = 1000):
''' Usage: unzip <remotepath> [<subpath> [<start> [<limit>]]]'''
rpath = get_pcs_path(remotepath)
return self._panapi_unzip_file(rpath, subpath, start, limit);
def _panapi_unzip_file_act(self, r, args):
j = r.json()
self.pd("Unzip response: {}".format(j))
if j['errno'] == 0:
if 'time' in j:
perr("Extraction not completed yet: '{}'...".format(args['path']))
return const.ERequestFailed
elif 'list' in j:
for e in j['list']:
pr("{}\t{}\t{}".format(ls_type(e['isdir'] == 1), e['file_name'], e['size']))
return const.ENoError
def _panapi_unzip_file(self, rpath, subpath, start, limit):
pars = {
'path' : rpath,
'start' : start,
'limit' : limit,
'subpath' : '/' + subpath.strip('/') }
self.pd("Unzip request: {}".format(pars))
return self._get(PanAPI.PanAPIUrl + 'unzip?app_id=250528',
pars, self._panapi_unzip_file_act, cookies = self._cookies, actargs = pars )
def extract(self, remotepath, subpath, saveaspath = None):
''' Usage: extract <remotepath> <subpath> [<saveaspath>]'''
rpath = get_pcs_path(remotepath)
topath = get_pcs_path(saveaspath)
if not saveaspath:
topath = os.path.dirname(rpath) + '/' + subpath
return self._panapi_unzipcopy_file(rpath, subpath, topath)
def _panapi_unzipcopy_file_act(self, r, args):
j = r.json()
self.pd("UnzipCopy response: {}".format(j))
if 'path' in j:
self.pv("Remote extract: '{}#{}' =xx=> '{}' OK.".format(args['path'], args['subpath'], j['path']))
return const.ENoError
elif 'error_code' in j:
if j['error_code'] == 31196:
perr("Remote extract: '{}#{}' =xx=> '{}' FAILED. File already exists.".format(args['path'], args['subpath'], args['topath']))
subresult = self._delete(args['topath'])
if subresult == const.ENoError:
return self._panapi_unzipcopy_file(args['path'], args['subpath'], args['topath'])
else:
return const.ERequestFailed
elif j['error_code'] == 31199:
perr("Remote extract: '{}#{}' =xx=> '{}' FAILED. File too large.".format(args['path'], args['subpath'], args['topath']))
return const.EMaxRetry
else:
perr("Remote extract: '{}#{}' =xx=> '{}' FAILED. Unknown error {}: {}.".format(args['path'], args['subpath'], args['topath'], j['error_code'], j['error_msg']))
return const.EMaxRetry
def _panapi_unzipcopy_file(self, rpath, subpath, topath):
pars = {
'app_id' : 250528,
'method' : 'unzipcopy',
'path' : rpath,
'subpath' : '/' + subpath.strip('/'),
'topath' : topath }
self.pd("UnzipCopy request: {}".format(pars))
return self._get(gvar.pcsurl + 'file',
pars, self._panapi_unzipcopy_file_act, addtoken = False, cookies = self._cookies, actargs = pars )
def revision(self, remotepath):
''' Usage: revision <remotepath> '''
rpath = get_pcs_path(remotepath)
return self._panapi_revision_list(rpath)
def history(self, remotepath):
''' Usage: history <remotepath> '''
return self.revision(remotepath)
def _panapi_revision_list_act(self, r, args):
j = r.json()
self.pd("RevisionList response: {}".format(j))
if j['errno'] == 0:
if 'list' in j:
for e in j['list']:
pr("{}\t{}\t{}".format(e['revision'], e['size'], ls_time(e['revision'] // 1e6)))
return const.ENoError
if j['errno'] == -6: # invalid BDUSS
pr("BDUSS has expired.")
return PanAPI.IEBDUSSExpired
if j['errno'] == -9:
pr("File '{}' not exists.".format(args['path']))
return const.EFileNotFound
return const.ENoError
def _panapi_revision_list(self, rpath):
pars = {
'path' : rpath,
'desc' : 1 }
self.pd("RevisionList request: {}".format(pars))
return self._post(PanAPI.PanAPIUrl + 'revision/list?app_id=250528',
{}, self._panapi_revision_list_act, pars, data = pars, cookies = self._cookies )
def revert(self, remotepath, revision, dir = None):
''' Usage: revert <remotepath> revisionid [dir]'''
rpath = get_pcs_path(remotepath)
dir = get_pcs_path(dir)
if not dir:
dir = os.path.dirname(rpath)
return self._panapi_revision_revert(rpath, revision, dir)
def _panapi_revision_revert_act(self, r, args):
j = r.json()
self.pd("RevisionRevert response: {}".format(j))
if j['errno'] == 0:
self.pv("Remote revert: '{}#{}' =rr=> '{}' OK.".format(args['path'], args['revision'], j['path']))
return const.ENoError
if j['errno'] == -6: # invalid BDUSS
pr("BDUSS has expired.")
return PanAPI.IEBDUSSExpired
if j['errno'] == -9:
pr("File '{}' not exists.".format(args['path']))
return const.EFileNotFound
if j['errno'] == 10:
pr("Reverting '{}' in process...".format(args['path']))
return const.ERequestFailed
return const.ENoError
def _panapi_revision_revert(self, rpath, revision, dir = None):
if not dir:
dir = os.path.dirname(rpath)
pars = {
'revision' : revision,
'path' : rpath,
'type' : 2,
'dir' : dir }
self.pd("RevisionRevert request: {}".format(pars))
return self._post(PanAPI.PanAPIUrl + 'revision/revert?app_id=250528',
{}, self._panapi_revision_revert_act, pars, data = pars, cookies = self._cookies )
# vim: tabstop=4 noexpandtab shiftwidth=4 softtabstop=4 ff=unix fileencoding=utf-8