diff --git a/floss/api_hooks.py b/floss/api_hooks.py index 44a819bd0..0c549eb28 100644 --- a/floss/api_hooks.py +++ b/floss/api_hooks.py @@ -6,8 +6,8 @@ import envi import viv_utils.emulator_drivers -import floss.logging_ import floss.utils as fu +import floss.logging_ from floss.const import MOD_NAME logger = floss.logging_.getLogger(__name__) @@ -139,6 +139,7 @@ def _fix_return(self, emu, return_address, return_addresses): class DemoHook: """A demo hook to demonstrate the API of the hook classes.""" + def __call__( self, emu: viv_utils.emulator_drivers.EmulatorDriver, @@ -213,9 +214,7 @@ def _allocate_mem(self, emu, size): size = fu.round_(size, 16) size = fu.get_max_size(size, MAX_MEMORY_ALLOC_SIZE) logger.trace("mapping 0x%x bytes at 0x%x", size, va) - emu.addMemoryMap( - va, envi.memory.MM_RWX, "[heap allocation]", b"\x00" * (size + 4) - ) + emu.addMemoryMap(va, envi.memory.MM_RWX, "[heap allocation]", b"\x00" * (size + 4)) self._heap_addr += size return va @@ -224,9 +223,7 @@ def __call__(self, emu, api, argv): size = argv[0] elif fu.contains_funcname(api, ("VirtualAlloc", "LocalAlloc", "GlobalAlloc")): size = argv[1] - elif fu.contains_funcname( - api, ("VirtualAllocEx", "HeapAlloc", "RtlAllocateHeap") - ): + elif fu.contains_funcname(api, ("VirtualAllocEx", "HeapAlloc", "RtlAllocateHeap")): size = argv[2] elif fu.contains_funcname(api, ("calloc", "calloc_base")): # size, count @@ -245,27 +242,22 @@ class CppNewObjectHook(MemoryAllocationHook): - C++ new operator Thanks to @BenjaminSoelberg """ + ZNWJ = "Znwj" # operator new(unsigned int) ZNAJ = "Znaj" # operator new[](unsigned int) YAPAXI_Z_32 = "??2@YAPAXI@Z" # void * __cdecl operator new(unsigned int) - YAPEAX_K_Z_64 = ( - "??2@YAPEAX_K@Z" # void * __ptr64 __cdecl operator new(unsigned __int64) - ) + YAPEAX_K_Z_64 = "??2@YAPEAX_K@Z" # void * __ptr64 __cdecl operator new(unsigned __int64) DEFAULT_SIZE = 0x1000 def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def __call__(self, emu, api, argv): - if fu.contains_funcname( - api, (self.ZNWJ, self.ZNWJ, self.YAPAXI_Z_32, self.YAPEAX_K_Z_64) - ): + if fu.contains_funcname(api, (self.ZNWJ, self.ZNWJ, self.YAPAXI_Z_32, self.YAPEAX_K_Z_64)): if argv and len(argv) > 0: size = argv[0] else: - size = ( - self.DEFAULT_SIZE - ) # will allocate a default block size if vivisect failed to extract argv + size = self.DEFAULT_SIZE # will allocate a default block size if vivisect failed to extract argv va = self._allocate_mem(emu, size) fu.call_return(emu, api, argv, va) @@ -274,10 +266,9 @@ def __call__(self, emu, api, argv): class MemoryFreeHook: """Hook calls to memory free functions: free memory and return success.""" + def __call__(self, emu, api, argv): - if fu.contains_funcname( - api, ("free", "free_base", "VirtualFree", "HeapFree", "RtlFreeHeap") - ): + if fu.contains_funcname(api, ("free", "free_base", "VirtualFree", "HeapFree", "RtlFreeHeap")): # If the function succeeds, the return value is nonzero. fu.call_return(emu, api, argv, 1) return True @@ -285,6 +276,7 @@ def __call__(self, emu, api, argv): class MemcpyHook: """Hook calls to memory copy functions: copy memory from source to destination.""" + def __call__(self, emu, api, argv): if fu.contains_funcname(api, ("memcpy", "memmove")): dst, src, count = argv @@ -323,6 +315,7 @@ def __call__(self, emu, api, argv): class StrncmpHook: """Hook calls to string compare functions: compare two strings.""" + def __call__(self, emu, api, argv): if fu.contains_funcname(api, ("strncmp",)): s1va, s2va, num = argv @@ -349,6 +342,7 @@ def cmp(a, b): class MemchrHook: """Hook calls to memchr: search for a character in a memory block.""" + def __call__(self, emu, api, argv): if fu.contains_funcname(api, ("memchr",)): ptr, value, num = argv @@ -365,6 +359,7 @@ def __call__(self, emu, api, argv): class MemsetHook: """Hook calls to memset: fill memory with a constant byte.""" + def __call__(self, emu, api, argv): if fu.contains_funcname(api, ("memset",)): ptr, value, num = argv @@ -377,6 +372,7 @@ def __call__(self, emu, api, argv): class PrintfHook: """Hook calls to printf: write formatted data to stdout.""" + # TODO disabled for now as incomplete (need to implement string format) and could result in FP strings as is def __call__(self, emu, api, argv): # TODO vfprintf, vfwprintf, vfprintf_s, vfwprintf_s, vsnprintf, vsnwprintf, etc. @@ -390,6 +386,7 @@ def __call__(self, emu, api, argv): class ExitExceptionHook: """Hook calls to exit and raise exception.""" + def __call__(self, emu, api, argv): if fu.contains_funcname(api, ("ExitProcess", "RaiseException")): raise viv_utils.emulator_drivers.StopEmulation() @@ -401,6 +398,7 @@ def __call__(self, emu, api, argv): class SehPrologEpilogHook: """Hook calls to SEH prolog and epilog functions and return success.""" + def __call__(self, emu, api, argv): if fu.contains_funcname( api, @@ -419,10 +417,9 @@ def __call__(self, emu, api, argv): class SecurityCheckCookieHook: """Hook calls to __security_check_cookie and return success.""" + def __call__(self, emu, api, argv): - if fu.contains_funcname( - api, ("__security_check_cookie", "@__security_check_cookie@4") - ): + if fu.contains_funcname(api, ("__security_check_cookie", "@__security_check_cookie@4")): # nop fu.call_return(emu, api, argv, 0) return True @@ -430,6 +427,7 @@ def __call__(self, emu, api, argv): class GetLastErrorHook: """Hook calls to GetLastError and return success.""" + def __call__(self, emu, api, argv): if fu.contains_funcname(api, ("GetLastError",)): # always assuming success @@ -440,6 +438,7 @@ def __call__(self, emu, api, argv): class GetCurrentProcessHook: """Hook calls to GetCurrentProcess and return a fake process handle.""" + def __call__(self, emu, api, argv): if fu.contains_funcname(api, ("GetCurrentProcess",)): fu.call_return(emu, api, argv, CURRENT_PROCESS_ID) @@ -448,6 +447,7 @@ def __call__(self, emu, api, argv): class CriticalSectionHook: """Hook calls to InitializeCriticalSection and return a fake critical section handle.""" + def __call__(self, emu, api, argv): if fu.contains_funcname(api, ("InitializeCriticalSection",)): (hsection,) = argv diff --git a/floss/decoding_manager.py b/floss/decoding_manager.py index a9a78ff6e..1c5841077 100644 --- a/floss/decoding_manager.py +++ b/floss/decoding_manager.py @@ -1,15 +1,15 @@ # Copyright (C) 2017 Mandiant, Inc. All Rights Reserved. -from dataclasses import dataclass from typing import List, Tuple +from dataclasses import dataclass -import envi.memory import viv_utils +import envi.memory import viv_utils.emulator_drivers from envi import Emulator -import floss.logging_ import floss.utils +import floss.logging_ from . import api_hooks from .const import DS_MAX_ADDRESS_REVISITS_EMULATION @@ -72,6 +72,7 @@ class Snapshot: sp: the stack counter pc: the instruction pointer """ + memory: Memory sp: int pc: int @@ -94,6 +95,7 @@ def get_map_size(emu): class MapsTooLargeError(Exception): """Exception raised when the emulator has mapped too much memory.""" + pass @@ -114,16 +116,18 @@ def make_snapshot(emu: Emulator) -> Snapshot: @dataclass class Delta: - """a pair of snapshots from before and after an operation. + """a pair of snapshots from before and after an operation. - Facilitates diffing the state of an emulator. + Facilitates diffing the state of an emulator. """ + pre: Snapshot post: Snapshot class DeltaCollectorHook(viv_utils.emulator_drivers.Hook): """hook that collects Deltas at each imported API call.""" + def __init__(self, pre_snap: Snapshot): super().__init__() self._pre_snap = pre_snap @@ -224,9 +228,7 @@ def emulate_function( tos_val = floss.utils.get_stack_value(emu, 0) logger.debug("%s: top of stack (return address): 0x%x", e, tos_val) except envi.exc.DivideByZero as e: - logger.debug( - "vivisect encountered an emulation error. will continue processing. %s", e - ) + logger.debug("vivisect encountered an emulation error. will continue processing. %s", e) except viv_utils.emulator_drivers.StopEmulation: pass except Exception: @@ -242,9 +244,7 @@ def emulate_function( try: deltas.append(Delta(pre_snap, make_snapshot(emu))) except MapsTooLargeError: - logger.debug( - "failed to create final snapshot, emulator mapped too much memory, skipping" - ) + logger.debug("failed to create final snapshot, emulator mapped too much memory, skipping") pass return deltas diff --git a/floss/features/extract.py b/floss/features/extract.py index e9c0389cd..c58ca6e47 100644 --- a/floss/features/extract.py +++ b/floss/features/extract.py @@ -1,20 +1,29 @@ # Copyright (C) 2021 Mandiant, Inc. All Rights Reserved. -from typing import Any, Callable, Iterator, Tuple +from typing import Any, Tuple, Callable, Iterator import envi import networkx -import viv_utils import vivisect -from envi.archs.i386.opconst import (INS_CALL, INS_MOV, INS_ROL, INS_ROR, - INS_SHL, INS_SHR, INS_XOR) +import viv_utils from networkx import strongly_connected_components from viv_utils import BasicBlock +from envi.archs.i386.opconst import INS_MOV, INS_ROL, INS_ROR, INS_SHL, INS_SHR, INS_XOR, INS_CALL import floss.logging_ from floss.const import TS_TIGHT_FUNCTION_MAX_BLOCKS -from floss.features.features import (BlockCount, CallsTo, KindaTightLoop, Loop, - Mov, Nzxor, NzxorLoop, NzxorTightLoop, - Shift, TightFunction, TightLoop) +from floss.features.features import ( + Mov, + Loop, + Nzxor, + Shift, + CallsTo, + NzxorLoop, + TightLoop, + BlockCount, + TightFunction, + KindaTightLoop, + NzxorTightLoop, +) # security cookie checks may perform non-zeroing XORs, these are expected within a certain # byte range within the first and returning basic blocks, this helps to reduce FP features @@ -77,9 +86,7 @@ def is_security_cookie(f, bb, insn) -> bool: return True # ... or within last bytes (instructions) before a return - elif bb.instructions[-1].isReturn() and insn.va > ( - bb.va + bb.size - SECURITY_COOKIE_BYTES_DELTA - ): + elif bb.instructions[-1].isReturn() and insn.va > (bb.va + bb.size - SECURITY_COOKIE_BYTES_DELTA): return True return False @@ -149,9 +156,7 @@ def extract_function_calls_to(f): Returns: An iterator over CallsTo features, each representing a call made from the given function. """ - yield CallsTo( - f.vw, [x[0] for x in f.vw.getXrefsTo(f.va, rtype=vivisect.const.REF_CODE)] - ) + yield CallsTo(f.vw, [x[0] for x in f.vw.getXrefsTo(f.va, rtype=vivisect.const.REF_CODE)]) def extract_function_kinda_tight_loop(f): @@ -342,9 +347,7 @@ def abstract_nzxor_loop(features): Returns: An iterator over NzxorLoop features for each identified pattern. """ - if any(isinstance(f, Nzxor) for f in features) and any( - isinstance(f, Loop) for f in features - ): + if any(isinstance(f, Nzxor) for f in features) and any(isinstance(f, Loop) for f in features): yield NzxorLoop() diff --git a/floss/features/features.py b/floss/features/features.py index 1aff1cd34..00ea44dc4 100644 --- a/floss/features/features.py +++ b/floss/features/features.py @@ -16,6 +16,7 @@ class Feature: name (str): Automatically derived from the class name. value: The specific value of the feature being analyzed. """ + def __init__(self, value): """Initializes the Feature instance. @@ -56,7 +57,9 @@ def weighted_score(self): return self.weight * self.score() def __str__(self): - return f"{self.name.ljust(20)} = {self.value} (score: {self.score():.2f}, weighted: {self.weighted_score():.2f})" + return ( + f"{self.name.ljust(20)} = {self.value} (score: {self.score():.2f}, weighted: {self.weighted_score():.2f})" + ) def __repr__(self): return str(self) @@ -67,6 +70,7 @@ class BlockCount(Feature): Inherits from Feature. """ + weight = LOW def __init__(self, block_count): @@ -131,6 +135,7 @@ class Arguments(Feature): Attributes: weight (float): Importance of the argument count, predefined as LOW. """ + weight = LOW def __init__(self, args): @@ -158,6 +163,7 @@ class TightLoop(Feature): Attributes: weight (float): Importance of this feature, predefined as HIGH. """ + # basic block (BB) that jumps to itself weight = HIGH @@ -178,12 +184,14 @@ def score(self): class KindaTightLoop(TightLoop): """Identifies a tight loop within a function, but with an intermediate BB.""" + # BB that jumps to itself via one intermediate BB pass class TightFunction(Feature): """A feature representing a tight function, indicating high importance.""" + # function that basically just wraps a tight loop weight = SEVERE @@ -240,6 +248,7 @@ class CallsTo(Feature): vw: The vivisect workspace instance for analysis. locations (list): A list of locations (addresses) where calls are made. """ + weight = MEDIUM max_calls_to = None @@ -271,6 +280,7 @@ class Loop(Feature): Args: comp: A collection representing the loop's components. """ + weight = MEDIUM def __init__(self, comp): @@ -289,6 +299,7 @@ class NzxorTightLoop(Feature): Attributes: weight (float): The severity of this feature, set to SEVERE. """ + weight = SEVERE def __init__(self): @@ -309,6 +320,7 @@ class NzxorLoop(Feature): Attributes: weight (float): The severity of this feature, also set to SEVERE. """ + weight = SEVERE def __init__(self): diff --git a/floss/function_argument_getter.py b/floss/function_argument_getter.py index 22e446e16..db1257c2b 100644 --- a/floss/function_argument_getter.py +++ b/floss/function_argument_getter.py @@ -1,21 +1,19 @@ # Copyright (C) 2017 Mandiant, Inc. All Rights Reserved. import contextlib +from typing import Set, List from collections import namedtuple -from typing import List, Set import envi +import vivisect import viv_utils import viv_utils.emulator_drivers -import vivisect -import floss.api_hooks -import floss.logging_ import floss.utils +import floss.logging_ +import floss.api_hooks -FunctionContext = namedtuple( - "FunctionContext", ["emu_snap", "return_address", "decoded_at_va"] -) +FunctionContext = namedtuple("FunctionContext", ["emu_snap", "return_address", "decoded_at_va"]) logger = floss.logging_.getLogger(__name__) @@ -31,7 +29,7 @@ def __init__(self, call_site_va: int): def prehook(self, emu, op, pc): """collect function contexts at call sites - + Args: emu: The emulator. op: The operation. @@ -41,9 +39,7 @@ def prehook(self, emu, op, pc): if pc == self.call_site_va: # strictly calls here, return address should always be next instruction return_address = pc + len(op) - self.function_contexts.append( - FunctionContext(emu.getEmuSnap(), return_address, pc) - ) + self.function_contexts.append(FunctionContext(emu.getEmuSnap(), return_address, pc)) def get_contexts(self) -> List[FunctionContext]: """return the collected function contexts""" @@ -92,23 +88,21 @@ def extract_decoding_contexts( for caller_va in get_caller_vas(vw, decoder_fva): contexts.extend(get_contexts_via_monitor(driver, caller_va, decoder_fva, index)) - logger.trace( - "Got %d function contexts for function at 0x%08x.", len(contexts), decoder_fva - ) + logger.trace("Got %d function contexts for function at 0x%08x.", len(contexts), decoder_fva) return contexts def get_caller_vas(vw, fva) -> Set[int]: """Finds the virtual addresses of functions that call a specified function. - Analyzes a workspace to identify instructions that call the function at the provided virtual address (`fva`). Handles filtering of non-call instructions and recursive calls. + Analyzes a workspace to identify instructions that call the function at the provided virtual address (`fva`). Handles filtering of non-call instructions and recursive calls. - Args: - vw: A Vivisect workspace object. - fva: The virtual address of the function being analyzed. + Args: + vw: A Vivisect workspace object. + fva: The virtual address of the function being analyzed. - Returns: - Set[int]: A set of virtual addresses representing the callers of the function. + Returns: + Set[int]: A set of virtual addresses representing the callers of the function. """ caller_vas = set() for caller_va in vw.getCallers(fva): @@ -136,9 +130,7 @@ def is_call(vw: vivisect.VivWorkspace, va: int) -> bool: try: op = vw.parseOpcode(va) except (envi.UnsupportedInstruction, envi.InvalidInstruction) as e: - logger.trace( - " not a call instruction: failed to decode instruction: %s", e.message - ) + logger.trace(" not a call instruction: failed to decode instruction: %s", e.message) return False if op.iflags & envi.IF_CALL: @@ -148,9 +140,7 @@ def is_call(vw: vivisect.VivWorkspace, va: int) -> bool: return False -def get_contexts_via_monitor( - driver, caller_va, decoder_fva: int, index: viv_utils.InstructionFunctionIndex -): +def get_contexts_via_monitor(driver, caller_va, decoder_fva: int, index: viv_utils.InstructionFunctionIndex): """Collects function call context information via dynamic monitoring. This function sets up a monitor to intercept calls to a target function (`decoder_fva`) made from within a caller function (`caller_va`). It achieves this by emulating the caller function and collecting data about the arguments passed to the target function. diff --git a/floss/identify.py b/floss/identify.py index 047abc075..94ed9b6ed 100644 --- a/floss/identify.py +++ b/floss/identify.py @@ -1,9 +1,9 @@ # Copyright (C) 2017 Mandiant, Inc. All Rights Reserved. -import collections import copy import operator -from typing import DefaultDict, Dict, List, Tuple +import collections +from typing import Dict, List, Tuple, DefaultDict import tqdm import viv_utils @@ -11,13 +11,14 @@ from tqdm.contrib.logging import logging_redirect_tqdm import floss.logging_ -from floss.features.extract import (abstract_features, - extract_basic_block_features, - extract_function_features, - extract_insn_features) -from floss.features.features import (Arguments, BlockCount, InstructionCount, - TightFunction) from floss.utils import is_thunk_function, redirecting_print_to_tqdm +from floss.features.extract import ( + abstract_features, + extract_insn_features, + extract_function_features, + extract_basic_block_features, +) +from floss.features.features import Arguments, BlockCount, TightFunction, InstructionCount logger = floss.logging_.getLogger(__name__) @@ -108,8 +109,7 @@ def get_function_score_weighted(features): float: The weighted score of the function. """ return round( - sum(feature.weighted_score() for feature in features) - / sum(feature.weight for feature in features), + sum(feature.weighted_score() for feature in features) / sum(feature.weight for feature in features), 3, ) @@ -142,9 +142,7 @@ def get_tight_function_fvas(decoding_function_features) -> List[int]: """ tight_function_fvas = list() for fva, function_data in decoding_function_features.items(): - if any( - filter(lambda f: isinstance(f, TightFunction), function_data["features"]) - ): + if any(filter(lambda f: isinstance(f, TightFunction), function_data["features"])): tight_function_fvas.append(fva) return tight_function_fvas @@ -220,17 +218,13 @@ def get_functions_with_features(functions, features) -> Dict[int, List]: """ functions_by_features = dict() for fva, function_data in functions.items(): - func_features = list( - filter(lambda f: isinstance(f, features), function_data["features"]) - ) + func_features = list(filter(lambda f: isinstance(f, features), function_data["features"])) if func_features: functions_by_features[fva] = func_features return functions_by_features -def find_decoding_function_features( - vw, functions, disable_progress=False -) -> Tuple[Dict[int, Dict], Dict[int, str]]: +def find_decoding_function_features(vw, functions, disable_progress=False) -> Tuple[Dict[int, Dict], Dict[int, str]]: """Identifies decoding function features from a set of functions. Args: @@ -281,9 +275,7 @@ def find_decoding_function_features( n_libs = len(library_functions) percentage = 100 * (n_libs / n_funcs) if isinstance(pb, tqdm.tqdm): - pb.set_postfix_str( - "skipped %d library functions (%d%%)" % (n_libs, percentage) - ) + pb.set_postfix_str("skipped %d library functions (%d%%)" % (n_libs, percentage)) continue f = viv_utils.Function(vw, function_address) @@ -295,15 +287,9 @@ def find_decoding_function_features( } # meta data features - function_data["features"].append( - BlockCount(function_data["meta"].get("block_count")) - ) - function_data["features"].append( - InstructionCount(function_data["meta"].get("instruction_count")) - ) - function_data["features"].append( - Arguments(function_data["meta"].get("api", []).get("arguments")) - ) + function_data["features"].append(BlockCount(function_data["meta"].get("block_count"))) + function_data["features"].append(InstructionCount(function_data["meta"].get("instruction_count"))) + function_data["features"].append(Arguments(function_data["meta"].get("api", []).get("arguments"))) for feature in extract_function_features(f): function_data["features"].append(feature) @@ -319,9 +305,7 @@ def find_decoding_function_features( for feature in abstract_features(function_data["features"]): function_data["features"].append(feature) - function_data["score"] = get_function_score_weighted( - function_data["features"] - ) + function_data["score"] = get_function_score_weighted(function_data["features"]) logger.debug( "analyzed function 0x%x - total score: %.3f", diff --git a/floss/language/go/coverage.py b/floss/language/go/coverage.py index 716f8dd52..deb78171c 100644 --- a/floss/language/go/coverage.py +++ b/floss/language/go/coverage.py @@ -1,16 +1,16 @@ # Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. -import argparse +import sys import logging import pathlib -import sys +import argparse from typing import List import pefile -from floss.language.go.extract import extract_go_strings -from floss.language.utils import get_extract_stats -from floss.results import StaticString, StringEncoding from floss.utils import get_static_strings +from floss.results import StaticString, StringEncoding +from floss.language.utils import get_extract_stats +from floss.language.go.extract import extract_go_strings logger = logging.getLogger(__name__) @@ -30,9 +30,7 @@ def main(): help="minimum string length", ) logging_group = parser.add_argument_group("logging arguments") - logging_group.add_argument( - "-d", "--debug", action="store_true", help="enable debugging output on STDERR" - ) + logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR") logging_group.add_argument( "-q", "--quiet", diff --git a/floss/language/go/extract.py b/floss/language/go/extract.py index f59854b12..c687c30b5 100644 --- a/floss/language/go/extract.py +++ b/floss/language/go/extract.py @@ -1,24 +1,23 @@ # Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. -import argparse +import re +import sys import array +import struct import logging import pathlib -import re -import struct -import sys -from dataclasses import dataclass -from itertools import chain +import argparse +from typing import List, Tuple, Iterable, Optional from pathlib import Path -from typing import Iterable, List, Optional, Tuple +from itertools import chain +from dataclasses import dataclass import pefile from typing_extensions import TypeAlias import floss.utils -from floss.language.utils import (StructString, find_lea_xrefs, - get_struct_string_candidates) from floss.results import StaticString, StringEncoding +from floss.language.utils import StructString, find_lea_xrefs, get_struct_string_candidates logger = logging.getLogger(__name__) @@ -95,9 +94,7 @@ def find_amd64_stackstrings(section_data, offset, min_length): b"\x48\xba(........)|\x48\xb8(........)|\x81\x78\x08(....)|\x81\x79\x08(....)|\x66\x81\x78\x0c(..)|\x66\x81\x79\x0c(..)|\x80\x78\x0e(.)|\x80\x79\x0e(.)" ) - yield from find_stack_strings_with_regex( - extract_stackstring_pattern, section_data, offset, min_length - ) + yield from find_stack_strings_with_regex(extract_stackstring_pattern, section_data, offset, min_length) def find_i386_stackstrings(section_data, offset, min_length): @@ -125,9 +122,7 @@ def find_i386_stackstrings(section_data, offset, min_length): re.DOTALL, ) - yield from find_stack_strings_with_regex( - extract_stackstring_pattern, section_data, offset, min_length - ) + yield from find_stack_strings_with_regex(extract_stackstring_pattern, section_data, offset, min_length) def get_stackstrings(pe: pefile.PE, min_length: int) -> Iterable[StaticString]: @@ -245,9 +240,7 @@ def read_struct_string(pe: pefile.PE, instance: StructString) -> str: return s -def find_string_blob_range( - pe: pefile.PE, struct_strings: List[StructString] -) -> Tuple[VA, VA]: +def find_string_blob_range(pe: pefile.PE, struct_strings: List[StructString]) -> Tuple[VA, VA]: """find the range of the string blob, as loaded in memory. the current algorithm relies on the fact that the Go compiler stores @@ -288,9 +281,7 @@ def find_string_blob_range( struct_strings.sort(key=lambda s: s.address) - run_start, run_end = find_longest_monotonically_increasing_run( - list(map(lambda s: s.length, struct_strings)) - ) + run_start, run_end = find_longest_monotonically_increasing_run(list(map(lambda s: s.length, struct_strings))) # pick the mid string, so that we avoid any junk data on the edges of the string blob run_mid = (run_start + run_end) // 2 @@ -298,9 +289,7 @@ def find_string_blob_range( s = read_struct_string(pe, instance) assert s is not None - logger.debug( - "string blob: struct string instance: 0x%x: %s...", instance.address, s[:16] - ) + logger.debug("string blob: struct string instance: 0x%x: %s...", instance.address, s[:16]) instance_rva = instance.address - image_base section = pe.get_section_by_rva(instance_rva) @@ -358,9 +347,7 @@ def get_string_blob_strings(pe: pefile.PE, min_length) -> Iterable[StaticString] image_base = pe.OPTIONAL_HEADER.ImageBase with floss.utils.timing("find struct string candidates"): - struct_strings = list( - sorted(set(get_struct_string_candidates(pe)), key=lambda s: s.address) - ) + struct_strings = list(sorted(set(get_struct_string_candidates(pe)), key=lambda s: s.address)) if not struct_strings: logger.warning( "Failed to find struct string candidates: Is this a Go binary? If so, the Go version may be unsupported." @@ -369,9 +356,7 @@ def get_string_blob_strings(pe: pefile.PE, min_length) -> Iterable[StaticString] with floss.utils.timing("find string blob"): try: - string_blob_start, string_blob_end = find_string_blob_range( - pe, struct_strings - ) + string_blob_start, string_blob_end = find_string_blob_range(pe, struct_strings) except ValueError: logger.warning( "Failed to find string blob range: Is this a Go binary? If so, the Go version may be unsupported." @@ -428,14 +413,10 @@ def get_string_blob_strings(pe: pefile.PE, min_length) -> Iterable[StaticString] # 0x4aabed: -thread limit # # we probably missed the string: " procedure in " - logger.warning( - "probably missed a string blob string ending at: 0x%x", start - 1 - ) + logger.warning("probably missed a string blob string ending at: 0x%x", start - 1) try: - string = StaticString.from_utf8( - sbuf, pe.get_offset_from_rva(start - image_base), min_length - ) + string = StaticString.from_utf8(sbuf, pe.get_offset_from_rva(start - image_base), min_length) yield string except ValueError: pass @@ -494,9 +475,7 @@ def extract_go_strings(sample, min_length) -> List[StaticString]: return go_strings -def get_static_strings_from_blob_range( - sample: pathlib.Path, static_strings: List[StaticString] -) -> List[StaticString]: +def get_static_strings_from_blob_range(sample: pathlib.Path, static_strings: List[StaticString]) -> List[StaticString]: """Filters a list of StaticString objects to include only those within the Go string blob. This function assumes the string blob has already been located within the PE file. @@ -510,9 +489,7 @@ def get_static_strings_from_blob_range( """ pe = pefile.PE(data=pathlib.Path(sample).read_bytes(), fast_load=True) - struct_strings = list( - sorted(set(get_struct_string_candidates(pe)), key=lambda s: s.address) - ) + struct_strings = list(sorted(set(get_struct_string_candidates(pe)), key=lambda s: s.address)) if not struct_strings: return [] @@ -525,11 +502,7 @@ def get_static_strings_from_blob_range( string_blob_start = pe.get_offset_from_rva(string_blob_start - image_base) string_blob_end = pe.get_offset_from_rva(string_blob_end - image_base) - return list( - filter( - lambda s: string_blob_start <= s.offset < string_blob_end, static_strings - ) - ) + return list(filter(lambda s: string_blob_start <= s.offset < string_blob_end, static_strings)) def main(argv=None): @@ -554,9 +527,7 @@ def main(argv=None): logging.basicConfig(level=logging.DEBUG) - go_strings = sorted( - extract_go_strings(args.path, args.min_length), key=lambda s: s.offset - ) + go_strings = sorted(extract_go_strings(args.path, args.min_length), key=lambda s: s.offset) for string in go_strings: print(f"{string.offset:#x}: {string.string}") diff --git a/floss/language/identify.py b/floss/language/identify.py index c2b269db7..338d82304 100644 --- a/floss/language/identify.py +++ b/floss/language/identify.py @@ -2,15 +2,15 @@ import re from enum import Enum +from typing import Tuple, Iterable from pathlib import Path -from typing import Iterable, Tuple import pefile import floss.logging_ -from floss.language.rust.rust_version_database import rust_commit_hash -from floss.language.utils import get_rdata_section from floss.results import StaticString +from floss.language.utils import get_rdata_section +from floss.language.rust.rust_version_database import rust_commit_hash logger = floss.logging_.getLogger(__name__) @@ -20,6 +20,7 @@ class Language(Enum): """Enumerates programming languages that can be identified in binary samples.""" + GO = "go" RUST = "rust" DOTNET = "dotnet" @@ -27,9 +28,7 @@ class Language(Enum): DISABLED = "none" -def identify_language_and_version( - sample: Path, static_strings: Iterable[StaticString] -) -> Tuple[Language, str]: +def identify_language_and_version(sample: Path, static_strings: Iterable[StaticString]) -> Tuple[Language, str]: """Identifies the programming language and version of a given binary sample based on static strings found within. Args: @@ -96,9 +95,7 @@ def get_if_rust_and_version(static_strings: Iterable[StaticString]) -> Tuple[boo version = rust_commit_hash[matches["hash"]] return True, version else: - logger.debug( - "hash %s not found in Rust commit hash database", matches["hash"] - ) + logger.debug("hash %s not found in Rust commit hash database", matches["hash"]) return True, VERSION_UNKNOWN_OR_NA return False, VERSION_UNKNOWN_OR_NA diff --git a/floss/language/rust/coverage.py b/floss/language/rust/coverage.py index 22900dd7b..003cac872 100644 --- a/floss/language/rust/coverage.py +++ b/floss/language/rust/coverage.py @@ -1,16 +1,16 @@ # Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. -import argparse +import sys import logging import pathlib -import sys -from typing import Iterable, List, Optional, Tuple +import argparse +from typing import List, Tuple, Iterable, Optional import pefile -from floss.language.rust.extract import extract_rust_strings -from floss.language.utils import get_extract_stats from floss.strings import extract_ascii_unicode_strings +from floss.language.utils import get_extract_stats +from floss.language.rust.extract import extract_rust_strings logger = logging.getLogger(__name__) @@ -33,9 +33,7 @@ def main(): help="minimum string length", ) logging_group = parser.add_argument_group("logging arguments") - logging_group.add_argument( - "-d", "--debug", action="store_true", help="enable debugging output on STDERR" - ) + logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR") logging_group.add_argument( "-q", "--quiet", diff --git a/floss/language/rust/extract.py b/floss/language/rust/extract.py index 076fea7a9..69502e321 100644 --- a/floss/language/rust/extract.py +++ b/floss/language/rust/extract.py @@ -1,18 +1,22 @@ # Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. -import argparse -import itertools +import sys import logging import pathlib -import sys -from typing import Iterable, List, Optional, Tuple +import argparse +import itertools +from typing import List, Tuple, Iterable, Optional -import binary2strings as b2s import pefile +import binary2strings as b2s -from floss.language.utils import (find_lea_xrefs, find_mov_xrefs, - find_push_xrefs, get_rdata_section, - get_struct_string_candidates) from floss.results import StaticString, StringEncoding +from floss.language.utils import ( + find_lea_xrefs, + find_mov_xrefs, + find_push_xrefs, + get_rdata_section, + get_struct_string_candidates, +) logger = logging.getLogger(__name__) @@ -95,16 +99,12 @@ def filter_and_transform_utf8_strings( # our static algorithm does not extract new lines either s = s.replace("\n", "") - transformed_strings.append( - StaticString(string=s, offset=start, encoding=StringEncoding.UTF8) - ) + transformed_strings.append(StaticString(string=s, offset=start, encoding=StringEncoding.UTF8)) return transformed_strings -def split_strings( - static_strings: List[StaticString], address: int, min_length: int -) -> None: +def split_strings(static_strings: List[StaticString], address: int, min_length: int) -> None: """Splits StaticString objects if an address falls within their string data. This function operates directly on the provided `static_strings` list. It checks if a given address lies within an existing StaticString. If so, it splits the string into two, preserving both parts if they meet the minimum length requirement. @@ -129,11 +129,7 @@ def split_strings( ) ) if len(rest) >= min_length: - static_strings.append( - StaticString( - string=rest, offset=address, encoding=StringEncoding.UTF8 - ) - ) + static_strings.append(StaticString(string=rest, offset=address, encoding=StringEncoding.UTF8)) # remove string from static_strings for static_string in static_strings: @@ -287,9 +283,7 @@ def main(argv=None): logging.basicConfig(level=logging.DEBUG) - rust_strings = sorted( - extract_rust_strings(args.path, args.min_length), key=lambda s: s.offset - ) + rust_strings = sorted(extract_rust_strings(args.path, args.min_length), key=lambda s: s.offset) for string in rust_strings: print(f"{string.offset:#x}: {string.string}") diff --git a/floss/language/utils.py b/floss/language/utils.py index 24207d676..55ea423ec 100644 --- a/floss/language/utils.py +++ b/floss/language/utils.py @@ -1,17 +1,17 @@ -import array -import hashlib import re +import array import struct +import hashlib +from typing import List, Tuple, Iterable, Optional from dataclasses import dataclass -from typing import Iterable, List, Optional, Tuple import pefile import tabulate from typing_extensions import TypeAlias import floss.utils -from floss.render.sanitize import sanitize from floss.results import StaticString, StringEncoding +from floss.render.sanitize import sanitize VA: TypeAlias = int @@ -53,6 +53,7 @@ class StructString: https://github.com/rust-lang/rust/blob/3911a63b7777e19dad4043542f908018e70c0bdd/library/alloc/src/string.rs """ + address: VA length: int @@ -142,9 +143,7 @@ def find_lea_xrefs(pe: pefile.PE) -> Iterable[VA]: code = section.get_data() if pe.FILE_HEADER.Machine == pefile.MACHINE_TYPE["IMAGE_FILE_MACHINE_AMD64"]: - xrefs = find_amd64_lea_xrefs( - code, section.VirtualAddress + pe.OPTIONAL_HEADER.ImageBase - ) + xrefs = find_amd64_lea_xrefs(code, section.VirtualAddress + pe.OPTIONAL_HEADER.ImageBase) elif pe.FILE_HEADER.Machine == pefile.MACHINE_TYPE["IMAGE_FILE_MACHINE_I386"]: xrefs = find_i386_lea_xrefs(code) else: @@ -291,9 +290,7 @@ def get_max_section_size(pe: pefile.PE) -> int: return max(map(lambda s: s.SizeOfRawData, pe.sections)) -def get_struct_string_candidates_with_pointer_size( - pe: pefile.PE, buf: bytes, psize: int -) -> Iterable[StructString]: +def get_struct_string_candidates_with_pointer_size(pe: pefile.PE, buf: bytes, psize: int) -> Iterable[StructString]: """scan through the given bytes looking for pairs of machine words (address, length) that might potentially be struct String instances. we do some initial validation, like checking that the address is valid @@ -337,15 +334,11 @@ def get_struct_string_candidates_with_pointer_size( yield StructString(address, length) -def get_amd64_struct_string_candidates( - pe: pefile.PE, buf: bytes -) -> Iterable[StructString]: +def get_amd64_struct_string_candidates(pe: pefile.PE, buf: bytes) -> Iterable[StructString]: yield from get_struct_string_candidates_with_pointer_size(pe, buf, 64) -def get_i386_struct_string_candidates( - pe: pefile.PE, buf: bytes -) -> Iterable[StructString]: +def get_i386_struct_string_candidates(pe: pefile.PE, buf: bytes) -> Iterable[StructString]: yield from get_struct_string_candidates_with_pointer_size(pe, buf, 32) @@ -381,10 +374,7 @@ def get_struct_string_candidates(pe: pefile.PE) -> Iterable[StructString]: continue # TODO add .text here for Go version 1.12? - if not ( - section.Name.startswith(b".rdata\x00") - or section.Name.startswith(b".data\x00") - ): + if not (section.Name.startswith(b".rdata\x00") or section.Name.startswith(b".data\x00")): # by convention, the struct String instances are stored in the .rdata or .data section. continue @@ -422,18 +412,14 @@ def get_struct_string_candidates(pe: pefile.PE) -> Iterable[StructString]: continue try: - section_start, _, section_data = next( - filter(lambda s: s[0] <= candidate.address < s[1], section_datas) - ) + section_start, _, section_data = next(filter(lambda s: s[0] <= candidate.address < s[1], section_datas)) except StopIteration: continue instance_offset = candidate.address - section_start # remember: section_data is a memoryview, so this is a fast slice. # when not using memoryview, this takes a *long* time (dozens of seconds or longer). - instance_data = section_data[ - instance_offset : instance_offset + candidate.length - ] + instance_data = section_data[instance_offset : instance_offset + candidate.length] if len(instance_data) != candidate.length: continue @@ -515,15 +501,10 @@ def get_extract_stats( ) type_ = "substring" - if ( - s.string[: len(lang_str.string)] == lang_str.string - and s.offset == lang_str.offset - ): + if s.string[: len(lang_str.string)] == lang_str.string and s.offset == lang_str.offset: type_ = "exactsubstr" - results.append( - (secname, s_id, s_range, True, type_, s, replaced_len, lang_str) - ) + results.append((secname, s_id, s_range, True, type_, s, replaced_len, lang_str)) s = s_trimmed @@ -637,9 +618,7 @@ def get_extract_stats( print(".rdata only") print("len all string chars:", len_all_ss) print("len lang string chars :", len_lang_str) - print( - f"Percentage of string chars extracted: {round(100 * (len_lang_str / len_all_ss))}%" - ) + print(f"Percentage of string chars extracted: {round(100 * (len_lang_str / len_all_ss))}%") print() return 100 * (len_lang_str / len_all_ss) @@ -655,11 +634,7 @@ def get_missed_strings( found = False for lang_str in lang_strings: - if ( - lang_str.string - and lang_str.string in s.string - and s.offset <= lang_str.offset <= s.offset + orig_len - ): + if lang_str.string and lang_str.string in s.string and s.offset <= lang_str.offset <= s.offset + orig_len: found = True # remove found string data diff --git a/floss/main.py b/floss/main.py index 2442eba32..d6c7863bc 100644 --- a/floss/main.py +++ b/floss/main.py @@ -1,50 +1,60 @@ #!/usr/bin/env python # Copyright (C) 2017 Mandiant, Inc. All Rights Reserved. -import argparse -import codecs -import logging import os import sys +import codecs +import logging +import argparse import textwrap from enum import Enum -from pathlib import Path from time import time -from typing import List, Optional, Set +from typing import Set, List, Optional +from pathlib import Path import halo -import rich.traceback import viv_utils +import rich.traceback import viv_utils.flirt from vivisect import VivWorkspace -import floss.language.go.coverage -import floss.language.go.extract -import floss.language.rust.coverage -import floss.language.rust.extract -import floss.language.utils -import floss.logging_ -import floss.render.default -import floss.render.json -import floss.results import floss.utils +import floss.results import floss.version -from floss.const import (MAX_FILE_SIZE, MEGABYTE, MIN_STRING_LENGTH, - SUPPORTED_FILE_MAGIC) -from floss.identify import (append_unique, find_decoding_function_features, - get_function_fvas, get_functions_with_tightloops, - get_functions_without_tightloops, - get_tight_function_fvas, get_top_functions) -from floss.language.identify import Language, identify_language_and_version -from floss.logging_ import TRACE, DebugLevel +import floss.logging_ +import floss.render.json +import floss.language.utils +import floss.render.default +import floss.language.go.extract +import floss.language.go.coverage +import floss.language.rust.extract +import floss.language.rust.coverage +from floss.const import MEGABYTE, MAX_FILE_SIZE, MIN_STRING_LENGTH, SUPPORTED_FILE_MAGIC +from floss.utils import ( + hex, + get_imagebase, + get_runtime_diff, + get_static_strings, + get_vivisect_meta_info, + is_string_type_enabled, + set_vivisect_log_level, +) from floss.render import Verbosity from floss.results import Analysis, Metadata, ResultDocument, load +from floss.version import __version__ +from floss.identify import ( + append_unique, + get_function_fvas, + get_top_functions, + get_tight_function_fvas, + get_functions_with_tightloops, + find_decoding_function_features, + get_functions_without_tightloops, +) +from floss.logging_ import TRACE, DebugLevel from floss.stackstrings import extract_stackstrings -from floss.string_decoder import decode_strings from floss.tightstrings import extract_tightstrings -from floss.utils import (get_imagebase, get_runtime_diff, get_static_strings, - get_vivisect_meta_info, hex, is_string_type_enabled, - set_vivisect_log_level) -from floss.version import __version__ +from floss.string_decoder import decode_strings +from floss.language.identify import Language, identify_language_and_version SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)" EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32") @@ -55,6 +65,7 @@ class StringType(str, Enum): """Enumerates the types of strings that FLOSS can extract from a binary.""" + STATIC = "static" STACK = "stack" TIGHT = "tight" @@ -66,16 +77,18 @@ class WorkspaceLoadError(ValueError): This exception inherits from ValueError, making it suitable for signaling issues encountered during the process of loading or initializing a workspace (e.g., in an analysis tool). """ + pass class ArgumentValueError(ValueError): """Indicates an error occurred while parsing command-line arguments.""" + pass class ArgumentParser(argparse.ArgumentParser): - """argparse will call sys.exit upon parsing invalid arguments. + """argparse will call sys.exit upon parsing invalid arguments. we don't want that, because we might be parsing args within test cases, run as a module, etc. so, we override the behavior to raise a ArgumentValueError instead. @@ -200,11 +213,7 @@ def make_parser(argv): "--format", choices=[f[0] for f in formats], default="auto", - help=( - "select sample format, %s" % format_help - if show_all_options - else argparse.SUPPRESS - ), + help=("select sample format, %s" % format_help if show_all_options else argparse.SUPPRESS), ) advanced_group.add_argument( "--language", @@ -221,11 +230,7 @@ def make_parser(argv): "-l", "--load", action="store_true", - help=( - "load from existing FLOSS results document" - if show_all_options - else argparse.SUPPRESS - ), + help=("load from existing FLOSS results document" if show_all_options else argparse.SUPPRESS), ) advanced_group.add_argument( "--functions", @@ -258,9 +263,7 @@ def make_parser(argv): "--large-file", action="store_true", help=( - "allow processing files larger than {} MB".format( - int(MAX_FILE_SIZE / MEGABYTE) - ) + "allow processing files larger than {} MB".format(int(MAX_FILE_SIZE / MEGABYTE)) if show_all_options else argparse.SUPPRESS ), @@ -293,9 +296,7 @@ def make_parser(argv): ) output_group = parser.add_argument_group("rendering arguments") - output_group.add_argument( - "-j", "--json", action="store_true", help="emit JSON instead of text" - ) + output_group.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text") output_group.add_argument( "-v", "--verbose", @@ -395,10 +396,7 @@ def select_functions(vw, asked_functions: Optional[List[int]]) -> Set[int]: # validate that all functions requested by the user exist. missing_functions = sorted(asked_functions_ - functions) if missing_functions: - raise ValueError( - "failed to find functions: %s" - % (", ".join(map(hex, sorted(missing_functions)))) - ) + raise ValueError("failed to find functions: %s" % (", ".join(map(hex, sorted(missing_functions))))) logger.debug("selected %d functions", len(asked_functions_)) logger.trace( @@ -458,13 +456,9 @@ def load_vw( format = "sc64" if format == "sc32": - vw = viv_utils.getShellcodeWorkspaceFromFile( - str(sample_path), arch="i386", analyze=False - ) + vw = viv_utils.getShellcodeWorkspaceFromFile(str(sample_path), arch="i386", analyze=False) elif format == "sc64": - vw = viv_utils.getShellcodeWorkspaceFromFile( - str(sample_path), arch="amd64", analyze=False - ) + vw = viv_utils.getShellcodeWorkspaceFromFile(str(sample_path), arch="amd64", analyze=False) else: vw = viv_utils.getWorkspace(str(sample_path), analyze=False, should_save=False) @@ -477,9 +471,7 @@ def load_vw( try: vw.saveWorkspace() except IOError: - logger.info( - "source directory is not writable, won't save intermediate workspace" - ) + logger.info("source directory is not writable, won't save intermediate workspace") else: logger.debug("not saving workspace") @@ -523,17 +515,13 @@ def get_signatures(sigs_path: Path) -> List[Path]: List[Path]: The paths to the signature files. """ if not sigs_path.exists(): - raise IOError( - "signatures path %s does not exist or cannot be accessed" % str(sigs_path) - ) + raise IOError("signatures path %s does not exist or cannot be accessed" % str(sigs_path)) paths = [] if sigs_path.is_file(): paths.append(sigs_path) elif sigs_path.is_dir(): - logger.debug( - "reading signatures from directory %s", str(sigs_path.resolve().absolute()) - ) + logger.debug("reading signatures from directory %s", str(sigs_path.resolve().absolute())) for item in sigs_path.iterdir(): if item.is_file(): if item.suffix in [".pat", ".pat.gz", ".sig"]: @@ -601,25 +589,15 @@ def main(argv=None) -> int: args.sample.close() if args.functions: - if is_string_type_enabled( - StringType.STATIC, args.disabled_types, args.enabled_types - ): + if is_string_type_enabled(StringType.STATIC, args.disabled_types, args.enabled_types): logger.warning("analyzing specified functions, not showing static strings") args.disabled_types.append(StringType.STATIC) analysis = Analysis( - enable_static_strings=is_string_type_enabled( - StringType.STATIC, args.disabled_types, args.enabled_types - ), - enable_stack_strings=is_string_type_enabled( - StringType.STACK, args.disabled_types, args.enabled_types - ), - enable_tight_strings=is_string_type_enabled( - StringType.TIGHT, args.disabled_types, args.enabled_types - ), - enable_decoded_strings=is_string_type_enabled( - StringType.DECODED, args.disabled_types, args.enabled_types - ), + enable_static_strings=is_string_type_enabled(StringType.STATIC, args.disabled_types, args.enabled_types), + enable_stack_strings=is_string_type_enabled(StringType.STACK, args.disabled_types, args.enabled_types), + enable_tight_strings=is_string_type_enabled(StringType.TIGHT, args.disabled_types, args.enabled_types), + enable_decoded_strings=is_string_type_enabled(StringType.DECODED, args.disabled_types, args.enabled_types), ) if args.load: @@ -635,9 +613,7 @@ def main(argv=None) -> int: if args.json: r = floss.render.json.render(results) else: - r = floss.render.default.render( - results, args.verbose, args.quiet, args.color - ) + r = floss.render.default.render(results, args.verbose, args.quiet, args.color) print(r) @@ -688,30 +664,20 @@ def main(argv=None) -> int: results.metadata.language_version = lang_version if results.metadata.language == Language.GO.value: - if ( - analysis.enable_tight_strings - or analysis.enable_stack_strings - or analysis.enable_decoded_strings - ): + if analysis.enable_tight_strings or analysis.enable_stack_strings or analysis.enable_decoded_strings: logger.warning( "FLOSS handles Go static strings, but string deobfuscation may be inaccurate and take a long time" ) elif results.metadata.language == Language.RUST.value: - if ( - analysis.enable_tight_strings - or analysis.enable_stack_strings - or analysis.enable_decoded_strings - ): + if analysis.enable_tight_strings or analysis.enable_stack_strings or analysis.enable_decoded_strings: logger.warning( "FLOSS handles Rust static strings, but string deobfuscation may be inaccurate and take a long time" ) elif results.metadata.language == Language.DOTNET.value: logger.warning(".NET language-specific string extraction is not supported yet") - logger.warning( - "FLOSS does NOT attempt to deobfuscate any strings from .NET binaries" - ) + logger.warning("FLOSS does NOT attempt to deobfuscate any strings from .NET binaries") # enable .NET strings once we can extract them # results.metadata.language = Language.DOTNET.value @@ -725,9 +691,7 @@ def main(argv=None) -> int: if args.enabled_types == [] and args.disabled_types == []: # when stdout is redirected, such as in 'floss foo.exe | less' use default prompt values if sys.stdout.isatty(): - prompt = input( - "Do you want to enable string deobfuscation? (this could take a long time) [y/N] " - ) + prompt = input("Do you want to enable string deobfuscation? (this could take a long time) [y/N] ") else: prompt = "n" @@ -759,45 +723,29 @@ def main(argv=None) -> int: logger.info("extracting language-specific Go strings") interim = time() - results.strings.language_strings = ( - floss.language.go.extract.extract_go_strings(sample, args.min_length) - ) + results.strings.language_strings = floss.language.go.extract.extract_go_strings(sample, args.min_length) results.metadata.runtime.language_strings = get_runtime_diff(interim) # missed strings only includes non-identified strings in searched range # here currently only focus on strings in string blob range - string_blob_strings = ( - floss.language.go.extract.get_static_strings_from_blob_range( - sample, static_strings - ) - ) - results.strings.language_strings_missed = ( - floss.language.utils.get_missed_strings( - string_blob_strings, - results.strings.language_strings, - args.min_length, - ) + string_blob_strings = floss.language.go.extract.get_static_strings_from_blob_range(sample, static_strings) + results.strings.language_strings_missed = floss.language.utils.get_missed_strings( + string_blob_strings, + results.strings.language_strings, + args.min_length, ) elif results.metadata.language == Language.RUST.value: logger.info("extracting language-specific Rust strings") interim = time() - results.strings.language_strings = ( - floss.language.rust.extract.extract_rust_strings( - sample, args.min_length - ) - ) + results.strings.language_strings = floss.language.rust.extract.extract_rust_strings(sample, args.min_length) results.metadata.runtime.language_strings = get_runtime_diff(interim) # currently Rust strings are only extracted from the .rdata section - rdata_strings = floss.language.rust.extract.get_static_strings_from_rdata( - sample, static_strings - ) - results.strings.language_strings_missed = ( - floss.language.utils.get_missed_strings( - rdata_strings, results.strings.language_strings, args.min_length - ) + rdata_strings = floss.language.rust.extract.get_static_strings_from_rdata(sample, static_strings) + results.strings.language_strings_missed = floss.language.utils.get_missed_strings( + rdata_strings, results.strings.language_strings, args.min_length ) if ( results.analysis.enable_decoded_strings @@ -859,18 +807,14 @@ def main(argv=None) -> int: interim = time() logger.trace("analysis summary:") - for k, v in get_vivisect_meta_info( - vw, selected_functions, decoding_function_features - ).items(): + for k, v in get_vivisect_meta_info(vw, selected_functions, decoding_function_features).items(): logger.trace(" %s: %s", k, v or "N/A") if results.analysis.enable_stack_strings: if results.analysis.enable_tight_strings: # don't run this on functions with tight loops as this will likely result in FPs # and should be caught by the tightstrings extraction below - selected_functions = get_functions_without_tightloops( - decoding_function_features - ) + selected_functions = get_functions_without_tightloops(decoding_function_features) results.strings.stack_strings = extract_stackstrings( vw, @@ -884,9 +828,7 @@ def main(argv=None) -> int: interim = time() if results.analysis.enable_tight_strings: - tightloop_functions = get_functions_with_tightloops( - decoding_function_features - ) + tightloop_functions = get_functions_with_tightloops(decoding_function_features) results.strings.tight_strings = extract_tightstrings( vw, tightloop_functions, @@ -911,16 +853,10 @@ def main(argv=None) -> int: if len(fvas_to_emulate) == 0: logger.info("no candidate decoding functions found.") else: - logger.debug( - "identified %d candidate decoding functions", len(fvas_to_emulate) - ) + logger.debug("identified %d candidate decoding functions", len(fvas_to_emulate)) for fva in fvas_to_emulate: - results.analysis.functions.decoding_function_scores[fva] = ( - decoding_function_features[fva]["score"] - ) - logger.debug( - " - 0x%x: %.3f", fva, decoding_function_features[fva]["score"] - ) + results.analysis.functions.decoding_function_scores[fva] = decoding_function_features[fva]["score"] + logger.debug(" - 0x%x: %.3f", fva, decoding_function_features[fva]["score"]) # TODO filter out strings decoded in library function or function only called by library function(s) results.strings.decoded_strings = decode_strings( diff --git a/floss/render/default.py b/floss/render/default.py index f4fcd932c..f68aab82c 100644 --- a/floss/render/default.py +++ b/floss/render/default.py @@ -1,23 +1,22 @@ # Copyright (C) 2022 Mandiant, Inc. All Rights Reserved. -import collections import io import sys import textwrap +import collections from typing import Dict, List, Tuple, Union from rich import box -from rich.console import Console -from rich.markup import escape from rich.table import Table +from rich.markup import escape +from rich.console import Console -import floss.language.identify -import floss.logging_ import floss.utils as util +import floss.logging_ +import floss.language.identify from floss.render import Verbosity +from floss.results import AddressType, StackString, TightString, DecodedString, ResultDocument, StringEncoding from floss.render.sanitize import sanitize -from floss.results import (AddressType, DecodedString, ResultDocument, - StackString, StringEncoding, TightString) MIN_WIDTH_LEFT_COL = 22 MIN_WIDTH_RIGHT_COL = 82 @@ -87,11 +86,7 @@ def render_meta(results: ResultDocument, console, verbose): if results.metadata.language != "unknown" and results.metadata.language_version else "" ) - lang_s = ( - f" - selected: {results.metadata.language_selected}" - if results.metadata.language_selected - else "" - ) + lang_s = f" - selected: {results.metadata.language_selected}" if results.metadata.language_selected else "" language_value = f"{lang}{lang_v}{lang_s}" if verbose == Verbosity.DEFAULT: @@ -166,27 +161,15 @@ def render_string_type_rows(results: ResultDocument) -> List[Tuple[str, str]]: ), ( " stack strings", - ( - str(len(results.strings.stack_strings)) - if results.analysis.enable_stack_strings - else DISABLED - ), + (str(len(results.strings.stack_strings)) if results.analysis.enable_stack_strings else DISABLED), ), ( " tight strings", - ( - str(len(results.strings.tight_strings)) - if results.analysis.enable_tight_strings - else DISABLED - ), + (str(len(results.strings.tight_strings)) if results.analysis.enable_tight_strings else DISABLED), ), ( " decoded strings", - ( - str(len(results.strings.decoded_strings)) - if results.analysis.enable_decoded_strings - else DISABLED - ), + (str(len(results.strings.decoded_strings)) if results.analysis.enable_decoded_strings else DISABLED), ), ] @@ -211,13 +194,9 @@ def render_function_analysis_rows(results) -> List[Tuple[str, str]]: (" library", results.analysis.functions.library), ] if results.analysis.enable_stack_strings: - rows.append( - (" stack strings", str(results.analysis.functions.analyzed_stack_strings)) - ) + rows.append((" stack strings", str(results.analysis.functions.analyzed_stack_strings))) if results.analysis.enable_tight_strings: - rows.append( - (" tight strings", str(results.analysis.functions.analyzed_tight_strings)) - ) + rows.append((" tight strings", str(results.analysis.functions.analyzed_tight_strings))) if results.analysis.enable_decoded_strings: rows.append( ( @@ -292,9 +271,7 @@ def render_language_strings( console.print(f"0x{s.offset:>0{offset_len}x} {colored_string}") -def render_static_substrings( - strings, encoding, offset_len, console, verbose, disable_headers -): +def render_static_substrings(strings, encoding, offset_len, console, verbose, disable_headers): """Displays static strings with their encoding information to the console. Optionally displays a heading, and then prints each string with its offset to the console. Formatting of strings is influenced by verbosity settings. @@ -309,9 +286,7 @@ def render_static_substrings( """ if verbose != Verbosity.DEFAULT: encoding = heading_style(encoding) - render_sub_heading( - f"FLOSS STATIC STRINGS: {encoding}", len(strings), console, disable_headers - ) + render_sub_heading(f"FLOSS STATIC STRINGS: {encoding}", len(strings), console, disable_headers) for s in strings: if verbose == Verbosity.DEFAULT: console.print(sanitize(s.string), markup=False) @@ -331,14 +306,10 @@ def render_staticstrings(strings, console, verbose, disable_headers): verbose: Verbosity level influencing formatting. disable_headers: A flag to suppress the display of headers. """ - render_heading( - f"FLOSS STATIC STRINGS ({len(strings)})", console, verbose, disable_headers - ) + render_heading(f"FLOSS STATIC STRINGS ({len(strings)})", console, verbose, disable_headers) ascii_strings = list(filter(lambda s: s.encoding == StringEncoding.ASCII, strings)) - unicode_strings = list( - filter(lambda s: s.encoding == StringEncoding.UTF16LE, strings) - ) + unicode_strings = list(filter(lambda s: s.encoding == StringEncoding.UTF16LE, strings)) ascii_offset_len = 0 unicode_offset_len = 0 @@ -348,13 +319,9 @@ def render_staticstrings(strings, console, verbose, disable_headers): unicode_offset_len = len(f"{unicode_strings[-1].offset}") offset_len = max(ascii_offset_len, unicode_offset_len) - render_static_substrings( - ascii_strings, "ASCII", offset_len, console, verbose, disable_headers - ) + render_static_substrings(ascii_strings, "ASCII", offset_len, console, verbose, disable_headers) console.print("\n") - render_static_substrings( - unicode_strings, "UTF-16LE", offset_len, console, verbose, disable_headers - ) + render_static_substrings(unicode_strings, "UTF-16LE", offset_len, console, verbose, disable_headers) def render_stackstrings( @@ -398,9 +365,7 @@ def render_stackstrings( console.print(table) -def render_decoded_strings( - decoded_strings: List[DecodedString], console, verbose, disable_headers -): +def render_decoded_strings(decoded_strings: List[DecodedString], console, verbose, disable_headers): """Renders the results of the string decoding phase. Optionally displays a heading, and then prints each string with its offset to the console. Formatting of strings is influenced by verbosity settings. @@ -558,17 +523,13 @@ def render(results: floss.results.ResultDocument, verbose, disable_headers, colo if verbose == Verbosity.DEFAULT: console.print(f"FLARE FLOSS RESULTS (version {results.metadata.version})\n") else: - colored_str = heading_style( - f"FLARE FLOSS RESULTS (version {results.metadata.version})\n" - ) + colored_str = heading_style(f"FLARE FLOSS RESULTS (version {results.metadata.version})\n") console.print(colored_str) render_meta(results, console, verbose) console.print("\n") if results.analysis.enable_static_strings: - render_staticstrings( - results.strings.static_strings, console, verbose, disable_headers - ) + render_staticstrings(results.strings.static_strings, console, verbose, disable_headers) console.print("\n") if results.metadata.language in ( @@ -592,9 +553,7 @@ def render(results: floss.results.ResultDocument, verbose, disable_headers, colo verbose, disable_headers, ) - render_stackstrings( - results.strings.stack_strings, console, verbose, disable_headers - ) + render_stackstrings(results.strings.stack_strings, console, verbose, disable_headers) console.print("\n") if results.analysis.enable_tight_strings: @@ -604,9 +563,7 @@ def render(results: floss.results.ResultDocument, verbose, disable_headers, colo verbose, disable_headers, ) - render_stackstrings( - results.strings.tight_strings, console, verbose, disable_headers - ) + render_stackstrings(results.strings.tight_strings, console, verbose, disable_headers) console.print("\n") if results.analysis.enable_decoded_strings: @@ -616,9 +573,7 @@ def render(results: floss.results.ResultDocument, verbose, disable_headers, colo verbose, disable_headers, ) - render_decoded_strings( - results.strings.decoded_strings, console, verbose, disable_headers - ) + render_decoded_strings(results.strings.decoded_strings, console, verbose, disable_headers) console.file.seek(0) return console.file.read() diff --git a/floss/render/json.py b/floss/render/json.py index 94298b25a..ff9241649 100644 --- a/floss/render/json.py +++ b/floss/render/json.py @@ -1,8 +1,8 @@ # Copyright (C) 2022 Mandiant, Inc. All Rights Reserved. -import dataclasses -import datetime import json +import datetime +import dataclasses from floss.results import ResultDocument @@ -15,6 +15,7 @@ class FlossJSONEncoder(json.JSONEncoder): * Dataclasses: Converts dataclass instances into their dictionary representations. * Datetimes: Encodes datetime objects into ISO 8601 formatted strings (with timezone information). """ + def default(self, o): """Overrides the default JSON encoding behavior to handle dataclasses and datetime objects. diff --git a/floss/results.py b/floss/results.py index 7be319cd0..493e26670 100644 --- a/floss/results.py +++ b/floss/results.py @@ -1,14 +1,15 @@ # Copyright (C) 2021 Mandiant, Inc. All Rights Reserved. -import datetime -import json import re -from dataclasses import field +import json +import datetime from enum import Enum -from pathlib import Path from typing import Dict, List +from pathlib import Path +from dataclasses import field from pydantic import TypeAdapter, ValidationError + # we use pydantic for dataclasses so that we can # easily load and validate JSON reports. # @@ -20,8 +21,8 @@ import floss.logging_ from floss.render import Verbosity -from floss.render.sanitize import sanitize from floss.version import __version__ +from floss.render.sanitize import sanitize logger = floss.logging_.getLogger(__name__) @@ -69,6 +70,7 @@ class StackString: [bigger addresses] """ + function: int string: str encoding: StringEncoding @@ -81,11 +83,13 @@ class StackString: class TightString(StackString): """A string that is tightly packed in memory.""" + pass class AddressType(str, Enum): """Enumeration of address types.""" + STACK = "STACK" GLOBAL = "GLOBAL" HEAP = "HEAP" @@ -94,6 +98,7 @@ class AddressType(str, Enum): @dataclass(frozen=True) class DecodedString: """A decoding string and details about where it was found.""" + address: int address_type: AddressType string: str @@ -105,6 +110,7 @@ class DecodedString: @dataclass(frozen=True) class StaticString: """A string extracted from the raw bytes of the input.""" + string: str offset: int encoding: StringEncoding @@ -137,6 +143,7 @@ def from_utf8(cls, buf, addr, min_length): @dataclass class Runtime: """The runtime of the analysis.""" + start_date: datetime.datetime = datetime.datetime.now() total: float = 0 vivisect: float = 0 @@ -151,6 +158,7 @@ class Runtime: @dataclass class Functions: """The functions that were analyzed.""" + discovered: int = 0 library: int = 0 analyzed_stack_strings: int = 0 @@ -162,6 +170,7 @@ class Functions: @dataclass class Analysis: """The analysis configuration.""" + enable_static_strings: bool = True enable_stack_strings: bool = True enable_tight_strings: bool = True @@ -169,14 +178,13 @@ class Analysis: functions: Functions = field(default_factory=Functions) -STRING_TYPE_FIELDS = set( - [field for field in Analysis.__annotations__ if field.startswith("enable_")] -) +STRING_TYPE_FIELDS = set([field for field in Analysis.__annotations__ if field.startswith("enable_")]) @dataclass class Metadata: """Metadata about the analysis.""" + file_path: str version: str = __version__ imagebase: int = 0 @@ -190,6 +198,7 @@ class Metadata: @dataclass class Strings: """The strings that were found.""" + stack_strings: List[StackString] = field(default_factory=list) tight_strings: List[TightString] = field(default_factory=list) decoded_strings: List[DecodedString] = field(default_factory=list) @@ -201,6 +210,7 @@ class Strings: @dataclass class ResultDocument: """The result document.""" + metadata: Metadata analysis: Analysis = field(default_factory=Analysis) strings: Strings = field(default_factory=Strings) @@ -247,14 +257,10 @@ def log_result(decoded_string, verbosity): decoded_string.program_counter, ) else: - ValueError( - "unknown decoded or extracted string type: %s", type(decoded_string) - ) + ValueError("unknown decoded or extracted string type: %s", type(decoded_string)) -def load( - sample: Path, analysis: Analysis, functions: List[int], min_length: int -) -> ResultDocument: +def load(sample: Path, analysis: Analysis, functions: List[int], min_length: int) -> ResultDocument: """Load a result document from a file, applying filters as needed. Args: @@ -301,9 +307,7 @@ def read(sample: Path) -> ResultDocument: try: results = ResultDocument(**results) except (TypeError, ValidationError) as e: - raise InvalidResultsFile( - f"{str(sample)} is not a valid FLOSS result document: {e}" - ) + raise InvalidResultsFile(f"{str(sample)} is not a valid FLOSS result document: {e}") return results @@ -318,12 +322,8 @@ def check_set_string_types(results: ResultDocument, wanted_analysis: Analysis) - wanted_analysis: An Analysis object representing the desired analysis configuration. """ for string_type in STRING_TYPE_FIELDS: - if getattr(wanted_analysis, string_type) and not getattr( - results.analysis, string_type - ): - logger.warning( - f"{string_type} not in loaded data, use --only/--no to enable/disable type(s)" - ) + if getattr(wanted_analysis, string_type) and not getattr(results.analysis, string_type): + logger.warning(f"{string_type} not in loaded data, use --only/--no to enable/disable type(s)") setattr(results.analysis, string_type, getattr(wanted_analysis, string_type)) @@ -342,34 +342,20 @@ def filter_functions(results: ResultDocument, functions: List[int]) -> None: filtered_scores = dict() for fva in functions: try: - filtered_scores[fva] = results.analysis.functions.decoding_function_scores[ - fva - ] + filtered_scores[fva] = results.analysis.functions.decoding_function_scores[fva] except KeyError: raise InvalidLoadConfig(f"function 0x{fva:x} not found in loaded data") results.analysis.functions.decoding_function_scores = filtered_scores - results.strings.stack_strings = list( - filter(lambda f: f.function in functions, results.strings.stack_strings) - ) - results.strings.tight_strings = list( - filter(lambda f: f.function in functions, results.strings.tight_strings) - ) + results.strings.stack_strings = list(filter(lambda f: f.function in functions, results.strings.stack_strings)) + results.strings.tight_strings = list(filter(lambda f: f.function in functions, results.strings.tight_strings)) results.strings.decoded_strings = list( - filter( - lambda f: f.decoding_routine in functions, results.strings.decoded_strings - ) + filter(lambda f: f.decoding_routine in functions, results.strings.decoded_strings) ) - results.analysis.functions.analyzed_stack_strings = len( - results.strings.stack_strings - ) - results.analysis.functions.analyzed_tight_strings = len( - results.strings.tight_strings - ) - results.analysis.functions.analyzed_decoded_strings = len( - results.strings.decoded_strings - ) + results.analysis.functions.analyzed_stack_strings = len(results.strings.stack_strings) + results.analysis.functions.analyzed_tight_strings = len(results.strings.tight_strings) + results.analysis.functions.analyzed_decoded_strings = len(results.strings.decoded_strings) def filter_string_len(results: ResultDocument, min_length: int) -> None: @@ -381,15 +367,9 @@ def filter_string_len(results: ResultDocument, min_length: int) -> None: results: A ResultDocument object containing analysis results. min_length: The minimum length a string must have to be retained. """ - results.strings.static_strings = list( - filter(lambda s: len(s.string) >= min_length, results.strings.static_strings) - ) - results.strings.stack_strings = list( - filter(lambda s: len(s.string) >= min_length, results.strings.stack_strings) - ) - results.strings.tight_strings = list( - filter(lambda s: len(s.string) >= min_length, results.strings.tight_strings) - ) + results.strings.static_strings = list(filter(lambda s: len(s.string) >= min_length, results.strings.static_strings)) + results.strings.stack_strings = list(filter(lambda s: len(s.string) >= min_length, results.strings.stack_strings)) + results.strings.tight_strings = list(filter(lambda s: len(s.string) >= min_length, results.strings.tight_strings)) results.strings.decoded_strings = list( filter(lambda s: len(s.string) >= min_length, results.strings.decoded_strings) ) diff --git a/floss/stackstrings.py b/floss/stackstrings.py index a23206b8f..78eb28ccc 100644 --- a/floss/stackstrings.py +++ b/floss/stackstrings.py @@ -1,19 +1,19 @@ # Copyright (C) 2017 Mandiant, Inc. All Rights Reserved. +from typing import Set, List, Optional from dataclasses import dataclass -from typing import List, Optional, Set -import envi.archs.amd64 -import envi.archs.i386 import tqdm import viv_utils +import envi.archs.i386 +import envi.archs.amd64 import viv_utils.emulator_drivers -import floss.strings import floss.utils +import floss.strings +from floss.utils import getPointerSize, extract_strings from floss.render import Verbosity from floss.results import StackString -from floss.utils import extract_strings, getPointerSize logger = floss.logging_.getLogger(__name__) MAX_STACK_SIZE = 0x10000 @@ -32,6 +32,7 @@ class CallContext: stack_memory: the active stack frame contents pre_ctx_strings: strings identified before this context """ + pc: int sp: int init_sp: int @@ -42,9 +43,10 @@ class CallContext: class StackstringContextMonitor(viv_utils.emulator_drivers.Monitor): """Observes emulation and extracts the active stack frame contents: - - at each function call in a function, and - - based on heuristics looking for mov instructions to a hardcoded buffer. + - at each function call in a function, and + - based on heuristics looking for mov instructions to a hardcoded buffer. """ + def __init__(self, init_sp, bb_ends): super().__init__() self.ctxs: List[CallContext] = [] @@ -67,9 +69,7 @@ def update_contexts(self, emu, va) -> None: logger.debug("%s", e) # TODO get va here from emu? - def get_call_context( - self, emu, va, pre_ctx_strings: Optional[Set[str]] = None - ) -> CallContext: + def get_call_context(self, emu, va, pre_ctx_strings: Optional[Set[str]] = None) -> CallContext: """Collects context information related to a function call. Retrieves the stack boundaries, reads the stack memory, and creates a `CallContext` object to encapsulate the extracted information. Optionally integrates pre-existing context strings. @@ -227,9 +227,7 @@ def extract_stackstrings( ctx.init_sp - ctx.sp, ) for s in extract_strings(ctx.stack_memory, min_length, seen): - frame_offset = ( - (ctx.init_sp - ctx.sp) - s.offset - getPointerSize(vw) - ) + frame_offset = (ctx.init_sp - ctx.sp) - s.offset - getPointerSize(vw) ss = StackString( function=fva, string=s.string, diff --git a/floss/string_decoder.py b/floss/string_decoder.py index cc0ad04fd..30d82a66c 100644 --- a/floss/string_decoder.py +++ b/floss/string_decoder.py @@ -1,26 +1,29 @@ # Copyright (C) 2017 Mandiant, Inc. All Rights Reserved. +from typing import Set, List from dataclasses import dataclass -from typing import List, Set import tqdm import viv_utils from vivisect import VivWorkspace -import floss.decoding_manager -import floss.logging_ +import floss.utils import floss.results import floss.strings -import floss.utils -from floss.const import (DS_FUNCTION_CALLS_OFTEN, DS_FUNCTION_CALLS_RARE, - DS_FUNCTION_MIN_DECODED_STRINGS, - DS_FUNCTION_SHORTCUT_THRESHOLD_VERY_OFTEN, - DS_MAX_INSN_COUNT) -from floss.decoding_manager import Delta -from floss.function_argument_getter import extract_decoding_contexts +import floss.logging_ +import floss.decoding_manager +from floss.const import ( + DS_MAX_INSN_COUNT, + DS_FUNCTION_CALLS_RARE, + DS_FUNCTION_CALLS_OFTEN, + DS_FUNCTION_MIN_DECODED_STRINGS, + DS_FUNCTION_SHORTCUT_THRESHOLD_VERY_OFTEN, +) +from floss.utils import is_all_zeros from floss.render import Verbosity from floss.results import AddressType, DecodedString -from floss.utils import is_all_zeros +from floss.decoding_manager import Delta +from floss.function_argument_getter import extract_decoding_contexts logger = floss.logging_.getLogger(__name__) @@ -157,9 +160,7 @@ def decode_strings( decoded_strings = list() function_index = viv_utils.InstructionFunctionIndex(vw) - pb = floss.utils.get_progress_bar( - functions, disable_progress, desc="decoding strings", unit=" functions" - ) + pb = floss.utils.get_progress_bar(functions, disable_progress, desc="decoding strings", unit=" functions") with tqdm.contrib.logging.logging_redirect_tqdm(), floss.utils.redirecting_print_to_tqdm(): for fva in pb: seen: Set[str] = floss.utils.get_referenced_strings(vw, fva) @@ -167,22 +168,14 @@ def decode_strings( n_calls = len(ctxs) for n, ctx in enumerate(ctxs, 1): if isinstance(pb, tqdm.tqdm): - pb.set_description( - f"emulating function 0x{fva:x} (call {n}/{n_calls})" - ) + pb.set_description(f"emulating function 0x{fva:x} (call {n}/{n_calls})") if should_shortcut(fva, n, n_calls, len(seen)): break - for delta in emulate_decoding_routine( - vw, function_index, fva, ctx, max_insn_count - ): - for delta_bytes in extract_delta_bytes( - delta, ctx.decoded_at_va, fva - ): - for s in floss.utils.extract_strings( - delta_bytes.bytes, min_length, seen - ): + for delta in emulate_decoding_routine(vw, function_index, fva, ctx, max_insn_count): + for delta_bytes in extract_delta_bytes(delta, ctx.decoded_at_va, fva): + for s in floss.utils.extract_strings(delta_bytes.bytes, min_length, seen): ds = DecodedString( address=delta_bytes.address + s.offset, address_type=delta_bytes.address_type, @@ -197,9 +190,7 @@ def decode_strings( return decoded_strings -def emulate_decoding_routine( - vw, function_index, function: int, context, max_instruction_count: int -) -> List[Delta]: +def emulate_decoding_routine(vw, function_index, function: int, context, max_instruction_count: int) -> List[Delta]: """Emulate a function with a given context and extract the CPU and memory contexts at interesting points during emulation. These "interesting points" include calls to other functions and @@ -245,9 +236,7 @@ class DeltaBytes: decoding_routine: int -def extract_delta_bytes( - delta: Delta, decoded_at_va: int, source_fva: int = 0x0 -) -> List[DeltaBytes]: +def extract_delta_bytes(delta: Delta, decoded_at_va: int, source_fva: int = 0x0) -> List[DeltaBytes]: """Extract the sequence of byte sequences that differ from before and after snapshots. Args: @@ -312,10 +301,6 @@ def extract_delta_bytes( location_type = AddressType.STACK if not is_all_zeros(diff_bytes): - delta_bytes.append( - DeltaBytes( - address, location_type, diff_bytes, decoded_at_va, source_fva - ) - ) + delta_bytes.append(DeltaBytes(address, location_type, diff_bytes, decoded_at_va, source_fva)) return delta_bytes diff --git a/floss/strings.py b/floss/strings.py index 19e928ebd..c2f5330ec 100644 --- a/floss/strings.py +++ b/floss/strings.py @@ -1,8 +1,8 @@ # Copyright (C) 2017 Mandiant, Inc. All Rights Reserved. import re -from itertools import chain from typing import Iterable +from itertools import chain from floss.results import StaticString, StringEncoding diff --git a/floss/tightstrings.py b/floss/tightstrings.py index 2c12d0746..c3ec17652 100644 --- a/floss/tightstrings.py +++ b/floss/tightstrings.py @@ -1,28 +1,28 @@ # Copyright (C) 2021 Mandiant, Inc. All Rights Reserved. -from typing import Iterator, List, Optional, Set, Tuple +from typing import Set, List, Tuple, Iterator, Optional -import envi.exc import tqdm -import tqdm.contrib.logging +import envi.exc import viv_utils +import tqdm.contrib.logging import viv_utils.emulator_drivers -import floss.features.features import floss.utils -from floss.const import DS_MAX_ADDRESS_REVISITS_EMULATION, TS_MAX_INSN_COUNT +import floss.features.features +from floss.const import TS_MAX_INSN_COUNT, DS_MAX_ADDRESS_REVISITS_EMULATION +from floss.utils import extract_strings from floss.render import Verbosity from floss.results import TightString from floss.stackstrings import CallContext, StackstringContextMonitor -from floss.utils import extract_strings logger = floss.logging_.getLogger(__name__) class TightstringContextMonitor(StackstringContextMonitor): """Observes emulation and extracts the active stack frame contents: - - at each function call in a function, and - - based on heuristics looking for mov instructions to a hardcoded buffer. + - at each function call in a function, and + - based on heuristics looking for mov instructions to a hardcoded buffer. """ def __init__(self, sp, min_length): @@ -36,16 +36,12 @@ def apicall(self, emu, api, argv): def get_pre_ctx_strings(self, emu) -> Set[str]: try: stack_buf = self.get_call_context(emu, emu.getProgramCounter()).stack_memory - return set( - map(lambda s: s.string, extract_strings(stack_buf, self.min_length)) - ) + return set(map(lambda s: s.string, extract_strings(stack_buf, self.min_length))) except ValueError as e: logger.debug("%s", e) return set() - def get_context( - self, emu, va, pre_ctx_strings: Optional[Set[str]] - ) -> Iterator[CallContext]: + def get_context(self, emu, va, pre_ctx_strings: Optional[Set[str]]) -> Iterator[CallContext]: try: yield self.get_call_context(emu, va, pre_ctx_strings) except ValueError as e: @@ -66,9 +62,7 @@ def extract_tightstring_contexts(vw, fva, min_length, tloops) -> Iterator[CallCo """ emu = floss.utils.make_emulator(vw) monitor = TightstringContextMonitor(emu.getStackCounter(), min_length) - driver_single_path = viv_utils.emulator_drivers.SinglePathEmulatorDriver( - emu, repmax=256 - ) + driver_single_path = viv_utils.emulator_drivers.SinglePathEmulatorDriver(emu, repmax=256) driver_single_path.add_monitor(monitor) driver = viv_utils.emulator_drivers.DebuggerEmulatorDriver( emu, max_hit=DS_MAX_ADDRESS_REVISITS_EMULATION, max_insn=TS_MAX_INSN_COUNT @@ -112,7 +106,7 @@ def extract_tightstrings( disable_progress=False, ) -> List[TightString]: """Extracts tightstrings from functions that contain tight loops. - + Tightstrings are a special form of stackstrings. Their bytes are loaded on the stack and then modified in a tight loop. To extract tightstrings we use a mix between the string decoding and stackstring algorithms. @@ -128,9 +122,7 @@ def extract_tightstrings( Returns: List[TightString]: A list of TightString objects representing the extracted tightstrings. """ - logger.info( - "extracting tightstrings from %d functions...", len(tightloop_functions) - ) + logger.info("extracting tightstrings from %d functions...", len(tightloop_functions)) tight_strings = list() pb = floss.utils.get_progress_bar( @@ -144,9 +136,7 @@ def extract_tightstrings( with floss.utils.timing(f"0x{fva:x}"): logger.debug("extracting tightstrings from function 0x%x", fva) if isinstance(pb, tqdm.tqdm): - pb.set_description( - f"extracting tightstrings from function 0x{fva:x}" - ) + pb.set_description(f"extracting tightstrings from function 0x{fva:x}") ctxs = extract_tightstring_contexts(vw, fva, min_length, tloops) for n, ctx in enumerate(ctxs, 1): @@ -156,14 +146,8 @@ def extract_tightstrings( ctx.init_sp - ctx.sp, ) logger.trace("pre_ctx strings: %s", ctx.pre_ctx_strings) - for s in extract_strings( - ctx.stack_memory, min_length, exclude=ctx.pre_ctx_strings - ): - frame_offset = ( - (ctx.init_sp - ctx.sp) - - s.offset - - floss.utils.getPointerSize(vw) - ) + for s in extract_strings(ctx.stack_memory, min_length, exclude=ctx.pre_ctx_strings): + frame_offset = (ctx.init_sp - ctx.sp) - s.offset - floss.utils.getPointerSize(vw) ts = TightString( function=fva, string=s.string, diff --git a/floss/utils.py b/floss/utils.py index 213644a97..3a20538cb 100644 --- a/floss/utils.py +++ b/floss/utils.py @@ -9,30 +9,25 @@ import argparse import builtins import contextlib -import inspect -import logging -import mmap -import re -import time -from collections import OrderedDict +from typing import Set, Tuple, Iterable, Optional from pathlib import Path -from typing import Iterable, Optional, Set, Tuple +from collections import OrderedDict -import envi.archs -import tabulate import tqdm +import tabulate +import vivisect import viv_utils +import envi.archs import viv_utils.emulator_drivers -import vivisect from envi import Emulator -import floss.logging_ import floss.strings +import floss.logging_ -from .api_hooks import ENABLED_VIV_DEFAULT_HOOKS -from .const import MAX_STRING_LENGTH, MEGABYTE, MOD_NAME +from .const import MEGABYTE, MOD_NAME, MAX_STRING_LENGTH from .results import StaticString from .strings import extract_ascii_unicode_strings +from .api_hooks import ENABLED_VIV_DEFAULT_HOOKS STACK_MEM_NAME = "[stack]" @@ -136,9 +131,7 @@ def make_emulator(vw) -> Emulator: emu.setStackCounter(emu.getStackCounter() - int(0.25 * MEGABYTE)) # do not short circuit rep prefix emu.setEmuOpt("i386:repmax", 256) # 0 == no limit on rep prefix - viv_utils.emulator_drivers.remove_default_viv_hooks( - emu, allow_list=ENABLED_VIV_DEFAULT_HOOKS - ) + viv_utils.emulator_drivers.remove_default_viv_hooks(emu, allow_list=ENABLED_VIV_DEFAULT_HOOKS) return emu @@ -214,9 +207,7 @@ def getPointerSize(vw): elif arch == "i386": return 4 else: - raise NotImplementedError( - "unexpected architecture: %s" % (vw.arch.__class__.__name__) - ) + raise NotImplementedError("unexpected architecture: %s" % (vw.arch.__class__.__name__)) def get_imagebase(vw): @@ -267,13 +258,10 @@ def get_vivisect_meta_info(vw, selected_functions, decoding_function_features): disc = vw.getDiscoveredInfo()[0] undisc = vw.getDiscoveredInfo()[1] if disc + undisc > 0: - info["percentage of discovered executable surface area"] = ( - "%.1f%% (%s / %s)" - % ( - disc * 100.0 / (disc + undisc), - disc, - disc + undisc, - ) + info["percentage of discovered executable surface area"] = "%.1f%% (%s / %s)" % ( + disc * 100.0 / (disc + undisc), + disc, + disc + undisc, ) info["base VA"] = baseva info["entry point(s)"] = ", ".join(map(hex, entry_points)) @@ -284,9 +272,7 @@ def get_vivisect_meta_info(vw, selected_functions, decoding_function_features): if selected_functions: meta = [] for fva in selected_functions: - if is_thunk_function(vw, fva) or viv_utils.flirt.is_library_function( - vw, fva - ): + if is_thunk_function(vw, fva) or viv_utils.flirt.is_library_function(vw, fva): continue xrefs_to = len(vw.getXrefsTo(fva)) @@ -296,9 +282,7 @@ def get_vivisect_meta_info(vw, selected_functions, decoding_function_features): block_count = function_meta.get("BlockCount") size = function_meta.get("Size") score = round(decoding_function_features.get(fva, {}).get("score", 0), 3) - meta.append( - (hex(fva), score, xrefs_to, num_args, size, block_count, instr_count) - ) + meta.append((hex(fva), score, xrefs_to, num_args, size, block_count, instr_count)) info["selected functions' info"] = "\n%s" % tabulate.tabulate( meta, headers=[ @@ -362,9 +346,7 @@ def hex(i): ) -def extract_strings( - buffer: bytes, min_length: int, exclude: Optional[Set[str]] = None -) -> Iterable[StaticString]: +def extract_strings(buffer: bytes, min_length: int, exclude: Optional[Set[str]] = None) -> Iterable[StaticString]: """Extracts potential strings from a buffer and applies filtering. Initial filtering includes length checks, common false-positive patterns, and optional exclusion based on a provided set. Extracted strings are then stripped or sanitized before yielding. @@ -664,9 +646,7 @@ def is_string_type_enabled(type_, disabled_types, enabled_types): return True -def get_max_size( - size: int, max_: int, api: Optional[Tuple] = None, argv: Optional[Tuple] = None -) -> int: +def get_max_size(size: int, max_: int, api: Optional[Tuple] = None, argv: Optional[Tuple] = None) -> int: """Get the maximum size for the given size. Args: @@ -727,9 +707,7 @@ def get_referenced_strings(vw: vivisect.VivWorkspace, fva: int) -> Set[str]: continue else: # see strings.py for why we don't include \r and \n - strings.update( - [ss.rstrip("\x00") for ss in re.split("\r\n", s)] - ) + strings.update([ss.rstrip("\x00") for ss in re.split("\r\n", s)]) return strings diff --git a/scripts/extract_rust_hashes.py b/scripts/extract_rust_hashes.py index 0bc1a1e74..52c9b9f57 100644 --- a/scripts/extract_rust_hashes.py +++ b/scripts/extract_rust_hashes.py @@ -31,9 +31,7 @@ print("Fetching Rust hashes from https://github.com/rust-lang/rust/releases...") while True: - r = requests.get( - "https://github.com/rust-lang/rust/releases?page={}".format(page_number) - ) + r = requests.get("https://github.com/rust-lang/rust/releases?page={}".format(page_number)) soup = BeautifulSoup(r.text, "html.parser") tables = soup.find_all( "div", @@ -46,9 +44,7 @@ # for each table, get the hash and version for table in tables: - hash = str( - table.find("a", attrs={"class": "Link Link--muted mb-2"})["href"] - ).split("/")[-1] + hash = str(table.find("a", attrs={"class": "Link Link--muted mb-2"})["href"]).split("/")[-1] version = table.find("span").text.strip() rust_hashes[hash] = version diff --git a/scripts/idaplugin.py b/scripts/idaplugin.py index 80d8b6b67..3c3a16225 100644 --- a/scripts/idaplugin.py +++ b/scripts/idaplugin.py @@ -8,24 +8,24 @@ author: Willi Ballenthin email: willi.ballenthin@gmail.com """ -import logging import os import time -from pathlib import Path +import logging from typing import List, Union +from pathlib import Path import idc import viv_utils import floss -import floss.identify import floss.main +import floss.utils import floss.render +import floss.identify import floss.stackstrings -import floss.string_decoder import floss.tightstrings -import floss.utils -from floss.results import AddressType, DecodedString, StackString, TightString +import floss.string_decoder +from floss.results import AddressType, StackString, TightString, DecodedString logger = logging.getLogger("floss.idaplugin") @@ -65,9 +65,7 @@ def append_comment(ea: int, s: str, repeatable: bool = False) -> None: idc.set_cmt(ea, cmt, False) -def append_lvar_comment( - fva: int, frame_offset: int, s: str, repeatable: bool = False -) -> None: +def append_lvar_comment(fva: int, frame_offset: int, s: str, repeatable: bool = False) -> None: """ add the given string as a (possibly repeatable) stack variable comment to the given function. does not add the comment if it already exists. @@ -89,15 +87,10 @@ def append_lvar_comment( idc.get_func_attr(fva, idc.FUNCATTR_FRSIZE) - frame_offset ) # alternative: idc.get_frame_lvar_size(fva) - frame_offset if not lvar_offset: - raise RuntimeError( - "failed to compute local variable offset: 0x%x 0x%x %s" % (fva, stack, s) - ) + raise RuntimeError("failed to compute local variable offset: 0x%x 0x%x %s" % (fva, stack, s)) if lvar_offset <= 0: - raise RuntimeError( - "failed to compute positive local variable offset: 0x%x 0x%x %s" - % (fva, stack, s) - ) + raise RuntimeError("failed to compute positive local variable offset: 0x%x 0x%x %s" % (fva, stack, s)) string = idc.get_member_cmt(stack, lvar_offset, repeatable) if not string: @@ -108,10 +101,7 @@ def append_lvar_comment( string = string + "\n" + s if not idc.set_member_cmt(stack, lvar_offset, string, repeatable): - raise RuntimeError( - "failed to set comment: 0x%08x 0x%08x 0x%08x: %s" - % (fva, stack, lvar_offset, s) - ) + raise RuntimeError("failed to set comment: 0x%08x 0x%08x 0x%08x: %s" % (fva, stack, lvar_offset, s)) def apply_decoded_strings(decoded_strings: List[DecodedString]) -> None: @@ -120,14 +110,10 @@ def apply_decoded_strings(decoded_strings: List[DecodedString]) -> None: continue if ds.address_type == AddressType.GLOBAL: - logger.info( - "decoded string at global address 0x%x: %s", ds.address, ds.string - ) + logger.info("decoded string at global address 0x%x: %s", ds.address, ds.string) append_comment(ds.address, ds.string) else: - logger.info( - "decoded string for function call at 0x%x: %s", ds.decoded_at, ds.string - ) + logger.info("decoded string for function call at 0x%x: %s", ds.decoded_at, ds.string) append_comment(ds.decoded_at, ds.string) @@ -191,16 +177,12 @@ def main(argv=None): time0 = time.time() logger.info("identifying decoding functions...") - decoding_function_features, library_functions = ( - floss.identify.find_decoding_function_features( - vw, selected_functions, disable_progress=True - ) + decoding_function_features, library_functions = floss.identify.find_decoding_function_features( + vw, selected_functions, disable_progress=True ) logger.info("extracting stackstrings...") - selected_functions = floss.identify.get_functions_without_tightloops( - decoding_function_features - ) + selected_functions = floss.identify.get_functions_without_tightloops(decoding_function_features) stack_strings = floss.stackstrings.extract_stackstrings( vw, selected_functions, @@ -211,9 +193,7 @@ def main(argv=None): logger.info("decoded %d stack strings", len(stack_strings)) logger.info("extracting tightstrings...") - tightloop_functions = floss.identify.get_functions_with_tightloops( - decoding_function_features - ) + tightloop_functions = floss.identify.get_functions_with_tightloops(decoding_function_features) tight_strings = floss.tightstrings.extract_tightstrings( vw, tightloop_functions, @@ -229,12 +209,8 @@ def main(argv=None): top_functions = floss.identify.get_top_functions(decoding_function_features, 20) fvas_to_emulate = floss.identify.get_function_fvas(top_functions) - fvas_tight_functions = floss.identify.get_tight_function_fvas( - decoding_function_features - ) - fvas_to_emulate = floss.identify.append_unique( - fvas_to_emulate, fvas_tight_functions - ) + fvas_tight_functions = floss.identify.get_tight_function_fvas(decoding_function_features) + fvas_to_emulate = floss.identify.append_unique(fvas_to_emulate, fvas_tight_functions) decoded_strings = floss.string_decoder.decode_strings( vw, fvas_to_emulate, diff --git a/scripts/render-binja-import-script.py b/scripts/render-binja-import-script.py index 81d1ab706..b24a283b3 100644 --- a/scripts/render-binja-import-script.py +++ b/scripts/render-binja-import-script.py @@ -21,10 +21,10 @@ is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -import argparse +import sys import base64 import logging -import sys +import argparse from pathlib import Path from floss.results import AddressType, ResultDocument @@ -42,39 +42,27 @@ def render_binja_script(result_document: ResultDocument) -> str: b64 = base64.b64encode(ds.string.encode("utf-8")).decode("ascii") b64 = 'base64.b64decode("%s").decode("utf-8")' % (b64) if ds.address_type == AddressType.GLOBAL: - main_commands.append( - 'print("FLOSS: string \\"%%s\\" at global VA 0x%x" %% (%s))' - % (ds.address, b64) - ) - main_commands.append( - 'AppendComment(%d, "FLOSS: " + %s)' % (ds.address, b64) - ) + main_commands.append('print("FLOSS: string \\"%%s\\" at global VA 0x%x" %% (%s))' % (ds.address, b64)) + main_commands.append('AppendComment(%d, "FLOSS: " + %s)' % (ds.address, b64)) else: main_commands.append( - 'print("FLOSS: string \\"%%s\\" decoded at VA 0x%x" %% (%s))' - % (ds.decoded_at, b64) - ) - main_commands.append( - 'AppendComment(%d, "FLOSS: " + %s)' % (ds.decoded_at, b64) + 'print("FLOSS: string \\"%%s\\" decoded at VA 0x%x" %% (%s))' % (ds.decoded_at, b64) ) + main_commands.append('AppendComment(%d, "FLOSS: " + %s)' % (ds.decoded_at, b64)) main_commands.append('print("Imported decoded strings from FLOSS")') for ss in result_document.strings.stack_strings: if ss.string != "": b64 = base64.b64encode(ss.string.encode("utf-8")).decode("ascii") b64 = 'base64.b64decode("%s").decode("utf-8")' % (b64) - main_commands.append( - 'AppendLvarComment(%d, "FLOSS stackstring: " + %s)' % (ss.function, b64) - ) + main_commands.append('AppendLvarComment(%d, "FLOSS stackstring: " + %s)' % (ss.function, b64)) main_commands.append('print("Imported stackstrings from FLOSS")') for ts in result_document.strings.tight_strings: if ts.string != "": b64 = base64.b64encode(ts.string.encode("utf-8")).decode("ascii") b64 = 'base64.b64decode("%s").decode("utf-8")' % (b64) - main_commands.append( - 'AppendComment(%d, "FLOSS tightstring: " + %s)' % (ts.function, b64) - ) + main_commands.append('AppendComment(%d, "FLOSS tightstring: " + %s)' % (ts.function, b64)) main_commands.append('print("Imported tightstrings from FLOSS")') script_content = """import base64 @@ -137,18 +125,12 @@ def AppendLvarComment(fva, s): def main(): - parser = argparse.ArgumentParser( - description="Generate an Binary Ninja script to apply FLOSS results." - ) - parser.add_argument( - "/path/to/report.json", help="path to JSON document from `floss --json`" - ) + parser = argparse.ArgumentParser(description="Generate an Binary Ninja script to apply FLOSS results.") + parser.add_argument("/path/to/report.json", help="path to JSON document from `floss --json`") logging_group = parser.add_argument_group("logging arguments") - logging_group.add_argument( - "-d", "--debug", action="store_true", help="enable debugging output on STDERR" - ) + logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR") logging_group.add_argument( "-q", "--quiet", diff --git a/scripts/render-ghidra-import-script.py b/scripts/render-ghidra-import-script.py index 4e8229cbf..77c9fc7d5 100644 --- a/scripts/render-ghidra-import-script.py +++ b/scripts/render-ghidra-import-script.py @@ -21,10 +21,10 @@ is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -import argparse +import sys import base64 import logging -import sys +import argparse from pathlib import Path from floss.results import AddressType, ResultDocument @@ -42,39 +42,27 @@ def render_ghidra_script(result_document: ResultDocument) -> str: b64 = base64.b64encode(ds.string.encode("utf-8")).decode("ascii") b64 = 'base64.b64decode("%s").decode("utf-8")' % (b64) if ds.address_type == AddressType.GLOBAL: - main_commands.append( - 'print("FLOSS: string \\"%%s\\" at global VA 0x%x" %% (%s))' - % (ds.address, b64) - ) - main_commands.append( - 'AppendComment(%d, "FLOSS: " + %s)' % (ds.address, b64) - ) + main_commands.append('print("FLOSS: string \\"%%s\\" at global VA 0x%x" %% (%s))' % (ds.address, b64)) + main_commands.append('AppendComment(%d, "FLOSS: " + %s)' % (ds.address, b64)) else: main_commands.append( - 'print("FLOSS: string \\"%%s\\" decoded at VA 0x%x" %% (%s))' - % (ds.decoded_at, b64) - ) - main_commands.append( - 'AppendComment(%d, "FLOSS: " + %s)' % (ds.decoded_at, b64) + 'print("FLOSS: string \\"%%s\\" decoded at VA 0x%x" %% (%s))' % (ds.decoded_at, b64) ) + main_commands.append('AppendComment(%d, "FLOSS: " + %s)' % (ds.decoded_at, b64)) main_commands.append('print("Imported decoded strings from FLOSS")') for ss in result_document.strings.stack_strings: if ss.string != "": b64 = base64.b64encode(ss.string.encode("utf-8")).decode("ascii") b64 = 'base64.b64decode("%s").decode("utf-8")' % (b64) - main_commands.append( - 'AppendLvarComment(%d, "FLOSS stackstring: " + %s)' % (ss.function, b64) - ) + main_commands.append('AppendLvarComment(%d, "FLOSS stackstring: " + %s)' % (ss.function, b64)) main_commands.append('print("Imported stackstrings from FLOSS")') for ts in result_document.strings.tight_strings: if ts.string != "": b64 = base64.b64encode(ts.string.encode("utf-8")).decode("ascii") b64 = 'base64.b64decode("%s").decode("utf-8")' % (b64) - main_commands.append( - 'AppendComment(%d, "FLOSS tightstring: " + %s)' % (ts.function, b64) - ) + main_commands.append('AppendComment(%d, "FLOSS tightstring: " + %s)' % (ts.function, b64)) main_commands.append('print("Imported tightstrings from FLOSS")') script_content = """import base64 @@ -125,18 +113,12 @@ def AppendLvarComment(fva, s): def main(): - parser = argparse.ArgumentParser( - description="Generate an Ghidra script to apply FLOSS results." - ) - parser.add_argument( - "/path/to/report.json", help="path to JSON document from `floss --json`" - ) + parser = argparse.ArgumentParser(description="Generate an Ghidra script to apply FLOSS results.") + parser.add_argument("/path/to/report.json", help="path to JSON document from `floss --json`") logging_group = parser.add_argument_group("logging arguments") - logging_group.add_argument( - "-d", "--debug", action="store_true", help="enable debugging output on STDERR" - ) + logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR") logging_group.add_argument( "-q", "--quiet", diff --git a/scripts/render-ida-import-script.py b/scripts/render-ida-import-script.py index 70d259009..51b260def 100644 --- a/scripts/render-ida-import-script.py +++ b/scripts/render-ida-import-script.py @@ -20,10 +20,10 @@ is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -import argparse +import sys import base64 import logging -import sys +import argparse from pathlib import Path from floss.results import AddressType, ResultDocument @@ -41,21 +41,13 @@ def render_ida_script(result_document: ResultDocument) -> str: b64 = base64.b64encode(ds.string.encode("utf-8")).decode("ascii") b64 = 'base64.b64decode("%s").decode("utf-8")' % b64 if ds.address_type == AddressType.GLOBAL: - main_commands.append( - 'print("FLOSS: string \\"%%s\\" at global VA 0x%x" %% (%s))' - % (ds.address, b64) - ) - main_commands.append( - 'AppendComment(%d, "FLOSS: " + %s, True)' % (ds.address, b64) - ) + main_commands.append('print("FLOSS: string \\"%%s\\" at global VA 0x%x" %% (%s))' % (ds.address, b64)) + main_commands.append('AppendComment(%d, "FLOSS: " + %s, True)' % (ds.address, b64)) else: main_commands.append( - 'print("FLOSS: string \\"%%s\\" decoded at VA 0x%x" %% (%s))' - % (ds.decoded_at, b64) - ) - main_commands.append( - 'AppendComment(%d, "FLOSS: " + %s)' % (ds.decoded_at, b64) + 'print("FLOSS: string \\"%%s\\" decoded at VA 0x%x" %% (%s))' % (ds.decoded_at, b64) ) + main_commands.append('AppendComment(%d, "FLOSS: " + %s)' % (ds.decoded_at, b64)) main_commands.append('print("Imported decoded strings from FLOSS")') for ss in result_document.strings.stack_strings: @@ -63,8 +55,7 @@ def render_ida_script(result_document: ResultDocument) -> str: b64 = base64.b64encode(ss.string.encode("utf-8")).decode("ascii") b64 = 'base64.b64decode("%s").decode("utf-8")' % b64 main_commands.append( - 'AppendLvarComment(%d, %d, "FLOSS stackstring: " + %s, True)' - % (ss.function, ss.frame_offset, b64) + 'AppendLvarComment(%d, %d, "FLOSS stackstring: " + %s, True)' % (ss.function, ss.frame_offset, b64) ) main_commands.append('print("Imported stackstrings from FLOSS")') @@ -73,8 +64,7 @@ def render_ida_script(result_document: ResultDocument) -> str: b64 = base64.b64encode(ts.string.encode("utf-8")).decode("ascii") b64 = 'base64.b64decode("%s").decode("utf-8")' % b64 main_commands.append( - 'AppendLvarComment(%d, %d, "FLOSS tightstring: " + %s, True)' - % (ts.function, ts.frame_offset, b64) + 'AppendLvarComment(%d, %d, "FLOSS tightstring: " + %s, True)' % (ts.function, ts.frame_offset, b64) ) main_commands.append('print("Imported tightstrings from FLOSS")') @@ -129,18 +119,12 @@ def main(): def main(): - parser = argparse.ArgumentParser( - description="Generate an IDA Python script to apply FLOSS results." - ) - parser.add_argument( - "/path/to/report.json", help="path to JSON document from `floss --json`" - ) + parser = argparse.ArgumentParser(description="Generate an IDA Python script to apply FLOSS results.") + parser.add_argument("/path/to/report.json", help="path to JSON document from `floss --json`") logging_group = parser.add_argument_group("logging arguments") - logging_group.add_argument( - "-d", "--debug", action="store_true", help="enable debugging output on STDERR" - ) + logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR") logging_group.add_argument( "-q", "--quiet", diff --git a/scripts/render-r2-import-script.py b/scripts/render-r2-import-script.py index 0725529ce..703b2e5f1 100644 --- a/scripts/render-r2-import-script.py +++ b/scripts/render-r2-import-script.py @@ -21,10 +21,10 @@ is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -import argparse +import sys import base64 import logging -import sys +import argparse from pathlib import Path from floss.results import AddressType, ResultDocument @@ -44,60 +44,36 @@ def render_r2_script(result_document: ResultDocument) -> str: b'"FLOSS: %s (floss_%x)"' % (ds.string.encode("utf-8"), ds.address) ).decode("ascii") if ds.address_type == AddressType.GLOBAL: - main_commands.append( - "CCu base64:%s @ %d" % (sanitized_string, ds.address) - ) + main_commands.append("CCu base64:%s @ %d" % (sanitized_string, ds.address)) if ds.decoding_routine not in fvas: main_commands.append("af @ %d" % (ds.decoding_routine)) - main_commands.append( - "afn floss_%x @ %d" % (ds.decoding_routine, ds.decoding_routine) - ) + main_commands.append("afn floss_%x @ %d" % (ds.decoding_routine, ds.decoding_routine)) fvas.append(ds.decoding_routine) else: - main_commands.append( - "CCu base64:%s @ %d" % (sanitized_string, ds.decoded_at) - ) + main_commands.append("CCu base64:%s @ %d" % (sanitized_string, ds.decoded_at)) if ds.decoding_routine not in fvas: main_commands.append("af @ %d" % (ds.decoding_routine)) - main_commands.append( - "afn floss_%x @ %d" % (ds.decoding_routine, ds.decoding_routine) - ) + main_commands.append("afn floss_%x @ %d" % (ds.decoding_routine, ds.decoding_routine)) fvas.append(ds.decoding_routine) for ss in result_document.strings.stack_strings: if ss.string != "": - sanitized_string = base64.b64encode( - b'"FLOSS: %s"' % ss.string.encode("utf-8") - ).decode("ascii") - main_commands.append( - "Ca -0x%x base64:%s @ %d" - % (ss.frame_offset, sanitized_string, ss.function) - ) + sanitized_string = base64.b64encode(b'"FLOSS: %s"' % ss.string.encode("utf-8")).decode("ascii") + main_commands.append("Ca -0x%x base64:%s @ %d" % (ss.frame_offset, sanitized_string, ss.function)) for ts in result_document.strings.tight_strings: if ts.string != "": - sanitized_string = base64.b64encode( - b'"FLOSS: %s"' % ts.string.encode("utf-8") - ).decode("ascii") - main_commands.append( - "Ca -0x%x base64:%s @ %d" - % (ts.frame_offset, sanitized_string, ts.function) - ) + sanitized_string = base64.b64encode(b'"FLOSS: %s"' % ts.string.encode("utf-8")).decode("ascii") + main_commands.append("Ca -0x%x base64:%s @ %d" % (ts.frame_offset, sanitized_string, ts.function)) return "\n".join(main_commands) def main(): - parser = argparse.ArgumentParser( - description="Generate an radare2 script to apply FLOSS results." - ) - parser.add_argument( - "/path/to/report.json", help="path to JSON document from `floss --json`" - ) + parser = argparse.ArgumentParser(description="Generate an radare2 script to apply FLOSS results.") + parser.add_argument("/path/to/report.json", help="path to JSON document from `floss --json`") logging_group = parser.add_argument_group("logging arguments") - logging_group.add_argument( - "-d", "--debug", action="store_true", help="enable debugging output on STDERR" - ) + logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR") logging_group.add_argument( "-q", "--quiet", diff --git a/scripts/render-x64dbg-database.py b/scripts/render-x64dbg-database.py index 4dfc82553..99665b614 100644 --- a/scripts/render-x64dbg-database.py +++ b/scripts/render-x64dbg-database.py @@ -20,14 +20,14 @@ is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -import argparse -import dataclasses +import sys import json import logging -import sys -from dataclasses import field -from pathlib import Path +import argparse +import dataclasses from typing import Dict, List +from pathlib import Path +from dataclasses import field from pydantic.dataclasses import dataclass @@ -79,18 +79,12 @@ def render_x64dbg_database(result_document: ResultDocument) -> str: def main(): - parser = argparse.ArgumentParser( - description="Generate an x64dbg script to apply FLOSS results." - ) - parser.add_argument( - "/path/to/report.json", help="path to JSON document from `floss --json`" - ) + parser = argparse.ArgumentParser(description="Generate an x64dbg script to apply FLOSS results.") + parser.add_argument("/path/to/report.json", help="path to JSON document from `floss --json`") logging_group = parser.add_argument_group("logging arguments") - logging_group.add_argument( - "-d", "--debug", action="store_true", help="enable debugging output on STDERR" - ) + logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR") logging_group.add_argument( "-q", "--quiet", diff --git a/tests/conftest.py b/tests/conftest.py index 277b6d374..ccdf9e3b1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,19 +3,22 @@ import os from pathlib import Path +import yaml import pytest import viv_utils -import yaml import floss.main as floss_main import floss.stackstrings as stackstrings -import floss.string_decoder as string_decoder import floss.tightstrings as tightstrings +import floss.string_decoder as string_decoder from floss.const import MIN_STRING_LENGTH -from floss.identify import (find_decoding_function_features, get_function_fvas, - get_functions_with_tightloops, - get_functions_without_tightloops, - get_top_functions) +from floss.identify import ( + get_function_fvas, + get_top_functions, + get_functions_with_tightloops, + find_decoding_function_features, + get_functions_without_tightloops, +) def extract_strings(vw): @@ -29,26 +32,20 @@ def extract_strings(vw): ): yield s_decoded.string - no_tightloop_functions = get_functions_without_tightloops( - decoding_function_features - ) + no_tightloop_functions = get_functions_without_tightloops(decoding_function_features) for s_stack in stackstrings.extract_stackstrings( vw, no_tightloop_functions, MIN_STRING_LENGTH, disable_progress=True ): yield s_stack.string tightloop_functions = get_functions_with_tightloops(decoding_function_features) - for s_tight in tightstrings.extract_tightstrings( - vw, tightloop_functions, MIN_STRING_LENGTH, disable_progress=True - ): + for s_tight in tightstrings.extract_tightstrings(vw, tightloop_functions, MIN_STRING_LENGTH, disable_progress=True): yield s_tight.string def identify_decoding_functions(vw): selected_functions = floss_main.select_functions(vw, None) - decoding_function_features, _ = find_decoding_function_features( - vw, selected_functions, disable_progress=True - ) + decoding_function_features, _ = find_decoding_function_features(vw, selected_functions, disable_progress=True) top_functions = get_top_functions(decoding_function_features, 20) return top_functions, decoding_function_features @@ -104,9 +101,7 @@ class FLOSSDecodingFunctionNotFound(Exception): class FLOSSTest(pytest.Item): def __init__(self, parent, path, platform, arch, filename, spec): - name = "{name:s}::{platform:s}::{arch:s}".format( - name=spec["Test Name"], platform=platform, arch=arch - ) + name = "{name:s}::{platform:s}::{arch:s}".format(name=spec["Test Name"], platform=platform, arch=arch) super(FLOSSTest, self).__init__(name, parent) self.spec = spec self.platform = platform @@ -132,9 +127,7 @@ def _test_strings(self, test_path): def _test_detection(self, test_path): try: - expected_functions = set( - self.spec["Decoding routines"][self.platform][self.arch] - ) + expected_functions = set(self.spec["Decoding routines"][self.platform][self.arch]) except KeyError: expected_functions = set([]) diff --git a/tests/fixtures.py b/tests/fixtures.py index 24e019074..b09af4ae7 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -16,12 +16,5 @@ def exefile() -> str: @pytest.fixture def scfile() -> str: - path = ( - CD - / "data" - / "src" - / "shellcode-stackstrings" - / "bin" - / "shellcode-stackstrings.bin" - ) + path = CD / "data" / "src" / "shellcode-stackstrings" / "bin" / "shellcode-stackstrings.bin" return str(path) diff --git a/tests/test_cli_args.py b/tests/test_cli_args.py index 0a8fd41ce..8f3317924 100644 --- a/tests/test_cli_args.py +++ b/tests/test_cli_args.py @@ -1,7 +1,7 @@ # Copyright (C) 2021 Mandiant, Inc. All Rights Reserved. import pytest -from fixtures import exefile, scfile +from fixtures import scfile, exefile import floss.main diff --git a/tests/test_language_extract_go.py b/tests/test_language_extract_go.py index 6f5a161b9..79dca8975 100644 --- a/tests/test_language_extract_go.py +++ b/tests/test_language_extract_go.py @@ -2,37 +2,21 @@ import pytest -from floss.language.go.extract import extract_go_strings from floss.results import StaticString, StringEncoding +from floss.language.go.extract import extract_go_strings @pytest.fixture(scope="module") def go_strings32(): n = 6 - path = ( - pathlib.Path(__file__).parent - / "data" - / "language" - / "go" - / "go-hello" - / "bin" - / "go-hello.exe" - ) + path = pathlib.Path(__file__).parent / "data" / "language" / "go" / "go-hello" / "bin" / "go-hello.exe" return extract_go_strings(path, n) @pytest.fixture(scope="module") def go_strings64(): n = 6 - path = ( - pathlib.Path(__file__).parent - / "data" - / "language" - / "go" - / "go-hello" - / "bin" - / "go-hello64.exe" - ) + path = pathlib.Path(__file__).parent / "data" / "language" / "go" / "go-hello" / "bin" / "go-hello64.exe" return extract_go_strings(path, n) @@ -41,22 +25,16 @@ def go_strings64(): [ # .data:00534944 A0 35 4A 00 dd offset aAdaptivestacks ; "adaptivestackstart" # .data:00534948 12 db 12h - pytest.param( - "adaptivestackstart", 0xA1BA0, StringEncoding.UTF8, "go_strings32" - ), + pytest.param("adaptivestackstart", 0xA1BA0, StringEncoding.UTF8, "go_strings32"), # .data:00534944 A0 35 4A 00 dd offset aAdaptivestacks ; "adaptivestackstart" # .data:00534948 12 db 12h - pytest.param( - "adaptivestackstart", 0xA9E33, StringEncoding.UTF8, "go_strings64" - ), + pytest.param("adaptivestackstart", 0xA9E33, StringEncoding.UTF8, "go_strings64"), ], ) def test_data_string_offset(request, string, offset, encoding, go_strings): # .data:0000000000541568 33 A8 4A 00 00 00… dq offset aAdaptivestacks ; "adaptivestackstart" # .data:0000000000541570 12 db 12h - assert StaticString( - string=string, offset=offset, encoding=encoding - ) in request.getfixturevalue(go_strings) + assert StaticString(string=string, offset=offset, encoding=encoding) in request.getfixturevalue(go_strings) @pytest.mark.parametrize( @@ -71,9 +49,7 @@ def test_data_string_offset(request, string, offset, encoding, go_strings): ], ) def test_lea_mov(request, string, offset, encoding, go_strings): - assert StaticString( - string=string, offset=offset, encoding=encoding - ) in request.getfixturevalue(go_strings) + assert StaticString(string=string, offset=offset, encoding=encoding) in request.getfixturevalue(go_strings) @pytest.mark.parametrize( @@ -83,21 +59,15 @@ def test_lea_mov(request, string, offset, encoding, go_strings): # .text:0000000000404296 BB 14 00 00 00 mov ebx, 14h # .text:000000000040429B 0F 1F 44 00 00 nop dword ptr [rax+rax+00h] # .text:00000000004042A0 E8 DB 16 03 00 call runtime_printstring - pytest.param( - "write of Go pointer ", 0xAAAC2, StringEncoding.UTF8, "go_strings64" - ), + pytest.param("write of Go pointer ", 0xAAAC2, StringEncoding.UTF8, "go_strings64"), # NOTE: for 32-bit, the string is present in binary file but is not referenced by any instruction # 004A4200 6E 6F 74 20 65 6D 70 74 79 77 72 69 74 65 20 6F not emptywrite o # 004A4210 66 20 47 6F 20 70 6F 69 6E 74 65 72 20 77 73 32 f Go pointer ws2 - pytest.param( - "write of Go pointer ", 0xA2809, StringEncoding.UTF8, "go_strings32" - ), + pytest.param("write of Go pointer ", 0xA2809, StringEncoding.UTF8, "go_strings32"), ], ) def test_lea_mov2(request, string, offset, encoding, go_strings): - assert StaticString( - string=string, offset=offset, encoding=encoding - ) in request.getfixturevalue(go_strings) + assert StaticString(string=string, offset=offset, encoding=encoding) in request.getfixturevalue(go_strings) @pytest.mark.parametrize( @@ -108,21 +78,15 @@ def test_lea_mov2(request, string, offset, encoding, go_strings): # .text:00000000004032F2 48 89 DE mov rsi, rbx # .text:00000000004032F5 31 C0 xor eax, eax # .text:00000000004032F7 48 8D 1D A6 A2 0A 00 lea rbx, unk_4AD5A4 - pytest.param( - "comparing uncomparable type ", 0xACBA4, StringEncoding.UTF8, "go_strings64" - ), + pytest.param("comparing uncomparable type ", 0xACBA4, StringEncoding.UTF8, "go_strings64"), # .text:00403276 8D 15 64 63 4A 00 lea edx, unk_4A6364 # .text:0040327C 89 54 24 04 mov [esp+1Ch+var_18], edx # .text:00403280 C7 44 24 08 1C 00 00 00 mov [esp+1Ch+var_14], 1Ch - pytest.param( - "comparing uncomparable type ", 0xA4964, StringEncoding.UTF8, "go_strings32" - ), + pytest.param("comparing uncomparable type ", 0xA4964, StringEncoding.UTF8, "go_strings32"), ], ) def test_mov_lea(request, string, offset, encoding, go_strings): - assert StaticString( - string=string, offset=offset, encoding=encoding - ) in request.getfixturevalue(go_strings) + assert StaticString(string=string, offset=offset, encoding=encoding) in request.getfixturevalue(go_strings) @pytest.mark.parametrize( @@ -131,22 +95,16 @@ def test_mov_lea(request, string, offset, encoding, go_strings): # .text:00000000004467E4 48 8D 05 7E 67 06 00 lea rax, aOutOfMemorySta ; "out of memory (stackalloc)" # .text:00000000004467EB BB 1A 00 00 00 mov ebx, 1Ah # .text:00000000004467F0 E8 4B CF FE FF call runtime_throw - pytest.param( - "out of memory (stackalloc)", 0xAC569, StringEncoding.UTF8, "go_strings64" - ), + pytest.param("out of memory (stackalloc)", 0xAC569, StringEncoding.UTF8, "go_strings64"), # NOTE: for 32-bit, the string is present in binary file but is not referenced by any instruction # 004A5D00 20 64 6F 75 62 6C 65 20 77 61 6B 65 75 70 6F 75 double wakeupou # 004A5D10 74 20 6F 66 20 6D 65 6D 6F 72 79 20 28 73 74 61 t of memory (sta # 004A5D20 63 6B 61 6C 6C 6F 63 29 70 65 72 73 69 73 74 65 ckalloc)persiste - pytest.param( - "out of memory (stackalloc)", 0xA430E, StringEncoding.UTF8, "go_strings32" - ), + pytest.param("out of memory (stackalloc)", 0xA430E, StringEncoding.UTF8, "go_strings32"), ], ) def test_lea_mov_call(request, string, offset, encoding, go_strings): - assert StaticString( - string=string, offset=offset, encoding=encoding - ) in request.getfixturevalue(go_strings) + assert StaticString(string=string, offset=offset, encoding=encoding) in request.getfixturevalue(go_strings) @pytest.mark.parametrize( @@ -155,21 +113,15 @@ def test_lea_mov_call(request, string, offset, encoding, go_strings): # .text:0000000000481211 48 C7 40 10 19 00 00 00 mov qword ptr [rax+10h], 19h # .text:0000000000481219 48 8D 0D 71 B6 02 00 lea rcx, aExpandenvironm ; "ExpandEnvironmentStringsW" # .text:0000000000481220 48 89 48 08 mov [rax+8], rcx - pytest.param( - "ExpandEnvironmentStringsW", 0xABE91, StringEncoding.UTF8, "go_strings64" - ), + pytest.param("ExpandEnvironmentStringsW", 0xABE91, StringEncoding.UTF8, "go_strings64"), # .text:0047EACA C7 40 0C 19 00 00 00 mov dword ptr [eax+0Ch], 19h # .text:0047EAD1 8D 0D 36 56 4A 00 lea ecx, unk_4A5636 # .text:0047EAD7 89 48 08 mov [eax+8], ecx - pytest.param( - "ExpandEnvironmentStringsW", 0xA3C36, StringEncoding.UTF8, "go_strings32" - ), + pytest.param("ExpandEnvironmentStringsW", 0xA3C36, StringEncoding.UTF8, "go_strings32"), ], ) def test_mov_lea_mov(request, string, offset, encoding, go_strings): - assert StaticString( - string=string, offset=offset, encoding=encoding - ) in request.getfixturevalue(go_strings) + assert StaticString(string=string, offset=offset, encoding=encoding) in request.getfixturevalue(go_strings) @pytest.mark.parametrize( @@ -177,20 +129,14 @@ def test_mov_lea_mov(request, string, offset, encoding, go_strings): [ # 00000000004AB080 20 6E 6F 74 20 66 6F 75 6E 64 20 6D 61 72 6B 72 not found markr # 00000000004AB090 6F 6F 74 20 6A 6F 62 73 20 64 6F 6E 65 0A 20 74 oot jobs done. t - pytest.param( - " markroot jobs done\n", 0xAA68A, StringEncoding.UTF8, "go_strings64" - ), + pytest.param(" markroot jobs done\n", 0xAA68A, StringEncoding.UTF8, "go_strings64"), # 004A3DE0 66 6F 75 6E 64 20 6D 61 72 6B 72 6F 6F 74 20 6A found markroot j # 004A3DF0 6F 62 73 20 64 6F 6E 65 0A 20 74 6F 20 75 6E 61 obs done. to una - pytest.param( - " markroot jobs done\n", 0xA23E5, StringEncoding.UTF8, "go_strings32" - ), + pytest.param(" markroot jobs done\n", 0xA23E5, StringEncoding.UTF8, "go_strings32"), ], ) def test_strings_with_newline_char_0A(request, string, offset, encoding, go_strings): - assert StaticString( - string=string, offset=offset, encoding=encoding - ) in request.getfixturevalue(go_strings) + assert StaticString(string=string, offset=offset, encoding=encoding) in request.getfixturevalue(go_strings) @pytest.mark.skip(reason="not extracted via go_strings") @@ -216,6 +162,4 @@ def test_strings_with_newline_char_0A(request, string, offset, encoding, go_stri ], ) def test_import_data(request, string, offset, encoding, go_strings): - assert StaticString( - string=string, offset=offset, encoding=encoding - ) in request.getfixturevalue(go_strings) + assert StaticString(string=string, offset=offset, encoding=encoding) in request.getfixturevalue(go_strings) diff --git a/tests/test_language_extract_rust.py b/tests/test_language_extract_rust.py index d08b33447..9d210ce0c 100644 --- a/tests/test_language_extract_rust.py +++ b/tests/test_language_extract_rust.py @@ -2,37 +2,21 @@ import pytest -from floss.language.rust.extract import extract_rust_strings from floss.results import StaticString, StringEncoding +from floss.language.rust.extract import extract_rust_strings @pytest.fixture(scope="module") def rust_strings32(): n = 6 - path = ( - pathlib.Path(__file__).parent - / "data" - / "language" - / "rust" - / "rust-hello" - / "bin" - / "rust-hello.exe" - ) + path = pathlib.Path(__file__).parent / "data" / "language" / "rust" / "rust-hello" / "bin" / "rust-hello.exe" return extract_rust_strings(path, n) @pytest.fixture(scope="module") def rust_strings64(): n = 6 - path = ( - pathlib.Path(__file__).parent - / "data" - / "language" - / "rust" - / "rust-hello" - / "bin" - / "rust-hello64.exe" - ) + path = pathlib.Path(__file__).parent / "data" / "language" / "rust" / "rust-hello" / "bin" / "rust-hello64.exe" return extract_rust_strings(path, n) @@ -54,9 +38,7 @@ def rust_strings64(): ], ) def test_data_string_offset(request, string, offset, encoding, rust_strings): - assert StaticString( - string=string, offset=offset, encoding=encoding - ) in request.getfixturevalue(rust_strings) + assert StaticString(string=string, offset=offset, encoding=encoding) in request.getfixturevalue(rust_strings) @pytest.mark.parametrize( @@ -66,15 +48,11 @@ def test_data_string_offset(request, string, offset, encoding, rust_strings): # .text:000000014002115C 48 8D 74 24 20 lea rsi, [rsp+38h+var_18] # .text:0000000140021161 41 B9 0B 00 00 00 mov r9d, 11 pytest.param("AccessError", 0xBCB88, StringEncoding.UTF8, "rust_strings64"), - pytest.param( - "already destroyed", 0xBCB93, StringEncoding.UTF8, "rust_strings64" - ), + pytest.param("already destroyed", 0xBCB93, StringEncoding.UTF8, "rust_strings64"), ], ) def test_lea_mov(request, string, offset, encoding, rust_strings): - assert StaticString( - string=string, offset=offset, encoding=encoding - ) in request.getfixturevalue(rust_strings) + assert StaticString(string=string, offset=offset, encoding=encoding) in request.getfixturevalue(rust_strings) @pytest.mark.parametrize( @@ -83,15 +61,11 @@ def test_lea_mov(request, string, offset, encoding, rust_strings): # .text:0041EF8C 68 50 08 4B 00 push offset unk_4B0850 ; "AccessError" # .text:0041EFB8 68 5B 08 4B 00 push offset unk_4B085B "already destroyed" pytest.param("AccessError", 0xAE850, StringEncoding.UTF8, "rust_strings32"), - pytest.param( - "already destroyed", 0xAE85B, StringEncoding.UTF8, "rust_strings32" - ), + pytest.param("already destroyed", 0xAE85B, StringEncoding.UTF8, "rust_strings32"), ], ) def test_push(request, string, offset, encoding, rust_strings): - assert StaticString( - string=string, offset=offset, encoding=encoding - ) in request.getfixturevalue(rust_strings) + assert StaticString(string=string, offset=offset, encoding=encoding) in request.getfixturevalue(rust_strings) @pytest.mark.parametrize( @@ -100,13 +74,9 @@ def test_push(request, string, offset, encoding, rust_strings): # .text:0046B04A BA 1A 00 00 00 mov edx, 1Ah ; jumptable 0046A19C case 8752 # .text:0046B04F B9 A0 C2 4B 00 mov ecx, offset unk_4BC2A0 # .text:0046B054 E9 93 F8 FF FF jmp loc_46A8EC ; jumptable 0046A1CA case 0 - pytest.param( - "DW_AT_SUN_return_value_ptr", 0xBA2A0, StringEncoding.UTF8, "rust_strings32" - ), + pytest.param("DW_AT_SUN_return_value_ptr", 0xBA2A0, StringEncoding.UTF8, "rust_strings32"), pytest.param("DW_AT_SUN_c_vla", 0xBA2BA, StringEncoding.UTF8, "rust_strings32"), ], ) def test_mov_jmp(request, string, offset, encoding, rust_strings): - assert StaticString( - string=string, offset=offset, encoding=encoding - ) in request.getfixturevalue(rust_strings) + assert StaticString(string=string, offset=offset, encoding=encoding) in request.getfixturevalue(rust_strings) diff --git a/tests/test_language_go_coverage.py b/tests/test_language_go_coverage.py index ea1885076..b746332b4 100644 --- a/tests/test_language_go_coverage.py +++ b/tests/test_language_go_coverage.py @@ -1,12 +1,12 @@ -import contextlib import pathlib +import contextlib import pefile import pytest -from floss.language.go.coverage import get_extract_stats -from floss.language.go.extract import extract_go_strings from floss.utils import get_static_strings +from floss.language.go.extract import extract_go_strings +from floss.language.go.coverage import get_extract_stats @pytest.mark.parametrize( diff --git a/tests/test_language_go_known_binary.py b/tests/test_language_go_known_binary.py index 05e584062..a282ac1cb 100644 --- a/tests/test_language_go_known_binary.py +++ b/tests/test_language_go_known_binary.py @@ -38,9 +38,7 @@ def _extract_files(zip_file_name, extracted_dir_name): / extracted_dir_name / zip_info.filename ).resolve() - extracted_file = zip_ref.extract( - zip_info, path=extracted_file_path.parent - ) + extracted_file = zip_ref.extract(zip_info, path=extracted_file_path.parent) extracted_files.append(extracted_file) yield diff --git a/tests/test_language_id.py b/tests/test_language_id.py index cddd13853..b4d4993ff 100644 --- a/tests/test_language_id.py +++ b/tests/test_language_id.py @@ -2,9 +2,8 @@ import pytest -from floss.language.identify import (VERSION_UNKNOWN_OR_NA, Language, - identify_language_and_version) from floss.utils import get_static_strings +from floss.language.identify import VERSION_UNKNOWN_OR_NA, Language, identify_language_and_version @pytest.mark.parametrize( @@ -41,9 +40,5 @@ def test_language_detection(binary_file, expected_result, expected_version): language, version = identify_language_and_version(abs_path, static_strings) - assert ( - language == expected_result - ), f"Expected: {expected_result.value}, Actual: {language.value}" - assert ( - version == expected_version - ), f"Expected: {expected_version}, Actual: {version}" + assert language == expected_result, f"Expected: {expected_result.value}, Actual: {language.value}" + assert version == expected_version, f"Expected: {expected_version}, Actual: {version}" diff --git a/tests/test_language_rust_coverage.py b/tests/test_language_rust_coverage.py index ce31841c5..f6cc25bba 100644 --- a/tests/test_language_rust_coverage.py +++ b/tests/test_language_rust_coverage.py @@ -1,12 +1,12 @@ -import contextlib import pathlib +import contextlib import pefile import pytest -from floss.language.rust.extract import extract_rust_strings -from floss.language.utils import get_extract_stats from floss.strings import extract_ascii_unicode_strings +from floss.language.utils import get_extract_stats +from floss.language.rust.extract import extract_rust_strings @pytest.mark.parametrize( diff --git a/tests/test_language_rust_known_binary.py b/tests/test_language_rust_known_binary.py index e709e4649..5eae74833 100644 --- a/tests/test_language_rust_known_binary.py +++ b/tests/test_language_rust_known_binary.py @@ -38,9 +38,7 @@ def _extract_files(zip_file_name, extracted_dir_name): / extracted_dir_name / zip_info.filename ).resolve() - extracted_file = zip_ref.extract( - zip_info, path=extracted_file_path.parent - ) + extracted_file = zip_ref.extract(zip_info, path=extracted_file_path.parent) extracted_files.append(extracted_file) yield @@ -66,30 +64,14 @@ def extract_files_32(request, extract_files): @pytest.mark.parametrize( "binary_file", [ - ( - "data/language/rust/rust-binaries-all-versions/bin/extracted_64/rust1.56.0.exe" - ), - ( - "data/language/rust/rust-binaries-all-versions/bin/extracted_64/rust1.58.0.exe" - ), - ( - "data/language/rust/rust-binaries-all-versions/bin/extracted_64/rust1.60.0.exe" - ), - ( - "data/language/rust/rust-binaries-all-versions/bin/extracted_64/rust1.62.0.exe" - ), - ( - "data/language/rust/rust-binaries-all-versions/bin/extracted_64/rust1.64.0.exe" - ), - ( - "data/language/rust/rust-binaries-all-versions/bin/extracted_64/rust1.66.0.exe" - ), - ( - "data/language/rust/rust-binaries-all-versions/bin/extracted_64/rust1.68.0.exe" - ), - ( - "data/language/rust/rust-binaries-all-versions/bin/extracted_64/rust1.70.0.exe" - ), + ("data/language/rust/rust-binaries-all-versions/bin/extracted_64/rust1.56.0.exe"), + ("data/language/rust/rust-binaries-all-versions/bin/extracted_64/rust1.58.0.exe"), + ("data/language/rust/rust-binaries-all-versions/bin/extracted_64/rust1.60.0.exe"), + ("data/language/rust/rust-binaries-all-versions/bin/extracted_64/rust1.62.0.exe"), + ("data/language/rust/rust-binaries-all-versions/bin/extracted_64/rust1.64.0.exe"), + ("data/language/rust/rust-binaries-all-versions/bin/extracted_64/rust1.66.0.exe"), + ("data/language/rust/rust-binaries-all-versions/bin/extracted_64/rust1.68.0.exe"), + ("data/language/rust/rust-binaries-all-versions/bin/extracted_64/rust1.70.0.exe"), ], ) def test_language_detection_64(binary_file, extract_files_64): @@ -119,30 +101,14 @@ def test_language_detection_64(binary_file, extract_files_64): @pytest.mark.parametrize( "binary_file", [ - ( - "data/language/rust/rust-binaries-all-versions/bin/extracted_32/rust1.56.0.exe" - ), - ( - "data/language/rust/rust-binaries-all-versions/bin/extracted_32/rust1.58.0.exe" - ), - ( - "data/language/rust/rust-binaries-all-versions/bin/extracted_32/rust1.60.0.exe" - ), - ( - "data/language/rust/rust-binaries-all-versions/bin/extracted_32/rust1.62.0.exe" - ), - ( - "data/language/rust/rust-binaries-all-versions/bin/extracted_32/rust1.64.0.exe" - ), - ( - "data/language/rust/rust-binaries-all-versions/bin/extracted_32/rust1.66.0.exe" - ), - ( - "data/language/rust/rust-binaries-all-versions/bin/extracted_32/rust1.68.0.exe" - ), - ( - "data/language/rust/rust-binaries-all-versions/bin/extracted_32/rust1.70.0.exe" - ), + ("data/language/rust/rust-binaries-all-versions/bin/extracted_32/rust1.56.0.exe"), + ("data/language/rust/rust-binaries-all-versions/bin/extracted_32/rust1.58.0.exe"), + ("data/language/rust/rust-binaries-all-versions/bin/extracted_32/rust1.60.0.exe"), + ("data/language/rust/rust-binaries-all-versions/bin/extracted_32/rust1.62.0.exe"), + ("data/language/rust/rust-binaries-all-versions/bin/extracted_32/rust1.64.0.exe"), + ("data/language/rust/rust-binaries-all-versions/bin/extracted_32/rust1.66.0.exe"), + ("data/language/rust/rust-binaries-all-versions/bin/extracted_32/rust1.68.0.exe"), + ("data/language/rust/rust-binaries-all-versions/bin/extracted_32/rust1.70.0.exe"), ], ) def test_language_detection_32(binary_file, extract_files_32): diff --git a/tests/test_main.py b/tests/test_main.py index 982acd648..9cae69c2a 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,7 +1,7 @@ # Copyright (C) 2020 Mandiant, Inc. All Rights Reserved. -import fixtures import pytest +import fixtures from fixtures import exefile import floss.main diff --git a/tests/test_memdiff.py b/tests/test_memdiff.py index 2f0e19db9..a86d1f59d 100644 --- a/tests/test_memdiff.py +++ b/tests/test_memdiff.py @@ -1,7 +1,7 @@ # Copyright (C) 2017 Mandiant, Inc. All Rights Reserved. -import envi.memory import pytest +import envi.memory from floss.string_decoder import memdiff diff --git a/tests/test_render.py b/tests/test_render.py index 28f825ea7..e382127b2 100644 --- a/tests/test_render.py +++ b/tests/test_render.py @@ -1,9 +1,18 @@ # Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +from floss.results import ( + Strings, + Analysis, + Metadata, + AddressType, + StackString, + TightString, + StaticString, + DecodedString, + ResultDocument, + StringEncoding, +) from floss.render.default import render -from floss.results import (AddressType, Analysis, DecodedString, Metadata, - ResultDocument, StackString, StaticString, - StringEncoding, Strings, TightString) def test_render_rich_markup(): @@ -14,11 +23,7 @@ def test_render_rich_markup(): ), analysis=Analysis(), strings=Strings( - static_strings=[ - StaticString( - string="[/<]one", offset=1033749, encoding=StringEncoding.ASCII - ) - ], + static_strings=[StaticString(string="[/<]one", offset=1033749, encoding=StringEncoding.ASCII)], tight_strings=[ TightString( function=0x4000000, diff --git a/tests/test_scripts.py b/tests/test_scripts.py index d569216d9..6cfaa72b4 100644 --- a/tests/test_scripts.py +++ b/tests/test_scripts.py @@ -6,10 +6,10 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -import subprocess import sys -from functools import lru_cache +import subprocess from pathlib import Path +from functools import lru_cache import pytest @@ -33,9 +33,7 @@ def run_program(script_path: Path, args): @lru_cache() def get_results_file_path(): res_path = Path("results.json") - p = run_program( - Path("floss/main.py"), ["--no", "static", "-j", str(get_file_path())] - ) + p = run_program(Path("floss/main.py"), ["--no", "static", "-j", str(get_file_path())]) with res_path.open("w") as f: f.write(p.stdout.decode("utf-8")) return str(res_path)