Skip to content

Commit

Permalink
Update erm.py
Browse files Browse the repository at this point in the history
  • Loading branch information
NoahCxrest authored Dec 30, 2024
1 parent 04744e6 commit ced7968
Showing 1 changed file with 92 additions and 94 deletions.
186 changes: 92 additions & 94 deletions erm.py
Original file line number Diff line number Diff line change
Expand Up @@ -909,20 +909,70 @@ def update_timestamp(self, guild_id: int, log_type: str, timestamp: int):
@tasks.loop(minutes=10, reconnect=True)
async def iterate_prc_logs():
try:
# Use a rate limiter for PRC API calls
rate_limit = asyncio.Semaphore(20)
request_interval = 1.0 / 20 # 20 requests per second
last_request_time = 0

async def rate_limited_request(coro):
nonlocal last_request_time
async with rate_limit:
current_time = time.time()
time_since_last = current_time - last_request_time
if time_since_last < request_interval:
await asyncio.sleep(request_interval - time_since_last)
last_request_time = time.time()
return await coro
server_count = await bot.settings.db.aggregate([
{
'$match': {
'ERLC': {'$exists': True},
'$or': [
{'ERLC.rdm_channel': {'$type': 'long', '$ne': 0}},
{'ERLC.kill_logs': {'$type': 'long', '$ne': 0}},
{'ERLC.player_logs': {'$type': 'long', '$ne': 0}},
{"ERLC.welcome_message": {"$exists": True}},
{"ERLC.team_restrictions": {"$exists": True}}
]
}
},
{
'$lookup': {
'from': 'server_keys',
'localField': '_id',
'foreignField': '_id',
'as': 'server_key'
}
},
{
'$match': {
'server_key': {'$ne': []}
}
},
{
'$count': 'total'
}
]).to_list(1)
server_count = server_count[0]['total'] if server_count else 0

logging.warning(f"[ITERATE] Starting iteration for {server_count} servers")
processed = 0
start_time = time.time()

batch_size = 15
pipeline = [
{
'$match': {
'ERLC': {'$exists': True},
'$or': [
{'ERLC.rdm_channel': {'$type': 'long', '$ne': 0}},
{'ERLC.kill_logs': {'$type': 'long', '$ne': 0}},
{'ERLC.player_logs': {'$type': 'long', '$ne': 0}},
{"ERLC.welcome_message": {"$exists": True}},
{"ERLC.team_restrictions": {"$exists": True}}
]
}
},
{
'$lookup': {
'from': 'server_keys',
'localField': '_id',
'foreignField': '_id',
'as': 'server_key'
}
},
{
'$match': {
'server_key': {'$ne': []}
}
}
]

async def send_log_batch(channel, embeds):
if not embeds:
Expand All @@ -947,7 +997,6 @@ async def is_username_found(username: str, members: list[discord.Member]) -> boo
break

return member_found

async def check_team_restrictions(guild_id, players):
logging.info(f"Checking team restrictions for server {guild_id}")
team_restrictions = settings["ERLC"].get("team_restrictions", {})
Expand Down Expand Up @@ -1011,21 +1060,21 @@ async def check_team_restrictions(guild_id, players):
kick_against.append(plr.username)
team_restrictions_infractions[guild_id][plr.username] = 0


if len(load_against) > 0:
try:
await rate_limited_request(bot.prc_api.run_command(guild_id, f":load {','.join(load_against)}"))
await bot.prc_api.run_command(guild_id, f":load {','.join(load_against)}")
except prc_api.ResponseFailure:
logging.warning("PRC API Rate limit reached when loading.")

for message, plrs_to_send in pm_against.items():
try:
await rate_limited_request(bot.prc_api.run_command(guild_id, f":pm {','.join(plrs_to_send)} {message}"))
await bot.prc_api.run_command(guild_id, f":pm {','.join(plrs_to_send)} {message}")
except prc_api.ResponseFailure:
logging.warning("PRC API Rate limit reached when PMing.")

if len(kick_against) > 0:
try:
await rate_limited_request(bot.prc_api.run_command(guild_id, f":kick {','.join(kick_against)}"))
await bot.prc_api.run_command(guild_id, f":kick {','.join(kick_against)}")
except prc_api.ResponseFailure:
logging.warning("PRC API Rate limit reached when kicking.")

Expand Down Expand Up @@ -1058,6 +1107,7 @@ async def check_team_restrictions(guild_id, players):
)
)


async def send_welcome_message(guild_id, player_logs, last_timestamp) -> int:
welcome_message = settings["ERLC"].get("welcome_message", "")

Expand All @@ -1077,11 +1127,12 @@ async def send_welcome_message(guild_id, player_logs, last_timestamp) -> int:
if len(players) == 0:
return sorted(player_logs, key=lambda x: x.timestamp, reverse=True)[0].timestamp
try:
await rate_limited_request(bot.prc_api.run_command(guild_id, f":pm {','.join(players)} {welcome_message}"))
await bot.prc_api.run_command(guild_id, f":pm {','.join(players)} {welcome_message}")
except prc_api.ResponseFailure:
pass
return sorted(player_logs, key=lambda x: x.timestamp, reverse=True)[0].timestamp


