-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMPEGPacket.py
65 lines (54 loc) · 2.37 KB
/
MPEGPacket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
""" Class for MPEG2-TS packet data """
import typing
from BinaryPacketWrapper import BinaryPacketWrapper
from BinaryPacketWrapper import MPEGPacketBinaryReadException
from MPEGAdaptationField import MPEGAdaptationField
import MPEGAdaptationField
from TSData import TSData
from PMTTable import PMTTable
# Bit masks for the 32-bit (big-endian) MPEG2-TS packet header.
# Each mask is written as <field width> << <bit offset of the field>, so the
# shift used to extract a field is the same number that appears here.
HDR_SYNC_BYTE = 0xFF << 24   # Sync byte (must equal 0x47)
HDR_TEI = 0x1 << 23          # Transport error indicator
HDR_PUSI = 0x1 << 22         # Payload unit start indicator
HDR_PRIORITY = 0x1 << 21     # Transport priority
HDR_PID = 0x1FFF << 8        # Packet identifier (13 bits)
HDR_TSC = 0x3 << 6           # Transport scrambling control
HDR_AFC = 0x3 << 4           # Adaptation field control
HDR_CCOUNTER = 0xF           # Continuity counter
class MPEGParseException(Exception):
    """Raised when an MPEG2-TS packet header cannot be parsed."""
def parse_packet(wrapper: BinaryPacketWrapper, ts_data: TSData, pid_dict, pmt_dict):
    """Parse one MPEG2-TS packet and update the stream bookkeeping.

    Reads the 4-byte big-endian header from ``wrapper``, validates the sync
    byte, counts the packet's PID, lets ``MPEGAdaptationField`` process the
    adaptation field and finally hands the payload to the matching table
    parser (PAT, NIT, SDT or a program's PMT).

    Args:
        wrapper: Packet reader; must be positioned at the start of the packet
            (``current_read_bytes == 0``).
        ts_data: Stream state providing ``pat_table``, ``nit_table``,
            ``sdt_table`` and the corresponding ``has_*_table`` flags.
        pid_dict: Mapping of PID -> number of packets seen. Updated in place.
        pmt_dict: Mapping of program number -> parsed ``PMTTable``. Updated in
            place when a not-yet-known program's PMT packet is encountered.

    Raises:
        MPEGParseException: If ``wrapper`` is not at the beginning of a packet
            or the header's sync byte is not ``0x47``.
    """
    if wrapper.current_read_bytes > 0:
        raise MPEGParseException('BinaryPacketWrapper offset is not at beginning of packet')

    header = int.from_bytes(wrapper.get_bytes(4), byteorder='big')

    # Validate the sync byte before touching any bookkeeping: if it is wrong,
    # the remaining header bits are garbage and must not be counted as a PID.
    if (header & HDR_SYNC_BYTE) >> 24 != 0x47:
        raise MPEGParseException('Invalid sync_byte when parsing MPEG-TS header')

    # Only the PID and the adaptation field control bits are needed below;
    # the other header fields are not consumed by this function.
    hdr_pid = (header & HDR_PID) >> 8
    hdr_afc = (header & HDR_AFC) >> 4

    # Per-PID packet statistics.
    pid_dict[hdr_pid] = pid_dict.get(hdr_pid, 0) + 1

    MPEGAdaptationField.process_adaptation_field(wrapper, hdr_afc)

    # Attempt to parse PAT table if not already parsed
    if hdr_pid == 0 and not ts_data.has_pat_table:
        ts_data.pat_table.parse(wrapper)
    # Attempt to parse NIT table
    elif hdr_pid == 16 and not ts_data.has_nit_table:
        ts_data.nit_table.parse(wrapper)
    # Attempt to parse SDT table
    elif hdr_pid == 17 and not ts_data.has_sdt_table:
        ts_data.sdt_table.parse(wrapper)
    # Check if this is PMT of some program
    else:
        for entry in ts_data.pat_table.entries:
            # PAT entries with program_num == 0 are skipped; each program's
            # PMT is parsed only once (first packet seen for that program).
            if (
                hdr_pid == entry.program_map_pid
                and entry.program_num != 0
                and entry.program_num not in pmt_dict
            ):
                table = PMTTable(entry.program_num, entry.program_map_pid)
                table.parse(wrapper)
                pmt_dict[entry.program_num] = table