Skip to content

Commit

Permalink
Merge pull request #4 from rodrigo-castellon/locals
Browse files Browse the repository at this point in the history
Locals
  • Loading branch information
Russell-Tran authored May 15, 2024
2 parents b358023 + cfce20c commit fd684eb
Show file tree
Hide file tree
Showing 7 changed files with 964 additions and 216 deletions.
65 changes: 6 additions & 59 deletions go/cmd/gcsobjtable/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"net"
"testing"
"log"
"sync"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
Expand All @@ -20,7 +18,7 @@ var lis *bufconn.Listener
func init() {
lis = bufconn.Listen(bufSize)
s := grpc.NewServer()
pb.RegisterGCSObjServer(s, NewGCSObjServer())
pb.RegisterGCSObjServer(s, &server{objectLocations: make(map[uint32][]uint32)})
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("Server exited with error: %v", err)
Expand All @@ -41,7 +39,7 @@ func TestNotifyOwns(t *testing.T) {
defer conn.Close()
client := pb.NewGCSObjClient(conn)

// Testing NotifyOwns
// Test NotifyOwns
resp, err := client.NotifyOwns(ctx, &pb.NotifyOwnsRequest{
Uid: 1,
NodeId: 100,
Expand Down Expand Up @@ -71,61 +69,10 @@ func TestRequestLocation(t *testing.T) {

// Test RequestLocation
resp, err := client.RequestLocation(ctx, &pb.RequestLocationRequest{Uid: 1})
if err != nil {
t.Errorf("RequestLocation failed: %v", err)
return
if err != nil || !resp.Success {
t.Errorf("RequestLocation failed: %v, response: %v", err, resp)
}
if resp.NodeId != 100 {
t.Errorf("RequestLocation returned incorrect node ID: got %d, want %d", resp.NodeId, 100)
if resp.Details != "100" {
t.Errorf("RequestLocation returned incorrect node ID: got %s, want %s", resp.Details, "100")
}
}

// Create a unit test in Go where three goroutines are involved, with the first two waiting for an object's location
// and the third notifying the server of the object's presence:
// - Two goroutines will call RequestLocation for a UID that initially doesn't have any node IDs associated with it. They will block until they are notified of a change.
// - One goroutine will perform the NotifyOwns action after a short delay, adding a node ID to the UID, which should then notify the waiting goroutines.
func TestRequestLocationWithNotification(t *testing.T) {
ctx := context.Background()
conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure())
if err != nil {
t.Fatalf("Failed to dial bufnet: %v", err)
}
defer conn.Close()
client := pb.NewGCSObjClient(conn)

var wg sync.WaitGroup
uid := uint64(1) // Example UID for testing

// Start two goroutines that are trying to fetch the object location
for i := 0; i < 2; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
resp, err := client.RequestLocation(ctx, &pb.RequestLocationRequest{Uid: uid})
if err != nil {
t.Errorf("Goroutine %d: RequestLocation failed: %v", index, err)
return
}
if resp.NodeId != 100 {
t.Errorf("Goroutine %d: RequestLocation returned incorrect node ID: got %d, want %d", index, resp.NodeId, 100)
}
}(i)
}

// Goroutine to notify
wg.Add(1)
go func() {
defer wg.Done()
// Let the requests initiate first
time.Sleep(100 * time.Millisecond)
_, err := client.NotifyOwns(ctx, &pb.NotifyOwnsRequest{
Uid: uid,
NodeId: 100,
})
if err != nil {
t.Fatalf("NotifyOwns failed: %v", err)
}
}()

wg.Wait() // Wait for all goroutines to complete
}
129 changes: 114 additions & 15 deletions go/cmd/localobjstore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,132 @@ import (
"log"
"net"
"strconv"

"fmt"
context "context"
"errors"
"google.golang.org/grpc"
pb "github.com/rodrigo-castellon/babyray/pkg"
"github.com/rodrigo-castellon/babyray/config"
)

)
// var localObjectStore map[uint64][]byte
// var localObjectChannels map[uint64]chan []byte
// var gcsObjClient pb.GCSObjClient
// var localNodeID uint64
var cfg *config.Config
func main() {
cfg := config.LoadConfig() // Load configuration
cfg = config.GetConfig() // Load configuration
address := ":" + strconv.Itoa(cfg.Ports.LocalObjectStore) // Prepare the network address

lis, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
_ = lis;
s := grpc.NewServer()
pb.RegisterLocalObjStoreServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
if address == "" {
lis, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
_ = lis;
s := grpc.NewServer()
pb.RegisterLocalObjStoreServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}


// localObjectStore = make(map[uint64][]byte)
// localObjectChannels = make(map[uint64]chan []byte)

// gcsAddress := fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, cfg.NodeIDs.GCS, cfg.Ports.GCSObjectTable)
// conn, _ := grpc.Dial(gcsAddress, grpc.WithInsecure())
// gcsObjClient = pb.NewGCSObjClient(conn)
// localNodeID = 0
}

// server is used to implement your gRPC service.
type server struct {
pb.UnimplementedLocalObjStoreServer
localObjectStore map[uint64][]byte
localObjectChannels map[uint64]chan []byte
gcsObjClient pb.GCSObjClient
localNodeID uint64
}

func (s* server) Init(ctx context.Context, req *pb.StatusResponse) (*pb.StatusResponse, error) {
s.localObjectStore = make(map[uint64][]byte)
s.localObjectChannels = make(map[uint64]chan []byte)
s.localNodeID = 1
gcsAddress := fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, cfg.NodeIDs.GCS, cfg.Ports.GCSObjectTable)
conn, _ := grpc.Dial(gcsAddress, grpc.WithInsecure())
s.gcsObjClient = pb.NewGCSObjClient(conn)

return &pb.StatusResponse{Success: true}, nil

}
func (s *server) Store(ctx context.Context, req *pb.StoreRequest) (*pb.StatusResponse, error) {
s.localObjectStore[req.Uid] = req.ObjectBytes

s.gcsObjClient.NotifyOwns(ctx, &pb.NotifyOwnsRequest{Uid: req.Uid, NodeId: s.localNodeID})
return &pb.StatusResponse{Success: true}, nil
}

func (s *server) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse, error) {
if val, ok := s.localObjectStore[req.Uid]; ok {
return &pb.GetResponse{Uid : req.Uid, ObjectBytes : val, Local: true}, nil
}

s.localObjectChannels[req.Uid] = make(chan []byte)
if req.Testing == false {
s.gcsObjClient.RequestLocation(ctx, &pb.RequestLocationRequest{Uid: req.Uid, Requester: s.localNodeID})
}

val := <- s.localObjectChannels[req.Uid]
s.localObjectStore[req.Uid] = val
return &pb.GetResponse{Uid : req.Uid, ObjectBytes : s.localObjectStore[req.Uid], Local: false}, nil
}

