Skip to content

Commit

Permalink
Added LVM support to volume helper log2timeline#89
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Nov 17, 2020
1 parent 397437c commit b38d722
Show file tree
Hide file tree
Showing 9 changed files with 372 additions and 27 deletions.
89 changes: 88 additions & 1 deletion dfvfs/helpers/command_line.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,14 @@ class CLIVolumeScannerMediator(volume_scanner.VolumeScannerMediator):
'volume is 1. All volumes can be defined as "all". If no volumes are '
'specified none will be processed. You can abort with Ctrl^C.')

_USER_PROMPT_LVM = (
'Please specify the identifier(s) of the LVM volume that should be '
'processed: Note that a range of volumes can be defined as: 3..5. '
'Multiple volumes can be defined as: 1,3,5 (a list of comma separated '
'values). Ranges and lists can also be combined as: 1,3..5. The first '
'volume is 1. All volumes can be defined as "all". If no volumes are '
'specified none will be processed. You can abort with Ctrl^C.')

_USER_PROMPT_TSK = (
'Please specify the identifier of the partition that should be '
'processed. All partitions can be defined as: "all". Note that you can '
Expand Down Expand Up @@ -500,6 +508,37 @@ def _PrintAPFSVolumeIdentifiersOverview(
self._output_writer.Write('\n')
table_view.Write(self._output_writer)

def _PrintLVMVolumeIdentifiersOverview(
self, volume_system, volume_identifiers):
"""Prints an overview of LVM volume identifiers.
Args:
volume_system (LVMVolumeSystem): volume system.
volume_identifiers (list[str]): allowed volume identifiers.
Raises:
ScannerError: if a volume cannot be resolved from the volume identifier.
"""
header = 'The following Logical Volume Manager (LVM) volumes were found:\n'
self._output_writer.Write(header)

column_names = ['Identifier']
table_view = CLITabularTableView(column_names=column_names)

# Sort the volume identifiers in alphanumeric order.
for volume_identifier in sorted(volume_identifiers, key=lambda string: int(
''.join([character for character in string if character.isdigit()]))):
volume = volume_system.GetVolumeByIdentifier(volume_identifier)
if not volume:
raise errors.ScannerError(
'Volume missing for identifier: {0:s}.'.format(
volume_identifier))

table_view.AddRow([volume.identifier])

self._output_writer.Write('\n')
table_view.Write(self._output_writer)

def _PrintPartitionIdentifiersOverview(
self, volume_system, volume_identifiers):
"""Prints an overview of TSK partition identifiers.
Expand Down Expand Up @@ -578,7 +617,7 @@ def _ReadSelectedVolumes(self, volume_system, prefix='v'):
"""Reads the selected volumes provided by the user.
Args:
volume_system (APFSVolumeSystem): volume system.
volume_system (VolumeSystem): volume system.
prefix (Optional[str]): volume identifier prefix.
Returns:
Expand Down Expand Up @@ -650,6 +689,52 @@ def GetAPFSVolumeIdentifiers(self, volume_system, volume_identifiers):

return selected_volumes

def GetLVMVolumeIdentifiers(self, volume_system, volume_identifiers):
"""Retrieves LVM volume identifiers.
This method can be used to prompt the user to provide LVM volume
identifiers.
Args:
volume_system (LVMVolumeSystem): volume system.
volume_identifiers (list[str]): volume identifiers including prefix.
Returns:
list[str]: selected volume identifiers including prefix or None.
"""
print_header = True
while True:
if print_header:
self._PrintLVMVolumeIdentifiersOverview(
volume_system, volume_identifiers)

print_header = False

self._output_writer.Write('\n')

lines = self._textwrapper.wrap(self._USER_PROMPT_LVM)
self._output_writer.Write('\n'.join(lines))
self._output_writer.Write('\n\nVolume identifier(s): ')

try:
selected_volumes = self._ReadSelectedVolumes(
volume_system, prefix='lvm')
if (not selected_volumes or
not set(selected_volumes).difference(volume_identifiers)):
break
except ValueError:
pass

self._output_writer.Write('\n')

lines = self._textwrapper.wrap(
'Unsupported volume identifier(s), please try again or abort with '
'Ctrl^C.')
self._output_writer.Write('\n'.join(lines))
self._output_writer.Write('\n\n')

return selected_volumes

def GetPartitionIdentifiers(self, volume_system, volume_identifiers):
"""Retrieves partition identifiers.
Expand Down Expand Up @@ -791,6 +876,8 @@ def UnlockEncryptedVolume(
header = 'Found a BitLocker encrypted volume.'
elif locked_scan_node.type_indicator == definitions.TYPE_INDICATOR_FVDE:
header = 'Found a CoreStorage (FVDE) encrypted volume.'
elif locked_scan_node.type_indicator == definitions.TYPE_INDICATOR_LUKSDE:
header = 'Found a LUKS encrypted volume.'
else:
header = 'Found an encrypted volume.'

Expand Down
80 changes: 78 additions & 2 deletions dfvfs/helpers/volume_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from dfvfs.path import factory as path_spec_factory
from dfvfs.resolver import resolver
from dfvfs.volume import apfs_volume_system
from dfvfs.volume import lvm_volume_system
from dfvfs.volume import tsk_volume_system
from dfvfs.volume import vshadow_volume_system

Expand Down Expand Up @@ -54,8 +55,8 @@ class VolumeScannerMediator(object):

# pylint: disable=redundant-returns-doc

# TODO: merge GetAPFSVolumeIdentifiers, GetVSSStoreIdentifiers,
# GetVSSStoreIdentifiers into GetVolumeIdentifiers?
# TODO: merge GetAPFSVolumeIdentifiers, GetLVMVolumeIdentifiers,
# GetVSSStoreIdentifiers, GetVSSStoreIdentifiers into GetVolumeIdentifiers?

@abc.abstractmethod
def GetAPFSVolumeIdentifiers(self, volume_system, volume_identifiers):
Expand All @@ -72,6 +73,21 @@ def GetAPFSVolumeIdentifiers(self, volume_system, volume_identifiers):
list[str]: selected volume identifiers including prefix or None.
"""

@abc.abstractmethod
def GetLVMVolumeIdentifiers(self, volume_system, volume_identifiers):
"""Retrieves LVM volume identifiers.
This method can be used to prompt the user to provide LVM volume
identifiers.
Args:
volume_system (LVMVolumeSystem): volume system.
volume_identifiers (list[str]): volume identifiers including prefix.
Returns:
list[str]: selected volume identifiers including prefix or None.
"""

@abc.abstractmethod
def GetPartitionIdentifiers(self, volume_system, volume_identifiers):
"""Retrieves partition identifiers.
Expand Down Expand Up @@ -191,6 +207,63 @@ def _GetAPFSVolumeIdentifiers(self, scan_node, options):
return self._NormalizedVolumeIdentifiers(
volume_system, volume_identifiers, prefix='apfs')

def _GetLVMVolumeIdentifiers(self, scan_node, options):
"""Determines the LVM volume identifiers.
Args:
scan_node (SourceScanNode): scan node.
options (VolumeScannerOptions): volume scanner options.
Returns:
list[str]: LVM volume identifiers.
Raises:
ScannerError: if the format of or within the source is not supported
or the the scan node is invalid.
UserAbort: if the user requested to abort.
"""
if not scan_node or not scan_node.path_spec:
raise errors.ScannerError('Invalid scan node.')

volume_system = lvm_volume_system.LVMVolumeSystem()
volume_system.Open(scan_node.path_spec)

volume_identifiers = self._source_scanner.GetVolumeIdentifiers(
volume_system)
if not volume_identifiers:
return []

if options.volumes:
if options.volumes == ['all']:
volumes = range(1, volume_system.number_of_volumes + 1)
else:
volumes = options.volumes

try:
selected_volumes = self._NormalizedVolumeIdentifiers(
volume_system, volumes, prefix='lvm')

if not set(selected_volumes).difference(volume_identifiers):
return selected_volumes
except errors.ScannerError as exception:
if self._mediator:
self._mediator.PrintWarning('{0!s}'.format(exception))

if len(volume_identifiers) > 1:
if not self._mediator:
raise errors.ScannerError(
'Unable to proceed. LVM volumes found but no mediator to '
'determine how they should be used.')

try:
volume_identifiers = self._mediator.GetLVMVolumeIdentifiers(
volume_system, volume_identifiers)
except KeyboardInterrupt:
raise errors.UserAbort('File system scan aborted.')

return self._NormalizedVolumeIdentifiers(
volume_system, volume_identifiers, prefix='lvm')

def _GetPartitionIdentifiers(self, scan_node, options):
"""Determines the partition identifiers.
Expand Down Expand Up @@ -467,6 +540,9 @@ def _ScanVolumeSystemRoot(
if scan_node.type_indicator == definitions.TYPE_INDICATOR_APFS_CONTAINER:
volume_identifiers = self._GetAPFSVolumeIdentifiers(scan_node, options)

elif scan_node.type_indicator == definitions.TYPE_INDICATOR_LVM:
volume_identifiers = self._GetLVMVolumeIdentifiers(scan_node, options)

elif scan_node.type_indicator == definitions.TYPE_INDICATOR_VSHADOW:
volume_identifiers = self._GetVSSStoreIdentifiers(scan_node, options)
# Process VSS stores (snapshots) starting with the most recent one.
Expand Down
Binary file modified test_data/lvm.raw
Binary file not shown.
132 changes: 132 additions & 0 deletions tests/helpers/command_line.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from dfvfs.helpers import command_line
from dfvfs.path import factory as path_spec_factory
from dfvfs.volume import apfs_volume_system
from dfvfs.volume import lvm_volume_system
from dfvfs.volume import tsk_volume_system
from dfvfs.volume import vshadow_volume_system

Expand Down Expand Up @@ -286,6 +287,47 @@ def testPrintAPFSVolumeIdentifiersOverview(self):

self.assertEqual(output_data.split(b'\n'), expected_output_data)

def testPrintLVMVolumeIdentifiersOverview(self):
"""Tests the _PrintLVMVolumeIdentifiersOverview function."""
test_path = self._GetTestFilePath(['lvm.raw'])
self._SkipIfPathNotExists(test_path)

test_os_path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_OS, location=test_path)
test_raw_path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_RAW, parent=test_os_path_spec)
test_lvm_container_path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_LVM, location='/', parent=test_raw_path_spec)

