-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmicroSWIFT_processor.py
executable file
·152 lines (115 loc) · 4.28 KB
/
microSWIFT_processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#! /usr/bin/env python3.7
'''
Created on May 16, 2014
Modified 2/26/2021 for use with Python 3.7
change 'buffer(data[0:1])' to data[0:1]
decode payload type from bytes to str
@author: adioso
'''
import codecs
from struct import unpack_from
# Payload version
_4_0 = '7'
def _checkSize(size, expected, name, p_id):
    """Verify that a payload section has the size the caller expects.

    Args:
        size: Size that was actually found.
        expected: Size that is required.
        name: Label of the section, used only in the error message.
        p_id: Payload identifier, used only in the error message.

    Raises:
        Exception: If ``size`` differs from ``expected``.
    """
    if size == expected:
        return
    message = "Payload {} {} size {} expected {}".format(
        p_id, name, size, expected)
    raise Exception(message)
def _getDouble(data, index):
    """Decode one 8-byte double from ``data`` starting at ``index``.

    The bytes are unpacked with struct format ``'d'``, i.e. in the native
    byte order of the machine running this script.

    Returns:
        tuple: ``(value, next_index)`` where ``next_index`` is ``index + 8``.
    """
    stop = index + 8
    (value,) = unpack_from('d', data[index:stop])
    return (value, stop)
def _getFloat(data, index):
    """Decode one 4-byte float from ``data`` starting at ``index``.

    The bytes are unpacked with struct format ``'f'``, i.e. in the native
    byte order of the machine running this script.

    Returns:
        tuple: ``(value, next_index)`` where ``next_index`` is ``index + 4``.

    Raises:
        struct.error: If fewer than 4 bytes remain at ``index``.
    """
    stop = index + 4
    if len(data) < stop:
        # Warning only: the unpack below still raises on the short slice.
        print('Reached end of data unexpectedly')
    (value,) = unpack_from('f', data[index:stop])
    return (value, stop)
def _getInt1(data, index):
    """Read the single byte at ``index`` as an integer in the range 0-255.

    Returns:
        tuple: ``(value, next_index)`` where ``next_index`` is ``index + 1``.
    """
    stop = index + 1
    one_byte = data[index:stop]
    # ord() accepts a length-1 bytes/bytearray and yields its byte value.
    return (ord(one_byte), stop)
def _getInt2(data, index):
    """Decode one signed 2-byte integer from ``data`` starting at ``index``.

    The bytes are unpacked with struct format ``'h'``, i.e. in the native
    byte order of the machine running this script.

    Returns:
        tuple: ``(value, next_index)`` where ``next_index`` is ``index + 2``.
    """
    stop = index + 2
    (value,) = unpack_from('h', data[index:stop])
    return (value, stop)
def _getInt4(data, index):
    """Decode one signed 4-byte integer from ``data`` starting at ``index``.

    The bytes are unpacked with struct format ``'i'``, i.e. in the native
    byte order of the machine running this script.

    Returns:
        tuple: ``(value, next_index)`` where ``next_index`` is ``index + 4``.
    """
    stop = index + 4
    (value,) = unpack_from('i', data[index:stop])
    return (value, stop)
# Read the one-byte payload type tag stored at the start of the payload.
def _getPayloadType(data):
    """Return the payload type character and the index of the next byte.

    Args:
        data: Payload bytes; only the first byte is examined.

    Returns:
        tuple: ``(data_type, 1)`` where ``data_type`` is a one-character
        ``str`` built from the first byte of ``data``.

    Raises:
        struct.error: If ``data`` is empty.
    """
    (raw_type,) = unpack_from('c', data[0:1])
    # latin-1 maps every byte value to exactly one character, so an
    # unexpected (non-ASCII) type byte is handed back to the caller for
    # validation instead of raising UnicodeDecodeError here.
    return (raw_type.decode('latin-1'), 1)
def processData(p_id, data):
    """Decode a payload and print every sensor record it contains.

    The first byte is the payload type and must equal ``_4_0``. The rest of
    the payload is read as consecutive records, each introduced by a 1-byte
    sensor type, a 1-byte com port and a 2-byte size. Only sensor type 50
    (handled by ``_processMicroSWIFT``) is understood.

    Args:
        p_id: Payload identifier, used in messages.
        data: Payload bytes (sliceable, e.g. ``bytearray``).

    Raises:
        SystemExit: With status 1 if the payload type is not ``_4_0``.
        Exception: If a record has an unknown sensor type.
    """
    # Continue from the index reported by the type reader instead of
    # overwriting it with a hard-coded value.
    (data_type, index) = _getPayloadType(data)
    print("payload type: {}".format(data_type))

    if data_type != _4_0:
        # data_type is a one-character str. codecs.encode(..., "hex") only
        # accepts bytes on Python 3, so format the code point directly.
        print("Invalid payload type: 0x{:02x}".format(ord(data_type)))
        # Equivalent to sys.exit(1), but does not rely on `sys` having been
        # imported by the __main__ guard (it is not when used as a module).
        raise SystemExit(1)

    data_len = len(data)
    while index < data_len:
        print("Index: {}".format(index))
        (sensor_type, index) = _getInt1(data, index)
        (com_port, index) = _getInt1(data, index)
        print("Sensor: {}\tCom Port: {}".format(sensor_type, com_port))
        (size, index) = _getInt2(data, index)
        print("Size: {}".format(size))

        if sensor_type == 50:
            index = _processMicroSWIFT(p_id, data, index, size)
        else:
            raise Exception(
                "Payload {} has unknown sensor type {} at index {}".format(
                    p_id, sensor_type, index))
def _processMicroSWIFT(p_id, data, index, size):
    """Print every field of one microSWIFT record and return the next index.

    A non-empty record is read as 304 floats followed by 6 4-byte integers
    (1240 bytes in total), in this order: ``hs``, ``tp``, ``dp``; seven
    42-element arrays (``e``, ``f``, ``a1``, ``b1``, ``a2``, ``b2``,
    ``cf``); ``lat``, ``lon``, ``mean_temp``, ``mean_voltage``, ``mean_u``,
    ``mean_v``, ``mean_z``; then ``year``, ``month``, ``day``, ``hour``,
    ``min``, ``sec``.

    Args:
        p_id: Payload identifier (not used here).
        data: Payload bytes.
        index: Offset of the first byte of the record body.
        size: Record size from the record header; only compared with 0.

    Returns:
        int: Offset of the first byte after the record, or ``index``
        unchanged when ``size`` is 0.
    """
    if size == 0:
        print("MicroSWIFT empty")
        return index

    # Leading scalar floats.
    for label in ('hs', 'tp', 'dp'):
        (value, index) = _getFloat(data, index)
        print("{} {}".format(label, value))

    # Seven arrays of 42 floats each, printed as <name><element> <value>.
    for series in ('e', 'f', 'a1', 'b1', 'a2', 'b2', 'cf'):
        for element in range(42):
            (value, index) = _getFloat(data, index)
            print("{}{} {}".format(series, element, value))

    # Trailing scalar floats.
    for label in ('lat', 'lon', 'mean_temp', 'mean_voltage',
                  'mean_u', 'mean_v', 'mean_z'):
        (value, index) = _getFloat(data, index)
        print("{} {}".format(label, value))

    # Timestamp components, each stored as a 4-byte integer.
    for label in ('year', 'month', 'day', 'hour', 'min', 'sec'):
        (value, index) = _getInt4(data, index)
        print("{} {}".format(label, value))

    return index
if __name__ == "__main__":
    import sys

    # Exactly one command-line argument is required: the payload file path.
    if len(sys.argv) != 2:
        print("Provide the path to the payload file.")
        sys.exit(1)

    payload_path = sys.argv[1]
    with open(payload_path, "rb") as binfile:
        raw_bytes = binfile.read()

    # Decode the whole file as a single payload with id 0.
    payload_data = bytearray(raw_bytes)
    processData(0, payload_data)