-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy paththingsCloud.lua
355 lines (322 loc) · 10.5 KB
/
thingsCloud.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
-- 合宙模组 luat 接入 ThingsCloud 云平台的代码库
-- ThingsCloud MQTT 接入文档:https://docs.thingscloud.xyz/guide/connect-device/mqtt.html
require "log"
require "http"
require "mqtt"
module(..., package.seeall)
local projectKey = "" -- project_key
local accessToken = "" -- access_token
local deviceKey = "" -- device_key
local typeKey = "" -- type_key
local host, port = "", 1883
local apiEndpoint = "" -- api endpoint
local mqttc = nil
local connected = false
local deviceInfo = {}
local certFetchRetryMax = 5
local certFetchRetryCnt = 0
local mqttConnectRetryMax = 5
local socketRetryMax = 15
local SUBSCRIBE_PREFIX = {
ATTRIBUTES_GET_REPONSE = "attributes/get/response/",
ATTRIBUTES_PUSH = "attributes/push",
COMMAND_SEND = "command/send/",
COMMAND_REPLY_RESPONSE = "command/reply/response/",
DATA_SET = "data/",
GW_ATTRIBUTES_PUSH = "gateway/attributes/push",
GW_COMMAND_SEND = "gateway/command/send"
}
local EVENT_TYPES = {
fetch_cert = true,
connect = true,
attributes_report_response = true,
attributes_get_response = true,
attributes_push = true,
command_send = true,
command_reply_response = true,
data_set = true,
gw_attributes_push = true,
gw_command_send = true
}
local CALLBACK = {}
local QUEUE = {
PUBLISH = {}
}
local logger = {}
function logger.info(...)
log.info("ThingsCloud", ...)
end
function on(eType, cb)
if not eType or not EVENT_TYPES[eType] or type(cb) ~= "function" then
return
end
CALLBACK[eType] = cb
logger.info("on", eType)
end
local function cb(eType, ...)
if not eType or not EVENT_TYPES[eType] or not CALLBACK[eType] then
return
end
CALLBACK[eType](...)
logger.info("cb", eType, ...)
end
function setCertFetchRetryMax(number)
certFetchRetryMax = number
end
function setMqttConnectRetryMax(number)
mqttConnectRetryMax = number
end
function setSocketRetryMax(number)
socketRetryMax = number
end
local function mqttConnect()
local retryCount = 0
logger.info("mqtt connecting...")
while not mqttc:connect(host, port) do
-- try reconnect
retryCount = retryCount + 1
if (retryCount > mqttConnectRetryMax) then
-- connect fail callback
cb("connect", false)
return
end
sys.wait(1000 * 10)
logger.info(string.format("mqtt reconnecting[%d/%d]...", retryCount, mqttConnectRetryMax))
end
-- connected succ
connected = true
logger.info("mqtt connected")
-- auto subscribe basic topics
subscribe("attributes/push")
subscribe("attributes/get/response/+")
subscribe("command/send/+")
subscribe("command/reply/response/+")
-- connect succ callback
cb("connect", true)
end
function connect(param)
if not param.host or not param.projectKey then
logger.info("host or projectKey not found")
return false
end
host = param.host
projectKey = param.projectKey
deviceKey = param.deviceKey or ""
local retryCount = 0
logger.info("waiting socket ready...")
while not socket.isReady() do
retryCount = retryCount + 1
if (retryCount > socketRetryMax) then
logger.info("socket retry timeout")
return false
end
sys.wait(2000)
logger.info(string.format("waiting socket retrying[%d/%d]...", retryCount, socketRetryMax))
end
if param.accessToken then
accessToken = param.accessToken
sys.taskInit(function()
sys.taskInit(procConnect)
end)
else
if not param.apiEndpoint then
logger.info("apiEndpoint not found")
return false
end
apiEndpoint = param.apiEndpoint
if param.typeKey ~= "" or param.typeKey ~= nil then
typeKey = param.typeKey
end
sys.taskInit(function()
sys.taskInit(fetchDeviceCert)
end)
end
return true
end
function disconnect()
connected = false
mqttc:disconnect()
end
-- 一型一密,使用 DeviceKey,领取设备证书 AccessToken
function fetchDeviceCert(is_retry)
if is_retry == nil then
certFetchRetryCnt = 0
end
local header = {}
header["Project-Key"] = projectKey
header["Content-Type"] = "application/json"
local url = apiEndpoint .. "/device/v1/certificate"
if deviceKey == "" then
-- 如果没有指定 deviceKey,默认使用模组 IMEI
deviceKey = misc.getImei()
end
http.request("POST", url, nil, header, json.encode({
device_key = deviceKey,
type_key = typeKey
}), 3000, function(result, prompt, head, body)
log.info("http fetch cert:", deviceKey, result, prompt, head, body)
if result and prompt == "200" then
local data = json.decode(body)
if data.result == 1 then
sys.taskInit(function()
cb("fetch_cert", true)
end)
deviceInfo = data.device
accessToken = deviceInfo.access_token
sys.taskInit(procConnect)
return
end
end
if certFetchRetryCnt < certFetchRetryMax then
-- 重试
certFetchRetryCnt = certFetchRetryCnt + 1
sys.wait(1000 * 10)
logger.info(string.format("cert fetch retry[%d/%d]...", certFetchRetryCnt, certFetchRetryMax))
fetchDeviceCert(true)
else
cb("fetch_cert", false)
end
end)
end
function procConnect()
mqttc = mqtt.client(misc.getImei(), 300, accessToken, projectKey)
mqttConnect()
if not isConnected() then
return
end
while true do
if #QUEUE.PUBLISH > 0 then
local item = table.remove(QUEUE.PUBLISH, 1)
logger.info("mqtt publish", item.topic, item.data)
if mqttc:publish(item.topic, item.data) then
-- publish succ
end
end
local result, data, param = mqttc:receive(100, "pub_msg")
if result then
logger.info("mqtt receive", data.topic or nil, data.payload or "nil")
if (data.topic:sub(1, SUBSCRIBE_PREFIX.ATTRIBUTES_GET_REPONSE:len()) ==
SUBSCRIBE_PREFIX.ATTRIBUTES_GET_REPONSE) then
local response = json.decode(data.payload)
local responseId = tonumber(data.topic:sub(SUBSCRIBE_PREFIX.ATTRIBUTES_GET_REPONSE:len() + 1))
cb("attributes_get_response", response, responseId)
elseif (data.topic == SUBSCRIBE_PREFIX.ATTRIBUTES_PUSH) then
local response = json.decode(data.payload)
cb("attributes_push", response)
elseif (data.topic:sub(1, SUBSCRIBE_PREFIX.COMMAND_SEND:len()) == SUBSCRIBE_PREFIX.COMMAND_SEND) then
local response = json.decode(data.payload)
if response.method and response.params then
cb("command_send", response)
end
elseif (data.topic:sub(1, SUBSCRIBE_PREFIX.COMMAND_REPLY_RESPONSE:len()) ==
SUBSCRIBE_PREFIX.COMMAND_REPLY_RESPONSE) then
local response = json.decode(data.payload)
local replyId = tonumber(data.topic:sub(SUBSCRIBE_PREFIX.COMMAND_REPLY_RESPONSE:len() + 1))
cb("command_reply_response", response, replyId)
elseif (data.topic:sub(1, SUBSCRIBE_PREFIX.DATA_SET:len()) == SUBSCRIBE_PREFIX.DATA_SET) then
local tmp = split(data.topic, "/")
if #tmp == 3 and tmp[3] == "set" then
local identifier = tmp[2]
cb("data_set", data.payload)
end
elseif (data.topic == SUBSCRIBE_PREFIX.GW_ATTRIBUTES_PUSH) then
local response = json.decode(data.payload)
cb("gw_attributes_push", response)
elseif (data.topic == SUBSCRIBE_PREFIX.GW_COMMAND_SEND) then
local response = json.decode(data.payload)
cb("gw_command_send", response)
end
else
if data == "pub_msg" then
elseif data == "timeout" then
-- no downlink message
elseif data == "CLOSED" then
logger.info("mqtt closed")
disconnect()
sys.wait(3000)
sys.taskInit(procConnect)
break
else
logger.info("mqtt disconnected")
disconnect()
sys.wait(3000)
sys.taskInit(procConnect)
break
end
end
if not isConnected() then
break
end
end
logger.info("mqtt loop break")
end
function isConnected()
return connected
end
local function insertPublishQueue(topic, data)
if not isConnected() then
return
end
table.insert(QUEUE.PUBLISH, {
topic = topic,
data = data
})
end
function subscribe(topic)
if not isConnected() then
return
end
logger.info("subscribe", topic)
mqttc:subscribe(topic)
end
function publish(topic, data)
insertPublishQueue(topic, data)
end
function reportAttributes(tableData)
insertPublishQueue("attributes", json.encode(tableData))
sys.publish("QUEUE_PUBLISH", "ATTRIBUTES")
end
function getAttributes(attrsList, options)
options = options or {}
options.getId = options.getId or 1000
local data = {
keys = attrsList
}
if #attrsList == 0 then
data = {}
end
insertPublishQueue("attributes/get/" .. tostring(options.getId), json.encode(data))
end
function reportEvent(event, options)
options = options or {}
options.eventId = options.eventId or 1000
insertPublishQueue("event/report/" .. tostring(options.eventId), json.encode(event))
end
function replyCommand(commandReply, options)
options = options or {}
options.replyId = options.replyId or 1000
insertPublishQueue("command/reply/" .. tostring(options.replyId), json.encode(commandReply))
end
function publishCustomTopic(identifier, data, options)
if type(identifier) ~= "string" then
return
end
insertPublishQueue("data/" .. identifier, data)
end
function getAccessToken()
return accessToken
end
function isGateway()
if deviceInfo.conn_type == "3" then
return true
end
return false
end
function split(str, sep)
local sep, fields = sep or ":", {}
local pattern = string.format("([^%s]+)", sep)
str:gsub(pattern, function(c)
fields[#fields + 1] = c
end)
return fields
end