-
Notifications
You must be signed in to change notification settings - Fork 99
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(search): New Search command, Service method, SearchResult Class
- Loading branch information
1 parent
10a01b0
commit 4e9b561
Showing
3 changed files
with
233 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
from __future__ import annotations | ||
|
||
import logging | ||
import re | ||
import sys | ||
from typing import Any, Optional | ||
|
||
import click | ||
import yaml | ||
from rich.padding import Padding | ||
from rich.rule import Rule | ||
from rich.tree import Tree | ||
|
||
from devine.commands.dl import dl | ||
from devine.core.config import config | ||
from devine.core.console import console | ||
from devine.core.constants import context_settings | ||
from devine.core.proxies import Basic, Hola, NordVPN | ||
from devine.core.service import Service | ||
from devine.core.services import Services | ||
from devine.core.utilities import get_binary_path | ||
from devine.core.utils.click_types import ContextData | ||
from devine.core.utils.collections import merge_dict | ||
|
||
|
||
class search: | ||
@click.group( | ||
short_help="Search for titles from a Service.", | ||
cls=Services, | ||
context_settings=dict( | ||
**context_settings, | ||
token_normalize_func=Services.get_tag | ||
)) | ||
@click.option("-p", "--profile", type=str, default=None, | ||
help="Profile to use for Credentials and Cookies (if available).") | ||
@click.option("--proxy", type=str, default=None, | ||
help="Proxy URI to use. If a 2-letter country is provided, it will try get a proxy from the config.") | ||
@click.option("--no-proxy", is_flag=True, default=False, | ||
help="Force disable all proxy use.") | ||
@click.pass_context | ||
def cli(ctx: click.Context, **kwargs: Any) -> search: | ||
return search(ctx, **kwargs) | ||
|
||
def __init__( | ||
self, | ||
ctx: click.Context, | ||
no_proxy: bool, | ||
profile: Optional[str] = None, | ||
proxy: Optional[str] = None, | ||
*_: Any, | ||
**__: Any | ||
): | ||
if not ctx.invoked_subcommand: | ||
raise ValueError("A subcommand to invoke was not specified, the main code cannot continue.") | ||
|
||
self.log = logging.getLogger("search") | ||
|
||
self.service = Services.get_tag(ctx.invoked_subcommand) | ||
self.profile = profile | ||
|
||
if self.profile: | ||
self.log.info(f"Using profile: '{self.profile}'") | ||
|
||
with console.status("Loading Service Config...", spinner="dots"): | ||
service_config_path = Services.get_path(self.service) / config.filenames.config | ||
if service_config_path.exists(): | ||
self.service_config = yaml.safe_load(service_config_path.read_text(encoding="utf8")) | ||
self.log.info("Service Config loaded") | ||
else: | ||
self.service_config = {} | ||
merge_dict(config.services.get(self.service), self.service_config) | ||
|
||
self.proxy_providers = [] | ||
if no_proxy: | ||
ctx.params["proxy"] = None | ||
else: | ||
with console.status("Loading Proxy Providers...", spinner="dots"): | ||
if config.proxy_providers.get("basic"): | ||
self.proxy_providers.append(Basic(**config.proxy_providers["basic"])) | ||
if config.proxy_providers.get("nordvpn"): | ||
self.proxy_providers.append(NordVPN(**config.proxy_providers["nordvpn"])) | ||
if get_binary_path("hola-proxy"): | ||
self.proxy_providers.append(Hola()) | ||
for proxy_provider in self.proxy_providers: | ||
self.log.info(f"Loaded {proxy_provider.__class__.__name__}: {proxy_provider}") | ||
|
||
if proxy: | ||
requested_provider = None | ||
if re.match(r"^[a-z]+:.+$", proxy, re.IGNORECASE): | ||
# requesting proxy from a specific proxy provider | ||
requested_provider, proxy = proxy.split(":", maxsplit=1) | ||
if re.match(r"^[a-z]{2}(?:\d+)?$", proxy, re.IGNORECASE): | ||
proxy = proxy.lower() | ||
with console.status(f"Getting a Proxy to {proxy}...", spinner="dots"): | ||
if requested_provider: | ||
proxy_provider = next(( | ||
x | ||
for x in self.proxy_providers | ||
if x.__class__.__name__.lower() == requested_provider | ||
), None) | ||
if not proxy_provider: | ||
self.log.error(f"The proxy provider '{requested_provider}' was not recognised.") | ||
sys.exit(1) | ||
proxy_uri = proxy_provider.get_proxy(proxy) | ||
if not proxy_uri: | ||
self.log.error(f"The proxy provider {requested_provider} had no proxy for {proxy}") | ||
sys.exit(1) | ||
proxy = ctx.params["proxy"] = proxy_uri | ||
self.log.info(f"Using {proxy_provider.__class__.__name__} Proxy: {proxy}") | ||
else: | ||
for proxy_provider in self.proxy_providers: | ||
proxy_uri = proxy_provider.get_proxy(proxy) | ||
if proxy_uri: | ||
proxy = ctx.params["proxy"] = proxy_uri | ||
self.log.info(f"Using {proxy_provider.__class__.__name__} Proxy: {proxy}") | ||
break | ||
else: | ||
self.log.info(f"Using explicit Proxy: {proxy}") | ||
|
||
ctx.obj = ContextData( | ||
config=self.service_config, | ||
cdm=None, | ||
proxy_providers=self.proxy_providers, | ||
profile=self.profile | ||
) | ||
|
||
# needs to be added this way instead of @cli.result_callback to be | ||
# able to keep `self` as the first positional | ||
self.cli._result_callback = self.result | ||
|
||
def result( | ||
self, | ||
service: Service, | ||
*_: Any, | ||
**__: Any | ||
) -> None: | ||
with console.status("Authenticating with Service...", spinner="dots"): | ||
cookies = dl.get_cookie_jar(self.service, self.profile) | ||
credential = dl.get_credentials(self.service, self.profile) | ||
service.authenticate(cookies, credential) | ||
if cookies or credential: | ||
self.log.info("Authenticated with Service") | ||
|
||
search_results = Tree("Search Results", hide_root=True) | ||
with console.status("Searching...", spinner="dots"): | ||
for result in service.search(): | ||
result_text = f"[bold text]{result.title}[/]" | ||
if result.url: | ||
result_text = f"[link={result.url}]{result_text}[/link]" | ||
if result.label: | ||
result_text += f" [pink]{result.label}[/]" | ||
if result.description: | ||
result_text += f"\n[text2]{result.description}[/]" | ||
result_text += f"\n[bright_black]id: {result.id}[/]" | ||
search_results.add(result_text + "\n") | ||
|
||
# update cookies | ||
cookie_file = dl.get_cookie_path(self.service, self.profile) | ||
if cookie_file: | ||
dl.save_cookies(cookie_file, service.session.cookies) | ||
|
||
console.print(Padding( | ||
Rule(f"[rule.text]{len(search_results.children)} Search Results"), | ||
(1, 2) | ||
)) | ||
|
||
if search_results.children: | ||
console.print(Padding( | ||
search_results, | ||
(0, 5) | ||
)) | ||
else: | ||
console.print(Padding( | ||
"[bold text]No matches[/]\n[bright_black]Please check spelling and search again....[/]", | ||
(0, 5) | ||
)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
from typing import Optional, Union | ||
|
||
|
||
class SearchResult: | ||
def __init__( | ||
self, | ||
id_: Union[str, int], | ||
title: str, | ||
description: Optional[str] = None, | ||
label: Optional[str] = None, | ||
url: Optional[str] = None | ||
): | ||
""" | ||
A Search Result for any support Title Type. | ||
Parameters: | ||
id_: The search result's Title ID. | ||
title: The primary display text, e.g., the Title's Name. | ||
description: The secondary display text, e.g., the Title's Description or | ||
further title information. | ||
label: The tertiary display text. This will typically be used to display | ||
an informative label or tag to the result. E.g., "unavailable", the | ||
title's price tag, region, etc. | ||
url: A hyperlink to the search result or title's page. | ||
""" | ||
if not isinstance(id_, (str, int)): | ||
raise TypeError(f"Expected id_ to be a {str} or {int}, not {type(id_)}") | ||
if not isinstance(title, str): | ||
raise TypeError(f"Expected title to be a {str}, not {type(title)}") | ||
if not isinstance(description, (str, type(None))): | ||
raise TypeError(f"Expected description to be a {str}, not {type(description)}") | ||
if not isinstance(label, (str, type(None))): | ||
raise TypeError(f"Expected label to be a {str}, not {type(label)}") | ||
if not isinstance(url, (str, type(None))): | ||
raise TypeError(f"Expected url to be a {str}, not {type(url)}") | ||
|
||
self.id = id_ | ||
self.title = title | ||
self.description = description | ||
self.label = label | ||
self.url = url | ||
|
||
|
||
__all__ = ("SearchResult",) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters