From 8bb63b80d1f5ca30d0247c2b49170ce9621e82f1 Mon Sep 17 00:00:00 2001
From: Damien Thenot
Date: Mon, 13 Jan 2025 17:23:48 +0100
Subject: [PATCH] In Progress

Signed-off-by: Damien Thenot
Co-authored-by: Guillaume
---
 drivers/cowutil.py   |   4 +
 drivers/qcow2util.py | 656 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 660 insertions(+)
 create mode 100644 drivers/qcow2util.py

diff --git a/drivers/cowutil.py b/drivers/cowutil.py
index 000d47a4a..416cfa6a5 100755
--- a/drivers/cowutil.py
+++ b/drivers/cowutil.py
@@ -314,8 +314,12 @@ def getVdiTypeFromImageFormat(image_format: ImageFormat) -> str:
 
 def getCowUtil(vdi_type: str) -> CowUtil:
     import vhdutil
+    import qcow2util
 
     if getImageFormatFromVdiType(vdi_type) in (ImageFormat.RAW, ImageFormat.VHD):
         return vhdutil.VhdUtil()
 
+    if getImageFormatFromVdiType(vdi_type) == ImageFormat.QCOW2:
+        return qcow2util.QCowUtil()
+
     assert False, f"Unsupported VDI type: {vdi_type}"
