diff --git a/malduck/__init__.py b/malduck/__init__.py index a23df32..392c3aa 100644 --- a/malduck/__init__.py +++ b/malduck/__init__.py @@ -111,7 +111,7 @@ ) from .structure import Structure from .verify import verify -from .yara import Yara, YaraString, YaraStringMatch +from .yara import Yara, YaraString __all__ = [ # bits @@ -240,7 +240,6 @@ # verify "verify", # yara - "YaraStringMatch", "YaraString", "Yara", ] diff --git a/malduck/extractor/extract_manager.py b/malduck/extractor/extract_manager.py index 2bf70bf..ee9df85 100644 --- a/malduck/extractor/extract_manager.py +++ b/malduck/extractor/extract_manager.py @@ -5,7 +5,7 @@ from typing import Any, Dict, List, Optional, Type, Union from ..procmem import ProcessMemory -from ..yara import Yara, YaraRuleOffsets, YaraRulesetMatch +from ..yara import RulesetMatch, Yara from .extractor import Extractor from .loaders import load_modules @@ -308,15 +308,15 @@ def on_extractor_error( self.parent.on_extractor_error(exc, extractor, method_name) def push_procmem( - self, p: ProcessMemory, _matches: Optional[YaraRulesetMatch] = None + self, p: ProcessMemory, _matches: Optional[RulesetMatch] = None ) -> None: """ Pushes ProcessMemory object for extraction :param p: ProcessMemory object :type p: :class:`malduck.procmem.ProcessMemory` - :param _matches: YaraRulesetMatch object (used internally) - :type _matches: :class:`malduck.yara.YaraRulesetMatch` + :param _matches: RulesetMatch object (used internally) + :type _matches: :class:`malduck.yara.RulesetMatch` """ matches = _matches or p.yarav(self.parent.rules, extended=True) # For each extractor... 
@@ -338,7 +338,7 @@ def push_procmem( DeprecationWarning, ) getattr(extractor, "handle_yara")( - p, YaraRuleOffsets(matches[rule]) + p, matches.get_ruleset_offsets()[rule] ) else: extractor.handle_match(p, matches[rule]) diff --git a/malduck/extractor/extractor.pyi b/malduck/extractor/extractor.pyi index 56a05c4..0d861c2 100644 --- a/malduck/extractor/extractor.pyi +++ b/malduck/extractor/extractor.pyi @@ -17,7 +17,7 @@ from typing import ( from typing_extensions import Protocol from ..procmem import ProcessMemory, ProcessMemoryELF, ProcessMemoryPE -from ..yara import YaraRuleMatch, YaraStringMatch +from ..yara import RuleMatch, StringMatch from .extract_manager import ProcmemExtractManager Config = Dict[str, Any] @@ -31,13 +31,11 @@ class _StringOffsetCallback(Protocol[T, U]): class _StringCallback(Protocol[T, U]): def __call__( - cls, self: T, p: U, addr: int, match: YaraStringMatch + cls, self: T, p: U, addr: int, match: StringMatch ) -> Union[Config, bool, None]: ... class _RuleCallback(Protocol[T, U]): - def __call__( - cls, self: T, p: U, match: YaraRuleMatch - ) -> Union[Config, bool, None]: ... + def __call__(cls, self: T, p: U, match: RuleMatch) -> Union[Config, bool, None]: ... class _FinalCallback(Protocol[T, U]): def __call__(cls, self: T, p: U) -> Union[Config, bool, None]: ... @@ -109,7 +107,7 @@ class Extractor: def log(self) -> logging.Logger: ... def _get_methods(self, method_type: Type[V]) -> Iterator[Tuple[str, V]]: ... def on_error(self, exc: Exception, method_name: str) -> None: ... - def handle_match(self, p: ProcessMemory, match: YaraRuleMatch) -> None: ... + def handle_match(self, p: ProcessMemory, match: RuleMatch) -> None: ... 
# Extractor method decorators @overload @staticmethod diff --git a/malduck/procmem/procmem.pyi b/malduck/procmem/procmem.pyi index 793232e..ce23ef3 100644 --- a/malduck/procmem/procmem.pyi +++ b/malduck/procmem/procmem.pyi @@ -18,7 +18,7 @@ from typing_extensions import Literal, Protocol from ..disasm import Instruction from ..extractor import ExtractManager, ExtractorModules from ..ints import IntType -from ..yara import Yara, YaraRulesetMatch, YaraRulesetOffsets +from ..yara import RulesetMatch, RulesetOffsets, Yara from .region import Region class MemoryBuffer(object): @@ -39,7 +39,7 @@ class ProcessMemoryYaraCallback(Protocol): addr: Optional[int], length: Optional[int], extended: Literal[True], - ) -> YaraRulesetMatch: ... + ) -> RulesetMatch: ... @overload def __call__( self, @@ -47,7 +47,7 @@ class ProcessMemoryYaraCallback(Protocol): offset: Optional[int], length: Optional[int], extended: Literal[True], - ) -> YaraRulesetMatch: ... + ) -> RulesetMatch: ... class ProcessMemory: f: Optional[BinaryIO] @@ -242,7 +242,7 @@ class ProcessMemory: offset: Optional[int] = None, length: Optional[int] = None, extended: Literal[False] = False, - ) -> YaraRulesetOffsets: ... + ) -> RulesetOffsets: ... # yarap(ruleset, offset, length, extended=True) @overload def yarap( @@ -251,21 +251,21 @@ class ProcessMemory: offset: Optional[int], length: Optional[int], extended: Literal[True], - ) -> YaraRulesetMatch: ... + ) -> RulesetMatch: ... # yarap(ruleset, extended=True) @overload - def yarap(self, ruleset: Yara, *, extended: Literal[True]) -> YaraRulesetMatch: ... + def yarap(self, ruleset: Yara, *, extended: Literal[True]) -> RulesetMatch: ... # yarap(ruleset, offset=0, extended=True) # yarap(ruleset, 0, extended=True) @overload def yarap( self, ruleset: Yara, offset: Optional[int], *, extended: Literal[True] - ) -> YaraRulesetMatch: ... + ) -> RulesetMatch: ... 
# yarap(ruleset, length=0, extended=True) @overload def yarap( self, ruleset: Yara, *, length: Optional[int], extended: Literal[True] - ) -> YaraRulesetMatch: ... + ) -> RulesetMatch: ... # yarav(ruleset) # yarav(ruleset, addr) # yarav(ruleset, addr, length) @@ -277,7 +277,7 @@ class ProcessMemory: addr: Optional[int] = None, length: Optional[int] = None, extended: Literal[False] = False, - ) -> YaraRulesetOffsets: ... + ) -> RulesetOffsets: ... # yarav(ruleset, addr, length, extended=True) @overload def yarav( @@ -286,21 +286,21 @@ class ProcessMemory: addr: Optional[int], length: Optional[int], extended: Literal[True], - ) -> YaraRulesetMatch: ... + ) -> RulesetMatch: ... # yarav(ruleset, extended=True) @overload - def yarav(self, ruleset: Yara, *, extended: Literal[True]) -> YaraRulesetMatch: ... + def yarav(self, ruleset: Yara, *, extended: Literal[True]) -> RulesetMatch: ... # yarav(ruleset, addr=0, extended=True) # yarav(ruleset, 0, extended=True) @overload def yarav( self, ruleset: Yara, addr: Optional[int], *, extended: Literal[True] - ) -> YaraRulesetMatch: ... + ) -> RulesetMatch: ... # yarav(ruleset, length=0, extended=True) @overload def yarav( self, ruleset: Yara, *, length: Optional[int], extended: Literal[True] - ) -> YaraRulesetMatch: ... + ) -> RulesetMatch: ... 
def _findbytes( self, yara_fn: ProcessMemoryYaraCallback, diff --git a/malduck/yara.py b/malduck/yara.py deleted file mode 100644 index c4ac5bc..0000000 --- a/malduck/yara.py +++ /dev/null @@ -1,342 +0,0 @@ -import enum -import json -import logging -import os -import re -import textwrap -from collections import defaultdict, namedtuple -from typing import Callable, Dict, Optional, Tuple, TypeVar - -import yara - -__all__ = [ - "Yara", - "YaraString", - "YaraRulesetMatch", - "YaraRulesetOffsets", - "YaraRuleMatch", - "YaraRuleOffsets", - "YaraStringMatch", - "YaraMatches", - "YaraMatch", -] - -log = logging.getLogger(__name__) - -T = TypeVar("T") -OffsetMapper = Callable[[Optional[int], Optional[int]], Optional[int]] - -YaraRulesString = Tuple[int, str, bytes] - - -class _Mapper: - def __init__(self, elements, default=None): - self.elements = elements - self.default = default - - def keys(self): - """List of matched string identifiers""" - return self.elements.keys() - - def get(self, item): - """Get matched string offsets or default if not matched""" - return self.elements.get(item, self.default) - - def __bool__(self): - return bool(self.elements) - - def __nonzero__(self): - return self.__bool__() - - def __contains__(self, item): - return item in self.elements - - def __getitem__(self, item): - return self.elements[item] - - def __getattr__(self, item): - try: - return self[item] - except IndexError: - raise AttributeError() - - -class Yara: - """ - Represents Yara ruleset. Rules can be compiled from set of files or defined in code (single rule only). - - Most simple rule (with default identifiers left): - - .. code-block:: python - - from malduck.yara import Yara, YaraString - - Yara(strings="MALWR").match(data=b"MALWRMALWARMALWR").r.string == [0, 11] - - Example of more complex rule defined in Python: - - .. 
code-block:: python - - from malduck.yara import Yara, YaraString - - ruleset = Yara(name="MalwareRule", - strings={ - "xor_stub": YaraString("This program cannot", xor=True, ascii=True), - "code_ref": YaraString("E2 34 ?? C8 A? FB", type=YaraString.HEX), - "mal1": "MALWR", - "mal2": "MALRW" - }, condition="( $xor_stub and $code_ref ) or any of ($mal*)") - - # If mal1 or mal2 are matched, they are grouped into "mal" - - # Print appropriate offsets - - match = ruleset.match(data=b"MALWR MALRW") - - if match: - # ["mal1", "mal", "mal2"] - print(match.MalwareRule.keys()) - if "mal" in match.MalwareRule: - # Note: Order of offsets for grouped strings is undetermined - print("mal*", match.MalwareRule["mal"]) - - :param rule_paths: Dictionary of {"namespace": "rule_path"}. See also :py:meth:`Yara.from_dir`. - :type rule_paths: dict - :param name: Name of generated rule (default: "r") - :type name: str - :param strings: Dictionary representing set of string patterns ({"string_identifier": YaraString or plain str}) - :type strings: dict or str or :class:`YaraString` - :param condition: Yara rule condition (default: "any of them") - :type condition: str - """ - - def __init__( - self, rule_paths=None, name="r", strings=None, condition="any of them" - ): - if rule_paths: - self.rules = yara.compile(filepaths=rule_paths) - return - - if not strings: - raise ValueError("No strings specified") - - if isinstance(strings, str) or isinstance(strings, YaraString): - strings = {"string": strings} - - yara_strings = "\n ".join( - [ - f"${key} = {str(YaraString(value) if isinstance(value, str) else value)}" - for key, value in strings.items() - ] - ) - yara_source = textwrap.dedent( - f""" - rule {name} {{ - strings: - {yara_strings} - condition: - {condition} - }} - """ - ) - - self.rules = yara.compile(source=yara_source) - - @staticmethod - def from_dir(path, recursive=True, followlinks=True): - """ - Find rules (recursively) in specified path. 
Supported extensions: \\*.yar, \\*.yara - - :param path: Root path for searching - :type path: str - :param recursive: Search recursively (default: enabled) - :type recursive: bool - :param followlinks: Follow symbolic links (default: enabled) - :type followlinks: bool - :rtype: :class:`Yara` - """ - rule_paths: Dict[str, str] = {} - for root, _, files in os.walk(path, followlinks=followlinks): - for fname in files: - if not fname.endswith(".yar") and not fname.endswith(".yara"): - continue - ruleset_name = os.path.splitext(os.path.basename(fname))[0] - ruleset_path = os.path.join(root, fname) - if ruleset_name in rule_paths: - log.warning( - f"Yara file name collision - {rule_paths[ruleset_name]} " - f"overridden by {ruleset_path}" - ) - rule_paths[ruleset_name] = ruleset_path - if not recursive: - break - return Yara(rule_paths=rule_paths) - - def match(self, offset_mapper=None, extended=False, **kwargs): - """ - Perform matching on file or data block - - :param filepath: Path to the file to be scanned - :type filepath: str - :param data: Data to be scanned - :type data: str - :param offset_mapper: Offset mapping function. For unmapped region, should returned None. 
- Used by :py:meth:`malduck.procmem.ProcessMemory.yarav` - :type offset_mapper: function - :param extended: Returns extended information about matched strings and rules - :type extended: bool (optional, default False) - :rtype: :class:`malduck.yara.YaraRulesetOffsets` or :class:`malduck.yara.YaraRulesetMatches` - if extended is set to True - """ - matches = YaraRulesetMatch( - self.rules.match(**kwargs), offset_mapper=offset_mapper - ) - return YaraRulesetOffsets(matches) if not extended else matches - - -class YaraStringType(enum.IntEnum): - TEXT = 0 - HEX = 1 - REGEX = 2 - - -class YaraString: - """ - Formatter for Yara string patterns - - :param value: Pattern value - :type value: str - :param type: Pattern type (default is :py:attr:`YaraString.TEXT`) - :type type: :py:attr:`YaraString.TEXT` / :py:attr:`YaraString.HEX` / :py:attr:`YaraString.REGEX` - :param modifiers: Yara string modifier flags - """ - - TEXT = YaraStringType.TEXT - HEX = YaraStringType.HEX - REGEX = YaraStringType.REGEX - - def __init__(self, value, type=YaraStringType.TEXT, **modifiers): - self.value = value - self.type = type - self.modifiers = [k for k, v in modifiers.items() if v is True] - - def __str__(self): - if self.type == YaraStringType.TEXT: - str_value = json.dumps(self.value) - elif self.type == YaraStringType.HEX: - str_value = f"{{ {self.value} }}" - elif self.type == YaraStringType.REGEX: - str_regex = "\\/".join(self.value.split("/")) - str_value = f"/{str_regex}/" - else: - raise ValueError(f"Unknown YaraString type: {self.type}") - return str_value + "".join([" " + modifier for modifier in self.modifiers]) - - -class YaraRulesetMatch(_Mapper): - """ - Yara ruleset matches. Returned by :py:meth:`Yara.match`. - - Rules can be referenced by both attribute and index. 
- """ - - def __init__(self, matches, offset_mapper=None): - self._matches = matches - super().__init__(elements=self._map_matches(matches, offset_mapper)) - - def _map_matches(self, matches, offset_mapper): - mapped_matches = [ - (match, self._map_strings(match.strings, offset_mapper)) - for match in matches - ] - return { - match.rule: YaraRuleMatch( - match.rule, strings, match.meta, match.namespace, match.tags - ) - for match, strings in mapped_matches - if strings - } - - def _map_strings(self, strings, offset_mapper): - mapped_strings = defaultdict(list) - for offset, identifier, content in strings: - # Get identifier without "$" and group identifier - real_ident, group_ident = self._parse_string_identifier(identifier) - # Map offset if offset_mapper is provided - if offset_mapper is not None: - _offset = offset_mapper(offset, len(content)) - if _offset is None: - # Ignore match for unmapped region - continue - offset = _offset - # Register offset for full identifier - mapped_strings[real_ident].append( - YaraStringMatch(real_ident, offset, content) - ) - # Register offset for grouped identifier - if real_ident != group_ident: - mapped_strings[group_ident].append( - YaraStringMatch(real_ident, offset, content) - ) - return mapped_strings - - def _parse_string_identifier(self, identifier): - real_ident = identifier.lstrip("$") - # Add group identifiers ($str1, $str2 => "str") - match_ident = re.match(r"^\$(\w+?[a-zA-Z])_?(\d*)$", identifier) - group_ident = match_ident.group(1) if match_ident else real_ident - return real_ident, group_ident - - def remap(self, offset_mapper=None): - return YaraRulesetMatch(self._matches, offset_mapper=offset_mapper) - - -class YaraRulesetOffsets(_Mapper): - def __init__(self, matches): - self._matches = matches - super().__init__( - elements={k: YaraRuleOffsets(v) for k, v in matches.elements.items()} - ) - - def remap(self, offset_mapper=None): - return YaraRulesetOffsets(self._matches.remap(offset_mapper)) - - 
-YaraStringMatch = namedtuple("YaraStringMatch", ["identifier", "offset", "content"]) - - -class YaraRuleMatch(_Mapper): - """ - Rule matches. Returned by `YaraMatches.`. - - Strings can be referenced by both attribute and index. - """ - - def __init__(self, rule, strings, meta, namespace, tags): - self.rule = self.name = rule - self.meta = meta - self.namespace = namespace - self.tags = tags - super().__init__( - elements={k: sorted(v, key=lambda s: s.offset) for k, v in strings.items()} - ) - - def get_offsets(self, string): - return [match.offset for match in self.elements.get(string, [])] - - -class YaraRuleOffsets(_Mapper): - def __init__(self, rule_match): - self.rule = self.name = rule_match.rule - super().__init__( - { - identifier: [match.offset for match in string_matches] - for identifier, string_matches in rule_match.elements.items() - }, - default=[], - ) - - -# Legacy aliases, don't use them in new code -YaraMatches = YaraRulesetOffsets -YaraMatch = YaraRuleOffsets diff --git a/malduck/yara.pyi b/malduck/yara.pyi deleted file mode 100644 index 6d4124f..0000000 --- a/malduck/yara.pyi +++ /dev/null @@ -1,150 +0,0 @@ -import enum -from collections import namedtuple -from typing import ( - Any, - Callable, - Dict, - Generic, - Iterable, - KeysView, - List, - Optional, - Tuple, - TypeVar, - Union, - overload, -) - -from typing_extensions import Literal, Protocol - -T = TypeVar("T") -OffsetMapper = Callable[[Optional[int], Optional[int]], Optional[int]] - -YaraRulesString = Tuple[int, str, bytes] - -class YaraRulesMatch(Protocol): - meta: Dict[str, str] - namespace: str - rule: str - strings: List[YaraRulesString] - tags: List[str] - -class _Mapper(Generic[T]): - elements: Dict[str, T] - default: Optional[T] - def __init__(self, elements: Dict[str, T], default: Optional[T] = None) -> None: ... - def keys(self) -> KeysView[str]: ... - def get(self, item) -> Optional[T]: ... - def __bool__(self) -> bool: ... - def __nonzero__(self) -> bool: ... 
- def __contains__(self, item: str) -> bool: ... - def __getitem__(self, item: str) -> T: ... - def __getattr__(self, item: str) -> T: ... - -class Yara: - rules: Any - def __init__( - self, - rule_paths: Optional[Dict[str, str]] = None, - name: str = "r", - strings: Union[ - str, "YaraString", Dict[str, Union[str, "YaraString"]], None - ] = None, - condition: str = "any of them", - ) -> None: ... - @staticmethod - def from_dir( - path: str, recursive: bool = True, followlinks: bool = True - ) -> "Yara": ... - # match(...) - # match(offset_mapper, ...) - # match(offset_mapper, extended=False, ...) - @overload - def match( - self, - offset_mapper: Optional[OffsetMapper] = None, - extended: Literal[False] = False, - **kwargs, - ) -> "YaraRulesetOffsets": ... - # match(offset_mapper, extended=True, ...) - @overload - def match( - self, offset_mapper: Optional[OffsetMapper], extended: Literal[True], **kwargs - ) -> "YaraRulesetMatch": ... - # match(extended=True, ...) - @overload - def match(self, *, extended: Literal[True], **kwargs) -> "YaraRulesetMatch": ... - -class YaraStringType(enum.IntEnum): - TEXT = 0 - HEX = 1 - REGEX = 2 - -class YaraString: - TEXT = YaraStringType.TEXT - HEX = YaraStringType.HEX - REGEX = YaraStringType.REGEX - - value: str - type: YaraStringType - modifiers: List[str] - def __init__( - self, value: str, type: YaraStringType = YaraStringType.TEXT, **modifiers: bool - ) -> None: ... - def __str__(self) -> str: ... - -class YaraRulesetMatch(_Mapper["YaraRuleMatch"]): - _matches: List[YaraRulesMatch] - def __init__( - self, - matches: List[YaraRulesMatch], - offset_mapper: Optional[OffsetMapper] = None, - ) -> None: - super().__init__(elements={}) - def _map_matches( - self, matches: List[YaraRulesMatch], offset_mapper: Optional[OffsetMapper] - ) -> Dict[str, "YaraRuleMatch"]: ... - def _map_strings( - self, strings: Iterable[YaraRulesString], offset_mapper: Optional[OffsetMapper] - ) -> Dict[str, List["YaraStringMatch"]]: ... 
- def _parse_string_identifier(self, identifier: str) -> Tuple[str, str]: ... - def remap( - self, offset_mapper: Optional[OffsetMapper] = None - ) -> "YaraRulesetMatch": ... - -class YaraRulesetOffsets(_Mapper["YaraRuleOffsets"]): - _matches: YaraRulesetMatch - def __init__(self, matches: YaraRulesetMatch) -> None: - super().__init__(elements={}) - def remap( - self, offset_mapper: Optional[OffsetMapper] = None - ) -> "YaraRulesetOffsets": ... - -YaraStringMatch = namedtuple("YaraStringMatch", ["identifier", "offset", "content"]) - -class YaraRuleMatch(_Mapper[List[YaraStringMatch]]): - rule: str - name: str - meta: Dict[str, str] - namespace: str - tags: List[str] - def __init__( - self, - rule: str, - strings: Dict[str, List[YaraStringMatch]], - meta: Dict[str, str], - namespace: str, - tags: List[str], - ) -> None: - super().__init__({}) - def get_offsets(self, string) -> List[int]: ... - -class YaraRuleOffsets(_Mapper[List[int]]): - rule: str - name: str - def __init__(self, rule_match: YaraRuleMatch) -> None: - super().__init__({}) - -# Legacy aliases, don't use them in new code -YaraMatches = YaraRulesetOffsets -YaraMatch = YaraRuleOffsets diff --git a/malduck/yara/__init__.py b/malduck/yara/__init__.py new file mode 100644 index 0000000..e8ebe99 --- /dev/null +++ b/malduck/yara/__init__.py @@ -0,0 +1,15 @@ +from .match import RuleMatch, RuleOffsets, RulesetMatch, RulesetOffsets, StringMatch +from .rules import YaraRule, YaraString, YaraStringType +from .yara import Yara + +__all__ = [ + "YaraRule", + "YaraString", + "YaraStringType", + "Yara", + "RulesetMatch", + "RulesetOffsets", + "RuleMatch", + "RuleOffsets", + "StringMatch", +] diff --git a/malduck/yara/mapping.py b/malduck/yara/mapping.py new file mode 100644 index 0000000..77ca3e2 --- /dev/null +++ b/malduck/yara/mapping.py @@ -0,0 +1,85 @@ +from collections import defaultdict +from typing import ( + Callable, + ItemsView, + Iterator, + KeysView, + Mapping, + Sequence, + TypeVar, + Union, + 
T = TypeVar("T")


def aggregate(
    collection: Sequence[T], keyfunc: Callable[[T], Sequence[str]]
) -> Mapping[str, Sequence[T]]:
    """
    Aggregates elements into {identifier: [elements...]} dictionary.

    :param collection: Elements to be grouped
    :param keyfunc: Returns list of identifiers under which element should be
                    registered (an element may land under several keys)
    :return: Plain dict; both keys and grouped elements keep the order in
             which they were first encountered
    """
    mapping = defaultdict(list)
    for el in collection:
        for key in keyfunc(el):
            mapping[key].append(el)
    # Hand out a regular dict so that lookups of unknown keys don't
    # silently create empty entries
    return dict(mapping)


V = TypeVar("V")
D = TypeVar("D")


class UserMapping(Mapping[str, V]):
    """
    Behaves similarly to frozen dict with predefined default for "get"

    Supports getting items via getattr for compatibility with older modules

    :param elements: Underlying mapping
    :param default: Value returned by :py:meth:`get` for missing items
    """

    def __init__(self, elements: Mapping[str, V], default=None) -> None:
        self.elements = elements
        self.default = default

    def keys(self) -> KeysView[str]:
        """Keys of underlying mapping"""
        return self.elements.keys()

    def items(self) -> ItemsView[str, V]:
        """(key, value) pairs of underlying mapping"""
        return self.elements.items()

    def values(self) -> ValuesView[V]:
        """Values of underlying mapping"""
        return self.elements.values()

    def get(self, item: str, default: Union[V, D] = None) -> Union[V, D]:
        """
        Get element or default if item is missing

        If default is not provided (or None), the default passed to
        constructor is used instead.
        """
        if default is None:
            default = self.default
        return self.elements.get(item, default)

    def __bool__(self) -> bool:
        return bool(self.elements)

    def __nonzero__(self) -> bool:
        return self.__bool__()

    def __contains__(self, item: object) -> bool:
        return item in self.elements

    def __getitem__(self, item: str) -> V:
        return self.elements[item]

    def __getattr__(self, item: str) -> V:
        # __getattr__ is invoked only after regular attribute lookup failed.
        # If "elements" itself is missing (instance created without running
        # __init__), resolving it via the mapping would recurse forever.
        if item == "elements":
            raise AttributeError(item)
        try:
            return self.elements[item]
        except LookupError:
            # Missing key must surface as AttributeError, otherwise
            # hasattr() and getattr(obj, name, default) are broken
            raise AttributeError(item) from None

    def __iter__(self) -> Iterator[str]:
        return iter(self.elements)

    def __len__(self) -> int:
        return len(self.elements)
log = logging.getLogger(__name__)

OffsetMapper = Callable[[Optional[int], Optional[int]], Optional[int]]

YaraRulesString = Tuple[int, str, bytes]


@dataclasses.dataclass(frozen=True)
class StringMatch:
    """
    Single occurrence of a Yara string, together with details of the rule
    it was matched by.

    Identifier is stored without leading "$".
    """

    rule: str
    identifier: str
    offset: int
    content: bytes
    namespace: Optional[str] = None
    meta: Optional[Mapping[str, str]] = None
    tags: Sequence[str] = dataclasses.field(default_factory=list)

    def __post_init__(self):
        # Instance is frozen, so regular assignment is not possible here
        bare_identifier = self.identifier.lstrip("$")
        object.__setattr__(self, "identifier", bare_identifier)

    def __len__(self) -> int:
        """Length of matched content"""
        return len(self.content)

    def replace_offset(self, offset: int) -> "StringMatch":
        """Returns copy of this match placed at different offset"""
        return dataclasses.replace(self, offset=offset)

    @property
    def groups(self) -> Sequence[str]:
        """
        Identifiers under which this match is registered: the identifier
        itself, optionally followed by its group name (str1 => str1, str)
        """
        parsed = re.match(r"^((\w+?)_?\d*)$", self.identifier)
        if parsed is None:
            # failsafe for non-standard names
            return [self.identifier]
        full_ident, group_ident = parsed.groups()
        if full_ident == group_ident:
            # str => str
            return [full_ident]
        # str1 => str1, str
        return [full_ident, group_ident]


# This should be typing.Self but it's available only for Python >=3.11
Self = TypeVar("Self", bound="RulesetMatch")


class RulesetMatch(UserMapping):
    """
    Mapping of {rule name: rule match} built from flat sequence of
    string matches.
    """

    def __init__(self, string_matches: Sequence[StringMatch]) -> None:
        self._string_matches = string_matches
        per_rule = aggregate(string_matches, lambda m: [m.rule])
        rule_matches = {}
        for rule_name, rule_strings in per_rule.items():
            rule_matches[rule_name] = self._make_rule_match(rule_name, rule_strings)
        super().__init__(rule_matches)

    @staticmethod
    def _make_rule_match(rule: str, strings: Sequence[StringMatch]):
        """Builds value stored under given rule name"""
        grouped = aggregate(strings, lambda s: s.groups)
        # Rule details are taken from the first string match of that rule
        first = strings[0]
        return RuleMatch(
            rule=rule,
            strings=grouped,
            namespace=first.namespace,
            meta=first.meta,
            tags=first.tags,
        )

    def remap(self: Self, mapper: OffsetMapper) -> Self:
        """
        Returns new object of the same class with offsets translated by
        mapper. Matches for which mapper returns None are dropped.
        """
        remapped = []
        for match in self._string_matches:
            new_offset = mapper(match.offset, len(match))
            if new_offset is None:
                continue
            remapped.append(match.replace_offset(new_offset))
        return self.__class__(remapped)

    def get_ruleset_offsets(self) -> "RulesetOffsets":
        """Returns offsets-only view built from the same string matches"""
        return RulesetOffsets(self._string_matches)


class RulesetOffsets(RulesetMatch):
    """Mapping of {rule name: rule offsets}"""

    @staticmethod
    def _make_rule_match(rule: str, strings: Sequence[StringMatch]) -> "RuleOffsets":
        grouped = aggregate(strings, lambda s: s.groups)
        return RuleOffsets(rule=rule, strings=grouped)


class RuleMatch(UserMapping):
    """
    Mapping of {string identifier: string matches sorted by offset}
    for single rule. Missing identifiers default to empty list in "get".
    """

    def __init__(
        self,
        rule: str,
        strings: Mapping[str, Sequence[StringMatch]],
        meta: Optional[Mapping[str, str]],
        namespace: Optional[str],
        tags: Optional[Sequence[str]],
    ) -> None:
        self.rule = self.name = rule
        self.meta = meta
        self.namespace = namespace
        self.tags = tags
        ordered = {}
        for identifier, matches in strings.items():
            ordered[identifier] = sorted(matches, key=lambda s: s.offset)
        super().__init__(ordered, default=[])

    def get_offsets(self, string: str) -> List[int]:
        """Returns offsets of given string (empty list if not matched)"""
        found = self.elements.get(string, [])
        return [match.offset for match in found]


class RuleOffsets(UserMapping):
    """
    Mapping of {string identifier: sorted offsets} for single rule.
    Missing identifiers default to empty list in "get".
    """

    def __init__(self, rule: str, strings: Mapping[str, Sequence[StringMatch]]) -> None:
        self.rule = self.name = rule
        offsets = {}
        for identifier, matches in strings.items():
            offsets[identifier] = sorted(match.offset for match in matches)
        super().__init__(offsets, default=[])
class YaraStringType(enum.IntEnum):
    TEXT = 0
    HEX = 1
    REGEX = 2


def _escape_regex_delimiter(pattern: str) -> str:
    """Backslash-escapes every "/" in pattern that is not escaped already."""
    result = []
    after_backslash = False
    for char in pattern:
        if after_backslash:
            # Character is already escaped (including "\/"), keep it as is
            after_backslash = False
        elif char == "\\":
            after_backslash = True
        elif char == "/":
            char = "\\/"
        result.append(char)
    return "".join(result)


class YaraString:
    """
    Formatter for Yara string patterns

    :param value: Pattern value
    :type value: str
    :param type: Pattern type (default is :py:attr:`YaraString.TEXT`)
    :type type: :py:attr:`YaraString.TEXT` / :py:attr:`YaraString.HEX` / :py:attr:`YaraString.REGEX`
    :param modifiers: Yara string modifier flags. Only modifiers set exactly
                      to True are emitted.
    """

    TEXT = YaraStringType.TEXT
    HEX = YaraStringType.HEX
    REGEX = YaraStringType.REGEX

    def __init__(
        self, value: str, type: YaraStringType = YaraStringType.TEXT, **modifiers: bool
    ) -> None:
        self.value: str = value
        self.type: YaraStringType = type
        self.modifiers: List[str] = [k for k, v in modifiers.items() if v is True]

    def __str__(self) -> str:
        """
        Returns pattern formatted as right-hand side of Yara string definition

        :raises ValueError: when pattern type is unknown
        """
        if self.type == YaraStringType.TEXT:
            # Text is quoted and escaped by JSON serializer
            str_value = json.dumps(self.value)
        elif self.type == YaraStringType.HEX:
            str_value = f"{{ {self.value} }}"
        elif self.type == YaraStringType.REGEX:
            # "/" delimits the regex, so it must be escaped inside the
            # pattern. Slashes escaped by the caller are left untouched:
            # escaping them again would produce "\\/", an escaped backslash
            # followed by a bare delimiter.
            str_value = f"/{_escape_regex_delimiter(self.value)}/"
        else:
            raise ValueError(f"Unknown YaraString type: {self.type}")
        return str_value + "".join([" " + modifier for modifier in self.modifiers])


class YaraRule:
    """
    Formatter for Yara rule

    :param name: Rule name
    :param strings: Single string or mapping of strings. Single string is
                    registered under "string" identifier.
    :param condition: Rule condition
    :raises ValueError: when no strings are specified
    """

    def __init__(
        self,
        name: str = "r",
        strings: Union[str, YaraString, Dict[str, Union[str, YaraString]]] = "",
        condition: str = "any of them",
    ) -> None:
        if not strings:
            raise ValueError("No strings specified")

        if isinstance(strings, (str, YaraString)):
            strings = {"string": strings}

        # Separator carries the indentation of {yara_strings} placeholder
        # below, so that consecutive definitions are aligned
        yara_strings = "\n                    ".join(
            [
                f"${key} = {str(YaraString(value) if isinstance(value, str) else value)}"
                for key, value in strings.items()
            ]
        )
        self.source = textwrap.dedent(
            f"""
            rule {name} {{
                strings:
                    {yara_strings}
                condition:
                    {condition}
            }}
            """
        )

    def __str__(self) -> str:
        """Returns source of the rule"""
        return self.source
log = logging.getLogger(__name__)
OffsetMapper = Callable[[Optional[int], Optional[int]], Optional[int]]


class Yara:
    """
    Represents Yara ruleset. Rules can be compiled from set of files or defined in code.

    Most simple rule (with default identifiers left):

    .. code-block:: python

        from malduck.yara import Yara, YaraString

        Yara(strings="MALWR").match(data=b"MALWRMALWARMALWR").r.string == [0, 11]

    Example of more complex rule defined in Python:

    .. code-block:: python

        from malduck.yara import Yara, YaraString

        ruleset = Yara(name="MalwareRule",
        strings={
            "xor_stub": YaraString("This program cannot", xor=True, ascii=True),
            "code_ref": YaraString("E2 34 ?? C8 A? FB", type=YaraString.HEX),
            "mal1": "MALWR",
            "mal2": "MALRW"
        }, condition="( $xor_stub and $code_ref ) or any of ($mal*)")

        # If mal1 or mal2 are matched, they are grouped into "mal"

        # Print appropriate offsets

        match = ruleset.match(data=b"MALWR MALRW")

        if match:
            # ["mal1", "mal", "mal2"]
            print(match.MalwareRule.keys())
            if "mal" in match.MalwareRule:
                # Note: Order of offsets for grouped strings is undetermined
                print("mal*", match.MalwareRule["mal"])

    :param rule_paths: Dictionary of {"namespace": "rule_path"}. See also :py:meth:`Yara.from_dir`.
    :type rule_paths: dict
    :param rules: List of rule sources (:class:`YaraRule` objects or plain strings)
                  that are joined and compiled together
    :type rules: list
    :param compiled_rules: List of precompiled yara.Rules objects
    :type compiled_rules: list
    :param name: Name of generated rule (default: "r")
    :type name: str
    :param strings: Dictionary representing set of string patterns ({"string_identifier": YaraString or plain str})
    :type strings: dict or str or :class:`YaraString`
    :param condition: Yara rule condition (default: "any of them")
    :type condition: str
    """

    def __init__(
        self,
        rule_paths=None,
        rules=None,
        compiled_rules=None,
        name="r",
        strings=None,
        condition="any of them",
    ) -> None:
        # Work on copies: the lists below are appended to and must not leak
        # those changes back into the caller's arguments.
        self.rulesets = list(compiled_rules or [])

        if rule_paths:
            self.rulesets.append(yara.compile(filepaths=rule_paths))

        _source_rules = list(rules or [])
        if strings is not None:
            _source_rules.append(
                YaraRule(name=name, strings=strings, condition=condition)
            )

        if _source_rules:
            # All in-code rules are compiled together as one source blob.
            self.rulesets.append(
                yara.compile(source="\n".join(map(str, _source_rules)))
            )

    @staticmethod
    def from_dir(path, recursive=True, followlinks=True):
        """
        Find rules (recursively) in specified path. Supported extensions: \\*.yar, \\*.yara

        The file name without extension is used as the namespace; when two
        files share a name, the one found later wins and a warning is logged.

        :param path: Root path for searching
        :type path: str
        :param recursive: Search recursively (default: enabled)
        :type recursive: bool
        :param followlinks: Follow symbolic links (default: enabled)
        :type followlinks: bool
        :rtype: :class:`Yara`
        """
        rule_paths: Dict[str, str] = {}
        for root, _, files in os.walk(path, followlinks=followlinks):
            for fname in files:
                if not fname.endswith((".yar", ".yara")):
                    continue
                ruleset_name = os.path.splitext(os.path.basename(fname))[0]
                ruleset_path = os.path.join(root, fname)
                if ruleset_name in rule_paths:
                    log.warning(
                        f"Yara file name collision - {rule_paths[ruleset_name]} "
                        f"overridden by {ruleset_path}"
                    )
                rule_paths[ruleset_name] = ruleset_path
            if not recursive:
                # os.walk yields the root directory first; stop after it.
                break
        return Yara(rule_paths=rule_paths)

    def yara_match(self, **kwargs):
        """
        Run every compiled ruleset with ``kwargs`` passed to its ``match``
        method and return all resulting matches as one flat list.
        """
        return [match for rules in self.rulesets for match in rules.match(**kwargs)]

    def match(
        self,
        offset_mapper: Optional[OffsetMapper] = None,
        extended: bool = False,
        **kwargs,
    ):
        """
        Perform matching on file or data block

        :param filepath: Path to the file to be scanned
        :type filepath: str
        :param data: Data to be scanned
        :type data: bytes
        :param offset_mapper: Offset mapping function. For unmapped region, should return None.
                              Used by :py:meth:`malduck.procmem.ProcessMemory.yarav`
        :type offset_mapper: function
        :param extended: Returns extended information about matched strings and rules
        :type extended: bool (optional, default False)
        :rtype: :class:`malduck.yara.match.RulesetOffsets` or :class:`malduck.yara.match.RulesetMatch`
                if extended is set to True
        """
        yara_matches = self.yara_match(**kwargs)
        match_class = RulesetMatch if extended else RulesetOffsets
        # NOTE(review): assumes each entry of m.strings unpacks as an
        # (offset, identifier, content) triple - confirm against the
        # yara-python version in use.
        matches = match_class(
            [
                StringMatch(
                    rule=m.rule,
                    identifier=identifier,
                    offset=offset,
                    content=content,
                    namespace=m.namespace,
                    meta=m.meta,
                    tags=m.tags,
                )
                for m in yara_matches
                for offset, identifier, content in m.strings
            ]
        )
        if offset_mapper:
            matches = matches.remap(offset_mapper)
        return matches
class Yara:
    # Type stub for the Yara ruleset wrapper.
    rules: Any
    # Compiled rulesets that yara_match() iterates over.
    rulesets: List[yara.Rules]

    def __init__(
        self,
        rule_paths: Optional[Dict[str, str]] = None,
        rules: Optional[List[Union[str, YaraRule]]] = None,
        compiled_rules: Optional[List[yara.Rules]] = None,
        name: str = "r",
        strings: Optional[
            Union[str, YaraString, Dict[str, Union[str, YaraString]]]
        ] = None,
        condition: str = "any of them",
    ) -> None: ...
    @staticmethod
    def from_dir(
        path: str, recursive: bool = True, followlinks: bool = True
    ) -> "Yara": ...
    def yara_match(self, **kwargs: Any) -> List[yara.Match]: ...
    # The overloads below tie the return type to the literal value of
    # `extended`: False (or omitted) -> RulesetOffsets, True -> RulesetMatch.
    #
    # match(...)
    # match(offset_mapper, ...)
    # match(offset_mapper, extended=False, ...)
    @overload
    def match(
        self,
        offset_mapper: Optional[OffsetMapper] = None,
        extended: Literal[False] = False,
        **kwargs: Any,
    ) -> "RulesetOffsets": ...
    # match(offset_mapper, extended=True, ...)
    @overload
    def match(
        self,
        offset_mapper: Optional[OffsetMapper],
        extended: Literal[True],
        **kwargs: Any,
    ) -> "RulesetMatch": ...
    # match(extended=True, ...)
    @overload
    def match(self, *, extended: Literal[True], **kwargs: Any) -> "RulesetMatch": ...