Skip to content

Commit

Permalink
[Session keepalive] Use subscription keepalive instead of renewing se…
Browse files Browse the repository at this point in the history
…cure channel

When a session has a short session timeout, the secure_channel is
renewed at 75% of it lifetime. While this is convenient to keep
the session alives, this might be an overkill in some situations
since the encryption involved behind the scene can be expensive,
specifically on hardware with limited resources (i.e: OPC-UA
server embedded on PLC module for example).

What I suggested here is to have more control over the secure channel
renewal and instead to rely on the subscription keepalive mechanism,
thus, the client won't timeout when subscriptions are inactive.
There's still the case when there's no subscription (client is
connected but idling), in this situation the user has to set a
secure_channel_timeout < session_timeout.

Finally, the library ua-server doesn't respect the session_timeout
sent by the client during session_creation. Probably worth to follow-up
on this.

Test with MaxKeepAliveCount == 22
(session_timeout = 30; publish_interval = 1000)
```
2021-02-07 16:23:00,147 - asyncua.client.ua_client.UASocketProtocol - DEBUG - Sending: CreateSubscriptionRequest(TypeId:i=787, RequestHeader:RequestHeader(AuthenticationToken:b=b'2\xd2\x82w\xad\x0f\x99\x9a %\xfaj\xb6\xb9\xaf\xa8\xe7\x13\xe6\xec\x15G\xf7G\x15\xbb\x82\x1d\xda\x17\xafh', Timestamp:2021-02-08 00:23:00.147523, RequestHandle:4, ReturnDiagnostics:0, AuditEntryId:None, TimeoutHint:4000, AdditionalHeader:ExtensionObject(TypeId:i=0, Encoding:0, None bytes)), Parameters:CreateSubscriptionParameters(RequestedPublishingInterval:1000, RequestedLifetimeCount:10000, RequestedMaxKeepAliveCount:22, MaxNotificationsPerPublish:10000, PublishingEnabled:True, Priority:0))
2021-02-07 16:23:00,170 - asyncua.client.ua_client.UaClient - DEBUG - publish []
2021-02-07 16:23:00,170 - asyncua.client.ua_client.UASocketProtocol - DEBUG - Sending: PublishRequest(TypeId:i=826, RequestHeader:RequestHeader(AuthenticationToken:b=b'2\xd2\x82w\xad\x0f\x99\x9a %\xfaj\xb6\xb9\xaf\xa8\xe7\x13\xe6\xec\x15G\xf7G\x15\xbb\x82\x1d\xda\x17\xafh', Timestamp:2021-02-08 00:23:00.170972, RequestHandle:6, ReturnDiagnostics:0, AuditEntryId:None, TimeoutHint:0, AdditionalHeader:ExtensionObject(TypeId:i=0, Encoding:0, None bytes)), Parameters:PublishParameters(SubscriptionAcknowledgements:[]))
21-02-08 00:23:00.939388))], DiagnosticInfos:[])]), Results:[], DiagnosticInfos:[])
DataChangeNotification(<asyncua.common.subscription.SubscriptionItemData object at 0x7fdbf1444c10>, MonitoredItemNotification(ClientHandle:201, Value:DataValue(Value:Variant(val:300000,type:VariantType.Int32), StatusCode:StatusCode(Good), SourceTimestamp:2021-02-08 00:23:00.900000, ServerTimestamp:2021-02-08 00:23:00.939388)))

2021-02-07 16:23:01,189 - asyncua.client.ua_client.UaClient - DEBUG - publish [SubscriptionAcknowledgement(SubscriptionId:75, SequenceNumber:1)]
2021-02-07 16:23:01,189 - asyncua.client.ua_client.UASocketProtocol - DEBUG - Sending: PublishRequest(TypeId:i=826, RequestHeader:RequestHeader(AuthenticationToken:b=b'2\xd2\x82w\xad\x0f\x99\x9a %\xfaj\xb6\xb9\xaf\xa8\xe7\x13\xe6\xec\x15G\xf7G\x15\xbb\x82\x1d\xda\x17\xafh', Timestamp:2021-02-08 00:23:01.189588, RequestHandle:7, ReturnDiagnostics:0, AuditEntryId:None, TimeoutHint:0, AdditionalHeader:ExtensionObject(TypeId:i=0, Encoding:0, None bytes)), Parameters:PublishParameters(SubscriptionAcknowledgements:[SubscriptionAcknowledgement(SubscriptionId:75, SequenceNumber:1)]))
2021-02-07 16:23:23,198 - asyncua.common.subscription - INFO - Publish callback called with result: PublishResult(SubscriptionId:75, AvailableSequenceNumbers:[], MoreNotifications:False, NotificationMessage:NotificationMessage(SequenceNumber:2, PublishTime:2021-02-08 00:23:23.195861, NotificationData:[]), Results:[StatusCode(Good)], DiagnosticInfos:[])

2021-02-07 16:23:45,175 - asyncua.common.subscription - INFO - Publish callback called with result: PublishResult(SubscriptionId:75, AvailableSequenceNumbers:[], MoreNotifications:False, NotificationMessage:NotificationMessage(SequenceNumber:2, PublishTime:2021-02-08 00:23:45.172315, NotificationData:[]), Results:[], DiagnosticInfos:[])
2021-02-07 16:23:45,175 - asyncua.client.ua_client.UaClient - DEBUG - publish []
2021-02-07 16:23:45,175 - asyncua.client.ua_client.UASocketProtocol - DEBUG - Sending: PublishRequest(TypeId:i=826, RequestHeader:RequestHeader(AuthenticationToken:b=b'2\xd2\x82w\xad\x0f\x99\x9a %\xfaj\xb6\xb9\xaf\xa8\xe7\x13\xe6\xec\x15G\xf7G\x15\xbb\x82\x1d\xda\x17\xafh', Timestamp:2021-02-08 00:23:45.175790, RequestHandle:9, ReturnDiagnostics:0, AuditEntryId:None, TimeoutHint:0, AdditionalHeader:ExtensionObject(TypeId:i=0, Encoding:0, None bytes)), Parameters:PublishParameters(SubscriptionAcknowledgements:[]))
2021-02-07 16:24:07,185 - asyncua.common.subscription - INFO - Publish callback called with result: PublishResult(SubscriptionId:75, AvailableSequenceNumbers:[], MoreNotifications:False, NotificationMessage:NotificationMessage(SequenceNumber:2, PublishTime:2021-02-08 00:24:07.182767, NotificationData:[]), Results:[], DiagnosticInfos:[])
```

