From 2eb87b8806588e2d065b54705ba745cf8a3f2663 Mon Sep 17 00:00:00 2001 From: uvjustin <46082645+uvjustin@users.noreply.github.com> Date: Tue, 25 May 2021 13:57:07 +0800 Subject: [PATCH] Combine StreamBuffer into SegmentBuffer in stream (#51041) * Combine StreamBuffer into SegmentBuffer in stream * Use new style type hint in comment Remove unused member self._segment * Change reset_av to static helper function * Change make_new_av to only return OutputContainer --- homeassistant/components/stream/core.py | 16 +--- homeassistant/components/stream/worker.py | 109 ++++++++++++---------- 2 files changed, 62 insertions(+), 63 deletions(-) diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index a289464f92b04e..0d29474858f479 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -4,7 +4,7 @@ import asyncio from collections import deque import io -from typing import TYPE_CHECKING, Callable +from typing import Callable from aiohttp import web import attr @@ -16,23 +16,9 @@ from .const import ATTR_STREAMS, DOMAIN -if TYPE_CHECKING: - import av.container - import av.video - PROVIDERS = Registry() -@attr.s -class StreamBuffer: - """Represent a segment.""" - - segment: io.BytesIO = attr.ib() - output: av.container.OutputContainer = attr.ib() - vstream: av.video.VideoStream = attr.ib() - astream = attr.ib(default=None) # type=Optional[av.audio.AudioStream] - - @attr.s class Segment: """Represent a segment.""" diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index 05dc0b076a48c9..d6562cf93db80c 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -2,8 +2,9 @@ from __future__ import annotations from collections import deque -import io +from io import BytesIO import logging +from typing import cast import av @@ -17,38 +18,11 @@ SEGMENT_CONTAINER_FORMAT, STREAM_TIMEOUT, ) -from .core import Segment, StreamBuffer, StreamOutput +from .core import Segment, StreamOutput _LOGGER = logging.getLogger(__name__) -def create_stream_buffer(video_stream, audio_stream, sequence): - """Create a new StreamBuffer.""" - - segment = io.BytesIO() - container_options = { - # Removed skip_sidx - see https://github.com/home-assistant/core/pull/39970 - "movflags": "frag_custom+empty_moov+default_base_moof+frag_discont+negative_cts_offsets", - "avoid_negative_ts": "disabled", - "fragment_index": str(sequence), - } - output = av.open( - segment, - mode="w", - format=SEGMENT_CONTAINER_FORMAT, - container_options={ - "video_track_timescale": str(int(1 / video_stream.time_base)), - **container_options, - }, - ) - vstream = output.add_stream(template=video_stream) - # Check if audio is requested - astream = None - if audio_stream and audio_stream.name in AUDIO_CODECS: - astream = output.add_stream(template=audio_stream) - return StreamBuffer(segment, output, vstream, astream) - - class SegmentBuffer: """Buffer for writing a sequence of packets to the output as a segment.""" @@ -61,12 +35,41 @@ def __init__(self, outputs_callback) -> None: self._outputs: list[StreamOutput] = [] self._sequence = 0 self._segment_start_pts = None - self._stream_buffer = None + self._memory_file: BytesIO = cast(BytesIO, None) + self._av_output: av.container.OutputContainer = None + self._input_video_stream: av.video.VideoStream = None + self._input_audio_stream = None # av.audio.AudioStream | None + self._output_video_stream: av.video.VideoStream = None + self._output_audio_stream = None # av.audio.AudioStream | None + + @staticmethod + def make_new_av( + memory_file, sequence: int, input_vstream: av.video.VideoStream + ) -> av.container.OutputContainer: + """Make a new av OutputContainer.""" + return av.open( + memory_file, + mode="w", + format=SEGMENT_CONTAINER_FORMAT, + container_options={ + # Removed skip_sidx - see https://github.com/home-assistant/core/pull/39970 + # "cmaf" flag replaces several of the movflags used, but too recent to use for now + "movflags": "frag_custom+empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer", + "avoid_negative_ts": "disabled", + "fragment_index": str(sequence + 1), + "video_track_timescale": str(int(1 / input_vstream.time_base)), + }, + ) - def set_streams(self, video_stream, audio_stream): + def set_streams( + self, + video_stream: av.video.VideoStream, + audio_stream, + # no type hint for audio_stream until https://github.com/PyAV-Org/PyAV/pull/775 is merged + ) -> None: """Initialize output buffer with streams from container.""" - self._video_stream = video_stream - self._audio_stream = audio_stream + self._input_video_stream = video_stream + self._input_audio_stream = audio_stream def reset(self, video_pts): """Initialize a new stream segment.""" @@ -77,15 +80,27 @@ def reset(self, video_pts): # Fetch the latest StreamOutputs, which may have changed since the # worker started. self._outputs = self._outputs_callback().values() - self._stream_buffer = create_stream_buffer( - self._video_stream, self._audio_stream, self._sequence + self._memory_file = BytesIO() + self._av_output = self.make_new_av( + memory_file=self._memory_file, + sequence=self._sequence, + input_vstream=self._input_video_stream, ) + self._output_video_stream = self._av_output.add_stream( + template=self._input_video_stream + ) + # Check if audio is requested + self._output_audio_stream = None + if self._input_audio_stream and self._input_audio_stream.name in AUDIO_CODECS: + self._output_audio_stream = self._av_output.add_stream( + template=self._input_audio_stream + ) def mux_packet(self, packet): - """Mux a packet to the appropriate StreamBuffers.""" + """Mux a packet to the appropriate output stream.""" # Check for end of segment - if packet.stream == self._video_stream and packet.is_keyframe: + if packet.stream == self._input_video_stream and packet.is_keyframe: duration = (packet.pts - self._segment_start_pts) * packet.time_base if duration >= MIN_SEGMENT_DURATION: # Save segment to outputs @@ -95,19 +110,17 @@ def mux_packet(self, packet): self.reset(packet.pts) # Mux the packet - if packet.stream == self._video_stream: - packet.stream = self._stream_buffer.vstream - self._stream_buffer.output.mux(packet) - elif packet.stream == self._audio_stream: - packet.stream = self._stream_buffer.astream - self._stream_buffer.output.mux(packet) + if packet.stream == self._input_video_stream: + packet.stream = self._output_video_stream + self._av_output.mux(packet) + elif packet.stream == self._input_audio_stream: + packet.stream = self._output_audio_stream + self._av_output.mux(packet) def flush(self, duration): """Create a segment from the buffered packets and write to output.""" - self._stream_buffer.output.close() - segment = Segment( - self._sequence, self._stream_buffer.segment, duration, self._stream_id - ) + self._av_output.close() + segment = Segment(self._sequence, self._memory_file, duration, self._stream_id) for stream_output in self._outputs: stream_output.put(segment) @@ -120,7 +133,7 @@ def discontinuity(self): def close(self): """Close stream buffer.""" - self._stream_buffer.output.close() + self._av_output.close() def stream_worker(source, options, segment_buffer, quit_event): # noqa: C901