-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathchatbot.py
310 lines (277 loc) · 12.8 KB
/
chatbot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
import sys
import asyncio
import aiohttp
import logging
import irc.bot
import irc.client
import irc.client_aio
import irc.strings
import logging
import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration
from conf import *
from commands import CommandHandler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.base import STATE_STOPPED, STATE_RUNNING, STATE_PAUSED
from db import do_calc_happiness
# --- Sentry error reporting -------------------------------------------------
# `level` and `event_level` are the two thresholds LoggingIntegration accepts;
# here they are set to DEBUG and ERROR respectively.
sentry_logging = LoggingIntegration(level=logging.DEBUG, event_level=logging.ERROR)
# NOTE(review): the DSN literal below looks mangled (e-mail obfuscation
# residue?) -- confirm it against the real project settings.
sentry_sdk.init(dsn="https://[email protected]/1", integrations=[sentry_logging])

# --- Application logger -----------------------------------------------------
# One shared "chatbot" logger that writes everything from DEBUG upwards both
# to an append-mode UTF-8 log file and to stdout.
log = logging.getLogger("chatbot")
log.setLevel(logging.DEBUG)

epicfilehandler = logging.FileHandler("chatbot.log", mode='a', encoding='utf-8')
epicfilehandler.setFormatter(
    logging.Formatter(fmt="[%(asctime)s] [%(module)s] [%(levelname)s]: %(message)s")
)

