From 5f2d4c1f1d4d2a2acdbd1eeb50a655cb4fb9e164 Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Sun, 10 Jan 2016 13:53:08 +0100 Subject: [PATCH] Added LVM support #89 --- dfvfs/analyzer/__init__.py | 1 + dfvfs/analyzer/lvm_analyzer_helper.py | 30 ++++ dfvfs/dependencies.py | 3 +- dfvfs/file_io/lvm_file_io.py | 130 ++++++++++++++++ dfvfs/lib/definitions.py | 2 + dfvfs/lib/lvm.py | 28 ++++ dfvfs/path/__init__.py | 1 + dfvfs/path/lvm_path_spec.py | 53 +++++++ dfvfs/resolver/__init__.py | 5 + dfvfs/resolver/lvm_resolver_helper.py | 42 +++++ dfvfs/vfs/fake_file_entry.py | 6 +- dfvfs/vfs/file_entry.py | 8 +- dfvfs/vfs/file_system.py | 1 + dfvfs/vfs/lvm_file_entry.py | 175 +++++++++++++++++++++ dfvfs/vfs/lvm_file_system.py | 147 ++++++++++++++++++ dfvfs/vfs/ntfs_file_entry.py | 2 +- dfvfs/vfs/os_file_entry.py | 6 +- dfvfs/vfs/root_only_file_entry.py | 4 - dfvfs/vfs/tar_file_entry.py | 6 +- dfvfs/vfs/tsk_file_entry.py | 2 +- dfvfs/vfs/tsk_partition_file_entry.py | 7 +- dfvfs/vfs/vshadow_file_entry.py | 12 +- dfvfs/vfs/zip_file_entry.py | 6 +- dfvfs/volume/lvm_volume_system.py | 73 +++++++++ dfvfs/volume/tsk_volume_system.py | 6 +- dfvfs/volume/vshadow_volume_system.py | 5 +- test_data/lvmtest.qcow2 | Bin 0 -> 786432 bytes tests/file_io/lvm_file_io.py | 129 ++++++++++++++++ tests/file_io/vshadow_file_io.py | 8 +- tests/path/lvm_path_spec.py | 94 ++++++++++++ tests/resolver/lvm_resolver_helper.py | 26 ++++ tests/vfs/lvm_file_entry.py | 213 ++++++++++++++++++++++++++ tests/vfs/lvm_file_system.py | 175 +++++++++++++++++++++ tests/vfs/vshadow_file_entry.py | 17 +- tests/vfs/vshadow_file_system.py | 8 +- tests/volume/lvm_volume_system.py | 92 +++++++++++ tests/volume/tsk_volume_system.py | 3 +- tests/volume/vshadow_volume_system.py | 3 +- utils/review.py | 2 +- 39 files changed, 1491 insertions(+), 40 deletions(-) create mode 100644 dfvfs/analyzer/lvm_analyzer_helper.py create mode 100644 dfvfs/file_io/lvm_file_io.py create mode 100644 dfvfs/lib/lvm.py create mode 100644 dfvfs/path/lvm_path_spec.py create mode 100644 dfvfs/resolver/lvm_resolver_helper.py create mode 100644 dfvfs/vfs/lvm_file_entry.py create mode 100644 dfvfs/vfs/lvm_file_system.py create mode 100644 dfvfs/volume/lvm_volume_system.py create mode 100644 test_data/lvmtest.qcow2 create mode 100644 tests/file_io/lvm_file_io.py create mode 100644 tests/path/lvm_path_spec.py create mode 100644 tests/resolver/lvm_resolver_helper.py create mode 100644 tests/vfs/lvm_file_entry.py create mode 100644 tests/vfs/lvm_file_system.py create mode 100644 tests/volume/lvm_volume_system.py diff --git a/dfvfs/analyzer/__init__.py b/dfvfs/analyzer/__init__.py index 49e76f7b..a752ad89 100644 --- a/dfvfs/analyzer/__init__.py +++ b/dfvfs/analyzer/__init__.py @@ -4,6 +4,7 @@ from dfvfs.analyzer import bzip2_analyzer_helper from dfvfs.analyzer import ewf_analyzer_helper from dfvfs.analyzer import gzip_analyzer_helper +from dfvfs.analyzer import lvm_analyzer_helper from dfvfs.analyzer import ntfs_analyzer_helper from dfvfs.analyzer import qcow_analyzer_helper from dfvfs.analyzer import tar_analyzer_helper diff --git a/dfvfs/analyzer/lvm_analyzer_helper.py b/dfvfs/analyzer/lvm_analyzer_helper.py new file mode 100644 index 00000000..748f1a18 --- /dev/null +++ b/dfvfs/analyzer/lvm_analyzer_helper.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +"""The Logical Volume Manager (LVM) format analyzer helper implementation.""" + +from dfvfs.analyzer import analyzer +from dfvfs.analyzer import analyzer_helper +from dfvfs.analyzer import specification +from dfvfs.lib import definitions + + +class LVMAnalyzerHelper(analyzer_helper.AnalyzerHelper): + """Class that implements the LVM analyzer helper.""" + + FORMAT_CATEGORIES = frozenset([ + definitions.FORMAT_CATEGORY_VOLUME_SYSTEM]) + + TYPE_INDICATOR = definitions.TYPE_INDICATOR_LVM + + def GetFormatSpecification(self): + """Retrieves the format specification.""" + format_specification = specification.FormatSpecification( + self.type_indicator) + + # LVM signature. + format_specification.AddNewSignature(b'LABELONE', offset=0) + + return format_specification + + +# Register the analyzer helpers with the analyzer. +analyzer.Analyzer.RegisterHelper(LVMAnalyzerHelper()) diff --git a/dfvfs/dependencies.py b/dfvfs/dependencies.py index 8c876a7c..1caaa1ba 100644 --- a/dfvfs/dependencies.py +++ b/dfvfs/dependencies.py @@ -18,7 +18,8 @@ u'pysmraw': 20140612, u'pyvhdi': 20131210, u'pyvmdk': 20140421, - u'pyvshadow': 20131209} + u'pyvshadow': 20131209, + u'pyvslvm': 20160109} # The tuple values are: # module_name, version_attribute_name, minimum_version, maximum_version diff --git a/dfvfs/file_io/lvm_file_io.py b/dfvfs/file_io/lvm_file_io.py new file mode 100644 index 00000000..46429a99 --- /dev/null +++ b/dfvfs/file_io/lvm_file_io.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +"""The Logical Volume Manager (LVM) file-like object implementation.""" + +import os + +from dfvfs.file_io import file_io +from dfvfs.lib import errors +from dfvfs.lib import lvm +from dfvfs.resolver import resolver + + +class LVMFile(file_io.FileIO): + """Class that implements a file-like object using pyvslvm.""" + + def __init__(self, resolver_context): + """Initializes the file-like object. + + Args: + resolver_context: the resolver context (instance of Context). + """ + super(LVMFile, self).__init__(resolver_context) + self._file_system = None + self._vslvm_logical_volume = None + + def _Close(self): + """Closes the file-like object. + + Raises: + IOError: if the close failed. + """ + self._vslvm_logical_volume = None + + self._file_system.Close() + self._file_system = None + + def _Open(self, path_spec=None, mode='rb'): + """Opens the file-like object defined by path specification. + + Args: + path_spec: optional path specification (instance of PathSpec). + mode: optional file access mode. The default is 'rb' read-only binary. + + Raises: + AccessError: if the access to open the file was denied. + IOError: if the file-like object could not be opened. + PathSpecError: if the path specification is incorrect. + ValueError: if the path specification is invalid. + """ + if not path_spec: + raise ValueError(u'Missing path specfication.') + + volume_index = lvm.LVMPathSpecGetVolumeIndex(path_spec) + if volume_index is None: + raise errors.PathSpecError( + u'Unable to retrieve volume index from path specification.') + + self._file_system = resolver.Resolver.OpenFileSystem( + path_spec, resolver_context=self._resolver_context) + vslvm_volume_group = self._file_system.GetLVMVolumeGroup() + + if (volume_index < 0 or + volume_index >= vslvm_volume_group.number_of_logical_volumes): + raise errors.PathSpecError(( + u'Unable to retrieve LVM logical volume index: {0:d} from path ' + u'specification.').format(volume_index)) + + self._vslvm_logical_volume = vslvm_volume_group.get_logical_volume( + volume_index) + + # Note: that the following functions do not follow the style guide + # because they are part of the file-like object interface. + + def read(self, size=None): + """Reads a byte string from the file-like object at the current offset. + + The function will read a byte string of the specified size or + all of the remaining data if no size was specified. + + Args: + size: Optional integer value containing the number of bytes to read. + Default is all remaining data (None). + + Returns: + A byte string containing the data read. + + Raises: + IOError: if the read failed. + """ + if not self._is_open: + raise IOError(u'Not opened.') + + return self._vslvm_logical_volume.read(size) + + def seek(self, offset, whence=os.SEEK_SET): + """Seeks an offset within the file-like object. + + Args: + offset: The offset to seek. + whence: Optional value that indicates whether offset is an absolute + or relative position within the file. Default is SEEK_SET. + + Raises: + IOError: if the seek failed. + """ + if not self._is_open: + raise IOError(u'Not opened.') + + self._vslvm_logical_volume.seek(offset, whence) + + def get_offset(self): + """Returns the current offset into the file-like object. + + Raises: + IOError: if the file-like object has not been opened. + """ + if not self._is_open: + raise IOError(u'Not opened.') + + return self._vslvm_logical_volume.get_offset() + + def get_size(self): + """Returns the size of the file-like object. + + Raises: + IOError: if the file-like object has not been opened. + """ + if not self._is_open: + raise IOError(u'Not opened.') + + return self._vslvm_logical_volume.size diff --git a/dfvfs/lib/definitions.py b/dfvfs/lib/definitions.py index 0ab6e2a2..9b0cfd1a 100644 --- a/dfvfs/lib/definitions.py +++ b/dfvfs/lib/definitions.py @@ -20,6 +20,7 @@ TYPE_INDICATOR_EWF = u'EWF' TYPE_INDICATOR_FAKE = u'FAKE' TYPE_INDICATOR_GZIP = u'GZIP' +TYPE_INDICATOR_LVM = u'LVM' TYPE_INDICATOR_MOUNT = u'MOUNT' TYPE_INDICATOR_NTFS = u'NTFS' TYPE_INDICATOR_OS = u'OS' @@ -49,6 +50,7 @@ TYPE_INDICATOR_VMDK]) VOLUME_SYSTEM_TYPE_INDICATORS = frozenset([ + TYPE_INDICATOR_LVM, TYPE_INDICATOR_TSK_PARTITION, TYPE_INDICATOR_VSHADOW]) diff --git a/dfvfs/lib/lvm.py b/dfvfs/lib/lvm.py new file mode 100644 index 00000000..686b451c --- /dev/null +++ b/dfvfs/lib/lvm.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +"""Helper functions for Logical Volume Manager (LVM) support.""" + + +def LVMPathSpecGetVolumeIndex(path_spec): + """Retrieves the volume index from the path specification. + + Args: + path_spec: the path specification (instance of PathSpec). + """ + volume_index = getattr(path_spec, u'volume_index', None) + + if volume_index is None: + location = getattr(path_spec, u'location', None) + + if location is None or not location.startswith(u'/lvm'): + return + + volume_index = None + try: + volume_index = int(location[4:], 10) - 1 + except ValueError: + pass + + if volume_index is None or volume_index < 0: + return + + return volume_index diff --git a/dfvfs/path/__init__.py b/dfvfs/path/__init__.py index fb5c8e51..f85d22a6 100644 --- a/dfvfs/path/__init__.py +++ b/dfvfs/path/__init__.py @@ -7,6 +7,7 @@ from dfvfs.path import ewf_path_spec from dfvfs.path import fake_path_spec from dfvfs.path import gzip_path_spec +from dfvfs.path import lvm_path_spec from dfvfs.path import mount_path_spec from dfvfs.path import ntfs_path_spec from dfvfs.path import os_path_spec diff --git a/dfvfs/path/lvm_path_spec.py b/dfvfs/path/lvm_path_spec.py new file mode 100644 index 00000000..c34acded --- /dev/null +++ b/dfvfs/path/lvm_path_spec.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +"""The Logical Volume Manager (LVM) path specification implementation.""" + +from dfvfs.lib import definitions +from dfvfs.path import factory +from dfvfs.path import path_spec + + +class LVMPathSpec(path_spec.PathSpec): + """Class that implements the LVM path specification. + + Attributes: + location: string containing the location. + volume_index: integer containing the volume index. + """ + + TYPE_INDICATOR = definitions.TYPE_INDICATOR_LVM + + def __init__(self, location=None, parent=None, volume_index=None, **kwargs): + """Initializes the path specification object. + + Note that the LVM path specification must have a parent. + + Args: + location: optional string containing the location. + parent: optional parent path specification (instance of PathSpec). + volume_index: optional integer containing the volume index. + + Raises: + ValueError: when parent is not set. + """ + if not parent: + raise ValueError(u'Missing parent value.') + + super(LVMPathSpec, self).__init__(parent=parent, **kwargs) + self.location = location + self.volume_index = volume_index + + @property + def comparable(self): + """Comparable representation of the path specification.""" + string_parts = [] + + if self.location is not None: + string_parts.append(u'location: {0:s}'.format(self.location)) + if self.volume_index is not None: + string_parts.append(u'volume index: {0:d}'.format(self.volume_index)) + + return self._GetComparable(sub_comparable_string=u', '.join(string_parts)) + + +# Register the path specification with the factory. +factory.Factory.RegisterPathSpec(LVMPathSpec) diff --git a/dfvfs/resolver/__init__.py b/dfvfs/resolver/__init__.py index 9431a3da..b3fcf49a 100644 --- a/dfvfs/resolver/__init__.py +++ b/dfvfs/resolver/__init__.py @@ -17,6 +17,11 @@ from dfvfs.resolver import fake_resolver_helper from dfvfs.resolver import gzip_resolver_helper +try: + from dfvfs.resolver import lvm_resolver_helper +except ImportError: + pass + try: from dfvfs.resolver import ntfs_resolver_helper except ImportError: diff --git a/dfvfs/resolver/lvm_resolver_helper.py b/dfvfs/resolver/lvm_resolver_helper.py new file mode 100644 index 00000000..0faeb178 --- /dev/null +++ b/dfvfs/resolver/lvm_resolver_helper.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +"""The LVM path specification resolver helper implementation.""" + +# This is necessary to prevent a circular import. +import dfvfs.file_io.lvm_file_io +import dfvfs.vfs.lvm_file_system + +from dfvfs.lib import definitions +from dfvfs.resolver import resolver +from dfvfs.resolver import resolver_helper + + +class LVMResolverHelper(resolver_helper.ResolverHelper): + """Class that implements the Logical Volume Manager (LVM) resolver helper.""" + + TYPE_INDICATOR = definitions.TYPE_INDICATOR_LVM + + def NewFileObject(self, resolver_context): + """Creates a new file-like object. + + Args: + resolver_context: the resolver context (instance of resolver.Context). + + Returns: + The file-like object (instance of file_io.FileIO). + """ + return dfvfs.file_io.lvm_file_io.LVMFile(resolver_context) + + def NewFileSystem(self, resolver_context): + """Creates a new file system object. + + Args: + resolver_context: the resolver context (instance of resolver.Context). + + Returns: + The file system object (instance of vfs.FileSystem). + """ + return dfvfs.vfs.lvm_file_system.LVMFileSystem(resolver_context) + + +# Register the resolver helpers with the resolver. +resolver.Resolver.RegisterHelper(LVMResolverHelper()) diff --git a/dfvfs/vfs/fake_file_entry.py b/dfvfs/vfs/fake_file_entry.py index 680fa07f..bb32ad00 100644 --- a/dfvfs/vfs/fake_file_entry.py +++ b/dfvfs/vfs/fake_file_entry.py @@ -145,7 +145,11 @@ def GetFileObject(self, data_stream_name=u''): return file_object def GetParentFileEntry(self): - """Retrieves the parent file entry.""" + """Retrieves the root file entry. + + Returns: + The parent file entry (instance of FileEntry) or None. + """ location = getattr(self.path_spec, u'location', None) if location is None: return diff --git a/dfvfs/vfs/file_entry.py b/dfvfs/vfs/file_entry.py index 42417be7..fe4dc25b 100644 --- a/dfvfs/vfs/file_entry.py +++ b/dfvfs/vfs/file_entry.py @@ -292,9 +292,13 @@ def GetLinkedFileEntry(self): """Retrieves the linked file entry, e.g. for a symbolic link.""" return - @abc.abstractmethod def GetParentFileEntry(self): - """Retrieves the parent file entry.""" + """Retrieves the root file entry. + + Returns: + The parent file entry (instance of FileEntry) or None. + """ + return def GetSubFileEntryByName(self, name, case_sensitive=True): """Retrieves a sub file entry by name. diff --git a/dfvfs/vfs/file_system.py b/dfvfs/vfs/file_system.py index f6bd85d5..c228ce65 100644 --- a/dfvfs/vfs/file_system.py +++ b/dfvfs/vfs/file_system.py @@ -7,6 +7,7 @@ class FileSystem(object): """Class that implements the VFS file system object interface.""" + LOCATION_ROOT = u'/' PATH_SEPARATOR = u'/' def __init__(self, resolver_context): diff --git a/dfvfs/vfs/lvm_file_entry.py b/dfvfs/vfs/lvm_file_entry.py new file mode 100644 index 00000000..ed68c91e --- /dev/null +++ b/dfvfs/vfs/lvm_file_entry.py @@ -0,0 +1,175 @@ +# -*- coding: utf-8 -*- +"""The Logical Volume Manager (LVM) file entry implementation.""" + +from dfvfs.lib import date_time +from dfvfs.lib import definitions +from dfvfs.lib import errors +from dfvfs.lib import lvm +from dfvfs.path import lvm_path_spec +from dfvfs.vfs import file_entry +from dfvfs.vfs import vfs_stat + + +class LVMDirectory(file_entry.Directory): + """Class that implements a directory object using pyvslvm.""" + + def _EntriesGenerator(self): + """Retrieves directory entries. + + Since a directory can contain a vast number of entries using + a generator is more memory efficient. + + Yields: + A path specification (instance of PathSpec). + """ + # Only the virtual root file has directory entries. + volume_index = getattr(self.path_spec, u'volume_index', None) + if volume_index is not None: + return + + location = getattr(self.path_spec, u'location', None) + if location is None or location != self._file_system.LOCATION_ROOT: + return + + vslvm_volume_group = self._file_system.GetLVMVolumeGroup() + + for volume_index in range(0, vslvm_volume_group.number_of_logical_volumes): + yield lvm_path_spec.LVMPathSpec( + location=u'/lvm{0:d}'.format(volume_index + 1), + parent=self.path_spec.parent, volume_index=volume_index) + + +class LVMFileEntry(file_entry.FileEntry): + """Class that implements a file entry object using pyvslvm.""" + + TYPE_INDICATOR = definitions.TYPE_INDICATOR_LVM + + def __init__( + self, resolver_context, file_system, path_spec, is_root=False, + is_virtual=False): + """Initializes the file entry object. + + Args: + resolver_context: the resolver context (instance of resolver.Context). + file_system: the file system object (instance of vfs.FileSystem). + path_spec: the path specification (instance of path.PathSpec). + is_root: optional boolean value to indicate if the file entry is + the root file entry of the corresponding file system. + is_virtual: optional boolean value to indicate if the file entry is + a virtual file entry emulated by the corresponding file + system. + """ + super(LVMFileEntry, self).__init__( + resolver_context, file_system, path_spec, is_root=is_root, + is_virtual=is_virtual) + self._name = None + + def _GetDirectory(self): + """Retrieves the directory. + + Returns: + A directory object (instance of Directory) or None. + """ + if self._stat_object is None: + self._stat_object = self._GetStat() + + if (self._stat_object and + self._stat_object.type == self._stat_object.TYPE_DIRECTORY): + return LVMDirectory(self._file_system, self.path_spec) + return + + def _GetStat(self): + """Retrieves the stat object. + + Returns: + The stat object (instance of VFSStat). + + Raises: + BackEndError: when the vslvm logical volume is missing in a non-virtual + file entry. + """ + vslvm_logical_volume = self.GetLVMLogicalVolume() + if not self._is_virtual and vslvm_logical_volume is None: + raise errors.BackEndError( + u'Missing vslvm logical volume in non-virtual file entry.') + + stat_object = vfs_stat.VFSStat() + + # File data stat information. + if vslvm_logical_volume is not None: + stat_object.size = vslvm_logical_volume.size + + # Date and time stat information. + if vslvm_logical_volume is not None: + # TODO: implement in pyvslvm + # timestamp = vslvm_logical_volume.get_creation_time_as_integer() + timestamp = None + if timestamp is not None: + date_time_values = date_time.PosixTimestamp(timestamp) + + stat_time, stat_time_nano = date_time_values.CopyToStatObject() + if stat_time is not None: + stat_object.crtime = stat_time + stat_object.crtime_nano = stat_time_nano + + # Ownership and permissions stat information. + + # File entry type stat information. + + # The root file entry is virtual and should have type directory. + if self._is_virtual: + stat_object.type = stat_object.TYPE_DIRECTORY + else: + stat_object.type = stat_object.TYPE_FILE + + return stat_object + + @property + def name(self): + """The name of the file entry, which does not include the full path.""" + if self._name is None: + location = getattr(self.path_spec, u'location', None) + if location is not None: + self._name = self._file_system.BasenamePath(location) + else: + volume_index = getattr(self.path_spec, u'volume_index', None) + if volume_index is not None: + self._name = u'lvm{0:d}'.format(volume_index + 1) + else: + self._name = u'' + return self._name + + @property + def sub_file_entries(self): + """The sub file entries (generator of instance of FileEntry).""" + if self._directory is None: + self._directory = self._GetDirectory() + + if self._directory: + for path_spec in self._directory.entries: + yield LVMFileEntry(self._resolver_context, self._file_system, path_spec) + + def GetLVMLogicalVolume(self): + """Retrieves the LVM logical volume object. + + Returns: + A LVM logical volume object (instance of pyvslvm.logical_volume). + """ + volume_index = lvm.LVMPathSpecGetVolumeIndex(self.path_spec) + if volume_index is None: + return + + vslvm_volume_group = self._file_system.GetLVMVolumeGroup() + return vslvm_volume_group.get_logical_volume(volume_index) + + def GetParentFileEntry(self): + """Retrieves the parent file entry. + + Returns: + The parent file entry object (instance of FileEntry) or None. + """ + volume_index = lvm.LVMPathSpecGetVolumeIndex(self.path_spec) + if volume_index is None: + return + + return self._file_system.GetRootFileEntry() diff --git a/dfvfs/vfs/lvm_file_system.py b/dfvfs/vfs/lvm_file_system.py new file mode 100644 index 00000000..45080620 --- /dev/null +++ b/dfvfs/vfs/lvm_file_system.py @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- +"""The Logical Volume Manager (LVM) file system implementation.""" + +import pyvslvm + +# This is necessary to prevent a circular import. +import dfvfs.vfs.lvm_file_entry + +from dfvfs import dependencies +from dfvfs.lib import definitions +from dfvfs.lib import errors +from dfvfs.lib import lvm +from dfvfs.path import lvm_path_spec +from dfvfs.resolver import resolver +from dfvfs.vfs import file_system + + +dependencies.CheckModuleVersion(u'pyvslvm') + + +class LVMFileSystem(file_system.FileSystem): + """Class that implements a file system object using pyvslvm.""" + + TYPE_INDICATOR = definitions.TYPE_INDICATOR_LVM + + def __init__(self, resolver_context): + """Initializes a file system object. + + Args: + resolver_context: the resolver context (instance of Context). + """ + super(LVMFileSystem, self).__init__(resolver_context) + self._file_object = None + self._vslvm_handle = None + + def _Close(self): + """Closes the file system object. + + Raises: + IOError: if the close failed. + """ + self._vslvm_volume_group = None + self._vslvm_handle.close() + self._vslvm_handle = None + + self._file_object.close() + self._file_object = None + + def _Open(self, path_spec, mode='rb'): + """Opens the file system object defined by path specification. + + Args: + path_spec: a path specification (instance of PathSpec). + mode: optional file access mode. The default is 'rb' read-only binary. + + Raises: + AccessError: if the access to open the file was denied. + IOError: if the file system object could not be opened. + PathSpecError: if the path specification is incorrect. + ValueError: if the path specification is invalid. + """ + if not path_spec.HasParent(): + raise errors.PathSpecError( + u'Unsupported path specification without parent.') + + file_object = resolver.Resolver.OpenFileObject( + path_spec.parent, resolver_context=self._resolver_context) + + try: + vslvm_handle = pyvslvm.handle() + vslvm_handle.open_file_object(file_object) + # TODO: implement multi physical volume support. + vslvm_handle.open_physical_volume_files_as_file_objects([ + file_object]) + vslvm_volume_group = vslvm_handle.get_volume_group() + except: + file_object.close() + raise + + self._file_object = file_object + self._vslvm_handle = vslvm_handle + self._vslvm_volume_group = vslvm_volume_group + + def FileEntryExistsByPathSpec(self, path_spec): + """Determines if a file entry for a path specification exists. + + Args: + path_spec: a path specification (instance of PathSpec). + + Returns: + Boolean indicating if the file entry exists. + """ + volume_index = lvm.LVMPathSpecGetVolumeIndex(path_spec) + + # The virtual root file has not corresponding volume index but + # should have a location. + if volume_index is None: + location = getattr(path_spec, u'location', None) + return location is not None and location == self.LOCATION_ROOT + + return (volume_index >= 0 and + volume_index < self._vslvm_volume_group.number_of_logical_volumes) + + def GetFileEntryByPathSpec(self, path_spec): + """Retrieves a file entry for a path specification. + + Args: + path_spec: a path specification (instance of PathSpec). + + Returns: + A file entry (instance of FileEntry) or None. + """ + volume_index = lvm.LVMPathSpecGetVolumeIndex(path_spec) + + # The virtual root file has not corresponding volume index but + # should have a location. + if volume_index is None: + location = getattr(path_spec, u'location', None) + if location is None or location != self.LOCATION_ROOT: + return + return dfvfs.vfs.lvm_file_entry.LVMFileEntry( + self._resolver_context, self, path_spec, is_root=True, + is_virtual=True) + + if (volume_index < 0 or + volume_index >= self._vslvm_volume_group.number_of_logical_volumes): + return + return dfvfs.vfs.lvm_file_entry.LVMFileEntry( + self._resolver_context, self, path_spec) + + def GetLVMVolumeGroup(self): + """Retrieves the LVM volume group object. + + Returns: + The LVM handle object (instance of pyvslvm.volume_group). + """ + return self._vslvm_volume_group + + def GetRootFileEntry(self): + """Retrieves the root file entry. + + Returns: + A file entry (instance of FileEntry). + """ + path_spec = lvm_path_spec.LVMPathSpec( + location=self.LOCATION_ROOT, parent=self._path_spec.parent) + return self.GetFileEntryByPathSpec(path_spec) diff --git a/dfvfs/vfs/ntfs_file_entry.py b/dfvfs/vfs/ntfs_file_entry.py index edf50440..7a1a4839 100644 --- a/dfvfs/vfs/ntfs_file_entry.py +++ b/dfvfs/vfs/ntfs_file_entry.py @@ -532,7 +532,7 @@ def GetParentFileEntry(self): """Retrieves the parent file entry. Returns: - The parent file entry (instance of NTFSFileEntry) or None. + The parent file entry (instance of FileEntry) or None. Raises: BackEndError: if the pyfsntfs file entry is missing. diff --git a/dfvfs/vfs/os_file_entry.py b/dfvfs/vfs/os_file_entry.py index 30e74b88..99f58491 100644 --- a/dfvfs/vfs/os_file_entry.py +++ b/dfvfs/vfs/os_file_entry.py @@ -238,7 +238,11 @@ def GetLinkedFileEntry(self): return OSFileEntry(self._resolver_context, self._file_system, path_spec) def GetParentFileEntry(self): - """Retrieves the parent file entry.""" + """Retrieves the parent file entry. + + Returns: + The parent file entry (instance of FileEntry) or None. + """ location = getattr(self.path_spec, u'location', None) if location is None: return diff --git a/dfvfs/vfs/root_only_file_entry.py b/dfvfs/vfs/root_only_file_entry.py index 2b721f82..0dd14c9a 100644 --- a/dfvfs/vfs/root_only_file_entry.py +++ b/dfvfs/vfs/root_only_file_entry.py @@ -35,7 +35,3 @@ def sub_file_entries(self): # pylint: disable=unreachable return yield - - def GetParentFileEntry(self): - """Retrieves the parent file entry.""" - return diff --git a/dfvfs/vfs/tar_file_entry.py b/dfvfs/vfs/tar_file_entry.py index 8d63fae2..e272e63e 100644 --- a/dfvfs/vfs/tar_file_entry.py +++ b/dfvfs/vfs/tar_file_entry.py @@ -194,7 +194,11 @@ def sub_file_entries(self): yield TarFileEntry(self._resolver_context, self._file_system, path_spec) def GetParentFileEntry(self): - """Retrieves the parent file entry.""" + """Retrieves the parent file entry. + + Returns: + The parent file entry (instance of FileEntry) or None. + """ location = getattr(self.path_spec, u'location', None) if location is None: return diff --git a/dfvfs/vfs/tsk_file_entry.py b/dfvfs/vfs/tsk_file_entry.py index 2e35f341..2f88b8e4 100644 --- a/dfvfs/vfs/tsk_file_entry.py +++ b/dfvfs/vfs/tsk_file_entry.py @@ -494,7 +494,7 @@ def GetParentFileEntry(self): """Retrieves the parent file entry. Returns: - The parent file entry (instance of TSKFileEntry) or None. + The parent file entry (instance of FileEntry) or None. """ location = getattr(self.path_spec, u'location', None) if location is None: diff --git a/dfvfs/vfs/tsk_partition_file_entry.py b/dfvfs/vfs/tsk_partition_file_entry.py index 487cf1fe..3295b7a5 100644 --- a/dfvfs/vfs/tsk_partition_file_entry.py +++ b/dfvfs/vfs/tsk_partition_file_entry.py @@ -169,7 +169,12 @@ def sub_file_entries(self): self._resolver_context, self._file_system, path_spec) def GetParentFileEntry(self): - """Retrieves the parent file entry.""" + """Retrieves the parent file entry. + + Returns: + The parent file entry (instance of FileEntry) or None. + """ + # TODO: implement https://github.com/log2timeline/dfvfs/issues/76. return def GetTSKVsPart(self): diff --git a/dfvfs/vfs/vshadow_file_entry.py b/dfvfs/vfs/vshadow_file_entry.py index 858898e0..6b5c0851 100644 --- a/dfvfs/vfs/vshadow_file_entry.py +++ b/dfvfs/vfs/vshadow_file_entry.py @@ -148,8 +148,16 @@ def sub_file_entries(self): self._resolver_context, self._file_system, path_spec) def GetParentFileEntry(self): - """Retrieves the parent file entry.""" - return + """Retrieves the parent file entry. + + Returns: + The parent file entry (instance of FileEntry) or None. + """ + store_index = vshadow.VShadowPathSpecGetStoreIndex(self.path_spec) + if store_index is None: + return + + return self._file_system.GetRootFileEntry() def GetVShadowStore(self): """Retrieves the VSS store object (instance of pyvshadow.store).""" diff --git a/dfvfs/vfs/zip_file_entry.py b/dfvfs/vfs/zip_file_entry.py index 54af9838..85002f60 100644 --- a/dfvfs/vfs/zip_file_entry.py +++ b/dfvfs/vfs/zip_file_entry.py @@ -178,7 +178,11 @@ def sub_file_entries(self): yield ZipFileEntry(self._resolver_context, self._file_system, path_spec) def GetParentFileEntry(self): - """Retrieves the parent file entry.""" + """Retrieves the parent file entry. + + Returns: + The parent file entry (instance of FileEntry) or None. + """ location = getattr(self.path_spec, u'location', None) if location is None: return diff --git a/dfvfs/volume/lvm_volume_system.py b/dfvfs/volume/lvm_volume_system.py new file mode 100644 index 00000000..d3611862 --- /dev/null +++ b/dfvfs/volume/lvm_volume_system.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +"""Volume system object implementation using Logical Volume Manager (LVM).""" + +from dfvfs.lib import definitions +from dfvfs.lib import errors +from dfvfs.resolver import resolver +from dfvfs.volume import volume_system + + +class LVMVolume(volume_system.Volume): + """Class that implements a volume object using pyvslvm.""" + + def __init__(self, file_entry): + """Initializes the volume object. + + Args: + file_entry: the LVM file entry object (instance of FileEntry). + """ + super(LVMVolume, self).__init__(file_entry.name) + self._file_entry = file_entry + + def _Parse(self): + """Extracts attributes and extents from the volume.""" + vslvm_logical_volume = self._file_entry.GetLVMLogicalVolume() + + self._AddAttribute(volume_system.VolumeAttribute( + u'identifier', vslvm_logical_volume.identifier)) + # TODO: implement in pyvslvm + # self._AddAttribute(volume_system.VolumeAttribute( + # u'creation_time', vslvm_logical_volume.get_creation_time_as_integer())) + + # TODO: add support for logical volume extents + volume_extent = volume_system.VolumeExtent(0, vslvm_logical_volume.size) + self._extents.append(volume_extent) + + +class LVMVolumeSystem(volume_system.VolumeSystem): + """Class that implements a volume system object using pyvslvm.""" + + def __init__(self): + """Initializes the volume system object. + + Raises: + VolumeSystemError: if the volume system could not be accessed. + """ + super(LVMVolumeSystem, self).__init__() + self._file_system = None + + def _Parse(self): + """Extracts sections and volumes from the volume system.""" + root_file_entry = self._file_system.GetRootFileEntry() + + for sub_file_entry in root_file_entry.sub_file_entries: + volume = LVMVolume(sub_file_entry) + self._AddVolume(volume) + + def Open(self, path_spec): + """Opens a volume object defined by path specification. + + Args: + path_spec: the VFS path specification (instance of PathSpec). + + Raises: + VolumeSystemError: if the LVM virtual file system could not be resolved. + """ + self._file_system = resolver.Resolver.OpenFileSystem(path_spec) + if self._file_system is None: + raise errors.VolumeSystemError( + u'Unable to resolve file system from path specification.') + + type_indicator = self._file_system.type_indicator + if type_indicator != definitions.TYPE_INDICATOR_LVM: + raise errors.VolumeSystemError(u'Unsupported file system type.') diff --git a/dfvfs/volume/tsk_volume_system.py b/dfvfs/volume/tsk_volume_system.py index 5f486332..93786c41 100644 --- a/dfvfs/volume/tsk_volume_system.py +++ b/dfvfs/volume/tsk_volume_system.py @@ -38,9 +38,10 @@ def _Parse(self): start_sector = tsk_partition.TSKVsPartGetStartSector(tsk_vs_part) number_of_sectors = tsk_partition.TSKVsPartGetNumberOfSectors(tsk_vs_part) - self._extents.append(volume_system.VolumeExtent( + volume_extent = volume_system.VolumeExtent( start_sector * self._bytes_per_sector, - number_of_sectors * self._bytes_per_sector)) + number_of_sectors * self._bytes_per_sector) + self._extents.append(volume_extent) class TSKVolumeSystem(volume_system.VolumeSystem): @@ -91,7 +92,6 @@ def Open(self, path_spec): be resolved. """ self._file_system = resolver.Resolver.OpenFileSystem(path_spec) - if self._file_system is None: raise errors.VolumeSystemError( u'Unable to resolve file system from path specification.') diff --git a/dfvfs/volume/vshadow_volume_system.py b/dfvfs/volume/vshadow_volume_system.py index 73877d31..0ca4e9ab 100644 --- a/dfvfs/volume/vshadow_volume_system.py +++ b/dfvfs/volume/vshadow_volume_system.py @@ -32,8 +32,8 @@ def _Parse(self): self._AddAttribute(volume_system.VolumeAttribute( u'creation_time', vshadow_store.get_creation_time_as_integer())) - self._extents.append(volume_system.VolumeExtent( - 0, vshadow_store.volume_size)) + volume_extent = volume_system.VolumeExtent(0, vshadow_store.volume_size) + self._extents.append(volume_extent) class VShadowVolumeSystem(volume_system.VolumeSystem): @@ -66,7 +66,6 @@ def Open(self, path_spec): VolumeSystemError: if the VSS virtual file system could not be resolved. """ self._file_system = resolver.Resolver.OpenFileSystem(path_spec) - if self._file_system is None: raise errors.VolumeSystemError( u'Unable to resolve file system from path specification.') diff --git a/test_data/lvmtest.qcow2 b/test_data/lvmtest.qcow2 new file mode 100644 index 0000000000000000000000000000000000000000..f9012656203da57c00aeda7e593a0a50dde68c23 GIT binary patch literal 786432 zcmeI)Pi!35eE{IuB`sMi*^(2-ahk-Q(s5$PT57qZBuZ!t(=si{k*(O2=~{-vF3FL& z)^c~fyQC=xMVal3aR94sCO;gB${LNDf60>YH7z zME#*fo0KTh-$*~ryf<&=z4y(&AH@ZBpLycg|3y*M6Jq(cCoNzu!R z<@3%EAh5E)VtQ)F%I9zNnZ>k&+@)UH<(-$xjtCGSK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oSh= z0>?)l8$EvF>Cwzmm(?G?`Y&-fK0f|bF)kDaCR?Mg9WRv&rQ^-hW5XlKp)Wr3g`>|M zn`w<4Ia_~z_F!nbbm^X@JFKwlo_=TdH5+vMzvfAm1PBlyK!5-N0t5&UAh2eEe;E4y z*|`17-olIfM;>exp8ovrw0{@7^7pk|r>oE2*4h8j9sm5}HFxd?8!j+^W}=leTk)53 z<>~mhc1 zg$o99r)!PbQY*c-GCx}kXIiD!Tr-qk?0a@}?AWQ%k;jgY_U(;BY2@*A|NOJZPLB4y zlsjE1ooTmwDVHp?l4@(BS$-`E7Y+{`D&}TO3lo+3P%5+!>r018t2A9|l_sWY^>UJ4 zUC8C?Gv}J+sZwQPzE+u=4Ygkmb@PRbbLzZ@kx$>m$fxgOii?AXhYlSs4u>uV4&-uEwQ9F1L ziH|mFbM?Mlw}8rcSF29M!;W=Zl3x|LLh>v43Isz{G)}_<@0V?5QutC+Cv*@nk9<*dGrZ zIK01bxHuR;ck)O&^8InA>7}k)<=ONx3=Hir4jw!dI76Q+PkY1H&}JUMTGxNk`j20H zDNP;kcl}ptXRb_ImHBou9=MpfD$^sA2SXx$ZlINa?F; z<;k(Z`6E~4+~rv|q}%ahW5?1Ad|AG_?kZn#F%A!f+<#^^P1oIo7V^kaRXXx^+MDQP z^stBv-Q?bFUCiZL=j!b=+iZp0lT4@MPY*7;VY{aN<0012KN6}i>|Dg46{oma6pIb_k#fxcj^2vqSsp0&Sqfed6zx=x=59Mp) z$qV_R!pIBxXHPV0`Pm~UhEKhAGt=a7q0r5Vmu0n1=3Gsh3`zBarODztr^)btwL)QQ zr^)cNu1b^3r@fi~gJYe?}>UhU=+F} zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk8!K>ORonIz2iddY87Bx3 zxSl}fdJS&u`Na!=aAP~T)$hKl^rsPlEh^AmpFe#4{CGFIbvI-Gi;>VvZxm(UI6wY- zOVR1KH_9#zAl-jvgA?V_Q0%stz~WcJgbKaA-PeA)?@TchJ-WuZ4tUM3QSioJ{#KfQ z3Q=_R^Up^2F6;im@GL(Y;&UM$3h{7=&xd#<#NH735d9(cg(!qr?bgkIr$X-QM(5gW zniktz`a9O(`?%-QWfylJ`^T^STXBW!)7St0(6ZZiy6;?=x!8YtX{MWuFWz_?$4fd-JIZA%wPx#))3v$kblNVh+nFs*OqUzU zRIAoF7wrypZ}r@fsh66~S8I*wW`Ap;73ypR2;BMt={jG&{(sfeS^u~G$c>Nyfprn+ zU6;GK2>}9|QNa4&j8Dy&39OHR^}jxj?m}QQ3RwS}@u?Xzf%Orv{@2IRT?lMO0qcJ= zJ~d+|us#CT|N1z(3xUljVEu2#r)JCq)*MGy z1U8{SPZ)EW0&WYjEyVT^w};pfA{V0b8G$IeBOKouBK^$Io)CA1_(+J4hPXS#$3ol_ z;^QIi4e^N(zY*e-AwCu2(;@B)aes(7#BYYM5g@RU0@r}aNfm-=Pf_H92) z(?j%Ni1d6jsg$d83zxM~AVA>O5}2z_)vBk<)ns~N?r?4LlT^3VV6iaMYFOMlkyiMC}*6Q|3Sr1NwCXFavG*5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5;&^%gieQ*OrLZz*mi3#~Z)uwQ>J_k0pJXKHhm>3A}UOOur(Zq?#?snJa0a;rc6 zP5>JL0-Imp;`;A=u6HQ`Z2ozf0RaL82oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7e?2NS4QN>j<+xn|PX zTQ4=6uhtsVxye%X?B17aNprT`n%Uc&t0#>{d1@wCs+1bDar0cWmCWwV?;R)(4&_>n zq|}^iB=OWt(rl%*Vav@;CXKk6Of`~LTq%_s$==fRY`NO`H}}Ct>M8;R2oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5V)ZNz2WYAqUa}K_odxY^y{7|$}I0N4uv$*c47O`>*vR#=)#4u zUuN3vI_1u}?sYpuaWTYjh-`?o{$JmG>^pz+A0yxVy9eL>#8=-r@;{;K25eyh@3DR7 zCkOBU-rGn1_q&7Nf9G%iv+(J3T)URqFQ3PI+~R$o>YBXo3td3qCJ1~a^zqRUcZc}W z?qBz0y9@m$^sNg51PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZpd)bM8k_H|_|UvM7O(O6RehKQwr}if z{li3Ut^{s1f$r*0zyGQm-MX8x|HVk?C49G7x9>~e!`0LN?y&9{{-k?gPhcGd7QYg1 zR_N`$`t_HI;x+d3?KNKS@*6MkR+@hb*UCTH=-y@B|7ke>vk+eo@n(o`g!pENKM(O2 zA^tMNUxoPV5Z?-6BS3%v0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PE+afovG~t|*i35Btt>HrpQxJ45-u zdUDxHt=W3ybZxFWy;)~y+yn@$B#^H2aDaSu+;>t|E=~!&4a0$WYM`rm3#)O-l6TA(MiOjE#ZA-09s z9^&>8J3{0_bUq^xMR$bbJ45Uau_we`A#4N)5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly OK!5-N0tB{_!2bghxtP!Z literal 0 HcmV?d00001 diff --git a/tests/file_io/lvm_file_io.py b/tests/file_io/lvm_file_io.py new file mode 100644 index 00000000..2f5b1a8c --- /dev/null +++ b/tests/file_io/lvm_file_io.py @@ -0,0 +1,129 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""Tests for the file-like object implementation using pyvslvm.""" + +import os +import unittest + +from dfvfs.lib import errors +from dfvfs.file_io import lvm_file_io +from dfvfs.path import os_path_spec +from dfvfs.path import qcow_path_spec +from dfvfs.path import lvm_path_spec +from dfvfs.resolver import context + + +class LVMFileTest(unittest.TestCase): + """The unit test for the Logical Volume Manager (LVM) file-like object.""" + + def setUp(self): + """Sets up the needed objects used throughout the test.""" + self._resolver_context = context.Context() + test_file = os.path.join(u'test_data', u'lvmtest.qcow2') + path_spec = os_path_spec.OSPathSpec(location=test_file) + self._qcow_path_spec = qcow_path_spec.QCOWPathSpec(parent=path_spec) + + def testOpenClose(self): + """Test the open and close functionality.""" + path_spec = lvm_path_spec.LVMPathSpec( + parent=self._qcow_path_spec, volume_index=1) + file_object = lvm_file_io.LVMFile(self._resolver_context) + + file_object.open(path_spec=path_spec) + self.assertEqual(file_object.get_size(), 4194304) + file_object.close() + + path_spec = lvm_path_spec.LVMPathSpec( + parent=self._qcow_path_spec, volume_index=13) + file_object = lvm_file_io.LVMFile(self._resolver_context) + + with self.assertRaises(errors.PathSpecError): + file_object.open(path_spec=path_spec) + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/lvm1', parent=self._qcow_path_spec) + file_object = lvm_file_io.LVMFile(self._resolver_context) + + file_object.open(path_spec=path_spec) + self.assertEqual(file_object.get_size(), 8388608) + file_object.close() + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/lvm0', parent=self._qcow_path_spec) + file_object = lvm_file_io.LVMFile(self._resolver_context) + + with self.assertRaises(errors.PathSpecError): + file_object.open(path_spec=path_spec) + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/lvm13', parent=self._qcow_path_spec) + file_object = lvm_file_io.LVMFile(self._resolver_context) + + with self.assertRaises(errors.PathSpecError): + file_object.open(path_spec=path_spec) + + def testSeek(self): + """Test the seek functionality.""" + path_spec = lvm_path_spec.LVMPathSpec( + parent=self._qcow_path_spec, volume_index=0) + file_object = lvm_file_io.LVMFile(self._resolver_context) + + file_object.open(path_spec=path_spec) + self.assertEqual(file_object.get_size(), 8388608) + + file_object.seek(0x488) + self.assertEqual(file_object.get_offset(), 0x488) + self.assertEqual(file_object.read(16), b'/home/username/s') + self.assertEqual(file_object.get_offset(), 0x498) + + file_object.seek(-1047544, os.SEEK_END) + self.assertEqual(file_object.get_offset(), 0x00700408) + self.assertEqual(file_object.read(8), b'er,passw') + self.assertEqual(file_object.get_offset(), 0x00700410) + + file_object.seek(3, os.SEEK_CUR) + self.assertEqual(file_object.get_offset(), 0x00700413) + self.assertEqual(file_object.read(7), b'\nbank,j') + self.assertEqual(file_object.get_offset(), 0x0070041a) + + # Conforming to the POSIX seek the offset can exceed the file size + # but reading will result in no data being returned. + expected_offset = 8388608 + 100 + file_object.seek(expected_offset, os.SEEK_SET) + self.assertEqual(file_object.get_offset(), expected_offset) + self.assertEqual(file_object.read(20), b'') + + with self.assertRaises(IOError): + file_object.seek(-10, os.SEEK_SET) + + # On error the offset should not change. + self.assertEqual(file_object.get_offset(), expected_offset) + + with self.assertRaises(IOError): + file_object.seek(10, 5) + + # On error the offset should not change. + self.assertEqual(file_object.get_offset(), expected_offset) + + file_object.close() + + def testRead(self): + """Test the read functionality.""" + path_spec = lvm_path_spec.LVMPathSpec( + parent=self._qcow_path_spec, volume_index=0) + file_object = lvm_file_io.LVMFile(self._resolver_context) + + file_object.open(path_spec=path_spec) + self.assertEqual(file_object.get_size(), 8388608) + + file_object.seek(0x80400) + + expected_data = ( + b'This is a text file.\n\nWe should be able to parse it.\n') + self.assertEqual(file_object.read(53), expected_data) + + file_object.close() + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/file_io/vshadow_file_io.py b/tests/file_io/vshadow_file_io.py index 54fb8556..35a433fb 100644 --- a/tests/file_io/vshadow_file_io.py +++ b/tests/file_io/vshadow_file_io.py @@ -26,7 +26,7 @@ def setUp(self): def testOpenClose(self): """Test the open and close functionality.""" path_spec = vshadow_path_spec.VShadowPathSpec( - store_index=1, parent=self._qcow_path_spec) + parent=self._qcow_path_spec, store_index=1) file_object = vshadow_file_io.VShadowFile(self._resolver_context) file_object.open(path_spec=path_spec) @@ -34,7 +34,7 @@ def testOpenClose(self): file_object.close() path_spec = vshadow_path_spec.VShadowPathSpec( - store_index=13, parent=self._qcow_path_spec) + parent=self._qcow_path_spec, store_index=13) file_object = vshadow_file_io.VShadowFile(self._resolver_context) with self.assertRaises(errors.PathSpecError): @@ -65,7 +65,7 @@ def testOpenClose(self): def testSeek(self): """Test the seek functionality.""" path_spec = vshadow_path_spec.VShadowPathSpec( - store_index=1, parent=self._qcow_path_spec) + parent=self._qcow_path_spec, store_index=1) file_object = vshadow_file_io.VShadowFile(self._resolver_context) file_object.open(path_spec=path_spec) @@ -110,7 +110,7 @@ def testSeek(self): def testRead(self): """Test the read functionality.""" path_spec = vshadow_path_spec.VShadowPathSpec( - store_index=1, parent=self._qcow_path_spec) + parent=self._qcow_path_spec, store_index=1) file_object = vshadow_file_io.VShadowFile(self._resolver_context) file_object.open(path_spec=path_spec) diff --git a/tests/path/lvm_path_spec.py b/tests/path/lvm_path_spec.py new file mode 100644 index 00000000..d384ed69 --- /dev/null +++ b/tests/path/lvm_path_spec.py @@ -0,0 +1,94 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""Tests for the LVM path specification implementation.""" + +import unittest + +from dfvfs.path import lvm_path_spec + +from tests.path import test_lib + + +class LVMPathSpecTest(test_lib.PathSpecTestCase): + """Tests for the LVM path specification implementation.""" + + def testInitialize(self): + """Tests the path specification initialization.""" + path_spec = lvm_path_spec.LVMPathSpec(parent=self._path_spec) + + self.assertIsNotNone(path_spec) + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/lvm2', parent=self._path_spec) + + self.assertIsNotNone(path_spec) + + path_spec = lvm_path_spec.LVMPathSpec( + parent=self._path_spec, volume_index=1) + + self.assertIsNotNone(path_spec) + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/lvm2', parent=self._path_spec, volume_index=1) + + self.assertIsNotNone(path_spec) + + with self.assertRaises(ValueError): + _ = lvm_path_spec.LVMPathSpec(parent=None) + + with self.assertRaises(ValueError): + _ = lvm_path_spec.LVMPathSpec( + parent=self._path_spec, bogus=u'BOGUS') + + def testComparable(self): + """Tests the path specification comparable property.""" + path_spec = lvm_path_spec.LVMPathSpec(parent=self._path_spec) + + self.assertIsNotNone(path_spec) + + expected_comparable = u'\n'.join([ + u'type: TEST', + u'type: LVM', + u'']) + + self.assertEqual(path_spec.comparable, expected_comparable) + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/lvm2', parent=self._path_spec) + + self.assertIsNotNone(path_spec) + + expected_comparable = u'\n'.join([ + u'type: TEST', + u'type: LVM, location: /lvm2', + u'']) + + self.assertEqual(path_spec.comparable, expected_comparable) + + path_spec = lvm_path_spec.LVMPathSpec( + parent=self._path_spec, volume_index=1) + + self.assertIsNotNone(path_spec) + + expected_comparable = u'\n'.join([ + u'type: TEST', + u'type: LVM, volume index: 1', + u'']) + + self.assertEqual(path_spec.comparable, expected_comparable) + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/lvm2', parent=self._path_spec, volume_index=1) + + self.assertIsNotNone(path_spec) + + expected_comparable = u'\n'.join([ + u'type: TEST', + u'type: LVM, location: /lvm2, volume index: 1', + u'']) + + self.assertEqual(path_spec.comparable, expected_comparable) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/resolver/lvm_resolver_helper.py b/tests/resolver/lvm_resolver_helper.py new file mode 100644 index 00000000..e2c52a27 --- /dev/null +++ b/tests/resolver/lvm_resolver_helper.py @@ -0,0 +1,26 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""Tests for the LVM resolver helper implementation.""" + +import unittest + +from dfvfs.resolver import lvm_resolver_helper +from tests.resolver import test_lib + + +class LVMResolverHelperTest(test_lib.ResolverHelperTestCase): + """Tests for the LVM resolver helper implementation.""" + + def testNewFileObject(self): + """Tests the NewFileObject function.""" + resolver_helper_object = lvm_resolver_helper.LVMResolverHelper() + self._TestNewFileObject(resolver_helper_object) + + def testNewFileSystem(self): + """Tests the NewFileSystem function.""" + resolver_helper_object = lvm_resolver_helper.LVMResolverHelper() + self._TestNewFileSystem(resolver_helper_object) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/vfs/lvm_file_entry.py b/tests/vfs/lvm_file_entry.py new file mode 100644 index 00000000..ae290e1c --- /dev/null +++ b/tests/vfs/lvm_file_entry.py @@ -0,0 +1,213 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""Tests for the file entry implementation using the pyvslvm.""" + +import os +import unittest + +from dfvfs.path import lvm_path_spec +from dfvfs.path import os_path_spec +from dfvfs.path import qcow_path_spec +from dfvfs.resolver import context +from dfvfs.vfs import lvm_file_entry +from dfvfs.vfs import lvm_file_system + + +class LVMFileEntryTest(unittest.TestCase): + """The unit test for the LVM file entry object.""" + + def setUp(self): + """Sets up the needed objects used throughout the test.""" + self._resolver_context = context.Context() + test_file = os.path.join(u'test_data', u'lvmtest.qcow2') + path_spec = os_path_spec.OSPathSpec(location=test_file) + self._qcow_path_spec = qcow_path_spec.QCOWPathSpec(parent=path_spec) + self._lvm_path_spec = lvm_path_spec.LVMPathSpec( + location=u'/', parent=self._qcow_path_spec) + + self._file_system = lvm_file_system.LVMFileSystem(self._resolver_context) + self._file_system.Open(self._lvm_path_spec) + + def tearDown(self): + """Cleans up the needed objects used throughout the test.""" + self._file_system.Close() + + # qcowmount test_data/lvmtest.qcow2 fuse/ + # vslvminfo fuse/qcow1 + # + # Linux Logical Volume Manager (LVM) information: + # Volume Group (VG): + # Name: vg_test + # Identifier: kZ4S06-lhFY-G4cB-8OQx-SWVg-GrI6-1jEYEf + # Sequence number: 3 + # Extent size: 4194304 bytes + # Number of physical volumes: 1 + # Number of logical volumes: 2 + # + # Physical Volume (PV): 1 + # Name: pv0 + # Identifier: btEzLa-i0aL-sfS8-Ae9P-QKGU-IhtA-CkpWm7 + # Device path: /dev/loop1 + # Volume size: 16777216 bytes + # + # Logical Volume (LV): 1 + # Name: lv_test1 + # Identifier: ldAb7Y-GU1t-qDml-VkAp-qt46-0meR-qJS3vC + # Number of segments: 1 + # Segment: 1 + # Offset: 0x00000000 (0) + # Size: 8.0 MiB (8388608 bytes) + # Number of stripes: 1 + # Stripe: 1 + # Physical volume: pv0 + # Data area offset: 0x00000000 (0) + # + # Logical Volume (LV): 2 + # Name: lv_test2 + # Identifier: bJxmc8-JEMZ-jXT9-oVeY-40AY-ROro-mCO8Zz + # Number of segments: 1 + # Segment: 1 + # Offset: 0x00000000 (0) + # Size: 4.0 MiB (4194304 bytes) + # Number of stripes: 1 + # Stripe: 1 + # Physical volume: pv0 + # Data area offset: 0x00800000 (8388608) + + def testIntialize(self): + """Test the initialize functionality.""" + file_entry = lvm_file_entry.LVMFileEntry( + self._resolver_context, self._file_system, self._lvm_path_spec) + + self.assertIsNotNone(file_entry) + + def testGetParentFileEntry(self): + """Test the get parent file entry functionality.""" + path_spec = lvm_path_spec.LVMPathSpec( + parent=self._qcow_path_spec, volume_index=1) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + parent_file_entry = file_entry.GetParentFileEntry() + self.assertIsNotNone(parent_file_entry) + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/', parent=self._qcow_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + parent_file_entry = file_entry.GetParentFileEntry() + self.assertIsNone(parent_file_entry) + + def testGetStat(self): + """Test the get stat functionality.""" + path_spec = lvm_path_spec.LVMPathSpec( + parent=self._qcow_path_spec, volume_index=1) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + stat_object = file_entry.GetStat() + + self.assertIsNotNone(stat_object) + self.assertEqual(stat_object.type, stat_object.TYPE_FILE) + + def testIsFunctions(self): + """Test the Is? functionality.""" + path_spec = lvm_path_spec.LVMPathSpec( + parent=self._qcow_path_spec, volume_index=1) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertFalse(file_entry.IsRoot()) + self.assertFalse(file_entry.IsVirtual()) + self.assertTrue(file_entry.IsAllocated()) + + self.assertFalse(file_entry.IsDevice()) + self.assertFalse(file_entry.IsDirectory()) + self.assertTrue(file_entry.IsFile()) + self.assertFalse(file_entry.IsLink()) + self.assertFalse(file_entry.IsPipe()) + self.assertFalse(file_entry.IsSocket()) + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/', parent=self._qcow_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertTrue(file_entry.IsRoot()) + self.assertTrue(file_entry.IsVirtual()) + self.assertTrue(file_entry.IsAllocated()) + + self.assertFalse(file_entry.IsDevice()) + self.assertTrue(file_entry.IsDirectory()) + self.assertFalse(file_entry.IsFile()) + self.assertFalse(file_entry.IsLink()) + self.assertFalse(file_entry.IsPipe()) + self.assertFalse(file_entry.IsSocket()) + + def testSubFileEntries(self): + """Test the sub file entries iteration functionality.""" + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/', parent=self._qcow_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertEqual(file_entry.number_of_sub_file_entries, 2) + + expected_sub_file_entry_names = [u'lvm1', u'lvm2'] + + sub_file_entry_names = [] + for sub_file_entry in file_entry.sub_file_entries: + sub_file_entry_names.append(sub_file_entry.name) + + self.assertEqual( + len(sub_file_entry_names), len(expected_sub_file_entry_names)) + self.assertEqual( + sorted(sub_file_entry_names), sorted(expected_sub_file_entry_names)) + + def testDataStreams(self): + """Test the data streams functionality.""" + path_spec = lvm_path_spec.LVMPathSpec( + parent=self._qcow_path_spec, volume_index=1) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertEqual(file_entry.number_of_data_streams, 1) + + data_stream_names = [] + for data_stream in file_entry.data_streams: + data_stream_names.append(data_stream.name) + + self.assertEqual(data_stream_names, [u'']) + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/', parent=self._qcow_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertEqual(file_entry.number_of_data_streams, 0) + + data_stream_names = [] + for data_stream in file_entry.data_streams: + data_stream_names.append(data_stream.name) + + self.assertEqual(data_stream_names, []) + + def testGetDataStream(self): + """Test the retrieve data streams functionality.""" + path_spec = lvm_path_spec.LVMPathSpec( + parent=self._qcow_path_spec, volume_index=1) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_stream_name = u'' + data_stream = file_entry.GetDataStream(data_stream_name) + self.assertIsNotNone(data_stream) + self.assertEqual(data_stream.name, data_stream_name) + + data_stream = file_entry.GetDataStream(u'bogus') + self.assertIsNone(data_stream) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/vfs/lvm_file_system.py b/tests/vfs/lvm_file_system.py new file mode 100644 index 00000000..be9e4368 --- /dev/null +++ b/tests/vfs/lvm_file_system.py @@ -0,0 +1,175 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""Tests for a file system implementation using pyvslvm.""" + +import os +import unittest + +from dfvfs.path import lvm_path_spec +from dfvfs.path import os_path_spec +from dfvfs.path import qcow_path_spec +from dfvfs.resolver import context +from dfvfs.vfs import lvm_file_system + + +class LVMFileSystemTest(unittest.TestCase): + """The unit test for the LVM file system object.""" + + def setUp(self): + """Sets up the needed objects used throughout the test.""" + self._resolver_context = context.Context() + test_file = os.path.join(u'test_data', u'lvmtest.qcow2') + path_spec = os_path_spec.OSPathSpec(location=test_file) + self._qcow_path_spec = qcow_path_spec.QCOWPathSpec(parent=path_spec) + self._lvm_path_spec = lvm_path_spec.LVMPathSpec( + location=u'/', parent=self._qcow_path_spec) + + # qcowmount test_data/lvmtest.qcow2 fuse/ + # vslvminfo fuse/qcow1 + # + # Linux Logical Volume Manager (LVM) information: + # Volume Group (VG): + # Name: vg_test + # Identifier: kZ4S06-lhFY-G4cB-8OQx-SWVg-GrI6-1jEYEf + # Sequence number: 3 + # Extent size: 4194304 bytes + # Number of physical volumes: 1 + # Number of logical volumes: 2 + # + # Physical Volume (PV): 1 + # Name: pv0 + # Identifier: btEzLa-i0aL-sfS8-Ae9P-QKGU-IhtA-CkpWm7 + # Device path: /dev/loop1 + # Volume size: 16777216 bytes + # + # Logical Volume (LV): 1 + # Name: lv_test1 + # Identifier: ldAb7Y-GU1t-qDml-VkAp-qt46-0meR-qJS3vC + # Number of segments: 1 + # Segment: 1 + # Offset: 0x00000000 (0) + # Size: 8.0 MiB (8388608 bytes) + # Number of stripes: 1 + # Stripe: 1 + # Physical volume: pv0 + # Data area offset: 0x00000000 (0) + # + # Logical Volume (LV): 2 + # Name: lv_test2 + # Identifier: bJxmc8-JEMZ-jXT9-oVeY-40AY-ROro-mCO8Zz + # Number of segments: 1 + # Segment: 1 + # Offset: 0x00000000 (0) + # Size: 4.0 MiB (4194304 bytes) + # Number of stripes: 1 + # Stripe: 1 + # Physical volume: pv0 + # Data area offset: 0x00800000 (8388608) + + def testOpenAndClose(self): + """Test the open and close functionality.""" + file_system = lvm_file_system.LVMFileSystem(self._resolver_context) + self.assertIsNotNone(file_system) + + file_system.Open(self._lvm_path_spec) + + file_system.Close() + + def testFileEntryExistsByPathSpec(self): + """Test the file entry exists by path specification functionality.""" + file_system = lvm_file_system.LVMFileSystem(self._resolver_context) + self.assertIsNotNone(file_system) + + file_system.Open(self._lvm_path_spec) + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/', parent=self._qcow_path_spec) + self.assertTrue(file_system.FileEntryExistsByPathSpec(path_spec)) + + path_spec = lvm_path_spec.LVMPathSpec( + parent=self._qcow_path_spec, volume_index=1) + self.assertTrue(file_system.FileEntryExistsByPathSpec(path_spec)) + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/lvm2', parent=self._qcow_path_spec) + self.assertTrue(file_system.FileEntryExistsByPathSpec(path_spec)) + + path_spec = lvm_path_spec.LVMPathSpec( + parent=self._qcow_path_spec, volume_index=9) + self.assertFalse(file_system.FileEntryExistsByPathSpec(path_spec)) + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/lvm0', parent=self._qcow_path_spec) + self.assertFalse(file_system.FileEntryExistsByPathSpec(path_spec)) + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/lvm9', parent=self._qcow_path_spec) + self.assertFalse(file_system.FileEntryExistsByPathSpec(path_spec)) + + file_system.Close() + + def testGetFileEntryByPathSpec(self): + """Test the get entry by path specification functionality.""" + file_system = lvm_file_system.LVMFileSystem(self._resolver_context) + self.assertIsNotNone(file_system) + + file_system.Open(self._lvm_path_spec) + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/', parent=self._qcow_path_spec) + file_entry = file_system.GetFileEntryByPathSpec(path_spec) + + self.assertIsNotNone(file_entry) + self.assertEqual(file_entry.name, u'') + + path_spec = lvm_path_spec.LVMPathSpec( + parent=self._qcow_path_spec, volume_index=1) + file_entry = file_system.GetFileEntryByPathSpec(path_spec) + + self.assertIsNotNone(file_entry) + self.assertEqual(file_entry.name, u'lvm2') + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/lvm2', parent=self._qcow_path_spec) + file_entry = file_system.GetFileEntryByPathSpec(path_spec) + + self.assertIsNotNone(file_entry) + self.assertEqual(file_entry.name, u'lvm2') + + path_spec = lvm_path_spec.LVMPathSpec( + parent=self._qcow_path_spec, volume_index=9) + file_entry = file_system.GetFileEntryByPathSpec(path_spec) + + self.assertIsNone(file_entry) + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/lvm0', parent=self._qcow_path_spec) + file_entry = file_system.GetFileEntryByPathSpec(path_spec) + + self.assertIsNone(file_entry) + + path_spec = lvm_path_spec.LVMPathSpec( + location=u'/lvm9', parent=self._qcow_path_spec) + file_entry = file_system.GetFileEntryByPathSpec(path_spec) + + self.assertIsNone(file_entry) + + file_system.Close() + + def testGetRootFileEntry(self): + """Test the get root file entry functionality.""" + file_system = lvm_file_system.LVMFileSystem(self._resolver_context) + self.assertIsNotNone(file_system) + + file_system.Open(self._lvm_path_spec) + + file_entry = file_system.GetRootFileEntry() + + self.assertIsNotNone(file_entry) + self.assertEqual(file_entry.name, u'') + + file_system.Close() + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/vfs/vshadow_file_entry.py b/tests/vfs/vshadow_file_entry.py index 2a5f5b5e..38288eeb 100644 --- a/tests/vfs/vshadow_file_entry.py +++ b/tests/vfs/vshadow_file_entry.py @@ -66,18 +66,25 @@ def testIntialize(self): def testGetParentFileEntry(self): """Test the get parent file entry functionality.""" path_spec = vshadow_path_spec.VShadowPathSpec( - store_index=1, parent=self._qcow_path_spec) + parent=self._qcow_path_spec, store_index=1) file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) parent_file_entry = file_entry.GetParentFileEntry() + self.assertIsNotNone(parent_file_entry) + path_spec = vshadow_path_spec.VShadowPathSpec( + location=u'/', parent=self._qcow_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + parent_file_entry = file_entry.GetParentFileEntry() self.assertIsNone(parent_file_entry) def testGetStat(self): """Test the get stat functionality.""" path_spec = vshadow_path_spec.VShadowPathSpec( - store_index=1, parent=self._qcow_path_spec) + parent=self._qcow_path_spec, store_index=1) file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) @@ -89,7 +96,7 @@ def testGetStat(self): def testIsFunctions(self): """Test the Is? functionality.""" path_spec = vshadow_path_spec.VShadowPathSpec( - store_index=1, parent=self._qcow_path_spec) + parent=self._qcow_path_spec, store_index=1) file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) @@ -143,7 +150,7 @@ def testSubFileEntries(self): def testDataStreams(self): """Test the data streams functionality.""" path_spec = vshadow_path_spec.VShadowPathSpec( - store_index=1, parent=self._qcow_path_spec) + parent=self._qcow_path_spec, store_index=1) file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) @@ -171,7 +178,7 @@ def testDataStreams(self): def testGetDataStream(self): """Test the retrieve data streams functionality.""" path_spec = vshadow_path_spec.VShadowPathSpec( - store_index=1, parent=self._qcow_path_spec) + parent=self._qcow_path_spec, store_index=1) file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) diff --git a/tests/vfs/vshadow_file_system.py b/tests/vfs/vshadow_file_system.py index 16d38ff9..3bf39882 100644 --- a/tests/vfs/vshadow_file_system.py +++ b/tests/vfs/vshadow_file_system.py @@ -68,7 +68,7 @@ def testFileEntryExistsByPathSpec(self): self.assertTrue(file_system.FileEntryExistsByPathSpec(path_spec)) path_spec = vshadow_path_spec.VShadowPathSpec( - store_index=1, parent=self._qcow_path_spec) + parent=self._qcow_path_spec, store_index=1) self.assertTrue(file_system.FileEntryExistsByPathSpec(path_spec)) path_spec = vshadow_path_spec.VShadowPathSpec( @@ -76,7 +76,7 @@ def testFileEntryExistsByPathSpec(self): self.assertTrue(file_system.FileEntryExistsByPathSpec(path_spec)) path_spec = vshadow_path_spec.VShadowPathSpec( - store_index=9, parent=self._qcow_path_spec) + parent=self._qcow_path_spec, store_index=9) self.assertFalse(file_system.FileEntryExistsByPathSpec(path_spec)) path_spec = vshadow_path_spec.VShadowPathSpec( @@ -104,7 +104,7 @@ def testGetFileEntryByPathSpec(self): self.assertEqual(file_entry.name, u'') path_spec = vshadow_path_spec.VShadowPathSpec( - store_index=1, parent=self._qcow_path_spec) + parent=self._qcow_path_spec, store_index=1) file_entry = file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) @@ -118,7 +118,7 @@ def testGetFileEntryByPathSpec(self): self.assertEqual(file_entry.name, u'vss2') path_spec = vshadow_path_spec.VShadowPathSpec( - store_index=9, parent=self._qcow_path_spec) + parent=self._qcow_path_spec, store_index=9) file_entry = file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNone(file_entry) diff --git a/tests/volume/lvm_volume_system.py b/tests/volume/lvm_volume_system.py new file mode 100644 index 00000000..8a83ba1e --- /dev/null +++ b/tests/volume/lvm_volume_system.py @@ -0,0 +1,92 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""Tests for the volume system implementation using pyvslvm.""" + +import os +import unittest + +from dfvfs.path import os_path_spec +from dfvfs.path import qcow_path_spec +from dfvfs.path import lvm_path_spec +from dfvfs.volume import lvm_volume_system + + +class LVMVolumeSystemTest(unittest.TestCase): + """The unit test for the Logical Volume Manager (LVM) volume system object.""" + + def setUp(self): + """Sets up the needed objects used throughout the test.""" + test_file = os.path.join(u'test_data', u'lvmtest.qcow2') + path_spec = os_path_spec.OSPathSpec(location=test_file) + path_spec = qcow_path_spec.QCOWPathSpec(parent=path_spec) + self._lvm_path_spec = lvm_path_spec.LVMPathSpec( + location=u'/', parent=path_spec) + + # qcowmount test_data/lvmtest.qcow2 fuse/ + # vslvminfo fuse/qcow1 + # + # Linux Logical Volume Manager (LVM) information: + # Volume Group (VG): + # Name: vg_test + # Identifier: kZ4S06-lhFY-G4cB-8OQx-SWVg-GrI6-1jEYEf + # Sequence number: 3 + # Extent size: 4194304 bytes + # Number of physical volumes: 1 + # Number of logical volumes: 2 + # + # Physical Volume (PV): 1 + # Name: pv0 + # Identifier: btEzLa-i0aL-sfS8-Ae9P-QKGU-IhtA-CkpWm7 + # Device path: /dev/loop1 + # Volume size: 16777216 bytes + # + # Logical Volume (LV): 1 + # Name: lv_test1 + # Identifier: ldAb7Y-GU1t-qDml-VkAp-qt46-0meR-qJS3vC + # Number of segments: 1 + # Segment: 1 + # Offset: 0x00000000 (0) + # Size: 8.0 MiB (8388608 bytes) + # Number of stripes: 1 + # Stripe: 1 + # Physical volume: pv0 + # Data area offset: 0x00000000 (0) + # + # Logical Volume (LV): 2 + # Name: lv_test2 + # Identifier: bJxmc8-JEMZ-jXT9-oVeY-40AY-ROro-mCO8Zz + # Number of segments: 1 + # Segment: 1 + # Offset: 0x00000000 (0) + # Size: 4.0 MiB (4194304 bytes) + # Number of stripes: 1 + # Stripe: 1 + # Physical volume: pv0 + # Data area offset: 0x00800000 (8388608) + + def testIterateVolumes(self): + """Test the iterate volumes functionality.""" + volume_system = lvm_volume_system.LVMVolumeSystem() + + volume_system.Open(self._lvm_path_spec) + + self.assertEqual(volume_system.number_of_volumes, 2) + + volume = volume_system.GetVolumeByIndex(1) + self.assertIsNotNone(volume) + + self.assertEqual(volume.number_of_extents, 1) + self.assertEqual(volume.number_of_attributes, 1) + self.assertEqual(volume.identifier, u'lvm2') + + expected_value = u'bJxmc8-JEMZ-jXT9-oVeY-40AY-ROro-mCO8Zz' + volume_attribute = volume.GetAttribute(u'identifier') + self.assertIsNotNone(volume_attribute) + self.assertEqual(volume_attribute.value, expected_value) + + volume = volume_system.GetVolumeByIndex(7) + self.assertIsNone(volume) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/volume/tsk_volume_system.py b/tests/volume/tsk_volume_system.py index d11e3524..61a45f2c 100644 --- a/tests/volume/tsk_volume_system.py +++ b/tests/volume/tsk_volume_system.py @@ -46,8 +46,8 @@ def testIterateVolumes(self): self.assertEqual(volume_system.number_of_volumes, 2) volume = volume_system.GetVolumeByIndex(1) - self.assertIsNotNone(volume) + self.assertEqual(volume.number_of_extents, 1) self.assertEqual(volume.number_of_attributes, 2) self.assertEqual(volume.identifier, u'p2') @@ -63,7 +63,6 @@ def testIterateVolumes(self): self.assertEqual(volume_attribute.value, expected_value) volume = volume_system.GetVolumeByIndex(7) - self.assertIsNone(volume) diff --git a/tests/volume/vshadow_volume_system.py b/tests/volume/vshadow_volume_system.py index 4927041d..0fd70282 100644 --- a/tests/volume/vshadow_volume_system.py +++ b/tests/volume/vshadow_volume_system.py @@ -54,8 +54,8 @@ def testIterateVolumes(self): self.assertEqual(volume_system.number_of_volumes, 2) volume = volume_system.GetVolumeByIndex(1) - self.assertIsNotNone(volume) + self.assertEqual(volume.number_of_extents, 1) self.assertEqual(volume.number_of_attributes, 4) self.assertEqual(volume.identifier, u'vss2') @@ -81,7 +81,6 @@ def testIterateVolumes(self): self.assertEqual(volume_attribute.value, expected_value) volume = volume_system.GetVolumeByIndex(7) - self.assertIsNone(volume) diff --git a/utils/review.py b/utils/review.py index 0ecf01f3..9fa1d41f 100755 --- a/utils/review.py +++ b/utils/review.py @@ -1444,7 +1444,7 @@ def Create(self): create_github_origin = u'{0:s}:{1:s}'.format( git_origin, self._active_branch) - if self._github_helper.CreatePullRequest( + if not self._github_helper.CreatePullRequest( github_access_token, codereview_issue_number, create_github_origin, description): print(u'Unable to create pull request.')