Skip to content

Commit

Permalink
Modbus write_register(s) methods accept plain bytearrays
Browse files Browse the repository at this point in the history
  • Loading branch information
kbialek committed Dec 22, 2023
1 parent 66588f0 commit d2ac0e0
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/deye_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def write_register(self, args):
sys.exit(1)
reg_address = int(args[0])
reg_value = int(args[1])
if self.__modbus.write_register(reg_address, reg_value):
if self.__modbus.write_register_uint(reg_address, reg_value):
print("Ok")
else:
print("Error")
Expand Down
2 changes: 1 addition & 1 deletion src/deye_command_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ def handle_command(self, client: Client, userdata, msg: MQTTMessage):
return

self.__log.info("Setting active power regulation to %f", active_power_regulation_factor)
self.__modbus.write_register(40, int(active_power_regulation_factor * 10))
self.__modbus.write_register_uint(40, int(active_power_regulation_factor * 10))
48 changes: 37 additions & 11 deletions src/deye_modbus.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,51 @@ def read_registers(self, first_reg: int, last_reg: int) -> dict[int, bytearray]:
return {}
return self.__parse_modbus_read_holding_registers_response(modbus_resp_frame, first_reg, last_reg)

def write_register(self, reg_address: int, reg_value: int) -> bool:
"""Write single modbus holding register
def write_register_uint(self, reg_address: int, reg_value: int) -> bool:
"""Write single modbus holding register, assuming the value is an unsigned int
Args:
reg_address (int): The address of the register to write
reg_value (int): The value of the register to write
Returns:
bool: True when the write operation was successful, False otherwise
"""
return self.write_register(reg_address, reg_value.to_bytes(2, "big", signed=False))

def write_register(self, reg_address: int, reg_value: bytearray) -> bool:
"""Write single modbus holding register
Args:
reg_address (int): The address of the register to write
reg_value (bytearray): The value of the register to write
Returns:
bool: True when the write operation was successful, False otherwise
"""
return self.write_registers(reg_address, [reg_value])

def write_registers(self, reg_address: int, reg_values: list[int]) -> bool:
"""Write multiple modbus holding registers.
def write_registers_uint(self, reg_address: int, reg_values: list[int]) -> bool:
"""Write multiple modbus holding registers, assuming the values are unsigned integers.
Args:
reg_address (int): The address of the first register to write
reg_values (list[int]): The values of the registers to write
Returns:
bool: True when the write operation was successful, False otherwise
"""
self.write_registers(reg_address, [v.to_bytes(2, "big", signed=False) for v in reg_values])

def write_registers(self, reg_address: int, reg_values: list[bytearray]) -> bool:
"""Write multiple modbus holding registers.
Args:
reg_address (int): The address of the first register to write
reg_values (list[bytearray]): The values of the registers to write
Returns:
bool: True when the write operation was successful, False otherwise
"""
Expand Down Expand Up @@ -112,15 +137,16 @@ def __parse_modbus_read_holding_registers_response(
a += 1
return registers

def __build_modbus_write_holding_register_request_frame(self, reg_address: int, reg_values: list[int]) -> bytearray:
return bytearray.fromhex(
"0110{:04x}{:04x}{:02x}{}".format(
reg_address, len(reg_values), len(reg_values) * 2, "".join(["{:04x}".format(v) for v in reg_values])
)
)
def __build_modbus_write_holding_register_request_frame(
self, reg_address: int, reg_values: list[bytearray]
) -> bytearray:
result = bytearray.fromhex("0110{:04x}{:04x}{:02x}".format(reg_address, len(reg_values), len(reg_values) * 2))
for v in reg_values:
result.extend(v)
return result

def __parse_modbus_write_holding_register_response(
self, frame: bytes, reg_address: int, reg_values: list[int]
self, frame: bytes, reg_address: int, reg_values: list[bytearray]
) -> bool:
expected_frame_data_len = 6
expected_frame_len = 6 + 2 # 2 bytes for crc
Expand Down
2 changes: 1 addition & 1 deletion src/deye_set_time_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def process(self, events: DeyeEventList):

def __set_time(self) -> bool:
now = datetime.now()
write_status = self.__modbus.write_registers(
write_status = self.__modbus.write_registers_uint(
22,
[
# year and month
Expand Down
6 changes: 3 additions & 3 deletions tests/deye_command_handler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def test_handle_valid_value(self, modbus_mock: DeyeModbus):
sut.handle_command(None, None, msg)

# then
modbus_mock.write_register.assert_called_with(40, 1000)
modbus_mock.write_register_uint.assert_called_with(40, 1000)

def test_reject_too_high_value(self, modbus_mock: DeyeModbus):
# given
Expand All @@ -73,7 +73,7 @@ def test_reject_too_high_value(self, modbus_mock: DeyeModbus):
sut.handle_command(None, None, msg)

# then
assert not modbus_mock.write_register.called
assert not modbus_mock.write_register_uint.called

def test_reject_too_low_value(self, modbus_mock: DeyeModbus):
# given
Expand All @@ -87,4 +87,4 @@ def test_reject_too_low_value(self, modbus_mock: DeyeModbus):
sut.handle_command(None, None, msg)

# then
assert not modbus_mock.write_register.called
assert not modbus_mock.write_register_uint.called
19 changes: 18 additions & 1 deletion tests/deye_modbus_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,24 @@ def test_write_register_0x12_to_0xa3d4(self, connector):
)

# when
success = sut.write_register(0x12, 0xA3D4)
success = sut.write_register(0x12, bytearray.fromhex("A3D4"))

# then
self.assertTrue(success)
connector.send_request.assert_called_once_with(
bytearray.fromhex("a51a0010450000d202964902000000000000000000000000000001100012000102a3d4dd8d2b15")
)

@patch("deye_connector.DeyeConnector")
def test_write_register_uint_0x12_to_0xa3d4(self, connector):
# given
sut = DeyeModbus(DeyeModbusTcp(self.config, connector))
connector.send_request.return_value = bytearray.fromhex(
"a5000000000000000000000000000000000000000000000000" + "011000120001a1cc" + "0015"
)

# when
success = sut.write_register_uint(0x12, 0xA3D4)

# then
self.assertTrue(success)
Expand Down
6 changes: 3 additions & 3 deletions tests/deye_set_time_processor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_set_time__when_logger_is_becoming_online(self, modbus):
processor.process(DeyeEventList([DeyeLoggerStatusEvent(True)]))

# then
modbus.write_registers.assert_any_call(22, unittest.mock.ANY)
modbus.write_registers_uint.assert_any_call(22, unittest.mock.ANY)

# and
self.assertTrue(processor.last_status)
Expand Down Expand Up @@ -61,13 +61,13 @@ def test_keep_stored_status_set_to_offline__when_modbus_write_fails(self, modbus
processor = DeyeSetTimeProcessor(modbus)

# and
modbus.write_registers.return_value = False
modbus.write_registers_uint.return_value = False

# when
processor.process(DeyeEventList([DeyeLoggerStatusEvent(True)]))

# then
modbus.write_registers.assert_any_call(22, unittest.mock.ANY)
modbus.write_registers_uint.assert_any_call(22, unittest.mock.ANY)

# and
self.assertFalse(processor.last_status)

0 comments on commit d2ac0e0

Please sign in to comment.