From e35be8114edae6bbc057d4d8a8f14a5f0fedf04e Mon Sep 17 00:00:00 2001 From: Bob Steers Date: Thu, 13 Jun 2024 16:12:24 +0200 Subject: [PATCH] implement spark USB discovery --- .github/workflows/build.yml | 4 +- Dockerfile | 6 + .../__init__.py | 0 .../app_factory.py | 12 +- brewblox_usb_proxy/connected_api.py | 17 +++ brewblox_usb_proxy/discovery.py | 117 ++++++++++++++++++ brewblox_usb_proxy/models.py | 30 +++++ {your_package => brewblox_usb_proxy}/utils.py | 7 ++ entrypoint.sh | 2 +- poetry.lock | 50 +++----- pyproject.toml | 8 +- tasks.py | 2 +- test/conftest.py | 4 +- test/test_discovery.py | 2 + test/test_http_example.py | 38 ------ test/test_mqtt_publish_example.py | 73 ----------- your_package/http_example.py | 28 ----- your_package/models.py | 49 -------- your_package/mqtt.py | 43 ------- your_package/mqtt_publish_example.py | 55 -------- your_package/mqtt_subscribe_example.py | 34 ----- 21 files changed, 209 insertions(+), 372 deletions(-) rename {your_package => brewblox_usb_proxy}/__init__.py (100%) rename {your_package => brewblox_usb_proxy}/app_factory.py (83%) create mode 100644 brewblox_usb_proxy/connected_api.py create mode 100644 brewblox_usb_proxy/discovery.py create mode 100644 brewblox_usb_proxy/models.py rename {your_package => brewblox_usb_proxy}/utils.py (89%) create mode 100644 test/test_discovery.py delete mode 100644 test/test_http_example.py delete mode 100644 test/test_mqtt_publish_example.py delete mode 100644 your_package/http_example.py delete mode 100644 your_package/models.py delete mode 100644 your_package/mqtt.py delete mode 100644 your_package/mqtt_publish_example.py delete mode 100644 your_package/mqtt_subscribe_example.py diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f959b68..a8ac19a 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -8,11 +8,11 @@ on: workflow_dispatch: {} env: - DOCKER_IMAGE: ghcr.io/you/your-package + DOCKER_IMAGE: ghcr.io/BrewBlox/brewblox-usb-proxy jobs: build: - if: false + if: github.repository_owner == 'BrewBlox' runs-on: ubuntu-22.04 steps: diff --git a/Dockerfile b/Dockerfile index 48f9876..57d0824 100644 --- a/Dockerfile +++ b/Dockerfile @@ -31,6 +31,12 @@ COPY ./entrypoint.sh ./entrypoint.sh RUN < FastAPI: setup_logging(config.debug) # Call setup functions for modules - mqtt.setup() - mqtt_subscribe_example.setup() + discovery.setup() # Create app # OpenApi endpoints are set to /api/doc for backwards compatibility @@ -70,7 +67,6 @@ def create_app() -> FastAPI: redoc_url=f'{prefix}/api/redoc', openapi_url=f'{prefix}/openapi.json') - # Include all endpoints declared by modules - app.include_router(http_example.router, prefix=prefix) + app.include_router(connected_api.router, prefix=prefix) return app diff --git a/brewblox_usb_proxy/connected_api.py b/brewblox_usb_proxy/connected_api.py new file mode 100644 index 0000000..2ed5882 --- /dev/null +++ b/brewblox_usb_proxy/connected_api.py @@ -0,0 +1,17 @@ +import logging + +from fastapi import APIRouter + +from . import discovery + +LOGGER = logging.getLogger(__name__) + +router = APIRouter(prefix='/connected', tags=['Connected']) + + +@router.get('/spark') +async def discovery_connected() -> dict[str, int]: + """ + Get device ID and port for all connected Spark USB devices + """ + return discovery.CV.get().connection_index diff --git a/brewblox_usb_proxy/discovery.py b/brewblox_usb_proxy/discovery.py new file mode 100644 index 0000000..cf81091 --- /dev/null +++ b/brewblox_usb_proxy/discovery.py @@ -0,0 +1,117 @@ +import asyncio +import logging +import os +import signal +from asyncio.subprocess import Process +from contextlib import asynccontextmanager, suppress +from contextvars import ContextVar +from dataclasses import dataclass + +from serial.tools import list_ports + +from . import utils + +SPARK_HWIDS = [ + r'USB VID\:PID=2B04\:C006.*', # Photon + r'USB VID\:PID=2B04\:C008.*', # P1 +] + +# Construct a regex OR'ing all allowed hardware ID matches +# Example result: (?:HWID_REGEX_ONE|HWID_REGEX_TWO) +SPARK_DEVICE_REGEX = f'(?:{"|".join([dev for dev in SPARK_HWIDS])})' +USB_BAUD_RATE = 115200 + + +LOGGER = logging.getLogger(__name__) +CV: ContextVar['SparkDiscovery'] = ContextVar('discovery.SparkDiscovery') + + +@dataclass(frozen=True) +class SparkConnection: + device_id: str + port: int + proc: Process + + def close(self): + with suppress(Exception): + os.killpg(os.getpgid(self.proc.pid), signal.SIGINT) + + +class SparkDiscovery: + def __init__(self) -> None: + self.config = utils.get_config() + self._connections: set[SparkConnection] = set() + + @property + def connection_index(self) -> dict[str, int]: + return dict((c.device_id, c.port) + for c in self._connections + if c.proc.returncode is None) + + def _next_port(self): + ports = [c.port for c in self._connections] + return next((p for p + in range(self.config.port_start, self.config.port_end) + if p not in ports)) + + async def connect_usb(self, device_id: str, device_serial: str) -> SparkConnection: + port = self._next_port() + proc = await asyncio.create_subprocess_exec('/usr/bin/socat', + f'tcp-listen:{port},reuseaddr,fork', + f'file:{device_serial},raw,echo=0,b{USB_BAUD_RATE}', + preexec_fn=os.setsid, + shell=False) + return SparkConnection(device_id=device_id, + port=port, + proc=proc) + + async def discover_usb(self) -> None: + connected_ids = set(c.device_id for c in self._connections) + discovered_ids = set() + + for usb_port in list_ports.grep(SPARK_DEVICE_REGEX): + device_id = usb_port.serial_number.lower() + discovered_ids.add(device_id) + if device_id not in connected_ids: + conn = await self.connect_usb(device_id, usb_port.device) + LOGGER.info(f'Added {conn}') + self._connections.add(conn) + + def test_connected(conn: SparkConnection) -> bool: + if conn.proc.returncode is not None: + LOGGER.info(f'Removed {conn} (returncode={conn.proc.returncode})') + return False + + if conn.device_id not in discovered_ids: + conn.close() + + return True + + LOGGER.debug(f'{discovered_ids=}') + self._connections = set(filter(test_connected, self._connections)) + + async def repeat(self): + interval = self.config.discovery_interval + while True: + try: + await self.discover_usb() + await asyncio.sleep(interval.total_seconds()) + except Exception as ex: + LOGGER.error(utils.strex(ex)) + await asyncio.sleep(interval.total_seconds()) + + def close(self): + for conn in self._connections: + conn.close() + self._connections.clear() + + +@asynccontextmanager +async def lifespan(): + async with utils.task_context(CV.get().repeat()): + yield + CV.get().close() + + +def setup(): + CV.set(SparkDiscovery()) diff --git a/brewblox_usb_proxy/models.py b/brewblox_usb_proxy/models.py new file mode 100644 index 0000000..a03f813 --- /dev/null +++ b/brewblox_usb_proxy/models.py @@ -0,0 +1,30 @@ +""" +Pydantic models are declared here, and then imported wherever needed +""" + +from datetime import timedelta + +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class ServiceConfig(BaseSettings): + """ + Global service configuration. + + Pydantic Settings (https://docs.pydantic.dev/latest/concepts/pydantic_settings/) + provides the `BaseSettings` model that loads values from environment variables. + + To access the loaded model, we use `utils.get_config()`. + """ + model_config = SettingsConfigDict( + env_prefix='brewblox_usb_proxy_', + case_sensitive=False, + json_schema_extra='ignore', + ) + + name: str = 'usb-proxy' + debug: bool = False + + discovery_interval: timedelta = timedelta(seconds=5) + port_start: int = 9000 + port_end: int = 9500 diff --git a/your_package/utils.py b/brewblox_usb_proxy/utils.py similarity index 89% rename from your_package/utils.py rename to brewblox_usb_proxy/utils.py index 442aa78..6d40e54 100644 --- a/your_package/utils.py +++ b/brewblox_usb_proxy/utils.py @@ -21,6 +21,13 @@ def get_config() -> ServiceConfig: # pragma: no cover return ServiceConfig() +def strex(ex: Exception): + """ + Formats exception as `Exception(message)` + """ + return f'{type(ex).__name__}({str(ex)})' + + @asynccontextmanager async def task_context(coro: Coroutine, cancel_timeout=timedelta(seconds=5) diff --git a/entrypoint.sh b/entrypoint.sh index c20bd21..139048e 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -5,4 +5,4 @@ exec uvicorn \ --host 0.0.0.0 \ --port 5000 \ --factory \ - your_package.app_factory:create_app + brewblox_usb_proxy.app_factory:create_app diff --git a/poetry.lock b/poetry.lock index 3f73473..c754484 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. [[package]] name = "annotated-types" @@ -114,23 +114,6 @@ typing-extensions = ">=4.8.0" [package.extras] all = ["email-validator (>=2.0.0)", "httpx (>=0.23.0)", "itsdangerous (>=1.1.0)", "jinja2 (>=2.11.2)", "orjson (>=3.2.1)", "pydantic-extra-types (>=2.0.0)", "pydantic-settings (>=2.0.0)", "python-multipart (>=0.0.7)", "pyyaml (>=5.3.1)", "ujson (>=4.0.1,!=4.0.2,!=4.1.0,!=4.2.0,!=4.3.0,!=5.0.0,!=5.1.0)", "uvicorn[standard] (>=0.12.0)"] -[[package]] -name = "fastapi-mqtt" -version = "2.1.0" -description = "fastapi-mqtt is extension for MQTT protocol" -optional = false -python-versions = ">=3.7,<4.0" -files = [ - {file = "fastapi_mqtt-2.1.0-py3-none-any.whl", hash = "sha256:59bd9f44f37619f578de9e1a27df2de85e347a7da909faebfcc6bcdd6d459b33"}, - {file = "fastapi_mqtt-2.1.0.tar.gz", hash = "sha256:7f9c5322a5673237906c4cf75875469c56f5bc5b1cb9b1dd2b2da48fb0957e57"}, -] - -[package.dependencies] -fastapi = ">=0.103" -gmqtt = ">=0.6.11" -pydantic = ">=2.3.0" -uvicorn = ">=0.19.0" - [[package]] name = "flake8" version = "7.0.0" @@ -177,20 +160,6 @@ files = [ flake8 = "*" setuptools = "*" -[[package]] -name = "gmqtt" -version = "0.6.14" -description = "Client for MQTT protocol" -optional = false -python-versions = ">=3.5" -files = [ - {file = "gmqtt-0.6.14-py3-none-any.whl", hash = "sha256:aed71c5e4bbcb8abde923ba917a733408c3d7988b9d9fcc2fdb04717504d5ed7"}, - {file = "gmqtt-0.6.14.tar.gz", hash = "sha256:45b0f7794247455b9163155eeedf41c86e303c72b79056bf65d33038b17443a3"}, -] - -[package.extras] -test = ["atomicwrites (>=1.3.0)", "attrs (>=19.1.0)", "codecov (>=2.0.15)", "coverage (>=4.5.3)", "more-itertools (>=7.0.0)", "pluggy (>=0.11.0)", "py (>=1.8.0)", "pytest (>=5.4.0)", "pytest-asyncio (>=0.12.0)", "pytest-cov (>=2.7.1)", "six (>=1.12.0)", "uvloop (>=0.14.0)"] - [[package]] name = "h11" version = "0.14.0" @@ -512,6 +481,20 @@ files = [ {file = "pyflakes-3.2.0.tar.gz", hash = "sha256:1c61603ff154621fb2a9172037d84dca3500def8c8b630657d1701f026f8af3f"}, ] +[[package]] +name = "pyserial" +version = "3.5" +description = "Python Serial Port Extension" +optional = false +python-versions = "*" +files = [ + {file = "pyserial-3.5-py2.py3-none-any.whl", hash = "sha256:c4451db6ba391ca6ca299fb3ec7bae67a5c55dde170964c7a14ceefec02f2cf0"}, + {file = "pyserial-3.5.tar.gz", hash = "sha256:3c77e014170dfffbd816e6ffc205e9842efb10be9f58ec16d3e8675b4925cddb"}, +] + +[package.extras] +cp2110 = ["hidapi"] + [[package]] name = "pytest" version = "8.0.0" @@ -624,7 +607,6 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, - {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, @@ -954,4 +936,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = ">=3.11,<4" -content-hash = "b07667c62f049ad71af26de28a01afbd783c38478f7d2067c39ea119d4cc899c" +content-hash = "a081ac6779de0a3d20c794b954b27ac3a69769251eff5d07707266f041d2b49c" diff --git a/pyproject.toml b/pyproject.toml index 561936a..ab8e8c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,8 +1,8 @@ [tool.poetry] -name = "your_package" +name = "brewblox_usb_proxy" version = "0.1.0" -description = "ADD DESCRIPTION HERE" -authors = ["Code Fairies Inc "] +description = "USB communication proxy for Brewblox devices" +authors = ["BrewPi B.V. "] license = "GPL-3.0" readme = "README.md" @@ -11,7 +11,7 @@ python = ">=3.11,<4" fastapi = "^0.109.2" uvicorn = { extras = ["standard"], version = "^0.27.1" } pydantic-settings = "^2.1.0" -fastapi-mqtt = "^2.1.0" +pyserial = "^3.5" [tool.poetry.group.dev.dependencies] pytest-mock = "*" diff --git a/tasks.py b/tasks.py index 689ed60..0a17bd0 100644 --- a/tasks.py +++ b/tasks.py @@ -3,7 +3,7 @@ from invoke import Context, task ROOT = Path(__file__).parent.resolve() -IMAGE = 'ghcr.io/you/your-package' +IMAGE = 'ghcr.io/BrewBlox/brewblox-usb-proxy' @task diff --git a/test/conftest.py b/test/conftest.py index bd5f147..3e18954 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -13,8 +13,8 @@ from httpx import AsyncClient from pydantic_settings import BaseSettings, PydanticBaseSettingsSource -from your_package import app_factory, utils -from your_package.models import ServiceConfig +from brewblox_usb_proxy import app_factory, utils +from brewblox_usb_proxy.models import ServiceConfig LOGGER = logging.getLogger(__name__) diff --git a/test/test_discovery.py b/test/test_discovery.py new file mode 100644 index 0000000..10cf3ad --- /dev/null +++ b/test/test_discovery.py @@ -0,0 +1,2 @@ +def test_dummy(): + pass diff --git a/test/test_http_example.py b/test/test_http_example.py deleted file mode 100644 index 224c3f6..0000000 --- a/test/test_http_example.py +++ /dev/null @@ -1,38 +0,0 @@ -""" -Tests the HTTP example endpoints. - -This includes minimal setup. We don't load what we don't need for this test. -""" - -import pytest -from fastapi import FastAPI -from httpx import AsyncClient - -from your_package import http_example - - -@pytest.fixture -def app() -> FastAPI: - """ - We override the `app` fixture from conftest.py - to set up the components we need for these tests. - - Now, when we use the `client` fixture, it uses this `app` fixture. - """ - - app = FastAPI() - app.include_router(http_example.router) - - return app - - -async def test_endpoint(client: AsyncClient): - # We didn't prefix the router with service name in `app.include_router()` - # The endpoint is router prefix (/example) + endpoint address (/endpoint) - resp = await client.post('/example/endpoint', json={'content': 'hello'}) - assert resp.status_code == 200 - assert resp.json() == {'content': 'Hi! You said `hello`.'} - - # If we send invalid data, the service immediately returns a 422 error - resp = await client.post('/example/endpoint', json={}) - assert resp.status_code == 422 diff --git a/test/test_mqtt_publish_example.py b/test/test_mqtt_publish_example.py deleted file mode 100644 index 9b4101c..0000000 --- a/test/test_mqtt_publish_example.py +++ /dev/null @@ -1,73 +0,0 @@ -""" -Tests the MQTT publishing example tests. - -This includes minimal setup, and mocking of the actual MQTT client. -""" - - -import asyncio -from contextlib import AsyncExitStack, asynccontextmanager -from datetime import timedelta -from unittest.mock import ANY, Mock - -import pytest -from asgi_lifespan import LifespanManager -from fastapi import FastAPI -from fastapi_mqtt import MQTTClient - -from your_package import mqtt, mqtt_publish_example, utils - - -@asynccontextmanager -async def lifespan(app: FastAPI): - """ - We replicate `lifespan()` from `app_factory.py` here, - but only with what we need for this test - """ - async with AsyncExitStack() as stack: - await stack.enter_async_context(mqtt_publish_example.lifespan()) - yield - - -@pytest.fixture -def app() -> FastAPI: - """ - We override the `app` fixture from conftest.py - to set up the components we need for these tests. - - Now, when we use the `manager` fixture, it uses this `app` fixture. - """ - # We don't want to wait multiple seconds when testing - # Override the setting so we don't have to - config = utils.get_config() - config.publish_interval = timedelta(milliseconds=10) - - # We create a Mock object that replicates the functions of `MQTTClient` - # We set the ContextVar in `mqtt.py` with this mock, - # so it gets used by `mqtt_publish_example.py` - # This replaces `mqtt.setup()` - mqtt.CV.set(Mock(spec=MQTTClient)) - - app = FastAPI(lifespan=lifespan) - return app - - -async def test_publish(manager: LifespanManager): - # We want the app lifespan() to start, but we don't need to send HTTP requests - # For this test, we can use the `manager` fixture from conftest. - - # In the `app` fixture, we placed a Mock object in `mqtt.CV` - m_mqtt_client: Mock = mqtt.CV.get() - - # Let the publisher do it's thing for a bit - await asyncio.sleep(0.1) - - # We expect the publish call to have been used - assert m_mqtt_client.publish.call_count > 0 - - # The topic is `f'{config.history_topic}/{config.name}'` - # In tests, that equals `brewcast/history/your_package` - # The value is random, so we just match it with `ANY` - assert m_mqtt_client.publish.call_args_list[0].args == ('brewcast/history/your_package', - {'key': 'your_package', - 'data': {'value[degC]': ANY}}) diff --git a/your_package/http_example.py b/your_package/http_example.py deleted file mode 100644 index 2e33aa6..0000000 --- a/your_package/http_example.py +++ /dev/null @@ -1,28 +0,0 @@ -""" -An example of how to register HTTP endpoints. - -This is just a minimal example. -For more information, see https://fastapi.tiangolo.com/ -""" - -from fastapi import APIRouter - -from .models import ExampleMessage - -# By using APIRouter, we can declare endpoints in separate files -# We create the `FastAPI` app in app_factory.py, and then load this router -router = APIRouter(prefix='/example', tags=['Example']) - - -@router.post('/endpoint') -async def endpoint_post(message: ExampleMessage) -> ExampleMessage: - """ - An example endpoint. - - The Pydantic models are automatically validated for both arguments - and returned data. - - All endpoints are listed and can be called at {service_name}/api/doc. - This docstring message is used as the description for this endpoint. - """ - return ExampleMessage(content=f'Hi! You said `{message.content}`.') diff --git a/your_package/models.py b/your_package/models.py deleted file mode 100644 index c4179f9..0000000 --- a/your_package/models.py +++ /dev/null @@ -1,49 +0,0 @@ -""" -Pydantic models are declared here, and then imported wherever needed -""" - -from datetime import timedelta -from typing import Literal - -from pydantic import BaseModel -from pydantic_settings import BaseSettings, SettingsConfigDict - - -class ServiceConfig(BaseSettings): - """ - Global service configuration. - - Pydantic Settings (https://docs.pydantic.dev/latest/concepts/pydantic_settings/) - provides the `BaseSettings` model that loads values from environment variables. - - To access the loaded model, we use `utils.get_config()`. - """ - model_config = SettingsConfigDict( - # `name` is now loaded from the environment variable `your_package_name` - env_prefix='your_package_', - - # You can use either `your_package_name=name` or `YOUR_PACKAGE_NAME=name` - case_sensitive=False, - - # Ignores all unknown environment variables that happen to have the same prefix - json_schema_extra='ignore', - ) - - name: str = 'your_package' - debug: bool = False - - mqtt_protocol: Literal['mqtt', 'mqtts'] = 'mqtt' - mqtt_host: str = 'eventbus' - mqtt_port: int = 1883 - - history_topic: str = 'brewcast/history' - publish_interval: timedelta = timedelta(seconds=5) - - -class ExampleMessage(BaseModel): - """ - The data model for the example HTTP endpoint. - - For more options, see https://docs.pydantic.dev/latest/ - """ - content: str diff --git a/your_package/mqtt.py b/your_package/mqtt.py deleted file mode 100644 index 4571390..0000000 --- a/your_package/mqtt.py +++ /dev/null @@ -1,43 +0,0 @@ -from contextlib import asynccontextmanager -from contextvars import ContextVar - -from fastapi_mqtt.config import MQTTConfig -from fastapi_mqtt.fastmqtt import FastMQTT - -from . import utils - -CV: ContextVar[FastMQTT] = ContextVar('mqtt.client') -""" -The shared MQTT client for this service. - -ContextVar allows us to store a variable in a way that's both easily accessible, -and automatically cleared between tests or when hot reloading the service. -""" - - -def setup(): - """ - Creates the MQTT client, - and makes it available to other modules through the `CV` ContextVar. - """ - config = utils.get_config() - mqtt_config = MQTTConfig(host=config.mqtt_host, - port=config.mqtt_port, - ssl=(config.mqtt_protocol == 'mqtts'), - reconnect_retries=-1) - fmqtt = FastMQTT(config=mqtt_config) - CV.set(fmqtt) - - -@asynccontextmanager -async def lifespan(): - """ - Handles startup and shutdown of the MQTT client. - - `setup()` must have been called first. - This function only yields after it is connected to the MQTT eventbus. - """ - fmqtt = CV.get() - await fmqtt.mqtt_startup() - yield - await fmqtt.mqtt_shutdown() diff --git a/your_package/mqtt_publish_example.py b/your_package/mqtt_publish_example.py deleted file mode 100644 index 9b4d365..0000000 --- a/your_package/mqtt_publish_example.py +++ /dev/null @@ -1,55 +0,0 @@ -""" -An example of how to publish MQTT events. - -MQTT is a publish/subscribe message protocol. -In Brewblox it's commonly used to publish history data and service state. - -For an explanation on MQTT, see https://randomnerdtutorials.com/what-is-mqtt-and-how-it-works/ - -For reference documentation on MQTT in Brewblox, see https://www.brewblox.com/dev/ - -The result is the same as https://www.brewblox.com/dev/tutorials/pubscript -but now as part of a service that can multiple things at the same time. -""" - -import asyncio -import logging -from contextlib import asynccontextmanager -from random import random - -from . import mqtt, utils - -LOGGER = logging.getLogger(__name__) - - -async def run(): - config = utils.get_config() - mqtt_client = mqtt.CV.get() - - topic: str = f'{config.history_topic}/{config.name}' - interval: float = config.publish_interval.total_seconds() - value: float = 20 - - while True: - await asyncio.sleep(interval) - - # Add a random value [-5, 5] so we see steady changes - value += ((random() - 0.5) * 10) - - # Format the message as a Brewblox history event - # https://www.brewblox.com/dev/reference/history_events.html - message = { - 'key': config.name, - 'data': {'value[degC]': value} - } - - mqtt_client.publish(topic, message) - LOGGER.info(f'sent {message}') - - -@asynccontextmanager -async def lifespan(): - # `utils.task_context()` wraps the async `run()` function - # in a background task that starts now, and is cancelled when the context ends. - async with utils.task_context(run()): - yield diff --git a/your_package/mqtt_subscribe_example.py b/your_package/mqtt_subscribe_example.py deleted file mode 100644 index 70f8321..0000000 --- a/your_package/mqtt_subscribe_example.py +++ /dev/null @@ -1,34 +0,0 @@ -""" -An example of how to subscribe to MQTT events. - -MQTT is a publish/subscribe message protocol. -In Brewblox it's commonly used to publish history data and service state. - -For an explanation on MQTT, see https://randomnerdtutorials.com/what-is-mqtt-and-how-it-works/ - -For reference documentation on MQTT in Brewblox, see https://www.brewblox.com/dev/ - -For this example, we'll listen in on Brewblox history messages. -""" - -import json -import logging - -from . import mqtt, utils - -LOGGER = logging.getLogger(__name__) - - -def setup(): - config = utils.get_config() - mqtt_client = mqtt.CV.get() # `mqtt.CV` is set in `mqtt.setup()` - - # Set a callback for when the eventbus receives a message that matches this topic - # Subscriptions must be set before the MQTT client is connected - @mqtt_client.subscribe(config.history_topic + '/#') - async def on_history_message(client, topic, payload, qos, properties): - evt = json.loads(payload) - key = evt['key'] - data = evt['data'] - LOGGER.info(f'{topic=}, {key=}') - LOGGER.debug(f'{key=}, {data=}')