-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
executable file
·130 lines (110 loc) · 5.2 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python3
import argparse
import asyncio
import signal
import json
import aiomqtt
from bleak.exc import BleakError, BleakDeviceNotFoundError
from bleclient import BleClient
send_config = True
reconnect_interval = 5 # In seconds
async def mqtt_publish(details: dict[str, any], client: aiomqtt.Client):
global send_config
# Define the base topic for MQTT Discovery
base_topic = "homeassistant"
# Define the device information
device_info = {
"identifiers": ["solarlife_mppt_ble"],
"name": "Solarlife MPPT",
"manufacturer": "Solarlife",
}
# Publish each item in the details dictionary to its own MQTT topic
for key, value in details.items():
state_topic = f"{base_topic}/sensor/solarlife/{key}/state"
topic = f"{base_topic}/sensor/solarlife/{key}/config"
# Create the MQTT Discovery payload
payload = {
"name": f"Solarlife {key.replace('_', ' ').title()}",
"device": device_info,
"unique_id": f"solarlife_{key}",
"state_topic": state_topic,
"unit_of_measurement": BleClient.get_unit_of_measurement(key)
}
if "daily_energy" in key:
payload['device_class'] = "energy"
payload['state_class'] = "total_increasing"
elif "total_energy" in key:
payload['device_class'] = "energy"
payload['state_class'] = "total"
elif "voltage" in key:
payload['device_class'] = "voltage"
payload['state_class'] = "measurement"
elif "current" in key:
payload['device_class'] = "current"
payload['state_class'] = "measurement"
elif "power" in key:
payload['device_class'] = "power"
payload['state_class'] = "measurement"
elif "temperature" in key:
payload['device_class'] = "temperature"
payload['state_class'] = "measurement"
elif key == "battery_percentage":
payload['device_class'] = "battery"
payload['state_class'] = "measurement"
# Publish the MQTT Discovery payload
if send_config:
print(f"Publishing MQTT Discovery payload for {key}")
await client.publish(topic, payload=json.dumps(payload), retain=True)
# Publish the entity state
await client.publish(state_topic, payload=str(value))
send_config = False
async def main(address, host, port, username, password):
async def run_mppt():
while True:
try:
async with aiomqtt.Client(hostname=host, port=port, username=username, password=password) as client:
print(f"Connecting to MQTT broker at {host}:{port}")
while True:
try:
async with BleClient(address) as mppt:
while True:
details = await mppt.request_details()
if details:
print(f"Battery: {details['battery_percentage']}% ({details['battery_voltage']}V)")
await mqtt_publish(details, client)
else:
print("No values recieved")
await asyncio.sleep(20.0)
except BleakDeviceNotFoundError:
print(f"BLE device with address {address} was not found")
await asyncio.sleep(5)
except BleakError as e:
print(f"BLE error occurred: {e}")
await asyncio.sleep(5)
except aiomqtt.MqttError as error:
print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
await asyncio.sleep(reconnect_interval)
except asyncio.CancelledError:
raise # Re-raise the CancelledError to stop the task
except Exception as e:
print(f"An error occurred during BLE communication: {e}")
await asyncio.sleep(5) # Wait for 5 seconds before retrying
try:
loop = asyncio.get_running_loop()
task = loop.create_task(run_mppt())
# Setup signal handler to cancel the task on termination
for signame in {'SIGINT', 'SIGTERM'}:
loop.add_signal_handler(getattr(signal, signame),
task.cancel)
await task # Wait for the task to complete
except asyncio.CancelledError:
pass # Task was cancelled, no need for an error message
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Solarlife MPPT BLE Client')
parser.add_argument('address', help='BLE device address')
parser.add_argument('--host', help='MQTT broker host', default='localhost')
parser.add_argument('--port', help='MQTT broker port', default=1883, type=int)
parser.add_argument('--username', help='MQTT username')
parser.add_argument('--password', help='MQTT password')
args = parser.parse_args()
asyncio.run(main(args.address, args.host, args.port, args.username, args.password))