-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgetter.py
117 lines (96 loc) · 3.68 KB
/
getter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import discord
import asyncio
from datetime import datetime, timedelta
import os
import csv
from typing import AsyncIterator
# Export configuration: the guild to scan, the forum channel that must exist
# in it, and the age threshold (in days) applied to thread creation times.
GUILD_ID = 484437221055922177
FORUM_CHANNEL_ID = 1047214314349658172
AFTER_DAYS = 5

# Discord client using the default intents plus message content, which is
# needed to read the text of the messages being exported.
intents = discord.Intents.default()
intents.message_content = True
client = discord.Client(intents=intents)
# Column names written as the first row of every exported CSV file.
_CSV_HEADER = [
    "thread_name",
    "id",
    "channel_id",
    "author",
    "content",
    "timestamp",
    "mentions",
    "reactions",
    "referenced_message",
    "member",
]


def _is_export_candidate(thread, cutoff):
    """Return True when *thread* should be exported.

    A thread qualifies when its parent channel is named "zkapps-questions"
    or "zkapps-developers" and it was either created before *cutoff* (an
    aware UTC datetime) or carries a tag named "Solved".
    """
    if str(thread.parent) not in ("zkapps-questions", "zkapps-developers"):
        return False
    created_at = thread.created_at
    # discord.py documents created_at as None for threads created before
    # 2022-01-09; such a thread is necessarily older than the cutoff.
    if created_at is None or created_at < cutoff:
        return True
    return any(tag.name == "Solved" for tag in thread.applied_tags)


async def _export_thread(thread, path):
    """Write up to 100 messages from *thread*'s history to a CSV at *path*.

    A message whose row cannot be written is reported and skipped so the
    rest of the thread is still exported.

    Raises:
        OSError: if the file cannot be opened or the header cannot be written.
        discord.DiscordException: if fetching the history fails.
    """
    # Explicit UTF-8 so emoji and non-ASCII text do not depend on the
    # platform's default encoding.
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(_CSV_HEADER)
        print(f"Retrieving messages for thread: {thread.name}")
        async for message in thread.history(limit=100):
            reference = message.reference
            try:
                writer.writerow(
                    [
                        thread.name,
                        message.id,
                        message.channel.id,
                        message.author.id,
                        message.content,
                        message.created_at,
                        # The two lists below are written via str(), i.e.
                        # as the repr of the discord.py objects.
                        message.mentions,
                        message.reactions,
                        reference.message_id if reference is not None else None,
                        message.author,
                    ]
                )
            except Exception as e:  # best effort: keep exporting the rest
                print(f"Skipping message {message.id}: {e}")


@client.event
async def on_ready():
    """Export the selected threads of the guild to CSV, then close the client.

    Runs once the client is logged in. Looks up the configured guild and
    forum channel, selects the guild's active threads that pass
    ``_is_export_candidate``, and writes each one to
    ``threads/<counter>.csv``. The client is closed on every exit path
    (missing guild/channel, failure, or success) so the script terminates
    instead of staying connected.
    """
    # Local import: only the cutoff computation needs it.
    from datetime import timezone

    print(f"Logged in as {client.user}")
    try:
        # Resolve the guild first; the channel lookup dereferences it.
        guild = discord.utils.get(client.guilds, id=GUILD_ID)
        channel = (
            discord.utils.get(guild.channels, id=FORUM_CHANNEL_ID)
            if guild is not None
            else None
        )
        if guild is None or channel is None:
            print("Guild or Channel not found")
            return

        # Aware UTC "now": a naive utcnow().timestamp() would be read as
        # local time and shift the cutoff by the host's UTC offset.
        cutoff = datetime.now(timezone.utc) - timedelta(days=AFTER_DAYS)
        threads = await guild.active_threads()
        selected = [t for t in threads if _is_export_candidate(t, cutoff)]

        os.makedirs("threads", exist_ok=True)
        print(len(selected))
        for counter, candidate in enumerate(selected):
            thread = guild.get_thread(candidate.id)
            if thread is None:
                print(
                    f"Thread with ID {candidate.id} not found in guild {guild.name}"
                )
                continue
            try:
                await _export_thread(thread, f"threads/{counter}.csv")
            except (OSError, discord.DiscordException) as e:
                # One failing thread should not abort the whole export.
                print(f"Failed to export thread {thread.name}: {e}")
        print("All tasks completed. Bot is shutting down...")
    finally:
        await client.close()
# Run the client
# python-dotenv is imported here, next to its only use.
from dotenv import load_dotenv, find_dotenv

# Load the .env file located by find_dotenv(); override=True is passed so
# its values take precedence over variables already set in the environment.
load_dotenv(find_dotenv(), override=True)
discord_token = os.getenv("DISCORD_TOKEN")
if not discord_token:
    # Fail early with a clear message instead of handing None to the client.
    raise SystemExit(
        "DISCORD_TOKEN is not set; define it in the environment or a .env file"
    )


async def _main():
    """Start the client and keep it running until it is closed."""
    # The async context manager closes the client even if start() raises.
    async with client:
        await client.start(discord_token)


# asyncio.run() creates and tears down its own event loop; calling
# asyncio.get_event_loop() with no running loop is deprecated.
asyncio.run(_main())