From 0de4486619c08abd1026c9419ee8d2f9d221080e Mon Sep 17 00:00:00 2001 From: Fidel Yin <fidel.yin@hotmail.com> Date: Sat, 15 Apr 2023 17:41:44 +0800 Subject: [PATCH 1/3] Add CLI mode --- README.md | 6 + cli.py | 704 +++++++++++++++++++++++++++++++++++++++++++++++ main.py | 36 ++- manual.txt | 2 + requirements.txt | 2 + twitch.py | 7 +- 6 files changed, 743 insertions(+), 14 deletions(-) create mode 100644 cli.py diff --git a/README.md b/README.md index 7a50b91d..bb35fdb3 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,12 @@ Every ~60 seconds, the application sends a "minute watched" event to the channel - Make sure to link your Twitch account to game accounts on the [campaigns page](https://www.twitch.tv/drops/campaigns), to enable more games to be mined. - Persistent cookies will be stored in the `cookies.jar` file, from which the authorization (login) information will be restored on each subsequent run. +#### Running on server without GUI: + +- Add the argument `--cli` to launch in the Command Line Interface mode. +- To log in, you should open a link shown in the message, and enter the shown code on the webpage. +- Only necessary information is displayed, and all settings are configured in `settings.json`. + ### Pictures: ![Main](https://user-images.githubusercontent.com/4180725/164298155-c0880ad7-6423-4419-8d73-f3c053730a1b.png) diff --git a/cli.py b/cli.py new file mode 100644 index 00000000..9d8bc08c --- /dev/null +++ b/cli.py @@ -0,0 +1,704 @@ +from __future__ import annotations + +import curses +import asyncio +import logging +from collections import abc +from math import log10, ceil +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, TYPE_CHECKING + +from translate import _ +from exceptions import ExitRequest +from utils import Game, _T +from constants import ( + OUTPUT_FORMATTER, WS_TOPICS_LIMIT +) + +if TYPE_CHECKING: + from twitch import Twitch + from channel import Channel + from settings import Settings + from inventory import DropsCampaign, TimedDrop + +DIGITS = ceil(log10(WS_TOPICS_LIMIT)) + +WINDOW_WIDTH = 100 + + +class _LoggingHandler(logging.Handler): + def __init__(self, output: CLIManager): + super().__init__() + self._output = output + + def emit(self, record): + self._output.print(self.format(record)) + + +class StatusBar: + def __init__(self): + self._window = curses.newwin(1, WINDOW_WIDTH, 1, 0) + self.update("") + + def update(self, status: str): + self._window.clear() + self._window.addstr(0, 0, f"{_('gui', 'status', 'name')}: ") + self._window.addstr(status) + self._window.refresh() + + +@dataclass +class _WSEntry: + status: str + topics: int + + +class WebsocketStatus: + """ + The websocket status display is removed from CLI mode + to reduce unnecessary display space use + """ + + def __init__(self): + # self._window = curses.newwin(MAX_WEBSOCKETS, 30, 1, 0) + # self._label = '\n'.join( + # _("gui", "websocket", "websocket").format(id=i) + # for i in range(1, MAX_WEBSOCKETS + 1) + # ) + # + # self._status: str = "" + # self._topics: str = "" + # + # self._items: dict[int, _WSEntry | None] = {i: None for i in range(MAX_WEBSOCKETS)} + # self._update() + pass + + def update(self, idx: int, status: str | None = None, topics: int | None = None): + # if status is None and topics is None: + # raise TypeError("You need to provide at least one of: status, topics") + # entry = self._items.get(idx) + # if entry is None: + # entry = self._items[idx] = _WSEntry( + # status=_("gui", "websocket", "disconnected"), topics=0 + # ) + # if status is not None: + # entry.status = status + # if topics is not None: + # entry.topics = topics + # self._update() + pass + + def remove(self, idx: int): + # if idx in self._items: + # del self._items[idx] + # self._update() + pass + + # def _update(self): + # self._window.clear() + # self._window.addstr(0, 0, self._label) + # + # for idx in range(MAX_WEBSOCKETS): + # if (item := self._items.get(idx)) is not None: + # self._window.addstr(idx, 15, item.status) + # self._window.addstr(idx, 25, f"{item.topics:>{DIGITS}}/{WS_TOPICS_LIMIT}") + # + # self._window.refresh() + + +@dataclass +class LoginData: + username: str + password: str + token: str + + +class LoginHandler: + def __init__(self, manager: CLIManager): + self._manager = manager + self._window = curses.newwin(1, WINDOW_WIDTH, 0, 0) + labels = _("gui", "login", "labels").split("\n") + self._label_status = labels[0] + ' ' + self._label_user_id = labels[1] + ' ' + + self.update(_("gui", "login", "logged_out"), None) + + async def ask_enter_code(self, user_code: str) -> None: + self.update(_("gui", "login", "required"), None) + self._manager.print(_("gui", "login", "request")) + + self._manager.print(f"Open this link to login: https://www.twitch.tv/activate") + self._manager.print(f"Enter this code on the Twitch's device activation page: {user_code}") + + def update(self, status: str, user_id: int | None): + self._window.clear() + self._window.addstr(0, 0, self._label_status) + self._window.addstr(status) + self._window.addstr(0, 30, self._label_user_id) + self._window.addstr(str(user_id) if user_id else "-") + self._window.refresh() + + +@dataclass +class CampaignDisplay: + name: str + status: str = "" + starts_at: str = "" + ends_at: str = "" + link: str = "" + allowed_channels: str = "" + drops: list[str] = field(default_factory=list) + visible: bool = True + + +class DropDisplay: + benefits: list[str] = [] + progress: str = "" + + +class InventoryHandler: + """ + The inventory display is removed from CLI mode + to reduce unnecessary display space use + """ + + def __init__(self, manager: CLIManager): + # self._settings: Settings = manager._twitch.settings + # self._filters: dict[str, bool] = { + # "linked": True, + # "upcoming": True, + # "expired": False, + # "excluded": False, + # "finished": False, + # } + # # manager.tabs.add_view_event(self._on_tab_switched) + # + # # Inventory view + # self._campaigns: dict[DropsCampaign, CampaignDisplay] = {} + # self._drops: dict[str, DropDisplay] = {} + pass + + def _update_visibility(self, campaign: DropsCampaign): + # True if the campaign is supposed to show, False makes it hidden. + # campaign_display = self._campaigns[campaign] + # linked = self._filters["linked"] + # expired = self._filters["expired"] + # excluded = self._filters["excluded"] + # upcoming = self._filters["upcoming"] + # finished = self._filters["finished"] + # priority_only = self._settings.priority_only + # campaign_display.visible = ( + # (not linked or campaign.linked) + # and (campaign.active or upcoming and campaign.upcoming or expired and campaign.expired) + # and ( + # excluded + # or ( + # campaign.game.name not in self._settings.exclude + # and not priority_only or campaign.game.name in self._settings.priority + # ) + # ) + # and (finished or not campaign.finished) + # ) + pass + + def _on_tab_switched(self) -> None: + # if self._manager.tabs.current_tab() == 1: + # # refresh only if we're switching to the tab + # self.refresh() + pass + + def refresh(self): + # for campaign in self._campaigns: + # # status + # status_label = self._campaigns[campaign]["status"] + # status_text, status_color = self.get_status(campaign) + # status_label.config(text=status_text, foreground=status_color) + # # visibility + # self._update_visibility(campaign) + # self._canvas_update() + pass + + def _canvas_update(self): + pass + + async def add_campaign(self, campaign: DropsCampaign) -> None: + # # Name + # campaign_display = CampaignDisplay(name=campaign.name) + # + # # Status + # status_text, status_color = self.get_status(campaign) + # campaign_display.status = status_text + # + # # Starts / Ends + # campaign_display.starts_at = _("gui", "inventory", "starts").format( + # time=campaign.starts_at.astimezone().replace(microsecond=0, tzinfo=None)) + # campaign_display.ends_at = _("gui", "inventory", "ends").format( + # time=campaign.ends_at.astimezone().replace(microsecond=0, tzinfo=None)) + # + # # Linking status + # if campaign.linked: + # campaign_display.link = _("gui", "inventory", "status", "linked") + # else: + # campaign_display.link = _("gui", "inventory", "status", "not_linked") + # campaign_display.link += campaign.link_url + # + # # ACL channels + # acl = campaign.allowed_channels + # if acl: + # if len(acl) <= 5: + # allowed_text: str = '\n'.join(ch.name for ch in acl) + # else: + # allowed_text = '\n'.join(ch.name for ch in acl[:4]) + # allowed_text += ( + # f"\n{_('gui', 'inventory', 'and_more').format(amount=len(acl) - 4)}" + # ) + # else: + # allowed_text = _("gui", "inventory", "all_channels") + # campaign_display.allowed_channels = f"{_('gui', 'inventory', 'allowed_channels')}\n{allowed_text}" + # + # # Drops display + # for i, drop in enumerate(campaign.drops): + # campaign_display.drops.append(drop.id) + # drop_display = DropDisplay() + # + # # Benefits + # for benefit in drop.benefits: + # drop_display.benefits.append(benefit.name) + # + # # Progress + # progress_text, progress_color = self.get_progress(drop) + # drop_display.progress = progress_text + # + # self._campaigns[campaign] = campaign_display + # + # # if self._manager.tabs.current_tab() == 1: + # # self._update_visibility(campaign) + # # self._canvas_update() + pass + + def clear(self) -> None: + # self._drops.clear() + # self._campaigns.clear() + pass + + @staticmethod + def get_status(campaign: DropsCampaign) -> tuple[str, str]: + if campaign.active: + status_text = _("gui", "inventory", "status", "active") + status_color = "green" + elif campaign.upcoming: + status_text = _("gui", "inventory", "status", "upcoming") + status_color = "goldenrod" + else: + status_text = _("gui", "inventory", "status", "expired") + status_color = "red" + return status_text, status_color + + @staticmethod + def get_progress(drop: TimedDrop) -> tuple[str, str]: + progress_color = "" + if drop.is_claimed: + progress_color = "green" + progress_text = _("gui", "inventory", "status", "claimed") + elif drop.can_claim: + progress_color = "goldenrod" + progress_text = _("gui", "inventory", "status", "ready_to_claim") + elif drop.current_minutes or drop.can_earn(): + progress_text = _("gui", "inventory", "percent_progress").format( + percent=f"{drop.progress:3.1%}", + minutes=drop.required_minutes, + ) + else: + progress_text = _("gui", "inventory", "minutes_progress").format( + minutes=drop.required_minutes + ) + return progress_text, progress_color + + def update_drop(self, drop: TimedDrop) -> None: + # if drop.id not in self._drops: + # return + # + # progress_text, progress_color = self.get_progress(drop) + # self._drops[drop.id].progress = progress_text + pass + + +class SettingsHandler: + """ + The setting panel has been removed from CLI mode + to reduce unnecessary display space use + Please edit settings.json manually + """ + + def __init__(self, manager: CLIManager): + self._twitch = manager._twitch + self._settings: Settings = manager._twitch.settings + + self._exclude_list = [] + self._priority_list = [] + + def set_games(self, games: abc.Iterable[Game]) -> None: + games_list = sorted(map(str, games)) + self._exclude_list = games_list + self._priority_list = games_list + + def priorities(self) -> dict[str, int]: + # NOTE: we shift the indexes so that 0 can be used as the default one + size = len(self._priority_list) + return { + game_name: size - i for i, game_name in enumerate(self._priority_list) + } + + +class ProgressHandler: + def __init__(self): + self._window = curses.newwin(9, WINDOW_WIDTH, 3, 0) + + self._drop: TimedDrop | None = None + self._timer_task: asyncio.Task[None] | None = None + + self._campaign_name: str = "" + self._campaign_game: str = "" + self._campaign_progress: float = 0 + self._campaign_percentage: str = "" + self._campaign_remaining: str = "" + + self._drop_rewards: str = "" + self._drop_progress: float = 0 + self._drop_percentage: str = "" + self._drop_remaining: str = "" + + self.display(None) + + @staticmethod + def _divmod(minutes: int, seconds: int) -> tuple[int, int]: + if seconds < 60 and minutes > 0: + minutes -= 1 + hours, minutes = divmod(minutes, 60) + return hours, minutes + + def _update_time(self, seconds: int): + drop = self._drop + if drop is not None: + drop_minutes = drop.remaining_minutes + campaign_minutes = drop.campaign.remaining_minutes + else: + drop_minutes = 0 + campaign_minutes = 0 + + dseconds = seconds % 60 + hours, minutes = self._divmod(drop_minutes, seconds) + self._drop_remaining = f"{hours:>2}:{minutes:02}:{dseconds:02}" + + hours, minutes = self._divmod(campaign_minutes, seconds) + self._campaign_remaining = f"{hours:>2}:{minutes:02}:{dseconds:02}" + + async def _timer_loop(self): + seconds = 60 + self._update_time(seconds) + while seconds > 0: + await asyncio.sleep(1) + seconds -= 1 + self._update_time(seconds) + self._timer_task = None + + def start_timer(self): + if self._timer_task is None: + if self._drop is None or self._drop.remaining_minutes <= 0: + # if we're starting the timer at 0 drop minutes, + # all we need is a single instant time update setting seconds to 60, + # to avoid substracting a minute from campaign minutes + self._update_time(60) + else: + self._timer_task = asyncio.create_task(self._timer_loop()) + + def stop_timer(self): + if self._timer_task is not None: + self._timer_task.cancel() + self._timer_task = None + + def display(self, drop: TimedDrop | None, *, countdown: bool = True, subone: bool = False): + self._drop = drop + self.stop_timer() + + if drop is None: + # clear the drop display + self._drop_rewards = "..." + self._drop_progress = 0 + self._drop_percentage = "-%" + self._campaign_name = "..." + self._campaign_game = "..." + self._campaign_progress = 0 + self._campaign_percentage = "-%" + self._update_time(0) + self._update() + return + + self._drop_rewards = drop.rewards_text() + self._drop_progress = drop.progress + self._drop_percentage = f"{drop.progress:6.1%}" + + campaign = drop.campaign + self._campaign_name = campaign.name + self._campaign_game = campaign.game.name + self._campaign_progress = campaign.progress + self._campaign_percentage = f"{campaign.progress:6.1%} ({campaign.claimed_drops}/{campaign.total_drops})" + + if countdown: + # restart our seconds update timer + self.start_timer() + elif subone: + # display the current remaining time at 0 seconds (after substracting the minute) + # this is because the watch loop will substract this minute + # right after the first watch payload returns with a time update + self._update_time(0) + else: + # display full time with no substracting + self._update_time(60) + + self._update() + + @staticmethod + def _progress_bar(progress: float, width: int) -> str: + finished = int((width - 2) * progress) + remaining = int((width - 2) - finished) + return f"[{'=' * finished}{'-' * remaining}]" + + def _update(self): + self._window.clear() + + self._window.addstr(0, 0, _("gui", "progress", "game") + ' ') + self._window.addstr(self._campaign_game) + self._window.addstr(1, 0, _("gui", "progress", "campaign") + ' ') + self._window.addstr(self._campaign_name) + self._window.addstr(2, 0, _("gui", "progress", "campaign_progress") + ' ') + self._window.addstr(f"{self._campaign_percentage}") + self._window.addstr(2, 30, _("gui", "progress", "remaining").format(time=self._campaign_remaining)) + self._window.addstr(3, 0, self._progress_bar(self._campaign_progress, WINDOW_WIDTH)) + + self._window.addstr(5, 0, _("gui", "progress", "drop") + ' ') + self._window.addstr(self._drop_rewards) + self._window.addstr(6, 0, _("gui", "progress", "drop_progress") + ' ') + self._window.addstr(f"{self._drop_percentage}") + self._window.addstr(6, 30, _("gui", "progress", "remaining").format(time=self._drop_remaining)) + self._window.addstr(7, 0, self._progress_bar(self._drop_progress, WINDOW_WIDTH)) + + self._window.refresh() + + +class ConsoleOutput: + _BUFFER_SIZE = 6 + + def __init__(self): + self._window = curses.newwin(self._BUFFER_SIZE, WINDOW_WIDTH, 12, 0) + + self._buffer: list[str] = [] + + def print(self, message: str): + stamp = datetime.now().strftime("%X") + ": " + max_length = WINDOW_WIDTH - len(stamp) + + lines = message.split("\n") # Split the message by lines + for line in lines: + for i in range(0, len(line), max_length): # Split te line by length + self._buffer.append(stamp + line[i:i + max_length]) + + self._buffer = self._buffer[-self._BUFFER_SIZE:] # Keep the last lines + self._update() + + def _update(self): + self._window.clear() + for i, line in enumerate(self._buffer): + self._window.addstr(i, 0, line) + self._window.refresh() + + +class ChannelsHandler: + """ + The channel list is removed from CLI mode + to reduce unnecessary display space use + """ + + def __init__(self): + # self._channels: dict[str, dict] = {} + # self._channel_map: dict[str, Channel] = {} + # self._selection = None + pass + + def _set(self, iid: str, column: str, value: any): + # self._channels[iid][column] = value + pass + + def _insert(self, iid: str, values: dict[str, any]): + # self._channels[iid] = values + pass + + def clear(self): + # self._channels.clear() + pass + + def set_watching(self, channel: Channel): + # self.clear_watching() + # iid = channel.iid + # self._channels[iid]["watching"] = True + pass + + def clear_watching(self): + # for channel in self._channels.values(): + # channel["watching"] = False + pass + + def get_selection(self) -> Channel | None: + # if not self._channel_map: + # return None + # if not self._selection: + # return None + # return self._channel_map[self._selection] + pass + + def display(self, channel: Channel, *, add: bool = False): + # iid = channel.iid + # if not add and iid not in self._channel_map: + # # the channel isn't on the list and we're not supposed to add it + # return + # # ACL-based + # acl_based = channel.acl_based + # # status + # if channel.online: + # status = _("gui", "channels", "online") + # elif channel.pending_online: + # status = _("gui", "channels", "pending") + # else: + # status = _("gui", "channels", "offline") + # # game + # game = str(channel.game or '') + # # drops + # drops = channel.drops_enabled + # # viewers + # viewers = '' + # if channel.viewers is not None: + # viewers = channel.viewers + # # points + # points = '' + # if channel.points is not None: + # points = channel.points + # if iid in self._channel_map: + # self._set(iid, "game", game) + # self._set(iid, "drops", drops) + # self._set(iid, "status", status) + # self._set(iid, "viewers", viewers) + # self._set(iid, "acl_base", acl_based) + # if points != '': # we still want to display 0 + # self._set(iid, "points", points) + # elif add: + # self._channel_map[iid] = channel + # self._insert( + # iid, + # { + # "game": game, + # "drops": drops, + # "points": points, + # "status": status, + # "viewers": viewers, + # "acl_base": acl_based, + # "channel": channel.name, + # }, + # ) + pass + + +class CLIManager: + def __init__(self, twitch: Twitch): + self._twitch: Twitch = twitch + self._close_requested = asyncio.Event() + + # GUI + self._stdscr = curses.initscr() + + self.output = ConsoleOutput() + # register logging handler + self._handler = _LoggingHandler(self) + self._handler.setFormatter(OUTPUT_FORMATTER) + logger = logging.getLogger("TwitchDrops") + logger.addHandler(self._handler) + if (logging_level := logger.getEffectiveLevel()) < logging.ERROR: + self.print(f"Logging level: {logging.getLevelName(logging_level)}") + + self.status = StatusBar() + self.websockets = WebsocketStatus() + self.inv = InventoryHandler(self) + self.login = LoginHandler(self) + self.progress = ProgressHandler() + self.channels = ChannelsHandler() + self.settings = SettingsHandler(self) + + @property + def close_requested(self) -> bool: + return self._close_requested.is_set() + + async def wait_until_closed(self): + # wait until the user closes the window + await self._close_requested.wait() + + async def coro_unless_closed(self, coro: abc.Awaitable[_T]) -> _T: + # In Python 3.11, we need to explicitly wrap awaitables + tasks = [asyncio.ensure_future(coro), asyncio.ensure_future(self._close_requested.wait())] + done: set[asyncio.Task[Any]] + pending: set[asyncio.Task[Any]] + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + for task in pending: + task.cancel() + if self._close_requested.is_set(): + raise ExitRequest() + return await next(iter(done)) + + def prevent_close(self): + self._close_requested.clear() + + def start(self): + curses.noecho() + curses.cbreak() + self._stdscr.keypad(True) + + # self.progress.start_timer() + + def stop(self): + curses.nocbreak() + self._stdscr.keypad(False) + curses.echo() + curses.endwin() + + self.progress.stop_timer() + + def close(self, *args) -> int: + """ + Requests the GUI application to close. + The window itself will be closed in the closing sequence later. + """ + self._close_requested.set() + # notify client we're supposed to close + self._twitch.close() + return 0 + + def close_window(self): + """ + Closes the window. Invalidates the logger. + """ + logging.getLogger("TwitchDrops").removeHandler(self._handler) + + def save(self, *, force: bool = False): + pass + + def set_games(self, games: abc.Iterable[Game]): + self.settings.set_games(games) + + def display_drop(self, drop: TimedDrop, *, countdown: bool = True, subone: bool = False) -> None: + self.progress.display(drop, countdown=countdown, subone=subone) # main tab + # inventory overview is updated from within drops themselves via change events + + def clear_drop(self): + self.progress.display(None) + + def print(self, message: str): + self.output.print(message) diff --git a/main.py b/main.py index 7311139a..a2d5f18b 100644 --- a/main.py +++ b/main.py @@ -3,7 +3,6 @@ # import an additional thing for proper PyInstaller freeze support from multiprocessing import freeze_support - if __name__ == "__main__": freeze_support() import io @@ -34,6 +33,7 @@ warnings.simplefilter("default", ResourceWarning) + class Parser(argparse.ArgumentParser): def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) @@ -49,6 +49,7 @@ def exit(self, status: int = 0, message: str | None = None) -> NoReturn: finally: messagebox.showerror("Argument Parser Error", self._message.getvalue()) + class ParsedArgs(argparse.Namespace): _verbose: int _debug_ws: bool @@ -89,16 +90,13 @@ def debug_gql(self) -> int: return logging.INFO return logging.NOTSET + # handle input parameters # NOTE: parser output is shown via message box # we also need a dummy invisible window for the parser - root = tk.Tk() - root.overrideredirect(True) - root.withdraw() - root.iconphoto( - True, PhotoImage(master=root, image=Image_module.open(resource_path("pickaxe.ico"))) - ) - root.update() + # TODO: To make it works with CLI mode, I have to move the parser to + # TODO: before creating the window temporarily. A better method is needed. + # TODO: Maybe direct the parser output to the console? parser = Parser( SELF_PATH.name, description="A program that allows you to mine timed drops on Twitch.", @@ -107,6 +105,7 @@ def debug_gql(self) -> int: parser.add_argument("-v", dest="_verbose", action="count", default=0) parser.add_argument("--tray", action="store_true") parser.add_argument("--log", action="store_true") + parser.add_argument("--cli", action="store_true") # undocumented debug args parser.add_argument( "--no-run-check", dest="no_run_check", action="store_true", help=argparse.SUPPRESS @@ -118,6 +117,16 @@ def debug_gql(self) -> int: "--debug-gql", dest="_debug_gql", action="store_true", help=argparse.SUPPRESS ) args = parser.parse_args(namespace=ParsedArgs()) + + if not args.cli: + root = tk.Tk() + root.overrideredirect(True) + root.withdraw() + root.iconphoto( + True, PhotoImage(master=root, image=Image_module.open(resource_path("pickaxe.ico"))) + ) + root.update() + # load settings try: settings = Settings(args) @@ -127,10 +136,13 @@ def debug_gql(self) -> int: f"There was an error while loading the settings file:\n\n{traceback.format_exc()}" ) sys.exit(4) - # dummy window isn't needed anymore - root.destroy() + + if not args.cli: + # dummy window isn't needed anymore + root.destroy() + del root # get rid of unneeded objects - del root, parser + del parser # check if we're not already running if sys.platform == "win32": @@ -168,7 +180,7 @@ def debug_gql(self) -> int: exit_status = 0 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - client = Twitch(settings) + client = Twitch(settings, args.cli) signal.signal(signal.SIGINT, lambda *_: client.gui.close()) signal.signal(signal.SIGTERM, lambda *_: client.gui.close()) try: diff --git a/manual.txt b/manual.txt index 7fefdbfa..5e9c5d6c 100644 --- a/manual.txt +++ b/manual.txt @@ -14,6 +14,8 @@ Available command line arguments: matches the level set by `-v`. • --version Show application version information +• --cli + Launch in Command Line Interface mode. Note: Additional settings are available within the application GUI. diff --git a/requirements.txt b/requirements.txt index f497250b..c6a892ed 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,9 @@ aiohttp>2.0,<4.0 Pillow pystray selenium-wire +yarl # use a fork that has an extra option of hiding the unneeded Chrome's console cmd window git+https://github.com/sebdelsol/undetected-chromedriver # this is installed only on windows pywin32; sys_platform == "win32" +windows-curses; sys_platform == "win32" \ No newline at end of file diff --git a/twitch.py b/twitch.py index 86fd34ad..767dbfe1 100644 --- a/twitch.py +++ b/twitch.py @@ -30,7 +30,6 @@ ) from exc from translate import _ -from gui import GUIManager from channel import Channel from websocket import WebsocketPool from inventory import DropsCampaign @@ -605,7 +604,7 @@ def invalidate(self, *, auth: bool = False, integrity: bool = False): class Twitch: - def __init__(self, settings: Settings): + def __init__(self, settings: Settings, cli: bool = False): self.settings: Settings = settings # State management self._state: State = State.IDLE @@ -618,6 +617,10 @@ def __init__(self, settings: Settings): self._session: aiohttp.ClientSession | None = None self._auth_state: _AuthState = _AuthState(self) # GUI + if cli: + from cli import CLIManager as GUIManager + else: + from gui import GUIManager self.gui = GUIManager(self) # Storing and watching channels self.channels: OrderedDict[int, Channel] = OrderedDict() From c785ebcf2eb8935b184c258cf126a49be3cb7288 Mon Sep 17 00:00:00 2001 From: Fidel Yin <fidel.yin@hotmail.com> Date: Wed, 19 Apr 2023 19:31:50 +0800 Subject: [PATCH 2/3] Improvement to CLI mode & Extract a abstract class for UI Managers --- base_ui.py | 278 +++++++++++++++++++++++++++++++ cli.py | 415 ++++++++++++++--------------------------------- gui.py | 28 ++-- main.py | 34 ++-- requirements.txt | 1 - twitch.py | 4 +- 6 files changed, 436 insertions(+), 324 deletions(-) create mode 100644 base_ui.py diff --git a/base_ui.py b/base_ui.py new file mode 100644 index 00000000..fd71c3f2 --- /dev/null +++ b/base_ui.py @@ -0,0 +1,278 @@ +from abc import abstractmethod, ABCMeta + + +class BaseStatusBar(metaclass=ABCMeta): + @abstractmethod + def update(self, text): + pass + + @abstractmethod + def clear(self): + pass + + +class BaseWebsocketStatus(metaclass=ABCMeta): + @abstractmethod + def update(self, idx, status, topics): + pass + + @abstractmethod + def remove(self, idx): + pass + + +class BaseLoginForm(metaclass=ABCMeta): + @abstractmethod + def clear(self, login, password, token): + pass + + @abstractmethod + def wait_for_login_press(self): + pass + + @abstractmethod + def ask_login(self): + pass + + @abstractmethod + def update(self, status, user_id): + pass + + @abstractmethod + def ask_enter_code(self, user_code): + pass + + +class BaseTrayIcon(metaclass=ABCMeta): + @abstractmethod + def is_tray(self): + pass + + @abstractmethod + def get_title(self, drop): + pass + + @abstractmethod + def start(self): + pass + + @abstractmethod + def stop(self): + pass + + @abstractmethod + def quit(self): + pass + + @abstractmethod + def minimize(self): + pass + + @abstractmethod + def restore(self): + pass + + @abstractmethod + def notify(self, message, title, duration): + pass + + @abstractmethod + def update_title(self, drop): + pass + + +class BaseSettingsPanel(metaclass=ABCMeta): + @abstractmethod + def clear_selection(self): + pass + + @abstractmethod + def update_notifications(self): + pass + + @abstractmethod + def update_autostart(self): + pass + + @abstractmethod + def set_games(self, games): + pass + + @abstractmethod + def priorities(self): + pass + + @abstractmethod + def priority_add(self): + pass + + @abstractmethod + def priority_move(self, up): + pass + + @abstractmethod + def priority_delete(self): + pass + + @abstractmethod + def priority_only(self): + pass + + @abstractmethod + def exclude_add(self): + pass + + @abstractmethod + def exclude_delete(self): + pass + + +class BaseInventoryOverview(metaclass=ABCMeta): + @abstractmethod + def refresh(self): + pass + + @abstractmethod + def add_campaign(self, campaign): + pass + + @abstractmethod + def clear(self): + pass + + @staticmethod + @abstractmethod + def get_status(campaign): + pass + + @staticmethod + @abstractmethod + def get_progress(drop): + pass + + @abstractmethod + def update_drop(self, drop): + pass + + +class BaseCampaignProgress(metaclass=ABCMeta): + @abstractmethod + def start_timer(self): + pass + + @abstractmethod + def stop_timer(self): + pass + + @abstractmethod + def display(self, drop, countdown, subone): + pass + + +class BaseConsoleOutput(metaclass=ABCMeta): + @abstractmethod + def print(self, message): + pass + + +class BaseChannelList(metaclass=ABCMeta): + @abstractmethod + def shrink(self): + pass + + @abstractmethod + def clear_watching(self): + pass + + @abstractmethod + def set_watching(self, channel): + pass + + @abstractmethod + def get_selection(self): + pass + + @abstractmethod + def clear_selection(self): + pass + + @abstractmethod + def clear(self): + pass + + @abstractmethod + def display(self, channel, add): + pass + + @abstractmethod + def remove(self, channel): + pass + + +class BaseInterfaceManager(metaclass=ABCMeta): + @abstractmethod + def wnd_proc(self, hwnd, msg, w_param, l_param): + """ + This function serves as a message processor for all messages sent + to the application by Windows. + """ + pass + + @abstractmethod + async def wait_until_closed(self): + pass + + @abstractmethod + async def coro_unless_closed(self, coro): + pass + + @abstractmethod + def prevent_close(self): + pass + + @abstractmethod + def start(self): + pass + + @abstractmethod + def stop(self): + pass + + @abstractmethod + def close(self, args): + """ + Requests the GUI application to close. + The window itself will be closed in the closing sequence later. + """ + pass + + @abstractmethod + def close_window(self): + """ + Closes the window. Invalidates the logger. + """ + pass + + @abstractmethod + def unfocus(self, event): + pass + + @abstractmethod + def save(self, force): + pass + + @abstractmethod + def set_games(self, games): + pass + + @abstractmethod + def display_drop(self, drop, countdown, subone): + pass + + @abstractmethod + def clear_drop(self): + pass + + @abstractmethod + def print(self, message): + pass diff --git a/cli.py b/cli.py index 9d8bc08c..6f9428f3 100644 --- a/cli.py +++ b/cli.py @@ -4,27 +4,25 @@ import asyncio import logging from collections import abc -from math import log10, ceil from dataclasses import dataclass, field from datetime import datetime from typing import Any, TYPE_CHECKING from translate import _ +from base_ui import BaseInterfaceManager, BaseSettingsPanel, BaseInventoryOverview, BaseCampaignProgress, \ + BaseConsoleOutput, BaseChannelList, BaseTrayIcon, BaseLoginForm, BaseWebsocketStatus, BaseStatusBar from exceptions import ExitRequest from utils import Game, _T from constants import ( - OUTPUT_FORMATTER, WS_TOPICS_LIMIT + OUTPUT_FORMATTER ) if TYPE_CHECKING: from twitch import Twitch from channel import Channel - from settings import Settings from inventory import DropsCampaign, TimedDrop -DIGITS = ceil(log10(WS_TOPICS_LIMIT)) - -WINDOW_WIDTH = 100 +WINDOW_WIDTH = 80 class _LoggingHandler(logging.Handler): @@ -36,85 +34,34 @@ def emit(self, record): self._output.print(self.format(record)) -class StatusBar: +class StatusBar(BaseStatusBar): def __init__(self): self._window = curses.newwin(1, WINDOW_WIDTH, 1, 0) self.update("") def update(self, status: str): self._window.clear() - self._window.addstr(0, 0, f"{_('gui', 'status', 'name')}: ") + self._window.addstr(0, 0, f"{_('gui', 'status', 'name')}: ", curses.A_BOLD) self._window.addstr(status) self._window.refresh() - -@dataclass -class _WSEntry: - status: str - topics: int + def clear(self): + self.update("") -class WebsocketStatus: +class WebsocketStatus(BaseWebsocketStatus): """ - The websocket status display is removed from CLI mode - to reduce unnecessary display space use + The websocket status display is not implemented in CLI """ - def __init__(self): - # self._window = curses.newwin(MAX_WEBSOCKETS, 30, 1, 0) - # self._label = '\n'.join( - # _("gui", "websocket", "websocket").format(id=i) - # for i in range(1, MAX_WEBSOCKETS + 1) - # ) - # - # self._status: str = "" - # self._topics: str = "" - # - # self._items: dict[int, _WSEntry | None] = {i: None for i in range(MAX_WEBSOCKETS)} - # self._update() - pass - def update(self, idx: int, status: str | None = None, topics: int | None = None): - # if status is None and topics is None: - # raise TypeError("You need to provide at least one of: status, topics") - # entry = self._items.get(idx) - # if entry is None: - # entry = self._items[idx] = _WSEntry( - # status=_("gui", "websocket", "disconnected"), topics=0 - # ) - # if status is not None: - # entry.status = status - # if topics is not None: - # entry.topics = topics - # self._update() pass def remove(self, idx: int): - # if idx in self._items: - # del self._items[idx] - # self._update() pass - # def _update(self): - # self._window.clear() - # self._window.addstr(0, 0, self._label) - # - # for idx in range(MAX_WEBSOCKETS): - # if (item := self._items.get(idx)) is not None: - # self._window.addstr(idx, 15, item.status) - # self._window.addstr(idx, 25, f"{item.topics:>{DIGITS}}/{WS_TOPICS_LIMIT}") - # - # self._window.refresh() - - -@dataclass -class LoginData: - username: str - password: str - token: str - -class LoginHandler: +class LoginHandler(BaseLoginForm): def __init__(self, manager: CLIManager): self._manager = manager self._window = curses.newwin(1, WINDOW_WIDTH, 0, 0) @@ -124,6 +71,15 @@ def __init__(self, manager: CLIManager): self.update(_("gui", "login", "logged_out"), None) + def clear(self, login, password, token): + pass + + def wait_for_login_press(self): + pass + + def ask_login(self): + pass + async def ask_enter_code(self, user_code: str) -> None: self.update(_("gui", "login", "required"), None) self._manager.print(_("gui", "login", "request")) @@ -133,15 +89,50 @@ async def ask_enter_code(self, user_code: str) -> None: def update(self, status: str, user_id: int | None): self._window.clear() - self._window.addstr(0, 0, self._label_status) + self._window.addstr(0, 0, self._label_status, curses.A_BOLD) self._window.addstr(status) - self._window.addstr(0, 30, self._label_user_id) + self._window.addstr(0, 30, self._label_user_id, curses.A_BOLD) self._window.addstr(str(user_id) if user_id else "-") self._window.refresh() +class TrayHandler(BaseTrayIcon): + """ + Not implemented because not required in CLI + """ + + def is_tray(self) -> bool: + pass + + def get_title(self, drop: TimedDrop | None) -> str: + pass + + def start(self): + pass + + def stop(self): + pass + + def quit(self): + pass + + def minimize(self): + pass + + def restore(self): + pass + + def notify( + self, message: str, title: str | None = None, duration: float = 10 + ) -> asyncio.Task[None] | None: + pass + + def update_title(self, drop: TimedDrop | None): + pass + + @dataclass -class CampaignDisplay: +class _CampaignDisplay: name: str status: str = "" starts_at: str = "" @@ -152,134 +143,23 @@ class CampaignDisplay: visible: bool = True -class DropDisplay: +class _DropDisplay: benefits: list[str] = [] progress: str = "" -class InventoryHandler: +class InventoryHandler(BaseInventoryOverview): """ - The inventory display is removed from CLI mode - to reduce unnecessary display space use + The inventory display is not implemented in CLI """ - def __init__(self, manager: CLIManager): - # self._settings: Settings = manager._twitch.settings - # self._filters: dict[str, bool] = { - # "linked": True, - # "upcoming": True, - # "expired": False, - # "excluded": False, - # "finished": False, - # } - # # manager.tabs.add_view_event(self._on_tab_switched) - # - # # Inventory view - # self._campaigns: dict[DropsCampaign, CampaignDisplay] = {} - # self._drops: dict[str, DropDisplay] = {} - pass - - def _update_visibility(self, campaign: DropsCampaign): - # True if the campaign is supposed to show, False makes it hidden. - # campaign_display = self._campaigns[campaign] - # linked = self._filters["linked"] - # expired = self._filters["expired"] - # excluded = self._filters["excluded"] - # upcoming = self._filters["upcoming"] - # finished = self._filters["finished"] - # priority_only = self._settings.priority_only - # campaign_display.visible = ( - # (not linked or campaign.linked) - # and (campaign.active or upcoming and campaign.upcoming or expired and campaign.expired) - # and ( - # excluded - # or ( - # campaign.game.name not in self._settings.exclude - # and not priority_only or campaign.game.name in self._settings.priority - # ) - # ) - # and (finished or not campaign.finished) - # ) - pass - - def _on_tab_switched(self) -> None: - # if self._manager.tabs.current_tab() == 1: - # # refresh only if we're switching to the tab - # self.refresh() - pass - def refresh(self): - # for campaign in self._campaigns: - # # status - # status_label = self._campaigns[campaign]["status"] - # status_text, status_color = self.get_status(campaign) - # status_label.config(text=status_text, foreground=status_color) - # # visibility - # self._update_visibility(campaign) - # self._canvas_update() - pass - - def _canvas_update(self): pass async def add_campaign(self, campaign: DropsCampaign) -> None: - # # Name - # campaign_display = CampaignDisplay(name=campaign.name) - # - # # Status - # status_text, status_color = self.get_status(campaign) - # campaign_display.status = status_text - # - # # Starts / Ends - # campaign_display.starts_at = _("gui", "inventory", "starts").format( - # time=campaign.starts_at.astimezone().replace(microsecond=0, tzinfo=None)) - # campaign_display.ends_at = _("gui", "inventory", "ends").format( - # time=campaign.ends_at.astimezone().replace(microsecond=0, tzinfo=None)) - # - # # Linking status - # if campaign.linked: - # campaign_display.link = _("gui", "inventory", "status", "linked") - # else: - # campaign_display.link = _("gui", "inventory", "status", "not_linked") - # campaign_display.link += campaign.link_url - # - # # ACL channels - # acl = campaign.allowed_channels - # if acl: - # if len(acl) <= 5: - # allowed_text: str = '\n'.join(ch.name for ch in acl) - # else: - # allowed_text = '\n'.join(ch.name for ch in acl[:4]) - # allowed_text += ( - # f"\n{_('gui', 'inventory', 'and_more').format(amount=len(acl) - 4)}" - # ) - # else: - # allowed_text = _("gui", "inventory", "all_channels") - # campaign_display.allowed_channels = f"{_('gui', 'inventory', 'allowed_channels')}\n{allowed_text}" - # - # # Drops display - # for i, drop in enumerate(campaign.drops): - # campaign_display.drops.append(drop.id) - # drop_display = DropDisplay() - # - # # Benefits - # for benefit in drop.benefits: - # drop_display.benefits.append(benefit.name) - # - # # Progress - # progress_text, progress_color = self.get_progress(drop) - # drop_display.progress = progress_text - # - # self._campaigns[campaign] = campaign_display - # - # # if self._manager.tabs.current_tab() == 1: - # # self._update_visibility(campaign) - # # self._canvas_update() pass def clear(self) -> None: - # self._drops.clear() - # self._campaigns.clear() pass @staticmethod @@ -316,42 +196,50 @@ def get_progress(drop: TimedDrop) -> tuple[str, str]: return progress_text, progress_color def update_drop(self, drop: TimedDrop) -> None: - # if drop.id not in self._drops: - # return - # - # progress_text, progress_color = self.get_progress(drop) - # self._drops[drop.id].progress = progress_text pass -class SettingsHandler: +class SettingsHandler(BaseSettingsPanel): """ - The setting panel has been removed from CLI mode - to reduce unnecessary display space use + The setting panel is not implemented in CLI Please edit settings.json manually """ - def __init__(self, manager: CLIManager): - self._twitch = manager._twitch - self._settings: Settings = manager._twitch.settings + def clear_selection(self): + pass - self._exclude_list = [] - self._priority_list = [] + def update_notifications(self): + pass + + def update_autostart(self): + pass def set_games(self, games: abc.Iterable[Game]) -> None: - games_list = sorted(map(str, games)) - self._exclude_list = games_list - self._priority_list = games_list + pass def priorities(self) -> dict[str, int]: - # NOTE: we shift the indexes so that 0 can be used as the default one - size = len(self._priority_list) - return { - game_name: size - i for i, game_name in enumerate(self._priority_list) - } + return {} + def priority_add(self): + pass + + def priority_move(self, up): + pass + + def priority_delete(self): + pass -class ProgressHandler: + def priority_only(self): + pass + + def exclude_add(self): + pass + + def exclude_delete(self): + pass + + +class ProgressHandler(BaseCampaignProgress): def __init__(self): self._window = curses.newwin(9, WINDOW_WIDTH, 3, 0) @@ -468,18 +356,18 @@ def _progress_bar(progress: float, width: int) -> str: def _update(self): self._window.clear() - self._window.addstr(0, 0, _("gui", "progress", "game") + ' ') + self._window.addstr(0, 0, _("gui", "progress", "game") + ' ', curses.A_BOLD) self._window.addstr(self._campaign_game) - self._window.addstr(1, 0, _("gui", "progress", "campaign") + ' ') + self._window.addstr(1, 0, _("gui", "progress", "campaign") + ' ', curses.A_BOLD) self._window.addstr(self._campaign_name) - self._window.addstr(2, 0, _("gui", "progress", "campaign_progress") + ' ') + self._window.addstr(2, 0, _("gui", "progress", "campaign_progress") + ' ', curses.A_BOLD) self._window.addstr(f"{self._campaign_percentage}") self._window.addstr(2, 30, _("gui", "progress", "remaining").format(time=self._campaign_remaining)) self._window.addstr(3, 0, self._progress_bar(self._campaign_progress, WINDOW_WIDTH)) - self._window.addstr(5, 0, _("gui", "progress", "drop") + ' ') + self._window.addstr(5, 0, _("gui", "progress", "drop") + ' ', curses.A_BOLD) self._window.addstr(self._drop_rewards) - self._window.addstr(6, 0, _("gui", "progress", "drop_progress") + ' ') + self._window.addstr(6, 0, _("gui", "progress", "drop_progress") + ' ', curses.A_BOLD) self._window.addstr(f"{self._drop_percentage}") self._window.addstr(6, 30, _("gui", "progress", "remaining").format(time=self._drop_remaining)) self._window.addstr(7, 0, self._progress_bar(self._drop_progress, WINDOW_WIDTH)) @@ -487,7 +375,7 @@ def _update(self): self._window.refresh() -class ConsoleOutput: +class ConsoleOutput(BaseConsoleOutput): _BUFFER_SIZE = 6 def __init__(self): @@ -514,101 +402,37 @@ def _update(self): self._window.refresh() -class ChannelsHandler: +class ChannelsHandler(BaseChannelList): """ - The channel list is removed from CLI mode - to reduce unnecessary display space use + The channel list is not implemented in CLI """ - def __init__(self): - # self._channels: dict[str, dict] = {} - # self._channel_map: dict[str, Channel] = {} - # self._selection = None - pass - - def _set(self, iid: str, column: str, value: any): - # self._channels[iid][column] = value - pass - - def _insert(self, iid: str, values: dict[str, any]): - # self._channels[iid] = values + def shrink(self): pass def clear(self): - # self._channels.clear() pass def set_watching(self, channel: Channel): - # self.clear_watching() - # iid = channel.iid - # self._channels[iid]["watching"] = True pass def clear_watching(self): - # for channel in self._channels.values(): - # channel["watching"] = False pass def get_selection(self) -> Channel | None: - # if not self._channel_map: - # return None - # if not self._selection: - # return None - # return self._channel_map[self._selection] + pass + + def clear_selection(self): pass def display(self, channel: Channel, *, add: bool = False): - # iid = channel.iid - # if not add and iid not in self._channel_map: - # # the channel isn't on the list and we're not supposed to add it - # return - # # ACL-based - # acl_based = channel.acl_based - # # status - # if channel.online: - # status = _("gui", "channels", "online") - # elif channel.pending_online: - # status = _("gui", "channels", "pending") - # else: - # status = _("gui", "channels", "offline") - # # game - # game = str(channel.game or '') - # # drops - # drops = channel.drops_enabled - # # viewers - # viewers = '' - # if channel.viewers is not None: - # viewers = channel.viewers - # # points - # points = '' - # if channel.points is not None: - # points = channel.points - # if iid in self._channel_map: - # self._set(iid, "game", game) - # self._set(iid, "drops", drops) - # self._set(iid, "status", status) - # self._set(iid, "viewers", viewers) - # self._set(iid, "acl_base", acl_based) - # if points != '': # we still want to display 0 - # self._set(iid, "points", points) - # elif add: - # self._channel_map[iid] = channel - # self._insert( - # iid, - # { - # "game": game, - # "drops": drops, - # "points": points, - # "status": status, - # "viewers": viewers, - # "acl_base": acl_based, - # "channel": channel.name, - # }, - # ) - pass - - -class CLIManager: + pass + + def remove(self, channel: Channel): + pass + + +class CLIManager(BaseInterfaceManager): def __init__(self, twitch: Twitch): self._twitch: Twitch = twitch self._close_requested = asyncio.Event() @@ -616,6 +440,7 @@ def __init__(self, twitch: Twitch): # GUI self._stdscr = curses.initscr() + # Output self.output = ConsoleOutput() # register logging handler self._handler = _LoggingHandler(self) @@ -625,13 +450,17 @@ def __init__(self, twitch: Twitch): if (logging_level := logger.getEffectiveLevel()) < logging.ERROR: self.print(f"Logging level: {logging.getLevelName(logging_level)}") + self.tray = TrayHandler() self.status = StatusBar() self.websockets = WebsocketStatus() - self.inv = InventoryHandler(self) + self.inv = InventoryHandler() self.login = LoginHandler(self) self.progress = ProgressHandler() self.channels = ChannelsHandler() - self.settings = SettingsHandler(self) + self.settings = SettingsHandler() + + def wnd_proc(self, hwnd, msg, w_param, l_param): + pass @property def close_requested(self) -> bool: @@ -672,10 +501,6 @@ def stop(self): self.progress.stop_timer() def close(self, *args) -> int: - """ - Requests the GUI application to close. - The window itself will be closed in the closing sequence later. - """ self._close_requested.set() # notify client we're supposed to close self._twitch.close() @@ -687,6 +512,9 @@ def close_window(self): """ logging.getLogger("TwitchDrops").removeHandler(self._handler) + def unfocus(self, event): + pass + def save(self, *, force: bool = False): pass @@ -694,8 +522,7 @@ def set_games(self, games: abc.Iterable[Game]): self.settings.set_games(games) def display_drop(self, drop: TimedDrop, *, countdown: bool = True, subone: bool = False) -> None: - self.progress.display(drop, countdown=countdown, subone=subone) # main tab - # inventory overview is updated from within drops themselves via change events + self.progress.display(drop, countdown=countdown, subone=subone) def clear_drop(self): self.progress.display(None) diff --git a/gui.py b/gui.py index 1f7320fa..e4a00128 100644 --- a/gui.py +++ b/gui.py @@ -27,6 +27,8 @@ import win32gui from translate import _ +from base_ui import BaseInterfaceManager, BaseSettingsPanel, BaseInventoryOverview, BaseCampaignProgress, \ + BaseConsoleOutput, BaseChannelList, BaseTrayIcon, BaseLoginForm, BaseWebsocketStatus, BaseStatusBar from cache import ImageCache from exceptions import ExitRequest from utils import resource_path, Game, _T @@ -329,7 +331,7 @@ def get(self) -> _T: ########################################### -class StatusBar: +class StatusBar(BaseStatusBar): def __init__(self, manager: GUIManager, master: ttk.Widget): frame = ttk.LabelFrame(master, text=_("gui", "status", "name"), padding=(4, 0, 4, 4)) frame.grid(column=0, row=0, columnspan=3, sticky="nsew", padx=2) @@ -348,7 +350,7 @@ class _WSEntry(TypedDict): topics: int -class WebsocketStatus: +class WebsocketStatus(BaseWebsocketStatus): def __init__(self, manager: GUIManager, master: ttk.Widget): frame = ttk.LabelFrame(master, text=_("gui", "websocket", "name"), padding=(4, 0, 4, 4)) frame.grid(column=0, row=1, sticky="nsew", padx=2) @@ -419,7 +421,7 @@ class LoginData: token: str -class LoginForm: +class LoginForm(BaseLoginForm): def __init__(self, manager: GUIManager, master: ttk.Widget): self._manager = manager self._var = StringVar(master) @@ -527,7 +529,7 @@ class _ProgressVars(TypedDict): drop: _DropVars -class CampaignProgress: +class CampaignProgress(BaseCampaignProgress): BAR_LENGTH = 420 def __init__(self, manager: GUIManager, master: ttk.Widget): @@ -686,7 +688,7 @@ def display(self, drop: TimedDrop | None, *, countdown: bool = True, subone: boo self._update_time(60) -class ConsoleOutput: +class ConsoleOutput(BaseConsoleOutput): def __init__(self, manager: GUIManager, master: ttk.Widget): frame = ttk.LabelFrame(master, text=_("gui", "output"), padding=(4, 0, 4, 4)) frame.grid(column=0, row=3, columnspan=3, sticky="nsew", padx=2) @@ -728,7 +730,7 @@ class _Buttons(TypedDict): load_points: ttk.Button -class ChannelList: +class ChannelList(BaseChannelList): def __init__(self, manager: GUIManager, master: ttk.Widget): self._manager = manager frame = ttk.LabelFrame(master, text=_("gui", "channels", "name"), padding=(4, 0, 4, 4)) @@ -976,7 +978,7 @@ def remove(self, channel: Channel): self._table.delete(iid) -class TrayIcon: +class TrayIcon(BaseTrayIcon): TITLE = "Twitch Drops Miner" def __init__(self, manager: GUIManager, master: ttk.Widget): @@ -1088,7 +1090,7 @@ class CampaignDisplay(TypedDict): status: ttk.Label -class InventoryOverview: +class InventoryOverview(BaseInventoryOverview): def __init__(self, manager: GUIManager, master: ttk.Widget): self._manager = manager self._cache: ImageCache = manager._cache @@ -1332,7 +1334,8 @@ def clear(self) -> None: self._drops.clear() self._campaigns.clear() - def get_status(self, campaign: DropsCampaign) -> tuple[str, tk._Color]: + @staticmethod + def get_status(campaign: DropsCampaign) -> tuple[str, tk._Color]: if campaign.active: status_text: str = _("gui", "inventory", "status", "active") status_color: tk._Color = "green" @@ -1344,7 +1347,8 @@ def get_status(self, campaign: DropsCampaign) -> tuple[str, tk._Color]: status_color = "red" return (status_text, status_color) - def get_progress(self, drop: TimedDrop) -> tuple[str, tk._Color]: + @staticmethod + def get_progress(drop: TimedDrop) -> tuple[str, tk._Color]: progress_text: str progress_color: tk._Color = '' if drop.is_claimed: @@ -1392,7 +1396,7 @@ class _SettingsVars(TypedDict): tray_notifications: IntVar -class SettingsPanel: +class SettingsPanel(BaseSettingsPanel): AUTOSTART_NAME: str = "TwitchDropsMiner" AUTOSTART_KEY: str = "HKCU/Software/Microsoft/Windows/CurrentVersion/Run" @@ -1770,7 +1774,7 @@ def __init__(self, manager: GUIManager, master: ttk.Widget): ########################################## -class GUIManager: +class GUIManager(BaseInterfaceManager): def __init__(self, twitch: Twitch): self._twitch: Twitch = twitch self._poll_task: asyncio.Task[NoReturn] | None = None diff --git a/main.py b/main.py index a2d5f18b..612a82c8 100644 --- a/main.py +++ b/main.py @@ -47,7 +47,10 @@ def exit(self, status: int = 0, message: str | None = None) -> NoReturn: try: super().exit(status, message) # sys.exit(2) finally: - messagebox.showerror("Argument Parser Error", self._message.getvalue()) + try: + messagebox.showerror("Argument Parser Error", self._message.getvalue()) + except Exception: # dummy window doesn't exist + print("Argument Parser Error", self._message.getvalue()) class ParsedArgs(argparse.Namespace): @@ -91,12 +94,20 @@ def debug_gql(self) -> int: return logging.NOTSET + try: + root = tk.Tk() + root.overrideredirect(True) + root.withdraw() + root.iconphoto( + True, PhotoImage(master=root, image=Image_module.open(resource_path("pickaxe.ico"))) + ) + root.update() + except Exception: # root window doesn't created + pass + # handle input parameters # NOTE: parser output is shown via message box # we also need a dummy invisible window for the parser - # TODO: To make it works with CLI mode, I have to move the parser to - # TODO: before creating the window temporarily. A better method is needed. - # TODO: Maybe direct the parser output to the console? parser = Parser( SELF_PATH.name, description="A program that allows you to mine timed drops on Twitch.", @@ -118,15 +129,6 @@ def debug_gql(self) -> int: ) args = parser.parse_args(namespace=ParsedArgs()) - if not args.cli: - root = tk.Tk() - root.overrideredirect(True) - root.withdraw() - root.iconphoto( - True, PhotoImage(master=root, image=Image_module.open(resource_path("pickaxe.ico"))) - ) - root.update() - # load settings try: settings = Settings(args) @@ -137,10 +139,12 @@ def debug_gql(self) -> int: ) sys.exit(4) - if not args.cli: + try: # dummy window isn't needed anymore root.destroy() del root + except NameError: # root doesn't exist + pass # get rid of unneeded objects del parser @@ -180,7 +184,7 @@ def debug_gql(self) -> int: exit_status = 0 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - client = Twitch(settings, args.cli) + client = Twitch(settings) signal.signal(signal.SIGINT, lambda *_: client.gui.close()) signal.signal(signal.SIGTERM, lambda *_: client.gui.close()) try: diff --git a/requirements.txt b/requirements.txt index c6a892ed..125ab1ff 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,6 @@ aiohttp>2.0,<4.0 Pillow pystray selenium-wire -yarl # use a fork that has an extra option of hiding the unneeded Chrome's console cmd window git+https://github.com/sebdelsol/undetected-chromedriver # this is installed only on windows diff --git a/twitch.py b/twitch.py index 767dbfe1..ab74b5e0 100644 --- a/twitch.py +++ b/twitch.py @@ -604,7 +604,7 @@ def invalidate(self, *, auth: bool = False, integrity: bool = False): class Twitch: - def __init__(self, settings: Settings, cli: bool = False): + def __init__(self, settings: Settings): self.settings: Settings = settings # State management self._state: State = State.IDLE @@ -617,7 +617,7 @@ def __init__(self, settings: Settings, cli: bool = False): self._session: aiohttp.ClientSession | None = None self._auth_state: _AuthState = _AuthState(self) # GUI - if cli: + if self.settings.cli: from cli import CLIManager as GUIManager else: from gui import GUIManager From 3d840028b425f274c6057e8d3afd157438093c52 Mon Sep 17 00:00:00 2001 From: Fidel Yin <fidel.yin@hotmail.com> Date: Thu, 20 Apr 2023 21:26:39 +0800 Subject: [PATCH 3/3] Make parser work with CLI mode; Improvement to CLI mode --- cli.py | 57 +++++++++++++++++++++++++--------------------------- main.py | 62 +++++++++++++++++++++++++++++++++++---------------------- 2 files changed, 65 insertions(+), 54 deletions(-) diff --git a/cli.py b/cli.py index 6f9428f3..be642bb2 100644 --- a/cli.py +++ b/cli.py @@ -1,10 +1,11 @@ from __future__ import annotations +import sys import curses import asyncio import logging +import traceback from collections import abc -from dataclasses import dataclass, field from datetime import datetime from typing import Any, TYPE_CHECKING @@ -23,6 +24,7 @@ from inventory import DropsCampaign, TimedDrop WINDOW_WIDTH = 80 +WINDOW_HEIGHT = 9 class _LoggingHandler(logging.Handler): @@ -131,23 +133,6 @@ def update_title(self, drop: TimedDrop | None): pass -@dataclass -class _CampaignDisplay: - name: str - status: str = "" - starts_at: str = "" - ends_at: str = "" - link: str = "" - allowed_channels: str = "" - drops: list[str] = field(default_factory=list) - visible: bool = True - - -class _DropDisplay: - benefits: list[str] = [] - progress: str = "" - - class InventoryHandler(BaseInventoryOverview): """ The inventory display is not implemented in CLI @@ -241,7 +226,7 @@ def exclude_delete(self): class ProgressHandler(BaseCampaignProgress): def __init__(self): - self._window = curses.newwin(9, WINDOW_WIDTH, 3, 0) + self._window = curses.newwin(WINDOW_HEIGHT, WINDOW_WIDTH, 3, 0) self._drop: TimedDrop | None = None self._timer_task: asyncio.Task[None] | None = None @@ -440,8 +425,29 @@ def __init__(self, twitch: Twitch): # GUI self._stdscr = curses.initscr() - # Output - self.output = ConsoleOutput() + try: + self.tray = TrayHandler() + self.status = StatusBar() + self.websockets = WebsocketStatus() + self.inv = InventoryHandler() + self.login = LoginHandler(self) + self.progress = ProgressHandler() + self.output = ConsoleOutput() + self.channels = ChannelsHandler() + self.settings = SettingsHandler() + except curses.error: + curses.nocbreak() + self._stdscr.keypad(False) + curses.echo() + curses.endwin() + + sys.stderr.write( + f"An error occurred while creating the curses window, probably due to the window size being too " + f"small (minimum width = {WINDOW_WIDTH}, minimum height = {WINDOW_HEIGHT}).\n" + ) + sys.stderr.write(traceback.format_exc()) + sys.exit(1) + # register logging handler self._handler = _LoggingHandler(self) self._handler.setFormatter(OUTPUT_FORMATTER) @@ -450,15 +456,6 @@ def __init__(self, twitch: Twitch): if (logging_level := logger.getEffectiveLevel()) < logging.ERROR: self.print(f"Logging level: {logging.getLevelName(logging_level)}") - self.tray = TrayHandler() - self.status = StatusBar() - self.websockets = WebsocketStatus() - self.inv = InventoryHandler() - self.login = LoginHandler(self) - self.progress = ProgressHandler() - self.channels = ChannelsHandler() - self.settings = SettingsHandler() - def wnd_proc(self, hwnd, msg, w_param, l_param): pass diff --git a/main.py b/main.py index 612a82c8..b6245c89 100644 --- a/main.py +++ b/main.py @@ -38,19 +38,21 @@ class Parser(argparse.ArgumentParser): def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self._message: io.StringIO = io.StringIO() + self.is_error: bool = False + self.status: int = 0 + self.message: str = "" def _print_message(self, message: str, file: IO[str] | None = None) -> None: self._message.write(message) # print(message, file=self._message) - def exit(self, status: int = 0, message: str | None = None) -> NoReturn: + def exit(self, status: int = 0, message: str | None = None) -> None: try: - super().exit(status, message) # sys.exit(2) - finally: - try: - messagebox.showerror("Argument Parser Error", self._message.getvalue()) - except Exception: # dummy window doesn't exist - print("Argument Parser Error", self._message.getvalue()) + super().exit(status, message) + except SystemExit: # don't exit, but store the error message and handle it afterwards + self.is_error = True + self.status = status + self.message = self._message.getvalue() class ParsedArgs(argparse.Namespace): @@ -94,20 +96,19 @@ def debug_gql(self) -> int: return logging.NOTSET - try: - root = tk.Tk() - root.overrideredirect(True) - root.withdraw() - root.iconphoto( - True, PhotoImage(master=root, image=Image_module.open(resource_path("pickaxe.ico"))) - ) - root.update() - except Exception: # root window doesn't created - pass + def show_error(title: str, message: str, cli: bool): + """ + Show the error message to the console or a window, depending on whether CLI or GUI mode is specified. + """ + if cli: # for CLI mode + # Output the error message to the console + sys.stderr.write(f"{title}: {message}\n") + else: # for GUI mode + # Show the error message in a window + messagebox.showerror(title, message) + # handle input parameters - # NOTE: parser output is shown via message box - # we also need a dummy invisible window for the parser parser = Parser( SELF_PATH.name, description="A program that allows you to mine timed drops on Twitch.", @@ -129,22 +130,35 @@ def debug_gql(self) -> int: ) args = parser.parse_args(namespace=ParsedArgs()) + # create the dummy window if in GUI mode + if not args.cli: + root = tk.Tk() + root.overrideredirect(True) + root.withdraw() + root.iconphoto( + True, PhotoImage(master=root, image=Image_module.open(resource_path("pickaxe.ico"))) + ) + root.update() + + if parser.is_error: + show_error("Argument Parser Error", parser.message, args.cli) + sys.exit(parser.status) + # load settings try: settings = Settings(args) except Exception: - messagebox.showerror( + show_error( "Settings error", - f"There was an error while loading the settings file:\n\n{traceback.format_exc()}" + f"There was an error while loading the settings file:\n\n{traceback.format_exc()}", + args.cli ) sys.exit(4) - try: + if not args.cli: # dummy window isn't needed anymore root.destroy() del root - except NameError: # root doesn't exist - pass # get rid of unneeded objects del parser