forked from MarieluiseOden/FiLiP
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathiot_mqtt_example.py
246 lines (212 loc) · 11 KB
/
iot_mqtt_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
"""
This example shows how to provision a virtual iot device in a FIWARE-based
IoT Platform using FiLiP and PahoMQTT
"""
import json
import logging
import random
import time
import paho.mqtt.client as mqtt
from urllib.parse import urlparse
from filip.models import FiwareHeader
from filip.models.ngsi_v2.iot import \
Device, \
DeviceCommand, \
DeviceAttribute, \
ServiceGroup, \
StaticDeviceAttribute
from filip.models.ngsi_v2.context import NamedCommand
from filip.clients.ngsi_v2 import HttpClient, HttpClientConfig
# Logging setup: timestamp, logger name and level are prepended to every
# message. The example logs through its own named logger.
logging.basicConfig(
    format='%(asctime)s %(name)s %(levelname)s: %(message)s',
    level='INFO',
)
logger = logging.getLogger('filip-iot-example')

# ---------------------------------------------------------------------------
# Global settings: adjust these placeholders before running the example.
# ---------------------------------------------------------------------------
# URL handed to the HTTP client as the context broker endpoint
CB_URL = "http://yourHost:yourPort"
# URL handed to the HTTP client as the IoT-Agent endpoint
IOTA_URL = "http://yourHost:yourPort"
# URL that is parsed for host and port of the MQTT broker connection
MQTT_BROKER_URL = "http://yourHost:yourPort"
# apikey set on the device itself
DEVICE_APIKEY = 'filip-iot-example-device'
# apikey set on the service group the device is attached to
SERVICE_GROUP_APIKEY = 'filip-iot-example-service-group'
# FIWARE multi-tenancy headers used for all requests of this example
FIWARE_SERVICE = 'filip'
FIWARE_SERVICE_PATH = '/iot_examples'
if __name__ == '__main__':
    # FIWARE's multi-tenancy is addressed through the service and
    # service-path headers, so the header object is built first and reused
    # for every request that follows.
    fiware_header = FiwareHeader(service=FIWARE_SERVICE,
                                 service_path=FIWARE_SERVICE_PATH)

    # ---- Device model (filip.models.ngsi_v2.iot) --------------------------
    # Attribute for incoming measurements, e.g. from a sensor. The unit is
    # attached as attribute metadata; per the original author's note the
    # library will later augment the unit information provided here.
    temperature_metadata = {
        "unit": {
            "type": "Unit",
            "value": {
                "name": {
                    "type": "Text",
                    "value": "degree Celsius",
                },
            },
        },
    }
    temperature_attr = DeviceAttribute(name='temperature',
                                       object_id='t',
                                       type="Number",
                                       metadata=temperature_metadata)

    # Static attribute that holds additional information about the device.
    info_text = "Filip example for virtual IoT device"
    info_attr = StaticDeviceAttribute(name='info',
                                      type="Text",
                                      value=info_text)

    # Command that the IoT device will listen to.
    device_command = DeviceCommand(name='heater', type="Boolean")

    # NOTE: An apikey defined for a single device is only used for outgoing
    # traffic. This does not become very clear in the official
    # documentation:
    # https://fiware-iotagent-json.readthedocs.io/en/latest/usermanual/index.html
    device = Device(device_id='MyDevice',
                    entity_name='MyDevice',
                    entity_type='Thing2',
                    protocol='IoTA-JSON',
                    transport='MQTT',
                    apikey=DEVICE_APIKEY,
                    attributes=[temperature_attr],
                    static_attributes=[info_attr],
                    commands=[device_command])

    # Further attributes can also be added afterwards via the Device API.
    humidity_metadata = {"unitText": {"value": "percent", "type": "Text"}}
    humidity_attr = DeviceAttribute(name='humidity',
                                    object_id='h',
                                    type="Number",
                                    metadata=humidity_metadata)
    device.add_attribute(attribute=humidity_attr)

    # Show the configuration that is about to be sent.
    device_config_json = device.json(indent=2)
    logger.info("This is our device configuration: \n" + device_config_json)

    # ---- Provisioning via the IoT-Agent -----------------------------------
    # The configuration is sent to FIWARE through the IoT-Agent using the
    # general NGSI-v2 HTTP client. According to the original author's notes
    # this automatically creates a data entity in the context broker; for
    # more complex setups entity_name and entity_type are set on the Device
    # model above.
    #
    # NOTE: To change the apikey used for incoming data, the device has to
    # be attached to a service group that carries that apikey.
    service_group = ServiceGroup(service=fiware_header.service,
                                 subservice=fiware_header.service_path,
                                 apikey=SERVICE_GROUP_APIKEY,
                                 resource='/iot/json')

    # Create the HTTP client. Note that a device that was already sent cannot
    # be posted again; it has to be updated instead, hence update=True.
    client_config = HttpClientConfig(cb_url=CB_URL, iota_url=IOTA_URL)
    client = HttpClient(fiware_header=fiware_header, config=client_config)
    client.iota.post_group(service_group=service_group, update=True)
    client.iota.post_device(device=device, update=True)

    # Short pause before reading the configuration back.
    time.sleep(0.5)

    # Read the device back to check its configuration. The original author
    # notes that the IoT API does not return all the metadata, although it
    # still appears in the context entity.
    device = client.iota.get_device(device_id=device.device_id)
    logger.info(device.json(indent=2))

    # Check that the data entity was created in the context broker.
    entity = client.cb.get_entity(entity_id=device.device_id,
                                  entity_type=device.entity_type)
    entity_json = entity.json(indent=2)
    logger.info("This is our data entity belonging to our device: \n" +
                entity_json)
