""" Lower level TDMS reader API that allows finer grained reading of data
"""
from collections import OrderedDict
import logging
import os
import numpy as np
import struct
from nptdms.common import toc_properties
from nptdms.utils import Timer
from nptdms.base_segment import RawChannelDataChunk
from nptdms.tdms_segment import TdmsSegment, SegmentIndexCache
from nptdms.log import log_manager
log = log_manager.get_logger(__name__)
_struct_unpack = struct.unpack
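
# Overview (informal): a TDMS file is a sequence of segments, each starting with a
# 28-byte lead-in (b'TDSm' tag, ToC mask, version, next-segment and raw-data
# offsets), followed by object metadata and then raw channel data. TdmsReader below
# walks these segments, preferring the *.tdms_index companion file (tagged b'TDSh',
# metadata only) when one is available.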


class TdmsReader(object):
    """ Reads metadata and data from a TDMS file.

    :ivar object_metadata: Dictionary of object path to ObjectMetadata
    """

    def __init__(self, tdms_file):
        """ Initialise a new TdmsReader

        :param tdms_file: Either the path to the tdms file to read
            as a string or pathlib.Path, or an already opened file.
        """
        self._file_path = None
        self._index_file_path = None
        self._file = None
        self._index_file = None
        self._segments = None
        self._prev_segment_objects = {}
        self.object_metadata = OrderedDict()
        self._segment_channel_offsets = {}
        self.tdms_version = None

        if hasattr(tdms_file, "read"):
            # Is a file
            tag = tdms_file.read(4)
            tdms_file.seek(0, os.SEEK_SET)
            if tag == b"TDSh":
                self._index_file = tdms_file
            elif tag == b"TDSm":
                self._file = tdms_file
            else:
                raise ValueError(
                    f"File should either start with 'b`TDSh`' or 'b`TDSm`', submitted starts with '{tag}'.")
        else:
            # Is path to a file
            source_path = str(tdms_file)
            if source_path.endswith(".tdms_index"):
                self._index_file_path = source_path
                self._index_file = open(self._index_file_path, "rb")
            else:
                self._file_path = source_path
                self._file = open(self._file_path, "rb")
                filepath = self._file_path + '_index'
                if os.path.isfile(filepath):
                    self._index_file_path = filepath
                    self._index_file = open(self._index_file_path, "rb")

        if self._file is not None:
            self._data_file_size = _get_file_size(self._file)
        else:
            self._data_file_size = None
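
    # A minimal usage sketch (not part of the library): the reader accepts either a
    # path string / pathlib.Path or an already opened binary file object. The file
    # name "example.tdms" below is purely hypothetical.
    #
    #     with open("example.tdms", "rb") as f:
    #         reader = TdmsReader(f)
    #         reader.read_metadata()
    #         ...
    #         reader.close()
    #
    # When a path is given instead, __init__ opens the file itself (and a matching
    # .tdms_index file if one exists next to it), and close() is then responsible
    # for closing whatever was opened here.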

    def close(self):
        if self._file is None and self._index_file is None:
            # Already closed
            return

        if self._file_path is not None:
            # File path was provided so we opened the file and should close it.
            self._file.close()
        if self._index_file_path is not None:
            # Index file path was provided so we opened the file and should close it.
            self._index_file.close()

        # Finally always remove reference to the files
        self._file = None
        self._index_file = None

    def read_metadata(self, require_segment_indexes=False):
        """ Read all metadata and structure information from a TdmsFile

        :param require_segment_indexes: Whether to create segment object indexes
            to allow lookup of objects by path.
        """
        if self._index_file is not None:
            # generally try to read metadata from index file because it is faster
            file = self._index_file
            reading_index_file = True
        elif self._file is not None:
            # fallback if only a data file is supplied
            file = self._file
            reading_index_file = False
        else:
            raise ValueError("Neither tdms_index file nor tdms file is available.")

        self._segments = []
        segment_position = 0
        try:
            with Timer(log, "Read metadata"):
                # Read metadata first to work out how much space we need
                previous_segment = None
                index_cache = SegmentIndexCache() if require_segment_indexes else None
                while True:
                    start_position = file.tell()
                    try:
                        segment, properties = self._read_segment_metadata(
                            file, segment_position, index_cache, previous_segment, reading_index_file)
                    except EOFError:
                        # We've finished reading the file
                        break

                    self._update_object_metadata(segment)
                    self._update_object_properties(properties)
                    self._segments.append(segment)
                    previous_segment = segment

                    segment_position = segment.next_segment_pos
                    if reading_index_file:
                        file.seek(start_position + segment.data_position - segment.position, os.SEEK_SET)
                    else:
                        file.seek(segment.next_segment_pos, os.SEEK_SET)
        finally:
            if reading_index_file and self._index_file_path is not None:
                file.close()
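
    # Illustrative sketch only: after read_metadata() has run, object_metadata maps
    # each object path to an ObjectMetadata instance, so channel lengths and
    # properties can be inspected without reading any raw data. The path string and
    # file name below are hypothetical examples of the TDMS path format.
    #
    #     reader = TdmsReader("example.tdms")
    #     reader.read_metadata()
    #     meta = reader.object_metadata["/'Group'/'Channel1'"]
    #     print(meta.num_values, meta.data_type, dict(meta.properties))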

    def read_raw_data(self):
        """ Read raw data from all segments, chunk by chunk

        :returns: A generator that yields RawDataChunk objects
        """
        self._ensure_open()
        if self._segments is None:
            raise RuntimeError(
                "Cannot read data unless metadata has first been read")
        for segment in self._segments:
            self._verify_segment_start(segment)
            for chunk in segment.read_raw_data(self._file):
                yield chunk

    def read_raw_data_for_channel(self, channel_path, offset=0, length=None):
        """ Read raw data for a single channel, chunk by chunk

        :param channel_path: The path of the channel object to read data for
        :param offset: Initial position to read data from.
        :param length: Number of values to attempt to read.
            If None, then all values starting from the offset will be read.
            Fewer values will be returned if attempting to read beyond the end of the available data.
        :returns: A generator that yields RawChannelDataChunk objects
        """
        self._ensure_open()
        if self._segments is None:
            raise RuntimeError("Cannot read data unless metadata has first been read")

        try:
            (first_segment, segment_offsets) = self._segment_channel_offsets[channel_path]
        except KeyError:
            with Timer(log, "Build data index for channel"):
                self._build_index(channel_path)
            (first_segment, segment_offsets) = self._segment_channel_offsets[channel_path]

        object_metadata = self.object_metadata[channel_path]
        max_length_from_offset = object_metadata.num_values - offset
        if length is None:
            length = max_length_from_offset
        else:
            # Make sure we're not trying to read more data than is actually available
            length = min(length, max_length_from_offset)
        end_index = offset + length

        # Binary search to find first and last segments to read
        start_segment = first_segment + np.searchsorted(segment_offsets, offset, side='right')
        end_segment = first_segment + np.searchsorted(segment_offsets, end_index, side='left')

        segment_index = start_segment
        values_read = 0
        for segment in self._segments[start_segment:end_segment + 1]:
            self._verify_segment_start(segment)

            # By default, read all chunks in a segment
            chunk_offset = 0
            num_chunks = segment.num_chunks

            segment_obj = segment.get_segment_object(channel_path)
            chunk_size = 0 if (segment_obj is None or not segment_obj.has_data) else segment_obj.number_values
            if chunk_size == 0:
                continue

            segment_start_index = (
                0 if segment_index == first_segment else segment_offsets[segment_index - first_segment - 1])
            remaining_values_to_skip = 0

            # For the first and last segments, we may not need to read all chunks,
            # and may need to trim some data from the beginning or end of the chunk.
            if segment_index == start_segment:
                num_values_to_skip = offset - segment_start_index
                chunk_offset = num_values_to_skip // chunk_size
                remaining_values_to_skip = num_values_to_skip % chunk_size
                num_chunks -= chunk_offset
            if segment_index == end_segment:
                # Note: segment_index may be both start and end
                segment_end_index = segment_offsets[segment_index - first_segment]
                num_values_to_trim = segment_end_index - end_index

                # Account for segments where the final chunk is truncated
                final_chunk_size = (segment_end_index - segment_start_index) % chunk_size
                final_chunk_size = chunk_size if final_chunk_size == 0 else final_chunk_size
                if num_values_to_trim >= final_chunk_size:
                    num_chunks -= 1
                    num_values_to_trim -= final_chunk_size

                num_chunks -= num_values_to_trim // chunk_size

            for i, chunk in enumerate(
                    segment.read_raw_data_for_channel(self._file, channel_path, chunk_offset, num_chunks)):
                skip = remaining_values_to_skip if i == 0 else 0
                values_read += len(chunk) - skip
                trim = 0 if values_read < length else values_read - length
                yield _trim_channel_chunk(chunk, skip, trim)

            segment_index += 1
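
    # Hedged usage sketch: reading a window of a single channel in chunks. The
    # channel path is hypothetical, and chunk.data is assumed to hold plain
    # (non-DAQmx) raw values; the concatenated result is at most `length` values.
    #
    #     chunks = reader.read_raw_data_for_channel(
    #         "/'Group'/'Channel1'", offset=1000, length=500)
    #     values = np.concatenate([chunk.data for chunk in chunks])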

    def read_channel_chunk_for_index(self, channel_path, index):
        """ Read the chunk containing the given index

        :returns: Tuple of raw channel data chunk and the integer offset to the beginning of the chunk
        :rtype: (RawChannelDataChunk, int)
        """
        self._ensure_open()
        if self._segments is None:
            raise RuntimeError("Cannot read data unless metadata has first been read")

        try:
            (first_segment, segment_offsets) = self._segment_channel_offsets[channel_path]
        except KeyError:
            with Timer(log, "Build data index for channel"):
                self._build_index(channel_path)
            (first_segment, segment_offsets) = self._segment_channel_offsets[channel_path]

        # Binary search to find the segment to read
        segment_index = first_segment + np.searchsorted(segment_offsets, index, side='right')
        segment = self._segments[segment_index]
        segment_obj = segment.get_segment_object(channel_path)
        chunk_size = 0 if segment_obj is None else segment_obj.number_values
        segment_start_index = (
            0 if segment_index == first_segment else segment_offsets[segment_index - first_segment - 1])

        index_in_segment = index - segment_start_index
        chunk_index = index_in_segment // chunk_size

        self._verify_segment_start(segment)
        chunk_data = next(segment.read_raw_data_for_channel(self._file, channel_path, chunk_index, 1))
        chunk_offset = segment_start_index + chunk_index * chunk_size
        return chunk_data, chunk_offset
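
    # Illustrative sketch: fetch only the chunk that contains value number 12345 of
    # a (hypothetical) channel, along with the chunk's starting offset, so the
    # value's position within the chunk is index - chunk_offset. As above, this
    # assumes the chunk holds plain raw data in chunk.data.
    #
    #     chunk, chunk_offset = reader.read_channel_chunk_for_index(
    #         "/'Group'/'Channel1'", 12345)
    #     value = chunk.data[12345 - chunk_offset]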

    def is_index_file_only(self):
        """ Convenience function to check whether the supplied file is an index file and no data file is available

        :rtype: bool
        """
        return self._file is None and self._index_file is not None

    def _read_segment_metadata(
            self, file, segment_position, index_cache, previous_segment, is_index_file):
        (position, toc_mask, data_position, next_segment_pos, segment_incomplete) = self._read_lead_in(
            file, segment_position, is_index_file)

        segment = TdmsSegment(
            position, toc_mask, next_segment_pos, data_position, segment_incomplete)

        properties = segment.read_segment_objects(
            file, self._prev_segment_objects, index_cache, previous_segment)

        return segment, properties

    def _read_lead_in(self, file, segment_position, is_index_file=False):
        lead_in_bytes = file.read(28)
        if len(lead_in_bytes) < 28:
            raise EOFError

        expected_tag = b'TDSh' if is_index_file else b'TDSm'
        tag = lead_in_bytes[:4]
        if tag != expected_tag:
            raise ValueError(
                "Segment does not start with %r, but with %r" % (expected_tag, tag))

        # Next four bytes are table of contents mask
        toc_mask = _struct_unpack('<l', lead_in_bytes[4:8])[0]

        if log.isEnabledFor(logging.DEBUG):
            log.debug("Reading segment at %d", segment_position)
            for prop_name, prop_mask in toc_properties.items():
                prop_is_set = (toc_mask & prop_mask) != 0
                log.debug("Property %s is %s", prop_name, prop_is_set)

        endianness = '>' if (toc_mask & toc_properties['kTocBigEndian']) else '<'

        # Next four bytes are version number, then 8 bytes each for the offset values
        (version, next_segment_offset, raw_data_offset) = _struct_unpack(endianness + 'lQQ', lead_in_bytes[8:28])

        if self.tdms_version is None:
            if version not in (4712, 4713):
                log.warning("Unrecognised version number: %d" % version)
            self.tdms_version = version
        elif self.tdms_version != version:
            log.warning("Segment version mismatch, %d != %d" % (version, self.tdms_version))

        # Calculate data and next segment position
        lead_size = 7 * 4
        data_position = segment_position + lead_size + raw_data_offset
        segment_incomplete = next_segment_offset == 0xFFFFFFFFFFFFFFFF
        if segment_incomplete:
            # Segment size is unknown. This can happen if LabVIEW crashes.
            next_segment_pos = self._data_file_size
        else:
            next_segment_pos = (
                segment_position + next_segment_offset + lead_size)
            if self._data_file_size is not None and next_segment_pos > self._data_file_size:
                # The raw data offset is incorrect, and there is less data than expected in this segment
                next_segment_pos = self._data_file_size
                segment_incomplete = True
        if segment_incomplete:
            if next_segment_pos < data_position:
                # Metadata wasn't completely written and there is no data in this segment,
                # so don't try to read any metadata
                log.warning("Last segment metadata is incomplete")
                raise EOFError
            else:
                # Try to read until the end of the file if we have complete metadata
                log.warning(
                    "Last segment of file has less data than expected, "
                    "will attempt to read to the end of the file")

        log.debug(
            "Next segment offset = %d, raw data offset = %d, expected data size = %d b, actual data size = %d b",
            next_segment_offset, raw_data_offset,
            next_segment_offset - raw_data_offset,
            next_segment_pos - data_position)

        return segment_position, toc_mask, data_position, next_segment_pos, segment_incomplete
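
    # Rough sketch of the 28-byte lead-in layout handled above (field order as read
    # by the code; the endianness of the last three fields comes from the ToC mask):
    #
    #     bytes  0-3   b'TDSm' / b'TDSh' tag
    #     bytes  4-7   table of contents mask   ('<l')
    #     bytes  8-11  version number           ('l', e.g. 4713)
    #     bytes 12-19  next segment offset      ('Q')
    #     bytes 20-27  raw data offset          ('Q')
    #
    # so data_position = segment_position + 28 + raw_data_offset, and a next segment
    # offset of 0xFFFFFFFFFFFFFFFF marks a segment whose size was never written.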

    def _verify_segment_start(self, segment):
        """ When reading data for a segment, check for the TDSm tag at the start of the segment in an attempt
        to detect any mismatch between tdms and tdms_index files.
        """
        position = segment.position
        self._file.seek(segment.position)
        expected_tag = b'TDSm'
        tag = self._file.read(4)
        if tag != expected_tag:
            raise ValueError(
                "Attempted to read data segment at position {0} but did not find segment start header. ".format(
                    position) +
                "Check that the tdms_index file matches the tdms data file.")

    def _update_object_metadata(self, segment):
        """ Update object metadata using the metadata read from a single segment
        """
        for segment_object in segment.ordered_objects:
            path = segment_object.path
            self._prev_segment_objects[path] = segment_object

            object_metadata = self._get_or_create_object(path)
            object_metadata.num_values += _number_of_segment_values(segment_object, segment)
            _update_object_data_type(path, object_metadata, segment_object)
            if segment_object.scaler_data_types is not None:
                _update_object_scaler_data_types(path, object_metadata, segment_object)

    def _update_object_properties(self, segment_object_properties):
        """ Update object properties using any properties in a segment
        """
        if segment_object_properties is not None:
            for path, properties in segment_object_properties.items():
                object_metadata = self._get_or_create_object(path)
                for prop, val in properties:
                    object_metadata.properties[prop] = val

    def _get_or_create_object(self, path):
        """ Get existing object metadata or create metadata for a new object
        """
        try:
            return self.object_metadata[path]
        except KeyError:
            obj = ObjectMetadata()
            self.object_metadata[path] = obj
            return obj

    def _build_index(self, channel_path):
        """ Builds an index into the segment data for faster lookup of values

        _segment_channel_offsets provides data offset at the end of each segment per channel
        """
        num_segments = len(self._segments)

        # Get number of values for this channel in each segment
        segment_num_values = np.zeros(num_segments, dtype=np.int64)
        first_segment = -1
        last_segment = -1
        for i, segment in enumerate(self._segments):
            obj_index = segment.object_index.get(channel_path)
            if obj_index is not None:
                segment_obj = segment.ordered_objects[obj_index]
                num_values = _number_of_segment_values(segment_obj, segment)
                if num_values > 0:
                    segment_num_values[i] = num_values
                    last_segment = i
                    if first_segment == -1:
                        first_segment = i

        # Now use the cumulative sum to get the total channel value count
        # at the end of each segment.
        if first_segment == -1:
            first_segment = num_segments
            last_segment = num_segments
        channel_offsets = np.cumsum(segment_num_values[first_segment:last_segment + 1])

        # It's likely that many channels will have the same shaped data,
        # so de-duplicate these arrays to reduce memory usage.
        existing_arrays = (xs for (_, xs) in self._segment_channel_offsets.values())
        channel_offsets = _deduplicate_array(channel_offsets, existing_arrays)

        self._segment_channel_offsets[channel_path] = (first_segment, channel_offsets)
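
    # Worked example (illustrative numbers only): if a channel has 100, 0 and 50
    # values in three consecutive segments, the cumulative offsets stored above are
    # [100, 100, 150]. np.searchsorted(offsets, index, side='right') then maps a
    # value index straight to the segment holding it, e.g. index 120 -> segment 2.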

    def _ensure_open(self):
        if self._file is None and self._index_file is None:
            raise RuntimeError(
                "Cannot read data after the underlying TDMS reader is closed")


def _number_of_segment_values(segment_object, segment):
    """ Compute the number of values an object has in a segment
    """
    if not segment_object.has_data:
        return 0
    if segment.final_chunk_lengths_override is None:
        return segment_object.number_values * segment.num_chunks
    else:
        return (segment_object.number_values * (segment.num_chunks - 1) +
                segment.final_chunk_lengths_override.get(segment_object.path, 0))
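
# Worked example (hypothetical numbers): for an object with number_values = 1000
# per chunk in a segment of 5 chunks, this gives 5000 values; if the final chunk
# was truncated to 300 values via final_chunk_lengths_override, it gives
# 1000 * 4 + 300 = 4300 instead.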


def _update_object_data_type(path, obj, segment_object):
    """ Update the data type for an object using its segment metadata
    """
    if obj.data_type is not None and obj.data_type != segment_object.data_type:
        raise ValueError(
            "Segment data doesn't have the same type as previous "
            "segments for objects %s. Expected type %s but got %s" %
            (path, obj.data_type, segment_object.data_type))
    obj.data_type = segment_object.data_type


def _update_object_scaler_data_types(path, obj, segment_object):
    """ Update the DAQmx scaler data types for an object using its segment metadata
    """
    if obj.scaler_data_types is not None and obj.scaler_data_types != segment_object.scaler_data_types:
        raise ValueError(
            "Segment data doesn't have the same scaler data types as previous "
            "segments for objects %s. Expected types %s but got %s" %
            (path, obj.scaler_data_types, segment_object.scaler_data_types))
    obj.scaler_data_types = segment_object.scaler_data_types


class ObjectMetadata(object):
    """ Stores information about an object in a TDMS file
    """
    def __init__(self):
        self.properties = OrderedDict()
        self.data_type = None
        self.scaler_data_types = None
        self.num_values = 0


def _trim_channel_chunk(chunk, skip=0, trim=0):
    if skip == 0 and trim == 0:
        return chunk
    data = None
    scaler_data = None
    if chunk.data is not None:
        data = chunk.data[skip:len(chunk.data) - trim]
    if chunk.scaler_data is not None:
        scaler_data = {
            scale_id: d[skip:len(d) - trim]
            for (scale_id, d) in chunk.scaler_data.items()}
    return RawChannelDataChunk(data, scaler_data)


def _deduplicate_array(xs, candidates):
    """ Reduce memory usage by replacing an array with a reference to an existing array if equal
    """
    for candidate in candidates:
        if _array_equal(xs, candidate):
            return candidate
    return xs


def _array_equal(a, b, chunk_size=100):
    """ Compare two arrays for equality
    """
    # Numpy array_equal compares all elements rather than comparing one at a time and short-circuiting when it
    # finds a difference. Break up the comparison into chunks to make this faster. Adapted from:
    # https://stackoverflow.com/questions/26260848/numpy-fast-check-for-complete-array-equality-like-matlabs-isequal
    if len(a) != len(b):
        return False
    num_chunks = (len(a) + chunk_size - 1) // chunk_size
    for i in range(num_chunks):
        offset = i * chunk_size
        if not (a[offset:offset+chunk_size] == b[offset:offset+chunk_size]).all():
            return False
    return True


def _get_file_size(file):
    current_pos = file.tell()
    file.seek(0, os.SEEK_END)
    end_pos = file.tell()
    file.seek(current_pos, os.SEEK_SET)
    return end_pos