Simplify message filtering #147

Merged
merged 13 commits into from
Nov 27, 2022


empicano
Owner

Hi Frederik and Jonathan,

We discussed simplifying the message-filtering logic a few days ago. I played around with it for a bit and wanted your opinion. Nothing is working yet; this is just for feedback on the general direction.

The idea is to simplify the current way of filtering messages to something like:

async with Client("test.mosquitto.org") as client:
    async with client.messages() as messages:
        await client.subscribe("measurements/#")
        async for message in messages:
            if message.matches("measurements/+/something"):
                print(message.payload)

For this, we need to reimplement paho-mqtt's message filtering logic in a new MqttMessage class that subclasses the paho-mqtt one.

In our initial discussion, you noted that you had an idea of how to add this in a backward-compatible way, Frederik. Could you give me some pointers on how? 😊

Things I'm unsure about:

  • Should we check the given topic for validity, e.g. throw an exception for something like one/#/three?
  • Match edge cases:
'one' matches('one/#')  # -> currently: False
'one' matches('one/+')  # -> currently: False
'$share/one' matches('one')  # -> currently: True
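
For reference, a minimal sketch of level-by-level wildcard matching, written independently of paho-mqtt. The function name topic_matches is hypothetical and this is not the code from the PR. It follows the MQTT specification, under which one/# also matches the parent level one, while one/+ does not match one. Shared-subscription prefixes such as $share/ are left out.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT wildcard `topic_filter`.

    Illustrative only: assumes both arguments are already valid.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    # Topics starting with "$" are not matched by a leading wildcard.
    if topic.startswith("$") and filter_levels[0] in ("+", "#"):
        return False

    for index, filter_level in enumerate(filter_levels):
        if filter_level == "#":
            # Multi-level wildcard: matches the parent and everything below.
            return True
        if index >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if filter_level != "+" and filter_level != topic_levels[index]:
            return False

    # No "#" was seen, so both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)
```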

~ Felix

@JonathanPlasse
Collaborator

JonathanPlasse commented Oct 31, 2022

Hi Felix,
Why do we need to reimplement the filtering logic?
In the function rec you call recurse, I think.
You can also use pre-commit before pushing.
If mypy bothers you because unfiltered_messages and filtered_messages are used in the tests, you can comment them out and run the rest of the tests.
Also, it would be neat if client.messages() could be called multiple times, like filtered_messages; I rely heavily on this feature.

@empicano
Owner Author

empicano commented Nov 1, 2022

Hi Jonathan,

Thanks for taking a look!

Why do we need to reimplement the filtering logic?

As far as I understand it, paho-mqtt builds a tree to filter messages when you pass a topic to filtered_messages(), and then uses that tree to generate iterators that return messages matching these topics. The part that makes the current asyncio-mqtt approach cumbersome is that we have multiple iterators.

I don't see a way to reuse the paho-mqtt filtering logic, at least not directly, to filter inside only one iterator. Maybe you have some idea?

In the function rec you call recurse, I think.

True, that's an error.

You can also use pre-commit before pushing.

Good tip, thanks! I'll try to get that going. Maybe we can write a CONTRIBUTING.md with handy info like that.

@empicano
Owner Author

empicano commented Nov 3, 2022

@frederikaalund, do you have any feedback? 😊

@JonathanPlasse
Collaborator

JonathanPlasse commented Nov 4, 2022

Good tip, thanks! I'll try to get that going. Maybe we can write a CONTRIBUTING.md with handy info like that.

I added it. Do you have any feedback?

@empicano
Owner Author

empicano commented Nov 4, 2022

Looks great, Jonathan! 😎 🚀 Maybe we can add pre-commit as a development dependency to the setup.py, or the pyproject.toml respectively. Then we don't have to run pip install pre-commit separately. I don't have experience with pre-commit though, so I don't know what the best practice is. What do you think?

@JonathanPlasse
Collaborator

I would keep the installation of pre-commit separate. I have never seen it in the extra dependencies.

@empicano
Owner Author

empicano commented Nov 4, 2022

All right, sounds good! 👍

@frederikaalund
Collaborator

frederikaalund commented Nov 4, 2022

Been a busy week at work, so I apologize for the delay. I'll try to set some time aside for asyncio-mqtt this weekend.

Great to see this PR as per our discussion! Let me have a look. :)

In our initial discussion, you noted that you had an idea of how to add this in a backward-compatible way, Frederik. Could you give me some pointers on how? 😊

As long as we don't change the semantics of filtered_messages and unfiltered_messages, we are safe. :) The API that you propose accomplishes this via a new messages method. 👍 This is part of the reason why I didn't use the name messages in the existing API (to leave it open for future use). ;)

Should we check the given topic for validity, e.g. throw an exception for something like one/#/three?

In my opinion, yes!

I propose that we do the following:

  • Change unfiltered_messages so that it allows multiple, simultaneous calls (this is easy and backwards-compatible)
  • Add a new messages method that wraps unfiltered_messages and wraps each message in a new asyncio_mqtt.Message class.
  • Add a new asyncio_mqtt.Message class. This class has the following properties:
    • Message.topic of type MqttTopic. On construction (in __init__) we check for topic validity according to the MQTT specification. Add a MqttTopic.matches method.
    • Message.payload with the raw message data.

This results in the following API (example):

async with Client("test.mosquitto.org") as client:
    async with client.messages() as messages:
        await client.subscribe("measurements/#")
        async for message in messages:
            if message.topic.matches("measurements/#"):
                print(f"Measurement: {message.payload}")
            if message.topic.matches("measurements/+/something"):
                print(f"Measurement something: {message.payload}")

Note that both if statements may be true at the same time. That is a valid use case. Let me know what you think of this API.

I vote that we keep the "old" filtered_messages and unfiltered_messages methods around to not break compatibility.

The biggest drawback of this new API is that we have to replicate existing paho-mqtt functionality in the MqttTopic.matches method. I don't like that aspect. I do really like the resulting API, though. 😅 Decisions, decisions. :)

Again, thanks for this PR. I invite everyone to give their opinion in this discussion. 👍

~Frederik

@JonathanPlasse
Collaborator

JonathanPlasse commented Nov 4, 2022

Should we check the given topic for validity, e.g. throw an exception for something like one/#/three?

In my opinion, yes!

mqtt.Client._filter_wildcard_len_check() called in mqtt.Client.subscribe() implements it.
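
Since that paho-mqtt method is private, a standalone check may be easier to reason about. The sketch below is an assumption about what such a check could look like, not paho-mqtt's or the PR's code. The name validate_topic_filter is hypothetical, and the rules are the ones from the MQTT specification: a filter is non-empty and at most 65535 bytes, # occupies a whole level and comes last, and + occupies a whole level.

```python
def validate_topic_filter(topic_filter: str) -> None:
    """Raise ValueError if `topic_filter` is not a valid MQTT topic filter."""
    encoded_length = len(topic_filter.encode("utf-8"))
    if encoded_length == 0 or encoded_length > 65535:
        raise ValueError("topic filter must be 1 to 65535 bytes long")

    levels = topic_filter.split("/")
    last_index = len(levels) - 1
    for index, level in enumerate(levels):
        # "#" must stand alone in its level and be the final level.
        if "#" in level and (level != "#" or index != last_index):
            raise ValueError(f"invalid use of '#' in {topic_filter!r}")
        # "+" must stand alone in its level.
        if "+" in level and level != "+":
            raise ValueError(f"invalid use of '+' in {topic_filter!r}")
```

With this check, one/#/three is rejected, while one/+/three and one/# pass.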

@empicano
Owner Author

empicano commented Nov 5, 2022

Thanks for the feedback, Frederik!

I propose that we do the following:

  • Change unfiltered_messages so that it allows multiple, simultaneous calls (this is easy and backwards-compatible)
  • Add a new messages method that wraps unfiltered_messages and wraps each message in a new asyncio_mqtt.Message class.
  • Add a new asyncio_mqtt.Message class. This class has the following properties:
    • Message.topic of type MqttTopic. On construction (in __init__) we check for topic validity according to the MQTT specification. Add a MqttTopic.matches method.
    • Message.payload with the raw message data.

Sounds good! Using our own Topic class is a good idea, I think 👍 message.topic.matches() is clearer than message.matches().

mqtt.Client._filter_wildcard_len_check() called in mqtt.Client.subscribe() implements it.

Great find, thanks Jonathan!

@empicano empicano force-pushed the simplify-filtering-messages branch from 92f2418 to f5e32b9 Compare November 15, 2022 01:24
@codecov

codecov bot commented Nov 15, 2022

Codecov Report

Merging #147 (4d43388) into master (5a09db5) will increase coverage by 1.4%.
The diff coverage is 95.9%.

@@           Coverage Diff            @@
##           master    #147     +/-   ##
========================================
+ Coverage    89.4%   90.9%   +1.4%     
========================================
  Files           7       7             
  Lines         552     704    +152     
  Branches      110     150     +40     
========================================
+ Hits          494     640    +146     
- Misses         37      41      +4     
- Partials       21      23      +2     
Impacted Files Coverage Δ
asyncio_mqtt/client.py 83.3% <93.3%> (+2.7%) ⬆️
asyncio_mqtt/__init__.py 100.0% <100.0%> (ø)
tests/test_client.py 100.0% <100.0%> (ø)


@empicano
Owner Author

Finally had some time again! 😊

Jonathan, following the CONTRIBUTING.md worked really nicely, I got pre-commit set up now 👍

I'm glad for all feedback, I'm still a bit unsure if what I'm doing makes sense 😄

Change unfiltered_messages so that it allows multiple, simultaneous calls (this is easy and backwards-compatible)

Here I got stuck a bit. We can't simply use message_callback_add (as we do for unfiltered_messages) as consecutive calls with the same topic value ("#") overwrite the old callback value.

I could set the _on_message callback each time with a function that calls all old _put_in_queue callbacks plus the new one. But then I don't know how to delete them again when we exit the context manager. Or am I going in the wrong direction?

@JonathanPlasse
Collaborator

JonathanPlasse commented Nov 15, 2022

Jonathan, following the CONTRIBUTING.md worked really nicely, I got pre-commit set up now 👍

🎉

@frederikaalund
Collaborator

Great to see continued work on this. Keep it up. 😄 👍

Here I got stuck a bit. We can't simply use message_callback_add (as we do for unfiltered_messages) as consecutive calls with the same topic value ("#") overwrite the old callback value.

I could set the _on_message callback each time with a function that calls all old _put_in_queue callbacks plus the new one. But then I don't know how to delete them again when we exit the context manager. Or am I going in the wrong direction?

I was thinking something like this:

  • Set the self._client.on_message callback to a new internal method, e.g., _broadcast_unfiltered_messages, during __init__. Never change self._client.on_message after that.
  • Rework the unfiltered_messages method so that it simply adds cb to a client-wide set of "listeners", e.g., self._listener_cbs.add(cb). It removes cb from the same set in the finally block.
  • Inside _broadcast_unfiltered_messages, we iterate through all cbs in self._listener_cbs and call each one. This way, we effectively broadcast the message.
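
A rough sketch of these three steps, using a hypothetical Broadcaster class as a stand-in for the client and plain strings as messages. It is an illustration under those assumptions, not the code that ended up in the PR, and it leaves out paho-mqtt entirely.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Callable, Set


class Broadcaster:
    """Hypothetical stand-in for the client; only the listener logic is shown."""

    def __init__(self) -> None:
        # Client-wide set of listener callbacks.
        self._listener_cbs: Set[Callable[[str], None]] = set()

    def _broadcast_unfiltered_messages(self, message: str) -> None:
        # In the real client this would be installed once as on_message.
        # Iterate over a copy so a listener may unregister during the loop.
        for cb in list(self._listener_cbs):
            cb(message)

    @asynccontextmanager
    async def unfiltered_messages(self) -> AsyncIterator["asyncio.Queue[str]"]:
        queue: "asyncio.Queue[str]" = asyncio.Queue()
        cb = queue.put_nowait
        self._listener_cbs.add(cb)
        try:
            yield queue
        finally:
            # Always unregister, even if the body raised.
            self._listener_cbs.discard(cb)
```

Two simultaneous `async with broadcaster.unfiltered_messages()` blocks each get their own queue, and every broadcast message lands in both.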

Does it make sense? If not, just let me know and I'll try to write up some pseudo code for it. :)

@frederikaalund
Collaborator

It's kind of a poor man's implementation of the Observer pattern. 😁

@empicano empicano force-pushed the simplify-filtering-messages branch 3 times, most recently from ec4cf23 to 25274b9 Compare November 19, 2022 22:23
@empicano empicano marked this pull request as ready for review November 19, 2022 23:11
@empicano
Owner Author

empicano commented Nov 19, 2022

It removes cb from the same set in the finally block.

That's what I was missing! Thank you for your always very well thought out reviews, Frederik! 🎉


I think this is done from my side.

Summary of changes:

  • New message Topic class that implements a matches method to filter against wildcard topics
  • New Message class, ultimately only used to be able to use our new Topic class
  • New messages() generator that works like our unfiltered_messages() generator, but returns messages of our own Message class
  • Multiple messages() generators can now be used at the same time; the old un/filtered_messages() generators work exactly the same as before
  • Calling un/filtered_messages() now issues a warning that they will be removed in a future release
  • Updated tests and documentation
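
As an illustration of the deprecation step, here is a minimal sketch using the standard warnings module. The function filtered_messages_stub and the wording of the message are assumptions made for this example; the warning text used in the PR may differ.

```python
import warnings


def filtered_messages_stub(topic_filter: str) -> str:
    """Hypothetical stand-in for a deprecated method."""
    warnings.warn(
        "filtered_messages() is deprecated and will be removed in a future "
        "release. Use messages() together with Topic.matches() instead.",
        DeprecationWarning,
        # Point the warning at the caller, not at this function.
        stacklevel=2,
    )
    # The old behaviour stays unchanged; here we just echo the argument.
    return topic_filter
```

Naming the replacement in the message gives users a direct pointer to what to switch to.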

I'm glad about any feedback! When you give the approval I'll merge 😊

Collaborator

@frederikaalund frederikaalund left a comment


It's been a busy weekend, so my review is pretty short.

Overall, it looks good! :) I like the new API a lot (it feels right).

Decisions from here:

  • Should we release a 1.0.0 before we merge this in?

@empicano
Owner Author

Thanks for the comments, Frederik! 🚀

Should we release a 1.0.0 before we merge this in?

Staying on 0.x is cheating a bit, but I think the flexibility is great for us maintainers, especially with new ideas like those discussed in the anyio PR. In my opinion, we're playing fair if we release 0.15 with this and then remove the old un/filtered_messages in the release after (together with other improvements). That way, the breaking change comes with enough warning.

@frederikaalund
Collaborator

Staying on 0.x is cheating a bit, but I think the flexibility is great for us maintainers, especially with new ideas like those discussed in the anyio PR. In my opinion, we're playing fair if we release 0.15 with this and then remove the old un/filtered_messages in the release after (together with other improvements). That way, the breaking change comes with enough warning.

To hurt as few users as possible, let's at least have a long deprecation period on the old (un)filtered_messages. Otherwise, I agree. 👍 As long as we are on 0.x.x we can break things. I do think that we should release a 1.0.0 soon, though.

@frederikaalund
Collaborator

Let me know if you need more feedback/review from me. 👍 It's been a busy week, so I kind of lost track of these PRs/issues.

@empicano empicano force-pushed the simplify-filtering-messages branch 2 times, most recently from d8541e5 to 9038718 Compare November 27, 2022 19:38
@empicano empicano force-pushed the simplify-filtering-messages branch from 9038718 to b057d48 Compare November 27, 2022 20:29
@empicano empicano force-pushed the simplify-filtering-messages branch from b057d48 to 4d43388 Compare November 27, 2022 20:34
@empicano empicano merged commit a6a1f83 into master Nov 27, 2022
@empicano empicano deleted the simplify-filtering-messages branch November 27, 2022 20:37
@flavio-fernandes

flavio-fernandes commented Aug 19, 2023

Hi @empicano. Sorry, but the deprecation message was not very clear to me with regard to how to use messages() going forward.

I currently have code that looks like this:

https://github.com/flavio-fernandes/mqtt2kasa/blob/9269fba62be65a6f68ff336646c9e9057a4ba7dc/mqtt2kasa/main.py#L144-L154

Is it proper for me to simply change the last few lines to look like this:

        messages = await stack.enter_async_context(client.messages())
        await client.subscribe("#")
        task = asyncio.create_task(handle_mqtt_messages(messages, main_events_q))
        tasks.add(task)

? or is there a better recommended way? Please document a before and after example, so people like me don't have to bother you. ;)

Thanks,
-- flaviof

@empicano
Owner Author

Hi Flavio,

Sorry for the confusion! I admit the deprecation message could be a bit more descriptive.

messages() is mostly the same as the deprecated unfiltered_messages(), so you should be able to simply switch it out in your case (minus the added await client.subscribe("#"); as far as I understand your code, you're doing more fine-grained subscriptions a few lines later). This PR mainly changed how filtering messages works, but you don't use the deprecated filtered_messages() as far as I can see.

The main difference between messages() and unfiltered_messages() is that messages() returns instances of our own Message class and not the paho-mqtt one.

Hope that helps! 🙂
