[Prism] Use the worker-id gRPC metadata #32167

Closed
Tracked by #29650
lostluck opened this issue Aug 13, 2024 · 0 comments · Fixed by #33438

lostluck commented Aug 13, 2024

A lesser-known feature of the FnAPI protocol is that the SDK must set gRPC metadata on all RPCs before a runner should acknowledge the worker.

This lets the runner distinguish between pipeline workers, avoiding the need for a new port for each worker instance within a job, and also identify which job a worker is part of. So there are savings in ports and in gRPC-based goroutines, which in extreme cases could cause efficiency issues in thread scheduling.

The proposal is to have a single "multiplexer" layer within Prism that routes between the handlers for given jobs and workers. It should sit on the same single port as JobManagement, since gRPC should allow different services to share a port. Failing that, allow a single port to be assigned and known at Prism startup time for worker endpoint use.

The "worker_id" metadata can be looked up from a gRPC message's context. See grpcx.ReadWorkerID for how to do that: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/util/grpcx/metadata.go
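
As a rough illustration of that lookup, here is a stdlib-only sketch. It is not the grpcx.ReadWorkerID implementation: the helper name `workerIDFromMD` and its error messages are hypothetical. It relies only on the fact that gRPC-Go metadata (`metadata.MD`) is a `map[string][]string`, which a real handler would obtain from the call context with `metadata.FromIncomingContext(ctx)`.

```go
package main

import (
	"errors"
	"fmt"
)

// workerIDKey is the metadata key named in the issue text.
const workerIDKey = "worker_id"

// workerIDFromMD is a hypothetical helper, not part of Beam. It takes the
// metadata map that a gRPC handler would read from the incoming context
// and returns the single worker ID stored under workerIDKey.
func workerIDFromMD(md map[string][]string) (string, error) {
	vals := md[workerIDKey]
	if len(vals) == 0 {
		return "", errors.New("worker_id metadata is missing")
	}
	if len(vals) > 1 {
		return "", fmt.Errorf("expected one worker_id value, got %d", len(vals))
	}
	if vals[0] == "" {
		return "", errors.New("worker_id metadata is empty")
	}
	return vals[0], nil
}

func main() {
	md := map[string][]string{workerIDKey: {"job-1_worker-0"}}
	id, err := workerIDFromMD(md)
	fmt.Println(id, err)
}
```

Rejecting an empty ID up front keeps a misconfigured SDK from being routed to an arbitrary worker.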

This multiplexer would likely need to be started up by the jobservices Server as well, adding a dependency between the worker and jobservices packages. If that's a problem, whatever starts up the jobservice can also start up the worker multiplexer, and provide a way of registering workers for the job on the jobservices.Job type.

The multiplexer would implement the Beam FnAPI but otherwise delegate to the existing implementations of those methods on the worker.W type, looking up the appropriate worker.W instance by the jobID.
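
To make the delegation idea concrete, here is a minimal sketch under stated assumptions. The `handler` interface, `echoWorker`, and `multiplexer` types are all hypothetical stand-ins (the real worker.W and FnAPI services are not modeled), and a single string ID is used as the lookup key for simplicity.

```go
package main

import (
	"fmt"
	"sync"
)

// handler stands in for the per-worker FnAPI implementation
// (worker.W in the issue). The single method is illustrative only.
type handler interface {
	Control(msg string) string
}

// echoWorker is a toy handler used to show delegation.
type echoWorker struct{ id string }

func (w *echoWorker) Control(msg string) string { return w.id + " handled " + msg }

// multiplexer is a hypothetical router: it owns no worker logic and only
// forwards each call to the handler registered under the given ID.
type multiplexer struct {
	mu      sync.Mutex
	workers map[string]handler
}

func newMultiplexer() *multiplexer {
	return &multiplexer{workers: make(map[string]handler)}
}

func (m *multiplexer) register(id string, h handler) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.workers[id] = h
}

func (m *multiplexer) unregister(id string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.workers, id)
}

// Control looks up the handler for id and delegates to it.
func (m *multiplexer) Control(id, msg string) (string, error) {
	m.mu.Lock()
	h, ok := m.workers[id]
	m.mu.Unlock()
	if !ok {
		return "", fmt.Errorf("no worker registered for id %q", id)
	}
	return h.Control(msg), nil
}

func main() {
	mux := newMultiplexer()
	mux.register("worker-0", &echoWorker{id: "worker-0"})
	out, err := mux.Control("worker-0", "ping")
	fmt.Println(out, err)
}
```

The lock is released before delegating so that a slow worker call does not block registration or lookups for other workers.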

Workers would need to be unregistered on job termination to keep things tidy, but that can be handled via context cancellation on the job's Root context (the RootCtx field).

Aside: apparently Go can also serve the web pages on the same port: https://stackoverflow.com/questions/63668447/why-grpc-go-can-run-grpc-server-and-http-server-at-the-same-address-and-port-bu. Might be worthwhile to avoid spending ports.

Note that this would consolidate gRPC's internal per-worker goroutines and structures. Each worker in a job would still have ~9 goroutines to manage communication for that physical worker.

@lostluck lostluck changed the title Use the worker-id GRPC metadata [prism] Use the worker-id GRPC metadata Aug 13, 2024
@damondouglas damondouglas self-assigned this Oct 22, 2024
@damondouglas damondouglas changed the title [prism] Use the worker-id GRPC metadata [prism] Use the worker-id gRPC metadata Dec 26, 2024
@damondouglas damondouglas changed the title [prism] Use the worker-id gRPC metadata [Prism] Use the worker-id gRPC metadata Dec 26, 2024
damondouglas added a commit that referenced this issue Jan 3, 2025
* Implement MultiplexW and Pool

* Add missing license header

* Add multiplex worker to prism execute

* remove unused props

* Fix Prism python precommit

* Handle worker_id is empty string error

* Fix python worker id interceptor

* default empty _worker_id

* Revert defaulting worker id

* Fix worker_id in docker env

* Update per PR comments

* Add lock/unlock to MultiplexW

* Delegate W deletion via MW

* Remove unnecessary guard

* Small fixes after PR review

* Add code comment to MakeWorker

* clean up commented out code

* Revert portable/common changes
@github-actions github-actions bot added this to the 2.63.0 Release milestone Jan 3, 2025
stankiewicz pushed a commit to stankiewicz/beam that referenced this issue Jan 16, 2025