Skip to content

Commit

Permalink
Add symlink support to ZipFilesystem
Browse files Browse the repository at this point in the history
  • Loading branch information
Schamper committed Aug 7, 2024
1 parent ac8ee68 commit 8e569e8
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 46 deletions.
122 changes: 78 additions & 44 deletions dissect/target/filesystems/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@
import stat
import zipfile
from datetime import datetime, timezone
from typing import BinaryIO, Optional
from typing import BinaryIO, Iterator

from dissect.util.stream import BufferedStream

from dissect.target.exceptions import FileNotFoundError
from dissect.target.exceptions import (
FileNotFoundError,
FilesystemError,
NotASymlinkError,
)
from dissect.target.filesystem import (
Filesystem,
FilesystemEntry,
VirtualDirectory,
VirtualFile,
VirtualFilesystem,
)
from dissect.target.helpers import fsutil
Expand All @@ -33,7 +36,7 @@ class ZipFilesystem(Filesystem):
def __init__(
self,
fh: BinaryIO,
base: Optional[str] = None,
base: str | None = None,
*args,
**kwargs,
):
Expand All @@ -52,12 +55,7 @@ def __init__(
continue

rel_name = fsutil.normpath(mname[len(self.base) :], alt_separator=self.alt_separator)

# NOTE: Normally we would check here if the member is a symlink or not

entry_cls = ZipFilesystemDirectoryEntry if member.is_dir() else ZipFilesystemEntry
file_entry = entry_cls(self, rel_name, member)
self._fs.map_file_entry(rel_name, file_entry)
self._fs.map_file_entry(rel_name, ZipFilesystemEntry(self, rel_name, member))

@staticmethod
def _detect(fh: BinaryIO) -> bool:
Expand All @@ -69,60 +67,96 @@ def get(self, path: str, relentry: FilesystemEntry = None) -> FilesystemEntry:
return self._fs.get(path, relentry=relentry)


class ZipFilesystemEntry(VirtualFile):
# Note: We subclass from VirtualDirectory because VirtualFilesystem is currently only compatible with VirtualDirectory
# Subclass from VirtualDirectory so we get that compatibility for free, and override the rest to do our own thing
class ZipFilesystemEntry(VirtualDirectory):
fs: ZipFilesystem
entry: zipfile.ZipInfo

def __init__(self, fs: ZipFilesystem, path: str, entry: zipfile.ZipInfo):
super().__init__(fs, path)
self.entry = entry

def open(self) -> BinaryIO:
"""Returns file handle (file-like object)."""
if self.is_dir():
raise IsADirectoryError(self.path)

Check warning on line 83 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L83

Added line #L83 was not covered by tests

if self.is_symlink():
return self._resolve().open()

try:
return BufferedStream(self.fs.zip.open(self.entry), size=self.entry.file_size)
except Exception:
raise FileNotFoundError()

def readlink(self) -> str:
"""Read the link if this entry is a symlink. Returns a string."""
raise NotImplementedError()
def iterdir(self) -> Iterator[str]:
if not self.is_dir():
raise NotADirectoryError(self.path)

Check warning on line 95 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L94-L95

Added lines #L94 - L95 were not covered by tests

def readlink_ext(self) -> FilesystemEntry:
"""Read the link if this entry is a symlink. Returns a filesystem entry."""
raise NotImplementedError()
if self.is_symlink():
yield from self.readlink_ext().iterdir()

Check warning on line 98 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L97-L98

Added lines #L97 - L98 were not covered by tests
else:
yield from super().iterdir()

Check warning on line 100 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L100

Added line #L100 was not covered by tests

def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result:
"""Return the stat information of this entry."""
return self.lstat()
def scandir(self) -> Iterator[FilesystemEntry]:
if not self.is_dir():
raise NotADirectoryError(self.path)

Check warning on line 104 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L104

Added line #L104 was not covered by tests

def lstat(self) -> fsutil.stat_result:
"""Return the stat information of the given path, without resolving links."""
# ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime']
return fsutil.stat_result(
[
stat.S_IFREG | 0o777,
self.entry.header_offset,
id(self.fs),
1,
0,
0,
self.entry.file_size,
0,
datetime(*self.entry.date_time, tzinfo=timezone.utc).timestamp(),
0,
]
)
if self.is_symlink():
yield from self.readlink_ext().scandir()
else:
yield from super().scandir()

def is_dir(self, follow_symlinks: bool = True) -> bool:
try:
entry = self._resolve(follow_symlinks=follow_symlinks)
except FilesystemError:
return False

Check warning on line 115 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L114-L115

Added lines #L114 - L115 were not covered by tests

class ZipFilesystemDirectoryEntry(VirtualDirectory):
def __init__(self, fs: ZipFilesystem, path: str, entry: zipfile.ZipInfo):
super().__init__(fs, path)
self.entry = entry
if isinstance(entry, ZipFilesystemEntry):
return entry.entry.is_dir()
elif isinstance(entry, VirtualDirectory):
return True
return False

Check warning on line 121 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L121

Added line #L121 was not covered by tests

def is_file(self, follow_symlinks: bool = True) -> bool:
try:
entry = self._resolve(follow_symlinks=follow_symlinks)
except FilesystemError:
return False

Check warning on line 127 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L126-L127

Added lines #L126 - L127 were not covered by tests

if isinstance(entry, ZipFilesystemEntry):
return not entry.entry.is_dir()
return False

def is_symlink(self) -> bool:
return stat.S_ISLNK(self.entry.external_attr >> 16)

def readlink(self) -> str:
if not self.is_symlink():
raise NotASymlinkError()

Check warning on line 138 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L138

Added line #L138 was not covered by tests
return self.fs.zip.open(self.entry).read().decode()

def readlink_ext(self) -> FilesystemEntry:
return FilesystemEntry.readlink_ext(self)

def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result:
"""Return the stat information of this entry."""
return self.lstat()
return self._resolve(follow_symlinks=follow_symlinks).lstat()

def lstat(self) -> fsutil.stat_result:
"""Return the stat information of the given path, without resolving links."""
# ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime']
mode = self.entry.external_attr >> 16

if self.entry.is_dir() and not stat.S_ISDIR(mode):
mode = stat.S_IFDIR | mode

Check warning on line 153 in dissect/target/filesystems/zip.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/filesystems/zip.py#L153

Added line #L153 was not covered by tests
elif not self.entry.is_dir() and not stat.S_ISREG(mode):
mode = stat.S_IFREG | mode

return fsutil.stat_result(
[
stat.S_IFDIR | 0o777,
mode,
self.entry.header_offset,
id(self.fs),
1,
Expand Down
28 changes: 26 additions & 2 deletions tests/filesystems/test_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ def _create_zip(prefix="", zip_dir=True):
for i in range(100):
zf.writestr(f"{prefix}dir/{i}", f"contents {i}")

symlink = zipfile.ZipInfo(f"{prefix}symlink_dir")
symlink.external_attr = 0o120777 << 16
zf.writestr(symlink, "dir/")

symlink = zipfile.ZipInfo(f"{prefix}symlink_file")
symlink.external_attr = 0o120777 << 16
zf.writestr(symlink, "file_1")

zf.close()
buf.seek(0)
return buf
Expand Down Expand Up @@ -99,14 +107,18 @@ def test_filesystems_zip(obj, base, request):
fs = ZipFilesystem(fh, base)
assert isinstance(fs, ZipFilesystem)

assert len(fs.listdir("/")) == 3
assert len(fs.listdir("/")) == 5

assert fs.get("./file_1").open().read() == b"file 1 contents"
assert fs.get("./file_2").open().read() == b"file 2 contents"
assert fs.get("./symlink_file").open().read() == b"file 1 contents"
assert len(list(fs.glob("./dir/*"))) == 100
assert len(list(fs.glob("./symlink_dir/*"))) == 100

zfile = fs.get("./file_1")
zdir = fs.get("./dir")
zsymd = fs.get("./symlink_dir")
zsymf = fs.get("./symlink_file")

assert zfile.is_file()
assert not zfile.is_dir()
Expand All @@ -116,13 +128,25 @@ def test_filesystems_zip(obj, base, request):
assert not zdir.is_file()
assert not zdir.is_symlink()

assert zsymd.is_dir()
assert not zsymd.is_file()
assert zsymd.is_symlink()
assert zsymd.readlink() == "dir/"

assert not zsymf.is_dir()
assert zsymf.is_file()
assert zsymf.is_symlink()
assert zsymf.readlink() == "file_1"

file1 = zdir.get("1")
assert file1.is_file()
assert not file1.is_dir()
assert not file1.is_symlink()
assert file1.open().read() == b"contents 1"

assert zfile.stat().st_mode == 0o100777
assert file1.stat() == zsymd.get("1").stat()

assert zfile.stat().st_mode == 0o100600
assert zfile.stat(follow_symlinks=False) == zfile.lstat()

if isinstance(zdir, ZipFilesystemEntry):
Expand Down

0 comments on commit 8e569e8

Please sign in to comment.