-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
executable file
·163 lines (149 loc) · 5.49 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#!/usr/bin/env python3
from requests import get
from influxdb import InfluxDBClient
import uuid
import random
import time
import json
import sys
import os
from datetime import datetime
# Open the config file that sits next to this script.
# The directory is resolved from the script's absolute path: when __file__ has
# no directory component, plain dirname(__file__) is '' and appending
# '/config.json' would point at the filesystem root.
filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json')
with open(filepath, encoding='utf-8') as config_file:
    config_json = json.load(config_file)

# Per-area-type API schemas iterated by the main loop.
api_schemas = config_json['api_schemas']
# InfluxDB connection settings.
idb_host = config_json['influxdb_settings']['host']
idb_port = config_json['influxdb_settings']['port']
# Seconds to sleep between collection cycles.
loopsecs = int(config_json['script_settings']['loopsecs'])

# Key in each API result row that holds the record's date.
timestamp_tag = 'date'
alldata_dict = {}
# Line-protocol records queued by write_line_data until they are written.
linedatalist = []
client = InfluxDBClient(host=idb_host, port=idb_port)
def get_data(areatype, areacode, metrics):
    """Fetch COVID data for one area from the coronavirus.data.gov.uk v2 API.

    Args:
        areatype: value sent as the ``areaType`` query parameter.
        areacode: value sent as the ``areaCode`` query parameter.
        metrics: iterable of metric names; each one is sent as its own
            ``metric=`` query parameter.

    Returns:
        The decoded JSON body of the first response whose status code is
        not 429.

    Raises:
        Whatever ``response.json()`` raises for a body that is not JSON (the
        caller handles ``ValueError``), and whatever ``get`` raises on a
        network failure or timeout.
    """
    metric_params = ''.join('metric=' + metric + '&' for metric in metrics)
    endpoint = (
        'https://api.coronavirus.data.gov.uk/v2/data?areaType=' + areatype
        + '&areaCode=' + areacode + '&' + metric_params + 'format=json'
    )
    backoff = 0
    while True:
        print('Accessing Endpoint: ',endpoint)
        response = get(endpoint, timeout=10)
        responsecode = response.status_code
        print('Response Code: ', responsecode)
        if responsecode != 429:
            return response.json()
        # Rate limited: wait before retrying, one second longer each time.
        # Sleeping only here avoids delaying a response that already succeeded.
        backoff += 1
        time.sleep(backoff)
def date_timestamp(string):
    """Convert a ``YYYY-MM-DD`` date string to Unix epoch seconds, as a string.

    The date is parsed as a naive datetime, so the epoch value is computed
    relative to the host's local timezone.

    Args:
        string: date text in ``%Y-%m-%d`` format.

    Returns:
        The whole-second Unix timestamp as a ``str``. If ``string`` cannot be
        parsed, the error is printed and the current time is used instead.
    """
    try:
        datetime_obj = datetime.strptime(string, '%Y-%m-%d')
    except (ValueError, TypeError) as e:
        # ValueError: wrong format; TypeError: not a string (e.g. None).
        print('Datetime format error: ',e,'Full String: ', string)
        datetime_obj = datetime.now()
    return str(int(datetime_obj.timestamp()))
def checkfornone(value):
    """Return ``value`` as a string, with ``None`` rendered as ``'0'``."""
    return str(0 if value is None else value)
def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    starts = range(0, len(lst), n)
    yield from (lst[begin:begin + n] for begin in starts)