volume_system = lvm_volume_system.LVMVolumeSystem()
volume_system.Open(test_lvm_container_path_spec)

file_object = io.BytesIO()
test_output_writer = command_line.FileObjectOutputWriter(file_object)

test_mediator = command_line.CLIVolumeScannerMediator(
output_writer=test_output_writer)

test_mediator._PrintLVMVolumeIdentifiersOverview(
volume_system, ['lvm1'])

file_object.seek(0, os.SEEK_SET)
output_data = file_object.read()

expected_output_data = [
b'The following Logical Volume Manager (LVM) volumes were found:',
b'',
b'Identifier',
b'lvm1',
b'']

if not win32console:
# Using join here since Python 3 does not support format of bytes.
expected_output_data[2] = b''.join([
b'\x1b[1m', expected_output_data[2], b'\x1b[0m'])

self.assertEqual(output_data.split(b'\n'), expected_output_data)

def testPrintTSKPartitionIdentifiersOverview(self):
"""Tests the _PrintTSKPartitionIdentifiersOverview function."""
test_path = self._GetTestFilePath(['mbr.raw'])
Expand Down Expand Up @@ -463,6 +505,96 @@ def testGetAPFSVolumeIdentifiers(self):

self.assertEqual(volume_identifiers, [])

def testGetLVMVolumeIdentifiers(self):
"""Tests the GetLVMVolumeIdentifiers function."""
test_path = self._GetTestFilePath(['lvm.raw'])
self._SkipIfPathNotExists(test_path)

