
Commit

naive initial merge from main
rodrigo-castellon committed Jun 3, 2024
2 parents 392aa33 + 0be7190 commit 0e1b825
Showing 26 changed files with 966 additions and 100 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -55,7 +55,7 @@ ENV PROJECT_ROOT=/app
FROM base as driver

# install necessary Python packages to run anything
RUN python3 -m pip install dill cloudpickle --break-system-packages
RUN python3 -m pip install cloudpickle posix_ipc --break-system-packages
RUN cd python && python3 -m pip install -e . --break-system-packages

# install basic necessities to actually do driver stuff
4 changes: 4 additions & 0 deletions Makefile
@@ -14,6 +14,10 @@ py: proto
@echo "Modifying import statements for relative imports..."
# below line is now compatible with both MacOS (BSD) and GNU
sed -i'' -e 's/import rayclient_pb2 as rayclient__pb2/from . import rayclient_pb2 as rayclient__pb2/' python/babyray/rayclient_pb2_grpc.py
# need to copy these to pythonserver/, for worker.py
cp python/babyray/rayclient_pb2_grpc.py pythonserver
cp python/babyray/rayclient_pb2.py pythonserver


build: servers

2 changes: 2 additions & 0 deletions config/app_config.yaml
@@ -1,4 +1,6 @@
num_worker_nodes: 5
# simulating zero latency with some data throughput speed
simulated_delay: 0.0
node_ids:
gcs: 0
global_scheduler: 1
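For context on how the new simulated_delay value might be consumed: the Go services already load settings through config.GetConfig(), so the knob presumably just becomes another field on the config struct. A minimal sketch, assuming yaml.v3 and field/file names that are not shown in this diff:

// Sketch only: struct fields and the config file path are assumptions, not taken from this commit.
package config

import (
    "log"
    "os"

    "gopkg.in/yaml.v3"
)

type Config struct {
    NumWorkerNodes int     `yaml:"num_worker_nodes"`
    SimulatedDelay float64 `yaml:"simulated_delay"` // seconds of artificial latency; 0.0 disables it
    NodeIDs        struct {
        GCS             int `yaml:"gcs"`
        GlobalScheduler int `yaml:"global_scheduler"`
    } `yaml:"node_ids"`
}

func GetConfig() *Config {
    data, err := os.ReadFile("config/app_config.yaml")
    if err != nil {
        log.Fatalf("failed to read config: %v", err)
    }
    var cfg Config
    if err := yaml.Unmarshal(data, &cfg); err != nil {
        log.Fatalf("failed to parse config: %v", err)
    }
    return &cfg
}
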
18 changes: 18 additions & 0 deletions docker-compose.fig8a.yml
@@ -0,0 +1,18 @@
version: '3'

services:
worker1:
command: ["/bin/sh", "-c", "python3 -u -m pythonserver.worker & ./go/bin/localobjstore & ./go/bin/localscheduler & sleep 5 && python3 -u scripts/fig8a.py & sleep infinity"]
depends_on:
# - zookeeper
- worker2
- worker3
- gcs
- global_scheduler

# worker2:
# command: ["/bin/sh", "-c", "./go/bin/localscheduler & ./go/bin/localobjstore & ./go/bin/worker"]
#
# worker3:
# command: ["/bin/sh", "-c", "./go/bin/localscheduler & ./go/bin/localobjstore & ./go/bin/worker"]

11 changes: 11 additions & 0 deletions docker-compose.fig9.yml
@@ -0,0 +1,11 @@
version: '3'

services:
worker1:
command: ["/bin/sh", "-c", "python3 -u -m pythonserver.worker & ./go/bin/localobjstore & ./go/bin/localscheduler & sleep 5 && python3 -u scripts/fig9.py & sleep infinity"]
depends_on:
# - zookeeper
- worker2
- worker3
- gcs
- global_scheduler
24 changes: 21 additions & 3 deletions docker-compose.yml
@@ -1,13 +1,22 @@
version: '3'

services:
# zookeeper:
# image: zookeeper:3.6.3
# ports:
# - "2181:2181"
# networks:
# mynetwork:

gcs:
image: ray-node:base
command: ["/bin/sh", "-c", "./go/bin/gcsfunctable & ./go/bin/gcsobjtable"]
networks:
mynetwork:
aliases:
- node0
# depends_on:
# - zookeeper
ports:
- "50000:50000"

@@ -25,36 +34,45 @@ services:

worker1:
image: ray-node:driver
command: ["/bin/sh", "-c", "./go/bin/worker & ./go/bin/localobjstore & ./go/bin/localscheduler & sleep infinity"]
command: ["/bin/sh", "-c", "python3 -u -m pythonserver.worker & ./go/bin/localobjstore & ./go/bin/localscheduler & sleep infinity"]
networks:
mynetwork:
aliases:
- node2
# depends_on:
# - zookeeper
shm_size: '4g'
ports:
- "50002:50002"
environment:
NODE_ID: 2

worker2:
image: ray-node:driver
command: ["/bin/sh", "-c", "./go/bin/worker & ./go/bin/localobjstore & ./go/bin/localscheduler & sleep infinity"]
command: ["/bin/sh", "-c", "python3 -u -m pythonserver.worker & ./go/bin/localobjstore & ./go/bin/localscheduler & sleep infinity"]
networks:
mynetwork:
aliases:
- node3
# depends_on:
# - zookeeper
shm_size: '4g'
ports:
- "50003:50003"
environment:
NODE_ID: 3

worker3:
image: ray-node:driver
command: ["/bin/sh", "-c", "./go/bin/worker & ./go/bin/localobjstore & ./go/bin/localscheduler & sleep infinity"]
command: ["/bin/sh", "-c", "python3 -u -m pythonserver.worker & ./go/bin/localobjstore & ./go/bin/localscheduler & sleep infinity"]
#command: ["/bin/sh", "-c", "./go/bin/localscheduler & ./go/bin/localobjstore & ./go/bin/worker"]
networks:
mynetwork:
aliases:
- node4
# depends_on:
# - zookeeper
shm_size: '4g'
ports:
- "50004:50004"
environment:
3 changes: 2 additions & 1 deletion go/cmd/gcsfunctable/main.go
@@ -12,6 +12,7 @@ import (
"google.golang.org/grpc"
pb "github.com/rodrigo-castellon/babyray/pkg"
"github.com/rodrigo-castellon/babyray/config"
"github.com/rodrigo-castellon/babyray/util"

"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
@@ -26,7 +27,7 @@ func main() {
log.Fatalf("failed to listen: %v", err)
}
_ = lis;
s := grpc.NewServer()
s := grpc.NewServer(util.GetServerOptions()...)
pb.RegisterGCSFuncServer(s, NewGCSFuncServer())
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
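Every server touched by this commit now builds its options through util.GetServerOptions(), and clients dial with util.GetDialOptions(), but the util package itself is not part of the shown hunks. A plausible sketch, assuming insecure transport plus an interceptor that injects the configured simulated_delay (both assumptions, not confirmed by this diff):

// Sketch only: the real util package is not shown in this commit.
package util

import (
    "context"
    "time"

    "github.com/rodrigo-castellon/babyray/config"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

// delay converts the configured simulated_delay (seconds) into a Duration.
func delay() time.Duration {
    return time.Duration(config.GetConfig().SimulatedDelay * float64(time.Second))
}

// GetServerOptions returns the options shared by every babyray gRPC server.
func GetServerOptions() []grpc.ServerOption {
    return []grpc.ServerOption{
        grpc.UnaryInterceptor(func(ctx context.Context, req interface{},
            info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
            time.Sleep(delay()) // simulate network latency before handling the RPC
            return handler(ctx, req)
        }),
    }
}

// GetDialOptions returns the options shared by every babyray gRPC client.
func GetDialOptions() []grpc.DialOption {
    return []grpc.DialOption{
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    }
}

Whatever the real contents, the spread into grpc.NewServer(util.GetServerOptions()...) lets option changes land in one place instead of in every service's main function.
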
20 changes: 15 additions & 5 deletions go/cmd/gcsobjtable/main.go
@@ -11,11 +11,12 @@ import (
"strconv"
"sync"
"fmt"

"github.com/rodrigo-castellon/babyray/config"
"github.com/rodrigo-castellon/babyray/util"
"github.com/rodrigo-castellon/babyray/customlog"
pb "github.com/rodrigo-castellon/babyray/pkg"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
// "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/peer"

"google.golang.org/grpc/codes"
@@ -32,10 +33,11 @@ func LocalLog(format string, v ...interface{}) {
} else {
logMessage = fmt.Sprintf(format, v...)
}
log.Printf("[worker] %s", logMessage)
log.Printf("[gcsobjtable] %s", logMessage)
}

func main() {
customlog.Init()
cfg = config.GetConfig() // Load configuration
address := ":" + strconv.Itoa(cfg.Ports.GCSObjectTable) // Prepare the network address

@@ -44,7 +46,7 @@ func main() {
log.Fatalf("failed to listen: %v", err)
}
_ = lis
s := grpc.NewServer()
s := grpc.NewServer(util.GetServerOptions()...)
pb.RegisterGCSObjServer(s, NewGCSObjServer())
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
@@ -126,7 +128,7 @@ func (s *GCSObjServer) sendCallback(clientAddress string, uid uint64, nodeId uin
// Set up a new gRPC connection to the client
// TODO: Refactor to save gRPC connections rather than creating a new one each time
// Dial is lazy-loading, but we should still save the connection for future use
conn, err := grpc.Dial(clientAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.Dial(clientAddress, util.GetDialOptions()...)
if err != nil {
// Log the error instead of returning it
log.Printf("Failed to connect back to client at %s: %v", clientAddress, err)
@@ -150,6 +152,8 @@ func (s *GCSObjServer) NotifyOwns(ctx context.Context, req *pb.NotifyOwnsRequest
s.mu.Lock()
defer s.mu.Unlock()

log.Printf("WAS JUST NOTIFYOWNS()ED")

uid, nodeId := req.Uid, req.NodeId
delete(s.generating, uid)
// Append the nodeId to the list for the given object uid
@@ -234,6 +238,12 @@ func (s *GCSObjServer) RequestLocation(ctx context.Context, req *pb.RequestLocat

func (s *GCSObjServer) GetObjectLocations(ctx context.Context, req *pb.ObjectLocationsRequest) (*pb.ObjectLocationsResponse, error) {
locations := make(map[uint64]*pb.LocationByteTuple)
// log.Printf("DEEP PRINT!")
// log.Printf("length = %v", len(s.objectLocations))
// for k, v := range s.objectLocations {
// log.Printf("s.objectLocations[%v] = %v", k, v)
// }

for _, u := range req.Args {
if _,ok := s.objectLocations[uint64(u)]; ok {
locations[uint64(u)] = &pb.LocationByteTuple{Locations: s.objectLocations[uint64(u)], Bytes: s.objectSizes[uint64(u)]}
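The TODO in sendCallback about saving gRPC connections instead of dialing on every callback could be handled with a small cache keyed by client address. A sketch of that refactor (not part of this commit; it reuses the file's existing sync, grpc, and util imports):

// Sketch only: illustrates the connection reuse suggested by the TODO.
var (
    connMu    sync.Mutex
    connCache = make(map[string]*grpc.ClientConn)
)

// getConn returns a cached connection for addr, dialing lazily on first use.
func getConn(addr string) (*grpc.ClientConn, error) {
    connMu.Lock()
    defer connMu.Unlock()
    if conn, ok := connCache[addr]; ok {
        return conn, nil
    }
    conn, err := grpc.Dial(addr, util.GetDialOptions()...)
    if err != nil {
        return nil, err
    }
    connCache[addr] = conn
    return conn, nil
}

sendCallback would then call getConn(clientAddress) and drop its per-call conn.Close().
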
85 changes: 67 additions & 18 deletions go/cmd/globalscheduler/main.go
@@ -5,17 +5,23 @@ import (
"log"
"net"
"strconv"
// "math/rand"
"math/rand"
"math"
// "bytes"

"os"
// "os"
// "time"
// "os"
"sync"
"time"
"fmt"
"google.golang.org/grpc"
pb "github.com/rodrigo-castellon/babyray/pkg"
"github.com/rodrigo-castellon/babyray/config"
"github.com/rodrigo-castellon/babyray/customlog"
"github.com/rodrigo-castellon/babyray/util"

// "github.com/go-zookeeper/zk"
)
var cfg *config.Config
const LIVE_NODE_TIMEOUT time.Duration = 400 * time.Millisecond
@@ -34,6 +40,7 @@ func LocalLog(format string, v ...interface{}) {
}

func main() {
customlog.Init()
ctx := context.Background()
cfg = config.GetConfig() // Load configuration
address := ":" + strconv.Itoa(cfg.Ports.GlobalScheduler) // Prepare the network address
@@ -43,14 +50,14 @@
log.Fatalf("failed to listen: %v", err)
}
_ = lis;
s := grpc.NewServer()
s := grpc.NewServer(util.GetServerOptions()...)
gcsAddress := fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, cfg.NodeIDs.GCS, cfg.Ports.GCSObjectTable)
conn, _ := grpc.Dial(gcsAddress, grpc.WithInsecure())
server := &server{gcsClient: pb.NewGCSObjClient(conn), status: make(map[uint64]HeartbeatEntry)}
pb.RegisterGlobalSchedulerServer(s, server)
conn, _ := grpc.Dial(gcsAddress, util.GetDialOptions()...)
serverInstance := &server{gcsClient: pb.NewGCSObjClient(conn), status: make(map[uint64]HeartbeatEntry)}
pb.RegisterGlobalSchedulerServer(s, serverInstance)
defer conn.Close()
log.Printf("server listening at %v", lis.Addr())
go server.LiveNodesHeartbeat(ctx)
go serverInstance.LiveNodesHeartbeat(ctx)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
@@ -81,6 +88,11 @@ type ObjClient interface {
func (s *server) Heartbeat(ctx context.Context, req *pb.HeartbeatRequest ) (*pb.StatusResponse, error) {
//log.Printf("heartbeat from %v", req.NodeId)
mu.Lock()
numQueuedTasks := req.QueuedTasks
if (req.RunningTasks == 10) {
numQueuedTasks = numQueuedTasks + 1 // also need to wait for something currently running to finish
}

s.status[req.NodeId] = HeartbeatEntry{timeReceived: time.Now(), numRunningTasks: req.RunningTasks, numQueuedTasks: numQueuedTasks, avgRunningTime: req.AvgRunningTime, avgBandwidth: req.AvgBandwidth}
mu.Unlock()
return &pb.StatusResponse{Success: true}, nil
@@ -113,19 +125,26 @@ func(s *server) SendLiveNodes(ctx context.Context) (error) {
}

func (s *server) Schedule(ctx context.Context , req *pb.GlobalScheduleRequest ) (*pb.StatusResponse, error) {
localityFlag := false
if os.Getenv("LOCALITY_AWARE") == "true" {
localityFlag = true
}
LocalLog("Got a global schedule request")
localityFlag := req.LocalityFlag
// localityFlag := false
// if os.Getenv("LOCALITY_AWARE") == "true" {
// localityFlag = true
// }

log.Printf("locality aware? it's: %v", localityFlag)

log.Printf("THE REQ UIDS ARE = %v", req.Uids)

// gives us back the node id of the worker
node_id := req.NodeId
if (req.NodeId == 0) {
node_id = getBestWorker(ctx, s, localityFlag, req.Uids)
}

log.Printf("best worker was node_id = %v", node_id)
workerAddress := fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, node_id, cfg.Ports.LocalWorkerStart)

conn, err := grpc.Dial(workerAddress, grpc.WithInsecure())
conn, err := grpc.Dial(workerAddress, util.GetDialOptions()...)
if err != nil {
log.Printf("failed to connect to %s: %v", workerAddress, err)
return nil, err
@@ -152,7 +171,9 @@ func getBestWorker(ctx context.Context, s *server, localityFlag bool, uids []uin
var foundBest bool

minTime = math.MaxFloat32
if localityFlag {
if (localityFlag && len(uids) > 0) {
log.Printf("LOCALITY FLAG IS ON!")
log.Printf("ASKING THE GCS FOR THESE OBJECTS: %v", uids)
locationsResp, err := s.gcsClient.GetObjectLocations(ctx, &pb.ObjectLocationsRequest{Args: uids})
if err != nil {
log.Fatalf("Failed to ask gcs for object locations: %v", err)
Expand All @@ -169,6 +190,7 @@ func getBestWorker(ctx context.Context, s *server, localityFlag bool, uids []uin
total = 0
for _, val := range locationsResp.Locations {
locs := val.Locations
log.Printf("the locations resp val = %v", val)
for _, loc := range locs {
locationToBytes[uint64(loc)] += val.Bytes
total += val.Bytes
Expand All @@ -182,9 +204,11 @@ func getBestWorker(ctx context.Context, s *server, localityFlag bool, uids []uin
}
mu.RLock()
queueingTime := float32(s.status[loc].numQueuedTasks) * s.status[loc].avgRunningTime
transferTime := float32(total - bytes) * s.status[loc].avgBandwidth
transferTime := float32(total - bytes) / s.status[loc].avgBandwidth
mu.RUnlock()

log.Printf("worker = %v; queueing and transfer = %v, %v", loc, queueingTime, transferTime)

waitingTime := queueingTime + transferTime
if waitingTime < minTime {
minTime = waitingTime
Expand All @@ -193,21 +217,46 @@ func getBestWorker(ctx context.Context, s *server, localityFlag bool, uids []uin
}
}
} else {
log.Printf("doing the statuses rn")
// TODO: make iteration order random for maximum fairness
mu.RLock()
for id, heartbeat := range s.status {
// skip dead nodes

// Collect keys from the map
keys := make([]uint64, 0, len(s.status))
for id := range s.status {
keys = append(keys, id)
}

// Shuffle the keys
rand.Shuffle(len(keys), func(i, j int) {
keys[i], keys[j] = keys[j], keys[i]
})

for _, id := range keys {
heartbeat := s.status[id]
if !(time.Since(heartbeat.timeReceived) < LIVE_NODE_TIMEOUT) {
continue
}

if float32(heartbeat.numQueuedTasks) * heartbeat.avgRunningTime < minTime {
log.Printf("worker = %v; queued time = %v", id, float32(heartbeat.numQueuedTasks)*heartbeat.avgRunningTime)
if float32(heartbeat.numQueuedTasks)*heartbeat.avgRunningTime < minTime {
minId = id
minTime = float32(heartbeat.numQueuedTasks) * heartbeat.avgRunningTime
foundBest = true
}
}
mu.RUnlock()

// mu.RLock()
// for id, heartbeat := range s.status {
// log.Printf("worker = %v; queued time = %v", id, float32(heartbeat.numQueuedTasks) * heartbeat.avgRunningTime)
// if float32(heartbeat.numQueuedTasks) * heartbeat.avgRunningTime < minTime {
// minId = id
// minTime = float32(heartbeat.numQueuedTasks) * heartbeat.avgRunningTime
// foundBest = true
// }
// }
// mu.RUnlock()
}

if !foundBest {
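To restate the cost model getBestWorker uses when locality awareness is on: for each node that already holds some of the requested argument bytes, the estimated wait is queued tasks times the node's average task time, plus the bytes it still has to pull divided by its average bandwidth (this commit fixes that transfer term from a multiply to a divide), and the node with the smallest total wins. A self-contained restatement, with all names invented for the sketch:

// Sketch only: a condensed version of the locality-aware branch of getBestWorker.
package schedulersketch

import "math"

type nodeStats struct {
    queuedTasks    uint64
    avgRunningTime float32 // seconds per task
    avgBandwidth   float32 // bytes per second
    localBytes     uint64  // requested argument bytes already resident on this node
}

// bestWorker returns the node with the smallest estimated waiting time:
// queueing delay plus the time to transfer the argument bytes it is missing.
func bestWorker(nodes map[uint64]nodeStats, totalBytes uint64) (uint64, bool) {
    var bestID uint64
    best := float32(math.MaxFloat32)
    found := false
    for id, st := range nodes {
        queueing := float32(st.queuedTasks) * st.avgRunningTime
        transfer := float32(totalBytes-st.localBytes) / st.avgBandwidth
        if queueing+transfer < best {
            best = queueing + transfer
            bestID = id
            found = true
        }
    }
    return bestID, found
}

The non-locality branch is the same formula with the transfer term dropped, now iterated over shuffled node keys so ties do not always go to the same worker.
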