diff --git a/.gitignore b/.gitignore index b31356d..3cdb695 100644 --- a/.gitignore +++ b/.gitignore @@ -196,3 +196,5 @@ go.work go/cmd/gcsfunctable/main go/cmd/gcsobjtable/gcsobjtable python/babyray/rayclient_pb2_grpc.py-e +go/cmd/localobjstore/localobjstore +go/cmd/localscheduler/localscheduler diff --git a/go/cmd/gcsobjtable/main.go b/go/cmd/gcsobjtable/main.go index fb37dcd..2b1735f 100644 --- a/go/cmd/gcsobjtable/main.go +++ b/go/cmd/gcsobjtable/main.go @@ -1,88 +1,174 @@ package main import ( - "context" - // "errors" - "log" - "net" - "strconv" - "sync" - "math/rand" - - "google.golang.org/grpc" - pb "github.com/rodrigo-castellon/babyray/pkg" - "github.com/rodrigo-castellon/babyray/config" - - "google.golang.org/grpc/status" - "google.golang.org/grpc/codes" + "context" + "time" + + // "errors" + "log" + "math/rand" + "net" + "strconv" + "sync" + + "github.com/rodrigo-castellon/babyray/config" + pb "github.com/rodrigo-castellon/babyray/pkg" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/peer" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) +var cfg *config.Config + func main() { - cfg := config.GetConfig() // Load configuration - address := ":" + strconv.Itoa(cfg.Ports.GCSObjectTable) // Prepare the network address - - lis, err := net.Listen("tcp", address) - if err != nil { - log.Fatalf("failed to listen: %v", err) - } - _ = lis; - s := grpc.NewServer() - pb.RegisterGCSObjServer(s, NewGCSObjServer()) - log.Printf("server listening at %v", lis.Addr()) - if err := s.Serve(lis); err != nil { - log.Fatalf("failed to serve: %v", err) - } + cfg = config.GetConfig() // Load configuration + address := ":" + strconv.Itoa(cfg.Ports.GCSObjectTable) // Prepare the network address + + lis, err := net.Listen("tcp", address) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + _ = lis + s := grpc.NewServer() + pb.RegisterGCSObjServer(s, NewGCSObjServer()) + log.Printf("server listening at %v", lis.Addr()) + if err := s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } } // server is used to implement your gRPC service. type GCSObjServer struct { - pb.UnimplementedGCSObjServer - objectLocations map[uint64][]uint64 - mu sync.Mutex - cond *sync.Cond + pb.UnimplementedGCSObjServer + objectLocations map[uint64][]uint64 // object uid -> list of nodeIds as uint64 + waitlist map[uint64][]string // object uid -> list of IP addresses as string + mu sync.Mutex // lock should be used for both objectLocations and waitlist } func NewGCSObjServer() *GCSObjServer { - server := &GCSObjServer{ - objectLocations: make(map[uint64][]uint64), - mu: sync.Mutex{}, // Mutex initialized here. - } - server.cond = sync.NewCond(&server.mu) // Properly pass the address of the struct's mutex. - return server + server := &GCSObjServer{ + objectLocations: make(map[uint64][]uint64), + waitlist: make(map[uint64][]string), + mu: sync.Mutex{}, + } + return server +} + +/* +Returns a nodeId that has object uid. If it doesn't exist anywhere, +then the second return value will be false. +Assumes that s's mutex is locked. +*/ +func (s *GCSObjServer) getNodeId(uid uint64) (*uint64, bool) { + nodeIds, exists := s.objectLocations[uid] + if !exists || len(nodeIds) == 0 { + return nil, false + } + + // Note: policy is to pick a random one; in the future it will need to be locality-based + randomIndex := rand.Intn(len(nodeIds)) + nodeId := &nodeIds[randomIndex] + return nodeId, true +} + +// sendCallback sends a location found callback to the local object store client +// This should be used as a goroutine +func (s *GCSObjServer) sendCallback(clientAddress string, uid uint64, nodeId uint64) { + // Set up a new gRPC connection to the client + // TODO: Refactor to save gRPC connections rather than creating a new one each time + // Dial is lazy-loading, but we should still save the connection for future use + conn, err := grpc.Dial(clientAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + // Log the error instead of returning it + log.Printf("Failed to connect back to client at %s: %v", clientAddress, err) + return + } + defer conn.Close() // TODO: remove in some eventual universe + + localObjStoreClient := pb.NewLocalObjStoreClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Call LocationFound and handle any potential error + _, err = localObjStoreClient.LocationFound(ctx, &pb.LocationFoundCallback{Uid: uid, Location: nodeId}) + if err != nil { + log.Printf("Failed to send LocationFound callback for UID %d to client at %s: %v", uid, clientAddress, err) + } } -// Implement your service methods here. func (s *GCSObjServer) NotifyOwns(ctx context.Context, req *pb.NotifyOwnsRequest) (*pb.StatusResponse, error) { - s.mu.Lock() - defer s.mu.Unlock() + s.mu.Lock() + defer s.mu.Unlock() + + uid, nodeId := req.Uid, req.NodeId + + // Append the nodeId to the list for the given object uid + if _, exists := s.objectLocations[uid]; !exists { + s.objectLocations[uid] = []uint64{} // Initialize slice if it doesn't exist + } + s.objectLocations[uid] = append(s.objectLocations[uid], nodeId) - // Append the nodeId to the list for the given uid - s.objectLocations[req.Uid] = append(s.objectLocations[req.Uid], req.NodeId) - s.cond.Broadcast() + // Clear waitlist for this uid, if any + waitingIPs, exists := s.waitlist[uid] + if exists { + for _, clientAddress := range waitingIPs { + go s.sendCallback(clientAddress, uid, nodeId) + } + // Clear the waitlist for the given uid after processing + delete(s.waitlist, uid) + } - return &pb.StatusResponse{Success: true}, nil + return &pb.StatusResponse{Success: true}, nil } func (s *GCSObjServer) RequestLocation(ctx context.Context, req *pb.RequestLocationRequest) (*pb.RequestLocationResponse, error) { - s.mu.Lock() - defer s.mu.Unlock() - nodeIds, exists := s.objectLocations[req.Uid] - - for !exists || len(nodeIds) == 0 { - s.cond.Wait() - nodeIds, exists = s.objectLocations[req.Uid] - if ctx.Err() != nil { - return nil, status.Error(codes.Canceled, "request cancelled or context deadline exceeded") - } - } - - // Assume successful case - randomIndex := rand.Intn(len(nodeIds)) - return &pb.RequestLocationResponse{ - NodeId : nodeIds[randomIndex], - }, nil -} + s.mu.Lock() + defer s.mu.Unlock() + + // Extract client address + p, ok := peer.FromContext(ctx) + if !ok { + return nil, status.Error(codes.Internal, "could not get peer information; hence, failed to extract client address") + } + + // ///// + // log.SetOutput(os.Stdout) + // log.Println(p.Addr.String()) + // ////////// -// NOTE: We only use one cv for now, which may cause performance issues in the future -// However, there is significant overhead if we want one cv per object, as then we would have to manage -// the cleanup through reference counting \ No newline at end of file + host, _, err := net.SplitHostPort(p.Addr.String()) // Strip out the ephemeral port + if err != nil { + //return nil, status.Error(codes.Internal, "could not split host and port") + // Temporary workaround: If splitting fails, use the entire address string as the host + host = p.Addr.String() + } + clientPort := strconv.Itoa(cfg.Ports.LocalObjectStore) // Replace the ephemeral port with the official LocalObjectStore port + clientAddress := net.JoinHostPort(host, clientPort) + + uid := req.Uid + nodeId, exists := s.getNodeId(uid) + if !exists { + // Add client to waiting list + if _, exists := s.waitlist[uid]; !exists { + s.waitlist[uid] = []string{} // Initialize slice if it doesn't exist + } + s.waitlist[uid] = append(s.waitlist[uid], clientAddress) + + // Reply to this gRPC request + return &pb.RequestLocationResponse{ + ImmediatelyFound: false, + }, nil + } + + // Send immediate callback + go s.sendCallback(clientAddress, uid, *nodeId) + + // Reply to this gRPC request + return &pb.RequestLocationResponse{ + ImmediatelyFound: true, + }, nil +} diff --git a/go/cmd/gcsobjtable/main_test.go b/go/cmd/gcsobjtable/main_test.go index 0d447ec..aa75142 100644 --- a/go/cmd/gcsobjtable/main_test.go +++ b/go/cmd/gcsobjtable/main_test.go @@ -1,14 +1,21 @@ package main import ( - "context" - "net" - "testing" + "context" + "fmt" "log" + "math/rand" + "net" + "strconv" + "sync" + "testing" + "time" - "google.golang.org/grpc" - "google.golang.org/grpc/test/bufconn" - pb "github.com/rodrigo-castellon/babyray/pkg" + "github.com/rodrigo-castellon/babyray/config" + pb "github.com/rodrigo-castellon/babyray/pkg" + "google.golang.org/grpc" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/test/bufconn" ) const bufSize = 1024 * 1024 @@ -16,63 +23,552 @@ const bufSize = 1024 * 1024 var lis *bufconn.Listener func init() { - lis = bufconn.Listen(bufSize) - s := grpc.NewServer() - pb.RegisterGCSObjServer(s, &server{objectLocations: make(map[uint32][]uint32)}) - go func() { - if err := s.Serve(lis); err != nil { - log.Fatalf("Server exited with error: %v", err) - } - }() + cfg = config.GetConfig() // Load configuration + lis = bufconn.Listen(bufSize) + s := grpc.NewServer() + pb.RegisterGCSObjServer(s, NewGCSObjServer()) + go func() { + if err := s.Serve(lis); err != nil { + log.Fatalf("Server exited with error: %v", err) + } + }() } func bufDialer(context.Context, string) (net.Conn, error) { - return lis.Dial() + return lis.Dial() } +func TestGetNodeId(t *testing.T) { + // Seed the random number generator for reproducibility in tests + rand.Seed(1) + + // Initialize the GCSObjServer + server := &GCSObjServer{ + objectLocations: make(map[uint64][]uint64), + waitlist: make(map[uint64][]string), + mu: sync.Mutex{}, + } + + // Test case: UID exists with multiple NodeIds + server.objectLocations[1] = []uint64{100, 101, 102} + nodeId, exists := server.getNodeId(1) + if !exists { + t.Errorf("Expected UID 1 to exist") + } + if nodeId == nil || (*nodeId != 100 && *nodeId != 101 && *nodeId != 102) { + t.Errorf("Expected nodeId to be one of [100, 101, 102], got %v", nodeId) + } + + // Test case: UID exists with a single NodeId + server.objectLocations[2] = []uint64{200} + nodeId, exists = server.getNodeId(2) + if !exists { + t.Errorf("Expected UID 2 to exist") + } + if nodeId == nil || *nodeId != 200 { + t.Errorf("Expected nodeId to be 200, got %v", nodeId) + } + + // Test case: UID does not exist + nodeId, exists = server.getNodeId(3) + if exists { + t.Errorf("Expected UID 3 to not exist") + } + if nodeId != nil { + t.Errorf("Expected nodeId to be nil, got %v", nodeId) + } + + // Test case: UID exists but with an empty NodeId list + server.objectLocations[4] = []uint64{} + nodeId, exists = server.getNodeId(4) + if exists { + t.Errorf("Expected UID 4 to not exist due to empty NodeId list") + } + if nodeId != nil { + t.Errorf("Expected nodeId to be nil, got %v", nodeId) + } +} + +// Test the getNodeId function +func TestGetNodeId_2(t *testing.T) { + // Initialize the server + server := NewGCSObjServer() + + // Seed the random number generator to produce consistent results + rand.Seed(time.Now().UnixNano()) + + // Define test cases + testCases := []struct { + uid uint64 + nodeIds []uint64 + expectNil bool + expectExists bool + }{ + {uid: 1, nodeIds: []uint64{101, 102, 103}, expectNil: false, expectExists: true}, + {uid: 2, nodeIds: []uint64{}, expectNil: true, expectExists: false}, + {uid: 3, nodeIds: nil, expectNil: true, expectExists: false}, + } + + for _, tc := range testCases { + // Populate the objectLocations map + if tc.nodeIds != nil { + server.objectLocations[tc.uid] = tc.nodeIds + } + + // Call getNodeId + nodeId, exists := server.getNodeId(tc.uid) + + // Check if the result is nil or not as expected + if tc.expectNil && nodeId != nil { + t.Errorf("Expected nil, but got %v for uid %d", *nodeId, tc.uid) + } + + if !tc.expectNil && nodeId == nil { + t.Errorf("Expected non-nil, but got nil for uid %d", tc.uid) + } + + // Check if the existence flag is as expected + if exists != tc.expectExists { + t.Errorf("Expected exists to be %v, but got %v for uid %d", tc.expectExists, exists, tc.uid) + } + + // If nodeId is not nil, ensure it is one of the expected nodeIds + if nodeId != nil && !contains(tc.nodeIds, *nodeId) { + t.Errorf("NodeId %d is not in expected nodeIds %v for uid %d", *nodeId, tc.nodeIds, tc.uid) + } + } +} + +// Mock implementation of the LocalObjStoreServer +type mockLocalObjStoreServer struct { + pb.UnimplementedLocalObjStoreServer + callbackReceived chan struct{} +} + +func (m *mockLocalObjStoreServer) LocationFound(ctx context.Context, req *pb.LocationFoundCallback) (*pb.StatusResponse, error) { + m.callbackReceived <- struct{}{} + return &pb.StatusResponse{Success: true}, nil +} + +func NewMockLocalObjStoreServer(callbackBufferSize int) *mockLocalObjStoreServer { + return &mockLocalObjStoreServer{ + callbackReceived: make(chan struct{}, callbackBufferSize), + } +} + +// Does not check for a callback hit +func TestSendCallback_Dumb(t *testing.T) { + // Setup a mock listener + lis := bufconn.Listen(bufSize) + s := grpc.NewServer() + pb.RegisterLocalObjStoreServer(s, &mockLocalObjStoreServer{}) + go func() { + if err := s.Serve(lis); err != nil { + log.Fatalf("Server exited with error: %v", err) + } + }() + + // Mock client address + clientAddress := "localhost:689" + + // Initialize the GCSObjServer + server := &GCSObjServer{} + + // Use bufDialer in grpc.Dial call + conn, err := grpc.DialContext(context.Background(), "bufnet", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + }), grpc.WithInsecure()) + if err != nil { + t.Fatalf("Failed to dial bufnet: %v", err) + } + defer conn.Close() + + // Run sendCallback as a goroutine + go server.sendCallback(clientAddress, 1, 100) + + // Allow some time for the goroutine to execute + time.Sleep(200 * time.Millisecond) + +} + +// Checks for a callback hit using go channel +func TestSendCallback_Hit(t *testing.T) { + // Create a context with a timeout to manage server and test lifecycle + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + // Prepare the network address + address := ":" + strconv.Itoa(cfg.Ports.LocalObjectStore) + lis, err := net.Listen("tcp", address) + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + + // Create and start the gRPC server + s := grpc.NewServer() + mock := NewMockLocalObjStoreServer(1) + pb.RegisterLocalObjStoreServer(s, mock) + log.Printf("server listening at %v", lis.Addr()) + + // Run the server in a goroutine + go func() { + if err := s.Serve(lis); err != nil && err != grpc.ErrServerStopped { + log.Fatalf("failed to serve: %v", err) + } + }() + + // Ensure the server is shut down cleanly at the end of the test + defer func() { + s.GracefulStop() + lis.Close() + }() + + // Allow some time for the server to start + time.Sleep(200 * time.Millisecond) + + // Mock client address - ephemeral outbound + host := "localhost" + clientPort := strconv.Itoa(cfg.Ports.LocalObjectStore) // Replace the ephemeral port with the official LocalObjectStore port + clientAddress := net.JoinHostPort(host, clientPort) + + // Initialize the GCSObjServer + server := &GCSObjServer{} + + // Use a WaitGroup to wait for the callback goroutine + var wg sync.WaitGroup + wg.Add(1) + + // Run sendCallback as a goroutine + go func() { + defer wg.Done() + server.sendCallback(clientAddress, 1, 100) + }() + + // Wait for the callback or timeout + select { + case <-mock.callbackReceived: + // Callback received, continue + case <-ctx.Done(): + t.Errorf("Timeout waiting for callback") + } + + // Wait for all goroutines to finish + wg.Wait() +} + +// func TestSendCallback_Hit(t *testing.T) { + +// address := ":" + strconv.Itoa(cfg.Ports.LocalObjectStore) // Prepare the network address +// lis, err := net.Listen("tcp", address) +// if err != nil { +// log.Fatalf("failed to listen: %v", err) +// } +// s := grpc.NewServer() +// mock := NewMockLocalObjStoreServer(1) +// pb.RegisterLocalObjStoreServer(s, mock) +// log.Printf("server listening at %v", lis.Addr()) +// if err := s.Serve(lis); err != nil { +// log.Fatalf("failed to serve: %v", err) +// } + +// // Allow some time for the server to start +// time.Sleep(200 * time.Millisecond) + +// // Mock client address - ephemeral outbound +// clientAddress := "localhost:777" + +// // Initialize the GCSObjServer +// server := &GCSObjServer{} + +// // Run sendCallback as a goroutine +// go server.sendCallback(clientAddress, 1, 100) + +// // Catch it +// select { +// case <-mock.callbackReceived: +// // Callback received, continue +// case <-time.After(1 * time.Second): +// t.Errorf("Timeout waiting for callback") +// } +// } + func TestNotifyOwns(t *testing.T) { - ctx := context.Background() - conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure()) - if err != nil { - t.Fatalf("Failed to dial bufnet: %v", err) - } - defer conn.Close() - client := pb.NewGCSObjClient(conn) - - // Test NotifyOwns - resp, err := client.NotifyOwns(ctx, &pb.NotifyOwnsRequest{ - Uid: 1, - NodeId: 100, - }) - if err != nil || !resp.Success { - t.Errorf("NotifyOwns failed: %v, response: %v", err, resp) - } + ctx := context.Background() + conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure()) + if err != nil { + t.Fatalf("Failed to dial bufnet: %v", err) + } + defer conn.Close() + client := pb.NewGCSObjClient(conn) + + // Testing NotifyOwns + resp, err := client.NotifyOwns(ctx, &pb.NotifyOwnsRequest{ + Uid: 1, + NodeId: 100, + }) + if err != nil || !resp.Success { + t.Errorf("NotifyOwns failed: %v, response: %v", err, resp) + } } +// We don't listen for a callback in this test func TestRequestLocation(t *testing.T) { - ctx := context.Background() - conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure()) - if err != nil { - t.Fatalf("Failed to dial bufnet: %v", err) - } - defer conn.Close() - client := pb.NewGCSObjClient(conn) - - // First, ensure the object is registered to test retrieval - _, err = client.NotifyOwns(ctx, &pb.NotifyOwnsRequest{ - Uid: 1, - NodeId: 100, - }) - if err != nil { - t.Fatalf("Setup failure: could not register UID: %v", err) - } - - // Test RequestLocation - resp, err := client.RequestLocation(ctx, &pb.RequestLocationRequest{Uid: 1}) - if err != nil || !resp.Success { - t.Errorf("RequestLocation failed: %v, response: %v", err, resp) - } - if resp.Details != "100" { - t.Errorf("RequestLocation returned incorrect node ID: got %s, want %s", resp.Details, "100") - } + clientAddress := "127.0.0.1:8080" + addr, err := net.ResolveTCPAddr("tcp", clientAddress) + if err != nil { + fmt.Printf("Error resolving address: %v\n", err) + return + } + p := &peer.Peer{ + Addr: addr, + } + + // Create a new context with the peer information + ctx := peer.NewContext(context.Background(), p) + + conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure()) + if err != nil { + t.Fatalf("Failed to dial bufnet: %v", err) + } + defer conn.Close() + client := pb.NewGCSObjClient(conn) + + // Ensure the object is registered to test retrieval + _, err = client.NotifyOwns(ctx, &pb.NotifyOwnsRequest{ + Uid: 1, + NodeId: 100, + }) + if err != nil { + t.Fatalf("Setup failure: could not register UID: %v", err) + } + + // Test RequestLocation when the location should be found immediately + resp, err := client.RequestLocation(ctx, &pb.RequestLocationRequest{Uid: 1}) + if err != nil { + t.Errorf("RequestLocation failed: %v", err) + return + } + if !resp.ImmediatelyFound { + t.Errorf("Expected to find location immediately, but it was not found") + } + + // Test RequestLocation for a UID that does not exist + resp, err = client.RequestLocation(ctx, &pb.RequestLocationRequest{Uid: 2}) + if err != nil { + t.Errorf("RequestLocation failed: %v", err) + return + } + if resp.ImmediatelyFound { + t.Errorf("Expected location not to be found immediately, but it was found") + } +} + +// // MockGCSObjServer inherits GCSObjServer and overrides sendCallback +// type MockGCSObjServer struct { +// *GCSObjServer +// callbackReceived chan struct{} +// } + +// func NewMockGCSObjServer(callbackBufferSize int) *MockGCSObjServer { +// return &MockGCSObjServer{ +// GCSObjServer: NewGCSObjServer(), +// callbackReceived: make(chan struct{}, callbackBufferSize), +// } +// } + +// // sendCallback is the dummy method for testing +// func (s *MockGCSObjServer) sendCallback(clientAddress string, uid uint64, nodeId uint64) { +// //log.Printf("Mock sendCallback called with clientAddress: %s, uid: %d, nodeId: %d", clientAddress, uid, nodeId) +// s.callbackReceived <- struct{}{} +// } + +// var mockLis *bufconn.Listener + +// func mockBufDialer(context.Context, string) (net.Conn, error) { +// return mockLis.Dial() +// } + +// A test which simulates the following: +// - Two LocalObjStore nodes will call RequestLocation for a UID that initially doesn't have any node IDs associated with it. They will not be happy until they are notified of a change. +// - One LocalObjStore node will perform the NotifyOwns action after a short delay, adding a node ID to the UID, which should then notify the waiting nodes. +// - One GCSObjTable node will be tested, but its sendCallback will be simulated + +// Node A: a LocalObjStore requesting location +// Node B: a LocalObjStore requesting location +// Node C: a LocalObjStore notifying ownership +// Node D: a GCSObjTable being tested +// func TestRequestLocationNotifyOwnsHitsCallback(t *testing.T) { +// NUM_CALLBACKS_EXPECTED := 2 + +// // Initialize the server and listener +// mockLis = bufconn.Listen(bufSize) +// s := grpc.NewServer() +// mock := NewMockGCSObjServer(NUM_CALLBACKS_EXPECTED) // USE OF MOCK HERE +// pb.RegisterGCSObjServer(s, mock) +// go func() { +// if err := s.Serve(mockLis); err != nil { +// log.Fatalf("Server exited with error: %v", err) +// } +// }() + +// // Setup gRPC client +// ctx := context.Background() +// conn, err := grpc.DialContext(ctx, "junk", grpc.WithContextDialer(mockBufDialer), grpc.WithInsecure()) +// if err != nil { +// t.Fatalf("Failed to dial bufnet: %v", err) +// } +// defer conn.Close() +// client := pb.NewGCSObjClient(conn) + +// // Simulate Node A calling RequestLocation +// resp, err := client.RequestLocation(ctx, &pb.RequestLocationRequest{Uid: 555}) +// if err != nil { +// t.Errorf("RequestLocation failed: %v", err) +// return +// } +// if resp.ImmediatelyFound { +// t.Errorf("Expected location not to be found immediately, but it was found") +// } + +// // Simulate Node B calling RequestLocation +// resp, err = client.RequestLocation(ctx, &pb.RequestLocationRequest{Uid: 555}) +// if err != nil { +// t.Errorf("RequestLocation failed: %v", err) +// return +// } +// if resp.ImmediatelyFound { +// t.Errorf("Expected location not to be found immediately, but it was found") +// } + +// // Simulate Node C calling NotifyOwns +// _, err = client.NotifyOwns(ctx, &pb.NotifyOwnsRequest{Uid: 555, NodeId: 100}) +// if err != nil { +// t.Errorf("NotifyOwns failed: %v", err) +// } + +// // Wait for both callbacks to be received +// for i := 0; i < 2; i++ { +// select { +// case <-mock.callbackReceived: +// // Callback received, continue +// case <-time.After(1 * time.Second): +// t.Errorf("Timeout waiting for callback") +// } +// } +// } + +// ============== + +// Unit test in Go + +// Create a unit test in Go where three goroutines are involved, with the first two waiting for an object's location +// and the third notifying the server of the object's presence: +// - Two goroutines will call RequestLocation for a UID that initially doesn't have any node IDs associated with it. They will block until they are notified of a change. +// - One goroutine will perform the NotifyOwns action after a short delay, adding a node ID to the UID, which should then notify the waiting goroutines. +// func TestRequestLocationWithNotification(t *testing.T) { +// // Initialize the server and listener +// lis := bufconn.Listen(bufSize) +// s := grpc.NewServer() +// pb.RegisterGCSObjServer(s, MockNewGCSObjServer()) +// go func() { +// if err := s.Serve(lis); err != nil { +// log.Fatalf("Server exited with error: %v", err) +// } +// }() + +// // Setup gRPC client +// ctx := context.Background() +// conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure()) +// if err != nil { +// t.Fatalf("Failed to dial bufnet: %v", err) +// } +// defer conn.Close() +// client := pb.NewGCSObjClient(conn) + +// // Use a wait group to wait for both goroutines to complete +// var wg sync.WaitGroup +// wg.Add(2) + +// // A channel to listen for callback notifications +// callbackReceived := make(chan struct{}, 2) + +// // Mock the sendCallback to capture its invocation +// originalSendCallback := (&GCSObjServer{}).sendCallback +// mockSendCallback := func(clientAddress string, uid uint64, nodeId uint64) { +// originalSendCallback(clientAddress, uid, nodeId) +// callbackReceived <- struct{}{} +// } + +// server := NewGCSObjServer() +// server.sendCallback = mockSendCallback + +// // First goroutine calls RequestLocation +// go func() { +// defer wg.Done() +// resp, err := client.RequestLocation(ctx, &pb.RequestLocationRequest{Uid: 1}) +// if err != nil { +// t.Errorf("RequestLocation failed: %v", err) +// return +// } +// if resp.ImmediatelyFound { +// t.Errorf("Expected location not to be found immediately, but it was found") +// } +// }() + +// // Second goroutine calls RequestLocation +// go func() { +// defer wg.Done() +// resp, err := client.RequestLocation(ctx, &pb.RequestLocationRequest{Uid: 1}) +// if err != nil { +// t.Errorf("RequestLocation failed: %v", err) +// return +// } +// if resp.ImmediatelyFound { +// t.Errorf("Expected location not to be found immediately, but it was found") +// } +// }() + +// // Third goroutine performs NotifyOwns after a short delay +// go func() { +// time.Sleep(100 * time.Millisecond) +// _, err := client.NotifyOwns(ctx, &pb.NotifyOwnsRequest{Uid: 1, NodeId: 100}) +// if err != nil { +// t.Errorf("NotifyOwns failed: %v", err) +// } +// }() + +// // Wait for both callbacks to be received +// for i := 0; i < 2; i++ { +// select { +// case <-callbackReceived: +// // Callback received, continue +// case <-time.After(1 * time.Second): +// t.Errorf("Timeout waiting for callback") +// } +// } + +// // Wait for all goroutines to finish +// wg.Wait() +// } + +// type MockGCSObjServer struct { +// GCSObjServer +// } + +// func (s *MockGCSObjServer) sendCallback(clientAddress string, uid uint64, nodeId uint64) { +// return +// } + +// func MockNewGCSObjServer() *MockGCSObjServer { +// return NewGCSObjServer() +// } + +// Helper function to check if a slice contains a specific value +func contains(slice []uint64, value uint64) bool { + for _, v := range slice { + if v == value { + return true + } + } + return false } diff --git a/go/cmd/localobjstore/main.go b/go/cmd/localobjstore/main.go index a7acb38..ba34263 100644 --- a/go/cmd/localobjstore/main.go +++ b/go/cmd/localobjstore/main.go @@ -1,136 +1,135 @@ package main import ( - // "context" - "log" - "net" - "strconv" - "fmt" - context "context" - "errors" - "google.golang.org/grpc" - pb "github.com/rodrigo-castellon/babyray/pkg" - "github.com/rodrigo-castellon/babyray/config" - + // "context" + context "context" + "errors" + "fmt" + "log" + "net" + "strconv" + + "github.com/rodrigo-castellon/babyray/config" + pb "github.com/rodrigo-castellon/babyray/pkg" + "google.golang.org/grpc" ) + // var localObjectStore map[uint64][]byte // var localObjectChannels map[uint64]chan []byte // var gcsObjClient pb.GCSObjClient // var localNodeID uint64 var cfg *config.Config + func main() { - cfg = config.GetConfig() // Load configuration - address := ":" + strconv.Itoa(cfg.Ports.LocalObjectStore) // Prepare the network address - if address == "" { - lis, err := net.Listen("tcp", address) - if err != nil { - log.Fatalf("failed to listen: %v", err) - } - _ = lis; - s := grpc.NewServer() - pb.RegisterLocalObjStoreServer(s, &server{}) - log.Printf("server listening at %v", lis.Addr()) - if err := s.Serve(lis); err != nil { - log.Fatalf("failed to serve: %v", err) - } - } - - - // localObjectStore = make(map[uint64][]byte) - // localObjectChannels = make(map[uint64]chan []byte) - - // gcsAddress := fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, cfg.NodeIDs.GCS, cfg.Ports.GCSObjectTable) - // conn, _ := grpc.Dial(gcsAddress, grpc.WithInsecure()) - // gcsObjClient = pb.NewGCSObjClient(conn) - // localNodeID = 0 + cfg = config.GetConfig() // Load configuration + address := ":" + strconv.Itoa(cfg.Ports.LocalObjectStore) // Prepare the network address + if address == "" { + lis, err := net.Listen("tcp", address) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + _ = lis + s := grpc.NewServer() + pb.RegisterLocalObjStoreServer(s, &server{}) + log.Printf("server listening at %v", lis.Addr()) + if err := s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } + } + + // localObjectStore = make(map[uint64][]byte) + // localObjectChannels = make(map[uint64]chan []byte) + + // gcsAddress := fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, cfg.NodeIDs.GCS, cfg.Ports.GCSObjectTable) + // conn, _ := grpc.Dial(gcsAddress, grpc.WithInsecure()) + // gcsObjClient = pb.NewGCSObjClient(conn) + // localNodeID = 0 } // server is used to implement your gRPC service. type server struct { - pb.UnimplementedLocalObjStoreServer - localObjectStore map[uint64][]byte - localObjectChannels map[uint64]chan []byte - gcsObjClient pb.GCSObjClient - localNodeID uint64 + pb.UnimplementedLocalObjStoreServer + localObjectStore map[uint64][]byte + localObjectChannels map[uint64]chan []byte + gcsObjClient pb.GCSObjClient + localNodeID uint64 } -func (s* server) Init(ctx context.Context, req *pb.StatusResponse) (*pb.StatusResponse, error) { - s.localObjectStore = make(map[uint64][]byte) - s.localObjectChannels = make(map[uint64]chan []byte) - s.localNodeID = 1 - gcsAddress := fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, cfg.NodeIDs.GCS, cfg.Ports.GCSObjectTable) - conn, _ := grpc.Dial(gcsAddress, grpc.WithInsecure()) - s.gcsObjClient = pb.NewGCSObjClient(conn) +func (s *server) Init(ctx context.Context, req *pb.StatusResponse) (*pb.StatusResponse, error) { + s.localObjectStore = make(map[uint64][]byte) + s.localObjectChannels = make(map[uint64]chan []byte) + s.localNodeID = 1 + gcsAddress := fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, cfg.NodeIDs.GCS, cfg.Ports.GCSObjectTable) + conn, _ := grpc.Dial(gcsAddress, grpc.WithInsecure()) + s.gcsObjClient = pb.NewGCSObjClient(conn) + + return &pb.StatusResponse{Success: true}, nil - return &pb.StatusResponse{Success: true}, nil - } func (s *server) Store(ctx context.Context, req *pb.StoreRequest) (*pb.StatusResponse, error) { - s.localObjectStore[req.Uid] = req.ObjectBytes - - s.gcsObjClient.NotifyOwns(ctx, &pb.NotifyOwnsRequest{Uid: req.Uid, NodeId: s.localNodeID}) - return &pb.StatusResponse{Success: true}, nil + s.localObjectStore[req.Uid] = req.ObjectBytes + + s.gcsObjClient.NotifyOwns(ctx, &pb.NotifyOwnsRequest{Uid: req.Uid, NodeId: s.localNodeID}) + return &pb.StatusResponse{Success: true}, nil } func (s *server) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse, error) { - if val, ok := s.localObjectStore[req.Uid]; ok { - return &pb.GetResponse{Uid : req.Uid, ObjectBytes : val, Local: true}, nil - } - - s.localObjectChannels[req.Uid] = make(chan []byte) - if req.Testing == false { - s.gcsObjClient.RequestLocation(ctx, &pb.RequestLocationRequest{Uid: req.Uid, Requester: s.localNodeID}) - } - - val := <- s.localObjectChannels[req.Uid] - s.localObjectStore[req.Uid] = val - return &pb.GetResponse{Uid : req.Uid, ObjectBytes : s.localObjectStore[req.Uid], Local: false}, nil + if val, ok := s.localObjectStore[req.Uid]; ok { + return &pb.GetResponse{Uid: req.Uid, ObjectBytes: val, Local: true}, nil + } + + s.localObjectChannels[req.Uid] = make(chan []byte) + if req.Testing == false { + s.gcsObjClient.RequestLocation(ctx, &pb.RequestLocationRequest{Uid: req.Uid, Requester: s.localNodeID}) + } + + val := <-s.localObjectChannels[req.Uid] + s.localObjectStore[req.Uid] = val + return &pb.GetResponse{Uid: req.Uid, ObjectBytes: s.localObjectStore[req.Uid], Local: false}, nil } -func (s* server) LocationFound(ctx context.Context, resp *pb.LocationFoundResponse) (*pb.StatusResponse, error) { - var otherLocalAddress string - - if resp.Port == 0 { - nodeID := resp.Location; - otherLocalAddress = fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, nodeID, cfg.Ports.LocalObjectStore) - } else { - otherLocalAddress = fmt.Sprintf("%s:%d", resp.Address, resp.Port) - } - - conn, err := grpc.Dial(otherLocalAddress, grpc.WithInsecure()) - - if err != nil { - return &pb.StatusResponse{Success: false}, errors.New(fmt.Sprintf("failed to dial other LOS @:%s ", otherLocalAddress)) - } - - c := pb.NewLocalObjStoreClient(conn) - - x, err := c.Copy(ctx, &pb.CopyRequest{Uid : resp.Uid, Requester : s.localNodeID}) - - if x == nil || err != nil { - return &pb.StatusResponse{Success: false}, errors.New(fmt.Sprintf("failed to copy from other LOS @:%s ", otherLocalAddress)) - } - // if resp.Port == "" { - - // gcsObjClient.NotifyOwns(ctx, &pb.NotifyOwnsRequest{Uid: resp.Uid, NodeId: localNodeID}) - // } - - channel, ok := s.localObjectChannels[resp.Uid] - if !ok { - return &pb.StatusResponse{Success: false}, errors.New("channel DNE") - } - channel <- x.ObjectBytes - - return &pb.StatusResponse{Success: true}, nil +func (s *server) LocationFound(ctx context.Context, resp *pb.LocationFoundCallback) (*pb.StatusResponse, error) { + var otherLocalAddress string -} + if resp.Port == 0 { + nodeID := resp.Location + otherLocalAddress = fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, nodeID, cfg.Ports.LocalObjectStore) + } else { + otherLocalAddress = fmt.Sprintf("%s:%d", resp.Address, resp.Port) + } -func (s* server) Copy(ctx context.Context, req *pb.CopyRequest) (*pb.CopyResponse, error) { - data, ok:= s.localObjectStore[req.Uid]; - if !ok { - return &pb.CopyResponse{Uid: req.Uid, ObjectBytes : nil}, errors.New("object was not in LOS") - } - return &pb.CopyResponse{Uid : req.Uid, ObjectBytes : data}, nil -} + conn, err := grpc.Dial(otherLocalAddress, grpc.WithInsecure()) + + if err != nil { + return &pb.StatusResponse{Success: false}, errors.New(fmt.Sprintf("failed to dial other LOS @:%s ", otherLocalAddress)) + } + c := pb.NewLocalObjStoreClient(conn) + x, err := c.Copy(ctx, &pb.CopyRequest{Uid: resp.Uid, Requester: s.localNodeID}) + + if x == nil || err != nil { + return &pb.StatusResponse{Success: false}, errors.New(fmt.Sprintf("failed to copy from other LOS @:%s ", otherLocalAddress)) + } + // if resp.Port == "" { + + // gcsObjClient.NotifyOwns(ctx, &pb.NotifyOwnsRequest{Uid: resp.Uid, NodeId: localNodeID}) + // } + + channel, ok := s.localObjectChannels[resp.Uid] + if !ok { + return &pb.StatusResponse{Success: false}, errors.New("channel DNE") + } + channel <- x.ObjectBytes + + return &pb.StatusResponse{Success: true}, nil + +} + +func (s *server) Copy(ctx context.Context, req *pb.CopyRequest) (*pb.CopyResponse, error) { + data, ok := s.localObjectStore[req.Uid] + if !ok { + return &pb.CopyResponse{Uid: req.Uid, ObjectBytes: nil}, errors.New("object was not in LOS") + } + return &pb.CopyResponse{Uid: req.Uid, ObjectBytes: data}, nil +} diff --git a/go/cmd/localobjstore/main_test.go b/go/cmd/localobjstore/main_test.go index b293047..808f97f 100644 --- a/go/cmd/localobjstore/main_test.go +++ b/go/cmd/localobjstore/main_test.go @@ -1,15 +1,15 @@ package main import ( - "context" - "net" - "testing" "bytes" - "time" - "google.golang.org/grpc" - "google.golang.org/grpc/test/bufconn" - pb "github.com/rodrigo-castellon/babyray/pkg" - + "context" + "net" + "testing" + "time" + + pb "github.com/rodrigo-castellon/babyray/pkg" + "google.golang.org/grpc" + "google.golang.org/grpc/test/bufconn" ) const bufSize = 1024 * 1024 @@ -17,92 +17,93 @@ const bufSize = 1024 * 1024 var lis *bufconn.Listener func init() { - //startServer("50051") - main() - // lis = bufconn.Listen(bufSize) - // s := grpc.NewServer() - // pb.RegisterLocalObjStoreServer(s, &server{}) - // go func() { - // if err := s.Serve(lis); err != nil { - // log.Fatalf("Server exited with error: %v", err) - // } - // }() + //startServer("50051") + main() + // lis = bufconn.Listen(bufSize) + // s := grpc.NewServer() + // pb.RegisterLocalObjStoreServer(s, &server{}) + // go func() { + // if err := s.Serve(lis); err != nil { + // log.Fatalf("Server exited with error: %v", err) + // } + // }() } func startServer(port string) (*grpc.Server, error) { - lis, err := net.Listen("tcp", port) - if err != nil { - return nil, err - } - s := grpc.NewServer() - pb.RegisterLocalObjStoreServer(s, &server{}) - go func() { - s.Serve(lis) - }() - return s, nil + lis, err := net.Listen("tcp", port) + if err != nil { + return nil, err + } + s := grpc.NewServer() + pb.RegisterLocalObjStoreServer(s, &server{}) + go func() { + s.Serve(lis) + }() + return s, nil } func bufDialer(context.Context, string) (net.Conn, error) { - return lis.Dial() + return lis.Dial() } + type mockGCSClient struct { - pb.GCSObjClient - statusResp *pb.StatusResponse - resp *pb.RequestLocationResponse - err error + pb.GCSObjClient + statusResp *pb.StatusResponse + resp *pb.RequestLocationResponse + err error } -func(m *mockGCSClient) RequestLocation(ctx context.Context, in *pb.RequestLocationRequest, opts ...grpc.CallOption) (*pb.RequestLocationResponse, error) { - return m.resp, m.err +func (m *mockGCSClient) RequestLocation(ctx context.Context, in *pb.RequestLocationRequest, opts ...grpc.CallOption) (*pb.RequestLocationResponse, error) { + return m.resp, m.err } type mockStoreClient struct { - pb.LocalObjStoreClient // Embedding the interface for forward compatibility - statusResp *pb.StatusResponse - resp *pb.GetResponse - err error + pb.LocalObjStoreClient // Embedding the interface for forward compatibility + statusResp *pb.StatusResponse + resp *pb.GetResponse + err error } func (m *mockStoreClient) Store(ctx context.Context, in *pb.StoreRequest, opts ...grpc.CallOption) (*pb.StatusResponse, error) { - return m.statusResp, m.err + return m.statusResp, m.err } func (m *mockStoreClient) Get(ctx context.Context, in *pb.GetRequest, opts ...grpc.CallOption) (*pb.GetResponse, error) { - return m.resp, m.err + return m.resp, m.err } func TestStoreAndGet_Local(t *testing.T) { - ctx := context.Background() - // conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure()) - - // if err != nil { - // t.Fatalf("Failed to dial bufnet: %v", err) - // } - server, err := startServer(":50051") - defer server.Stop() - if err != nil { - t.Fatalf("failed to start server: %v", err) - } - conn, err := grpc.DialContext(ctx, "localhost:50051", grpc.WithInsecure()) - defer conn.Close() + ctx := context.Background() + // conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure()) + + // if err != nil { + // t.Fatalf("Failed to dial bufnet: %v", err) + // } + server, err := startServer(":50051") + defer server.Stop() + if err != nil { + t.Fatalf("failed to start server: %v", err) + } + conn, err := grpc.DialContext(ctx, "localhost:50051", grpc.WithInsecure()) + defer conn.Close() data := []byte{72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100} - client := pb.NewLocalObjStoreClient(conn) - client.Init(ctx, &pb.StatusResponse{}) - - // Test Store - resp, err := client.Store(ctx, &pb.StoreRequest{ - Uid: 1, - ObjectBytes: data, - }) - if err != nil || !resp.Success { - t.Errorf("Store failed: %v, response: %v", err, resp) - } + client := pb.NewLocalObjStoreClient(conn) + client.Init(ctx, &pb.StatusResponse{}) + + // Test Store + resp, err := client.Store(ctx, &pb.StoreRequest{ + Uid: 1, + ObjectBytes: data, + }) + if err != nil || !resp.Success { + t.Errorf("Store failed: %v, response: %v", err, resp) + } // Test Get resp2, err2 := client.Get(ctx, &pb.GetRequest{ - Uid: 1, - Testing: true, + Uid: 1, + Testing: true, }) if err2 != nil || !bytes.Equal(data, resp2.ObjectBytes) { t.Errorf("Get failed: %v, response: %v", err2, resp2) @@ -111,90 +112,87 @@ func TestStoreAndGet_Local(t *testing.T) { } func TestStoreAndGet_External(t *testing.T) { ctx := context.Background() - s1, err := startServer(":50051") - if err != nil { - t.Fatalf("Failed to start server 1: %v", err) - } - defer s1.Stop() - - s2, err := startServer(":50052") - if err != nil { - t.Fatalf("Failed to start server 2: %v", err) - } - defer s2.Stop() - - - if err != nil { - t.Fatalf("Failed to dial server 1: %v", err) - } - conn1, err1 := grpc.DialContext(ctx, "localhost:50051", grpc.WithInsecure()) - if err1 != nil { - t.Fatalf("Failed to dial LOC1") - - } - conn2, err2 := grpc.DialContext(ctx, "localhost:50052", grpc.WithInsecure()) - if err2 != nil { - t.Fatalf("failed to dial LOC2") - } - defer conn1.Close() - defer conn2.Close() - client1 := pb.NewLocalObjStoreClient(conn1) - client2 := pb.NewLocalObjStoreClient(conn2) - client1.Init(ctx, &pb.StatusResponse{}) - client2.Init(ctx, &pb.StatusResponse{}) + s1, err := startServer(":50051") + if err != nil { + t.Fatalf("Failed to start server 1: %v", err) + } + defer s1.Stop() + + s2, err := startServer(":50052") + if err != nil { + t.Fatalf("Failed to start server 2: %v", err) + } + defer s2.Stop() + + if err != nil { + t.Fatalf("Failed to dial server 1: %v", err) + } + conn1, err1 := grpc.DialContext(ctx, "localhost:50051", grpc.WithInsecure()) + if err1 != nil { + t.Fatalf("Failed to dial LOC1") + + } + conn2, err2 := grpc.DialContext(ctx, "localhost:50052", grpc.WithInsecure()) + if err2 != nil { + t.Fatalf("failed to dial LOC2") + } + defer conn1.Close() + defer conn2.Close() + client1 := pb.NewLocalObjStoreClient(conn1) + client2 := pb.NewLocalObjStoreClient(conn2) + client1.Init(ctx, &pb.StatusResponse{}) + client2.Init(ctx, &pb.StatusResponse{}) data := []byte{72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100} - - // mockStore := &mockStoreClient{ - // statusResp: &pb.StatusResponse{}, - // resp: &pb.GetResponse{Uid: 1, ObjectBytes: data}, - // err: nil, - // } - - // mockGCSClient := &mockGCSClient{ - // statusResp: &pb.StatusResponse{}, - // resp: &pb.RequestLocationResponse{NodeId: 2}, - // err: nil - // } - - sresp, err := client2.Store(ctx, &pb.StoreRequest{Uid: 3, ObjectBytes: data}) - if !sresp.Success || err != nil { - t.Errorf("Failed to store value on LOS 2") - } - response2, err := client2.Get(ctx, &pb.GetRequest{Uid: 3, Testing: true}) - - if err != nil || !bytes.Equal(response2.ObjectBytes, data) { - t.Errorf("Failed to get value on LOS 2") - } - - - go func(){ - timeout, cancel := context.WithTimeout(context.Background(), 5 * time.Second) - defer cancel() - response1, err := client1.Get(timeout, &pb.GetRequest{Uid: 3, Testing: true}) - if response1.Local { - t.Errorf("found on local") - } - if err != nil || !bytes.Equal(response1.ObjectBytes, data) { - if timeout.Err() == context.DeadlineExceeded { - t.Errorf("Timeout on client1 get call") - } - t.Errorf("Failed to get value on LOS 1") - } - - }() - - - time.Sleep(1 * time.Second) - timeout, cancel := context.WithTimeout(context.Background(), 5 * time.Second) - defer cancel() - locStatusResp, err := client1.LocationFound(timeout, &pb.LocationFoundResponse{Uid: 3, Address: "localhost", Port: 50052}) - if err != nil || locStatusResp == nil || !locStatusResp.Success { - if timeout.Err() == context.DeadlineExceeded { - t.Errorf("Timeout on client1 location call") - } - t.Errorf("Failed to tell LOS 1 about location: %v", err) - } - + + // mockStore := &mockStoreClient{ + // statusResp: &pb.StatusResponse{}, + // resp: &pb.GetResponse{Uid: 1, ObjectBytes: data}, + // err: nil, + // } + + // mockGCSClient := &mockGCSClient{ + // statusResp: &pb.StatusResponse{}, + // resp: &pb.RequestLocationResponse{NodeId: 2}, + // err: nil + // } + + sresp, err := client2.Store(ctx, &pb.StoreRequest{Uid: 3, ObjectBytes: data}) + if !sresp.Success || err != nil { + t.Errorf("Failed to store value on LOS 2") + } + response2, err := client2.Get(ctx, &pb.GetRequest{Uid: 3, Testing: true}) + + if err != nil || !bytes.Equal(response2.ObjectBytes, data) { + t.Errorf("Failed to get value on LOS 2") + } + + go func() { + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + response1, err := client1.Get(timeout, &pb.GetRequest{Uid: 3, Testing: true}) + if response1.Local { + t.Errorf("found on local") + } + if err != nil || !bytes.Equal(response1.ObjectBytes, data) { + if timeout.Err() == context.DeadlineExceeded { + t.Errorf("Timeout on client1 get call") + } + t.Errorf("Failed to get value on LOS 1") + } + + }() + + time.Sleep(1 * time.Second) + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + locStatusResp, err := client1.LocationFound(timeout, &pb.LocationFoundCallback{Uid: 3, Address: "localhost", Port: 50052}) + if err != nil || locStatusResp == nil || !locStatusResp.Success { + if timeout.Err() == context.DeadlineExceeded { + t.Errorf("Timeout on client1 location call") + } + t.Errorf("Failed to tell LOS 1 about location: %v", err) + } + } // func TestLocationFound(t *testing.T) { diff --git a/go/cmd/localscheduler/main.go b/go/cmd/localscheduler/main.go index c1932d1..3098ca6 100644 --- a/go/cmd/localscheduler/main.go +++ b/go/cmd/localscheduler/main.go @@ -1,73 +1,72 @@ package main import ( - "context" - "log" - "net" - "strconv" - "math/rand" - "fmt" - "google.golang.org/grpc" - pb "github.com/rodrigo-castellon/babyray/pkg" - "github.com/rodrigo-castellon/babyray/config" -) + "context" + "fmt" + "log" + "math/rand" + "net" + "strconv" + "github.com/rodrigo-castellon/babyray/config" + pb "github.com/rodrigo-castellon/babyray/pkg" + "google.golang.org/grpc" +) var globalSchedulerClient pb.GlobalSchedulerClient var localNodeID uint64 var cfg *config.Config -func main() { - cfg = config.GetConfig() // Load configuration - address := ":" + strconv.Itoa(cfg.Ports.LocalScheduler) // Prepare the network address - lis, err := net.Listen("tcp", address) - if err != nil { - log.Fatalf("failed to listen: %v", err) - } - _ = lis; - s := grpc.NewServer() - pb.RegisterLocalSchedulerServer(s, &server{}) - log.Printf("server listening at %v", lis.Addr()) - if err := s.Serve(lis); err != nil { - log.Fatalf("failed to serve: %v", err) - } - globalSchedulerAddress := fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, cfg.NodeIDs.GlobalScheduler, cfg.Ports.GlobalScheduler) - conn, _ := grpc.Dial(globalSchedulerAddress, grpc.WithInsecure()) - globalSchedulerClient = pb.NewGlobalSchedulerClient(conn) - localNodeID = 0 +func main() { + cfg = config.GetConfig() // Load configuration + address := ":" + strconv.Itoa(cfg.Ports.LocalScheduler) // Prepare the network address + lis, err := net.Listen("tcp", address) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + _ = lis + s := grpc.NewServer() + pb.RegisterLocalSchedulerServer(s, &server{}) + log.Printf("server listening at %v", lis.Addr()) + if err := s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } + globalSchedulerAddress := fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, cfg.NodeIDs.GlobalScheduler, cfg.Ports.GlobalScheduler) + conn, _ := grpc.Dial(globalSchedulerAddress, grpc.WithInsecure()) + globalSchedulerClient = pb.NewGlobalSchedulerClient(conn) + localNodeID = 0 } // server is used to implement your gRPC service. type server struct { - pb.UnimplementedLocalSchedulerServer + pb.UnimplementedLocalSchedulerServer } // Implement your service methods here. - func (s *server) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.ScheduleResponse, error) { - var worker_id int - // worker_id = check_resources() - worker_id = -1 - uid := uint64(rand.Intn(100)) - if worker_id != -1 { - workerAddress := fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, cfg.NodeIDs.Ourself, cfg.Ports.LocalWorkerStart + worker_id) - conn, _ := grpc.Dial(workerAddress, grpc.WithInsecure()) - workerClient := pb.NewWorkerClient(conn) - _, err := workerClient.Run(ctx, &pb.RunRequest{Uid: uid, Args: req.Args, Kwargs: req.Kwargs}) - if err != nil { - log.Printf("cannot contact worker %d", worker_id) - } - } else { - - _, err := globalSchedulerClient.Schedule(ctx, &pb.GlobalScheduleRequest{Uid: uid, Name: req.Name, Args: req.Args, Kwargs: req.Kwargs}) - if err != nil { - log.Printf("cannot contact global scheduler") - } + var worker_id int + // worker_id = check_resources() + worker_id = -1 + uid := uint64(rand.Intn(100)) + if worker_id != -1 { + workerAddress := fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, cfg.NodeIDs.Ourself, cfg.Ports.LocalWorkerStart+worker_id) + conn, _ := grpc.Dial(workerAddress, grpc.WithInsecure()) + workerClient := pb.NewWorkerClient(conn) + _, err := workerClient.Run(ctx, &pb.RunRequest{Uid: uid, Args: req.Args, Kwargs: req.Kwargs}) + if err != nil { + log.Printf("cannot contact worker %d", worker_id) + } + } else { + + _, err := globalSchedulerClient.Schedule(ctx, &pb.GlobalScheduleRequest{Uid: uid, Name: req.Name, Args: req.Args, Kwargs: req.Kwargs}) + if err != nil { + log.Printf("cannot contact global scheduler") + } - } - return &pb.ScheduleResponse{Uid: uid}, nil + } + return &pb.ScheduleResponse{Uid: uid}, nil } diff --git a/go/go.mod b/go/go.mod index 9f8c2ca..fa65a1a 100644 --- a/go/go.mod +++ b/go/go.mod @@ -10,6 +10,7 @@ require ( require ( github.com/golang/protobuf v1.5.4 // indirect + github.com/stretchr/testify v1.9.0 // indirect golang.org/x/net v0.21.0 // indirect golang.org/x/sys v0.17.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/go/go.sum b/go/go.sum index e835cfd..1837c58 100644 --- a/go/go.sum +++ b/go/go.sum @@ -2,6 +2,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= diff --git a/go/pkg/rayclient.pb.go b/go/pkg/rayclient.pb.go index 3911257..05f95de 100644 --- a/go/pkg/rayclient.pb.go +++ b/go/pkg/rayclient.pb.go @@ -99,7 +99,7 @@ type GlobalScheduleRequest struct { unknownFields protoimpl.UnknownFields Uid uint64 `protobuf:"varint,1,opt,name=uid,proto3" json:"uid,omitempty"` - Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + Name uint64 `protobuf:"varint,2,opt,name=name,proto3" json:"name,omitempty"` Args []byte `protobuf:"bytes,3,opt,name=args,proto3" json:"args,omitempty"` Kwargs []byte `protobuf:"bytes,4,opt,name=kwargs,proto3" json:"kwargs,omitempty"` } @@ -143,11 +143,11 @@ func (x *GlobalScheduleRequest) GetUid() uint64 { return 0 } -func (x *GlobalScheduleRequest) GetName() string { +func (x *GlobalScheduleRequest) GetName() uint64 { if x != nil { return x.Name } - return "" + return 0 } func (x *GlobalScheduleRequest) GetArgs() []byte { @@ -449,19 +449,19 @@ func (x *GetResponse) GetLocal() bool { return false } -type LocationFoundResponse struct { +type LocationFoundCallback struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Uid uint64 `protobuf:"varint,1,opt,name=uid,proto3" json:"uid,omitempty"` - Location uint64 `protobuf:"varint,2,opt,name=location,proto3" json:"location,omitempty"` - Address string `protobuf:"bytes,3,opt,name=address,proto3" json:"address,omitempty"` //For testing - Port uint64 `protobuf:"varint,4,opt,name=port,proto3" json:"port,omitempty"` //For testing + Uid uint64 `protobuf:"varint,1,opt,name=uid,proto3" json:"uid,omitempty"` // this is the uid of the object that was originally asked for + Location uint64 `protobuf:"varint,2,opt,name=location,proto3" json:"location,omitempty"` // this is the node number of the node who has this object + Address string `protobuf:"bytes,3,opt,name=address,proto3" json:"address,omitempty"` //For testing + Port uint64 `protobuf:"varint,4,opt,name=port,proto3" json:"port,omitempty"` //For testing } -func (x *LocationFoundResponse) Reset() { - *x = LocationFoundResponse{} +func (x *LocationFoundCallback) Reset() { + *x = LocationFoundCallback{} if protoimpl.UnsafeEnabled { mi := &file_rayclient_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -469,13 +469,13 @@ func (x *LocationFoundResponse) Reset() { } } -func (x *LocationFoundResponse) String() string { +func (x *LocationFoundCallback) String() string { return protoimpl.X.MessageStringOf(x) } -func (*LocationFoundResponse) ProtoMessage() {} +func (*LocationFoundCallback) ProtoMessage() {} -func (x *LocationFoundResponse) ProtoReflect() protoreflect.Message { +func (x *LocationFoundCallback) ProtoReflect() protoreflect.Message { mi := &file_rayclient_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -487,33 +487,33 @@ func (x *LocationFoundResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use LocationFoundResponse.ProtoReflect.Descriptor instead. -func (*LocationFoundResponse) Descriptor() ([]byte, []int) { +// Deprecated: Use LocationFoundCallback.ProtoReflect.Descriptor instead. +func (*LocationFoundCallback) Descriptor() ([]byte, []int) { return file_rayclient_proto_rawDescGZIP(), []int{7} } -func (x *LocationFoundResponse) GetUid() uint64 { +func (x *LocationFoundCallback) GetUid() uint64 { if x != nil { return x.Uid } return 0 } -func (x *LocationFoundResponse) GetLocation() uint64 { +func (x *LocationFoundCallback) GetLocation() uint64 { if x != nil { return x.Location } return 0 } -func (x *LocationFoundResponse) GetAddress() string { +func (x *LocationFoundCallback) GetAddress() string { if x != nil { return x.Address } return "" } -func (x *LocationFoundResponse) GetPort() uint64 { +func (x *LocationFoundCallback) GetPort() uint64 { if x != nil { return x.Port } @@ -816,7 +816,7 @@ type RequestLocationResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - NodeId uint64 `protobuf:"varint,1,opt,name=nodeId,proto3" json:"nodeId,omitempty"` + ImmediatelyFound bool `protobuf:"varint,1,opt,name=immediatelyFound,proto3" json:"immediatelyFound,omitempty"` } func (x *RequestLocationResponse) Reset() { @@ -851,11 +851,11 @@ func (*RequestLocationResponse) Descriptor() ([]byte, []int) { return file_rayclient_proto_rawDescGZIP(), []int{13} } -func (x *RequestLocationResponse) GetNodeId() uint64 { +func (x *RequestLocationResponse) GetImmediatelyFound() bool { if x != nil { - return x.NodeId + return x.ImmediatelyFound } - return 0 + return false } type RegisterRequest struct { @@ -957,7 +957,8 @@ type FetchRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Name uint64 `protobuf:"varint,1,opt,name=name,proto3" json:"name,omitempty"` + Name uint64 `protobuf:"varint,1,opt,name=name,proto3" json:"name,omitempty"` + CallbackNodeId uint64 `protobuf:"varint,2,opt,name=callbackNodeId,proto3" json:"callbackNodeId,omitempty"` } func (x *FetchRequest) Reset() { @@ -999,6 +1000,13 @@ func (x *FetchRequest) GetName() uint64 { return 0 } +func (x *FetchRequest) GetCallbackNodeId() uint64 { + if x != nil { + return x.CallbackNodeId + } + return 0 +} + type FetchResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1064,7 +1072,7 @@ var file_rayclient_proto_rawDesc = []byte{ 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x61, + 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x72, 0x67, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x61, 0x72, 0x67, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x6b, 0x77, 0x61, 0x72, 0x67, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6b, 0x77, 0x61, 0x72, 0x67, 0x73, 0x22, 0x51, 0x0a, 0x0f, 0x53, 0x63, 0x68, 0x65, 0x64, @@ -1089,7 +1097,7 @@ var file_rayclient_proto_rawDesc = []byte{ 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x22, 0x73, 0x0a, 0x15, 0x4c, 0x6f, 0x63, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x69, 0x6f, 0x6e, 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x43, 0x61, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x18, @@ -1118,73 +1126,77 @@ var file_rayclient_proto_rawDesc = []byte{ 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x72, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x65, 0x72, 0x22, 0x31, 0x0a, 0x17, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x65, 0x72, 0x22, 0x45, 0x0a, 0x17, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x22, 0x39, 0x0a, 0x0f, 0x52, - 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x26, - 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x46, 0x75, 0x6e, 0x63, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, - 0x65, 0x64, 0x46, 0x75, 0x6e, 0x63, 0x22, 0x26, 0x0a, 0x10, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, - 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, - 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x22, 0x22, - 0x0a, 0x0c, 0x46, 0x65, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, + 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x10, 0x69, 0x6d, 0x6d, 0x65, 0x64, 0x69, 0x61, 0x74, 0x65, + 0x6c, 0x79, 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x10, 0x69, + 0x6d, 0x6d, 0x65, 0x64, 0x69, 0x61, 0x74, 0x65, 0x6c, 0x79, 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x22, + 0x39, 0x0a, 0x0f, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x26, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, + 0x46, 0x75, 0x6e, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x73, 0x65, 0x72, 0x69, + 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x46, 0x75, 0x6e, 0x63, 0x22, 0x26, 0x0a, 0x10, 0x52, 0x65, + 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x6e, 0x61, - 0x6d, 0x65, 0x22, 0x37, 0x0a, 0x0d, 0x46, 0x65, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x26, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, - 0x64, 0x46, 0x75, 0x6e, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x73, 0x65, 0x72, - 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x46, 0x75, 0x6e, 0x63, 0x32, 0x4e, 0x0a, 0x0f, 0x47, - 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x12, 0x3b, - 0x0a, 0x08, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x12, 0x1a, 0x2e, 0x72, 0x61, 0x79, - 0x2e, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x49, 0x0a, 0x0e, 0x4c, - 0x6f, 0x63, 0x61, 0x6c, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x12, 0x37, 0x0a, - 0x08, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x12, 0x14, 0x2e, 0x72, 0x61, 0x79, 0x2e, - 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x15, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x8b, 0x02, 0x0a, 0x0d, 0x4c, 0x6f, 0x63, 0x61, 0x6c, - 0x4f, 0x62, 0x6a, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x12, 0x2f, 0x0a, 0x05, 0x53, 0x74, 0x6f, 0x72, - 0x65, 0x12, 0x11, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x28, 0x0a, 0x03, 0x47, 0x65, 0x74, - 0x12, 0x0f, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x10, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x40, 0x0a, 0x0d, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x46, - 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x1a, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x4c, 0x6f, 0x63, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x6d, 0x65, 0x22, 0x4a, 0x0a, 0x0c, 0x46, 0x65, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x26, 0x0a, 0x0e, 0x63, 0x61, 0x6c, 0x6c, 0x62, 0x61, + 0x63, 0x6b, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0e, + 0x63, 0x61, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x22, 0x37, + 0x0a, 0x0d, 0x46, 0x65, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x26, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x46, 0x75, 0x6e, + 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, + 0x7a, 0x65, 0x64, 0x46, 0x75, 0x6e, 0x63, 0x32, 0x4e, 0x0a, 0x0f, 0x47, 0x6c, 0x6f, 0x62, 0x61, + 0x6c, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x12, 0x3b, 0x0a, 0x08, 0x53, 0x63, + 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x12, 0x1a, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x47, 0x6c, 0x6f, + 0x62, 0x61, 0x6c, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x49, 0x0a, 0x0e, 0x4c, 0x6f, 0x63, 0x61, 0x6c, + 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x12, 0x37, 0x0a, 0x08, 0x53, 0x63, 0x68, + 0x65, 0x64, 0x75, 0x6c, 0x65, 0x12, 0x14, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x53, 0x63, 0x68, 0x65, + 0x64, 0x75, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x72, 0x61, + 0x79, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x32, 0x8b, 0x02, 0x0a, 0x0d, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x4f, 0x62, 0x6a, 0x53, + 0x74, 0x6f, 0x72, 0x65, 0x12, 0x2f, 0x0a, 0x05, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x12, 0x11, 0x2e, + 0x72, 0x61, 0x79, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2b, 0x0a, 0x04, 0x43, 0x6f, 0x70, 0x79, 0x12, 0x10, 0x2e, - 0x72, 0x61, 0x79, 0x2e, 0x43, 0x6f, 0x70, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x11, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x43, 0x6f, 0x70, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x30, 0x0a, 0x04, 0x49, 0x6e, 0x69, 0x74, 0x12, 0x13, 0x2e, 0x72, 0x61, 0x79, - 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, - 0x13, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x35, 0x0a, 0x06, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x12, 0x2b, - 0x0a, 0x03, 0x52, 0x75, 0x6e, 0x12, 0x0f, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x52, 0x75, 0x6e, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x91, 0x01, 0x0a, 0x06, - 0x47, 0x43, 0x53, 0x4f, 0x62, 0x6a, 0x12, 0x39, 0x0a, 0x0a, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, - 0x4f, 0x77, 0x6e, 0x73, 0x12, 0x16, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x4e, 0x6f, 0x74, 0x69, 0x66, - 0x79, 0x4f, 0x77, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x72, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x28, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x0f, 0x2e, 0x72, + 0x61, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, + 0x72, 0x61, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x40, 0x0a, 0x0d, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x46, 0x6f, 0x75, 0x6e, 0x64, + 0x12, 0x1a, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x46, + 0x6f, 0x75, 0x6e, 0x64, 0x43, 0x61, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x1a, 0x13, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x4c, 0x0a, 0x0f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x4c, 0x6f, 0x63, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1b, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1c, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x4c, - 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, - 0x7a, 0x0a, 0x07, 0x47, 0x43, 0x53, 0x46, 0x75, 0x6e, 0x63, 0x12, 0x3b, 0x0a, 0x0c, 0x52, 0x65, - 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x46, 0x75, 0x6e, 0x63, 0x12, 0x14, 0x2e, 0x72, 0x61, 0x79, - 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x15, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x32, 0x0a, 0x09, 0x46, 0x65, 0x74, 0x63, 0x68, - 0x46, 0x75, 0x6e, 0x63, 0x12, 0x11, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x46, 0x65, 0x74, 0x63, 0x68, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x46, 0x65, - 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x34, 0x5a, 0x32, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x6f, 0x64, 0x72, 0x69, 0x67, - 0x6f, 0x2d, 0x63, 0x61, 0x73, 0x74, 0x65, 0x6c, 0x6c, 0x6f, 0x6e, 0x2f, 0x62, 0x61, 0x62, 0x79, - 0x72, 0x61, 0x79, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x3b, 0x67, 0x72, 0x70, - 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x12, 0x2b, 0x0a, 0x04, 0x43, 0x6f, 0x70, 0x79, 0x12, 0x10, 0x2e, 0x72, 0x61, 0x79, 0x2e, + 0x43, 0x6f, 0x70, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x11, 0x2e, 0x72, 0x61, + 0x79, 0x2e, 0x43, 0x6f, 0x70, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x30, + 0x0a, 0x04, 0x49, 0x6e, 0x69, 0x74, 0x12, 0x13, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x13, 0x2e, 0x72, 0x61, + 0x79, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x32, 0x35, 0x0a, 0x06, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x12, 0x2b, 0x0a, 0x03, 0x52, 0x75, + 0x6e, 0x12, 0x0f, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x91, 0x01, 0x0a, 0x06, 0x47, 0x43, 0x53, 0x4f, + 0x62, 0x6a, 0x12, 0x39, 0x0a, 0x0a, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x4f, 0x77, 0x6e, 0x73, + 0x12, 0x16, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x4f, 0x77, 0x6e, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x53, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4c, 0x0a, + 0x0f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x1b, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x4c, 0x6f, + 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, + 0x72, 0x61, 0x79, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x4c, 0x6f, 0x63, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x7a, 0x0a, 0x07, 0x47, + 0x43, 0x53, 0x46, 0x75, 0x6e, 0x63, 0x12, 0x3b, 0x0a, 0x0c, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, + 0x65, 0x72, 0x46, 0x75, 0x6e, 0x63, 0x12, 0x14, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x52, 0x65, 0x67, + 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x72, + 0x61, 0x79, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x32, 0x0a, 0x09, 0x46, 0x65, 0x74, 0x63, 0x68, 0x46, 0x75, 0x6e, 0x63, + 0x12, 0x11, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x46, 0x65, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x72, 0x61, 0x79, 0x2e, 0x46, 0x65, 0x74, 0x63, 0x68, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x6f, 0x64, 0x72, 0x69, 0x67, 0x6f, 0x2d, 0x63, 0x61, + 0x73, 0x74, 0x65, 0x6c, 0x6c, 0x6f, 0x6e, 0x2f, 0x62, 0x61, 0x62, 0x79, 0x72, 0x61, 0x79, 0x2f, + 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x3b, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1208,7 +1220,7 @@ var file_rayclient_proto_goTypes = []interface{}{ (*StoreRequest)(nil), // 4: ray.StoreRequest (*GetRequest)(nil), // 5: ray.GetRequest (*GetResponse)(nil), // 6: ray.GetResponse - (*LocationFoundResponse)(nil), // 7: ray.LocationFoundResponse + (*LocationFoundCallback)(nil), // 7: ray.LocationFoundCallback (*CopyRequest)(nil), // 8: ray.CopyRequest (*CopyResponse)(nil), // 9: ray.CopyResponse (*RunRequest)(nil), // 10: ray.RunRequest @@ -1225,7 +1237,7 @@ var file_rayclient_proto_depIdxs = []int32{ 2, // 1: ray.LocalScheduler.Schedule:input_type -> ray.ScheduleRequest 4, // 2: ray.LocalObjStore.Store:input_type -> ray.StoreRequest 5, // 3: ray.LocalObjStore.Get:input_type -> ray.GetRequest - 7, // 4: ray.LocalObjStore.LocationFound:input_type -> ray.LocationFoundResponse + 7, // 4: ray.LocalObjStore.LocationFound:input_type -> ray.LocationFoundCallback 8, // 5: ray.LocalObjStore.Copy:input_type -> ray.CopyRequest 0, // 6: ray.LocalObjStore.Init:input_type -> ray.StatusResponse 10, // 7: ray.Worker.Run:input_type -> ray.RunRequest @@ -1343,7 +1355,7 @@ func file_rayclient_proto_init() { } } file_rayclient_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*LocationFoundResponse); i { + switch v := v.(*LocationFoundCallback); i { case 0: return &v.state case 1: diff --git a/go/pkg/rayclient_grpc.pb.go b/go/pkg/rayclient_grpc.pb.go index caf9b1a..a6a4fb8 100644 --- a/go/pkg/rayclient_grpc.pb.go +++ b/go/pkg/rayclient_grpc.pb.go @@ -212,7 +212,7 @@ const ( type LocalObjStoreClient interface { Store(ctx context.Context, in *StoreRequest, opts ...grpc.CallOption) (*StatusResponse, error) Get(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*GetResponse, error) - LocationFound(ctx context.Context, in *LocationFoundResponse, opts ...grpc.CallOption) (*StatusResponse, error) + LocationFound(ctx context.Context, in *LocationFoundCallback, opts ...grpc.CallOption) (*StatusResponse, error) Copy(ctx context.Context, in *CopyRequest, opts ...grpc.CallOption) (*CopyResponse, error) Init(ctx context.Context, in *StatusResponse, opts ...grpc.CallOption) (*StatusResponse, error) } @@ -243,7 +243,7 @@ func (c *localObjStoreClient) Get(ctx context.Context, in *GetRequest, opts ...g return out, nil } -func (c *localObjStoreClient) LocationFound(ctx context.Context, in *LocationFoundResponse, opts ...grpc.CallOption) (*StatusResponse, error) { +func (c *localObjStoreClient) LocationFound(ctx context.Context, in *LocationFoundCallback, opts ...grpc.CallOption) (*StatusResponse, error) { out := new(StatusResponse) err := c.cc.Invoke(ctx, LocalObjStore_LocationFound_FullMethodName, in, out, opts...) if err != nil { @@ -276,7 +276,7 @@ func (c *localObjStoreClient) Init(ctx context.Context, in *StatusResponse, opts type LocalObjStoreServer interface { Store(context.Context, *StoreRequest) (*StatusResponse, error) Get(context.Context, *GetRequest) (*GetResponse, error) - LocationFound(context.Context, *LocationFoundResponse) (*StatusResponse, error) + LocationFound(context.Context, *LocationFoundCallback) (*StatusResponse, error) Copy(context.Context, *CopyRequest) (*CopyResponse, error) Init(context.Context, *StatusResponse) (*StatusResponse, error) mustEmbedUnimplementedLocalObjStoreServer() @@ -292,7 +292,7 @@ func (UnimplementedLocalObjStoreServer) Store(context.Context, *StoreRequest) (* func (UnimplementedLocalObjStoreServer) Get(context.Context, *GetRequest) (*GetResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Get not implemented") } -func (UnimplementedLocalObjStoreServer) LocationFound(context.Context, *LocationFoundResponse) (*StatusResponse, error) { +func (UnimplementedLocalObjStoreServer) LocationFound(context.Context, *LocationFoundCallback) (*StatusResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method LocationFound not implemented") } func (UnimplementedLocalObjStoreServer) Copy(context.Context, *CopyRequest) (*CopyResponse, error) { @@ -351,7 +351,7 @@ func _LocalObjStore_Get_Handler(srv interface{}, ctx context.Context, dec func(i } func _LocalObjStore_LocationFound_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(LocationFoundResponse) + in := new(LocationFoundCallback) if err := dec(in); err != nil { return nil, err } @@ -363,7 +363,7 @@ func _LocalObjStore_LocationFound_Handler(srv interface{}, ctx context.Context, FullMethod: LocalObjStore_LocationFound_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(LocalObjStoreServer).LocationFound(ctx, req.(*LocationFoundResponse)) + return srv.(LocalObjStoreServer).LocationFound(ctx, req.(*LocationFoundCallback)) } return interceptor(ctx, in, info, handler) } diff --git a/go/util/util.go b/go/util/util.go new file mode 100644 index 0000000..c4e4f55 --- /dev/null +++ b/go/util/util.go @@ -0,0 +1,26 @@ +package util + +import ( + "context" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +// DEPRECATED -- DO NOT USE +func ExtractAddressFromCtx(ctx context.Context) (string, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + // If no metadata is present at all, return an InvalidArgument error + return "", status.Error(codes.InvalidArgument, "no metadata available in context") + } + + addresses := md.Get("client-address") + if len(addresses) == 0 { + // Metadata is there but does not have the expected 'client-address' key + return "", status.Error(codes.InvalidArgument, "client address not provided in metadata") + } + + return addresses[0], nil +} diff --git a/go/util/util_test.go b/go/util/util_test.go new file mode 100644 index 0000000..b7c5ddb --- /dev/null +++ b/go/util/util_test.go @@ -0,0 +1,36 @@ +package util + +import ( + "context" + "testing" + + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestExtractAddressFromCtx(t *testing.T) { + // Create a context with metadata + md := metadata.New(map[string]string{"client-address": "192.168.1.1"}) + ctx := metadata.NewIncomingContext(context.Background(), md) + + // Test extraction + addr, err := ExtractAddressFromCtx(ctx) + if err != nil { + t.Fatalf("Failed to extract address: %v", err) + } + if addr != "192.168.1.1" { + t.Errorf("Extracted address is incorrect, got: %s, want: %s", addr, "192.168.1.1") + } + + // Test with no metadata + ctxNoMeta := context.Background() + _, err = ExtractAddressFromCtx(ctxNoMeta) + if err == nil { + t.Errorf("Expected an error for missing metadata, but got none") + } + st, _ := status.FromError(err) + if st.Code() != codes.InvalidArgument { + t.Errorf("Expected InvalidArgument, got: %v", st.Code()) + } +} diff --git a/proto/rayclient.proto b/proto/rayclient.proto index 661f31a..047d8a1 100644 --- a/proto/rayclient.proto +++ b/proto/rayclient.proto @@ -26,7 +26,7 @@ service GlobalScheduler { message GlobalScheduleRequest { uint64 uid = 1; - string name = 2; + uint64 name = 2; bytes args = 3; bytes kwargs = 4; } @@ -56,7 +56,7 @@ message ScheduleResponse { service LocalObjStore { rpc Store(StoreRequest) returns (StatusResponse); rpc Get(GetRequest) returns (GetResponse); - rpc LocationFound(LocationFoundResponse) returns (StatusResponse); + rpc LocationFound(LocationFoundCallback) returns (StatusResponse); rpc Copy(CopyRequest) returns (CopyResponse); rpc Init(StatusResponse) returns (StatusResponse); } @@ -77,9 +77,9 @@ message GetResponse { bool local = 3; } -message LocationFoundResponse { - uint64 uid = 1; - uint64 location = 2; +message LocationFoundCallback { + uint64 uid = 1; // this is the uid of the object that was originally asked for + uint64 location = 2; // this is the node number of the node who has this object string address = 3; //For testing uint64 port = 4; //For testing } @@ -127,8 +127,8 @@ message RequestLocationRequest { uint64 requester = 2; } -message RequestLocationResponse { - uint64 nodeId = 1; +message RequestLocationResponse { + bool immediatelyFound = 1; } /* END GCS OBJECT TABLE */ @@ -149,6 +149,7 @@ message RegisterResponse { message FetchRequest { uint64 name = 1; + uint64 callbackNodeId = 2; } message FetchResponse {