From cdad43f93eb9097d5713458d2013306e77c8d216 Mon Sep 17 00:00:00 2001 From: keithmk Date: Fri, 16 Feb 2024 11:33:38 -0500 Subject: [PATCH] Added more UI --- .../mil_tools/scripts/mil-preflight/main.py | 247 ++++++++---------- .../mil_tools/scripts/mil-preflight/menus.py | 72 +++++ .../mil-preflight/old-code/RunTests.py | 11 - .../scripts/mil-preflight/old-code/TestLib.py | 93 ------- 4 files changed, 184 insertions(+), 239 deletions(-) create mode 100644 mil_common/utils/mil_tools/scripts/mil-preflight/menus.py delete mode 100644 mil_common/utils/mil_tools/scripts/mil-preflight/old-code/RunTests.py delete mode 100644 mil_common/utils/mil_tools/scripts/mil-preflight/old-code/TestLib.py diff --git a/mil_common/utils/mil_tools/scripts/mil-preflight/main.py b/mil_common/utils/mil_tools/scripts/mil-preflight/main.py index 4d773cb8a..5bd578b4d 100644 --- a/mil_common/utils/mil_tools/scripts/mil-preflight/main.py +++ b/mil_common/utils/mil_tools/scripts/mil-preflight/main.py @@ -1,59 +1,20 @@ import subprocess import time +import menus import rosnode import rospy import rostopic -import typer from PyInquirer import prompt from rich.console import Console -from rich.progress import track +from rich.progress import Progress, track # Custom message imports from subjugator_msgs.msg import ThrusterCmd -app = typer.Typer() - - -def display_start_menu(): - console = Console() - - # Title - console.print( - "[bold green]Preflight Program - Autonomous Robot Verification[/bold green]", - ) - - # Description - console.print( - "Welcome to the Preflight Program, a tool inspired by the preflight checklists used by pilots before " - "flying a plane. This program is designed to verify the functionality of all software and hardware " - "systems on your autonomous robot. It ensures that everything is in working order, allowing you to " - "safely deploy your robot with confidence.\n", - ) - - # Authors section - console.print("\n[italic]Authors:[/italic]") - console.print("Keith Khadar") - console.print("Anthony Liao") - console.print("Joshua Thomas\n") - - # Menu options - start_menu = [ - { - "type": "list", - "name": "mode selection", - "message": "Menu", - "choices": [ - "Run Preflight Full Test", - "View Report", - "Run Specific Test", - "View Documentation", - "Exit", - ], - }, - ] - option = prompt(start_menu) - return next(iter(option.values())) +node_timout = 5 # seconds +topic_timout = 5 # seconds +actuator_timout = 5 # seconds hardwareChecklist = [ @@ -72,13 +33,13 @@ def display_start_menu(): ] topics = [ - "/camera/front/right/image_raw", - "/camera/down/image_raw", - "/camera/front/left/image_raw", - "/dvl", - "/depth", - "/imu/data_raw", - "/imu/mag", + # "/camera/front/right/image_raw", + # "/camera/down/image_raw", + # "/camera/front/left/image_raw", + # "/dvl", + # "/depth", + # "/imu/data_raw", + # "/imu/mag", ] nodes = ["/odom_estimator"] @@ -100,127 +61,143 @@ def display_start_menu(): ] -@app.command("Start") def main(): - # Display Modes/Options + # Clear Screen and Display Start menu subprocess.run("clear", shell=True) - mode = display_start_menu() + # Print Info about Preflight + Console().print(menus.info_page) + + # Print start select menu + option = prompt(menus.start_menu) + mode = next(iter(option.values())) + + # Select the mode and run it if mode == "Run Preflight Full Test": - hardware() - software() - actuators() + fullTest() if mode == "Exit": subprocess.run("clear", shell=True) return + + # Return to this screen after running the selected mode main() -def hardware(): - # Complete the hardware tests - subprocess.run("clear", shell=True) - answers = prompt(hardwareChecklist) - while len(next(iter(answers.values()))) != 5: +def fullTest(): + ### Complete the hardware tests ### + + while True: subprocess.run("clear", shell=True) answers = prompt(hardwareChecklist) + # If everything has been checked off + if len(next(iter(answers.values()))) == 5: + break + else: + menu_ans = prompt(menus.incomplete_continue) + if next(iter(menu_ans.values())) is True: + # Continue even though everything has not been checked off + break + else: + # Go back to main menu + return + + ### Complete Software Tests ### -def software(): - # Complete the software tests subprocess.run("clear", shell=True) + try: + rostopic._check_master() + except Exception: + Console().print("[bold] ROS not running! Please try again later[/]") + menu_ans = prompt(menus.press_anykey) + return + rospy.init_node("preflight") # Check Nodes answers = [] for node in track(nodes, description="Checking Nodes..."): + # Try and ping the nodes try: - answers.append({node: rosnode.rosnode_ping(node, 5)}) + answers.append((node, rosnode.rosnode_ping(node, node_timout))) except Exception: - answers.append({node: False}) - print(answers) + answers.append((node, False)) + + print_results(answers) + menu_ans = prompt(menus.continue_question) + if next(iter(menu_ans.values())) is False: + # Go back to main menu + return # Check Topics answers = [] for topic in track(topics, description="Checking Topics..."): + # Check for messages on the topics try: topicType, topicStr, _ = rostopic.get_topic_class(topic) # get topic class rospy.wait_for_message( topicStr, topicType, - 5, + topic_timout, ) # try to get a message from that topic answers.append({topic: True}) except Exception: answers.append({topic: False}) - print(answers) - - print( - prompt( - [ - { - "type": "confirm", - "name": "continue", - "message": "Continue?", - }, - ], - ), - ) - - -def actuators(): + print_results(answers) + + menu_ans = prompt(menus.continue_question) + if next(iter(menu_ans.values())) is False: + # Go back to main menu + return + + ### Actuators Test ### subprocess.run("clear", shell=True) - print("test") + answers = [] - try: - prompt( - [ - { - "type": "confirm", - "name": "runActuator", - "message": "Are your sure you want to run " - + actuatorsList[0][0] - + "? BE CAREFUL make sure everyone's fingures are secured.", - }, - ], - ) - topicType, topicStr, _ = rostopic.get_topic_class( - actuatorsList[0][0], - ) # get topic class - print(topicType) - pub = rospy.Publisher(topicStr, topicType, queue_size=10) - print(actuatorsList[0][1]) - t_end = time.time() + 5 - while time.time() < t_end: - pub.publish(actuatorsList[0][1]) - answers.append( - prompt( - [ - { - "type": "confirm", - "name": "worked?", - "message": "Did " + actuatorsList[0][0] + " work as expected?", - }, - ], - ), - ) - except Exception as e: - print(e) - answers.append(False) - - print(answers) - - print( - prompt( - [ - { - "type": "confirm", - "name": "continue", - "message": "Continue?", - }, - ], - ), - ) + for actuator in actuatorsList: + try: + # Confirm that it is safe to run this actuator + Console().print(menus.safety_check, actuator[0]) + menu_ans = prompt(menus.continue_question) + if next(iter(menu_ans.values())) is False: + # Go back to main menu + return + + # Create a publisher + topicType, topicStr, _ = rostopic.get_topic_class(actuator[0]) + pub = rospy.Publisher(topicStr, topicType, queue_size=10) + + # Publish to the topic for the specified timeout + with Progress() as progress: + t_start = time.time() + t_end = t_start + actuator_timout + t_prev = time.time() + task = progress.add_task("Running", total=(t_end - t_start)) + while time.time() <= t_end: + pub.publish(actuator[1]) + progress.update(task, advance=(time.time() - t_prev)) + t_prev = time.time() + progress.update(task, advance=t_end) + + # Ask if the actuator worked + Console().print(menus.actuator_check) + answers.append((actuator[0], next(iter(prompt(menus.yes_no).values())))) + except Exception: + answers.append((actuator[0], False)) + + print_results(answers) + menu_ans = prompt(menus.press_anykey) + return + + +def print_results(systems): + subprocess.run("clear", shell=True) + for name, status in systems: + if status: + Console().print(f"{name}: [green]✔[/] Working") + else: + Console().print(f"{name}: [red]❌[/] Not Working") if __name__ == "__main__": - app() + main() diff --git a/mil_common/utils/mil_tools/scripts/mil-preflight/menus.py b/mil_common/utils/mil_tools/scripts/mil-preflight/menus.py new file mode 100644 index 000000000..01ab69f31 --- /dev/null +++ b/mil_common/utils/mil_tools/scripts/mil-preflight/menus.py @@ -0,0 +1,72 @@ +################################################################################ +# File name: menus.py +# Author: Keith Khadar +# Description: This file is used to store all the menus used by prefligh +################################################################################ + + +# ----- Home Page -----# +info_page = """ +[bold green]Preflight Program - Autonomous Robot Verification[/bold green] +Welcome to the Preflight Program, a tool inspired by the preflight checklists used by pilots before flying a plane. This program is designed to verify the functionality of all software and hardware systems on your autonomous robot. It ensures that everything is in working order, allowing you to safely deploy your robot with confidence.\n +[italic]Authors:[/italic] +Keith Khadar +Anthony Liao +Joshua Thomas\n +""" + + +start_menu = [ + { + "type": "list", + "name": "mode selection", + "message": "Menu", + "choices": [ + "Run Preflight Full Test", + "View Report", + "Run Specific Test", + "View Documentation", + "Exit", + ], + }, +] + +# ----- Loading Screen -----# +continue_question = [ + { + "type": "confirm", + "name": "continue", + "message": "Continue?", + }, +] +press_anykey = [ + { + "type": "confirm", + "name": "continue", + "message": "Press any key to continue.", + }, +] + +incomplete_continue = [ + { + "type": "confirm", + "name": "incomplete", + "message": "This checklist is incomplete. Do you wish to continue?", + }, +] +yes_no = [ + { + "type": "confirm", + "name": "yes_no", + "message": "Yes?", + }, +] + +# ----- Actuators Screen -----# +safety_check = """ +MAKE SURE THAT EVERYONE'S FINGERS ARE CLEAR!! Is it safe to run the following actuator? +""" + +actuator_check = """ +Did the actuator work? +""" diff --git a/mil_common/utils/mil_tools/scripts/mil-preflight/old-code/RunTests.py b/mil_common/utils/mil_tools/scripts/mil-preflight/old-code/RunTests.py deleted file mode 100644 index 54d2940d4..000000000 --- a/mil_common/utils/mil_tools/scripts/mil-preflight/old-code/RunTests.py +++ /dev/null @@ -1,11 +0,0 @@ -import importlib - - -def getHardWareCheckist(robotName): - robotName += ".hardware" - module = importlib.import_module(robotName) - - if hasattr(module, "hardwareTests"): - return module.hardwareTests - else: - return [] diff --git a/mil_common/utils/mil_tools/scripts/mil-preflight/old-code/TestLib.py b/mil_common/utils/mil_tools/scripts/mil-preflight/old-code/TestLib.py deleted file mode 100644 index 4670e8781..000000000 --- a/mil_common/utils/mil_tools/scripts/mil-preflight/old-code/TestLib.py +++ /dev/null @@ -1,93 +0,0 @@ -import rospy -import rosservice -import rostopic - - -class Test: - # Constructor - def __init__(self, name, description, communicationType, address): - self.name = name - self.description = description - self.communicationType = communicationType - self.address = address - self.needsHumanAuthorization = False - - # Check if the communication interface is up and running - def isActive(self): - # Check if the topic is up - if self.communicationType == "Topic": - topicType, topicStr, _ = rostopic.get_topic_class( - self.address, - ) # get topic class - try: - rospy.wait_for_message( - topicStr, - topicType, - ) # try to get a message from that topic - return True - except Exception: - return False - - # Check if the service is up - if self.communicationType == "Service": - return rosservice.waitForService( - self.address, - ) # Wait for the service to be up - - # Check if the action server is up - if self.communicationType == "Action": - # action client - topicType, topicStr, _ = rostopic.get_topic_class( - self.address + "/feedback", - ) - try: - # Wait for a message from the topic - rospy.wait_for_message(topicStr, topicType) - return True - except Exception: - return False - - # Send data over the communication interface and pass in the return values into the check function - def VerifyData(self, args, checkFunction): - result = "" - - if self.communicationType == "Topic": - # Get the class of the topic - topicType, topicStr, _ = rostopic.get_topic_class(self.address) - try: - # Get a message from the topic - result = rospy.wait_for_message(topicStr, topicType) - except Exception: - return False - - if self.communicationType == "Service": - try: - # Get a message form the service - result = rosservice.call_service(self.address, args) - except Exception: - return False - - if self.communicationType == "Action": - # Get the class of the topic, remember an action will publish its results to a topic with /goal at the end - topicType, topicStr, _ = rostopic.get_topic_class( - self.address + "/feedback", - ) - try: - # Wait for a message from the topic - result = rospy.wait_for_message(topicStr, topicType) - except Exception: - return False - - return checkFunction(args, result) - - def runCommand(self, args): - # use a dictionary - if self.communicationType == "Topic": - topicType, topicStr, _ = rostopic.get_topic_class(self.address) - if self.communicationType == "Service": - # Do something - self - if self.communicationType == "Action": - # Do something - self - return False