Skip to content

Commit

Permalink
upipe_agg: flush when needed
Browse files Browse the repository at this point in the history
  • Loading branch information
quarium authored and cmassiot committed Feb 13, 2025
1 parent bced08b commit 8144d43
Showing 1 changed file with 67 additions and 26 deletions.
93 changes: 67 additions & 26 deletions lib/upipe-modules/upipe_aggregate.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2013-2015 OpenHeadend S.A.R.L.
* Copyright (C) 2024 EasyTools
*
* Authors: Benjamin Cohen
*
Expand Down Expand Up @@ -111,6 +112,61 @@ static struct upipe *upipe_agg_alloc(struct upipe_mgr *mgr,
return upipe;
}

/** @internal @This flushes the aggregated buffer.
*
* @param upipe description structure of the pipe
* @param upump_p reference to pump that generated the buffer
*/
static void upipe_agg_flush(struct upipe *upipe, struct upump **upump_p)
{
struct upipe_agg *upipe_agg = upipe_agg_from_upipe(upipe);
struct uref *uref = upipe_agg->aggregated;
upipe_agg->aggregated = NULL;
if (uref)
upipe_agg_output(upipe, uref, upump_p);
}

/** @internal @This sets the input flow definition for real.
*
* @param upipe description structure of the pipe
* @param flow_def flow definition packet
* @return an error code
*/
static int upipe_agg_set_flow_def_real(struct upipe *upipe,
struct uref *flow_def)
{
struct upipe_agg *upipe_agg = upipe_agg_from_upipe(upipe);
uint64_t size = 0;
uint64_t octetrate = 0;
uint64_t latency = 0;

uref_block_flow_get_size(flow_def, &size);
uref_block_flow_get_octetrate(flow_def, &octetrate);
uref_clock_get_latency(flow_def, &latency);

upipe_agg->input_size = size;

int err = uref_block_flow_set_size(flow_def, upipe_agg->output_size);
if (unlikely(!ubase_check(err))) {
uref_free(flow_def);
upipe_throw_fatal(upipe, err);
return err;
}

if (octetrate) {
latency += (uint64_t)upipe_agg->output_size * UCLOCK_FREQ / octetrate;
err = uref_clock_set_latency(flow_def, latency);
if (unlikely(!ubase_check(err))) {
uref_free(flow_def);
upipe_throw_fatal(upipe, err);
return err;
}
}
upipe_agg_store_flow_def(upipe, flow_def);
return UBASE_ERR_NONE;
}


/** @internal @This receives data.
*
* @param upipe description structure of the pipe
Expand All @@ -124,6 +180,12 @@ static void upipe_agg_input(struct upipe *upipe, struct uref *uref,
size_t size = 0;
const size_t output_size = upipe_agg->output_size;

if (unlikely(ubase_check(uref_flow_get_def(uref, NULL)))) {
upipe_agg_flush(upipe, upump_p);
upipe_agg_set_flow_def_real(upipe, uref);
return;
}

uref_block_size(uref, &size);

/* check for invalid or too large size */
Expand All @@ -135,10 +197,8 @@ static void upipe_agg_input(struct upipe *upipe, struct uref *uref,
}

/* flush if incoming packet makes aggregated overflow */
if (upipe_agg->size + size > output_size) {
upipe_agg_output(upipe, upipe_agg->aggregated, upump_p);
upipe_agg->aggregated = NULL;
}
if (upipe_agg->size + size > output_size)
upipe_agg_flush(upipe, upump_p);

/* keep or attach incoming packet */
if (unlikely(!upipe_agg->aggregated)) {
Expand All @@ -159,11 +219,8 @@ static void upipe_agg_input(struct upipe *upipe, struct uref *uref,
/* anticipate next packet size and flush now if necessary */
if (upipe_agg->input_size)
size = upipe_agg->input_size;
if (unlikely(upipe_agg->size + size > output_size)) {
upipe_agg_output(upipe, upipe_agg->aggregated, upump_p);
upipe_agg->aggregated = NULL;
upipe_agg->size = 0;
}
if (unlikely(upipe_agg->size + size > output_size))
upipe_agg_flush(upipe, upump_p);
}

/** @internal @This sets the input flow definition.
Expand All @@ -178,26 +235,10 @@ static int upipe_agg_set_flow_def(struct upipe *upipe, struct uref *flow_def)
return UBASE_ERR_INVALID;
UBASE_RETURN(uref_flow_match_def(flow_def, EXPECTED_FLOW_DEF))

struct upipe_agg *upipe_agg = upipe_agg_from_upipe(upipe);
uint64_t size = 0;
uref_block_flow_get_size(flow_def, &size);
upipe_agg->input_size = size;

uint64_t octetrate = 0;
uref_block_flow_get_octetrate(flow_def, &octetrate);
uint64_t latency = 0;
uref_clock_get_latency(flow_def, &latency);

struct uref *flow_def_dup;
if ((flow_def_dup = uref_dup(flow_def)) == NULL)
return UBASE_ERR_ALLOC;
UBASE_RETURN(uref_block_flow_set_size(flow_def_dup, upipe_agg->output_size))

if (octetrate) {
UBASE_RETURN(uref_clock_set_latency(flow_def_dup, latency +
(uint64_t)upipe_agg->output_size * UCLOCK_FREQ / octetrate))
}
upipe_agg_store_flow_def(upipe, flow_def_dup);
upipe_agg_input(upipe, flow_def_dup, NULL);
return UBASE_ERR_NONE;
}

Expand Down

0 comments on commit 8144d43

Please sign in to comment.