From 04744e6640bdd43754ae6490d7d9a9446dcf09ae Mon Sep 17 00:00:00 2001 From: Noah <117038300+NoahCxrest@users.noreply.github.com> Date: Sun, 29 Dec 2024 18:03:52 -0700 Subject: [PATCH] Update erm.py --- erm.py | 186 +++++++++++++++++++++++++++++---------------------------- 1 file changed, 94 insertions(+), 92 deletions(-) diff --git a/erm.py b/erm.py index f2a93c3..d4b0edf 100644 --- a/erm.py +++ b/erm.py @@ -909,70 +909,20 @@ def update_timestamp(self, guild_id: int, log_type: str, timestamp: int): @tasks.loop(minutes=10, reconnect=True) async def iterate_prc_logs(): try: - server_count = await bot.settings.db.aggregate([ - { - '$match': { - 'ERLC': {'$exists': True}, - '$or': [ - {'ERLC.rdm_channel': {'$type': 'long', '$ne': 0}}, - {'ERLC.kill_logs': {'$type': 'long', '$ne': 0}}, - {'ERLC.player_logs': {'$type': 'long', '$ne': 0}}, - {"ERLC.welcome_message": {"$exists": True}}, - {"ERLC.team_restrictions": {"$exists": True}} - ] - } - }, - { - '$lookup': { - 'from': 'server_keys', - 'localField': '_id', - 'foreignField': '_id', - 'as': 'server_key' - } - }, - { - '$match': { - 'server_key': {'$ne': []} - } - }, - { - '$count': 'total' - } - ]).to_list(1) - server_count = server_count[0]['total'] if server_count else 0 - - logging.warning(f"[ITERATE] Starting iteration for {server_count} servers") - processed = 0 - start_time = time.time() - - batch_size = 15 - pipeline = [ - { - '$match': { - 'ERLC': {'$exists': True}, - '$or': [ - {'ERLC.rdm_channel': {'$type': 'long', '$ne': 0}}, - {'ERLC.kill_logs': {'$type': 'long', '$ne': 0}}, - {'ERLC.player_logs': {'$type': 'long', '$ne': 0}}, - {"ERLC.welcome_message": {"$exists": True}}, - {"ERLC.team_restrictions": {"$exists": True}} - ] - } - }, - { - '$lookup': { - 'from': 'server_keys', - 'localField': '_id', - 'foreignField': '_id', - 'as': 'server_key' - } - }, - { - '$match': { - 'server_key': {'$ne': []} - } - } - ] + # Use a rate limiter for PRC API calls + rate_limit = asyncio.Semaphore(20) + request_interval = 1.0 / 20 # 20 requests per second + last_request_time = 0 + + async def rate_limited_request(coro): + nonlocal last_request_time + async with rate_limit: + current_time = time.time() + time_since_last = current_time - last_request_time + if time_since_last < request_interval: + await asyncio.sleep(request_interval - time_since_last) + last_request_time = time.time() + return await coro async def send_log_batch(channel, embeds): if not embeds: @@ -997,6 +947,7 @@ async def is_username_found(username: str, members: list[discord.Member]) -> boo break return member_found + async def check_team_restrictions(guild_id, players): logging.info(f"Checking team restrictions for server {guild_id}") team_restrictions = settings["ERLC"].get("team_restrictions", {}) @@ -1060,21 +1011,21 @@ async def check_team_restrictions(guild_id, players): kick_against.append(plr.username) team_restrictions_infractions[guild_id][plr.username] = 0 - if len(load_against) > 0: try: - await bot.prc_api.run_command(guild_id, f":load {','.join(load_against)}") + await rate_limited_request(bot.prc_api.run_command(guild_id, f":load {','.join(load_against)}")) except prc_api.ResponseFailure: logging.warning("PRC API Rate limit reached when loading.") + for message, plrs_to_send in pm_against.items(): try: - await bot.prc_api.run_command(guild_id, f":pm {','.join(plrs_to_send)} {message}") + await rate_limited_request(bot.prc_api.run_command(guild_id, f":pm {','.join(plrs_to_send)} {message}")) except prc_api.ResponseFailure: logging.warning("PRC API Rate limit reached when PMing.") if len(kick_against) > 0: try: - await bot.prc_api.run_command(guild_id, f":kick {','.join(kick_against)}") + await rate_limited_request(bot.prc_api.run_command(guild_id, f":kick {','.join(kick_against)}")) except prc_api.ResponseFailure: logging.warning("PRC API Rate limit reached when kicking.") @@ -1107,7 +1058,6 @@ async def check_team_restrictions(guild_id, players): ) ) - async def send_welcome_message(guild_id, player_logs, last_timestamp) -> int: welcome_message = settings["ERLC"].get("welcome_message", "") @@ -1127,12 +1077,11 @@ async def send_welcome_message(guild_id, player_logs, last_timestamp) -> int: if len(players) == 0: return sorted(player_logs, key=lambda x: x.timestamp, reverse=True)[0].timestamp try: - await bot.prc_api.run_command(guild_id, f":pm {','.join(players)} {welcome_message}") + await rate_limited_request(bot.prc_api.run_command(guild_id, f":pm {','.join(players)} {welcome_message}")) except prc_api.ResponseFailure: pass return sorted(player_logs, key=lambda x: x.timestamp, reverse=True)[0].timestamp - def process_kill_logs(kill_logs, last_timestamp): """Process kill logs and return embeds""" embeds = [] @@ -1171,10 +1120,60 @@ def process_player_logs(player_logs, last_timestamp): return embeds, latest_timestamp - async for items in bot.settings.db.aggregate(pipeline).batch_size(batch_size): + async def fetch_logs_with_retry(guild_id, bot, retries=3): + """Helper function to fetch logs with retry logic""" + for attempt in range(retries): + try: + kill_logs = await rate_limited_request(bot.prc_api.fetch_kill_logs(guild_id)) + player_logs = await rate_limited_request(bot.prc_api.fetch_player_logs(guild_id)) + return kill_logs, player_logs + except prc_api.ResponseFailure as e: + if e.status_code == 429 and attempt < retries - 1: + retry_after = float(e.response.get('retry_after', 5)) + await asyncio.sleep(retry_after) + continue + raise + return None, None + + server_count = await bot.settings.db.aggregate([ + { + '$match': { + 'ERLC': {'$exists': True}, + '$or': [ + {'ERLC.rdm_channel': {'$type': 'long', '$ne': 0}}, + {'ERLC.kill_logs': {'$type': 'long', '$ne': 0}}, + {'ERLC.player_logs': {'$type': 'long', '$ne': 0}}, + {"ERLC.welcome_message": {"$exists": True}}, + {"ERLC.team_restrictions": {"$exists": True}} + ] + } + }, + { + '$lookup': { + 'from': 'server_keys', + 'localField': '_id', + 'foreignField': '_id', + 'as': 'server_key' + } + }, + { + '$match': { + 'server_key': {'$ne': []} + } + }, + { + '$count': 'total' + } + ]).to_list(1) + server_count = server_count[0]['total'] if server_count else 0 + + logging.warning(f"[ITERATE] Starting iteration for {server_count} servers") + processed = 0 + start_time = time.time() + + async def process_server(guild_id, settings): try: - guild = await bot.fetch_guild(items['_id']) - settings = await bot.settings.find_by_id(guild.id) + guild = await bot.fetch_guild(guild_id) erlc_settings = settings.get('ERLC', {}) channels = { @@ -1182,37 +1181,40 @@ def process_player_logs(player_logs, last_timestamp): 'player_logs': erlc_settings.get('player_logs') } - # if not any(channels.values()): - # continue - channels = {k: await fetch_get_channel(guild, v) for k, v in channels.items() if v} has_welcome_message = bool(erlc_settings.get("welcome_message", False)) has_team_restrictions = bool(erlc_settings.get("team_restrictions")) if not channels and not has_welcome_message and not has_team_restrictions: - continue + return + + # Fetch all required data concurrently + kill_logs, player_logs = None, None + if channels.get('kill_logs') or channels.get('player_logs') or has_welcome_message: + kill_logs, player_logs = await fetch_logs_with_retry(guild_id, bot) + + server_players = None + if has_team_restrictions: + server_players = await rate_limited_request( + bot.prc_api.get_server_players(guild_id) + ) - try: - kill_logs, player_logs = await fetch_logs_with_retry(guild.id, bot) - except Exception as e: - logging.error(f"Failed to fetch logs for {guild.id}: {e}") - continue tasks = [] - if has_welcome_message: - last_timestamp = log_tracker.get_last_timestamp(guild.id, 'player_logs') - latest_timestamp = await send_welcome_message(guild.id, player_logs, last_timestamp) - log_tracker.update_timestamp(guild.id, "welcome_message", latest_timestamp) + if has_welcome_message and player_logs: + last_timestamp = log_tracker.get_last_timestamp(guild_id, 'player_logs') + latest_timestamp = await send_welcome_message(guild_id, player_logs, last_timestamp) + log_tracker.update_timestamp(guild_id, "welcome_message", latest_timestamp) - if has_team_restrictions: - await check_team_restrictions(guild.id, await bot.prc_api.get_server_players(guild.id)) + if has_team_restrictions and server_players: + await check_team_restrictions(guild_id, server_players) - if 'kill_logs' in channels and kill_logs: - last_timestamp = log_tracker.get_last_timestamp(guild.id, 'kill_logs') + if channels.get('kill_logs') and kill_logs: + last_timestamp = log_tracker.get_last_timestamp(guild_id, 'kill_logs') embeds, latest_timestamp = process_kill_logs(kill_logs, last_timestamp) if embeds: tasks.append(send_log_batch(channels['kill_logs'], embeds)) - log_tracker.update_timestamp(guild.id, 'kill_logs', latest_timestamp) + log_tracker.update_timestamp(guild_id, 'kill_logs', latest_timestamp) if 'player_logs' in channels and player_logs: last_timestamp = log_tracker.get_last_timestamp(guild.id, 'player_logs')