forked from dbca-wa/borgslave-sync
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathslave_sync_status.py
472 lines (398 loc) · 14.6 KB
/
slave_sync_status.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
import os
import hashlib
import json
import logging
import pytz
from datetime import datetime
import time
from jinja2 import Template
from slave_sync_task import ordered_sync_task_type
from slave_sync_env import SYNC_STATUS_PATH,now,DEFAULT_TIMEZONE
logger = logging.getLogger(__name__)
def date_to_str(d):
return d.strftime("%Y-%m-%d %H:%M:%S.%f")
def date_from_str(d_str):
"""
convert a string date to date object;return None if failed
"""
try:
return DEFAULT_TIMEZONE.localize(datetime.strptime(d_str,"%Y-%m-%d %H:%M:%S.%f"))
except:
pass
return None
class SlaveSyncStatus(object):
"""
manage sync status information of a sync job
each sync status is consisted with multiple task status object
"""
@classmethod
def get_bitbucket_status(cls):
if not hasattr(cls,"_bitbucket_status"):
cls._bitbucket_status = SlaveSyncStatus("bitbucket","sync",str(now()))
return cls._bitbucket_status
update = "update"
remove = "remove"
_status_objects = []
_modified = False
def __init__(self,sync_file,action = "update",file_content = None):
if file_content:
#file content is not null, worked in persistent mode
self._status_file = os.path.join(SYNC_STATUS_PATH,sync_file)
self._info = None
if os.path.isfile(self._status_file):
#load the status file
with open(self._status_file,'r') as f:
txt = f.read()
if txt and txt.strip():
self._info = json.loads(txt)
else:
self._info = {}
else:
if not os.path.exists(os.path.dirname(self._status_file)):
#dir does not exist, create it.
os.makedirs(os.path.dirname(self._status_file))
self._info = {}
m = hashlib.md5()
m.update(file_content)
md5_hash = m.hexdigest()
if self._info.get('md5',None) != md5_hash or self._info.get('action',None) != action:
self._info.clear()
self._info = {"file":sync_file}
self._info['md5'] = md5_hash
self._info['action'] = action
self._persistent = True
else:
#file content is null, worked in non persistent mode
self._persistent = False
self._info = {'action':action}
self._info = {'file':sync_file}
if "tasks" in self._info:
self._previous_task_status = self._info.pop("tasks")
else:
self._previous_task_status = {}
self._info['tasks'] = {}
SlaveSyncStatus._status_objects.append(self)
@property
def file(self):
return s._info["file"]
@property
def tasks(self):
try:
return len(self._info['tasks'])
except:
return 0
def get_task_status(self,name):
try:
return self._info['tasks'][name]
except:
self._info['tasks'][name] = SlaveSyncTaskStatus(self._previous_task_status.get(name,{}))
return self._info['tasks'][name]
def set_task_status(self,name,task_status):
if task_status:
if isinstance(task_status,SlaveSyncTaskStatus):
self._info['tasks'][name] = task_status
else:
self._info['tasks'][name] = SlaveSyncTaskStatus(task_status)
@property
def is_succeed(self):
return all([s.is_succeed for s in self._info.get("tasks",{}).itervalues()])
@property
def is_failed(self):
return any([s.is_failed for s in self._info.get("tasks",{}).itervalues()])
@property
def is_not_succeed(self):
return any([s.is_not_succeed for s in self._info.get("tasks",{}).itervalues()])
@staticmethod
def all_succeed():
"""
Return true, if all files are processed successfully; otherwise return False
"""
return all([s.is_succeed for s in SlaveSyncStatus._status_objects])
@staticmethod
def save_all():
"""
save all status object into file system
"""
for s in SlaveSyncStatus._status_objects:
s.save()
@staticmethod
def get_failed_status_objects():
"""
Return all status object for failed files.
"""
return [s for s in SlaveSyncStatus._status_objects if s.is_not_succeed]
@property
def is_processed(self):
return any([s.is_processed for s in self._info["tasks"].values()])
@property
def file(self):
"""
Return the associated file
"""
return self._info.get('file','')
header_template = Template("""
Sync File : {{task.file}}
Last Process Time : {{task.last_process_time}}
Succeed : {{is_succeed}}
""")
header_template_bitbucket = Template("""
Synchronize file from repository
Succeed : {{is_succeed}}
""")
task_header_template_1 = Template("""
Task {{task_index}} : {{task_type}}
Succeed : {{task_status.task_status}}
Process Time : {{task_status.last_process_time}}
""")
task_header_template_2 = Template("""
Task {{task_index}} : {{task_type}}
Succeed : {{task_status.task_status}}
Process Time : {{task_status.last_process_time}}
{% for key,value in task_status["messages"].iteritems() -%}
{{key|capitalize}} : {{value}}
{% endfor -%}
""")
stage_template_1 = Template("""
Stage : {{stage_name}}
Succeed : {{stage_status.status}}
Process Time : {{stage_status.last_process_time}}
""")
stage_template_2 = Template("""
Stage : {{stage_name}}
Succeed : {{stage_status.status}}
Process Time : {{stage_status.last_process_time}}
{% for key,value in stage_status["messages"].iteritems() -%}
{{key|capitalize}} : {{value}}
{% endfor -%}
""")
def __str__(self):
message = self.header_template.render({"task":self._info,"is_succeed":self.is_succeed})
task_index = 0
if self == SlaveSyncStatus.get_bitbucket_status():
message = self.header_template_bitbucket.render({"task":self._info,"is_succeed":self.is_succeed})
for task_type,task_status in self._info["tasks"].iteritems():
task_index += 1
message += os.linesep + (self.task_header_template_2 if task_status.has_message() else self.task_header_template_1 ).render({"task_index":task_index,"task_type":task_type,"task_status":task_status})
for stage_name,stage_status in self._info["tasks"][task_type].get("stages",{}).iteritems():
message += os.linesep + (self.stage_template_2 if stage_status.has_message() else self.stage_template_1 ).render({"stage_name":stage_name, "stage_status":stage_status})
else:
message = self.header_template.render({"task":self._info,"is_succeed":self.is_succeed})
for task_types in [["load_metadata","prepare"],ordered_sync_task_type]:
for task_type in task_types:
if task_type not in self._info["tasks"]: continue
task_status = self._info["tasks"][task_type]
task_index += 1
message += os.linesep + (self.task_header_template_2 if task_status.has_message() else self.task_header_template_1 ).render({"task_index":task_index,"task_type":task_type,"task_status":task_status})
for stage_name,stage_status in self._info["tasks"][task_type].get("stages",{}).iteritems():
message += os.linesep + (self.stage_template_2 if "messages" in stage_status else self.stage_template_1 ).render({"stage_name":stage_name, "stage_status":stage_status})
return message
def save(self):
"""
save the status to file
"""
if self._persistent and self.is_processed:
self._info['status'] = self.is_succeed
with open(self._status_file,'w') as f:
f.write(json.dumps(self._info))
@property
def last_process_time(self):
if "last_process_time" in self._info:
return date_from_str(self._info["last_process_time"])
else:
return None
@last_process_time.setter
def last_process_time(self,d):
self._info["last_process_time"] = date_to_str(d)
class SlaveSyncTaskStatus(dict):
"""
status object for a task.
"""
_modified = False
def __init__(self,task_status={}):
super(SlaveSyncTaskStatus,self).__init__(task_status)
#remove failed stages
for s in self.get("stages",{}).keys():
if self.is_stage_not_succeed(s):
del self["stages"][s]
#if no succeed stages and current task is not succeed, clear all task status data.
if self.is_not_succeed:
#current task is not succeed.
stages = self.get("stages")
self.clear()
if stages:
self["stages"] = stages
#init a messages dictionary object
if "messages" not in self :
self["messages"] = {}
#status of the task, succeed job can have failed failed task
@property
def task_status(self):
return self["task_status"] if "task_status" in self else self.get("status",False)
def task_failed(self):
self["task_status"] = False
def clean_task_failed(self):
if "task_status" in self :
del self["task_status"]
@property
def is_processed(self):
return self._modified
@property
def is_succeed(self):
"""
Return true, if the file is processed successfully; otherwise return False
"""
return self.get('status',False)
@property
def is_failed(self):
"""
Return true, if the file is processed failed; otherwise return False
"""
return self.get('status',None) == False
@property
def is_not_succeed(self):
"""
Return true, if the file is processed failed or not executed before; otherwise return False
"""
return not self.is_succeed
@property
def last_process_time(self):
if "last_process_time" in self:
return date_from_str(self["last_process_time"])
else:
return None
@last_process_time.setter
def last_process_time(self,d):
self["last_process_time"] = date_to_str(d)
@property
def shared(self):
return self.get("shared",False)
@shared.setter
def shared(self,value):
self["shared"] = bool(value)
def failed(self):
"""
Set a flag indicate this file is processed failed
"""
self['status'] = False
self._modified = True
def succeed(self):
"""
Set a flag indicate this file is processed successfully
"""
self['status'] = True
self._modified = True
def get_message(self,key,stage=None):
"""
get the message with key
"""
if stage:
return self.get_stage_message(stage,key)
try:
return self["messages"][key]
except:
return ""
def has_message(self,stage=None):
if stage:
return self.has_stage_message(stage)
else:
return True if self["messages"] else False
def set_message(self,key,message,stage=None):
"""
set a message with key
"""
if stage:
self.set_stage_message(stage,key,message)
return
self["messages"][key] = message
self._modified = True
def del_message(self,key,stage=None):
"""
delete a message
"""
if stage:
self.del_stage_message(stage,key)
return
try:
del self["messages"][key]
except:
pass
self._modified = True
@property
def all_stages_succeed(self):
"""
Return True if all stages succeed, or no stages
"""
return all([s.get('status',False) for s in self.get("stages",{}).values()])
@property
def stages(self):
return self._info.get("stages",{})
def is_stage_succeed(self,stage):
"""
Return true, if the stage is processed successfully; otherwise return False
Return false, if it does not executed.
"""
try:
return self["stages"][stage]['status']
except:
return False
def is_stage_not_succeed(self,stage):
"""
Return true, if the file is processed failed or not executed before; otherwise return False
"""
return not self.is_stage_succeed(stage)
def stage_failed(self,stage):
"""
Set a flag indicate this stage is processed failed
"""
if "stages" not in self:
self["stages"] = {}
if stage not in self["stages"]:
self["stages"][stage] = {}
self["stages"][stage]['status'] = False
self["stages"][stage]['last_process_time'] = date_to_str(now())
self._modified = True
def stage_succeed(self,stage):
"""
Set a flag indicate this state is processed successfully
"""
if "stages" not in self:
self["stages"] = {}
if stage not in self["stages"]:
self["stages"][stage] = {}
self["stages"][stage]['status'] = True
self["stages"][stage]['last_process_time'] = date_to_str(now())
self._modified = True
def has_stage_message(self,stage):
try:
return True if self["stages"][stage]["messages"] else None
except:
return False
def get_stage_message(self,stage,key):
"""
get the stage message with key
"""
try:
return self["stages"][stage]["messages"][key]
except:
return ''
def set_stage_message(self,stage,key,message):
"""
set a stage message with key
"""
if "stages" not in self:
self["stages"] = {}
if stage not in self["stages"]:
self["stages"][stage] = {}
if "messages" not in self["stages"][stage]:
self["stages"][stage]["messages"] = {}
self["stages"][stage]["messages"][key] = message
self._modified = True
def del_stage_message(self,stage,key):
"""
delete a stage message
"""
try:
del self["stages"][stage]["messages"][key]
except:
pass
self._modified = True