From 52faddcb362178927d73c0ffbfb8b0762db0fef7 Mon Sep 17 00:00:00 2001 From: Misha Baranov Date: Fri, 5 Jul 2024 19:14:13 +0200 Subject: [PATCH] Fix races --- .../gst-plugins-good/gst/rtpmanager/rtptwcc.c | 303 +++++++++++------- 1 file changed, 194 insertions(+), 109 deletions(-) diff --git a/subprojects/gst-plugins-good/gst/rtpmanager/rtptwcc.c b/subprojects/gst-plugins-good/gst/rtpmanager/rtptwcc.c index babb0c79ac..d26237d2ea 100644 --- a/subprojects/gst-plugins-good/gst/rtpmanager/rtptwcc.c +++ b/subprojects/gst-plugins-good/gst/rtpmanager/rtptwcc.c @@ -60,6 +60,23 @@ typedef enum { RTP_TWCC_FECBLOCK_PKT_LOST } TWCCPktState; +static const gchar * +_pkt_state_s (TWCCPktState state) +{ + switch (state){ + case RTP_TWCC_FECBLOCK_PKT_UNKNOWN: + return "UNKNOWN"; + case RTP_TWCC_FECBLOCK_PKT_RECEIVED: + return "RECEIVED"; + case RTP_TWCC_FECBLOCK_PKT_RECOVERED: + return "RECOVERED"; + case RTP_TWCC_FECBLOCK_PKT_LOST: + return "LOST"; + default: + return "INVALID"; + } +} + typedef enum { RTP_TWCC_CHUNK_TYPE_RUN_LENGTH = 0, @@ -105,6 +122,7 @@ typedef struct gboolean lost; guint32 protects_ssrc; GArray * protects_seqnums; + gboolean stats_processed; TWCCPktState state; gint update_stats; @@ -151,6 +169,10 @@ struct _RTPTWCCManager GHashTable *pt_to_twcc_ext_id; TWCCStatsCtx *stats_ctx; + /* The first packet in stats_ctx seqnum, valid even if there is a gap in + stats_ctx caused feedback packet loss + */ + gint32 stats_ctx_first_seqnum; GHashTable *stats_ctx_by_pt; /* In order to keep RingBuffer sizes under control, we assert @@ -219,7 +241,7 @@ struct _RTPTWCCManager * Eventually will be used to calculate redundancy statistics */ -static SentPacket * _find_stats_sentpacket (TWCCStatsCtx * ctx, guint16 seqnum); +static SentPacket * _find_stats_sentpacket (RTPTWCCManager * twcc, guint16 seqnum); static SentPacket * _find_sentpacket (RTPTWCCManager * twcc, guint16 seqnum); typedef GArray* RedBlockKey; @@ -326,9 +348,34 @@ _redblock_reconsider (RTPTWCCManager * twcc, RedBlock * block) gboolean recovered = FALSE; gboolean unknowns = FALSE; gsize nrecovered = 0; + + /* Special case for RTX: lost RTX introduces extra complexity which + is easier to handle separately + */ + if (block->seqs->len == 1) { + SentPacket *pkt = _find_stats_sentpacket (twcc, + g_array_index (block->seqs, guint16, 0)); + + if (pkt && pkt->state == RTP_TWCC_FECBLOCK_PKT_LOST) { + for (gsize i = 0; i < block->fec_seqs->len; ++i) { + SentPacket *pkt = _find_stats_sentpacket (twcc, + g_array_index (block->fec_seqs, guint16, i)); + if (pkt->state == RTP_TWCC_FECBLOCK_PKT_RECEIVED) { + nrecovered++; + break; + } + } + if (nrecovered == 1) { + pkt->state = RTP_TWCC_FECBLOCK_PKT_RECOVERED; + } + } + + return nrecovered; + } + /* Walk through all the packets and check if the block could be recovered */ for (gsize i = 0; i < block->seqs->len; ++i) { - SentPacket *pkt = _find_stats_sentpacket (twcc->stats_ctx, + SentPacket *pkt = _find_stats_sentpacket (twcc, g_array_index (block->seqs, guint16, i)); if (!pkt || pkt->state == RTP_TWCC_FECBLOCK_PKT_UNKNOWN) { unknowns = TRUE; @@ -345,7 +392,7 @@ _redblock_reconsider (RTPTWCCManager * twcc, RedBlock * block) /* Walk through all fec packets */ for (gsize i = 0; i < block->fec_seqs->len; ++i) { - SentPacket *pkt = _find_stats_sentpacket (twcc->stats_ctx, + SentPacket *pkt = _find_stats_sentpacket (twcc, g_array_index (block->fec_seqs, guint16, i)); if (!pkt || pkt->state == RTP_TWCC_FECBLOCK_PKT_UNKNOWN) { unknowns = TRUE; @@ -375,7 +422,7 @@ _redblock_reconsider (RTPTWCCManager * twcc, RedBlock * block) if (lost > 0 && lost <= block->fec_seqs->len) { /* We have enough packets to recover the block */ for (gsize i = 0; i < block->seqs->len; ++i) { - SentPacket *pkt = _find_stats_sentpacket (twcc->stats_ctx, + SentPacket *pkt = _find_stats_sentpacket (twcc, g_array_index (block->seqs, guint16, i)); if (pkt->state == RTP_TWCC_FECBLOCK_PKT_LOST) { pkt->state = RTP_TWCC_FECBLOCK_PKT_RECOVERED; @@ -383,7 +430,7 @@ _redblock_reconsider (RTPTWCCManager * twcc, RedBlock * block) } } for (gsize i = 0; i < block->fec_seqs->len; ++i) { - SentPacket *pkt = _find_stats_sentpacket (twcc->stats_ctx, + SentPacket *pkt = _find_stats_sentpacket (twcc, g_array_index (block->fec_seqs, guint16, i)); if (pkt->state == RTP_TWCC_FECBLOCK_PKT_LOST) { pkt->state = RTP_TWCC_FECBLOCK_PKT_RECOVERED; @@ -546,7 +593,7 @@ _get_stats_packets_window (GstQueueArray * array, return ret; } -static void _rm_packet_links (RTPTWCCManager * twcc, SentPacket * pkt); +static void _rm_last_stats_pkt (RTPTWCCManager * twcc); static gboolean twcc_stats_ctx_calculate_windowed_stats (RTPTWCCManager * twcc, TWCCStatsCtx * ctx, @@ -755,20 +802,11 @@ twcc_stats_ctx_get_structure (TWCCStatsCtx * ctx) } static gint -_idx_sentpacket (GstQueueArray * array, guint16 seqnum) +_idx_sentpacket (RTPTWCCManager * twcc, guint16 seqnum) { - if (gst_queue_array_is_empty (array) == TRUE) { - return -1; - } - - SentPacket * first = NULL; - for (gsize i = 0; !first && i < gst_queue_array_get_length (array); i++) { - first = ((StatsPktPtr*)gst_queue_array_peek_head_struct (array)) - ->sentpkt; - } - - const gint idx = gst_rtp_buffer_compare_seqnum (first->seqnum, seqnum); - if (idx >= 0) { + const gint idx = gst_rtp_buffer_compare_seqnum ( + (guint16)twcc->stats_ctx_first_seqnum, seqnum); + if (twcc->stats_ctx_first_seqnum >= 0 && idx >= 0) { return idx; } else { return -1; @@ -776,47 +814,55 @@ _idx_sentpacket (GstQueueArray * array, guint16 seqnum) } static SentPacket * -_find_stats_sentpacket (TWCCStatsCtx * ctx, guint16 seqnum) +_find_stats_sentpacket (RTPTWCCManager * twcc, guint16 seqnum) { - gint idx = _idx_sentpacket (ctx->pt_packets, seqnum); + gint idx = _idx_sentpacket (twcc, seqnum); SentPacket * res = NULL; - if (idx >= 0 && idx < gst_queue_array_get_length (ctx->pt_packets)) { - res = ((StatsPktPtr*)gst_queue_array_peek_nth_struct (ctx->pt_packets, idx)) + if (idx >= 0 && idx < gst_queue_array_get_length (twcc->stats_ctx->pt_packets)) { + res = ((StatsPktPtr*)gst_queue_array_peek_nth_struct (twcc->stats_ctx->pt_packets, idx)) ->sentpkt; } return res; } +static TWCCStatsCtx * +_get_ctx_for_pt (RTPTWCCManager * twcc, guint pt); + static void -twcc_stats_ctx_add_packet (RTPTWCCManager * twcc, TWCCStatsCtx * ctx, - SentPacket * pkt) +twcc_stats_ctx_add_packet (RTPTWCCManager * twcc, SentPacket * pkt) { if (!pkt) { return; - } else if (gst_queue_array_is_empty (ctx->pt_packets)) { + } else if (gst_queue_array_is_empty (twcc->stats_ctx->pt_packets)) { + gst_queue_array_push_tail_struct (twcc->stats_ctx->pt_packets, + (StatsPktPtr*)&pkt); + TWCCStatsCtx *ctx = _get_ctx_for_pt (twcc, pkt->pt); gst_queue_array_push_tail_struct (ctx->pt_packets, (StatsPktPtr*)&pkt); + twcc->stats_ctx->last_pkt_fb = ctx->last_pkt_fb = pkt; + twcc->stats_ctx_first_seqnum = pkt->seqnum; } else { - /* Swap the */ - gint idx = _idx_sentpacket (ctx->pt_packets, pkt->seqnum); + const gint idx = _idx_sentpacket (twcc, pkt->seqnum); + GstQueueArray * main_array = twcc->stats_ctx->pt_packets; if (idx < 0) { + GST_WARNING ("Packet #%u is too old for stats, dropping, latest pkt is #%u", + pkt->seqnum, twcc->stats_ctx_first_seqnum); return; - } else if (idx >= gst_queue_array_get_length (ctx->pt_packets)) { - const gsize n2push = idx - gst_queue_array_get_length (ctx->pt_packets); - const gsize n2remove = - n2push + gst_queue_array_get_length (ctx->pt_packets) >MAX_STATS_PACKETS - ? n2push + gst_queue_array_get_length (ctx->pt_packets) - - MAX_STATS_PACKETS - : 0; + } else if (idx >= gst_queue_array_get_length (main_array)) { + const gsize n2push = idx - gst_queue_array_get_length (main_array); + /* if n2push > 0 means that the last twcc feedback packet[s] is lost, + */ for (gsize i = 0; i < n2push; i++) { - gst_queue_array_push_tail_struct (ctx->pt_packets, + gst_queue_array_push_tail_struct (main_array, &null_statspktptr); } + gst_queue_array_push_tail_struct (main_array, (StatsPktPtr*)&pkt); + TWCCStatsCtx *ctx = _get_ctx_for_pt (twcc, pkt->pt); gst_queue_array_push_tail_struct (ctx->pt_packets, (StatsPktPtr*)&pkt); - ctx->last_pkt_fb = pkt; + twcc->stats_ctx->last_pkt_fb = ctx->last_pkt_fb = pkt; } else { - ((StatsPktPtr*)gst_queue_array_peek_nth_struct (ctx->pt_packets, idx)) - ->sentpkt = pkt; + /* TWCC packets reordered -- do nothing */ + GST_WARNING ("Packet #%u is out of order, not going to stats", pkt->seqnum); } } } @@ -889,6 +935,7 @@ rtp_twcc_manager_init (RTPTWCCManager * twcc) twcc->rtcp_buffers = g_queue_new (); twcc->stats_ctx = twcc_stats_ctx_new (); + twcc->stats_ctx_first_seqnum = -1; twcc->stats_ctx_by_pt = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) twcc_stats_ctx_free); @@ -926,6 +973,7 @@ rtp_twcc_manager_finalize (GObject * object) g_queue_free_full (twcc->rtcp_buffers, (GDestroyNotify) gst_buffer_unref); g_mutex_clear (&twcc->recv_lock); g_mutex_clear (&twcc->send_lock); + g_mutex_clear (&twcc->sent_packets_feedback_lock); g_hash_table_destroy (twcc->stats_ctx_by_pt); g_hash_table_destroy (twcc->seqnum_2_redblocks); @@ -969,39 +1017,43 @@ _get_ctx_for_pt (RTPTWCCManager * twcc, guint pt) } static void -_remove_packet_from_stats (RTPTWCCManager * twcc, SentPacket * pkt) -{ - TWCCStatsCtx *stats_ctx[] = {twcc->stats_ctx, - _get_ctx_for_pt (twcc, pkt->pt)}; - - for (guint i = 0; i < G_N_ELEMENTS (stats_ctx); i++) { - SentPacket * head = ((StatsPktPtr*)gst_queue_array_peek_head_struct ( - stats_ctx[i]->pt_packets))->sentpkt; - const gint idx = gst_rtp_buffer_compare_seqnum (head->seqnum, pkt->seqnum); - if (idx > 0) { - GST_WARNING ("Attempting to remove packet that has greater seqnum " - "than the head in stats context queue"); +_rm_last_packet_from_stats_arrays (RTPTWCCManager * twcc) +{ + SentPacket * head = ((StatsPktPtr*)gst_queue_array_peek_head_struct ( + twcc->stats_ctx->pt_packets))->sentpkt; + if (head) { + TWCCStatsCtx * ctx = _get_ctx_for_pt (twcc, head->pt); + SentPacket * ctx_pkt = ((StatsPktPtr*)gst_queue_array_peek_head_struct ( + ctx->pt_packets))->sentpkt; + if (!ctx_pkt || ctx_pkt->seqnum != head->seqnum) { + GST_WARNING ("Attempting to remove packet from pt stats context " + "which seqnum does not match the main stats context seqnum, " + "main: #%u, pt: %u, context packet: #%u, pt: %u", + head->seqnum, head->pt, ctx_pkt ? ctx_pkt->seqnum : -1, ctx_pkt ? ctx_pkt->pt : -1); g_assert_not_reached (); - } else if (idx == 0) { - gst_queue_array_pop_head_struct (stats_ctx[i]->pt_packets); } - if (stats_ctx[i]->last_pkt_fb - && stats_ctx[i]->last_pkt_fb->seqnum == pkt->seqnum) { - stats_ctx[i]->last_pkt_fb = NULL; + if (ctx->last_pkt_fb == head) { + twcc->stats_ctx->last_pkt_fb = ctx->last_pkt_fb = NULL; } + gst_queue_array_pop_head_struct (ctx->pt_packets); + GST_LOG ("Removing packet #%u from stats context, ts: %" GST_STIME_FORMAT, + head->seqnum, head->local_ts); } + gst_queue_array_pop_head_struct (twcc->stats_ctx->pt_packets); + twcc->stats_ctx_first_seqnum++; } static void -_rm_packet_links (RTPTWCCManager * twcc, SentPacket * pkt) +_rm_last_stats_pkt (RTPTWCCManager * twcc) { - _remove_packet_from_stats (twcc, pkt); + SentPacket * head = ((StatsPktPtr*)gst_queue_array_peek_head_struct ( + twcc->stats_ctx->pt_packets))->sentpkt; /* If this packet maps to a block in hash tables -- remove every links leading to this block as well as this packet: as we will remove this packet from the context, we will not be able to use this block anyways. */ RedBlock * block = NULL; - if (g_hash_table_lookup_extended (twcc->seqnum_2_redblocks, - GUINT_TO_POINTER(pkt->seqnum), NULL, (gpointer *)&block)) { + if (head && g_hash_table_lookup_extended (twcc->seqnum_2_redblocks, + GUINT_TO_POINTER(head->seqnum), NULL, (gpointer *)&block)) { RedBlockKey key = _redblock_key_new (block->seqs); for (gsize i = 0; i < block->seqs->len; i++) { g_hash_table_remove (twcc->seqnum_2_redblocks, @@ -1014,6 +1066,7 @@ _rm_packet_links (RTPTWCCManager * twcc, SentPacket * pkt) g_hash_table_remove (twcc->redund_2_redblocks, key); _redblock_key_free (key); } + _rm_last_packet_from_stats_arrays (twcc); } static gint32 @@ -1034,20 +1087,6 @@ _lookup_seqnum (RTPTWCCManager * twcc, guint32 ssrc, guint16 seqnum) return ret; } -static void -_add_packet_to_stats (RTPTWCCManager * twcc, SentPacket * sent_pkt) -{ - TWCCStatsCtx *ctx; - - /* add packet to the stats context */ - twcc_stats_ctx_add_packet (twcc, twcc->stats_ctx, sent_pkt); - - /* add packet to the payload specific stats context */ - ctx = _get_ctx_for_pt (twcc, sent_pkt->pt); - twcc_stats_ctx_add_packet (twcc, ctx, sent_pkt); -} - - static void recv_packet_init (RecvPacket * packet, guint16 seqnum, RTPPacketInfo * pinfo) { @@ -1140,6 +1179,7 @@ sent_packet_init (SentPacket * packet, guint16 seqnum, RTPPacketInfo * pinfo, packet->state = RTP_TWCC_FECBLOCK_PKT_UNKNOWN; packet->protects_ssrc = protect_ssrc; packet->protects_seqnums = protect_seqnums_array; + packet->stats_processed = FALSE; } static void @@ -1242,13 +1282,15 @@ _set_twcc_seqnum_data (RTPTWCCManager * twcc, RTPPacketInfo * pinfo, if (GST_CLOCK_TIME_IS_VALID(twcc->prev_stat_window_beginning) && GST_CLOCK_DIFF (pkt_ts, twcc->prev_stat_window_beginning) < 0) { - GST_WARNING_OBJECT (twcc, "sent_packets FIFO underruns"); + GST_WARNING_OBJECT (twcc, "sent_packets FIFO overflows, dropping"); g_assert_not_reached (); } else if (GST_CLOCK_TIME_IS_VALID(twcc->prev_stat_window_beginning) && GST_CLOCK_DIFF (pkt_ts, twcc->prev_stat_window_beginning) < GST_MSECOND * 250) { GST_WARNING_OBJECT (twcc, "Risk of" " underrun of sent_packets FIFO"); + } else { + // GST_WARNING_OBJECT (twcc, "sent_packets FIFO overflows, dropping"); } gst_queue_array_pop_head_struct (twcc->sent_packets); } @@ -2198,20 +2240,25 @@ rtp_twcc_manager_parse_fci (RTPTWCCManager * twcc, pkt->status); _add_parsed_packet_to_value_array (array, pkt); } else { - GST_DEBUG_OBJECT ( twcc, "pkt: #%u, remote_ts: 0 delta_ts: 0 status: %u", pkt->seqnum, pkt->status); + GST_DEBUG_OBJECT ( twcc, "pkt: #%u, remote_ts: 0 delta_ts: 0 status: %u", + pkt->seqnum, pkt->status); } - if (!!(found = _find_sentpacket (twcc, pkt->seqnum))) { - found->remote_ts = pkt->remote_ts; - found->state = pkt->status == RTP_TWCC_PACKET_STATUS_NOT_RECV - ? RTP_TWCC_FECBLOCK_PKT_LOST : RTP_TWCC_FECBLOCK_PKT_RECEIVED; - g_array_append_vals (pkt_2_stats, &found, 1); - GST_LOG ("matching pkt: #%u with local_ts: %" GST_TIME_FORMAT - " size: %u, remote-ts: %" GST_TIME_FORMAT, pkt->seqnum, - GST_TIME_ARGS (found->local_ts), - found->size * 8, GST_TIME_ARGS (pkt->remote_ts)); - - /* calculate the round-trip time */ - rtt = GST_CLOCK_DIFF (found->local_ts, current_time); + /* Do not process feedback on packets we have got feedback previously */ + if (!!(found = _find_sentpacket (twcc, pkt->seqnum)) + && (found->state == RTP_TWCC_FECBLOCK_PKT_UNKNOWN + || found->state == RTP_TWCC_FECBLOCK_PKT_LOST) + ) { + found->remote_ts = pkt->remote_ts; + found->state = pkt->status == RTP_TWCC_PACKET_STATUS_NOT_RECV + ? RTP_TWCC_FECBLOCK_PKT_LOST : RTP_TWCC_FECBLOCK_PKT_RECEIVED; + g_array_append_vals (pkt_2_stats, &found, 1); + GST_LOG ("matching pkt: #%u with local_ts: %" GST_TIME_FORMAT + " size: %u, remote-ts: %" GST_TIME_FORMAT, pkt->seqnum, + GST_TIME_ARGS (found->local_ts), + found->size * 8, GST_TIME_ARGS (pkt->remote_ts)); + + /* calculate the round-trip time */ + rtt = GST_CLOCK_DIFF (found->local_ts, current_time); } } { @@ -2259,16 +2306,34 @@ rtp_twcc_manager_get_windowed_stats (RTPTWCCManager * twcc, SentPacket * pkt = (SentPacket*)g_array_index (psentpkts, SentPacket*, i); if (!pkt) { continue; + } else if (pkt->stats_processed) { + /* This packet was already added to stats structures, but we've got + one more feedback for it + */ + RedBlock * block; + if (g_hash_table_lookup_extended (twcc->seqnum_2_redblocks, + GUINT_TO_POINTER(pkt->seqnum), NULL, (gpointer *)&block)) { + const gsize packets_recovered = _redblock_reconsider (twcc, block); + if (packets_recovered > 0) { + GST_LOG ("Reconsider block because of packet #%u, " + "recovered %lu pckt", pkt->seqnum, packets_recovered); + } + } + continue; } + pkt->stats_processed = TRUE; + GST_LOG ("Processing #%u packet in stats, state: %s", pkt->seqnum, + _pkt_state_s (pkt->state)); rtp_twcc_manager_register_seqnum (twcc, pkt->ssrc, pkt->orig_seqnum, pkt->seqnum); - _add_packet_to_stats (twcc, pkt); + twcc_stats_ctx_add_packet (twcc, pkt); /* This is either RTX or FEC packet */ if (pkt->protects_seqnums && pkt->protects_seqnums->len > 0) { /* We are expecting non-twcc seqnums in the buffer's meta here, so change them to twcc seqnums. */ + for (gsize i = 0; i < pkt->protects_seqnums->len; i++) { const guint16 prot_seqnum = g_array_index (pkt->protects_seqnums, guint16, i); @@ -2278,9 +2343,10 @@ rtp_twcc_manager_get_windowed_stats (RTPTWCCManager * twcc, g_array_index (pkt->protects_seqnums, guint16, i) = (guint16)twcc_seqnum; } - GST_LOG ("FEC sn: #%u covers sn: #%u", pkt->seqnum, twcc_seqnum); + GST_LOG ("FEC sn: #%u covers twcc sn: #%u, orig sn: %u", + pkt->seqnum, twcc_seqnum, prot_seqnum); } - + /* Check if this packet covers the same block that was already added. */ RedBlockKey key = _redblock_key_new (pkt->protects_seqnums); RedBlock * block = NULL; @@ -2328,8 +2394,8 @@ rtp_twcc_manager_get_windowed_stats (RTPTWCCManager * twcc, } const gsize packets_recovered = _redblock_reconsider (twcc, block); if (packets_recovered > 0) { - GST_LOG ("Reconsider block because of packet #%u, recovere", - pkt->seqnum); + GST_LOG ("Reconsider block because of packet #%u, recovered %lu pckt", + pkt->seqnum, packets_recovered); } /* RTX of FEC */ } else { @@ -2338,8 +2404,8 @@ rtp_twcc_manager_get_windowed_stats (RTPTWCCManager * twcc, GUINT_TO_POINTER(pkt->seqnum), NULL, (gpointer *)&block)) { const gsize packets_recovered = _redblock_reconsider (twcc, block); if (packets_recovered > 0) { - GST_LOG ("Reconsider block because of packet #%u, recovere", - pkt->seqnum); + GST_LOG ("Reconsider block because of packet #%u, " + "recovered %lu pckt", pkt->seqnum, packets_recovered); } } } @@ -2352,28 +2418,46 @@ rtp_twcc_manager_get_windowed_stats (RTPTWCCManager * twcc, return twcc_stats_ctx_get_structure (twcc->stats_ctx); /* Prune old packets in stats */ + gint last_seqnum_to_free = -1; + /* First remove all them from stats structures, and then from sent_packets + queue at once so as not to lock sent_packets for longer then necessary + */ while (!gst_queue_array_is_empty (twcc->stats_ctx->pt_packets)) { SentPacket * pkt = ((StatsPktPtr*)gst_queue_array_peek_head_struct ( twcc->stats_ctx->pt_packets))->sentpkt; - if (!pkt) { - g_assert_not_reached (); - } if (gst_queue_array_get_length (twcc->stats_ctx->pt_packets) >= MAX_STATS_PACKETS - || GST_CLOCK_DIFF (pkt->local_ts, last_ts) > PACKETS_HIST_DUR) { - _rm_packet_links (twcc, pkt); - SentPacket * sent_pkt = gst_queue_array_peek_head_struct ( - twcc->sent_packets); - if (sent_pkt && sent_pkt->seqnum == pkt->seqnum) { - g_mutex_lock (&twcc->send_lock); - gst_queue_array_pop_head_struct (twcc->sent_packets); - g_mutex_unlock (&twcc->send_lock); + || (pkt && GST_CLOCK_DIFF (pkt->local_ts, last_ts) > PACKETS_HIST_DUR)) { + if (pkt) { + if (last_seqnum_to_free >= 0 + && gst_rtp_buffer_compare_seqnum (pkt->seqnum, last_seqnum_to_free) + >= 0) { + GST_WARNING_OBJECT (twcc, "Seqnum reorder in stats pkts"); + g_assert_not_reached (); + } + last_seqnum_to_free = pkt->seqnum; } - _free_sentpacket (pkt); + _rm_last_stats_pkt (twcc); } else { break; } } + /* Remove old packets from sent_packets queue */ + if (last_seqnum_to_free >= 0) { + g_mutex_lock (&twcc->send_lock); + while (!gst_queue_array_is_empty (twcc->sent_packets)) { + SentPacket * pkt = gst_queue_array_peek_head_struct (twcc->sent_packets); + GST_LOG_OBJECT (twcc, "Freeing sent packet #%u", pkt->seqnum); + if (gst_rtp_buffer_compare_seqnum (pkt->seqnum, last_seqnum_to_free) + >= 0) { + _free_sentpacket (pkt); + gst_queue_array_pop_head (twcc->sent_packets); + } else { + break; + } + } + g_mutex_unlock (&twcc->send_lock); + } array = g_value_array_new (0); end_time = GST_CLOCK_DIFF (stats_window_delay, last_ts); @@ -2381,9 +2465,10 @@ rtp_twcc_manager_get_windowed_stats (RTPTWCCManager * twcc, GST_DEBUG_OBJECT (twcc, "Calculating windowed stats for the window %" GST_STIME_FORMAT - " starting from %" GST_STIME_FORMAT " to: %" GST_STIME_FORMAT, + " starting from %" GST_STIME_FORMAT " to: %" GST_STIME_FORMAT " overall packets: %u", GST_STIME_ARGS (stats_window_size), GST_STIME_ARGS (start_time), - GST_STIME_ARGS (end_time)); + GST_STIME_ARGS (end_time), + gst_queue_array_get_length (twcc->stats_ctx->pt_packets)); if (!GST_CLOCK_TIME_IS_VALID(twcc->prev_stat_window_beginning) || GST_CLOCK_DIFF (twcc->prev_stat_window_beginning, start_time) > 0) {