def process_kill_logs(kill_logs, last_timestamp):
"""Process kill logs and return embeds"""
embeds = []
Expand Down Expand Up @@ -1120,101 +1171,48 @@ def process_player_logs(player_logs, last_timestamp):

return embeds, latest_timestamp

async def fetch_logs_with_retry(guild_id, bot, retries=3):
"""Helper function to fetch logs with retry logic"""
for attempt in range(retries):
try:
kill_logs = await rate_limited_request(bot.prc_api.fetch_kill_logs(guild_id))
player_logs = await rate_limited_request(bot.prc_api.fetch_player_logs(guild_id))
return kill_logs, player_logs
except prc_api.ResponseFailure as e:
if e.status_code == 429 and attempt < retries - 1:
retry_after = float(e.response.get('retry_after', 5))
await asyncio.sleep(retry_after)
continue
raise
return None, None

server_count = await bot.settings.db.aggregate([
{
'$match': {
'ERLC': {'$exists': True},
'$or': [
{'ERLC.rdm_channel': {'$type': 'long', '$ne': 0}},
{'ERLC.kill_logs': {'$type': 'long', '$ne': 0}},
{'ERLC.player_logs': {'$type': 'long', '$ne': 0}},
{"ERLC.welcome_message": {"$exists": True}},
{"ERLC.team_restrictions": {"$exists": True}}
]
}
},
{
'$lookup': {
'from': 'server_keys',
'localField': '_id',
'foreignField': '_id',
'as': 'server_key'
}
},
{
'$match': {
'server_key': {'$ne': []}
}
},
{
'$count': 'total'
}
]).to_list(1)
server_count = server_count[0]['total'] if server_count else 0

logging.warning(f"[ITERATE] Starting iteration for {server_count} servers")
processed = 0
start_time = time.time()

async def process_server(guild_id, settings):
async for items in bot.settings.db.aggregate(pipeline).batch_size(batch_size):
try:
guild = await bot.fetch_guild(guild_id)
guild = await bot.fetch_guild(items['_id'])
settings = await bot.settings.find_by_id(guild.id)
erlc_settings = settings.get('ERLC', {})

channels = {
'kill_logs': erlc_settings.get('kill_logs'),
'player_logs': erlc_settings.get('player_logs')
}

# if not any(channels.values()):
# continue

channels = {k: await fetch_get_channel(guild, v) for k, v in channels.items() if v}
has_welcome_message = bool(erlc_settings.get("welcome_message", False))
has_team_restrictions = bool(erlc_settings.get("team_restrictions"))

if not channels and not has_welcome_message and not has_team_restrictions:
return

# Fetch all required data concurrently
kill_logs, player_logs = None, None
if channels.get('kill_logs') or channels.get('player_logs') or has_welcome_message:
kill_logs, player_logs = await fetch_logs_with_retry(guild_id, bot)

server_players = None
if has_team_restrictions:
server_players = await rate_limited_request(
bot.prc_api.get_server_players(guild_id)
)
continue

try:
kill_logs, player_logs = await fetch_logs_with_retry(guild.id, bot)
except Exception as e:
logging.error(f"Failed to fetch logs for {guild.id}: {e}")
continue
tasks = []

if has_welcome_message and player_logs:
last_timestamp = log_tracker.get_last_timestamp(guild_id, 'player_logs')
latest_timestamp = await send_welcome_message(guild_id, player_logs, last_timestamp)
log_tracker.update_timestamp(guild_id, "welcome_message", latest_timestamp)
if has_welcome_message:
last_timestamp = log_tracker.get_last_timestamp(guild.id, 'player_logs')
latest_timestamp = await send_welcome_message(guild.id, player_logs, last_timestamp)
log_tracker.update_timestamp(guild.id, "welcome_message", latest_timestamp)

if has_team_restrictions and server_players:
await check_team_restrictions(guild_id, server_players)
if has_team_restrictions:
await check_team_restrictions(guild.id, await bot.prc_api.get_server_players(guild.id))

if channels.get('kill_logs') and kill_logs:
last_timestamp = log_tracker.get_last_timestamp(guild_id, 'kill_logs')
if 'kill_logs' in channels and kill_logs:
last_timestamp = log_tracker.get_last_timestamp(guild.id, 'kill_logs')
embeds, latest_timestamp = process_kill_logs(kill_logs, last_timestamp)
if embeds:
tasks.append(send_log_batch(channels['kill_logs'], embeds))
log_tracker.update_timestamp(guild_id, 'kill_logs', latest_timestamp)
log_tracker.update_timestamp(guild.id, 'kill_logs', latest_timestamp)

if 'player_logs' in channels and player_logs:
last_timestamp = log_tracker.get_last_timestamp(guild.id, 'player_logs')
Expand Down

0 comments on commit ced7968

Please sign in to comment.