diff --git a/drivers/qcow2util.py b/drivers/qcow2util.py
new file mode 100644
index 000000000..448bc53a4
--- /dev/null
+++ b/drivers/qcow2util.py
@@ -0,0 +1,656 @@
+from sm_typing import Any, Callable, Dict, Final, List, Optional, Sequence, Union, override, BinaryIO
+
+import errno
+import time
+import struct
+
+import util
+
+from pathlib import Path
+
+from cowutil import CowUtil, CowImageInfo
+
+MAX_QCOW_CHAIN_LENGTH: Final = 30
+
+QEMU_IMG: Final = "/usr/bin/qemu-img"
+
+
+class QCowUtil(CowUtil):
+    """CowUtil implementation for QCOW2 images.
+
+    The image metadata (header, L1 and L2 tables) is parsed directly from
+    the file. We followed specifications found here:
+    https://github.com/qemu/qemu/blob/master/docs/interop/qcow2.txt
+
+    Only standard (uncompressed) clusters and the default cluster size of
+    64K are supported.
+    """
+
+    QCOW2_MAGIC = 0x514649FB  # b"QFI\xfb": Magic number for QCOW2 files
+    QCOW2_HEADER_SIZE = 104  # Number of bytes read from the start of the file
+    QCOW2_PARSED_HEADER_SIZE = 72  # Bytes unpacked by _read_qcow2_header
+    QCOW2_L2_SIZE = 65536
+    QCOW2_BACKING_FILE_OFFSET = 8
+
+    # Bit 63 is the allocated bit for standard cluster
+    ALLOCATED_ENTRY_BIT = 0x8000_0000_0000_0000
+    # Bit 62 is 0 for standard, 1 for compressed cluster
+    CLUSTER_TYPE_BIT = 0x4000_0000_0000_0000
+    # Bits 9-55 are offset of L2 table, bits 0-8 are reserved.
+    L2_OFFSET_MASK = 0x00FF_FFFF_FFFF_FE00
+    # Bits 0-61 are the cluster description
+    CLUSTER_DESCRIPTION_MASK = 0x3FFF_FFFF_FFFF_FFFF
+    # Bits 9-55 are offset of standard cluster, bits 0-8 are not part of it.
+    STANDARD_CLUSTER_OFFSET_MASK = 0x00FF_FFFF_FFFF_FE00
+
+    def __init__(self):
+        self.qcow_read = False
+
+    def _read_qcow2(self, path: str):
+        """Parse the header, L1 table and L2 tables of a qcow2 file.
+
+        The result is stored in self.filename, self.header, self.l1 and
+        self.l1_to_l2. self.qcow_read is set once everything was parsed.
+
+        Args:
+            path: Path of the qcow2 file.
+
+        Raises:
+            ValueError: if the file is not a supported qcow2 image.
+        """
+        # A failed (re-)read must not leave older data flagged as valid.
+        self.qcow_read = False
+        with open(path, "rb") as qcow2_file:
+            self.filename = path  # Keep the filename if clean is called
+            self.header = self._read_qcow2_header(qcow2_file)
+            self.l1 = self._get_l1_entries(qcow2_file)
+            # The l1_to_l2 allows to get L2 entries for a given L1. If L1 entry
+            # is not allocated we store an empty list.
+            self.l1_to_l2: Dict[int, List[int]] = {}
+
+            for l1_entry in self.l1:
+                l2_offset = l1_entry & self.L2_OFFSET_MASK
+                if l2_offset == 0:
+                    self.l1_to_l2[l1_entry] = []
+                else:
+                    self.l1_to_l2[l1_entry] = self._get_l2_entries(
+                        qcow2_file, l2_offset
+                    )
+        self.qcow_read = True
+
+    @staticmethod
+    def _unpack_entries(table: bytes) -> List[int]:
+        """Decode a raw L1 or L2 table into big-endian 64 bits entries."""
+        return [entry for (entry,) in struct.iter_unpack(">Q", table)]
+
+    def _get_l1_entries(self, file: BinaryIO) -> List[int]:
+        """Returns the list of all L1 entries.
+
+        Args:
+            file: The qcow2 file object.
+
+        Returns:
+            list: List of all L1 entries
+        """
+        file.seek(self.header["l1_table_offset"])
+        # Each L1 entry is 8 bytes
+        l1_table = file.read(self.header["l1_size"] * 8)
+        return self._unpack_entries(l1_table)
+
+    @staticmethod
+    def _get_l2_entries(file: BinaryIO, l2_offset: int) -> List[int]:
+        """Returns the list of all L2 entries at a given L2 offset.
+
+        Args:
+            file: The qcow2 file.
+            l2_offset: the L2 offset where to look for entries
+
+        Returns:
+            list: List of all L2 entries
+        """
+        # The size of L2 is 65536 bytes and each entry is 8 bytes.
+        file.seek(l2_offset)
+        l2_table = file.read(QCowUtil.QCOW2_L2_SIZE)
+        return QCowUtil._unpack_entries(l2_table)
+
+    @staticmethod
+    def _read_qcow2_backingfile(
+        file: BinaryIO, backing_file_offset: int, backing_file_size: int
+    ) -> str:
+        """Returns the backing file name stored in the image.
+
+        Args:
+            file: The qcow2 file object.
+            backing_file_offset: Offset of the name, 0 if there is no backing file.
+            backing_file_size: Length in bytes of the name.
+
+        Returns:
+            str: The backing file name, or "" if there is none.
+        """
+        if backing_file_offset == 0:
+            return ""
+
+        file.seek(backing_file_offset)
+        parent_name = file.read(backing_file_size)
+        return parent_name.decode("UTF-8")
+
+    @staticmethod
+    def _read_qcow2_header(file: BinaryIO) -> Dict[str, Any]:
+        """Returns a dict containing some information from QCow2 header.
+
+        Args:
+            file: The qcow2 file object.
+
+        Returns:
+            dict: version, backing_file_offset, backing_file_size,
+                virtual_disk_size, cluster_bits, l1_size, l1_table_offset,
+                refcount_table_offset, snapshots_offset and parent.
+
+        Raises:
+            ValueError: if the file is too short, if qcow2 magic is not
+                recognized or if cluster size is not supported.
+        """
+        # The header is as follow:
+        #
+        # magic: u32,                   // Magic string "QFI\xfb"
+        # version: u32,                 // Version (2 or 3)
+        # backing_file_offset: u64,     // Offset to the backing file name
+        # backing_file_size: u32,       // Size of the backing file name
+        # cluster_bits: u32,            // Bits used for addressing within a cluster
+        # size: u64,                    // Virtual disk size
+        # crypt_method: u32,            // 0 = no encryption, 1 = AES encryption
+        # l1_size: u32,                 // Number of entries in the L1 table
+        # l1_table_offset: u64,         // Offset to the active L1 table
+        # refcount_table_offset: u64,   // Offset to the refcount table
+        # refcount_table_clusters: u32, // Number of clusters for the refcount table
+        # nb_snapshots: u32,            // Number of snapshots in the image
+        # snapshots_offset: u64,        // Offset to the snapshot table
+
+        file.seek(0)
+        header = file.read(QCowUtil.QCOW2_HEADER_SIZE)
+        if len(header) < QCowUtil.QCOW2_PARSED_HEADER_SIZE:
+            # struct.unpack would fail with a struct.error on a short read
+            raise ValueError("Not a valid QCOW2 file")
+
+        (
+            magic,
+            version,
+            backing_file_offset,
+            backing_file_size,
+            cluster_bits,
+            size,
+            _,
+            l1_size,
+            l1_table_offset,
+            refcount_table_offset,
+            _,
+            _,
+            snapshots_offset,
+        ) = struct.unpack(
+            ">IIQIIQIIQQIIQ", header[: QCowUtil.QCOW2_PARSED_HEADER_SIZE]
+        )
+
+        if magic != QCowUtil.QCOW2_MAGIC:
+            raise ValueError("Not a valid QCOW2 file")
+
+        if cluster_bits != 16:
+            raise ValueError("Only default cluster size of 64K is supported")
+
+        parent_name = QCowUtil._read_qcow2_backingfile(
+            file, backing_file_offset, backing_file_size
+        )
+
+        return {
+            "version": version,
+            "backing_file_offset": backing_file_offset,
+            "backing_file_size": backing_file_size,
+            "virtual_disk_size": size,
+            "cluster_bits": cluster_bits,
+            "l1_size": l1_size,
+            "l1_table_offset": l1_table_offset,
+            "refcount_table_offset": refcount_table_offset,
+            "snapshots_offset": snapshots_offset,
+            "parent": parent_name,
+        }
+
+    @staticmethod
+    def _is_l1_allocated(entry: int) -> bool:
+        """Checks if the given L1 entry is allocated.
+
+        If the offset is 0 then the L2 table and all clusters described
+        by this L2 table are unallocated.
+
+        Args:
+            entry: L1 entry
+
+        Returns:
+            bool: True if the L1 entry is allocated (ie has a valid offset).
+                False otherwise.
+        """
+        return (entry & QCowUtil.L2_OFFSET_MASK) != 0
+
+    @staticmethod
+    def _is_l2_allocated(entry: int) -> bool:
+        """Checks if a given entry is allocated.
+
+        Currently we only support standard clusters. And for standard clusters
+        the bit 63 is set to 1 for allocated ones or offset is not 0.
+
+        Args:
+            entry: L2 entry
+
+        Returns:
+            bool: Returns True if the L2 entry is allocated, False otherwise
+
+        Raises:
+            ValueError: if the cluster is not a standard one.
+        """
+        # Not an assert: the check must survive python -O.
+        if entry & QCowUtil.CLUSTER_TYPE_BIT:
+            raise ValueError("Compressed clusters are not supported")
+        return (entry & QCowUtil.ALLOCATED_ENTRY_BIT != 0) or (
+            entry & QCowUtil.STANDARD_CLUSTER_OFFSET_MASK != 0
+        )
+
+    @staticmethod
+    def _get_allocated_clusters(l2_entries: List[int]) -> List[int]:
+        """Get all allocated clusters in a given list of L2 entries.
+
+        Args:
+            l2_entries: A list of L2 entries.
+
+        Returns:
+            A list of all allocated entries
+        """
+        return [entry for entry in l2_entries if QCowUtil._is_l2_allocated(entry)]
+
+    @staticmethod
+    def _get_cluster_to_byte(clusters: int, cluster_bits: int) -> int:
+        """Converts a number of clusters into a size in bytes."""
+        # (1 << cluster_bits) give cluster size in byte
+        return clusters * (1 << cluster_bits)
+
+    def _get_number_of_allocated_clusters(self) -> int:
+        """Get the number of allocated clusters.
+
+        _read_qcow2() must have been called before.
+
+        Returns:
+            An integer that is the number of allocated clusters.
+        """
+        assert self.qcow_read
+
+        return sum(
+            len(self._get_allocated_clusters(l2_entries))
+            for l2_entries in self.l1_to_l2.values()
+        )
+
+    @staticmethod
+    def _move_backing_file(
+        f: BinaryIO, old_offset: int, new_offset: int, data_size: int
+    ) -> None:
+        """Move a number of bytes from old_offset to new_offset and replaces the old
+        value by 0s. It is up to the caller to save the current position in the file
+        if needed.
+
+        Args:
+            f: the file that will be modified
+            old_offset: the current offset
+            new_offset: the new offset where we want to move data
+            data_size: Size in bytes of data that we want to move
+
+        Returns:
+            Nothing but the file f is modified and the position in the file also.
+        """
+        # Read the string at backing_file_offset
+        f.seek(old_offset)
+        data = f.read(data_size)
+
+        # Write zeros at the original location
+        f.seek(old_offset)
+        f.write(b"\x00" * data_size)
+
+        # Write the string to the new location
+        f.seek(new_offset)
+        f.write(data)
+
+    def _add_or_find_custom_header(self) -> int:
+        """Add custom header at the end of header extensions
+
+        It finds the end of the header extensions and add the custom header.
+        If the header already exists nothing is done.
+
+        Returns:
+            It returns the data offset where custom header is found or created.
+            If data offset is 0 something weird happens.
+            The qcow2 file in self.filename can be modified.
+        """
+        assert self.qcow_read
+
+        header_length = 72  # This is the default value for version 2 images
+
+        custom_header_type = 0x76617465  # vate: it is easy to recognize with hexdump -C
+        custom_header_length = 8
+        custom_header_data = 0
+        # We don't need padding because we are already aligned
+        custom_header = struct.pack(
+            ">IIQ", custom_header_type, custom_header_length, custom_header_data
+        )
+        # An extension with type 0 and length 0 marks the end of the extensions
+        end_marker = struct.pack(">II", 0, 0)
+
+        with open(self.filename, "rb+") as qcow2_file:
+            if self.header["version"] == 3:
+                qcow2_file.seek(100)  # 100 is the offset of header_length
+                # byteorder is given explicitly: it is mandatory before Python 3.11
+                header_length = int.from_bytes(qcow2_file.read(4), "big")
+
+            # After the image header we found Header extension. So we need to find the end of
+            # the header extension area and add our custom header.
+            qcow2_file.seek(header_length)
+
+            custom_data_offset = 0
+
+            while True:
+                ext_header = qcow2_file.read(8)
+                if len(ext_header) < 8:
+                    # Truncated file without end marker: leave it untouched
+                    break
+                ext_type, ext_len = struct.unpack(">II", ext_header)
+
+                if ext_type == custom_header_type:
+                    # A custom header is already there
+                    custom_data_offset = qcow2_file.tell()
+                    break
+
+                if ext_type == 0x00000000:
+                    # End mark found. If we found the end mark it means that we didn't find
+                    # the custom header. So we need to add it.
+                    custom_data_offset = qcow2_file.tell()
+
+                    # We will overwrite the end marker so rewind a little bit to
+                    # write the new type extension and the new length. But if there is
+                    # a backing file we need to move it to make some space.
+                    if self.header["backing_file_offset"]:
+                        # Keep current position
+                        saved_pos = qcow2_file.tell()
+
+                        bf_offset = self.header["backing_file_offset"]
+                        bf_size = self.header["backing_file_size"]
+                        bf_new_offset = bf_offset + len(custom_header)
+                        self._move_backing_file(
+                            qcow2_file, bf_offset, bf_new_offset, bf_size
+                        )
+
+                        # Update the header to match the new backing file offset
+                        self.header["backing_file_offset"] = bf_new_offset
+                        qcow2_file.seek(self.QCOW2_BACKING_FILE_OFFSET)
+                        qcow2_file.write(struct.pack(">Q", bf_new_offset))
+
+                        # Restore saved position
+                        qcow2_file.seek(saved_pos)
+
+                    qcow2_file.seek(-8, 1)
+                    # Write a new end marker after our header instead of relying
+                    # on the following bytes being already zeroed.
+                    qcow2_file.write(custom_header + end_marker)
+                    break
+
+                # Round up the header extension size to the next multiple of 8
+                ext_len = (ext_len + 7) & 0xFFFFFFF8
+                qcow2_file.seek(ext_len, 1)
+
+        return custom_data_offset
+
+    # ----
+    # Implementation of CowUtil
+    # ----
+
+    @override
+    def getMinImageSize(self) -> int: #TODO: Minimum size of an image in byte
+        pass
+
+    @override
+    def getMaxImageSize(self) -> int: #TODO: Maximum size of an image in byte
+        pass
+
+    @override
+    def getBlockSize(self, path: str) -> int: #TODO: Blocksize of the image in byte
+        pass
+
+    @override
+    def getFooterSize(self, path: str) -> int: #TODO: in byte
+        pass
+
+    @override
+    def getMaxChainLength(self) -> int:
+        return MAX_QCOW_CHAIN_LENGTH
+
+    @override
+    def calcOverheadEmpty(self, virtual_size: int) -> int:
+        pass
+
+    @override
+    def calcOverheadBitmap(self, virtual_size: int) -> int:
+        pass
+
+    @override
+    def getInfo(
+        self,
+        path: str,
+        extractUuidFunction: Callable[[str], str],
+        includeParent: bool = True,
+        resolveParent: bool = True,
+        useBackupFooter: bool = False
+    ) -> CowImageInfo:
+        """Returns a CowImageInfo filled from the qcow2 file at `path`.
+
+        resolveParent and useBackupFooter are currently not used.
+        """
+        self._read_qcow2(path)
+        uuid = extractUuidFunction(path)
+        cowinfo = CowImageInfo(uuid)
+        cowinfo.path = Path(path).name
+        cowinfo.sizeVirt = self.header["virtual_disk_size"]
+        cowinfo.sizePhys = self.getSizePhys(path)
+        # getHidden() parses the file again and can add the custom header
+        cowinfo.hidden = self.getHidden(path)
+        cowinfo.sizeAllocated = self._get_cluster_to_byte(
+            self._get_number_of_allocated_clusters(), self.header["cluster_bits"]
+        )
+        if includeParent:
+            parent_path = self.header["parent"]
+            if parent_path != "":
+                cowinfo.parentPath = parent_path
+                cowinfo.parentUuid = extractUuidFunction(parent_path)
+        cowinfo.error = 0
+
+        return cowinfo
+
+    @override
+    def getInfoFromLVM(
+        self, lvName: str, extractUuidFunction: Callable[[str], str], vgName: str
+    ) -> Optional[CowImageInfo]:
+        pass
+
+    @override
+    def getAllInfoFromVG(
+        self,
+        pattern: str,
+        extractUuidFunction: Callable[[str], str],
+        vgName: Optional[str] = None,
+        parents: bool = False,
+        exitOnError: bool = False
+    ) -> Dict[str, CowImageInfo]:
+        pass
+
+    @override
+    def getParent(self, path: str, extractUuidFunction: Callable[[str], str]) -> Optional[str]:
+        """Returns the UUID of the backing file, None if there is none."""
+        parent = self.getParentNoCheck(path)
+        if parent:
+            return extractUuidFunction(parent)
+        return None
+
+    @override
+    def getParentNoCheck(self, path: str) -> Optional[str]:
+        """Returns the backing file name stored in the header, None if there is none."""
+        self._read_qcow2(path)
+        parent_path = self.header["parent"]
+        if parent_path == "":
+            return None
+        return parent_path
+
+    @override
+    def hasParent(self, path: str) -> bool:
+        """Returns True if the image has a backing file."""
+        return bool(self.getParentNoCheck(path))
+
+    @override
+    def setParent(self, path: str, parentPath: str, parentRaw: bool) -> None:
+        pass
+
+    @override
+    def getHidden(self, path: str) -> bool:
+        """Get the hidden property stored in the custom header.
+
+        Args:
+            path: Path of the qcow2 file.
+
+        Returns:
+            True if hidden is set, False otherwise. If the custom header is
+            not found it is created so the qcow file can be modified.
+        """
+        self._read_qcow2(path)
+        custom_data_offset = self._add_or_find_custom_header()
+        if custom_data_offset == 0:
+            util.SMlog("ERROR: Custom data offset not found... should not reach this")
+            return False
+
+        with open(path, "rb") as qcow2_file:
+            qcow2_file.seek(custom_data_offset)
+            hidden = qcow2_file.read(1)
+
+        return hidden != b"\x00"
+
+    @override
+    def setHidden(self, path: str, hidden: bool = True) -> None:
+        """Set the hidden property stored in the custom header.
+
+        Args:
+            path: Path of the qcow2 file.
+            hidden: True if you want to set the property. False otherwise
+
+        Returns:
+            nothing. If the custom headers is not found it is created so the
+            qcow file can be modified.
+        """
+        self._read_qcow2(path)
+        custom_data_offset = self._add_or_find_custom_header()
+        if custom_data_offset == 0:
+            util.SMlog("ERROR: Custom data offset not found... should not reach this")
+            return #TODO: Add exception
+
+        with open(self.filename, "rb+") as qcow2_file:
+            qcow2_file.seek(custom_data_offset)
+            qcow2_file.write(b"\x01" if hidden else b"\x00")
+
+    @override
+    def getSizeVirt(self, path: str) -> int:
+        pass
+
+    @override
+    def setSizeVirt(self, path: str, size: int, jFile: str) -> None:
+        pass
+
+    @override
+    def setSizeVirtFast(self, path: str, size: int) -> None:
+        pass
+
+    @override
+    def getMaxResizeSize(self, path: str) -> int:
+        pass
+
+    @override
+    def getSizePhys(self, path: str) -> int:
+        """Returns the size in bytes of the file as reported by `du -b`."""
+        cmd = ["du", "-b", path] #TODO: use os.stat instead since it won't work with a block device
+        ret = self._ioretry(cmd)
+        # du prints "<size>\t<path>": convert the size to honour the int return type
+        return int(ret.split()[0])
+
+    @override
+    def setSizePhys(self, path: str, size: int, debug: bool = True) -> None:
+        pass
+
+    @override
+    def getAllocatedSize(self, path: str) -> int:
+        """Returns the size in bytes of all allocated clusters of the image."""
+        self._read_qcow2(path)
+        clusters = self._get_number_of_allocated_clusters()
+        cluster_bits = self.header["cluster_bits"]
+        return self._get_cluster_to_byte(clusters, cluster_bits)
+
+    @override
+    def getResizeJournalSize(self) -> int:
+        pass
+
+    @override
+    def killData(self, path: str) -> None:
+        pass
+
+    @override
+    def getDepth(self, path: str) -> int:
+        pass
+
+    @override
+    def getBlockBitmap(self, path: str) -> bytes:
+        pass
+
+    @override
+    def coalesce(self, path: str) -> int:
+        pass
+
+    @override
+    def create(self, path: str, size: int, static: bool, msize: int = 0) -> None:
+        pass
+
+    @override
+    def snapshot(
+        self,
+        path: str,
+        parent: str,
+        parentRaw: bool,
+        msize: int = 0,
+        checkEmpty: Optional[bool] = True
+    ) -> None:
+        pass
+
+    @override
+    def check(
+        self,
+        path: str,
+        ignoreMissingFooter: Optional[bool] = False,
+        fast: Optional[bool] = False
+    ) -> "CowUtil.CheckResult":  # Quoted: CheckResult is not imported here (TODO: confirm where it is declared)
+        pass
+
+    @override
+    def revert(self, path: str, jFile: str) -> None:
+        pass
+
+    @override
+    def repair(self, path: str) -> None:
+        pass
+
+    @override
+    def validateAndRoundImageSize(self, size: int) -> int:
+        pass
+
+    @override
+    def getKeyHash(self, path: str) -> Optional[str]:
+        pass
+
+    @override
+    def setKey(self, path: str, key_hash: str) -> None:
+        pass
\ No newline at end of file