Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent stream #164

Merged
merged 49 commits into from
Mar 21, 2024
Merged

Concurrent stream #164

merged 49 commits into from
Mar 21, 2024

Conversation

yoshuawuyts
Copy link
Owner

@yoshuawuyts yoshuawuyts commented Feb 16, 2024

This implements a system for concurrent handling of items in streams.

Pitch: we don't want to ever work with a Stream<Item = impl Future> for concurrency; what we really want is a ConcurrentAsyncIterator trait which does all the right things for us. That means the solution to "barbara battles buffered streams" is to tell her not to put futures inside of streams, and instead use "concurrent streams" directly.

This is likely a more robust direction than trying to weave a new poll_bg / poll_progress method through every future and async iterator. If we do things right, resolving this issue will come at the overhead of roughly an extra copy.

todos

  • base function impl based on StreamGroup
  • make the for_each closure return a future (T_T can't use nightly features here)
  • write integration tests to validate the impl
  • split it into a type with methods on it to set the values
  • convert that into own trait
  • add a drain method
  • add a conversion from a regular stream
  • implement a map method
  • implement a for_each method
  • split limit out into its own method
  • try_for_each to short-circuit on error
  • leverage a private Try family of traits
  • implement a collect method
  • stack pin the internal FutureGroup

References

@yoshuawuyts
Copy link
Owner Author

This should be good for review now I reckon. It's all implemented and working, just not fully documented and maybe,, smooth? But yeah, I think it's good now for people to start taking a look!

@yoshuawuyts
Copy link
Owner Author

Oh I guess maybe one more TODO item we could do here is remove the Pin<Box> from some of the impls. I think that will require pinning the entire thing, but that's probably fine. This is a consequence of futures being pinned, which we're storing inside of a FutureGroup. A little annoying, but probably OK.

@yoshuawuyts
Copy link
Owner Author

Oh and for anyone wondering about CI: nightly is currently broken because of rust-lang/rust#121708.

@yoshuawuyts
Copy link
Owner Author

yoshuawuyts commented Mar 20, 2024

@matheus-consoli I think this PR should be ready to be merged - can I ask you to work your magic to enable CI to pass?

@matheus-consoli
Copy link
Collaborator

🪄 simsalabim faça o CI funcionar pra mim 🪄

@yoshuawuyts yoshuawuyts merged commit 398a2f1 into main Mar 21, 2024
5 checks passed
@delete-merged-branch delete-merged-branch bot deleted the concurrent-stream branch March 21, 2024 09:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants