Skip to content

Commit

Permalink
addr2line: add type checking
Browse files Browse the repository at this point in the history
Add additional type hints and a few other type-related changes to
seastar-addr2line and addr2line.py. After this change, these scripts
type-check cleanly with pyright in strict mode.

--test passes.
  • Loading branch information
travisdowns committed Sep 25, 2024
1 parent 1147ac2 commit 890ffc4
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 46 deletions.
114 changes: 73 additions & 41 deletions scripts/addr2line.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,21 @@
import subprocess
from enum import Enum
from functools import cache
from typing import Any
from typing import Any, Optional, TypeVar, Union, cast

# special binary path/module indicating that the address is from the kernel
KERNEL_MODULE = '<kernel>'


T = TypeVar('T')


def notNone(o: Optional[T]) -> T:
    """Return ``o`` unchanged after checking that it is not ``None``.

    This narrows an ``Optional[T]`` value to ``T`` so that callers can use
    the result without a further ``None`` check.

    Args:
        o: the value to check.

    Returns:
        ``o`` itself (the same object), guaranteed not to be ``None``.

    Raises:
        AssertionError: if ``o`` is ``None``.
    """
    # Raise explicitly instead of using an ``assert`` statement: assert
    # statements are removed when Python runs with -O, which would let a
    # None value pass through this helper unnoticed.
    if o is None:
        raise AssertionError('unexpected None value')
    return o


class Addr2Line:

# Matcher for a line that appears at the end of a single decoded
Expand All @@ -48,7 +57,7 @@ class Addr2Line:
r"(.*0x0: \?\? at .*\n)" # llvm-addr2line pattern
)

def __init__(self, binary, concise=False, cmd_path="addr2line"):
def __init__(self, binary: str, concise: bool = False, cmd_path: str = "addr2line"):
self._binary = binary

# Print warning if binary has no debug info according to `file`.
Expand All @@ -60,48 +69,61 @@ def __init__(self, binary, concise=False, cmd_path="addr2line"):
print('{}'.format(s))

options = f"-{'C' if not concise else ''}fpia"
self._input = subprocess.Popen(
self._input_proc = subprocess.Popen(
[cmd_path, options, "-e", self._binary],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
universal_newlines=True,
)
if concise:
self._output = subprocess.Popen(
self._output_proc = subprocess.Popen(
["c++filt", "-p"],
stdin=self._input.stdout,
stdin=self._input_proc.stdout,
stdout=subprocess.PIPE,
universal_newlines=True,
)
else:
self._output = self._input
self._output_proc = self._input_proc

# If a library doesn't exist in a particular path, addr2line
# will just exit. We need to be robust against that. We
# can't just wait on self._addr2line since there is no
# guarantee on what timeout is sufficient.
self._input.stdin.write('\n')
self._input.stdin.flush()
res = self._output.stdout.readline()
self._input.write('\n')
self._input.flush()
res = self._output.readline()
self._missing = res == ''

@property
def _input(self):
    """Stdin stream of the process held in ``self._input_proc``.

    The stream is passed through ``notNone`` so callers never see None.
    """
    stdin_stream = self._input_proc.stdin
    return notNone(stdin_stream)

@property
def _output(self):
    """Stdout stream of the process held in ``self._output_proc``.

    The stream is passed through ``notNone`` so callers never see None.
    """
    stdout_stream = self._output_proc.stdout
    return notNone(stdout_stream)

def _read_resolved_address(self):
res = self._output.stdout.readline()
res = self._output.readline()
# remove the address
res = res.split(': ', 1)[1]
line = ''
while Addr2Line.dummy_pattern.fullmatch(line) is None:
res = first.split(': ', 1)[1]
while True:
line = self._output.readline()
if Addr2Line.dummy_pattern.fullmatch(line):
self._parent.debug('Addr2Line read output ( dummy): ', line)
break
self._parent.debug('Addr2Line read output (non-dummy): ', line)
res += line
line = self._output.stdout.readline()
return res

def __call__(self, address):
def __call__(self, address: str):
if self._missing:
return " ".join([self._binary, address, '\n'])
# We print a dummy 0x0 address after the address we are interested in
# which we can look for in _read_resolved_address
self._input.stdin.write(address + '\n0x0\n')
self._input.stdin.flush()
self._input.write(address + '\n0x0\n')
self._input.flush()
return self._read_resolved_address()


Expand All @@ -110,7 +132,7 @@ class KernelResolver:

LAST_SYMBOL_MAX_SIZE = 1024

def __init__(self, kallsyms='/proc/kallsyms'):
def __init__(self, kallsyms: str = '/proc/kallsyms'):
syms: list[tuple[int, str]] = []
ksym_re = re.compile(r'(?P<addr>[0-9a-f]+) (?P<type>.+) (?P<name>\S+)')
warnings_left = 10
Expand Down Expand Up @@ -159,7 +181,7 @@ def __init__(self, kallsyms='/proc/kallsyms'):
self.sym_names: tuple[str]
self.sym_addrs, self.sym_names = zip(*syms) # type: ignore

def __call__(self, addrstr):
def __call__(self, addrstr: str):
if self.error:
return addrstr + '\n'

Expand Down Expand Up @@ -188,6 +210,11 @@ def __call__(self, addrstr):
return f'{sn[idx]}+0x{address - saddr:x}\n'


# Type of the result built for a single parsed line (used to annotate `ret`
# in BacktraceParser.__call__): a dict with string keys whose values may be
# None, a BacktraceParser.Type member, a string, or a list of dicts.
# The parser type is written as a string because BacktraceResolver is
# defined further down in the module.
LineResult = dict[
str, Union[None, 'BacktraceResolver.BacktraceParser.Type', str, list[dict[str, Any]]]
]


class BacktraceResolver:

class BacktraceParser:
Expand Down Expand Up @@ -219,7 +246,7 @@ def __init__(self):
self.generic_re = re.compile(fr"^(?:.*\s+){full_addr_match}\s*$", flags=re.IGNORECASE)
self.separator_re = re.compile(r'^\W*-+\W*$')

def split_addresses(self, addrstring: str, default_path=None):
def split_addresses(self, addrstring: str, default_path: Optional[str] = None):
addresses: list[dict[str, Any]] = []
for obj in addrstring.split():
m = re.match(self.address_re, obj)
Expand All @@ -228,12 +255,15 @@ def split_addresses(self, addrstring: str, default_path=None):
addresses.append({'path': m.group(1) or default_path, 'addr': m.group(2)})
return addresses

def __call__(self, line):
def get_prefix(s):
def __call__(self, line: str):

def get_prefix(s: Optional[str]):
if s is not None:
s = s.strip()
return s or None

ret: LineResult

# order here is important: the kernel callstack regex
# needs to come first since it is more specific and would
# otherwise be matched by the online regex which comes next
Expand Down Expand Up @@ -292,36 +322,36 @@ def get_prefix(s):

def __init__(
self,
executable,
kallsyms='/proc/kallsyms',
before_lines=1,
context_re='',
verbose=False,
concise=False,
cmd_path='addr2line',
executable: str,
kallsyms: str = '/proc/kallsyms',
before_lines: int = 1,
context_re: Optional[str] = '',
verbose: bool = False,
concise: bool = False,
cmd_path: str = 'addr2line',
):
self._executable = executable
self._kallsyms = kallsyms
self._current_backtrace = []
self._current_backtrace: list[tuple[str, str]] = []
self._prefix = None
self._before_lines = before_lines
self._before_lines_queue = collections.deque(maxlen=before_lines)
self._before_lines_queue: collections.deque[str] = collections.deque(maxlen=before_lines)
self._i = 0
self._known_backtraces = {}
self._known_backtraces: dict[str, int] = {}
if context_re is not None:
self._context_re = re.compile(context_re)
else:
self._context_re = None
self._verbose = verbose
self._concise = concise
self._cmd_path = cmd_path
self._known_modules = {}
self._known_modules: dict[str, Union[Addr2Line, KernelResolver]] = {}
self._get_resolver_for_module(
self._executable
) # fail fast if there is something wrong with the exe resolver
self.parser = self.BacktraceParser()

def _get_resolver_for_module(self, module):
def _get_resolver_for_module(self, module: str):
if not module in self._known_modules:
if module == KERNEL_MODULE:
resolver = KernelResolver(kallsyms=self._kallsyms)
Expand All @@ -333,11 +363,13 @@ def _get_resolver_for_module(self, module):
def __enter__(self):
return self

def __exit__(self, type, value, tb):
def __exit__(self, *_):
self._print_current_backtrace()

@cache
def resolve_address(self, address, module=None, verbose=None):
def resolve_address(
self, address: str, module: Optional[str] = None, verbose: Optional[bool] = None
):
if module is None:
module = self._executable
if verbose is None:
Expand All @@ -347,14 +379,14 @@ def resolve_address(self, address, module=None, verbose=None):
resolved_address = '{{{}}} {}: {}'.format(module, address, resolved_address)
return resolved_address

def _print_resolved_address(self, module, address):
def _print_resolved_address(self, module: Optional[str], address: str):
sys.stdout.write(self.resolve_address(address, module))

def _backtrace_context_matches(self):
if self._context_re is None:
return True

if any(map(lambda x: self._context_re.search(x) is not None, self._before_lines_queue)):
if any(self._context_re.search(x) for x in self._before_lines_queue):
return True

if (not self._prefix is None) and self._context_re.search(self._prefix):
Expand Down Expand Up @@ -400,7 +432,7 @@ def _print_current_backtrace(self):
self._current_backtrace = []
self._i += 1

def __call__(self, line):
def __call__(self, line: str):
res = self.parser(line)

if not res:
Expand All @@ -414,11 +446,11 @@ def __call__(self, line):
elif res['type'] == self.BacktraceParser.Type.SEPARATOR:
pass
elif res['type'] == self.BacktraceParser.Type.ADDRESS:
addresses = res['addresses']
addresses = cast(list[dict[str, Any]], res['addresses'])
if len(addresses) > 1:
self._print_current_backtrace()
if len(self._current_backtrace) == 0:
self._prefix = res['prefix']
self._prefix = cast(Union[str, None], res['prefix'])
for r in addresses:
if r['path']:
self._current_backtrace.append((r['path'], r['addr']))
Expand Down
14 changes: 9 additions & 5 deletions scripts/seastar-addr2line
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@
# Copyright (C) 2017 ScyllaDB

import argparse
from typing import Any, Optional, Sequence, TextIO
import unittest
import re
import sys

from addr2line import BacktraceResolver


def read_backtrace(stdin):
def read_backtrace(stdin: TextIO):
"""
Read stdin char-by-char and stop when the user presses Ctrl+D or
Enter twice. Although reading char-by-char is slow, this won't be a
problem here as backtraces shouldn't be huge.
"""
linefeeds = 0
line = []
line: list[str] = []

while True:
char = stdin.read(1)
Expand All @@ -52,12 +52,17 @@ def read_backtrace(stdin):
break


# An expected parser result: a dict with string keys.
TestResult = dict[str, Any]
# An expected parser result, or None.
MaybeTestResult = Optional[TestResult]
# Sequence of (input line, expected result) pairs, as consumed by
# TestStringMethods._test.
TestList = Sequence[tuple[str, MaybeTestResult]]


class TestStringMethods(unittest.TestCase):

def setUp(self):
self.parser = BacktraceResolver.BacktraceParser()

def _test(self, cases):
def _test(self, cases: TestList):
for line, expected in cases:
res = self.parser(line.strip() + '\n')
self.assertEqual(res, expected, f"failed to parse {line}")
Expand Down Expand Up @@ -499,7 +504,6 @@ There are three operational modes:
verbose=args.verbose,
cmd_path=args.addr2line,
) as resolve:
p = re.compile(r'\W+')
for line in list(lines):
resolve(line.strip() + '\n')

Expand Down

0 comments on commit 890ffc4

Please sign in to comment.