-
Notifications
You must be signed in to change notification settings - Fork 307
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added support for Google´s Bumble Bluetooth Controller stack
The backend supports direct use with Bumble. The HCI Controller is managed by the Bumble stack and the transport layer can be defined by the user (e.g. VHCI, Serial, TCP, android-netsim).
- Loading branch information
Showing
17 changed files
with
2,063 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ Contributors | |
* David Johansen <[email protected]> | ||
* JP Hutchins <[email protected]> | ||
* Bram Duvigneau <[email protected]> | ||
* Victor Chavez <[email protected]> | ||
|
||
Sponsors | ||
-------- | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
# SPDX-License-Identifier: MIT | ||
# Copyright (c) 2024 Victor Chavez | ||
"""Bumble backend.""" | ||
from enum import Enum | ||
from typing import Dict, Final, Optional | ||
|
||
from bumble.controller import Controller | ||
from bumble.link import LocalLink | ||
from bumble.transport import Transport, open_transport | ||
|
||
transports: Dict[str, Transport] = {} | ||
_link: Final = LocalLink() | ||
|
||
|
||
class TransportScheme(Enum): | ||
SERIAL = "serial" | ||
UDP = "udp" | ||
TCP_CLIENT = "tcp-client" | ||
TCP_SERVER = "tcp-server" | ||
WS_CLIENT = "ws-client" | ||
WS_SERVER = "ws-server" | ||
PTY = "pty" | ||
FILE = "file" | ||
VHCI = "vhci" | ||
HCI_SOCKET = "hci-socket" | ||
USB = "usb" | ||
PYUSB = "pyusb" | ||
ANDROID_EMULATOR = "android-emulator" | ||
ANDROID_NETSIM = "android-netsim" | ||
UNIX = "unix" | ||
|
||
|
||
class BumbleTransport: | ||
def __init__(self, scheme: TransportScheme, args: Optional[str] = None): | ||
""" | ||
Args: | ||
scheme: TransportScheme: The transport scheme supported by bumble | ||
args: Optional[str]: The arguments used to initialize the transport. | ||
See https://google.github.io/bumble/transports/index.html | ||
""" | ||
self.scheme: Final = scheme | ||
self.args: Final = args | ||
|
||
def __str__(self): | ||
return f"{self.scheme.value}:{self.args}" if self.args else self.scheme.value | ||
|
||
|
||
def get_default_transport() -> BumbleTransport: | ||
return BumbleTransport(TransportScheme.TCP_SERVER, "127.0.0.1:1234") | ||
|
||
|
||
async def start_transport(transport: BumbleTransport) -> None: | ||
transport_cmd = str(transport) | ||
if transport_cmd not in transports.keys(): | ||
transports[transport_cmd] = await open_transport(transport_cmd) | ||
Controller( | ||
"ext", | ||
host_source=transports[transport_cmd].source, | ||
host_sink=transports[transport_cmd].sink, | ||
link=_link, | ||
) | ||
|
||
|
||
def get_link(): | ||
# Assume all transports are linked | ||
return _link |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
# SPDX-License-Identifier: MIT | ||
# Copyright (c) 2024 Victor Chavez | ||
|
||
from typing import Callable, Final, List, Union | ||
from uuid import UUID | ||
|
||
from bumble.gatt import Characteristic | ||
from bumble.gatt_client import CharacteristicProxy, ServiceProxy | ||
|
||
from bleak import normalize_uuid_str | ||
from bleak.backends.bumble.utils import uuid_bytes_to_str | ||
from bleak.backends.characteristic import BleakGATTCharacteristic | ||
from bleak.backends.descriptor import BleakGATTDescriptor | ||
|
||
|
||
class BleakGATTCharacteristicBumble(BleakGATTCharacteristic): | ||
"""GATT Characteristic implementation for the Bumble backend.""" | ||
|
||
def __init__( | ||
self, | ||
obj: CharacteristicProxy, | ||
max_write_without_response_size: Callable[[], int], | ||
svc: ServiceProxy, | ||
): | ||
super().__init__(obj, max_write_without_response_size) | ||
self.__descriptors = [] | ||
props = [flag for flag in Characteristic.Properties if flag in obj.properties] | ||
self.__props: Final = [str(prop) for prop in props] | ||
self.__svc: Final = svc | ||
uuid = uuid_bytes_to_str(obj.uuid.uuid_bytes) | ||
self.__uuid: Final = normalize_uuid_str(uuid) | ||
|
||
@property | ||
def service_uuid(self) -> str: | ||
"""The uuid of the Service containing this characteristic""" | ||
return self.__svc.uuid | ||
|
||
@property | ||
def service_handle(self) -> int: | ||
"""The integer handle of the Service containing this characteristic""" | ||
return self.__svc.handle | ||
|
||
@property | ||
def handle(self) -> int: | ||
"""The handle of this characteristic""" | ||
return int(self.obj.handle) | ||
|
||
@property | ||
def uuid(self) -> str: | ||
"""The uuid of this characteristic""" | ||
return self.__uuid | ||
|
||
@property | ||
def properties(self) -> List[str]: | ||
"""Properties of this characteristic""" | ||
return self.__props | ||
|
||
@property | ||
def descriptors(self) -> List[BleakGATTDescriptor]: | ||
"""List of descriptors for this characteristic""" | ||
return self.__descriptors | ||
|
||
def get_descriptor( | ||
self, specifier: Union[int, str, UUID] | ||
) -> Union[BleakGATTDescriptor, None]: | ||
"""Get a descriptor by handle (int) or UUID (str or uuid.UUID)""" | ||
try: | ||
if isinstance(specifier, int): | ||
return next(filter(lambda x: x.handle == specifier, self.descriptors)) | ||
else: | ||
return next( | ||
filter(lambda x: x.uuid == str(specifier), self.descriptors) | ||
) | ||
except StopIteration: | ||
return None | ||
|
||
def add_descriptor(self, descriptor: BleakGATTDescriptor): | ||
"""Add a :py:class:`~BleakGATTDescriptor` to the characteristic. | ||
Should not be used by end user, but rather by `bleak` itself. | ||
""" | ||
self.__descriptors.append(descriptor) |
Oops, something went wrong.