test_os_path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_OS, location=test_path)
test_raw_path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_RAW, parent=test_os_path_spec)
test_lvm_container_path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_LVM, location='/', parent=test_raw_path_spec)

volume_system = lvm_volume_system.LVMVolumeSystem()
volume_system.Open(test_lvm_container_path_spec)

# Test selection of single volume.
input_file_object = io.BytesIO(b'1\n')
test_input_reader = command_line.FileObjectInputReader(input_file_object)

output_file_object = io.BytesIO()
test_output_writer = command_line.FileObjectOutputWriter(output_file_object)

test_mediator = command_line.CLIVolumeScannerMediator(
input_reader=test_input_reader, output_writer=test_output_writer)

volume_identifiers = test_mediator.GetLVMVolumeIdentifiers(
volume_system, ['lvm1'])

self.assertEqual(volume_identifiers, ['lvm1'])

# Test selection of single volume.
input_file_object = io.BytesIO(b'lvm1\n')
test_input_reader = command_line.FileObjectInputReader(input_file_object)

output_file_object = io.BytesIO()
test_output_writer = command_line.FileObjectOutputWriter(output_file_object)

test_mediator = command_line.CLIVolumeScannerMediator(
input_reader=test_input_reader, output_writer=test_output_writer)

volume_identifiers = test_mediator.GetLVMVolumeIdentifiers(
volume_system, ['lvm1'])

