-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathprogrammation_daemon.py
114 lines (82 loc) · 4.14 KB
/
programmation_daemon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import time
from datetime import datetime, timedelta
import config_manager
import download_manager
import process_utils
import programmation_persistence_manager
from programmation_class import Programmation
import logging
__cm = config_manager.ConfigManager()
__pu = {}
for queue in __cm.redis_queues:
__pu[queue] = process_utils.ProcessUtils(__cm, queue_name=queue)
__pm = programmation_persistence_manager.ProgrammationPersistenceManager()
programmation_interval = __cm.get_app_params().get('_programmation_interval')
enable_redis = False if __cm.get_app_params().get('_enable_redis') is not True else True
def is_job_to_terminate(job=None):
job_end_date = job.get('job').meta.get('programmation_end_date')
if type(job_end_date) != datetime:
try:
job_end_date = datetime.fromisoformat(job_end_date)
except Exception:
return False
return job_end_date < datetime.now()
def run():
logging.getLogger('programmation').info(f'New iteration : {datetime.now()}')
purged_programmations = __pm.purge_all_past_programmations()
if len(purged_programmations) > 0:
logging.getLogger('programmation').info(f'{len(purged_programmations)} deleted outdated entries')
all_programmations = __pm.get_all_enabled_programmations()
jobs_to_check = []
for __sub_pu in __pu:
jobs_to_check = jobs_to_check + __pu.get(__sub_pu).find_job_with_programmation_end_date()
for job in jobs_to_check:
if job is not None :
if is_job_to_terminate(job=job):
logging.getLogger('programmation').info(f"Programmation {job.get('job').meta.get('programmation_id')} stopped by daemon")
for __sub_pu in __pu:
__pu.get(__sub_pu).terminate_redis_active_download(job.get('id'))
for programmation in all_programmations:
prog = Programmation(programmation=programmation, id=programmation.get('id'))
for __sub_pu in __pu:
found_job = __pu.get(__sub_pu).find_job_by_programmation_id(prog.id)
if found_job is not None:
break
must_be_restarted = prog.must_be_restarted()
if found_job is None:
next_execution = prog.get_next_execution()
effective_duration = must_be_restarted
if effective_duration is None and prog.recording_duration:
effective_duration = prog.recording_duration
if effective_duration is not None and prog.recording_stops_at_end:
programmation_end_date = datetime.now().replace(second=0, microsecond=0) + timedelta(
minutes=effective_duration)
else:
programmation_end_date = None
if next_execution is not None:
will_be_executed = next_execution < datetime.now()
else:
will_be_executed = True
if must_be_restarted is not None or will_be_executed:
if must_be_restarted is not None:
effective_programmation_date = datetime.now()
else:
effective_programmation_date = next_execution
dm = download_manager.DownloadManager(__cm,
prog.url,
prog.presets,
prog.user_token,
programmation_id=prog.id,
programmation_end_date=programmation_end_date,
programmation_date=effective_programmation_date,
programmation=prog.get()
)
if dm.get_api_status_code() != 400:
dm.process_downloads()
if __name__ == '__main__':
__cm.init_logger(file_name='programmation_daemon.log')
if not enable_redis:
logging.getLogger('programmation').warning('Redis disabled, programmation daemon exited')
exit()
run()
time.sleep(programmation_interval)