Skip to content

Commit

Permalink
Fixed HLS timestamp was not accurate
Browse files Browse the repository at this point in the history
  • Loading branch information
getroot committed Feb 6, 2025
1 parent 0919c63 commit 2e19dc1
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 83 deletions.
66 changes: 25 additions & 41 deletions src/projects/modules/containers/mpegts/mpegts_packager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,13 @@ namespace mpegts
auto marker = GetFirstMarker();
if (marker.timestamp >= sample._dts && marker.timestamp < sample._dts + sample._duration)
{
logti("Stream(%s) Track(%u) has a marker at %lld, force to create a new boundary", _config.stream_id_meta.CStr(), track_id, marker.timestamp);
logti("Stream(%s) Track(%u) has a marker at %lld (%lld - %lld), force to create a new boundary", _config.stream_id_meta.CStr(), track_id, marker.timestamp, sample._dts, sample._dts + sample._duration);

_force_make_boundary = true;
}
}

if ((sample_buffer->GetCurrentDurationUs() >= _config.target_duration_ms * 1000) || _force_make_boundary == true)
if ((sample_buffer->GetCurrentDurationMs() >= _config.target_duration_ms) || _force_make_boundary == true)
{
if (media_packet->GetMediaType() == cmn::MediaType::Video && media_packet->IsKeyFrame())
{
Expand Down Expand Up @@ -332,9 +332,9 @@ namespace mpegts
}

// If the segment duration is too long (twice the target duration), a new segment is forcibly created.
if (force_create == false && main_sample_buffer->GetTotalAvailableDurationUs() >= _config.target_duration_ms * 2000)
if (force_create == false && main_sample_buffer->GetTotalAvailableDurationMs() >= _config.target_duration_ms * 2)
{
logtw("Stream(%s) Main Track(%u) has too long duration (%llu us, twice the target duration %u us), force to create a new segment", _config.stream_id_meta.CStr(), _main_track_id, main_sample_buffer->GetTotalAvailableDurationUs(), _config.target_duration_ms * 1000);
logtw("Stream(%s) Main Track(%u) has too long duration (%.3f ms, twice the target duration %u ms), force to create a new segment", _config.stream_id_meta.CStr(), _main_track_id, main_sample_buffer->GetTotalAvailableDurationMs(), _config.target_duration_ms);

if (main_sample_buffer->HasSegmentBoundary() == false)
{
Expand All @@ -351,23 +351,25 @@ namespace mpegts
return;
}

auto main_segment_duration_us = main_sample_buffer->GetDurationUntilSegmentBoundaryUs();
uint64_t total_main_segment_duration_us = main_sample_buffer->GetTotalConsumedDurationUs() + main_segment_duration_us;

auto main_segment_duration = main_sample_buffer->GetDurationUntilSegmentBoundary();
auto total_main_segment_duration = main_sample_buffer->GetTotalConsumedDuration() + main_segment_duration;
auto main_segment_duration_ms = main_sample_buffer->GetDurationUntilSegmentBoundaryMs();

bool found_marker = false;
Marker marker;
if (HasMarker() == true)
{
int64_t main_segment_base_timestamp = main_sample_buffer->GetSample()._dts;
int64_t main_segment_duration = main_segment_duration_us / 1000000 * 90000;
int64_t main_segment_end_timestamp = main_segment_base_timestamp + main_segment_duration;

logtd("Stream(%s) Main Track(%u) main_segment_base_timestamp(%lld) main_segment_duration(%lld) main_segment_duration_ms(%f) main_segment_end_timestamp(%lld)", _config.stream_id_meta.CStr(), _main_track_id, main_segment_base_timestamp, main_segment_duration, main_segment_duration_ms, main_segment_end_timestamp);

while (HasMarker())
{
marker = GetFirstMarker();
if (marker.timestamp < main_segment_base_timestamp)
{
logte("Stream(%s) Main Track(%u) has a marker at %lld, but it is before the current segment", _config.stream_id_meta.CStr(), _main_track_id, marker.timestamp);
logte("Stream(%s) Main Track(%u) has a marker at %lld, but it is before the current segment %lld", _config.stream_id_meta.CStr(), _main_track_id, marker.timestamp, main_segment_base_timestamp);
RemoveMarker(marker.timestamp);
}
else
Expand Down Expand Up @@ -408,34 +410,16 @@ namespace mpegts
continue;
}

auto total_sample_segment_duration_us = sample_buffer->GetTotalConsumedDurationUs() + sample_buffer->GetCurrentDurationUs();

logtd("Stream(%s) Track(%u) \n\
total_sample_segment_duration_us(%llu)\n\
\tsample_buffer->GetTotalConsumedDurationUs()%llu\n\
\tsample_buffer->GetCurrentDurationUs()%llu\n\
total_main_segment_duration_us(%llu)\n\
\tmain_sample_buffer->GetDurationUntilSegmentBoundaryUs()(%llu)\n\
\tsample_buffer->GetCurrentDurationUs()(%llu)\n\
main_sample_buffer->GetTotalAvailableDurationUs()(%llu)",
_config.stream_id_meta.CStr(),
track_id,
total_sample_segment_duration_us,
sample_buffer->GetTotalConsumedDurationUs(),
sample_buffer->GetCurrentDurationUs(),
total_main_segment_duration_us,
main_sample_buffer->GetDurationUntilSegmentBoundaryUs(),
sample_buffer->GetCurrentDurationUs(),
main_sample_buffer->GetTotalAvailableDurationUs());
auto total_sample_segment_duration = sample_buffer->GetTotalConsumedDuration() + sample_buffer->GetCurrentDuration();

// if video segment is 6000, audio segment is at least 6000*0.97(=5820), it is normal case, wait for more samples
if (static_cast<double>(total_sample_segment_duration_us) < static_cast<double>(total_main_segment_duration_us) * 0.97)
if (static_cast<double>(total_sample_segment_duration) < static_cast<double>(total_main_segment_duration) * 0.97)
{
// Too much difference between the main track and the track, it means that the track may have a problem.
// For example, there may be cases where audio stops coming in at all at some point.
if (static_cast<double>(total_sample_segment_duration_us) * 2.0 < static_cast<double>(total_main_segment_duration_us))
if (total_sample_segment_duration * 2.0 < total_main_segment_duration)
{
logtw("Stream(%s) Track(%u) sample duration (%llu us) is less than half of the main (%llu us), forcing segment generation.", _config.stream_id_meta.CStr(), track_id, total_sample_segment_duration_us, total_main_segment_duration_us);
logtw("Stream(%s) Track(%u) sample duration (%lld) is less than half of the main (%lld), forcing segment generation.", _config.stream_id_meta.CStr(), track_id, total_sample_segment_duration, total_main_segment_duration);
}
else
{
Expand All @@ -453,7 +437,7 @@ namespace mpegts
return;
}

auto segment = std::make_shared<Segment>(GetNextSegmentId(), first_sample._dts, main_segment_duration_us);
auto segment = std::make_shared<Segment>(GetNextSegmentId(), first_sample._dts, main_segment_duration_ms);
if (found_marker == true)
{
segment->SetMarker(marker);
Expand Down Expand Up @@ -531,10 +515,10 @@ namespace mpegts
continue;
}

if (sample_buffer->GetTotalConsumedDurationUs() >= total_main_segment_duration_us)
if (sample_buffer->GetTotalConsumedDuration() >= total_main_segment_duration)
{
// Wraparound, if total_main_segment_duration_us is wrapped around, continue to pop samples until the total_consumed_duration_us is wrapped around
if (sample_buffer->GetTotalConsumedDurationUs() - total_main_segment_duration_us < UINT64_MAX / 2)
if (sample_buffer->GetTotalConsumedDuration() - total_main_segment_duration < UINT64_MAX / 2)
{
completed_tracks[track_id] = true;
continue;
Expand Down Expand Up @@ -591,7 +575,7 @@ namespace mpegts
{
std::lock_guard<std::shared_mutex> lock(_segments_guard);
_segments.emplace(segment->GetId(), segment);
_total_segments_duration_us += segment->GetDurationUs();
_total_segments_duration_ms += segment->GetDurationMs();
}

size_t Packager::GetBufferedSegmentCount() const
Expand Down Expand Up @@ -619,7 +603,7 @@ namespace mpegts
if (it != _segments.end())
{
_segments.erase(it);
_total_segments_duration_us -= segment->GetDurationUs();
_total_segments_duration_ms -= segment->GetDurationMs();
}
}

Expand Down Expand Up @@ -654,12 +638,12 @@ namespace mpegts
// Remove data from segment, it has been saved in a file
segment->ResetData();
segment->SetFilePath(file_path);
_total_file_stored_segments_duration_us += segment->GetDurationUs();
_total_file_stored_segments_duration_ms += segment->GetDurationMs();
_file_stored_segments.emplace(segment->GetId(), segment);
}

// Delete old segments from stored list and file
while (GetTotalFileStoredSegmentsDurationUs() > _config.dvr_window_ms * 1000)
while (GetTotalFileStoredSegmentsDurationMs() > _config.dvr_window_ms)
{
auto oldest_segment = GetOldestSegmentFromFile();

Expand All @@ -677,10 +661,10 @@ namespace mpegts
}
}

uint64_t Packager::GetTotalFileStoredSegmentsDurationUs() const
double Packager::GetTotalFileStoredSegmentsDurationMs() const
{
std::shared_lock<std::shared_mutex> lock(_file_stored_segments_guard);
return _total_file_stored_segments_duration_us;
return _total_file_stored_segments_duration_ms;
}

std::shared_ptr<Segment> Packager::GetOldestSegmentFromFile() const
Expand All @@ -701,7 +685,7 @@ namespace mpegts
auto it = _file_stored_segments.find(segment->GetId());
if (it != _file_stored_segments.end())
{
_total_file_stored_segments_duration_us -= segment->GetDurationUs();
_total_file_stored_segments_duration_ms -= segment->GetDurationMs();
_file_stored_segments.erase(it);
}
}
Expand Down
92 changes: 53 additions & 39 deletions src/projects/modules/containers/mpegts/mpegts_packager.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ namespace mpegts
class Segment
{
public:
Segment(uint64_t segment_id, int64_t first_dts, uint64_t duration_us)
Segment(uint64_t segment_id, int64_t first_dts, double duration_ms)
{
_segment_id = segment_id;
_first_dts = first_dts;
_duration_us = duration_us;
_duration_ms = duration_ms;
}

bool AddPacketData(const std::shared_ptr<const ov::Data> &data)
Expand Down Expand Up @@ -73,9 +73,9 @@ namespace mpegts
return _first_dts;
}

uint64_t GetDurationUs() const
double GetDurationMs() const
{
return _duration_us;
return _duration_ms;
}

ov::String GetFilePath() const
Expand Down Expand Up @@ -147,7 +147,7 @@ namespace mpegts
private:
uint64_t _segment_id = 0;
int64_t _first_dts = -1;
uint64_t _duration_us = 0;
double _duration_ms = 0;
ov::String _url;

ov::String _file_path;
Expand Down Expand Up @@ -195,32 +195,30 @@ namespace mpegts
return _track;
}

uint64_t GetSampleDurationUs(const Sample &sample) const
{
double duration = static_cast<double>(sample.media_packet->GetDuration()) * 1000000.0 / GetTrack()->GetTimeBase().GetTimescale();
return static_cast<uint64_t>(duration);
}

bool AddSample(const Sample &sample)
{
_samples.push(sample);

uint64_t duration_us = GetSampleDurationUs(sample);

_current_samples_count++;
_current_samples_duration_us += duration_us;
_current_samples_duration += sample._duration;

_total_available_count++;
_total_available_duration_us += duration_us;
_total_available_duration += sample._duration;

return true;
}

uint64_t GetCurrentDurationUs() const
uint64_t GetCurrentDuration() const
{
return _current_samples_duration_us;
return _current_samples_duration;
}

double GetCurrentDurationMs() const
{
// return in milliseconds, sample duration is in 90kHz
return static_cast<double>(GetCurrentDuration()) / 90000.0 * 1000.0;
}

bool HasSegmentBoundary() const
{
return _segment_boundaries.empty() == false;
Expand All @@ -236,27 +234,39 @@ namespace mpegts
{
SegmentBoundary boundary;
boundary.sample_count = _current_samples_count;
boundary.duration_us = _current_samples_duration_us;
boundary.duration = _current_samples_duration;

_segment_boundaries.push(boundary);

_current_samples_count = 0;
_current_samples_duration_us = 0;
_current_samples_duration = 0;
}

uint64_t GetDurationUntilSegmentBoundaryUs() const
uint64_t GetDurationUntilSegmentBoundary() const
{
if (HasSegmentBoundary() == false)
{
return 0;
}

return _segment_boundaries.front().duration_us;
return _segment_boundaries.front().duration;
}

uint64_t GetTotalAvailableDurationUs() const
double GetDurationUntilSegmentBoundaryMs() const
{
return _total_available_duration_us;
// return in milliseconds, sample duration is in 90kHz
return static_cast<double>(GetDurationUntilSegmentBoundary()) / 90000.0 * 1000.0;
}

uint64_t GetTotalAvailableDuration() const
{
return _total_available_duration;
}

double GetTotalAvailableDurationMs() const
{
// return in milliseconds, sample duration is in 90kHz
return static_cast<double>(GetTotalAvailableDuration()) / 90000.0 * 1000.0;
}

bool IsEmpty() const
Expand All @@ -274,16 +284,14 @@ namespace mpegts
auto sample = _samples.front();
_samples.pop();

uint64_t sample_duration_us = GetSampleDurationUs(sample);

_current_samples_count--;
_current_samples_duration_us -= sample_duration_us;
_current_samples_duration -= sample._duration;

_total_available_count--;
_total_available_duration_us -= sample_duration_us;
_total_available_duration -= sample._duration;

_total_consumed_samples_count++;
_total_consumed_samples_duration_us += sample_duration_us;
_total_consumed_samples_duration += sample._duration;

return sample;
}
Expand Down Expand Up @@ -322,39 +330,45 @@ namespace mpegts
}

_total_consumed_samples_count += boundary.sample_count;
_total_consumed_samples_duration_us += boundary.duration_us;
_total_consumed_samples_duration += boundary.duration;

_total_available_duration_us -= boundary.duration_us;
_total_available_duration -= boundary.duration;
_total_available_count -= boundary.sample_count;

return samples;
}

uint64_t GetTotalConsumedDurationUs() const
uint64_t GetTotalConsumedDuration() const
{
return _total_consumed_samples_duration_us;
return _total_consumed_samples_duration;
}

double GetTotalConsumedDurationMs() const
{
// return in milliseconds, sample duration is in 90kHz
return static_cast<double>(GetTotalConsumedDuration()) / 90000.0 * 1000.0;
}

private:
std::shared_ptr<const MediaTrack> _track;
std::queue<Sample> _samples;

struct SegmentBoundary
{
uint64_t sample_count = 0;
uint64_t duration_us = 0;
uint64_t duration = 0;
};

std::queue<SegmentBoundary> _segment_boundaries;

uint64_t _current_samples_count = 0;
uint64_t _current_samples_duration_us = 0;
uint64_t _current_samples_duration = 0;

uint64_t _total_available_duration_us = 0;
uint64_t _total_available_duration = 0;
uint64_t _total_available_count = 0;

uint64_t _total_consumed_samples_count = 0;
uint64_t _total_consumed_samples_duration_us = 0;
uint64_t _total_consumed_samples_duration = 0;
};

class PackagerSink : public ov::EnableSharedFromThis<PackagerSink>
Expand Down Expand Up @@ -436,7 +450,7 @@ namespace mpegts
void SaveSegmentToFile(const std::shared_ptr<Segment> &segment);
void DeleteSegmentFile(const std::shared_ptr<Segment> &segment);
void DeleteSegmentFromFileStoredList(const std::shared_ptr<Segment> &segment);
uint64_t GetTotalFileStoredSegmentsDurationUs() const;
double GetTotalFileStoredSegmentsDurationMs() const;
std::shared_ptr<Segment> GetOldestSegmentFromFile() const;

// Retention
Expand Down Expand Up @@ -476,11 +490,11 @@ namespace mpegts
uint64_t _last_segment_id = 0;

std::map<uint64_t, std::shared_ptr<Segment>> _segments;
uint64_t _total_segments_duration_us = 0;
double _total_segments_duration_ms = 0;
mutable std::shared_mutex _segments_guard;

std::map<uint64_t, std::shared_ptr<Segment>> _file_stored_segments;
uint64_t _total_file_stored_segments_duration_us = 0;
double _total_file_stored_segments_duration_ms = 0;
mutable std::shared_mutex _file_stored_segments_guard;

std::map<uint64_t, std::shared_ptr<Segment>> _retained_segments;
Expand Down
Loading

0 comments on commit 2e19dc1

Please sign in to comment.