Skip to content

Commit

Permalink
Merge pull request #17 from crestalnetwork/fix/skip-new-bot-err
Browse files Browse the repository at this point in the history
Feat / Telegram On the fly Agent Token Change
  • Loading branch information
hyacinthus authored Jan 2, 2025
2 parents 922d5fd + 744d677 commit 2a735eb
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 44 deletions.
30 changes: 21 additions & 9 deletions app/entrypoints/tg.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from sqlmodel import Session, select

from app.config.config import config
from app.models.db import get_engine, init_db
from app.models.agent import Agent
from app.models.db import get_engine, init_db
from tg.bot import pool
from tg.bot.pool import BotPool

Expand All @@ -18,7 +18,7 @@ class AgentScheduler:
def __init__(self, bot_pool):
self.bot_pool = bot_pool

def check_new_bots(self):
def sync_bots(self):
with Session(get_engine()) as db:
# Get all telegram agents
agents = db.exec(
Expand All @@ -28,24 +28,36 @@ def check_new_bots(self):
).all()

new_bots = []
changed_token_bots = []
for agent in agents:
if agent.telegram_config["token"] not in pool._bots:
agent.telegram_config["agent_id"] = agent.id
new_bots.append(agent.telegram_config)
token = agent.telegram_config["token"]
cfg = agent.telegram_config
cfg["agent_id"] = agent.id

if agent.id not in pool._agent_bots:
new_bots.append(cfg)
logger.info("New agent with id {id} found...".format(id=agent.id))
elif token not in pool._bots:
changed_token_bots.append(cfg)

return new_bots
return new_bots, changed_token_bots

async def start(self, interval):
logger.info("New agent addition tracking started...")
while True:
logger.info("check for new bots...")
logger.info("sync bots...")
await asyncio.sleep(interval)
if self.check_new_bots() is not None:
for new_bot in self.check_new_bots():
new_bots, changed_token_bots = self.sync_bots()
if new_bots is not None:
for new_bot in new_bots:
await self.bot_pool.init_new_bot(
new_bot["agent_id"], new_bot["kind"], new_bot["token"]
)
if changed_token_bots is not None:
for changed_bot in changed_token_bots:
await self.bot_pool.change_bot_token(
changed_bot["agent_id"], changed_bot["token"]
)


def run_telegram_server() -> None:
Expand Down
2 changes: 1 addition & 1 deletion tg/bot/filter/content_type.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from aiogram.filters import BaseFilter
from aiogram.types import Message, ContentType
from aiogram.types import ContentType, Message


class ContentTypeFilter(BaseFilter):
Expand Down
110 changes: 88 additions & 22 deletions tg/bot/kind/ai_relayer/router.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import inspect
import logging

from aiogram import Router
from aiogram.filters import CommandStart
from aiogram.fsm.context import FSMContext
Expand All @@ -9,6 +12,17 @@
from tg.bot.filter.chat_type import GroupOnlyFilter
from tg.bot.filter.content_type import TextOnlyFilter

logger = logging.getLogger(__name__)


def cur_func_name():
return inspect.stack()[1][3]


def cur_mod_name():
return inspect.getmodule(inspect.stack()[1][0]).__name__


general_router = Router()


Expand All @@ -26,10 +40,19 @@ async def gp_command_start(message: Message):
if message.from_user.is_bot:
return

group_title = message.from_user.first_name
await message.answer(
text=f"🤖 Hi Everybody, {group_title}! 🎉\nGreetings, traveler of the digital realm! You've just awakened the mighty powers of this chat bot. Brace yourself for an adventure filled with wit, wisdom, and possibly a few jokes.",
)
try:
group_title = message.from_user.first_name
await message.answer(
text=f"🤖 Hi Everybody, {group_title}! 🎉\nGreetings, traveler of the digital realm! You've just awakened the mighty powers of this chat bot. Brace yourself for an adventure filled with wit, wisdom, and possibly a few jokes.",
)
except Exception:
logger.warning(
"error processing in function:{func}, for agent:{agent_id} token:{token}".format(
func=cur_func_name,
agent_id=pool.bot_by_token(message.bot.token).get("agent_id"),
token=message.bot.token,
)
)


@general_router.message(GroupOnlyFilter(), TextOnlyFilter())
Expand All @@ -42,13 +65,31 @@ async def gp_process_message(message: Message) -> None:
message.reply_to_message
and message.reply_to_message.from_user.id == message.bot.id
) or bot.username in message.text:
agent_id = pool.bot_by_token(message.bot.token)["agent_id"]
thread_id = pool.agent_thread_id(agent_id, message.chat.id)
response = execute_agent(agent_id, message.text, thread_id)
await message.answer(
text="\n".join(response),
reply_to_message_id=message.message_id,
)
cached_bot = pool.bot_by_token(message.bot.token)
if cached_bot is None:
logger.warning(
"bot with token {token} not found in cache.".format(
token=message.bot.token
)
)
return

try:
agent_id = cached_bot["agent_id"]
thread_id = pool.agent_thread_id(agent_id, message.chat.id)
response = execute_agent(agent_id, message.text, thread_id)
await message.answer(
text="\n".join(response),
reply_to_message_id=message.message_id,
)
except Exception:
logger.warning(
"error processing in function:{func}, for agent:{agent_id} token:{token}".format(
func=cur_func_name(),
agent_id=cached_bot.get("agent_id"),
token=message.bot.token,
)
)


