Skip to content

Commit

Permalink
Combine StreamBuffer into SegmentBuffer in stream (home-assistant#51041)
Browse files Browse the repository at this point in the history
* Combine StreamBuffer into SegmentBuffer in stream

* Use new style type hint in comment
Remove unused member self._segment

* Change reset_av to static helper function

* Change make_new_av to only return OutputContainer
  • Loading branch information
uvjustin authored May 25, 2021
1 parent 0fb2504 commit 2eb87b8
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 63 deletions.
16 changes: 1 addition & 15 deletions homeassistant/components/stream/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import asyncio
from collections import deque
import io
from typing import TYPE_CHECKING, Callable
from typing import Callable

from aiohttp import web
import attr
Expand All @@ -16,23 +16,9 @@

from .const import ATTR_STREAMS, DOMAIN

if TYPE_CHECKING:
import av.container
import av.video

PROVIDERS = Registry()


@attr.s
class StreamBuffer:
"""Represent a segment."""

segment: io.BytesIO = attr.ib()
output: av.container.OutputContainer = attr.ib()
vstream: av.video.VideoStream = attr.ib()
astream = attr.ib(default=None) # type=Optional[av.audio.AudioStream]


@attr.s
class Segment:
"""Represent a segment."""
Expand Down
109 changes: 61 additions & 48 deletions homeassistant/components/stream/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
from __future__ import annotations

from collections import deque
import io
from io import BytesIO
import logging
from typing import cast

import av

Expand All @@ -17,38 +18,11 @@
SEGMENT_CONTAINER_FORMAT,
STREAM_TIMEOUT,
)
from .core import Segment, StreamBuffer, StreamOutput
from .core import Segment, StreamOutput

_LOGGER = logging.getLogger(__name__)


def create_stream_buffer(video_stream, audio_stream, sequence):
"""Create a new StreamBuffer."""

segment = io.BytesIO()
container_options = {
# Removed skip_sidx - see https://github.com/home-assistant/core/pull/39970
"movflags": "frag_custom+empty_moov+default_base_moof+frag_discont+negative_cts_offsets",
"avoid_negative_ts": "disabled",
"fragment_index": str(sequence),
}
output = av.open(
segment,
mode="w",
format=SEGMENT_CONTAINER_FORMAT,
container_options={
"video_track_timescale": str(int(1 / video_stream.time_base)),
**container_options,
},
)
vstream = output.add_stream(template=video_stream)
# Check if audio is requested
astream = None
if audio_stream and audio_stream.name in AUDIO_CODECS:
astream = output.add_stream(template=audio_stream)
return StreamBuffer(segment, output, vstream, astream)


class SegmentBuffer:
"""Buffer for writing a sequence of packets to the output as a segment."""

Expand All @@ -61,12 +35,41 @@ def __init__(self, outputs_callback) -> None:
self._outputs: list[StreamOutput] = []
self._sequence = 0
self._segment_start_pts = None
self._stream_buffer = None
self._memory_file: BytesIO = cast(BytesIO, None)
self._av_output: av.container.OutputContainer = None
self._input_video_stream: av.video.VideoStream = None
self._input_audio_stream = None # av.audio.AudioStream | None
self._output_video_stream: av.video.VideoStream = None
self._output_audio_stream = None # av.audio.AudioStream | None

@staticmethod
def make_new_av(
memory_file, sequence: int, input_vstream: av.video.VideoStream
) -> av.container.OutputContainer:
"""Make a new av OutputContainer."""
return av.open(
memory_file,
mode="w",
format=SEGMENT_CONTAINER_FORMAT,
container_options={
# Removed skip_sidx - see https://github.com/home-assistant/core/pull/39970
# "cmaf" flag replaces several of the movflags used, but too recent to use for now
"movflags": "frag_custom+empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer",
"avoid_negative_ts": "disabled",
"fragment_index": str(sequence + 1),
"video_track_timescale": str(int(1 / input_vstream.time_base)),
},
)

def set_streams(self, video_stream, audio_stream):
def set_streams(
self,
video_stream: av.video.VideoStream,
audio_stream,
# no type hint for audio_stream until https://github.com/PyAV-Org/PyAV/pull/775 is merged
) -> None:
"""Initialize output buffer with streams from container."""
self._video_stream = video_stream
self._audio_stream = audio_stream
self._input_video_stream = video_stream
self._input_audio_stream = audio_stream

def reset(self, video_pts):
"""Initialize a new stream segment."""
Expand All @@ -77,15 +80,27 @@ def reset(self, video_pts):
# Fetch the latest StreamOutputs, which may have changed since the
# worker started.
self._outputs = self._outputs_callback().values()
self._stream_buffer = create_stream_buffer(
self._video_stream, self._audio_stream, self._sequence
self._memory_file = BytesIO()
self._av_output = self.make_new_av(
memory_file=self._memory_file,
sequence=self._sequence,
input_vstream=self._input_video_stream,
)
self._output_video_stream = self._av_output.add_stream(
template=self._input_video_stream
)
# Check if audio is requested
self._output_audio_stream = None
if self._input_audio_stream and self._input_audio_stream.name in AUDIO_CODECS:
self._output_audio_stream = self._av_output.add_stream(
template=self._input_audio_stream
)

def mux_packet(self, packet):
"""Mux a packet to the appropriate StreamBuffers."""
"""Mux a packet to the appropriate output stream."""

# Check for end of segment
if packet.stream == self._video_stream and packet.is_keyframe:
if packet.stream == self._input_video_stream and packet.is_keyframe:
duration = (packet.pts - self._segment_start_pts) * packet.time_base
if duration >= MIN_SEGMENT_DURATION:
# Save segment to outputs
Expand All @@ -95,19 +110,17 @@ def mux_packet(self, packet):
self.reset(packet.pts)

# Mux the packet
if packet.stream == self._video_stream:
packet.stream = self._stream_buffer.vstream
self._stream_buffer.output.mux(packet)
elif packet.stream == self._audio_stream:
packet.stream = self._stream_buffer.astream
self._stream_buffer.output.mux(packet)
if packet.stream == self._input_video_stream:
packet.stream = self._output_video_stream
self._av_output.mux(packet)
elif packet.stream == self._input_audio_stream:
packet.stream = self._output_audio_stream
self._av_output.mux(packet)

def flush(self, duration):
"""Create a segment from the buffered packets and write to output."""
self._stream_buffer.output.close()
segment = Segment(
self._sequence, self._stream_buffer.segment, duration, self._stream_id
)
self._av_output.close()
segment = Segment(self._sequence, self._memory_file, duration, self._stream_id)
for stream_output in self._outputs:
stream_output.put(segment)

Expand All @@ -120,7 +133,7 @@ def discontinuity(self):

def close(self):
"""Close stream buffer."""
self._stream_buffer.output.close()
self._av_output.close()


def stream_worker(source, options, segment_buffer, quit_event): # noqa: C901
Expand Down

0 comments on commit 2eb87b8

Please sign in to comment.