From cd7b0601899da45e489bb76a1ea4126af6924ba2 Mon Sep 17 00:00:00 2001
From: Russell Tran
Date: Wed, 8 May 2024 16:36:04 -0700
Subject: [PATCH] WIP on refactoring GCS object table to handle multiple waiting requests

---
 go/cmd/gcsobjtable/main.go      | 52 +++++++++++++++++--------------
 go/cmd/gcsobjtable/main_test.go |  2 +-
 go/pkg/rayclient.pb.go          | 54 ++++++++++++++++-----------------
 proto/rayclient.proto           | 21 +++++++------
 4 files changed, 70 insertions(+), 59 deletions(-)

diff --git a/go/cmd/gcsobjtable/main.go b/go/cmd/gcsobjtable/main.go
index 479f292..56befde 100644
--- a/go/cmd/gcsobjtable/main.go
+++ b/go/cmd/gcsobjtable/main.go
@@ -7,6 +7,7 @@ import (
 	"net"
 	"strconv"
 	"sync"
+	"math/rand"
 
 	"google.golang.org/grpc"
 	pb "github.com/rodrigo-castellon/babyray/pkg"
@@ -23,7 +24,7 @@ func main() {
 	}
 	_ = lis;
 	s := grpc.NewServer()
-	pb.RegisterGCSObjServer(s, &GCSObjServer{}) // TODO: do i need to do a create for the map instead of a raw struct?
+	pb.RegisterGCSObjServer(s, NewGCSObjServer())
 	log.Printf("server listening at %v", lis.Addr())
 	if err := s.Serve(lis); err != nil {
 		log.Fatalf("failed to serve: %v", err)
@@ -33,8 +34,18 @@ func main() {
 // server is used to implement your gRPC service.
 type GCSObjServer struct {
 	pb.UnimplementedGCSObjServer
-	objectLocations map[uint32][]uint32
+	objectLocations map[uint64][]uint64
 	mu              sync.Mutex
+	cond            *sync.Cond
+}
+
+// NewGCSObjServer returns a GCSObjServer with an empty location table. The
+// condition variable is bound to the server's own mutex (never to a copy),
+// so cond.Wait releases the same lock that the RPC handlers acquire.
+func NewGCSObjServer() *GCSObjServer {
+	s := &GCSObjServer{objectLocations: make(map[uint64][]uint64)}
+	s.cond = sync.NewCond(&s.mu)
+	return s
 }
 
 // Implement your service methods here.
@@ -44,34 +55,49 @@ func (s *GCSObjServer) NotifyOwns(ctx context.Context, req *pb.NotifyOwnsRequest
 
 	// Append the nodeId to the list for the given uid
 	s.objectLocations[req.Uid] = append(s.objectLocations[req.Uid], req.NodeId)
-	//log.Printf("NotifyOwns: Added Node %d to UID %d", req.NodeId, req.Uid)
+	s.cond.Broadcast()
 
 	return &pb.StatusResponse{Success: true}, nil
 }
 
-// TODO: Actually needs to match the pseudocode from our project architecture -- like it should
-//       be hanging out with the channels thing
-func (s *GCSObjServer) RequestLocation(ctx context.Context, req *pb.RequestLocationRequest) (*pb.StatusResponse, error) {
+// RequestLocation blocks until at least one node is recorded for req.Uid and
+// returns one of those nodes chosen at random. If ctx ends first it returns
+// ctx.Err() (NOTE(review): confirm the gRPC status code clients should see).
+func (s *GCSObjServer) RequestLocation(ctx context.Context, req *pb.RequestLocationRequest) (*pb.RequestLocationResponse, error) {
+	// sync.Cond.Wait cannot observe ctx, so wake all waiters when ctx ends;
+	// this request then notices its own cancellation in the loop below.
+	done := make(chan struct{})
+	defer close(done)
+	go func() {
+		select {
+		case <-ctx.Done():
+			s.mu.Lock()
+			s.cond.Broadcast()
+			s.mu.Unlock()
+		case <-done:
+		}
+	}()
+
 	s.mu.Lock()
+	defer s.mu.Unlock()
 	nodeIds, exists := s.objectLocations[req.Uid]
-	s.mu.Unlock()
-
-	if !exists || len(nodeIds) == 0 {
-		log.Printf("RequestLocation: No locations found for UID %d", req.Uid)
-		// Returning a StatusResponse indicating failure, instead of nil and a Go error
-		return &pb.StatusResponse{
-			Success:      false,
-			ErrorCode:    404, // Or another appropriate error code
-			ErrorMessage: "no locations found for given UID",
-			Details:      "The requested UID does not exist in the object locations map.",
-		}, nil // No error returned here; encoding the failure in the response message
+
+	for !exists || len(nodeIds) == 0 {
+		if err := ctx.Err(); err != nil {
+			return nil, err
+		}
+		s.cond.Wait()
+		nodeIds, exists = s.objectLocations[req.Uid]
 	}
 
 	// Assume successful case
-	//log.Printf("RequestLocation: Returning location for UID %d: Node %d", req.Uid, nodeIds[0])
-	return &pb.StatusResponse{
-		Success: true,
-		Details: strconv.Itoa(int(nodeIds[0])),
+	randomIndex := rand.Intn(len(nodeIds))
+	return &pb.RequestLocationResponse{
+		NodeId: nodeIds[randomIndex],
 	}, nil
 }
 
+
+// NOTE: We only use one cv for now, which may cause performance issues in
the future
+// However, there is significant overhead if we want one cv per object, as then we would have to manage
+// the cleanup through reference counting
\ No newline at end of file
diff --git a/go/cmd/gcsobjtable/main_test.go b/go/cmd/gcsobjtable/main_test.go
index accad4e..4516c52 100644
--- a/go/cmd/gcsobjtable/main_test.go
+++ b/go/cmd/gcsobjtable/main_test.go
@@ -18,7 +18,7 @@ var lis *bufconn.Listener
 func init() {
 	lis = bufconn.Listen(bufSize)
 	s := grpc.NewServer()
-	pb.RegisterGCSObjServer(s, &GCSObjServer{objectLocations: make(map[uint32][]uint32)})
+	pb.RegisterGCSObjServer(s, NewGCSObjServer())
 	go func() {
 		if err := s.Serve(lis); err != nil {
 			log.Fatalf("Server exited with error: %v", err)
diff --git a/go/pkg/rayclient.pb.go b/go/pkg/rayclient.pb.go
index 00703bb..196c233 100644
--- a/go/pkg/rayclient.pb.go
+++ b/go/pkg/rayclient.pb.go
@@ -98,7 +98,7 @@ type GlobalScheduleRequest struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	Uid    uint32 `protobuf:"varint,1,opt,name=uid,proto3" json:"uid,omitempty"`
+	Uid    uint64 `protobuf:"varint,1,opt,name=uid,proto3" json:"uid,omitempty"`
 	Name   string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
 	Args   []byte `protobuf:"bytes,3,opt,name=args,proto3" json:"args,omitempty"`
 	Kwargs []byte `protobuf:"bytes,4,opt,name=kwargs,proto3" json:"kwargs,omitempty"`
@@ -136,7 +136,7 @@ func (*GlobalScheduleRequest) Descriptor() ([]byte, []int) {
 	return file_rayclient_proto_rawDescGZIP(), []int{1}
 }
 
-func (x *GlobalScheduleRequest) GetUid() uint32 {
+func (x *GlobalScheduleRequest) GetUid() uint64 {
 	if x != nil {
 		return x.Uid
 	}
@@ -234,7 +234,7 @@ type ScheduleResponse struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	Uid uint32 `protobuf:"varint,1,opt,name=uid,proto3" json:"uid,omitempty"`
+	Uid uint64 `protobuf:"varint,1,opt,name=uid,proto3" json:"uid,omitempty"`
 }
 
 func (x *ScheduleResponse) Reset() {
@@ -269,7
+269,7 @@ func (*ScheduleResponse) Descriptor() ([]byte, []int) { return file_rayclient_proto_rawDescGZIP(), []int{3} } -func (x *ScheduleResponse) GetUid() uint32 { +func (x *ScheduleResponse) GetUid() uint64 { if x != nil { return x.Uid } @@ -281,7 +281,7 @@ type StoreRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Uid uint32 `protobuf:"varint,1,opt,name=uid,proto3" json:"uid,omitempty"` + Uid uint64 `protobuf:"varint,1,opt,name=uid,proto3" json:"uid,omitempty"` ObjectBytes []byte `protobuf:"bytes,2,opt,name=objectBytes,proto3" json:"objectBytes,omitempty"` } @@ -317,7 +317,7 @@ func (*StoreRequest) Descriptor() ([]byte, []int) { return file_rayclient_proto_rawDescGZIP(), []int{4} } -func (x *StoreRequest) GetUid() uint32 { +func (x *StoreRequest) GetUid() uint64 { if x != nil { return x.Uid } @@ -336,7 +336,7 @@ type GetRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Uid uint32 `protobuf:"varint,1,opt,name=uid,proto3" json:"uid,omitempty"` + Uid uint64 `protobuf:"varint,1,opt,name=uid,proto3" json:"uid,omitempty"` } func (x *GetRequest) Reset() { @@ -371,7 +371,7 @@ func (*GetRequest) Descriptor() ([]byte, []int) { return file_rayclient_proto_rawDescGZIP(), []int{5} } -func (x *GetRequest) GetUid() uint32 { +func (x *GetRequest) GetUid() uint64 { if x != nil { return x.Uid } @@ -430,7 +430,7 @@ type RunRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Uid uint32 `protobuf:"varint,1,opt,name=uid,proto3" json:"uid,omitempty"` + Uid uint64 `protobuf:"varint,1,opt,name=uid,proto3" json:"uid,omitempty"` Name uint64 `protobuf:"varint,2,opt,name=name,proto3" json:"name,omitempty"` Args []byte `protobuf:"bytes,3,opt,name=args,proto3" json:"args,omitempty"` Kwargs []byte `protobuf:"bytes,4,opt,name=kwargs,proto3" json:"kwargs,omitempty"` @@ -468,7 +468,7 @@ func (*RunRequest) Descriptor() ([]byte, []int) { return file_rayclient_proto_rawDescGZIP(), []int{7} 
} -func (x *RunRequest) GetUid() uint32 { +func (x *RunRequest) GetUid() uint64 { if x != nil { return x.Uid } @@ -501,8 +501,8 @@ type NotifyOwnsRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Uid uint32 `protobuf:"varint,1,opt,name=uid,proto3" json:"uid,omitempty"` - NodeId uint32 `protobuf:"varint,2,opt,name=nodeId,proto3" json:"nodeId,omitempty"` + Uid uint64 `protobuf:"varint,1,opt,name=uid,proto3" json:"uid,omitempty"` + NodeId uint64 `protobuf:"varint,2,opt,name=nodeId,proto3" json:"nodeId,omitempty"` } func (x *NotifyOwnsRequest) Reset() { @@ -537,14 +537,14 @@ func (*NotifyOwnsRequest) Descriptor() ([]byte, []int) { return file_rayclient_proto_rawDescGZIP(), []int{8} } -func (x *NotifyOwnsRequest) GetUid() uint32 { +func (x *NotifyOwnsRequest) GetUid() uint64 { if x != nil { return x.Uid } return 0 } -func (x *NotifyOwnsRequest) GetNodeId() uint32 { +func (x *NotifyOwnsRequest) GetNodeId() uint64 { if x != nil { return x.NodeId } @@ -556,8 +556,8 @@ type RequestLocationRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Uid uint32 `protobuf:"varint,1,opt,name=uid,proto3" json:"uid,omitempty"` - NodeId uint32 `protobuf:"varint,2,opt,name=nodeId,proto3" json:"nodeId,omitempty"` + Uid uint64 `protobuf:"varint,1,opt,name=uid,proto3" json:"uid,omitempty"` + NodeId uint64 `protobuf:"varint,2,opt,name=nodeId,proto3" json:"nodeId,omitempty"` } func (x *RequestLocationRequest) Reset() { @@ -592,14 +592,14 @@ func (*RequestLocationRequest) Descriptor() ([]byte, []int) { return file_rayclient_proto_rawDescGZIP(), []int{9} } -func (x *RequestLocationRequest) GetUid() uint32 { +func (x *RequestLocationRequest) GetUid() uint64 { if x != nil { return x.Uid } return 0 } -func (x *RequestLocationRequest) GetNodeId() uint32 { +func (x *RequestLocationRequest) GetNodeId() uint64 { if x != nil { return x.NodeId } @@ -811,7 +811,7 @@ var file_rayclient_proto_rawDesc = []byte{ 0x01, 0x28, 0x09, 0x52, 
0x07, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x22, 0x69, 0x0a, 0x15, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0d, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, + 0x28, 0x04, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x72, 0x67, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x61, 0x72, 0x67, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x6b, 0x77, 0x61, 0x72, 0x67, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, @@ -822,30 +822,30 @@ var file_rayclient_proto_rawDesc = []byte{ 0x67, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x6b, 0x77, 0x61, 0x72, 0x67, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6b, 0x77, 0x61, 0x72, 0x67, 0x73, 0x22, 0x24, 0x0a, 0x10, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x10, - 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x03, 0x75, 0x69, 0x64, + 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x75, 0x69, 0x64, 0x22, 0x42, 0x0a, 0x0c, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x03, 0x75, + 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x20, 0x0a, 0x0b, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x42, 0x79, 0x74, 0x65, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x42, 0x79, 0x74, 0x65, 0x73, 0x22, 0x1e, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, + 0x73, 0x74, 0x12, 
0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x75, 0x69, 0x64, 0x22, 0x2f, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x42, 0x79, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x42, 0x79, 0x74, 0x65, 0x73, 0x22, 0x5e, 0x0a, 0x0a, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, + 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x72, 0x67, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x61, 0x72, 0x67, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x6b, 0x77, 0x61, 0x72, 0x67, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6b, 0x77, 0x61, 0x72, 0x67, 0x73, 0x22, 0x3d, 0x0a, 0x11, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x4f, 0x77, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, - 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x16, 0x0a, 0x06, - 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x6e, 0x6f, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x16, 0x0a, 0x06, + 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x22, 0x42, 0x0a, 0x16, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, - 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x03, 0x75, 0x69, 0x64, - 0x12, 0x16, 0x0a, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x02, 0x20, 
0x01, 0x28, 0x0d, + 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x75, 0x69, 0x64, + 0x12, 0x16, 0x0a, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x22, 0x39, 0x0a, 0x0f, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x26, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x46, 0x75, 0x6e, 0x63, 0x18, 0x01, 0x20, diff --git a/proto/rayclient.proto b/proto/rayclient.proto index b2a745d..536d074 100644 --- a/proto/rayclient.proto +++ b/proto/rayclient.proto @@ -25,7 +25,7 @@ service GlobalScheduler { } message GlobalScheduleRequest { - uint32 uid = 1; + uint64 uid = 1; string name = 2; bytes args = 3; bytes kwargs = 4; @@ -47,7 +47,7 @@ message ScheduleRequest { // returns a Future, which is just the UID for // the generated object message ScheduleResponse { - uint32 uid = 1; + uint64 uid = 1; } /* END WORKER NODE LOCAL SCHEDULER */ @@ -59,12 +59,12 @@ service LocalObjStore { } message StoreRequest { - uint32 uid = 1; + uint64 uid = 1; bytes objectBytes = 2; } message GetRequest { - uint32 uid = 1; + uint64 uid = 1; } message GetResponse { @@ -79,7 +79,7 @@ service Worker { } message RunRequest { - uint32 uid = 1; + uint64 uid = 1; uint64 name = 2; bytes args = 3; bytes kwargs = 4; @@ -95,13 +95,16 @@ service GCSObj { } message NotifyOwnsRequest { - uint32 uid = 1; - uint32 nodeId = 2; + uint64 uid = 1; + uint64 nodeId = 2; } message RequestLocationRequest { - uint32 uid = 1; - uint32 nodeId = 2; + uint64 uid = 1; +} + +message RequestLocationResponse { + uint64 nodeId = 1; } /* END GCS OBJECT TABLE */