-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfile.py
executable file
·134 lines (116 loc) · 3.79 KB
/
file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""
$URL: svn+ssh://svn.mems-exchange.org/repos/trunk/durus/file.py $
$Id: file.py 31249 2008-10-28 21:49:21Z dbinger $
"""
import os, os.path
from os.path import exists
from tempfile import NamedTemporaryFile, _TemporaryFileWrapper
if os.name == 'nt':
import win32con, win32file, pywintypes # http://sf.net/projects/pywin32/
else:
import fcntl
class File (object):
    """
    A file wrapper that smooths over some platform-specific
    operations.

    The wrapped file object is kept in self.file, and self.has_lock records
    whether this instance currently holds the exclusive lock taken by
    obtain_lock().  Instances may be used in a "with" statement; close() is
    called when the block exits.
    """

    def __init__(self, name=None, readonly=False, **kwargs):
        """
        Open the file called `name`, or a NamedTemporaryFile when `name` is
        None (`kwargs` are passed to NamedTemporaryFile in that case only).

        An existing file is opened with mode 'rb' when `readonly` is true and
        'r+b' otherwise.  A missing file is created with mode 'w+b', unless
        `readonly` is true, in which case OSError is raised.
        """
        if name is None:
            self.file = NamedTemporaryFile(**kwargs)
        else:
            if exists(name):
                if readonly:
                    self.file = open(name, 'rb')
                else:
                    self.file = open(name, 'r+b')
            else:
                if readonly:
                    raise OSError('No "%s" found.' % name)
                self.file = open(name, 'w+b')
        if readonly:
            assert self.is_readonly()
        self.has_lock = False

    def __enter__(self):
        """
        Return self so the instance can be used as a context manager.
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Close the file on leaving a "with" block.
        Exceptions raised inside the block are not suppressed.
        """
        self.close()

    def get_name(self):
        """
        Return the name attribute of the wrapped file object.
        """
        return self.file.name

    def is_temporary(self):
        """
        Return true if the wrapped file came from NamedTemporaryFile.
        """
        return isinstance(self.file, _TemporaryFileWrapper)

    def is_readonly(self):
        """
        Return true if the wrapped file reports the mode 'rb'.
        """
        return self.file.mode == 'rb'

    def seek(self, n, whence=0):
        """
        Move the file position; the arguments are those of file.seek().
        For an absolute seek (whence == 0), assert that the position
        actually reached is `n`.
        """
        self.file.seek(n, whence)
        if whence == 0:
            assert self.file.tell() == n

    def seek_end(self):
        """
        Move the file position to the end of the file.
        """
        self.file.seek(0, 2)

    def read(self, n=None):
        """
        Read and return `n` bytes, or everything up to the end of the file
        when `n` is None.
        """
        if n is None:
            return self.file.read()
        else:
            return self.file.read(n)

    def tell(self):
        """
        Return the current file position.
        """
        return self.file.tell()

    def stat(self):
        """
        Return the os.stat() result for the file's name.
        """
        return os.stat(self.get_name())

    def __len__(self):
        """
        Return the st_size reported by stat().  This is looked up by name,
        so data still sitting in the write buffer is not counted.
        """
        return self.stat().st_size

    def rename(self, name):
        """
        Move the file to `name`, replacing any file that is already there,
        then reopen it with mode 'r+b' and take the lock again.
        Does nothing if `name` is already the file's name.
        Temporary files may not be renamed (checked with an assert).
        """
        old_name = self.get_name()
        if name == old_name:
            return
        assert not self.is_temporary()
        # Take the lock first so that a file locked by someone else is
        # refused before anything is closed or removed.
        self.obtain_lock()
        self.close()
        # Remove any existing target explicitly before renaming onto it.
        if exists(name):
            os.unlink(name)
        os.rename(old_name, name)
        self.file = open(name, 'r+b')
        # close() released the lock; acquire it on the reopened file.
        self.obtain_lock()

    def obtain_lock(self):
        """
        Make sure that we have an exclusive lock on self.file before
        doing a write.
        If the lock is not available, raise an exception.
        """
        assert not self.is_readonly()
        if not self.has_lock:
            if os.name == 'nt':
                try:
                    # Non-blocking exclusive lock; the byte-range arguments
                    # must match the ones used in release_lock().
                    win32file.LockFileEx(
                        win32file._get_osfhandle(self.file.fileno()),
                        (win32con.LOCKFILE_EXCLUSIVE_LOCK |
                         win32con.LOCKFILE_FAIL_IMMEDIATELY),
                        0, -65536, pywintypes.OVERLAPPED())
                except pywintypes.error:
                    raise IOError("Unable to obtain lock")
            else:
                # LOCK_NB makes flock() raise instead of waiting.
                fcntl.flock(self.file, fcntl.LOCK_EX | fcntl.LOCK_NB)
            self.has_lock = True

    def release_lock(self):
        """
        Make sure that we do not retain an exclusive lock on self.file.
        """
        if self.has_lock:
            if os.name == 'nt':
                win32file.UnlockFileEx(
                    win32file._get_osfhandle(self.file.fileno()),
                    0, -65536, pywintypes.OVERLAPPED())
            else:
                fcntl.flock(self.file, fcntl.LOCK_UN)
            self.has_lock = False

    def write(self, s):
        """
        Write `s` at the current position, obtaining the lock first.
        """
        self.obtain_lock()
        self.file.write(s)
        if os.name == 'nt':
            # This flush helps the file know where it ends.
            self.file.flush()

    def truncate(self):
        """
        Truncate the file at the current position, obtaining the lock first.
        """
        self.obtain_lock()
        self.file.truncate()

    def close(self):
        """
        Release the lock, if held, and close the wrapped file.
        The file is closed even if releasing the lock raises.
        """
        try:
            self.release_lock()
        finally:
            # Never leak the file handle because unlocking failed.
            self.file.close()

    def flush(self):
        """
        Flush the wrapped file's write buffer.
        """
        self.file.flush()

    def fsync(self):
        """
        Force written data to disk, on platforms that provide os.fsync().
        """
        if hasattr(os, 'fsync'):
            # os.fsync() only covers data already handed to the operating
            # system, so empty Python's own buffer first.
            self.file.flush()
            os.fsync(self.file)