From 2d1935d60238e876c1adc43b53a6695270d18699 Mon Sep 17 00:00:00 2001 From: Stefan Tatschner Date: Thu, 19 Sep 2024 14:37:00 +0200 Subject: [PATCH 1/2] feat: Add additional logging tags for protocol information DRAFT to enable comprehensive dissecting in the `hr` commands. --- src/gallia/transports/base.py | 22 ++++++++++++++++++++-- src/gallia/transports/isotp.py | 5 ++--- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/gallia/transports/base.py b/src/gallia/transports/base.py index c79882949..de99c0283 100644 --- a/src/gallia/transports/base.py +++ b/src/gallia/transports/base.py @@ -6,10 +6,10 @@ import binascii import io from abc import ABC, abstractmethod -from typing import Any, Protocol, Self +from typing import Any, Literal, Protocol, Self from urllib.parse import parse_qs, urlencode, urlparse, urlunparse -from gallia.log import get_logger +from gallia.log import Logger, get_logger from gallia.transports.schemes import TransportScheme from gallia.utils import join_host_port @@ -145,6 +145,24 @@ def __init_subclass__( cls.SCHEME = scheme cls.BUFSIZE = bufsize + @staticmethod + def log_io( + logger: Logger, + iotype: Literal["read", "write"], + proto: str, + data: bytes, + tags: list[str] | None, + trace: bool = False, + ) -> None: + # tags without "=" are deprecated + t = [f"type=io,{iotype}", "encoding=hex", f"proto={proto}", iotype] + if tags is not None: + t += tags + if trace: + logger.trace(data.hex(), extra={"tags": t}) + else: + logger.debug(data.hex(), extra={"tags": t}) + @classmethod def check_scheme(cls, target: TargetURI) -> None: """Checks if the provided URI has the correct scheme.""" diff --git a/src/gallia/transports/isotp.py b/src/gallia/transports/isotp.py index d406f5b92..2a4e05f20 100644 --- a/src/gallia/transports/isotp.py +++ b/src/gallia/transports/isotp.py @@ -182,8 +182,7 @@ async def write( timeout: float | None = None, tags: list[str] | None = None, ) -> int: - t = tags + ["write"] if tags is not None else ["write"] - logger.trace(data.hex(), extra={"tags": t}) + self.log_io(logger, "write", "isotp", data, tags, trace=True) loop = asyncio.get_running_loop() await asyncio.wait_for(loop.sock_sendall(self._sock, data), timeout) @@ -199,7 +198,7 @@ async def read(self, timeout: float | None = None, tags: list[str] | None = None if e.errno == errno.EILSEQ: raise BrokenPipeError(f"invalid consecutive frame numbers: {e}") from e raise e - logger.trace(data.hex(), extra={"tags": tags}) + self.log_io(logger, "read", "isotp", data, tags, trace=True) return data async def close(self) -> None: From b2794575fdd16fd4dca7bc73abd684223e48e198 Mon Sep 17 00:00:00 2001 From: Stefan Tatschner Date: Fri, 20 Sep 2024 10:39:19 +0200 Subject: [PATCH 2/2] feat: Add dissector API --- src/gallia/dissect/__init__.py | 48 +++++++++++++++++++++++++ src/gallia/log.py | 20 ++++++++++- src/gallia/transports/base.py | 34 +++--------------- src/gallia/transports/hsfz.py | 7 ++-- src/gallia/transports/isotp.py | 6 ++-- src/hr/__init__.py | 65 +++++++++++++++++++++++++++++++--- 6 files changed, 141 insertions(+), 39 deletions(-) create mode 100644 src/gallia/dissect/__init__.py diff --git a/src/gallia/dissect/__init__.py b/src/gallia/dissect/__init__.py new file mode 100644 index 000000000..c00b605cf --- /dev/null +++ b/src/gallia/dissect/__init__.py @@ -0,0 +1,48 @@ +# SPDX-FileCopyrightText: AISEC Pentesting Team +# +# SPDX-License-Identifier: Apache-2.0 + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any, Literal, NotRequired, TypeAlias, TypedDict + +from gallia.services.uds.core.service import UDSRequest, UDSResponse + + +@dataclass +class Field: + name: str + raw_data: bytes + dissected_data: str + + +class BaseDissector(ABC): + PROTO: str = "" + + def __init_subclass__( + cls, + /, + proto: str, + **kwargs: Any, + ) -> None: + super().__init_subclass__(**kwargs) + cls.PROTO = proto + + @abstractmethod + def dissect(self, data: bytes, iotype: str | None = None) -> list[Field]: ... + + +class UDSDissector(BaseDissector, proto="uds"): + def dissect(self, data: bytes, iotype: str | None = None) -> list[Field]: + if data[0] & 0b01000000: + dissected_data = repr(UDSResponse.parse_dynamic(data)) + name = "uds response" + else: + dissected_data = repr(UDSRequest.parse_dynamic(data)) + name = "uds request" + + return [Field(name=name, raw_data=data, dissected_data=dissected_data)] + + +# TODO: Can the PROTO attribute be used? +registry: dict[str, BaseDissector] = {"uds": UDSDissector()} diff --git a/src/gallia/log.py b/src/gallia/log.py index 19c796a87..30c972ecf 100644 --- a/src/gallia/log.py +++ b/src/gallia/log.py @@ -24,7 +24,7 @@ from pathlib import Path from queue import Queue from types import TracebackType -from typing import TYPE_CHECKING, Any, BinaryIO, Self, TextIO, TypeAlias, cast +from typing import TYPE_CHECKING, Any, BinaryIO, Literal, Self, TextIO, TypeAlias, cast import msgspec import zstandard @@ -822,3 +822,21 @@ def result( def get_logger(name: str) -> Logger: return cast(Logger, logging.getLogger(name)) + + +def log_io( + logger: Logger, + iotype: Literal["read", "write"], + proto: str, + data: bytes, + tags: list[str] | None = None, + trace: bool = False, +) -> None: + # tags without "=" are deprecated + t = [f"=io={iotype}", "encoding=hex", f"proto={proto}", iotype] + if tags is not None: + t += tags + if trace: + logger.trace(data.hex(), extra={"tags": t}) + else: + logger.debug(data.hex(), extra={"tags": t}) diff --git a/src/gallia/transports/base.py b/src/gallia/transports/base.py index de99c0283..78f279a53 100644 --- a/src/gallia/transports/base.py +++ b/src/gallia/transports/base.py @@ -6,10 +6,10 @@ import binascii import io from abc import ABC, abstractmethod -from typing import Any, Literal, Protocol, Self +from typing import Any, Protocol, Self from urllib.parse import parse_qs, urlencode, urlparse, urlunparse -from gallia.log import Logger, get_logger +from gallia.log import get_logger, log_io from gallia.transports.schemes import TransportScheme from gallia.utils import join_host_port @@ -145,24 +145,6 @@ def __init_subclass__( cls.SCHEME = scheme cls.BUFSIZE = bufsize - @staticmethod - def log_io( - logger: Logger, - iotype: Literal["read", "write"], - proto: str, - data: bytes, - tags: list[str] | None, - trace: bool = False, - ) -> None: - # tags without "=" are deprecated - t = [f"type=io,{iotype}", "encoding=hex", f"proto={proto}", iotype] - if tags is not None: - t += tags - if trace: - logger.trace(data.hex(), extra={"tags": t}) - else: - logger.debug(data.hex(), extra={"tags": t}) - @classmethod def check_scheme(cls, target: TargetURI) -> None: """Checks if the provided URI has the correct scheme.""" @@ -251,13 +233,10 @@ async def write( timeout: float | None = None, tags: list[str] | None = None, ) -> int: - t = tags + ["write"] if tags is not None else ["write"] - - logger.trace(data.hex() + "0a", extra={"tags": t}) - writer = self.get_writer() writer.write(binascii.hexlify(data) + b"\n") await asyncio.wait_for(writer.drain(), timeout) + log_io(logger, "write", "lines", data, trace=True) return len(data) async def read( @@ -266,9 +245,6 @@ async def read( tags: list[str] | None = None, ) -> bytes: data = await asyncio.wait_for(self.get_reader().readline(), timeout) - d = data.decode().strip() - - t = tags + ["read"] if tags is not None else ["read"] - logger.trace(d + "0a", extra={"tags": t}) + log_io(logger, "read", "lines", data, trace=True) - return binascii.unhexlify(d) + return binascii.unhexlify(data.decode().strip()) diff --git a/src/gallia/transports/hsfz.py b/src/gallia/transports/hsfz.py index f0c883488..f75c24ffa 100644 --- a/src/gallia/transports/hsfz.py +++ b/src/gallia/transports/hsfz.py @@ -15,7 +15,7 @@ from pydantic import BaseModel, field_validator from gallia.log import get_logger -from gallia.transports.base import BaseTransport, TargetURI +from gallia.transports.base import BaseTransport, TargetURI, log_io from gallia.utils import auto_int logger = get_logger(__name__) @@ -357,7 +357,9 @@ async def read( timeout: float | None = None, tags: list[str] | None = None, ) -> bytes: - return await asyncio.wait_for(self._conn.read_diag_request(), timeout) + data = await asyncio.wait_for(self._conn.read_diag_request(), timeout) + log_io(logger, "read", "hsfz", data, tags, trace=True) + return data async def write( self, @@ -366,4 +368,5 @@ async def write( tags: list[str] | None = None, ) -> int: await asyncio.wait_for(self._conn.write_diag_request(data), timeout) + log_io(logger, "write", "hsfz", data, tags, trace=True) return len(data) diff --git a/src/gallia/transports/isotp.py b/src/gallia/transports/isotp.py index 2a4e05f20..dafa3c129 100644 --- a/src/gallia/transports/isotp.py +++ b/src/gallia/transports/isotp.py @@ -14,7 +14,7 @@ from pydantic import BaseModel, field_validator from gallia.log import get_logger -from gallia.transports.base import BaseTransport, TargetURI +from gallia.transports.base import BaseTransport, TargetURI, log_io from gallia.utils import auto_int logger = get_logger(__name__) @@ -182,7 +182,7 @@ async def write( timeout: float | None = None, tags: list[str] | None = None, ) -> int: - self.log_io(logger, "write", "isotp", data, tags, trace=True) + log_io(logger, "write", "isotp", data, tags, trace=True) loop = asyncio.get_running_loop() await asyncio.wait_for(loop.sock_sendall(self._sock, data), timeout) @@ -198,7 +198,7 @@ async def read(self, timeout: float | None = None, tags: list[str] | None = None if e.errno == errno.EILSEQ: raise BrokenPipeError(f"invalid consecutive frame numbers: {e}") from e raise e - self.log_io(logger, "read", "isotp", data, tags, trace=True) + log_io(logger, "read", "isotp", data, tags, trace=True) return data async def close(self) -> None: diff --git a/src/hr/__init__.py b/src/hr/__init__.py index ee0d03a3d..c50a56095 100644 --- a/src/hr/__init__.py +++ b/src/hr/__init__.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: Apache-2.0 import argparse +import binascii import os import signal import sys @@ -12,8 +13,51 @@ import msgspec -from gallia import exitcodes -from gallia.log import ColorMode, PenlogPriority, PenlogReader, resolve_color_mode +from gallia import dissect, exitcodes +from gallia.log import ColorMode, PenlogPriority, PenlogReader, PenlogRecord, resolve_color_mode + + +def extract_tag_info(raw_tag: str) -> str: + parts = raw_tag.split("=", maxsplit=2) + if len(parts) != 2: + raise ValueError() + return parts[1] + + +def get_io_info(tags: list[str]) -> tuple[str, str]: + found_iotype = False + found_proto = False + for tag in tags: + if tag.startswith("io"): + iotype = extract_tag_info(tag) + found_iotype = True + if tag.startswith("proto"): + proto = extract_tag_info(tag) + found_proto = True + if found_iotype and found_proto: + return iotype, proto + raise ValueError() + + +def dissect_record(record: PenlogRecord) -> str | None: + if record.tags is None: + return None + + try: + iotype, proto = get_io_info(record.tags) + except ValueError: + return None + + if proto not in dissect.registry: + return None + + dissector = dissect.registry[proto] + data = binascii.unhexlify(record.data) + + out = "" + for field in dissector.dissect(data, iotype): + out += f"{field.name}: {field.dissected_data}" + return out def parse_args() -> argparse.Namespace: @@ -50,7 +94,14 @@ def parse_args() -> argparse.Namespace: "--lines", type=int, default=100, - help="print the last n lines", + help="argument for --head/--tail)", + ) + parser.add_argument( + "-d", + "--dissect", + action=argparse.BooleanOptionalAction, + default=False, + help="dissect log messages with io and proto tags", ) parser.add_argument( "--color", @@ -81,7 +132,13 @@ def _main() -> int: for record in record_generator: record.colored = colored - print(record, end="") + if args.dissect: + # TODO: Maybe a method of the record? record.dissect()? + if (dissected := dissect_record(record)) is not None: + print(f" {dissected}") + print(record, end="") + else: + print(record, end="") return 0