Skip to content

Commit

Permalink
WIP on refactoring GCS object table to handle multiple waiting requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Russell-Tran committed May 8, 2024
1 parent c70783e commit cd7b060
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 59 deletions.
52 changes: 30 additions & 22 deletions go/cmd/gcsobjtable/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"strconv"
"sync"
"math/rand"

"google.golang.org/grpc"
pb "github.com/rodrigo-castellon/babyray/pkg"
Expand All @@ -23,7 +24,7 @@ func main() {
}
_ = lis;
s := grpc.NewServer()
pb.RegisterGCSObjServer(s, &GCSObjServer{}) // TODO: do i need to do a create for the map instead of a raw struct?
pb.RegisterGCSObjServer(s, NewGCSObjServer())
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
Expand All @@ -33,8 +34,18 @@ func main() {
// server is used to implement your gRPC service.
type GCSObjServer struct {
pb.UnimplementedGCSObjServer
objectLocations map[uint32][]uint32
objectLocations map[uint64][]uint64
mu sync.Mutex
cond *sync.Cond
}

func NewGCSObjServer() *GCSObjServer {
mu := sync.Mutex{} // Create a mutex.
return &GCSObjServer{
objectLocations: make(map[uint64][]uint64),
mu: mu,
cond: sync.NewCond(&mu),
}
}

// Implement your service methods here.
Expand All @@ -44,34 +55,31 @@ func (s *GCSObjServer) NotifyOwns(ctx context.Context, req *pb.NotifyOwnsRequest

// Append the nodeId to the list for the given uid
s.objectLocations[req.Uid] = append(s.objectLocations[req.Uid], req.NodeId)
//log.Printf("NotifyOwns: Added Node %d to UID %d", req.NodeId, req.Uid)
s.cond.Broadcast()

return &pb.StatusResponse{Success: true}, nil
}


// TODO: Actually needs to match the pseudocode from our project architecture -- like it should
// be hanging out with the channels thing
func (s *GCSObjServer) RequestLocation(ctx context.Context, req *pb.RequestLocationRequest) (*pb.StatusResponse, error) {
func (s *GCSObjServer) RequestLocation(ctx context.Context, req *pb.RequestLocationRequest) (*pb.RequestLocationResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
nodeIds, exists := s.objectLocations[req.Uid]
s.mu.Unlock()

if !exists || len(nodeIds) == 0 {
log.Printf("RequestLocation: No locations found for UID %d", req.Uid)
// Returning a StatusResponse indicating failure, instead of nil and a Go error
return &pb.StatusResponse{
Success: false,
ErrorCode: 404, // Or another appropriate error code
ErrorMessage: "no locations found for given UID",
Details: "The requested UID does not exist in the object locations map.",
}, nil // No error returned here; encoding the failure in the response message

for !exists || len(nodeIds) == 0 {
s.cond.Wait()
nodeIds, exists = s.objectLocations[req.Uid]
if ctx.Err() != nil {
return nil, status.Error(codes.Canceled, "request cancelled or context deadline exceeded")
}
}

// Assume successful case
//log.Printf("RequestLocation: Returning location for UID %d: Node %d", req.Uid, nodeIds[0])
return &pb.StatusResponse{
Success: true,
Details: strconv.Itoa(int(nodeIds[0])),
randomIndex := rand.Intn(len(nodeIds))
return &pb.RequestLocationResponse{
NodeId : nodeIds[randomIndex]
}, nil
}

// NOTE: We only use one cv for now, which may cause performance issues in the future
// However, there is significant overhead if we want one cv per object, as then we would have to manage
// the cleanup through reference counting
2 changes: 1 addition & 1 deletion go/cmd/gcsobjtable/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var lis *bufconn.Listener
func init() {
lis = bufconn.Listen(bufSize)
s := grpc.NewServer()
pb.RegisterGCSObjServer(s, &GCSObjServer{objectLocations: make(map[uint32][]uint32)})
pb.RegisterGCSObjServer(s, &GCSObjServer{objectLocations: make(map[uint64][]uint64)})
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("Server exited with error: %v", err)
Expand Down
54 changes: 27 additions & 27 deletions go/pkg/rayclient.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 12 additions & 9 deletions proto/rayclient.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ service GlobalScheduler {
}

message GlobalScheduleRequest {
uint32 uid = 1;
uint64 uid = 1;
string name = 2;
bytes args = 3;
bytes kwargs = 4;
Expand All @@ -47,7 +47,7 @@ message ScheduleRequest {
// returns a Future, which is just the UID for
// the generated object
message ScheduleResponse {
uint32 uid = 1;
uint64 uid = 1;
}

/* END WORKER NODE LOCAL SCHEDULER */
Expand All @@ -59,12 +59,12 @@ service LocalObjStore {
}

message StoreRequest {
uint32 uid = 1;
uint64 uid = 1;
bytes objectBytes = 2;
}

message GetRequest {
uint32 uid = 1;
uint64 uid = 1;
}

message GetResponse {
Expand All @@ -79,7 +79,7 @@ service Worker {
}

message RunRequest {
uint32 uid = 1;
uint64 uid = 1;
uint64 name = 2;
bytes args = 3;
bytes kwargs = 4;
Expand All @@ -95,13 +95,16 @@ service GCSObj {
}

message NotifyOwnsRequest {
uint32 uid = 1;
uint32 nodeId = 2;
uint64 uid = 1;
uint64 nodeId = 2;
}

message RequestLocationRequest {
uint32 uid = 1;
uint32 nodeId = 2;
uint64 uid = 1;
}

message RequestLocationResponse {
uint64 nodeId = 1;
}

/* END GCS OBJECT TABLE */
Expand Down

0 comments on commit cd7b060

Please sign in to comment.