[Ignore me] Integrate GCS with Locals, basic functionality #5

Merged: 27 commits, May 15, 2024

Commits (27):
a317252  Create extract IP address from context util function (Russell-Tran, May 9, 2024)
71a4cd3  WIP refactoring GCS object table, local object table, protocol, for c… (Russell-Tran, May 12, 2024)
d9717dd  Update protocol to reflect rayclient.proto (Russell-Tran, May 12, 2024)
6211d70  Resolve some compiler errors for GCS object table (Russell-Tran, May 12, 2024)
ecb6810  Rename LocationFoundResponse to LocationFoundCallback (Russell-Tran, May 12, 2024)
b13b979  Continued refactor of LocationFoundResponse to LocationFoundCallback (Russell-Tran, May 12, 2024)
e11f21b  Refactor uint32 to uint64 (Russell-Tran, May 12, 2024)
d2bd36a  GCS object table successful compile & test by commenting out old tests (Russell-Tran, May 12, 2024)
efc7dd2  Silly comment (Russell-Tran, May 12, 2024)
27b04e0  Conversion of uint32 to uint64 and some other protocol fixes s.t. loc… (Russell-Tran, May 12, 2024)
52c965b  Comment (Russell-Tran, May 12, 2024)
f794d6c  Merge branch 'locals' into locals-meets-gcs (Russell-Tran, May 12, 2024)
6678322  Resolve a merge issue; the rest of this is VSCode's linting (Russell-Tran, May 12, 2024)
3c944c4  Refactor name (Russell-Tran, May 12, 2024)
8c6d34c  Temporarily put back the address and port testing fields in LocationF… (Russell-Tran, May 12, 2024)
209eb25  Revert "Temporarily put back the address and port testing fields in L… (Russell-Tran, May 12, 2024)
7b041f6  Implement waitlist functionality (Russell-Tran, May 12, 2024)
293feed  Revert "Revert "Temporarily put back the address and port testing fie… (Russell-Tran, May 13, 2024)
045bf14  go get github.com/stretchr/testify (Russell-Tran, May 13, 2024)
0fff148  DOES NOT WORK - WIP writing tests for GCSObjectTable using UnaryInter… (Russell-Tran, May 13, 2024)
7b3f855  Frustrating checkpoint for another GCSObjectTable mock attempt that d… (Russell-Tran, May 13, 2024)
29560bf  Frustration mounting - at least I discovered that Go does not obey tr… (Russell-Tran, May 13, 2024)
9bd78f5  Make some unit tests that pass (Russell-Tran, May 14, 2024)
fe0dc89  Update README.md (Russell-Tran, May 14, 2024)
b358023  Update README.md (Russell-Tran, May 14, 2024)
fd684eb  Merge pull request #4 from rodrigo-castellon/locals (Russell-Tran, May 15, 2024)
528f2e3  Merge branch 'main' into locals-meets-gcs (Russell-Tran, May 15, 2024)
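
Several of the commits above describe failed attempts at mocking GCSObjectTable over gRPC before landing on unit tests that pass. For reference only (this sketch is not part of the PR), the handlers in go/cmd/gcsobjtable/main.go below can also be exercised directly as plain methods, with no gRPC transport or mock involved; it assumes the testify dependency added in commit 045bf14, and the test name and object/node IDs are made up for illustration.

```go
package main

import (
	"context"
	"testing"

	pb "github.com/rodrigo-castellon/babyray/pkg"
	"github.com/stretchr/testify/assert"
)

// Hypothetical test, not from this PR: call NotifyOwns directly on a fresh
// GCSObjServer and confirm the location is recorded.
func TestNotifyOwnsRegistersLocation(t *testing.T) {
	s := NewGCSObjServer()

	// Node 7 claims ownership of object 42 (IDs chosen arbitrarily).
	resp, err := s.NotifyOwns(context.Background(), &pb.NotifyOwnsRequest{Uid: 42, NodeId: 7})
	assert.NoError(t, err)
	assert.True(t, resp.Success)

	// getNodeId assumes the caller already holds the server's mutex.
	s.mu.Lock()
	nodeId, exists := s.getNodeId(42)
	s.mu.Unlock()
	assert.True(t, exists)
	assert.Equal(t, uint64(7), *nodeId)
}
```

Testing RequestLocation this way is harder, since it pulls the caller's address out of the request context via peer.FromContext, which is where interceptor- or mock-based setups come in.
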
2 changes: 2 additions & 0 deletions .gitignore
@@ -196,3 +196,5 @@ go.work
go/cmd/gcsfunctable/main
go/cmd/gcsobjtable/gcsobjtable
python/babyray/rayclient_pb2_grpc.py-e
+go/cmd/localobjstore/localobjstore
+go/cmd/localscheduler/localscheduler
14 changes: 14 additions & 0 deletions README.md
@@ -5,6 +5,20 @@
Currently a work in progress. By the end of this project, you should be able to simulate a full Ray cluster using Docker Compose and launch CPU jobs using Python, using exactly the same API exposed by the real Ray Core library.


## Automatic deployment - shorthand version
```bash
make all
```

```bash
docker-compose up
```

Separate terminal:
```bash
./log_into_driver.sh
```

## Automatic deployment

You can automatically deploy the entire cluster (workers, GCS, global scheduler nodes) by following these instructions.
218 changes: 152 additions & 66 deletions go/cmd/gcsobjtable/main.go
@@ -1,88 +1,174 @@
package main

import (
"context"
// "errors"
"log"
"net"
"strconv"
"sync"
"math/rand"

"google.golang.org/grpc"
pb "github.com/rodrigo-castellon/babyray/pkg"
"github.com/rodrigo-castellon/babyray/config"

"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
"context"
"time"

// "errors"
"log"
"math/rand"
"net"
"strconv"
"sync"

"github.com/rodrigo-castellon/babyray/config"
pb "github.com/rodrigo-castellon/babyray/pkg"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/peer"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

+var cfg *config.Config

func main() {
-	cfg := config.GetConfig() // Load configuration
-	address := ":" + strconv.Itoa(cfg.Ports.GCSObjectTable) // Prepare the network address
-
-	lis, err := net.Listen("tcp", address)
-	if err != nil {
-		log.Fatalf("failed to listen: %v", err)
-	}
-	_ = lis;
-	s := grpc.NewServer()
-	pb.RegisterGCSObjServer(s, NewGCSObjServer())
-	log.Printf("server listening at %v", lis.Addr())
-	if err := s.Serve(lis); err != nil {
-		log.Fatalf("failed to serve: %v", err)
-	}
+	cfg = config.GetConfig() // Load configuration
+	address := ":" + strconv.Itoa(cfg.Ports.GCSObjectTable) // Prepare the network address
+
+	lis, err := net.Listen("tcp", address)
+	if err != nil {
+		log.Fatalf("failed to listen: %v", err)
+	}
+	_ = lis
+	s := grpc.NewServer()
+	pb.RegisterGCSObjServer(s, NewGCSObjServer())
+	log.Printf("server listening at %v", lis.Addr())
+	if err := s.Serve(lis); err != nil {
+		log.Fatalf("failed to serve: %v", err)
+	}
}

// server is used to implement your gRPC service.
type GCSObjServer struct {
-	pb.UnimplementedGCSObjServer
-	objectLocations map[uint64][]uint64
-	mu              sync.Mutex
-	cond            *sync.Cond
+	pb.UnimplementedGCSObjServer
+	objectLocations map[uint64][]uint64 // object uid -> list of nodeIds as uint64
+	waitlist        map[uint64][]string // object uid -> list of IP addresses as string
+	mu              sync.Mutex          // lock should be used for both objectLocations and waitlist
}

func NewGCSObjServer() *GCSObjServer {
-	server := &GCSObjServer{
-		objectLocations: make(map[uint64][]uint64),
-		mu:              sync.Mutex{}, // Mutex initialized here.
-	}
-	server.cond = sync.NewCond(&server.mu) // Properly pass the address of the struct's mutex.
-	return server
+	server := &GCSObjServer{
+		objectLocations: make(map[uint64][]uint64),
+		waitlist:        make(map[uint64][]string),
+		mu:              sync.Mutex{},
+	}
+	return server
}

+/*
+Returns a nodeId that has object uid. If it doesn't exist anywhere,
+then the second return value will be false.
+Assumes that s's mutex is locked.
+*/
+func (s *GCSObjServer) getNodeId(uid uint64) (*uint64, bool) {
+	nodeIds, exists := s.objectLocations[uid]
+	if !exists || len(nodeIds) == 0 {
+		return nil, false
+	}
+
+	// Note: policy is to pick a random one; in the future it will need to be locality-based
+	randomIndex := rand.Intn(len(nodeIds))
+	nodeId := &nodeIds[randomIndex]
+	return nodeId, true
+}
+
+// sendCallback sends a location found callback to the local object store client
+// This should be used as a goroutine
+func (s *GCSObjServer) sendCallback(clientAddress string, uid uint64, nodeId uint64) {
+	// Set up a new gRPC connection to the client
+	// TODO: Refactor to save gRPC connections rather than creating a new one each time
+	// Dial is lazy-loading, but we should still save the connection for future use
+	conn, err := grpc.Dial(clientAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		// Log the error instead of returning it
+		log.Printf("Failed to connect back to client at %s: %v", clientAddress, err)
+		return
+	}
+	defer conn.Close() // TODO: remove in some eventual universe
+
+	localObjStoreClient := pb.NewLocalObjStoreClient(conn)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	// Call LocationFound and handle any potential error
+	_, err = localObjStoreClient.LocationFound(ctx, &pb.LocationFoundCallback{Uid: uid, Location: nodeId})
+	if err != nil {
+		log.Printf("Failed to send LocationFound callback for UID %d to client at %s: %v", uid, clientAddress, err)
+	}
+}

// Implement your service methods here.
func (s *GCSObjServer) NotifyOwns(ctx context.Context, req *pb.NotifyOwnsRequest) (*pb.StatusResponse, error) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	// Append the nodeId to the list for the given uid
-	s.objectLocations[req.Uid] = append(s.objectLocations[req.Uid], req.NodeId)
-	s.cond.Broadcast()
-
-	return &pb.StatusResponse{Success: true}, nil
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	uid, nodeId := req.Uid, req.NodeId
+
+	// Append the nodeId to the list for the given object uid
+	if _, exists := s.objectLocations[uid]; !exists {
+		s.objectLocations[uid] = []uint64{} // Initialize slice if it doesn't exist
+	}
+	s.objectLocations[uid] = append(s.objectLocations[uid], nodeId)
+
+	// Clear waitlist for this uid, if any
+	waitingIPs, exists := s.waitlist[uid]
+	if exists {
+		for _, clientAddress := range waitingIPs {
+			go s.sendCallback(clientAddress, uid, nodeId)
+		}
+		// Clear the waitlist for the given uid after processing
+		delete(s.waitlist, uid)
+	}
+
+	return &pb.StatusResponse{Success: true}, nil
}

func (s *GCSObjServer) RequestLocation(ctx context.Context, req *pb.RequestLocationRequest) (*pb.RequestLocationResponse, error) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	nodeIds, exists := s.objectLocations[req.Uid]
-
-	for !exists || len(nodeIds) == 0 {
-		s.cond.Wait()
-		nodeIds, exists = s.objectLocations[req.Uid]
-		if ctx.Err() != nil {
-			return nil, status.Error(codes.Canceled, "request cancelled or context deadline exceeded")
-		}
-	}
-
-	// Assume successful case
-	randomIndex := rand.Intn(len(nodeIds))
-	return &pb.RequestLocationResponse{
-		NodeId: nodeIds[randomIndex],
-	}, nil
-}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Extract client address
+	p, ok := peer.FromContext(ctx)
+	if !ok {
+		return nil, status.Error(codes.Internal, "could not get peer information; hence, failed to extract client address")
+	}
+
+	// /////
+	// log.SetOutput(os.Stdout)
+	// log.Println(p.Addr.String())
+	// //////////
+
+	// NOTE: We only use one cv for now, which may cause performance issues in the future
+	// However, there is significant overhead if we want one cv per object, as then we would have to manage
+	// the cleanup through reference counting
+	host, _, err := net.SplitHostPort(p.Addr.String()) // Strip out the ephemeral port
+	if err != nil {
+		//return nil, status.Error(codes.Internal, "could not split host and port")
+		// Temporary workaround: If splitting fails, use the entire address string as the host
+		host = p.Addr.String()
+	}
+	clientPort := strconv.Itoa(cfg.Ports.LocalObjectStore) // Replace the ephemeral port with the official LocalObjectStore port
+	clientAddress := net.JoinHostPort(host, clientPort)
+
+	uid := req.Uid
+	nodeId, exists := s.getNodeId(uid)
+	if !exists {
+		// Add client to waiting list
+		if _, exists := s.waitlist[uid]; !exists {
+			s.waitlist[uid] = []string{} // Initialize slice if it doesn't exist
+		}
+		s.waitlist[uid] = append(s.waitlist[uid], clientAddress)
+
+		// Reply to this gRPC request
+		return &pb.RequestLocationResponse{
+			ImmediatelyFound: false,
+		}, nil
+	}
+
+	// Send immediate callback
+	go s.sendCallback(clientAddress, uid, *nodeId)
+
+	// Reply to this gRPC request
+	return &pb.RequestLocationResponse{
+		ImmediatelyFound: true,
+	}, nil
+}
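
For context on how the RequestLocation / LocationFound handshake above is meant to be driven, here is a minimal client-side sketch, not part of this PR. It assumes the generated client constructor pb.NewGCSObjClient and the host name gcsobjtable, neither of which appears in this diff, and the object and node IDs are placeholders. A real local object store would additionally serve the LocalObjStore gRPC service on cfg.Ports.LocalObjectStore so that sendCallback above can reach it when ImmediatelyFound is false.

```go
package main

import (
	"context"
	"log"
	"strconv"
	"time"

	"github.com/rodrigo-castellon/babyray/config"
	pb "github.com/rodrigo-castellon/babyray/pkg"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	cfg := config.GetConfig()

	// "gcsobjtable" is an assumed service name for the GCS object table node.
	addr := "gcsobjtable:" + strconv.Itoa(cfg.Ports.GCSObjectTable)
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("failed to dial GCS object table: %v", err)
	}
	defer conn.Close()

	// pb.NewGCSObjClient is the assumed protoc-generated counterpart of
	// pb.RegisterGCSObjServer used in main.go above.
	client := pb.NewGCSObjClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Node 7 announces that it holds object 42 (placeholder IDs).
	if _, err := client.NotifyOwns(ctx, &pb.NotifyOwnsRequest{Uid: 42, NodeId: 7}); err != nil {
		log.Fatalf("NotifyOwns failed: %v", err)
	}

	// Ask where object 42 lives. Since it was just registered, the server
	// replies ImmediatelyFound=true and also fires a LocationFound callback
	// at this host's LocalObjectStore port in a separate goroutine.
	resp, err := client.RequestLocation(ctx, &pb.RequestLocationRequest{Uid: 42})
	if err != nil {
		log.Fatalf("RequestLocation failed: %v", err)
	}
	log.Printf("immediately found: %v", resp.ImmediatelyFound)
}
```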