diff --git a/mqtt_to_eventhub_module.py b/mqtt_to_eventhub_module.py index 9b9e76d..90dcee9 100644 --- a/mqtt_to_eventhub_module.py +++ b/mqtt_to_eventhub_module.py @@ -262,8 +262,9 @@ async def message_loop( async with client: logger.debug(f"Subscribing to base topic: {MQTT_BASE_TOPIC}") await client.subscribe(MQTT_BASE_TOPIC) - async with client.messages() as messages: - await process_message_batch(client, eventhub_producer, messages, logger) + # async with client.messages() as messages: + # see https://sbtinstruments.github.io/aiomqtt/migration-guide-v2.html#changes-to-the-message-queue + await process_message_batch(client, eventhub_producer, client.messages, logger) def log_error(error: Exception, *args) -> None: diff --git a/poetry.lock b/poetry.lock index 4af81ea..51d8dde 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,14 +1,14 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. [[package]] name = "aiomqtt" -version = "1.2.1" +version = "2.0.0" description = "The idiomatic asyncio MQTT client, wrapped around paho-mqtt" optional = false python-versions = ">=3.8,<4.0" files = [ - {file = "aiomqtt-1.2.1-py3-none-any.whl", hash = "sha256:3925b40b2b95b1905753d53ef3a9162e903cfab35ebe9647ab4d52e45ffb727f"}, - {file = "aiomqtt-1.2.1.tar.gz", hash = "sha256:7582f4341f08ef7110dd9ab3a559454dc28ccda1eac502ff8f08a73b238ecede"}, + {file = "aiomqtt-2.0.0-py3-none-any.whl", hash = "sha256:f3b97eca4a5a2c40769ed14f660520f733be1d2ec383a9976153fe49141e2fa2"}, + {file = "aiomqtt-2.0.0.tar.gz", hash = "sha256:3d480429334bdba4e4b9936c6cc198ea4f76a94d36cf294e0f713ec59f6a2120"}, ] [package.dependencies] @@ -17,13 +17,13 @@ typing-extensions = {version = ">=4.4.0,<5.0.0", markers = "python_version < \"3 [[package]] name = "azure-core" -version = "1.30.0" +version = "1.30.1" description = "Microsoft Azure Core Library for Python" optional = false python-versions = ">=3.7" files = [ - {file = "azure-core-1.30.0.tar.gz", hash = "sha256:6f3a7883ef184722f6bd997262eddaf80cfe7e5b3e0caaaf8db1695695893d35"}, - {file = "azure_core-1.30.0-py3-none-any.whl", hash = "sha256:3dae7962aad109610e68c9a7abb31d79720e1d982ddf61363038d175a5025e89"}, + {file = "azure-core-1.30.1.tar.gz", hash = "sha256:26273a254131f84269e8ea4464f3560c731f29c0c1f69ac99010845f239c1a8f"}, + {file = "azure_core-1.30.1-py3-none-any.whl", hash = "sha256:7c5ee397e48f281ec4dd773d67a0a47a0962ed6fa833036057f9ea067f688e74"}, ] [package.dependencies] @@ -753,13 +753,13 @@ testing = ["fields", "hunter", "process-tests", "pytest-xdist", "six", "virtuale [[package]] name = "python-dateutil" -version = "2.8.2" +version = "2.9.0.post0" description = "Extensions to the standard Python datetime module" optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" files = [ - {file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"}, - {file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"}, + {file = "python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3"}, + {file = "python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427"}, ] [package.dependencies] @@ -975,4 +975,4 @@ test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess [metadata] lock-version = "2.0" python-versions = ">=3.9,<4.0" -content-hash = "eab9a6885b75445a52fc5354a03100e7b672f75e4e4f33b7da71f38d567efe61" +content-hash = "1e6cb34c33cb0b52bce8bb7337b90e11b1ca72bd4d7cda0dce0d66cd4a323801" diff --git a/pyproject.toml b/pyproject.toml index ea6cc44..ce73b8a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,11 +8,10 @@ readme = "README.md" [tool.poetry.dependencies] python = ">=3.9,<4.0" -paho-mqtt = "^1.6.1" python-dotenv-vault = "^0.6.3" azure-eventhub = "^5.11.4" requests = "^2.31.0" -aiomqtt = "^1.2.0" +aiomqtt = "^2.0.0" [tool.poetry.group.dev.dependencies] black = "^24.0.0" diff --git a/tests/test_mqtt_to_eventhub_module.py b/tests/test_mqtt_to_eventhub_module.py index 9abe3b1..dd665c8 100644 --- a/tests/test_mqtt_to_eventhub_module.py +++ b/tests/test_mqtt_to_eventhub_module.py @@ -70,16 +70,17 @@ async def test_real_mqtt_connection(self): client = mqtt_to_eventhub_module.get_client() # Async function to handle connection - async def on_connect_async(client, _userdata, _flags, rc): + async def on_connect_async(_client, _userdata, _flags, rc, _properties): assert rc == 0 # 0 means successful connection client.on_connect = on_connect_async - await client.connect() # .connect_async(MQTT_HOST, MQTT_PORT) + # see https://sbtinstruments.github.io/aiomqtt/migration-guide-v2.html + await client.__aenter__() # .connect_async(MQTT_HOST, MQTT_PORT) await asyncio.sleep(1) # give it a second to connect # Disconnect - await client.disconnect() + await client.__aexit__(None, None, None) @pytest.mark.asyncio @patch("mqtt_to_eventhub_module.on_success_async", new_callable=AsyncMock) @@ -807,20 +808,12 @@ async def test_message_loop(self, logger_mock, process_message_batch_mock): client_mock = AsyncMock(spec=aiomqtt.Client) client_mock.subscribe = AsyncMock() - # Mock the messages context manager to be an async generator - messages_mock = AsyncMock() - + # Prepare messages async generator directly without using __aenter__ async def messages_async_gen(): - for msg in [ - AsyncMock() - # aiomqtt.Message(topic=self.PATCHED_BASE_TOPIC, payload=b"test", qos=0, retain=False, mid=0, properties={}), - ]: - yield msg + yield AsyncMock() # Mock a message - expected_messages_generator = messages_async_gen() - # Use AsyncMock to return an async generator when entering the context manager - messages_mock.__aenter__.return_value = expected_messages_generator - client_mock.messages.return_value = messages_mock + # Mock `client.messages` as an async iterator + client_mock.messages = messages_async_gen() # Run the message loop with the mocked objects await mqtt_to_eventhub_module.message_loop(eventhub_producer_mock, client_mock) @@ -833,14 +826,14 @@ async def messages_async_gen(): client_mock, eventhub_producer_mock, ANY, logger_mock ) - # for the async generator, we need to check that the actual generator passed to process_message_batch is the same as the one we expect + # we need to check that the actual generator passed to process_message_batch is the same as the one we expect actual_call_args = process_message_batch_mock.await_args actual_messages_arg = actual_call_args[0][ 2 - ] # This should be the async generator - # Now check if actual_messages_arg is the async generator we expect + ] # This should be client_mock.messages + # Now check if actual_messages_arg is called with client_mock.messages as we expect assert ( - actual_messages_arg is expected_messages_generator + actual_messages_arg is client_mock.messages ), "process_message_batch was not called with the expected async generator" # If you need to check log messages