Skip to content

Commit

Permalink
Merge pull request #440 from PTG-Kitware/dev/tm-reset-start-stop
Browse files Browse the repository at this point in the history
Add reset/start/stop System command behaviors and response by GSP Task Monitor
  • Loading branch information
Purg authored Jun 11, 2024
2 parents 8a8a718 + dbf5736 commit 19738fb
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 62 deletions.
4 changes: 3 additions & 1 deletion angel_system/global_step_prediction/global_step_predictor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pathlib import Path
from typing import Any, Dict, List

import yaml
import seaborn as sn
Expand Down Expand Up @@ -77,7 +78,7 @@ def __init__(
self.recipe_configs = recipe_config_dict

# Array of tracker dicts
self.trackers = []
self.trackers: List[Dict[str, Any]] = []
for recipe in recipe_types:
self.initialize_new_recipe_tracker(recipe)

Expand Down Expand Up @@ -357,6 +358,7 @@ def reset_one_tracker(self, tracker_ind):
print(f"RESETTING tracker {tracker_ind}")
self.trackers[tracker_ind]["current_broad_step"] = 0
self.trackers[tracker_ind]["current_granular_step"] = 0
self.trackers[tracker_ind]["active"] = True
self.tracker_resets.append(self.trackers[tracker_ind]["recipe"])

def granular_to_broad_step(self, tracker, granular_step):
Expand Down
10 changes: 5 additions & 5 deletions config/tasks/cooking/multi-task-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@ tasks:
# (i.e. relative to $(ANGEL_WORKSPACE)).
- id: 0
label: "Coffee"
config_file: "./config/tasks/recipe_coffee.yaml"
config_file: "./config/tasks/cooking/recipe_coffee.yaml"
active: true
- id: 1
label: "Tea"
config_file: "./config/tasks/recipe_tea.yaml"
config_file: "./config/tasks/cooking/recipe_tea.yaml"
active: true
- id: 2
label: "Pinwheel"
config_file: "./config/tasks/recipe_pinwheel.yaml"
config_file: "./config/tasks/cooking/recipe_pinwheel.yaml"
active: true
- id: 3
label: "Oatmeal"
config_file: "./config/tasks/recipe_oatmeal.yaml"
config_file: "./config/tasks/cooking/recipe_oatmeal.yaml"
active: true
- id: 4
label: "Dessert Quesadilla"
config_file: "./config/tasks/recipe_dessertquesadilla.yaml"
config_file: "./config/tasks/cooking/recipe_dessertquesadilla.yaml"
active: true
15 changes: 14 additions & 1 deletion ros/angel_msgs/msg/SystemCommands.msg
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,21 @@
# received message instance.
#

##############################
# Task Monitor commands

# Reset the whole task monitor state.
bool reset_monitor_state

# Toggle the state of the monitor's "pause" command.
bool toggle_monitor_pause

# Index of the task to which the commands here pertain.
int8 task_index
bool reset_current_task
# Reset the referenced task's state and start over from the beginning.
# TODO: This is currently unused.
bool reset_task
# Revert our step tracking for the referenced task to the previous step.
bool previous_step
# Progress our step tracking for the referenced task to the next step.
bool next_step
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pathlib import Path
from threading import RLock
from typing import Dict
from typing import List
from typing import Optional

from builtin_interfaces.msg import Time
Expand Down Expand Up @@ -43,6 +44,9 @@
# The step mode to use for this predictor instance. This must be either "broad"
# or "granular"
PARAM_STEP_MODE = "step_mode"
# If the GSP Node should start in "paused" mode as opposed to starting in an
# active state.
PARAM_START_PAUSED = "start_paused"
# Enable ground-truth plotting mode by specifying the path to an MSCOCO file
# that includes image level `activity_gt` attribute.
# Requires co-specification of the video ID to select out of the COCO file.
Expand Down Expand Up @@ -89,6 +93,7 @@ def __init__(self):
(PARAM_GT_ACT_COCO, ""),
(PARAM_GT_VIDEO_ID, -1),
(PARAM_GT_OUTPUT_DIR, "outputs"),
(PARAM_START_PAUSED, False),
],
)
self._config_file = param_values[PARAM_CONFIG_FILE]
Expand All @@ -114,59 +119,28 @@ def __init__(self):
f"one of {VALID_STEP_MODES}."
)

# Determine what recipes are in the config
# TODO: make use of angel_system.data.config_structs instead of
# manually loading and accessing by string keys.
with open(self._config_file, "r") as stream:
config = yaml.safe_load(stream)
recipe_types = [
recipe["label"] for recipe in config["tasks"] if recipe["active"]
]
recipe_configs = [
recipe["config_file"] for recipe in config["tasks"] if recipe["active"]
]

recipe_config_dict = dict(zip(recipe_types, recipe_configs))
log.info(f"Recipes: {recipe_config_dict}")

# Instantiate the GlobalStepPredictor module
self.gsp = GlobalStepPredictor(
threshold_multiplier_weak=self._threshold_multiplier_weak,
threshold_frame_count=self._thresh_frame_count,
threshold_frame_count_weak=self._threshold_frame_count_weak,
deactivate_thresh_frame_count=self._deactivate_thresh_frame_count,
recipe_types=recipe_types,
recipe_config_dict=recipe_config_dict,
activity_config_fpath=self._activity_config_file,
)

# model_file = pre-computed averages of TP activations
self.gsp.get_average_TP_activations_from_file(self._model_file)
log.info("Global state predictor loaded")

# Mapping from recipe to current step. Used to track state changes
# of the GSP and determine when to publish a TaskUpdate msg.
self.recipe_current_step_id = {}
self.recipe_current_step_id: Dict[str, int] = {}

# Mapping from recipe to a list of skipped step IDs. Used to ensure
# that duplicate task error messages are not published for the same
# skipped step
self.recipe_skipped_step_ids = {}
self.recipe_published_last_msg = {}
self.recipe_skipped_step_ids: Dict[str, List[int]] = {}
self.recipe_published_last_msg: Dict[str, bool] = {}

# Track the latest activity classification end time sent to the HMM
# Time is represented as a the ROS Time message
# Time is represented as the ROS Time message
self._latest_act_classification_end_time = None

# Control access to GSP
self._gsp_lock = RLock()

for task in self.gsp.trackers:
self.recipe_current_step_id[task["recipe"]] = task[
f"current_{self._step_mode}_step"
]
self.recipe_skipped_step_ids[task["recipe"]] = []
self.recipe_published_last_msg[task["recipe"]] = False
# The GSP Instance, which we'll load now.
self._gsp_lock = RLock() # Control access to GSP
log.info(
f"Starting system in PAUSED mode? :: {param_values[PARAM_START_PAUSED]}"
)
self._gsp_active: bool = not param_values[PARAM_START_PAUSED]
self.gsp: Optional[GlobalStepPredictor] = None
self._reload_gsp()

# Initialize ROS hooks
self._task_update_publisher = self.create_publisher(
Expand Down Expand Up @@ -220,30 +194,91 @@ def __init__(self):
)
log.info("GT params specified, initializing data... Done")

def sys_cmd_callback(self, sys_cmd_msg: SystemCommands):
def _reload_gsp(self) -> None:
"""
Callback function for the system command subscriber topic.
Forces an update of the GSP to a new step.
(Re)Load the GSP instance from input configuration parameters.
This will recreate the GSP instance and return it.
This will make use of the `_gsp_lock` for access protection.
"""
log = self.get_logger()
with self._gsp_lock:
# Determine what recipes are in the config
# TODO: make use of angel_system.data.config_structs instead of
# manually loading and accessing by string keys.
with open(self._config_file, "r") as stream:
config = yaml.safe_load(stream)
recipe_types = [
recipe["label"] for recipe in config["tasks"] if recipe["active"]
]
recipe_configs = [
recipe["config_file"] for recipe in config["tasks"] if recipe["active"]
]

recipe_config_dict = dict(zip(recipe_types, recipe_configs))
log.info(f"Recipes: {recipe_config_dict}")

# Instantiate the GlobalStepPredictor module
self.gsp = GlobalStepPredictor(
threshold_multiplier_weak=self._threshold_multiplier_weak,
threshold_frame_count=self._thresh_frame_count,
threshold_frame_count_weak=self._threshold_frame_count_weak,
deactivate_thresh_frame_count=self._deactivate_thresh_frame_count,
recipe_types=recipe_types,
recipe_config_dict=recipe_config_dict,
activity_config_fpath=self._activity_config_file,
)

# model_file = pre-computed averages of TP activations
self.gsp.get_average_TP_activations_from_file(self._model_file)
log.info("Global state predictor (re)loaded")

# Load default values into our stateful mappings.
for task in self.gsp.trackers:
self.recipe_current_step_id[task["recipe"]] = task[
f"current_{self._step_mode}_step"
]
self.recipe_skipped_step_ids[task["recipe"]] = []
self.recipe_published_last_msg[task["recipe"]] = False

# Track the latest activity classification end time sent to the HMM
# Time is represented as the ROS Time message
# Reset the last activity classification time to "not received yet"
self._latest_act_classification_end_time = None

def _sys_cmd_change_step(self, sys_cmd_msg: SystemCommands) -> None:
"""
Handle one command message that changes the step.
This will do nothing if no previous/next step commands are given.
:param sys_cmd_msg: Message containing command content.
"""
log = self.get_logger()

