-
Notifications
You must be signed in to change notification settings - Fork 2
/
program_handler.py
163 lines (136 loc) · 7.15 KB
/
program_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import json
import socket
import threading
from time import sleep
from datetime import date, datetime, timedelta, time
import dateutil.tz
import pytz
from pytz import timezone
from apscheduler.scheduler import Scheduler
from rootio.config import DefaultConfig
from rootio.radio.models import ScheduledProgram
from sqlalchemy import text
from radio_program import RadioProgram
class ProgramHandler:
    """Loads, schedules and re-schedules the programs of one radio station.

    On construction a background thread is started that connects to the
    schedule-events server and applies "add" / "update" / "delete" events to
    the jobs held by the current scheduler. ``run()`` loads today's programs
    from the database, schedules the ones that have not expired and arranges
    for the whole schedule to be rebuilt on the following day.
    """

    def __init__(self, db, radio_station):
        """Store collaborators and start the schedule-change listener thread.

        :param db: database session exposing ``query()``.
        :param radio_station: object exposing ``id``, ``logger`` and
            ``station`` (with ``name`` and ``timezone``).
        """
        self.__db = db
        self.__radio_station = radio_station
        self.__scheduler = None
        self.__scheduled_jobs = None
        # Defined up front so __get_current_program() cannot hit a missing
        # attribute when it is called before the first schedule load.
        self.__scheduled_programs = []
        self.__start_listeners()
        self.__radio_station.logger.info("Done initialising ProgramHandler for {0}".format(radio_station.station.name))

    def run(self):
        """Load and schedule today's programs."""
        self.run_today_schedule()

    def __prepare_schedule(self):
        """Reload today's programs and create a fresh scheduler and job map."""
        self.__load_programs()
        # NOTE(review): a previously created scheduler is replaced here without
        # being shut down - confirm whether that is intended.
        self.__scheduler = Scheduler()
        self.__scheduled_jobs = dict()

    def run_today_schedule(self):
        """Build today's schedule and queue the next day's rebuild."""
        self.__prepare_schedule()
        self.__scheduler.start()
        self.__schedule_programs()
        self.__schedule_next_day_scheduler()
        # Parenthesised single argument: prints the same on Python 2 and 3.
        print(self.__scheduler.get_jobs())

    def stop(self):
        """Stop the handler; currently only delegates to the stop stub."""
        self.__stop_program()
        # any clean up goes here
        # unschedule stuff

    def __schedule_next_day_scheduler(self):
        """Schedule ``run_today_schedule`` to run again tomorrow."""
        # TODO: make this safe for different timezones!
        base_date = date.today() + timedelta(days=1)
        tomorrow_date = datetime.combine(base_date, time())
        # Add the station's UTC offset to tomorrow's local midnight.
        # NOTE(review): ``.seconds`` is never negative, so for zones behind UTC
        # the offset wraps into the following 24 hours - confirm this is the
        # intended reload time before changing it.
        station_tz = timezone(self.__radio_station.station.timezone)
        offset_seconds = station_tz.utcoffset(datetime.now()).seconds
        tomorrow_date = tomorrow_date + timedelta(seconds=offset_seconds)
        # Schedule the scheduler itself to reload.
        self.__scheduler.add_date_job(self.run_today_schedule, tomorrow_date)

    def __schedule_programs(self):
        """Schedule every loaded program that has not expired yet."""
        for scheduled_program in self.__scheduled_programs:
            if not self.__is_program_expired(scheduled_program):
                self.__add_scheduled_job(scheduled_program)
                self.__radio_station.logger.info(
                    "Scheduled program {0} for station {1} starting at {2}".format(
                        scheduled_program.program.name,
                        self.__radio_station.station.name,
                        scheduled_program.start))

    def __add_scheduled_job(self, scheduled_program):
        """Create a RadioProgram for ``scheduled_program`` and schedule its start.

        The resulting job is remembered under the scheduled program's id so it
        can be unscheduled later.
        """
        program = RadioProgram(self.__db, scheduled_program, self.__radio_station)
        # The scheduler is handed a naive datetime, hence tzinfo is dropped.
        start_time = self.__get_program_start_time(scheduled_program).replace(tzinfo=None)
        scheduled_job = self.__scheduler.add_date_job(program.start, start_time)
        self.__scheduled_jobs[scheduled_program.id] = scheduled_job

    def __delete_scheduled_job(self, index):
        """Forget and unschedule the job stored under ``index``, if any.

        Unknown ids are ignored. A ``KeyError`` from the scheduler (raised when
        the job is no longer in any job store, e.g. it has already fired) is
        tolerated so a stale entry neither lingers in the job map nor aborts
        the handling of an "update" event.
        """
        if index not in self.__scheduled_jobs:
            return
        job = self.__scheduled_jobs.pop(index)
        try:
            self.__scheduler.unschedule_job(job)
        except KeyError:
            pass  # the scheduler no longer knows this job; nothing to cancel

    def __stop_program(self):
        """Placeholder: stopping the running program is not implemented."""
        # self.__running_program.stop()
        return

    def __run_program(self):
        """Placeholder: running the current program is not implemented."""
        # self.__running_program.run()
        return

    def __load_programs(self):
        """Load this station's non-deleted programs whose UTC start date is today."""
        query = self.__db.query(ScheduledProgram)
        query = query.filter(ScheduledProgram.station_id == self.__radio_station.id)
        query = query.filter(text("date(start at TIME ZONE 'UTC') = current_date at TIME ZONE 'UTC'"))
        # "== False" is required: it builds a SQL expression, not a Python bool.
        query = query.filter(ScheduledProgram.deleted == False)  # noqa: E712
        self.__scheduled_programs = query.all()
        self.__radio_station.logger.info("Loaded programs for {0}".format(self.__radio_station.station.name))

    def __load_program(self, id):
        """Return the ScheduledProgram with the given id, or None if absent."""
        return self.__db.query(ScheduledProgram).filter(ScheduledProgram.id == id).first()

    def __start_listeners(self):
        """Start the thread that listens for schedule change events."""
        t = threading.Thread(target=self.__listen_for_scheduling_changes,
                             args=(DefaultConfig.SCHEDULE_EVENTS_SERVER_IP,
                                   DefaultConfig.SCHEDULE_EVENTS_SERVER_PORT))
        t.start()

    def __listen_for_scheduling_changes(self, ip, port):
        """Connect to the schedule-events server and apply incoming events.

        Connection attempts are retried every 30 seconds until one succeeds.
        After registering this station, every received chunk is parsed as one
        JSON event and dispatched. The loop ends when the server closes the
        connection.

        :param ip: address of the schedule-events server.
        :param port: TCP port of the schedule-events server.
        """
        sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        addr = (ip, port)
        # It may not be possible to connect after restart, TIME_WAIT could
        # come into play etc. Anyway, keep trying.
        connected = False
        while not connected:
            try:
                sck.connect(addr)
                connected = True
            except socket.error:
                self.__radio_station.logger.error("Could not connect to server, retrying in 30 ...")
                sleep(30)

        registration = json.dumps({'station': self.__radio_station.id, 'action': 'register'})
        # json.dumps emits ASCII by default, so encoding leaves the bytes on
        # the wire unchanged on Python 2 and satisfies send() on Python 3.
        sck.send(registration.encode("utf-8"))

        while True:
            data = sck.recv(1024)
            if not data:
                # An empty read means the peer closed the connection; looping
                # on it would spin forever on parse failures.
                self.__radio_station.logger.error(
                    "Connection to the scheduling events server was closed, no longer listening for changes")
                sck.close()
                return
            try:
                event = json.loads(data)
            except ValueError:
                self.__radio_station.logger.error("Ignoring scheduling event that is not valid JSON")
                continue
            try:
                self.__handle_scheduling_event(event)
            except Exception as e:
                # Keep the listener alive, but never hide the failure.
                self.__radio_station.logger.error("Failed to handle scheduling event {0}: {1}".format(event, e))

    def __handle_scheduling_event(self, event):
        """Apply one parsed "delete", "add" or "update" event.

        Events with any other action are ignored.
        """
        action = event["action"]
        if action == "delete":
            self.__delete_scheduled_job(event["id"])
            self.__radio_station.logger.info(
                "Scheduled program with id {0} has been deleted".format(event["id"]))
            return
        if action not in ("add", "update"):
            return

        if action == "update":
            # Drop the stale job first; the program is re-added below.
            self.__delete_scheduled_job(event["id"])
        scheduled_program = self.__load_program(event["id"])
        if scheduled_program is None:
            self.__radio_station.logger.error(
                "Scheduled program with id {0} could not be loaded, ignoring {1} event".format(event["id"], action))
            return
        if self.__is_program_expired(scheduled_program):
            return
        self.__add_scheduled_job(scheduled_program)
        if action == "add":
            message = "Scheduled program with id {0} has been added at time {1}"
        else:
            message = "Scheduled program with id {0} has been moved to start at time {1}"
        self.__radio_station.logger.info(message.format(event["id"], scheduled_program.start))

    def __get_current_program(self):
        """
        Gets the program to run from the current list of programs that are lined up for the day
        """
        for program in self.__scheduled_programs:
            if not self.__is_program_expired(program):
                return program

    def __is_program_expired(self, scheduled_program):
        """
        Returns whether or not the time for a particular program has passed

        A program counts as expired when its start plus its duration falls
        earlier than one minute from the current UTC time.
        """
        now = pytz.utc.localize(datetime.utcnow())
        program_end = scheduled_program.start + scheduled_program.program.duration
        return program_end < (now + timedelta(minutes=1))

    def __get_program_start_time(self, scheduled_program):
        """Return when the program's job should fire.

        This is five seconds after the scheduled start, or five seconds from
        now when the scheduled start has already passed.
        """
        now = datetime.now(dateutil.tz.tzlocal())
        allowance = timedelta(seconds=5)  # 5 second scheduling allowance
        if scheduled_program.start < now:  # Time at which program begins is already past
            return now + allowance
        return scheduled_program.start + allowance