-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathdas_record.py
168 lines (146 loc) · 5.89 KB
/
das_record.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/env python3
import json
import pprint
import logging
from logger.utils.timestamp import timestamp as timestamp_method # noqa: E402
class DASRecord:
    """DASRecord is a structured representation of the field names and
    values (and metadata) contained in a sensor record.

    Attributes set by the constructor:
      data_id       - identifier stored as given (None if not supplied)
      message_type  - message type stored as given (None if not supplied)
      timestamp     - timestamp of the record
      fields        - dict of fieldname-value pairs
      metadata      - dict of metadata

    Individual fields can be read, set and deleted with dict-style
    indexing, e.g. record['Heading'], or read with record.get('Heading').
    """
    ############################
    def __init__(self, json_str=None, data_id=None, message_type=None,
                 timestamp=0, fields=None, metadata=None):
        """
        If a json string is passed, it is parsed into a dictionary and its
        values for data_id, message_type, timestamp, fields and metadata
        are copied in; the other arguments are ignored. Otherwise, the
        DASRecord object is initialized with the passed-in values for
        data_id, message_type, timestamp, fields (a dictionary of
        fieldname-value pairs) and metadata.

        If timestamp is not specified (or is 0) in the non-json case, the
        instance will use the current time. In the json case the
        timestamp is taken as-is from the parsed string, and is None if
        the string has no 'timestamp' key.

        Missing (or JSON null) fields and metadata become empty dicts.
        """
        if json_str:
            parsed = json.loads(json_str)
            self.data_id = parsed.get('data_id')
            self.message_type = parsed.get('message_type')
            self.timestamp = parsed.get('timestamp')
            # An explicit JSON null would otherwise leave these as None and
            # break the dict-style accessors defined below.
            fields = parsed.get('fields')
            metadata = parsed.get('metadata')
        else:
            self.data_id = data_id
            self.message_type = message_type
            self.timestamp = timestamp or timestamp_method()

        self.fields = {} if fields is None else fields
        self.metadata = {} if metadata is None else metadata

    ############################
    def _as_dict(self):
        """Return the record's attributes as a plain dict; shared by
        as_json() and __str__() so the two cannot drift apart."""
        return {
            'data_id': self.data_id,
            'message_type': self.message_type,
            'timestamp': self.timestamp,
            'fields': self.fields,
            'metadata': self.metadata
        }

    ############################
    def as_json(self, pretty=False):
        """Return DASRecord as a JSON string. If pretty is true, keys are
        sorted and the output is indented by four spaces."""
        if pretty:
            return json.dumps(self._as_dict(), sort_keys=True, indent=4)
        return json.dumps(self._as_dict())

    ############################
    def __str__(self):
        """Return a pprint-formatted rendering of the record's attributes."""
        return pprint.pformat(self._as_dict())

    ############################
    def __eq__(self, other):
        """Two DASRecords are equal if all five attributes are equal.

        Comparison with anything that is not a DASRecord returns
        NotImplemented, so that `record == None` or `record == 'text'`
        evaluate to False instead of returning a non-boolean or raising
        AttributeError.
        """
        if not isinstance(other, DASRecord):
            return NotImplemented
        return (self.data_id == other.data_id and
                self.message_type == other.message_type and
                self.timestamp == other.timestamp and
                self.fields == other.fields and
                self.metadata == other.metadata)

    ############################
    def __setitem__(self, field, value):
        """Set the value of the named field."""
        self.fields[field] = value

    ############################
    def __getitem__(self, field):
        """Return the value of the named field. Logs an error and
        re-raises KeyError if the field is not present."""
        try:
            return self.fields[field]
        except KeyError:
            logging.error(f'No field "{field}" found in DASRecord {self}')
            raise

    ############################
    def __delitem__(self, field):
        """Delete the named field. Unlike __getitem__, a missing field is
        only logged as an error; no exception is raised."""
        try:
            del self.fields[field]
        except KeyError:
            logging.error(f'Attempt to delete non-existent field "{field}" in DASRecord: {self}')

    ############################
    def get(self, field, default=None):
        """Return the value of the named field, or default if the field
        is not present."""
        return self.fields.get(field, default)
def to_das_record_list(record):
    """Utility function to normalize different types of records into a
    list of DASRecords.

    Take input in one of these three formats:
       - DASRecord
       - a single record dict with keys 'timestamp' and 'fields'
       - a field dict of format
         ``` {field_name: [(timestamp, value), (timestamp, value),...],
              field_name: [(timestamp, value), (timestamp, value),...],
             }
         ```
    and convert it into a list of zero or more DASRecords.

    A list is returned unchanged, on the assumption that it is already a
    list of DASRecords. Empty/None input, input of an unrecognized type
    and badly-structured field dicts all yield an empty list (the latter
    two after logging an error). For a single record dict, only its
    'timestamp', 'fields' and 'metadata' entries are carried over.
    """
    # What type of record is this?
    if not record:
        return []

    # If it's a list, assume it's already a list of DASRecords
    if isinstance(record, list):
        return record

    # If it's a single DASRecord, it's easy
    if isinstance(record, DASRecord):
        return [record]

    # At this point, if it's not a dict, we don't know *what* it is
    if not isinstance(record, dict):
        logging.error('Unknown type of input passed to to_das_record_list: %s: %s',
                      type(record), record)
        return []

    # If it's a single timestamp dict, it's easy
    if 'timestamp' in record and 'fields' in record:
        return [DASRecord(timestamp=record['timestamp'],
                          fields=record['fields'],
                          metadata=record.get('metadata'))]

    # If here, we believe we've received a field dict, in which each
    # field may have multiple [timestamp, value] pairs. First thing we
    # do is reformat the data into a map of
    #        {timestamp: {field:value, field:value},...}}
    by_timestamp = {}
    for field, ts_value_list in record.items():
        if not isinstance(ts_value_list, list):
            logging.warning('Expected field_name: [(timestamp, value),...] pairs, '
                            'found %s: %s', field, ts_value_list)
            continue
        try:
            for (timestamp, value) in ts_value_list:
                by_timestamp.setdefault(timestamp, {})[field] = value
        except (ValueError, TypeError):
            # ValueError: an entry that isn't exactly a pair.
            # TypeError: an entry that can't be unpacked at all (e.g. a
            # bare number), or a timestamp that can't be used as a dict key.
            logging.error('Badly-structured field dictionary: %s: %s',
                          field, pprint.pformat(ts_value_list))
            return []

    # Now copy the entries into an ordered-by-timestamp list.
    try:
        ordered_timestamps = sorted(by_timestamp)
    except TypeError:
        # Timestamps of mixed, mutually unorderable types (e.g. str and float).
        logging.error('Badly-structured field dictionary: timestamps cannot '
                      'be ordered: %s', pprint.pformat(list(by_timestamp)))
        return []

    return [DASRecord(timestamp=ts, fields=by_timestamp[ts])
            for ts in ordered_timestamps]