-
Notifications
You must be signed in to change notification settings - Fork 224
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
start using any io for structured concurency #1065
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@CAPITAINMARVEL @adeelsohailahmed please review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good generally. Found one small breaking change.
Personally, since this is kind of a bigger change, I have fears if something else may break due to this, since we have no tests to verify if everything is working as it should. So I wouldn't feel very confident using this in a production app.
coros = [] | ||
for link in links: | ||
coros.append(link.fetch()) | ||
return await asyncio.gather(*coros) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method used to return here previously. I don't see how are the "results" being returned now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch. will add test to his this is breaking change
if links: | ||
async with create_task_group() as tg: | ||
for link in links: | ||
tg.start_soon(link.fetch) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I'm not mistaken (since I don't know anyio), here we should use something else apart from tg.start_soon() since it can't be awaited...
Alternatively, if somehow these results can be collected afterwards then we await all those tasks and return all the collected results.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
E.g. FastAPI does something like this, lines 856-867:
https://github.com/fastapi/fastapi/blob/bffb4115a9b63127948cc5e1aa14d73940734f75/fastapi/dependencies/utils.py#L856
It calls tg.start_soon(), but then in the method it calls it stores those results to a variable declared in the "upper" scope. Not sure if this is feasible to do here, since we call some "internal" Link class method. Perhaps feasible, but with some more refactoring required...
Rationale
gather does not cancel other running tasks.
using structured concurency (aka TaskGroup) will cancel running if any of corutines in same group fails.
This avoid unnecesary and unexpected memory leak performance because we will be running "zombie" tasks we no longer need throwing out results.
blurb from python documentation
this is awailible in python 3.12 and equivalent in trio