add manual testing/debugging for lineage recovery
rodrigo-castellon committed Jun 3, 2024
1 parent bf17bd2 commit a800486
Showing 9 changed files with 141 additions and 4 deletions.
12 changes: 12 additions & 0 deletions docker-compose.fig11.yml
@@ -0,0 +1,12 @@
version: '3'

services:
  worker1:
    # -u for the python script to flush output immediately
    command: ["/bin/sh", "-c", "python3 -u -m pythonserver.worker & ./go/bin/localobjstore & ./go/bin/localscheduler & sleep 5 && python3 -u scripts/fig11.py & sleep infinity"]
    depends_on:
      # - zookeeper
      - worker2
      - worker3
      - gcs
      - global_scheduler
1 change: 1 addition & 0 deletions go/cmd/gcsobjtable/main.go
@@ -198,6 +198,7 @@ func (s *GCSObjServer) RequestLocation(ctx context.Context, req *pb.RequestLocat
    LocalLog("Starting get Node ID")
    nodeId := s.getNodeId(uid)
    LocalLog("finished get node ID")
+   LocalLog("node id = %v", nodeId)
    if nodeId == nil {
        // Add client to waiting list
        if _, waiting := s.waitlist[uid]; !waiting {
5 changes: 4 additions & 1 deletion go/cmd/globalscheduler/main.go
@@ -113,7 +113,10 @@ func (s *server) Schedule(ctx context.Context , req *pb.GlobalScheduleRequest )
    }
    LocalLog("Got a global schedule request")
    // gives us back the node id of the worker
-   node_id := getBestWorker(ctx, s, localityFlag, req.Uids)
+   node_id := req.NodeId
+   if (req.NodeId == 0) {
+       node_id = getBestWorker(ctx, s, localityFlag, req.Uids)
+   }
    workerAddress := fmt.Sprintf("%s%d:%d", cfg.DNS.NodePrefix, node_id, cfg.Ports.LocalWorkerStart)

    conn, err := grpc.Dial(workerAddress, grpc.WithInsecure())
16 changes: 16 additions & 0 deletions go/cmd/localscheduler/main.go
@@ -97,6 +97,22 @@ func (s *server) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.Sch
    if err != nil {
        LocalLog("cant hit gcs: %v", err)
    }
+
+   // custom behavior if the client itself specifies where we should send this
+   // computation
+   if (req.NodeId != 0) {
+       LocalLog("Doing something special with req.NodeId = %v", req.NodeId)
+       go func() {
+           _, err := s.globalSchedulerClient.Schedule(s.globalCtx, &pb.GlobalScheduleRequest{Uid: uid, Name: req.Name, Args: req.Args, Kwargs: req.Kwargs, Uids: req.Uids, NodeId: req.NodeId})
+           if err != nil {
+               LocalLog("cannot contact global scheduler")
+           } else {
+               // LocalLog("Just ran it on global!")
+           }
+       }()
+       return &pb.ScheduleResponse{Uid: uid}, nil
+   }
+
    if scheduleLocally.NumRunningTasks < MAX_TASKS {
        // LocalLog("Just running locally")
        go func() {
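Taken together with the global-scheduler change above, the intended flow is: a nonzero nodeId from the client bypasses both local execution and load-based placement, and the task is forwarded to the global scheduler, which honors the requested node instead of calling getBestWorker. Below is a minimal Python sketch of that decision path; it is illustrative only (the real logic is the Go code above, and the helper names and request shape here are hypothetical).

def local_schedule(req, num_running_tasks, max_tasks):
    # A nonzero nodeId means the client pinned the task to a specific node:
    # skip the local-vs-global load decision and forward the request as-is.
    if req.nodeId != 0:
        return ("forward_to_global", req.nodeId)
    if num_running_tasks < max_tasks:
        return ("run_locally", None)
    return ("forward_to_global", 0)  # 0 = let the global scheduler choose

def global_schedule(req, get_best_worker):
    # The global scheduler only picks a worker when the client did not pin one.
    node_id = req.nodeId
    if node_id == 0:
        node_id = get_best_worker(req.uids)
    return node_id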
4 changes: 4 additions & 0 deletions proto/rayclient.proto
@@ -32,6 +32,9 @@ message GlobalScheduleRequest {
    bytes kwargs = 4;
    repeated uint64 uids = 5;
    bool newObject = 6;
+   // optional but in case the client wants to specify a node ID
+   // to send a computation to
+   uint64 nodeId = 7;
}

message HeartbeatRequest {
@@ -58,6 +61,7 @@ message ScheduleRequest {
    bytes args = 2;
    bytes kwargs = 3;
    repeated uint64 uids = 4;
+   uint64 nodeId = 5;
}

// returns a Future, which is just the UID for
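For reference, the new field is reachable from Python through the generated rayclient_pb2 module; a minimal sketch of building a pinned request follows. The module, stub, and field names match the ones used in python/babyray/__init__.py below, while the import path, hostname, and port are assumptions for illustration.

import pickle
import grpc
import rayclient_pb2
import rayclient_pb2_grpc

# Build a ScheduleRequest that pins the task to node 3 via the new nodeId field;
# leaving nodeId at its default (0) keeps the old placement behavior.
request = rayclient_pb2.ScheduleRequest(
    name="f",
    args=pickle.dumps(()),
    kwargs=pickle.dumps({}),
    uids=[],
    nodeId=3,
)

# Send it to a local scheduler (hostname and port are placeholders).
channel = grpc.insecure_channel("node1:50000")
stub = rayclient_pb2_grpc.LocalSchedulerStub(channel)
future_uid = stub.Schedule(request).uid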
40 changes: 37 additions & 3 deletions python/babyray/__init__.py
@@ -59,19 +59,40 @@ class RemoteFunction:
    def __init__(self, func):
        self.func = func
        self.name = None
+       self.node_id = None

+   def set_node(self, node_id):
+       # set the node ID to run this remote function on
+       self.node_id = node_id
+
+   def unset_node(self):
+       self.node_id = None

    def remote(self, *args, **kwargs):
        # do gRPC here
        arg_uids = []
-       for arg in args:
-           if type(arg) is Future:
+       for arg in args:
+           if type(arg) is Future:
                arg_uids.append(arg.uid)
        if self.name is not None:
            uid = local_scheduler_gRPC.Schedule(
                rayclient_pb2.ScheduleRequest(
-                   name=self.name, args=pickle.dumps(args), kwargs=pickle.dumps(kwargs), uids = arg_uids
+                   name=self.name,
+                   args=pickle.dumps(args),
+                   kwargs=pickle.dumps(kwargs),
+                   uids=arg_uids,
+                   nodeId=self.node_id,
                )
            ).uid
+
+           # uid = local_scheduler_gRPC.Schedule(
+           #     rayclient_pb2.ScheduleRequest(
+           #         name=self.name,
+           #         args=pickle.dumps(args),
+           #         kwargs=pickle.dumps(kwargs),
+           #         uids=arg_uids,
+           #     )
+           # ).uid
            return Future(uid)

    def register(self):
@@ -110,6 +131,19 @@ def get(futures):
    return futures.get() if hasattr(futures, "get") else futures


+def kill_node(node_id):
+   # kill node node_id
+
+   local_scheduler_channel = grpc.insecure_channel(
+       f"node{node_id}:{str(LOCAL_SCHEDULER_PORT)}"
+   )
+   node_local_scheduler_gRPC = rayclient_pb2_grpc.LocalSchedulerStub(
+       local_scheduler_channel
+   )
+
+   node_local_scheduler_gRPC.KillServer(rayclient_pb2.StatusResponse())
+
+
def demo():
    # Example function to simulate Ray's behavior
    @remote
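A short usage sketch of the new client-side hooks. This is a sketch only: it assumes init() has been called so local_scheduler_gRPC is wired up, that the decorated function resolves to a registered name, and that a node with the chosen ID exists in the cluster; the function and its arguments are made up.

from babyray import init, remote, get, kill_node

init()

@remote
def add(a, b):
    return a + b

add.set_node(2)       # pin the next invocation(s) to node 2
fut = add.remote(1, 2)
add.unset_node()      # later calls fall back to normal scheduling

print(get(fut))       # 3, once the task has run on node 2
kill_node(2)          # ask node 2's local scheduler to shut itself down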
33 changes: 33 additions & 0 deletions scripts/fig11.py
@@ -0,0 +1,33 @@
import sys
from babyray import init, remote, get, Future, kill_node
import random
import time

from utils import *


init()


# client-side code


# ask for a node that is not ourself
@remote
def f():
    time.sleep(5)
    return 0


f.set_node(3)

fut = f.remote()

time.sleep(2)

kill_node(3)

# here: watch the docker compose logs to see if things go the way we expect

out = get(fut)
log("out", out)
23 changes: 23 additions & 0 deletions scripts/run_fig11.sh
@@ -0,0 +1,23 @@
#!/bin/bash
# scripts/run_tests.sh

# Build and run the tests
# docker-compose build
docker-compose -f docker-compose.yml -f docker-compose.fig11.yml up

# Wait for the services to be up and running
sleep 20

# Check the logs for the worker1 container to see if the tests passed
docker-compose logs worker1 | tee output.log

# Check if "All tests passed" is in the logs
if grep -q "1 passed" output.log; then
    echo "All tests passed!"
    docker-compose down
    exit 0
else
    echo "Some tests failed."
    docker-compose down
    exit 1
fi
11 changes: 11 additions & 0 deletions scripts/utils.py
@@ -0,0 +1,11 @@
from datetime import datetime


def log(*s):
    # Get the current datetime
    current_datetime = datetime.now()

    # Format the datetime
    formatted_datetime = current_datetime.strftime("%Y/%m/%d %H:%M:%S.%f")[:-3]

    print(formatted_datetime, *s, flush=True)
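For example, a call like the one below prints a millisecond-precision timestamp prefix followed by the arguments (the timestamp shown is illustrative):

log("out", 0)
# prints something like: 2024/06/03 18:42:07.123 out 0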
