From 518417784ee1bdc60afa6067d27286b3252698a2 Mon Sep 17 00:00:00 2001 From: snake-biscuits <36507175+snake-biscuits@users.noreply.github.com> Date: Fri, 22 Nov 2024 14:24:58 +1000 Subject: [PATCH] (archives)(#197) `sega.Gdi` external files & revisiting `sega.GDRom` --- bsp_tool/archives/cdrom.py | 12 ++-- bsp_tool/archives/sega.py | 141 +++++++++++++++++++++++-------------- 2 files changed, 98 insertions(+), 55 deletions(-) diff --git a/bsp_tool/archives/cdrom.py b/bsp_tool/archives/cdrom.py index 830fd1ba..bfa2837e 100644 --- a/bsp_tool/archives/cdrom.py +++ b/bsp_tool/archives/cdrom.py @@ -283,9 +283,9 @@ def __str__(self) -> str: def from_stream(cls, stream: io.BytesIO) -> PrimaryVolumeDescriptor: out = cls() type_code = binary.read_struct(stream, "B") - assert type_code == 0x01, type_code # 0x01: Primary, 0x02: Supplementary, 0x03: Partition + # 0x01: Primary, 0x02: Supplementary, 0x03: Partition magic = binary.read_struct(stream, "5s") - assert magic == b"CD001" + assert (type_code, magic) == (0x01, b"CD001"), "not a PVD: 0x{type_code:02X} {magic}" version = binary.read_struct(stream, "H") # technically uint8 + 1 char pad assert version == 0x0001 out.system = read_strA(stream, 32) @@ -393,6 +393,8 @@ def path_records(self, path_index: int) -> List[Directory]: return records def listdir(self, search_folder: str) -> List[str]: + if search_folder in (".", "./"): + search_folder = "/" # valid root # NOTE: search_folder is case sensitive records = self.folder_records(search_folder) assert records[0].name == "." @@ -416,10 +418,12 @@ def read(self, filename: str) -> bytes: assert filename in records, "file not found" record = records[filename] assert record.is_file, "f{filename!r} is not a file" - if record.data_interleaved_unit_size != 0 or record.data.interleaved_gap_size != 0: + if record.interleaved_unit_size != 0 or record.interleaved_gap_size != 0: raise NotImplementedError("cannot read interleaved file") self.seek(record.data_lba) - return self.disc.read(record.data_size) + data = self.disc.read(record.data_size) + assert len(data) == record.data_size, "unexpected EOF" + return data def seek(self, lba: int) -> int: true_lba = lba + self.lba_offset diff --git a/bsp_tool/archives/sega.py b/bsp_tool/archives/sega.py index 9c7c1bc9..a94b3cd3 100644 --- a/bsp_tool/archives/sega.py +++ b/bsp_tool/archives/sega.py @@ -5,10 +5,13 @@ import enum import io import os -from typing import List +from typing import Dict, List +from .. import external from . import base from . import cdrom +from . import golden_hawk +from . import mame from . import padus @@ -51,52 +54,54 @@ def from_line(cls, line: str) -> GdiTrack: class Gdi(base.Archive): ext = "*.gdi" - folder: str - filename: str + extras: Dict[str, external.File] tracks: List[GdiTrack] def __init__(self, filename: str = None): + self.extras = dict() self.tracks = list() - if filename is not None: - self.folder, self.filename = os.path.split(filename) def __repr__(self) -> str: - descriptor = f"{self.filename!r} {len(self.tracks)} tracks" + descriptor = f"{len(self.tracks)} tracks" return f"<{self.__class__.__name__} {descriptor} @ 0x{id(self):016X}>" + def extra_patterns(self) -> List[str]: + return self.namelist() + def namelist(self) -> List[str]: return [track.filename for track in self.tracks] def read(self, filename: str) -> bytes: - assert filename in self.namelist() + assert filename in self.namelist(), "unrelated file" + assert filename in self.extras, "couldn't find file" track = {track.filename: track for track in self.tracks}[filename] - with open(os.path.join(self.folder, filename), "rb") as track_file: - if track.type == GdiTrackType.BINARY: - # based on sector conversion code from padus.Cdi.read - sectors = list() - # NOTE: Mode1 2352, not Mode2 like in .cdi? - header_length_for_size = {2048: 0, 2352: 16} - header_length = header_length_for_size[track.sector_size] + track_file = self.extras[filename] + track_file.seek(0) + if track.type == GdiTrackType.BINARY: + # based on sector conversion code from padus.Cdi.read + sectors = list() + # NOTE: Mode1 2352, not Mode2 like in .cdi? + header_length_for_size = {2048: 0, 2352: 16} + header_length = header_length_for_size[track.sector_size] + raw_sector = track_file.read(track.sector_size) + while len(raw_sector) != 0: + assert len(raw_sector) == track.sector_size + sector = raw_sector[header_length:] + sector = sector[:2048] + sectors.append(sector) raw_sector = track_file.read(track.sector_size) - while len(raw_sector) != 0: - assert len(raw_sector) == track.sector_size - sector = raw_sector[header_length:] - sector = sector[:2048] - sectors.append(sector) - raw_sector = track_file.read(track.sector_size) - return b"".join(sectors) - else: - return track_file.read() + return b"".join(sectors) + else: + return track_file.read() @classmethod - def from_file(cls, filename: str) -> Gdi: - out = cls(filename) - with open(filename, "r") as gdi_file: - num_tracks = int(gdi_file.readline()) - for line in gdi_file: - track = GdiTrack.from_line(line) - out.tracks.append(track) - assert len(out.tracks) == num_tracks + def from_stream(cls, stream: io.BytesIO) -> Gdi: + out = cls() + num_tracks = int(stream.readline().decode()) + for line in stream: + track = GdiTrack.from_line(line.decode()) + out.tracks.append(track) + assert len(out.tracks) == num_tracks return out @@ -183,6 +188,24 @@ def namelist(self) -> List[str]: def read(self, filename: str) -> bytes: return self.data_area.read(filename) + @classmethod + def from_archive(cls, parent_archive: base.Archive, filename: str) -> GDRom: + ext = os.path.splitext(filename.lower())[-1] + if ext == ".cdi": + return cls.from_cdi(padus.Cdi.from_archive(parent_archive, filename)) + elif ext == ".chd": + return cls.from_chd(mame.Chd.from_archive(parent_archive, filename)) + elif ext == ".cue": + return cls.from_cue(golden_hawk.Cue.from_archive(parent_archive, filename)) + elif ext == ".gdi": + return cls.from_gdi(Gdi.from_archive(parent_archive, filename)) + else: + raise RuntimeError(f"Unsupported file extension: {ext}") + + @classmethod + def from_bytes(cls, raw_gdrom: bytes) -> GDRom: + raise NotImplementedError("cannot identify disc image format") + @classmethod def from_cdi(cls, cdi: padus.Cdi) -> GDRom: # validate our assumptions @@ -203,19 +226,46 @@ def from_cdi(cls, cdi: padus.Cdi) -> GDRom: # TODO: save session[0]'s tracks to self.soundtrack or something & discard the Cdi return out + @classmethod + def from_chd(cls, chd: mame.Chd) -> GDRom: + raise NotImplementedError("mame.Chd is incomplete") + + @classmethod + def from_cue(cls, cue: golden_hawk.Cue) -> GDRom: + raise NotImplementedError("golden_hawk.Cue is incomplete") + + @classmethod + def from_file(cls, filename: str) -> GDRom: + ext = os.path.splitext(filename.lower())[-1] + if ext == ".cdi": + return cls.from_cdi(padus.Cdi.from_file(filename)) + elif ext == ".chd": + return cls.from_chd(mame.Chd.from_file(filename)) + elif ext == ".cue": + return cls.from_cue(golden_hawk.Cue.from_file(filename)) + elif ext == ".gdi": + return cls.from_gdi(Gdi.from_file(filename)) + else: + raise RuntimeError(f"Unsupported file extension: {ext}") + @classmethod def from_gdi(cls, gdi: Gdi) -> GDRom: - data_track = gdi.tracks[-1] - # validate our assumptions - assert data_track.type == GdiTrackType.BINARY + data_tracks = [ + track + for track in gdi.tracks + if track.start_lba == 45000 + and track.type == GdiTrackType.BINARY] + data_track = data_tracks[0] + # NOTE: if data_track isn't the last track we might not have access to files + # -- it might help if cdrom.Iso read sectors from the Gdi directly + # -- would simplify the math a lot too, and potential save on memory + # -- because the alternative is converting the whole GD-ROM area to bytes + # -- potentially wasting multiple empty sectors worth of space + # NOTE: data_track should at least map the filesystem out = cls() - # NOTE: .gdi files might break up the data area into multiple tracks - # -- maybe we can add controls for merging tracks in that case? out.data_area = cdrom.Iso.from_bytes(gdi.read(data_track.filename), 0x8000, -data_track.start_lba) # NOTE: we assuming there's a PVD at the default address (seaching for a PVD is slow) # -- other PVDs might be present - # -- quakeiii.cdi has a 2nd PVD (thinks it starts at LBA 16 like a normal CD) - # -- iso_2 = cdrom.Iso.from_bytes(cdi.read("1.0.iso"), 0x9B000, (0x9B000 // 0x800) - 16) out.data_area.disc.seek(0) # boot header out.header = Header.from_stream(out.data_area.disc) out.gdi = gdi # DEBUG @@ -224,16 +274,5 @@ def from_gdi(cls, gdi: Gdi) -> GDRom: return out @classmethod - def from_file(cls, filename: str) -> GDRom: - ext = os.path.splitext(filename.lower())[-1] - if ext == ".cdi": - return cls.from_cdi(padus.Cdi.from_file(filename)) - elif ext == ".chd": - # TODO: mame.Chd - raise NotImplementedError() - elif ext == ".cue": - raise NotImplementedError() - elif ext == ".gdi": - return cls.from_gdi(Gdi.from_file(filename)) - else: - raise RuntimeError(f"Unsupported file extension: {ext}") + def from_stream(cls, stream: io.BytesIO) -> GDRom: + raise NotImplementedError("cannot identify disc image format")