Skip to content

Commit

Permalink
robustness improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
tyler-romero committed Dec 9, 2024
1 parent 630add9 commit 4ae9b48
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 20 deletions.
51 changes: 37 additions & 14 deletions src/stream/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import cv2
import yaml
from framegrab import FrameGrabber, MotionDetector
from framegrab import FrameGrabber, GrabError, MotionDetector
from groundlight import Groundlight

from stream.grabber import StreamType, framegrabber_factory
Expand Down Expand Up @@ -44,7 +44,7 @@


def process_single_frame(frame: cv2.Mat, gl: Groundlight, detector: str) -> None:
"""Process a single frame and send it to Groundlight
"""Process a single frame and send it to Groundlight.
Args:
frame: OpenCV image frame to process
Expand Down Expand Up @@ -87,17 +87,17 @@ def validate_stream_args(args: argparse.Namespace) -> tuple[str | int, StreamTyp
return stream, stream_type


def parse_motion_args(args: argparse.Namespace) -> tuple[bool, float, float, float]:
def parse_motion_args(args: argparse.Namespace) -> tuple[bool, float, int, float, float]:
"""Parse and validate motion detection arguments"""
if not args.motion:
logger.info("Motion detection disabled.")
return False, 0, 0, 0
return False, 0, 0, 0, 0

logger.info(
f"Motion detection enabled with threshold={args.threshold} and post-motion capture of {args.postmotion}s "
f"Motion detection enabled with pixel_threshold={args.motion_pixel_threshold}, value_threshold={args.motion_val_threshold} post-motion capture of {args.postmotion}s "
f"and max interval of {args.maxinterval}s"
)
return True, args.threshold, args.postmotion, args.maxinterval
return True, args.motion_pixel_threshold, args.motion_val_threshold, args.postmotion, args.maxinterval


def run_capture_loop( # noqa: PLR0912 PLR0913
Expand All @@ -121,8 +121,13 @@ def run_capture_loop( # noqa: PLR0912 PLR0913

while True:
start = time.time()
frame = grabber.grab()
if frame is None:
try:
frame = grabber.grab()
except GrabError:
logger.exception("Error grabbing frame")
frame = None

if frame is None: # No frame captured, or exception occurred
logger.warning("No frame captured!")
time.sleep(0.1) # Brief pause before retrying
continue
Expand Down Expand Up @@ -224,10 +229,17 @@ def main():
)
parser.add_argument(
"-r",
"--threshold",
"--motion_pixel_threshold",
type=float,
default=1,
help="Motion detection threshold (%% pixels changed). Defaults to 1%%.",
help="Motion detection pixel threshold (%% pixels changed). Defaults to 1%%.",
)
parser.add_argument(
"-b",
"--motion_val_threshold",
type=int,
default=20,
help="Motion detection value threshold (degree of change). Defaults to 20.",
)
parser.add_argument(
"-p",
Expand Down Expand Up @@ -272,7 +284,9 @@ def main():

crop_region = parse_crop_string(args.crop) if args.crop else None
stream, stream_type = validate_stream_args(args)
motion_detect, motion_threshold, post_motion_time, max_frame_interval = parse_motion_args(args)
motion_detect, motion_pixel_threshold, motion_val_threshold, post_motion_time, max_frame_interval = (
parse_motion_args(args)
)

# Setup Groundlight client
gl = Groundlight(endpoint=args.endpoint, api_token=args.token)
Expand All @@ -294,7 +308,11 @@ def main():
queue, tc, workers = setup_workers(fn=_process_single_frame, num_workers=worker_count)

# Setup motion detection if enabled
motion_detector = MotionDetector(pct_threshold=motion_threshold) if motion_detect else None
motion_detector = (
MotionDetector(pct_threshold=motion_pixel_threshold, val_threshold=motion_val_threshold)
if motion_detect
else None
)

print_banner(gl=gl, args=args)

Expand All @@ -311,8 +329,13 @@ def main():
)
except KeyboardInterrupt:
logger.info("Exiting with KeyboardInterrupt.")
tc.force_exit()
sys.exit(-1)
except Exception as e:
logger.error(f"Exiting with exception: {e}", exc_info=True)
finally:
# Clean up threads
tc.shutdown()
for worker in workers:
worker.join(timeout=5.0)


if __name__ == "__main__":
Expand Down
37 changes: 31 additions & 6 deletions src/stream/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,40 @@


class ThreadControl:
"""Controls graceful shutdown of worker threads"""
"""Gracefully shutdown all worker threads
Args:
timeout: Maximum time to wait for threads to finish
Returns:
bool: True if all threads completed, False if timeout occurred
"""

def __init__(self):
self.exit_all_threads = False

def force_exit(self):
def shutdown(self) -> bool:
logger.debug("Attempting force exit of all threads")
self.exit_all_threads = True
return True


def setup_workers(
fn: Callable, num_workers: int = 10, daemon: bool = True
) -> tuple[Queue, ThreadControl, list[Thread]]:
"""Setup worker threads and queues
def setup_workers(fn: Callable, num_workers: int = 10) -> tuple[Queue, ThreadControl, list[Thread]]:
"""Setup worker threads and queues"""
Args:
fn: Function to process work items
num_workers: Number of worker threads
daemon: If True, threads will be daemon threads that exit when main thread exits
"""

q = Queue()
tc = ThreadControl()
workers = []

for _ in range(num_workers):
thread = Thread(target=worker_loop, kwargs=dict(q=q, control=tc, fn=fn))
thread = Thread(target=worker_loop, kwargs=dict(q=q, control=tc, fn=fn), daemon=daemon)
workers.append(thread)
thread.start()

Expand All @@ -49,8 +64,18 @@ def worker_loop(q: Queue, control: ThreadControl, fn: Callable):
while not control.exit_all_threads:
try:
work = q.get(timeout=1) # Timeout prevents orphaned threads
fn(work)

try:
fn(work)
except Exception as e:
logger.error(f"Error processing work item: {e}", exc_info=True)
finally:
q.task_done() # Signal completion even if there was an error

except Empty:
continue
except Exception as e:
logger.error(f"Critical error in worker thread: {e}", exc_info=True)
break

logger.debug("exiting worker thread.")

0 comments on commit 4ae9b48

Please sign in to comment.