Skip to content

Commit

Permalink
[patch] First pass at subscription handling in gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
durera committed Feb 2, 2018
1 parent 5d4a53c commit 825df45
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 43 deletions.
9 changes: 4 additions & 5 deletions samples/managedGateway/simpleManagedGateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import getopt
import time
import sys
import psutil
import platform
import json
import signal
Expand Down Expand Up @@ -49,12 +48,12 @@ def commandProcessor(cmd):
if __name__ == "__main__":
signal.signal(signal.SIGINT, interruptHandler)

organization = "org_id"
gatewayType = "MY GATEWAY TYPE"
gatewayId = "MY GATEWAY ID"
organization = ""
gatewayType = ""
gatewayId = ""
gatewayName = platform.node()
authMethod = "token"
authToken = "MASKED PASSWORD"
authToken = ""
configFilePath = None

# Seconds to sleep so as to check the error state
Expand Down
112 changes: 74 additions & 38 deletions src/ibmiotf/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ class Client(AbstractClient):

def __init__(self, options, logHandlers=None):
self._options = options

# If we are disconnected we lose all our active subscriptions. Keep track of all subscriptions
# so that we can internally restore all subscriptions on reconnect
self._subscriptions = {}

#Defaults
if "domain" not in self._options:
# Default to the domain for the public cloud offering
Expand Down Expand Up @@ -94,7 +97,6 @@ def __init__(self, options, logHandlers=None):
raise ConfigurationException("Missing required property for token based authentication: auth-token")
else:
raise UnsupportedAuthenticationMethod(options['authMethod'])
self._options['subscriptionList'] = {}


self.COMMAND_TOPIC = "iot-2/type/" + self._options['type'] + "/id/" + self._options['id'] + "/cmd/+/fmt/+"
Expand All @@ -110,6 +112,8 @@ def __init__(self, options, logHandlers=None):
port = self._options['port']
)

# Add handler for subscriptions
self.client.on_subscribe = self.__onSubscribe

# Add handler for commands if not connected to QuickStart
if self._options['org'] != "quickstart":
Expand All @@ -127,6 +131,7 @@ def __init__(self, options, logHandlers=None):
self.commandCallback = None
self.deviceCommandCallback = None
self.notificationCallback = None
self.subscriptionCallback = None
self.client.on_connect = self.on_connect
self.setMessageEncoderModule('json', jsonCodec)
self.setMessageEncoderModule('json-iotf', jsonIotfCodec)
Expand All @@ -152,8 +157,13 @@ def on_connect(self, client, userdata, flags, rc):
if rc == 0:
self.connectEvent.set()
self.logger.info("Connected successfully: %s" % (self.clientId))
#if self._options['org'] != "quickstart":
#self.subscribeToGatewayCommands()

# Restoring previous subscriptions
if len(self._subscriptions) > 0:
for subscription in self._subscriptions:
self.client.subscribe(subscription, qos=self._subscriptions[subscription])
self.logger.debug("Restored %s previous subscriptions" % len(self._subscriptions))

elif rc == 5:
self.logAndRaiseException(ConnectionException("Not authorized: s (%s, %s, %s)" % (self.clientId, self.username, self.password)))
else:
Expand Down Expand Up @@ -260,16 +270,19 @@ def publishGatewayEvent(self, event, msgFormat, data, qos=0, on_publish=None):
def subscribeToDeviceCommands(self, deviceType, deviceId, command='+', format='json', qos=1):
if self._options['org'] == "quickstart":
self.logger.warning("QuickStart not supported in Gateways")
return False
return 0

if not self.connectEvent.wait(timeout=10):
self.logger.warning("Unable to subscribe to device commands because gateway is not currently connected")
return False
return 0
else:
topic = 'iot-2/type/' + deviceType + '/id/' + deviceId + '/cmd/' + command + '/fmt/' + format
self.client.subscribe(topic, qos=qos)
self._options['subscriptionList'][topic] = qos
return True
(result, mid) = self.client.subscribe(topic, qos=qos)
if result == paho.MQTT_ERR_SUCCESS:
self._subscriptions[topic] = qos
return mid
else:
return 0



Expand All @@ -278,38 +291,50 @@ def subscribeToGatewayCommands(self, command='+', format='json', qos=1):
deviceId = self._options['id']
if self._options['org'] == "quickstart":
self.logger.warning("QuickStart not supported in Gateways")
return False
return 0
if not self.connectEvent.wait(timeout=10):
self.logger.warning("Unable to subscribe to gateway commands because gateway is not currently connected")
return False
return 0
else:
topic = 'iot-2/type/' + deviceType + '/id/' + deviceId + '/cmd/' + command + '/fmt/' + format
self.client.subscribe(topic)
self._options['subscriptionList'][topic] = qos
return True
(result, mid) = self.client.subscribe(topic, qos=qos)
if result == paho.MQTT_ERR_SUCCESS:
self._subscriptions[topic] = qos
return mid
else:
return 0


