Support streaming of data in chunks (#169)
When opening a TDMS file with TdmsFile.open, support reading chunks of data. Also use this to support streaming export to HDF. Fixes #150 and #164.

In order to support initialising HDF arrays before data is read, adds a dtype attribute to TdmsChannel.
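For example, a rough sketch of the new streaming API (the file path is a placeholder, and the final loop body stands in for application code):

    from nptdms import TdmsFile

    with TdmsFile.open("my_file.tdms") as tdms_file:
        # Channel dtypes are available from metadata alone, before any data is read
        for group in tdms_file.groups():
            for channel in group.channels():
                print(channel.path, channel.dtype)

        # Data can then be read chunk by chunk instead of all at once
        for chunk in tdms_file.data_chunks():
            for group_chunk in chunk.groups():
                for channel_chunk in group_chunk.channels():
                    values = channel_chunk[:]  # numpy array of values in this chunk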
adamreeve authored Mar 27, 2020
1 parent e131c40 commit ce39459
Showing 17 changed files with 462 additions and 100 deletions.
9 changes: 9 additions & 0 deletions docs/apireference.rst
@@ -18,6 +18,15 @@ Reading TDMS Files
:members:
:exclude-members: group, channel, has_data, property, number_values

.. autoclass:: DataChunk()
:members:

.. autoclass:: GroupDataChunk()
:members:

.. autoclass:: ChannelDataChunk()
:members:

Writing TDMS Files
------------------

17 changes: 17 additions & 0 deletions docs/reading.rst
@@ -79,6 +79,23 @@ For example, to read 200 data points, beginning at offset 1,000::
length = 200
channel_data = channel.read_data(offset, length)

Alternatively, you may have an application where you wish to stream all data chunk by chunk.
:py:meth:`~nptdms.TdmsFile.data_chunks` is a generator that produces instances of
:py:class:`~nptdms.DataChunk`, which can be used after opening a TDMS file with
:py:meth:`~nptdms.TdmsFile.open`.
For example, to compute the mean of a channel::

    channel_sum = 0.0
    channel_length = 0
    with TdmsFile.open(tdms_file_path) as tdms_file:
        for chunk in tdms_file.data_chunks():
            channel_chunk = chunk[group_name][channel_name]
            channel_length += len(channel_chunk)
            channel_sum += channel_chunk[:].sum()
    channel_mean = channel_sum / channel_length

This approach can also be useful to stream TDMS data to another format on disk or into a data store.
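
For example, a minimal sketch of streaming a single channel out to a CSV file, reusing the
names from the example above (``output.csv`` and the use of the ``csv`` module are only
illustrative)::

    import csv

    with TdmsFile.open(tdms_file_path) as tdms_file:
        with open("output.csv", "w", newline="") as csv_file:
            writer = csv.writer(csv_file)
            for chunk in tdms_file.data_chunks():
                channel_chunk = chunk[group_name][channel_name]
                writer.writerows([value] for value in channel_chunk[:])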

In cases where you don't need to read the file data and only need to read metadata, you can
also use the static :py:meth:`~nptdms.TdmsFile.read_metadata` method::

13 changes: 8 additions & 5 deletions docs/writing.rst
@@ -7,22 +7,22 @@ speeding up the writing of files and minimising file size are not
implemented by npTDMS, but the basic functionality required to
write TDMS files is available.

To write a TDMS file, the :py:class:`nptdms.TdmsWriter` class is used, which
To write a TDMS file, the :py:class:`~nptdms.TdmsWriter` class is used, which
should be used as a context manager.
The ``__init__`` method accepts the path to the file to create, or a file
The :py:meth:`~nptdms.TdmsWriter.__init__` method accepts the path to the file to create, or a file
that has already been opened in binary write mode::

    with TdmsWriter("my_file.tdms") as tdms_writer:
        # write data
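
Alternatively, a minimal sketch of passing in a file that has already been opened in binary
write mode (the file name is only a placeholder)::

    with open("my_file.tdms", "wb") as output_file:
        with TdmsWriter(output_file) as tdms_writer:
            # write data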

The :py:meth:`nptdms.TdmsWriter.write_segment` method is used to write
The :py:meth:`~nptdms.TdmsWriter.write_segment` method is used to write
a segment of data to the TDMS file. Because the TDMS file format is designed
for streaming data applications, it supports writing data one segment at a time
as data becomes available.
If you don't require this functionality you can simply call ``write_segment`` once
with all of your data.
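
For example, a rough sketch of writing data segment by segment as it becomes available
(``acquire_values`` is a hypothetical data source here, and ``ChannelObject`` is one of the
object types described below)::

    with TdmsWriter("streamed_file.tdms") as tdms_writer:
        for values in acquire_values():
            channel = ChannelObject("group name", "channel name", values, properties={})
            tdms_writer.write_segment([channel])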

The ``write_segment`` method takes a list of objects, each of which must be an
The :py:meth:`~nptdms.TdmsWriter.write_segment` method takes a list of objects, each of which must be an
instance of one of:

- :py:class:`nptdms.RootObject`. This is the TDMS root object, and there may only be one root object in a segment.
@@ -69,7 +69,7 @@ is given below::
channel_object])

You could also read a TDMS file and then re-write it by passing
:py:class:`nptdms.TdmsGroup` and :py:class:`nptdms.TdmsChannel`
:py:class:`~nptdms.TdmsGroup` and :py:class:`~nptdms.TdmsChannel`
instances to the ``write_segment`` method. If you want
to only copy certain channels for example, you could do something like::

@@ -83,3 +83,6 @@ to only copy certain channels for example, you could do something like::
root_object = RootObject(original_file.properties)
channels_to_copy = [chan for chan in original_channels if include_channel(chan)]
copied_file.write_segment([root_object] + original_groups + channels_to_copy)

Note that this isn't suitable for copying channels with scaled data, as the channel data
will already have scaling applied.
2 changes: 1 addition & 1 deletion nptdms/__init__.py
@@ -7,5 +7,5 @@
from .version import __version_info__, __version__

# Export public objects
from .tdms import TdmsFile, TdmsGroup, TdmsChannel
from .tdms import TdmsFile, TdmsGroup, TdmsChannel, DataChunk, GroupDataChunk, ChannelDataChunk
from .writer import TdmsWriter, RootObject, GroupObject, ChannelObject
70 changes: 38 additions & 32 deletions nptdms/base_segment.py
@@ -182,12 +182,12 @@ def _read_object_properties(self, file, object_path):
def read_raw_data(self, f):
"""Read raw data from a TDMS segment
:returns: A generator of DataChunk objects with raw channel data for
:returns: A generator of RawDataChunk objects with raw channel data for
objects in this segment.
"""

if not self.toc_mask & toc_properties['kTocRawData']:
yield DataChunk.empty()
yield RawDataChunk.empty()

f.seek(self.data_position)

@@ -207,12 +207,12 @@ def read_raw_data_for_channel(self, f, channel_path, chunk_offset=0, num_chunks=
:param channel_path: Path of channel to read data for
:param chunk_offset: Index of chunk to begin reading from
:param num_chunks: Number of chunks to read, or None to read to the end
:returns: A generator of ChannelDataChunk objects with raw channel data for
:returns: A generator of RawChannelDataChunk objects with raw channel data for
a single channel in this segment.
"""

if not self.toc_mask & toc_properties['kTocRawData']:
yield ChannelDataChunk.empty()
yield RawChannelDataChunk.empty()

f.seek(self.data_position)

@@ -277,14 +277,9 @@ def _read_channel_data_chunk(self, file, data_objects, chunk_index, channel_path
# Derived classes can implement more optimised reading.
data_chunk = self._read_data_chunk(file, data_objects, chunk_index)
try:
if data_chunk.raw_data:
return ChannelDataChunk.channel_data(data_chunk.raw_data[channel_path])
elif data_chunk.daqmx_raw_data:
return ChannelDataChunk.scaler_data(data_chunk.daqmx_raw_data[channel_path])
else:
return ChannelDataChunk.empty()
return data_chunk.channel_data[channel_path]
except KeyError:
return ChannelDataChunk.empty()
return RawChannelDataChunk.empty()

def _new_segment_object(self, object_path):
""" Create a new segment object for a segment
@@ -325,56 +320,67 @@ def scaler_data_types(self):
return None


class DataChunk(object):
class RawDataChunk(object):
"""Data read from a single chunk in a TDMS segment
:ivar raw_data: A dictionary of object data in this chunk for standard
TDMS channels. Keys are object paths and values are numpy arrays.
:ivar daqmx_raw_data: A dictionary of data in this segment for
DAQmx raw data. Keys are object paths and values are dictionaries of
numpy arrays keyed by scaler id.
:ivar channel_data: A dictionary of channel data chunks.
Keys are object paths and values are RawChannelDataChunk instances.
"""

def __init__(self, data, daqmx_data):
self.raw_data = data
self.daqmx_raw_data = daqmx_data
def __init__(self, channel_data):
self.channel_data = channel_data

@staticmethod
def empty():
return DataChunk({}, {})
return RawDataChunk({})

@staticmethod
def channel_data(data):
return DataChunk(data, {})
channel_chunks = {
path: RawChannelDataChunk.channel_data(d)
for (path, d) in data.items()
}
return RawDataChunk(channel_chunks)

@staticmethod
def scaler_data(data):
return DataChunk({}, data)
channel_chunks = {
path: RawChannelDataChunk.scaler_data(d)
for (path, d) in data.items()
}
return RawDataChunk(channel_chunks)


class ChannelDataChunk(object):
class RawChannelDataChunk(object):
"""Data read for a single channel from a single chunk in a TDMS segment
:ivar raw_data: Raw data in this chunk for a standard TDMS channel.
:ivar daqmx_raw_data: A dictionary of scaler data in this segment for
:ivar data: Raw data in this chunk for a standard TDMS channel.
:ivar scaler_data: A dictionary of scaler data in this segment for
DAQmx raw data. Keys are the scaler id and values are data arrays.
"""

def __init__(self, data, daqmx_data):
self.raw_data = data
self.daqmx_raw_data = daqmx_data
def __init__(self, data, scaler_data):
self.data = data
self.scaler_data = scaler_data

def __len__(self):
if self.data is not None:
return len(self.data)
elif self.scaler_data is not None:
return next(len(d) for d in self.scaler_data.values())
return 0

@staticmethod
def empty():
return ChannelDataChunk(None, None)
return RawChannelDataChunk(None, None)

@staticmethod
def channel_data(data):
return ChannelDataChunk(data, None)
return RawChannelDataChunk(data, None)

@staticmethod
def scaler_data(data):
return ChannelDataChunk(None, data)
return RawChannelDataChunk(None, data)


def read_property(f, endianness="<"):
10 changes: 7 additions & 3 deletions nptdms/channel_data.py
@@ -25,7 +25,7 @@ def get_data_receiver(obj, num_values, memmap_dir=None):
return DaqmxDataReceiver(obj, num_values, memmap_dir)

if obj.data_type.nptype is None:
return ListDataReceiver()
return ListDataReceiver(obj)

return NumpyDataReceiver(obj, num_values, memmap_dir)

@@ -37,9 +37,13 @@ class ListDataReceiver(object):
:ivar data: List of data points
"""

def __init__(self):
def __init__(self, channel):
"""Initialise new data receiver for a TDMS object
"""
if channel.data_type == types.String:
self._dtype = np.dtype('O')
else:
self._dtype = None
self._data = []
self.scaler_data = {}

@@ -50,7 +54,7 @@ def append_data(self, data):

@property
def data(self):
return np.array(self._data)
return np.array(self._data, dtype=self._dtype)


class NumpyDataReceiver(object):
4 changes: 2 additions & 2 deletions nptdms/daqmx.py
@@ -3,7 +3,7 @@

from nptdms import types
from nptdms.base_segment import (
BaseSegment, BaseSegmentObject, DataChunk, read_interleaved_segment_bytes)
BaseSegment, BaseSegmentObject, RawDataChunk, read_interleaved_segment_bytes)
from nptdms.log import log_manager


@@ -78,7 +78,7 @@ def _read_data_chunk(self, file, data_objects, chunk_index):
scaler.data_type.nptype.newbyteorder(self.endianness))
scaler_data[obj.path][scaler.scale_id] = this_scaler_data

return DataChunk.scaler_data(scaler_data)
return RawDataChunk.scaler_data(scaler_data)


class DaqmxSegmentObject(BaseSegmentObject):
43 changes: 32 additions & 11 deletions nptdms/export/hdf_export.py
@@ -23,7 +23,6 @@ def from_tdms_file(tdms_file, filepath, mode='w', group='/'):

h5file = h5py.File(filepath, mode)

container_group = None
if group in h5file:
container_group = h5file[group]
else:
@@ -34,7 +33,8 @@ def from_tdms_file(tdms_file, filepath, mode='w', group='/'):
container_group.attrs[property_name] = _hdf_attr_value(property_value)

# Now iterate through groups and channels,
# writing the properties and data
# writing the properties and creating data sets
datasets = {}
for group in tdms_file.groups():
# Write the group's properties
container_group.create_group(group.name)
@@ -47,26 +47,47 @@ def from_tdms_file(tdms_file, filepath, mode='w', group='/'):

if channel.data_type is types.String:
# Encode as variable length UTF-8 strings
channel_data = container_group.create_dataset(
channel_key, (len(channel.data),), dtype=h5py.string_dtype())
channel_data[...] = channel.data
datasets[channel.path] = container_group.create_dataset(
channel_key, (len(channel),), dtype=h5py.string_dtype())
elif channel.data_type is types.TimeStamp:
# Timestamps are represented as fixed length ASCII strings
# because HDF doesn't natively support timestamps
channel_data = container_group.create_dataset(
channel_key, (len(channel.data),), dtype='S27')
string_data = np.datetime_as_string(channel.data, unit='us', timezone='UTC')
encoded_data = [s.encode('ascii') for s in string_data]
channel_data[...] = encoded_data
datasets[channel.path] = container_group.create_dataset(
channel_key, (len(channel),), dtype='S27')
else:
container_group[channel_key] = channel.data
datasets[channel.path] = container_group.create_dataset(
channel_key, (len(channel),), dtype=channel.dtype)

for prop_name, prop_value in channel.properties.items():
container_group[channel_key].attrs[prop_name] = _hdf_attr_value(prop_value)

# Set data
if tdms_file.data_read:
for group in tdms_file.groups():
for channel in group.channels():
datasets[channel.path][...] = _hdf_array(channel, channel.data)
else:
# Data hasn't been read into memory, stream it from disk
for chunk in tdms_file.data_chunks():
for group in chunk.groups():
for channel_chunk in group.channels():
channel = tdms_file[group.name][channel_chunk.name]
offset = channel_chunk.offset
end = offset + len(channel_chunk)
datasets[channel.path][offset:end] = _hdf_array(channel, channel_chunk[:])

return h5file


def _hdf_array(channel, data):
""" Convert data array into a format suitable for initialising HDF data
"""
if channel.data_type is types.TimeStamp:
string_data = np.datetime_as_string(data, unit='us', timezone='UTC')
return [s.encode('ascii') for s in string_data]
return data


def _hdf_attr_value(value):
""" Convert a value into a format suitable for an HDF attribute
"""
22 changes: 11 additions & 11 deletions nptdms/reader.py
@@ -4,7 +4,7 @@
import numpy as np
from nptdms.utils import Timer, OrderedDict
from nptdms.tdms_segment import read_segment_metadata
from nptdms.base_segment import ChannelDataChunk
from nptdms.base_segment import RawChannelDataChunk
from nptdms.log import log_manager

log = log_manager.get_logger(__name__)
@@ -75,7 +75,7 @@ def read_metadata(self):
def read_raw_data(self):
""" Read raw data from all segments, chunk by chunk
:returns: A generator that yields DataChunk objects
:returns: A generator that yields RawDataChunk objects
"""
if self._segments is None:
raise RuntimeError(
@@ -92,7 +92,7 @@ def read_raw_data_for_channel(self, channel_path, offset=0, length=None):
:param length: Number of values to attempt to read.
If None, then all values starting from the offset will be read.
Fewer values will be returned if attempting to read beyond the end of the available data.
:returns: A generator that yields ChannelDataChunk objects
:returns: A generator that yields RawChannelDataChunk objects
"""
if self._segments is None:
raise RuntimeError("Cannot read data unless metadata has first been read")
@@ -262,12 +262,12 @@ def __init__(self):
def _trim_channel_chunk(chunk, skip=0, trim=0):
if skip == 0 and trim == 0:
return chunk
raw_data = None
daqmx_raw_data = None
if chunk.raw_data is not None:
raw_data = chunk.raw_data[skip:len(chunk.raw_data) - trim]
if chunk.daqmx_raw_data is not None:
daqmx_raw_data = {
data = None
scaler_data = None
if chunk.data is not None:
data = chunk.data[skip:len(chunk.data) - trim]
if chunk.scaler_data is not None:
scaler_data = {
scale_id: d[skip:len(d) - trim]
for (scale_id, d) in chunk.daqmx_raw_data.items()}
return ChannelDataChunk(raw_data, daqmx_raw_data)
for (scale_id, d) in chunk.scaler_data.items()}
return RawChannelDataChunk(data, scaler_data)