-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathshelly_api_get.py
77 lines (60 loc) · 2.39 KB
/
shelly_api_get.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from json import loads
import urllib.request
from datetime import datetime
from pytz import timezone
from influxdb import InfluxDBClient
import argparse
"""
Query Shelly Devices and retrieve status information.
"""
___version___ = 0.2

# Command-line interface: three positional arguments, two of them restricted
# to a fixed set of choices by argparse.
config = argparse.ArgumentParser()
config.add_argument(
    "ip",
    type=str,
    help="Shelly IP Address",
)
config.add_argument(
    "data",
    type=str,
    choices=["temp"],
    help="What data to poll (currently only relay temperature is available)",
)
config.add_argument(
    "shelly",
    type=str,
    choices=["1PM", "1L", "2.5"],
    help="Currently Supported Shelly Devices",
)
configValues = config.parse_args()

# Global Variables
# Credentials used for the Shelly HTTP Basic Authentication (placeholders).
shellyUsername = "CHANGE"
shellyPassword = "CHANGE"
# Values taken from the parsed command line.
shellyIP, dataType, shellyType = (
    configValues.ip,
    configValues.data,
    configValues.shelly,
)

# InfluxDB Configuration (placeholders, read when the client is created)
databaseHost = "CHANGE"
databasePort = "CHANGE"
databaseDatabase = "CHANGE"
databaseUsername = "CHANGE"
databasePassword = "CHANGE"
def connectToDatabase():
    """Create and return the InfluxDB client for this script.

    The client is configured from the module-level ``databaseHost``,
    ``databasePort``, ``databaseDatabase``, ``databaseUsername`` and
    ``databasePassword`` values, with ``timeout=5``.

    Returns:
        The newly constructed ``InfluxDBClient`` instance.
    """
    # Collect the settings first so the constructor call stays a one-liner.
    connectionSettings = {
        "host": databaseHost,
        "port": databasePort,
        "database": databaseDatabase,
        "username": databaseUsername,
        "password": databasePassword,
        "timeout": 5,
    }
    return InfluxDBClient(**connectionSettings)
def getAPIResult():
    """Fetch and decode the status document of the configured Shelly device.

    Requests ``http://<shellyIP>/status`` with HTTP Basic Authentication,
    using the module-level ``shellyUsername`` and ``shellyPassword``, and a
    5 second timeout, then parses the response body as JSON.

    Returns:
        The decoded JSON payload, as produced by ``json.loads``.

    Raises:
        urllib.error.URLError: If the request cannot be completed.
        ValueError: If the response body is not valid JSON.
    """
    url = "http://{}/status".format(shellyIP)

    # Handle Shelly Basic Authentication
    password_mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
    password_mgr.add_password(None, url, shellyUsername, shellyPassword)
    handler = urllib.request.HTTPBasicAuthHandler(password_mgr)
    opener = urllib.request.build_opener(handler)
    # Installed process-wide so that urlopen() below authenticates.
    urllib.request.install_opener(opener)

    # Get the data requested. The context manager closes the HTTP response
    # (and its socket) right after reading instead of leaving it open.
    with urllib.request.urlopen(url, timeout=5) as response:
        payload = response.read()

    return loads(payload)
# Fetch the device status once; the reading below is taken from it.
contents = getAPIResult()

if dataType == "temp":
    # One point for InfluxDB: timestamp in the Europe/Vienna zone, measurement
    # named after the device type and IP, tagged as a temperature sensor.
    dataBody = {
        'time': datetime.now(timezone('Europe/Vienna')),
        'measurement': "Shelly %s, %s" % (shellyType, shellyIP),
        'tags': {'sensor': 'temperature'},
        'fields': {'value': contents['temperature']},
    }
    # NOTE(review): the original indentation was lost; the write is assumed to
    # belong to this branch. "temp" is the only value argparse accepts, so the
    # write runs on every invocation either way.
    connectToDatabase().write_points([dataBody])