diff --git a/nonebot_desktop_wing/__init__.py b/nonebot_desktop_wing/__init__.py index 959595c..31b7ce7 100644 --- a/nonebot_desktop_wing/__init__.py +++ b/nonebot_desktop_wing/__init__.py @@ -1,11 +1,6 @@ from .constants import PYPI_MIRRORS as PYPI_MIRRORS -from .hindsight import BackgroundObject as BackgroundObject -from .lazylib import meta as meta -from .molecules import ( - import_with_lock as import_with_lock, +from .utils import ( list_paginate as list_paginate, - exec_new_win as exec_new_win, - open_new_win as open_new_win, system_open as system_open, perform_pip_command as perform_pip_command, perform_pip_install as perform_pip_install, @@ -13,7 +8,7 @@ ) from .project import ( find_python as find_python, - distributions as distributions, + _distributions as _distributions, getdist as getdist, create as create, get_builtin_plugins as get_builtin_plugins, diff --git a/nonebot_desktop_wing/constants.py b/nonebot_desktop_wing/constants.py index fb55a08..00940cd 100644 --- a/nonebot_desktop_wing/constants.py +++ b/nonebot_desktop_wing/constants.py @@ -1,8 +1,7 @@ import os import sys -from typing import List, Tuple -PYPI_MIRRORS: List[str] = [ +PYPI_MIRRORS: list[str] = [ "https://pypi.org/simple", "https://pypi.doubanio.com/simple", "https://mirrors.163.com/pypi/simple", @@ -14,8 +13,5 @@ ] """PyPI mirror lists, including official index, mainly for Chinese (Mainland) users.""" -LINUX_TERMINALS: Tuple[str, ...] = ("gnome-terminal", "konsole", "xfce4-terminal", "xterm", "st") -"""Some terminal emulators on Linux for choosing.""" - WINDOWS: bool = sys.platform.startswith("win") or (sys.platform == "cli" and os.name == "nt") """Windows platform identifier.""" \ No newline at end of file diff --git a/nonebot_desktop_wing/hindsight.py b/nonebot_desktop_wing/hindsight.py deleted file mode 100644 index 78de770..0000000 --- a/nonebot_desktop_wing/hindsight.py +++ /dev/null @@ -1,64 +0,0 @@ -from threading import Thread -from typing import IO, Callable, Generic, TypeVar -from typing_extensions import ParamSpec - -T = TypeVar("T") -AnyStr_T = TypeVar("AnyStr_T", str, bytes) -P = ParamSpec("P") - - -class BackgroundObject(Generic[P, T]): - """A descriptor for running functions in background.""" - def __init__( - self, func: Callable[P, T], *args: P.args, **kwargs: P.kwargs - ) -> None: - """ - Initialize (start) a call in background. - - - func: `(P...) -> T` - A callable to be called in background. - - *args: `P.args` - Positional args for the callable. - - **kwargs: `P.kwargs` - Keyword args for the callable. - """ - self._func = func - self._args = args - self._kwds = kwargs - self._thread = Thread(None, self._work, None, args, kwargs) - self._thread.start() - # bgobject_log_func( - # f"[BackgroundObject] '{func.__module__}.{func.__name__}' is " - # f"running in background with {args=}, {kwargs=}" - # ) - - def __get__(self, obj, objtype=None) -> T: - return self.get() - - def _work(self, *args: P.args, **kwargs: P.kwargs) -> None: - self._value = self._func(*args, **kwargs) - # bgobject_log_func( - # f"[BackgroundObject] '{self._func.__module__}." - # f"{self._func.__name__}' is done with {args=}, {kwargs=}" - # ) - - def get(self) -> T: - """Wait for value.""" - self._thread.join() - if not hasattr(self, "_value"): - self._work(*self._args, **self._kwds) - return self._value - - -class RealtimeIOPipe(Generic[AnyStr_T]): - def __init__( - self, stream: IO[AnyStr_T], writer: Callable[[AnyStr_T], object] - ) -> None: - self.stream = stream - self.thread = Thread(target=self._pull) - self.writer = writer - self.thread.start() - - def _pull(self) -> None: - while True: - try: - self.writer(self.stream.read(1)) - except ValueError: - break \ No newline at end of file diff --git a/nonebot_desktop_wing/lazylib.py b/nonebot_desktop_wing/lazylib.py deleted file mode 100644 index 76e7c51..0000000 --- a/nonebot_desktop_wing/lazylib.py +++ /dev/null @@ -1,42 +0,0 @@ -import asyncio -from threading import Thread -from nonebot_desktop_wing.hindsight import BackgroundObject -from nonebot_desktop_wing.molecules import import_with_lock -from nonebot_desktop_wing.resources import load_module_data_raw - - -class _nb_cli: - __singleton = None - - def __new__(cls): - if cls.__singleton is None: - cls.handlers = BackgroundObject(import_with_lock, "nb_cli.handlers", "*") - cls.config = BackgroundObject(import_with_lock, "nb_cli.config", "*") - cls.__singleton = object.__new__(cls) - return cls.__singleton - - -nb_cli = _nb_cli - -Thread(target=_nb_cli).start() - - -class _meta: - __singleton = None - - def __new__(cls): - if cls.__singleton is None: - # load in __new__ to avoid lagging when getting `nb_cli.handlers` - cls.drivers = BackgroundObject(asyncio.run, nb_cli().handlers.load_module_data("driver")) - cls.adapters = BackgroundObject(asyncio.run, nb_cli().handlers.load_module_data("adapter")) - cls.plugins = BackgroundObject(asyncio.run, nb_cli().handlers.load_module_data("plugin")) - cls.raw_drivers = BackgroundObject(load_module_data_raw, "drivers") - cls.raw_adapters = BackgroundObject(load_module_data_raw, "adapters") - cls.raw_plugins = BackgroundObject(load_module_data_raw, "plugins") - cls.__singleton = object.__new__(cls) - return cls.__singleton - - -meta = _meta - -Thread(target=_meta).start() \ No newline at end of file diff --git a/nonebot_desktop_wing/lazylib.pyi b/nonebot_desktop_wing/lazylib.pyi deleted file mode 100644 index 2e32e5a..0000000 --- a/nonebot_desktop_wing/lazylib.pyi +++ /dev/null @@ -1,37 +0,0 @@ -from typing import List, Type -from nb_cli import handlers as _handlers -from nb_cli import config as _config -from nb_cli.config import Driver, Plugin, Adapter - - -class _nb_cli: - handlers = _handlers - config = _config - - -nb_cli: Type[_nb_cli] - -RawInfo = dict[ - { - "module_name": str, - "project_link": str, - "name": str, - "desc": str, - "author": str, - "homepage": str, - "tags": List[dict[{"label": str, "color": str}]], - "is_official": bool - } -] - - -class _meta: - drivers: List[Driver] - adapters: List[Adapter] - plugins: List[Plugin] - raw_drivers: List[RawInfo] - raw_adapters: List[RawInfo] - raw_plugins: List[RawInfo] - - -meta: Type[_meta] \ No newline at end of file diff --git a/nonebot_desktop_wing/models.py b/nonebot_desktop_wing/models.py new file mode 100644 index 0000000..1494f55 --- /dev/null +++ b/nonebot_desktop_wing/models.py @@ -0,0 +1,59 @@ +from datetime import datetime +from pydantic import BaseModel + + +class ColoredTag(BaseModel): + """Tag with color""" + + label: str + """Tag content""" + + color: str + """Tag color""" + + +class NoneBotCommonInfo(BaseModel): + """Common info for drivers, adapters and plugins""" + + module_name: str + """Name for importing""" + + project_link: str + """Name for downloading from PyPI""" + + name: str + """Human-readable name""" + + desc: str + """Description""" + + author: str + """Author""" + + homepage: str + """Project homepage""" + + tags: list[ColoredTag] + """Tags""" + + is_official: bool + """Whether an extension is official""" + + +class NoneBotPluginInfo(NoneBotCommonInfo): + """Plugin info model""" + + # Some plugins are still not prepared for these metadata, so `None` is still needed + # for now, until the registry is ready for all plugins. + + type: str | None + """Plugin category""" + + supported_adapters: list[str] | None + """Supported adapters, `None` for all adapters""" + + valid: bool + """Plugin load test result""" + + time: datetime + """Plugin load test time""" diff --git a/nonebot_desktop_wing/molecules.py b/nonebot_desktop_wing/molecules.py deleted file mode 100644 index 68e1fd9..0000000 --- a/nonebot_desktop_wing/molecules.py +++ /dev/null @@ -1,298 +0,0 @@ -from __future__ import annotations -import os -from pathlib import Path -from shutil import which -import subprocess -from tempfile import mkstemp -from threading import Lock -from types import ModuleType -from typing import ( - Iterable, List, Literal, Optional, Tuple, TypeVar, Union, overload -) - -from nonebot_desktop_wing.constants import LINUX_TERMINALS, WINDOWS - -_import_lock = Lock() -T = TypeVar("T") - - -def anojoin(args: Iterable[str]) -> str: - """Like `shlex.join`, but uses dquote (`"`) instead of squote (`'`).""" - dq, cdq = "\"", "\\\"" - return " ".join([f"\"{s.replace(dq, cdq)}\"" for s in args]) - - -def import_with_lock( - name: str, - package: Optional[str] = None -) -> ModuleType: - """Like `importlib.import_module(...)`, but using a lock to avoid - conflicts when importing package. - """ - from importlib import import_module - with _import_lock: - return import_module(name, package) - - -def list_paginate(lst: List[T], sz: int) -> List[List[T]]: - """ - Cut a list to lists whose length are equal-to-or-less-than a specified - size. - - - lst: `List[T]` - a list to be cut. - - sz: `int` - max length for cut lists. - - - return: `List[List[T]]` - """ - return [lst[st:st + sz] for st in range(0, len(lst), sz)] - - -def get_pause_cmd() -> str: - """Get pause command on users' platforms.""" - if WINDOWS: - return "pause" - return "read -n1 -p 进程已结束,按任意键关闭。" - - -def get_terminal_starter() -> Tuple[str, ...]: - """Get args for opening a new window, purposed for executing scripts.""" - if WINDOWS: - return ("start", "cmd.exe", "/c") - for te in LINUX_TERMINALS: - if which(te) is not None: - return (te, "-e") - raise FileNotFoundError("no terminal emulator found") - - -def get_terminal_starter_pure() -> Tuple[str, ...]: - """Get args for opening a new window, only for opening a new window.""" - if WINDOWS: - return ("start", "cmd.exe") - for te in LINUX_TERMINALS: - if which(te) is not None: - return (te,) - raise FileNotFoundError("no terminal emulator found") - - -def gen_run_script( - cmd: str, cwd: Union[str, Path, None] = None, activate_venv: bool = False -) -> str: - """ - Generate executable scripts, for running commands in a new window. - - - cmd: `str` - commands to be executed. - - cwd: `Union[str, Path, None]` - the first work directory to be set. - - activate_venv: `bool` - whether to attempt to find and activate - the venv in `cwd`. - - - return: `str` - temp script path. - """ - fd, fp = mkstemp(".bat" if WINDOWS else ".sh", "nbdesktop-") - if not WINDOWS: - os.chmod(fd, 0o755) - with open(fd, "w") as f: - if not WINDOWS: - f.write(f"#!/usr/bin/env bash\n") - - if cwd is not None: - pcwd = Path(cwd) - if activate_venv and (pcwd / ".venv").exists(): - if WINDOWS: - f.write( - f"{pcwd / '.venv' / 'Scripts' / 'activate.bat'}\n" - ) - else: - f.write(f"source {pcwd / '.venv' / 'bin' / 'activate'}\n") - - if WINDOWS: - # change drive first in cmd.exe - f.write(f"{pcwd.drive}\n") - f.write(f"cd \"{cwd}\"\n") - f.write(f"{cmd}\n") - f.write(f"{get_pause_cmd()}\n") - return fp - - -def exec_nowin( - cmd: str, cwd: Union[str, Path, None] = None, - *, catch_output: bool = True -) -> Tuple[subprocess.Popen[bytes], str]: - """ - Execute commands in a subprocess. - - - cmd: `str` - commands to be executed. - - cwd: `Union[str, Path, None]` - the first work directory to be set. - - catch_output: `bool` - whether to catch output stdout and - stderr. - - - return: `(Popen[bytes], str)` - the process running commands and temp - script path. - """ - sname = gen_run_script(cmd, cwd) - return subprocess.Popen( - anojoin((*get_terminal_starter(), sname)), shell=True, - stdout=subprocess.PIPE if catch_output else None, - stderr=subprocess.STDOUT if catch_output else None - ), sname - - -def exec_new_win( - cmd: str, cwd: Union[str, Path, None] = None -) -> Tuple[subprocess.Popen[bytes], str]: - """ - Execute commands in a new window. - - - cmd: `str` - commands to be executed. - - cwd: `Union[str, Path, None]` - the first work directory to be set. - - - return: `(Popen[bytes], str)` - the process running commands and temp - script path. - """ - sname = gen_run_script(cmd, cwd) - return subprocess.Popen( - anojoin((*get_terminal_starter(), sname)), shell=True, - ), sname - - -def open_new_win( - cwd: Union[str, Path, None] = None -) -> subprocess.Popen[bytes]: - """ - Open a new terminal window. - - - cwd: `Union[str, Path, None]` - the first work directory to be set. - - - return: `Popen[bytes]` - the process running new window. - """ - return subprocess.Popen( - anojoin(get_terminal_starter_pure()), shell=True, cwd=cwd - ) - - -def system_open( - fp: Union[str, Path], *, catch_output: bool = False -) -> subprocess.Popen[bytes]: - """ - Use system applications to open a file path or URI. - - - fp: `Union[str, Path]` - a file path or URI. - - catch_output: `bool` - whether to catch output stdout and stderr. - - - return: `Popen[bytes]` - the process running external applications. - """ - return subprocess.Popen( - anojoin(("start" if WINDOWS else "xdg-open", str(fp))), shell=True, - stdout=subprocess.PIPE if catch_output else None, - stderr=subprocess.STDOUT if catch_output else None - ) - - -@overload -def perform_pip_command( - pyexec: str, command: str, *args: str, - new_win: Literal[False] = False, catch_output: bool = False -) -> subprocess.Popen[bytes]: - ... - - -@overload -def perform_pip_command( - pyexec: str, command: str, *args: str, new_win: Literal[True] = True -) -> Tuple[subprocess.Popen[bytes], str]: - ... - - -def perform_pip_command( - pyexec: str, command: str, *args: str, - new_win: bool = False, catch_output: bool = False -) -> Union[subprocess.Popen[bytes], Tuple[subprocess.Popen[bytes], str]]: - """ - Run pip commands. - - - pyexec: `str` - path to python executable. - - command: `str` - pip command. - - *args: `str` - args after pip command. - - new_win: `bool` - whether to open a new terminal window. - - catch_output: `bool` - whether to catch output stdout and stderr. - - - return: - the process running commands (and temp script - path if `new_win` is set `True`). - - `Popen[bytes] if new_win == False` - - `(Popen[bytes], str) if new_win == True` - """ - cmd = [pyexec, "-m", "pip", command, *args] - if not new_win: - return subprocess.Popen( - cmd, - stdout=subprocess.PIPE if catch_output else None, - stderr=subprocess.STDOUT if catch_output else None - ) - return exec_new_win(anojoin(cmd)) - - -@overload -def perform_pip_install( - pyexec: str, *packages: str, update: bool = False, index: str = "", - new_win: Literal[False] = False, catch_output: bool = False -) -> subprocess.Popen[bytes]: - ... - - -@overload -def perform_pip_install( - pyexec: str, *packages: str, update: bool = False, index: str = "", - new_win: Literal[True] = True, catch_output: bool = False -) -> Tuple[subprocess.Popen[bytes], str]: - ... - - -def perform_pip_install( - pyexec: str, *packages: str, update: bool = False, index: str = "", - new_win: bool = False, catch_output: bool = False -) -> Union[subprocess.Popen[bytes], Tuple[subprocess.Popen[bytes], str]]: - """ - Run pip install. - - - pyexec: `str` - path to python executable. - - *packages: `str` - packages to be installed. - - index: `str` - index for downloading. - - new_win: `bool` - whether to open a new terminal window. - - catch_output: `bool` - whether to catch output stdout and stderr. - - - return: - the process running commands (and temp script - path if `new_win` is set `True`). - - `Popen[bytes] if new_win == False` - - `(Popen[bytes], str) if new_win == True` - """ - args = (*packages,) - if update: - args += ("-U",) - if index: - args += ("-i", index) - return perform_pip_command( - pyexec, "install", *args, - new_win=new_win, # type: ignore - catch_output=catch_output - ) - - -def rrggbb_bg2fg(color: str) -> Literal['#000000', '#ffffff']: - """ - Convert hex color code background to black or white. - - - color: `str` - color code with the shape of '#rrggbb' - - - return: `str` - converted color code (`'#000000'` or `'#ffffff'`) - """ - c_int = int(color[1:], base=16) - # Formula for choosing color: - # 0.2126 × R + 0.7152 × G + 0.0722 × B > 0.5 - # => bright color ==> use opposite dark - c_bgr: List[int] = [] - for _ in range(3): - c_bgr.append(c_int & 0xff) - c_int >>= 8 - b, g, r = (x / 255 for x in c_bgr) - return ( - "#000000" if 0.2126 * r + 0.7152 * g + 0.0722 * b > 0.5 else "#ffffff" - ) \ No newline at end of file diff --git a/nonebot_desktop_wing/project.py b/nonebot_desktop_wing/project.py index f12f906..b588093 100644 --- a/nonebot_desktop_wing/project.py +++ b/nonebot_desktop_wing/project.py @@ -1,24 +1,29 @@ -from __future__ import annotations import asyncio from pathlib import Path import sys -from typing import ( - TYPE_CHECKING, Iterable, List, Literal, Optional, Tuple, Union, overload -) +from typing import TYPE_CHECKING, Iterable from dotenv.main import DotEnv -from nonebot_desktop_wing.constants import WINDOWS -from nonebot_desktop_wing.lazylib import nb_cli -from nonebot_desktop_wing.molecules import perform_pip_install +from .constants import WINDOWS +from .models import NoneBotCommonInfo +from .utils import perform_pip_install if TYPE_CHECKING: + import nb_cli + import nb_cli.config + from nb_cli.config import ConfigManager from importlib.metadata import Distribution - from nb_cli.config import Driver, Adapter, ConfigManager from subprocess import Popen -def find_python(fp: Union[str, Path]) -> str: +def setup_nbcli(): + from importlib import import_module + globals()["nb_cli"] = import_module("nb_cli") + import_module("nb_cli.config") + + +def find_python(fp: str | Path) -> str: """Find a python executable in a directory.""" pfp = Path(fp) veexec = ( @@ -29,65 +34,38 @@ def find_python(fp: Union[str, Path]) -> str: return str(veexec) if veexec.exists() else sys.executable -def distributions(*fp: str) -> Iterable[Distribution]: +def _distributions(*fp: str) -> Iterable["Distribution"]: from importlib import metadata if fp: return metadata.distributions(path=list(fp)) return metadata.distributions() -def getdist(root: Union[str, Path]) -> Iterable[Distribution]: +def getdist(root: str | Path) -> Iterable["Distribution"]: """Get packages installed in a directory.""" return ( - distributions( + _distributions( *(str(si) for si in Path(root).glob(".venv/**/site-packages")) ) ) -@overload def create( fp: str, - drivers: List[Driver], - adapters: List[Adapter], + drivers: list[NoneBotCommonInfo], + adapters: list[NoneBotCommonInfo], dev: bool, usevenv: bool, - index: Optional[str] = None, - new_win: Literal[False] = False, - catch_output: bool = False -) -> Popen[bytes]: - ... - - -@overload -def create( - fp: str, - drivers: List[Driver], - adapters: List[Adapter], - dev: bool, - usevenv: bool, - index: Optional[str] = None, - new_win: Literal[True] = True -) -> Tuple[Popen[bytes], str]: - ... - - -def create( - fp: str, - drivers: List[Driver], - adapters: List[Adapter], - dev: bool, - usevenv: bool, - index: Optional[str] = None, + index: str | None = None, new_win: bool = False, catch_output: bool = False -) -> Union[Popen[bytes], Tuple[Popen[bytes], str]]: +) -> Popen[bytes]: """ Create a new NoneBot project. - fp: `str` - path to target project (must be empty or not exist) - - drivers: `List[Driver]` - drivers to be installed. + - drivers: `list[Driver]` - drivers to be installed. - adapters: `List[Adapter]` - adapters to be installed. - dev: `bool` - whether to use a profile for developing plugins. @@ -135,29 +113,29 @@ def create( ) -def get_builtin_plugins(pypath: str) -> List[str]: +def get_builtin_plugins(pypath: str) -> list[str]: """Get built-in plugins, using python in an environment.""" return asyncio.run( nb_cli.handlers.list_builtin_plugins(python_path=pypath) ) -def find_env_file(fp: Union[str, Path]) -> List[str]: +def find_env_file(fp: str | Path) -> list[str]: """Find all dotenv files in a directory.""" return [p.name for p in Path(fp).glob(".env*")] -def get_env_config(ep: Union[str, Path], config: str) -> Optional[str]: +def get_env_config(ep: str | Path, config: str) -> str | None: return DotEnv(ep).get(config) def recursive_find_env_config( - fp: Union[str, Path], config: str -) -> Optional[str]: + fp: str | Path, config: str +) -> str | None: """ Recursively find a config in dotenv files. - - fp: `Union[str, Path]` - project directory. + - fp: `str | Path` - project directory. - config: `str` - config string. - return: `Optional[str]` - value of the config. @@ -177,12 +155,12 @@ def recursive_find_env_config( def recursive_update_env_config( - fp: Union[str, Path], config: str, value: str + fp: str | Path, config: str, value: str ) -> None: """ Recursively edit a config in dotenv files. - - fp: `Union[str, Path]` - project directory. + - fp: `str | Path` - project directory. - config: `str` - config string. - value: `str` - new value of the config. """ @@ -210,7 +188,10 @@ def recursive_update_env_config( f.writelines(f"{k}={v}\n" for k, v in useenv.items() if k and v) -def get_toml_config(basedir: Union[str, Path]) -> ConfigManager: +def get_config_manager(basedir: str | Path) -> "ConfigManager": """Get project TOML manager for a project.""" basepath = Path(basedir) - return nb_cli.config.ConfigManager(find_python(basepath), basepath / "pyproject.toml") \ No newline at end of file + return nb_cli.config.ConfigManager( + working_dir=basepath, + python_path=find_python(basepath) + ) \ No newline at end of file diff --git a/nonebot_desktop_wing/resources.py b/nonebot_desktop_wing/resources.py index dfec083..301120a 100644 --- a/nonebot_desktop_wing/resources.py +++ b/nonebot_desktop_wing/resources.py @@ -1,19 +1,27 @@ -from functools import lru_cache -from typing import Any, Dict, List, Literal +from functools import cache +from typing import Any, Literal +from nonebot_desktop_wing.models import NoneBotCommonInfo, NoneBotPluginInfo -@lru_cache +drivers: list[NoneBotCommonInfo] +adapters: list[NoneBotCommonInfo] +plugins: list[NoneBotPluginInfo] + + +@cache def load_module_data_raw( module_name: Literal["adapters", "plugins", "drivers"] -) -> List[Dict[str, Any]]: +) -> list[dict[str, Any]]: """Get raw module data.""" from concurrent.futures import ThreadPoolExecutor, as_completed import httpx - exceptions: List[Exception] = [] + exceptions: list[Exception] = [] urls = [ - f"https://v2.nonebot.dev/{module_name}.json", - f"https://raw.fastgit.org/nonebot/nonebot2/master/website/static/{module_name}.json", - f"https://cdn.jsdelivr.net/gh/nonebot/nonebot2/website/static/{module_name}.json", + f"https://registry.nonebot.dev/{module_name}.json", + f"https://cdn.jsdelivr.net/gh/nonebot/registry@results/{module_name}.json", + f"https://cdn.staticaly.com/gh/nonebot/registry@results/{module_name}.json", + f"https://jsd.cdn.zzko.cn/gh/nonebot/registry@results/{module_name}.json", + f"https://ghproxy.com/https://raw.githubusercontent.com/nonebot/registry/results/{module_name}.json", ] with ThreadPoolExecutor(max_workers=5) as executor: tasks = [executor.submit(httpx.get, url) for url in urls] @@ -25,4 +33,12 @@ def load_module_data_raw( except Exception as e: exceptions.append(e) - raise Exception("Download failed", exceptions) \ No newline at end of file + raise Exception("Download failed", exceptions) + + +def init_resources() -> None: + """Initialize index resources (drivers, adapters, and plugins).""" + global drivers, adapters, plugins + drivers = [NoneBotCommonInfo.parse_obj(u) for u in load_module_data_raw("drivers")] + adapters = [NoneBotCommonInfo.parse_obj(u) for u in load_module_data_raw("adapters")] + plugins = [NoneBotPluginInfo.parse_obj(u) for u in load_module_data_raw("plugins")] \ No newline at end of file diff --git a/nonebot_desktop_wing/utils.py b/nonebot_desktop_wing/utils.py new file mode 100644 index 0000000..32ef713 --- /dev/null +++ b/nonebot_desktop_wing/utils.py @@ -0,0 +1,119 @@ +from pathlib import Path +import subprocess +from typing import Iterable, Literal, TypeVar + +from .constants import WINDOWS + +T = TypeVar("T") + + +def anojoin(args: Iterable[str]) -> str: + """Like `shlex.join`, but uses dquote (`"`) instead of squote (`'`).""" + dq, cdq = "\"", "\\\"" + return " ".join([f"\"{s.replace(dq, cdq)}\"" for s in args]) + + +def list_paginate(lst: list[T], sz: int) -> list[list[T]]: + """ + Cut a list to lists whose length are equal-to-or-less-than a specified + size. + + - lst: `List[T]` - a list to be cut. + - sz: `int` - max length for cut lists. + + - return: `List[List[T]]` + """ + return [lst[st:st + sz] for st in range(0, len(lst), sz)] + + +def system_open(fp: str | Path) -> subprocess.Popen[bytes]: + """ + Use system applications to open a file path or URI. + + - fp: `Union[str, Path]` - a file path or URI. + - catch_output: `bool` - whether to catch output stdout and stderr. + + - return: `Popen[bytes]` - the process running external applications. + """ + return subprocess.Popen( + anojoin(("start" if WINDOWS else "xdg-open", str(fp))), shell=True + ) + + +def perform_pip_command( + pyexec: str, command: str, *args: str, + nt_new_win: bool = False, catch_output: bool = False +) -> subprocess.Popen[bytes]: + """ + Run pip commands. + + - pyexec: `str` - path to python executable. + - command: `str` - pip command. + - *args: `str` - args after pip command. + - nt_new_win: `bool` - whether to open a new terminal window (nt + only). + - catch_output: `bool` - whether to catch output stdout and stderr. + + - return: `Popen[bytes]` - the process running commands. + """ + cmd = [pyexec, "-m", "pip", command, *args] + _flag = 0 + if nt_new_win and WINDOWS: + _flag = subprocess.CREATE_NEW_CONSOLE + return subprocess.Popen( + cmd, + stdout=subprocess.PIPE if catch_output else None, + stderr=subprocess.STDOUT if catch_output else None, + creationflags=_flag + ) + + +def perform_pip_install( + pyexec: str, *packages: str, update: bool = False, index: str = "", + nt_new_win: bool = False, catch_output: bool = False +) -> subprocess.Popen[bytes]: + """ + Run pip install. + + - pyexec: `str` - path to python executable. + - *packages: `str` - packages to be installed. + - update: `bool` - whether to update packages. + - index: `str` - index for downloading. + - nt_new_win: `bool` - whether to open a new terminal window (nt + only). + - catch_output: `bool` - whether to catch output stdout and stderr. + + - return: `Popen[bytes]` - the process running commands. + """ + args = (*packages,) + if update: + args += ("-U",) + if index: + args += ("-i", index) + return perform_pip_command( + pyexec, "install", *args, + nt_new_win=nt_new_win, + catch_output=catch_output + ) + + +def rrggbb_bg2fg(color: str) -> Literal['#000000', '#ffffff']: + """ + Convert hex color code background to black or white. + + - color: `str` - color code with the shape of '#rrggbb' + + - return: `str` - converted color code (`'#000000'` or `'#ffffff'`) + """ + c_int = int(color[1:], base=16) + # Formula for choosing color: + # 0.2126 × R + 0.7152 × G + 0.0722 × B > 0.5 + # => bright color ==> use opposite dark + c_bgr: list[int] = [] + for _ in range(3): + c_bgr.append(c_int & 0xff) + c_int >>= 8 + b, g, r = (x / 255 for x in c_bgr) + return ( + "#000000" if 0.2126 * r + 0.7152 * g + 0.0722 * b > 0.5 else "#ffffff" + ) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index cf4968c..a755392 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,13 +1,19 @@ [project] name = "nonebot-desktop-wing" -version = "0.2.8" +version = "0.3.0" description = "Wings for NoneBot desktop applications." authors = [ {name = "HivertMoZara", email = "worldmozara@163.com"}, ] license = {text = "MIT"} -dependencies = ["nb_cli~=1.0.0", "python-dotenv~=1.0.0", "httpx>=0.23", "typing-extensions>=4.5.0"] -requires-python = ">=3.8" +dependencies = [ + "nb-cli>=1.2.2", + "pydantic>=1.10,<2", + "python-dotenv~=1.0.0", + "httpx>=0.23", + "typing-extensions>=4.5.0" +] +requires-python = ">=3.10" readme = "README.md" [project.urls]