Skip to content

Commit

Permalink
Add symlink support to ZipFilesystem
Browse files Browse the repository at this point in the history
  • Loading branch information
Schamper committed Aug 12, 2024
1 parent ac8ee68 commit c55e65e
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 61 deletions.
127 changes: 81 additions & 46 deletions dissect/target/filesystems/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,21 @@
import stat
import zipfile
from datetime import datetime, timezone
from typing import BinaryIO, Optional
from typing import BinaryIO, Iterator

from dissect.util.stream import BufferedStream

from dissect.target.exceptions import FileNotFoundError
from dissect.target.exceptions import (
FileNotFoundError,
FilesystemError,
IsADirectoryError,
NotADirectoryError,
NotASymlinkError,
)
from dissect.target.filesystem import (
Filesystem,
FilesystemEntry,
VirtualDirectory,
VirtualFile,
VirtualFilesystem,
)
from dissect.target.helpers import fsutil
Expand All @@ -33,7 +38,7 @@ class ZipFilesystem(Filesystem):
def __init__(
self,
fh: BinaryIO,
base: Optional[str] = None,
base: str | None = None,
*args,
**kwargs,
):
Expand All @@ -52,12 +57,7 @@ def __init__(
continue

rel_name = fsutil.normpath(mname[len(self.base) :], alt_separator=self.alt_separator)

# NOTE: Normally we would check here if the member is a symlink or not

entry_cls = ZipFilesystemDirectoryEntry if member.is_dir() else ZipFilesystemEntry
file_entry = entry_cls(self, rel_name, member)
self._fs.map_file_entry(rel_name, file_entry)
self._fs.map_file_entry(rel_name, ZipFilesystemEntry(self, rel_name, member))

@staticmethod
def _detect(fh: BinaryIO) -> bool:
Expand All @@ -69,60 +69,95 @@ def get(self, path: str, relentry: FilesystemEntry = None) -> FilesystemEntry:
return self._fs.get(path, relentry=relentry)


class ZipFilesystemEntry(VirtualFile):
# Note: We subclass from VirtualDirectory because VirtualFilesystem is currently only compatible with VirtualDirectory
# Subclass from VirtualDirectory so we get that compatibility for free, and override the rest to do our own thing
class ZipFilesystemEntry(VirtualDirectory):
fs: ZipFilesystem
entry: zipfile.ZipInfo

def __init__(self, fs: ZipFilesystem, path: str, entry: zipfile.ZipInfo):
super().__init__(fs, path)
self.entry = entry

def open(self) -> BinaryIO:
"""Returns file handle (file-like object)."""
if self.is_dir():
raise IsADirectoryError(self.path)

if self.is_symlink():
return self._resolve().open()

try:
return BufferedStream(self.fs.zip.open(self.entry), size=self.entry.file_size)
except Exception:
raise FileNotFoundError()
raise FileNotFoundError(self.path)

Check warning on line 92 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L92

Added line #L92 was not covered by tests

def readlink(self) -> str:
"""Read the link if this entry is a symlink. Returns a string."""
raise NotImplementedError()
def iterdir(self) -> Iterator[str]:
if not self.is_dir():
raise NotADirectoryError(self.path)

def readlink_ext(self) -> FilesystemEntry:
"""Read the link if this entry is a symlink. Returns a filesystem entry."""
raise NotImplementedError()
entry = self._resolve()
if isinstance(entry, ZipFilesystemEntry):
yield from super(ZipFilesystemEntry, entry).iterdir()
else:
yield from entry.iterdir()

Check warning on line 102 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L102

Added line #L102 was not covered by tests

def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result:
"""Return the stat information of this entry."""
return self.lstat()
def scandir(self) -> Iterator[FilesystemEntry]:
if not self.is_dir():
raise NotADirectoryError(self.path)

def lstat(self) -> fsutil.stat_result:
"""Return the stat information of the given path, without resolving links."""
# ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime']
return fsutil.stat_result(
[
stat.S_IFREG | 0o777,
self.entry.header_offset,
id(self.fs),
1,
0,
0,
self.entry.file_size,
0,
datetime(*self.entry.date_time, tzinfo=timezone.utc).timestamp(),
0,
]
)
entry = self._resolve()
if isinstance(entry, ZipFilesystemEntry):
yield from super(ZipFilesystemEntry, entry).scandir()
else:
yield from entry.scandir()

def is_dir(self, follow_symlinks: bool = True) -> bool:
try:
entry = self._resolve(follow_symlinks=follow_symlinks)
except FilesystemError:
return False

Check warning on line 118 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L117-L118

Added lines #L117 - L118 were not covered by tests

class ZipFilesystemDirectoryEntry(VirtualDirectory):
def __init__(self, fs: ZipFilesystem, path: str, entry: zipfile.ZipInfo):
super().__init__(fs, path)
self.entry = entry
if isinstance(entry, ZipFilesystemEntry):
return entry.entry.is_dir()
return isinstance(entry, VirtualDirectory)

def is_file(self, follow_symlinks: bool = True) -> bool:
try:
entry = self._resolve(follow_symlinks=follow_symlinks)
except FilesystemError:
return False

Check warning on line 128 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L127-L128

Added lines #L127 - L128 were not covered by tests

if isinstance(entry, ZipFilesystemEntry):
return not entry.entry.is_dir()
return False

def is_symlink(self) -> bool:
return stat.S_ISLNK(self.entry.external_attr >> 16)

def readlink(self) -> str:
if not self.is_symlink():
raise NotASymlinkError()
return self.fs.zip.open(self.entry).read().decode()

def readlink_ext(self) -> FilesystemEntry:
return FilesystemEntry.readlink_ext(self)

def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result:
"""Return the stat information of this entry."""
return self.lstat()
return self._resolve(follow_symlinks=follow_symlinks).lstat()

def lstat(self) -> fsutil.stat_result:
"""Return the stat information of the given path, without resolving links."""
# ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime']
mode = self.entry.external_attr >> 16

if self.entry.is_dir() and not stat.S_ISDIR(mode):
mode = stat.S_IFDIR | mode

Check warning on line 154 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L154

Added line #L154 was not covered by tests
elif not self.entry.is_dir() and not stat.S_ISREG(mode):
mode = stat.S_IFREG | mode

return fsutil.stat_result(
[
stat.S_IFDIR | 0o777,
mode,
self.entry.header_offset,
id(self.fs),
1,
Expand Down
73 changes: 58 additions & 15 deletions tests/filesystems/test_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@

import pytest

from dissect.target.exceptions import (
IsADirectoryError,
NotADirectoryError,
NotASymlinkError,
)
from dissect.target.filesystems.zip import ZipFilesystem, ZipFilesystemEntry


def _mkdir(zf, name):
def _mkdir(zf: zipfile.ZipFile, name: str) -> None:
# There's no easy way to make a directory with Python zipfile, so copy what zipfile.py does
zinfo = zipfile.ZipInfo(name)
zinfo.compress_size = 0
Expand All @@ -32,7 +37,7 @@ def _mkdir(zf, name):
zf.start_dir = zf.fp.tell()


def _create_zip(prefix="", zip_dir=True):
def _create_zip(prefix: str = "", zip_dir: bool = True) -> io.BytesIO:
buf = io.BytesIO()
zf = zipfile.ZipFile(buf, "w")

Expand All @@ -51,34 +56,42 @@ def _create_zip(prefix="", zip_dir=True):
for i in range(100):
zf.writestr(f"{prefix}dir/{i}", f"contents {i}")

symlink = zipfile.ZipInfo(f"{prefix}symlink_dir")
symlink.external_attr = 0o120777 << 16
zf.writestr(symlink, "dir/")

symlink = zipfile.ZipInfo(f"{prefix}symlink_file")
symlink.external_attr = 0o120777 << 16
zf.writestr(symlink, "file_1")

zf.close()
buf.seek(0)
return buf


@pytest.fixture
def zip_simple():
yield _create_zip()
def zip_simple() -> io.BytesIO:
return _create_zip()


@pytest.fixture
def zip_base():
yield _create_zip("base/")
def zip_base() -> io.BytesIO:
return _create_zip("base/")


@pytest.fixture
def zip_relative():
yield _create_zip("./", False)
def zip_relative() -> io.BytesIO:
return _create_zip("./", False)


@pytest.fixture
def zip_relative_dir():
yield _create_zip("./")
def zip_relative_dir() -> io.BytesIO:
return _create_zip("./")


@pytest.fixture
def zip_virtual_dir():
yield _create_zip("", False)
def zip_virtual_dir() -> io.BytesIO:
return _create_zip("", False)


@pytest.mark.parametrize(
Expand All @@ -91,38 +104,68 @@ def zip_virtual_dir():
("zip_virtual_dir", ""),
],
)
def test_filesystems_zip(obj, base, request):
def test_filesystems_zip(obj: str, base: str, request: pytest.FixtureRequest) -> None:
fh = request.getfixturevalue(obj)

assert ZipFilesystem.detect(fh)

fs = ZipFilesystem(fh, base)
assert isinstance(fs, ZipFilesystem)

assert len(fs.listdir("/")) == 3
assert len(fs.listdir("/")) == 5

assert fs.get("./file_1").open().read() == b"file 1 contents"
assert fs.get("./file_2").open().read() == b"file 2 contents"
assert fs.get("./symlink_file").open().read() == b"file 1 contents"
assert len(list(fs.glob("./dir/*"))) == 100
assert len(list(fs.glob("./symlink_dir/*"))) == 100

zfile = fs.get("./file_1")
zdir = fs.get("./dir")
zsymd = fs.get("./symlink_dir")
zsymf = fs.get("./symlink_file")

assert zfile.is_file()
assert not zfile.is_dir()
assert not zfile.is_symlink()

with pytest.raises(NotADirectoryError):
list(zfile.iterdir())

with pytest.raises(NotADirectoryError):
next(zfile.scandir())

with pytest.raises(NotASymlinkError):
zfile.readlink()

assert zdir.is_dir()
assert not zdir.is_file()
assert not zdir.is_symlink()
assert len(list(zdir.iterdir())) == 100
assert len(list(zdir.scandir())) == 100

with pytest.raises(IsADirectoryError):
zdir.open()

assert zsymd.is_dir()
assert not zsymd.is_file()
assert zsymd.is_symlink()
assert zsymd.readlink() == "dir/"

assert not zsymf.is_dir()
assert zsymf.is_file()
assert zsymf.is_symlink()
assert zsymf.readlink() == "file_1"

file1 = zdir.get("1")
assert file1.is_file()
assert not file1.is_dir()
assert not file1.is_symlink()
assert file1.open().read() == b"contents 1"

assert zfile.stat().st_mode == 0o100777
assert file1.stat() == zsymd.get("1").stat()

assert zfile.stat().st_mode == 0o100600
assert zfile.stat(follow_symlinks=False) == zfile.lstat()

if isinstance(zdir, ZipFilesystemEntry):
Expand Down

0 comments on commit c55e65e

Please sign in to comment.