Skip to content

Commit

Permalink
Write TimeOfUse config to the inverter
Browse files Browse the repository at this point in the history
  • Loading branch information
kbialek committed Dec 22, 2023
1 parent e5d574a commit d842dec
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
1 change: 1 addition & 0 deletions src/deye_modbus.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def __build_modbus_write_holding_register_request_frame(
) -> bytearray:
result = bytearray.fromhex("0110{:04x}{:04x}{:02x}".format(reg_address, len(reg_values), len(reg_values) * 2))
for v in reg_values:
self.__log.info(f"Extending with {v}")
result.extend(v)
return result

Expand Down
24 changes: 13 additions & 11 deletions src/deye_timeofuse_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ def handle_command(self, client: Client, userdata, msg: MQTTMessage):

def handle_control_command(self, client: Client, userdata, msg: MQTTMessage):
if msg.payload == b"write":
self.write_config()
self.write_config(dry_run=False)
elif msg.payload == b"dry-write":
self.write_config(dry_run=True)

def write_config(self):
def write_config(self, dry_run: bool):
if not self.__read_state:
self.__log.warning("Time-of-use state not yet read from the inverter. Cannot apply config modifications.")
return
Expand All @@ -69,26 +71,26 @@ def write_config(self):
reg_map: dict[int, bytearray] = {}
for sensor in write_state:
reg_map |= sensor.write_value(write_state[sensor])
self.__write_registers(reg_map)
self.__write_registers(reg_map, dry_run)

def __write_registers(self, reg_map: dict[int, bytearray]) -> None:
def __write_registers(self, reg_map: dict[int, bytearray], dry_run: bool) -> None:
first_reg_addr = min(reg_map)
last_reg_addr = max(reg_map)
reg_values = []
reg_data = []
batch_reg_addr = 0
for reg_addr in range(
first_reg_addr, last_reg_addr + 2
): # When last reg is not found, the last batch is written
if reg_addr in reg_map:
reg_values.append(reg_map[reg_addr])
reg_data.append(reg_map[reg_addr])
if not batch_reg_addr:
batch_reg_addr = reg_addr
elif reg_values:
self.__log.info(f"Write time-of-use config registers: {batch_reg_addr}: {reg_values}")
elif reg_data:
self.__log.info(f"Write time-of-use config registers: {batch_reg_addr}: {reg_data}")
if not dry_run:
self.__modbus.write_registers(batch_reg_addr, reg_data)
batch_reg_addr = 0
reg_values = []

# self.__modbus.write_registers()
reg_data.clear()

def process(self, events: DeyeEventList):
read_state = {}
Expand Down

0 comments on commit d842dec

Please sign in to comment.