## direct commands and messages
Expand All @@ -59,10 +100,19 @@ async def command_start(message: Message, state: FSMContext) -> None:
if message.from_user.is_bot:
return

first_name = message.from_user.first_name
await message.answer(
text=f"🤖 Hi, {first_name}! 🎉\nGreetings, traveler of the digital realm! You've just awakened the mighty powers of this chat bot. Brace yourself for an adventure filled with wit, wisdom, and possibly a few jokes.",
)
try:
first_name = message.from_user.first_name
await message.answer(
text=f"🤖 Hi, {first_name}! 🎉\nGreetings, traveler of the digital realm! You've just awakened the mighty powers of this chat bot. Brace yourself for an adventure filled with wit, wisdom, and possibly a few jokes.",
)
except Exception:
logger.warning(
"error processing in function:{func}, for agent:{agent_id} token:{token}".format(
func=cur_func_name(),
agent_id=pool.bot_by_token(message.bot.token).get("agent_id"),
token=message.bot.token,
)
)


@general_router.message(
Expand All @@ -72,10 +122,26 @@ async def process_message(message: Message, state: FSMContext) -> None:
if message.from_user.is_bot:
return

agent_id = pool.bot_by_token(message.bot.token)["agent_id"]
thread_id = pool.agent_thread_id(agent_id, message.chat.id)
response = execute_agent(agent_id, message.text, thread_id)
await message.answer(
text="\n".join(response),
reply_to_message_id=message.message_id,
)
cached_bot = pool.bot_by_token(message.bot.token)
if cached_bot is None:
logger.warning(
"bot with token {token} not found in cache.".format(token=message.bot.token)
)
return

try:
agent_id = cached_bot["agent_id"]
thread_id = pool.agent_thread_id(agent_id, message.chat.id)
response = execute_agent(agent_id, message.text, thread_id)
await message.answer(
text="\n".join(response),
reply_to_message_id=message.message_id,
)
except Exception:
logger.warning(
"error processing in function:{func}, for agent:{agent_id} token:{token}".format(
func=cur_func_name(),
agent_id=cached_bot.get("agent_id"),
token=message.bot.token,
)
)
70 changes: 58 additions & 12 deletions tg/bot/pool.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

import aiohttp
from aiogram import Bot, Dispatcher
from aiogram.client.bot import DefaultBotProperties
from aiogram.enums import ParseMode
Expand All @@ -26,11 +27,11 @@


def bot_by_token(token):
return _bots[token]
return _bots.get(token)


def bot_by_agent_id(agent_id):
return _agent_bots[agent_id]
return _agent_bots.get(agent_id)


def agent_thread_id(agent_id, chat_id):
Expand Down Expand Up @@ -88,16 +89,61 @@ def init_all_dispatchers(self):
logger.info("{kind} router initialized...".format(kind=kind))

async def init_new_bot(self, agent_id, kind, token):
bot = Bot(
token=token,
default=DefaultBotProperties(parse_mode=ParseMode.HTML),
)
await bot.delete_webhook(drop_pending_updates=True)
await bot.set_webhook(self.base_url.format(kind=kind, bot_token=token))

_bots[token] = {"agent_id": agent_id, "bot": bot}
_agent_bots[agent_id] = {"token": token, "bot": bot}
logger.info("Bot with token {token} initialized...".format(token=token))
try:
bot = Bot(
token=token,
default=DefaultBotProperties(parse_mode=ParseMode.HTML),
)
await bot.delete_webhook(drop_pending_updates=True)
await bot.set_webhook(self.base_url.format(kind=kind, bot_token=token))

_bots[token] = {"agent_id": agent_id, "kind": kind, "bot": bot}
_agent_bots[agent_id] = {"token": token, "kind": kind, "bot": bot}
logger.info("Bot with token {token} initialized...".format(token=token))

except Exception:
logger.error(
"failed to init new bot for agent {agent_id}.".format(agent_id=agent_id)
)

async def change_bot_token(self, agent_id, new_token):
try:
old_cached_bot = bot_by_agent_id(agent_id)
kind = old_cached_bot["kind"]

old_bot = Bot(
token=old_cached_bot["token"],
default=DefaultBotProperties(parse_mode=ParseMode.HTML),
)
await old_bot.session.close()
await old_bot.delete_webhook(drop_pending_updates=True)

new_bot = Bot(
token=new_token,
default=DefaultBotProperties(parse_mode=ParseMode.HTML),
)
await new_bot.set_webhook(
self.base_url.format(kind=kind, bot_token=new_token)
)

del _bots[old_cached_bot["token"]]
_bots[new_token] = {"agent_id": agent_id, "kind": kind, "bot": new_bot}
_agent_bots[agent_id] = {"token": new_token, "kind": kind, "bot": new_bot}
logger.info(
"bot for agent {agent_id} with token {token} changed to {new_token}...".format(
agent_id=agent_id,
token=old_cached_bot["token"],
new_token=new_token,
),
)
except aiohttp.ClientError:
pass
except Exception:
logger.error(
"failed to change bot token for agent {agent_id}.".format(
agent_id=agent_id
)
)

def start(self, asyncio_loop, host, port):
web.run_app(self.app, loop=asyncio_loop, host=host, port=port)

0 comments on commit 2a735eb

Please sign in to comment.