# Create an MQTT client that we use as the representation of an IoT device,
# following the official documentation of Paho-MQTT.
# https://www.eclipse.org/paho/index.php?page=clients/python/docs/index.php
# NOTE: Since Paho-MQTT is not a requirement of the library at the current
# stage, you probably need to install it first.
#
# pip install paho-mqtt
#
# WE USE THE IMPLEMENTATION OF MQTTv5, which is slightly different from former
# versions. In particular, the arguments of the well-known callback functions
# have changed a little. They are now more verbose than they used to be.
# Furthermore, you have to handle the properties argument.
# The callback for when the MQTT client receives a CONNACK response from the
# server. All callbacks need to have these specific arguments, otherwise the
# client will not be able to execute them.
def on_connect(client, userdata, flags, reasonCode, properties=None):
    """
    Log the outcome of the connection attempt and subscribe to the
    device's command topic once the connection is established.

    Args:
        client: MQTT client instance that triggered the callback
        userdata: user data attached to the client (unused)
        flags: connection flags passed by the client library (unused)
        reasonCode: result of the connection attempt; 0 means success
        properties: MQTTv5 properties passed by the client library (unused)
    """
    if reasonCode != 0:
        logger.error(f"Connection failed with error code: '{reasonCode}'")
        # The connection was refused, so there is no session to subscribe on.
        return

    logger.info("Successfully, connected with result code "+str(reasonCode))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect, the subscription is renewed.
    # We subscribe to the topic that the platform publishes our commands on.
    client.subscribe(f"/{device.apikey}/{device.device_id}/cmd")
# Callback for when the command topic was successfully subscribed
def on_subscribe(client, userdata, mid, granted_qos, properties=None):
    """
    Log the value reported for the acknowledged subscription.

    Args:
        client: MQTT client instance that triggered the callback (unused)
        userdata: user data attached to the client (unused)
        mid: message id of the subscribe request (unused)
        granted_qos: value reported for the subscription; only logged
        properties: MQTTv5 properties passed by the client library (unused)
    """
    message_template = "Successfully subscribed to with QoS: %s"
    logger.info(message_template, granted_qos)
