Skip to content

Commit

Permalink
Add symlink support to ZipFilesystem
Browse files Browse the repository at this point in the history
  • Loading branch information
Schamper committed Aug 7, 2024
1 parent ac8ee68 commit 64ba5b2
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 46 deletions.
118 changes: 74 additions & 44 deletions dissect/target/filesystems/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
import stat
import zipfile
from datetime import datetime, timezone
from typing import BinaryIO, Optional
from typing import BinaryIO, Iterator

from dissect.util.stream import BufferedStream

from dissect.target.exceptions import FileNotFoundError
from dissect.target.exceptions import FileNotFoundError, FilesystemError, NotASymlinkError
from dissect.target.filesystem import (
Filesystem,
FilesystemEntry,
VirtualDirectory,
VirtualFile,
VirtualFilesystem,
)
from dissect.target.helpers import fsutil
Expand All @@ -33,7 +32,7 @@ class ZipFilesystem(Filesystem):
def __init__(
self,
fh: BinaryIO,
base: Optional[str] = None,
base: str | None = None,
*args,
**kwargs,
):
Expand All @@ -52,12 +51,7 @@ def __init__(
continue

rel_name = fsutil.normpath(mname[len(self.base) :], alt_separator=self.alt_separator)

# NOTE: Normally we would check here if the member is a symlink or not

entry_cls = ZipFilesystemDirectoryEntry if member.is_dir() else ZipFilesystemEntry
file_entry = entry_cls(self, rel_name, member)
self._fs.map_file_entry(rel_name, file_entry)
self._fs.map_file_entry(rel_name, ZipFilesystemEntry(self, rel_name, member))

@staticmethod
def _detect(fh: BinaryIO) -> bool:
Expand All @@ -69,60 +63,96 @@ def get(self, path: str, relentry: FilesystemEntry = None) -> FilesystemEntry:
return self._fs.get(path, relentry=relentry)


class ZipFilesystemEntry(VirtualFile):
# NOTE: We subclass VirtualDirectory because VirtualFilesystem is currently only compatible with VirtualDirectory.
# Subclassing it gives us that compatibility for free; we override the rest to do our own thing.
class ZipFilesystemEntry(VirtualDirectory):
fs: ZipFilesystem
entry: zipfile.ZipInfo

def __init__(self, fs: ZipFilesystem, path: str, entry: zipfile.ZipInfo):
super().__init__(fs, path)
self.entry = entry

def open(self) -> BinaryIO:
    """Return a binary file-like object for the contents of this entry.

    If this entry is a symlink, the call is delegated to the entry returned
    by ``self._resolve()``.

    Raises:
        IsADirectoryError: If this entry is a directory.
        FileNotFoundError: If opening the member from the zip file fails for
            any reason; the original error is attached as ``__cause__``.
    """
    if self.is_dir():
        raise IsADirectoryError(self.path)

    if self.is_symlink():
        return self._resolve().open()

    try:
        return BufferedStream(self.fs.zip.open(self.entry), size=self.entry.file_size)
    except Exception as e:
        # Chain explicitly so the underlying zipfile error is reported as the
        # direct cause instead of an incidental "during handling" context.
        raise FileNotFoundError() from e

def readlink(self) -> str:
"""Read the link if this entry is a symlink. Returns a string."""
raise NotImplementedError()
def iterdir(self) -> Iterator[str]:
if not self.is_dir():
raise NotADirectoryError(self.path)

def readlink_ext(self) -> FilesystemEntry:
"""Read the link if this entry is a symlink. Returns a filesystem entry."""
raise NotImplementedError()
if self.is_symlink():
yield from self.readlink_ext().iterdir()
else:
yield from super().iterdir()

def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result:
"""Return the stat information of this entry."""
return self.lstat()
def scandir(self) -> Iterator[FilesystemEntry]:
if not self.is_dir():
raise NotADirectoryError(self.path)

def lstat(self) -> fsutil.stat_result:
"""Return the stat information of the given path, without resolving links."""
# ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime']
return fsutil.stat_result(
[
stat.S_IFREG | 0o777,
self.entry.header_offset,
id(self.fs),
1,
0,
0,
self.entry.file_size,
0,
datetime(*self.entry.date_time, tzinfo=timezone.utc).timestamp(),
0,
]
)
if self.is_symlink():
yield from self.readlink_ext().scandir()
else:
yield from super().scandir()

def is_dir(self, follow_symlinks: bool = True) -> bool:
try:
entry = self._resolve(follow_symlinks=follow_symlinks)
except FilesystemError:
return False

class ZipFilesystemDirectoryEntry(VirtualDirectory):
def __init__(self, fs: ZipFilesystem, path: str, entry: zipfile.ZipInfo):
super().__init__(fs, path)
self.entry = entry
if isinstance(entry, ZipFilesystemEntry):
return entry.entry.is_dir()
elif isinstance(entry, VirtualDirectory):
return True
return False

def is_file(self, follow_symlinks: bool = True) -> bool:
    """Return whether this entry is a regular (non-directory) zip member.

    The entry is first passed through ``self._resolve()`` with the given
    ``follow_symlinks`` flag. If resolving raises a ``FilesystemError``, or
    the result is not a ``ZipFilesystemEntry``, the answer is ``False``.
    """
    try:
        resolved = self._resolve(follow_symlinks=follow_symlinks)
    except FilesystemError:
        return False

    # Only real zip members can be files; anything else is never a file.
    return isinstance(resolved, ZipFilesystemEntry) and not resolved.entry.is_dir()

def is_symlink(self) -> bool:
    """Return whether the zip member's stored mode marks it as a symlink."""
    # The file mode is kept in the upper 16 bits of ``external_attr``.
    unix_mode = self.entry.external_attr >> 16
    return stat.S_ISLNK(unix_mode)

def readlink(self) -> str:
    """Return the link target stored in this symlink entry.

    The target is the member's file data, decoded with ``bytes.decode()``'s
    default codec (UTF-8).

    Raises:
        NotASymlinkError: If this entry is not a symlink.
    """
    if not self.is_symlink():
        raise NotASymlinkError()

    # Use a context manager so the member stream is closed once it is read,
    # instead of leaking an open handle per call.
    with self.fs.zip.open(self.entry) as fh:
        return fh.read().decode()

def readlink_ext(self) -> FilesystemEntry:
    """Return the filesystem entry this symlink points to.

    Calls ``FilesystemEntry.readlink_ext`` explicitly on this instance rather
    than going through ``super()``.
    """
    # NOTE(review): presumably this bypasses an override inherited from
    # VirtualDirectory - confirm against the base classes.
    target = FilesystemEntry.readlink_ext(self)
    return target

def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result:
"""Return the stat information of this entry."""
return self.lstat()
return self._resolve(follow_symlinks=follow_symlinks).lstat()

def lstat(self) -> fsutil.stat_result:
"""Return the stat information of the given path, without resolving links."""
# ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime']
mode = self.entry.external_attr >> 16

if self.entry.is_dir() and not stat.S_ISDIR(mode):
mode = stat.S_IFDIR | mode
elif not self.entry.is_dir() and not stat.S_ISREG(mode):
mode = stat.S_IFREG | mode

return fsutil.stat_result(
[
stat.S_IFDIR | 0o777,
mode,
self.entry.header_offset,
id(self.fs),
1,
Expand Down
28 changes: 26 additions & 2 deletions tests/filesystems/test_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ def _create_zip(prefix="", zip_dir=True):
for i in range(100):
zf.writestr(f"{prefix}dir/{i}", f"contents {i}")

symlink = zipfile.ZipInfo(f"{prefix}symlink_dir")
symlink.external_attr = 0o120777 << 16
zf.writestr(symlink, "dir/")

symlink = zipfile.ZipInfo(f"{prefix}symlink_file")
symlink.external_attr = 0o120777 << 16
zf.writestr(symlink, "file_1")

zf.close()
buf.seek(0)
return buf
Expand Down Expand Up @@ -99,14 +107,18 @@ def test_filesystems_zip(obj, base, request):
fs = ZipFilesystem(fh, base)
assert isinstance(fs, ZipFilesystem)

assert len(fs.listdir("/")) == 3
assert len(fs.listdir("/")) == 5

assert fs.get("./file_1").open().read() == b"file 1 contents"
assert fs.get("./file_2").open().read() == b"file 2 contents"
assert fs.get("./symlink_file").open().read() == b"file 1 contents"
assert len(list(fs.glob("./dir/*"))) == 100
assert len(list(fs.glob("./symlink_dir/*"))) == 100

zfile = fs.get("./file_1")
zdir = fs.get("./dir")
zsymd = fs.get("./symlink_dir")
zsymf = fs.get("./symlink_file")

assert zfile.is_file()
assert not zfile.is_dir()
Expand All @@ -116,13 +128,25 @@ def test_filesystems_zip(obj, base, request):
assert not zdir.is_file()
assert not zdir.is_symlink()

assert zsymd.is_dir()
assert not zsymd.is_file()
assert zsymd.is_symlink()
assert zsymd.readlink() == "dir/"

assert not zsymf.is_dir()
assert zsymf.is_file()
assert zsymf.is_symlink()
assert zsymf.readlink() == "file_1"

file1 = zdir.get("1")
assert file1.is_file()
assert not file1.is_dir()
assert not file1.is_symlink()
assert file1.open().read() == b"contents 1"

assert zfile.stat().st_mode == 0o100777
assert file1.stat() == zsymd.get("1").stat()

assert zfile.stat().st_mode == 0o100600
assert zfile.stat(follow_symlinks=False) == zfile.lstat()

if isinstance(zdir, ZipFilesystemEntry):
Expand Down

0 comments on commit 64ba5b2

Please sign in to comment.