diff --git a/scripts/addr2line.py b/scripts/addr2line.py index d48738f6cd..554082faa6 100755 --- a/scripts/addr2line.py +++ b/scripts/addr2line.py @@ -25,12 +25,21 @@ import subprocess from enum import Enum from functools import cache -from typing import Any +from typing import Any, Optional, TypeVar, Union, cast # special binary path/module indicating that the address is from the kernel KERNEL_MODULE = '' +T = TypeVar('T') + + +def notNone(o: Optional[T]) -> T: + """Asserts the argument is not None then returns it.""" + assert o is not None + return o + + class Addr2Line: # Matcher for a line that appears at the end a single decoded @@ -48,7 +57,7 @@ class Addr2Line: r"(.*0x0: \?\? at .*\n)" # llvm-addr2line pattern ) - def __init__(self, binary, concise=False, cmd_path="addr2line"): + def __init__(self, binary: str, concise: bool = False, cmd_path: str = "addr2line"): self._binary = binary # Print warning if binary has no debug info according to `file`. @@ -60,48 +69,61 @@ def __init__(self, binary, concise=False, cmd_path="addr2line"): print('{}'.format(s)) options = f"-{'C' if not concise else ''}fpia" - self._input = subprocess.Popen( + self._input_proc = subprocess.Popen( [cmd_path, options, "-e", self._binary], stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True, ) if concise: - self._output = subprocess.Popen( + self._output_proc = subprocess.Popen( ["c++filt", "-p"], - stdin=self._input.stdout, + stdin=self._input_proc.stdout, stdout=subprocess.PIPE, universal_newlines=True, ) else: - self._output = self._input + self._output_proc = self._input_proc # If a library doesn't exist in a particular path, addr2line # will just exit. We need to be robust against that. We # can't just wait on self._addr2line since there is no # guarantee on what timeout is sufficient. - self._input.stdin.write('\n') - self._input.stdin.flush() - res = self._output.stdout.readline() + self._input.write('\n') + self._input.flush() + res = self._output.readline() self._missing = res == '' + @property + def _input(self): + """Returns the input stream for the process/pipe.""" + return notNone(self._input_proc.stdin) + + @property + def _output(self): + """Returns the output stream for the process/pipe.""" + return notNone(self._output_proc.stdout) + def _read_resolved_address(self): - res = self._output.stdout.readline() + res = self._output.readline() # remove the address - res = res.split(': ', 1)[1] - line = '' - while Addr2Line.dummy_pattern.fullmatch(line) is None: + res = first.split(': ', 1)[1] + while True: + line = self._output.readline() + if Addr2Line.dummy_pattern.fullmatch(line): + self._parent.debug('Addr2Line read output ( dummy): ', line) + break + self._parent.debug('Addr2Line read output (non-dummy): ', line) res += line - line = self._output.stdout.readline() return res - def __call__(self, address): + def __call__(self, address: str): if self._missing: return " ".join([self._binary, address, '\n']) # We print a dummy 0x0 address after the address we are interested in # which we can look for in _read_address - self._input.stdin.write(address + '\n0x0\n') - self._input.stdin.flush() + self._input.write(address + '\n0x0\n') + self._input.flush() return self._read_resolved_address() @@ -110,7 +132,7 @@ class KernelResolver: LAST_SYMBOL_MAX_SIZE = 1024 - def __init__(self, kallsyms='/proc/kallsyms'): + def __init__(self, kallsyms: str = '/proc/kallsyms'): syms: list[tuple[int, str]] = [] ksym_re = re.compile(r'(?P[0-9a-f]+) (?P.+) (?P\S+)') warnings_left = 10 @@ -159,7 +181,7 @@ def __init__(self, kallsyms='/proc/kallsyms'): self.sym_names: tuple[str] self.sym_addrs, self.sym_names = zip(*syms) # type: ignore - def __call__(self, addrstr): + def __call__(self, addrstr: str): if self.error: return addrstr + '\n' @@ -188,6 +210,11 @@ def __call__(self, addrstr): return f'{sn[idx]}+0x{address - saddr:x}\n' +LineResult = dict[ + str, Union[None, 'BacktraceResolver.BacktraceParser.Type', str, list[dict[str, Any]]] +] + + class BacktraceResolver: class BacktraceParser: @@ -219,7 +246,7 @@ def __init__(self): self.generic_re = re.compile(fr"^(?:.*\s+){full_addr_match}\s*$", flags=re.IGNORECASE) self.separator_re = re.compile(r'^\W*-+\W*$') - def split_addresses(self, addrstring: str, default_path=None): + def split_addresses(self, addrstring: str, default_path: Optional[str] = None): addresses: list[dict[str, Any]] = [] for obj in addrstring.split(): m = re.match(self.address_re, obj) @@ -228,12 +255,15 @@ def split_addresses(self, addrstring: str, default_path=None): addresses.append({'path': m.group(1) or default_path, 'addr': m.group(2)}) return addresses - def __call__(self, line): - def get_prefix(s): + def __call__(self, line: str): + + def get_prefix(s: Optional[str]): if s is not None: s = s.strip() return s or None + ret: LineResult + # order here is important: the kernel callstack regex # needs to come first since it is more specific and would # otherwise be matched by the online regex which comes next @@ -292,22 +322,22 @@ def get_prefix(s): def __init__( self, - executable, - kallsyms='/proc/kallsyms', - before_lines=1, - context_re='', - verbose=False, - concise=False, - cmd_path='addr2line', + executable: str, + kallsyms: str = '/proc/kallsyms', + before_lines: int = 1, + context_re: Optional[str] = '', + verbose: bool = False, + concise: bool = False, + cmd_path: str = 'addr2line', ): self._executable = executable self._kallsyms = kallsyms - self._current_backtrace = [] + self._current_backtrace: list[tuple[str, str]] = [] self._prefix = None self._before_lines = before_lines - self._before_lines_queue = collections.deque(maxlen=before_lines) + self._before_lines_queue: collections.deque[str] = collections.deque(maxlen=before_lines) self._i = 0 - self._known_backtraces = {} + self._known_backtraces: dict[str, int] = {} if context_re is not None: self._context_re = re.compile(context_re) else: @@ -315,13 +345,13 @@ def __init__( self._verbose = verbose self._concise = concise self._cmd_path = cmd_path - self._known_modules = {} + self._known_modules: dict[str, Union[Addr2Line, KernelResolver]] = {} self._get_resolver_for_module( self._executable ) # fail fast if there is something wrong with the exe resolver self.parser = self.BacktraceParser() - def _get_resolver_for_module(self, module): + def _get_resolver_for_module(self, module: str): if not module in self._known_modules: if module == KERNEL_MODULE: resolver = KernelResolver(kallsyms=self._kallsyms) @@ -333,11 +363,13 @@ def _get_resolver_for_module(self, module): def __enter__(self): return self - def __exit__(self, type, value, tb): + def __exit__(self, *_): self._print_current_backtrace() @cache - def resolve_address(self, address, module=None, verbose=None): + def resolve_address( + self, address: str, module: Optional[str] = None, verbose: Optional[bool] = None + ): if module is None: module = self._executable if verbose is None: @@ -347,14 +379,14 @@ def resolve_address(self, address, module=None, verbose=None): resolved_address = '{{{}}} {}: {}'.format(module, address, resolved_address) return resolved_address - def _print_resolved_address(self, module, address): + def _print_resolved_address(self, module: Optional[str], address: str): sys.stdout.write(self.resolve_address(address, module)) def _backtrace_context_matches(self): if self._context_re is None: return True - if any(map(lambda x: self._context_re.search(x) is not None, self._before_lines_queue)): + if any(self._context_re.search(x) for x in self._before_lines_queue): return True if (not self._prefix is None) and self._context_re.search(self._prefix): @@ -400,7 +432,7 @@ def _print_current_backtrace(self): self._current_backtrace = [] self._i += 1 - def __call__(self, line): + def __call__(self, line: str): res = self.parser(line) if not res: @@ -414,11 +446,11 @@ def __call__(self, line): elif res['type'] == self.BacktraceParser.Type.SEPARATOR: pass elif res['type'] == self.BacktraceParser.Type.ADDRESS: - addresses = res['addresses'] + addresses = cast(list[dict[str, Any]], res['addresses']) if len(addresses) > 1: self._print_current_backtrace() if len(self._current_backtrace) == 0: - self._prefix = res['prefix'] + self._prefix = cast(Union[str, None], res['prefix']) for r in addresses: if r['path']: self._current_backtrace.append((r['path'], r['addr'])) diff --git a/scripts/seastar-addr2line b/scripts/seastar-addr2line index 859fd25f0f..328ab15fa8 100755 --- a/scripts/seastar-addr2line +++ b/scripts/seastar-addr2line @@ -19,21 +19,21 @@ # Copyright (C) 2017 ScyllaDB import argparse +from typing import Any, Optional, Sequence, TextIO import unittest -import re import sys from addr2line import BacktraceResolver -def read_backtrace(stdin): +def read_backtrace(stdin: TextIO): """ Read stdin char-by-char and stop when when user pressed Ctrl+D or the Enter twice. Altough reading char-by-char is slow this won't be a problem here as backtraces shouldn't be huge. """ linefeeds = 0 - line = [] + line: list[str] = [] while True: char = stdin.read(1) @@ -52,12 +52,17 @@ def read_backtrace(stdin): break +TestResult = dict[str, Any] +MaybeTestResult = Optional[TestResult] +TestList = Sequence[tuple[str, MaybeTestResult]] + + class TestStringMethods(unittest.TestCase): def setUp(self): self.parser = BacktraceResolver.BacktraceParser() - def _test(self, cases): + def _test(self, cases: TestList): for line, expected in cases: res = self.parser(line.strip() + '\n') self.assertEqual(res, expected, f"failed to parse {line}") @@ -499,7 +504,6 @@ There are three operational modes: verbose=args.verbose, cmd_path=args.addr2line, ) as resolve: - p = re.compile(r'\W+') for line in list(lines): resolve(line.strip() + '\n')