# Callback for when the device receives a PUBLISH message, e.g. a command,
# from the server. The received command is logged and a command
# acknowledgement is sent back to the platform.
# NOTE: The apikey of the service group has to be used to send the message
# to the platform.
def on_message(client, userdata, msg):
    """
    Log an incoming message and echo its JSON content back as the
    command acknowledgement.

    Args:
        client: MQTT client instance used to publish the acknowledgement
        userdata: user data attached to the client (unused)
        msg: received message; its topic and payload are read
    """
    logger.info(" ".join([msg.topic, str(msg.payload)]))

    # The acknowledgement carries the same key/value pairs as the command.
    command = json.loads(msg.payload)
    acknowledgement = dict(command.items())

    ack_topic = f"/json/{service_group.apikey}/{device.device_id}/cmdexe"
    client.publish(topic=ack_topic, payload=json.dumps(acknowledgement))
def on_disconnect(client, userdata, reasonCode, properties=None):
    """
    Log that the MQTT client was disconnected.

    The optional ``properties`` argument keeps the signature compatible with
    the MQTTv5 callback convention used by the other callbacks of this
    example, where the client library may pass properties in addition to
    the reason code.

    Args:
        client: MQTT client instance that triggered the callback (unused)
        userdata: user data attached to the client (unused)
        reasonCode: reason for the disconnect; only logged
        properties: MQTTv5 properties passed by the client library (unused)
    """
    logger.info("MQTT client disconnected with reason code: %s", reasonCode)
# Create the MQTT client that represents our virtual IoT device. It speaks
# MQTTv5 over plain TCP.
mqtt_client = mqtt.Client(client_id="filip-iot-example",
                          userdata=None,
                          protocol=mqtt.MQTTv5,
                          transport="tcp")

# Register our callbacks with the client
mqtt_client.on_connect = on_connect
mqtt_client.on_subscribe = on_subscribe
mqtt_client.on_message = on_message
mqtt_client.on_disconnect = on_disconnect

# Connect to the broker. Host and port are taken from MQTT_BROKER_URL.
# urlparse() reports the port as None when the URL does not contain one, so
# we fall back to the standard MQTT port 1883 in that case.
mqtt_url = urlparse(MQTT_BROKER_URL)
mqtt_client.connect(host=mqtt_url.hostname,
                    port=mqtt_url.port or 1883,
                    keepalive=60,
                    bind_address="",
                    bind_port=0,
                    clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY,
                    properties=None)

# Start a non-blocking background thread for the MQTT communication
mqtt_client.loop_start()
# Publish one random measurement per device attribute. The payload is built
# once, so the logged message is exactly what is sent to the platform.
# NOTE: The apikey of the service group is used in the topic.
for attr in device.attributes:
    payload = json.dumps({attr.object_id: random.randint(0, 9)})
    logger.info("Send data to platform:" + payload)
    mqtt_client.publish(
        topic=f"/json/{service_group.apikey}/{device.device_id}/attrs",
        payload=payload)

# Give the platform a moment to process the measurements, then read the
# entity back from the context broker.
time.sleep(1)
entity = client.cb.get_entity(entity_id=device.device_id,
                              entity_type=device.entity_type)
logger.info("This is updated entity status after measurements are "
            "received: \n" + entity.json(indent=2))

# Create a command and send it via the context broker
context_command = NamedCommand(name=device_command.name,
                               value=False)
client.cb.post_command(entity_id=entity.id,
                       entity_type=entity.type,
                       command=context_command)

# Wait for the command round-trip before checking the entity again
time.sleep(1)

# Read the entity once more to inspect the state of the command attributes
# after the command was sent and acknowledged.
entity = client.cb.get_entity(entity_id=device.device_id,
                              entity_type=device.entity_type)
logger.info("This is updated entity status after the command was sent "
            "and the acknowledge message was received: "
            "\n" + entity.json(indent=2))
# Teardown: stop the MQTT communication first, then remove everything this
# example created on the platform.
# Stop the background thread that was started for the MQTT communication
mqtt_client.loop_stop()
# Disconnect the virtual MQTT device from the broker
mqtt_client.disconnect()
# Clean up the server: delete the device and its service group from the
# IoT-Agent, then the data entity from the context broker
client.iota.delete_device(device_id=device.device_id)
client.iota.delete_group(resource=service_group.resource,
apikey=service_group.apikey)
client.cb.delete_entity(entity_id=entity.id,
entity_type=entity.type)