func (s* server) LocationFound(ctx context.Context, resp *pb.LocationFoundResponse) (*pb.StatusResponse, error) {
var otherLocalAddress string

if resp.Port == 0 {
nodeID := resp.Location;
otherLocalAddress = fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, nodeID, cfg.Ports.LocalObjectStore)
} else {
otherLocalAddress = fmt.Sprintf("%s:%d", resp.Address, resp.Port)
}

conn, err := grpc.Dial(otherLocalAddress, grpc.WithInsecure())

if err != nil {
return &pb.StatusResponse{Success: false}, errors.New(fmt.Sprintf("failed to dial other LOS @:%s ", otherLocalAddress))
}

c := pb.NewLocalObjStoreClient(conn)

x, err := c.Copy(ctx, &pb.CopyRequest{Uid : resp.Uid, Requester : s.localNodeID})

if x == nil || err != nil {
return &pb.StatusResponse{Success: false}, errors.New(fmt.Sprintf("failed to copy from other LOS @:%s ", otherLocalAddress))
}
// if resp.Port == "" {

// gcsObjClient.NotifyOwns(ctx, &pb.NotifyOwnsRequest{Uid: resp.Uid, NodeId: localNodeID})
// }

channel, ok := s.localObjectChannels[resp.Uid]
if !ok {
return &pb.StatusResponse{Success: false}, errors.New("channel DNE")
}
channel <- x.ObjectBytes

return &pb.StatusResponse{Success: true}, nil

}

func (s* server) Copy(ctx context.Context, req *pb.CopyRequest) (*pb.CopyResponse, error) {
data, ok:= s.localObjectStore[req.Uid];
if !ok {
return &pb.CopyResponse{Uid: req.Uid, ObjectBytes : nil}, errors.New("object was not in LOS")
}
return &pb.CopyResponse{Uid : req.Uid, ObjectBytes : data}, nil
}

// Implement your service methods here.

Loading

0 comments on commit fd684eb

Please sign in to comment.