Test with MaxKeepAliveCount == 0
```
2021-02-07 16:25:40,994 - asyncua.client.ua_client.UaClient - DEBUG - create_subscription
2021-02-07 16:25:40,994 - asyncua.client.ua_client.UASocketProtocol - DEBUG - Sending: CreateSubscriptionRequest(TypeId:i=787, RequestHeader:RequestHeader(AuthenticationToken:b=b'\x18*m\xa6E\xf3?\x9d\x86\xe2\xe9rgI\xa7\x073\xe90\xdb\x82\x82~F\x81\xe8\xb2T\xdb\xb0\xf8\xea', Timestamp:2021-02-08 00:25:40.994336, RequestHandle:4, ReturnDiagnostics:0, AuditEntryId:None, TimeoutHint:4000, AdditionalHeader:ExtensionObject(TypeId:i=0, Encoding:0, None bytes)), Parameters:CreateSubscriptionParameters(RequestedPublishingInterval:1000, RequestedLifetimeCount:10000, RequestedMaxKeepAliveCount:0, MaxNotificationsPerPublish:10000, PublishingEnabled:True, Priority:0))
2021-02-07 16:25:45,006 - asyncua.common.subscription - INFO - Publish callback called with result: PublishResult(SubscriptionId:76, AvailableSequenceNumbers:[], MoreNotifications:False, NotificationMessage:NotificationMessage(SequenceNumber:2, PublishTime:2021-02-08 00:25:45.003386, NotificationData:[]), Results:[StatusCode(Good)], DiagnosticInfos:[])
2021-02-07 16:25:45,007 - asyncua.client.ua_client.UaClient - DEBUG - publish []
2021-02-07 16:25:45,007 - asyncua.client.ua_client.UASocketProtocol - DEBUG - Sending: PublishRequest(TypeId:i=826, RequestHeader:RequestHeader(AuthenticationToken:b=b'\x18*m\xa6E\xf3?\x9d\x86\xe2\xe9rgI\xa7\x073\xe90\xdb\x82\x82~F\x81\xe8\xb2T\xdb\xb0\xf8\xea', Timestamp:2021-02-08 00:25:45.007090, RequestHandle:8, ReturnDiagnostics:0, AuditEntryId:None, TimeoutHint:0, AdditionalHeader:ExtensionObject(TypeId:i=0, Encoding:0, None bytes)), Parameters:PublishParameters(SubscriptionAcknowledgements:[]))

2021-02-07 16:25:42,023 - asyncua.client.ua_client.UaClient - DEBUG - publish [SubscriptionAcknowledgement(SubscriptionId:76, SequenceNumber:1)]
2021-02-07 16:25:42,023 - asyncua.client.ua_client.UASocketProtocol - DEBUG - Sending: PublishRequest(TypeId:i=826, RequestHeader:RequestHeader(AuthenticationToken:b=b'\x18*m\xa6E\xf3?\x9d\x86\xe2\xe9rgI\xa7\x073\xe90\xdb\x82\x82~F\x81\xe8\xb2T\xdb\xb0\xf8\xea', Timestamp:2021-02-08 00:25:42.023578, RequestHandle:7, ReturnDiagnostics:0, AuditEntryId:None, TimeoutHint:0, AdditionalHeader:ExtensionObject(TypeId:i=0, Encoding:0, None bytes)), Parameters:PublishParameters(SubscriptionAcknowledgements:[SubscriptionAcknowledgement(SubscriptionId:76, SequenceNumber:1)]))

2021-02-07 16:25:45,006 - asyncua.common.subscription - INFO - Publish callback called with result: PublishResult(SubscriptionId:76, AvailableSequenceNumbers:[], MoreNotifications:False, NotificationMessage:NotificationMessage(SequenceNumber:2, PublishTime:2021-02-08 00:25:45.003386, NotificationData:[]), Results:[StatusCode(Good)], DiagnosticInfos:[])
2021-02-07 16:25:45,007 - asyncua.client.ua_client.UaClient - DEBUG - publish []
2021-02-07 16:25:45,007 - asyncua.client.ua_client.UASocketProtocol - DEBUG - Sending: PublishRequest(TypeId:i=826, RequestHeader:RequestHeader(AuthenticationToken:b=b'\x18*m\xa6E\xf3?\x9d\x86\xe2\xe9rgI\xa7\x073\xe90\xdb\x82\x82~F\x81\xe8\xb2T\xdb\xb0\xf8\xea', Timestamp:2021-02-08 00:25:45.007090, RequestHandle:8, ReturnDiagnostics:0, AuditEntryId:None, TimeoutHint:0, AdditionalHeader:ExtensionObject(TypeId:i=0, Encoding:0, None bytes)), Parameters:PublishParameters(SubscriptionAcknowledgements:[]))

2021-02-07 16:25:48,016 - asyncua.common.subscription - INFO - Publish callback called with result: PublishResult(SubscriptionId:76, AvailableSequenceNumbers:[], MoreNotifications:False, NotificationMessage:NotificationMessage(SequenceNumber:2, PublishTime:2021-02-08 00:25:48.013598, NotificationData:[]), Results:[], DiagnosticInfos:[])
2021-02-07 16:25:48,016 - asyncua.client.ua_client.UaClient - DEBUG - publish []
2021-02-07 16:25:48,017 - asyncua.client.ua_client.UASocketProtocol - DEBUG - Sending: PublishRequest(TypeId:i=826, RequestHeader:RequestHeader(AuthenticationToken:b=b'\x18*m\xa6E\xf3?\x9d\x86\xe2\xe9rgI\xa7\x073\xe90\xdb\x82\x82~F\x81\xe8\xb2T\xdb\xb0\xf8\xea', Timestamp:2021-02-08 00:25:48.017009, RequestHandle:9, ReturnDiagnostics:0, AuditEntryId:None, TimeoutHint:0, AdditionalHeader:ExtensionObject(TypeId:i=0, Encoding:0, None bytes)), Parameters:PublishParameters(SubscriptionAcknowledgements:[]))
2021-02-07 16:25:51,016 - asyncua.common.subscription - INFO - Publish callback called with result: PublishResult(SubscriptionId:76, AvailableSequenceNumbers:[], MoreNotifications:False, NotificationMessage:NotificationMessage(SequenceNumber:2, PublishTime:2021-02-08 00:25:51.012809, NotificationData:[]), Results:[], DiagnosticInfos:[])
```
  • Loading branch information
