-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathController.py
292 lines (235 loc) · 11.7 KB
/
Controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
import datetime
import glob
import os
import subprocess
import sys
import tkinter as tk
from multiprocessing import Process
from tkinter import messagebox
import serial
import App_Parameters as app_param
from App_Util import (process_get_HK_logs, resource_path,
sample_hk_command_process)
from Mission_Util import (process_handle_downlink,
process_send_mission_telecommand,
sample_downlink_process,
sample_mission_command_process)
from Testing import IS_TESTING
from Tk_Main_Page import MainPage
from Tk_Mission_Window import MissionWindow
from Tk_Start_Page import StartPage
class Controller(tk.Frame):
def __init__(self, parent, pipe_beacon):
tk.Frame.__init__(self, parent)
self.parent = parent
self.parent.resizable(width=False, height=False)
self.parent.iconbitmap(resource_path("assets/satellite.ico"))
self.parent.title("Dream2space Ground Station")
# Scan for serial ports
ports = self.scan_serial_ports()
self.ports = ports
# Pipe for beacon
self.pipe_beacon = pipe_beacon
# List of pending missions
self.pending_mission_list = []
# List of executing missions
self.current_mission_list = []
# Put all pages into container
self.container = tk.Frame(self.parent)
self.container.grid()
self.make_start_page()
# Poll and check for missions to execute
self.mission_execution_check()
# Initializing method to create Start Page
def make_start_page(self):
self.start = StartPage(self.container, self)
# Scan Serial ports in PC
def scan_serial_ports(self):
ports = []
if sys.platform.startswith('win'):
ports = ['COM%s' % (i + 1) for i in range(256)]
elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
# this excludes your current terminal "/dev/tty"
ports = glob.glob('/dev/tty[A-Za-z]*')
result = []
for port in ports:
try:
s = serial.Serial(port)
s.close()
result.append(port)
except (OSError, serial.SerialException):
continue
result.insert(0, " ")
# In testing, add dummy entries
if IS_TESTING:
result.append("COM14")
result.append("COM15")
return result
# Handle transition from Start Page to Main Page
# Upon pressing button to select Serial ports
def handle_transition(self):
# Extract ports selected
self.port_ttnc = self.start.get_ttnc_port()
self.port_payload = self.start.get_payload_port()
# Ports selected are wrong -> Prompt users to reselect
if self.port_ttnc == self.port_payload or self.port_ttnc == " " or self.port_ttnc == " ":
# Same ports selected
self.start.set_port_warning_message()
# Ports selected are correct -> Proceed to generate main page
else:
# Pass ttnc serial object via pipe to thread
self.pipe_beacon.send(self.port_ttnc)
# Erase Start Page
self.container.grid_forget()
self.container = tk.Frame(self.parent)
self.container.pack()
# Generate Container to store new page
self.main_page = MainPage(parent=self.container, controller=self, beacon_pipe=self.pipe_beacon,
width=app_param.APP_WIDTH, height=app_param.APP_HEIGHT)
# Handles Housekeeping Process after button pressed
def handle_hk_process_start(self):
if IS_TESTING:
self.housekeeping_process = Process(target=sample_hk_command_process, daemon=True) # Testing
else:
self.is_hk_process_success = False
self.prev_file_number = len(os.listdir(
app_param.HOUSEKEEPING_DATA_FOLDER_FILEPATH))
self.housekeeping_process = Process(target=process_get_HK_logs, daemon=True,
args=(self.pipe_beacon, self.port_ttnc, ))
self.housekeeping_process.start()
# Disable mission and housekeeping commands
self.main_page.show_disable_command_after_hk_command()
# Checks regularly if housekeeping process is complete
def hk_process_checking(self):
# If process still alive, continute to check
if self.housekeeping_process.is_alive():
self.after(100, self.hk_process_checking)
# If process ended, inform user
else:
is_success = False
if not IS_TESTING:
# Determine if telecommand obtaining is successful
curr_number_files = len(os.listdir(
app_param.HOUSEKEEPING_DATA_FOLDER_FILEPATH))
if curr_number_files > self.prev_file_number:
self.is_hk_process_success = True
self.prev_file_number = curr_number_files
else:
self.is_hk_process_success = True
# Housekeeping data parsing sucess - Open up explorer
if self.is_hk_process_success == True:
path = os.path.relpath(
app_param.HOUSEKEEPING_DATA_FOLDER_FILEPATH)
if sys.platform.startswith('win'):
os.startfile(path)
elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
subprocess.check_call(['xdg-open', '--', path])
# display success message
is_success = True
# Housekeeping data parsing failed
else:
# display fail message
is_success = False
# Update screens
self.main_page.show_enable_command_after_hk_command()
self.main_page.show_status_after_hk_command(is_success)
# Undo flag
self.is_hk_process_success = False
# Open mission window
def open_mission_downlink_command_window(self):
self.mission_window = MissionWindow(self.parent, self)
# Handle mission checking and scheduling after submited on mission window
def handle_mission_scheduling(self):
def validate_mission(mission_input, pending_mission_list, current_mission_list):
# print(mission_input)
# Validate if (1) mission time is after current time, (2) downlink time after mission time
is_mission_time_future = mission_input.mission_datetime > datetime.datetime.now()
is_downlink_after_mission = mission_input.downlink_datetime > mission_input.mission_datetime
num_mission = len(self.pending_mission_list)
# Generate a complete list of current and pending missions
merge_list = pending_mission_list+current_mission_list
# Validate if (1) new mission is not less than 15 sec before or after a mission
# (2) new downlink is not less than 10 mins before or after a downlink
is_new_mission_allowed = True
for mission in merge_list:
if abs(mission.mission_datetime - mission_input.mission_datetime).total_seconds() <= 15:
is_new_mission_allowed = False
break
if abs(mission.downlink_datetime - mission_input.downlink_datetime).total_seconds() <= 3 * 3 * 60:
is_new_mission_allowed = False
break
if is_new_mission_allowed and is_mission_time_future and is_downlink_after_mission and num_mission < 3:
return True
else:
return False
# Do input validation
mission = self.mission_window.get_user_mission_input()
is_valid_input = validate_mission(mission, self.pending_mission_list, self.current_mission_list)
if is_valid_input:
# Close top window
self.mission_window.handle_mission_success()
# Add into pending mission list
self.pending_mission_list.append(mission)
self.pending_mission_list.sort(key=lambda x: x.downlink_datetime) # Sort on earliest downlink datetime
# print(self.pending_mission_list)
# Send CCSDS mission command to Cubesat
if IS_TESTING:
self.mission_command_process = Process(target=sample_mission_command_process, daemon=True) # Testing
else:
self.mission_command_process = Process(target=process_send_mission_telecommand, daemon=True, args=(
mission, self.pipe_beacon, self.port_ttnc, )) # Testing
self.mission_command_process.start()
# Update screens
self.main_page.show_disable_command_after_mission_command()
self.main_page.update_pending_mission_table(self.pending_mission_list)
else:
# Input time is not valid
num_current_missions = len(self.pending_mission_list)
self.mission_window.display_error_message(num_current_missions)
def mission_execution_check(self):
# print(f"CHECK! {datetime.datetime.now()}")
num_mission = len(self.pending_mission_list)
# print(self.pending_mission_list)
if num_mission != 0:
# Check top most mission item
# Start collection process if within 2 minutes of downlink
earliest_mission = self.pending_mission_list[0]
upcoming_downlink_datetime = earliest_mission.downlink_datetime
if upcoming_downlink_datetime - datetime.datetime.now() < datetime.timedelta(seconds=120):
print(f"less than 2 minutes to mission!!")
self.current_mission_list.append(earliest_mission)
del self.pending_mission_list[0]
if IS_TESTING:
self.downlink_process = Process(target=sample_downlink_process, daemon=True) # Testing
else:
self.downlink_process = Process(
target=process_handle_downlink, daemon=True,
args=(self.port_payload, earliest_mission.get_mission_name(),
earliest_mission.get_mission_datetime_string(),
earliest_mission.get_downlink_datetime_string(),))
self.downlink_process.start()
# Render on the missions screens
self.main_page.update_pending_mission_table(self.pending_mission_list)
self.main_page.update_current_mission_table(self.current_mission_list)
try:
if len(self.current_mission_list) != 0 and not self.downlink_process.is_alive():
del self.current_mission_list[0]
self.main_page.update_current_mission_table(self.current_mission_list)
except AttributeError:
pass
self.after(app_param.APP_DOWNLINK_PROCESS_CHECK_INTERVAL, self.mission_execution_check)
# Respond to button pressed when User wishes to view completed Missions/Downlink
def view_completed_missions(self):
# If no mission created yet, not records found
# Show popup warning
if not os.path.exists(app_param.GROUND_STN_MISSION_LOG_FILEPATH):
messagebox.showerror(title="Dream2space Ground Station",
message="Mission records not found!\nTry to send a mission command.")
else:
if sys.platform.startswith('win'):
# Replace slash with backslash
os.startfile(app_param.GROUND_STN_MISSION_LOG_FILEPATH.replace("/", "\\"))
path = os.path.relpath(app_param.GROUND_STN_MISSION_FOLDER_PATH)
os.startfile(path)
else:
os.startfile(app_param.GROUND_STN_MISSION_LOG_FILEPATH)