-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubscriptions_manager.py
265 lines (213 loc) · 7.37 KB
/
subscriptions_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
import logging
import sqlite3
from datetime import datetime
from typing import Any, List, Optional, Tuple, Union
logger = logging.getLogger(__name__)
def exec_select(
query: str, parameters: Tuple[Union[str, int], ...] = ()
) -> List[Tuple[Any, ...]]:
assert query.startswith("SELECT")
assert query.count("?") == len(parameters)
results = []
with sqlite3.connect("subscriptions.db") as connection:
cursor = connection.cursor()
cursor.execute(query, parameters)
results = cursor.fetchall()
return results
def exec_sql(query: str, parameters: Tuple[Union[str, int], ...] = ()) -> None:
assert query.count("?") == len(parameters)
# TODO proper mocking
# print(f"SQL: {query} {parameters}")
# return
with sqlite3.connect("subscriptions.db") as connection:
cursor = connection.cursor()
cursor.execute(query, parameters)
def create_tables():
exec_sql(
"""
CREATE TABLE IF NOT EXISTS subscriptions(
chat_id INTEGER NOT NULL,
subreddit TEXT NOT NULL CHECK(subreddit LIKE "r/%" OR subreddit LIKE "u/%"),
per_month INTEGER NOT NULL,
"timestamp" DATETIME,
PRIMARY KEY (chat_id, subreddit)
);
"""
)
exec_sql(
"""
CREATE TABLE IF NOT EXISTS exceptions (
subreddit TEXT NOT NULL,
reason TEXT NOT NULL,
chat_id INTEGER NOT NULL,
PRIMARY KEY (chat_id, subreddit, reason)
);
"""
)
exec_sql(
"""
CREATE TABLE IF NOT EXISTS messages (
chat_id INTEGER NOT NULL,
post_id TEXT NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
subreddit TEXT,
PRIMARY KEY (chat_id, post_id)
);
"""
)
exec_sql(
"""
CREATE TRIGGER IF NOT EXISTS insert_Timestamp_Trigger
AFTER INSERT ON subscriptions
BEGIN
UPDATE subscriptions SET timestamp = CURRENT_TIMESTAMP
WHERE chat_id = NEW.chat_id AND subreddit = NEW.subreddit;
END;
"""
)
exec_sql(
"""
CREATE INDEX IF NOT EXISTS messages_timestamp_idx ON messages(timestamp);
"""
)
exec_sql(
"""
CREATE INDEX IF NOT EXISTS messages_chat_id_post_id_idx ON messages(chat_id, post_id);
"""
)
exec_sql(
"""
CREATE INDEX IF NOT EXISTS messages_chat_id_subreddit_timestamp_idx ON messages(chat_id, subreddit, timestamp);
"""
)
create_tables()
logger.info("Connected! (Hopefully)")
def subscribe(chat_id: int, subreddit: str, monthly_rank: int) -> bool:
"""
returns False if the user is already subscribed
"""
if is_subscribed(chat_id, subreddit):
return False
exec_sql(
"INSERT INTO subscriptions (chat_id, subreddit, per_month) VALUES (?,?,?)",
(chat_id, subreddit, monthly_rank),
)
return True
def is_subscribed(chat_id: int, subreddit: str) -> bool:
results = exec_select(
"SELECT * FROM subscriptions WHERE chat_id=? AND subreddit=?",
(chat_id, subreddit),
)
return bool(results)
def unsubscribe(chat_id: int, subreddit: str) -> bool:
"""
returns False if there is no matching subscription
"""
if not is_subscribed(chat_id, subreddit):
return False
exec_sql(
"DELETE FROM subscriptions WHERE chat_id=? AND subreddit=?",
(chat_id, subreddit),
)
return True
def update_per_month(chat_id: int, subreddit: str, new_monthly_rank: int):
exec_sql(
"UPDATE subscriptions SET per_month=? " " WHERE chat_id=? AND subreddit=?",
(new_monthly_rank, chat_id, subreddit),
)
def get_subscriptions() -> List[Tuple[int, str, int]]:
return exec_select("SELECT chat_id, subreddit, per_month FROM subscriptions") # type: ignore
def get_per_month(chat_id: int, subreddit: str) -> int:
results = exec_select(
"SELECT per_month FROM subscriptions WHERE chat_id=? AND subreddit=?",
(chat_id, subreddit),
)
return results[0][0]
def all_subreddits() -> List[str]:
results = exec_select("SELECT DISTINCT subreddit FROM subscriptions")
return [sub for (sub,) in results]
def sub_followers(subreddit: str) -> List[Tuple[int, int]]:
return exec_select( # type: ignore
"SELECT chat_id, per_month FROM subscriptions WHERE subreddit=?",
(subreddit,),
)
def user_subreddits(chat_id: int) -> List[str]:
rows = exec_select(
"SELECT subreddit FROM subscriptions WHERE chat_id=?", (chat_id,)
)
return [sub for (sub,) in rows]
def user_subscriptions(chat_id: int) -> List[Tuple[str, int]]:
return exec_select( # type: ignore
"SELECT subreddit, per_month FROM subscriptions WHERE chat_id=?",
(chat_id,),
)
def already_sent(chat_id: int, post_id: str) -> bool:
rows = exec_select(
"SELECT * FROM messages WHERE chat_id=? AND post_id=?", (chat_id, post_id)
)
return bool(rows)
def mark_as_sent(chat_id: int, post_id: str, subreddit: str):
exec_sql(
"INSERT INTO messages(chat_id, post_id, subreddit) VALUES (?,?,?)",
(chat_id, post_id, subreddit),
)
def already_sent_exception(chat_id: int, subreddit: str, reason: str):
rows = exec_select(
"SELECT * FROM exceptions WHERE chat_id=? AND subreddit=? AND reason=?",
(chat_id, subreddit, reason),
)
return bool(rows)
def mark_exception_as_sent(chat_id: int, subreddit: str, reason: str):
exec_sql("INSERT INTO exceptions VALUES (?,?,?)", (subreddit, reason, chat_id))
def delete_exception(subreddit: str):
exec_sql("DELETE FROM exceptions WHERE subreddit=?", (subreddit,))
def get_old_subscribers(subreddit: str) -> List[int]:
rows = exec_select(
"SELECT DISTINCT chat_id FROM exceptions WHERE subreddit=?", (subreddit,)
)
return [chat_id for (chat_id,) in rows]
def get_last_subscription_message(chat_id: int, subreddit: str) -> Optional[datetime]:
rows = exec_select(
"SELECT MAX(timestamp) from messages WHERE chat_id=? AND subreddit=?",
(chat_id, subreddit),
)
for (timestamp,) in rows:
try:
return datetime.fromisoformat(timestamp)
except TypeError:
return None
return None
def unavailable_subreddits() -> List[str]:
rows = exec_select("SELECT DISTINCT subreddit FROM exceptions")
return [sub for (sub,) in rows]
def delete_user(chat_id: int):
for sub in user_subreddits(chat_id):
unsubscribe(chat_id, sub)
def get_next_subscription_to_update() -> Tuple[str, int, int, float]:
subreddit, chat_id, per_month, time_left = exec_select(
"""SELECT
subscriptions.subreddit, subscriptions.chat_id, subscriptions.per_month,
(
(31.0 * 24.0 * 3600.0 / per_month) -
(
CAST(strftime('%s', CURRENT_TIMESTAMP) as integer) -
COALESCE(
CAST(strftime('%s', t.last_message_timestamp) as integer),
0.0)
)
) as priority
FROM subscriptions LEFT JOIN (
SELECT chat_id, subreddit,
COALESCE(max(timestamp), 0) as last_message_timestamp
FROM messages
GROUP BY chat_id, subreddit
ORDER BY last_message_timestamp ASC
) t ON (
t.chat_id = subscriptions.chat_id
AND t.subreddit = subscriptions.subreddit
)
ORDER BY priority ASC
LIMIT 1;
"""
)[0]
return subreddit, chat_id, per_month, time_left