@dataclass
class User:
    """A subscribed chat together with the service ids it wants to be notified about.

    The class keeps a hand-written ``__init__`` (``@dataclass`` leaves an
    explicitly defined ``__init__`` in place) so that an empty service list
    falls back to the default service.
    """
    chat_id: int
    services: List[int]

    def __init__(self, chat_id, services=(120686,)):
        """Create a user.

        Args:
            chat_id: id of the chat to notify.
            services: iterable of service ids; when empty (or ``None``) the
                default service 120686 is used.
        """
        self.chat_id = chat_id
        # Always store a fresh list: the default is an immutable tuple and
        # callers later mutate ``services`` in place (append/remove), which a
        # tuple does not support. Copying also avoids aliasing the caller's list.
        self.services = list(services) if services else [120686]

    def marshall_user(self) -> dict[str, Any]:
        """Return a JSON-serialisable dict of this user.

        As a side effect ``self.services`` is normalised: ids that are not
        keys of ``service_map`` are dropped and duplicates are removed while
        keeping the first-seen order, so the persisted file is stable.
        """
        self.services = list(dict.fromkeys(s for s in self.services if s in service_map))
        return asdict(self)
- self.users = self.__get_chats() - self.services = self.__get_uq_services() - self.parser = Parser(self.services) - self.dispatcher = self.updater.dispatcher - self.dispatcher.add_handler(CommandHandler('help', self.__help)) - self.dispatcher.add_handler(CommandHandler('start', self.__start)) - self.dispatcher.add_handler(CommandHandler('stop', self.__stop)) - self.dispatcher.add_handler(CommandHandler('add_service', self.__add_service)) - self.dispatcher.add_handler(CommandHandler('remove_service', self.__remove_service)) - self.dispatcher.add_handler(CommandHandler('my_services', self.__my_services)) - self.dispatcher.add_handler(CommandHandler('services', self.__services)) - self.cache: List[Message] = [] - - - def __get_uq_services(self) -> List[int]: - services = [] - for u in self.users: - services.extend(u.services) - services = filter(lambda x: x in service_map.keys(), services) - return list(set(services)) - - def __init_chats(self) -> None: - if not os.path.exists(CHATS_FILE): - with open(CHATS_FILE, "w") as f: - f.write("[]") - - def __get_chats(self) -> List[User]: - with open(CHATS_FILE, 'r') as f: - users = [User(u['chat_id'], u['services']) for u in json.load(f)] - f.close() - print(users) - return users - - def __persist_chats(self) -> None: - with open(CHATS_FILE, 'w') as f: - json.dump([u.marshall_user() for u in self.users], f) - f.close() - - def __add_chat(self, chat_id: int) -> None: - if chat_id not in [u.chat_id for u in self.users]: - logging.info('adding new user') - self.users.append(User(chat_id)) - self.__persist_chats() - - def __remove_chat(self, chat_id: int) -> None: - logging.info('removing the chat ' + str(chat_id)) - self.users = [u for u in self.users if u.chat_id != chat_id] - self.__persist_chats() - - def __services(self, update: Update, _: CallbackContext) -> None: - services_text = "" - for k, v in service_map.items(): - services_text += f"{k} - {v}\n" - update.message.reply_text("Available services:\n" + services_text) - 
- def __help(self, update: Update, _: CallbackContext) -> None: - try: - update.message.reply_text(""" + def __init__(self) -> None: + self.updater = Updater(os.environ["TELEGRAM_API_KEY"]) + self.__init_chats() + self.users = self.__get_chats() + self.services = self.__get_uq_services() + self.parser = Parser(self.services) + self.dispatcher = self.updater.dispatcher + self.dispatcher.add_handler(CommandHandler('help', self.__help)) + self.dispatcher.add_handler(CommandHandler('start', self.__start)) + self.dispatcher.add_handler(CommandHandler('stop', self.__stop)) + self.dispatcher.add_handler(CommandHandler('add_service', self.__add_service)) + self.dispatcher.add_handler(CommandHandler('remove_service', self.__remove_service)) + self.dispatcher.add_handler(CommandHandler('my_services', self.__my_services)) + self.dispatcher.add_handler(CommandHandler('services', self.__services)) + self.cache: List[Message] = [] + + def __get_uq_services(self) -> List[int]: + services = [] + for u in self.users: + services.extend(u.services) + services = filter(lambda x: x in service_map.keys(), services) + return list(set(services)) + + @staticmethod + def __init_chats() -> None: + if not os.path.exists(CHATS_FILE): + with open(CHATS_FILE, "w") as f: + f.write("[]") + + @staticmethod + def __get_chats() -> List[User]: + with open(CHATS_FILE, 'r') as f: + users = [User(u['chat_id'], u['services']) for u in json.load(f)] + f.close() + print(users) + return users + + def __persist_chats(self) -> None: + with open(CHATS_FILE, 'w') as f: + json.dump([u.marshall_user() for u in self.users], f) + f.close() + + def __add_chat(self, chat_id: int) -> None: + if chat_id not in [u.chat_id for u in self.users]: + logging.info('adding new user') + self.users.append(User(chat_id)) + self.__persist_chats() + + def __remove_chat(self, chat_id: int) -> None: + logging.info('removing the chat ' + str(chat_id)) + self.users = [u for u in self.users if u.chat_id != chat_id] + 
self.__persist_chats() + + @staticmethod + def __services(update: Update, _: CallbackContext) -> None: + services_text = "" + for k, v in service_map.items(): + services_text += f"{k} - {v}\n" + update.message.reply_text("Available services:\n" + services_text) + + @staticmethod + def __help(update: Update, _: CallbackContext) -> None: + try: + update.message.reply_text(""" /start - start the bot /stop - stop the bot /add_service - add service to your list @@ -126,132 +130,135 @@ def __help(self, update: Update, _: CallbackContext) -> None: /my_services - view services on your list /services - list of available services """) - except Exception as e: - logging.error(e) - - def __start(self, update: Update, _: CallbackContext) -> None: - self.__add_chat(update.message.chat_id) - logging.info(f'got new user with id {update.message.chat_id}') - update.message.reply_text('Welcome to BurgerBot. When there will be slot - you will receive notification. To get information about usage - type /help. To stop it - just type /stop') - - def __stop(self, update: Update, _: CallbackContext) -> None: - self.__remove_chat(update.message.chat_id) - update.message.reply_text('Thanks for using me! 
Bye!') - - def __my_services(self, update: Update, _: CallbackContext) -> None: - try: - service_ids = set( - service_id for u in self.users - for service_id in u.services - if u.chat_id == update.message.chat_id - ) - msg = "\n".join([f" - {service_id}" for service_id in service_ids]) or " - (none)" - update.message.reply_text("The following services are on your list:\n" + msg) - except Exception as e: - logging.error(e) - - def __add_service(self, update: Update, _: CallbackContext) -> None: - logging.info(f'adding service {update.message}') - try: - service_id = int(update.message.text.split(' ')[1]) - for u in self.users: - if u.chat_id == update.message.chat_id: - u.services.append(int(service_id)) - self.__persist_chats() - break - update.message.reply_text("Service added") - except Exception as e: - update.message.reply_text("Failed to add service, have you specified the service id?") - logging.error(e) - - def __remove_service(self, update: Update, _: CallbackContext) -> None: - logging.info(f'removing service {update.message}') - try: - service_id = int(update.message.text.split(' ')[1]) - for u in self.users: - if u.chat_id == update.message.chat_id: - u.services.remove(int(service_id)) - self.__persist_chats() - break - update.message.reply_text("Service removed") - except IndexError: - update.message.reply_text("Wrong usage. Please type '/remove_service 123456'") - - def __poll(self) -> None: - try: - self.updater.start_polling() - except Exception as e: - logging.warn(e) - logging.warn("got error during polling, retying") - return self.__poll() - - def __parse(self) -> None: - while True: - slots = self.parser.parse() - for slot in slots: - self.__send_message(slot) - time.sleep(30) - - - def __send_message(self, slot: Slot) -> None: - if self.__msg_in_cache(slot.msg): - logging.info('Notification is cached already. 
Do not repeat sending') - return - self.__add_msg_to_cache(slot.msg) - md_msg = f"There are slots on {self.__date_from_msg(slot.msg)} available for booking for {service_map[slot.service_id]}, click [here]({build_url(slot.service_id)}) to check it out" - users = [u for u in self.users if slot.service_id in u.services] - for u in users: - logging.debug(f"sending msg to {str(u.chat_id)}") - try: - self.updater.bot.send_message(chat_id=u.chat_id, text=md_msg, parse_mode=ParseMode.MARKDOWN_V2) - except Exception as e: - if 'bot was blocked by the user' in e.__str__() or 'user is deactivated' in e.__str__(): - logging.info('removing since user blocked bot or user was deactivated') - self.__remove_chat(u.chat_id) - else: - logging.warning(e) - self.__clear_cache() - - def __msg_in_cache(self, msg: str) -> bool: - for m in self.cache: - if m.message == msg: - return True - return False - - def __add_msg_to_cache(self, msg: str) -> None: - self.cache.append(Message(msg, int(time.time()))) - - def __clear_cache(self) -> None: - cur_ts = int(time.time()) - if len(self.cache) > 0: - logging.info('clearing some messages from cache') - self.cache = [m for m in self.cache if (cur_ts - m.ts) < 300] - - def __date_from_msg(self, msg: str) -> str: - msg_arr = msg.split('/') - logging.info(msg) - ts = int(msg_arr[len(msg_arr) - 2]) + 7200 # adding two hours to match Berlin TZ with UTC - return datetime.fromtimestamp(ts).strftime("%d %B") - - - def start(self) -> None: - logging.info('starting bot') - poll_task = threading.Thread(target=self.__poll) - parse_task= threading.Thread(target=self.__parse) - parse_task.start() - poll_task.start() - parse_task.join() - poll_task.join() - + except Exception as e: + logging.error(e) + + def __start(self, update: Update, _: CallbackContext) -> None: + self.__add_chat(update.message.chat_id) + logging.info(f'got new user with id {update.message.chat_id}') + update.message.reply_text( + 'Welcome to BurgerBot. 
When there will be slot - you will receive notification. To get information about ' + 'usage - type /help. To stop it - just type /stop') + + def __stop(self, update: Update, _: CallbackContext) -> None: + self.__remove_chat(update.message.chat_id) + update.message.reply_text('Thanks for using me! Bye!') + + def __my_services(self, update: Update, _: CallbackContext) -> None: + try: + service_ids = set( + service_id for u in self.users + for service_id in u.services + if u.chat_id == update.message.chat_id + ) + msg = "\n".join([f" - {service_id}" for service_id in service_ids]) or " - (none)" + update.message.reply_text("The following services are on your list:\n" + msg) + except Exception as e: + logging.error(e) + + def __add_service(self, update: Update, _: CallbackContext) -> None: + logging.info(f'adding service {update.message}') + try: + service_id = int(update.message.text.split(' ')[1]) + for u in self.users: + if u.chat_id == update.message.chat_id: + u.services.append(int(service_id)) + self.__persist_chats() + break + update.message.reply_text("Service added") + except Exception as e: + update.message.reply_text("Failed to add service, have you specified the service id?") + logging.error(e) + + def __remove_service(self, update: Update, _: CallbackContext) -> None: + logging.info(f'removing service {update.message}') + try: + service_id = int(update.message.text.split(' ')[1]) + for u in self.users: + if u.chat_id == update.message.chat_id: + u.services.remove(int(service_id)) + self.__persist_chats() + break + update.message.reply_text("Service removed") + except IndexError: + update.message.reply_text("Wrong usage. 
Please type '/remove_service 123456'") + + def __poll(self) -> None: + try: + self.updater.start_polling() + except Exception as e: + logging.warning(e) + logging.warning("got error during polling, retying") + return self.__poll() + + def __parse(self) -> None: + while True: + slots = self.parser.parse() + for slot in slots: + self.__send_message(slot) + time.sleep(30) + + def __send_message(self, slot: Slot) -> None: + if self.__msg_in_cache(slot.msg): + logging.info('Notification is cached already. Do not repeat sending') + return + self.__add_msg_to_cache(slot.msg) + md_msg = f"There are slots on {self.__date_from_msg(slot.msg)} available for booking " \ + f"for {service_map[slot.service_id]}, click [here]({build_url(slot.service_id)}) to check it out " + users = [u for u in self.users if slot.service_id in u.services] + for u in users: + logging.debug(f"sending msg to {str(u.chat_id)}") + try: + self.updater.bot.send_message(chat_id=u.chat_id, text=md_msg, parse_mode=ParseMode.MARKDOWN_V2) + except Exception as e: + if 'bot was blocked by the user' in e.__str__() or 'user is deactivated' in e.__str__(): + logging.info('removing since user blocked bot or user was deactivated') + self.__remove_chat(u.chat_id) + else: + logging.warning(e) + self.__clear_cache() + + def __msg_in_cache(self, msg: str) -> bool: + for m in self.cache: + if m.message == msg: + return True + return False + + def __add_msg_to_cache(self, msg: str) -> None: + self.cache.append(Message(msg, int(time.time()))) + + def __clear_cache(self) -> None: + cur_ts = int(time.time()) + if len(self.cache) > 0: + logging.info('clearing some messages from cache') + self.cache = [m for m in self.cache if (cur_ts - m.ts) < 300] + + @staticmethod + def __date_from_msg(msg: str) -> str: + msg_arr = msg.split('/') + logging.info(msg) + ts = int(msg_arr[len(msg_arr) - 2]) + 7200 # adding two hours to match Berlin TZ with UTC + return datetime.fromtimestamp(ts).strftime("%d %B") + + def start(self) -> None: + 
def main() -> None:
    """Create the bot and hand control over to it."""
    Bot().start()
class Parser:
    """Fetches the appointment pages of the configured services and extracts bookable slots."""

    # Seconds to wait for the server before a request is treated as failed;
    # without a timeout ``requests.get`` may block indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, services: List[int]) -> None:
        """Store the service ids and run one initial parse.

        Args:
            services: service ids to poll; the list object itself is kept, so
                later additions by the caller are seen by the parser.
        """
        self.services = services
        self.proxy_on: bool = False
        # The result of this first run is discarded.
        self.parse()

    def __get_url(self, url) -> "requests.Response":
        """GET ``url``, retrying once a minute until a response is received.

        Uses the local SOCKS5 proxy while ``proxy_on`` is set. Retries run in
        a loop rather than by recursion so a long outage cannot exhaust the
        call stack.
        """
        while True:
            logging.debug(url)
            try:
                if self.proxy_on:
                    return requests.get(url, proxies={'https': 'socks5://127.0.0.1:9050'},
                                        timeout=self.REQUEST_TIMEOUT)
                return requests.get(url, timeout=self.REQUEST_TIMEOUT)
            except Exception as err:
                logging.warning('received an error from the server, waiting for 1 minute before retry')
                logging.warning(err)
                time.sleep(60)

    def __toggle_proxy(self) -> None:
        """Switch between direct and proxied requests."""
        self.proxy_on = not self.proxy_on

    def __parse_page(self, page, service_id) -> List["Slot"]:
        """Extract the bookable slots from a fetched calendar page.

        Args:
            page: response object providing ``status_code`` and ``content``.
            service_id: id stored on every returned ``Slot``.

        Returns:
            One ``Slot`` per ``td.buchbar`` cell; an empty list when the
            request was rate limited (HTTP 428) or the page could not be
            parsed. A list is always returned so callers can concatenate it.
        """
        try:
            if page.status_code == 428:
                logging.info('exceeded rate limit. Sleeping for a while')
                time.sleep(299)
                self.__toggle_proxy()
                return []
            soup = BeautifulSoup(page.content, 'html.parser')
            slots = soup.find_all('td', class_='buchbar')
            is_valid = soup.find_all('td', class_='nichtbuchbar')
            if len(is_valid) > 0:
                logging.info('page is valid')
            if len(slots) == 0:
                logging.info("no luck yet")
            return [Slot(slot.a['href'], service_id) for slot in slots]
        except Exception as e:
            # Any unexpected page shape is treated as "no slots"; switch the
            # proxy setting before the next attempt.
            logging.warning(e)
            self.__toggle_proxy()
            return []

    def add_service(self, service_id: int) -> None:
        """Add a service id to the list of services to poll."""
        self.services.append(service_id)

    def parse(self) -> List["Slot"]:
        """Fetch every configured service page and return all slots found."""
        slots = []
        logging.info('services are: ' + str(self.services))
        # Iterate over a snapshot so a concurrent add_service() cannot
        # interfere with this pass.
        for svc in list(self.services):
            page = self.__get_url(build_url(svc))
            slots += self.__parse_page(page, svc)
        return slots