-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#32167][Prism] Use the worker-id gRPC metadata #33438
[#32167][Prism] Use the worker-id gRPC metadata #33438
Conversation
See #33450 and its resolving PR #33453: At 3636a3c, Python sends an empty string for worker_id key in the gRPC metadata. Debugging the The error
```
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "worker id in ctx metadata is an empty string"
debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"worker id in ctx metadata is an empty string", grpc_status:2, created_time:"2024-12-23T16:08:00.205761402-08:00"}"
>
During handling of the above exception, another exception occurred: Traceback (most recent call last):
|
fb83e65
to
ac3f7fe
Compare
ac3f7fe
to
3fd5559
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #33438 +/- ##
============================================
- Coverage 59.02% 59.02% -0.01%
Complexity 3185 3185
============================================
Files 1146 1147 +1
Lines 176085 176108 +23
Branches 3368 3368
============================================
+ Hits 103942 103944 +2
- Misses 68787 68804 +17
- Partials 3356 3360 +4
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
R: @lostluck |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Close but we can improve it! This top comment is a summary of what the more pointed comments are mentioning.
As it stands, we should not introduce a package global (the worker pool). That will only make things harder for later maintainers, so getting the plumbing right now is a good idea.
The main adjustment would be to not make it so we need to add 4 new locations for creating the net.Listen calls. We shouldn't need to be adding more of those as a result of this work. In principle, we should be getting fewer of them.
First, make the worker pool a struct. You already have this type in the form of the MultiplexW type. Move the map, and the lock onto that, and out of the package namespace.
Then we add a method to register onto a provided GRPC server, and stop making it's own. This lets us re-use an existing GRPC server, such as the one for JobManagement.
prism.CreateJobServer can be updated to create the pool and pass it to jobservices.NewServer. For it's part, the jobservices package can have an interface for accepting this worker factory, without even refering directly to any of the types from the worker
package. This lets the Job spit out prefixed workers.
The main gap is that worker production method would need to be part of the interface, so it can be exposed via the Job type. That method can return an any, that we can then cast to a *worker.W to do the remaining worker set up. I don't love this, but it's not the end of the world. If that type is wrong, everything breaks very early, which is good.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some largely minor comments I trust you can change without an additional review gate. Please make them before merging.
Please let me know if you have questions or want a subsequent final review.
This PR closes #32167 via implementation of a worker.MultiplexW to forward FnAPI gRPC requests to *W stored by id that matches the worker-id gRPC context metadata. In addition to typical go test workflow, to validate this PR
./gradlew :runners:portability:java:ulrLoopbackValidatesRunnerTests -PjobEndpoint=localhost:8073
was ran on a few initial tests. Theidle_shutdown_timeout
was tested by visual inspection to validate the additional service does not block the executable from shutting down.Depends on PR #33453
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.