From fdec62a95584bebdeb0ee634a9a10f6f6eb3ad21 Mon Sep 17 00:00:00 2001 From: trevleon Date: Mon, 3 Jun 2024 03:58:27 +0000 Subject: [PATCH] logging --- go/cmd/gcsobjtable/main.go | 1 + go/cmd/localscheduler/main.go | 13 ++++++++++--- go/cmd/worker/main.go | 6 +++++- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/go/cmd/gcsobjtable/main.go b/go/cmd/gcsobjtable/main.go index b41135d..e012a79 100644 --- a/go/cmd/gcsobjtable/main.go +++ b/go/cmd/gcsobjtable/main.go @@ -74,6 +74,7 @@ func NewGCSObjServer() *GCSObjServer { lineage: make(map[uint64]*pb.GlobalScheduleRequest), globalSchedulerClient: globalSchedulerClient, liveNodes: make(map[uint64]bool), + generating: make(map[uint64]uint64), } return server } diff --git a/go/cmd/localscheduler/main.go b/go/cmd/localscheduler/main.go index cb70ddc..3ddd441 100644 --- a/go/cmd/localscheduler/main.go +++ b/go/cmd/localscheduler/main.go @@ -89,8 +89,14 @@ func (s *server) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.Sch worker_id, _ = strconv.Atoi(os.Getenv("NODE_ID")) uid := rand.Uint64() - scheduleLocally, _ := s.workerClient.WorkerStatus(ctx, &pb.StatusResponse{}) - s.gcsClient.RegisterLineage(ctx, &pb.GlobalScheduleRequest{Uid: uid, Name: req.Name, Args: req.Args, Kwargs: req.Kwargs}) + scheduleLocally, err := s.workerClient.WorkerStatus(ctx, &pb.StatusResponse{}) + if err != nil { + LocalLog("got error from WorkerStatus() in Sched: %v", err) + } + _ , err = s.gcsClient.RegisterLineage(ctx, &pb.GlobalScheduleRequest{Uid: uid, Name: req.Name, Args: req.Args, Kwargs: req.Kwargs}) + if err != nil { + LocalLog("cant hit gcs: %v", err) + } if scheduleLocally.NumRunningTasks < MAX_TASKS { // LocalLog("Just running locally") go func() { @@ -130,6 +136,7 @@ func (s *server) ReviveServer(ctx context.Context, req *pb.StatusResponse) (*pb. } func (s *server) SendHeartbeats(ctx context.Context, globalSchedulerClient pb.GlobalSchedulerClient, nodeId uint64 ) { + workerAddress := fmt.Sprintf("localhost:%d", cfg.Ports.LocalWorkerStart) workerConn, err := grpc.Dial(workerAddress, grpc.WithInsecure()) if err != nil { @@ -176,7 +183,7 @@ func (s *server) SendHeartbeats(ctx context.Context, globalSchedulerClient pb.Gl for { avgBandwidth, err = lobsClient.AvgBandwidth(ctx, &pb.StatusResponse{}) if err != nil { - LocalLog("got error from WorkerStatus(): %v", err) + LocalLog("got error from AvgBand(): %v", err) LocalLog("retrying in %v seconds", backoff) time.Sleep(time.Duration(backoff) * time.Second) backoff *= 2 diff --git a/go/cmd/worker/main.go b/go/cmd/worker/main.go index e3345df..98a7ce1 100644 --- a/go/cmd/worker/main.go +++ b/go/cmd/worker/main.go @@ -137,6 +137,7 @@ func executeFunction(f []byte, args []byte, kwargs []byte) ([]byte, error) { // run executes the function by fetching it, running it, and storing the result. func (s *workerServer) Run(ctx context.Context, req *pb.RunRequest) (*pb.StatusResponse, error) { LocalLog("in Run() rn") + defer LocalLog("Finished running") mu.Lock() numQueuedTasks++ mu.Unlock() @@ -159,16 +160,19 @@ func (s *workerServer) Run(ctx context.Context, req *pb.RunRequest) (*pb.StatusR funcResponse, err := s.funcClient.FetchFunc(ctx, &pb.FetchRequest{Name: req.Name}) if err != nil { + LocalLog("Failed to hit func table: %v", err) return nil, err } output, err := executeFunction(funcResponse.SerializedFunc, req.Args, req.Kwargs) if err != nil { + LocalLog("failed to exec func %v", err) return nil, err } _, err = s.storeClient.Store(ctx, &pb.StoreRequest{Uid: req.Uid, ObjectBytes: output}) if err != nil { + LocalLog("failed to hit gcs %v", err) return nil, err } @@ -183,7 +187,7 @@ func (s *workerServer) Run(ctx context.Context, req *pb.RunRequest) (*pb.StatusR func (s *workerServer) WorkerStatus(ctx context.Context, req *pb.StatusResponse) (*pb.WorkerStatusResponse, error) { mu.Lock() defer mu.Unlock() - // log.Printf("num queued tasks is currently: %v", numQueuedTasks) + //log.Printf("num queued tasks is currently: %v", numQueuedTasks) return &pb.WorkerStatusResponse{ NumRunningTasks: numRunningTasks, NumQueuedTasks: numQueuedTasks,