Skip to content

Commit

Permalink
Python SDK: Add Gopro-specific advertisement parsing (#633)
Browse files Browse the repository at this point in the history
* Add in adv parsing structs and demo
  • Loading branch information
tcamise-gpsw committed Nov 19, 2024
1 parent 3667cad commit c8b3664
Show file tree
Hide file tree
Showing 4 changed files with 380 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# adv_parsing.py/Open GoPro, Version 2.0 (C) Copyright 2021 GoPro, Inc. (http://gopro.com/OpenGoPro).
# This copyright was auto-generated on Mon Nov 18 21:03:38 UTC 2024

"""Demo to retrieve and parse bleak-level advertisements"""

import asyncio

from bleak import BleakScanner

from open_gopro.models.ble_advertisement import AdvData, GoProAdvData


async def main() -> None:
adv_data = AdvData()

async with BleakScanner(service_uuids=["0000fea6-0000-1000-8000-00805f9b34fb"]) as scanner:
async for _, data in scanner.advertisement_data():
adv_data.update(data)
if adv_data.local_name: # Once we've received the scan response...
break

print(f"GoPro Data: {GoProAdvData.fromAdvData(adv_data)}")


if __name__ == "__main__":
asyncio.run(main())
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
# ble_advertisement.py/Open GoPro, Version 2.0 (C) Copyright 2021 GoPro, Inc. (http://gopro.com/OpenGoPro).
# This copyright was auto-generated on Mon Nov 18 21:03:37 UTC 2024

"""GoPro specific advertisement entities and parsing structures"""

from __future__ import annotations

import json
from dataclasses import asdict, dataclass, field
from typing import Any

from bleak.backends.scanner import AdvertisementData
from construct import (
Adapter,
BitStruct,
Byte,
Bytes,
Flag,
GreedyString,
Hex,
Int16ub,
Int16ul,
PaddedString,
Padding,
Struct,
this,
)

from open_gopro.util import deeply_update_dict


class Hexlify(Adapter):
"""Construct adapter for pretty hex representation"""

def _decode(self, obj: bytes, context: Any, path: Any) -> str:
return obj.hex(":")

def _encode(self, obj: str, context: Any, path: Any) -> list[int]:
return list(map(int, obj.split(":")))


camera_status_struct = BitStruct(
"processor_state" / Flag,
"wifi_ap_state" / Flag,
"peripheral_pairing_state" / Flag,
"central_role_enabled" / Flag,
"is_new_media_available" / Flag,
"reserved" / Padding(3),
)


camera_capability_struct = BitStruct(
"cnc" / Flag,
"ble_metadata" / Flag,
"wideband_audio" / Flag,
"concurrent_master_slave" / Flag,
"onboarding" / Flag,
"new_media_available" / Flag,
"reserved" / Padding(10),
)

media_offload_status_struct = BitStruct(
"available" / Flag,
"new_media_available" / Flag,
"battery_ok" / Flag,
"sd_card_ok" / Flag,
"busy" / Flag,
"paused" / Flag,
"reserved" / Padding(2),
)

manuf_data_struct = Struct(
"schema_version" / Byte,
"camera_status" / camera_status_struct,
"camera_id" / Byte,
"camera_capabilities" / camera_capability_struct,
"id_hash" / Hexlify(Bytes(6)),
"media_offload_status" / media_offload_status_struct,
)

adv_data_struct = Struct(
"flags_length" / Byte,
"flags" / Hex(Int16ub),
"uuids_length" / Byte,
"uuids_type" / Hex(Byte),
"uuids" / Hex(Int16ul),
"manuf_length" / Byte,
"manuf_type" / Hex(Byte),
"company_id" / Hex(Int16ub),
"manuf_data" / manuf_data_struct,
)

service_data_struct = Struct(
"ap_mac_address" / Hexlify(Bytes(4)),
"serial_number" / GreedyString("utf8"),
)

scan_response_struct = Struct(
"name_length" / Byte,
"name_type" / Hex(Byte),
"name" / PaddedString(this.name_length - 1, encoding="utf8"),
"service_length" / Byte,
"service_type" / Hex(Byte),
"service_uuid" / Hex(Int16ul),
"service_data" / service_data_struct,
)


@dataclass
class Jsonable:
"""Mixin to use pretty hex presentation for JSON decoding"""

def __str__(self) -> str:
def default_decode(obj: Any) -> Any:
if isinstance(obj, (bytes, bytearray)):
return obj.hex(":")
return str(obj)

return json.dumps(asdict(self), indent=4, default=default_decode)


@dataclass
class GoProAdvData(Jsonable):
"""GoPro-specific advertising data"""

name: str
schema_version: int
processor_state: bool
wifi_ap_state: bool
peripheral_pairing_state: bool
is_new_media_available: bool
camera_id: str
supports_cnc: bool
supports_ble_metadata: bool
supports_wideband_audio: bool
supports_concurrent_master_slave: bool
supports_onboarding: bool
supports_new_media_available: bool
id_hash: bytes
is_media_upload_new_media_available: bool
is_media_upload_available: bool
is_media_upload_battery_ok: bool
is_media_upload_sd_card_ok: bool
is_media_upload_busy: bool
is_media_upload_paused: bool
ap_mac_address: bytes
partial_serial_number: bytes

@classmethod
def fromAdvData(cls, data: AdvData) -> GoProAdvData:
"""Build GoPro specific advertisement from standard BLE advertisement data
Args:
data (AdvData): standard BLE advertisement data
Returns:
GoProAdvData: parsed GoPro specific advertising data
"""
manuf_data = manuf_data_struct.parse(list(data.manufacturer_data.values())[0])
service_data = service_data_struct.parse(list(data.service_data.values())[0])
return GoProAdvData(
# Name from scan response data
name=data.local_name,
# Schema version from advertising data manufacturer data
schema_version=manuf_data.schema_version,
# Camera status from advertising data manufacturer data
processor_state=manuf_data.camera_status.processor_state,
wifi_ap_state=manuf_data.camera_status.wifi_ap_state,
peripheral_pairing_state=manuf_data.camera_status.peripheral_pairing_state,
is_new_media_available=manuf_data.camera_status.is_new_media_available,
# Camera ID from advertising data manufacturer data
camera_id=manuf_data.camera_id,
# Camera capabilities from advertising data manufacturer data
supports_ble_metadata=manuf_data.camera_capabilities.ble_metadata,
supports_cnc=manuf_data.camera_capabilities.cnc,
supports_onboarding=manuf_data.camera_capabilities.onboarding,
supports_wideband_audio=manuf_data.camera_capabilities.wideband_audio,
supports_concurrent_master_slave=manuf_data.camera_capabilities.concurrent_master_slave,
supports_new_media_available=manuf_data.camera_capabilities.new_media_available,
# ID Hash from advertising data manufacturer's data
id_hash=manuf_data.id_hash,
# Media offload status status from advertising data manufacturer's data
is_media_upload_new_media_available=manuf_data.media_offload_status.new_media_available,
is_media_upload_available=manuf_data.media_offload_status.available,
is_media_upload_battery_ok=manuf_data.media_offload_status.battery_ok,
is_media_upload_sd_card_ok=manuf_data.media_offload_status.sd_card_ok,
is_media_upload_busy=manuf_data.media_offload_status.busy,
is_media_upload_paused=manuf_data.media_offload_status.paused,
# Mac address from scan response data service data
ap_mac_address=service_data.ap_mac_address,
# Partial serial number from scan response data service data
partial_serial_number=service_data.serial_number,
)


@dataclass
class AdvData(Jsonable):
"""Standard BLE advertising data
Only contains fields that are currently used by GoPro
"""

local_name: str = ""
manufacturer_data: dict[str, Any] = field(default_factory=dict)
service_uuids: list[str] = field(default_factory=list)
service_data: dict = field(default_factory=dict)

def update(self, data: AdvertisementData) -> None:
"""Update with a (potentially incomplete) advertisement
Args:
data (AdvertisementData): advertisement to use for updating
"""
self_dict = asdict(self)
for k, v in data._asdict().items():
if not v:
continue
if isinstance(v, dict):
setattr(self, k, deeply_update_dict(self_dict[k], v))
elif isinstance(v, list):
setattr(self, k, [*self_dict[k], v])
else:
setattr(self, k, v)
18 changes: 18 additions & 0 deletions demos/python/sdk_wireless_camera_control/open_gopro/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 +330,21 @@ def get_current_dst_aware_time() -> tuple[datetime, int, bool]:
if is_dst:
offset += 60
return (now, int(offset), is_dst)


def deeply_update_dict(d: dict, u: dict) -> dict:
"""Recursively update a dict
Args:
d (dict): original dict
u (dict): dict to apply updates from
Returns:
dict: updated original dict
"""
for k, v in u.items():
if isinstance(v, dict):
d[k] = deeply_update_dict(d.get(k, {}), v)
else:
d[k] = v
return d
114 changes: 113 additions & 1 deletion demos/python/sdk_wireless_camera_control/tests/unit/test_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from open_gopro.api.parsers import ByteParserBuilders
from open_gopro.communicator_interface import GoProBle
from open_gopro.constants import CmdId
from open_gopro.models.response import BleRespBuilder, GlobalParsers
from open_gopro.models.ble_advertisement import adv_data_struct, scan_response_struct
from open_gopro.models.response import GlobalParsers
from open_gopro.parser_interface import Parser
from open_gopro.proto import EnumResultGeneric, ResponseGetApEntries

Expand All @@ -32,3 +33,114 @@ def test_recursive_protobuf_proxying():
assert len(parsed.entries) == 2
assert parsed.entries[0].ssid == "one"
assert parsed.entries[1].ssid == "two"


def test_ble_advertisement_parsing():
# GIVEN
adv_data = bytes(
[
0x02,
0x01,
0x02,
0x03,
0x02,
0xA6,
0xFE,
0x0F,
0xFF,
0xF2,
0x02,
0x02,
0x01,
0x38,
0x33,
0x00,
0xB3,
0xFE,
0x2A,
0x79,
0xDC,
0xEB,
0x0F,
]
)

# WHEN
adv = adv_data_struct.parse(adv_data)
manuf_data = adv.manuf_data
camera_status = manuf_data.camera_status
camera_capabilities = manuf_data.camera_capabilities
media_offload_status = manuf_data.media_offload_status

# THEN
assert adv.flags == 0x0102
assert adv.uuids == 0xFEA6
assert adv.manuf_type == 0xFF
assert adv.company_id == 0xF202

assert manuf_data.schema_version == 2
assert manuf_data.camera_id == 56
assert manuf_data.id_hash == "b3:fe:2a:79:dc:eb"

assert camera_status.processor_state == False
assert camera_status.wifi_ap_state == False
assert camera_status.peripheral_pairing_state == False
assert camera_status.central_role_enabled == False
assert camera_status.is_new_media_available == False

assert camera_capabilities.cnc == False
assert camera_capabilities.ble_metadata == False
assert camera_capabilities.wideband_audio == True
assert camera_capabilities.concurrent_master_slave == True
assert camera_capabilities.onboarding == False
assert camera_capabilities.new_media_available == False

assert media_offload_status.available == False
assert media_offload_status.new_media_available == False
assert media_offload_status.battery_ok == False
assert media_offload_status.sd_card_ok == False
assert media_offload_status.busy == True
assert media_offload_status.paused == True


def test_ble_scan_response_parsing():
# GIVEN
scan_response_data = bytes(
[
0x0B,
0x09,
0x47,
0x6F,
0x50,
0x72,
0x6F,
0x20,
0x31,
0x30,
0x35,
0x38,
0x0B,
0x16,
0xA6,
0xFE,
0xF7,
0xA9,
0x76,
0x88,
0x31,
0x30,
0x35,
0x38,
]
)

# WHEN
scan_response = scan_response_struct.parse(scan_response_data)
print(scan_response)

# THEN
assert scan_response.name == "GoPro 1058"
assert scan_response.service_type == 0x16
assert scan_response.service_uuid == 0xFEA6
assert scan_response.service_data.ap_mac_address == "f7:a9:76:88"
assert scan_response.service_data.serial_number == "1058"

0 comments on commit c8b3664

Please sign in to comment.