-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathweb_check.py
78 lines (74 loc) · 3.33 KB
/
web_check.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from time_converter import TimeConverter
from io import BytesIO
import pycurl
class WebCheck:
    """A single HTTP(S) check executed through pycurl.

    The check is configured from a mapping of options (see ``__init__``).
    ``make_request`` performs the request and stores the outcome in
    ``self.success`` as the string ``'1'`` (response code matched
    ``return_http_code``) or ``'0'`` (mismatch, transfer error or
    unsupported method).
    """

    # Textual spellings treated as "disabled" for boolean options.  Plain
    # ``bool()`` would turn every non-empty string, including 'false', into
    # True.
    _FALSE_STRINGS = frozenset({'', '0', 'false', 'no', 'off', 'none'})

    def __init__(self, checks=None):
        """Build a check from ``checks``, filling gaps with defaults.

        Args:
            checks: Optional mapping overriding any of the default options:
                name, url, scheme, method, request_timeout,
                connection_timeout, update_interval, return_http_code,
                arguments, postfields, application_json, follow_redirect.
                ``None`` (the default) or an empty mapping means
                "all defaults".

        Raises:
            ValueError: If ``return_http_code`` cannot be converted to int.
        """
        default_values = {
            'name': 'unreal-ip',
            'url': '256.256.256.256',
            'scheme': 'https',
            'method': 'get',
            'request_timeout': '3',
            'connection_timeout': '3',
            'update_interval': '60',
            'return_http_code': '200',
            'arguments': '',
            'postfields': '',
            'application_json': '',
            'follow_redirect': '',
        }
        # ``checks or {}`` replaces the former mutable ``{}`` default argument.
        item = {**default_values, **(checks or {})}

        self.name = str(item['name']).lower()
        self.scheme = str(item['scheme']).lower()
        self.url = item['url']
        self.arguments = item['arguments']
        self.method = str(item['method']).lower()
        self.postfields = item['postfields']
        # str() first so numeric values (e.g. 3 instead of '3') are accepted,
        # the same way update_interval already was.
        self.request_timeout = str(item['request_timeout']).lower()
        self.connection_timeout = str(item['connection_timeout']).lower()
        self.update_interval = str(item['update_interval']).lower()
        self.return_http_code = int(item['return_http_code'])
        self.application_json = self._to_bool(item['application_json'])
        self.follow_redirect = self._to_bool(item['follow_redirect'])
        # Result of the most recent make_request(): '1' on match, else '0'.
        self.success = '0'

    @staticmethod
    def _to_bool(value):
        """Interpret a config value as a boolean, honouring 'false'/'0'/'no'."""
        if isinstance(value, str):
            return value.strip().lower() not in WebCheck._FALSE_STRINGS
        return bool(value)

    def get_uri(self):
        """Return the request URI as ``<scheme>://<url>[?<arguments>]``.

        A ``://`` suffix in the scheme is tolerated, and every ``?`` in
        ``arguments`` is dropped before a single leading ``?`` is added.
        Empty or ``None`` arguments add nothing to the URI.
        """
        scheme = self.scheme.replace('://', '')
        query = ''
        if self.arguments:
            query = '?{}'.format(self.arguments.replace('?', ''))
        return '{}://{}{}'.format(scheme, self.url, query)

    def make_request(self):
        """Perform the configured request and record the result in ``success``.

        Supports the ``get`` and ``post`` methods; any other method marks the
        check as failed without touching the network.  ``self.success``
        becomes ``'1'`` only when the transfer completes and the response
        code equals ``return_http_code``.  ``pycurl.error`` raised by the
        transfer is treated as a failed check rather than propagated.
        """
        # Assume failure until the response proves otherwise, so an early
        # exit or an exception never leaves a stale '1' behind.
        self.success = '0'
        if self.method not in ('get', 'post'):
            return

        sink = BytesIO()  # response body is collected here and discarded
        crl = pycurl.Curl()
        try:
            crl.setopt(pycurl.URL, self.get_uri())
            if self.method == 'post':
                crl.setopt(pycurl.POSTFIELDS, self.postfields)
            # NOTE(review): TimeConverter is a project helper; its int() value
            # is handed to curl as the timeout -- presumably seconds, confirm.
            crl.setopt(pycurl.TIMEOUT, int(TimeConverter(self.request_timeout)))
            crl.setopt(pycurl.CONNECTTIMEOUT,
                       int(TimeConverter(self.connection_timeout)))
            crl.setopt(pycurl.USERAGENT, 'em-agent')
            # Redirects are only followed for GET checks; POST checks have
            # never enabled FOLLOWLOCATION and keep that behaviour.
            if self.follow_redirect and self.method == 'get':
                crl.setopt(pycurl.FOLLOWLOCATION, 1)
            crl.setopt(pycurl.WRITEDATA, sink)
            if self.application_json:
                crl.setopt(pycurl.HTTPHEADER, ['Content-Type: application/json'])
            try:
                crl.perform()
                if crl.getinfo(pycurl.HTTP_CODE) == self.return_http_code:
                    self.success = '1'
            except pycurl.error:
                # Connection/timeout/TLS problems count as a failed check.
                self.success = '0'
        finally:
            # Always release the handle, even if option setup itself raised.
            crl.close()