diff --git a/dissect/target/filesystems/zip.py b/dissect/target/filesystems/zip.py index 6c4d33c9d8..742eca40ee 100644 --- a/dissect/target/filesystems/zip.py +++ b/dissect/target/filesystems/zip.py @@ -4,16 +4,21 @@ import stat import zipfile from datetime import datetime, timezone -from typing import BinaryIO, Optional +from typing import BinaryIO, Iterator from dissect.util.stream import BufferedStream -from dissect.target.exceptions import FileNotFoundError +from dissect.target.exceptions import ( + FileNotFoundError, + FilesystemError, + IsADirectoryError, + NotADirectoryError, + NotASymlinkError, +) from dissect.target.filesystem import ( Filesystem, FilesystemEntry, VirtualDirectory, - VirtualFile, VirtualFilesystem, ) from dissect.target.helpers import fsutil @@ -33,7 +38,7 @@ class ZipFilesystem(Filesystem): def __init__( self, fh: BinaryIO, - base: Optional[str] = None, + base: str | None = None, *args, **kwargs, ): @@ -52,12 +57,7 @@ def __init__( continue rel_name = fsutil.normpath(mname[len(self.base) :], alt_separator=self.alt_separator) - - # NOTE: Normally we would check here if the member is a symlink or not - - entry_cls = ZipFilesystemDirectoryEntry if member.is_dir() else ZipFilesystemEntry - file_entry = entry_cls(self, rel_name, member) - self._fs.map_file_entry(rel_name, file_entry) + self._fs.map_file_entry(rel_name, ZipFilesystemEntry(self, rel_name, member)) @staticmethod def _detect(fh: BinaryIO) -> bool: @@ -69,60 +69,87 @@ def get(self, path: str, relentry: FilesystemEntry = None) -> FilesystemEntry: return self._fs.get(path, relentry=relentry) -class ZipFilesystemEntry(VirtualFile): +# Note: We subclass from VirtualDirectory because VirtualFilesystem is currently only compatible with VirtualDirectory +# Subclass from VirtualDirectory so we get that compatibility for free, and override the rest to do our own thing +class ZipFilesystemEntry(VirtualDirectory): + fs: ZipFilesystem + entry: zipfile.ZipInfo + + def __init__(self, fs: ZipFilesystem, path: str, entry: zipfile.ZipInfo): + super().__init__(fs, path) + self.entry = entry + def open(self) -> BinaryIO: - """Returns file handle (file-like object).""" + if self.is_dir(): + raise IsADirectoryError(self.path) + + if self.is_symlink(): + return self._resolve().open() + try: return BufferedStream(self.fs.zip.open(self.entry), size=self.entry.file_size) except Exception: raise FileNotFoundError() - def readlink(self) -> str: - """Read the link if this entry is a symlink. Returns a string.""" - raise NotImplementedError() + def iterdir(self) -> Iterator[str]: + if not self.is_dir(): + raise NotADirectoryError(self.path) - def readlink_ext(self) -> FilesystemEntry: - """Read the link if this entry is a symlink. Returns a filesystem entry.""" - raise NotImplementedError() + yield from super(ZipFilesystemEntry, self._resolve()).iterdir() - def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result: - """Return the stat information of this entry.""" - return self.lstat() + def scandir(self) -> Iterator[FilesystemEntry]: + if not self.is_dir(): + raise NotADirectoryError(self.path) - def lstat(self) -> fsutil.stat_result: - """Return the stat information of the given path, without resolving links.""" - # ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime'] - return fsutil.stat_result( - [ - stat.S_IFREG | 0o777, - self.entry.header_offset, - id(self.fs), - 1, - 0, - 0, - self.entry.file_size, - 0, - datetime(*self.entry.date_time, tzinfo=timezone.utc).timestamp(), - 0, - ] - ) + yield from super(ZipFilesystemEntry, self._resolve()).scandir() + def is_dir(self, follow_symlinks: bool = True) -> bool: + try: + entry = self._resolve(follow_symlinks=follow_symlinks) + except FilesystemError: + return False -class ZipFilesystemDirectoryEntry(VirtualDirectory): - def __init__(self, fs: ZipFilesystem, path: str, entry: zipfile.ZipInfo): - super().__init__(fs, path) - self.entry = entry + if isinstance(entry, ZipFilesystemEntry): + return entry.entry.is_dir() + return isinstance(entry, VirtualDirectory) + + def is_file(self, follow_symlinks: bool = True) -> bool: + try: + entry = self._resolve(follow_symlinks=follow_symlinks) + except FilesystemError: + return False + + if isinstance(entry, ZipFilesystemEntry): + return not entry.entry.is_dir() + return False + + def is_symlink(self) -> bool: + return stat.S_ISLNK(self.entry.external_attr >> 16) + + def readlink(self) -> str: + if not self.is_symlink(): + raise NotASymlinkError() + return self.fs.zip.open(self.entry).read().decode() + + def readlink_ext(self) -> FilesystemEntry: + return FilesystemEntry.readlink_ext(self) def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result: - """Return the stat information of this entry.""" - return self.lstat() + return self._resolve(follow_symlinks=follow_symlinks).lstat() def lstat(self) -> fsutil.stat_result: """Return the stat information of the given path, without resolving links.""" # ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime'] + mode = self.entry.external_attr >> 16 + + if self.entry.is_dir() and not stat.S_ISDIR(mode): + mode = stat.S_IFDIR | mode + elif not self.entry.is_dir() and not stat.S_ISREG(mode): + mode = stat.S_IFREG | mode + return fsutil.stat_result( [ - stat.S_IFDIR | 0o777, + mode, self.entry.header_offset, id(self.fs), 1, diff --git a/tests/filesystems/test_zip.py b/tests/filesystems/test_zip.py index 292350d010..9f70c84ce2 100644 --- a/tests/filesystems/test_zip.py +++ b/tests/filesystems/test_zip.py @@ -3,6 +3,7 @@ import pytest +from dissect.target.exceptions import IsADirectoryError, NotADirectoryError, NotASymlinkError from dissect.target.filesystems.zip import ZipFilesystem, ZipFilesystemEntry @@ -51,6 +52,14 @@ def _create_zip(prefix="", zip_dir=True): for i in range(100): zf.writestr(f"{prefix}dir/{i}", f"contents {i}") + symlink = zipfile.ZipInfo(f"{prefix}symlink_dir") + symlink.external_attr = 0o120777 << 16 + zf.writestr(symlink, "dir/") + + symlink = zipfile.ZipInfo(f"{prefix}symlink_file") + symlink.external_attr = 0o120777 << 16 + zf.writestr(symlink, "file_1") + zf.close() buf.seek(0) return buf @@ -99,22 +108,50 @@ def test_filesystems_zip(obj, base, request): fs = ZipFilesystem(fh, base) assert isinstance(fs, ZipFilesystem) - assert len(fs.listdir("/")) == 3 + assert len(fs.listdir("/")) == 5 assert fs.get("./file_1").open().read() == b"file 1 contents" assert fs.get("./file_2").open().read() == b"file 2 contents" + assert fs.get("./symlink_file").open().read() == b"file 1 contents" assert len(list(fs.glob("./dir/*"))) == 100 + assert len(list(fs.glob("./symlink_dir/*"))) == 100 zfile = fs.get("./file_1") zdir = fs.get("./dir") + zsymd = fs.get("./symlink_dir") + zsymf = fs.get("./symlink_file") assert zfile.is_file() assert not zfile.is_dir() assert not zfile.is_symlink() + with pytest.raises(NotADirectoryError): + list(zfile.iterdir()) + + with pytest.raises(NotADirectoryError): + next(zfile.scandir()) + + with pytest.raises(NotASymlinkError): + zfile.readlink() + assert zdir.is_dir() assert not zdir.is_file() assert not zdir.is_symlink() + assert len(list(zdir.iterdir())) == 100 + assert len(list(zdir.scandir())) == 100 + + with pytest.raises(IsADirectoryError): + zdir.open() + + assert zsymd.is_dir() + assert not zsymd.is_file() + assert zsymd.is_symlink() + assert zsymd.readlink() == "dir/" + + assert not zsymf.is_dir() + assert zsymf.is_file() + assert zsymf.is_symlink() + assert zsymf.readlink() == "file_1" file1 = zdir.get("1") assert file1.is_file() @@ -122,7 +159,9 @@ def test_filesystems_zip(obj, base, request): assert not file1.is_symlink() assert file1.open().read() == b"contents 1" - assert zfile.stat().st_mode == 0o100777 + assert file1.stat() == zsymd.get("1").stat() + + assert zfile.stat().st_mode == 0o100600 assert zfile.stat(follow_symlinks=False) == zfile.lstat() if isinstance(zdir, ZipFilesystemEntry):