ZuZuD authored and oroulet committed Feb 8, 2021
1 parent 705898c commit ee844ed
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 6 deletions.
19 changes: 16 additions & 3 deletions asyncua/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,10 @@ async def _renew_channel_loop(self):
but it does not cost much..
"""
try:
duration = min(self.session_timeout, self.secure_channel_timeout) * 0.7 / 1000
# Part4 5.5.2.1:
# Clients should request a new SecurityToken after 75 % of its lifetime has elapsed
duration = self.secure_channel_timeout * 0.75 / 1000
while True:
# 0.7 is from spec. 0.001 is because asyncio.sleep expects time in seconds
await asyncio.sleep(duration)
_logger.debug("renewing channel")
await self.open_secure_channel(renew=True)
Expand Down Expand Up @@ -546,14 +547,26 @@ async def create_subscription(self, period, handler, publishing=True):
params = ua.CreateSubscriptionParameters()
params.RequestedPublishingInterval = period
params.RequestedLifetimeCount = 10000
params.RequestedMaxKeepAliveCount = 3000
params.RequestedMaxKeepAliveCount = self.get_keepalive_count(period)
params.MaxNotificationsPerPublish = 10000
params.PublishingEnabled = publishing
params.Priority = 0
subscription = Subscription(self.uaclient, params, handler)
await subscription.init()
return subscription

def get_keepalive_count(self, period) -> int:
"""
We request the server to send a Keepalive notification when
no notification has been received for 75% of the session lifetime.
This is especially useful to keep the sesssion up
when self.session_timeout < self.secure_channel_timeout.
Part4 5.13.2: If the requested value is 0, the Server
shall revise with the smallest supported keep-alive count.
"""
return int((self.session_timeout / period) * 0.75)

async def get_namespace_array(self):
ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
return await ns_node.read_value()
Expand Down
3 changes: 2 additions & 1 deletion asyncua/client/ha/ha_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class HaConfig:
reconciliator_timer: int = 15
session_timeout: int = 60
request_timeout: int = 30
secure_channel_timeout: int = 3600
session_name: str = "HaClient"
urls: List[str] = field(default_factory=list)

Expand Down Expand Up @@ -142,7 +143,7 @@ def __init__(
c = Client(url, timeout=self._config.request_timeout, loop=self.loop)
# timeouts for the session and secure channel are in ms
c.session_timeout = self._config.session_timeout * 1000
c.secure_channel_timeout = self._config.request_timeout * 1000
c.secure_channel_timeout = self._config.secure_channel_timeout * 1000
c.description = self._config.session_name
server_info = ServerInfo(url)
self.clients[c] = server_info
Expand Down
2 changes: 1 addition & 1 deletion tests/test_ha_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def test_init_ha_client(self, ha_client):
urls.remove(url)
assert client.session_timeout == ha_config.session_timeout * 1000
assert client.session_timeout == ha_client.session_timeout * 1000
assert client.secure_channel_timeout == ha_config.request_timeout * 1000
assert client.secure_channel_timeout == ha_config.secure_channel_timeout * 1000
assert client.uaclient._timeout == ha_config.request_timeout
assert client.description == ha_config.session_name
# ideal map is empty at startup
Expand Down
23 changes: 22 additions & 1 deletion tests/test_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from datetime import datetime, timedelta
from asynctest import CoroutineMock

from asyncua.common.subscription import Subscription
import asyncua
from asyncua import ua
from asyncua import ua, Client

pytestmark = pytest.mark.asyncio

Expand Down Expand Up @@ -356,6 +357,26 @@ async def test_subscription_data_change_many(opc):
await sub.delete()
await opc.opc.delete_nodes([v1, v2])

async def test_subscription_keepalive_count(mocker):
"""
Check the subscription parameter MaxKeepAliveCount value
with various publishInterval and session_timeout values.
"""

mock_subscription = mocker.patch("asyncua.common.subscription.Subscription.init", new=CoroutineMock())
c = Client("opc.tcp://fake")
# session timeout < publish_interval
c.session_timeout = 30000 # ms
publish_interval = 1000 # ms
handler = 123
sub = await c.create_subscription(publish_interval, handler)
assert sub.parameters.RequestedMaxKeepAliveCount == 22
# session_timeout > publish_interval
c.session_timeout = 30000
publish_interval = 75000
sub = await c.create_subscription(publish_interval, handler)
assert sub.parameters.RequestedMaxKeepAliveCount == 0


async def test_subscribe_server_time(opc):
"""
Expand Down

0 comments on commit ee844ed

Please sign in to comment.