Auth.py
import base64
import hashlib
import hmac
import random
import secrets

import Config.DataSource as ds
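class Auth:
    """Password verification and server-side session-token handling.

    Database access goes through ``ds.DataSource().getDB()``; the returned
    object is indexed by collection name and is presumably a MongoDB database
    handle (confirm against Config.DataSource).
    """

    def __init__(self, token=''):
        """Create a handler, optionally bound to an existing session token.

        The class previously defined ``__init__`` twice. Python keeps only the
        last definition, so ``Auth()`` with no argument raised ``TypeError``.
        A default argument supports both call forms.

        Raises:
            TypeError: if *token* is not a ``str``. An explicit raise is used
                because ``assert`` is stripped when Python runs with ``-O``.
        """
        if not isinstance(token, str):
            raise TypeError("token must be a str")
        self.token = token

    # TO BE IN LOGIN HANDLER FILE
    def check_password(self, hashed_password, user_password):
        """Return True when *user_password* matches the stored ``hash:salt`` value.

        Args:
            hashed_password: stored value of the form ``<sha256 hex>:<salt>``.
            user_password: the password supplied by the client.

        Raises:
            ValueError: if *hashed_password* does not contain exactly one ``:``.
        """
        password, salt = hashed_password.split(':')
        computed = hashlib.sha256(salt.encode() + user_password.encode()).hexdigest()
        # Constant-time comparison, so the time taken does not reveal how many
        # leading characters of the digest matched.
        # NOTE(review): one round of salted SHA-256 is fast to brute-force if the
        # hashes leak. Moving to hashlib.scrypt or hashlib.pbkdf2_hmac needs a
        # migration of the stored values, so the format is left unchanged here.
        return hmac.compare_digest(password.encode(), computed.encode())

    def generate_token(self):
        """Generate a fresh random session token and store it on ``self.token``."""
        # The previous implementation hashed random.randint(1000, 99000): fewer
        # than 100,000 possible tokens, drawn from a non-cryptographic PRNG.
        # secrets.token_hex(32) draws 256 bits from the OS CSPRNG and yields 64
        # hex characters, the same shape as the SHA3-256 hex digest it replaces.
        self.token = secrets.token_hex(32)

    def store_server_session(self):
        """Insert ``{"token": self.token}`` into the ``users`` collection."""
        data_source = ds.DataSource()
        db = data_source.getDB()
        # NOTE(review): the token is written to 'users', but is_authorized() in
        # this file reads from 'auth_tokens'. Confirm which collection is meant
        # and make the writer and the reader agree.
        db_users = db['users']
        db_users.insert_one({"token": self.token})

    def delete_server_session(self):
        """Delete the document holding ``self.token`` from the ``users`` collection."""
        data_source = ds.DataSource()
        db = data_source.getDB()
        # NOTE(review): same collection question as store_server_session().
        db_users = db['users']
        db_users.delete_one({"token": self.token})

    def start_server_session(self):
        """Generate a token, persist it, and return it as ``{"token": ...}``.

        The original called ``throw_auth_token()`` and discarded the result, so
        callers received ``None``.
        """
        self.generate_token()
        self.store_server_session()
        return self.throw_auth_token()

    def throw_auth_token(self):
        """Return the current token wrapped as ``{"token": <token>}``."""
        return {"token": self.token}

    def close_server_session(self):
        """Delete the stored session token and return a logout message."""
        self.delete_server_session()
        return {"message": "Logged out Successfully"}

    def authenticate_user(self, credentials):
        """Check Base64 ``username:password`` credentials and open a session.

        Args:
            credentials: Base64-encoded ``username:password`` value.

        Returns:
            ``{"token": <new token>}`` when the password matches, otherwise
            ``{"token": "UNAUTHORIZED"}``. The original returned ``None`` both
            on success and when no user document was found.
        """
        unauthorized = {"token": "UNAUTHORIZED"}

        # Malformed client input (bad Base64, non-UTF-8 bytes, missing value)
        # is rejected as unauthorized instead of surfacing as an exception.
        try:
            cred_dec = self.decode_auth_header(credentials)
        except (TypeError, ValueError):
            return unauthorized
        username = cred_dec["username"]
        password = cred_dec["password"]

        data_source = ds.DataSource()
        db = data_source.getDB()
        db_users = db['users']

        # Only the first document returned for this user name decides the
        # outcome, as before: the original loop returned on the first mismatch.
        user = next(iter(db_users.find({"user_name": username})), None)
        if user is not None and self.check_password(user['password'], password):
            return self.start_server_session()
        return unauthorized

    def decode_auth_header(self, cred):
        """Decode a Base64 ``username:password`` value into a dict.

        The split is on the first ``:`` only, so the password may itself
        contain colons. With no colon present, the whole decoded string is the
        username and the password is empty.

        Raises:
            ValueError: if *cred* is not valid Base64 or does not decode as
                UTF-8 (``binascii.Error`` and ``UnicodeDecodeError`` are both
                ``ValueError`` subclasses).
        """
        decoded = base64.b64decode(cred).decode()
        username, _, password = decoded.partition(':')
        return {"username": username, "password": password}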
#Outside of class
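def is_authorized(token):
    """Return True when *token* matches a document in the ``auth_tokens`` collection.

    Args:
        token: the session token presented by the client.

    Returns:
        ``True`` if a document with this exact token exists, else ``False``.
    """
    # Only a non-empty string can be a token. Rejecting other types before the
    # lookup keeps a non-string value (for example a dict parsed from a JSON
    # body) from reaching the database filter.
    if not isinstance(token, str) or not token:
        return False

    data_source = ds.DataSource()
    db = data_source.getDB()
    db_tokens = db['auth_tokens']

    # The filter has to be passed to find_one(). The original built the query
    # but called find_one() with no arguments, so any document in the
    # collection made every token count as authorized.
    return db_tokens.find_one({"token": token}) is not None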