def write_line_data(areatype, tags_values, metrics_values, date):
    """Build one InfluxDB line-protocol record and queue it in ``linedatalist``.

    The record has the form
    ``<areatype>-covid-data[,tag=value...] field=value[,field=value...] <timestamp>``.

    Args:
        areatype: prefix of the measurement name.
        tags_values: list of ``{'name': ..., 'value': ...}`` dicts written as
            tags. The ``areaName`` tag is written as ``location`` with spaces
            removed. Tags whose value is ``None`` are left out.
        metrics_values: list of ``{'name': ..., 'value': ...}`` dicts written
            as fields; ``None`` values are written as ``0``.
        date: date string converted to the record timestamp by
            ``date_timestamp``.

    Returns:
        None. The finished line is appended to the module-level
        ``linedatalist``; nothing is appended when ``metrics_values`` is empty.
    """
    def escape(text):
        # Line protocol requires commas, equals signs and spaces inside tag
        # keys, tag values and field keys to be backslash-escaped.
        return str(text).replace(',', '\\,').replace('=', '\\=').replace(' ', '\\ ')

    if not metrics_values:
        # A record needs at least one field; queuing a field-less line would
        # put a malformed record into the batch.
        print('Skipping record with no metrics. Tags: ', tags_values)
        return

    # Write tags
    tag_parts = []
    for tag_value in tags_values:
        if tag_value['value'] is None:
            continue
        if tag_value['name'] == 'areaName':
            tag_parts.append('location=' + escape(str(tag_value['value']).replace(' ', '')))
        else:
            tag_parts.append(escape(tag_value['name']) + '=' + escape(tag_value['value']))

    # Write metrics
    field_parts = []
    for metric_value in metrics_values:
        try:
            field_parts.append(escape(metric_value['name']) + '=' + checkfornone(metric_value['value']))
        except Exception:
            # Show the offending entry before letting the error propagate.
            print('Error: ', sys.exc_info()[0])
            print(metric_value.get('name'))
            print(metric_value.get('value'))
            raise

    # Write metric name, then tags (only if any), fields and timestamp
    linedata = areatype + '-covid-data'
    if tag_parts:
        linedata += ',' + ','.join(tag_parts)
    linedata += ' ' + ','.join(field_parts) + ' ' + date_timestamp(date)
    linedatalist.append(linedata)
# Run the main code block on a loop, every loopsecs seconds.
# Each cycle: query the API for every schema / area code / metric, turn the
# rows into line-protocol records, write them to InfluxDB, then sleep.
while True:
    print('Collecting data...')
    # Iterate through each schema type
    for api_schema in api_schemas:
        try:
            # Get the response from the API for each areacode
            for areacode in api_schema['areacodes']:
                # Metrics are requested one per call (chunk size 1).
                # NOTE(review): the earlier comment here said batches of 5,
                # described as the API limit - confirm which was intended.
                for l1metrics in chunks(api_schema['l1metrics'], 1):
                    try:
                        data = get_data(api_schema['areatype'], areacode, l1metrics)
                        # Go through each instance (row) in the data
                        for row in data['body']:
                            l1metrics_values = []
                            for metric in l1metrics:
                                metric_dataset = row[metric]
                                if isinstance(metric_dataset, list):
                                    # Nested metric: every entry becomes its own
                                    # record, tagged with the parent metric name.
                                    for metric_data in metric_dataset:
                                        l2tags_values = [{'name': 'parentmetric', 'value': metric}]
                                        l2tags_values.extend(
                                            {'name': tag, 'value': row[tag]}
                                            for tag in api_schema['l1tags']
                                        )
                                        # Only tags/metrics present in this entry are used
                                        l2tags_values.extend(
                                            {'name': tag, 'value': metric_data[tag]}
                                            for tag in api_schema['l2tags']
                                            if tag in metric_data
                                        )
                                        l2metrics_values = [
                                            {'name': l2metric, 'value': metric_data[l2metric]}
                                            for l2metric in api_schema['l2metrics']
                                            if l2metric in metric_data
                                        ]
                                        write_line_data(api_schema['areatype'], l2tags_values, l2metrics_values, row[timestamp_tag])
                                elif metric_dataset is not None:
                                    # Plain (scalar) l1 metric
                                    l1metrics_values.append({'name': metric, 'value': metric_dataset})
                            l1tags_values = [
                                {'name': tag, 'value': row[tag]}
                                for tag in api_schema['l1tags']
                            ]
                            if l1metrics_values:
                                write_line_data(api_schema['areatype'], l1tags_values, l1metrics_values, row[timestamp_tag])
                    except ValueError:
                        print('Error with JSON response from API')
                    except Exception as err:
                        # The response code is local to get_data and not
                        # available here, so report the exception itself and
                        # carry on with the next request.
                        print('Request Failed. Error: ', repr(err))
        except Exception:
            print('Error: ', sys.exc_info()[0], 'Schema: ', api_schema)
            raise
    # Write the data to InfluxDB
    print('writing the data to InfluxDB Database')
    client.write_points(linedatalist, database='covid', time_precision='s', batch_size=10000, protocol='line')
    # Drop the records just written so the next cycle does not re-send them
    # and the list does not grow without bound.
    linedatalist.clear()
    # Sleep for loopsecs seconds
    print('Sleeping for ', loopsecs/60, 'minutes...')
    time.sleep(loopsecs)