diff --git a/docs/change_log.md b/docs/change_log.md index 9749bb5..0579014 100644 --- a/docs/change_log.md +++ b/docs/change_log.md @@ -1,6 +1,15 @@ # Change Log -## Current +## Current (0.1.8) + +- Add ability for single-file mods to be run by pymhf. ([gh-19](https://github.com/monkeyman192/pyMHF/issues/19)) +- Changed the config system to use toml files. + +### 0.1.7 (10/10/2024) + +- Implement ability to call overloaded functions which have patterns. +- Improve safety of hooking functions and keyboard bindings as well as GUI reload fix. +- Added functions to set the main window active ([gh-6](https://github.com/monkeyman192/pyMHF/issues/6)) - Contributed by `Foundit3923` ### 0.1.6 (08/09/2024) diff --git a/docs/settings.md b/docs/settings.md index c22310d..d0ada27 100644 --- a/docs/settings.md +++ b/docs/settings.md @@ -1,4 +1,6 @@ -# pyMHF settings file +# pyMHF settings file [OBSOLETE] + +**TODO: Update these settings** *pyMHF* contains a file called `pymhf.cfg` which (currently) must be situated within the root directory of the modding library (cf. [here](../writing_libraries.md)) This file has a number of properties in different sections. Some are required and others are not: diff --git a/docs/single_file_mods.md b/docs/single_file_mods.md new file mode 100644 index 0000000..b2ae3a1 --- /dev/null +++ b/docs/single_file_mods.md @@ -0,0 +1,67 @@ +# Single-file mods + +While `pymhf` is designed to be used to create libraries upon which mods can be made, for smaller scale projects or small tests, this is not convenient. +To accomodate this, `pymhf` can run a single file containing a mod. + +This file **MAY** have the following attributes defined in it to simplify the defintions: + +`__pymhf_func_binary__` (`str`): The name of the binary all the function offsets/patterns are to be found relative to/in. If provided this will be the default for all functions, however any `manual_hook` with a different value provided will supersede this value. +`__pymhf_func_offsets__` (`dict[str, Union[int, dict[str, int]]]`): A lookup containing the function names and offsets. Generally the keys will be the function names that you want to call the function by, and the values will simply be the offset relative to the start of the binary. For overloads the value can be another dictionary with keys being some identifier for the overload (eg. the argument types), and the values being the offsets. +`__pymhf_func_call_sigs__` (`dict[str, Union[FUNCDEF, dict[str, FUNCDEF]]]`): A lookup containing the function names and function call signatures. The keys will be the function names that you want to call the function by, and the values will either be the `FUNCDEF` itself, or a dictionary similar to the `__pymhf_func_offsets__` mapping. +`__pymhf_func_patterns__` (`dict[str, Union[str, dict[str, str]]]`): A lookup containing the function names and byte patterns to find the functions at. The keys will be the function names that you want to call the function by, and the values will be either a string containing the pattern, or a dictionary similar to the `__pymhf_func_offsets__` mapping. + +Note that none of these are mandatory, however they do simplify the code in the file a decent amount and keep the file more easy to maintain. + +Below is an example single-file mod for the game No Man's Sky: + +```py +# /// script +# dependencies = ["pymhf"] +# +# [tool.pymhf] +# exe = "NMS.exe" +# steam_gameid = 275850 +# start_paused = false +# +# [tool.pymhf.logging] +# log_dir = "." +# log_level = "info" +# window_name_override = "NMS test mod" +# /// + +from logging import getLogger + +from pymhf import Mod, load_mod_file +from pymhf.core.hooking import on_key_pressed + +logger = getLogger("testmod") + + +class MyMod(Mod): + @on_key_pressed("p") + def press_p(self): + logger.info("Pressed P now!") + + +if __name__ == "__main__": + load_mod_file(__file__) +``` + +This mod clearly has very little functionality, but the above highlights the minimum requirements to make a single-file mod using `pymhf`. + +To run this mod it is **strongly** recommended to use [uv](https://github.com/astral-sh/uv) as it has full support for inline script metadata. + +The steps to run the above script from scratch (ie. with no `uv` installed) are as follows: + +``` +python -m pip install uv +uv run script.py +``` + +In the above ensure that the `python` command runs a python version between 3.9 and 3.11 INCLUSIVE. +Replace `script.py` with the name of the script as it was saved. + +To modify the above script, the only values that really need to be changed are the `steam_gameid` and `exe` values. + +If the game is being ran via steam, replace `steam_gameid` with the value found in steam, and set `exe` to be the name of the game binary. +If the game or program is not run through steam, remove the `steam_gameid` value and instead set `exe` and the absolute path to the binary. diff --git a/pymhf/__init__.py b/pymhf/__init__.py index 3052d7b..eb8a10c 100644 --- a/pymhf/__init__.py +++ b/pymhf/__init__.py @@ -1,5 +1,4 @@ import argparse -import configparser import os import os.path as op import shutil @@ -10,7 +9,7 @@ from .core._types import FUNCDEF # noqa from .core.hooking import FuncHook # noqa from .core.mod_loader import Mod, ModState # noqa -from .main import load_module # noqa +from .main import load_mod_file, load_module # noqa try: __version__ = version("pymhf") @@ -48,13 +47,14 @@ def _is_int(val: str) -> bool: ], ) +CFG_FILENAME = "pymhf.toml" + -# TODO: -# Need to support the following commands: -# --config -> will configure the library def run(): """Main entrypoint which can be used to run programs with pymhf. - This will take the first argument as the name of a module which has been installed.""" + This will take the first argument as the name of a module which has been installed. + """ + from .utils.parse_toml import read_pymhf_settings, write_pymhf_settings parser = argparse.ArgumentParser( prog="pyMHF program runner", @@ -72,6 +72,15 @@ def run(): plugin_name: str = args.plugin_name is_config_mode: bool = args.config + standalone = False + + if op.isfile(plugin_name) and op.exists(plugin_name): + # In this case we are running in stand-alone mode + standalone = True + + if standalone: + load_mod_file(plugin_name) + return # Get the location of app data, then construct the expected folder name. appdata_data = os.environ.get("APPDATA", op.expanduser("~")) @@ -116,7 +125,7 @@ def run(): module_dir = op.dirname(loaded_lib.__file__) - cfg_file = op.join(module_dir, "pymhf.cfg") + cfg_file = op.join(module_dir, CFG_FILENAME) config_progress_file = op.join(cfg_folder, ".config_in_progress") if not op.exists(cfg_file): print( @@ -125,7 +134,7 @@ def run(): ) return else: - dst = op.join(cfg_folder, "pymhf.cfg") + dst = op.join(cfg_folder, CFG_FILENAME) if not op.exists(dst) or op.exists(config_progress_file): # In this case we can prompt the user to enter the config values which need to be changed. initial_config = True @@ -136,51 +145,44 @@ def run(): # Write the file which indicates we are in progress. with open(config_progress_file, "w") as f: f.write("") - config = configparser.ConfigParser() - if not config.read(dst): - print("Cannot read config file for some reason... Exiting") - return + pymhf_settings = read_pymhf_settings(cfg_file) # Modify some of the values in the config file, allowing the user to enter the values they want. if (mod_dir := MOD_DIR_Q.ask()) is not None: - config.set("binary", "mod_dir", mod_dir) + pymhf_settings["mod_dir"] = mod_dir else: return # Write the config back and then delete the temporary file only once everything is ok. - with open(dst, "w") as f: - config.write(f) + write_pymhf_settings(pymhf_settings, dst) os.remove(config_progress_file) initial_config = False elif is_config_mode: - config = configparser.ConfigParser() - if not config.read(dst): - print("Cannot read config file for some reason... Exiting") - return + pymhf_settings = read_pymhf_settings(dst) keep_going = True while keep_going: config_choice = CONFIG_SELECT_Q.ask() if config_choice == CFG_OPT_BIN_PATH: if (exe_path := EXE_PATH_Q.ask()) is not None: - config.set("binary", "path", exe_path) - config.remove_option("binary", "steam_gameid") + pymhf_settings["exe_path"] = exe_path + del pymhf_settings["steam_gameid"] else: return elif config_choice == CFG_OPT_STEAM_ID: if (steam_id := STEAM_ID_Q.ask()) is not None: - config.set("binary", "steam_gameid", steam_id) - config.remove_option("binary", "path") + pymhf_settings["steam_gameid"] = steam_id + del pymhf_settings["exe_path"] else: return elif config_choice == CFG_OPT_MOD_PATH: if (mod_dir := MOD_DIR_Q.ask()) is not None: - config.set("binary", "mod_dir", mod_dir) + pymhf_settings["mod_dir"] = mod_dir else: return elif config_choice == CFG_OPT_START_PAUSED: if (start_paused := START_PAUSED.ask()) is not None: - config.set("binary", "start_paused", str(start_paused)) + pymhf_settings["start_paused"] = start_paused else: return elif config_choice is None: @@ -189,8 +191,7 @@ def run(): keep_going = CONTINUE_CONFIGURING_Q.ask() if keep_going is None: return - with open(dst, "w") as f: - config.write(f) + write_pymhf_settings(dst, pymhf_settings) if not RUN_GAME.ask(): return diff --git a/pymhf/core/_internal.py b/pymhf/core/_internal.py index 4b0f546..5030638 100644 --- a/pymhf/core/_internal.py +++ b/pymhf/core/_internal.py @@ -1,3 +1,4 @@ +import ctypes from concurrent.futures import ThreadPoolExecutor CWD: str = "" @@ -8,8 +9,11 @@ BINARY_HASH: str = "" BASE_ADDRESS: int = -1 SIZE_OF_IMAGE: int = -1 -CFG_DIR: str = "" +CONFIG: dict = {} EXE_NAME: str = "" +BINARY_PATH: str = "" +SINGLE_FILE_MOD: bool = False +MOD_SAVE_DIR: str = "" _executor: ThreadPoolExecutor = None # type: ignore @@ -30,3 +34,5 @@ def game_loaded(self, val: bool): GameState: _GameState = _GameState() + +imports: dict[str, dict[str, ctypes._CFuncPtr]] = {} diff --git a/pymhf/core/_types.py b/pymhf/core/_types.py index 48c1fb7..18c1930 100644 --- a/pymhf/core/_types.py +++ b/pymhf/core/_types.py @@ -19,6 +19,7 @@ class KeyPressProtocol(Protocol): class HookProtocol(Protocol): _is_funchook: bool _is_manual_hook: bool + _is_imported_func_hook: bool _has__result_: bool _hook_func_name: str _hook_time: DetourTime @@ -33,3 +34,8 @@ class ManualHookProtocol(HookProtocol): _hook_pattern: Optional[str] _hook_binary: Optional[str] _hook_func_def: FUNCDEF + + +class ImportedHookProtocol(HookProtocol): + _dll_name: str + _hook_func_def: FUNCDEF diff --git a/pymhf/core/calling.py b/pymhf/core/calling.py index ac234fe..1226bfd 100644 --- a/pymhf/core/calling.py +++ b/pymhf/core/calling.py @@ -4,18 +4,23 @@ import pymhf.core._internal as _internal from pymhf.core._types import FUNCDEF -from pymhf.core.memutils import find_pattern_in_binary +from pymhf.core.errors import UnknownFunctionError +from pymhf.core.memutils import _get_binary_info, find_pattern_in_binary from pymhf.core.module_data import module_data +from pymhf.core.utils import saferun_decorator calling_logger = getLogger("CallingManager") +@saferun_decorator def call_function( name: str, *args, overload: Optional[str] = None, pattern: Optional[str] = None, func_def: Optional[FUNCDEF] = None, + offset: Optional[int] = None, + binary: Optional[str] = None, ) -> Any: """Call a named function. @@ -31,32 +36,43 @@ def call_function( pattern The pattern which can be used to find where the function is. If provided this will be used instead of the offset as determined by the name. + offset + The offset relative to the binary. + If provided it will take precedence over the `pattern` argument. + binary + The name of the binary to search for the pattern within, or to find the offset relative to. + If not provided, will fallback to the name of the binary as provided by the `exe` config value. """ if func_def is not None: _sig = func_def else: _sig = module_data.FUNC_CALL_SIGS[name] - if pattern: - offset = find_pattern_in_binary(pattern, False) - else: - if (_pattern := module_data.FUNC_PATTERNS.get(name)) is not None: - if isinstance(_pattern, str): - offset = find_pattern_in_binary(_pattern, False) - else: - if (opattern := _pattern.get(overload)) is not None: - offset = find_pattern_in_binary(opattern, False) - else: - first = list(_pattern.items())[0] - calling_logger.warning(f"No pattern overload was provided for {name}. ") - calling_logger.warning(f"Falling back to the first overload ({first[0]})") - offset = find_pattern_in_binary(first[1], False) + # If we have a binary defined in module_data, use it. + binary = binary or module_data.FUNC_BINARY + if offset is None: + if pattern: + offset = find_pattern_in_binary(pattern, False, binary) else: - offset = module_data.FUNC_OFFSETS.get(name) - if offset is None: - raise NameError(f"Cannot find function {name}") + if (_pattern := module_data.FUNC_PATTERNS.get(name)) is not None: + if isinstance(_pattern, str): + offset = find_pattern_in_binary(_pattern, False, binary) + else: + if (opattern := _pattern.get(overload)) is not None: + offset = find_pattern_in_binary(opattern, False, binary) + else: + first = list(_pattern.items())[0] + calling_logger.warning(f"No pattern overload was provided for {name}. ") + calling_logger.warning(f"Falling back to the first overload ({first[0]})") + offset = find_pattern_in_binary(first[1], False, binary) + else: + offset = module_data.FUNC_OFFSETS.get(name) + if offset is None: + raise UnknownFunctionError(f"Cannot find function {name}") + if isinstance(_sig, FUNCDEF): sig = CFUNCTYPE(_sig.restype, *_sig.argtypes) - else: + elif isinstance(_sig, dict): + # TODO: Check to see if _sig is actually a dict. If it's not then we should raise an error. # Look up the overload: if (osig := _sig.get(overload)) is not None: # type: ignore sig = CFUNCTYPE(osig.restype, *osig.argtypes) @@ -68,6 +84,11 @@ def call_function( calling_logger.warning(f"No function arguments overload was provided for {name}. ") calling_logger.warning(f"Falling back to the first overload ({first[0]})") sig = CFUNCTYPE(first[1].restype, *first[1].argtypes) + else: + raise ValueError( + f"Invalid data type provided for `sig` argument: {type(_sig)}. Must be one of FUNCDEF or a " + "dictionary containing a mapping of FUNCDEF objects representing overloads." + ) if isinstance(offset, dict): # Handle overloads if (_offset := offset.get(overload)) is not None: # type: ignore @@ -77,6 +98,12 @@ def call_function( calling_logger.warning(f"No function arguments overload was provided for {name}. ") calling_logger.warning(f"Falling back to the first overload ({_offset[0]})") offset = _offset[1] + binary_base = _internal.BASE_ADDRESS + # TODO: This is inefficient to look it up every time. This should be optimised at some point. + if binary is not None: + if (hm := _get_binary_info(binary)) is not None: + _, module = hm + binary_base = module.lpBaseOfDll - cfunc = sig(_internal.BASE_ADDRESS + offset) + cfunc = sig(binary_base + offset) return cfunc(*args) diff --git a/pymhf/core/common.py b/pymhf/core/common.py index bd3cbaf..0d8f6ad 100644 --- a/pymhf/core/common.py +++ b/pymhf/core/common.py @@ -1,12 +1,4 @@ -import os -import os.path as op from concurrent.futures import ThreadPoolExecutor -import pymhf.core._internal as _internal - # TODO: Move somewhere else? Not sure where but this doesn't really fit here... executor: ThreadPoolExecutor = None # type: ignore - -mod_save_dir = op.join(_internal.CFG_DIR, "MOD_SAVES") -if not op.exists(mod_save_dir): - os.makedirs(mod_save_dir) diff --git a/pymhf/core/hooking.py b/pymhf/core/hooking.py index 2449173..9a1799e 100644 --- a/pymhf/core/hooking.py +++ b/pymhf/core/hooking.py @@ -1,4 +1,5 @@ import ast +import ctypes import inspect import logging import traceback @@ -14,12 +15,11 @@ FUNCDEF, DetourTime, HookProtocol, + ImportedHookProtocol, KeyPressProtocol, ManualHookProtocol, ) - -# from pymhf.core.errors import UnknownFunctionError -from pymhf.core.memutils import find_pattern_in_binary +from pymhf.core.memutils import _get_binary_info, find_pattern_in_binary from pymhf.core.module_data import module_data # from pymhf.core.caching import function_cache, pattern_cache @@ -52,7 +52,8 @@ class FuncHook(cyminhook.MinHook): _name: str _should_enable: bool _invalid: bool = False - _call_func: Optional[FUNCDEF] + _func_def: Optional[FUNCDEF] + _offset_is_absolute: bool def __init__( self, @@ -60,14 +61,27 @@ def __init__( *, offset: Optional[int] = None, pattern: Optional[int] = None, - call_func: Optional[FUNCDEF] = None, + func_def: Optional[FUNCDEF] = None, overload: Optional[str] = None, binary: Optional[str] = None, + offset_is_absolute: bool = False, ): - self._offset = offset + self._offset_is_absolute = offset_is_absolute + if self._offset_is_absolute: + self.target = offset + self._offset = None + else: + self._offset = offset self._pattern = pattern - self._call_func = call_func + self._func_def = func_def self._binary = binary + if self._binary is not None: + if (hm := _get_binary_info(self._binary)) is not None: + _, module = hm + self._binary_base = module.lpBaseOfDll + else: + self._binary = _internal.EXE_NAME + self._binary_base = _internal.BASE_ADDRESS self._before_detours: list[HookProtocol] = [] self._after_detours: list[HookProtocol] = [] @@ -96,7 +110,7 @@ def _init(self): offset = None # 1. Check to see if an offset is provided. If so, use this as the offset to find the address at. if self._offset is not None: - self.target = _internal.BASE_ADDRESS + self._offset + self.target = self._binary_base + self._offset # 2. If there is no offset provided, check to see if a pattern is provided. If so use this to find # the offset. @@ -107,7 +121,7 @@ def _init(self): if offset is not None: self._offset = offset pattern_cache[self._pattern] = offset - self.target = _internal.BASE_ADDRESS + offset + self.target = self._binary_base + offset hook_logger.debug(f"Found {self._pattern} at 0x{offset:X}") else: hook_logger.error(f"Could not find pattern {self._pattern}... Hook won't be added") @@ -135,19 +149,21 @@ def _init(self): if offset is not None: self._offset = offset pattern_cache[_pattern] = offset - self.target = _internal.BASE_ADDRESS + offset + self.target = self._binary_base + offset hook_logger.debug(f"Found {self.name} with pattern {_pattern!r} at 0x{offset:X}") # 4. If there is still no offset, look up the offset in the module_data. + # Note: If this is created by passing in an absolute offset, then the above 3 conditions will fail, + # but this one will not as self.target will have already been assigned. if not self.target: _offset = module_data.FUNC_OFFSETS.get(self._name) if _offset is not None: if isinstance(_offset, int): - self.target = _internal.BASE_ADDRESS + _offset + self.target = self._binary_base + _offset else: # This is an overload if (overload_offset := _offset.get(self.overload)) is not None: - self.target = _internal.BASE_ADDRESS + overload_offset + self.target = self._binary_base + overload_offset else: # Need to fallback on something. Raise a warning that no # overload was defined and that it will fallback to the @@ -155,15 +171,15 @@ def _init(self): first = list(_offset.items())[0] hook_logger.warning(f"No overload was provided for {self._name}. ") hook_logger.warning(f"Falling back to the first overload ({first[0]})") - self.target = _internal.BASE_ADDRESS + first[1] + self.target = self._binary_base + first[1] else: hook_logger.error(f"Cannot find the function {self._name} in {_internal.EXE_NAME}") self._invalid = True return # 5. if func_sig is provided, use it, otherwise look it up. - if self._call_func is not None: - self.signature = CFUNCTYPE(self._call_func.restype, *self._call_func.argtypes) + if self._func_def is not None: + self.signature = CFUNCTYPE(self._func_def.restype, *self._func_def.argtypes) else: if (sig := module_data.FUNC_CALL_SIGS.get(self._name)) is not None: if isinstance(sig, FUNCDEF): @@ -374,8 +390,9 @@ def disable(self): self.state = "disabled" @property - def offset(self): - return self.target - _internal.BASE_ADDRESS + def offset(self) -> int: + """The relative offset of the target to the binary base.""" + return self.target - self._binary_base class HookFactory: @@ -398,6 +415,7 @@ def _set_detour_as_funchook( """Set all the standard attributes required for a function hook.""" setattr(detour, "_is_funchook", True) setattr(detour, "_is_manual_hook", False) + setattr(detour, "_is_imported_func_hook", False) setattr(detour, "_has__result_", False) if cls: if hasattr(cls, "_overload"): @@ -473,8 +491,8 @@ def manual_hook( """ def inner(detour: Callable[..., Any]) -> ManualHookProtocol: - if offset is None and pattern is None: - raise ValueError(f"One of pattern or offset must be set for the manual hook: {detour}") + # if offset is None and pattern is None: + # raise ValueError(f"One of pattern or offset must be set for the manual hook: {detour}") HookFactory._set_detour_as_funchook(detour, None, name) if detour_time == "before": setattr(detour, "_hook_time", DetourTime.BEFORE) @@ -501,6 +519,18 @@ def disable(obj): return obj +def imported(dll_name: str, func_name: str, func_def: FUNCDEF): + def inner(detour: Callable[..., Any]) -> ImportedHookProtocol: + HookFactory._set_detour_as_funchook(detour, None, func_name) + setattr(detour, "_dll_name", dll_name) + setattr(detour, "_is_imported_func_hook", True) + setattr(detour, "_hook_func_def", func_def) + setattr(detour, "_hook_time", DetourTime.AFTER) + return detour + + return inner + + def one_shot(func: HookProtocol) -> HookProtocol: """Run this detour once only.""" setattr(func, "_is_one_shot", True) @@ -539,7 +569,8 @@ def __init__(self): def resolve_dependencies(self): """Resolve dependencies of hooks. This will get all the functions which are to be hooked and construct - compound hooks as required.""" + compound hooks as required. + """ # TODO: Make work. pass @@ -575,8 +606,7 @@ def call_custom_callbacks(self, callback_key: str, detour_time: DetourTime = Det cb() def register_hook(self, hook: HookProtocol): - """Register a hook. There will be on of these for each function which is hooked and each one may - have multiple methods assigned to it.""" + """Register the provided hook.""" hook_func_name = hook._hook_func_name # If the hook has an overload, add it here so that we can disambiguate them. if getattr(hook, "_func_overload", None) is not None: @@ -589,18 +619,56 @@ def register_hook(self, hook: HookProtocol): if hook._is_manual_hook: hook = cast(ManualHookProtocol, hook) funcdef = hook._hook_func_def + if funcdef is None: + # If no funcdef is explicitly provided, look and see if we have one defined in the module + # data for this function name. + funcdef = module_data.FUNC_CALL_SIGS.get(hook_func_name) if funcdef is None: raise SyntaxError( "When creating a manual hook, the first detour for any given name MUST have a " "func_def argument." ) + if (offset := hook._hook_offset) is None: + offset = module_data.FUNC_OFFSETS.get(hook_func_name) + if (pattern := hook._hook_offset) is None: + pattern = module_data.FUNC_PATTERNS.get(hook_func_name) + if offset is None and pattern is None: + hook_logger.error( + f"The manual hook for {hook_func_name} was defined with no offset or pattern. One of" + "these is required to register a hook. The hook will not be registered." + ) + return + binary = hook._hook_binary or module_data.FUNC_BINARY self.hooks[hook_func_name] = FuncHook( hook._hook_func_name, - offset=hook._hook_offset, - pattern=hook._hook_pattern, - call_func=funcdef, - binary=hook._hook_binary, + offset=offset, + pattern=pattern, + func_def=funcdef, + binary=binary, + ) + elif hook._is_imported_func_hook: + hook = cast(ImportedHookProtocol, hook) + dll_name = hook._dll_name.lower() + hook_func_name = hook._hook_func_name + hook_func_def = hook._hook_func_def + hook_logger.info( + f"Trying to load imported hook: {dll_name}.{hook_func_name} with func def: " + f"{hook_func_def}" ) + if (dll_func_ptrs := _internal.imports.get(dll_name)) is not None: + func_ptr = dll_func_ptrs.get(hook_func_name) + # For now, cast the func_ptr object back to the target location in memory. + # This is wasteful, but simple for now for testing... + target = ctypes.cast(func_ptr, ctypes.c_void_p).value + hook_logger.info(f"{func_ptr} points to 0x{target:X}") + self.hooks[hook_func_name] = FuncHook( + hook._hook_func_name, + offset=target, + func_def=hook_func_def, + offset_is_absolute=True, + ) + else: + hook_logger.error(f"Cannot find {dll_name} in the import list") else: # TODO: have a way to differentiate the binary here. self.hooks[hook_func_name] = FuncHook( @@ -627,10 +695,13 @@ def initialize_hooks(self) -> int: # Try and enable the hook. try: hook.enable() - hook_logger.info( - f"Enabled hook for {hook_name} at {_internal.EXE_NAME}+" - f"0x{hook.target - _internal.BASE_ADDRESS:X}" - ) + if hook._offset_is_absolute: + offset = hook.target + prefix = "" + else: + offset = hook.offset + prefix = f"{hook._binary}+" + hook_logger.info(f"Enabled hook for {hook_name} at {prefix}0x{offset:X}") except Exception: hook_logger.error(f"Unable to enable {hook_name} because:") hook_logger.error(traceback.format_exc()) diff --git a/pymhf/core/importing.py b/pymhf/core/importing.py index ba81790..5f9164f 100644 --- a/pymhf/core/importing.py +++ b/pymhf/core/importing.py @@ -16,7 +16,8 @@ def _clean_name(name: str) -> str: """Remove any disallowed characters from the filename so that we get a - valid module name.""" + valid module name. + """ out = "" for char in name: if char not in VALID_CHARS: diff --git a/pymhf/core/logging.py b/pymhf/core/logging.py index 256a808..4d1c770 100644 --- a/pymhf/core/logging.py +++ b/pymhf/core/logging.py @@ -15,9 +15,29 @@ def flush(self): pass -def open_log_console(log_script: str, module_path: str, name_override: str = "pymhf console") -> int: - """Open the logging console and return the pid of it.""" - cmd = ["cmd.exe", "/c", "start", name_override, "python", log_script, module_path] +def open_log_console(log_script: str, log_dir: str, name_override: str = "pymhf console") -> int: + """Open the logging console and return the pid of it. + + Parameters + ---------- + log_script + Path to the logging script to be run. + log_dir: + Path where the log files will be written to. + name_override: + Name to override the default "pymhf console" window title. + """ + # TODO: The following doesn't quite work... Need to figure out why. + # The issue it is meant to fix is passing in non-ascii name overrides. + # if '"' not in name_override: + # # Wrap the string explicitly in double quotation marks so that it will be used correctly in the case + # # of non-ascii characters. + # name_override = f'''"{name_override}"''' + # print(f"name override: {name_override!r}") + # else: + # # For now, leave this as an edge case... + # pass + cmd = ["cmd.exe", "/c", "start", name_override, "python", log_script, log_dir] with subprocess.Popen(cmd) as proc: log_ppid = proc.pid for proc in psutil.process_iter(["pid", "name", "ppid"]): diff --git a/pymhf/core/memutils.py b/pymhf/core/memutils.py index 40dddf9..78b3a74 100644 --- a/pymhf/core/memutils.py +++ b/pymhf/core/memutils.py @@ -1,4 +1,3 @@ -import configparser import ctypes import logging import sys @@ -45,11 +44,9 @@ # Temporary solution to create a mapping of pattern/binary pairs to the offset within the binary. offset_cache = {} -config = configparser.ConfigParser() - def getsize(obj): - """sum size of object & members.""" + """Sum size of object & members.""" if isinstance(obj, BLACKLIST): raise TypeError("getsize() does not take argument of type: " + str(type(obj))) seen_ids = set() @@ -153,6 +150,7 @@ def get_field_info(obj, logger=None, indent: int = 0, as_hex: bool = True, max_d def get_addressof(obj) -> int: + """Get the address in memory of some object.""" try: # If it's a pointer, this is the branch that is used. return ctypes.cast(obj, ctypes.c_void_p).value @@ -232,6 +230,21 @@ def pattern_to_bytes(patt: str) -> bytes: return b"".join([f"\\x{x}".encode() if x != "??" else b"." for x in split]) +def _get_binary_info(binary: str) -> Optional[tuple[int, MODULEINFO]]: + if binary not in hm_cache: + try: + pm_process = pymem.Pymem(_internal.EXE_NAME) + handle = pm_process.process_handle + if (module := cache.module_map.get(binary)) is None: + return None + hm_cache[binary] = (handle, module) + return (handle, module) + except TypeError: + return None + else: + return hm_cache[binary] + + def find_pattern_in_binary( pattern: str, return_multiple: bool = False, @@ -244,20 +257,14 @@ def find_pattern_in_binary( """ if binary is None: binary = _internal.EXE_NAME - if binary not in hm_cache: - try: - pm_process = pymem.Pymem(_internal.EXE_NAME) - handle = pm_process.process_handle - if (module := cache.module_map.get(binary)) is None: - return None - hm_cache[binary] = (handle, module) - except TypeError: - return None + hm = _get_binary_info(binary) + if not hm: + return None + handle, module = hm # Create a key which is the original pattern and the binary so that we may cache the result. key = (pattern, binary) if (_offset := offset_cache.get(key)) is not None: return _offset - handle, module = hm_cache[binary] patt = pattern_to_bytes(pattern) _offset = pymem.pattern.pattern_scan_module(handle, module, patt, return_multiple=return_multiple) if _offset: diff --git a/pymhf/core/mod_loader.py b/pymhf/core/mod_loader.py index 872b886..77c5609 100644 --- a/pymhf/core/mod_loader.py +++ b/pymhf/core/mod_loader.py @@ -3,6 +3,7 @@ # Mods will consist of a single file which will generally contain a number of # hooks. +import ctypes import importlib import importlib.util import inspect @@ -23,11 +24,12 @@ from packaging.version import parse as parse_version import pymhf.core._internal as _internal -import pymhf.core.common as common from pymhf.core._types import HookProtocol from pymhf.core.errors import NoSaveError from pymhf.core.hooking import HookManager from pymhf.core.importing import import_file +from pymhf.core.memutils import get_addressof, map_struct +from pymhf.core.module_data import module_data from pymhf.core.utils import does_pid_have_focus, saferun from pymhf.gui.protocols import ButtonProtocol, VariableProtocol @@ -112,8 +114,8 @@ def object_hook(seld, obj: dict): class ModState(ABC): - """A class which is used as a base class to indicate that the class is to be - used as a mod state. + """A class which is used as a base class to indicate that the class is to be used as a mod state. + Mod State classes will persist across mod reloads so any variables set in it will have the same value after the mod has been reloaded. """ @@ -121,6 +123,7 @@ class ModState(ABC): _save_fields_: tuple[str] def save(self, name: str): + """Save the current mod state to file.""" _data = {} if hasattr(self, "_save_fields_") and self._save_fields_: for field in self._save_fields_: @@ -135,12 +138,20 @@ def save(self, name: str): "have the _save_fields_ attribute. State was not saved" ) return - with open(op.join(common.mod_save_dir, name), "w") as fobj: + if not _internal.MOD_SAVE_DIR: + _internal.MOD_SAVE_DIR = op.join(_internal.MODULE_PATH, "MOD_SAVES") + mod_logger.warning( + f"No mod_save_dir config value set. Please set one. Falling back to {_internal.MOD_SAVE_DIR}" + ) + if not op.exists(_internal.MOD_SAVE_DIR): + os.makedirs(_internal.MOD_SAVE_DIR) + with open(op.join(_internal.MOD_SAVE_DIR, name), "w") as fobj: json.dump(_data, fobj, cls=StructEncoder, indent=1) def load(self, name: str): + """Load the mod state from file.""" try: - with open(op.join(common.mod_save_dir, name), "r") as f: + with open(op.join(_internal.MOD_SAVE_DIR, name), "r") as f: data = json.load(f, cls=StructDecoder) except FileNotFoundError as e: raise NoSaveError from e @@ -198,10 +209,10 @@ def __init__(self, hook_manager: HookManager): def _load_module(self, module: ModuleType) -> bool: """Load a mod from the provided module. + This will be called when initially loading the mods, and also when we wish to reload a mod. """ - d: dict[str, type[Mod]] = dict( inspect.getmembers(module, partial(_is_mod_predicate, ref_module=module)) ) @@ -249,12 +260,48 @@ def _load_module(self, module: ModuleType) -> bool: self._mod_paths[mod_name] = module return True - def load_mod(self, fpath) -> bool: - """Load a mod from the given filepath.""" + def load_mod(self, fpath) -> Optional[ModuleType]: + """Load a mod from the given filepath. + + This returns the loaded module if it contains a valid mod and can be loaded correctly. + """ module = import_file(fpath) if module is None: - return False - return self._load_module(module) + return None + if self._load_module(module): + return module + + # TODO: Can probably move the duplicated functionality between this and the next method into a single + # function. + def load_single_mod(self, fpath: str, bind: bool = True): + """Load a single mod file. + + Params + ------ + folder + The path of the folder to be loaded. All mod files within this directory will be loaded and + installed. + bind + Whether or not to actual bind and initialize the hooks within the mod. + This should almost always be True except when loading the internal mods initially since it's not + necessary. + If this function is called with False, then it MUST be called again with True before the hooks + are enabled. + """ + self.load_mod(fpath) + # Once all the mods in the folder have been loaded, then parse the mod + # for function hooks and register then with the hook loader. + loaded_mods = len(self._preloaded_mods) + for _mod in self._preloaded_mods.values(): + self.instantiate_mod(_mod) + + self._preloaded_mods.clear() + + bound_hooks = 0 + if bind: + bound_hooks = self.hook_manager.initialize_hooks() + + return loaded_mods, bound_hooks def load_mod_folder(self, folder: str, bind: bool = True) -> tuple[int, int]: """Load the mod folder. @@ -352,7 +399,7 @@ def reload(self, name: str, gui: "GUI"): del sys.modules[module.__name__] # Then, add everything back. - self.load_mod(module.__file__) + new_module = self.load_mod(module.__file__) for _mod in self._preloaded_mods.values(): mod = self.instantiate_mod(_mod) @@ -361,8 +408,46 @@ def reload(self, name: str, gui: "GUI"): if mod_state := self.mod_states.get(name): for ms in mod_state: field, state = ms + member_req_reinst = {} + for x in inspect.getmembers(state): + member, member_type = x + if not member.startswith("__"): + if ( + _module := getattr(member_type.__class__, "__module__", None) + ) is not None and isinstance(member_type, ctypes.Structure): + if _module == module.__spec__.name: + # In this case, the instance of the attribute in the ModState was + # defined in the module that is being reloaded. We need to + # re-instantiate it so that we can get any potential changes to + # it. + member_req_reinst[member] = member_type + logging.info(f"{member}: {_module}") + logging.info( + f"Reinstantiating the following members: {list(member_req_reinst.keys())}" + ) + for name, type_ in member_req_reinst.items(): + data_offset = get_addressof(type_) + new_obj_type_name = type_.__class__.__name__ + mod_logger.info(f"{name} is of type {new_obj_type_name}") + new_obj_type = getattr(new_module, new_obj_type_name) + new_obj = map_struct(data_offset, new_obj_type) + setattr(state, name, new_obj) + del member_type setattr(mod, field, state) + # Check also to see if the file had any module-level __pymhf attributes which we might + # to update the `module_data` with. + _new_binary = getattr(new_module, "__pymhf_func_binary__", None) + _new_offsets = getattr(new_module, "__pymhf_func_offsets__", {}) + _new_patterns = getattr(new_module, "__pymhf_func_patterns__", {}) + _new_func_call_sigs = getattr(new_module, "__pymhf_func_call_sigs__", {}) + + if _new_binary: + module_data.FUNC_BINARY = _new_binary + module_data.FUNC_OFFSETS.update(_new_offsets) + module_data.FUNC_PATTERNS.update(_new_patterns) + module_data.FUNC_CALL_SIGS.update(_new_func_call_sigs) + # Return to GUI land to reload the mod. gui.reload_tab(mod) diff --git a/pymhf/core/module_data.py b/pymhf/core/module_data.py index 523505c..4e32a10 100644 --- a/pymhf/core/module_data.py +++ b/pymhf/core/module_data.py @@ -1,7 +1,4 @@ -# TODO: This is imported already by some things. We maybe need to create an -# object which contains this data as attributes so that we may set them after -# import more easily. -from typing import Union +from typing import Optional, Union from pymhf.core._types import FUNCDEF @@ -18,6 +15,7 @@ class ModuleData: FUNC_OFFSETS: dict[str, Union[int, dict[str, int]]] FUNC_PATTERNS: dict[str, Union[str, dict[str, str]]] FUNC_CALL_SIGS: dict[str, Union[FUNCDEF, dict[str, FUNCDEF]]] + FUNC_BINARY: Optional[str] = None module_data = ModuleData() diff --git a/pymhf/core/utils.py b/pymhf/core/utils.py index 8fa09b1..f15dfb0 100644 --- a/pymhf/core/utils.py +++ b/pymhf/core/utils.py @@ -1,7 +1,7 @@ import logging from collections.abc import Callable -from configparser import ConfigParser from ctypes import byref, c_ulong, create_unicode_buffer, windll +from functools import wraps from typing import Optional import pywinctl as pwc @@ -86,7 +86,8 @@ def get_main_window(): def safe_assign_enum(enum, index: int): """Safely try and get the enum with the associated integer value. - If the index isn't one in the enum return the original index.""" + If the index isn't one in the enum return the original index. + """ try: return enum(index) except ValueError: @@ -117,36 +118,54 @@ def does_pid_have_focus(pid: int) -> bool: return pid == get_foreground_pid() -class AutosavingConfig(ConfigParser): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._filename: str - self._encoding: Optional[str] +# TODO: Do something about this... +# class AutosavingConfig(ConfigParser): +# def __init__(self, *args, **kwargs): +# super().__init__(*args, **kwargs) +# self._filename: str +# self._encoding: Optional[str] - def read(self, filenames, encoding=None): - super().read(filenames, encoding) - self._filename = filenames - self._encoding = encoding +# def read(self, filenames, encoding=None): +# super().read(filenames, encoding) +# self._filename = filenames +# self._encoding = encoding - def set(self, section: str, option: str, value=None): - if value is not None: - val = str(value) - else: - val = value - try: - super().set(section, option, val) - with open(self._filename, "w", encoding=self._encoding) as f: - self.write(f, space_around_delimiters=True) - except Exception: - logger.exception("Error saving file") +# def set(self, section: str, option: str, value=None): +# if value is not None: +# val = str(value) +# else: +# val = value +# try: +# super().set(section, option, val) +# with open(self._filename, "w", encoding=self._encoding) as f: +# self.write(f, space_around_delimiters=True) +# except Exception: +# logger.exception("Error saving file") def saferun(func, *args, **kwargs): """Safely run the specified function with args and kwargs. - Any exception raised will be shown and ignored""" + + Any exception raised will be shown and ignored + """ ret = None try: ret = func(*args, **kwargs) except Exception: logger.exception(f"There was an exception while calling {func}:") return ret + + +def saferun_decorator(func): + """Safely run the decorated function so that any errors are caught and logged.""" + + @wraps(func) + def inner(*args, **kwargs): + ret = None + try: + ret = func(*args, **kwargs) + except Exception: + logger.exception(f"There was an exception while calling {func}:") + return ret + + return inner diff --git a/pymhf/gui/gui.py b/pymhf/gui/gui.py index d4fefaf..1c82921 100644 --- a/pymhf/gui/gui.py +++ b/pymhf/gui/gui.py @@ -7,7 +7,6 @@ import dearpygui.dearpygui as dpg from pymhf.core.mod_loader import Mod, ModManager -from pymhf.core.utils import AutosavingConfig from pymhf.gui.protocols import ButtonProtocol, VariableProtocol, VariableType SETTINGS_NAME = "_pymhf_gui_settings" @@ -32,9 +31,9 @@ class Widgets(TypedDict): class GUI: - def __init__(self, mod_manager: ModManager, config: AutosavingConfig): + def __init__(self, mod_manager: ModManager, config: dict): self.config = config - self.scale = config.getint("gui", "scale", fallback=1) + self.scale = config.get("gui", {}).get("scale", 1) dpg.create_context() dpg.create_viewport( title="pyMHF", diff --git a/pymhf/injected.py b/pymhf/injected.py index 56d051e..342e649 100644 --- a/pymhf/injected.py +++ b/pymhf/injected.py @@ -31,24 +31,21 @@ import pymhf.core._internal as _internal from pymhf.core.importing import import_file - from pymhf.core.utils import AutosavingConfig + from pymhf.utils.config import canonicalize_setting - # # Before any pymhf imports occur, set the os.environ value for the - # # binary hash: - # if _internal.BINARY_HASH: - # os.environ["PYMHF_BINARY_HASH"] = _internal.BINARY_HASH - # else: - # # If there is no binary hash, something has gone wrong. Exit now since - # # we can't continue. - # sys.exit(-1) + log_level = _internal.CONFIG.get("logging", {}).get("log_level", "info") + log_level = "info" - config = AutosavingConfig() - cfg_file = op.join(_internal.CFG_DIR, "pymhf.cfg") - read = config.read(cfg_file) - log_level = config.get("pymhf", "log_level", fallback="info") + _module_path = _internal.MODULE_PATH + if op.isfile(_module_path): + _module_path = op.dirname(_module_path) + _binary_dir = op.dirname(_internal.BINARY_PATH) - internal_mod_folder = config.get("binary", "internal_mod_dir", fallback=None) - mod_folder = config.get("binary", "mod_dir", fallback=None) + internal_mod_folder = _internal.CONFIG.get("internal_mod_dir") + internal_mod_folder = canonicalize_setting(internal_mod_folder, "pymhf", _module_path, _binary_dir) + mod_folder = _internal.CONFIG.get("mod_dir") + + mod_folder = canonicalize_setting(mod_folder, "pymhf", _module_path, _binary_dir) debug_mode = log_level.lower() == "debug" if debug_mode: @@ -58,9 +55,23 @@ from pymhf.core.module_data import module_data - module_data.FUNC_OFFSETS = getattr(module.__pymhf_functions__, "FUNC_OFFSETS", {}) - module_data.FUNC_PATTERNS = getattr(module.__pymhf_functions__, "FUNC_PATTERNS", {}) - module_data.FUNC_CALL_SIGS = getattr(module.__pymhf_functions__, "FUNC_CALL_SIGS", {}) + _pymhf_functions_ = getattr(module, "__pymhf_functions__", None) + if _pymhf_functions_ is not None: + module_data.FUNC_OFFSETS = getattr(module.__pymhf_functions__, "FUNC_OFFSETS", {}) + module_data.FUNC_PATTERNS = getattr(module.__pymhf_functions__, "FUNC_PATTERNS", {}) + module_data.FUNC_CALL_SIGS = getattr(module.__pymhf_functions__, "FUNC_CALL_SIGS", {}) + else: + # Can also try and see if there are any other "magic" attributes assigned, otherwise fallback to + # empty dicts + module_data.FUNC_BINARY = getattr(module, "__pymhf_func_binary__", None) + module_data.FUNC_OFFSETS = getattr(module, "__pymhf_func_offsets__", {}) + module_data.FUNC_PATTERNS = getattr(module, "__pymhf_func_patterns__", {}) + module_data.FUNC_CALL_SIGS = getattr(module, "__pymhf_func_call_sigs__", {}) + + import keyboard._winkeyboard as kwk + + # Prefill the key name tables to avoid taking a hit when hooking. + kwk._setup_name_tables() import pymhf.core.caching as cache from pymhf.core.hooking import hook_manager @@ -73,7 +84,13 @@ ExecutionEndedException, custom_exception_handler, ) - from pymhf.gui.gui import GUI + + try: + from pymhf.gui.gui import GUI + except ModuleNotFoundError: + # If we can't import this, then DearPyGUI is missing, so we won't create the GUI. + GUI = None + from pymhf.utils.get_imports import get_imports asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) @@ -99,7 +116,8 @@ class ExecutingProtocol(asyncio.Protocol): """A protocol factory to be passed to a asyncio loop.create_server call which will accept requests, execute them and persist any variables to - globals().""" + globals(). + """ def connection_made(self, transport: asyncio.transports.WriteTransport): self.transport: asyncio.transports.WriteTransport = transport @@ -110,8 +128,11 @@ def connection_made(self, transport: asyncio.transports.WriteTransport): globals()["print"] = partial(builtins.print, file=self) def write(self, value: str): - """Method to allow this protocol to be used as a file to write to. - This allows us to have `print` write to this protocol.""" + """ + Method to allow this protocol to be used as a file to write to. + + This allows us to have `print` write to this protocol. + """ self.transport.write(value.encode()) def data_received(self, __data: bytes): @@ -133,8 +154,7 @@ def data_received(self, __data: bytes): self.persist_to_globals(locals()) def persist_to_globals(self, data: dict): - """Take the dictionary which was determined by calling `locals()`, and - update `gloabsl()` with it.""" + """Take the dict which was determined by calling `locals()`, and update `gloabsl()` with it.""" data.pop("self") data.pop(f"_{type(self).__name__}__data") globals().update(data) @@ -173,6 +193,10 @@ def top_globals(limit: Optional[int] = 10): binary = pymem.Pymem(_internal.EXE_NAME) cache.module_map = {x.name: x for x in pymem.process.enum_process_module(binary.process_handle)} + # Read the imports + if _internal.BINARY_PATH: + _internal.imports = get_imports(_internal.BINARY_PATH) + mod_manager = ModManager(hook_manager) # First, load our internal mod before anything else. if internal_mod_folder is not None: @@ -198,13 +222,17 @@ def top_globals(limit: Optional[int] = 10): _loaded_mods = 0 _loaded_hooks = 0 try: - if mod_folder is not None: - _loaded_mods, _loaded_hooks = mod_manager.load_mod_folder(mod_folder) + if _internal.SINGLE_FILE_MOD: + # For a single file mod, we just load that file. + _loaded_mods, _loaded_hooks = mod_manager.load_single_mod(_internal.MODULE_PATH) else: - logging.warning( - """You have not configured the `binary.mod_dir` variable in the pymhf.cfg file. - Please do so so that you can load mods.""" - ) + if mod_folder is not None: + _loaded_mods, _loaded_hooks = mod_manager.load_mod_folder(mod_folder) + else: + logging.warning( + """You have not configured the `binary.mod_dir` variable in the pymhf.cfg file. + Please do so so that you can load mods.""" + ) except Exception: logging.error(traceback.format_exc()) logging.info(f"Loaded {_loaded_mods} mods and {_loaded_hooks} hooks in {time.time() - start_time:.3f}s") @@ -223,8 +251,8 @@ def top_globals(limit: Optional[int] = 10): # logging.info("Executing protocol is ready to go!") futures = [] - if config.getboolean("gui", "shown", fallback=True): - gui = GUI(mod_manager, config) + if _internal.CONFIG.get("gui", {}).get("shown", True) and GUI is not None: + gui = GUI(mod_manager, _internal.CONFIG) # For each mod, add the corresponding tab to the gui. for mod in mod_manager.mods.values(): gui.add_tab(mod) diff --git a/pymhf/main.py b/pymhf/main.py index 81fa401..6f99309 100644 --- a/pymhf/main.py +++ b/pymhf/main.py @@ -1,6 +1,5 @@ import asyncio import concurrent.futures -import configparser import os import os.path as op import subprocess @@ -8,7 +7,7 @@ import webbrowser from functools import partial from signal import SIGTERM -from typing import Optional +from typing import Any, Optional import psutil import pymem @@ -19,8 +18,11 @@ from pymhf.core.logging import open_log_console from pymhf.core.process import start_process from pymhf.core.protocols import ESCAPE_SEQUENCE, TerminalProtocol +from pymhf.utils.config import canonicalize_setting +from pymhf.utils.parse_toml import read_pymhf_settings CWD = op.dirname(__file__) +APPDATA_DIR = os.environ.get("APPDATA", op.expanduser("~")) class WrappedProcess: @@ -111,37 +113,68 @@ def get_process_when_ready( return None, None +def load_mod_file(filepath): + """Load an individual file as a mod.""" + pymhf_settings = read_pymhf_settings(filepath, True) + _run_module(filepath, pymhf_settings, None) + + def load_module(plugin_name: str, module_path: str, is_local: bool = False): - # Parse the config file first so we can load anything we need to know. - config = configparser.ConfigParser() - # If we are not running local, then we try find the config file in the user APPDATA directory. - if not is_local: - appdata_data = os.environ.get("APPDATA", op.expanduser("~")) - cfg_folder = op.join(appdata_data, "pymhf", plugin_name) - cfg_file = op.join(cfg_folder, "pymhf.cfg") + """Load the module.""" + if is_local: + cfg_file = op.join(module_path, "pymhf.toml") else: - cfg_folder = module_path - cfg_file = op.join(module_path, "pymhf.cfg") - read = config.read(cfg_file) - if not read: - print(f"No pymhf.cfg file found in specified directory: {module_path}") - print("Cannot proceed with loading") - return + cfg_file = op.join(APPDATA_DIR, "pymhf", plugin_name, "pymhf.toml") + pymhf_config = read_pymhf_settings(cfg_file, not is_local) + _run_module(module_path, pymhf_config, plugin_name, is_local) + + +def _required_config_val(config: dict[str, str], key: str) -> Any: + if (val := config.get(key)) is not None: + return val + raise ValueError(f"[tool.pymhf] missing config value: {key}") + + +def _run_module(module_path: str, config: dict[str, str], plugin_name: Optional[str], is_local: bool = False): + """Run the module provided. + + Parameters + ---------- + module_path + The path to the module or single-file mod to be run. + config + A mapping of the associated pymhf config. + """ + single_file_mod = False + if op.isfile(module_path): + single_file_mod = True + binary_path = None - binary_exe = config.get("binary", "exe") + binary_exe = _required_config_val(config, "exe") required_assemblies = [] - if _required_assemblies := config.get("binary", "required_assemblies", fallback=""): - required_assemblies = [x.strip() for x in _required_assemblies.split(",")] - steam_gameid = config.getint("binary", "steam_gameid", fallback=0) - start_paused = config.getboolean("binary", "start_paused", fallback=False) - log_window_name_override = config.get("pymhf", "log_window_name_override", fallback="pymhf console") + required_assemblies = config.get("required_assemblies", []) + try: + steam_gameid = int(config.get("steam_gameid", 0)) + except (ValueError, TypeError): + steam_gameid = 0 + start_paused = config.get("start_paused", False) + + # Check if the module_path is a file or a folder. + _module_path = module_path + if op.isfile(module_path): + _module_path = op.dirname(module_path) + is_steam = False if steam_gameid: cmd = f"steam://rungameid/{steam_gameid}" is_steam = True else: - binary_path = config["binary"]["path"] - cmd = binary_path + if op.isdir(binary_exe): + cmd = binary_path = binary_exe + # We only need the binary_exe to be the name from here on. + binary_exe = op.basename(binary_exe) + else: + raise ValueError("[tool.pymhf].exe must be a full path if no steam_gameid is provided.") pm_binary, proc = get_process_when_ready(cmd, binary_exe, required_assemblies, is_steam, start_paused) @@ -155,8 +188,20 @@ def load_module(plugin_name: str, module_path: str, is_local: bool = False): if binary_path is None: binary_path = proc.proc.exe() + binary_dir = None + if binary_path is not None: + binary_dir = op.dirname(binary_path) + print(f"Found PID: {pm_binary.process_id}") + logging_config = config.get("logging", {}) + log_window_name_override = logging_config.get("window_name_override", "pymhf console") + log_dir = logging_config.get("log_dir", "{CURR_DIR}") + log_dir = canonicalize_setting(log_dir, plugin_name, _module_path, binary_dir, "LOGS") + + mod_save_dir = config.get("mod_save_dir", "{CURR_DIR}") + mod_save_dir = canonicalize_setting(mod_save_dir, plugin_name, _module_path, binary_dir, "MOD_SAVES") + executor = None futures = [] loop = asyncio.get_event_loop() @@ -174,7 +219,7 @@ def kill_injected_code(loop: asyncio.AbstractEventLoop): loop.run_until_complete(client_completed) try: - log_pid = open_log_console(op.join(CWD, "log_terminal.py"), module_path, log_window_name_override) + log_pid = open_log_console(op.join(CWD, "log_terminal.py"), log_dir, log_window_name_override) # Have a small nap just to give it some time. time.sleep(0.5) print(f"Opened the console log with PID: {log_pid}") @@ -211,9 +256,6 @@ def kill_injected_code(loop: asyncio.AbstractEventLoop): binary_base = offset_map[assem_name][0] binary_size = offset_map[assem_name][1] - # Inject some other dlls: - # pymem.process.inject_dll(nms.process_handle, b"path") - try: cwd = CWD.replace("\\", "\\\\") import sys @@ -231,6 +273,7 @@ def kill_injected_code(loop: asyncio.AbstractEventLoop): _preinject_shellcode = f.read() pm_binary.inject_python_shellcode(_preinject_shellcode) # Inject the common NMS variables which are required for general use. + module_path = op.realpath(module_path) module_path = module_path.replace("\\", "\\\\") pm_binary.inject_python_shellcode( f""" @@ -241,8 +284,11 @@ def kill_injected_code(loop: asyncio.AbstractEventLoop): pymhf.core._internal.PID = {pm_binary.process_id!r} pymhf.core._internal.HANDLE = {pm_binary.process_handle!r} pymhf.core._internal.BINARY_HASH = {binary_hash!r} -pymhf.core._internal.CFG_DIR = {cfg_folder!r} +pymhf.core._internal.CONFIG = {config!r} pymhf.core._internal.EXE_NAME = {binary_exe!r} +pymhf.core._internal.BINARY_PATH = {binary_path!r} +pymhf.core._internal.SINGLE_FILE_MOD = {single_file_mod!r} +pymhf.core._internal.MOD_SAVE_DIR = {mod_save_dir!r} """ ) except Exception as e: diff --git a/pymhf/utils/config.py b/pymhf/utils/config.py new file mode 100644 index 0000000..8165976 --- /dev/null +++ b/pymhf/utils/config.py @@ -0,0 +1,70 @@ +import os +import os.path as op +import re +from typing import Optional + +PATH_RE = re.compile(r"^\{(?PEXE_DIR|USER_DIR|CURR_DIR)\}(?P[^{}]*)$") + + +def canonicalize_setting( + value: Optional[str], + plugin_name: Optional[str], + module_dir: str, + exe_dir: str, + suffix: Optional[str] = None, +) -> str: + """Convert the "magic" names into real values. + + Possible keys: + - EXE_DIR + - USER_DIR / "~" + - CURR_DIR / "." + """ + + # This can receive None as the value. + # In this case we simply return as we don't want to do anything with it. + if value is None: + return None + + # Parse the value to determine what directory type we are asking for. + tag = None + rest = tuple() + if (m := re.match(PATH_RE, value)) is not None: + tag = m["tag"] + if m["rest"]: + rest = op.split(m["rest"].strip("/\\")) + else: + if value == ".": + tag = "CURR_DIR" + elif value == "~": + tag = "USER_DIR" + else: + raise ValueError("Invalid path") + + # If the path provided already exists, simply return it. + if tag is None and op.exists(value): + return value + + if suffix is None: + _suffix = rest + else: + _suffix = rest + (suffix,) + + if tag == "USER_DIR": + appdata_data = os.environ.get("APPDATA", op.expanduser("~")) + if appdata_data == "~": + # In this case the APPDATA environment variable isn't set and ~ also fails to resolve. + # Raise a error and stop. + print("Critical Error: Cannot find user directory. Ensure APPDATA environment variable is set") + exit() + if plugin_name is not None: + return op.realpath(op.join(appdata_data, "pymhf", plugin_name, *_suffix)) + else: + raise ValueError("{USER_DIR} cannot be used for single-file mods.") + elif tag == "EXE_DIR": + if exe_dir: + return op.realpath(op.join(exe_dir, *_suffix)) + else: + raise ValueError("Exe directory cannot be determined") + elif tag == "CURR_DIR": + return op.realpath(op.join(module_dir, *_suffix)) diff --git a/pymhf/utils/get_imports.py b/pymhf/utils/get_imports.py new file mode 100644 index 0000000..f66e31f --- /dev/null +++ b/pymhf/utils/get_imports.py @@ -0,0 +1,52 @@ +import ctypes +import os.path as op +from logging import getLogger + +import pefile + +imports = {} + +logger = getLogger(__name__) + + +def get_imports(binary_path: str) -> dict: + directory, binary = op.split(binary_path) + pe = pefile.PE(op.join(directory, binary), fast_load=True) + pe.parse_data_directories(directories=[pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_IMPORT"]]) + for entry in pe.DIRECTORY_ENTRY_IMPORT: + dll_name: str = entry.dll.decode() + if dll_name.lower().endswith(".dll"): + dll_name = dll_name.lower()[:-4] + dll_imports = [] + for imp in entry.imports: + if imp.name: + dll_imports.append(imp.name.decode()) + imports[dll_name] = dll_imports + + funcptrs = {} + for _dll, dll_imports in imports.items(): + try: + dll = ctypes.WinDLL(_dll) + except FileNotFoundError: + fullpath = op.join(directory, _dll) + try: + dll = ctypes.WinDLL(fullpath) + except FileNotFoundError: + logger.error(f"Cannot find dll {_dll}") + continue + _funcptrs = {} + for name in dll_imports: + func_ptr = getattr(dll, name, None) + if func_ptr is None: + logger.error(f"Cannot find function {_dll}.{name}") + continue + _funcptrs[name] = func_ptr + funcptrs[_dll] = _funcptrs + + return funcptrs + + +if __name__ == "__main__": + BINARY_PATH = "C:\\Games\\No Man's Sky\\Binaries\\NMS.exe" + ptrs = get_imports(BINARY_PATH) + print(ptrs["user32"]) diff --git a/pymhf/utils/parse_toml.py b/pymhf/utils/parse_toml.py new file mode 100644 index 0000000..d5272bc --- /dev/null +++ b/pymhf/utils/parse_toml.py @@ -0,0 +1,44 @@ +import re +from typing import Optional + +import tomlkit + +REGEX = r"(?m)^# /// (?P[a-zA-Z0-9-]+)$\s(?P(^#(| .*)$\s)+)^# ///$" + + +def read_inline_metadata(script: str) -> Optional[tomlkit.TOMLDocument]: + """Read a file and extract the toml info contained in the script if there is one. + This is taken directly from the reference implementation in https://peps.python.org/pep-0723/ + and modified to use `tomlkit` to ensure compatibility for python 3.9+ + """ + name = "script" + matches = list(filter(lambda m: m.group("type") == name, re.finditer(REGEX, script))) + if len(matches) > 1: + raise ValueError(f"Multiple {name} blocks found") + elif len(matches) == 1: + content = "".join( + line[2:] if line.startswith("# ") else line[1:] + for line in matches[0].group("content").splitlines(keepends=True) + ) + return tomlkit.parse(content) + else: + return None + + +def read_pymhf_settings(fpath: str, standalone: bool = False) -> dict: + with open(fpath, "r") as f: + if standalone: + settings = read_inline_metadata(f.read()) + else: + settings = tomlkit.loads(f.read()) + if not settings: + return {} + if standalone: + return settings.get("tool", {}).get("pymhf", {}) + else: + return settings.get("pymhf", {}) + + +def write_pymhf_settings(settings: dict, fpath: str): + with open(fpath, "w") as f: + tomlkit.dump(settings, f) diff --git a/pyproject.toml b/pyproject.toml index ecb3616..6630f69 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,13 +27,19 @@ dependencies = [ "pymem[speed]~=1.12.0", "keyboard", "pywin32", - "dearpygui~=1.11.0", "questionary", "pywinctl", "packaging", + "tomlkit", + "pefile", ] dynamic = ["version"] +[project.optional-dependencies] +gui = [ + "dearpygui~=1.11.0", +] + [tool.setuptools.package-dir] pymhf = "pymhf" @@ -54,6 +60,12 @@ line-length = 110 [tool.ruff.lint] select = ["E", "F", "I"] preview = true +ignore = [ + "D100", # Missing docstring in public module +] + +[tool.ruff.lint.pydocstyle] +convention = "numpy" [tool.ruff.lint.extend-per-file-ignores] "cpptypes.py" = ["E501"] diff --git a/uv.lock b/uv.lock index 7e61910..5efeb26 100644 --- a/uv.lock +++ b/uv.lock @@ -408,6 +408,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/08/aa/cc0199a5f0ad350994d660967a8efb233fe0416e4639146c089643407ce6/packaging-24.1-py3-none-any.whl", hash = "sha256:5b8f2217dbdbd2f7f384c41c628544e6d52f2d0f53c6d0c3ea61aa5d1d7ff124", size = 53985 }, ] +[[package]] +name = "pefile" +version = "2024.8.26" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/03/4f/2750f7f6f025a1507cd3b7218691671eecfd0bbebebe8b39aa0fe1d360b8/pefile-2024.8.26.tar.gz", hash = "sha256:3ff6c5d8b43e8c37bb6e6dd5085658d658a7a0bdcd20b6a07b1fcfc1c4e9d632", size = 76008 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/54/16/12b82f791c7f50ddec566873d5bdd245baa1491bac11d15ffb98aecc8f8b/pefile-2024.8.26-py3-none-any.whl", hash = "sha256:76f8b485dcd3b1bb8166f1128d395fa3d87af26360c2358fb75b80019b957c6f", size = 74766 }, +] + [[package]] name = "pkginfo" version = "1.10.0" @@ -444,8 +453,6 @@ version = "5.9.8" source = { registry = "https://pypi.org/simple" } sdist = { url = "https://files.pythonhosted.org/packages/90/c7/6dc0a455d111f68ee43f27793971cf03fe29b6ef972042549db29eec39a2/psutil-5.9.8.tar.gz", hash = "sha256:6be126e3225486dff286a8fb9a06246a5253f4c7c53b475ea5f5ac934e64194c", size = 503247 } wheels = [ - { url = "https://files.pythonhosted.org/packages/fe/5f/c26deb822fd3daf8fde4bdb658bf87d9ab1ffd3fca483816e89a9a9a9084/psutil-5.9.8-cp27-none-win32.whl", hash = "sha256:36f435891adb138ed3c9e58c6af3e2e6ca9ac2f365efe1f9cfef2794e6c93b4e", size = 248660 }, - { url = "https://files.pythonhosted.org/packages/32/1d/cf66073d74d6146187e2d0081a7616df4437214afa294ee4f16f80a2f96a/psutil-5.9.8-cp27-none-win_amd64.whl", hash = "sha256:bd1184ceb3f87651a67b2708d4c3338e9b10c5df903f2e3776b62303b26cb631", size = 251966 }, { url = "https://files.pythonhosted.org/packages/e7/e3/07ae864a636d70a8a6f58da27cb1179192f1140d5d1da10886ade9405797/psutil-5.9.8-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:aee678c8720623dc456fa20659af736241f575d79429a0e5e9cf88ae0605cc81", size = 248702 }, { url = "https://files.pythonhosted.org/packages/b3/bd/28c5f553667116b2598b9cc55908ec435cb7f77a34f2bff3e3ca765b0f78/psutil-5.9.8-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8cb6403ce6d8e047495a701dc7c5bd788add903f8986d523e3e20b98b733e421", size = 285242 }, { url = "https://files.pythonhosted.org/packages/c5/4f/0e22aaa246f96d6ac87fe5ebb9c5a693fbe8877f537a1022527c47ca43c5/psutil-5.9.8-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d06016f7f8625a1825ba3732081d77c94589dca78b7a3fc072194851e88461a4", size = 288191 }, @@ -488,18 +495,24 @@ speed = [ [[package]] name = "pymhf" -version = "0.1.7.dev15" +version = "0.1.8.dev4" source = { editable = "." } dependencies = [ { name = "cyminhook" }, - { name = "dearpygui" }, { name = "keyboard" }, { name = "packaging" }, + { name = "pefile" }, { name = "psutil" }, { name = "pymem", extra = ["speed"] }, { name = "pywin32" }, { name = "pywinctl" }, { name = "questionary" }, + { name = "tomlkit" }, +] + +[package.optional-dependencies] +gui = [ + { name = "dearpygui" }, ] [package.dev-dependencies] @@ -513,14 +526,16 @@ dev = [ [package.metadata] requires-dist = [ { name = "cyminhook", specifier = ">=0.1.4" }, - { name = "dearpygui", specifier = "~=1.11.0" }, + { name = "dearpygui", marker = "extra == 'gui'", specifier = "~=1.11.0" }, { name = "keyboard" }, { name = "packaging" }, + { name = "pefile" }, { name = "psutil", specifier = "~=5.9.5" }, { name = "pymem", extras = ["speed"], specifier = "~=1.12.0" }, { name = "pywin32" }, { name = "pywinctl" }, { name = "questionary" }, + { name = "tomlkit" }, ] [package.metadata.requires-dev] @@ -3360,6 +3375,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/97/75/10a9ebee3fd790d20926a90a2547f0bf78f371b2f13aa822c759680ca7b9/tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc", size = 12757 }, ] +[[package]] +name = "tomlkit" +version = "0.13.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/b1/09/a439bec5888f00a54b8b9f05fa94d7f901d6735ef4e55dcec9bc37b5d8fa/tomlkit-0.13.2.tar.gz", hash = "sha256:fff5fe59a87295b278abd31bec92c15d9bc4a06885ab12bcea52c71119392e79", size = 192885 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f9/b6/a447b5e4ec71e13871be01ba81f5dfc9d0af7e473da256ff46bc0e24026f/tomlkit-0.13.2-py3-none-any.whl", hash = "sha256:7a974427f6e119197f670fbbbeae7bef749a6c14e793db934baefc1b5f03efde", size = 37955 }, +] + [[package]] name = "twine" version = "5.1.1"