Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data library async implementation #402

Open
wants to merge 39 commits into
base: master
Choose a base branch
from

Conversation

georgebanasios
Copy link
Contributor

@georgebanasios georgebanasios commented Nov 28, 2024

Added

  • The SDK now provides Reactor Core-based asynchronous APIs for all query, management, streaming query/ingestion (StreamingClient) endpoints, enabling non-blocking operations.

Changed

  • [BREAKING] All synchronous query/management, streaming query/ingestion (StreamingClient) APIs now delegate to their asynchronous counterparts internally and block for results.

@AsafMah
Copy link
Contributor

AsafMah commented Dec 1, 2024

Will you let us know when it's ready to review?

@georgebanasios
Copy link
Contributor Author

@AsafMah
Of course! This is my priority right now, and I’m fully focused on it.
I’ll let you know as soon as it’s ready for review.

@georgebanasios
Copy link
Contributor Author

georgebanasios commented Dec 3, 2024

Hello @AsafMah, @ohadbitt !

I have a question about the approach for managing shared resources, particularly regarding concurrency and synchronization.
Given that the current design uses locks to manage concurrent access to shared state (e.g., resource sets, identity token), I’m curious about the reasoning behind this decision when fully moving to async.

Were you guys considering any specific strategies or patterns to handle concurrency in an async context? For instance, as async workflows will likely involve multiple tasks accessing and updating shared resources concurrently, I’m wondering if the locking approach will be adapted or if there's an intention to remove the locks in favor of another solution, (perhaps using atomic non-blocking operations, reactive patterns, etc.) in order to ensure that async tasks can safely read/write shared resources without blocking each other.

@AsafMah
Copy link
Contributor

AsafMah commented Dec 4, 2024

@georgebanasios we encountered the same issues, and currently are still looking for a good solution.

Eliminating locks whenever possible is usually an advantage either way.

As far as I could find, there is no standard async-supporting lock, you might be able to simulate it with a task that periodically checks it.

@ohadbitt
Copy link
Collaborator

ohadbitt commented Dec 4, 2024

@

@georgebanasios we encountered the same issues, and currently are still looking for a good solution.

Eliminating locks whenever possible is usually an advantage either way.

As far as I could find, there is no standard async-supporting lock, you might be able to simulate it with a task that periodically checks it.

@georgebanasios: Thank you for your contribution, we too thought about the solution you did here, but we think its better to make sure the fetch is only called once.

We conversed about it, i found this article interesting :
https://nitschinger.at/Reactive-Barriers-with-Reactor
Although the usage of MonoProcessor is deprecated, project reactor suggests to use instead Sinks.One. Here's a copilot sample
Sinks.One sink = Sinks.one();
Mono mono = sink.asMono();
mono.subscribe(value -> System.out.println("Received: " + value));
sink.emitValue("Hello, Reactor!");

@georgebanasios
Copy link
Contributor Author

georgebanasios commented Dec 4, 2024

@

@georgebanasios we encountered the same issues, and currently are still looking for a good solution.
Eliminating locks whenever possible is usually an advantage either way.
As far as I could find, there is no standard async-supporting lock, you might be able to simulate it with a task that periodically checks it.

@georgebanasios: Thank you for your contribution, we too thought about the solution you did here, but we think its better to make sure the fetch is only called once.

We conversed about it, i found this article interesting : https://nitschinger.at/Reactive-Barriers-with-Reactor Although the usage of MonoProcessor is deprecated, project reactor suggests to use instead Sinks.One. Here's a copilot sample Sinks.One sink = Sinks.one(); Mono mono = sink.asMono(); mono.subscribe(value -> System.out.println("Received: " + value)); sink.emitValue("Hello, Reactor!");

@ohadbitt Thanks for the suggestion!

Just to clarify, are you referring specifically to the fetch operations related to resource management (i.e.., ingestion resources, identity token)?

@ohadbitt
Copy link
Collaborator

ohadbitt commented Dec 5, 2024

@

@georgebanasios we encountered the same issues, and currently are still looking for a good solution.
Eliminating locks whenever possible is usually an advantage either way.
As far as I could find, there is no standard async-supporting lock, you might be able to simulate it with a task that periodically checks it.

@georgebanasios: Thank you for your contribution, we too thought about the solution you did here, but we think its better to make sure the fetch is only called once.
We conversed about it, i found this article interesting : https://nitschinger.at/Reactive-Barriers-with-Reactor Although the usage of MonoProcessor is deprecated, project reactor suggests to use instead Sinks.One. Here's a copilot sample Sinks.One sink = Sinks.one(); Mono mono = sink.asMono(); mono.subscribe(value -> System.out.println("Received: " + value)); sink.emitValue("Hello, Reactor!");

@ohadbitt Thanks for the suggestion!

Just to clarify, are you referring specifically to the fetch operations related to resource management (i.e.., ingestion resources, identity token)?

Yes I'm referring to any result we used to take once (or once every .. )
Actually there's no need to do the ingest library on the same PR .. i mentioned here the issue the issue you raised with the CloudSettings

@georgebanasios georgebanasios changed the title Async implementation Data library async implementation Dec 5, 2024
@georgebanasios georgebanasios marked this pull request as ready for review December 7, 2024 20:48
@georgebanasios
Copy link
Contributor Author

georgebanasios commented Dec 7, 2024

Hello @AsafMah @ohadbitt
The PR is now ready for your review whenever you have the time, thank you!

As far as cloud settings, since sinks emit results serially, and we want concurrent access but while fetching only once for a specific cluster url, do you think a computeIfAbsent is sufficient?

@AsafMah AsafMah self-requested a review January 1, 2025 07:07
Copy link
Collaborator

@ohadbitt ohadbitt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still trying to remove most of the flatMaps where its not async call :)

  • it seems to me to be a big deal but maybe im just new to reactive

@georgebanasios
Copy link
Contributor Author

georgebanasios commented Jan 2, 2025

still trying to remove most of the flatMaps where its not async call :)

  • it seems to me to be a big deal but maybe im just new to reactive

No, you are right. I managed to remove almost all where there is not an async call. The ones left are on BaseClient (for the async parsing response), and on ClientImpl (for the http calls). I'll look into it if any remaining can be removed.
Edit: down goes every flatMap from ClientImpl.

@ohadbitt
Copy link
Collaborator

ohadbitt commented Jan 5, 2025

Approved but please do add the "throws " to the async API as well

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants