Skip to content

Commit

Permalink
Fix the voltage value of the relay switch (#272)
Browse files Browse the repository at this point in the history
Co-authored-by: J. Nick Koston <[email protected]>
  • Loading branch information
greyeee and bdraco authored Nov 26, 2024
1 parent 6f297c9 commit 02b3632
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 9 deletions.
6 changes: 3 additions & 3 deletions switchbot/adv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from .adv_parsers.motion import process_wopresence
from .adv_parsers.plug import process_woplugmini
from .adv_parsers.relay_switch import (
process_worelay_switch_1plus,
process_worelay_switch_1,
process_worelay_switch_1pm,
)
from .const import SwitchbotModel
Expand Down Expand Up @@ -191,9 +191,9 @@ class SwitchbotSupportedType(TypedDict):
"manufacturer_id": 2409,
},
";": {
"modelName": SwitchbotModel.RELAY_SWITCH_1_PLUS,
"modelName": SwitchbotModel.RELAY_SWITCH_1,
"modelFriendlyName": "Relay Switch 1",
"func": process_worelay_switch_1plus,
"func": process_worelay_switch_1,
"manufacturer_id": 2409,
},
}
Expand Down
2 changes: 1 addition & 1 deletion switchbot/adv_parsers/relay_switch.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def process_worelay_switch_1pm(
}


