-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmqtt_components.py
72 lines (55 loc) · 1.9 KB
/
mqtt_components.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from xai_components.base import InArg, OutArg, InCompArg, Component, BaseComponent, xai_component
from paho.mqtt import client as mqtt_client
@xai_component
class MQTTConnect(Component):
    """Create a paho-mqtt client, connect it to a broker and share it via ``ctx``.

    The connected client is stored under ``ctx['mqtt_client']``.

    ##### inPorts:
    - broker: Host name or address passed to ``client.connect``.
    - port: Broker port; falls back to 1883 when left unset.
    - client_id: Client identifier handed to the paho ``Client`` constructor.
    - username: Optional user name; credentials are only configured when it
      is not None.
    - password: Password passed along with ``username``.
    """
    broker: InArg[str]
    port: InArg[int]
    client_id: InArg[str]
    username: InArg[str]
    password: InArg[str]

    def execute(self, ctx) -> None:
        """Build the client, apply optional credentials, connect, publish to ctx."""
        # Pass the id by keyword: paho-mqtt 2.x takes callback_api_version as
        # the first positional parameter, so a positional client id is
        # rejected there. The keyword form is accepted by 1.x and 2.x alike.
        client = mqtt_client.Client(client_id=self.client_id.value)

        if self.username.value is not None:
            client.username_pw_set(self.username.value, self.password.value)

        # An unset port would reach connect() as None; use the standard
        # unencrypted MQTT port instead.
        port = self.port.value if self.port.value is not None else 1883
        client.connect(self.broker.value, port)

        ctx['mqtt_client'] = client
@xai_component
class MQTTPublish(Component):
    """Publish one message through the client stored in ``ctx['mqtt_client']``.

    ##### inPorts:
    - topic: Topic passed as the first argument to ``client.publish``.
    - message: Payload passed as the second argument to ``client.publish``.

    ##### outPorts:
    - result: Element ``[0]`` of the value returned by ``client.publish``
      (presumably paho's return code -- confirm against the paho version in use).
    """
    topic: InArg[str]
    message: InArg[str]
    result: OutArg[int]

    def execute(self, ctx) -> None:
        """Publish, run a single ``client.loop()`` iteration, expose the result."""
        mqtt = ctx['mqtt_client']
        topic, payload = self.topic.value, self.message.value

        publish_info = mqtt.publish(topic, payload)
        # One network-loop iteration is run right after queuing the publish.
        mqtt.loop()

        self.result.value = publish_info[0]
@xai_component
class MQTTSubscribe(Component):
    """Subscribe to a topic and run the ``on_message`` branch for each message.

    Uses the client stored in ``ctx['mqtt_client']``. The handler is assigned
    to ``client.on_message``, so it replaces any handler previously assigned
    to that attribute on the same client.

    ##### inPorts:
    - topic: Topic (filter) passed to ``client.subscribe``.

    ##### outPorts:
    - message: Payload of the most recent message, decoded with
      ``bytes.decode()`` defaults.

    ##### Branches:
    - on_message: Component triggered via ``.do(ctx)`` for every message.
    """
    on_message: BaseComponent
    topic: InArg[str]
    message: OutArg[str]

    def execute(self, ctx) -> None:
        """Register the message handler, then subscribe to the topic."""
        client = ctx['mqtt_client']
        # Install the handler before subscribing so a message delivered right
        # after the subscription is not left without a handler.
        client.on_message = lambda c, data, msg: self.process_message(ctx, c, data, msg)
        client.subscribe(self.topic.value)

    def process_message(self, ctx, client, userdata, message):
        """Expose the received message and trigger the ``on_message`` branch.

        Writes the decoded payload to the ``message`` port, stores the raw
        message under ``ctx['mqtt_message']`` and the userdata under
        ``ctx['mqtt_userdata']``, then runs the branch if one is wired.
        """
        self.message.value = message.payload.decode()
        ctx['mqtt_message'] = message
        ctx['mqtt_userdata'] = userdata

        # The branch may be left unconnected; an exception raised here would
        # surface inside paho's callback and end the network loop, so skip
        # the call when there is nothing to run.
        branch = getattr(self, 'on_message', None)
        if branch is not None:
            # NOTE(review): only a single .do(ctx) is issued here; whether that
            # runs the whole chain behind the branch depends on
            # xai_components.base -- confirm.
            branch.do(ctx)
@xai_component
class MQTTStartLoop(Component):
    """Run ``loop_forever()`` on the client stored in ``ctx['mqtt_client']``.

    Any ``Exception`` raised by the loop is printed and not re-raised. A
    missing ``'mqtt_client'`` entry in ``ctx`` is not caught.
    """

    def execute(self, ctx) -> None:
        """Enter the client's network loop and report a failure by printing it."""
        mqtt = ctx['mqtt_client']
        try:
            mqtt.loop_forever()
        except Exception as err:
            print(err)
@xai_component
class MQTTDisconnect(Component):
    """Disconnect the client stored in ``ctx['mqtt_client']`` and stop its loop.

    Both steps are best-effort: an ``Exception`` from either one is printed
    and not re-raised, and a failure of the first does not skip the second.
    A missing ``'mqtt_client'`` entry in ``ctx`` is not caught.
    """

    def execute(self, ctx) -> None:
        """Call ``client.disconnect()`` followed by ``client.loop_stop()``."""
        client = ctx['mqtt_client']

        # Actually close the broker connection; stopping the loop alone
        # leaves the connection open.
        try:
            client.disconnect()
        except Exception as e:
            print(e)

        # Stop the network loop as the original component did.
        try:
            client.loop_stop()
        except Exception as e:
            print(e)