diff --git a/docs/apireference.rst b/docs/apireference.rst
index 2634a9a..f3fef76 100644
--- a/docs/apireference.rst
+++ b/docs/apireference.rst
@@ -18,6 +18,15 @@ Reading TDMS Files
    :members:
    :exclude-members: group, channel, has_data, property, number_values
 
+.. autoclass:: DataChunk()
+   :members:
+
+.. autoclass:: GroupDataChunk()
+   :members:
+
+.. autoclass:: ChannelDataChunk()
+   :members:
+
 Writing TDMS Files
 ------------------
diff --git a/docs/reading.rst b/docs/reading.rst
index 99dbe00..7a53087 100644
--- a/docs/reading.rst
+++ b/docs/reading.rst
@@ -79,6 +79,23 @@ For example, to read 200 data points, beginning at offset 1,000::
     length = 200
     channel_data = channel.read_data(offset, length)
 
+Alternatively, you may have an application where you wish to stream all data chunk by chunk.
+:py:meth:`~nptdms.TdmsFile.data_chunks` is a generator that produces instances of
+:py:class:`~nptdms.DataChunk`, which can be used after opening a TDMS file with
+:py:meth:`~nptdms.TdmsFile.open`.
+For example, to compute the mean of a channel::
+
+    channel_sum = 0.0
+    channel_length = 0
+    with TdmsFile.open(tdms_file_path) as tdms_file:
+        for chunk in tdms_file.data_chunks():
+            channel_chunk = chunk[group_name][channel_name]
+            channel_length += len(channel_chunk)
+            channel_sum += channel_chunk[:].sum()
+    channel_mean = channel_sum / channel_length
+
+This approach can also be useful to stream TDMS data to another format on disk or into a data store.
+
 In cases where you don't need to read the file data and only need to
 read metadata, you can also use the static
 :py:meth:`~nptdms.TdmsFile.read_metadata` method::
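A further illustration of the streaming pattern documented above (not part of the diff): a minimal sketch that writes one channel to CSV chunk by chunk, assuming placeholder file, group and channel names and only the :py:meth:`~nptdms.TdmsFile.data_chunks` API added here::

    import csv

    from nptdms import TdmsFile

    # Placeholder path and names, purely for illustration
    tdms_file_path = "my_file.tdms"
    group_name = "group_1"
    channel_name = "channel_1"

    with TdmsFile.open(tdms_file_path) as tdms_file, \
            open("channel_1.csv", "w", newline="") as csv_file:
        writer = csv.writer(csv_file)
        for chunk in tdms_file.data_chunks():
            # Each DataChunk holds only one chunk of values per channel,
            # so memory use stays bounded however large the file is.
            channel_chunk = chunk[group_name][channel_name]
            writer.writerows([value] for value in channel_chunk[:])
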
diff --git a/docs/writing.rst b/docs/writing.rst
index 0620af3..e4d4eca 100644
--- a/docs/writing.rst
+++ b/docs/writing.rst
@@ -7,22 +7,22 @@ speeding up the writing of files and minimising file size are not
 implemented by npTDMS, but the basic functionality required
 to write TDMS files is available.
 
-To write a TDMS file, the :py:class:`nptdms.TdmsWriter` class is used, which
+To write a TDMS file, the :py:class:`~nptdms.TdmsWriter` class is used, which
 should be used as a context manager.
-The ``__init__`` method accepts the path to the file to create, or a file
+The :py:meth:`~nptdms.TdmsWriter.__init__` method accepts the path to the file to create, or a file
 that has already been opened in binary write mode::
 
     with TdmsWriter("my_file.tdms") as tdms_writer:
         # write data
 
-The :py:meth:`nptdms.TdmsWriter.write_segment` method is used to write
+The :py:meth:`~nptdms.TdmsWriter.write_segment` method is used to write
 a segment of data to the TDMS file. Because the TDMS file format is designed
 for streaming data applications, it supports writing data one segment at a
 time as data becomes available.
 If you don't require this functionality you can simple call ``write_segment``
 once with all of your data.
-The ``write_segment`` method takes a list of objects, each of which must be an
+The :py:meth:`~nptdms.TdmsWriter.write_segment` method takes a list of objects, each of which must be an
 instance of one of:
 
 - :py:class:`nptdms.RootObject`. This is the TDMS root object, and there may
   only be one root object in a segment.
@@ -69,7 +69,7 @@ is given below::
                                 channel_object])
 
 You could also read a TDMS file and then re-write it by passing
-:py:class:`nptdms.TdmsGroup` and :py:class:`nptdms.TdmsChannel`
+:py:class:`~nptdms.TdmsGroup` and :py:class:`~nptdms.TdmsChannel`
 instances to the ``write_segment`` method. If you want
 to only copy certain channels for example, you could do something like::
 
@@ -83,3 +83,6 @@ to only copy certain channels for example, you could do something like::
             root_object = RootObject(original_file.properties)
             channels_to_copy = [chan for chan in original_channels if include_channel(chan)]
             copied_file.write_segment([root_object] + original_groups + channels_to_copy)
+
+Note that this isn't suitable for copying channels with scaled data, as the channel data
+will already have scaling applied.
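The ``include_channel`` predicate used in the snippet above is left undefined in the docs; a minimal hypothetical definition, filtering channels by name (the names are placeholders), could look like::

    def include_channel(channel):
        # Keep only the channels we want to copy; any predicate over a
        # TdmsChannel (its name, properties, etc.) could be used instead.
        return channel.name in ("voltage", "current")
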
""" - def __init__(self, data, daqmx_data): - self.raw_data = data - self.daqmx_raw_data = daqmx_data + def __init__(self, channel_data): + self.channel_data = channel_data @staticmethod def empty(): - return DataChunk({}, {}) + return RawDataChunk({}) @staticmethod def channel_data(data): - return DataChunk(data, {}) + channel_chunks = { + path: RawChannelDataChunk.channel_data(d) + for (path, d) in data.items() + } + return RawDataChunk(channel_chunks) @staticmethod def scaler_data(data): - return DataChunk({}, data) + channel_chunks = { + path: RawChannelDataChunk.scaler_data(d) + for (path, d) in data.items() + } + return RawDataChunk(channel_chunks) -class ChannelDataChunk(object): +class RawChannelDataChunk(object): """Data read for a single channel from a single chunk in a TDMS segment - :ivar raw_data: Raw data in this chunk for a standard TDMS channel. - :ivar daqmx_raw_data: A dictionary of scaler data in this segment for + :ivar data: Raw data in this chunk for a standard TDMS channel. + :ivar scaler_data: A dictionary of scaler data in this segment for DAQmx raw data. Keys are the scaler id and values are data arrays. """ - def __init__(self, data, daqmx_data): - self.raw_data = data - self.daqmx_raw_data = daqmx_data + def __init__(self, data, scaler_data): + self.data = data + self.scaler_data = scaler_data + + def __len__(self): + if self.data is not None: + return len(self.data) + elif self.scaler_data is not None: + return next(len(d) for d in self.scaler_data.values()) + return 0 @staticmethod def empty(): - return ChannelDataChunk(None, None) + return RawChannelDataChunk(None, None) @staticmethod def channel_data(data): - return ChannelDataChunk(data, None) + return RawChannelDataChunk(data, None) @staticmethod def scaler_data(data): - return ChannelDataChunk(None, data) + return RawChannelDataChunk(None, data) def read_property(f, endianness="<"): diff --git a/nptdms/channel_data.py b/nptdms/channel_data.py index de967f7..191213a 100644 --- a/nptdms/channel_data.py +++ b/nptdms/channel_data.py @@ -25,7 +25,7 @@ def get_data_receiver(obj, num_values, memmap_dir=None): return DaqmxDataReceiver(obj, num_values, memmap_dir) if obj.data_type.nptype is None: - return ListDataReceiver() + return ListDataReceiver(obj) return NumpyDataReceiver(obj, num_values, memmap_dir) @@ -37,9 +37,13 @@ class ListDataReceiver(object): :ivar data: List of data points """ - def __init__(self): + def __init__(self, channel): """Initialise new data receiver for a TDMS object """ + if channel.data_type == types.String: + self._dtype = np.dtype('O') + else: + self._dtype = None self._data = [] self.scaler_data = {} @@ -50,7 +54,7 @@ def append_data(self, data): @property def data(self): - return np.array(self._data) + return np.array(self._data, dtype=self._dtype) class NumpyDataReceiver(object): diff --git a/nptdms/daqmx.py b/nptdms/daqmx.py index 6171c09..2c63a37 100644 --- a/nptdms/daqmx.py +++ b/nptdms/daqmx.py @@ -3,7 +3,7 @@ from nptdms import types from nptdms.base_segment import ( - BaseSegment, BaseSegmentObject, DataChunk, read_interleaved_segment_bytes) + BaseSegment, BaseSegmentObject, RawDataChunk, read_interleaved_segment_bytes) from nptdms.log import log_manager @@ -78,7 +78,7 @@ def _read_data_chunk(self, file, data_objects, chunk_index): scaler.data_type.nptype.newbyteorder(self.endianness)) scaler_data[obj.path][scaler.scale_id] = this_scaler_data - return DataChunk.scaler_data(scaler_data) + return RawDataChunk.scaler_data(scaler_data) class 
diff --git a/nptdms/channel_data.py b/nptdms/channel_data.py
index de967f7..191213a 100644
--- a/nptdms/channel_data.py
+++ b/nptdms/channel_data.py
@@ -25,7 +25,7 @@ def get_data_receiver(obj, num_values, memmap_dir=None):
         return DaqmxDataReceiver(obj, num_values, memmap_dir)
 
     if obj.data_type.nptype is None:
-        return ListDataReceiver()
+        return ListDataReceiver(obj)
 
     return NumpyDataReceiver(obj, num_values, memmap_dir)
 
@@ -37,9 +37,13 @@ class ListDataReceiver(object):
 
     :ivar data: List of data points
     """
 
-    def __init__(self):
+    def __init__(self, channel):
         """Initialise new data receiver for a TDMS object
         """
+        if channel.data_type == types.String:
+            self._dtype = np.dtype('O')
+        else:
+            self._dtype = None
         self._data = []
         self.scaler_data = {}
 
@@ -50,7 +54,7 @@ def append_data(self, data):
 
     @property
     def data(self):
-        return np.array(self._data)
+        return np.array(self._data, dtype=self._dtype)
 
 
 class NumpyDataReceiver(object):
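The object dtype matters because NumPy otherwise coerces Python strings to fixed-width unicode arrays; a quick NumPy-only illustration of the difference::

    import numpy as np

    values = ["a", "longer string"]

    coerced = np.array(values)                          # fixed-width unicode, dtype '<U13'
    preserved = np.array(values, dtype=np.dtype('O'))   # object dtype, keeps Python str values

    print(coerced.dtype, preserved.dtype)
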
diff --git a/nptdms/daqmx.py b/nptdms/daqmx.py
index 6171c09..2c63a37 100644
--- a/nptdms/daqmx.py
+++ b/nptdms/daqmx.py
@@ -3,7 +3,7 @@
 
 from nptdms import types
 from nptdms.base_segment import (
-    BaseSegment, BaseSegmentObject, DataChunk, read_interleaved_segment_bytes)
+    BaseSegment, BaseSegmentObject, RawDataChunk, read_interleaved_segment_bytes)
 from nptdms.log import log_manager
 
 
@@ -78,7 +78,7 @@ def _read_data_chunk(self, file, data_objects, chunk_index):
                     scaler.data_type.nptype.newbyteorder(self.endianness))
                 scaler_data[obj.path][scaler.scale_id] = this_scaler_data
 
-        return DataChunk.scaler_data(scaler_data)
+        return RawDataChunk.scaler_data(scaler_data)
 
 
 class DaqmxSegmentObject(BaseSegmentObject):
diff --git a/nptdms/export/hdf_export.py b/nptdms/export/hdf_export.py
index dadd2dd..77b9a5e 100644
--- a/nptdms/export/hdf_export.py
+++ b/nptdms/export/hdf_export.py
@@ -23,7 +23,6 @@ def from_tdms_file(tdms_file, filepath, mode='w', group='/'):
 
     h5file = h5py.File(filepath, mode)
 
-    container_group = None
     if group in h5file:
         container_group = h5file[group]
     else:
@@ -34,7 +33,8 @@ def from_tdms_file(tdms_file, filepath, mode='w', group='/'):
         container_group.attrs[property_name] = _hdf_attr_value(property_value)
 
     # Now iterate through groups and channels,
-    # writing the properties and data
+    # writing the properties and creating data sets
+    datasets = {}
     for group in tdms_file.groups():
         # Write the group's properties
         container_group.create_group(group.name)
@@ -47,26 +47,47 @@ def from_tdms_file(tdms_file, filepath, mode='w', group='/'):
 
             if channel.data_type is types.String:
                 # Encode as variable length UTF-8 strings
-                channel_data = container_group.create_dataset(
-                    channel_key, (len(channel.data),), dtype=h5py.string_dtype())
-                channel_data[...] = channel.data
+                datasets[channel.path] = container_group.create_dataset(
+                    channel_key, (len(channel),), dtype=h5py.string_dtype())
             elif channel.data_type is types.TimeStamp:
                 # Timestamps are represented as fixed length ASCII strings
                 # because HDF doesn't natively support timestamps
-                channel_data = container_group.create_dataset(
-                    channel_key, (len(channel.data),), dtype='S27')
-                string_data = np.datetime_as_string(channel.data, unit='us', timezone='UTC')
-                encoded_data = [s.encode('ascii') for s in string_data]
-                channel_data[...] = encoded_data
+                datasets[channel.path] = container_group.create_dataset(
+                    channel_key, (len(channel),), dtype='S27')
             else:
-                container_group[channel_key] = channel.data
+                datasets[channel.path] = container_group.create_dataset(
+                    channel_key, (len(channel),), dtype=channel.dtype)
 
             for prop_name, prop_value in channel.properties.items():
                 container_group[channel_key].attrs[prop_name] = _hdf_attr_value(prop_value)
 
+    # Set data
+    if tdms_file.data_read:
+        for group in tdms_file.groups():
+            for channel in group.channels():
+                datasets[channel.path][...] = _hdf_array(channel, channel.data)
+    else:
+        # Data hasn't been read into memory, stream it from disk
+        for chunk in tdms_file.data_chunks():
+            for group in chunk.groups():
+                for channel_chunk in group.channels():
+                    channel = tdms_file[group.name][channel_chunk.name]
+                    offset = channel_chunk.offset
+                    end = offset + len(channel_chunk)
+                    datasets[channel.path][offset:end] = _hdf_array(channel, channel_chunk[:])
+
     return h5file
 
 
+def _hdf_array(channel, data):
+    """ Convert data array into a format suitable for initialising HDF data
+    """
+    if channel.data_type is types.TimeStamp:
+        string_data = np.datetime_as_string(data, unit='us', timezone='UTC')
+        return [s.encode('ascii') for s in string_data]
+    return data
+
+
 def _hdf_attr_value(value):
     """ Convert a value into a format suitable for an HDF attribute
     """
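With the export rewritten to create the data sets up front and then fill them, HDF conversion of a large file can presumably be streamed from a file opened without reading data into memory; a minimal sketch, assuming a placeholder output path::

    from nptdms import TdmsFile

    with TdmsFile.open("my_file.tdms") as tdms_file:
        # data_read is False for a file opened this way, so from_tdms_file
        # takes the chunk-streaming branch rather than loading whole channels.
        h5_file = tdms_file.as_hdf("my_file.h5")
        h5_file.close()
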
diff --git a/nptdms/reader.py b/nptdms/reader.py
index c2f5b1f..c9a07c9 100644
--- a/nptdms/reader.py
+++ b/nptdms/reader.py
@@ -4,7 +4,7 @@ import numpy as np
 
 from nptdms.utils import Timer, OrderedDict
 from nptdms.tdms_segment import read_segment_metadata
-from nptdms.base_segment import ChannelDataChunk
+from nptdms.base_segment import RawChannelDataChunk
 from nptdms.log import log_manager
 
 log = log_manager.get_logger(__name__)
@@ -75,7 +75,7 @@ def read_metadata(self):
     def read_raw_data(self):
         """ Read raw data from all segments, chunk by chunk
 
-        :returns: A generator that yields DataChunk objects
+        :returns: A generator that yields RawDataChunk objects
        """
        if self._segments is None:
            raise RuntimeError(
@@ -92,7 +92,7 @@ def read_raw_data_for_channel(self, channel_path, offset=0, length=None):
         :param length: Number of values to attempt to read. If None, then all
            values starting from the offset will be read. Fewer values will be
            returned if attempting to read beyond the end of the available data.
-        :returns: A generator that yields ChannelDataChunk objects
+        :returns: A generator that yields RawChannelDataChunk objects
         """
         if self._segments is None:
             raise RuntimeError("Cannot read data unless metadata has first been read")
@@ -262,12 +262,12 @@ def __init__(self):
 def _trim_channel_chunk(chunk, skip=0, trim=0):
     if skip == 0 and trim == 0:
         return chunk
-    raw_data = None
-    daqmx_raw_data = None
-    if chunk.raw_data is not None:
-        raw_data = chunk.raw_data[skip:len(chunk.raw_data) - trim]
-    if chunk.daqmx_raw_data is not None:
-        daqmx_raw_data = {
+    data = None
+    scaler_data = None
+    if chunk.data is not None:
+        data = chunk.data[skip:len(chunk.data) - trim]
+    if chunk.scaler_data is not None:
+        scaler_data = {
             scale_id: d[skip:len(d) - trim]
-            for (scale_id, d) in chunk.daqmx_raw_data.items()}
-    return ChannelDataChunk(raw_data, daqmx_raw_data)
+            for (scale_id, d) in chunk.scaler_data.items()}
+    return RawChannelDataChunk(data, scaler_data)
diff --git a/nptdms/scaling.py b/nptdms/scaling.py
index 6b2eddb..6e7996d 100644
--- a/nptdms/scaling.py
+++ b/nptdms/scaling.py
@@ -61,6 +61,8 @@ def from_properties(properties, scale_index):
         return PolynomialScaling(coefficients, input_source)
 
     def scale(self, data):
+        # Ensure data is double type before scaling
+        data = data.astype(np.dtype('float64'), copy=False)
         return np.polynomial.polynomial.polyval(data, self.coefficients)
 
 
@@ -229,7 +231,7 @@ def scale(self, data):
         """ Convert voltage data to temperature in Kelvin
         """
         # Ensure data is double precision
-        data = data.astype(np.float64, copy=False)
+        data = data.astype(np.dtype('float64'), copy=False)
         if self.excitation_type == CURRENT_EXCITATION:
             r_t = data / self.excitation_value
         elif self.excitation_type == VOLTAGE_EXCITATION:
@@ -376,6 +378,26 @@ def scale(self, raw_channel_data):
         final_scale = len(self.scalings) - 1
         return self._compute_scaled_data(final_scale, raw_channel_data)
 
+    def get_dtype(self, raw_data_type, scaler_data_types):
+        """ Get the numpy dtype for scaled data
+        """
+        final_scale = len(self.scalings) - 1
+        return self._compute_scale_dtype(final_scale, raw_data_type, scaler_data_types)
+
+    def _compute_scale_dtype(self, scale_index, raw_data_type, scaler_data_types):
+        if scale_index == RAW_DATA_INPUT_SOURCE:
+            return raw_data_type.nptype
+        scaling = self.scalings[scale_index]
+        if isinstance(scaling, DaqMxScalerScaling):
+            return scaler_data_types[scaling.scale_id].nptype
+        elif isinstance(scaling, AddScaling) or isinstance(scaling, SubtractScaling):
+            return np.result_type(
+                self._compute_scale_dtype(scaling.left_input_source, raw_data_type, scaler_data_types),
+                self._compute_scale_dtype(scaling.right_input_source, raw_data_type, scaler_data_types))
+        else:
+            # Any other scaling type should produce double data
+            return np.dtype('float64')
+
     def _compute_scaled_data(self, scale_index, raw_channel_data):
         """ Compute output data from a single scale in the set of all
         scalings, computing any required input scales recursively.
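The add/subtract branch of ``_compute_scale_dtype`` above relies on NumPy's standard type promotion via ``np.result_type``; for example::

    import numpy as np

    # Combining differently typed inputs follows NumPy promotion rules,
    # while any other scaling type falls back to float64.
    print(np.result_type(np.dtype('int16'), np.dtype('float32')))  # float32
    print(np.result_type(np.dtype('int16'), np.dtype('int32')))    # int32
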
diff --git a/nptdms/tdms.py b/nptdms/tdms.py
index 54a4889..02d35da 100644
--- a/nptdms/tdms.py
+++ b/nptdms/tdms.py
@@ -3,16 +3,18 @@
 This module contains the public facing API for reading TDMS files
 """
 
+from collections import defaultdict
 import warnings
 import numpy as np
 
-from nptdms import scaling
+from nptdms import scaling, types
 from nptdms.utils import Timer, OrderedDict
 from nptdms.log import log_manager
 from nptdms.common import ObjectPath
 from nptdms.reader import TdmsReader
 from nptdms.channel_data import get_data_receiver
 from nptdms.export import hdf_export, pandas_export
+from nptdms.base_segment import RawChannelDataChunk
 
 log = log_manager.get_logger(__name__)
@@ -91,6 +93,7 @@ def __init__(self, file, memmap_dir=None, read_metadata_only=False, keep_open=Fa
         self._properties = {}
         self._channel_data = {}
         self._reader = None
+        self.data_read = False
 
         reader = TdmsReader(file)
         try:
@@ -144,6 +147,21 @@ def as_hdf(self, filepath, mode='w', group='/'):
         """
         return hdf_export.from_tdms_file(self, filepath, mode, group)
 
+    def data_chunks(self):
+        """ A generator that streams chunks of data from disk.
+        This method may only be used when the TDMS file was opened without reading all data immediately.
+
+        :rtype: Generator that yields :class:`DataChunk` objects
+        """
+        if self._reader is None:
+            raise RuntimeError(
+                "Cannot read data chunks after the underlying TDMS reader is closed")
+        channel_offsets = defaultdict(int)
+        for chunk in self._reader.read_raw_data():
+            yield DataChunk(self, chunk, channel_offsets)
+            for path, data in chunk.channel_data.items():
+                channel_offsets[path] += len(data)
+
     def close(self):
         """ Close the underlying file if it was opened by this TdmsFile
@@ -219,14 +237,13 @@ def _read_data(self, tdms_reader):
         with Timer(log, "Read data"):
             # Now actually read all the data
             for chunk in tdms_reader.read_raw_data():
-                for (path, data) in chunk.raw_data.items():
-                    channel_data = self._channel_data[path]
-                    channel_data.append_data(data)
-                for (path, data) in chunk.daqmx_raw_data.items():
+                for (path, data) in chunk.channel_data.items():
                     channel_data = self._channel_data[path]
-                    for scaler_id, scaler_data in data.items():
-                        channel_data.append_scaler_data(
-                            scaler_id, scaler_data)
+                    if data.data is not None:
+                        channel_data.append_data(data.data)
+                    elif data.scaler_data is not None:
+                        for scaler_id, scaler_data in data.scaler_data.items():
+                            channel_data.append_scaler_data(scaler_id, scaler_data)
 
         for group in self.groups():
             for channel in group.channels():
@@ -234,6 +251,8 @@ def _read_data(self, tdms_reader):
                 if channel_data is not None:
                     channel._set_raw_data(channel_data)
 
+        self.data_read = True
+
     def _read_channel_data(self, channel, offset=0, length=None):
         if offset < 0:
             raise ValueError("offset must be non-negative")
@@ -255,10 +274,10 @@ def _read_channel_data(self, channel, offset=0, length=None):
         with Timer(log, "Read data"):
             # Now actually read all the data
             for chunk in self._reader.read_raw_data_for_channel(channel.path, offset, length):
-                if chunk.raw_data is not None:
-                    channel_data.append_data(chunk.raw_data)
-                if chunk.daqmx_raw_data is not None:
-                    for scaler_id, scaler_data in chunk.daqmx_raw_data.items():
+                if chunk.data is not None:
+                    channel_data.append_data(chunk.data)
+                if chunk.scaler_data is not None:
+                    for scaler_id, scaler_data in chunk.scaler_data.items():
                         channel_data.append_scaler_data(scaler_id, scaler_data)
 
         return channel_data
@@ -489,6 +508,24 @@ def name(self):
         """
         return self._path.channel
 
+    @_property_builtin
+    def dtype(self):
+        """ NumPy data type of the channel data
+
+        For data with a scaling this is the data type of the scaled data
+
+        :rtype: numpy.dtype
+        """
+        if self.data_type is types.String:
+            return np.dtype('O')
+        elif self.data_type is types.TimeStamp:
+            return np.dtype('