Skip to content

Commit

Permalink
logging
Browse files Browse the repository at this point in the history
  • Loading branch information
trevleon committed Jun 3, 2024
1 parent 461a6d6 commit fdec62a
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 4 deletions.
1 change: 1 addition & 0 deletions go/cmd/gcsobjtable/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func NewGCSObjServer() *GCSObjServer {
lineage: make(map[uint64]*pb.GlobalScheduleRequest),
globalSchedulerClient: globalSchedulerClient,
liveNodes: make(map[uint64]bool),
generating: make(map[uint64]uint64),
}
return server
}
Expand Down
13 changes: 10 additions & 3 deletions go/cmd/localscheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,14 @@ func (s *server) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.Sch
worker_id, _ = strconv.Atoi(os.Getenv("NODE_ID"))
uid := rand.Uint64()

scheduleLocally, _ := s.workerClient.WorkerStatus(ctx, &pb.StatusResponse{})
s.gcsClient.RegisterLineage(ctx, &pb.GlobalScheduleRequest{Uid: uid, Name: req.Name, Args: req.Args, Kwargs: req.Kwargs})
scheduleLocally, err := s.workerClient.WorkerStatus(ctx, &pb.StatusResponse{})
if err != nil {
LocalLog("got error from WorkerStatus() in Sched: %v", err)
}
_ , err = s.gcsClient.RegisterLineage(ctx, &pb.GlobalScheduleRequest{Uid: uid, Name: req.Name, Args: req.Args, Kwargs: req.Kwargs})
if err != nil {
LocalLog("cant hit gcs: %v", err)
}
if scheduleLocally.NumRunningTasks < MAX_TASKS {
// LocalLog("Just running locally")
go func() {
Expand Down Expand Up @@ -130,6 +136,7 @@ func (s *server) ReviveServer(ctx context.Context, req *pb.StatusResponse) (*pb.
}

func (s *server) SendHeartbeats(ctx context.Context, globalSchedulerClient pb.GlobalSchedulerClient, nodeId uint64 ) {

workerAddress := fmt.Sprintf("localhost:%d", cfg.Ports.LocalWorkerStart)
workerConn, err := grpc.Dial(workerAddress, grpc.WithInsecure())
if err != nil {
Expand Down Expand Up @@ -176,7 +183,7 @@ func (s *server) SendHeartbeats(ctx context.Context, globalSchedulerClient pb.Gl
for {
avgBandwidth, err = lobsClient.AvgBandwidth(ctx, &pb.StatusResponse{})
if err != nil {
LocalLog("got error from WorkerStatus(): %v", err)
LocalLog("got error from AvgBand(): %v", err)
LocalLog("retrying in %v seconds", backoff)
time.Sleep(time.Duration(backoff) * time.Second)
backoff *= 2
Expand Down
6 changes: 5 additions & 1 deletion go/cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func executeFunction(f []byte, args []byte, kwargs []byte) ([]byte, error) {
// run executes the function by fetching it, running it, and storing the result.
func (s *workerServer) Run(ctx context.Context, req *pb.RunRequest) (*pb.StatusResponse, error) {
LocalLog("in Run() rn")
defer LocalLog("Finished running")
mu.Lock()
numQueuedTasks++
mu.Unlock()
Expand All @@ -159,16 +160,19 @@ func (s *workerServer) Run(ctx context.Context, req *pb.RunRequest) (*pb.StatusR

funcResponse, err := s.funcClient.FetchFunc(ctx, &pb.FetchRequest{Name: req.Name})
if err != nil {
LocalLog("Failed to hit func table: %v", err)
return nil, err
}

output, err := executeFunction(funcResponse.SerializedFunc, req.Args, req.Kwargs)
if err != nil {
LocalLog("failed to exec func %v", err)
return nil, err
}

_, err = s.storeClient.Store(ctx, &pb.StoreRequest{Uid: req.Uid, ObjectBytes: output})
if err != nil {
LocalLog("failed to hit gcs %v", err)
return nil, err
}

Expand All @@ -183,7 +187,7 @@ func (s *workerServer) Run(ctx context.Context, req *pb.RunRequest) (*pb.StatusR
func (s *workerServer) WorkerStatus(ctx context.Context, req *pb.StatusResponse) (*pb.WorkerStatusResponse, error) {
mu.Lock()
defer mu.Unlock()
// log.Printf("num queued tasks is currently: %v", numQueuedTasks)
//log.Printf("num queued tasks is currently: %v", numQueuedTasks)
return &pb.WorkerStatusResponse{
NumRunningTasks: numRunningTasks,
NumQueuedTasks: numQueuedTasks,
Expand Down

0 comments on commit fdec62a

Please sign in to comment.