Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Locals #4

Merged
merged 95 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
95 commits
Select commit Hold shift + click to select a range
ad240a6
loc object store and sched
trevleon May 2, 2024
81396ca
add rpcs
trevleon May 2, 2024
41e4b44
change copy
trevleon May 3, 2024
7a823a4
syntax
trevleon May 5, 2024
c0ab3d5
syntax
trevleon May 5, 2024
fedc3b0
syntax
trevleon May 5, 2024
8e226dc
= to :
trevleon May 5, 2024
1e18fcb
bytes to byte slices
trevleon May 5, 2024
1b69d96
fix gcs grpc connection
trevleon May 5, 2024
cde0928
correct cfg names
trevleon May 6, 2024
a490939
explicit field declaration
trevleon May 6, 2024
68466c0
typing
trevleon May 6, 2024
e13cb95
capitalization
trevleon May 6, 2024
ccc70a7
capitals
trevleon May 6, 2024
94d7e28
retur types
trevleon May 6, 2024
cf3e745
fixes
trevleon May 6, 2024
6dea56b
conn interface and cfg fix
trevleon May 6, 2024
70d443f
import context
trevleon May 6, 2024
52ea140
syntax
trevleon May 6, 2024
669f58f
syntax
trevleon May 6, 2024
2561596
channel read change
trevleon May 6, 2024
1c69c01
bugs
trevleon May 6, 2024
1863cc8
typing
trevleon May 6, 2024
88c4493
types
trevleon May 6, 2024
bd64f51
equals
trevleon May 6, 2024
baedc16
typing
trevleon May 6, 2024
61804ad
channel type
trevleon May 6, 2024
6823107
fix scheduler
trevleon May 7, 2024
94a14f3
fixes
trevleon May 8, 2024
6de7e85
typing
trevleon May 8, 2024
1884655
fixes
trevleon May 9, 2024
664ddf7
schedule client
trevleon May 9, 2024
7565c35
fixes
trevleon May 9, 2024
69d1ef5
ctx
trevleon May 9, 2024
19f28b6
Merge branch 'main' into locals
trevleon May 9, 2024
3204708
load is semantically different from get
trevleon May 9, 2024
1494c1c
pointer
trevleon May 9, 2024
427b302
pointer
trevleon May 9, 2024
04edfa4
Merge branch 'main' into locals
trevleon May 9, 2024
b58c1ef
merge from main
trevleon May 9, 2024
f65775d
multi server test
trevleon May 12, 2024
55ff4ad
int64
trevleon May 12, 2024
659ad74
more int32
trevleon May 12, 2024
8ca209e
fix
trevleon May 12, 2024
95c324b
requester
trevleon May 12, 2024
dbcb7b5
stupid go
trevleon May 12, 2024
f2099c9
silly golang
trevleon May 12, 2024
353eb35
remove name from schedule request
trevleon May 12, 2024
84fdfff
fix
trevleon May 12, 2024
7387837
fix test
trevleon May 12, 2024
f12ae1b
fix goroutine
trevleon May 12, 2024
37c783e
fix test
trevleon May 12, 2024
2737929
fix errors
trevleon May 12, 2024
9d34c19
errors
trevleon May 12, 2024
267c4eb
serving
trevleon May 12, 2024
d623306
serving
trevleon May 12, 2024
fe594e1
serve
trevleon May 12, 2024
9b277d4
colonoscopy
trevleon May 12, 2024
5a8a530
colons
trevleon May 12, 2024
7cc8416
call main
trevleon May 12, 2024
ec85369
dont start server in main
trevleon May 12, 2024
acd85b5
hacky
trevleon May 12, 2024
276a2f4
stop server
trevleon May 12, 2024
916bb76
error checking
trevleon May 12, 2024
536d48a
import errors
trevleon May 12, 2024
af2f112
error checking
trevleon May 12, 2024
8d96d47
cant copy?
trevleon May 12, 2024
ff286a9
printing
trevleon May 12, 2024
aa8d7d7
error
trevleon May 12, 2024
aa78087
sprinting
trevleon May 12, 2024
fef7fe7
!= != ==
trevleon May 12, 2024
3700a0c
dont hit gcs in testing
trevleon May 12, 2024
023e78c
copy response error
trevleon May 12, 2024
e0baa1a
timeout on grpc
trevleon May 12, 2024
dfea6a2
dont worry about gcs
trevleon May 12, 2024
09b7d88
error checking
trevleon May 12, 2024
e2ba073
we printing
trevleon May 12, 2024
eeda7b4
more printing
trevleon May 12, 2024
cbab941
final print
trevleon May 12, 2024
e80c535
checking if chan exists
trevleon May 12, 2024
f93af3d
channel chasing
trevleon May 12, 2024
25d65bb
sprinting
trevleon May 12, 2024
74928c1
change to get resopnse
trevleon May 12, 2024
f2172ef
local checking
trevleon May 12, 2024
1b4876d
change uid
trevleon May 12, 2024
42c2003
change to struct
trevleon May 12, 2024
01bd7bb
fix
trevleon May 12, 2024
b92c4b9
func name
trevleon May 12, 2024
c0ecdc7
add to interface
trevleon May 12, 2024
2f2418b
init rpc
trevleon May 12, 2024
fb84c5d
init
trevleon May 12, 2024
99c1547
arguing
trevleon May 12, 2024
1ad226f
unit tests passing
trevleon May 12, 2024
6192925
Run make go to reflect Trevor's changes to the protocol
Russell-Tran May 12, 2024
cfce20c
Merge branch 'main' into locals
Russell-Tran May 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 6 additions & 59 deletions go/cmd/gcsobjtable/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"net"
"testing"
"log"
"sync"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
Expand All @@ -20,7 +18,7 @@ var lis *bufconn.Listener
func init() {
lis = bufconn.Listen(bufSize)
s := grpc.NewServer()
pb.RegisterGCSObjServer(s, NewGCSObjServer())
pb.RegisterGCSObjServer(s, &server{objectLocations: make(map[uint32][]uint32)})
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("Server exited with error: %v", err)
Expand All @@ -41,7 +39,7 @@ func TestNotifyOwns(t *testing.T) {
defer conn.Close()
client := pb.NewGCSObjClient(conn)

// Testing NotifyOwns
// Test NotifyOwns
resp, err := client.NotifyOwns(ctx, &pb.NotifyOwnsRequest{
Uid: 1,
NodeId: 100,
Expand Down Expand Up @@ -71,61 +69,10 @@ func TestRequestLocation(t *testing.T) {

// Test RequestLocation
resp, err := client.RequestLocation(ctx, &pb.RequestLocationRequest{Uid: 1})
if err != nil {
t.Errorf("RequestLocation failed: %v", err)
return
if err != nil || !resp.Success {
t.Errorf("RequestLocation failed: %v, response: %v", err, resp)
}
if resp.NodeId != 100 {
t.Errorf("RequestLocation returned incorrect node ID: got %d, want %d", resp.NodeId, 100)
if resp.Details != "100" {
t.Errorf("RequestLocation returned incorrect node ID: got %s, want %s", resp.Details, "100")
}
}

// Create a unit test in Go where three goroutines are involved, with the first two waiting for an object's location
// and the third notifying the server of the object's presence:
// - Two goroutines will call RequestLocation for a UID that initially doesn't have any node IDs associated with it. They will block until they are notified of a change.
// - One goroutine will perform the NotifyOwns action after a short delay, adding a node ID to the UID, which should then notify the waiting goroutines.
func TestRequestLocationWithNotification(t *testing.T) {
ctx := context.Background()
conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure())
if err != nil {
t.Fatalf("Failed to dial bufnet: %v", err)
}
defer conn.Close()
client := pb.NewGCSObjClient(conn)

var wg sync.WaitGroup
uid := uint64(1) // Example UID for testing

// Start two goroutines that are trying to fetch the object location
for i := 0; i < 2; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
resp, err := client.RequestLocation(ctx, &pb.RequestLocationRequest{Uid: uid})
if err != nil {
t.Errorf("Goroutine %d: RequestLocation failed: %v", index, err)
return
}
if resp.NodeId != 100 {
t.Errorf("Goroutine %d: RequestLocation returned incorrect node ID: got %d, want %d", index, resp.NodeId, 100)
}
}(i)
}

// Goroutine to notify
wg.Add(1)
go func() {
defer wg.Done()
// Let the requests initiate first
time.Sleep(100 * time.Millisecond)
_, err := client.NotifyOwns(ctx, &pb.NotifyOwnsRequest{
Uid: uid,
NodeId: 100,
})
if err != nil {
t.Fatalf("NotifyOwns failed: %v", err)
}
}()

wg.Wait() // Wait for all goroutines to complete
}
129 changes: 114 additions & 15 deletions go/cmd/localobjstore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,132 @@ import (
"log"
"net"
"strconv"

"fmt"
context "context"
"errors"
"google.golang.org/grpc"
pb "github.com/rodrigo-castellon/babyray/pkg"
"github.com/rodrigo-castellon/babyray/config"
)

)
// var localObjectStore map[uint64][]byte
// var localObjectChannels map[uint64]chan []byte
// var gcsObjClient pb.GCSObjClient
// var localNodeID uint64
var cfg *config.Config
func main() {
cfg := config.LoadConfig() // Load configuration
cfg = config.GetConfig() // Load configuration
address := ":" + strconv.Itoa(cfg.Ports.LocalObjectStore) // Prepare the network address

lis, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
_ = lis;
s := grpc.NewServer()
pb.RegisterLocalObjStoreServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
if address == "" {
lis, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
_ = lis;
s := grpc.NewServer()
pb.RegisterLocalObjStoreServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}


// localObjectStore = make(map[uint64][]byte)
// localObjectChannels = make(map[uint64]chan []byte)

// gcsAddress := fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, cfg.NodeIDs.GCS, cfg.Ports.GCSObjectTable)
// conn, _ := grpc.Dial(gcsAddress, grpc.WithInsecure())
// gcsObjClient = pb.NewGCSObjClient(conn)
// localNodeID = 0
}

// server is used to implement your gRPC service.
type server struct {
pb.UnimplementedLocalObjStoreServer
localObjectStore map[uint64][]byte
localObjectChannels map[uint64]chan []byte
gcsObjClient pb.GCSObjClient
localNodeID uint64
}

func (s* server) Init(ctx context.Context, req *pb.StatusResponse) (*pb.StatusResponse, error) {
s.localObjectStore = make(map[uint64][]byte)
s.localObjectChannels = make(map[uint64]chan []byte)
s.localNodeID = 1
gcsAddress := fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, cfg.NodeIDs.GCS, cfg.Ports.GCSObjectTable)
conn, _ := grpc.Dial(gcsAddress, grpc.WithInsecure())
s.gcsObjClient = pb.NewGCSObjClient(conn)

return &pb.StatusResponse{Success: true}, nil

}
func (s *server) Store(ctx context.Context, req *pb.StoreRequest) (*pb.StatusResponse, error) {
s.localObjectStore[req.Uid] = req.ObjectBytes

s.gcsObjClient.NotifyOwns(ctx, &pb.NotifyOwnsRequest{Uid: req.Uid, NodeId: s.localNodeID})
return &pb.StatusResponse{Success: true}, nil
}

func (s *server) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse, error) {
if val, ok := s.localObjectStore[req.Uid]; ok {
return &pb.GetResponse{Uid : req.Uid, ObjectBytes : val, Local: true}, nil
}

s.localObjectChannels[req.Uid] = make(chan []byte)
if req.Testing == false {
s.gcsObjClient.RequestLocation(ctx, &pb.RequestLocationRequest{Uid: req.Uid, Requester: s.localNodeID})
}

val := <- s.localObjectChannels[req.Uid]
s.localObjectStore[req.Uid] = val
return &pb.GetResponse{Uid : req.Uid, ObjectBytes : s.localObjectStore[req.Uid], Local: false}, nil
}

