Skip to content

Commit

Permalink
smallfixes: global sched does not pick dead node, actually send heartbeats, never double re-issue
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigo-castellon committed Jun 3, 2024
1 parent fbcd136 commit 8b45be5
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 5 deletions.
31 changes: 29 additions & 2 deletions go/cmd/gcsobjtable/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,15 +250,27 @@ func (s *GCSObjServer) RegisterLineage(ctx context.Context, req *pb.GlobalSchedu

func (s *GCSObjServer) RegisterLiveNodes(ctx context.Context, req *pb.LiveNodesRequest) (*pb.StatusResponse, error) {

// LocalLog("got RegisterLiveNodes() call.")

s.liveNodes = req.LiveNodes
toSchedule := make(map[uint64]bool)
// toSchedule := make(map[uint64]bool)

for uid, node := range s.generating {
if !s.liveNodes[node] {
LocalLog("this node is no longer existent: %v", node)
delete(s.generating, uid)
s.globalSchedulerClient.Schedule(ctx, s.lineage[uid])
toSchedule[uid] = true
// s.globalSchedulerClient.Schedule(ctx, s.lineage[uid])
}
}

for uid, nodes := range s.objectLocations {
if _, ok := toSchedule[uid]; ok {
continue
// fmt.Printf("Key '%s' exists in the map with value %d\n", key, value)
}

allDead := true
for node := range nodes {
if s.liveNodes[uint64(node)] {
Expand All @@ -267,14 +279,29 @@ func (s *GCSObjServer) RegisterLiveNodes(ctx context.Context, req *pb.LiveNodesR
}
}
if allDead {
s.globalSchedulerClient.Schedule(ctx, s.lineage[uint64(uid)])
LocalLog("all of them are dead so we are re-scheduling this one")
toSchedule[uid] = true
// s.globalSchedulerClient.Schedule(ctx, s.lineage[uint64(uid)])
}

}

for uid, _ := range toSchedule {
LocalLog("uid = %v, rescheduling", uid)
go func() {
_, err := s.globalSchedulerClient.Schedule(ctx, s.lineage[uid])
if err != nil {
LocalLog("cannot contact global scheduler")
} else {
// LocalLog("Just ran it on global!")
}
}()
}
return &pb.StatusResponse{Success: true}, nil
}

func (s *GCSObjServer) RegisterGenerating(ctx context.Context, req *pb.GeneratingRequest) (*pb.StatusResponse, error) {
LocalLog("trying to register node %v as a generating node", req.NodeId)
if id, ok := s.generating[req.Uid]; ok {
return &pb.StatusResponse{Success: false}, status.Error(codes.Internal, fmt.Sprintf("node %d is already generating uid %d", id, req.Uid))
}
Expand Down
19 changes: 17 additions & 2 deletions go/cmd/globalscheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func main() {
pb.RegisterGlobalSchedulerServer(s, server)
defer conn.Close()
log.Printf("server listening at %v", lis.Addr())
go server.SendLiveNodes(ctx)
go server.LiveNodesHeartbeat(ctx)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func (s *server) Heartbeat(ctx context.Context, req *pb.HeartbeatRequest ) (*pb.
}

func (s *server) LiveNodesHeartbeat(ctx context.Context) (error) {

ctx = context.Background()
for {
s.SendLiveNodes(ctx)
time.Sleep(HEARTBEAT_WAIT)
Expand All @@ -102,6 +102,12 @@ func(s *server) SendLiveNodes(ctx context.Context) (error) {
liveNodes[uid] = time.Since(heartbeat.timeReceived) < LIVE_NODE_TIMEOUT

}
// LocalLog("sending RegisterLiveNodes() call now")
// go func() {
// if _, err := s.gcsClient.RegisterLiveNodes(ctx, &pb.LiveNodesRequest{LiveNodes: liveNodes}); err != nil {
// LocalLog("Error registering live nodes: %v", err)
// }
// }()
s.gcsClient.RegisterLiveNodes(ctx, &pb.LiveNodesRequest{LiveNodes: liveNodes})
return nil
}
Expand Down Expand Up @@ -169,6 +175,10 @@ func getBestWorker(ctx context.Context, s *server, localityFlag bool, uids []uin
}

for loc, bytes := range locationToBytes {
// skip dead nodes
if heartbeat, _ := s.status[loc]; !(time.Since(heartbeat.timeReceived) < LIVE_NODE_TIMEOUT) {
continue
}
mu.RLock()
queueingTime := float32(s.status[loc].numQueuedTasks) * s.status[loc].avgRunningTime
transferTime := float32(total - bytes) * s.status[loc].avgBandwidth
Expand All @@ -185,6 +195,11 @@ func getBestWorker(ctx context.Context, s *server, localityFlag bool, uids []uin
// TODO: make iteration order random for maximum fairness
mu.RLock()
for id, heartbeat := range s.status {
// skip dead nodes
if !(time.Since(heartbeat.timeReceived) < LIVE_NODE_TIMEOUT) {
continue
}

if float32(heartbeat.numQueuedTasks) * heartbeat.avgRunningTime < minTime {
minId = id
minTime = float32(heartbeat.numQueuedTasks) * heartbeat.avgRunningTime
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/localscheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *server) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.Sch
if (req.NodeId != 0) {
LocalLog("Doing something special with req.NodeId = %v", req.NodeId)
go func() {
_, err := s.globalSchedulerClient.Schedule(s.globalCtx, &pb.GlobalScheduleRequest{Uid: uid, Name: req.Name, Args: req.Args, Kwargs: req.Kwargs, Uids: req.Uids, NodeId: req.NodeId})
_, err := s.globalSchedulerClient.Schedule(s.globalCtx, &pb.GlobalScheduleRequest{Uid: uid, Name: req.Name, Args: req.Args, Kwargs: req.Kwargs, Uids: req.Uids, NewObject: true, NodeId: req.NodeId})
if err != nil {
LocalLog("cannot contact global scheduler")
} else {
Expand Down

0 comments on commit 8b45be5

Please sign in to comment.