-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlinux_labs.py
executable file
·212 lines (203 loc) · 10.1 KB
/
linux_labs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
#! /usr/bin/env python3
import paramiko, ftplib, os
from getpass import getpass
from datetime import date, datetime
from optparse import OptionParser
from colorama import Fore, Back, Style
from time import strftime, localtime, time
status_color = {
'+': Fore.GREEN,
'-': Fore.RED,
'*': Fore.YELLOW,
':': Fore.CYAN,
' ': Fore.WHITE
}
def display(status, data, start='', end='\n'):
print(f"{start}{status_color[status]}[{status}] {Fore.BLUE}[{date.today()} {strftime('%H:%M:%S', localtime())}] {status_color[status]}{Style.BRIGHT}{data}{Fore.RESET}{Style.RESET_ALL}", end=end)
def get_arguments(*args):
parser = OptionParser()
for arg in args:
parser.add_option(arg[0], arg[1], dest=arg[2], help=arg[3])
return parser.parse_args()[0]
def isLeap(year):
if year % 4 == 0:
if year % 100 == 0:
if year % 400 == 0:
return True
else:
return False
else:
return True
else:
return False
months = {"Jan": 0,
"Feb": 31,
"Mar": 60 if isLeap(datetime.now().year) else 59,
"Apr": 91 if isLeap(datetime.now().year) else 90,
"May": 121 if isLeap(datetime.now().year) else 120,
"Jun": 152 if isLeap(datetime.now().year) else 151,
"Jul": 182 if isLeap(datetime.now().year) else 181,
"Aug": 213 if isLeap(datetime.now().year) else 212,
"Sep": 244 if isLeap(datetime.now().year) else 243,
"Oct": 274 if isLeap(datetime.now().year) else 273,
"Nov": 305 if isLeap(datetime.now().year) else 304,
"Dec": 335 if isLeap(datetime.now().year) else 334}
ips = None
location = None
total_ips = None
location_users = None
location_info = None
user_timings = {None: ''}
timeout = 1
default_users = ["root"]
with open("users.txt", 'r') as file:
all_cc_users = file.read().split('\n')
with open("template/template_start.html", 'r') as file:
template_start = file.read()
with open("template/template_end.html", 'r') as file:
template_end = file.read()
def connectSSH(ip, user, password, port=22, timeout=30):
try:
t1 = time()
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ip, port=port, username=user, password=password, allow_agent=False, timeout=timeout)
display('+', f"Authenticated => {Back.MAGENTA}{ip}{Back.RESET}")
t2 = time()
return ssh, t2-t1
except Exception as err:
return err, -1
def createPage():
page = template_start
page += f"<h1>Last Updated : {datetime.now().hour}:{datetime.now().minute} {datetime.now().day} {datetime.now().strftime('%B')} {datetime.now().year}</h1>"
page += """<table>
<thead>
<tr>
<th>Computer</th>
<th>User</th>
<th>SSH Users</th>
<th>Status</th>
</tr>
</thead>
<tbody>"""
for ip, users in location_users.items():
if users["users"] != None:
status = "occupied"
else:
status = "free"
if location_info[ip]["authenticated"] == False:
status = "power-off"
page += f"<tr class={status}><td>{ips[ip]}</td><td onclick=\"alert('{user_timings[users['users']]}');\">{users['users'] if users['users'] != None else '-'}</td><td>{(','.join(users['ssh_users'])) if len(users['ssh_users']) > 0 else '-'}</td><td>{status.upper()}</td></tr>\n"
page += template_end
with open(f"pages/{location}.html", 'w') as file:
file.write(page)
if __name__ == "__main__":
arguments = get_arguments(('-u', "--user", "user", "Computer Center (CC) User ID"),
('-w', "--webhome-user", "webhome_user", "Webhome User (Default=CC User ID)"),
('-t', "--timeout", "timeout", f"Timeout for Authenticating to a Linux Lab Computer (Default={timeout}seconds)"),
('-l', "--location", "location", f"Location in Campus (Linux Lab: {', '.join([file.split('.')[0] for file in os.listdir('csv')])})"))
if not arguments.user:
display('-', f"Please Provide a {Back.YELLOW}CC User ID!{Back.RESET}")
exit(0)
if not arguments.timeout:
arguments.timeout = timeout
else:
arguments.timeout = int(arguments.timeout)
if not arguments.location:
display('-', f"Please Provide a Location")
exit(0)
else:
location = arguments.location
password = getpass(f"Enter Password for {arguments.user} : ")
if arguments.webhome_user:
webhome_password = getpass(f"Enter Password for {arguments.webhome_user}@webhome : ")
else:
arguments.webhome_user = arguments.user
webhome_password = password
default_users.append(arguments.user)
with open(f"csv/{location}.csv", 'r') as file:
ips = {line.split(',')[0]: line.split(',')[1] for line in file.read().split('\n') if line != ''}
total_ips = len(ips)
location_users = {ip: {"ssh_users": [], "users": None} for ip in ips}
location_info = {ip: {"ssh_client": None, "authenticated": False, "authentication_time": None, "error": None} for ip in ips}
display(':', f"IPs = {Back.MAGENTA}{total_ips}{Back.RESET}")
try:
ftp_server = ftplib.FTP("webhome.cc.iitk.ac.in", arguments.webhome_user, webhome_password)
except Exception as error:
display('-', f"Error Occured while Connecting to {Back.MAGENTA}WEBHOME{Back.RESET} => {Back.YELLOW}{error}{Back.RESET}")
exit(0)
try:
while True:
for ip in location_users.keys():
try:
while location_info[ip]["authenticated"] == False:
ssh_client, authentication_time = connectSSH(ip, arguments.user, password, 22, timeout)
if authentication_time != -1:
location_info[ip]["authenticated"] = True
location_info[ip]["ssh_client"] = ssh_client
location_info[ip]["authentication_time"] = authentication_time
else:
location_info[ip]["error"] = ssh_client
display('-', f"Error Occurred while Connecting to {Back.MAGENTA}{ip}{Back.RESET} => {Back.YELLOW}{ssh_client}{Back.RESET}")
break
if location_info[ip]["error"] != None:
continue
stdin, stdout, stderr = location_info[ip]["ssh_client"].exec_command("who")
output = stdout.readlines()
users = None
ssh_users = []
last_login_time = 1
max_screen = -1
for line in output:
try:
for spaces in range(20, 1, -1):
line = line.replace(' '*spaces, ' '*(spaces-1))
details = line.split(' ')
user = details[0]
screen = int(details[1][1:])
if user not in default_users and user in all_cc_users:
if '-' in details[2]:
month = int(details[2].split('-')[1])
month = list(months.keys())[month-1]
login_date = int(details[2].split('-')[2])
hour = int(details[3].split(':')[0])
minutes = int(details[3].split(':')[1])
length_for_user = len(details[4])
else:
month = details[2]
login_date = int(details[3])
hour = int(details[4].split(':')[0])
minutes = int(details[4].split(':')[1])
length_for_user = len(details[5])
current_month = datetime.now().strftime("%B")[:3]
current_date = datetime.now().day
current_hour = datetime.now().hour
current_minute = datetime.now().minute
login_time = (months[current_month]+current_date+current_hour/24+current_minute/(24*60))-(months[month]+login_date+hour/24+minutes/(60*24))
if login_time < 0.5:
if length_for_user > 10:
ssh_users.append(user)
elif login_time < last_login_time:
user_timings[user] = f"{hour}:{'0'*(2-len(str(minutes)))+str(minutes)} {login_date} {month}"
last_login_time = login_time
if screen > max_screen:
users = user
max_screen = screen
except:
pass
location_users[ip]["ssh_users"] = ssh_users
location_users[ip]["users"] = users
display(':', f"{Back.MAGENTA}{ip}{Back.RESET} => Users:{users}, SSH Users:{','.join(ssh_users)}")
except:
location_info[ip]["authenticated"] = False
display('-', f"Error Occurred while Connecting to {Back.MAGENTA}{ip}{Back.RESET} => {Back.YELLOW}{ssh_client}{Back.RESET}")
createPage()
ftp_server = ftplib.FTP("webhome.cc.iitk.ac.in", arguments.webhome_user, webhome_password)
with open(f"pages/{location}.html", 'rb') as file:
ftp_server.storbinary(f"STOR /www/{arguments.webhome_user}/www/{location}.html", file)
ftp_server.quit()
except KeyboardInterrupt:
display('*', f"Keyboard Interrupt Detected...", start='\n')
display(':', "Exiting")
except Exception as error:
display('-', f"Error Occured => {Back.YELLOW}{error}{Back.RESET}")