diff --git a/dissect/target/plugins/os/unix/log/helpers.py b/dissect/target/plugins/os/unix/log/helpers.py index f37c4d8c3..01b7ee863 100644 --- a/dissect/target/plugins/os/unix/log/helpers.py +++ b/dissect/target/plugins/os/unix/log/helpers.py @@ -1,4 +1,5 @@ -import itertools +from __future__ import annotations + import logging import re from datetime import datetime @@ -22,12 +23,17 @@ ) -def iso_readlines(file: Path) -> Iterator[tuple[datetime, str]]: +def iso_readlines(file: Path, max_lines: int | None = None) -> Iterator[tuple[datetime, str]]: """Iterator reading the provided log file in ISO format. Mimics ``year_rollover_helper`` behaviour.""" with open_decompress(file, "rt") as fh: - for line in fh: + for i, line in enumerate(fh): + if max_lines is not None and i >= max_lines: + log.debug("Stopping iso_readlines enumeration in %s: max_lines=%s was reached", file, max_lines) + break + if not (match := RE_TS_ISO.match(line)): - log.warning("No timestamp found in one of the lines in %s!", file) + if not max_lines: + log.warning("No timestamp found in one of the lines in %s!", file) log.debug("Skipping line: %s", line) continue @@ -43,4 +49,6 @@ def iso_readlines(file: Path) -> Iterator[tuple[datetime, str]]: def is_iso_fmt(file: Path) -> bool: """Determine if the provided log file uses ISO 8601 timestamp format logging or not.""" - return any(itertools.islice(iso_readlines(file), 0, 2)) + # We do not want to iterate over the entire file so we limit iso_readlines to the first few lines. + # We cannot use islice here since that would only work if the file is ISO formatted and thus yields results. 
+ return any(iso_readlines(file, max_lines=3)) diff --git a/tests/plugins/os/unix/log/test_helpers.py b/tests/plugins/os/unix/log/test_helpers.py new file mode 100644 index 000000000..fabba5c80 --- /dev/null +++ b/tests/plugins/os/unix/log/test_helpers.py @@ -0,0 +1,42 @@ +import gzip +import textwrap +from io import BytesIO + +import pytest + +from dissect.target.filesystem import VirtualFilesystem +from dissect.target.plugins.os.unix.log.helpers import is_iso_fmt, iso_readlines + +syslog = """\ +Dec 31 03:14:15 localhost systemd[1]: Starting Journal Service... +Jan 1 13:21:34 localhost systemd: Stopped target Swap. +Jan 2 03:14:15 localhost systemd[1]: Starting Journal Service... +Jan 3 13:21:34 localhost systemd: Stopped target Swap. +2024-12-31T13:37:00.123456+02:00 hostname systemd[1]: Started anacron.service - Run anacron jobs. +2024-12-31T13:37:00.123456+02:00 hostname anacron[1337]: Anacron 2.3 started on 2024-12-31 +2024-12-31T13:37:00.123456+02:00 hostname anacron[1337]: Normal exit (0 jobs run) +2024-12-31T13:37:00.123456+02:00 hostname systemd[1]: anacron.service: Deactivated successfully. +""" + + +@pytest.mark.parametrize( + "max_lines, expected_return_value", + [ + (3, False), + (4, False), + (5, True), + (9, True), + ], +) +def test_iso_readlines_max_lines(fs_unix: VirtualFilesystem, max_lines: int, expected_return_value: bool) -> None: + """assert that iso_readlines does not parse more than the provided max_lines""" + + fs_unix.map_file_fh("/var/log/syslog.2", BytesIO(gzip.compress(textwrap.dedent(syslog).encode()))) + assert any(iso_readlines(fs_unix.path("/var/log/syslog.2"), max_lines)) == expected_return_value + + +def test_is_iso_fmt(fs_unix: VirtualFilesystem) -> None: + """assert that is_iso_fmt does not parse more than three lines""" + + fs_unix.map_file_fh("/var/log/syslog.3", BytesIO(gzip.compress(textwrap.dedent(syslog).encode()))) + assert not is_iso_fmt(fs_unix.path("/var/log/syslog.3"))