Skip to content

Commit

Permalink
Add symlink support to ZipFilesystem
Browse files Browse the repository at this point in the history
  • Loading branch information
Schamper committed Aug 9, 2024
1 parent ac8ee68 commit d12643a
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 47 deletions.
117 changes: 72 additions & 45 deletions dissect/target/filesystems/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,21 @@
import stat
import zipfile
from datetime import datetime, timezone
from typing import BinaryIO, Optional
from typing import BinaryIO, Iterator

from dissect.util.stream import BufferedStream

from dissect.target.exceptions import FileNotFoundError
from dissect.target.exceptions import (
FileNotFoundError,
FilesystemError,
IsADirectoryError,
NotADirectoryError,
NotASymlinkError,
)
from dissect.target.filesystem import (
Filesystem,
FilesystemEntry,
VirtualDirectory,
VirtualFile,
VirtualFilesystem,
)
from dissect.target.helpers import fsutil
Expand All @@ -33,7 +38,7 @@ class ZipFilesystem(Filesystem):
def __init__(
self,
fh: BinaryIO,
base: Optional[str] = None,
base: str | None = None,
*args,
**kwargs,
):
Expand All @@ -52,12 +57,7 @@ def __init__(
continue

rel_name = fsutil.normpath(mname[len(self.base) :], alt_separator=self.alt_separator)

# NOTE: Normally we would check here if the member is a symlink or not

entry_cls = ZipFilesystemDirectoryEntry if member.is_dir() else ZipFilesystemEntry
file_entry = entry_cls(self, rel_name, member)
self._fs.map_file_entry(rel_name, file_entry)
self._fs.map_file_entry(rel_name, ZipFilesystemEntry(self, rel_name, member))

@staticmethod
def _detect(fh: BinaryIO) -> bool:
Expand All @@ -69,60 +69,87 @@ def get(self, path: str, relentry: FilesystemEntry = None) -> FilesystemEntry:
return self._fs.get(path, relentry=relentry)


class ZipFilesystemEntry(VirtualFile):
# NOTE: VirtualFilesystem is currently only compatible with VirtualDirectory entries.
# We therefore subclass VirtualDirectory to get that compatibility for free, and override the rest to do our own thing.
class ZipFilesystemEntry(VirtualDirectory):
fs: ZipFilesystem
entry: zipfile.ZipInfo

def __init__(self, fs: ZipFilesystem, path: str, entry: zipfile.ZipInfo):
super().__init__(fs, path)
self.entry = entry

def open(self) -> BinaryIO:
"""Returns file handle (file-like object)."""
if self.is_dir():
raise IsADirectoryError(self.path)

if self.is_symlink():
return self._resolve().open()

try:
return BufferedStream(self.fs.zip.open(self.entry), size=self.entry.file_size)
except Exception:
raise FileNotFoundError()

def readlink(self) -> str:
"""Read the link if this entry is a symlink. Returns a string."""
raise NotImplementedError()
def iterdir(self) -> Iterator[str]:
if not self.is_dir():
raise NotADirectoryError(self.path)

def readlink_ext(self) -> FilesystemEntry:
"""Read the link if this entry is a symlink. Returns a filesystem entry."""
raise NotImplementedError()
yield from super(ZipFilesystemEntry, self._resolve()).iterdir()

def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result:
"""Return the stat information of this entry."""
return self.lstat()
def scandir(self) -> Iterator[FilesystemEntry]:
if not self.is_dir():
raise NotADirectoryError(self.path)

def lstat(self) -> fsutil.stat_result:
"""Return the stat information of the given path, without resolving links."""
# ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime']
return fsutil.stat_result(
[
stat.S_IFREG | 0o777,
self.entry.header_offset,
id(self.fs),
1,
0,
0,
self.entry.file_size,
0,
datetime(*self.entry.date_time, tzinfo=timezone.utc).timestamp(),
0,
]
)
yield from super(ZipFilesystemEntry, self._resolve()).scandir()

def is_dir(self, follow_symlinks: bool = True) -> bool:
try:
entry = self._resolve(follow_symlinks=follow_symlinks)
except FilesystemError:
return False

class ZipFilesystemDirectoryEntry(VirtualDirectory):
def __init__(self, fs: ZipFilesystem, path: str, entry: zipfile.ZipInfo):
super().__init__(fs, path)
self.entry = entry
if isinstance(entry, ZipFilesystemEntry):
return entry.entry.is_dir()
return isinstance(entry, VirtualDirectory)

def is_file(self, follow_symlinks: bool = True) -> bool:
try:
entry = self._resolve(follow_symlinks=follow_symlinks)
except FilesystemError:
return False

if isinstance(entry, ZipFilesystemEntry):
return not entry.entry.is_dir()
return False

def is_symlink(self) -> bool:
return stat.S_ISLNK(self.entry.external_attr >> 16)

def readlink(self) -> str:
if not self.is_symlink():
raise NotASymlinkError()
return self.fs.zip.open(self.entry).read().decode()

def readlink_ext(self) -> FilesystemEntry:
return FilesystemEntry.readlink_ext(self)

def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result:
"""Return the stat information of this entry."""
return self.lstat()
return self._resolve(follow_symlinks=follow_symlinks).lstat()

def lstat(self) -> fsutil.stat_result:
"""Return the stat information of the given path, without resolving links."""
# ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime']
mode = self.entry.external_attr >> 16

if self.entry.is_dir() and not stat.S_ISDIR(mode):
mode = stat.S_IFDIR | mode
elif not self.entry.is_dir() and not stat.S_ISREG(mode):
mode = stat.S_IFREG | mode

return fsutil.stat_result(
[
stat.S_IFDIR | 0o777,
mode,
self.entry.header_offset,
id(self.fs),
1,
Expand Down
43 changes: 41 additions & 2 deletions tests/filesystems/test_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pytest

from dissect.target.exceptions import IsADirectoryError, NotADirectoryError, NotASymlinkError
from dissect.target.filesystems.zip import ZipFilesystem, ZipFilesystemEntry


Expand Down Expand Up @@ -51,6 +52,14 @@ def _create_zip(prefix="", zip_dir=True):
for i in range(100):
zf.writestr(f"{prefix}dir/{i}", f"contents {i}")

symlink = zipfile.ZipInfo(f"{prefix}symlink_dir")
symlink.external_attr = 0o120777 << 16
zf.writestr(symlink, "dir/")

symlink = zipfile.ZipInfo(f"{prefix}symlink_file")
symlink.external_attr = 0o120777 << 16
zf.writestr(symlink, "file_1")

zf.close()
buf.seek(0)
return buf
Expand Down Expand Up @@ -99,30 +108,60 @@ def test_filesystems_zip(obj, base, request):
fs = ZipFilesystem(fh, base)
assert isinstance(fs, ZipFilesystem)

assert len(fs.listdir("/")) == 3
assert len(fs.listdir("/")) == 5

assert fs.get("./file_1").open().read() == b"file 1 contents"
assert fs.get("./file_2").open().read() == b"file 2 contents"
assert fs.get("./symlink_file").open().read() == b"file 1 contents"
assert len(list(fs.glob("./dir/*"))) == 100
assert len(list(fs.glob("./symlink_dir/*"))) == 100

zfile = fs.get("./file_1")
zdir = fs.get("./dir")
zsymd = fs.get("./symlink_dir")
zsymf = fs.get("./symlink_file")

assert zfile.is_file()
assert not zfile.is_dir()
assert not zfile.is_symlink()

with pytest.raises(NotADirectoryError):
list(zfile.iterdir())

with pytest.raises(NotADirectoryError):
next(zfile.scandir())

with pytest.raises(NotASymlinkError):
zfile.readlink()

assert zdir.is_dir()
assert not zdir.is_file()
assert not zdir.is_symlink()
assert len(list(zdir.iterdir())) == 100
assert len(list(zdir.scandir())) == 100

with pytest.raises(IsADirectoryError):
zdir.open()

assert zsymd.is_dir()
assert not zsymd.is_file()
assert zsymd.is_symlink()
assert zsymd.readlink() == "dir/"

assert not zsymf.is_dir()
assert zsymf.is_file()
assert zsymf.is_symlink()
assert zsymf.readlink() == "file_1"

file1 = zdir.get("1")
assert file1.is_file()
assert not file1.is_dir()
assert not file1.is_symlink()
assert file1.open().read() == b"contents 1"

assert zfile.stat().st_mode == 0o100777
assert file1.stat() == zsymd.get("1").stat()

assert zfile.stat().st_mode == 0o100600
assert zfile.stat(follow_symlinks=False) == zfile.lstat()

if isinstance(zdir, ZipFilesystemEntry):
Expand Down

0 comments on commit d12643a

Please sign in to comment.