forked from clee704/torrentcheck
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtorrentcheck.py
executable file
·191 lines (163 loc) · 5.54 KB
/
torrentcheck.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#! /usr/bin/env python
import argparse
import hashlib
import itertools
import os
import bencode
parser = argparse.ArgumentParser(
description='Check the integrity of torrent downloads.')
parser.add_argument('directory', help='download directory')
parser.add_argument('torrent', nargs='*', help='torrent file')
parser.add_argument(
'--delete',
action='store_true',
help='delete files that are not found in the torrent file; '
'do nothing if the torrent file has only a single file')
parser.add_argument(
'--list-delete',
action='store_true',
help='list the files that would be deleted if --delete was set')
parser.add_argument(
'--debug',
action='store_true',
help='print traceback and exit when an exception occurs')
def main():
try:
args = parser.parse_args()
if not os.path.isdir(args.directory):
print '{} is not a directory'.format(args.directory)
return 2
if args.delete or args.list_delete:
cmd = delete_cmd
else:
cmd = verify_cmd
all_ok = True
for torrent_path in args.torrent:
with open(torrent_path, 'rb') as f:
torrent = bencode.bdecode(f.read())
info = torrent['info']
try:
ok = cmd(info, torrent_path, args)
except Exception:
ok = False
print '{}: ERROR'.format(torrent_path)
if args.debug:
raise
all_ok = all_ok and ok
return 0 if all_ok else 1
except KeyboardInterrupt:
return 1
def delete_cmd(info, torrent_path, args):
if 'files' not in info:
return True
base_path = os.path.join(args.directory, info['name'])
paths = set(os.path.join(base_path, *f['path']) for f in info['files'])
count = 0
for dirpath, dirnames, filenames in os.walk(base_path):
for filename in filenames:
p = os.path.join(dirpath, filename)
if p not in paths:
count += 1
if args.list_delete:
print '{}: {}'.format(torrent_path, p)
if args.delete:
os.unlink(p)
if count == 0:
print '{}: OK'.format(torrent_path)
else:
verb = 'deleted' if args.delete else 'found'
print '{}: {} extra file(s) {}'.format(torrent_path, count, verb)
return True
def verify_cmd(info, torrent_path, args):
ok = verify(info, args.directory)
if ok:
print '{}: OK'.format(torrent_path)
else:
print '{}: FAILED'.format(torrent_path)
return ok
def verify(info, directory_path):
"""Return True if the checksum values in the torrent file match the
computed checksum values of downloaded file(s) in the directory and if
each file has the correct length as specified in the torrent file.
"""
base_path = os.path.join(directory_path, info['name'])
if 'length' in info:
if os.stat(base_path).st_size != info['length']:
return False
getfile = lambda: open(base_path, 'rb')
else:
assert 'files' in info, 'invalid torrent file'
for f in info['files']:
p = os.path.join(base_path, *f['path'])
if os.stat(p).st_size != f['length']:
return False
getfile = lambda: ConcatenatedFile(base_path, info['files'])
with getfile() as f:
return compare_checksum(info, f)
def compare_checksum(info, f):
"""Return True if the checksum values in the info dictionary match the
computed checksum values of file content.
"""
pieces = info['pieces']
def getchunks(f, size):
while True:
chunk = f.read(size)
if chunk == '':
break
yield hashlib.sha1(chunk).digest()
calc = getchunks(f, info['piece length'])
ref = (pieces[i:i + 20] for i in xrange(0, len(pieces), 20))
for expected, actual in itertools.izip(calc, ref):
if expected != actual:
return False
return ensure_empty(calc) and ensure_empty(ref)
def ensure_empty(gen):
"""Return True if the generator is empty. If it is not empty, the first
element is discarded.
"""
try:
next(gen)
return False
except StopIteration:
return True
class ConcatenatedFile(object):
"""A file-like object that acts like a single file whose content is a
concatenation of the specified files. The returned object supports read(),
__enter__() and __exit__().
"""
def __init__(self, base, files):
self._base = base
self._files = files
self._f = EmptyFile()
self._i = -1
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._f.close()
return False
def read(self, size):
if self._i == len(self._files):
return ''
buf = []
count = 0
while True:
chunk = self._f.read(size - count)
count += len(chunk)
buf.append(chunk)
if count < size:
self._i += 1
if self._i == len(self._files):
break
p = os.path.join(self._base, *self._files[self._i]['path'])
self._f.close()
self._f = open(p, 'rb')
else:
break
return ''.join(buf)
class EmptyFile(object):
def read(self, size):
return ''
def close(self):
return
if __name__ == '__main__':
exit(main())