Merge pull request #10 from rodrigo-castellon/locals-meets-gcs
[Official] Integrate GCS with Locals, basic functionality
Russell-Tran authored May 15, 2024
2 parents fd684eb + 528f2e3 commit f43563f
Showing 13 changed files with 1,193 additions and 535 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -196,3 +196,5 @@ go.work
 go/cmd/gcsfunctable/main
 go/cmd/gcsobjtable/gcsobjtable
 python/babyray/rayclient_pb2_grpc.py-e
+go/cmd/localobjstore/localobjstore
+go/cmd/localscheduler/localscheduler
218 changes: 152 additions & 66 deletions go/cmd/gcsobjtable/main.go
@@ -1,88 +1,174 @@
 package main
 
 import (
-	"context"
-	// "errors"
-	"log"
-	"net"
-	"strconv"
-	"sync"
-	"math/rand"
-
-	"google.golang.org/grpc"
-	pb "github.com/rodrigo-castellon/babyray/pkg"
-	"github.com/rodrigo-castellon/babyray/config"
-
-	"google.golang.org/grpc/status"
-	"google.golang.org/grpc/codes"
+	"context"
+	"time"
+
+	// "errors"
+	"log"
+	"math/rand"
+	"net"
+	"strconv"
+	"sync"
+
+	"github.com/rodrigo-castellon/babyray/config"
+	pb "github.com/rodrigo-castellon/babyray/pkg"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/peer"
+
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
+var cfg *config.Config
+
 func main() {
-	cfg := config.GetConfig()                               // Load configuration
+	cfg = config.GetConfig()                                // Load configuration
 	address := ":" + strconv.Itoa(cfg.Ports.GCSObjectTable) // Prepare the network address
 
 	lis, err := net.Listen("tcp", address)
 	if err != nil {
 		log.Fatalf("failed to listen: %v", err)
 	}
-	_ = lis;
+	_ = lis
 	s := grpc.NewServer()
 	pb.RegisterGCSObjServer(s, NewGCSObjServer())
 	log.Printf("server listening at %v", lis.Addr())
 	if err := s.Serve(lis); err != nil {
 		log.Fatalf("failed to serve: %v", err)
 	}
 }
 
 // server is used to implement your gRPC service.
 type GCSObjServer struct {
 	pb.UnimplementedGCSObjServer
-	objectLocations map[uint64][]uint64
-	mu              sync.Mutex
-	cond            *sync.Cond
+	objectLocations map[uint64][]uint64 // object uid -> list of nodeIds as uint64
+	waitlist        map[uint64][]string // object uid -> list of IP addresses as string
+	mu              sync.Mutex          // lock should be used for both objectLocations and waitlist
 }
 
 func NewGCSObjServer() *GCSObjServer {
 	server := &GCSObjServer{
 		objectLocations: make(map[uint64][]uint64),
-		mu:              sync.Mutex{}, // Mutex initialized here.
+		waitlist:        make(map[uint64][]string),
+		mu:              sync.Mutex{},
 	}
-	server.cond = sync.NewCond(&server.mu) // Properly pass the address of the struct's mutex.
 	return server
 }
 
+// getNodeId returns a nodeId that has the object with the given uid. If the
+// object does not exist anywhere, the second return value is false.
+// Assumes s's mutex is locked.
+func (s *GCSObjServer) getNodeId(uid uint64) (*uint64, bool) {
+	nodeIds, exists := s.objectLocations[uid]
+	if !exists || len(nodeIds) == 0 {
+		return nil, false
+	}
+
+	// Note: policy is to pick a random one; in the future it will need to be locality-based
+	randomIndex := rand.Intn(len(nodeIds))
+	nodeId := &nodeIds[randomIndex]
+	return nodeId, true
+}
+
+// sendCallback sends a location-found callback to the local object store client.
+// This should be run as a goroutine.
+func (s *GCSObjServer) sendCallback(clientAddress string, uid uint64, nodeId uint64) {
+	// Set up a new gRPC connection to the client.
+	// TODO: Refactor to cache gRPC connections rather than creating a new one each
+	// time; Dial is lazy-loading, but we should still save the connection for future use.
+	conn, err := grpc.Dial(clientAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		// Log the error instead of returning it
+		log.Printf("Failed to connect back to client at %s: %v", clientAddress, err)
+		return
+	}
+	defer conn.Close()
+
+	localObjStoreClient := pb.NewLocalObjStoreClient(conn)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	// Call LocationFound and handle any potential error
+	_, err = localObjStoreClient.LocationFound(ctx, &pb.LocationFoundCallback{Uid: uid, Location: nodeId})
+	if err != nil {
+		log.Printf("Failed to send LocationFound callback for UID %d to client at %s: %v", uid, clientAddress, err)
+	}
+}
+
 // Implement your service methods here.
 func (s *GCSObjServer) NotifyOwns(ctx context.Context, req *pb.NotifyOwnsRequest) (*pb.StatusResponse, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	// Append the nodeId to the list for the given uid
-	s.objectLocations[req.Uid] = append(s.objectLocations[req.Uid], req.NodeId)
-	s.cond.Broadcast()
+	uid, nodeId := req.Uid, req.NodeId
+
+	// Append the nodeId to the list for the given object uid
+	if _, exists := s.objectLocations[uid]; !exists {
+		s.objectLocations[uid] = []uint64{} // Initialize slice if it doesn't exist
+	}
+	s.objectLocations[uid] = append(s.objectLocations[uid], nodeId)
+
+	// Notify and clear the waitlist for this uid, if any
+	if waitingIPs, exists := s.waitlist[uid]; exists {
+		for _, clientAddress := range waitingIPs {
+			go s.sendCallback(clientAddress, uid, nodeId)
+		}
+		delete(s.waitlist, uid)
+	}
 
 	return &pb.StatusResponse{Success: true}, nil
 }
 
 func (s *GCSObjServer) RequestLocation(ctx context.Context, req *pb.RequestLocationRequest) (*pb.RequestLocationResponse, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	nodeIds, exists := s.objectLocations[req.Uid]
-
-	for !exists || len(nodeIds) == 0 {
-		s.cond.Wait()
-		nodeIds, exists = s.objectLocations[req.Uid]
-		if ctx.Err() != nil {
-			return nil, status.Error(codes.Canceled, "request cancelled or context deadline exceeded")
-		}
-	}
-
-	// Assume successful case
-	randomIndex := rand.Intn(len(nodeIds))
-	return &pb.RequestLocationResponse{
-		NodeId: nodeIds[randomIndex],
-	}, nil
+
+	// Extract the client's address from peer info
+	p, ok := peer.FromContext(ctx)
+	if !ok {
+		return nil, status.Error(codes.Internal, "could not get peer information; hence, failed to extract client address")
+	}
+
+	host, _, err := net.SplitHostPort(p.Addr.String()) // Strip out the ephemeral port
+	if err != nil {
+		// Temporary workaround: if splitting fails, use the entire address string as the host
+		host = p.Addr.String()
+	}
+	clientPort := strconv.Itoa(cfg.Ports.LocalObjectStore) // Replace the ephemeral port with the official LocalObjectStore port
+	clientAddress := net.JoinHostPort(host, clientPort)
+
+	// NOTE: We use a single lock plus a waitlist rather than per-object condition
+	// variables; one cv per object would add significant overhead, since cleanup
+	// would have to be managed through reference counting.
+	uid := req.Uid
+	nodeId, exists := s.getNodeId(uid)
+	if !exists {
+		// Add the client to the waitlist
+		if _, exists := s.waitlist[uid]; !exists {
+			s.waitlist[uid] = []string{} // Initialize slice if it doesn't exist
+		}
+		s.waitlist[uid] = append(s.waitlist[uid], clientAddress)
+
+		// Reply to this gRPC request
+		return &pb.RequestLocationResponse{
+			ImmediatelyFound: false,
+		}, nil
+	}
+
+	// Send the callback immediately
+	go s.sendCallback(clientAddress, uid, *nodeId)
+
+	// Reply to this gRPC request
+	return &pb.RequestLocationResponse{
+		ImmediatelyFound: true,
+	}, nil
 }
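The RequestLocation/LocationFound pair replaces the old blocking condition-variable wait with an asynchronous handshake: the GCS answers immediately (ImmediatelyFound true or false) and, once a location is known, dials back into the requester's local object store on its well-known port. For orientation, here is a minimal sketch of what the receiving side of that callback might look like. Only the LocationFound RPC and the LocationFoundCallback fields come from the diff above; the server struct, its fields, and the StatusResponse return type are assumptions for illustration, not code from this PR.

package main

import (
	"context"
	"sync"

	pb "github.com/rodrigo-castellon/babyray/pkg"
)

// Hypothetical callback receiver inside the local object store (sketch only).
type LocalObjStoreServer struct {
	pb.UnimplementedLocalObjStoreServer
	mu        sync.Mutex
	locations map[uint64]uint64 // object uid -> nodeId reported by the GCS (assumed field)
	cond      *sync.Cond        // wakes Get() calls parked on a missing uid (assumed field)
}

// LocationFound records where an object lives and wakes any goroutine
// that was blocked waiting for this uid to resolve.
func (s *LocalObjStoreServer) LocationFound(ctx context.Context, req *pb.LocationFoundCallback) (*pb.StatusResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.locations[req.Uid] = req.Location
	s.cond.Broadcast()
	return &pb.StatusResponse{Success: true}, nil
}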
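Because neither RPC blocks anymore, the two-phase flow can be exercised in-process. Below is a sketch of a unit test for the waitlist path, not part of this PR: it assumes the generated pb types shown in the diff, uses grpc/peer to inject a fake peer address in place of a real connection, and relies on the sendCallback dial failing harmlessly since nothing listens at that address.

package main

import (
	"context"
	"net"
	"testing"

	"google.golang.org/grpc/peer"

	"github.com/rodrigo-castellon/babyray/config"
	pb "github.com/rodrigo-castellon/babyray/pkg"
)

func TestWaitlistThenNotify(t *testing.T) {
	cfg = config.GetConfig() // RequestLocation reads the LocalObjectStore port from this global
	s := NewGCSObjServer()

	// Inject a fake peer so RequestLocation can derive a callback address.
	addr := &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 54321}
	ctx := peer.NewContext(context.Background(), &peer.Peer{Addr: addr})

	// Phase 1: the object is unknown, so the client should be waitlisted.
	resp, err := s.RequestLocation(ctx, &pb.RequestLocationRequest{Uid: 42})
	if err != nil {
		t.Fatal(err)
	}
	if resp.ImmediatelyFound {
		t.Fatal("uid 42 should not be found yet")
	}

	// Phase 2: an owner appears; NotifyOwns must fire the callbacks
	// and clear the waitlist.
	if _, err := s.NotifyOwns(ctx, &pb.NotifyOwnsRequest{Uid: 42, NodeId: 7}); err != nil {
		t.Fatal(err)
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	if _, waiting := s.waitlist[42]; waiting {
		t.Fatal("waitlist for uid 42 should be cleared after NotifyOwns")
	}
}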