Skip to content

Commit

Permalink
DlmsConnectionSettings for handling special behaviour.
Browse files Browse the repository at this point in the history
Some manufacturers have spacial cases in the communication. To handle all these differences the settings object is introduced so it can be serialized and documented.
Also added support for first settings to suppoirt Fiorentini meters.

Fixes #72
  • Loading branch information
Henrik Wahlgren committed Jan 22, 2024
1 parent 45df228 commit dc773c4
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 37 deletions.
4 changes: 3 additions & 1 deletion dlms_cosem/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dlms_cosem import cosem, dlms_data, enumerations, exceptions, state, utils
from dlms_cosem.security import AuthenticationMethodManager
from dlms_cosem.io import DlmsTransport
from dlms_cosem.connection import DlmsConnection
from dlms_cosem.connection import DlmsConnection, DlmsConnectionSettings
from dlms_cosem.cosem.selective_access import RangeDescriptor
from dlms_cosem.protocol import acse, xdlms
from dlms_cosem.protocol.xdlms import ConfirmedServiceError
Expand Down Expand Up @@ -41,6 +41,7 @@ class DlmsClient:
client_initial_invocation_counter: int = attr.ib(default=0)
meter_initial_invocation_counter: int = attr.ib(default=0)
timeout: int = attr.ib(default=10)
connection_settings: Optional[DlmsConnectionSettings] = attr.ib(default=None)

dlms_connection: DlmsConnection = attr.ib(
default=attr.Factory(
Expand All @@ -55,6 +56,7 @@ class DlmsClient:
max_pdu_size=self.max_pdu_size,
client_invocation_counter=self.client_initial_invocation_counter,
meter_invocation_counter=self.meter_initial_invocation_counter,
settings=self.connection_settings,
),
takes_self=True,
)
Expand Down
70 changes: 46 additions & 24 deletions dlms_cosem/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import *

import attr
import attrs
import structlog

from dlms_cosem import enumerations as enums
Expand Down Expand Up @@ -101,6 +102,18 @@ class ProtectionError(Exception):
"""Unable to perform cryptographic function"""


@attr.s(auto_attribs=True)
class DlmsConnectionSettings:
"""
Class to hold values and settings to handle different quirks of different dlms
server implementations and manufacturers specific irregularity.
"""

# In Pietro Fiorentini local communication over HDLC the system title in GeneralGlobalCiphering is omitted.
empty_system_title_in_general_glo_ciphering: bool = attr.ib(default=False)



