From c7c6a40745f0de0e6b92b9bebaacedb04c7b0132 Mon Sep 17 00:00:00 2001
From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
Date: Wed, 19 Feb 2025 08:19:56 -0600
Subject: [PATCH 01/17] recordings data pub/sub

---
 frigate/comms/recordings_updater.py | 36 +++++++++++++++++++++++++++++
 frigate/embeddings/maintainer.py    | 29 +++++++++++++++++++++++
 frigate/record/maintainer.py        | 13 +++++++++++
 3 files changed, 78 insertions(+)
 create mode 100644 frigate/comms/recordings_updater.py

diff --git a/frigate/comms/recordings_updater.py b/frigate/comms/recordings_updater.py
new file mode 100644
index 0000000000..862ec10413
--- /dev/null
+++ b/frigate/comms/recordings_updater.py
@@ -0,0 +1,36 @@
+"""Facilitates communication between processes."""
+
+import logging
+from enum import Enum
+
+from .zmq_proxy import Publisher, Subscriber
+
+logger = logging.getLogger(__name__)
+
+
+class RecordingsDataTypeEnum(str, Enum):
+    all = ""
+    recordings_available_through = "recordings_available_through"
+
+
+class RecordingsDataPublisher(Publisher):
+    """Publishes latest recording data."""
+
+    topic_base = "recordings/"
+
+    def __init__(self, topic: RecordingsDataTypeEnum) -> None:
+        topic = topic.value
+        super().__init__(topic)
+
+    def publish(self, payload: tuple[str, float]) -> None:
+        super().publish(payload)
+
+
+class RecordingsDataSubscriber(Subscriber):
+    """Receives latest recording data."""
+
+    topic_base = "recordings/"
+
+    def __init__(self, topic: RecordingsDataTypeEnum) -> None:
+        topic = topic.value
+        super().__init__(topic)
diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py
index 7925345b22..3d906ab0ea 100644
--- a/frigate/embeddings/maintainer.py
+++ b/frigate/embeddings/maintainer.py
@@ -20,6 +20,10 @@
 )
 from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber
 from frigate.comms.inter_process import InterProcessRequestor
+from frigate.comms.recordings_updater import (
+    RecordingsDataSubscriber,
+    RecordingsDataTypeEnum,
+)
 from frigate.config import FrigateConfig
 from frigate.const import (
     CLIPS_DIR,
@@ -71,6 +75,9 @@ def __init__(
         self.event_metadata_subscriber = EventMetadataSubscriber(
             EventMetadataTypeEnum.regenerate_description
         )
+        self.recordings_subscriber = RecordingsDataSubscriber(
+            RecordingsDataTypeEnum.recordings_available_through
+        )
         self.embeddings_responder = EmbeddingsResponder()
         self.frame_manager = SharedMemoryFrameManager()
         self.processors: list[RealTimeProcessorApi] = []
@@ -90,6 +97,9 @@ def __init__(
         self.tracked_events: dict[str, list[any]] = {}
         self.genai_client = get_genai_client(config)
 
+        # recordings data
+        self.recordings_available_through: dict[str, float] = {}
+
     def run(self) -> None:
         """Maintain a SQLite-vec database for semantic search."""
         while not self.stop_event.is_set():
@@ -100,6 +110,7 @@ def run(self) -> None:
 
         self.event_subscriber.stop()
         self.event_end_subscriber.stop()
+        self.recordings_subscriber.stop()
         self.event_metadata_subscriber.stop()
         self.embeddings_responder.stop()
         self.requestor.stop()
@@ -315,6 +326,24 @@ def _process_finalized(self) -> None:
                 if event_id in self.tracked_events:
                     del self.tracked_events[event_id]
 
+    def _process_recordings_updates(self) -> None:
+        """Process recordings updates."""
+        while True:
+            recordings_data = self.recordings_subscriber.check_for_update(timeout=0.01)
+
+            if recordings_data is None:
+                break
+
+            camera, recordings_available_through_timestamp = recordings_data
+
+            self.recordings_available_through[camera] = (
+                recordings_available_through_timestamp
+            )
+
+            logger.debug(
+                f"{camera} now has recordings available through {recordings_available_through_timestamp}"
+            )
+
     def _process_event_metadata(self):
         # Check for regenerate description requests
         (topic, event_id, source) = self.event_metadata_subscriber.check_for_update(
diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py
index a4c23763db..55b73df1e1 100644
--- a/frigate/record/maintainer.py
+++ b/frigate/record/maintainer.py
@@ -19,6 +19,10 @@
 from frigate.comms.config_updater import ConfigSubscriber
 from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
 from frigate.comms.inter_process import InterProcessRequestor
+from frigate.comms.recordings_updater import (
+    RecordingsDataPublisher,
+    RecordingsDataTypeEnum,
+)
 from frigate.config import FrigateConfig, RetainModeEnum
 from frigate.const import (
     CACHE_DIR,
@@ -70,6 +74,9 @@ def __init__(self, config: FrigateConfig, stop_event: MpEvent):
         self.requestor = InterProcessRequestor()
         self.config_subscriber = ConfigSubscriber("config/record/")
         self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all)
+        self.recordings_publisher = RecordingsDataPublisher(
+            RecordingsDataTypeEnum.recordings_available_through
+        )
         self.stop_event = stop_event
 
         self.object_recordings_info: dict[str, list] = defaultdict(list)
@@ -213,6 +220,11 @@ async def move_files(self) -> None:
                 [self.validate_and_move_segment(camera, reviews, r) for r in recordings]
             )
 
+            # TODO: this is not correct
+            self.recordings_publisher.publish(
+                (camera, recordings[0]["start_time"].timestamp())
+            )
+
         recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks)
 
         # fire and forget recordings entries
@@ -582,4 +594,5 @@ def run(self) -> None:
         self.requestor.stop()
         self.config_subscriber.stop()
         self.detection_subscriber.stop()
+        self.recordings_publisher.stop()
         logger.info("Exiting recording maintenance...")
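The round trip this patch introduces, as wired into the two maintainers above, looks roughly like the sketch below. The camera name and timestamp are placeholder values, and the ZMQ proxy backing Publisher/Subscriber must already be running for messages to flow; only publish(), check_for_update(), and stop() are taken from the patch itself.

    from frigate.comms.recordings_updater import (
        RecordingsDataPublisher,
        RecordingsDataSubscriber,
        RecordingsDataTypeEnum,
    )

    # record maintainer side: announce the latest wall-clock time covered on disk
    publisher = RecordingsDataPublisher(
        RecordingsDataTypeEnum.recordings_available_through
    )
    publisher.publish(("front_door", 1740000000.0))

    # embeddings maintainer side: drain everything queued, then carry on
    subscriber = RecordingsDataSubscriber(
        RecordingsDataTypeEnum.recordings_available_through
    )
    while True:
        update = subscriber.check_for_update(timeout=0.01)
        if update is None:
            break
        camera, available_through = update

    publisher.stop()
    subscriber.stop()
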
From 0c09ee53f65a4baeac175e6c39a8b870660451d4 Mon Sep 17 00:00:00 2001
From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
Date: Thu, 20 Feb 2025 09:49:21 -0600
Subject: [PATCH 02/17] function to process recording stream frames

---
 .../real_time/license_plate_processor.py | 119 ++++++++++++++++++
 1 file changed, 119 insertions(+)

diff --git a/frigate/data_processing/real_time/license_plate_processor.py b/frigate/data_processing/real_time/license_plate_processor.py
index bd74419285..03a8eda53d 100644
--- a/frigate/data_processing/real_time/license_plate_processor.py
+++ b/frigate/data_processing/real_time/license_plate_processor.py
@@ -869,6 +869,124 @@ def _should_keep_previous_plate(
         # 5. Return True if we should keep the previous plate (i.e., if it scores higher)
         return prev_score > curr_score
 
+    def process_keyframe_lpr(self, obj_data: dict[str, any]) -> None:
+        """
+        Runs LPR on an enlarged region of the latest keyframe for the given object.
+        Called after the main process_frame as a backup check.
+
+        Args:
+            obj_data: Object data dictionary containing camera, id, and box coordinates
+        """
+        if (
+            obj_data.get("label") != "car"
+            or obj_data.get("stationary")
+            or (
+                obj_data.get("sub_label")
+                and obj_data["id"] not in self.detected_license_plates
+            )
+            or obj_data.get("is_keyframe_check")
+        ):
+            return
+
+        camera = obj_data.get("camera")
+        if not camera:
+            return
+
+        yuv_height, yuv_width = self.config.cameras[camera].frame_shape_yuv
+        detect_width = self.config.cameras[camera].detect.width
+        detect_height = self.config.cameras[camera].detect.height
+
+        result = get_latest_keyframe_yuv420(camera)
+        if result is None:
+            logger.debug(f"No keyframe available for camera {camera}")
+            return
+
+        keyframe, timestamp = result
+
+        # Resize keyframe to match frame_shape_yuv dimensions
+        keyframe_resized = cv2.resize(keyframe, (yuv_width, yuv_height))
+
+        # Scale the boxes based on detect dimensions
+        scale_x = detect_width / keyframe.shape[1]
+        scale_y = detect_height / keyframe.shape[0]
+
+        # Determine which box to enlarge based on detection mode
+        if self.requires_license_plate_detection:
+            # Scale and enlarge the car box
+            box = obj_data.get("box")
+            if not box:
+                return
+
+            # Scale original box to detection dimensions
+            left = int(box[0] * scale_x)
+            top = int(box[1] * scale_y)
+            right = int(box[2] * scale_x)
+            bottom = int(box[3] * scale_y)
+            box = [left, top, right, bottom]
+        else:
+            # Get the license plate box from attributes
+            if not obj_data.get("current_attributes"):
+                return
+
+            license_plate = None
+            for attr in obj_data["current_attributes"]:
+                if attr.get("label") != "license_plate":
+                    continue
+                if license_plate is None or attr.get("score", 0.0) > license_plate.get(
+                    "score", 0.0
+                ):
+                    license_plate = attr
+
+            if not license_plate or not license_plate.get("box"):
+                return
+
+            # Scale license plate box to detection dimensions
+            orig_box = license_plate["box"]
+            left = int(orig_box[0] * scale_x)
+            top = int(orig_box[1] * scale_y)
+            right = int(orig_box[2] * scale_x)
+            bottom = int(orig_box[3] * scale_y)
+            box = [left, top, right, bottom]
+
+        width_box = right - left
+        height_box = bottom - top
+
+        # Enlarge box by 30%
+        enlarge_factor = 0.3
+        new_left = max(0, int(left - (width_box * enlarge_factor / 2)))
+        new_top = max(0, int(top - (height_box * enlarge_factor / 2)))
+        new_right = min(detect_width, int(right + (width_box * enlarge_factor / 2)))
+        new_bottom = min(detect_height, int(bottom + (height_box * enlarge_factor / 2)))
+
+        keyframe_obj_data = obj_data.copy()
+        if self.requires_license_plate_detection:
+            keyframe_obj_data["box"] = [new_left, new_top, new_right, new_bottom]
+        else:
+            # Update the license plate box in the attributes
+            new_attributes = []
+            for attr in obj_data["current_attributes"]:
+                if attr.get("label") == "license_plate":
+                    new_attr = attr.copy()
+                    new_attr["box"] = [new_left, new_top, new_right, new_bottom]
+                    new_attributes.append(new_attr)
+                else:
+                    new_attributes.append(attr)
+            keyframe_obj_data["current_attributes"] = new_attributes
+
+        keyframe_obj_data["frame_time"] = timestamp
+
+        # Add a flag to prevent infinite recursion
+        keyframe_obj_data["is_keyframe_check"] = True
+
+        if WRITE_DEBUG_IMAGES:
+            current_time = int(datetime.datetime.now().timestamp())
+            rgb = cv2.cvtColor(keyframe_resized, cv2.COLOR_YUV2BGR_I420)
+            cv2.imwrite(
+                f"debug/frames/keyframe_resized_{current_time}.jpg",
+                rgb,
+            )
+
+        self.process_frame(keyframe_obj_data, keyframe_resized)
+
     def process_frame(self, obj_data: dict[str, any], frame: np.ndarray):
         """Look for license plates in image."""
         start = datetime.datetime.now().timestamp()
@@ -1078,6 +1196,7 @@ def process_frame(self, obj_data: dict[str, any], frame: np.ndarray):
                 "plate": top_plate,
                 "char_confidences": top_char_confidences,
                 "area": top_area,
+                "frame_time": obj_data["frame_time"],
             }
 
         self.__update_metrics(datetime.datetime.now().timestamp() - start)
From bed1d6436480d7c20a016ac92e99dea4d8002374 Mon Sep 17 00:00:00 2001
From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
Date: Thu, 20 Feb 2025 11:48:53 -0600
Subject: [PATCH 03/17] model runner

---
 frigate/data_processing/post/api.py      | 10 ++++++++--
 frigate/data_processing/real_time/api.py | 10 ++++++++--
 frigate/data_processing/types.py         |  7 +++++++
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/frigate/data_processing/post/api.py b/frigate/data_processing/post/api.py
index 5c88221c2a..c40caef71c 100644
--- a/frigate/data_processing/post/api.py
+++ b/frigate/data_processing/post/api.py
@@ -5,16 +5,22 @@
 
 from frigate.config import FrigateConfig
 
-from ..types import DataProcessorMetrics, PostProcessDataEnum
+from ..types import DataProcessorMetrics, DataProcessorModelRunner, PostProcessDataEnum
 
 logger = logging.getLogger(__name__)
 
 
 class PostProcessorApi(ABC):
     @abstractmethod
-    def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics) -> None:
+    def __init__(
+        self,
+        config: FrigateConfig,
+        metrics: DataProcessorMetrics,
+        model_runner: DataProcessorModelRunner,
+    ) -> None:
         self.config = config
         self.metrics = metrics
+        self.model_runner = model_runner
         pass
 
     @abstractmethod
diff --git a/frigate/data_processing/real_time/api.py b/frigate/data_processing/real_time/api.py
index 205431a36c..cd8f3e493f 100644
--- a/frigate/data_processing/real_time/api.py
+++ b/frigate/data_processing/real_time/api.py
@@ -7,16 +7,22 @@
 
 from frigate.config import FrigateConfig
 
-from ..types import DataProcessorMetrics
+from ..types import DataProcessorMetrics, DataProcessorModelRunner
 
 logger = logging.getLogger(__name__)
 
 
 class RealTimeProcessorApi(ABC):
     @abstractmethod
-    def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics) -> None:
+    def __init__(
+        self,
+        config: FrigateConfig,
+        metrics: DataProcessorMetrics,
+        model_runner: DataProcessorModelRunner,
+    ) -> None:
         self.config = config
         self.metrics = metrics
+        self.model_runner = model_runner
         pass
 
     @abstractmethod
diff --git a/frigate/data_processing/types.py b/frigate/data_processing/types.py
index 39f355667b..6f87f77f9c 100644
--- a/frigate/data_processing/types.py
+++ b/frigate/data_processing/types.py
@@ -18,6 +18,13 @@ def __init__(self):
         self.alpr_pps = mp.Value("d", 0.01)
 
 
+class DataProcessorModelRunner:
+    def __init__(self, requestor, device: str = "CPU", model_size: str = "large"):
+        self.requestor = requestor
+        self.device = device
+        self.model_size = model_size
+
+
 class PostProcessDataEnum(str, Enum):
     recording = "recording"
     review = "review"
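With the base classes above, a concrete processor no longer loads its own models; it receives one shared runner through its constructor. A minimal sketch of a subclass built against the new signature follows; ExampleProcessor and its comment are hypothetical and only illustrate the constructor contract, and a real processor would also implement the remaining abstract methods (e.g. process_frame, handle_request, expire_object). The same shape applies to PostProcessorApi.

    from frigate.config import FrigateConfig
    from frigate.data_processing.real_time.api import RealTimeProcessorApi
    from frigate.data_processing.types import (
        DataProcessorMetrics,
        DataProcessorModelRunner,
    )

    class ExampleProcessor(RealTimeProcessorApi):
        # hypothetical subclass for illustration only
        def __init__(
            self,
            config: FrigateConfig,
            metrics: DataProcessorMetrics,
            model_runner: DataProcessorModelRunner,
        ) -> None:
            super().__init__(config, metrics, model_runner)
            # models live on the runner and are shared across processors
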
From 5aeaf4bede63c70023a89e162e143b7c69b147dc Mon Sep 17 00:00:00 2001
From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
Date: Thu, 20 Feb 2025 11:49:42 -0600
Subject: [PATCH 04/17] lpr model runner

---
 .../common/license_plate_model.py | 31 +++++++++++++++++++
 1 file changed, 31 insertions(+)
 create mode 100644 frigate/data_processing/common/license_plate_model.py

diff --git a/frigate/data_processing/common/license_plate_model.py b/frigate/data_processing/common/license_plate_model.py
new file mode 100644
index 0000000000..d3e35d3c54
--- /dev/null
+++ b/frigate/data_processing/common/license_plate_model.py
@@ -0,0 +1,31 @@
+from frigate.embeddings.onnx.lpr_embedding import (
+    LicensePlateDetector,
+    PaddleOCRClassification,
+    PaddleOCRDetection,
+    PaddleOCRRecognition,
+)
+
+from ..types import DataProcessorModelRunner
+
+
+class LicensePlateModelRunner(DataProcessorModelRunner):
+    def __init__(self, requestor, device: str = "CPU", model_size: str = "large"):
+        super().__init__(requestor, device, model_size)
+        self.detection_model = PaddleOCRDetection(
+            model_size=model_size, requestor=requestor, device=device
+        )
+        self.classification_model = PaddleOCRClassification(
+            model_size=model_size, requestor=requestor, device=device
+        )
+        self.recognition_model = PaddleOCRRecognition(
+            model_size=model_size, requestor=requestor, device=device
+        )
+        self.yolov9_detection_model = LicensePlateDetector(
+            model_size=model_size, requestor=requestor, device=device
+        )
+
+        # Load all models once
+        self.detection_model._load_model_and_utils()
+        self.classification_model._load_model_and_utils()
+        self.recognition_model._load_model_and_utils()
+        self.yolov9_detection_model._load_model_and_utils()
From a6c7b6fd1c33c0cb1da28307282f3989076ef8a5 Mon Sep 17 00:00:00 2001
From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
Date: Thu, 20 Feb 2025 11:50:16 -0600
Subject: [PATCH 05/17] refactor to mixin class and use model runner

---
 .../data_processing/common/license_plate.py   | 1194 +++++++++++++++
 .../real_time/license_plate_processor.py      | 1346 +----------------
 frigate/embeddings/maintainer.py              |   11 +-
 3 files changed, 1216 insertions(+), 1335 deletions(-)
 create mode 100644 frigate/data_processing/common/license_plate.py

diff --git a/frigate/data_processing/common/license_plate.py b/frigate/data_processing/common/license_plate.py
new file mode 100644
index 0000000000..13e6dc8d5c
--- /dev/null
+++ b/frigate/data_processing/common/license_plate.py
@@ -0,0 +1,1194 @@
+"""Handle processing images for license plate detection and recognition."""
+
+import datetime
+import logging
+import math
+import re
+from typing import List, Optional, Tuple
+
+import cv2
+import numpy as np
+import requests
+from Levenshtein import distance
+from pyclipper import ET_CLOSEDPOLYGON, JT_ROUND, PyclipperOffset
+from shapely.geometry import Polygon
+
+from frigate.const import FRIGATE_LOCALHOST
+from frigate.util.image import area
+
+logger = logging.getLogger(__name__)
+
+WRITE_DEBUG_IMAGES = False
+
+
+class LicensePlateProcessingMixin:
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        self.requires_license_plate_detection = (
+            "license_plate" not in self.config.objects.all_objects
+        )
+        self.detected_license_plates: dict[str, dict[str, any]] = {}
+
+        self.ctc_decoder = CTCDecoder()
+
+        self.batch_size = 6
+
+        # Detection specific parameters
+        self.min_size = 3
+        self.max_size = 960
+        self.box_thresh = 0.8
+        self.mask_thresh = 0.8
+
+    def _detect(self, image: np.ndarray) -> List[np.ndarray]:
+        """
+        Detect possible license plates in the input image by first resizing and normalizing it,
+        running a detection model, and filtering out low-probability regions.
+
+        Args:
+            image (np.ndarray): The input image in which license plates will be detected.
+
+        Returns:
+            List[np.ndarray]: A list of bounding box coordinates representing detected license plates.
+ """ + h, w = image.shape[:2] + + if sum([h, w]) < 64: + image = self._zero_pad(image) + + resized_image = self._resize_image(image) + normalized_image = self._normalize_image(resized_image) + + if WRITE_DEBUG_IMAGES: + current_time = int(datetime.datetime.now().timestamp()) + cv2.imwrite( + f"debug/frames/license_plate_resized_{current_time}.jpg", + resized_image, + ) + + outputs = self.model_runner.detection_model([normalized_image])[0] + outputs = outputs[0, :, :] + + boxes, _ = self._boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h) + return self._filter_polygon(boxes, (h, w)) + + def _classify( + self, images: List[np.ndarray] + ) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]: + """ + Classify the orientation or category of each detected license plate. + + Args: + images (List[np.ndarray]): A list of images of detected license plates. + + Returns: + Tuple[List[np.ndarray], List[Tuple[str, float]]]: A tuple of rotated/normalized plate images + and classification results with confidence scores. + """ + num_images = len(images) + indices = np.argsort([x.shape[1] / x.shape[0] for x in images]) + + for i in range(0, num_images, self.batch_size): + norm_images = [] + for j in range(i, min(num_images, i + self.batch_size)): + norm_img = self._preprocess_classification_image(images[indices[j]]) + norm_img = norm_img[np.newaxis, :] + norm_images.append(norm_img) + + outputs = self.model_runner.classification_model(norm_images) + + return self._process_classification_output(images, outputs) + + def _recognize( + self, images: List[np.ndarray] + ) -> Tuple[List[str], List[List[float]]]: + """ + Recognize the characters on the detected license plates using the recognition model. + + Args: + images (List[np.ndarray]): A list of images of license plates to recognize. + + Returns: + Tuple[List[str], List[List[float]]]: A tuple of recognized license plate texts and confidence scores. + """ + input_shape = [3, 48, 320] + num_images = len(images) + + # sort images by aspect ratio for processing + indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images])) + + for index in range(0, num_images, self.batch_size): + input_h, input_w = input_shape[1], input_shape[2] + max_wh_ratio = input_w / input_h + norm_images = [] + + # calculate the maximum aspect ratio in the current batch + for i in range(index, min(num_images, index + self.batch_size)): + h, w = images[indices[i]].shape[0:2] + max_wh_ratio = max(max_wh_ratio, w * 1.0 / h) + + # preprocess the images based on the max aspect ratio + for i in range(index, min(num_images, index + self.batch_size)): + norm_image = self._preprocess_recognition_image( + images[indices[i]], max_wh_ratio + ) + norm_image = norm_image[np.newaxis, :] + norm_images.append(norm_image) + + outputs = self.model_runner.recognition_model(norm_images) + return self.ctc_decoder(outputs) + + def _process_license_plate( + self, image: np.ndarray + ) -> Tuple[List[str], List[float], List[int]]: + """ + Complete pipeline for detecting, classifying, and recognizing license plates in the input image. + + Args: + image (np.ndarray): The input image in which to detect, classify, and recognize license plates. + + Returns: + Tuple[List[str], List[float], List[int]]: Detected license plate texts, confidence scores, and areas of the plates. 
+ """ + if ( + self.model_runner.detection_model.runner is None + or self.model_runner.classification_model.runner is None + or self.model_runner.recognition_model.runner is None + ): + # we might still be downloading the models + logger.debug("Model runners not loaded") + return [], [], [] + + plate_points = self._detect(image) + if len(plate_points) == 0: + logger.debug("No points found by OCR detector model") + return [], [], [] + + plate_points = self._sort_polygon(list(plate_points)) + plate_images = [self._crop_license_plate(image, x) for x in plate_points] + rotated_images, _ = self._classify(plate_images) + + # debug rotated and classification result + if WRITE_DEBUG_IMAGES: + current_time = int(datetime.datetime.now().timestamp()) + for i, img in enumerate(plate_images): + cv2.imwrite( + f"debug/frames/license_plate_rotated_{current_time}_{i + 1}.jpg", + img, + ) + for i, img in enumerate(rotated_images): + cv2.imwrite( + f"debug/frames/license_plate_classified_{current_time}_{i + 1}.jpg", + img, + ) + + # keep track of the index of each image for correct area calc later + sorted_indices = np.argsort([x.shape[1] / x.shape[0] for x in rotated_images]) + reverse_mapping = { + idx: original_idx for original_idx, idx in enumerate(sorted_indices) + } + + results, confidences = self._recognize(rotated_images) + + if results: + license_plates = [""] * len(rotated_images) + average_confidences = [[0.0]] * len(rotated_images) + areas = [0] * len(rotated_images) + + # map results back to original image order + for i, (plate, conf) in enumerate(zip(results, confidences)): + original_idx = reverse_mapping[i] + + height, width = rotated_images[original_idx].shape[:2] + area = height * width + + average_confidence = conf + + # set to True to write each cropped image for debugging + if False: + save_image = cv2.cvtColor( + rotated_images[original_idx], cv2.COLOR_RGB2BGR + ) + filename = f"debug/frames/plate_{original_idx}_{plate}_{area}.jpg" + cv2.imwrite(filename, save_image) + + license_plates[original_idx] = plate + average_confidences[original_idx] = average_confidence + areas[original_idx] = area + + # Filter out plates that have a length of less than min_plate_length characters + # or that don't match the expected format (if defined) + # Sort by area, then by plate length, then by confidence all desc + filtered_data = [] + for plate, conf, area in zip(license_plates, average_confidences, areas): + if len(plate) < self.lpr_config.min_plate_length: + logger.debug( + f"Filtered out '{plate}' due to length ({len(plate)} < {self.lpr_config.min_plate_length})" + ) + continue + + if self.lpr_config.format and not re.fullmatch( + self.lpr_config.format, plate + ): + logger.debug(f"Filtered out '{plate}' due to format mismatch") + continue + + filtered_data.append((plate, conf, area)) + + sorted_data = sorted( + filtered_data, + key=lambda x: (x[2], len(x[0]), x[1]), + reverse=True, + ) + + if sorted_data: + return map(list, zip(*sorted_data)) + + return [], [], [] + + def _resize_image(self, image: np.ndarray) -> np.ndarray: + """ + Resize the input image while maintaining the aspect ratio, ensuring dimensions are multiples of 32. + + Args: + image (np.ndarray): The input image to resize. + + Returns: + np.ndarray: The resized image. 
+ """ + h, w = image.shape[:2] + ratio = min(self.max_size / max(h, w), 1.0) + resize_h = max(int(round(int(h * ratio) / 32) * 32), 32) + resize_w = max(int(round(int(w * ratio) / 32) * 32), 32) + return cv2.resize(image, (resize_w, resize_h)) + + def _normalize_image(self, image: np.ndarray) -> np.ndarray: + """ + Normalize the input image by subtracting the mean and multiplying by the standard deviation. + + Args: + image (np.ndarray): The input image to normalize. + + Returns: + np.ndarray: The normalized image, transposed to match the model's expected input format. + """ + mean = np.array([123.675, 116.28, 103.53]).reshape(1, -1).astype("float64") + std = 1 / np.array([58.395, 57.12, 57.375]).reshape(1, -1).astype("float64") + + image = image.astype("float32") + cv2.subtract(image, mean, image) + cv2.multiply(image, std, image) + return image.transpose((2, 0, 1))[np.newaxis, ...] + + def _boxes_from_bitmap( + self, output: np.ndarray, mask: np.ndarray, dest_width: int, dest_height: int + ) -> Tuple[np.ndarray, List[float]]: + """ + Process the binary mask to extract bounding boxes and associated confidence scores. + + Args: + output (np.ndarray): Output confidence map from the model. + mask (np.ndarray): Binary mask of detected regions. + dest_width (int): Target width for scaling the box coordinates. + dest_height (int): Target height for scaling the box coordinates. + + Returns: + Tuple[np.ndarray, List[float]]: Array of bounding boxes and list of corresponding scores. + """ + + mask = (mask * 255).astype(np.uint8) + height, width = mask.shape + outs = cv2.findContours(mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE) + + # handle different return values of findContours between OpenCV versions + contours = outs[0] if len(outs) == 2 else outs[1] + + boxes = [] + scores = [] + + for index in range(len(contours)): + contour = contours[index] + + # get minimum bounding box (rotated rectangle) around the contour and the smallest side length. + points, min_side = self._get_min_boxes(contour) + logger.debug(f"min side {index}, {min_side}") + + if min_side < self.min_size: + continue + + points = np.array(points) + + score = self._box_score(output, contour) + logger.debug(f"box score {index}, {score}") + if self.box_thresh > score: + continue + + polygon = Polygon(points) + distance = polygon.area / polygon.length + + # Use pyclipper to shrink the polygon slightly based on the computed distance. + offset = PyclipperOffset() + offset.AddPath(points, JT_ROUND, ET_CLOSEDPOLYGON) + points = np.array(offset.Execute(distance * 1.5)).reshape((-1, 1, 2)) + + # get the minimum bounding box around the shrunken polygon. + box, min_side = self._get_min_boxes(points) + + if min_side < self.min_size + 2: + continue + + box = np.array(box) + + # normalize and clip box coordinates to fit within the destination image size. + box[:, 0] = np.clip(np.round(box[:, 0] / width * dest_width), 0, dest_width) + box[:, 1] = np.clip( + np.round(box[:, 1] / height * dest_height), 0, dest_height + ) + + boxes.append(box.astype("int32")) + scores.append(score) + + return np.array(boxes, dtype="int32"), scores + + @staticmethod + def _get_min_boxes(contour: np.ndarray) -> Tuple[List[Tuple[float, float]], float]: + """ + Calculate the minimum bounding box (rotated rectangle) for a given contour. + + Args: + contour (np.ndarray): The contour points of the detected shape. 
+ + Returns: + Tuple[List[Tuple[float, float]], float]: A list of four points representing the + corners of the bounding box, and the length of the shortest side. + """ + bounding_box = cv2.minAreaRect(contour) + points = sorted(cv2.boxPoints(bounding_box), key=lambda x: x[0]) + index_1, index_4 = (0, 1) if points[1][1] > points[0][1] else (1, 0) + index_2, index_3 = (2, 3) if points[3][1] > points[2][1] else (3, 2) + box = [points[index_1], points[index_2], points[index_3], points[index_4]] + return box, min(bounding_box[1]) + + @staticmethod + def _box_score(bitmap: np.ndarray, contour: np.ndarray) -> float: + """ + Calculate the average score within the bounding box of a contour. + + Args: + bitmap (np.ndarray): The output confidence map from the model. + contour (np.ndarray): The contour of the detected shape. + + Returns: + float: The average score of the pixels inside the contour region. + """ + h, w = bitmap.shape[:2] + contour = contour.reshape(-1, 2) + x1, y1 = np.clip(contour.min(axis=0), 0, [w - 1, h - 1]) + x2, y2 = np.clip(contour.max(axis=0), 0, [w - 1, h - 1]) + mask = np.zeros((y2 - y1 + 1, x2 - x1 + 1), dtype=np.uint8) + cv2.fillPoly(mask, [contour - [x1, y1]], 1) + return cv2.mean(bitmap[y1 : y2 + 1, x1 : x2 + 1], mask)[0] + + @staticmethod + def _expand_box(points: List[Tuple[float, float]]) -> np.ndarray: + """ + Expand a polygonal shape slightly by a factor determined by the area-to-perimeter ratio. + + Args: + points (List[Tuple[float, float]]): Points of the polygon to expand. + + Returns: + np.ndarray: Expanded polygon points. + """ + polygon = Polygon(points) + distance = polygon.area / polygon.length + offset = PyclipperOffset() + offset.AddPath(points, JT_ROUND, ET_CLOSEDPOLYGON) + expanded = np.array(offset.Execute(distance * 1.5)).reshape((-1, 2)) + return expanded + + def _filter_polygon( + self, points: List[np.ndarray], shape: Tuple[int, int] + ) -> np.ndarray: + """ + Filter a set of polygons to include only valid ones that fit within an image shape + and meet size constraints. + + Args: + points (List[np.ndarray]): List of polygons to filter. + shape (Tuple[int, int]): Shape of the image (height, width). + + Returns: + np.ndarray: List of filtered polygons. + """ + height, width = shape + return np.array( + [ + self._clockwise_order(point) + for point in points + if self._is_valid_polygon(point, width, height) + ] + ) + + @staticmethod + def _is_valid_polygon(point: np.ndarray, width: int, height: int) -> bool: + """ + Check if a polygon is valid, meaning it fits within the image bounds + and has sides of a minimum length. + + Args: + point (np.ndarray): The polygon to validate. + width (int): Image width. + height (int): Image height. + + Returns: + bool: Whether the polygon is valid or not. + """ + return ( + point[:, 0].min() >= 0 + and point[:, 0].max() < width + and point[:, 1].min() >= 0 + and point[:, 1].max() < height + and np.linalg.norm(point[0] - point[1]) > 3 + and np.linalg.norm(point[0] - point[3]) > 3 + ) + + @staticmethod + def _clockwise_order(point: np.ndarray) -> np.ndarray: + """ + Arrange the points of a polygon in clockwise order based on their angular positions + around the polygon's center. + + Args: + point (np.ndarray): Array of points of the polygon. + + Returns: + np.ndarray: Points ordered in clockwise direction. 
+ """ + center = point.mean(axis=0) + return point[ + np.argsort(np.arctan2(point[:, 1] - center[1], point[:, 0] - center[0])) + ] + + @staticmethod + def _sort_polygon(points): + """ + Sort polygons based on their position in the image. If polygons are close in vertical + position (within 5 pixels), sort them by horizontal position. + + Args: + points: List of polygons to sort. + + Returns: + List: Sorted list of polygons. + """ + points.sort(key=lambda x: (x[0][1], x[0][0])) + for i in range(len(points) - 1): + for j in range(i, -1, -1): + if abs(points[j + 1][0][1] - points[j][0][1]) < 5 and ( + points[j + 1][0][0] < points[j][0][0] + ): + temp = points[j] + points[j] = points[j + 1] + points[j + 1] = temp + else: + break + return points + + @staticmethod + def _zero_pad(image: np.ndarray) -> np.ndarray: + """ + Apply zero-padding to an image, ensuring its dimensions are at least 32x32. + The padding is added only if needed. + + Args: + image (np.ndarray): Input image. + + Returns: + np.ndarray: Zero-padded image. + """ + h, w, c = image.shape + pad = np.zeros((max(32, h), max(32, w), c), np.uint8) + pad[:h, :w, :] = image + return pad + + @staticmethod + def _preprocess_classification_image(image: np.ndarray) -> np.ndarray: + """ + Preprocess a single image for classification by resizing, normalizing, and padding. + + This method resizes the input image to a fixed height of 48 pixels while adjusting + the width dynamically up to a maximum of 192 pixels. The image is then normalized and + padded to fit the required input dimensions for classification. + + Args: + image (np.ndarray): Input image to preprocess. + + Returns: + np.ndarray: Preprocessed and padded image. + """ + # fixed height of 48, dynamic width up to 192 + input_shape = (3, 48, 192) + input_c, input_h, input_w = input_shape + + h, w = image.shape[:2] + ratio = w / h + resized_w = min(input_w, math.ceil(input_h * ratio)) + + resized_image = cv2.resize(image, (resized_w, input_h)) + + # handle single-channel images (grayscale) if needed + if input_c == 1 and resized_image.ndim == 2: + resized_image = resized_image[np.newaxis, :, :] + else: + resized_image = resized_image.transpose((2, 0, 1)) + + # normalize + resized_image = (resized_image.astype("float32") / 255.0 - 0.5) / 0.5 + + padded_image = np.zeros((input_c, input_h, input_w), dtype=np.float32) + padded_image[:, :, :resized_w] = resized_image + + return padded_image + + def _process_classification_output( + self, images: List[np.ndarray], outputs: List[np.ndarray] + ) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]: + """ + Process the classification model output by matching labels with confidence scores. + + This method processes the outputs from the classification model and rotates images + with high confidence of being labeled "180". It ensures that results are mapped to + the original image order. + + Args: + images (List[np.ndarray]): List of input images. + outputs (List[np.ndarray]): Corresponding model outputs. + + Returns: + Tuple[List[np.ndarray], List[Tuple[str, float]]]: A tuple of processed images and + classification results (label and confidence score). 
+ """ + labels = ["0", "180"] + results = [["", 0.0]] * len(images) + indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images])) + + outputs = np.stack(outputs) + + outputs = [ + (labels[idx], outputs[i, idx]) + for i, idx in enumerate(outputs.argmax(axis=1)) + ] + + for i in range(0, len(images), self.batch_size): + for j in range(len(outputs)): + label, score = outputs[j] + results[indices[i + j]] = [label, score] + # make sure we have high confidence if we need to flip a box, this will be rare in lpr + if "180" in label and score >= 0.9: + images[indices[i + j]] = cv2.rotate(images[indices[i + j]], 1) + + return images, results + + def _preprocess_recognition_image( + self, image: np.ndarray, max_wh_ratio: float + ) -> np.ndarray: + """ + Preprocess an image for recognition by dynamically adjusting its width. + + This method adjusts the width of the image based on the maximum width-to-height ratio + while keeping the height fixed at 48 pixels. The image is then normalized and padded + to fit the required input dimensions for recognition. + + Args: + image (np.ndarray): Input image to preprocess. + max_wh_ratio (float): Maximum width-to-height ratio for resizing. + + Returns: + np.ndarray: Preprocessed and padded image. + """ + # fixed height of 48, dynamic width based on ratio + input_shape = [3, 48, 320] + input_h, input_w = input_shape[1], input_shape[2] + + assert image.shape[2] == input_shape[0], "Unexpected number of image channels." + + # dynamically adjust input width based on max_wh_ratio + input_w = int(input_h * max_wh_ratio) + + # check for model-specific input width + model_input_w = self.model_runner.recognition_model.runner.ort.get_inputs()[ + 0 + ].shape[3] + if isinstance(model_input_w, int) and model_input_w > 0: + input_w = model_input_w + + h, w = image.shape[:2] + aspect_ratio = w / h + resized_w = min(input_w, math.ceil(input_h * aspect_ratio)) + + resized_image = cv2.resize(image, (resized_w, input_h)) + resized_image = resized_image.transpose((2, 0, 1)) + resized_image = (resized_image.astype("float32") / 255.0 - 0.5) / 0.5 + + # Compute mean pixel value of the resized image (per channel) + mean_pixel = np.mean(resized_image, axis=(1, 2), keepdims=True) + padded_image = np.full( + (input_shape[0], input_h, input_w), mean_pixel, dtype=np.float32 + ) + padded_image[:, :, :resized_w] = resized_image + + return padded_image + + @staticmethod + def _crop_license_plate(image: np.ndarray, points: np.ndarray) -> np.ndarray: + """ + Crop the license plate from the image using four corner points. + + This method crops the region containing the license plate by using the perspective + transformation based on four corner points. If the resulting image is significantly + taller than wide, the image is rotated to the correct orientation. + + Args: + image (np.ndarray): Input image containing the license plate. + points (np.ndarray): Four corner points defining the plate's position. + + Returns: + np.ndarray: Cropped and potentially rotated license plate image. 
+ """ + assert len(points) == 4, "shape of points must be 4*2" + points = points.astype(np.float32) + crop_width = int( + max( + np.linalg.norm(points[0] - points[1]), + np.linalg.norm(points[2] - points[3]), + ) + ) + crop_height = int( + max( + np.linalg.norm(points[0] - points[3]), + np.linalg.norm(points[1] - points[2]), + ) + ) + pts_std = np.float32( + [[0, 0], [crop_width, 0], [crop_width, crop_height], [0, crop_height]] + ) + matrix = cv2.getPerspectiveTransform(points, pts_std) + image = cv2.warpPerspective( + image, + matrix, + (crop_width, crop_height), + borderMode=cv2.BORDER_REPLICATE, + flags=cv2.INTER_CUBIC, + ) + height, width = image.shape[0:2] + if height * 1.0 / width >= 1.5: + image = np.rot90(image, k=3) + return image + + def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]: + """ + Use a lightweight YOLOv9 model to detect license plates for users without Frigate+ + + Return the dimensions of the detected plate as [x1, y1, x2, y2]. + """ + predictions = self.model_runner.yolov9_detection_model(input) + + confidence_threshold = self.lpr_config.detection_threshold + + top_score = -1 + top_box = None + + # Loop over predictions + for prediction in predictions: + score = prediction[6] + if score >= confidence_threshold: + bbox = prediction[1:5] + # Scale boxes back to original image size + scale_x = input.shape[1] / 256 + scale_y = input.shape[0] / 256 + bbox[0] *= scale_x + bbox[1] *= scale_y + bbox[2] *= scale_x + bbox[3] *= scale_y + + if score > top_score: + top_score = score + top_box = bbox + + # Return the top scoring bounding box if found + if top_box is not None: + # expand box by 15% to help with OCR + expansion = (top_box[2:] - top_box[:2]) * 0.1 + + # Expand box + expanded_box = np.array( + [ + top_box[0] - expansion[0], # x1 + top_box[1] - expansion[1], # y1 + top_box[2] + expansion[0], # x2 + top_box[3] + expansion[1], # y2 + ] + ).clip(0, [input.shape[1], input.shape[0]] * 2) + + logger.debug(f"Found license plate: {expanded_box.astype(int)}") + return tuple(expanded_box.astype(int)) + else: + return None # No detection above the threshold + + def _should_keep_previous_plate( + self, id, top_plate, top_char_confidences, top_area, avg_confidence + ): + if id not in self.detected_license_plates: + return False + + prev_data = self.detected_license_plates[id] + prev_plate = prev_data["plate"] + prev_char_confidences = prev_data["char_confidences"] + prev_area = prev_data["area"] + prev_avg_confidence = ( + sum(prev_char_confidences) / len(prev_char_confidences) + if prev_char_confidences + else 0 + ) + + # 1. 
Normalize metrics + # Length score - use relative comparison + # If lengths are equal, score is 0.5 for both + # If one is longer, it gets a higher score up to 1.0 + max_length_diff = 4 # Maximum expected difference in plate lengths + length_diff = len(top_plate) - len(prev_plate) + curr_length_score = 0.5 + ( + length_diff / (2 * max_length_diff) + ) # Normalize to 0-1 + curr_length_score = max(0, min(1, curr_length_score)) # Clamp to 0-1 + prev_length_score = 1 - curr_length_score # Inverse relationship + + # Area score (normalize based on max of current and previous) + max_area = max(top_area, prev_area) + curr_area_score = top_area / max_area + prev_area_score = prev_area / max_area + + # Average confidence score (already normalized 0-1) + curr_conf_score = avg_confidence + prev_conf_score = prev_avg_confidence + + # Character confidence comparison score + min_length = min(len(top_plate), len(prev_plate)) + if min_length > 0: + curr_char_conf = sum(top_char_confidences[:min_length]) / min_length + prev_char_conf = sum(prev_char_confidences[:min_length]) / min_length + else: + curr_char_conf = 0 + prev_char_conf = 0 + + # 2. Define weights + weights = { + "length": 0.4, + "area": 0.3, + "avg_confidence": 0.2, + "char_confidence": 0.1, + } + + # 3. Calculate weighted scores + curr_score = ( + curr_length_score * weights["length"] + + curr_area_score * weights["area"] + + curr_conf_score * weights["avg_confidence"] + + curr_char_conf * weights["char_confidence"] + ) + + prev_score = ( + prev_length_score * weights["length"] + + prev_area_score * weights["area"] + + prev_conf_score * weights["avg_confidence"] + + prev_char_conf * weights["char_confidence"] + ) + + # 4. Log the comparison for debugging + logger.debug( + f"Plate comparison - Current plate: {top_plate} (score: {curr_score:.3f}) vs " + f"Previous plate: {prev_plate} (score: {prev_score:.3f})\n" + f"Metrics - Length: {len(top_plate)} vs {len(prev_plate)} (scores: {curr_length_score:.2f} vs {prev_length_score:.2f}), " + f"Area: {top_area} vs {prev_area}, " + f"Avg Conf: {avg_confidence:.2f} vs {prev_avg_confidence:.2f}" + ) + + # 5. Return True if we should keep the previous plate (i.e., if it scores higher) + return prev_score > curr_score + + def lpr_process(self, obj_data: dict[str, any], frame: np.ndarray): + """Look for license plates in image.""" + + id = obj_data["id"] + + # don't run for non car objects + if obj_data.get("label") != "car": + logger.debug("Not a processing license plate for non car object.") + return + + # don't run for stationary car objects + if obj_data.get("stationary") == True: + logger.debug("Not a processing license plate for a stationary car object.") + return + + # don't overwrite sub label for objects that have a sub label + # that is not a license plate + if obj_data.get("sub_label") and id not in self.detected_license_plates: + logger.debug( + f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}." 
+ ) + return + + license_plate: Optional[dict[str, any]] = None + + if self.requires_license_plate_detection: + logger.debug("Running manual license_plate detection.") + car_box = obj_data.get("box") + + if not car_box: + return + + rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) + left, top, right, bottom = car_box + car = rgb[top:bottom, left:right] + + # double the size of the car for better box detection + car = cv2.resize(car, (int(2 * car.shape[1]), int(2 * car.shape[0]))) + + if WRITE_DEBUG_IMAGES: + current_time = int(datetime.datetime.now().timestamp()) + cv2.imwrite( + f"debug/frames/car_frame_{current_time}.jpg", + car, + ) + + yolov9_start = datetime.datetime.now().timestamp() + license_plate = self._detect_license_plate(car) + logger.debug( + f"YOLOv9 LPD inference time: {(datetime.datetime.now().timestamp() - yolov9_start) * 1000:.2f} ms" + ) + + if not license_plate: + logger.debug("Detected no license plates for car object.") + return + + license_plate_area = max( + 0, + (license_plate[2] - license_plate[0]) + * (license_plate[3] - license_plate[1]), + ) + + # check that license plate is valid + # double the value because we've doubled the size of the car + if license_plate_area < self.lpr_config.min_area * 2: + logger.debug("License plate is less than min_area") + return + + license_plate_frame = car[ + license_plate[1] : license_plate[3], license_plate[0] : license_plate[2] + ] + else: + # don't run for object without attributes + if not obj_data.get("current_attributes"): + logger.debug("No attributes to parse.") + return + + attributes: list[dict[str, any]] = obj_data.get("current_attributes", []) + for attr in attributes: + if attr.get("label") != "license_plate": + continue + + if license_plate is None or attr.get("score", 0.0) > license_plate.get( + "score", 0.0 + ): + license_plate = attr + + # no license plates detected in this frame + if not license_plate: + return + + if license_plate.get("score") < self.lpr_config.detection_threshold: + logger.debug( + f"Plate detection score is less than the threshold ({license_plate['score']:0.2f} < {self.lpr_config.detection_threshold})" + ) + return + + license_plate_box = license_plate.get("box") + + # check that license plate is valid + if ( + not license_plate_box + or area(license_plate_box) < self.lpr_config.min_area + ): + logger.debug(f"Invalid license plate box {license_plate}") + return + + license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) + license_plate_frame = license_plate_frame[ + license_plate_box[1] : license_plate_box[3], + license_plate_box[0] : license_plate_box[2], + ] + + # double the size of the license plate frame for better OCR + license_plate_frame = cv2.resize( + license_plate_frame, + ( + int(2 * license_plate_frame.shape[1]), + int(2 * license_plate_frame.shape[0]), + ), + ) + + if WRITE_DEBUG_IMAGES: + current_time = int(datetime.datetime.now().timestamp()) + cv2.imwrite( + f"debug/frames/license_plate_frame_{current_time}.jpg", + license_plate_frame, + ) + + # run detection, returns results sorted by confidence, best first + license_plates, confidences, areas = self._process_license_plate( + license_plate_frame + ) + + logger.debug(f"Text boxes: {license_plates}") + logger.debug(f"Confidences: {confidences}") + logger.debug(f"Areas: {areas}") + + if license_plates: + for plate, confidence, text_area in zip(license_plates, confidences, areas): + avg_confidence = ( + (sum(confidence) / len(confidence)) if confidence else 0 + ) + + logger.debug( + f"Detected text: {plate} (average 
confidence: {avg_confidence:.2f}, area: {text_area} pixels)" + ) + else: + # no plates found + logger.debug("No text detected") + return + + top_plate, top_char_confidences, top_area = ( + license_plates[0], + confidences[0], + areas[0], + ) + avg_confidence = ( + (sum(top_char_confidences) / len(top_char_confidences)) + if top_char_confidences + else 0 + ) + + # Check if we have a previously detected plate for this ID + if id in self.detected_license_plates: + if self._should_keep_previous_plate( + id, top_plate, top_char_confidences, top_area, avg_confidence + ): + logger.debug("Keeping previous plate") + return + + # Check against minimum confidence threshold + if avg_confidence < self.lpr_config.recognition_threshold: + logger.debug( + f"Average confidence {avg_confidence} is less than threshold ({self.lpr_config.recognition_threshold})" + ) + return + + # Determine subLabel based on known plates, use regex matching + # Default to the detected plate, use label name if there's a match + sub_label = next( + ( + label + for label, plates in self.lpr_config.known_plates.items() + if any( + re.match(f"^{plate}$", top_plate) + or distance(plate, top_plate) <= self.lpr_config.match_distance + for plate in plates + ) + ), + top_plate, + ) + + # Send the result to the API + resp = requests.post( + f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label", + json={ + "camera": obj_data.get("camera"), + "subLabel": sub_label, + "subLabelScore": avg_confidence, + }, + ) + + if resp.status_code == 200: + self.detected_license_plates[id] = { + "plate": top_plate, + "char_confidences": top_char_confidences, + "area": top_area, + "frame_time": obj_data["frame_time"], + } + + def handle_request(self, topic, request_data) -> dict[str, any] | None: + return + + def expire_object(self, object_id: str): + if object_id in self.detected_license_plates: + self.detected_license_plates.pop(object_id) + + +class CTCDecoder: + """ + A decoder for interpreting the output of a CTC (Connectionist Temporal Classification) model. + + This decoder converts the model's output probabilities into readable sequences of characters + while removing duplicates and handling blank tokens. It also calculates the confidence scores + for each decoded character sequence. + """ + + def __init__(self): + """ + Initialize the CTCDecoder with a list of characters and a character map. + + The character set includes digits, letters, special characters, and a "blank" token + (used by the CTC model for decoding purposes). A character map is created to map + indices to characters. + """ + self.characters = [ + "blank", + "0", + "1", + "2", + "3", + "4", + "5", + "6", + "7", + "8", + "9", + ":", + ";", + "<", + "=", + ">", + "?", + "@", + "A", + "B", + "C", + "D", + "E", + "F", + "G", + "H", + "I", + "J", + "K", + "L", + "M", + "N", + "O", + "P", + "Q", + "R", + "S", + "T", + "U", + "V", + "W", + "X", + "Y", + "Z", + "[", + "\\", + "]", + "^", + "_", + "`", + "a", + "b", + "c", + "d", + "e", + "f", + "g", + "h", + "i", + "j", + "k", + "l", + "m", + "n", + "o", + "p", + "q", + "r", + "s", + "t", + "u", + "v", + "w", + "x", + "y", + "z", + "{", + "|", + "}", + "~", + "!", + '"', + "#", + "$", + "%", + "&", + "'", + "(", + ")", + "*", + "+", + ",", + "-", + ".", + "/", + " ", + " ", + ] + self.char_map = {i: char for i, char in enumerate(self.characters)} + + def __call__( + self, outputs: List[np.ndarray] + ) -> Tuple[List[str], List[List[float]]]: + """ + Decode a batch of model outputs into character sequences and their confidence scores. 
+ + The method takes the output probability distributions for each time step and uses + the best path decoding strategy. It then merges repeating characters and ignores + blank tokens. Confidence scores for each decoded character are also calculated. + + Args: + outputs (List[np.ndarray]): A list of model outputs, where each element is + a probability distribution for each time step. + + Returns: + Tuple[List[str], List[List[float]]]: A tuple of decoded character sequences + and confidence scores for each sequence. + """ + results = [] + confidences = [] + for output in outputs: + seq_log_probs = np.log(output + 1e-8) + best_path = np.argmax(seq_log_probs, axis=1) + + merged_path = [] + merged_probs = [] + for t, char_index in enumerate(best_path): + if char_index != 0 and (t == 0 or char_index != best_path[t - 1]): + merged_path.append(char_index) + merged_probs.append(seq_log_probs[t, char_index]) + + result = "".join(self.char_map[idx] for idx in merged_path) + results.append(result) + + confidence = np.exp(merged_probs).tolist() + confidences.append(confidence) + + return results, confidences diff --git a/frigate/data_processing/real_time/license_plate_processor.py b/frigate/data_processing/real_time/license_plate_processor.py index 03a8eda53d..96e51dbaa5 100644 --- a/frigate/data_processing/real_time/license_plate_processor.py +++ b/frigate/data_processing/real_time/license_plate_processor.py @@ -2,735 +2,32 @@ import datetime import logging -import math -import re -from typing import List, Optional, Tuple -import cv2 import numpy as np -import requests -from Levenshtein import distance -from pyclipper import ET_CLOSEDPOLYGON, JT_ROUND, PyclipperOffset -from shapely.geometry import Polygon -from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig -from frigate.const import FRIGATE_LOCALHOST -from frigate.embeddings.onnx.lpr_embedding import ( - LicensePlateDetector, - PaddleOCRClassification, - PaddleOCRDetection, - PaddleOCRRecognition, +from frigate.data_processing.common.license_plate import ( + LicensePlateProcessingMixin, ) -from frigate.util.image import area +from frigate.data_processing.common.license_plate_model import LicensePlateModelRunner from ..types import DataProcessorMetrics from .api import RealTimeProcessorApi logger = logging.getLogger(__name__) -WRITE_DEBUG_IMAGES = False - -class LicensePlateProcessor(RealTimeProcessorApi): - def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics): - super().__init__(config, metrics) - self.requestor = InterProcessRequestor() +class LicensePlateProcessor(LicensePlateProcessingMixin, RealTimeProcessorApi): + def __init__( + self, + config: FrigateConfig, + metrics: DataProcessorMetrics, + model_runner: LicensePlateModelRunner, + ): + self.model_runner = model_runner self.lpr_config = config.lpr - self.requires_license_plate_detection = ( - "license_plate" not in self.config.objects.all_objects - ) - self.detected_license_plates: dict[str, dict[str, any]] = {} - - self.ctc_decoder = CTCDecoder() - - self.batch_size = 6 - - # Detection specific parameters - self.min_size = 3 - self.max_size = 960 - self.box_thresh = 0.8 - self.mask_thresh = 0.8 - - self.lpr_detection_model = None - self.lpr_classification_model = None - self.lpr_recognition_model = None - - if self.config.lpr.enabled: - self.detection_model = PaddleOCRDetection( - model_size="large", - requestor=self.requestor, - device="CPU", - ) - - self.classification_model = PaddleOCRClassification( - 
model_size="large", - requestor=self.requestor, - device="CPU", - ) - - self.recognition_model = PaddleOCRRecognition( - model_size="large", - requestor=self.requestor, - device="CPU", - ) - - self.yolov9_detection_model = LicensePlateDetector( - model_size="large", - requestor=self.requestor, - device="CPU", - ) - - if self.lpr_config.enabled: - # all models need to be loaded to run LPR - self.detection_model._load_model_and_utils() - self.classification_model._load_model_and_utils() - self.recognition_model._load_model_and_utils() - self.yolov9_detection_model._load_model_and_utils() - - def _detect(self, image: np.ndarray) -> List[np.ndarray]: - """ - Detect possible license plates in the input image by first resizing and normalizing it, - running a detection model, and filtering out low-probability regions. - - Args: - image (np.ndarray): The input image in which license plates will be detected. - - Returns: - List[np.ndarray]: A list of bounding box coordinates representing detected license plates. - """ - h, w = image.shape[:2] - - if sum([h, w]) < 64: - image = self._zero_pad(image) - - resized_image = self._resize_image(image) - normalized_image = self._normalize_image(resized_image) - - if WRITE_DEBUG_IMAGES: - current_time = int(datetime.datetime.now().timestamp()) - cv2.imwrite( - f"debug/frames/license_plate_resized_{current_time}.jpg", - resized_image, - ) - - outputs = self.detection_model([normalized_image])[0] - outputs = outputs[0, :, :] - - boxes, _ = self._boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h) - return self._filter_polygon(boxes, (h, w)) - - def _classify( - self, images: List[np.ndarray] - ) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]: - """ - Classify the orientation or category of each detected license plate. - - Args: - images (List[np.ndarray]): A list of images of detected license plates. - - Returns: - Tuple[List[np.ndarray], List[Tuple[str, float]]]: A tuple of rotated/normalized plate images - and classification results with confidence scores. - """ - num_images = len(images) - indices = np.argsort([x.shape[1] / x.shape[0] for x in images]) - - for i in range(0, num_images, self.batch_size): - norm_images = [] - for j in range(i, min(num_images, i + self.batch_size)): - norm_img = self._preprocess_classification_image(images[indices[j]]) - norm_img = norm_img[np.newaxis, :] - norm_images.append(norm_img) - - outputs = self.classification_model(norm_images) - - return self._process_classification_output(images, outputs) - - def _recognize( - self, images: List[np.ndarray] - ) -> Tuple[List[str], List[List[float]]]: - """ - Recognize the characters on the detected license plates using the recognition model. - - Args: - images (List[np.ndarray]): A list of images of license plates to recognize. - - Returns: - Tuple[List[str], List[List[float]]]: A tuple of recognized license plate texts and confidence scores. 
- """ - input_shape = [3, 48, 320] - num_images = len(images) - - # sort images by aspect ratio for processing - indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images])) - - for index in range(0, num_images, self.batch_size): - input_h, input_w = input_shape[1], input_shape[2] - max_wh_ratio = input_w / input_h - norm_images = [] - - # calculate the maximum aspect ratio in the current batch - for i in range(index, min(num_images, index + self.batch_size)): - h, w = images[indices[i]].shape[0:2] - max_wh_ratio = max(max_wh_ratio, w * 1.0 / h) - - # preprocess the images based on the max aspect ratio - for i in range(index, min(num_images, index + self.batch_size)): - norm_image = self._preprocess_recognition_image( - images[indices[i]], max_wh_ratio - ) - norm_image = norm_image[np.newaxis, :] - norm_images.append(norm_image) - - outputs = self.recognition_model(norm_images) - return self.ctc_decoder(outputs) - - def _process_license_plate( - self, image: np.ndarray - ) -> Tuple[List[str], List[float], List[int]]: - """ - Complete pipeline for detecting, classifying, and recognizing license plates in the input image. - - Args: - image (np.ndarray): The input image in which to detect, classify, and recognize license plates. - - Returns: - Tuple[List[str], List[float], List[int]]: Detected license plate texts, confidence scores, and areas of the plates. - """ - if ( - self.detection_model.runner is None - or self.classification_model.runner is None - or self.recognition_model.runner is None - ): - # we might still be downloading the models - logger.debug("Model runners not loaded") - return [], [], [] - - plate_points = self._detect(image) - if len(plate_points) == 0: - logger.debug("No points found by OCR detector model") - return [], [], [] - - plate_points = self._sort_polygon(list(plate_points)) - plate_images = [self._crop_license_plate(image, x) for x in plate_points] - rotated_images, _ = self._classify(plate_images) - - # debug rotated and classification result - if WRITE_DEBUG_IMAGES: - current_time = int(datetime.datetime.now().timestamp()) - for i, img in enumerate(plate_images): - cv2.imwrite( - f"debug/frames/license_plate_rotated_{current_time}_{i + 1}.jpg", - img, - ) - for i, img in enumerate(rotated_images): - cv2.imwrite( - f"debug/frames/license_plate_classified_{current_time}_{i + 1}.jpg", - img, - ) - - # keep track of the index of each image for correct area calc later - sorted_indices = np.argsort([x.shape[1] / x.shape[0] for x in rotated_images]) - reverse_mapping = { - idx: original_idx for original_idx, idx in enumerate(sorted_indices) - } - - results, confidences = self._recognize(rotated_images) - - if results: - license_plates = [""] * len(rotated_images) - average_confidences = [[0.0]] * len(rotated_images) - areas = [0] * len(rotated_images) - - # map results back to original image order - for i, (plate, conf) in enumerate(zip(results, confidences)): - original_idx = reverse_mapping[i] - - height, width = rotated_images[original_idx].shape[:2] - area = height * width - - average_confidence = conf - - # set to True to write each cropped image for debugging - if False: - save_image = cv2.cvtColor( - rotated_images[original_idx], cv2.COLOR_RGB2BGR - ) - filename = f"debug/frames/plate_{original_idx}_{plate}_{area}.jpg" - cv2.imwrite(filename, save_image) - - license_plates[original_idx] = plate - average_confidences[original_idx] = average_confidence - areas[original_idx] = area - - # Filter out plates that have a length of less than 
min_plate_length characters - # or that don't match the expected format (if defined) - # Sort by area, then by plate length, then by confidence all desc - filtered_data = [] - for plate, conf, area in zip(license_plates, average_confidences, areas): - if len(plate) < self.lpr_config.min_plate_length: - logger.debug( - f"Filtered out '{plate}' due to length ({len(plate)} < {self.lpr_config.min_plate_length})" - ) - continue - - if self.lpr_config.format and not re.fullmatch( - self.lpr_config.format, plate - ): - logger.debug(f"Filtered out '{plate}' due to format mismatch") - continue - - filtered_data.append((plate, conf, area)) - - sorted_data = sorted( - filtered_data, - key=lambda x: (x[2], len(x[0]), x[1]), - reverse=True, - ) - - if sorted_data: - return map(list, zip(*sorted_data)) - - return [], [], [] - - def _resize_image(self, image: np.ndarray) -> np.ndarray: - """ - Resize the input image while maintaining the aspect ratio, ensuring dimensions are multiples of 32. - - Args: - image (np.ndarray): The input image to resize. - - Returns: - np.ndarray: The resized image. - """ - h, w = image.shape[:2] - ratio = min(self.max_size / max(h, w), 1.0) - resize_h = max(int(round(int(h * ratio) / 32) * 32), 32) - resize_w = max(int(round(int(w * ratio) / 32) * 32), 32) - return cv2.resize(image, (resize_w, resize_h)) - - def _normalize_image(self, image: np.ndarray) -> np.ndarray: - """ - Normalize the input image by subtracting the mean and multiplying by the standard deviation. - - Args: - image (np.ndarray): The input image to normalize. - - Returns: - np.ndarray: The normalized image, transposed to match the model's expected input format. - """ - mean = np.array([123.675, 116.28, 103.53]).reshape(1, -1).astype("float64") - std = 1 / np.array([58.395, 57.12, 57.375]).reshape(1, -1).astype("float64") - - image = image.astype("float32") - cv2.subtract(image, mean, image) - cv2.multiply(image, std, image) - return image.transpose((2, 0, 1))[np.newaxis, ...] - - def _boxes_from_bitmap( - self, output: np.ndarray, mask: np.ndarray, dest_width: int, dest_height: int - ) -> Tuple[np.ndarray, List[float]]: - """ - Process the binary mask to extract bounding boxes and associated confidence scores. - - Args: - output (np.ndarray): Output confidence map from the model. - mask (np.ndarray): Binary mask of detected regions. - dest_width (int): Target width for scaling the box coordinates. - dest_height (int): Target height for scaling the box coordinates. - - Returns: - Tuple[np.ndarray, List[float]]: Array of bounding boxes and list of corresponding scores. - """ - - mask = (mask * 255).astype(np.uint8) - height, width = mask.shape - outs = cv2.findContours(mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE) - - # handle different return values of findContours between OpenCV versions - contours = outs[0] if len(outs) == 2 else outs[1] - - boxes = [] - scores = [] - - for index in range(len(contours)): - contour = contours[index] - - # get minimum bounding box (rotated rectangle) around the contour and the smallest side length. - points, min_side = self._get_min_boxes(contour) - logger.debug(f"min side {index}, {min_side}") - - if min_side < self.min_size: - continue - - points = np.array(points) - - score = self._box_score(output, contour) - logger.debug(f"box score {index}, {score}") - if self.box_thresh > score: - continue - - polygon = Polygon(points) - distance = polygon.area / polygon.length - - # Use pyclipper to shrink the polygon slightly based on the computed distance. 
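A note on the offsetting step above: although the inline comment says "shrink," passing a positive delta to `Execute` grows the polygon. This is the standard "unclip" step from DB-style text detectors, which compensates for the shrunken text masks the model is trained on. A minimal standalone sketch of the operation, assuming `shapely` and `pyclipper` are installed:

```python
# Standalone sketch of the DB-style "unclip" expansion.
# Assumptions: shapely and pyclipper are available; points is a 4x2 int array.
import numpy as np
import pyclipper
from shapely.geometry import Polygon


def unclip(points: np.ndarray, unclip_ratio: float = 1.5) -> np.ndarray:
    poly = Polygon(points)
    # the offset scales with the polygon's size (area-to-perimeter ratio)
    distance = poly.area / poly.length
    offset = pyclipper.PyclipperOffset()
    offset.AddPath(points.tolist(), pyclipper.JT_ROUND, pyclipper.ET_CLOSEDPOLYGON)
    expanded = offset.Execute(distance * unclip_ratio)
    return np.array(expanded[0], dtype=np.int32)


box = np.array([[10, 10], [60, 10], [60, 30], [10, 30]])
print(unclip(box))  # a slightly larger polygon around the same region
```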
- offset = PyclipperOffset() - offset.AddPath(points, JT_ROUND, ET_CLOSEDPOLYGON) - points = np.array(offset.Execute(distance * 1.5)).reshape((-1, 1, 2)) - - # get the minimum bounding box around the shrunken polygon. - box, min_side = self._get_min_boxes(points) - - if min_side < self.min_size + 2: - continue - - box = np.array(box) - - # normalize and clip box coordinates to fit within the destination image size. - box[:, 0] = np.clip(np.round(box[:, 0] / width * dest_width), 0, dest_width) - box[:, 1] = np.clip( - np.round(box[:, 1] / height * dest_height), 0, dest_height - ) - - boxes.append(box.astype("int32")) - scores.append(score) - - return np.array(boxes, dtype="int32"), scores - - @staticmethod - def _get_min_boxes(contour: np.ndarray) -> Tuple[List[Tuple[float, float]], float]: - """ - Calculate the minimum bounding box (rotated rectangle) for a given contour. - - Args: - contour (np.ndarray): The contour points of the detected shape. - - Returns: - Tuple[List[Tuple[float, float]], float]: A list of four points representing the - corners of the bounding box, and the length of the shortest side. - """ - bounding_box = cv2.minAreaRect(contour) - points = sorted(cv2.boxPoints(bounding_box), key=lambda x: x[0]) - index_1, index_4 = (0, 1) if points[1][1] > points[0][1] else (1, 0) - index_2, index_3 = (2, 3) if points[3][1] > points[2][1] else (3, 2) - box = [points[index_1], points[index_2], points[index_3], points[index_4]] - return box, min(bounding_box[1]) - - @staticmethod - def _box_score(bitmap: np.ndarray, contour: np.ndarray) -> float: - """ - Calculate the average score within the bounding box of a contour. - - Args: - bitmap (np.ndarray): The output confidence map from the model. - contour (np.ndarray): The contour of the detected shape. - - Returns: - float: The average score of the pixels inside the contour region. - """ - h, w = bitmap.shape[:2] - contour = contour.reshape(-1, 2) - x1, y1 = np.clip(contour.min(axis=0), 0, [w - 1, h - 1]) - x2, y2 = np.clip(contour.max(axis=0), 0, [w - 1, h - 1]) - mask = np.zeros((y2 - y1 + 1, x2 - x1 + 1), dtype=np.uint8) - cv2.fillPoly(mask, [contour - [x1, y1]], 1) - return cv2.mean(bitmap[y1 : y2 + 1, x1 : x2 + 1], mask)[0] - - @staticmethod - def _expand_box(points: List[Tuple[float, float]]) -> np.ndarray: - """ - Expand a polygonal shape slightly by a factor determined by the area-to-perimeter ratio. - - Args: - points (List[Tuple[float, float]]): Points of the polygon to expand. - - Returns: - np.ndarray: Expanded polygon points. - """ - polygon = Polygon(points) - distance = polygon.area / polygon.length - offset = PyclipperOffset() - offset.AddPath(points, JT_ROUND, ET_CLOSEDPOLYGON) - expanded = np.array(offset.Execute(distance * 1.5)).reshape((-1, 2)) - return expanded - - def _filter_polygon( - self, points: List[np.ndarray], shape: Tuple[int, int] - ) -> np.ndarray: - """ - Filter a set of polygons to include only valid ones that fit within an image shape - and meet size constraints. - - Args: - points (List[np.ndarray]): List of polygons to filter. - shape (Tuple[int, int]): Shape of the image (height, width). - - Returns: - np.ndarray: List of filtered polygons. 
- """ - height, width = shape - return np.array( - [ - self._clockwise_order(point) - for point in points - if self._is_valid_polygon(point, width, height) - ] - ) - - @staticmethod - def _is_valid_polygon(point: np.ndarray, width: int, height: int) -> bool: - """ - Check if a polygon is valid, meaning it fits within the image bounds - and has sides of a minimum length. - - Args: - point (np.ndarray): The polygon to validate. - width (int): Image width. - height (int): Image height. - - Returns: - bool: Whether the polygon is valid or not. - """ - return ( - point[:, 0].min() >= 0 - and point[:, 0].max() < width - and point[:, 1].min() >= 0 - and point[:, 1].max() < height - and np.linalg.norm(point[0] - point[1]) > 3 - and np.linalg.norm(point[0] - point[3]) > 3 - ) - - @staticmethod - def _clockwise_order(point: np.ndarray) -> np.ndarray: - """ - Arrange the points of a polygon in clockwise order based on their angular positions - around the polygon's center. - - Args: - point (np.ndarray): Array of points of the polygon. - - Returns: - np.ndarray: Points ordered in clockwise direction. - """ - center = point.mean(axis=0) - return point[ - np.argsort(np.arctan2(point[:, 1] - center[1], point[:, 0] - center[0])) - ] - - @staticmethod - def _sort_polygon(points): - """ - Sort polygons based on their position in the image. If polygons are close in vertical - position (within 5 pixels), sort them by horizontal position. - - Args: - points: List of polygons to sort. - - Returns: - List: Sorted list of polygons. - """ - points.sort(key=lambda x: (x[0][1], x[0][0])) - for i in range(len(points) - 1): - for j in range(i, -1, -1): - if abs(points[j + 1][0][1] - points[j][0][1]) < 5 and ( - points[j + 1][0][0] < points[j][0][0] - ): - temp = points[j] - points[j] = points[j + 1] - points[j + 1] = temp - else: - break - return points - - @staticmethod - def _zero_pad(image: np.ndarray) -> np.ndarray: - """ - Apply zero-padding to an image, ensuring its dimensions are at least 32x32. - The padding is added only if needed. - - Args: - image (np.ndarray): Input image. - - Returns: - np.ndarray: Zero-padded image. - """ - h, w, c = image.shape - pad = np.zeros((max(32, h), max(32, w), c), np.uint8) - pad[:h, :w, :] = image - return pad - - @staticmethod - def _preprocess_classification_image(image: np.ndarray) -> np.ndarray: - """ - Preprocess a single image for classification by resizing, normalizing, and padding. - - This method resizes the input image to a fixed height of 48 pixels while adjusting - the width dynamically up to a maximum of 192 pixels. The image is then normalized and - padded to fit the required input dimensions for classification. - - Args: - image (np.ndarray): Input image to preprocess. - - Returns: - np.ndarray: Preprocessed and padded image. 
- """ - # fixed height of 48, dynamic width up to 192 - input_shape = (3, 48, 192) - input_c, input_h, input_w = input_shape - - h, w = image.shape[:2] - ratio = w / h - resized_w = min(input_w, math.ceil(input_h * ratio)) - - resized_image = cv2.resize(image, (resized_w, input_h)) - - # handle single-channel images (grayscale) if needed - if input_c == 1 and resized_image.ndim == 2: - resized_image = resized_image[np.newaxis, :, :] - else: - resized_image = resized_image.transpose((2, 0, 1)) - - # normalize - resized_image = (resized_image.astype("float32") / 255.0 - 0.5) / 0.5 - - padded_image = np.zeros((input_c, input_h, input_w), dtype=np.float32) - padded_image[:, :, :resized_w] = resized_image - - return padded_image - - def _process_classification_output( - self, images: List[np.ndarray], outputs: List[np.ndarray] - ) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]: - """ - Process the classification model output by matching labels with confidence scores. - - This method processes the outputs from the classification model and rotates images - with high confidence of being labeled "180". It ensures that results are mapped to - the original image order. - - Args: - images (List[np.ndarray]): List of input images. - outputs (List[np.ndarray]): Corresponding model outputs. - - Returns: - Tuple[List[np.ndarray], List[Tuple[str, float]]]: A tuple of processed images and - classification results (label and confidence score). - """ - labels = ["0", "180"] - results = [["", 0.0]] * len(images) - indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images])) - - outputs = np.stack(outputs) - - outputs = [ - (labels[idx], outputs[i, idx]) - for i, idx in enumerate(outputs.argmax(axis=1)) - ] - - for i in range(0, len(images), self.batch_size): - for j in range(len(outputs)): - label, score = outputs[j] - results[indices[i + j]] = [label, score] - # make sure we have high confidence if we need to flip a box, this will be rare in lpr - if "180" in label and score >= 0.9: - images[indices[i + j]] = cv2.rotate(images[indices[i + j]], 1) - - return images, results - - def _preprocess_recognition_image( - self, image: np.ndarray, max_wh_ratio: float - ) -> np.ndarray: - """ - Preprocess an image for recognition by dynamically adjusting its width. - - This method adjusts the width of the image based on the maximum width-to-height ratio - while keeping the height fixed at 48 pixels. The image is then normalized and padded - to fit the required input dimensions for recognition. - - Args: - image (np.ndarray): Input image to preprocess. - max_wh_ratio (float): Maximum width-to-height ratio for resizing. - - Returns: - np.ndarray: Preprocessed and padded image. - """ - # fixed height of 48, dynamic width based on ratio - input_shape = [3, 48, 320] - input_h, input_w = input_shape[1], input_shape[2] - - assert image.shape[2] == input_shape[0], "Unexpected number of image channels." 
- - # dynamically adjust input width based on max_wh_ratio - input_w = int(input_h * max_wh_ratio) - - # check for model-specific input width - model_input_w = self.recognition_model.runner.ort.get_inputs()[0].shape[3] - if isinstance(model_input_w, int) and model_input_w > 0: - input_w = model_input_w - - h, w = image.shape[:2] - aspect_ratio = w / h - resized_w = min(input_w, math.ceil(input_h * aspect_ratio)) - - resized_image = cv2.resize(image, (resized_w, input_h)) - resized_image = resized_image.transpose((2, 0, 1)) - resized_image = (resized_image.astype("float32") / 255.0 - 0.5) / 0.5 - - # Compute mean pixel value of the resized image (per channel) - mean_pixel = np.mean(resized_image, axis=(1, 2), keepdims=True) - padded_image = np.full( - (input_shape[0], input_h, input_w), mean_pixel, dtype=np.float32 - ) - padded_image[:, :, :resized_w] = resized_image - - return padded_image - - @staticmethod - def _crop_license_plate(image: np.ndarray, points: np.ndarray) -> np.ndarray: - """ - Crop the license plate from the image using four corner points. - - This method crops the region containing the license plate by using the perspective - transformation based on four corner points. If the resulting image is significantly - taller than wide, the image is rotated to the correct orientation. - - Args: - image (np.ndarray): Input image containing the license plate. - points (np.ndarray): Four corner points defining the plate's position. - - Returns: - np.ndarray: Cropped and potentially rotated license plate image. - """ - assert len(points) == 4, "shape of points must be 4*2" - points = points.astype(np.float32) - crop_width = int( - max( - np.linalg.norm(points[0] - points[1]), - np.linalg.norm(points[2] - points[3]), - ) - ) - crop_height = int( - max( - np.linalg.norm(points[0] - points[3]), - np.linalg.norm(points[1] - points[2]), - ) - ) - pts_std = np.float32( - [[0, 0], [crop_width, 0], [crop_width, crop_height], [0, crop_height]] - ) - matrix = cv2.getPerspectiveTransform(points, pts_std) - image = cv2.warpPerspective( - image, - matrix, - (crop_width, crop_height), - borderMode=cv2.BORDER_REPLICATE, - flags=cv2.INTER_CUBIC, - ) - height, width = image.shape[0:2] - if height * 1.0 / width >= 1.5: - image = np.rot90(image, k=3) - return image + self.config = config + super().__init__(config, metrics, model_runner) def __update_metrics(self, duration: float) -> None: """ @@ -738,467 +35,10 @@ def __update_metrics(self, duration: float) -> None: """ self.metrics.alpr_pps.value = (self.metrics.alpr_pps.value * 9 + duration) / 10 - def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]: - """ - Use a lightweight YOLOv9 model to detect license plates for users without Frigate+ - - Return the dimensions of the detected plate as [x1, y1, x2, y2]. 
- """ - predictions = self.yolov9_detection_model(input) - - confidence_threshold = self.lpr_config.detection_threshold - - top_score = -1 - top_box = None - - # Loop over predictions - for prediction in predictions: - score = prediction[6] - if score >= confidence_threshold: - bbox = prediction[1:5] - # Scale boxes back to original image size - scale_x = input.shape[1] / 256 - scale_y = input.shape[0] / 256 - bbox[0] *= scale_x - bbox[1] *= scale_y - bbox[2] *= scale_x - bbox[3] *= scale_y - - if score > top_score: - top_score = score - top_box = bbox - - # Return the top scoring bounding box if found - if top_box is not None: - # expand box by 15% to help with OCR - expansion = (top_box[2:] - top_box[:2]) * 0.1 - - # Expand box - expanded_box = np.array( - [ - top_box[0] - expansion[0], # x1 - top_box[1] - expansion[1], # y1 - top_box[2] + expansion[0], # x2 - top_box[3] + expansion[1], # y2 - ] - ).clip(0, [input.shape[1], input.shape[0]] * 2) - - logger.debug(f"Found license plate: {expanded_box.astype(int)}") - return tuple(expanded_box.astype(int)) - else: - return None # No detection above the threshold - - def _should_keep_previous_plate( - self, id, top_plate, top_char_confidences, top_area, avg_confidence - ): - if id not in self.detected_license_plates: - return False - - prev_data = self.detected_license_plates[id] - prev_plate = prev_data["plate"] - prev_char_confidences = prev_data["char_confidences"] - prev_area = prev_data["area"] - prev_avg_confidence = ( - sum(prev_char_confidences) / len(prev_char_confidences) - if prev_char_confidences - else 0 - ) - - # 1. Normalize metrics - # Length score - use relative comparison - # If lengths are equal, score is 0.5 for both - # If one is longer, it gets a higher score up to 1.0 - max_length_diff = 4 # Maximum expected difference in plate lengths - length_diff = len(top_plate) - len(prev_plate) - curr_length_score = 0.5 + ( - length_diff / (2 * max_length_diff) - ) # Normalize to 0-1 - curr_length_score = max(0, min(1, curr_length_score)) # Clamp to 0-1 - prev_length_score = 1 - curr_length_score # Inverse relationship - - # Area score (normalize based on max of current and previous) - max_area = max(top_area, prev_area) - curr_area_score = top_area / max_area - prev_area_score = prev_area / max_area - - # Average confidence score (already normalized 0-1) - curr_conf_score = avg_confidence - prev_conf_score = prev_avg_confidence - - # Character confidence comparison score - min_length = min(len(top_plate), len(prev_plate)) - if min_length > 0: - curr_char_conf = sum(top_char_confidences[:min_length]) / min_length - prev_char_conf = sum(prev_char_confidences[:min_length]) / min_length - else: - curr_char_conf = 0 - prev_char_conf = 0 - - # 2. Define weights - weights = { - "length": 0.4, - "area": 0.3, - "avg_confidence": 0.2, - "char_confidence": 0.1, - } - - # 3. Calculate weighted scores - curr_score = ( - curr_length_score * weights["length"] - + curr_area_score * weights["area"] - + curr_conf_score * weights["avg_confidence"] - + curr_char_conf * weights["char_confidence"] - ) - - prev_score = ( - prev_length_score * weights["length"] - + prev_area_score * weights["area"] - + prev_conf_score * weights["avg_confidence"] - + prev_char_conf * weights["char_confidence"] - ) - - # 4. 
Log the comparison for debugging - logger.debug( - f"Plate comparison - Current plate: {top_plate} (score: {curr_score:.3f}) vs " - f"Previous plate: {prev_plate} (score: {prev_score:.3f})\n" - f"Metrics - Length: {len(top_plate)} vs {len(prev_plate)} (scores: {curr_length_score:.2f} vs {prev_length_score:.2f}), " - f"Area: {top_area} vs {prev_area}, " - f"Avg Conf: {avg_confidence:.2f} vs {prev_avg_confidence:.2f}" - ) - - # 5. Return True if we should keep the previous plate (i.e., if it scores higher) - return prev_score > curr_score - - def process_keyframe_lpr(self, obj_data: dict[str, any]) -> None: - """ - Runs LPR on an enlarged region of the latest keyframe for the given object. - Called after the main process_frame as a backup check. - Args: - obj_data: Object data dictionary containing camera, id, and box coordinates - """ - if ( - obj_data.get("label") != "car" - or obj_data.get("stationary") == True - or ( - obj_data.get("sub_label") - and obj_data["id"] not in self.detected_license_plates - ) - or obj_data.get("is_keyframe_check") - ): - return - - camera = obj_data.get("camera") - if not camera: - return - - yuv_height, yuv_width = self.config.cameras[camera].frame_shape_yuv - detect_width = self.config.cameras[camera].detect.width - detect_height = self.config.cameras[camera].detect.height - - result = get_latest_keyframe_yuv420(camera) - if result is None: - logger.debug(f"No keyframe available for camera {camera}") - return - - keyframe, timestamp = result - - # Resize keyframe to match frame_shape_yuv dimensions - keyframe_resized = cv2.resize(keyframe, (yuv_width, yuv_height)) - - # Scale the boxes based on detect dimensions - scale_x = detect_width / keyframe.shape[1] - scale_y = detect_height / keyframe.shape[0] - - # Determine which box to enlarge based on detection mode - if self.requires_license_plate_detection: - # Scale and enlarge the car box - box = obj_data.get("box") - if not box: - return - - # Scale original box to detection dimensions - left = int(box[0] * scale_x) - top = int(box[1] * scale_y) - right = int(box[2] * scale_x) - bottom = int(box[3] * scale_y) - box = [left, top, right, bottom] - else: - # Get the license plate box from attributes - if not obj_data.get("current_attributes"): - return - - license_plate = None - for attr in obj_data["current_attributes"]: - if attr.get("label") != "license_plate": - continue - if license_plate is None or attr.get("score", 0.0) > license_plate.get( - "score", 0.0 - ): - license_plate = attr - - if not license_plate or not license_plate.get("box"): - return - - # Scale license plate box to detection dimensions - orig_box = license_plate["box"] - left = int(orig_box[0] * scale_x) - top = int(orig_box[1] * scale_y) - right = int(orig_box[2] * scale_x) - bottom = int(orig_box[3] * scale_y) - box = [left, top, right, bottom] - - width_box = right - left - height_box = bottom - top - - # Enlarge box by 30% - enlarge_factor = 0.3 - new_left = max(0, int(left - (width_box * enlarge_factor / 2))) - new_top = max(0, int(top - (height_box * enlarge_factor / 2))) - new_right = min(detect_width, int(right + (width_box * enlarge_factor / 2))) - new_bottom = min(detect_height, int(bottom + (height_box * enlarge_factor / 2))) - - keyframe_obj_data = obj_data.copy() - if self.requires_license_plate_detection: - keyframe_obj_data["box"] = [new_left, new_top, new_right, new_bottom] - else: - # Update the license plate box in the attributes - new_attributes = [] - for attr in obj_data["current_attributes"]: - if attr.get("label") 
== "license_plate": - new_attr = attr.copy() - new_attr["box"] = [new_left, new_top, new_right, new_bottom] - new_attributes.append(new_attr) - else: - new_attributes.append(attr) - keyframe_obj_data["current_attributes"] = new_attributes - - keyframe_obj_data["frame_time"] = timestamp - - # Add a flag to prevent infinite recursion - keyframe_obj_data["is_keyframe_check"] = True - - if WRITE_DEBUG_IMAGES: - current_time = int(datetime.datetime.now().timestamp()) - rgb = cv2.cvtColor(keyframe_resized, cv2.COLOR_YUV2BGR_I420) - cv2.imwrite( - f"debug/frames/keyframe_resized_{current_time}.jpg", - rgb, - ) - - self.process_frame(keyframe_obj_data, keyframe_resized) - def process_frame(self, obj_data: dict[str, any], frame: np.ndarray): """Look for license plates in image.""" start = datetime.datetime.now().timestamp() - - id = obj_data["id"] - - # don't run for non car objects - if obj_data.get("label") != "car": - logger.debug("Not a processing license plate for non car object.") - return - - # don't run for stationary car objects - if obj_data.get("stationary") == True: - logger.debug("Not a processing license plate for a stationary car object.") - return - - # don't overwrite sub label for objects that have a sub label - # that is not a license plate - if obj_data.get("sub_label") and id not in self.detected_license_plates: - logger.debug( - f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}." - ) - return - - license_plate: Optional[dict[str, any]] = None - - if self.requires_license_plate_detection: - logger.debug("Running manual license_plate detection.") - car_box = obj_data.get("box") - - if not car_box: - return - - rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) - left, top, right, bottom = car_box - car = rgb[top:bottom, left:right] - - # double the size of the car for better box detection - car = cv2.resize(car, (int(2 * car.shape[1]), int(2 * car.shape[0]))) - - if WRITE_DEBUG_IMAGES: - current_time = int(datetime.datetime.now().timestamp()) - cv2.imwrite( - f"debug/frames/car_frame_{current_time}.jpg", - car, - ) - - yolov9_start = datetime.datetime.now().timestamp() - license_plate = self._detect_license_plate(car) - logger.debug( - f"YOLOv9 LPD inference time: {(datetime.datetime.now().timestamp() - yolov9_start) * 1000:.2f} ms" - ) - - if not license_plate: - logger.debug("Detected no license plates for car object.") - return - - license_plate_area = max( - 0, - (license_plate[2] - license_plate[0]) - * (license_plate[3] - license_plate[1]), - ) - - # check that license plate is valid - # double the value because we've doubled the size of the car - if license_plate_area < self.config.lpr.min_area * 2: - logger.debug("License plate is less than min_area") - return - - license_plate_frame = car[ - license_plate[1] : license_plate[3], license_plate[0] : license_plate[2] - ] - else: - # don't run for object without attributes - if not obj_data.get("current_attributes"): - logger.debug("No attributes to parse.") - return - - attributes: list[dict[str, any]] = obj_data.get("current_attributes", []) - for attr in attributes: - if attr.get("label") != "license_plate": - continue - - if license_plate is None or attr.get("score", 0.0) > license_plate.get( - "score", 0.0 - ): - license_plate = attr - - # no license plates detected in this frame - if not license_plate: - return - - if license_plate.get("score") < self.lpr_config.detection_threshold: - logger.debug( - f"Plate detection score is less than the threshold ({license_plate['score']:0.2f} < 
{self.lpr_config.detection_threshold})" - ) - return - - license_plate_box = license_plate.get("box") - - # check that license plate is valid - if ( - not license_plate_box - or area(license_plate_box) < self.config.lpr.min_area - ): - logger.debug(f"Invalid license plate box {license_plate}") - return - - license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) - license_plate_frame = license_plate_frame[ - license_plate_box[1] : license_plate_box[3], - license_plate_box[0] : license_plate_box[2], - ] - - # double the size of the license plate frame for better OCR - license_plate_frame = cv2.resize( - license_plate_frame, - ( - int(2 * license_plate_frame.shape[1]), - int(2 * license_plate_frame.shape[0]), - ), - ) - - if WRITE_DEBUG_IMAGES: - current_time = int(datetime.datetime.now().timestamp()) - cv2.imwrite( - f"debug/frames/license_plate_frame_{current_time}.jpg", - license_plate_frame, - ) - - # run detection, returns results sorted by confidence, best first - license_plates, confidences, areas = self._process_license_plate( - license_plate_frame - ) - - logger.debug(f"Text boxes: {license_plates}") - logger.debug(f"Confidences: {confidences}") - logger.debug(f"Areas: {areas}") - - if license_plates: - for plate, confidence, text_area in zip(license_plates, confidences, areas): - avg_confidence = ( - (sum(confidence) / len(confidence)) if confidence else 0 - ) - - logger.debug( - f"Detected text: {plate} (average confidence: {avg_confidence:.2f}, area: {text_area} pixels)" - ) - else: - # no plates found - logger.debug("No text detected") - return - - top_plate, top_char_confidences, top_area = ( - license_plates[0], - confidences[0], - areas[0], - ) - avg_confidence = ( - (sum(top_char_confidences) / len(top_char_confidences)) - if top_char_confidences - else 0 - ) - - # Check if we have a previously detected plate for this ID - if id in self.detected_license_plates: - if self._should_keep_previous_plate( - id, top_plate, top_char_confidences, top_area, avg_confidence - ): - logger.debug("Keeping previous plate") - return - - # Check against minimum confidence threshold - if avg_confidence < self.lpr_config.recognition_threshold: - logger.debug( - f"Average confidence {avg_confidence} is less than threshold ({self.lpr_config.recognition_threshold})" - ) - return - - # Determine subLabel based on known plates, use regex matching - # Default to the detected plate, use label name if there's a match - sub_label = next( - ( - label - for label, plates in self.lpr_config.known_plates.items() - if any( - re.match(f"^{plate}$", top_plate) - or distance(plate, top_plate) <= self.lpr_config.match_distance - for plate in plates - ) - ), - top_plate, - ) - - # Send the result to the API - resp = requests.post( - f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label", - json={ - "camera": obj_data.get("camera"), - "subLabel": sub_label, - "subLabelScore": avg_confidence, - }, - ) - - if resp.status_code == 200: - self.detected_license_plates[id] = { - "plate": top_plate, - "char_confidences": top_char_confidences, - "area": top_area, - "frame_time": obj_data["frame_time"], - } - + self.lpr_process(obj_data, frame) self.__update_metrics(datetime.datetime.now().timestamp() - start) def handle_request(self, topic, request_data) -> dict[str, any] | None: @@ -1207,161 +47,3 @@ def handle_request(self, topic, request_data) -> dict[str, any] | None: def expire_object(self, object_id: str): if object_id in self.detected_license_plates: self.detected_license_plates.pop(object_id) - - -class 
CTCDecoder: - """ - A decoder for interpreting the output of a CTC (Connectionist Temporal Classification) model. - - This decoder converts the model's output probabilities into readable sequences of characters - while removing duplicates and handling blank tokens. It also calculates the confidence scores - for each decoded character sequence. - """ - - def __init__(self): - """ - Initialize the CTCDecoder with a list of characters and a character map. - - The character set includes digits, letters, special characters, and a "blank" token - (used by the CTC model for decoding purposes). A character map is created to map - indices to characters. - """ - self.characters = [ - "blank", - "0", - "1", - "2", - "3", - "4", - "5", - "6", - "7", - "8", - "9", - ":", - ";", - "<", - "=", - ">", - "?", - "@", - "A", - "B", - "C", - "D", - "E", - "F", - "G", - "H", - "I", - "J", - "K", - "L", - "M", - "N", - "O", - "P", - "Q", - "R", - "S", - "T", - "U", - "V", - "W", - "X", - "Y", - "Z", - "[", - "\\", - "]", - "^", - "_", - "`", - "a", - "b", - "c", - "d", - "e", - "f", - "g", - "h", - "i", - "j", - "k", - "l", - "m", - "n", - "o", - "p", - "q", - "r", - "s", - "t", - "u", - "v", - "w", - "x", - "y", - "z", - "{", - "|", - "}", - "~", - "!", - '"', - "#", - "$", - "%", - "&", - "'", - "(", - ")", - "*", - "+", - ",", - "-", - ".", - "/", - " ", - " ", - ] - self.char_map = {i: char for i, char in enumerate(self.characters)} - - def __call__( - self, outputs: List[np.ndarray] - ) -> Tuple[List[str], List[List[float]]]: - """ - Decode a batch of model outputs into character sequences and their confidence scores. - - The method takes the output probability distributions for each time step and uses - the best path decoding strategy. It then merges repeating characters and ignores - blank tokens. Confidence scores for each decoded character are also calculated. - - Args: - outputs (List[np.ndarray]): A list of model outputs, where each element is - a probability distribution for each time step. - - Returns: - Tuple[List[str], List[List[float]]]: A tuple of decoded character sequences - and confidence scores for each sequence. 
- """ - results = [] - confidences = [] - for output in outputs: - seq_log_probs = np.log(output + 1e-8) - best_path = np.argmax(seq_log_probs, axis=1) - - merged_path = [] - merged_probs = [] - for t, char_index in enumerate(best_path): - if char_index != 0 and (t == 0 or char_index != best_path[t - 1]): - merged_path.append(char_index) - merged_probs.append(seq_log_probs[t, char_index]) - - result = "".join(self.char_map[idx] for idx in merged_path) - results.append(result) - - confidence = np.exp(merged_probs).tolist() - confidences.append(confidence) - - return results, confidences diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 3d906ab0ea..0142e813dd 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -29,6 +29,7 @@ CLIPS_DIR, UPDATE_EVENT_DESCRIPTION, ) +from frigate.data_processing.common.license_plate_model import LicensePlateModelRunner from frigate.data_processing.real_time.api import RealTimeProcessorApi from frigate.data_processing.real_time.bird_processor import BirdProcessor from frigate.data_processing.real_time.face_processor import FaceProcessor @@ -70,6 +71,9 @@ def __init__( if config.semantic_search.reindex: self.embeddings.reindex() + # create communication for updating event descriptions + self.requestor = InterProcessRequestor() + self.event_subscriber = EventUpdateSubscriber() self.event_end_subscriber = EventEndSubscriber() self.event_metadata_subscriber = EventMetadataSubscriber( @@ -89,10 +93,11 @@ def __init__( self.processors.append(BirdProcessor(self.config, metrics)) if self.config.lpr.enabled: - self.processors.append(LicensePlateProcessor(self.config, metrics)) + lpr_model_runner = LicensePlateModelRunner(self.requestor) + self.processors.append( + LicensePlateProcessor(self.config, metrics, lpr_model_runner) + ) - # create communication for updating event descriptions - self.requestor = InterProcessRequestor() self.stop_event = stop_event self.tracked_events: dict[str, list[any]] = {} self.genai_client = get_genai_client(config) From 192418502bc7a5c9c2e15bdac7483e0de6de1547 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Thu, 20 Feb 2025 14:08:08 -0600 Subject: [PATCH 06/17] separate out realtime and post processors --- frigate/data_processing/post/license_plate.py | 56 +++++++++++++++++++ .../real_time/{bird_processor.py => bird.py} | 2 +- .../real_time/{face_processor.py => face.py} | 2 +- ...se_plate_processor.py => license_plate.py} | 2 +- frigate/embeddings/maintainer.py | 43 +++++++++----- 5 files changed, 89 insertions(+), 16 deletions(-) create mode 100644 frigate/data_processing/post/license_plate.py rename frigate/data_processing/real_time/{bird_processor.py => bird.py} (99%) rename frigate/data_processing/real_time/{face_processor.py => face.py} (99%) rename frigate/data_processing/real_time/{license_plate_processor.py => license_plate.py} (94%) diff --git a/frigate/data_processing/post/license_plate.py b/frigate/data_processing/post/license_plate.py new file mode 100644 index 0000000000..fc210b2b7e --- /dev/null +++ b/frigate/data_processing/post/license_plate.py @@ -0,0 +1,56 @@ +"""Handle processing images for face detection and recognition.""" + +import datetime +import logging + +from frigate.config import FrigateConfig +from frigate.data_processing.common.license_plate import ( + LicensePlateProcessingMixin, +) +from frigate.data_processing.common.license_plate_model import LicensePlateModelRunner +from 
frigate.data_processing.types import PostProcessDataEnum + +from ..types import DataProcessorMetrics +from .api import PostProcessorApi + +logger = logging.getLogger(__name__) + + +class LicensePlatePostProcessor(PostProcessorApi, LicensePlateProcessingMixin): + def __init__( + self, + config: FrigateConfig, + metrics: DataProcessorMetrics, + model_runner: LicensePlateModelRunner, + ): + self.model_runner = model_runner + self.lpr_config = config.lpr + self.config = config + super().__init__(config, metrics, model_runner) + + def __update_metrics(self, duration: float) -> None: + """ + Update inference metrics. + """ + self.metrics.alpr_pps.value = (self.metrics.alpr_pps.value * 9 + duration) / 10 + + def process_data( + self, data: dict[str, any], data_type: PostProcessDataEnum + ) -> None: + """Look for license plates in recording stream image + Args: + data (dict): containing data about the input. + data_type (enum): Describing the data that is being processed. + + Returns: + None. + """ + + start = datetime.datetime.now().timestamp() + + # self.lpr_process(obj_data, frame) + + self.__update_metrics(datetime.datetime.now().timestamp() - start) + + def handle_request(self, topic, request_data) -> dict[str, any] | None: + return diff --git a/frigate/data_processing/real_time/bird_processor.py b/frigate/data_processing/real_time/bird.py similarity index 99% rename from frigate/data_processing/real_time/bird_processor.py rename to frigate/data_processing/real_time/bird.py index 1199f61249..01490d8954 100644 --- a/frigate/data_processing/real_time/bird_processor.py +++ b/frigate/data_processing/real_time/bird.py @@ -22,7 +22,7 @@ logger = logging.getLogger(__name__) -class BirdProcessor(RealTimeProcessorApi): +class BirdRealTimeProcessor(RealTimeProcessorApi): def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics): super().__init__(config, metrics) self.interpreter: Interpreter = None diff --git a/frigate/data_processing/real_time/face_processor.py b/frigate/data_processing/real_time/face.py similarity index 99% rename from frigate/data_processing/real_time/face_processor.py rename to frigate/data_processing/real_time/face.py index 086c596586..d2b6776537 100644 --- a/frigate/data_processing/real_time/face_processor.py +++ b/frigate/data_processing/real_time/face.py @@ -27,7 +27,7 @@ MIN_MATCHING_FACES = 2 -class FaceProcessor(RealTimeProcessorApi): +class FaceRealTimeProcessor(RealTimeProcessorApi): def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics): super().__init__(config, metrics) self.face_config = config.face_recognition diff --git a/frigate/data_processing/real_time/license_plate_processor.py b/frigate/data_processing/real_time/license_plate.py similarity index 94% rename from frigate/data_processing/real_time/license_plate_processor.py rename to frigate/data_processing/real_time/license_plate.py index 96e51dbaa5..3a92dcc115 100644 --- a/frigate/data_processing/real_time/license_plate_processor.py +++ b/frigate/data_processing/real_time/license_plate.py @@ -17,7 +17,7 @@ logger = logging.getLogger(__name__) -class LicensePlateProcessor(LicensePlateProcessingMixin, RealTimeProcessorApi): +class LicensePlateRealTimeProcessor(LicensePlateProcessingMixin, RealTimeProcessorApi): def __init__( self, config: FrigateConfig, diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 0142e813dd..b1adf697e5 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -30,11 +30,15 @@ 
UPDATE_EVENT_DESCRIPTION, ) from frigate.data_processing.common.license_plate_model import LicensePlateModelRunner +from frigate.data_processing.post.api import PostProcessorApi +from frigate.data_processing.post.license_plate import ( + LicensePlatePostProcessor, +) from frigate.data_processing.real_time.api import RealTimeProcessorApi -from frigate.data_processing.real_time.bird_processor import BirdProcessor -from frigate.data_processing.real_time.face_processor import FaceProcessor -from frigate.data_processing.real_time.license_plate_processor import ( - LicensePlateProcessor, +from frigate.data_processing.real_time.bird import BirdRealTimeProcessor +from frigate.data_processing.real_time.face import FaceRealTimeProcessor +from frigate.data_processing.real_time.license_plate import ( + LicensePlateRealTimeProcessor, ) from frigate.data_processing.types import DataProcessorMetrics from frigate.events.types import EventTypeEnum @@ -84,18 +88,31 @@ def __init__( ) self.embeddings_responder = EmbeddingsResponder() self.frame_manager = SharedMemoryFrameManager() - self.processors: list[RealTimeProcessorApi] = [] + + # model runners to share between realtime and post processors + if self.config.lpr.enabled: + lpr_model_runner = LicensePlateModelRunner(self.requestor) + + # realtime processors + self.realtime_processors: list[RealTimeProcessorApi] = [] if self.config.face_recognition.enabled: - self.processors.append(FaceProcessor(self.config, metrics)) + self.realtime_processors.append(FaceRealTimeProcessor(self.config, metrics)) if self.config.classification.bird.enabled: - self.processors.append(BirdProcessor(self.config, metrics)) + self.realtime_processors.append(BirdRealTimeProcessor(self.config, metrics)) if self.config.lpr.enabled: - lpr_model_runner = LicensePlateModelRunner(self.requestor) - self.processors.append( - LicensePlateProcessor(self.config, metrics, lpr_model_runner) + self.realtime_processors.append( + LicensePlateRealTimeProcessor(self.config, metrics, lpr_model_runner) + ) + + # post processors + self.post_processors: list[PostProcessorApi] = [] + + if self.config.lpr.enabled: + self.post_processors.append( + LicensePlatePostProcessor(self.config, metrics, lpr_model_runner) ) self.stop_event = stop_event @@ -170,7 +187,7 @@ def _process_updates(self) -> None: camera_config = self.config.cameras[camera] # no need to process updated objects if face recognition, lpr, genai are disabled - if not camera_config.genai.enabled and len(self.processors) == 0: + if not camera_config.genai.enabled and len(self.realtime_processors) == 0: return # Create our own thumbnail based on the bounding box and the frame time @@ -187,7 +204,7 @@ def _process_updates(self) -> None: ) return - for processor in self.processors: + for processor in self.realtime_processors: processor.process_frame(data, yuv_frame) # no need to save our own thumbnails if genai is not enabled @@ -218,7 +235,7 @@ def _process_finalized(self) -> None: event_id, camera, updated_db = ended camera_config = self.config.cameras[camera] - for processor in self.processors: + for processor in self.realtime_processors: processor.expire_object(event_id) if updated_db: From b0bbfabab9b4cf58c094b01325e7e10325b4b101 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Thu, 20 Feb 2025 14:11:15 -0600 Subject: [PATCH 07/17] move model and mixin folders --- .../common/{license_plate.py => license_plate/mixin.py} | 0 .../{license_plate_model.py => license_plate/model.py} | 2 +- 
frigate/data_processing/post/license_plate.py | 6 ++++-- frigate/data_processing/real_time/license_plate.py | 6 ++++-- frigate/embeddings/maintainer.py | 4 +++- 5 files changed, 12 insertions(+), 6 deletions(-) rename frigate/data_processing/common/{license_plate.py => license_plate/mixin.py} (100%) rename frigate/data_processing/common/{license_plate_model.py => license_plate/model.py} (96%) diff --git a/frigate/data_processing/common/license_plate.py b/frigate/data_processing/common/license_plate/mixin.py similarity index 100% rename from frigate/data_processing/common/license_plate.py rename to frigate/data_processing/common/license_plate/mixin.py diff --git a/frigate/data_processing/common/license_plate_model.py b/frigate/data_processing/common/license_plate/model.py similarity index 96% rename from frigate/data_processing/common/license_plate_model.py rename to frigate/data_processing/common/license_plate/model.py index d3e35d3c54..25e7b2cafc 100644 --- a/frigate/data_processing/common/license_plate_model.py +++ b/frigate/data_processing/common/license_plate/model.py @@ -5,7 +5,7 @@ PaddleOCRRecognition, ) -from ..types import DataProcessorModelRunner +from ...types import DataProcessorModelRunner class LicensePlateModelRunner(DataProcessorModelRunner): diff --git a/frigate/data_processing/post/license_plate.py b/frigate/data_processing/post/license_plate.py index fc210b2b7e..fd6a455a01 100644 --- a/frigate/data_processing/post/license_plate.py +++ b/frigate/data_processing/post/license_plate.py @@ -4,10 +4,12 @@ import logging from frigate.config import FrigateConfig -from frigate.data_processing.common.license_plate import ( +from frigate.data_processing.common.license_plate.mixin import ( LicensePlateProcessingMixin, ) -from frigate.data_processing.common.license_plate_model import LicensePlateModelRunner +from frigate.data_processing.common.license_plate.model import ( + LicensePlateModelRunner, +) from frigate.data_processing.types import PostProcessDataEnum from ..types import DataProcessorMetrics diff --git a/frigate/data_processing/real_time/license_plate.py b/frigate/data_processing/real_time/license_plate.py index 3a92dcc115..f6540948de 100644 --- a/frigate/data_processing/real_time/license_plate.py +++ b/frigate/data_processing/real_time/license_plate.py @@ -6,10 +6,12 @@ import numpy as np from frigate.config import FrigateConfig -from frigate.data_processing.common.license_plate import ( +from frigate.data_processing.common.license_plate.mixin import ( LicensePlateProcessingMixin, ) -from frigate.data_processing.common.license_plate_model import LicensePlateModelRunner +from frigate.data_processing.common.license_plate.model import ( + LicensePlateModelRunner, +) from ..types import DataProcessorMetrics from .api import RealTimeProcessorApi diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index b1adf697e5..00251b13f3 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -29,7 +29,9 @@ CLIPS_DIR, UPDATE_EVENT_DESCRIPTION, ) -from frigate.data_processing.common.license_plate_model import LicensePlateModelRunner +from frigate.data_processing.common.license_plate.model import ( + LicensePlateModelRunner, +) from frigate.data_processing.post.api import PostProcessorApi from frigate.data_processing.post.license_plate import ( LicensePlatePostProcessor, From 8b78aff6f59b5296cf1e5e1275b2b5e09156c595 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Thu, 20 Feb 2025 
16:11:49 -0600 Subject: [PATCH 08/17] basic postprocessor --- frigate/data_processing/post/license_plate.py | 74 ++++++++++++++++++- frigate/embeddings/__init__.py | 4 +- frigate/embeddings/maintainer.py | 23 +++++- 3 files changed, 97 insertions(+), 4 deletions(-) diff --git a/frigate/data_processing/post/license_plate.py b/frigate/data_processing/post/license_plate.py index fd6a455a01..4ebf1c6b9f 100644 --- a/frigate/data_processing/post/license_plate.py +++ b/frigate/data_processing/post/license_plate.py @@ -3,6 +3,11 @@ import datetime import logging +import cv2 +import numpy as np +from peewee import DoesNotExist +from playhouse.shortcuts import model_to_dict + from frigate.config import FrigateConfig from frigate.data_processing.common.license_plate.mixin import ( LicensePlateProcessingMixin, @@ -11,6 +16,8 @@ LicensePlateModelRunner, ) from frigate.data_processing.types import PostProcessDataEnum +from frigate.models import Event, Recordings +from frigate.util.image import get_image_from_recording from ..types import DataProcessorMetrics from .api import PostProcessorApi @@ -47,10 +54,75 @@ def process_data( Returns: None. """ + event_id = data["event_id"] + camera_name = data["camera"] + recordings_available_through = data["recordings_available"] start = datetime.datetime.now().timestamp() - # self.lpr_process(obj_data, frame) + try: + event: Event = Event.get(Event.id == event_id) + except DoesNotExist: + logger.error("License plate event does not exist yet") + return + + # Skip the event if not an object + if event.data.get("type") != "object": + logger.error("Invalid object") + return + + # TODO: need frame time of best plate from realtime processor + frame_time = event.end_time - 5 + + recording_query = ( + Recordings.select( + Recordings.path, + Recordings.start_time, + ) + .where( + ( + (frame_time >= Recordings.start_time) + & (frame_time <= Recordings.end_time) + ) + ) + .where(Recordings.camera == camera_name) + .order_by(Recordings.start_time.desc()) + .limit(1) + ) + + try: + recording: Recordings = recording_query.get() + time_in_segment = frame_time - recording.start_time + codec = "mjpeg" + + image_data = get_image_from_recording( + self.config.ffmpeg, recording.path, time_in_segment, codec, None + ) + + if not image_data: + logger.error("Unable to fetch license plate from recording") + + # Convert bytes to numpy array + image_array = np.frombuffer(image_data, dtype=np.uint8) + + if len(image_array) == 0: + logger.error("No image") + return + + image = cv2.imdecode(image_array, cv2.IMREAD_COLOR) + + cv2.imwrite(f"debug/frames/lpr_post_{frame_time}.jpg", image) + + frame = cv2.cvtColor(image, cv2.COLOR_BGR2YUV_I420) + yuv_height, yuv_width = self.config.cameras[camera_name].frame_shape_yuv + frame_resized = cv2.resize(frame, (yuv_width, yuv_height)) + + logger.info("Post processing plate") + self.lpr_process(model_to_dict(event), frame_resized) + except DoesNotExist: + logger.error( + "Error fetching license plate from recording for postprocessing" + ) self.__update_metrics(datetime.datetime.now().timestamp() - start) diff --git a/frigate/embeddings/__init__.py b/frigate/embeddings/__init__.py index 185d5436b6..187afe0529 100644 --- a/frigate/embeddings/__init__.py +++ b/frigate/embeddings/__init__.py @@ -17,7 +17,7 @@ from frigate.const import CONFIG_DIR, FACE_DIR from frigate.data_processing.types import DataProcessorMetrics from frigate.db.sqlitevecq import SqliteVecQueueDatabase -from frigate.models import Event +from frigate.models import Event, Recordings from 
frigate.util.builtin import serialize from frigate.util.services import listen @@ -55,7 +55,7 @@ def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])), load_vec_extension=True, ) - models = [Event] + models = [Event, Recordings] db.bind(models) maintainer = EmbeddingMaintainer( diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 00251b13f3..2a2101b120 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -42,7 +42,7 @@ from frigate.data_processing.real_time.license_plate import ( LicensePlateRealTimeProcessor, ) -from frigate.data_processing.types import DataProcessorMetrics +from frigate.data_processing.types import DataProcessorMetrics, PostProcessDataEnum from frigate.events.types import EventTypeEnum from frigate.genai import get_genai_client from frigate.models import Event @@ -129,6 +129,7 @@ def run(self) -> None: while not self.stop_event.is_set(): self._process_requests() self._process_updates() + self._process_recordings_updates() self._process_finalized() self._process_event_metadata() @@ -237,6 +238,26 @@ def _process_finalized(self) -> None: event_id, camera, updated_db = ended camera_config = self.config.cameras[camera] + # call any defined post processors + for processor in self.post_processors: + if isinstance(processor, LicensePlatePostProcessor): + recordings_available = self.recordings_available_through.get(camera) + if recordings_available is not None: + # and event_id is an event in detected_license_plates + processor.process_data( + { + "event_id": event_id, + "camera": camera, + "recordings_available": self.recordings_available_through[ + camera + ], + }, + PostProcessDataEnum.recording, + ) + else: + processor.process_data(event_id, PostProcessDataEnum.event_id) + + # expire in realtime processors for processor in self.realtime_processors: processor.expire_object(event_id) From 9349e2ac28ce058e1ed28c8570e52a41e9d54c6d Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Thu, 20 Feb 2025 16:16:40 -0600 Subject: [PATCH 09/17] clean up --- frigate/data_processing/post/license_plate.py | 2 +- frigate/record/maintainer.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/frigate/data_processing/post/license_plate.py b/frigate/data_processing/post/license_plate.py index 4ebf1c6b9f..d93a84b06a 100644 --- a/frigate/data_processing/post/license_plate.py +++ b/frigate/data_processing/post/license_plate.py @@ -1,4 +1,4 @@ -"""Handle processing images for face detection and recognition.""" +"""Handle post processing for license plate recognition.""" import datetime import logging diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 55b73df1e1..68d2ca8fd4 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -220,7 +220,7 @@ async def move_files(self) -> None: [self.validate_and_move_segment(camera, reviews, r) for r in recordings] ) - # TODO: this is not correct + # publish most recently available recording time self.recordings_publisher.publish( (camera, recordings[0]["start_time"].timestamp()) ) From f7ca351aa4fc080e9f39724118ac6dd2ee9df8b8 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Thu, 20 Feb 2025 16:33:55 -0600 Subject: [PATCH 10/17] docs --- docs/docs/configuration/license_plate_recognition.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 
deletions(-) diff --git a/docs/docs/configuration/license_plate_recognition.md b/docs/docs/configuration/license_plate_recognition.md index 4fd7aa568d..103c3bf14e 100644 --- a/docs/docs/configuration/license_plate_recognition.md +++ b/docs/docs/configuration/license_plate_recognition.md @@ -41,6 +41,8 @@ lpr: Ensure that your camera is configured to detect objects of type `car`, and that a car is actually being detected by Frigate. Otherwise, LPR will not run. +Like the other real-time processors in Frigate, license plate recognition runs on the camera stream defined by the `detect` role in your config. To ensure optimal performance, select a suitable resolution for this stream in your camera's firmware that fits your specific scene and requirements. + ## Advanced Configuration Fine-tune the LPR feature using these optional parameters: @@ -52,7 +54,7 @@ Fine-tune the LPR feature using these optional parameters: - Note: If you are using a Frigate+ model and you set the `threshold` in your objects config for `license_plate` higher than this value, recognition will never run. It's best to ensure these values match, or this `detection_threshold` is lower than your object config `threshold`. - **`min_area`**: Defines the minimum size (in pixels) a license plate must be before recognition runs. - Default: `1000` pixels. - - Depending on the resolution of your cameras, you can increase this value to ignore small or distant plates. + - Depending on the resolution of your camera's `detect` stream, you can increase this value to ignore small or distant plates. ### Recognition @@ -114,7 +116,7 @@ lpr: Ensure that: - Your camera has a clear, well-lit view of the plate. -- The plate is large enough in the image (try adjusting `min_area`). +- The plate is large enough in the image (try adjusting `min_area`) or increasing the resolution of your camera's stream. - A `car` is detected first, as LPR only runs on recognized vehicles. If you are using a Frigate+ model or a custom model that detects license plates, ensure that `license_plate` is added to your list of objects to track. @@ -143,7 +145,7 @@ Use `match_distance` to allow small character mismatches. Alternatively, define - View MQTT messages for `frigate/events` to verify detected plates. - Adjust `detection_threshold` and `recognition_threshold` settings. - If you are using a Frigate+ model or a model that detects license plates, watch the debug view (Settings --> Debug) to ensure that `license_plate` is being detected with a `car`. -- Enable debug logs for LPR by adding `frigate.data_processing.real_time.license_plate_processor: debug` to your `logger` configuration. These logs are _very_ verbose, so only enable this when necessary. +- Enable debug logs for LPR by adding `frigate.data_processing.common.license_plate: debug` to your `logger` configuration. These logs are _very_ verbose, so only enable this when necessary. ### Will LPR slow down my system? 
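The post-processing patch that follows re-runs LPR on a frame pulled from the recording stream, which is usually higher resolution than the detect stream, so boxes have to be rescaled into the recording frame and padded out before cropping. A dependency-free sketch of that mapping, with hypothetical stream sizes:

```python
# Sketch: map a box from detect-stream coordinates onto a recording frame
# and enlarge it, as the post-processor below does (sizes are illustrative).
def scale_and_enlarge(box, detect_wh, frame_wh, enlarge_factor=0.3):
    (dw, dh), (fw, fh) = detect_wh, frame_wh
    sx, sy = fw / dw, fh / dh
    left, top, right, bottom = box[0] * sx, box[1] * sy, box[2] * sx, box[3] * sy
    pad_w = (right - left) * enlarge_factor / 2
    pad_h = (bottom - top) * enlarge_factor / 2
    return (
        max(0, int(left - pad_w)),
        max(0, int(top - pad_h)),
        min(fw, int(right + pad_w)),
        min(fh, int(bottom + pad_h)),
    )


# e.g. a plate box from a 1280x720 detect stream mapped onto a 2560x1440 recording
print(scale_and_enlarge((600, 400, 700, 430), (1280, 720), (2560, 1440)))
```

The padding accounts for drift between the detect and recording streams, since the two are not frame-accurate with respect to each other.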
From 9d581e52cb15e4f6df8a1ab600691b48bd6f8a2d Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Thu, 20 Feb 2025 18:55:21 -0600 Subject: [PATCH 11/17] postprocessing logic --- .../common/license_plate/mixin.py | 5 +- frigate/data_processing/post/license_plate.py | 116 +++++++++++++++--- .../real_time/license_plate.py | 2 + frigate/embeddings/maintainer.py | 19 ++- 4 files changed, 115 insertions(+), 27 deletions(-) diff --git a/frigate/data_processing/common/license_plate/mixin.py b/frigate/data_processing/common/license_plate/mixin.py index 13e6dc8d5c..a186806bd9 100644 --- a/frigate/data_processing/common/license_plate/mixin.py +++ b/frigate/data_processing/common/license_plate/mixin.py @@ -18,7 +18,7 @@ logger = logging.getLogger(__name__) -WRITE_DEBUG_IMAGES = False +WRITE_DEBUG_IMAGES = True class LicensePlateProcessingMixin: @@ -28,7 +28,6 @@ def __init__(self, *args, **kwargs): self.requires_license_plate_detection = ( "license_plate" not in self.config.objects.all_objects ) - self.detected_license_plates: dict[str, dict[str, any]] = {} self.ctc_decoder = CTCDecoder() @@ -1025,7 +1024,7 @@ def lpr_process(self, obj_data: dict[str, any], frame: np.ndarray): "plate": top_plate, "char_confidences": top_char_confidences, "area": top_area, - "frame_time": obj_data["frame_time"], + "obj_data": obj_data, } def handle_request(self, topic, request_data) -> dict[str, any] | None: diff --git a/frigate/data_processing/post/license_plate.py b/frigate/data_processing/post/license_plate.py index d93a84b06a..fb2c80ae14 100644 --- a/frigate/data_processing/post/license_plate.py +++ b/frigate/data_processing/post/license_plate.py @@ -6,7 +6,6 @@ import cv2 import numpy as np from peewee import DoesNotExist -from playhouse.shortcuts import model_to_dict from frigate.config import FrigateConfig from frigate.data_processing.common.license_plate.mixin import ( @@ -16,7 +15,7 @@ LicensePlateModelRunner, ) from frigate.data_processing.types import PostProcessDataEnum -from frigate.models import Event, Recordings +from frigate.models import Recordings from frigate.util.image import get_image_from_recording from ..types import DataProcessorMetrics @@ -25,13 +24,15 @@ logger = logging.getLogger(__name__) -class LicensePlatePostProcessor(PostProcessorApi, LicensePlateProcessingMixin): +class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi): def __init__( self, config: FrigateConfig, metrics: DataProcessorMetrics, model_runner: LicensePlateModelRunner, + detected_license_plates: dict[str, dict[str, any]], ): + self.detected_license_plates = detected_license_plates self.model_runner = model_runner self.lpr_config = config.lpr self.config = config @@ -57,22 +58,23 @@ def process_data( event_id = data["event_id"] camera_name = data["camera"] recordings_available_through = data["recordings_available"] + obj_data = data["obj_data"] start = datetime.datetime.now().timestamp() - try: - event: Event = Event.get(Event.id == event_id) - except DoesNotExist: - logger.error("License plate event does not exist yet") + # Skip the event if it's not a previously processed plate + if event_id not in self.detected_license_plates: + logger.debug( + f"LPR post processing: {event_id} is not a previously processed license plate" + ) return - # Skip the event if not an object - if event.data.get("type") != "object": - logger.error("Invalid object") - return + frame_time = obj_data["frame_time"] - # TODO: need frame time of best plate from realtime processor - 
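This hunk drops the `end_time - 5` guess in favor of the `frame_time` stored by the realtime processor, then looks up the recording segment whose window contains that timestamp and seeks into it. A simplified sketch of the lookup, using plain tuples instead of the `Recordings` peewee query:

```python
# Sketch: find the recording segment covering a frame time and the seek
# offset within it (assumption: segments are (start_time, end_time, path)).
def locate_frame(segments, frame_time):
    for start, end, path in sorted(segments, key=lambda s: s[0], reverse=True):
        if start <= frame_time <= end:
            return path, frame_time - start  # path plus seconds into the segment
    return None


segments = [(100.0, 110.0, "a.mp4"), (110.0, 120.0, "b.mp4")]
print(locate_frame(segments, 113.2))  # -> ('b.mp4', ~3.2)
```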
frame_time = event.end_time - 5 + if frame_time > recordings_available_through: + logger.debug( + f"LPR post processing: No recordings available for this frame time {frame_time}, available through {recordings_available_through}" + ) recording_query = ( Recordings.select( @@ -100,13 +102,15 @@ def process_data( ) if not image_data: - logger.error("Unable to fetch license plate from recording") + logger.debug( + "LPR post processing: Unable to fetch license plate from recording" + ) # Convert bytes to numpy array image_array = np.frombuffer(image_data, dtype=np.uint8) if len(image_array) == 0: - logger.error("No image") + logger.debug("LPR post processing: No image") return image = cv2.imdecode(image_array, cv2.IMREAD_COLOR) @@ -114,13 +118,85 @@ def process_data( cv2.imwrite(f"debug/frames/lpr_post_{frame_time}.jpg", image) frame = cv2.cvtColor(image, cv2.COLOR_BGR2YUV_I420) - yuv_height, yuv_width = self.config.cameras[camera_name].frame_shape_yuv - frame_resized = cv2.resize(frame, (yuv_width, yuv_height)) - logger.info("Post processing plate") - self.lpr_process(model_to_dict(event), frame_resized) + detect_width = self.config.cameras[camera_name].detect.width + detect_height = self.config.cameras[camera_name].detect.height + + # Scale the boxes based on detect dimensions + scale_x = image.shape[1] / detect_width + scale_y = image.shape[0] / detect_height + + # Determine which box to enlarge based on detection mode + if self.requires_license_plate_detection: + # Scale and enlarge the car box + box = obj_data.get("box") + if not box: + return + + # Scale original box to detection dimensions + left = int(box[0] * scale_x) + top = int(box[1] * scale_y) + right = int(box[2] * scale_x) + bottom = int(box[3] * scale_y) + box = [left, top, right, bottom] + else: + # Get the license plate box from attributes + if not obj_data.get("current_attributes"): + return + + license_plate = None + for attr in obj_data["current_attributes"]: + if attr.get("label") != "license_plate": + continue + if license_plate is None or attr.get( + "score", 0.0 + ) > license_plate.get("score", 0.0): + license_plate = attr + + if not license_plate or not license_plate.get("box"): + return + + # Scale license plate box to detection dimensions + orig_box = license_plate["box"] + left = int(orig_box[0] * scale_x) + top = int(orig_box[1] * scale_y) + right = int(orig_box[2] * scale_x) + bottom = int(orig_box[3] * scale_y) + box = [left, top, right, bottom] + + width_box = right - left + height_box = bottom - top + + # Enlarge box slightly to account for drift in detect vs recording stream + enlarge_factor = 0.3 + new_left = max(0, int(left - (width_box * enlarge_factor / 2))) + new_top = max(0, int(top - (height_box * enlarge_factor / 2))) + new_right = min( + image.shape[1], int(right + (width_box * enlarge_factor / 2)) + ) + new_bottom = min( + image.shape[0], int(bottom + (height_box * enlarge_factor / 2)) + ) + + keyframe_obj_data = obj_data.copy() + if self.requires_license_plate_detection: + keyframe_obj_data["box"] = [new_left, new_top, new_right, new_bottom] + else: + # Update the license plate box in the attributes + new_attributes = [] + for attr in obj_data["current_attributes"]: + if attr.get("label") == "license_plate": + new_attr = attr.copy() + new_attr["box"] = [new_left, new_top, new_right, new_bottom] + new_attributes.append(new_attr) + else: + new_attributes.append(attr) + keyframe_obj_data["current_attributes"] = new_attributes + + logger.debug(f"Post processing plate: {event_id}, {frame_time}") + 
self.lpr_process(keyframe_obj_data, frame) except DoesNotExist: - logger.error( + logger.debug( "Error fetching license plate from recording for postprocessing" ) diff --git a/frigate/data_processing/real_time/license_plate.py b/frigate/data_processing/real_time/license_plate.py index f6540948de..a5a1577fef 100644 --- a/frigate/data_processing/real_time/license_plate.py +++ b/frigate/data_processing/real_time/license_plate.py @@ -25,7 +25,9 @@ def __init__( config: FrigateConfig, metrics: DataProcessorMetrics, model_runner: LicensePlateModelRunner, + detected_license_plates: dict[str, dict[str, any]], ): + self.detected_license_plates = detected_license_plates self.model_runner = model_runner self.lpr_config = config.lpr self.config = config diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 2a2101b120..9ceb793693 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -91,6 +91,8 @@ def __init__( self.embeddings_responder = EmbeddingsResponder() self.frame_manager = SharedMemoryFrameManager() + self.detected_license_plates: dict[str, dict[str, any]] = {} + # model runners to share between realtime and post processors if self.config.lpr.enabled: lpr_model_runner = LicensePlateModelRunner(self.requestor) @@ -106,7 +108,9 @@ def __init__( if self.config.lpr.enabled: self.realtime_processors.append( - LicensePlateRealTimeProcessor(self.config, metrics, lpr_model_runner) + LicensePlateRealTimeProcessor( + self.config, metrics, lpr_model_runner, self.detected_license_plates + ) ) # post processors @@ -114,7 +118,9 @@ def __init__( if self.config.lpr.enabled: self.post_processors.append( - LicensePlatePostProcessor(self.config, metrics, lpr_model_runner) + LicensePlatePostProcessor( + self.config, metrics, lpr_model_runner, self.detected_license_plates + ) ) self.stop_event = stop_event @@ -242,8 +248,10 @@ def _process_finalized(self) -> None: for processor in self.post_processors: if isinstance(processor, LicensePlatePostProcessor): recordings_available = self.recordings_available_through.get(camera) - if recordings_available is not None: - # and event_id is an event in detected_license_plates + if ( + recordings_available is not None + and event_id in self.detected_license_plates + ): processor.process_data( { "event_id": event_id, @@ -251,6 +259,9 @@ def _process_finalized(self) -> None: "recordings_available": self.recordings_available_through[ camera ], + "obj_data": self.detected_license_plates[event_id][ + "obj_data" + ], }, PostProcessDataEnum.recording, ) From ea937ad2ea5e487a29a25bd1071f2659a43961c9 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Thu, 20 Feb 2025 18:58:12 -0600 Subject: [PATCH 12/17] clean up --- frigate/data_processing/post/license_plate.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/frigate/data_processing/post/license_plate.py b/frigate/data_processing/post/license_plate.py index fb2c80ae14..791ff15dea 100644 --- a/frigate/data_processing/post/license_plate.py +++ b/frigate/data_processing/post/license_plate.py @@ -115,7 +115,8 @@ def process_data( image = cv2.imdecode(image_array, cv2.IMREAD_COLOR) - cv2.imwrite(f"debug/frames/lpr_post_{frame_time}.jpg", image) + if False: + cv2.imwrite(f"debug/frames/lpr_post_{frame_time}.jpg", image) frame = cv2.cvtColor(image, cv2.COLOR_BGR2YUV_I420) From 5448e7d3400fa7fcbb1a536a2d429bff1a1c371f Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> 
Date: Fri, 21 Feb 2025 07:11:47 -0600
Subject: [PATCH 13/17] return none if recordings are disabled

---
 frigate/record/maintainer.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py
index 68d2ca8fd4..faa41f75fd 100644
--- a/frigate/record/maintainer.py
+++ b/frigate/record/maintainer.py
@@ -220,9 +220,14 @@ async def move_files(self) -> None:
             [self.validate_and_move_segment(camera, reviews, r) for r in recordings]
         )
 
-        # publish most recently available recording time
+        # publish the most recently available recording time, or None if recording is disabled
         self.recordings_publisher.publish(
-            (camera, recordings[0]["start_time"].timestamp())
+            (
+                camera,
+                recordings[0]["start_time"].timestamp()
+                if self.config.cameras[camera].record.enabled
+                else None,
+            )
         )
 
         recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks)

From 17a99f41f2e60043c31c14c438bc09da7a252364 Mon Sep 17 00:00:00 2001
From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
Date: Fri, 21 Feb 2025 07:12:10 -0600
Subject: [PATCH 14/17] run postprocessor handle_requests too

---
 frigate/embeddings/maintainer.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py
index 9ceb793693..a18ca7a7f3 100644
--- a/frigate/embeddings/maintainer.py
+++ b/frigate/embeddings/maintainer.py
@@ -171,13 +171,15 @@ def _handle_request(topic: str, data: dict[str, any]) -> str:
                     pack=False,
                 )
             else:
-                for processor in self.processors:
-                    resp = processor.handle_request(topic, data)
-
-                if resp is not None:
-                    return resp
+                processors = [self.realtime_processors, self.post_processors]
+                for processor_list in processors:
+                    for processor in processor_list:
+                        resp = processor.handle_request(topic, data)
+
+                        if resp is not None:
+                            return resp
         except Exception as e:
-            logger.error(f"Unable to handle embeddings request {e}")
+            logger.error(f"Unable to handle embeddings request {e}", exc_info=True)
 
         self.embeddings_responder.check_for_request(_handle_request)

From 8b6e4b7721304c54c01287f2cfa5f71976603999 Mon Sep 17 00:00:00 2001
From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
Date: Fri, 21 Feb 2025 07:12:54 -0600
Subject: [PATCH 15/17] tweak expansion

---
 frigate/data_processing/common/license_plate/mixin.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/frigate/data_processing/common/license_plate/mixin.py b/frigate/data_processing/common/license_plate/mixin.py
index a186806bd9..1723d213e2 100644
--- a/frigate/data_processing/common/license_plate/mixin.py
+++ b/frigate/data_processing/common/license_plate/mixin.py
@@ -18,7 +18,7 @@
 
 logger = logging.getLogger(__name__)
 
-WRITE_DEBUG_IMAGES = True
+WRITE_DEBUG_IMAGES = False
 
 
 class LicensePlateProcessingMixin:
@@ -717,8 +717,8 @@ def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]:
 
         # Return the top scoring bounding box if found
         if top_box is not None:
-            # expand box by 15% to help with OCR
-            expansion = (top_box[2:] - top_box[:2]) * 0.1
+            # expand box by 30% to help with OCR
+            expansion = (top_box[2:] - top_box[:2]) * 0.30
 
             # Expand box
             expanded_box = np.array(

From 27757b80dfff53c893559e20f300e6e976d732b6 Mon Sep 17 00:00:00 2001
From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
Date: Fri, 21 Feb 2025 07:13:27 -0600
Subject: [PATCH 16/17] add put endpoint

---
 frigate/api/classification.py | 36 +++++++++++++++++++++++++++++
 frigate/comms/embeddings_updater.py | 1 +
frigate/embeddings/__init__.py | 5 ++++ 3 files changed, 42 insertions(+) diff --git a/frigate/api/classification.py b/frigate/api/classification.py index 7cd127d070..bd395737ab 100644 --- a/frigate/api/classification.py +++ b/frigate/api/classification.py @@ -9,10 +9,13 @@ from fastapi import APIRouter, Request, UploadFile from fastapi.responses import JSONResponse from pathvalidate import sanitize_filename +from peewee import DoesNotExist +from playhouse.shortcuts import model_to_dict from frigate.api.defs.tags import Tags from frigate.const import FACE_DIR from frigate.embeddings import EmbeddingsContext +from frigate.models import Event logger = logging.getLogger(__name__) @@ -176,3 +179,36 @@ def deregister_faces(request: Request, name: str, body: dict = None): content=({"success": True, "message": "Successfully deleted faces."}), status_code=200, ) + + +@router.put("/lpr/reprocess") +def reprocess_license_plate(request: Request, event_id: str): + if not request.app.frigate_config.lpr.enabled: + message = "License plate recognition is not enabled." + logger.error(message) + return JSONResponse( + content=( + { + "success": False, + "message": message, + } + ), + status_code=400, + ) + + try: + event = Event.get(Event.id == event_id) + except DoesNotExist: + message = f"Event {event_id} not found" + logger.error(message) + return JSONResponse( + content=({"success": False, "message": message}), status_code=404 + ) + + context: EmbeddingsContext = request.app.embeddings + response = context.reprocess_plate(model_to_dict(event)) + + return JSONResponse( + content=response, + status_code=200, + ) diff --git a/frigate/comms/embeddings_updater.py b/frigate/comms/embeddings_updater.py index 58f012e7d6..61c2331cf7 100644 --- a/frigate/comms/embeddings_updater.py +++ b/frigate/comms/embeddings_updater.py @@ -15,6 +15,7 @@ class EmbeddingsRequestEnum(Enum): generate_search = "generate_search" register_face = "register_face" reprocess_face = "reprocess_face" + reprocess_plate = "reprocess_plate" class EmbeddingsResponder: diff --git a/frigate/embeddings/__init__.py b/frigate/embeddings/__init__.py index 187afe0529..18673c4e92 100644 --- a/frigate/embeddings/__init__.py +++ b/frigate/embeddings/__init__.py @@ -234,3 +234,8 @@ def update_description(self, event_id: str, description: str) -> None: EmbeddingsRequestEnum.embed_description.value, {"id": event_id, "description": description}, ) + + def reprocess_plate(self, event: dict[str, any]) -> dict[str, any]: + return self.requestor.send_data( + EmbeddingsRequestEnum.reprocess_plate.value, {"event": event} + ) From db75e78920dae55a248dd04fe1776e5edae87f65 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Fri, 21 Feb 2025 07:13:39 -0600 Subject: [PATCH 17/17] postprocessor tweaks with endpoint --- frigate/data_processing/post/license_plate.py | 222 ++++++++++-------- 1 file changed, 123 insertions(+), 99 deletions(-) diff --git a/frigate/data_processing/post/license_plate.py b/frigate/data_processing/post/license_plate.py index 791ff15dea..9a9974bc72 100644 --- a/frigate/data_processing/post/license_plate.py +++ b/frigate/data_processing/post/license_plate.py @@ -7,8 +7,10 @@ import numpy as np from peewee import DoesNotExist +from frigate.comms.embeddings_updater import EmbeddingsRequestEnum from frigate.config import FrigateConfig from frigate.data_processing.common.license_plate.mixin import ( + WRITE_DEBUG_IMAGES, LicensePlateProcessingMixin, ) from 
frigate.data_processing.common.license_plate.model import ( @@ -55,26 +57,32 @@ def process_data( Returns: None. """ + start = datetime.datetime.now().timestamp() + event_id = data["event_id"] camera_name = data["camera"] - recordings_available_through = data["recordings_available"] - obj_data = data["obj_data"] - start = datetime.datetime.now().timestamp() + if data_type == PostProcessDataEnum.recording: + obj_data = data["obj_data"] + frame_time = obj_data["frame_time"] + recordings_available_through = data["recordings_available"] - # Skip the event if it's not a previously processed plate - if event_id not in self.detected_license_plates: - logger.debug( - f"LPR post processing: {event_id} is not a previously processed license plate" - ) - return + if frame_time > recordings_available_through: + logger.debug( + f"LPR post processing: No recordings available for this frame time {frame_time}, available through {recordings_available_through}" + ) - frame_time = obj_data["frame_time"] + elif data_type == PostProcessDataEnum.tracked_object: + # non-functional, need to think about snapshot time + obj_data = data["event"]["data"] + obj_data["id"] = data["event"]["id"] + obj_data["camera"] = data["event"]["camera"] + # TODO: snapshot time? + frame_time = data["event"]["start_time"] - if frame_time > recordings_available_through: - logger.debug( - f"LPR post processing: No recordings available for this frame time {frame_time}, available through {recordings_available_through}" - ) + else: + logger.error("No data type passed to LPR postprocessing") + return recording_query = ( Recordings.select( @@ -115,93 +123,109 @@ def process_data( image = cv2.imdecode(image_array, cv2.IMREAD_COLOR) - if False: - cv2.imwrite(f"debug/frames/lpr_post_{frame_time}.jpg", image) - - frame = cv2.cvtColor(image, cv2.COLOR_BGR2YUV_I420) - - detect_width = self.config.cameras[camera_name].detect.width - detect_height = self.config.cameras[camera_name].detect.height - - # Scale the boxes based on detect dimensions - scale_x = image.shape[1] / detect_width - scale_y = image.shape[0] / detect_height - - # Determine which box to enlarge based on detection mode - if self.requires_license_plate_detection: - # Scale and enlarge the car box - box = obj_data.get("box") - if not box: - return - - # Scale original box to detection dimensions - left = int(box[0] * scale_x) - top = int(box[1] * scale_y) - right = int(box[2] * scale_x) - bottom = int(box[3] * scale_y) - box = [left, top, right, bottom] - else: - # Get the license plate box from attributes - if not obj_data.get("current_attributes"): - return - - license_plate = None - for attr in obj_data["current_attributes"]: - if attr.get("label") != "license_plate": - continue - if license_plate is None or attr.get( - "score", 0.0 - ) > license_plate.get("score", 0.0): - license_plate = attr - - if not license_plate or not license_plate.get("box"): - return - - # Scale license plate box to detection dimensions - orig_box = license_plate["box"] - left = int(orig_box[0] * scale_x) - top = int(orig_box[1] * scale_y) - right = int(orig_box[2] * scale_x) - bottom = int(orig_box[3] * scale_y) - box = [left, top, right, bottom] - - width_box = right - left - height_box = bottom - top - - # Enlarge box slightly to account for drift in detect vs recording stream - enlarge_factor = 0.3 - new_left = max(0, int(left - (width_box * enlarge_factor / 2))) - new_top = max(0, int(top - (height_box * enlarge_factor / 2))) - new_right = min( - image.shape[1], int(right + (width_box * 
enlarge_factor / 2)) - ) - new_bottom = min( - image.shape[0], int(bottom + (height_box * enlarge_factor / 2)) - ) - - keyframe_obj_data = obj_data.copy() - if self.requires_license_plate_detection: - keyframe_obj_data["box"] = [new_left, new_top, new_right, new_bottom] - else: - # Update the license plate box in the attributes - new_attributes = [] - for attr in obj_data["current_attributes"]: - if attr.get("label") == "license_plate": - new_attr = attr.copy() - new_attr["box"] = [new_left, new_top, new_right, new_bottom] - new_attributes.append(new_attr) - else: - new_attributes.append(attr) - keyframe_obj_data["current_attributes"] = new_attributes - - logger.debug(f"Post processing plate: {event_id}, {frame_time}") - self.lpr_process(keyframe_obj_data, frame) except DoesNotExist: - logger.debug( - "Error fetching license plate from recording for postprocessing" - ) + logger.debug("Error fetching license plate for postprocessing") + return + + if WRITE_DEBUG_IMAGES: + cv2.imwrite(f"debug/frames/lpr_post_{start}.jpg", image) + + # convert to yuv for processing + frame = cv2.cvtColor(image, cv2.COLOR_BGR2YUV_I420) + + detect_width = self.config.cameras[camera_name].detect.width + detect_height = self.config.cameras[camera_name].detect.height + + # Scale the boxes based on detect dimensions + scale_x = image.shape[1] / detect_width + scale_y = image.shape[0] / detect_height + + # Determine which box to enlarge based on detection mode + if self.requires_license_plate_detection: + # Scale and enlarge the car box + box = obj_data.get("box") + if not box: + return + + # Scale original car box to detection dimensions + left = int(box[0] * scale_x) + top = int(box[1] * scale_y) + right = int(box[2] * scale_x) + bottom = int(box[3] * scale_y) + box = [left, top, right, bottom] + else: + # Get the license plate box from attributes + if not obj_data.get("current_attributes"): + return + + license_plate = None + for attr in obj_data["current_attributes"]: + if attr.get("label") != "license_plate": + continue + if license_plate is None or attr.get("score", 0.0) > license_plate.get( + "score", 0.0 + ): + license_plate = attr + + if not license_plate or not license_plate.get("box"): + return + + # Scale license plate box to detection dimensions + orig_box = license_plate["box"] + left = int(orig_box[0] * scale_x) + top = int(orig_box[1] * scale_y) + right = int(orig_box[2] * scale_x) + bottom = int(orig_box[3] * scale_y) + box = [left, top, right, bottom] + + width_box = right - left + height_box = bottom - top + + # Enlarge box slightly to account for drift in detect vs recording stream + enlarge_factor = 0.3 + new_left = max(0, int(left - (width_box * enlarge_factor / 2))) + new_top = max(0, int(top - (height_box * enlarge_factor / 2))) + new_right = min(image.shape[1], int(right + (width_box * enlarge_factor / 2))) + new_bottom = min( + image.shape[0], int(bottom + (height_box * enlarge_factor / 2)) + ) + + keyframe_obj_data = obj_data.copy() + if self.requires_license_plate_detection: + # car box + keyframe_obj_data["box"] = [new_left, new_top, new_right, new_bottom] + else: + # Update the license plate box in the attributes + new_attributes = [] + for attr in obj_data["current_attributes"]: + if attr.get("label") == "license_plate": + new_attr = attr.copy() + new_attr["box"] = [new_left, new_top, new_right, new_bottom] + new_attributes.append(new_attr) + else: + new_attributes.append(attr) + keyframe_obj_data["current_attributes"] = new_attributes + + # run the frame through lpr processing + 
logger.debug(f"Post processing plate: {event_id}, {frame_time}") + self.lpr_process(keyframe_obj_data, frame) self.__update_metrics(datetime.datetime.now().timestamp() - start) def handle_request(self, topic, request_data) -> dict[str, any] | None: - return + if topic == EmbeddingsRequestEnum.reprocess_plate.value: + event = request_data["event"] + + self.process_data( + { + "event_id": event["id"], + "camera": event["camera"], + "event": event, + }, + PostProcessDataEnum.tracked_object, + ) + + return { + "message": "Successfully requested reprocessing of license plate.", + "success": True, + }