From 4ae9b48c5e5ee8212418b62233a02046dda98f2b Mon Sep 17 00:00:00 2001 From: Tyler Romero Date: Mon, 9 Dec 2024 09:03:15 -0800 Subject: [PATCH] robustness improvements --- src/stream/main.py | 51 +++++++++++++++++++++++++++++++------------ src/stream/threads.py | 37 ++++++++++++++++++++++++++----- 2 files changed, 68 insertions(+), 20 deletions(-) diff --git a/src/stream/main.py b/src/stream/main.py index 82dd074..c0d28e8 100644 --- a/src/stream/main.py +++ b/src/stream/main.py @@ -11,7 +11,7 @@ import cv2 import yaml -from framegrab import FrameGrabber, MotionDetector +from framegrab import FrameGrabber, GrabError, MotionDetector from groundlight import Groundlight from stream.grabber import StreamType, framegrabber_factory @@ -44,7 +44,7 @@ def process_single_frame(frame: cv2.Mat, gl: Groundlight, detector: str) -> None: - """Process a single frame and send it to Groundlight + """Process a single frame and send it to Groundlight. Args: frame: OpenCV image frame to process @@ -87,17 +87,17 @@ def validate_stream_args(args: argparse.Namespace) -> tuple[str | int, StreamTyp return stream, stream_type -def parse_motion_args(args: argparse.Namespace) -> tuple[bool, float, float, float]: +def parse_motion_args(args: argparse.Namespace) -> tuple[bool, float, int, float, float]: """Parse and validate motion detection arguments""" if not args.motion: logger.info("Motion detection disabled.") - return False, 0, 0, 0 + return False, 0, 0, 0, 0 logger.info( - f"Motion detection enabled with threshold={args.threshold} and post-motion capture of {args.postmotion}s " + f"Motion detection enabled with pixel_threshold={args.motion_pixel_threshold}, value_threshold={args.motion_val_threshold} post-motion capture of {args.postmotion}s " f"and max interval of {args.maxinterval}s" ) - return True, args.threshold, args.postmotion, args.maxinterval + return True, args.motion_pixel_threshold, args.motion_val_threshold, args.postmotion, args.maxinterval def run_capture_loop( # noqa: PLR0912 PLR0913 @@ -121,8 +121,13 @@ def run_capture_loop( # noqa: PLR0912 PLR0913 while True: start = time.time() - frame = grabber.grab() - if frame is None: + try: + frame = grabber.grab() + except GrabError: + logger.exception("Error grabbing frame") + frame = None + + if frame is None: # No frame captured, or exception occurred logger.warning("No frame captured!") time.sleep(0.1) # Brief pause before retrying continue @@ -224,10 +229,17 @@ def main(): ) parser.add_argument( "-r", - "--threshold", + "--motion_pixel_threshold", type=float, default=1, - help="Motion detection threshold (%% pixels changed). Defaults to 1%%.", + help="Motion detection pixel threshold (%% pixels changed). Defaults to 1%%.", + ) + parser.add_argument( + "-b", + "--motion_val_threshold", + type=int, + default=20, + help="Motion detection value threshold (degree of change). Defaults to 20.", ) parser.add_argument( "-p", @@ -272,7 +284,9 @@ def main(): crop_region = parse_crop_string(args.crop) if args.crop else None stream, stream_type = validate_stream_args(args) - motion_detect, motion_threshold, post_motion_time, max_frame_interval = parse_motion_args(args) + motion_detect, motion_pixel_threshold, motion_val_threshold, post_motion_time, max_frame_interval = ( + parse_motion_args(args) + ) # Setup Groundlight client gl = Groundlight(endpoint=args.endpoint, api_token=args.token) @@ -294,7 +308,11 @@ def main(): queue, tc, workers = setup_workers(fn=_process_single_frame, num_workers=worker_count) # Setup motion detection if enabled - motion_detector = MotionDetector(pct_threshold=motion_threshold) if motion_detect else None + motion_detector = ( + MotionDetector(pct_threshold=motion_pixel_threshold, val_threshold=motion_val_threshold) + if motion_detect + else None + ) print_banner(gl=gl, args=args) @@ -311,8 +329,13 @@ def main(): ) except KeyboardInterrupt: logger.info("Exiting with KeyboardInterrupt.") - tc.force_exit() - sys.exit(-1) + except Exception as e: + logger.error(f"Exiting with exception: {e}", exc_info=True) + finally: + # Clean up threads + tc.shutdown() + for worker in workers: + worker.join(timeout=5.0) if __name__ == "__main__": diff --git a/src/stream/threads.py b/src/stream/threads.py index dfc19f9..34b6273 100644 --- a/src/stream/threads.py +++ b/src/stream/threads.py @@ -13,25 +13,40 @@ class ThreadControl: - """Controls graceful shutdown of worker threads""" + """Gracefully shutdown all worker threads + + Args: + timeout: Maximum time to wait for threads to finish + Returns: + bool: True if all threads completed, False if timeout occurred + """ def __init__(self): self.exit_all_threads = False - def force_exit(self): + def shutdown(self) -> bool: logger.debug("Attempting force exit of all threads") self.exit_all_threads = True + return True + +def setup_workers( + fn: Callable, num_workers: int = 10, daemon: bool = True +) -> tuple[Queue, ThreadControl, list[Thread]]: + """Setup worker threads and queues -def setup_workers(fn: Callable, num_workers: int = 10) -> tuple[Queue, ThreadControl, list[Thread]]: - """Setup worker threads and queues""" + Args: + fn: Function to process work items + num_workers: Number of worker threads + daemon: If True, threads will be daemon threads that exit when main thread exits + """ q = Queue() tc = ThreadControl() workers = [] for _ in range(num_workers): - thread = Thread(target=worker_loop, kwargs=dict(q=q, control=tc, fn=fn)) + thread = Thread(target=worker_loop, kwargs=dict(q=q, control=tc, fn=fn), daemon=daemon) workers.append(thread) thread.start() @@ -49,8 +64,18 @@ def worker_loop(q: Queue, control: ThreadControl, fn: Callable): while not control.exit_all_threads: try: work = q.get(timeout=1) # Timeout prevents orphaned threads - fn(work) + + try: + fn(work) + except Exception as e: + logger.error(f"Error processing work item: {e}", exc_info=True) + finally: + q.task_done() # Signal completion even if there was an error + except Empty: continue + except Exception as e: + logger.error(f"Critical error in worker thread: {e}", exc_info=True) + break logger.debug("exiting worker thread.")