-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase.py
84 lines (71 loc) · 2.98 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import datetime
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class Token(db.Model):
id = db.Column(db.Integer, primary_key=True)
access_token = db.Column(db.String(255), unique=True, nullable=False)
refresh_token = db.Column(db.String(255), unique=True, nullable=False)
token_expiration = db.Column(db.DateTime, nullable=True)
token_scope = db.Column(db.Text, nullable=True)
def __repr__(self):
return '<Token %r>' % self.access_token
class OauthCode(db.Model):
id = db.Column(db.Integer, primary_key=True)
oauth_code = db.Column(db.String(30), unique=True, nullable=False)
oauth_scope = db.Column(db.Text, nullable=True)
def __repr__(self):
return '<OauthCode %r>' % self.oauth_code
def init_db(db):
try:
db.create_all()
db.session.commit()
except Exception as e:
error = "Failed creating the datatabase: {}".format(e)
return error
class AuthDbTools():
""" Class that handles all the DB calls needed to add, fetch, delete auth
tokens coming from twitch, possibly with multi-streamer support coming
down the line"""
def __init__(self, db, table):
self.description = 'Creates, edits, fetches auth tokens from an initialized db'
self.db = db
self.table = table
self.now = datetime.datetime.utcnow()
def get_all_tokens(self):
tokens = self.table.query.all()
return tokens
def get_valid_token(self, scope):
session = self.db.session
query = session.query(self.table)
db_result = query.filter(self.table.token_expiration > self.now).filter(
self.table.token_scope == scope).first()
token_lifetime = db_result.token_expiration - self.now
valid_token = {"access_token": db_result.access_token,
"refresh_token": db_result.refresh_token,
"expires_in": token_lifetime.seconds,
"scope": db_result.token_scope}
return valid_token
def get_oauth_code(self):
session = self.db.session
query = session.query(self.table)
db_result = query.first()
valid_code = db_result.oauth_code
return valid_code
def new_token(self, dict):
token_expiration = self.now + datetime.timedelta(0, dict['expires_in'])
new_token = self.table(access_token=dict['access_token'],
refresh_token=dict['refresh_token'],
token_expiration=token_expiration,
token_scope=dict['scope'][0])
self.db.session.add(new_token)
self.db.session.commit()
return True
def new_oauth_code(self, dict):
scope = dict['scope']
code = dict['code']
new_code = self.table(oauth_code=code,
oauth_scope=scope)
self.db.session.add(new_code)
self.db.session.commit()
return True