Skip to content

Commit

Permalink
[XRay] Register object store and raylet with the GCS (ray-project#1860)
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmoritz authored and robertnishihara committed Apr 10, 2018
1 parent 7c9e291 commit 834e594
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 8 deletions.
4 changes: 2 additions & 2 deletions python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1401,14 +1401,14 @@ def start_ray_processes(address_info=None,
raylet_stdout_file, raylet_stderr_file = (
new_log_files("raylet_{}".format(i),
redirect_output=redirect_output))
address_info["raylet_socket_name"] = start_raylet(
address_info["raylet_socket_names"] = [start_raylet(
redis_address,
node_ip_address,
object_store_addresses[i].name,
worker_path,
stdout_file=None,
stderr_file=None,
cleanup=cleanup)
cleanup=cleanup)]

if not use_raylet:
# Start any workers that the local scheduler has not already started.
Expand Down
11 changes: 6 additions & 5 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1241,11 +1241,12 @@ def get_address_info_from_redis_helper(redis_address, node_ip_address,
redis_ip_address == ray.services.get_node_ip_address())):
raylets.append(client)

# TODO(rkn): The ObjectStoreSocketName field does not exist.
object_store_addresses = [
raylet.ObjectStoreSocketName().decode("ascii")
for raylet in raylets]
raylet_socket_names = [raylet.NodeManagerAddress().decode("ascii") for
services.ObjectStoreAddress(
name=raylet.ObjectStoreSocketName().decode("ascii"),
manager_name=None,
manager_port=None) for raylet in raylets]
raylet_socket_names = [raylet.RayletSocketName().decode("ascii") for
raylet in raylets]
return {"node_ip_address": node_ip_address,
"redis_address": redis_address,
Expand Down Expand Up @@ -1515,7 +1516,7 @@ def _init(address_info=None,
address_info["local_scheduler_socket_names"][0])
else:
driver_address_info["raylet_socket_name"] = (
address_info["raylet_socket_name"])
address_info["raylet_socket_names"][0])
connect(driver_address_info, object_id_seed=object_id_seed,
mode=driver_mode, worker=global_worker, use_raylet=use_raylet)
return address_info
Expand Down
4 changes: 4 additions & 0 deletions src/ray/gcs/format/gcs.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ table ClientTableData {
client_id: string;
// The IP address of the client's node manager.
node_manager_address: string;
// The IPC socket name of the client's raylet.
raylet_socket_name: string;
// The IPC socket name of the client's plasma store.
object_store_socket_name: string;
// The port at which the client's node manager is listening for TCP
// connections from other node managers.
node_manager_port: int;
Expand Down
9 changes: 8 additions & 1 deletion src/ray/raylet/raylet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Raylet::Raylet(boost::asio::io_service &main_service,
object_manager_(main_service, std::move(object_manager_service),
object_manager_config, gcs_client),
node_manager_(main_service, node_manager_config, object_manager_, gcs_client_),
socket_name_(socket_name),
acceptor_(main_service, boost::asio::local::stream_protocol::endpoint(socket_name)),
socket_(main_service),
object_manager_acceptor_(
Expand All @@ -35,7 +36,9 @@ Raylet::Raylet(boost::asio::io_service &main_service,
DoAcceptObjectManager();
DoAcceptNodeManager();

RAY_CHECK_OK(RegisterGcs(node_ip_address, redis_address, redis_port, main_service,
RAY_CHECK_OK(RegisterGcs(node_ip_address, socket_name_,
object_manager_config.store_socket_name,
redis_address, redis_port, main_service,
node_manager_config));

RAY_CHECK_OK(RegisterPeriodicTimer(main_service));
Expand All @@ -52,6 +55,8 @@ ray::Status Raylet::RegisterPeriodicTimer(boost::asio::io_service &io_service) {
}

ray::Status Raylet::RegisterGcs(const std::string &node_ip_address,
const std::string &raylet_socket_name,
const std::string &object_store_socket_name,
const std::string &redis_address, int redis_port,
boost::asio::io_service &io_service,
const NodeManagerConfig &node_manager_config) {
Expand All @@ -60,6 +65,8 @@ ray::Status Raylet::RegisterGcs(const std::string &node_ip_address,

ClientTableDataT client_info = gcs_client_->client_table().GetLocalClient();
client_info.node_manager_address = node_ip_address;
client_info.raylet_socket_name = raylet_socket_name;
client_info.object_store_socket_name = object_store_socket_name;
client_info.object_manager_port = object_manager_acceptor_.local_endpoint().port();
client_info.node_manager_port = node_manager_acceptor_.local_endpoint().port();
// Add resource information.
Expand Down
4 changes: 4 additions & 0 deletions src/ray/raylet/raylet.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class Raylet {
private:
/// Register GCS client.
ray::Status RegisterGcs(const std::string &node_ip_address,
const std::string &raylet_socket_name,
const std::string &object_store_socket_name,
const std::string &redis_address, int redis_port,
boost::asio::io_service &io_service, const NodeManagerConfig &);

Expand All @@ -71,6 +73,8 @@ class Raylet {
ObjectManager object_manager_;
/// Manages client requests for task submission and execution.
NodeManager node_manager_;
/// The name of the socket this raylet listens on.
std::string socket_name_;

/// An acceptor for new clients.
boost::asio::local::stream_protocol::acceptor acceptor_;
Expand Down

0 comments on commit 834e594

Please sign in to comment.