def process_worelay_switch_1plus(
def process_worelay_switch_1(
data: bytes | None, mfr_data: bytes | None
) -> dict[str, bool | int]:
"""Process WoStrip services data."""
Expand Down
2 changes: 1 addition & 1 deletion switchbot/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class SwitchbotModel(StrEnum):
HUB2 = "WoHub2"
KEYPAD = "WoKeypad"
RELAY_SWITCH_1PM = "Relay Switch 1PM"
RELAY_SWITCH_1_PLUS = "Relay Switch 1"
RELAY_SWITCH_1 = "Relay Switch 1"


class LockStatus(Enum):
Expand Down
38 changes: 34 additions & 4 deletions switchbot/devices/relay_switch.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
import asyncio
import logging
import time
from typing import Any

from bleak.backends.device import BLEDevice
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

from ..const import SwitchbotModel
from .device import SwitchbotSequenceDevice
from ..models import SwitchBotAdvertisement
from .device import SwitchbotDevice

_LOGGER = logging.getLogger(__name__)

COMMAND_HEADER = "57"
COMMAND_GET_CK_IV = f"{COMMAND_HEADER}0f2103"
COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
COMMAND_TOGGLE = f"{COMMAND_HEADER}0f70010200"
COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
PASSIVE_POLL_INTERVAL = 1 * 60
PASSIVE_POLL_INTERVAL = 10 * 60


class SwitchbotRelaySwitch(SwitchbotSequenceDevice):
class SwitchbotRelaySwitch(SwitchbotDevice):
"""Representation of a Switchbot relay switch 1pm."""

def __init__(
Expand All @@ -41,8 +46,30 @@ def __init__(
self._key_id = key_id
self._encryption_key = bytearray.fromhex(encryption_key)
self._model: SwitchbotModel = model
self._force_next_update = False
super().__init__(device, None, interface, **kwargs)

def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
"""Update device data from advertisement."""
# Obtain voltage and current through command.
adv_data = advertisement.data["data"]
if previous_voltage := self._get_adv_value("voltage"):
adv_data["voltage"] = previous_voltage
if previous_current := self._get_adv_value("current"):
adv_data["current"] = previous_current
current_state = self._get_adv_value("sequence_number")
super().update_from_advertisement(advertisement)
new_state = self._get_adv_value("sequence_number")
_LOGGER.debug(
"%s: update advertisement: %s (seq before: %s) (seq after: %s)",
self.name,
advertisement,
current_state,
new_state,
)
if current_state != new_state:
self._force_next_update = True

async def update(self, interface: int | None = None) -> None:
"""Update state of device."""
if info := await self.get_voltage_and_current():
Expand All @@ -56,13 +83,16 @@ async def get_voltage_and_current(self) -> dict[str, Any] | None:
ok = self._check_command_result(result, 0, {1})
if ok:
return {
"voltage": (result[9] << 8) + result[10],
"voltage": ((result[9] << 8) + result[10]) / 10,
"current": (result[11] << 8) + result[12],
}
return None

def poll_needed(self, seconds_since_last_poll: float | None) -> bool:
"""Return if device needs polling."""
if self._force_next_update:
self._force_next_update = False
return True
if (
seconds_since_last_poll is not None
and seconds_since_last_poll < PASSIVE_POLL_INTERVAL
Expand Down
65 changes: 65 additions & 0 deletions tests/test_adv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -1672,3 +1672,68 @@ def test_parse_advertisement_data_keypad():
rssi=-67,
active=True,
)


def test_parse_advertisement_data_relay_switch_1pm():
"""Test parse_advertisement_data for the keypad."""
ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
adv_data = generate_advertisement_data(
manufacturer_data={2409: b"$X|\x0866G\x81\x00\x00\x001\x00\x00\x00\x00"},
service_data={"0000fd3d-0000-1000-8000-00805f9b34fb": b"<\x00\x00\x00"},
rssi=-67,
)
result = parse_advertisement_data(
ble_device, adv_data, SwitchbotModel.RELAY_SWITCH_1PM
)
assert result == SwitchBotAdvertisement(
address="aa:bb:cc:dd:ee:ff",
data={
"data": {
"switchMode": True,
"sequence_number": 71,
"isOn": True,
"power": 4.9,
"voltage": 0,
"current": 0,
},
"isEncrypted": False,
"model": "<",
"modelFriendlyName": "Relay Switch 1PM",
"modelName": SwitchbotModel.RELAY_SWITCH_1PM,
"rawAdvData": b"<\x00\x00\x00",
},
device=ble_device,
rssi=-67,
active=True,
)


def test_parse_advertisement_data_relay_switch_1():
"""Test parse_advertisement_data for the keypad."""
ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
adv_data = generate_advertisement_data(
manufacturer_data={2409: b"$X|\x0866G\x81\x00\x00\x001\x00\x00\x00\x00"},
service_data={"0000fd3d-0000-1000-8000-00805f9b34fb": b";\x00\x00\x00"},
rssi=-67,
)
result = parse_advertisement_data(
ble_device, adv_data, SwitchbotModel.RELAY_SWITCH_1
)
assert result == SwitchBotAdvertisement(
address="aa:bb:cc:dd:ee:ff",
data={
"data": {
"switchMode": True,
"sequence_number": 71,
"isOn": True,
},
"isEncrypted": False,
"model": ";",
"modelFriendlyName": "Relay Switch 1",
"modelName": SwitchbotModel.RELAY_SWITCH_1,
"rawAdvData": b";\x00\x00\x00",
},
device=ble_device,
rssi=-67,
active=True,
)
60 changes: 60 additions & 0 deletions tests/test_relay_switch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from unittest.mock import AsyncMock

import pytest
from bleak.backends.device import BLEDevice

from switchbot import SwitchBotAdvertisement, SwitchbotModel
from switchbot.devices import relay_switch

from .test_adv_parser import generate_ble_device


def create_device_for_command_testing(calibration=True, reverse_mode=False):
ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
relay_switch_device = relay_switch.SwitchbotRelaySwitch(
ble_device, "ff", "ffffffffffffffffffffffffffffffff"
)
relay_switch_device.update_from_advertisement(make_advertisement_data(ble_device))
return relay_switch_device


def make_advertisement_data(ble_device: BLEDevice):
"""Set advertisement data with defaults."""

return SwitchBotAdvertisement(
address="aa:bb:cc:dd:ee:ff",
data={
"rawAdvData": b"$X|\x0866G\x81\x00\x00\x001\x00\x00\x00\x00",
"data": {
"switchMode": True,
"sequence_number": 71,
"isOn": True,
"power": 4.9,
"voltage": 0,
"current": 0,
},
"isEncrypted": False,
"model": "<",
"modelFriendlyName": "Relay Switch 1PM",
"modelName": SwitchbotModel.RELAY_SWITCH_1PM,
},
device=ble_device,
rssi=-80,
active=True,
)


@pytest.mark.asyncio
async def test_turn_on():
relay_switch_device = create_device_for_command_testing()
relay_switch_device._send_command = AsyncMock(return_value=b"\x01")
await relay_switch_device.turn_on()
assert relay_switch_device.is_on() is True


@pytest.mark.asyncio
async def test_trun_off():
relay_switch_device = create_device_for_command_testing()
relay_switch_device._send_command = AsyncMock(return_value=b"\x01")
await relay_switch_device.turn_off()
assert relay_switch_device.is_on() is False

0 comments on commit 02b3632

Please sign in to comment.