Skip to content

Commit

Permalink
Merge pull request #19 from crestalnetwork/feat/telegram-bot-config-u…
Browse files Browse the repository at this point in the history
…pdate

feat: telegram bot config change handle
  • Loading branch information
hyacinthus authored Jan 3, 2025
2 parents d81726e + 1d443aa commit ee9511c
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 113 deletions.
1 change: 0 additions & 1 deletion app/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ def __init__(self):
self.tg_server_host = self.load("TG_SERVER_HOST", "127.0.0.1")
self.tg_server_port = self.load("TG_SERVER_PORT", "8081")
self.tg_new_agent_poll_interval = self.load("TG_NEW_AGENT_POLL_INTERVAL", "60")
self.tg_group_memory_public = self.load("TG_GROUP_MEMORY_PUBLIC", "true")
# Twitter
self.twitter_entrypoint_interval = int(
self.load("TWITTER_ENTRYPOINT_INTERVAL", "15")
Expand Down
50 changes: 21 additions & 29 deletions app/entrypoints/tg.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,38 @@ class AgentScheduler:
def __init__(self, bot_pool):
self.bot_pool = bot_pool

def sync_bots(self):
async def sync(self):
with Session(get_engine()) as db:
# Get all telegram agents
agents = db.exec(
select(Agent).where(
Agent.telegram_enabled,
)
).all()

new_bots = []
changed_token_bots = []
agents = db.exec(select(Agent)).all()

new_agents = []
token_changed_agents = []
modified_agents = []
for agent in agents:
token = agent.telegram_config["token"]
cfg = agent.telegram_config
cfg["agent_id"] = agent.id

if agent.id not in pool._agent_bots:
new_bots.append(cfg)
logger.info("New agent with id {id} found...".format(id=agent.id))
elif token not in pool._bots:
changed_token_bots.append(cfg)

return new_bots, changed_token_bots
if agent.telegram_enabled:
new_agents.append(agent)
logger.info(f"New agent with id {agent.id} found...")
await self.bot_pool.init_new_bot(agent)
else:
cached_agent = pool._agent_bots[agent.id]
if cached_agent["last_modified"] != agent.updated_at:
if token not in pool._bots:
await self.bot_pool.change_bot_token(agent)
else:
await self.bot_pool.modify_config(agent)

return new_agents, token_changed_agents, modified_agents

async def start(self, interval):
logger.info("New agent addition tracking started...")
while True:
logger.info("sync bots...")
logger.info("sync agents...")
await asyncio.sleep(interval)
new_bots, changed_token_bots = self.sync_bots()
if new_bots is not None:
for new_bot in new_bots:
await self.bot_pool.init_new_bot(
new_bot["agent_id"], new_bot["kind"], new_bot["token"]
)
if changed_token_bots is not None:
for changed_bot in changed_token_bots:
await self.bot_pool.change_bot_token(
changed_bot["agent_id"], changed_bot["token"]
)
await self.sync()


def run_telegram_server() -> None:
Expand Down
1 change: 1 addition & 0 deletions app/models/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Agent(SQLModel, table=True):
skill_sets: Optional[Dict[str, Dict[str, Any]]] = Field(
sa_column=Column(JSONB, nullable=True)
)
# auto timestamp
created_at: datetime | None = Field(
default_factory=lambda: datetime.now(timezone.utc),
sa_type=DateTime(timezone=True),
Expand Down
18 changes: 18 additions & 0 deletions tg/bot/filter/id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from aiogram.filters import BaseFilter
from aiogram.types import Message

from tg.bot import pool


class WhitelistedChatIDsFilter(BaseFilter):
def __init__(self):
pass

async def __call__(self, message: Message) -> bool:
whitelist = pool.bot_by_token(message.bot.token)["cfg"].get(
"whitelist_chat_ids"
)
if whitelist is not None and len(whitelist) > 0:
return str(message.chat.id) in whitelist

return True
10 changes: 10 additions & 0 deletions tg/bot/filter/no_bot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from aiogram.filters import BaseFilter
from aiogram.types import Message


class NoBotFilter(BaseFilter):
def __init__(self):
pass

async def __call__(self, message: Message) -> bool:
return not message.from_user.is_bot
103 changes: 44 additions & 59 deletions tg/bot/kind/ai_relayer/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
import logging

from aiogram import Router
from aiogram.filters import CommandStart
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.filters import Command, CommandStart
from aiogram.types import Message

from app.core.ai import execute_agent
from tg.bot import pool
from tg.bot.filter.chat_type import GroupOnlyFilter
from tg.bot.filter.content_type import TextOnlyFilter
from tg.bot.filter.id import WhitelistedChatIDsFilter
from tg.bot.filter.no_bot import NoBotFilter

logger = logging.getLogger(__name__)

Expand All @@ -26,122 +26,107 @@ def cur_mod_name():
general_router = Router()


class GeneralForm(StatesGroup):
name = State()
like_bots = State()
language = State()
@general_router.message(Command("chat_id"), NoBotFilter(), TextOnlyFilter())
async def command_chat_id(message: Message) -> None:
try:
await message.answer(text=str(message.chat.id))
except Exception as e:
logger.warning(
f"error processing in function:{cur_func_name()}, for agent:{pool.bot_by_token(message.bot.token)["agent_id"]} token:{message.bot.token} err: {str(e)}"
)


## group commands and messages


@general_router.message(GroupOnlyFilter(), TextOnlyFilter(), CommandStart())
@general_router.message(
CommandStart(),
NoBotFilter(),
WhitelistedChatIDsFilter(),
GroupOnlyFilter(),
TextOnlyFilter(),
)
async def gp_command_start(message: Message):
if message.from_user.is_bot:
return

try:
group_title = message.from_user.first_name
await message.answer(
text=f"🤖 Hi Everybody, {group_title}! 🎉\nGreetings, traveler of the digital realm! You've just awakened the mighty powers of this chat bot. Brace yourself for an adventure filled with wit, wisdom, and possibly a few jokes.",
)
except Exception:
except Exception as e:
logger.warning(
"error processing in function:{func}, for agent:{agent_id} token:{token}".format(
func=cur_func_name,
agent_id=pool.bot_by_token(message.bot.token).get("agent_id"),
token=message.bot.token,
)
f"error processing in function:{cur_func_name()}, for agent:{pool.bot_by_token(message.bot.token)["agent_id"]} token:{message.bot.token} err: {str(e)}"
)


@general_router.message(GroupOnlyFilter(), TextOnlyFilter())
@general_router.message(
WhitelistedChatIDsFilter(), NoBotFilter(), GroupOnlyFilter(), TextOnlyFilter()
)
async def gp_process_message(message: Message) -> None:
if message.from_user.is_bot:
return

bot = await message.bot.get_me()
if (
message.reply_to_message
and message.reply_to_message.from_user.id == message.bot.id
) or bot.username in message.text:
cached_bot = pool.bot_by_token(message.bot.token)
if cached_bot is None:
logger.warning(
"bot with token {token} not found in cache.".format(
token=message.bot.token
)
)
logger.warning(f"bot with token {message.bot.token} not found in cache.")
return

try:
agent_id = cached_bot["agent_id"]
# TODO: use config to control if group memory is public
# thread_id = pool.agent_thread_id(agent_id, message.chat.id)
thread_id = f"{agent_id}-public"
thread_id = pool.agent_thread_id(
agent_id, cached_bot["is_public"], message.chat.id
)
response = execute_agent(agent_id, message.text, thread_id)
await message.answer(
text="\n".join(response),
reply_to_message_id=message.message_id,
)
except Exception:
except Exception as e:
logger.warning(
"error processing in function:{func}, for agent:{agent_id} token:{token}".format(
func=cur_func_name(),
agent_id=cached_bot.get("agent_id"),
token=message.bot.token,
)
f"error processing in function:{cur_func_name()}, for agent:{cached_bot["agent_id"]} token:{message.bot.token}, err={str(e)}"
)


## direct commands and messages
@general_router.message(CommandStart(), TextOnlyFilter())
async def command_start(message: Message, state: FSMContext) -> None:
if message.from_user.is_bot:
return


@general_router.message(
CommandStart(), NoBotFilter(), WhitelistedChatIDsFilter(), TextOnlyFilter()
)
async def command_start(message: Message) -> None:
try:
first_name = message.from_user.first_name
await message.answer(
text=f"🤖 Hi, {first_name}! 🎉\nGreetings, traveler of the digital realm! You've just awakened the mighty powers of this chat bot. Brace yourself for an adventure filled with wit, wisdom, and possibly a few jokes.",
)
except Exception:
except Exception as e:
logger.warning(
"error processing in function:{func}, for agent:{agent_id} token:{token}".format(
func=cur_func_name(),
agent_id=pool.bot_by_token(message.bot.token).get("agent_id"),
token=message.bot.token,
)
f"error processing in function:{cur_func_name()}, for agent:{pool.bot_by_token(message.bot.token)["agent_id"]} token:{message.bot.token} err: {str(e)}"
)


@general_router.message(
TextOnlyFilter(),
NoBotFilter(),
WhitelistedChatIDsFilter(),
)
async def process_message(message: Message, state: FSMContext) -> None:
if message.from_user.is_bot:
return

async def process_message(message: Message) -> None:
cached_bot = pool.bot_by_token(message.bot.token)
if cached_bot is None:
logger.warning(
"bot with token {token} not found in cache.".format(token=message.bot.token)
)
logger.warning(f"bot with token {message.bot.token} not found in cache.")
return

try:
agent_id = cached_bot["agent_id"]
thread_id = pool.agent_thread_id(agent_id, message.chat.id)
# only group memory can be public, dm always private
thread_id = pool.agent_thread_id(agent_id, False, message.chat.id)
response = execute_agent(agent_id, message.text, thread_id)
await message.answer(
text="\n".join(response),
reply_to_message_id=message.message_id,
)
except Exception:
except Exception as e:
logger.warning(
"error processing in function:{func}, for agent:{agent_id} token:{token}".format(
func=cur_func_name(),
agent_id=cached_bot.get("agent_id"),
token=message.bot.token,
)
f"error processing in function:{cur_func_name()}, for agent:{cached_bot["agent_id"]} token:{message.bot.token} err:{str(e)}"
)
Loading

0 comments on commit ee9511c

Please sign in to comment.