From 3e2669521bccefa518b734b070929e5197215b57 Mon Sep 17 00:00:00 2001 From: trevleon Date: Mon, 3 Jun 2024 04:27:44 +0000 Subject: [PATCH] change gcs back to remove generating --- go/cmd/gcsobjtable/main.go | 13 +++++-------- go/cmd/globalscheduler/main.go | 15 +++++++++++++-- go/cmd/localscheduler/main.go | 4 ++-- go/cmd/worker/main.go | 3 ++- 4 files changed, 22 insertions(+), 13 deletions(-) diff --git a/go/cmd/gcsobjtable/main.go b/go/cmd/gcsobjtable/main.go index e012a79..2919423 100644 --- a/go/cmd/gcsobjtable/main.go +++ b/go/cmd/gcsobjtable/main.go @@ -86,11 +86,7 @@ Assumes that s's mutex is locked. */ func (s *GCSObjServer) getNodeId(uid uint64) (*uint64, bool) { nodeIds, exists := s.objectLocations[uid] - - if !exists { - if !s.liveNodes[s.generating[uid]] { - return nil, true - } + if !exists || len(nodeIds) == 0 { return nil, false } @@ -105,8 +101,6 @@ func (s *GCSObjServer) getNodeId(uid uint64) (*uint64, bool) { return nil, true } - - // Note: policy is to pick a random one; in the future it will need to be locality-based randomIndex := rand.Intn(len(nodesToReturn)) nodeId := &nodesToReturn[randomIndex] @@ -190,14 +184,16 @@ func (s *GCSObjServer) RequestLocation(ctx context.Context, req *pb.RequestLocat clientAddress := net.JoinHostPort(host, clientPort) uid := req.Uid + LocalLog("Starting get Node ID") nodeId, exists := s.getNodeId(uid) + LocalLog("finished get node ID") if nodeId == nil { // Add client to waiting list if _, waiting := s.waitlist[uid]; !waiting { s.waitlist[uid] = []string{} // Initialize slice if it doesn't exist } s.waitlist[uid] = append(s.waitlist[uid], clientAddress) - + if exists { _, err := s.globalSchedulerClient.Schedule(ctx, s.lineage[uid]) if err != nil { @@ -241,6 +237,7 @@ func (s *GCSObjServer) RegisterLineage(ctx context.Context, req *pb.GlobalSchedu } func (s *GCSObjServer) RegisterLiveNodes(ctx context.Context, req *pb.LiveNodesRequest) (*pb.StatusResponse, error) { + s.liveNodes = req.LiveNodes for uid, node := 
range s.generating { if !s.liveNodes[node] { diff --git a/go/cmd/globalscheduler/main.go b/go/cmd/globalscheduler/main.go index 732a446..a32274a 100644 --- a/go/cmd/globalscheduler/main.go +++ b/go/cmd/globalscheduler/main.go @@ -22,6 +22,17 @@ const LIVE_NODE_TIMEOUT time.Duration = 400 * time.Millisecond const HEARTBEAT_WAIT = 100 * time.Millisecond var mu sync.RWMutex +// LocalLog logs a message tagged with the [globalscheduler] prefix; format is used verbatim when no args are given +func LocalLog(format string, v ...interface{}) { + var logMessage string + if len(v) == 0 { + logMessage = format // No arguments, use the format string as-is + } else { + logMessage = fmt.Sprintf(format, v...) + } + log.Printf("[globalscheduler] %s", logMessage) +} + func main() { ctx := context.Background() cfg = config.GetConfig() // Load configuration @@ -100,7 +111,7 @@ func (s *server) Schedule(ctx context.Context , req *pb.GlobalScheduleRequest ) if os.Getenv("LOCALITY_AWARE") == "true" { localityFlag = true } - + LocalLog("Got a global schedule request") // gives us back the node id of the worker node_id := getBestWorker(ctx, s, localityFlag, req.Uids) workerAddress := fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, node_id, cfg.Ports.LocalWorkerStart) @@ -113,7 +124,7 @@ func (s *server) Schedule(ctx context.Context , req *pb.GlobalScheduleRequest ) defer conn.Close() workerClient := pb.NewWorkerClient(conn) - + LocalLog("Contacted the worker") if req.NewObject { s.gcsClient.RegisterGenerating(ctx, &pb.GeneratingRequest{Uid: req.Uid, NodeId: node_id}) } diff --git a/go/cmd/localscheduler/main.go b/go/cmd/localscheduler/main.go index 3ddd441..29a2b01 100644 --- a/go/cmd/localscheduler/main.go +++ b/go/cmd/localscheduler/main.go @@ -93,7 +93,7 @@ func (s *server) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.Sch if err != nil { LocalLog("got error from WorkerStatus() in Sched: %v", err) } - _ , err = s.gcsClient.RegisterLineage(ctx, &pb.GlobalScheduleRequest{Uid: uid, Name: req.Name, Args: req.Args, Kwargs: 
req.Kwargs}) + _ , err = s.gcsClient.RegisterLineage(ctx, &pb.GlobalScheduleRequest{Uid: uid, Name: req.Name, Args: req.Args, Kwargs: req.Kwargs, Uids: req.Uids}) if err != nil { LocalLog("cant hit gcs: %v", err) } @@ -112,7 +112,7 @@ func (s *server) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.Sch } else { // LocalLog("contacting global scheduler") go func() { - _, err := s.globalSchedulerClient.Schedule(s.globalCtx, &pb.GlobalScheduleRequest{Uid: uid, Name: req.Name, Args: req.Args, Kwargs: req.Kwargs, NewObject: true}) + _, err := s.globalSchedulerClient.Schedule(s.globalCtx, &pb.GlobalScheduleRequest{Uid: uid, Name: req.Name, Args: req.Args, Kwargs: req.Kwargs, NewObject: true, Uids: req.Uids}) if err != nil { LocalLog("cannot contact global scheduler") } else { diff --git a/go/cmd/worker/main.go b/go/cmd/worker/main.go index 98a7ce1..b76e4bf 100644 --- a/go/cmd/worker/main.go +++ b/go/cmd/worker/main.go @@ -119,6 +119,7 @@ func executeFunction(f []byte, args []byte, kwargs []byte) ([]byte, error) { cmd.Stdin = inputBuffer // Capture the output + LocalLog("Running the function here yo") output, err := cmd.Output() if err != nil { log.Fatalf("Error executing function: %v", err) @@ -175,7 +176,7 @@ func (s *workerServer) Run(ctx context.Context, req *pb.RunRequest) (*pb.StatusR LocalLog("failed to hit gcs %v", err) return nil, err } - + runningTime := float32(time.Since(start).Seconds()) mu.Lock() averageRunningTime = EMA_PARAM*averageRunningTime + (1-EMA_PARAM)*runningTime