Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve LayerFilesystem iterdir and scandir #973

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions dissect/target/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,11 +840,12 @@ def hash(self, algos: Optional[list[str] | list[Callable]] = None) -> tuple[str]
class VirtualDirectory(FilesystemEntry):
"""Virtual directory implementation. Backed by a dict."""

def __init__(self, fs, path):
def __init__(self, fs, path, stat: fsutil.stat_result | None = None):
super().__init__(fs, path, None)
self.up = None
self.top = None
self.entries = {}
self._stat = stat

def __getitem__(self, item) -> FilesystemEntry:
if not self.fs.case_sensitive:
Expand Down Expand Up @@ -903,19 +904,19 @@ def scandir(self) -> Iterator[FilesystemEntry]:
continue
yield entry

def _stat(self) -> fsutil.stat_result:
def _fake_stat(self) -> fsutil.stat_result:
path_addr = fsutil.generate_addr(self.path, alt_separator=self.fs.alt_separator)
return fsutil.stat_result([stat.S_IFDIR, path_addr, id(self.fs), 1, 0, 0, 0, 0, 0, 0])

def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result:
if self.top:
return self.top.stat(follow_symlinks=follow_symlinks)
return self._stat()
return self._stat if self._stat else self._fake_stat()

def lstat(self) -> fsutil.stat_result:
if self.top:
return self.top.lstat()
return self._stat()
return self._stat if self._stat else self._fake_stat()

def is_dir(self, follow_symlinks: bool = True) -> bool:
return True
Expand Down Expand Up @@ -1148,7 +1149,7 @@ def get(self, path: str, relentry: Optional[FilesystemEntry] = None) -> Filesyst

return entry

def makedirs(self, path: str) -> VirtualDirectory:
def makedirs(self, path: str, stat: fsutil.stat_result | None = None) -> VirtualDirectory:
"""Create virtual directories into the VFS from the given path."""
path = fsutil.normalize(path, alt_separator=self.alt_separator).strip("/")
directory = self.root
Expand All @@ -1159,7 +1160,11 @@ def makedirs(self, path: str) -> VirtualDirectory:
parts = path.split("/")
for i, part in enumerate(parts):
if part not in directory:
vdir = VirtualDirectory(self, fsutil.join(*parts[: i + 1], alt_separator=self.alt_separator))
vdir = VirtualDirectory(
fs=self,
path=fsutil.join(*parts[: i + 1], alt_separator=self.alt_separator),
stat=stat if i + 1 == len(parts) else None,
)
vdir.up = directory

directory.add(part, vdir)
Expand All @@ -1175,6 +1180,23 @@ def map_fs(self, vfspath: str, fs: Filesystem) -> None:

mount = map_fs

def map_vdir(self, vfspath: str, tpath: pathlib.Path) -> None:
"""Recursively map a directory from a target into the VFS."""
if not isinstance(tpath, pathlib.Path):
raise ValueError("Argument %s should be a Path or TargetPath instance", tpath)

vfspath = fsutil.normalize(vfspath, alt_separator=self.alt_separator).strip("/")

for path in tpath.rglob("*"):
relpath = str(path).replace(str(tpath), "")
relpath = fsutil.normalize(relpath, alt_separator=os.path.sep)

if path.is_dir():
self.makedirs(relpath, stat=path.lstat())

else:
self.map_file_entry(relpath, path.get())

def map_dir(self, vfspath: str, realpath: str) -> None:
"""Recursively map a directory from the host machine into the VFS."""
vfspath = fsutil.normalize(vfspath, alt_separator=self.alt_separator).strip("/")
Expand Down Expand Up @@ -1535,6 +1557,9 @@ def iterdir(self) -> Iterator[str]:
yielded = {".", ".."}
selfentry = self._resolve()
for fsentry in selfentry.entries:
if not fsentry.is_dir():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add unit tests?

continue

for entry_name in fsentry.iterdir():
name = entry_name if selfentry.fs.case_sensitive else entry_name.lower()
if name in yielded:
Expand All @@ -1550,6 +1575,9 @@ def scandir(self) -> Iterator[LayerFilesystemEntry]:
items = defaultdict(list)
selfentry = self._resolve()
for fsentry in selfentry.entries:
if not fsentry.is_dir():
continue

for entry in fsentry.scandir():
name = entry.name if selfentry.fs.case_sensitive else entry.name.lower()
if name in (".", ".."):
Expand Down
9 changes: 5 additions & 4 deletions dissect/target/filesystems/overlay.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, path: Path, *args, **kwargs):
# base_path is /foo/bar/image/overlay2/layerdb/mounts/<id> so we traverse up to /foo/bar to get to the root.
root = path.parents[4]

layers = []
layers: list[tuple[str, Path]] = []
parent_layer = path.joinpath("parent").read_text()

# iterate over all image layers
Expand Down Expand Up @@ -96,15 +96,16 @@ def __init__(self, path: Path, *args, **kwargs):
)
continue

layer_fs = VirtualFilesystem()

# mount points can be files
if layer.is_file():
layer_fs = VirtualFilesystem()
layer_fs.map_file_fh(dest, layer.open("rb"))
layer_fs.map_file_entry(dest, layer.get())

# regular overlay2 layers are directories
# mount points can be directories too
else:
layer_fs = DirectoryFilesystem(layer)
layer_fs.map_vdir("/", layer)

log.info("Adding layer %s to destination %s", layer, dest)
self.append_layer().mount("/" if layer.is_file() else dest, layer_fs)
Expand Down
11 changes: 11 additions & 0 deletions dissect/target/plugins/filesystem/walkfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ def check_compatible(self) -> None:
@arg("--walkfs-path", default="/", help="path to recursively walk")
def walkfs(self, walkfs_path: str = "/") -> Iterator[FilesystemRecord]:
"""Walk a target's filesystem and return all filesystem entries."""

path = self.target.fs.path(walkfs_path)

if not path.exists():
self.target.log.error("No such file or directory: '%s'", walkfs_path)
return

if not path.is_dir():
self.target.log.error("Not a directory: '%s'", walkfs_path)
return

for entry in self.target.fs.recurse(walkfs_path):
try:
yield generate_record(self.target, entry)
Expand Down