# launcher.py - interactive entry point: shows the menu, runs the bot, or registers new sessions.
import argparse
import asyncio
from random import randint
from bot.config.config import settings
from bot.core.tapper import Tapper
from bot.utils.device_manager import DeviceManager
from bot.utils.logger import logger
from bot.utils.proxy_chain import ProxyChain
from bot.utils.registrator import Registrator
from bot.utils.first_run import FirstRun
from bot.core.accounting import Accounts
# ASCII-art banner; process() prints it (wrapped in ANSI colour codes) when no
# action was supplied on the command line.
# NOTE: this is a regular (non-raw) string, so the trailing "\\" on the third
# art line is rendered as a single backslash.
art_splash = '''
// ) ) / / // | | /| / / // / / // ) )
//___/ / / / //__| | //| / / //____ ((
/ ____ / / / / ___ | // | / / / ____ \\
// / / // | | // | / / // ) )
// / /____/ / // | | // |/ / //____/ / ((___ / /
'''
# Author credit printed directly below the banner.
author = "by WubbaLubbaDubDubDev"
# Text of the interactive menu; the numbers match the actions dispatched in
# process() (1 = run bot, 2 = create session, 3 = quit).
menu_items = """
Select an action:
1. Run bot
2. Create session
3. Quit
"""
def _prompt_action() -> int:
    """Read from stdin until the user types a valid menu number, then return it."""
    while True:
        choice = input("> ")
        if not choice.isdigit():
            logger.warning("Action must be number")
        elif choice not in ("1", "2", "3"):
            logger.warning("Action must be 1, 2 or 3")
        else:
            return int(choice)


def _wants_another_session() -> bool:
    """Ask the post-registration y/n question until it gets a valid answer.

    Returns:
        True for 'y' (go back to the menu), False for 'n' (leave the launcher).
    """
    while True:
        answer = input("Do you want to register another session? (y/n): ").strip().lower()
        if answer == 'y':
            return True
        if answer == 'n':
            return False
        print("Invalid choice. Please enter 'y' or 'n'.")


async def process() -> None:
    """Parse the command line and drive the launcher's action loop.

    Command-line options:
        -a / --action  Integer action (1 = run bot, 2 = create session,
                       3 = quit) used for the first pass of the loop instead
                       of prompting.
        -d / --docker  Non-interactive mode: never prompts and always selects
                       action 1.

    The loop repeats until the user picks "Quit" or declines to register
    another session. After the bot run (action 1) finishes, control returns
    to the top of the loop.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-a", "--action", type=int, help="Action to perform")
    parser.add_argument("-d", "--docker", action="store_true", help="Run in Docker mode")
    args = parser.parse_args()

    # A proxy chain is only built when one of the proxy modes is enabled.
    proxy_chain = None
    if settings.USE_PROXY or settings.USE_PROXY_WITHOUT_BINDINGS:
        proxy_chain = ProxyChain(sessions_workdir=settings.SESSIONS_DIR,
                                 proxies_file=settings.PROXIES_FILE)

    device_manager = DeviceManager(workdir=settings.SESSIONS_DIR)
    registrator = Registrator(workdir=settings.SESSIONS_DIR,
                              device_manager=device_manager,
                              api_id=settings.API_ID,
                              api_hash=settings.API_HASH,
                              skip_proxy_binding=settings.SKIP_PROXY_BINDING)

    # Action requested via --action. It is consumed by the first loop pass so
    # that the flag is actually honoured rather than parsed and dropped.
    cli_action = args.action

    # The banner is shown only when no action was given on the command line.
    if not cli_action:
        print('\033[97m' + '\033[97m' + art_splash + '\033[97m')
        print('\033[1m' + '\033[93m' + author + '\033[0m')

    while True:
        if args.docker:
            print("\nRunning in Docker mode!\n")
            action = 1
        elif cli_action:
            # Use the command-line choice once; later passes fall back to the menu.
            action, cli_action = cli_action, None
        else:
            print(menu_items)
            action = _prompt_action()

        if action == 1:
            await run(registrator=registrator,
                      device_manager=device_manager,
                      proxy_chain=proxy_chain,
                      docker=args.docker)
        elif action == 2:
            await register_session(registrator=registrator,
                                   proxy_chain=proxy_chain)
            if not _wants_another_session():
                return
        elif action == 3:
            print("Goodbye!")
            return
        else:
            # Only reachable with an out-of-range --action value; the prompt
            # helper never returns anything outside 1..3.
            logger.warning("Action must be 1, 2 or 3")
async def run(registrator: Registrator,
              device_manager: DeviceManager,
              proxy_chain: ProxyChain,
              docker: bool):
    """Create one Tapper per account and run them all concurrently.

    Args:
        registrator: Passed to the account manager and to every Tapper.
        device_manager: Passed to the account manager.
        proxy_chain: Passed to ``Accounts.get_accounts``.
        docker: When truthy, binding and device creation are always accepted
            instead of consulting the ``ALWAYS_ACCEPT_*`` settings.
    """
    # In Docker mode nobody can answer prompts, so both confirmations are forced on.
    if docker:
        accept_bindings = True
        accept_devices = True
    else:
        accept_bindings = settings.ALWAYS_ACCEPT_BINDINGS_CREATION
        accept_devices = settings.ALWAYS_ACCEPT_DEVICE_CREATION

    account_manager = Accounts(
        workdir=settings.SESSIONS_DIR,
        registrator=registrator,
        device_manager=device_manager,
        accept_bindings_creation=accept_bindings,
        accept_device_creation=accept_devices,
        use_proxy=settings.USE_PROXY or settings.USE_PROXY_WITHOUT_BINDINGS,
    )

    # Separate chain used to hand out proxies when bindings are bypassed.
    unbound_proxies = ProxyChain(
        proxies_file=settings.PROXIES_FILE,
        sessions_workdir=settings.SESSIONS_DIR,
        load_proxies_from_json=False,
    )

    accounts = await account_manager.get_accounts(proxy_chain=proxy_chain)

    tapper_runs = []
    start_delay = 0  # the first account starts without any delay
    for account in accounts:
        if settings.USE_PROXY_WITHOUT_BINDINGS:
            account.proxy = unbound_proxies.get_next_proxy()

        session_state = FirstRun(sessions_dir=settings.SESSIONS_STATE_DIR)
        is_first_run = session_state.check_session(account.session_name)

        tapper = Tapper(account=account,
                        registrator=registrator,
                        first_run=is_first_run)
        tapper_runs.append(tapper.run(start_delay=start_delay))

        # A fresh random delay is drawn for the next account; delays are not
        # accumulated across accounts.
        low, high = settings.START_DELAY[0], settings.START_DELAY[1]
        start_delay = randint(low, high)

    await asyncio.gather(*tapper_runs)
async def register_session(registrator: Registrator,
                           proxy_chain: ProxyChain = None):
    """Register a new session through ``registrator``.

    ``proxy_chain`` is forwarded to the registrator only when
    ``settings.AUTO_BIND_PROXIES`` is enabled; otherwise the registrator is
    called without arguments.
    """
    forwarded = {"proxy_chain": proxy_chain} if settings.AUTO_BIND_PROXIES else {}
    await registrator.register_session(**forwarded)