with self._gsp_lock:
if self._step_mode == "broad" and sys_cmd_msg.next_step:
log.info("Manual step change detected -> Next broad step")
update_function = self.gsp.manually_increment_current_broad_step
elif self._step_mode == "broad" and sys_cmd_msg.previous_step:
log.info("Manual step change detected -> Previous broad step")
update_function = self.gsp.manually_decrement_current_step
elif self._step_mode == "granular" and sys_cmd_msg.next_step:
log.info("Manual step change detected -> Next granular step")
update_function = self.gsp.increment_granular_step
elif self._step_mode == "granular" and sys_cmd_msg.previous_step:
log.info("Manual step change detected -> Previous granular step")
update_function = self.gsp.decrement_granular_step
else:
# This should never happen
# No previous/next step request, stopping.
return

try:
tracker_dict_list = update_function(sys_cmd_msg.task_index)
except Exception:
except Exception as ex:
# GSP raises exception if this fails, so just ignore it
log.warn(f"Failed to update step: {ex}")
return

if self._latest_act_classification_end_time is None:
Expand All @@ -267,8 +302,9 @@ def sys_cmd_callback(self, sys_cmd_msg: SystemCommands):
and task["active"]
):
log.info(
f"Manual step change detected: {task['recipe']}. Current step: {current_step_id}"
f" Previous step: {previous_step_id}."
f"Manual step change detected: {task['recipe']}. "
f"Current step: {current_step_id} "
f"Previous step: {previous_step_id}."
)
self.publish_task_state_message(
task, self._latest_act_classification_end_time
Expand All @@ -283,7 +319,8 @@ def sys_cmd_callback(self, sys_cmd_msg: SystemCommands):
if not self.recipe_published_last_msg[task["recipe"]]:
# The last step activity was completed.
log.info(
f"Final step manually completed: {task['recipe']}. Current step: {current_step_id}"
f"Final step manually completed: {task['recipe']}. "
f"Current step: {current_step_id}"
)
self.publish_task_state_message(
task,
Expand All @@ -298,8 +335,9 @@ def sys_cmd_callback(self, sys_cmd_msg: SystemCommands):
and task["active"]
):
log.info(
f"Manual step change detected: {task['recipe']}. Current step: {current_step_id}"
f" Previous step: {previous_step_id}."
f"Manual step change detected: {task['recipe']}. "
f"Current step: {current_step_id} "
f"Previous step: {previous_step_id}."
)
self.publish_task_state_message(
task, self._latest_act_classification_end_time
Expand All @@ -308,6 +346,47 @@ def sys_cmd_callback(self, sys_cmd_msg: SystemCommands):

self.recipe_published_last_msg[task["recipe"]] = False

def _sys_cmd_reset_monitor(self):
"""
Reset the GSP as a result of a system command.
NOTE: This approach is an initial implementation and does not take into
account the GSP tracking multiple instances of the
"""
log = self.get_logger()
with self._gsp_lock:
# Latest "sensor input time", i.e. the reset request is right now.
ts = self.get_clock().now().to_msg()
# Reset all trackers and publish the reset state.
for i, task in enumerate(self.gsp.trackers):
self.gsp.reset_one_tracker(i)
log.info(f"Resetting task {i}")
self.publish_task_state_message(task, ts)
self.recipe_published_last_msg[task["recipe"]] = False

def sys_cmd_callback(self, sys_cmd_msg: SystemCommands):
"""
Callback function for the system command subscriber topic.
Forces an update of the GSP to a new step.
"""
log = self.get_logger()

# Handle if we should toggle "active" state
if sys_cmd_msg.toggle_monitor_pause:
with self._gsp_lock:
self._gsp_active = not self._gsp_active
log.info(f"Toggling GSP active state to {self._gsp_active}")

if sys_cmd_msg.reset_monitor_state:
if sys_cmd_msg.next_step or sys_cmd_msg.previous_step:
log.warn(
"Change in task step requested alongside resetting "
"monitor state, abiding reset and ignoring step change."
)
self._sys_cmd_reset_monitor()
elif sys_cmd_msg.next_step or sys_cmd_msg.previous_step:
self._sys_cmd_change_step(sys_cmd_msg)

def det_callback(self, activity_msg: ActivityDetection):
"""
Callback function for the activity detection subscriber topic.
Expand All @@ -322,6 +401,10 @@ def det_callback(self, activity_msg: ActivityDetection):
conf_array = np.expand_dims(conf_array, 0)

with self._gsp_lock:
# If we are not "active", just immediately kick out
if not self._gsp_active:
return

tracker_dict_list = self.gsp.process_new_confidences(conf_array)

print(f"conf_array: {conf_array}")
Expand Down
Loading

0 comments on commit 19738fb

Please sign in to comment.