Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iteration method stuck in a loop when parent process dies #325

Open
giovannapalomba opened this issue Aug 9, 2024 · 3 comments
Open

Iteration method stuck in a loop when parent process dies #325

giovannapalomba opened this issue Aug 9, 2024 · 3 comments

Comments

@giovannapalomba
Copy link

Hello,

I refer to issue #298.
As it happens to the guy who opened it, the script execution gets stuck inside the for loop that flows the messages from the mqtt broker.
I read in the code of the updated library, in the anext() function that it is unblocked only for:
-disconnecting the broker
-iterating over the next arrived message

However, when the thread/task is a child of another process that dies, its execution gets stuck inside the function and so there is no way to kill the child as well.

I tried the given solution but it overloads the CPU.

To solve this problem I would propose to use the function threading.Event.variable_name.is_set() which checks the state of an asserted flag of the parent process in case it dies.
This check should be put inside the function that iterates anext() messages, so as to interrupt execution.

async def __anext__(variable_name) :
    [...]

    if variable_name.is_set():
        raise Exception("Parent died")

    [...]

Are there better solutions?

Translated with DeepL.com (free version)

@empicano
Copy link
Owner

Hi there, thanks for opening this issue!

The behavior you describe (__anext__ only returns on disconnection from the broker) is expected and IMO also the right one. What speaks against your proposed changes is that we might still want to be able to start a task that listens for messages in a fire-and-forget way.

Nonetheless, your use case sounds reasonable, so let's see how we can solve it 🙂 I'd suggest using a TaskGroup. The documentation states that "If the body of the async with statement exits with an exception [...] the remaining tasks are cancelled and then waited for, and non-cancellation exceptions are grouped into an exception group and raised."

Let's see an example:

import asyncio
import aiomqtt


async def child():
    print("Child task running")
    try:
        async with aiomqtt.Client("test.mosquitto.org") as client:
            await client.subscribe("temperature/#")
            async for message in client.messages:
                print(message.payload)
    except asyncio.CancelledError:
        print("Child task cancelled")


async def parent():
    print("Parent task running")
    async with asyncio.TaskGroup() as tg:
        tg.create_task(child())
        await asyncio.sleep(1)
        print("Raising exception in parent task")
        raise Exception


asyncio.run(parent())

Let me know if that helps! 🙂

@giovannapalomba
Copy link
Author

Thank you for your suggestion.
It works, but in order to catch the exception asyncio.CancelledError the client must be alive until the parent dies, causing the CPU to overflow.

# Method to kill thread if parent thread is dead
async def client_killer(t):
 
    # Polling if parent is still alive
    while not shut_down_event.is_set():
       await asyncio.sleep(0)
 
    closing_result = t.cancel()
   
async def client_main(argv,  name_th):
 
    try:

        async with aiomqtt.Client(brokerIP,port) as client_BandR:
            await client_BandR.subscribe(topic_to_read, qos)
            async with asyncio.TaskGroup() as tg:  
               # client_Task is the main function to execute
                task = tg.create_task(client_Task(client_BandR))
               # killer of client_Task
                await client_killer(task)
           
    except Exception as e:
        u.exception_handler(e)

If we fix the code with the code I suggested earlier, there is no more need of the while in the killer task.
What do you think?

@empicano
Copy link
Owner

empicano commented Sep 5, 2024

Thanks for reporting back! 👍

You're right that we should really avoid the await asyncio.sleep(0). There's a lot we can do with asyncio and without changes to aiomqtt. The RealPython asyncio walkthrough is a good introduction to asyncio and the official asyncio docs are a good reference.

If you want to cancel the client manually inside the parent, you can do something like this:

import asyncio
import aiomqtt


async def child():
    print("Child task running")
    try:
        async with aiomqtt.Client("test.mosquitto.org") as client:
            await client.subscribe("temperature/#")
            async for message in client.messages:
                print(message.payload)
    except asyncio.CancelledError:
        print("Child task cancelled")


async def parent():
    print("Parent task running")
    async with asyncio.TaskGroup() as tg:
        task = tg.create_task(child())
        await asyncio.sleep(1)
        # Do something that might raise an exception on which the child should be prematurely canceled
        if False:
            print("Raising exception in parent task")
            raise Exception
        # Cancel the child task manually
        print("Canceling the child manually")
        task.cancel()
    # Do other things
    print("Doing other things inside the parent")
    await asyncio.sleep(2)


asyncio.run(parent())

Let me know if that helps 🙂

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants