From 38f8979e41fe261a2c7be2050a78e4c9484c993e Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Fri, 26 May 2023 13:36:32 -0400 Subject: [PATCH] Changed GPT back-end to support gpt{GUID} alias (#738) --- dfvfs/file_io/gpt_file_io.py | 11 ++++---- dfvfs/helpers/command_line.py | 42 ++++++++++++---------------- dfvfs/lib/gpt_helper.py | 31 --------------------- dfvfs/vfs/gpt_file_entry.py | 30 ++++++++++---------- dfvfs/vfs/gpt_file_system.py | 46 +++++++++++++++++++++++++++---- tests/vfs/gpt_file_entry.py | 6 ++-- tests/vfs/gpt_file_system.py | 27 ++++++++++++++---- tests/volume/gpt_volume_system.py | 26 ++++++++--------- 8 files changed, 118 insertions(+), 101 deletions(-) delete mode 100644 dfvfs/lib/gpt_helper.py diff --git a/dfvfs/file_io/gpt_file_io.py b/dfvfs/file_io/gpt_file_io.py index 8307e69b..ea57190f 100644 --- a/dfvfs/file_io/gpt_file_io.py +++ b/dfvfs/file_io/gpt_file_io.py @@ -5,7 +5,6 @@ from dfvfs.file_io import file_io from dfvfs.lib import errors -from dfvfs.lib import gpt_helper from dfvfs.resolver import resolver @@ -50,19 +49,21 @@ def _Open(self, mode='rb'): OSError: if the file-like object could not be opened. PathSpecError: if the path specification is incorrect. """ - entry_index = gpt_helper.GPTPathSpecGetEntryIndex(self._path_spec) + file_system = resolver.Resolver.OpenFileSystem( + self._path_spec, resolver_context=self._resolver_context) + + entry_index = file_system.GetEntryIndexByPathSpec(self._path_spec) if entry_index is None: raise errors.PathSpecError( 'Unable to retrieve entry index from path specification.') - self._file_system = resolver.Resolver.OpenFileSystem( - self._path_spec, resolver_context=self._resolver_context) - vsgpt_volume = self._file_system.GetGPTVolume() + vsgpt_volume = file_system.GetGPTVolume() if not vsgpt_volume.has_partition_with_identifier(entry_index): raise errors.PathSpecError( f'Missing GPT partition with entry index: {entry_index:d}') + self._file_system = file_system self._vsgpt_partition = vsgpt_volume.get_partition_by_identifier( entry_index) diff --git a/dfvfs/helpers/command_line.py b/dfvfs/helpers/command_line.py index 2f19a47d..146a6aad 100644 --- a/dfvfs/helpers/command_line.py +++ b/dfvfs/helpers/command_line.py @@ -290,35 +290,27 @@ class CLIVolumeScannerMediator(volume_scanner.VolumeScannerMediator): _UNITS_1000 = ['B', 'kB', 'MB', 'GB', 'TB', 'EB', 'ZB', 'YB'] _UNITS_1024 = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'EiB', 'ZiB', 'YiB'] - _USER_PROMPT_APFS = ( - 'Please specify the identifier(s) of the APFS volume that should be ' - 'processed: Note that a range of volumes can be defined as: 3..5. ' - 'Multiple volumes can be defined as: 1,3,5 (a list of comma separated ' + _USER_PROMPT_PARTITIONS = ( + 'Please specify the identifier of the partition that should be ' + 'processed. All partitions can be defined as: "all". Note that you can ' + 'abort with Ctrl^C.') + + _USER_PROMPT_SNAPSHOTS = ( + 'Please specify the identifier(s) of the {0:s} snapshot that should be ' + 'processed. Note that a range of snapshots can be defined as: 3..5. ' + 'Multiple snapshots can be defined as: 1,3,5 (a list of comma separated ' 'values). Ranges and lists can also be combined as: 1,3..5. The first ' - 'volume is 1. All volumes can be defined as "all". If no volumes are ' - 'specified none will be processed. You can abort with Ctrl^C.') + 'snapshot is 1. All snapshots can be defined as "all". If no snapshots ' + 'are specified none will be processed. You can abort with Ctrl^C.') - _USER_PROMPT_LVM = ( - 'Please specify the identifier(s) of the LVM volume that should be ' - 'processed: Note that a range of volumes can be defined as: 3..5. ' + _USER_PROMPT_VOLUMES = ( + 'Please specify the identifier(s) of the {0:s} volume that should be ' + 'processed. Note that a range of volumes can be defined as: 3..5. ' 'Multiple volumes can be defined as: 1,3,5 (a list of comma separated ' 'values). Ranges and lists can also be combined as: 1,3..5. The first ' 'volume is 1. All volumes can be defined as "all". If no volumes are ' 'specified none will be processed. You can abort with Ctrl^C.') - _USER_PROMPT_PARTITIONS = ( - 'Please specify the identifier of the partition that should be ' - 'processed. All partitions can be defined as: "all". Note that you can ' - 'abort with Ctrl^C.') - - _USER_PROMPT_VSS = ( - 'Please specify the identifier(s) of the VSS that should be processed: ' - 'Note that a range of stores can be defined as: 3..5. Multiple stores ' - 'can be defined as: 1,3,5 (a list of comma separated values). Ranges ' - 'and lists can also be combined as: 1,3..5. The first store is 1. All ' - 'stores can be defined as "all". If no stores are specified none will ' - 'be processed. You can abort with Ctrl^C.') - def __init__(self, input_reader=None, output_writer=None): """Initializes a volume scanner mediator. @@ -626,7 +618,7 @@ def GetAPFSVolumeIdentifiers(self, volume_system, volume_identifiers): while True: self._output_writer.Write('\n') - lines = self._textwrapper.wrap(self._USER_PROMPT_APFS) + lines = self._textwrapper.wrap(self._USER_PROMPT_VOLUMES.format('APFS')) self._output_writer.Write('\n'.join(lines)) self._output_writer.Write('\n\nVolume identifier(s): ') @@ -667,7 +659,7 @@ def GetLVMVolumeIdentifiers(self, volume_system, volume_identifiers): while True: self._output_writer.Write('\n') - lines = self._textwrapper.wrap(self._USER_PROMPT_LVM) + lines = self._textwrapper.wrap(self._USER_PROMPT_VOLUMES.format('LVM')) self._output_writer.Write('\n'.join(lines)) self._output_writer.Write('\n\nVolume identifier(s): ') @@ -746,7 +738,7 @@ def GetVSSStoreIdentifiers(self, volume_system, volume_identifiers): while True: self._output_writer.Write('\n') - lines = self._textwrapper.wrap(self._USER_PROMPT_VSS) + lines = self._textwrapper.wrap(self._USER_PROMPT_SNAPSHOTS.format('VSS')) self._output_writer.Write('\n'.join(lines)) self._output_writer.Write('\n\nVSS identifier(s): ') diff --git a/dfvfs/lib/gpt_helper.py b/dfvfs/lib/gpt_helper.py deleted file mode 100644 index 16e7f8bc..00000000 --- a/dfvfs/lib/gpt_helper.py +++ /dev/null @@ -1,31 +0,0 @@ -# -*- coding: utf-8 -*- -"""Helper functions for GUID Partition Table (GPT) support.""" - - -def GPTPathSpecGetEntryIndex(path_spec): - """Retrieves the entry index from the path specification. - - Args: - path_spec (PathSpec): path specification. - - Returns: - int: entry index or None if not available. - """ - entry_index = getattr(path_spec, 'entry_index', None) - - if entry_index is None: - location = getattr(path_spec, 'location', None) - - if location is None or not location.startswith('/p'): - return None - - entry_index = None - try: - entry_index = int(location[2:], 10) - 1 - except ValueError: - pass - - if entry_index is None or entry_index < 0: - return None - - return entry_index diff --git a/dfvfs/vfs/gpt_file_entry.py b/dfvfs/vfs/gpt_file_entry.py index 59c78585..ef1549bf 100644 --- a/dfvfs/vfs/gpt_file_entry.py +++ b/dfvfs/vfs/gpt_file_entry.py @@ -3,7 +3,6 @@ from dfvfs.lib import definitions from dfvfs.lib import errors -from dfvfs.lib import gpt_helper from dfvfs.vfs import file_entry from dfvfs.vfs import gpt_directory @@ -14,14 +13,15 @@ class GPTFileEntry(file_entry.FileEntry): TYPE_INDICATOR = definitions.TYPE_INDICATOR_GPT def __init__( - self, resolver_context, file_system, path_spec, is_root=False, - is_virtual=False, vsgpt_partition=None): + self, resolver_context, file_system, path_spec, entry_index=None, + is_root=False, is_virtual=False, vsgpt_partition=None): """Initializes a file entry. Args: resolver_context (Context): resolver context. file_system (FileSystem): file system. path_spec (PathSpec): path specification. + entry_index (Optional[int]): GPT partition entry index or None. is_root (Optional[bool]): True if the file entry is the root file entry of the corresponding file system. is_virtual (Optional[bool]): True if the file entry is a virtual file @@ -39,6 +39,7 @@ def __init__( super(GPTFileEntry, self).__init__( resolver_context, file_system, path_spec, is_root=is_root, is_virtual=is_virtual) + self._entry_index = entry_index self._name = None self._vsgpt_partition = vsgpt_partition @@ -75,16 +76,18 @@ def _GetSubFileEntries(self): def name(self): """str: name of the file entry, without the full path.""" if self._name is None: - location = getattr(self.path_spec, 'location', None) - if location is not None: - self._name = self._file_system.BasenamePath(location) + self._name = '' + + # Prefer generating the name seeing that the GPT back-end supports + # aliases, such as "/p1" and "/gpt{GUID}". + if self._entry_index is not None: + gpt_entry_index = self._entry_index + 1 + self._name = f'p{gpt_entry_index:d}' else: - entry_index = getattr(self.path_spec, 'entry_index', None) - if entry_index is not None: - gpt_entry_index = entry_index + 1 - self._name = f'p{gpt_entry_index:d}' - else: - self._name = '' + location = getattr(self.path_spec, 'location', None) + if location is not None: + self._name = self._file_system.BasenamePath(location) + return self._name @property @@ -109,8 +112,7 @@ def GetParentFileEntry(self): Returns: GPTFileEntry: parent file entry or None if not available. """ - entry_index = gpt_helper.GPTPathSpecGetEntryIndex(self.path_spec) - if entry_index is None: + if self._entry_index is None: return None return self._file_system.GetRootFileEntry() diff --git a/dfvfs/vfs/gpt_file_system.py b/dfvfs/vfs/gpt_file_system.py index 765ae773..a9fdba54 100644 --- a/dfvfs/vfs/gpt_file_system.py +++ b/dfvfs/vfs/gpt_file_system.py @@ -5,7 +5,6 @@ from dfvfs.lib import definitions from dfvfs.lib import errors -from dfvfs.lib import gpt_helper from dfvfs.path import gpt_path_spec from dfvfs.resolver import resolver from dfvfs.vfs import file_system @@ -73,7 +72,7 @@ def FileEntryExistsByPathSpec(self, path_spec): Returns: bool: True if the file entry exists. """ - entry_index = gpt_helper.GPTPathSpecGetEntryIndex(path_spec) + entry_index = self.GetEntryIndexByPathSpec(path_spec) # The virtual root file has no corresponding partition entry index but # should have a location. @@ -83,6 +82,41 @@ def FileEntryExistsByPathSpec(self, path_spec): return self._vsgpt_volume.has_partition_with_identifier(entry_index) + def GetEntryIndexByPathSpec(self, path_spec): + """Retrieves the entry index for a path specification. + + Args: + path_spec (PathSpec): path specification. + + Returns: + int: entry index or None if not available. + """ + entry_index = getattr(path_spec, 'entry_index', None) + if entry_index is not None: + return entry_index + + location = getattr(path_spec, 'location', None) + if location is None: + return None + + entry_index = None + + if location[:5] == '/gpt{' and location[-1] == '}': + for vsgpt_partition in iter(self._vsgpt_volume.partitions): + if vsgpt_partition.identifier == location[5:-1]: + entry_index = vsgpt_partition.entry_index + + elif location[:2] == '/p': + try: + entry_index = int(location[2:], 10) - 1 + except ValueError: + pass + + if entry_index is None or entry_index < 0: + return None + + return entry_index + def GetFileEntryByPathSpec(self, path_spec): """Retrieves a file entry for a path specification. @@ -92,7 +126,7 @@ def GetFileEntryByPathSpec(self, path_spec): Returns: GPTFileEntry: a file entry or None if not available. """ - entry_index = gpt_helper.GPTPathSpecGetEntryIndex(path_spec) + entry_index = self.GetEntryIndexByPathSpec(path_spec) # The virtual root file has no corresponding partition entry index but # should have a location. @@ -108,7 +142,8 @@ def GetFileEntryByPathSpec(self, path_spec): if not self._vsgpt_volume.has_partition_with_identifier(entry_index): return None - return gpt_file_entry.GPTFileEntry(self._resolver_context, self, path_spec) + return gpt_file_entry.GPTFileEntry( + self._resolver_context, self, path_spec, entry_index=entry_index) def GetGPTPartitionByPathSpec(self, path_spec): """Retrieves a GPT partition for a path specification. @@ -119,9 +154,10 @@ def GetGPTPartitionByPathSpec(self, path_spec): Returns: pyvsgpt.partition: a GPT partition or None if not available. """ - entry_index = gpt_helper.GPTPathSpecGetEntryIndex(path_spec) + entry_index = self.GetEntryIndexByPathSpec(path_spec) if entry_index is None: return None + return self._vsgpt_volume.get_partition_by_identifier(entry_index) def GetGPTVolume(self): diff --git a/tests/vfs/gpt_file_entry.py b/tests/vfs/gpt_file_entry.py index 460c375e..0ae7f347 100644 --- a/tests/vfs/gpt_file_entry.py +++ b/tests/vfs/gpt_file_entry.py @@ -41,19 +41,19 @@ def tearDown(self): # vsgptinfo gpt.raw # # GUID Partition Table (GPT) information: - # Disk identifier : 25271092-82a1-4e85-9be8-2eb59926af3f + # Disk identifier : e86e657a-d840-4c09-afe3-a1a5f665cf44 # Bytes per sector : 512 # Number of partitions : 2 # # Partition: 1 - # Identifier : b6d37ab4-051f-4556-97d2-ad1f8a609644 + # Identifier : 1e25588c-27a9-4094-868c-2f257021f87b # Type identifier : 0fc63daf-8483-4772-8e79-3d69d8477de4 # Type : 0x00 (Empty) # Offset : 1048576 (0x00100000) # Size : 65536 # # Partition: 2 - # Identifier : a03faa35-d9a1-4315-a644-681506850073 + # Identifier : 53d86ccf-3188-4b54-90d8-81866426b70a # Type identifier : 0fc63daf-8483-4772-8e79-3d69d8477de4 # Type : 0x00 (Empty) # Offset : 2097152 (0x00200000) diff --git a/tests/vfs/gpt_file_system.py b/tests/vfs/gpt_file_system.py index 15bd99d0..82a1ebeb 100644 --- a/tests/vfs/gpt_file_system.py +++ b/tests/vfs/gpt_file_system.py @@ -36,23 +36,23 @@ def tearDown(self): # vsgptinfo gpt.raw # # GUID Partition Table (GPT) information: - # Disk identifier : 25271092-82a1-4e85-9be8-2eb59926af3f + # Disk identifier : e86e657a-d840-4c09-afe3-a1a5f665cf44 # Bytes per sector : 512 # Number of partitions : 2 # # Partition: 1 - # Identifier : b6d37ab4-051f-4556-97d2-ad1f8a609644 + # Identifier : 1e25588c-27a9-4094-868c-2f257021f87b # Type identifier : 0fc63daf-8483-4772-8e79-3d69d8477de4 # Type : 0x00 (Empty) # Offset : 1048576 (0x00100000) - # Size : 65024 + # Size : 65536 # # Partition: 2 - # Identifier : a03faa35-d9a1-4315-a644-681506850073 + # Identifier : 53d86ccf-3188-4b54-90d8-81866426b70a # Type identifier : 0fc63daf-8483-4772-8e79-3d69d8477de4 # Type : 0x00 (Empty) # Offset : 2097152 (0x00200000) - # Size : 65024 + # Size : 65536 def testOpenAndClose(self): """Test the open and close functionality.""" @@ -85,6 +85,12 @@ def testFileEntryExistsByPathSpec(self): parent=self._raw_path_spec) self.assertTrue(file_system.FileEntryExistsByPathSpec(path_spec)) + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_GPT, + location='/gpt{1e25588c-27a9-4094-868c-2f257021f87b}', + parent=self._raw_path_spec) + self.assertTrue(file_system.FileEntryExistsByPathSpec(path_spec)) + path_spec = path_spec_factory.Factory.NewPathSpec( definitions.TYPE_INDICATOR_GPT, entry_index=9, parent=self._raw_path_spec) @@ -100,6 +106,8 @@ def testFileEntryExistsByPathSpec(self): parent=self._raw_path_spec) self.assertFalse(file_system.FileEntryExistsByPathSpec(path_spec)) + # TODO: add tests for GetEntryIndexByPathSpec + def testGetFileEntryByPathSpec(self): """Tests the GetFileEntryByPathSpec function.""" file_system = gpt_file_system.GPTFileSystem( @@ -132,6 +140,15 @@ def testGetFileEntryByPathSpec(self): self.assertIsNotNone(file_entry) self.assertEqual(file_entry.name, 'p1') + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_GPT, + location='/gpt{1e25588c-27a9-4094-868c-2f257021f87b}', + parent=self._raw_path_spec) + file_entry = file_system.GetFileEntryByPathSpec(path_spec) + + self.assertIsNotNone(file_entry) + self.assertEqual(file_entry.name, 'p1') + path_spec = path_spec_factory.Factory.NewPathSpec( definitions.TYPE_INDICATOR_GPT, entry_index=9, parent=self._raw_path_spec) diff --git a/tests/volume/gpt_volume_system.py b/tests/volume/gpt_volume_system.py index 43e74de7..4a7ceae9 100644 --- a/tests/volume/gpt_volume_system.py +++ b/tests/volume/gpt_volume_system.py @@ -30,23 +30,23 @@ def setUp(self): # vsgptinfo gpt.raw # # GUID Partition Table (GPT) information: - # Disk identifier : e86e657a-d840-4c09-afe3-a1a5f665cf44 - # Bytes per sector : 512 - # Number of partitions : 2 + # Disk identifier : e86e657a-d840-4c09-afe3-a1a5f665cf44 + # Bytes per sector : 512 + # Number of partitions : 2 # # Partition: 1 - # Identifier : 1e25588c-27a9-4094-868c-2f257021f87b - # Type identifier : 0fc63daf-8483-4772-8e79-3d69d8477de4 - # Type : 0x00 (Empty) - # Offset : 1048576 (0x00100000) - # Size : 65536 + # Identifier : 1e25588c-27a9-4094-868c-2f257021f87b + # Type identifier : 0fc63daf-8483-4772-8e79-3d69d8477de4 + # Type : 0x00 (Empty) + # Offset : 1048576 (0x00100000) + # Size : 65536 # # Partition: 2 - # Identifier : 53d86ccf-3188-4b54-90d8-81866426b70a - # Type identifier : 0fc63daf-8483-4772-8e79-3d69d8477de4 - # Type : 0x00 (Empty) - # Offset : 2097152 (0x00200000) - # Size : 65536 + # Identifier : 53d86ccf-3188-4b54-90d8-81866426b70a + # Type identifier : 0fc63daf-8483-4772-8e79-3d69d8477de4 + # Type : 0x00 (Empty) + # Offset : 2097152 (0x00200000) + # Size : 65536 def testIterateVolumes(self): """Test the iterate volumes functionality."""