func (s* server) LocationFound(ctx context.Context, resp *pb.LocationFoundResponse) (*pb.StatusResponse, error) {
var otherLocalAddress string

if resp.Port == 0 {
nodeID := resp.Location;
otherLocalAddress = fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, nodeID, cfg.Ports.LocalObjectStore)
} else {
otherLocalAddress = fmt.Sprintf("%s:%d", resp.Address, resp.Port)
}

conn, err := grpc.Dial(otherLocalAddress, grpc.WithInsecure())

if err != nil {
return &pb.StatusResponse{Success: false}, errors.New(fmt.Sprintf("failed to dial other LOS @:%s ", otherLocalAddress))
}

c := pb.NewLocalObjStoreClient(conn)

x, err := c.Copy(ctx, &pb.CopyRequest{Uid : resp.Uid, Requester : s.localNodeID})

if x == nil || err != nil {
return &pb.StatusResponse{Success: false}, errors.New(fmt.Sprintf("failed to copy from other LOS @:%s ", otherLocalAddress))
}
// if resp.Port == "" {

// gcsObjClient.NotifyOwns(ctx, &pb.NotifyOwnsRequest{Uid: resp.Uid, NodeId: localNodeID})
// }

channel, ok := s.localObjectChannels[resp.Uid]
if !ok {
return &pb.StatusResponse{Success: false}, errors.New("channel DNE")
}
channel <- x.ObjectBytes

return &pb.StatusResponse{Success: true}, nil

}

func (s* server) Copy(ctx context.Context, req *pb.CopyRequest) (*pb.CopyResponse, error) {
data, ok:= s.localObjectStore[req.Uid];
if !ok {
return &pb.CopyResponse{Uid: req.Uid, ObjectBytes : nil}, errors.New("object was not in LOS")
}
return &pb.CopyResponse{Uid : req.Uid, ObjectBytes : data}, nil
}

// Implement your service methods here.

Loading
Loading