diff --git a/docs/docs/configuration/license_plate_recognition.md b/docs/docs/configuration/license_plate_recognition.md index 4fd7aa568d..103c3bf14e 100644 --- a/docs/docs/configuration/license_plate_recognition.md +++ b/docs/docs/configuration/license_plate_recognition.md @@ -41,6 +41,8 @@ lpr: Ensure that your camera is configured to detect objects of type `car`, and that a car is actually being detected by Frigate. Otherwise, LPR will not run. +Like the other real-time processors in Frigate, license plate recognition runs on the camera stream defined by the `detect` role in your config. To ensure optimal performance, select a suitable resolution for this stream in your camera's firmware that fits your specific scene and requirements. + ## Advanced Configuration Fine-tune the LPR feature using these optional parameters: @@ -52,7 +54,7 @@ Fine-tune the LPR feature using these optional parameters: - Note: If you are using a Frigate+ model and you set the `threshold` in your objects config for `license_plate` higher than this value, recognition will never run. It's best to ensure these values match, or this `detection_threshold` is lower than your object config `threshold`. - **`min_area`**: Defines the minimum size (in pixels) a license plate must be before recognition runs. - Default: `1000` pixels. - - Depending on the resolution of your cameras, you can increase this value to ignore small or distant plates. + - Depending on the resolution of your camera's `detect` stream, you can increase this value to ignore small or distant plates. ### Recognition @@ -114,7 +116,7 @@ lpr: Ensure that: - Your camera has a clear, well-lit view of the plate. -- The plate is large enough in the image (try adjusting `min_area`). +- The plate is large enough in the image (try adjusting `min_area`) or increasing the resolution of your camera's stream. - A `car` is detected first, as LPR only runs on recognized vehicles. If you are using a Frigate+ model or a custom model that detects license plates, ensure that `license_plate` is added to your list of objects to track. @@ -143,7 +145,7 @@ Use `match_distance` to allow small character mismatches. Alternatively, define - View MQTT messages for `frigate/events` to verify detected plates. - Adjust `detection_threshold` and `recognition_threshold` settings. - If you are using a Frigate+ model or a model that detects license plates, watch the debug view (Settings --> Debug) to ensure that `license_plate` is being detected with a `car`. -- Enable debug logs for LPR by adding `frigate.data_processing.real_time.license_plate_processor: debug` to your `logger` configuration. These logs are _very_ verbose, so only enable this when necessary. +- Enable debug logs for LPR by adding `frigate.data_processing.common.license_plate: debug` to your `logger` configuration. These logs are _very_ verbose, so only enable this when necessary. ### Will LPR slow down my system? diff --git a/frigate/api/classification.py b/frigate/api/classification.py index 7cd127d070..bd395737ab 100644 --- a/frigate/api/classification.py +++ b/frigate/api/classification.py @@ -9,10 +9,13 @@ from fastapi import APIRouter, Request, UploadFile from fastapi.responses import JSONResponse from pathvalidate import sanitize_filename +from peewee import DoesNotExist +from playhouse.shortcuts import model_to_dict from frigate.api.defs.tags import Tags from frigate.const import FACE_DIR from frigate.embeddings import EmbeddingsContext +from frigate.models import Event logger = logging.getLogger(__name__) @@ -176,3 +179,36 @@ def deregister_faces(request: Request, name: str, body: dict = None): content=({"success": True, "message": "Successfully deleted faces."}), status_code=200, ) + + +@router.put("/lpr/reprocess") +def reprocess_license_plate(request: Request, event_id: str): + if not request.app.frigate_config.lpr.enabled: + message = "License plate recognition is not enabled." + logger.error(message) + return JSONResponse( + content=( + { + "success": False, + "message": message, + } + ), + status_code=400, + ) + + try: + event = Event.get(Event.id == event_id) + except DoesNotExist: + message = f"Event {event_id} not found" + logger.error(message) + return JSONResponse( + content=({"success": False, "message": message}), status_code=404 + ) + + context: EmbeddingsContext = request.app.embeddings + response = context.reprocess_plate(model_to_dict(event)) + + return JSONResponse( + content=response, + status_code=200, + ) diff --git a/frigate/comms/embeddings_updater.py b/frigate/comms/embeddings_updater.py index 58f012e7d6..61c2331cf7 100644 --- a/frigate/comms/embeddings_updater.py +++ b/frigate/comms/embeddings_updater.py @@ -15,6 +15,7 @@ class EmbeddingsRequestEnum(Enum): generate_search = "generate_search" register_face = "register_face" reprocess_face = "reprocess_face" + reprocess_plate = "reprocess_plate" class EmbeddingsResponder: diff --git a/frigate/comms/recordings_updater.py b/frigate/comms/recordings_updater.py new file mode 100644 index 0000000000..862ec10413 --- /dev/null +++ b/frigate/comms/recordings_updater.py @@ -0,0 +1,36 @@ +"""Facilitates communication between processes.""" + +import logging +from enum import Enum + +from .zmq_proxy import Publisher, Subscriber + +logger = logging.getLogger(__name__) + + +class RecordingsDataTypeEnum(str, Enum): + all = "" + recordings_available_through = "recordings_available_through" + + +class RecordingsDataPublisher(Publisher): + """Publishes latest recording data.""" + + topic_base = "recordings/" + + def __init__(self, topic: RecordingsDataTypeEnum) -> None: + topic = topic.value + super().__init__(topic) + + def publish(self, payload: tuple[str, float]) -> None: + super().publish(payload) + + +class RecordingsDataSubscriber(Subscriber): + """Receives latest recording data.""" + + topic_base = "recordings/" + + def __init__(self, topic: RecordingsDataTypeEnum) -> None: + topic = topic.value + super().__init__(topic) diff --git a/frigate/data_processing/real_time/license_plate_processor.py b/frigate/data_processing/common/license_plate/mixin.py similarity index 93% rename from frigate/data_processing/real_time/license_plate_processor.py rename to frigate/data_processing/common/license_plate/mixin.py index bd74419285..1723d213e2 100644 --- a/frigate/data_processing/real_time/license_plate_processor.py +++ b/frigate/data_processing/common/license_plate/mixin.py @@ -13,34 +13,21 @@ from pyclipper import ET_CLOSEDPOLYGON, JT_ROUND, PyclipperOffset from shapely.geometry import Polygon -from frigate.comms.inter_process import InterProcessRequestor -from frigate.config import FrigateConfig from frigate.const import FRIGATE_LOCALHOST -from frigate.embeddings.onnx.lpr_embedding import ( - LicensePlateDetector, - PaddleOCRClassification, - PaddleOCRDetection, - PaddleOCRRecognition, -) from frigate.util.image import area -from ..types import DataProcessorMetrics -from .api import RealTimeProcessorApi - logger = logging.getLogger(__name__) WRITE_DEBUG_IMAGES = False -class LicensePlateProcessor(RealTimeProcessorApi): - def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics): - super().__init__(config, metrics) - self.requestor = InterProcessRequestor() - self.lpr_config = config.lpr +class LicensePlateProcessingMixin: + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.requires_license_plate_detection = ( "license_plate" not in self.config.objects.all_objects ) - self.detected_license_plates: dict[str, dict[str, any]] = {} self.ctc_decoder = CTCDecoder() @@ -52,42 +39,6 @@ def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics): self.box_thresh = 0.8 self.mask_thresh = 0.8 - self.lpr_detection_model = None - self.lpr_classification_model = None - self.lpr_recognition_model = None - - if self.config.lpr.enabled: - self.detection_model = PaddleOCRDetection( - model_size="large", - requestor=self.requestor, - device="CPU", - ) - - self.classification_model = PaddleOCRClassification( - model_size="large", - requestor=self.requestor, - device="CPU", - ) - - self.recognition_model = PaddleOCRRecognition( - model_size="large", - requestor=self.requestor, - device="CPU", - ) - - self.yolov9_detection_model = LicensePlateDetector( - model_size="large", - requestor=self.requestor, - device="CPU", - ) - - if self.lpr_config.enabled: - # all models need to be loaded to run LPR - self.detection_model._load_model_and_utils() - self.classification_model._load_model_and_utils() - self.recognition_model._load_model_and_utils() - self.yolov9_detection_model._load_model_and_utils() - def _detect(self, image: np.ndarray) -> List[np.ndarray]: """ Detect possible license plates in the input image by first resizing and normalizing it, @@ -114,7 +65,7 @@ def _detect(self, image: np.ndarray) -> List[np.ndarray]: resized_image, ) - outputs = self.detection_model([normalized_image])[0] + outputs = self.model_runner.detection_model([normalized_image])[0] outputs = outputs[0, :, :] boxes, _ = self._boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h) @@ -143,7 +94,7 @@ def _classify( norm_img = norm_img[np.newaxis, :] norm_images.append(norm_img) - outputs = self.classification_model(norm_images) + outputs = self.model_runner.classification_model(norm_images) return self._process_classification_output(images, outputs) @@ -183,7 +134,7 @@ def _recognize( norm_image = norm_image[np.newaxis, :] norm_images.append(norm_image) - outputs = self.recognition_model(norm_images) + outputs = self.model_runner.recognition_model(norm_images) return self.ctc_decoder(outputs) def _process_license_plate( @@ -199,9 +150,9 @@ def _process_license_plate( Tuple[List[str], List[float], List[int]]: Detected license plate texts, confidence scores, and areas of the plates. """ if ( - self.detection_model.runner is None - or self.classification_model.runner is None - or self.recognition_model.runner is None + self.model_runner.detection_model.runner is None + or self.model_runner.classification_model.runner is None + or self.model_runner.recognition_model.runner is None ): # we might still be downloading the models logger.debug("Model runners not loaded") @@ -665,7 +616,9 @@ def _preprocess_recognition_image( input_w = int(input_h * max_wh_ratio) # check for model-specific input width - model_input_w = self.recognition_model.runner.ort.get_inputs()[0].shape[3] + model_input_w = self.model_runner.recognition_model.runner.ort.get_inputs()[ + 0 + ].shape[3] if isinstance(model_input_w, int) and model_input_w > 0: input_w = model_input_w @@ -732,19 +685,13 @@ def _crop_license_plate(image: np.ndarray, points: np.ndarray) -> np.ndarray: image = np.rot90(image, k=3) return image - def __update_metrics(self, duration: float) -> None: - """ - Update inference metrics. - """ - self.metrics.alpr_pps.value = (self.metrics.alpr_pps.value * 9 + duration) / 10 - def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]: """ Use a lightweight YOLOv9 model to detect license plates for users without Frigate+ Return the dimensions of the detected plate as [x1, y1, x2, y2]. """ - predictions = self.yolov9_detection_model(input) + predictions = self.model_runner.yolov9_detection_model(input) confidence_threshold = self.lpr_config.detection_threshold @@ -770,8 +717,8 @@ def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]: # Return the top scoring bounding box if found if top_box is not None: - # expand box by 15% to help with OCR - expansion = (top_box[2:] - top_box[:2]) * 0.1 + # expand box by 30% to help with OCR + expansion = (top_box[2:] - top_box[:2]) * 0.30 # Expand box expanded_box = np.array( @@ -869,9 +816,8 @@ def _should_keep_previous_plate( # 5. Return True if we should keep the previous plate (i.e., if it scores higher) return prev_score > curr_score - def process_frame(self, obj_data: dict[str, any], frame: np.ndarray): + def lpr_process(self, obj_data: dict[str, any], frame: np.ndarray): """Look for license plates in image.""" - start = datetime.datetime.now().timestamp() id = obj_data["id"] @@ -934,7 +880,7 @@ def process_frame(self, obj_data: dict[str, any], frame: np.ndarray): # check that license plate is valid # double the value because we've doubled the size of the car - if license_plate_area < self.config.lpr.min_area * 2: + if license_plate_area < self.lpr_config.min_area * 2: logger.debug("License plate is less than min_area") return @@ -972,7 +918,7 @@ def process_frame(self, obj_data: dict[str, any], frame: np.ndarray): # check that license plate is valid if ( not license_plate_box - or area(license_plate_box) < self.config.lpr.min_area + or area(license_plate_box) < self.lpr_config.min_area ): logger.debug(f"Invalid license plate box {license_plate}") return @@ -1078,10 +1024,9 @@ def process_frame(self, obj_data: dict[str, any], frame: np.ndarray): "plate": top_plate, "char_confidences": top_char_confidences, "area": top_area, + "obj_data": obj_data, } - self.__update_metrics(datetime.datetime.now().timestamp() - start) - def handle_request(self, topic, request_data) -> dict[str, any] | None: return diff --git a/frigate/data_processing/common/license_plate/model.py b/frigate/data_processing/common/license_plate/model.py new file mode 100644 index 0000000000..25e7b2cafc --- /dev/null +++ b/frigate/data_processing/common/license_plate/model.py @@ -0,0 +1,31 @@ +from frigate.embeddings.onnx.lpr_embedding import ( + LicensePlateDetector, + PaddleOCRClassification, + PaddleOCRDetection, + PaddleOCRRecognition, +) + +from ...types import DataProcessorModelRunner + + +class LicensePlateModelRunner(DataProcessorModelRunner): + def __init__(self, requestor, device: str = "CPU", model_size: str = "large"): + super().__init__(requestor, device, model_size) + self.detection_model = PaddleOCRDetection( + model_size=model_size, requestor=requestor, device=device + ) + self.classification_model = PaddleOCRClassification( + model_size=model_size, requestor=requestor, device=device + ) + self.recognition_model = PaddleOCRRecognition( + model_size=model_size, requestor=requestor, device=device + ) + self.yolov9_detection_model = LicensePlateDetector( + model_size=model_size, requestor=requestor, device=device + ) + + # Load all models once + self.detection_model._load_model_and_utils() + self.classification_model._load_model_and_utils() + self.recognition_model._load_model_and_utils() + self.yolov9_detection_model._load_model_and_utils() diff --git a/frigate/data_processing/post/api.py b/frigate/data_processing/post/api.py index 5c88221c2a..c40caef71c 100644 --- a/frigate/data_processing/post/api.py +++ b/frigate/data_processing/post/api.py @@ -5,16 +5,22 @@ from frigate.config import FrigateConfig -from ..types import DataProcessorMetrics, PostProcessDataEnum +from ..types import DataProcessorMetrics, DataProcessorModelRunner, PostProcessDataEnum logger = logging.getLogger(__name__) class PostProcessorApi(ABC): @abstractmethod - def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics) -> None: + def __init__( + self, + config: FrigateConfig, + metrics: DataProcessorMetrics, + model_runner: DataProcessorModelRunner, + ) -> None: self.config = config self.metrics = metrics + self.model_runner = model_runner pass @abstractmethod diff --git a/frigate/data_processing/post/license_plate.py b/frigate/data_processing/post/license_plate.py new file mode 100644 index 0000000000..9a9974bc72 --- /dev/null +++ b/frigate/data_processing/post/license_plate.py @@ -0,0 +1,231 @@ +"""Handle post processing for license plate recognition.""" + +import datetime +import logging + +import cv2 +import numpy as np +from peewee import DoesNotExist + +from frigate.comms.embeddings_updater import EmbeddingsRequestEnum +from frigate.config import FrigateConfig +from frigate.data_processing.common.license_plate.mixin import ( + WRITE_DEBUG_IMAGES, + LicensePlateProcessingMixin, +) +from frigate.data_processing.common.license_plate.model import ( + LicensePlateModelRunner, +) +from frigate.data_processing.types import PostProcessDataEnum +from frigate.models import Recordings +from frigate.util.image import get_image_from_recording + +from ..types import DataProcessorMetrics +from .api import PostProcessorApi + +logger = logging.getLogger(__name__) + + +class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi): + def __init__( + self, + config: FrigateConfig, + metrics: DataProcessorMetrics, + model_runner: LicensePlateModelRunner, + detected_license_plates: dict[str, dict[str, any]], + ): + self.detected_license_plates = detected_license_plates + self.model_runner = model_runner + self.lpr_config = config.lpr + self.config = config + super().__init__(config, metrics, model_runner) + + def __update_metrics(self, duration: float) -> None: + """ + Update inference metrics. + """ + self.metrics.alpr_pps.value = (self.metrics.alpr_pps.value * 9 + duration) / 10 + + def process_data( + self, data: dict[str, any], data_type: PostProcessDataEnum + ) -> None: + """Look for license plates in recording stream image + Args: + data (dict): containing data about the input. + data_type (enum): Describing the data that is being processed. + + Returns: + None. + """ + start = datetime.datetime.now().timestamp() + + event_id = data["event_id"] + camera_name = data["camera"] + + if data_type == PostProcessDataEnum.recording: + obj_data = data["obj_data"] + frame_time = obj_data["frame_time"] + recordings_available_through = data["recordings_available"] + + if frame_time > recordings_available_through: + logger.debug( + f"LPR post processing: No recordings available for this frame time {frame_time}, available through {recordings_available_through}" + ) + + elif data_type == PostProcessDataEnum.tracked_object: + # non-functional, need to think about snapshot time + obj_data = data["event"]["data"] + obj_data["id"] = data["event"]["id"] + obj_data["camera"] = data["event"]["camera"] + # TODO: snapshot time? + frame_time = data["event"]["start_time"] + + else: + logger.error("No data type passed to LPR postprocessing") + return + + recording_query = ( + Recordings.select( + Recordings.path, + Recordings.start_time, + ) + .where( + ( + (frame_time >= Recordings.start_time) + & (frame_time <= Recordings.end_time) + ) + ) + .where(Recordings.camera == camera_name) + .order_by(Recordings.start_time.desc()) + .limit(1) + ) + + try: + recording: Recordings = recording_query.get() + time_in_segment = frame_time - recording.start_time + codec = "mjpeg" + + image_data = get_image_from_recording( + self.config.ffmpeg, recording.path, time_in_segment, codec, None + ) + + if not image_data: + logger.debug( + "LPR post processing: Unable to fetch license plate from recording" + ) + + # Convert bytes to numpy array + image_array = np.frombuffer(image_data, dtype=np.uint8) + + if len(image_array) == 0: + logger.debug("LPR post processing: No image") + return + + image = cv2.imdecode(image_array, cv2.IMREAD_COLOR) + + except DoesNotExist: + logger.debug("Error fetching license plate for postprocessing") + return + + if WRITE_DEBUG_IMAGES: + cv2.imwrite(f"debug/frames/lpr_post_{start}.jpg", image) + + # convert to yuv for processing + frame = cv2.cvtColor(image, cv2.COLOR_BGR2YUV_I420) + + detect_width = self.config.cameras[camera_name].detect.width + detect_height = self.config.cameras[camera_name].detect.height + + # Scale the boxes based on detect dimensions + scale_x = image.shape[1] / detect_width + scale_y = image.shape[0] / detect_height + + # Determine which box to enlarge based on detection mode + if self.requires_license_plate_detection: + # Scale and enlarge the car box + box = obj_data.get("box") + if not box: + return + + # Scale original car box to detection dimensions + left = int(box[0] * scale_x) + top = int(box[1] * scale_y) + right = int(box[2] * scale_x) + bottom = int(box[3] * scale_y) + box = [left, top, right, bottom] + else: + # Get the license plate box from attributes + if not obj_data.get("current_attributes"): + return + + license_plate = None + for attr in obj_data["current_attributes"]: + if attr.get("label") != "license_plate": + continue + if license_plate is None or attr.get("score", 0.0) > license_plate.get( + "score", 0.0 + ): + license_plate = attr + + if not license_plate or not license_plate.get("box"): + return + + # Scale license plate box to detection dimensions + orig_box = license_plate["box"] + left = int(orig_box[0] * scale_x) + top = int(orig_box[1] * scale_y) + right = int(orig_box[2] * scale_x) + bottom = int(orig_box[3] * scale_y) + box = [left, top, right, bottom] + + width_box = right - left + height_box = bottom - top + + # Enlarge box slightly to account for drift in detect vs recording stream + enlarge_factor = 0.3 + new_left = max(0, int(left - (width_box * enlarge_factor / 2))) + new_top = max(0, int(top - (height_box * enlarge_factor / 2))) + new_right = min(image.shape[1], int(right + (width_box * enlarge_factor / 2))) + new_bottom = min( + image.shape[0], int(bottom + (height_box * enlarge_factor / 2)) + ) + + keyframe_obj_data = obj_data.copy() + if self.requires_license_plate_detection: + # car box + keyframe_obj_data["box"] = [new_left, new_top, new_right, new_bottom] + else: + # Update the license plate box in the attributes + new_attributes = [] + for attr in obj_data["current_attributes"]: + if attr.get("label") == "license_plate": + new_attr = attr.copy() + new_attr["box"] = [new_left, new_top, new_right, new_bottom] + new_attributes.append(new_attr) + else: + new_attributes.append(attr) + keyframe_obj_data["current_attributes"] = new_attributes + + # run the frame through lpr processing + logger.debug(f"Post processing plate: {event_id}, {frame_time}") + self.lpr_process(keyframe_obj_data, frame) + + self.__update_metrics(datetime.datetime.now().timestamp() - start) + + def handle_request(self, topic, request_data) -> dict[str, any] | None: + if topic == EmbeddingsRequestEnum.reprocess_plate.value: + event = request_data["event"] + + self.process_data( + { + "event_id": event["id"], + "camera": event["camera"], + "event": event, + }, + PostProcessDataEnum.tracked_object, + ) + + return { + "message": "Successfully requested reprocessing of license plate.", + "success": True, + } diff --git a/frigate/data_processing/real_time/api.py b/frigate/data_processing/real_time/api.py index 205431a36c..cd8f3e493f 100644 --- a/frigate/data_processing/real_time/api.py +++ b/frigate/data_processing/real_time/api.py @@ -7,16 +7,22 @@ from frigate.config import FrigateConfig -from ..types import DataProcessorMetrics +from ..types import DataProcessorMetrics, DataProcessorModelRunner logger = logging.getLogger(__name__) class RealTimeProcessorApi(ABC): @abstractmethod - def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics) -> None: + def __init__( + self, + config: FrigateConfig, + metrics: DataProcessorMetrics, + model_runner: DataProcessorModelRunner, + ) -> None: self.config = config self.metrics = metrics + self.model_runner = model_runner pass @abstractmethod diff --git a/frigate/data_processing/real_time/bird_processor.py b/frigate/data_processing/real_time/bird.py similarity index 99% rename from frigate/data_processing/real_time/bird_processor.py rename to frigate/data_processing/real_time/bird.py index 1199f61249..01490d8954 100644 --- a/frigate/data_processing/real_time/bird_processor.py +++ b/frigate/data_processing/real_time/bird.py @@ -22,7 +22,7 @@ logger = logging.getLogger(__name__) -class BirdProcessor(RealTimeProcessorApi): +class BirdRealTimeProcessor(RealTimeProcessorApi): def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics): super().__init__(config, metrics) self.interpreter: Interpreter = None diff --git a/frigate/data_processing/real_time/face_processor.py b/frigate/data_processing/real_time/face.py similarity index 99% rename from frigate/data_processing/real_time/face_processor.py rename to frigate/data_processing/real_time/face.py index 086c596586..d2b6776537 100644 --- a/frigate/data_processing/real_time/face_processor.py +++ b/frigate/data_processing/real_time/face.py @@ -27,7 +27,7 @@ MIN_MATCHING_FACES = 2 -class FaceProcessor(RealTimeProcessorApi): +class FaceRealTimeProcessor(RealTimeProcessorApi): def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics): super().__init__(config, metrics) self.face_config = config.face_recognition diff --git a/frigate/data_processing/real_time/license_plate.py b/frigate/data_processing/real_time/license_plate.py new file mode 100644 index 0000000000..a5a1577fef --- /dev/null +++ b/frigate/data_processing/real_time/license_plate.py @@ -0,0 +1,53 @@ +"""Handle processing images for face detection and recognition.""" + +import datetime +import logging + +import numpy as np + +from frigate.config import FrigateConfig +from frigate.data_processing.common.license_plate.mixin import ( + LicensePlateProcessingMixin, +) +from frigate.data_processing.common.license_plate.model import ( + LicensePlateModelRunner, +) + +from ..types import DataProcessorMetrics +from .api import RealTimeProcessorApi + +logger = logging.getLogger(__name__) + + +class LicensePlateRealTimeProcessor(LicensePlateProcessingMixin, RealTimeProcessorApi): + def __init__( + self, + config: FrigateConfig, + metrics: DataProcessorMetrics, + model_runner: LicensePlateModelRunner, + detected_license_plates: dict[str, dict[str, any]], + ): + self.detected_license_plates = detected_license_plates + self.model_runner = model_runner + self.lpr_config = config.lpr + self.config = config + super().__init__(config, metrics, model_runner) + + def __update_metrics(self, duration: float) -> None: + """ + Update inference metrics. + """ + self.metrics.alpr_pps.value = (self.metrics.alpr_pps.value * 9 + duration) / 10 + + def process_frame(self, obj_data: dict[str, any], frame: np.ndarray): + """Look for license plates in image.""" + start = datetime.datetime.now().timestamp() + self.lpr_process(obj_data, frame) + self.__update_metrics(datetime.datetime.now().timestamp() - start) + + def handle_request(self, topic, request_data) -> dict[str, any] | None: + return + + def expire_object(self, object_id: str): + if object_id in self.detected_license_plates: + self.detected_license_plates.pop(object_id) diff --git a/frigate/data_processing/types.py b/frigate/data_processing/types.py index 39f355667b..6f87f77f9c 100644 --- a/frigate/data_processing/types.py +++ b/frigate/data_processing/types.py @@ -18,6 +18,13 @@ def __init__(self): self.alpr_pps = mp.Value("d", 0.01) +class DataProcessorModelRunner: + def __init__(self, requestor, device: str = "CPU", model_size: str = "large"): + self.requestor = requestor + self.device = device + self.model_size = model_size + + class PostProcessDataEnum(str, Enum): recording = "recording" review = "review" diff --git a/frigate/embeddings/__init__.py b/frigate/embeddings/__init__.py index 185d5436b6..18673c4e92 100644 --- a/frigate/embeddings/__init__.py +++ b/frigate/embeddings/__init__.py @@ -17,7 +17,7 @@ from frigate.const import CONFIG_DIR, FACE_DIR from frigate.data_processing.types import DataProcessorMetrics from frigate.db.sqlitevecq import SqliteVecQueueDatabase -from frigate.models import Event +from frigate.models import Event, Recordings from frigate.util.builtin import serialize from frigate.util.services import listen @@ -55,7 +55,7 @@ def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])), load_vec_extension=True, ) - models = [Event] + models = [Event, Recordings] db.bind(models) maintainer = EmbeddingMaintainer( @@ -234,3 +234,8 @@ def update_description(self, event_id: str, description: str) -> None: EmbeddingsRequestEnum.embed_description.value, {"id": event_id, "description": description}, ) + + def reprocess_plate(self, event: dict[str, any]) -> dict[str, any]: + return self.requestor.send_data( + EmbeddingsRequestEnum.reprocess_plate.value, {"event": event} + ) diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 7925345b22..a18ca7a7f3 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -20,18 +20,29 @@ ) from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber from frigate.comms.inter_process import InterProcessRequestor +from frigate.comms.recordings_updater import ( + RecordingsDataSubscriber, + RecordingsDataTypeEnum, +) from frigate.config import FrigateConfig from frigate.const import ( CLIPS_DIR, UPDATE_EVENT_DESCRIPTION, ) +from frigate.data_processing.common.license_plate.model import ( + LicensePlateModelRunner, +) +from frigate.data_processing.post.api import PostProcessorApi +from frigate.data_processing.post.license_plate import ( + LicensePlatePostProcessor, +) from frigate.data_processing.real_time.api import RealTimeProcessorApi -from frigate.data_processing.real_time.bird_processor import BirdProcessor -from frigate.data_processing.real_time.face_processor import FaceProcessor -from frigate.data_processing.real_time.license_plate_processor import ( - LicensePlateProcessor, +from frigate.data_processing.real_time.bird import BirdRealTimeProcessor +from frigate.data_processing.real_time.face import FaceRealTimeProcessor +from frigate.data_processing.real_time.license_plate import ( + LicensePlateRealTimeProcessor, ) -from frigate.data_processing.types import DataProcessorMetrics +from frigate.data_processing.types import DataProcessorMetrics, PostProcessDataEnum from frigate.events.types import EventTypeEnum from frigate.genai import get_genai_client from frigate.models import Event @@ -66,40 +77,71 @@ def __init__( if config.semantic_search.reindex: self.embeddings.reindex() + # create communication for updating event descriptions + self.requestor = InterProcessRequestor() + self.event_subscriber = EventUpdateSubscriber() self.event_end_subscriber = EventEndSubscriber() self.event_metadata_subscriber = EventMetadataSubscriber( EventMetadataTypeEnum.regenerate_description ) + self.recordings_subscriber = RecordingsDataSubscriber( + RecordingsDataTypeEnum.recordings_available_through + ) self.embeddings_responder = EmbeddingsResponder() self.frame_manager = SharedMemoryFrameManager() - self.processors: list[RealTimeProcessorApi] = [] + + self.detected_license_plates: dict[str, dict[str, any]] = {} + + # model runners to share between realtime and post processors + if self.config.lpr.enabled: + lpr_model_runner = LicensePlateModelRunner(self.requestor) + + # realtime processors + self.realtime_processors: list[RealTimeProcessorApi] = [] if self.config.face_recognition.enabled: - self.processors.append(FaceProcessor(self.config, metrics)) + self.realtime_processors.append(FaceRealTimeProcessor(self.config, metrics)) if self.config.classification.bird.enabled: - self.processors.append(BirdProcessor(self.config, metrics)) + self.realtime_processors.append(BirdRealTimeProcessor(self.config, metrics)) if self.config.lpr.enabled: - self.processors.append(LicensePlateProcessor(self.config, metrics)) + self.realtime_processors.append( + LicensePlateRealTimeProcessor( + self.config, metrics, lpr_model_runner, self.detected_license_plates + ) + ) + + # post processors + self.post_processors: list[PostProcessorApi] = [] + + if self.config.lpr.enabled: + self.post_processors.append( + LicensePlatePostProcessor( + self.config, metrics, lpr_model_runner, self.detected_license_plates + ) + ) - # create communication for updating event descriptions - self.requestor = InterProcessRequestor() self.stop_event = stop_event self.tracked_events: dict[str, list[any]] = {} self.genai_client = get_genai_client(config) + # recordings data + self.recordings_available_through: dict[str, float] = {} + def run(self) -> None: """Maintain a SQLite-vec database for semantic search.""" while not self.stop_event.is_set(): self._process_requests() self._process_updates() + self._process_recordings_updates() self._process_finalized() self._process_event_metadata() self.event_subscriber.stop() self.event_end_subscriber.stop() + self.recordings_subscriber.stop() self.event_metadata_subscriber.stop() self.embeddings_responder.stop() self.requestor.stop() @@ -129,13 +171,15 @@ def _handle_request(topic: str, data: dict[str, any]) -> str: pack=False, ) else: - for processor in self.processors: - resp = processor.handle_request(topic, data) + processors = [self.realtime_processors, self.post_processors] + for processor_list in processors: + for processor in processor_list: + resp = processor.handle_request(topic, data) if resp is not None: return resp except Exception as e: - logger.error(f"Unable to handle embeddings request {e}") + logger.error(f"Unable to handle embeddings request {e}", exc_info=True) self.embeddings_responder.check_for_request(_handle_request) @@ -154,7 +198,7 @@ def _process_updates(self) -> None: camera_config = self.config.cameras[camera] # no need to process updated objects if face recognition, lpr, genai are disabled - if not camera_config.genai.enabled and len(self.processors) == 0: + if not camera_config.genai.enabled and len(self.realtime_processors) == 0: return # Create our own thumbnail based on the bounding box and the frame time @@ -171,7 +215,7 @@ def _process_updates(self) -> None: ) return - for processor in self.processors: + for processor in self.realtime_processors: processor.process_frame(data, yuv_frame) # no need to save our own thumbnails if genai is not enabled @@ -202,7 +246,32 @@ def _process_finalized(self) -> None: event_id, camera, updated_db = ended camera_config = self.config.cameras[camera] - for processor in self.processors: + # call any defined post processors + for processor in self.post_processors: + if isinstance(processor, LicensePlatePostProcessor): + recordings_available = self.recordings_available_through.get(camera) + if ( + recordings_available is not None + and event_id in self.detected_license_plates + ): + processor.process_data( + { + "event_id": event_id, + "camera": camera, + "recordings_available": self.recordings_available_through[ + camera + ], + "obj_data": self.detected_license_plates[event_id][ + "obj_data" + ], + }, + PostProcessDataEnum.recording, + ) + else: + processor.process_data(event_id, PostProcessDataEnum.event_id) + + # expire in realtime processors + for processor in self.realtime_processors: processor.expire_object(event_id) if updated_db: @@ -315,6 +384,24 @@ def _process_finalized(self) -> None: if event_id in self.tracked_events: del self.tracked_events[event_id] + def _process_recordings_updates(self) -> None: + """Process recordings updates.""" + while True: + recordings_data = self.recordings_subscriber.check_for_update(timeout=0.01) + + if recordings_data == None: + break + + camera, recordings_available_through_timestamp = recordings_data + + self.recordings_available_through[camera] = ( + recordings_available_through_timestamp + ) + + logger.debug( + f"{camera} now has recordings available through {recordings_available_through_timestamp}" + ) + def _process_event_metadata(self): # Check for regenerate description requests (topic, event_id, source) = self.event_metadata_subscriber.check_for_update( diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index a4c23763db..faa41f75fd 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -19,6 +19,10 @@ from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.inter_process import InterProcessRequestor +from frigate.comms.recordings_updater import ( + RecordingsDataPublisher, + RecordingsDataTypeEnum, +) from frigate.config import FrigateConfig, RetainModeEnum from frigate.const import ( CACHE_DIR, @@ -70,6 +74,9 @@ def __init__(self, config: FrigateConfig, stop_event: MpEvent): self.requestor = InterProcessRequestor() self.config_subscriber = ConfigSubscriber("config/record/") self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) + self.recordings_publisher = RecordingsDataPublisher( + RecordingsDataTypeEnum.recordings_available_through + ) self.stop_event = stop_event self.object_recordings_info: dict[str, list] = defaultdict(list) @@ -213,6 +220,16 @@ async def move_files(self) -> None: [self.validate_and_move_segment(camera, reviews, r) for r in recordings] ) + # publish most recently available recording time and None if disabled + self.recordings_publisher.publish( + ( + camera, + recordings[0]["start_time"].timestamp() + if self.config.cameras[camera].record.enabled + else None, + ) + ) + recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks) # fire and forget recordings entries @@ -582,4 +599,5 @@ def run(self) -> None: self.requestor.stop() self.config_subscriber.stop() self.detection_subscriber.stop() + self.recordings_publisher.stop() logger.info("Exiting recording maintenance...")