self.assertEqual(volume_identifiers, ['lvm1'])

# Test selection of single volume with invalid input on first attempt.
input_file_object = io.BytesIO(b'bogus\nlvm1\n')
test_input_reader = command_line.FileObjectInputReader(input_file_object)

output_file_object = io.BytesIO()
test_output_writer = command_line.FileObjectOutputWriter(output_file_object)

test_mediator = command_line.CLIVolumeScannerMediator(
input_reader=test_input_reader, output_writer=test_output_writer)

volume_identifiers = test_mediator.GetLVMVolumeIdentifiers(
volume_system, ['lvm1'])

self.assertEqual(volume_identifiers, ['lvm1'])

# Test selection of all volumes.
input_file_object = io.BytesIO(b'all\n')
test_input_reader = command_line.FileObjectInputReader(input_file_object)

output_file_object = io.BytesIO()
test_output_writer = command_line.FileObjectOutputWriter(output_file_object)

test_mediator = command_line.CLIVolumeScannerMediator(
input_reader=test_input_reader, output_writer=test_output_writer)

volume_identifiers = test_mediator.GetLVMVolumeIdentifiers(
volume_system, ['lvm1', 'lvm2'])

self.assertEqual(volume_identifiers, ['lvm1', 'lvm2'])

# Test selection of no volumes.
input_file_object = io.BytesIO(b'\n')
test_input_reader = command_line.FileObjectInputReader(input_file_object)

output_file_object = io.BytesIO()
test_output_writer = command_line.FileObjectOutputWriter(output_file_object)

test_mediator = command_line.CLIVolumeScannerMediator(
input_reader=test_input_reader, output_writer=test_output_writer)

volume_identifiers = test_mediator.GetLVMVolumeIdentifiers(
volume_system, ['lvm1'])

self.assertEqual(volume_identifiers, [])

def testGetPartitionIdentifiers(self):
"""Tests the GetPartitionIdentifiers function."""
test_path = self._GetTestFilePath(['mbr.raw'])
Expand Down
Loading

0 comments on commit b38d722

Please sign in to comment.