-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspawn-mqtt-ml.py
74 lines (64 loc) · 1.88 KB
/
spawn-mqtt-ml.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# MQTT MULTI LEVEL DEVICE EMULATOR FOR EDGEX
# spawns an emulated mqtt device with multithreading
# responds and generates async events
# for PING you need to be root
#
## USAGE : spawn-mqtt-ml.py <device_name> <response>
# UTILS FOR DEBUGGING
# listen on topic: mosquitto_sub -d -h <host> -t <topic>
# listen on all: mosquitto_sub -d -h <host> -t "#"
#from random import seed
#from random import randint
import paho.mqtt.client as mqtt
from pythonping import ping
import json
import sys
import time
import datetime
import threading
#BROKER_HOST="127.0.0.1"
BROKER_HOST="edgex"
# CONNECT & SUBSCRIBE
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
SRCTOPIC="command/"+DEV+"/#"
client.subscribe(SRCTOPIC)
# PUBLISH CALLBACK
def on_message(client, userdata, msg):
#ECHO MESSAGE
print(msg.topic+" "+str(msg.payload))
#PARSER
cmd=msg.topic.split("/")
#CRAFT TEST RESPONSE
data = {
"deviceName": DEV,
"message": RES
}
resp=json.dumps(data)
uid=msg.topic.split("/")[-1]
topic="command/response/"+DEV+"/"+uid
client.publish(topic, payload=resp)
# SIMULATE ASYNC EVENT
def gen_async_event(client):
while True:
rttavg = ping(BROKER_HOST).rtt_avg_ms
topic="incoming/data/"+DEV+"/ping"; client.publish(topic, payload=rttavg)
time.sleep(10)
event="heartbeat "+DEV+" "+str(datetime.datetime.now())
topic="incoming/data/"+DEV+"/message"; client.publish(topic, payload=event)
# MAIN
#seed(1)
# CHECK INPUT
DEV = sys.argv[1]
if len(sys.argv) < 3: RES="00000"
else: RES=sys.argv[2]
# LOOP
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
#client.connect("edgex", 1883, 60)
client.connect("edgex")
#start async event simulator
async_t = threading.Thread(target=gen_async_event,args=(client,))
async_t.start()
client.loop_forever()