Skip to content

Commit

Permalink
Rewrote event system according to advice by virtuald, not exactly usi…
Browse files Browse the repository at this point in the history
…ng generators, but same concept in passing an object to the function that indicates when the function should terminate.
  • Loading branch information
computer-whisperer committed Nov 8, 2014
1 parent d66bef5 commit 71dda44
Show file tree
Hide file tree
Showing 21 changed files with 271 additions and 291 deletions.
291 changes: 169 additions & 122 deletions framework/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,136 +2,183 @@
This is an event system. Modules can add callbacks to events, which will be run in a new thread once triggered.
This is the primary mechanism for starting and stopping run loops.
On the flip side, events can be either "triggered" which is a one-time call (For example: shoot_cannon or reset_gyro),
Events can be either "triggered" which is a one-time call (For example: shoot_cannon or reset_gyro),
or they can be started and then stopped, for events with duration (For example: run, teleoperated, or enabled).
The main difference is that when modules are freshly loaded, the refresh_events method is called, which then finds all
of the "started" events and triggers them for the new module.
The main difference is that when modules are freshly loaded, the repeat_callbacks method is called, which then finds all
of the "active" events and calls them for the new module.
"""

import logging
import threading
import framework.module_engine

__author__ = 'christian'

#All event callback objects
_event_callbacks = dict()
#All triggered stated events
_active_events = list()


class _EventCallback:
"""This represents a callback to an event"""

def __init__(self, callback, tgtmod, inv_func):
#Set variables
self.func = callback
self.inv_func = inv_func
self.subsystem = tgtmod

def call(self):
"""Asynchronously call the main callback function"""
threading.Thread(target=framework.module_engine.get_modules(self.subsystem).call_wrap, args={self.func}).start()

def call_inverse(self):
"""Asynchronously call the inverse callback function"""
threading.Thread(target=framework.module_engine.get_modules(self.subsystem).call_wrap, args={self.inv_func}).start()


def add_callback(event, subsystem, callback=lambda: True, inverse_callback=lambda: True):
"""Set a callback for a specified event, callback function, target module, source module, and inverse callback"""
#If there is no such event already listed, than create the list
if event not in _event_callbacks:
_event_callbacks[event] = list()
#Create the callback object and add it to the list
_event_callbacks[event].append(_EventCallback(callback, subsystem, inverse_callback))


def start_event(eventname, src_subsystem):
"""Start the event"""
#If the event is not already started
if eventname not in [x["event"] for x in _active_events]:
#Add event to active_events and trigger all callbacks
_active_events.append({"event": eventname, "src_subsystem": src_subsystem})
trigger_event(eventname, src_subsystem)


def stop_event(eventname, src_subsystem):
"""Stop the event"""
#Scan through each active event
for event in _active_events[:]:
#If it is the correct event
if event["event"] is eventname:
#Clear active_events of event and trigger all inverse callbacks
_active_events.remove(event)
trigger_event(eventname, src_subsystem, inverse_event=True)


def trigger_event(eventname, src_subsystem, inverse_event=False):
"""Trigger all callbacks for an event"""
#If we even have callbacks for this eventname
if eventname in _event_callbacks:
#Then go through them all and call them!
for callback in _event_callbacks[eventname]:
try:
#If we want the normal one
if not inverse_event:
callback.call()
#Otherwise, we want the inverse callback
else:
callback.call_inverse()
except Exception as e:
#Naughty Naughty, little callback!
logging.error("Exception calling callback on event " + eventname + ": " + str(e))
#Let the system know
if not inverse_event:
logging.info("Triggered callbacks for event " + eventname + " by subsystem " + src_subsystem)
else:
logging.info("Triggered inverse callbacks for event " + eventname + " by subsystem " + src_subsystem)


def refresh_events(subsystem):
"""Check if subsystem has callbacks for anything in active_events, and call them if so"""
#For each active event
for event in _active_events:
#If we have any callbacks for it
if event["event"] in _event_callbacks:
#Check each one to see if it points to the subsystem
for callback in _event_callbacks[event["event"]]:
if callback.subsystem is subsystem:
try:
#Then call it, and report any issues!
callback.call()
except Exception as e:
#Naughty Naughty, little callback!
logging.error("Exception calling callback on event " + event["event"] + ": " + str(e))
logging.info("Refreshed events for subsystem " + subsystem)


def cleanup_events(subsystem=None):
#This stores references to all created events
events = dict()


class Event(object):
"""This manages everything related to an event"""

#The event's name
name = ""
#Is the event active
active = False

class Task(object):
"""
This is used to store a reference to a callback function and to manage its state.
it is also the first argument passed to callback functions
"""

#Is the task currently active?
#This is different than the event level's active value, since individual tasks
# can be stopped for module unload
active = False

def __init__(self, event, target_subsystem, function):
self.event = event
self.target_subsystem = target_subsystem
self.function = function

def run(self):
"""Spawn a new thread with the task function."""
call_wrap = framework.module_engine.get_modules(self.target_subsystem).call_wrap
threading.Thread(target=call_wrap, args=(self.function, self)).start()

def __init__(self, name):
#Initialize variables
self.name = name
self._callbacks = list()
self._inverse_callbacks = list()

def add_callback(self, target_subsystem, callback):
"""Registers the callback with subsystem as target_subsystem"""
self._callbacks.append(self.Task(self, target_subsystem, callback))

def add_inverse_callback(self, target_subsystem, inverse_callback):
"""Registers the inverse callback with subsystem as target_subsystem"""
self._inverse_callbacks.append(self.Task(self, target_subsystem, inverse_callback))

def trigger(self, src_subsystem):
"""Starts all callbacks"""
#Loops through all callbacks and first sets them as active, then runs them
for callback in self._callbacks:
callback.active = True
callback.run()

#Log it if we actually did anything
if len(self._callbacks) is not 0:
logging.info("Triggered event {} by subsystem {}".format(self.name, src_subsystem))

def start(self, src_subsystem):
"""Starts all callbacks and sets _active to True"""
#Loops through all callbacks and first sets them as active, then runs them
for callback in self._callbacks:
callback.active = True
callback.run()

#Log it if we actually did anything
if len(self._callbacks) is not 0:
logging.info("Started event {} by subsystem {}".format(self.name, src_subsystem))

#Set ourself to active
self.active = True

def stop(self, src_subsystem):
"""Starts all inverse callbacks and sets _active to False"""

did_something = False

#Set all callbacks to inactive
for callback in self._callbacks:
did_something = did_something or callback.active
callback.active = False

#Run all inverse_callbacks
for callback in self._inverse_callbacks:
did_something = True
callback.run()

#Log it if we did something
if did_something:
logging.info("Stopped event {} by subsystem {}".format(self.name, src_subsystem))
self.active = False

def remove_callbacks(self, target_subsystem=None):
"""
Removes callback records. If target_subsystem is specified,
restricts removal to that subsystem, otherwise it removes all callbacks.
"""
#For each callback, if either the subsystem matches, or we have been given no subsystem:
for callback in self._callbacks[:]:
if target_subsystem is callback.target_subsystem or target_subsystem is None:
#Deactivate the callback and remove it from our list.
callback.active = False
self._callbacks.remove(callback)

def repeat_callbacks(self, target_subsystem):
"""Repeats all callbacks pointing to target_subsystem if we are active"""
#If we are active
if self.active:
#Loop through all callbacks that match the target_subsystem and run them
for callback in [c for c in self._callbacks if c.target_subsystem is target_subsystem]:
callback.run()


def _get_event(name):
"""Look for event event name, and create one if it does not exist. Then return it."""
if name not in events:
events[name] = Event(name)
return events[name]


def add_callback(event_name, target_subsystem, callback):
"""Set a callback for a specified event, target module, and callback function"""
#Get the event and add the callback
_get_event(event_name).add_callback(target_subsystem, callback)


def add_inverse_callback(event_name, target_subsystem, callback):
"""Set an inverse callback for a specified event, target module, and callback function"""
#Get the event and add the callback
_get_event(event_name).add_inverse_callback(target_subsystem, callback)


def start_event(event_name, src_subsystem):
"""Starts the event event_name"""
#Get the event and start it.
_get_event(event_name).start(src_subsystem)


def stop_event(event_name, src_subsystem):
"""Stop the event event_name"""
#Get the event and stop it.
_get_event(event_name).stop(src_subsystem)


def trigger_event(event_name, src_subsystem):
"""Trigger all callbacks for event event_name"""
#Get the event and trigger it.
_get_event(event_name).trigger(src_subsystem)


def repeat_callbacks(target_subsystem):
"""Repeats all active callbacks pointed to this subsystem"""
#Call repeat_callbacks on all events
for event in events:
events[event].repeat_callbacks(target_subsystem)
logging.info("Refreshed callbacks for subsystem " + target_subsystem)


def remove_callbacks(target_subsystem=None):
"""
Purge the event system of all callbacks and active_events related to subsystem.
If subsystem is not specified, then purge for all subsystems
"""
#Purge active_events:
#For each active event
for event in _active_events[:]:
#If it was started by subsystem
if event["src_subsystem"] is subsystem or subsystem is None:
#Stop it!
stop_event(event["event"], "events")

#Purge callbacks
#For each event
for event in _event_callbacks:
#For each callback in that event
#The "[:]" copies the list, so we are not deleting objects from the same list as we are iterating over.
for callback in _event_callbacks[event][:]:
#If we have a match, check if it is an active event, and call it's inverse callback if so!
if callback.subsystem is subsystem or subsystem is None:
#Is this event an active event?
if event in [x["event"] for x in _active_events]:
#If so, call it's inverse callback
callback.call_inverse()
#Purge it!
_event_callbacks[event].remove(callback)
#Call remove_callbacks on all events
for event in events:
events[event].remove_callbacks(target_subsystem)
if target_subsystem is None:
logging.info("Removed callbacks all subsystems")
else:
logging.info("Removed callbacks for subsystem " + target_subsystem)
8 changes: 4 additions & 4 deletions framework/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ def gen_paths():
#The log file index is a 4-digit number corresponding to an unused log folder
index = 0
#If our base log_root_dir exists:
if os.path.exists(log_dir):
if os.path.exists(log_root_dir):

#Get existing folders, convert to string list, and sort
folders = os.listdir(log_dir)
folders = os.listdir(log_root_dir)
ids = [int(f) for f in folders]
ids.sort()

#This algorithim determines the next sequential value for our log index, it scans through the existing numbers
#This algorithm determines the next sequential value for our log index, it scans through the existing numbers
#until either it finds a missing number in sequence, or runs out of numbers to scan.

#Set this to a high number to start with, as it will get set every loop iteration
Expand All @@ -66,7 +66,7 @@ def gen_paths():
index += 1

#Set the log_dir, which is the directory for storing all logs during this run session
log_dir = os.path.join(log_dir, str(index).zfill(4))
log_dir = os.path.join(log_root_dir, str(index).zfill(4))

#Set the log_file, which is a dump of all console output
log_file = os.path.join(log_dir, "main.log")
Expand Down
10 changes: 5 additions & 5 deletions framework/module_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def load_module(name):
#Trigger a *subsystem*.load event for the rest of the system to hear, and get our newly-loaded module
#up-to-date on current events
events.start_event(subsystem + ".load", subsystem)
events.refresh_events(subsystem)
events.repeat_callbacks(subsystem)


def unload_module(subsystem):
Expand Down Expand Up @@ -245,7 +245,7 @@ def load(self, modname=None):
#Trigger a *subsystem*.load event for the rest of the system to hear, and get our newly-loaded module
#up-to-date on current events
events.start_event(self.subsystem + ".load", self.subsystem)
events.refresh_events(self.subsystem)
events.repeat_callbacks(self.subsystem)

#Yay, we must have been successfull!
success = True
Expand All @@ -260,7 +260,7 @@ def unload(self):
"""Unload the currently loaded module"""

#Remove all event callbacks and active events, triggering any inverse events
events.cleanup_events(self.subsystem)
events.remove_callbacks(self.subsystem)

#Let the rest of the system know that this module has been unloaded;
#stop the *subsystem*.load event and trigger the *subsystem*.unload event
Expand All @@ -273,7 +273,7 @@ def unload(self):
self.mod_loaded = False
logging.info("unloaded module " + self.subsystem)

def call_wrap(self, func):
def call_wrap(self, func, *args, **kwargs):
"""This function is responsible for running a module's function in a contained environment and handling any issues"""

#Grab a process id and increment the reference.
Expand All @@ -285,7 +285,7 @@ def call_wrap(self, func):

try:
#Run the function!
func()
func(*args, **kwargs)
except Exception as e:
#Something happened! Report an exception and try to replace_faulty
logging.error("Exception calling func " + func.__name__ + ": " + str(e) + "\n" + traceback.format_exc())
Expand Down
2 changes: 1 addition & 1 deletion framework/tests/resources/basic_events/testMod1.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ class Module:
def set_callback(self):
events.add_callback("test", self.subsystem, self.callback)

def callback(self):
def callback(self, task):
self.index += 1
2 changes: 1 addition & 1 deletion framework/tests/resources/state_events/testMod1.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ def __init__(self):
def reset(self):
self.index = 1

def callback(self):
def callback(self, task):
self.index += 1
Loading

0 comments on commit 71dda44

Please sign in to comment.