def subscribeToGatewayNotifications(self):
deviceType = self._options['type']
deviceId = self._options['id']
if self._options['org'] == "quickstart":
self.logger.warning("QuickStart not supported in Gateways")
return False
return 0
if not self.connectEvent.wait(timeout=10):
self.logger.warning("Unable to subscribe to notifications because gateway is not currently connected")
return False
return 0
else:
topic = 'iot-2/type/' + deviceType + '/id/' + deviceId + '/notify'
self.client.subscribe(topic)
#self._options['subscriptionList'][topic] = qos
return True


'''
Internal callback for device command messages, parses source device from topic string and
passes the information on to the registered device command callback
'''
(result, mid) = self.client.subscribe(topic, qos=0)
if result == paho.MQTT_ERR_SUCCESS:
self._subscriptions[topic] = 0
return mid
else:
return 0

def __onSubscribe(self, client, userdata, mid, grantedQoS):
'''
Internal callback for handling subscription acknowledgement
'''
self.logger.debug("Subscribe callback: mid: %s qos: %s" % (mid, grantedQoS))
if self.subscriptionCallback: self.subscriptionCallback(mid, grantedQoS)

def __onCommand(self, client, userdata, pahoMessage):
'''
Internal callback for device command messages, parses source device from topic string and
passes the information on to the registered device command callback
'''
with self._recvLock:
self.recv = self.recv + 1
try:
Expand All @@ -320,11 +345,11 @@ def __onCommand(self, client, userdata, pahoMessage):
self.logger.debug("Received device command '%s'" % (command.command))
if self.commandCallback: self.commandCallback(command)

'''
Internal callback for gateway command messages, parses source device from topic string and
passes the information on to the registered device command callback
'''
def __onDeviceCommand(self, client, userdata, pahoMessage):
'''
Internal callback for gateway command messages, parses source device from topic string and
passes the information on to the registered device command callback
'''
with self._recvLock:
self.recv = self.recv + 1
try:
Expand Down Expand Up @@ -403,7 +428,7 @@ def __init__(self, options, logHandlers=None, deviceInfo=None):

# Add handler for supported device management commands
self.client.message_callback_add("iotdm-1/#", self.__onDeviceMgmtResponse)
self.client.on_subscribe = self.on_subscribe
self.client.on_subscribe = self.__onSubscribe

self.readyForDeviceMgmt = threading.Event()

Expand Down Expand Up @@ -496,19 +521,30 @@ def on_connect(self, client, userdata, flags, rc):
if self._options['org'] != "quickstart":
dm_response_topic = ManagedClient.DM_RESPONSE_TOPIC_TEMPLATE % (self._gatewayType,self._gatewayId)
dm_observe_topic = ManagedClient.DM_OBSERVE_TOPIC_TEMPLATE % (self._gatewayType,self._gatewayId)
self.client.subscribe( [(dm_response_topic, 1), (dm_observe_topic, 1), (self.COMMAND_TOPIC, 1)] )
(self.dmSubscriptionResult, self.dmSubscriptionMid) = self.client.subscribe( [(dm_response_topic, 1), (dm_observe_topic, 1), (self.COMMAND_TOPIC, 1)] )

if self.dmSubscriptionResult != paho.MQTT_ERR_SUCCESS:
self.logAndRaiseException(ConnectionException("Unable to subscribe to device management topics"))

elif rc == 5:
self.logAndRaiseException(ConnectionException("Not authorized: s (%s, %s, %s)" % (self.clientId, self.username, self.password)))
else:
self.logAndRaiseException(ConnectionException("Connection failed: RC= %s" % (rc)))


def on_subscribe(self, client, userdata, mid, granted_qos):
# Once Watson IoT acknowledges the subscriptions we are able to process commands and responses from device management server
self.subscriptionsAcknowledged.set()
self.manage()


def __onSubscribe(self, client, userdata, mid, grantedQoS):
'''
Internal callback for handling subscription acknowledgement
'''
if mid == self.dmSubscriptionMid:
# Once Watson IoT acknowledges the DM subscriptions we are able to
# process commands and responses from device management server
self.subscriptionsAcknowledged.set()
self.manage()
else:
self.logger.debug("Subscribe callback: mid: %s qos: %s" % (mid, grantedQoS))
if self.subscriptionCallback: self.subscriptionCallback(mid, grantedQoS)


def manage(self, lifetime=3600, supportDeviceActions=False, supportFirmwareActions=False):
# TODO: throw an error, minimum lifetime this client will support is 1 hour, but for now set lifetime to infinite if it's invalid
if lifetime < 3600:
Expand Down

0 comments on commit 825df45

Please sign in to comment.