@attr.s(auto_attribs=True)
class DlmsConnection:
"""
Expand Down Expand Up @@ -193,18 +206,23 @@ class DlmsConnection:
takes_self=True,
)
)
settings: DlmsConnectionSettings = attr.ib(
default=DlmsConnectionSettings(),
converter=attr.converters.default_if_none(
factory=DlmsConnectionSettings
))

@classmethod
def with_pre_established_association(
cls,
conformance: Conformance,
max_pdu_size: int = 65535,
global_encryption_key: Optional[bytes] = None,
global_authentication_key: Optional[bytes] = None,
client_invocation_counter: Optional[int] = None,
meter_invocation_counter: Optional[int] = None,
client_system_title: Optional[bytes] = None,
meter_system_title: Optional[bytes] = None,
cls,
conformance: Conformance,
max_pdu_size: int = 65535,
global_encryption_key: Optional[bytes] = None,
global_authentication_key: Optional[bytes] = None,
client_invocation_counter: Optional[int] = None,
meter_invocation_counter: Optional[int] = None,
client_system_title: Optional[bytes] = None,
meter_system_title: Optional[bytes] = None,
):
"""
A pre-established association does not need the ACSE APDUs. It is
Expand Down Expand Up @@ -249,7 +267,7 @@ def send(self, event) -> bytes:
# Only invalid state change is to send the ReleaseRequestApdu. But it is not
# possible to close a pre-established association.
if isinstance(
event, (acse.ReleaseRequest, acse.ApplicationAssociationRequest)
event, (acse.ReleaseRequest, acse.ApplicationAssociationRequest)
):
raise exceptions.PreEstablishedAssociationError(
f"It is not allowed to send a {type(event)} when the association is"
Expand Down Expand Up @@ -311,8 +329,8 @@ def next_event(self):

if self.is_pre_established:
if isinstance(
apdu,
(acse.ApplicationAssociationResponse, acse.ReleaseResponse),
apdu,
(acse.ApplicationAssociationResponse, acse.ReleaseResponse),
):
raise exceptions.PreEstablishedAssociationError(
f"Received a {apdu.__class__.__name__}. In a pre-established "
Expand Down Expand Up @@ -344,13 +362,13 @@ def next_event(self):
if apdu.status != enums.ActionResultStatus.SUCCESS:
self.state.process_event(dlms_state.HlsFailed())
if self.authentication.hls_meter_data_is_valid(
utils.parse_as_dlms_data(apdu.data), self
utils.parse_as_dlms_data(apdu.data), self
):
self.state.process_event(dlms_state.HlsSuccess())
else:
self.state.process_event(dlms_state.HlsFailed())
elif isinstance(
apdu, (xdlms.ActionResponseNormalWithError, xdlms.ActionResponseNormal)
apdu, (xdlms.ActionResponseNormalWithError, xdlms.ActionResponseNormal)
):
self.state.process_event(dlms_state.HlsFailed())

Expand All @@ -370,8 +388,8 @@ def use_protection(self) -> bool:
:return:
"""
if (
self.global_encryption_key is not None
or self.global_authentication_key is not None
self.global_encryption_key is not None
or self.global_authentication_key is not None
):
return True
else:
Expand All @@ -387,7 +405,6 @@ def protect(self, event) -> Any:
# ASCE have different rules about protection
if isinstance(event, (acse.ApplicationAssociationRequest, acse.ReleaseRequest)):
if event.user_information:

ciphered_text, ic = self.encrypt(
event.user_information.content.to_bytes()
)
Expand All @@ -404,8 +421,13 @@ def protect(self, event) -> Any:
elif isinstance(event, AbstractXDlmsApdu):
ciphered_text, ic = self.encrypt(event.to_bytes())

if self.settings.empty_system_title_in_general_glo_ciphering:
system_title = None
else:
system_title = self.client_system_title

event = xdlms.GeneralGlobalCipher(
system_title=self.client_system_title,
system_title=system_title,
security_control=self.security_control,
invocation_counter=ic,
ciphered_text=ciphered_text,
Expand Down Expand Up @@ -448,8 +470,8 @@ def encrypt(self, plain_text: bytes) -> Tuple[bytes, int]:
return ciphered_text, invocation_counter

def decrypt(
self,
ciphered_text: bytes,
self,
ciphered_text: bytes,
):
"""
Encrypts ciphered bytes according to the current association and connection.
Expand Down Expand Up @@ -487,12 +509,12 @@ def unprotect(self, event):
"""

if isinstance(
event, (acse.ApplicationAssociationResponse, acse.ReleaseResponse)
event, (acse.ApplicationAssociationResponse, acse.ReleaseResponse)
):
if event.user_information:
if isinstance(
event.user_information.content,
xdlms.GlobalCipherInitiateResponse,
event.user_information.content,
xdlms.GlobalCipherInitiateResponse,
):
self.update_meter_invocation_counter(
event.user_information.content.invocation_counter
Expand Down Expand Up @@ -563,7 +585,7 @@ def get_rlrq(self) -> acse.ReleaseRequest:
)

def update_negotiated_parameters(
self, aare: acse.ApplicationAssociationResponse
self, aare: acse.ApplicationAssociationResponse
) -> None:
"""
When an AARE is received we need to update the connection to the negotiated
Expand Down
38 changes: 34 additions & 4 deletions dlms_cosem/hdlc/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,23 @@ class UnNumberedAcknowledgmentFrame(BaseHdlcFrame):

fixed_length_bytes = 7

@property
def frame_length(self) -> int:

if self.information:
# with an informatoin field it should have HSC
fixed = self.fixed_length_bytes
else:
fixed = self.fixed_length_bytes - 2

return (
fixed
+ self.destination_address.length
+ self.source_address.length
+ len(self.information)
)


@property
def information(self) -> bytes:
"""
Expand All @@ -203,6 +220,17 @@ def information(self) -> bytes:

return b"".join(out)

@property
def hcs(self) -> bytes:
"""
UnNumberedAcknowledgmentFrame is an HDLC S-frame and does if it does not contain an
information field it should also not contain a HCS field. That means that there is no HCS field present, only FCS
"""
if self.payload:
return HCS.calculate_for(self.header_content)
else:
return b""

def get_control_field(self):
return fields.UaControlField()

Expand Down Expand Up @@ -232,10 +260,12 @@ def from_bytes(cls, frame_bytes: bytes):

frame = cls(destination_address, source_address, information)

if hcs != frame.hcs:
raise hdlc_exceptions.HdlcParsingError(
f"HCS is not correct. " f"Calculated: {frame.hcs!r}, in data: {hcs!r}"
)
if frame.hcs:
"Some frames might not have hcs so we should not check it."
if hcs != frame.hcs:
raise hdlc_exceptions.HdlcParsingError(
f"HCS is not correct. " f"Calculated: {frame.hcs!r}, in data: {hcs!r}"
)

if fcs != frame.fcs:
raise hdlc_exceptions.HdlcParsingError("FCS is not correct")
Expand Down
11 changes: 8 additions & 3 deletions dlms_cosem/protocol/xdlms/general_global_cipher.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from functools import partial
from typing import Optional

import attr

Expand Down Expand Up @@ -41,7 +42,8 @@ class GeneralGlobalCipher(AbstractXDlmsApdu):
]
)

system_title: bytes
# Some implementations does not send the system_title. But it seems like it is against the standard.
system_title: Optional[bytes]
security_control: SecurityControlField
invocation_counter: int
ciphered_text: bytes
Expand All @@ -66,8 +68,11 @@ def from_bytes(cls, source_bytes: bytes):
def to_bytes(self) -> bytes:
out = bytearray()
out.append(self.TAG)
out.append(len(self.system_title))
out.extend(self.system_title)
if self.system_title:
out.append(len(self.system_title))
out.extend(self.system_title)
else:
out.extend(b"\x00")
out.append(
len(
self.security_control.to_bytes()
Expand Down
23 changes: 23 additions & 0 deletions tests/test_clients/test_dlms_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from dlms_cosem.connection import DlmsConnectionSettings
from dlms_cosem.security import NoSecurityAuthentication
from dlms_cosem.client import DlmsClient
from dlms_cosem.io import BlockingTcpIO, TcpTransport
Expand Down Expand Up @@ -32,3 +33,25 @@ def test_client_invocation_counter_setter(self):
client.client_invocation_counter = 1000
assert client.client_invocation_counter == 1000
assert client.dlms_connection.client_invocation_counter == 1000


class TestDlmsClientWithConnectionSettings:

def test_can_get_settings_from_client(self):
settings = DlmsConnectionSettings(empty_system_title_in_general_glo_ciphering=True)

transport = TcpTransport(
io=BlockingTcpIO(host="localhost", port=4059),
client_logical_address=1,
server_logical_address=1,
)


client = DlmsClient(
client_initial_invocation_counter=500,
transport=transport,
authentication=NoSecurityAuthentication(),
connection_settings=settings
)

assert client.dlms_connection.settings.empty_system_title_in_general_glo_ciphering == True
54 changes: 53 additions & 1 deletion tests/test_dlms_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dlms_cosem.connection import (
DlmsConnection,
XDlmsApduFactory,
make_client_to_server_challenge,
make_client_to_server_challenge, DlmsConnectionSettings,
)
from dlms_cosem.exceptions import LocalDlmsProtocolError
from dlms_cosem.protocol import acse, xdlms
Expand Down Expand Up @@ -59,6 +59,58 @@ def test_negotiated_conformance_is_updated():
assert c.state.current_state == state.READY


def test_settings_exists_on_simple_init():
c = DlmsConnection(
client_system_title=b"12345678",
authentication=NoSecurityAuthentication(),
)
assert c.settings is not None

def test_settings_empty_system_title_in_general_glo_cipher_false(get_request: xdlms.GetRequestNormal):
"""
Make sure that system_title is is used when protecting APDUs with default connection settings.
"""
system_title = b"12345678"
c = DlmsConnection(
state=state.DlmsConnectionState(current_state=state.READY),
client_system_title=system_title,
authentication=NoSecurityAuthentication(),
global_encryption_key=b"1111111111111111",
global_authentication_key=b"0000000000000000"
)

assert c.settings is not None

print(c.settings)

ciphered = c.protect(get_request)
print(ciphered)
assert ciphered.system_title is not None
assert ciphered.system_title == system_title


def test_settings_empty_system_title_in_general_glo_cipher_true(get_request: xdlms.GetRequestNormal):
"""
Make sure that system_title is not used when protecting APDUs if the connections settings is to leave it empty
"""
system_title = b"12345678"
settings = DlmsConnectionSettings(empty_system_title_in_general_glo_ciphering=True)

c = DlmsConnection(
state=state.DlmsConnectionState(current_state=state.READY),
client_system_title=system_title,
authentication=NoSecurityAuthentication(),
global_encryption_key=b"1111111111111111",
global_authentication_key=b"0000000000000000",
settings=settings,
)

assert c.settings is not None

ciphered = c.protect(get_request)
assert ciphered.system_title is None


def test_cannot_re_associate(aarq: acse.ApplicationAssociationRequest):
c = DlmsConnection(
state=state.DlmsConnectionState(current_state=state.READY),
Expand Down
Loading

0 comments on commit dc773c4

Please sign in to comment.