-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstorage.py
359 lines (284 loc) · 11.4 KB
/
storage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
"""Storage service: manage and allocate storage """
import time
import random
from threading import Timer
import zlib
from pprint import pformat
from obj import *
from util import *
from service import *
from oodict import *
from io import *
import config
from logging import info, debug
class StorageService(Service):
    """Storage management for the chunk store.

    Public operations (each takes a request object ``req``):

    disk manager:
        online, offline, frozen, status
    storage allocator:
        alloc -- choose devices with enough free space for a new chunk.
                 If anything fails afterwards, remember to call free.
        free  -- drop chunk entries and queue their replicas for deletion.
    chunk locator:
        search, locate
    heartbeat service:
        heartbeat
    replicator:
        replicate chunk? (not implemented)

    Background, from the HDFS design this service mirrors:
    On startup, the Namenode enters a special state called Safemode.
    Replication of data blocks does not occur when the Namenode is in the
    Safemode state. The Namenode receives Heartbeat and Blockreport messages
    from the Datanodes. A Blockreport contains the list of data blocks that
    a Datanode is hosting. Each block has a specified minimum number of
    replicas. A block is considered safely replicated when the minimum
    number of replicas of that data block has checked in with the Namenode.
    After a configurable percentage of safely replicated data blocks checks
    in with the Namenode (plus an additional 30 seconds), the Namenode exits
    the Safemode state. It then determines the list of data blocks (if any)
    that still have fewer than the specified number of replicas. The
    Namenode then replicates these blocks to other Datanodes.

    Here the equivalent is "silent mode". For 30s after construction,
    alloc/publish/search are refused, so chunk servers have time to send
    their chunk reports again.

    The chunk location DB (``self._chunks_map``) is a dict keyed by
    ``(fid, cid)``. Each value is a dict holding the current version and the
    set of device ids that hold a replica of that version, e.g.::

        (1, 2): {'v': 2, 'l': set([dev1, dev2])}
    """

    def __init__(self):
        self._db = FileDB(config.home)
        # Deleted chunks, indexed by device id, so all deleted chunks on one
        # device can be fetched at once. Each device maps to a list of chunks.
        # Loaded from (and saved via _flush to) self._db to survive a crash.
        self._deleted_chunks = self._db.load('deleted_chunks', {})
        # did -> OODict with .conf (device config); .addr is set by online().
        self._devices = {}
        # (fid, cid) -> {'v': version, 'l': set of dids}
        self._chunks_map = {}
        # host addr -> OODict with .devs (set of dids) and .update_time
        self._nodes = {}  # Nodes alive info
        # Enter silent mode for 30s, waiting for chunk servers to send chunk
        # reports. The timer is started last so that every attribute exists
        # before the background thread can run enter_normal_mode.
        info('Entering Silent mode')
        self.silent_mode = True
        Timer(30, self.enter_normal_mode).start()

    def enter_normal_mode(self):
        """Leave silent mode; called by the startup Timer after 30s."""
        self.silent_mode = False
        info('Entering normal mode')

    def _flush(self):
        """Persist self._deleted_chunks to the file DB."""
        self._db.store(self._deleted_chunks, 'deleted_chunks')

    def _writeable(self, did):
        """Check if a device is available and not in 'frozen' mode."""
        return self._available(did) and self._devices[did].conf.mode != 'frozen'

    def _available(self, did):
        """Check if a device is known and its host is alive."""
        if did not in self._devices:
            return False
        return self._is_alive(self._devices[did].addr)

    def _is_alive(self, host):
        """Check if a host is alive.

        A host we have not heard from for 120 seconds or more is considered
        dead. We may use hostid aka hid here.
        """
        return host in self._nodes and time.time() - self._nodes[host].update_time < 120

    def _free_enough(self, did, size):
        """Check if a device has strictly more than ``size`` bytes free."""
        conf = self._devices[did].conf
        return conf.size - conf.used > size

    def alloc(self, req):
        """Choose devices to hold a new chunk.

        @size bytes required on each device
        @n    number of devices wanted
        return list of tuple (did, addr) with at most n entries. It may be
               shorter than n if fewer devices qualify; self._error is called
               if none qualify.
        """
        if self.silent_mode:
            self._error('silent mode, please try later')
        debug('Alloc %s bytes on %d devices', req.size, req.n)
        # Random order spreads the load; better would be a list sorted by
        # free space. list() is required because random.shuffle needs a
        # mutable sequence, and on Python 3 dict.keys() is only a view.
        dids = list(self._devices)
        random.shuffle(dids)
        value = []
        for did in dids:
            debug('%s alive %s available %s writable %s free %s', did, self._is_alive(self._devices[did].addr), self._available(did), self._writeable(did), self._free_enough(did, req.size))
            if self._writeable(did) and self._free_enough(did, req.size):
                value.append((did, self._devices[did].addr))
            if len(value) >= req.n:
                break
        if not value:
            self._error('no dev avaiable')  # Find nothing, this should not happen
        debug('%s', value)
        return value

    def _delete_chunks_map_entry(self, chunk):
        """Remove a chunk from the location map.

        Every device that held the chunk gets it queued in
        self._deleted_chunks. Unknown chunks are ignored.
        """
        key = chunk.fid, chunk.cid
        entry = self._chunks_map.pop(key, None)
        if entry is None:
            return
        # NOTE: the version is not checked; all recorded replicas are queued.
        for did in entry['l']:
            self._deleted_chunks.setdefault(did, []).append(chunk)

    def _insert_chunks_map_entry(self, chunk, did):
        """Record that device ``did`` holds a replica of ``chunk``.

        return True if recorded, False if the replica is stale (older than
        the version already known).
        """
        key = chunk.fid, chunk.cid
        entry = self._chunks_map.get(key)
        if entry is None:
            self._chunks_map[key] = {'v': chunk.version, 'l': set([did])}
            return True
        old_version = entry['v']
        if chunk.version < old_version:
            return False  # Stale
        if chunk.version == old_version:
            entry['l'].add(did)
            return True
        # Newer version: queue every replica of the old version for deletion.
        old = Chunk()
        old.fid, old.cid = key
        old.version = old_version
        # Use a distinct loop name: reusing `did` here would overwrite the
        # parameter, and the new location below would record the wrong device.
        for old_did in entry['l']:
            self._deleted_chunks.setdefault(old_did, []).append(old)
        # Save new
        entry['v'] = chunk.version
        entry['l'] = set([did])
        return True

    def publish(self, req):
        """Publish replicas of a single newly created chunk.

        To publish all chunks on a device, use the 'online' method instead.
        @chunk chunk description (converted with Chunk())
        @dids  device ids holding a replica
        return {'stale': list of dids whose replica is older than known}
        """
        if self.silent_mode:
            self._error('silent mode, please try later')
        chunk = Chunk(req.chunk)  # Make sure we have a chunk
        stale = [did for did in req.dids
                 if not self._insert_chunks_map_entry(chunk, did)]
        self._flush()
        return {'stale': stale}

    def locate(self, req):
        """Locate the replicas of one chunk.

        @fid
        @cid
        return OODict with .version and .locations, a list of
               (did, addr, path) for available devices only.
        """
        key = req.fid, req.cid
        if key not in self._chunks_map:
            self._error('No replica found')
        entry = self._chunks_map[key]
        result = OODict()
        result.version = entry['v']
        result.locations = []
        # Only report devices that are currently available
        for did in entry['l']:
            if self._available(did):
                dev = self._devices[did]
                result.locations.append((did, dev.addr, dev.conf.path))
        return result

    def search(self, req):
        """Search the locations of several chunks.

        @chunks dict of chunks, indexed by cid
        return dict of cid: locations, where locations is a list of tuple
               (did, addr). Chunks that are unknown, have a version mismatch,
               or have no available replica are omitted.
        """
        if self.silent_mode:
            self._error('silent mode, please try later')
        value = {}
        for cid in req.chunks.keys():
            chunk = Chunk(req.chunks[cid])
            key = chunk.fid, chunk.cid
            if key not in self._chunks_map:
                continue  # No replicas
            version = self._chunks_map[key]['v']
            if chunk.version != version:
                debug('version mismatch: want %s, got %s', chunk, version)
                continue  # Version error
            # Only report devices that are currently available
            for did in self._chunks_map[key]['l']:
                if self._available(did):
                    value.setdefault(cid, []).append((did, self._devices[did].addr))
        return value

    def free(self, req):
        """Delete chunk entries and queue all their replicas for deletion.

        @deleted list of chunks (dicts converted with Chunk())
        return 'ok'
        """
        for chunk in req.deleted:
            self._delete_chunks_map_entry(Chunk(chunk))
        self._flush()
        return 'ok'

    def _update_host(self, host):
        """Refresh a host's last-seen time, creating its entry if new."""
        if host not in self._nodes:
            self._nodes[host] = OODict({'devs': set()})
        self._nodes[host].update_time = time.time()

    def _update_device(self, host, did):
        """Refresh a host and record that device ``did`` belongs to it."""
        self._update_host(host)
        self._nodes[host].devs.add(did)

    def heartbeat(self, req):
        """Update node health and hand out pending deletions.

        @addr  chunk server address
        @confs configs of changed devices, optional
               NOTE(review): read unconditionally below; confirm the request
               always carries a mapping here.
        return OODict with .needreport (True on first contact) and
               .deleted_chunks (dict did -> list of chunks to delete)
        """
        rv = OODict()
        # First time connect, please send your chunkreports to me
        rv.needreport = req.addr not in self._nodes  # Or not alive?
        self._update_host(req.addr)
        # Update changed devices
        for did, conf in req.confs.items():
            self._devices.setdefault(did, OODict()).conf = conf
        # Hand over chunks deleted by the meta node that belong to this chunk
        # server, and drop devices we no longer know about from its list.
        deleted = {}
        devs = self._nodes[req.addr].devs
        for did in list(devs):  # copy: devs is modified inside the loop
            if did in self._deleted_chunks:
                deleted[did] = self._deleted_chunks.pop(did)
            if did not in self._devices:
                devs.remove(did)
        rv.deleted_chunks = deleted
        return rv

    def online(self, req):
        """Bring a device online and load its chunk report.

        @conf    device config (its .id is the device id)
        @addr    chunk server address
        @payload zlib-compressed repr of the chunk report dict
        return 'ok'
        """
        did = req.conf.id
        if did in self._devices:
            self._error('already online')
        self._devices.setdefault(did, OODict()).conf = req.conf
        self._devices[did].addr = req.addr
        # SECURITY(review): eval() executes arbitrary code from a network
        # payload. Consider a safe format (e.g. json or ast.literal_eval) once
        # the chunk report's exact contents are confirmed.
        report = eval(zlib.decompress(req.payload))
        for entry in report.values():
            self._insert_chunks_map_entry(Chunk(entry), did)
        self._update_device(req.addr, did)
        self._flush()
        info('Device %s online', did)
        return 'ok'

    def _get_device(self, did):
        """Return the device entry, or call self._error if it is not online."""
        if did not in self._devices:
            self._error('not online')
        return self._devices[did]

    def offline(self, req):
        """Take a device offline.

        @did       device id
        @replicate bool, whether to replicate (not implemented yet)
        return 'ok'
        """
        self._get_device(req.did)  # errors out if the device is not online
        del self._devices[req.did]
        # TODO: update location info for chunks on this device.
        # chunk -> dev is easy; dev -> chunks needs iterating all chunks,
        # or a per-device index.
        # The host's .devs set is pruned lazily by heartbeat().
        info('Device %s offline', req.did)
        return 'ok'

    def frozen(self, req):
        """Freeze a device so that alloc no longer selects it.

        @did device id
        return 'ok'
        """
        dev = self._get_device(req.did)
        # Should we wait for writings to be finished?
        dev.conf.mode = 'frozen'
        info('Device %s frozen', req.did)
        return 'ok'

    def status(self, req):
        """Return the raw device and node tables."""
        return {'devices': self._devices, 'nodes': self._nodes}