diff --git a/lib/upipe-modules/upipe_aggregate.c b/lib/upipe-modules/upipe_aggregate.c index 696e8395e..525e02eb0 100644 --- a/lib/upipe-modules/upipe_aggregate.c +++ b/lib/upipe-modules/upipe_aggregate.c @@ -1,5 +1,6 @@ /* * Copyright (C) 2013-2015 OpenHeadend S.A.R.L. + * Copyright (C) 2024 EasyTools * * Authors: Benjamin Cohen * @@ -111,6 +112,61 @@ static struct upipe *upipe_agg_alloc(struct upipe_mgr *mgr, return upipe; } +/** @internal @This flushes the aggregated buffer. + * + * @param upipe description structure of the pipe + * @param upump_p reference to pump that generated the buffer + */ +static void upipe_agg_flush(struct upipe *upipe, struct upump **upump_p) +{ + struct upipe_agg *upipe_agg = upipe_agg_from_upipe(upipe); + struct uref *uref = upipe_agg->aggregated; + upipe_agg->aggregated = NULL; + if (uref) + upipe_agg_output(upipe, uref, upump_p); +} + +/** @internal @This sets the input flow definition for real. + * + * @param upipe description structure of the pipe + * @param flow_def flow definition packet + * @return an error code + */ +static int upipe_agg_set_flow_def_real(struct upipe *upipe, + struct uref *flow_def) +{ + struct upipe_agg *upipe_agg = upipe_agg_from_upipe(upipe); + uint64_t size = 0; + uint64_t octetrate = 0; + uint64_t latency = 0; + + uref_block_flow_get_size(flow_def, &size); + uref_block_flow_get_octetrate(flow_def, &octetrate); + uref_clock_get_latency(flow_def, &latency); + + upipe_agg->input_size = size; + + int err = uref_block_flow_set_size(flow_def, upipe_agg->output_size); + if (unlikely(!ubase_check(err))) { + uref_free(flow_def); + upipe_throw_fatal(upipe, err); + return err; + } + + if (octetrate) { + latency += (uint64_t)upipe_agg->output_size * UCLOCK_FREQ / octetrate; + err = uref_clock_set_latency(flow_def, latency); + if (unlikely(!ubase_check(err))) { + uref_free(flow_def); + upipe_throw_fatal(upipe, err); + return err; + } + } + upipe_agg_store_flow_def(upipe, flow_def); + return UBASE_ERR_NONE; +} + + /** @internal @This receives data. * * @param upipe description structure of the pipe @@ -124,6 +180,12 @@ static void upipe_agg_input(struct upipe *upipe, struct uref *uref, size_t size = 0; const size_t output_size = upipe_agg->output_size; + if (unlikely(ubase_check(uref_flow_get_def(uref, NULL)))) { + upipe_agg_flush(upipe, upump_p); + upipe_agg_set_flow_def_real(upipe, uref); + return; + } + uref_block_size(uref, &size); /* check for invalid or too large size */ @@ -135,10 +197,8 @@ static void upipe_agg_input(struct upipe *upipe, struct uref *uref, } /* flush if incoming packet makes aggregated overflow */ - if (upipe_agg->size + size > output_size) { - upipe_agg_output(upipe, upipe_agg->aggregated, upump_p); - upipe_agg->aggregated = NULL; - } + if (upipe_agg->size + size > output_size) + upipe_agg_flush(upipe, upump_p); /* keep or attach incoming packet */ if (unlikely(!upipe_agg->aggregated)) { @@ -159,11 +219,8 @@ static void upipe_agg_input(struct upipe *upipe, struct uref *uref, /* anticipate next packet size and flush now if necessary */ if (upipe_agg->input_size) size = upipe_agg->input_size; - if (unlikely(upipe_agg->size + size > output_size)) { - upipe_agg_output(upipe, upipe_agg->aggregated, upump_p); - upipe_agg->aggregated = NULL; - upipe_agg->size = 0; - } + if (unlikely(upipe_agg->size + size > output_size)) + upipe_agg_flush(upipe, upump_p); } /** @internal @This sets the input flow definition. @@ -178,26 +235,10 @@ static int upipe_agg_set_flow_def(struct upipe *upipe, struct uref *flow_def) return UBASE_ERR_INVALID; UBASE_RETURN(uref_flow_match_def(flow_def, EXPECTED_FLOW_DEF)) - struct upipe_agg *upipe_agg = upipe_agg_from_upipe(upipe); - uint64_t size = 0; - uref_block_flow_get_size(flow_def, &size); - upipe_agg->input_size = size; - - uint64_t octetrate = 0; - uref_block_flow_get_octetrate(flow_def, &octetrate); - uint64_t latency = 0; - uref_clock_get_latency(flow_def, &latency); - struct uref *flow_def_dup; if ((flow_def_dup = uref_dup(flow_def)) == NULL) return UBASE_ERR_ALLOC; - UBASE_RETURN(uref_block_flow_set_size(flow_def_dup, upipe_agg->output_size)) - - if (octetrate) { - UBASE_RETURN(uref_clock_set_latency(flow_def_dup, latency + - (uint64_t)upipe_agg->output_size * UCLOCK_FREQ / octetrate)) - } - upipe_agg_store_flow_def(upipe, flow_def_dup); + upipe_agg_input(upipe, flow_def_dup, NULL); return UBASE_ERR_NONE; }