diff --git a/inference/core/version.py b/inference/core/version.py index 0847f52bd..9f3dce730 100644 --- a/inference/core/version.py +++ b/inference/core/version.py @@ -1,4 +1,4 @@ -__version__ = "0.34.0" +__version__ = "0.35.0rc1" if __name__ == "__main__": diff --git a/inference_cli/lib/workflows/core.py b/inference_cli/lib/workflows/core.py index 1fa84b767..38cba46c5 100644 --- a/inference_cli/lib/workflows/core.py +++ b/inference_cli/lib/workflows/core.py @@ -30,7 +30,7 @@ def run_video_processing_with_workflows( from inference_cli.lib.workflows.video_adapter import process_video_with_workflow - process_video_with_workflow( + _ = process_video_with_workflow( input_video_path=input_video_path, output_directory=output_directory, output_file_type=output_file_type, diff --git a/inference_cli/lib/workflows/entities.py b/inference_cli/lib/workflows/entities.py index 901279844..d98ccf295 100644 --- a/inference_cli/lib/workflows/entities.py +++ b/inference_cli/lib/workflows/entities.py @@ -34,3 +34,9 @@ class ImagesDirectoryProcessingDetails: ] aggregated_results_path: Optional[str] = field(default=None) failures_report_path: Optional[str] = field(default=None) + + +@dataclass(frozen=True) +class VideoProcessingDetails: + structured_results_file: Optional[str] + video_outputs: Optional[Dict[str, str]] diff --git a/inference_cli/lib/workflows/video_adapter.py b/inference_cli/lib/workflows/video_adapter.py index 1de519542..f38497577 100644 --- a/inference_cli/lib/workflows/video_adapter.py +++ b/inference_cli/lib/workflows/video_adapter.py @@ -16,7 +16,7 @@ from inference.core.utils.image_utils import load_image_bgr from inference_cli.lib.utils import dump_jsonl from inference_cli.lib.workflows.common import deduct_images, dump_objects_to_json -from inference_cli.lib.workflows.entities import OutputFileType +from inference_cli.lib.workflows.entities import OutputFileType, VideoProcessingDetails def process_video_with_workflow( @@ -31,10 +31,11 @@ def process_video_with_workflow( max_fps: Optional[float] = None, save_image_outputs_as_video: bool = True, api_key: Optional[str] = None, -) -> None: +) -> VideoProcessingDetails: structured_sink = WorkflowsStructuredDataSink( output_directory=output_directory, output_file_type=output_file_type, + numbers_of_streams=1, ) progress_sink = ProgressSink.init(input_video_path=input_video_path) sinks = [structured_sink.on_prediction, progress_sink.on_prediction] @@ -61,9 +62,14 @@ def process_video_with_workflow( pipeline.start(use_main_thread=True) pipeline.join() progress_sink.stop() - structured_sink.flush() + structured_results_file = structured_sink.flush()[0] + video_outputs = None if video_sink is not None: - video_sink.release() + video_outputs = video_sink.release() + return VideoProcessingDetails( + structured_results_file=structured_results_file, + video_outputs=video_outputs, + ) class WorkflowsStructuredDataSink: @@ -72,10 +78,12 @@ def __init__( self, output_directory: str, output_file_type: OutputFileType, + numbers_of_streams: int = 1, ): self._output_directory = output_directory self._structured_results_buffer = defaultdict(list) self._output_file_type = output_file_type + self._numbers_of_streams = numbers_of_streams def on_prediction( self, @@ -94,11 +102,17 @@ def on_prediction( } self._structured_results_buffer[stream_idx].append(prediction) - def flush(self) -> None: + def flush(self) -> List[Optional[str]]: + stream_idx2file_path = {} for stream_idx, buffer in self._structured_results_buffer.items(): - self._flush_stream_buffer(stream_idx=stream_idx) - - def _flush_stream_buffer(self, stream_idx: int) -> None: + file_path = self._flush_stream_buffer(stream_idx=stream_idx) + stream_idx2file_path[stream_idx] = file_path + return [ + stream_idx2file_path.get(stream_idx) + for stream_idx in range(self._numbers_of_streams) + ] + + def _flush_stream_buffer(self, stream_idx: int) -> Optional[str]: content = self._structured_results_buffer[stream_idx] if len(content) == 0: return None @@ -114,6 +128,7 @@ def _flush_stream_buffer(self, stream_idx: int) -> None: else: dump_jsonl(path=file_path, content=content) self._structured_results_buffer[stream_idx] = [] + return file_path def __del__(self): self.flush() @@ -182,11 +197,14 @@ def on_prediction( image = load_image_bgr(value) stream_sinks[key].write_frame(frame=image) - def release(self) -> None: - for stream_sinks in self._video_sinks.values(): - for sink in stream_sinks.values(): + def release(self) -> Optional[Dict[str, str]]: + stream_idx2keys_videos: Dict[int, Dict[str, str]] = defaultdict(dict) + for stream_idx, stream_sinks in self._video_sinks.items(): + for key, sink in stream_sinks.items(): sink.release() + stream_idx2keys_videos[stream_idx][key] = sink.target_path self._video_sinks = defaultdict(dict) + return stream_idx2keys_videos.get(0) def __del__(self): self.release()