# The file handler is attached first, then the console handler.
log.addHandler(epicfilehandler)
log.addHandler(logging.StreamHandler(stream=sys.stdout))
class TheBot(irc.client_aio.AioSimpleIRCClient):
    '''
    Twitch chat bot built on the asyncio variant of the `irc` client.

    On construction the bot loads its configuration, schedules its background
    coroutines on the reactor's event loop (HTTP session setup, token refresh,
    live-status polling, hydration reminder) and starts an APScheduler instance
    for the periodic jobs. Chat messages are forwarded to a CommandHandler.
    '''

    # Twitch IRC endpoint used by every reconnect the bot performs itself.
    IRC_HOST = "irc.chat.twitch.tv"
    IRC_PORT = 6667

    # Upper bound on failed attempts inside wait_for_request_window().
    MAX_REQUEST_RETRIES = 30

    def __init__(self):
        # `os` is not imported at module level in this file; import it here
        # instead of relying on it leaking in through `from conf import *`.
        import os

        super().__init__()
        self.config = Conf(os.path.join(os.path.dirname(os.path.realpath(__file__)), "config.ini"))
        self.target = "#" + self.config.CHANNEL_NAME  # The name of the twitch irc channel
        self.channel_name = self.config.CHANNEL_NAME  # The display name of the twitch channel
        self.channel_id = ""  # ID is saved as a string because JSON sends it that way
        self.host = self.config.HOST  # The name of the host of the bot
        self.log = logging.getLogger("chatbot")  # Centralized logging

        # for twitch api stuff
        self.auth_token = ""
        self.aio_session = None

        # shortcut to the async loop
        self.loop = self.connection.reactor.loop

        # The ClientSession is created inside a coroutine. This task is
        # scheduled before refresh_token() so the session exists by the time
        # the token request is made.
        self.loop.create_task(self.set_aio())
        # and then refresh the token on the startup
        self.loop.create_task(self.refresh_token())
        # loop every once in a while to check if channel is live
        self.loop.create_task(self.is_live_loop())
        self.live = False

        # command handler stuff
        self.command_handler = CommandHandler(self, self.config.PREFIX)

        # hydration reminder
        self.loop.create_task(self.remind_drink_water())

        # scheduler stuff
        self.scheduler = AsyncIOScheduler()
        self.scheduler.add_job(do_calc_happiness, 'cron', hour='11', jitter=1800)
        self.scheduler.add_job(self.reconnect_loop, 'interval', hours=3)
        self.scheduler.start()

    async def _connect_irc(self):
        '''
        Open (or re-open) the IRC connection with the configured bot credentials.
        '''
        await self.connection.connect(
            self.IRC_HOST, self.IRC_PORT, self.config.BOT_NAME, password=self.config.AUTH_ID
        )

    async def _resolve_channel_id(self, channel_id=None):
        '''
        Return `channel_id` if given, otherwise the main channel's ID.

        The main channel's ID is looked up by name on first use and cached in
        `self.channel_id`.
        '''
        if channel_id is not None:
            return channel_id
        if self.channel_id == "":
            self.channel_id = await self.get_channel_id_by_name()
        return self.channel_id

    async def set_aio(self):
        '''
        (Re)create the shared aiohttp session carrying the current auth token.
        '''
        self.aio_session = aiohttp.ClientSession(headers={"Client-ID": self.config.CLIENT_ID, "Authorization": "Bearer %s" % self.auth_token, "User-Agent": "Brie/0.1 (+https://brie.everything.moe/)"})

    async def validate_token(self) -> bool:
        '''
        Just verify that the token we have right now is correct.
        We have to use "OAuth" instead of "Bearer" and the reason why isn't very clear

        Returns True when the validation endpoint reports a positive
        "expires_in", False otherwise (including on any request failure).
        '''
        headers = {"Client-ID": self.config.CLIENT_ID, "Authorization": f"OAuth {self.auth_token}"}
        try:
            # `async with` guarantees the throwaway session is closed even when
            # the request or the JSON decoding raises.
            async with aiohttp.ClientSession(headers=headers) as tmp_session:
                async with tmp_session.get("https://id.twitch.tv/oauth2/validate") as response:
                    output = await response.json()
            # A response without "expires_in" is treated as "not valid" rather
            # than as an error.
            return int(output.get("expires_in", 0)) > 0
        except Exception:
            # Probably failed to validate.
            log.exception("There was an exception while validating the Auth Token.")
            return False

    async def refresh_token(self):
        '''
        Refresh the Bearer token for use in the Twitch API.
        We don't need to specify scopes here at the moment since we aren't modifying anything or reading sensitive info.

        Raises KeyError if the response carries no "access_token".
        '''
        async with self.aio_session.post(f"https://id.twitch.tv/oauth2/token?client_id={self.config.CLIENT_ID}&client_secret={self.config.CLIENT_SECRET}&grant_type=client_credentials") as response:
            output = await response.json()
        self.auth_token = output["access_token"]

        # old sessions must die
        try:
            await self.aio_session.close()
        except Exception:
            log.exception("A harmless exception occurred while closing the old ClientSession to refresh the Auth Token.")

        # Build a new session so its headers carry the fresh token.
        await self.set_aio()
        log.info(f"Refreshed Auth Token. Expire Time: {output['expires_in']}")

    async def is_live_loop(self):
        '''
        Loop every 30 seconds and try to see if the defined channel is live.

        Each pass also exits the process if the scheduler has been stopped,
        reconnects to IRC if the connection dropped, and refreshes the auth
        token when it no longer validates.
        '''
        while True:
            await asyncio.sleep(30)

            if self.scheduler.state == STATE_STOPPED:
                # at this point we assume the scheduler stopped by shutdown command
                # so we kill everything explosively
                sys.exit(0)

            if not self.connection.is_connected():
                log.warning("Somehow, we lost the connection without knowing it.")
                try:
                    await self._connect_irc()
                except Exception:
                    # A failed reconnect must not kill this loop; try again on
                    # the next pass.
                    log.exception("An exception occurred while reconnecting to IRC.")

            # The HTTP session is created asynchronously; skip until it exists.
            if self.aio_session is None:
                continue

            if not await self.validate_token():
                try:
                    log.info("It appears the Auth Token failed to validate or is expired. Refreshing.")
                    await self.refresh_token()
                except Exception:
                    log.exception("An exception occurred while refreshing the Auth Token.")

            try:
                self.live = await self.is_live()
            except Exception:
                # Keep the previous live status when the lookup fails.
                log.exception(f"An exception occurred while updating the Live Status for {self.channel_id}")

    async def remind_drink_water(self):
        '''
        Quick and dirty reminder to drink water every 45 minutes.
        The reminder is only posted while the channel is live.
        '''
        msg = "/me Squeak squeak! Ms. Bobber told me to come remind all her students to drink water and stay hydrated! A healthy mouse is a happy mouse! brieYay Let your fellow students know by posting bobberDrink !"
        while True:
            await asyncio.sleep(45*60)
            try:
                if self.live:
                    self.connection.privmsg(self.target, msg)
            except Exception:
                log.exception("Failed to send hydration reminder in IRC chat")

    def on_welcome(self, connection, event):
        '''
        Event run on entrance to the IRC
        Requests the Twitch capabilities and joins the configured channel.
        '''
        if not irc.client.is_channel(self.target):
            print("Something is wrong and everything is broken (config is probably wrong)")
            return

        connection.cap("REQ", ":twitch.tv/membership")
        connection.cap("REQ", ":twitch.tv/tags")
        connection.cap("REQ", ":twitch.tv/commands")
        connection.join(self.target)
        print("Connected to the Server...")
        log.info("Connected to IRC.")

    def on_join(self, connection, event):
        '''
        Event triggered by IRC JOIN Messages, which anyone can cause (starting with yourself)
        Intentionally a no-op.
        '''
        # print("Someone joined the Twitch IRC Channel.")
        pass

    def on_disconnect(self, connection, event):
        '''
        Event run on disconnecting from IRC
        '''
        # Changed handling in main(). This should work with SystemExit Exception.
        log.info("Disconnected from IRC.")
        # sys.exit(0) # this force quits the program on disconnect

    # Sub module dispatcher should be this function (this is the main menu essentially)
    def on_pubmsg(self, connection, event):
        '''
        Event run for every message sent in the IRC Channel
        Hands (name, user id) and the message text to the command handler.
        '''
        name = event.source.nick.lower()
        message = event.arguments[0].strip()

        # Tags arrive as a list of {"key": ..., "value": ...} dicts; pick out
        # the sender's user id (empty string when the tag is absent).
        user_id = ""
        for tag in event.tags:
            if tag["key"] == "user-id":
                user_id = tag["value"]

        user = (name, user_id)
        self.loop.create_task(
            self.command_handler.parse_for_command(user, message)
        )

    async def wait_for_request_window(self, url, params=None):
        '''
        sometimes we can get rate limited. wait for the rate limit window by doing this.

        GET `url` (with optional query `params`) and return the decoded JSON.
        A response containing a "status" key is treated as an error: wait 15
        seconds on 429 or 1 second on anything else, then retry. At most
        MAX_REQUEST_RETRIES attempts are made in total; after that the last
        (error) response is returned so callers can decide what to do.
        '''
        output = {}
        for _ in range(self.MAX_REQUEST_RETRIES):
            async with self.aio_session.get(url, params=params) as response:
                output = await response.json()

            if "status" not in output:
                break

            log.warning(f"Got status {output['status']} error while requesting on {url}.")
            # Sleep after the response has been released so the connection is
            # not held open while we wait.
            await asyncio.sleep(15 if output["status"] == 429 else 1)
        return output

    async def get_channel_id_by_name(self, specific_login = None) -> str:
        '''
        Use new twitch api to get a channel id by login name
        Pass a display name string to try a different channel name

        Returns the ID as a string, or "" when the lookup fails.
        '''
        if specific_login is None:
            specific_login = self.channel_name

        # Pass the login as a query parameter so it is URL-encoded instead of
        # being pasted into the URL verbatim.
        json_response = await self.wait_for_request_window(
            "https://api.twitch.tv/helix/users", params={"login": specific_login}
        )

        try:
            return json_response["data"][0]["id"]
        except (KeyError, IndexError, TypeError):
            # this would fail if data was empty (it usually isnt)
            print("Channel ID retrieval via login name failed.")
            log.warning(f"Channel ID retrieval for login {specific_login} failed.")
            return ""

    async def is_live(self, channel_id = None) -> bool:
        '''
        Use new twitch api in a scuffed way to find out if a channel is live
        Pass a channel id string to try a specific channel id

        Raises KeyError if the response has no "data" key.
        '''
        # fallback to main channel ID if none is specified
        channel_id = await self._resolve_channel_id(channel_id)

        # empty returns from the streams api endpoint mean the channel is offline
        url = f"https://api.twitch.tv/helix/streams?user_id={channel_id}"
        json_response = await self.wait_for_request_window(url)
        return len(json_response["data"]) != 0

    async def is_mod(self, user_name = None, channel_id = None, user_id = None) -> bool:
        '''
        Use a dank undocumented v5 twitch api method to find the mod badge
        But also use the new twitch api because IRC doesnt tell us the user id

        Returns True when the user carries a "moderator" or "broadcaster"
        badge in the channel, False otherwise.
        '''
        if user_id is None:
            user_id = await self.get_channel_id_by_name(specific_login=user_name)

        # fallback to main channel ID if none is specified
        channel_id = await self._resolve_channel_id(channel_id)

        # A failed ID lookup yields ""; the URL would be malformed and only
        # burn through the retry budget, so answer "not a mod" right away.
        if not user_id or not channel_id:
            return False

        url = f"https://api.twitch.tv/kraken/users/{user_id}/chat/channels/{channel_id}?api_version=5"

        # the response on api v5, is simply { ... : ... } with lists or dicts optionally embedded
        # it seems to always exist as far as i can tell
        json_response = await self.wait_for_request_window(url)
        badges = json_response.get("badges", [])
        return any(entry["id"] in ("moderator", "broadcaster") for entry in badges)

    async def reconnect_loop(self):
        '''
        Just reconnect to IRC like we start out.
        This is meant to be run on a scheduler but can be called any time.
        '''
        log.info("Reconnecting to Twitch IRC to make sure connection remains alive.")
        await self._connect_irc()
def main():
    '''
    Initializing the bot object, connecting to IRC, and running everything until it eventually dies

    A SystemExit raised while the bot is running is treated as a shutdown
    request: outstanding tasks are cancelled and async generators are shut
    down. The IRC connection and the event loop are always closed on exit.
    '''
    bot = TheBot()
    bot.connect("irc.chat.twitch.tv", 6667, bot.config.BOT_NAME, password=bot.config.AUTH_ID)
    loop = bot.reactor.loop
    try:
        bot.start()
    except SystemExit:
        # asyncio.Task.all_tasks() was removed in Python 3.9; prefer
        # asyncio.all_tasks() and fall back only on older interpreters.
        # The loop is passed explicitly because it is no longer running here.
        all_tasks = getattr(asyncio, "all_tasks", None) or asyncio.Task.all_tasks
        for task in all_tasks(loop):
            task.cancel()
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.stop()
        log.info("Bot working to disconnect and close (initial stage).")
    finally:
        bot.connection.disconnect()
        loop.close()
        log.info("Bot disconnected and closed (final stage).")


if __name__ == "__main__":
    main()