Skip to content

Commit

Permalink
change gcs back to remove generating
Browse files Browse the repository at this point in the history
  • Loading branch information
trevleon committed Jun 3, 2024
1 parent fdec62a commit 3e26695
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 13 deletions.
13 changes: 5 additions & 8 deletions go/cmd/gcsobjtable/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,7 @@ Assumes that s's mutex is locked.
*/
func (s *GCSObjServer) getNodeId(uid uint64) (*uint64, bool) {
nodeIds, exists := s.objectLocations[uid]

if !exists {
if !s.liveNodes[s.generating[uid]] {
return nil, true
}
if !exists || len(nodeIds) == 0 {
return nil, false
}

Expand All @@ -105,8 +101,6 @@ func (s *GCSObjServer) getNodeId(uid uint64) (*uint64, bool) {
return nil, true
}



// Note: policy is to pick a random one; in the future it will need to be locality-based
randomIndex := rand.Intn(len(nodesToReturn))
nodeId := &nodesToReturn[randomIndex]
Expand Down Expand Up @@ -190,14 +184,16 @@ func (s *GCSObjServer) RequestLocation(ctx context.Context, req *pb.RequestLocat
clientAddress := net.JoinHostPort(host, clientPort)

uid := req.Uid
LocalLog("Starting get Node ID")
nodeId, exists := s.getNodeId(uid)
LocalLog("finished get node ID")
if nodeId == nil {
// Add client to waiting list
if _, waiting := s.waitlist[uid]; !waiting {
s.waitlist[uid] = []string{} // Initialize slice if it doesn't exist
}
s.waitlist[uid] = append(s.waitlist[uid], clientAddress)

if exists {
_, err := s.globalSchedulerClient.Schedule(ctx, s.lineage[uid])
if err != nil {
Expand Down Expand Up @@ -241,6 +237,7 @@ func (s *GCSObjServer) RegisterLineage(ctx context.Context, req *pb.GlobalSchedu
}

func (s *GCSObjServer) RegisterLiveNodes(ctx context.Context, req *pb.LiveNodesRequest) (*pb.StatusResponse, error) {

s.liveNodes = req.LiveNodes
for uid, node := range s.generating {
if !s.liveNodes[node] {
Expand Down
15 changes: 13 additions & 2 deletions go/cmd/globalscheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ const LIVE_NODE_TIMEOUT time.Duration = 400 * time.Millisecond
const HEARTBEAT_WAIT = 100 * time.Millisecond
var mu sync.RWMutex

// LocalLog formats the message and logs it with a specific prefix
func LocalLog(format string, v ...interface{}) {
var logMessage string
if len(v) == 0 {
logMessage = format // No arguments, use the format string as-is
} else {
logMessage = fmt.Sprintf(format, v...)
}
log.Printf("[localscheduler] %s", logMessage)
}

func main() {
ctx := context.Background()
cfg = config.GetConfig() // Load configuration
Expand Down Expand Up @@ -100,7 +111,7 @@ func (s *server) Schedule(ctx context.Context , req *pb.GlobalScheduleRequest )
if os.Getenv("LOCALITY_AWARE") == "true" {
localityFlag = true
}

LocalLog("Got a global schedule request")
// gives us back the node id of the worker
node_id := getBestWorker(ctx, s, localityFlag, req.Uids)
workerAddress := fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, node_id, cfg.Ports.LocalWorkerStart)
Expand All @@ -113,7 +124,7 @@ func (s *server) Schedule(ctx context.Context , req *pb.GlobalScheduleRequest )
defer conn.Close()

workerClient := pb.NewWorkerClient(conn)

LocalLog("Contacted the worker")
if req.NewObject {
s.gcsClient.RegisterGenerating(ctx, &pb.GeneratingRequest{Uid: req.Uid, NodeId: node_id})
}
Expand Down
4 changes: 2 additions & 2 deletions go/cmd/localscheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (s *server) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.Sch
if err != nil {
LocalLog("got error from WorkerStatus() in Sched: %v", err)
}
_ , err = s.gcsClient.RegisterLineage(ctx, &pb.GlobalScheduleRequest{Uid: uid, Name: req.Name, Args: req.Args, Kwargs: req.Kwargs})
_ , err = s.gcsClient.RegisterLineage(ctx, &pb.GlobalScheduleRequest{Uid: uid, Name: req.Name, Args: req.Args, Kwargs: req.Kwargs, Uids: req.Uids})
if err != nil {
LocalLog("cant hit gcs: %v", err)
}
Expand All @@ -112,7 +112,7 @@ func (s *server) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.Sch
} else {
// LocalLog("contacting global scheduler")
go func() {
_, err := s.globalSchedulerClient.Schedule(s.globalCtx, &pb.GlobalScheduleRequest{Uid: uid, Name: req.Name, Args: req.Args, Kwargs: req.Kwargs, NewObject: true})
_, err := s.globalSchedulerClient.Schedule(s.globalCtx, &pb.GlobalScheduleRequest{Uid: uid, Name: req.Name, Args: req.Args, Kwargs: req.Kwargs, NewObject: true, Uids: req.Uids})
if err != nil {
LocalLog("cannot contact global scheduler")
} else {
Expand Down
3 changes: 2 additions & 1 deletion go/cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func executeFunction(f []byte, args []byte, kwargs []byte) ([]byte, error) {
cmd.Stdin = inputBuffer

// Capture the output
LocalLog("Running the function here yo")
output, err := cmd.Output()
if err != nil {
log.Fatalf("Error executing function: %v", err)
Expand Down Expand Up @@ -175,7 +176,7 @@ func (s *workerServer) Run(ctx context.Context, req *pb.RunRequest) (*pb.StatusR
LocalLog("failed to hit gcs %v", err)
return nil, err
}

runningTime := float32(time.Since(start).Seconds())
mu.Lock()
averageRunningTime = EMA_PARAM*averageRunningTime + (1-EMA_PARAM)*runningTime
Expand Down

0 comments on commit 3e26695

Please sign in to comment.