Skip to content

Commit

Permalink
Update apache arrow to include TensorFlow fix (ray-project#2345)
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmoritz authored and robertnishihara committed Jul 6, 2018
1 parent 4185aae commit fbde8ca
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 94 deletions.
95 changes: 50 additions & 45 deletions src/plasma/plasma_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ void process_status_request(ClientConnection *client_conn, ObjectID object_id);
* @param context Client connection.
* @return Status of object_id as defined in plasma.h
*/
int request_status(ObjectID object_id,
const std::vector<DBClientID> &manager_vector,
void *context);
ObjectStatus request_status(ObjectID object_id,
const std::vector<DBClientID> &manager_vector,
void *context);

/**
* Send requested object_id back to the Plasma Manager identified
Expand Down Expand Up @@ -316,12 +316,13 @@ bool ClientConnection_request_finished(ClientConnection *client_conn) {
return client_conn->cursor == -1;
}

std::unordered_map<ObjectID, std::vector<WaitRequest *>> &
object_wait_requests_from_type(PlasmaManagerState *manager_state, int type) {
std::unordered_map<ObjectID, std::vector<WaitRequest *>>
&object_wait_requests_from_type(PlasmaManagerState *manager_state,
plasma::ObjectRequestType type) {
/* We use different types of hash tables for different requests. */
RAY_CHECK(type == plasma::PLASMA_QUERY_LOCAL ||
type == plasma::PLASMA_QUERY_ANYWHERE);
if (type == plasma::PLASMA_QUERY_LOCAL) {
RAY_CHECK(type == plasma::ObjectRequestType::PLASMA_QUERY_LOCAL ||
type == plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE);
if (type == plasma::ObjectRequestType::PLASMA_QUERY_LOCAL) {
return manager_state->object_wait_requests_local;
} else {
return manager_state->object_wait_requests_remote;
Expand All @@ -330,7 +331,7 @@ object_wait_requests_from_type(PlasmaManagerState *manager_state, int type) {

void add_wait_request_for_object(PlasmaManagerState *manager_state,
ObjectID object_id,
int type,
plasma::ObjectRequestType type,
WaitRequest *wait_req) {
auto &object_wait_requests =
object_wait_requests_from_type(manager_state, type);
Expand All @@ -343,7 +344,7 @@ void add_wait_request_for_object(PlasmaManagerState *manager_state,

void remove_wait_request_for_object(PlasmaManagerState *manager_state,
ObjectID object_id,
int type,
plasma::ObjectRequestType type,
WaitRequest *wait_req) {
auto &object_wait_requests =
object_wait_requests_from_type(manager_state, type);
Expand Down Expand Up @@ -392,8 +393,8 @@ void return_from_wait(PlasmaManagerState *manager_state,

void update_object_wait_requests(PlasmaManagerState *manager_state,
ObjectID obj_id,
int type,
int status) {
plasma::ObjectRequestType type,
ObjectStatus status) {
auto &object_wait_requests =
object_wait_requests_from_type(manager_state, type);
/* Update the in-progress wait requests in the specified table. */
Expand All @@ -417,7 +418,7 @@ void update_object_wait_requests(PlasmaManagerState *manager_state,
/* Check that we found the object. */
RAY_CHECK(object_request != wait_req->object_requests.end());
/* Check that the object found was not previously known to us. */
RAY_CHECK(object_request->second.status == ObjectStatus_Nonexistent);
RAY_CHECK(object_request->second.status == ObjectStatus::Nonexistent);
/* Update the found object's status to a known status. */
object_request->second.status = status;

Expand Down Expand Up @@ -608,13 +609,13 @@ void send_queued_request(event_loop *loop,
PlasmaRequestBuffer *buf = conn->transfer_queue.front();
int err = 0;
switch (buf->type) {
case MessageType_PlasmaDataRequest:
case MessageType::PlasmaDataRequest:
err = handle_sigpipe(
plasma::SendDataRequest(conn->fd, buf->object_id.to_plasma_id(),
state->addr, state->port),
conn->fd);
break;
case MessageType_PlasmaDataReply:
case MessageType::PlasmaDataReply:
RAY_LOG(DEBUG) << "Transferring object to manager";
if (ClientConnection_request_finished(conn)) {
/* If the cursor is not set, we haven't sent any requests for this object
Expand All @@ -635,7 +636,7 @@ void send_queued_request(event_loop *loop,

/* If the other side hung up, stop sending to this manager. */
if (err != 0) {
if (buf->type == MessageType_PlasmaDataReply) {
if (buf->type == MessageType::PlasmaDataReply) {
/* We errored while sending the object, so release it before removing the
* connection. The corresponding call to plasma_get occurred in
* process_transfer_request. */
Expand All @@ -646,7 +647,7 @@ void send_queued_request(event_loop *loop,
ClientConnection_free(conn);
} else if (ClientConnection_request_finished(conn)) {
/* If we are done with this request, remove it from the transfer queue. */
if (buf->type == MessageType_PlasmaDataReply) {
if (buf->type == MessageType::PlasmaDataReply) {
/* We are done sending the object, so release it. The corresponding call
* to plasma_get occurred in process_transfer_request. */
ARROW_CHECK_OK(conn->manager_state->plasma_conn->Release(
Expand Down Expand Up @@ -827,7 +828,7 @@ void process_transfer_request(event_loop *loop,
RAY_CHECK(object_buffer.metadata->data() ==
object_buffer.data->data() + object_buffer.data->size());
PlasmaRequestBuffer *buf = new PlasmaRequestBuffer();
buf->type = MessageType_PlasmaDataReply;
buf->type = MessageType::PlasmaDataReply;
buf->object_id = obj_id;
/* We treat buf->data as a pointer to the concatenated data and metadata, so
* we don't actually use buf->metadata. */
Expand Down Expand Up @@ -938,7 +939,7 @@ void request_transfer_from(PlasmaManagerState *manager_state,
}

PlasmaRequestBuffer *transfer_request = new PlasmaRequestBuffer();
transfer_request->type = MessageType_PlasmaDataRequest;
transfer_request->type = MessageType::PlasmaDataRequest;
transfer_request->object_id = fetch_req->object_id;

if (manager_conn->transfer_queue.size() == 0) {
Expand Down Expand Up @@ -1085,8 +1086,8 @@ void object_table_subscribe_callback(ObjectID object_id,
}
/* Run the callback for wait requests. */
update_object_wait_requests(manager_state, object_id,
plasma::PLASMA_QUERY_ANYWHERE,
ObjectStatus_Remote);
plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE,
ObjectStatus::Remote);
}

void process_fetch_requests(ClientConnection *client_conn,
Expand Down Expand Up @@ -1167,7 +1168,7 @@ void process_wait_request(ClientConnection *client_conn,
/* Check if this object is already present locally. If so, mark the object
* as present. */
if (is_object_local(manager_state, obj_id)) {
object_request.status = ObjectStatus_Local;
object_request.status = ObjectStatus::Local;
wait_req->num_satisfied += 1;
continue;
}
Expand All @@ -1176,10 +1177,11 @@ void process_wait_request(ClientConnection *client_conn,
add_wait_request_for_object(manager_state, obj_id, object_request.type,
wait_req);

if (object_request.type == plasma::PLASMA_QUERY_LOCAL) {
if (object_request.type == plasma::ObjectRequestType::PLASMA_QUERY_LOCAL) {
/* TODO(rkn): If desired, we could issue a fetch command here to retrieve
* the object. */
} else if (object_request.type == plasma::PLASMA_QUERY_ANYWHERE) {
} else if (object_request.type ==
plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE) {
/* Add this object ID to the list of object IDs to request notifications
* for from the object table. */
object_ids_to_request[num_object_ids_to_request] = obj_id;
Expand Down Expand Up @@ -1228,34 +1230,35 @@ void request_status_done(ObjectID object_id,
const std::vector<DBClientID> &manager_vector,
void *context) {
ClientConnection *client_conn = (ClientConnection *) context;
int status = request_status(object_id, manager_vector, context);
int status =
static_cast<int>(request_status(object_id, manager_vector, context));
plasma::ObjectID object_id_copy = object_id.to_plasma_id();
handle_sigpipe(
plasma::SendStatusReply(client_conn->fd, &object_id_copy, &status, 1),
client_conn->fd);
}

int request_status(ObjectID object_id,
const std::vector<DBClientID> &manager_vector,
void *context) {
ObjectStatus request_status(ObjectID object_id,
const std::vector<DBClientID> &manager_vector,
void *context) {
ClientConnection *client_conn = (ClientConnection *) context;

/* Return success immediately if we already have this object. */
if (is_object_local(client_conn->manager_state, object_id)) {
return ObjectStatus_Local;
return ObjectStatus::Local;
}

/* Since the object is not stored locally, manager_vector.size() > 0 means
* that the object is stored on at least one remote manager. Otherwise, if
* manager_vector.size() == 0, the object is not stored anywhere. */
return (manager_vector.size() > 0 ? ObjectStatus_Remote
: ObjectStatus_Nonexistent);
return manager_vector.size() > 0 ? ObjectStatus::Remote
: ObjectStatus::Nonexistent;
}

void object_table_lookup_fail_callback(ObjectID object_id,
void *user_context,
void *user_data) {
/* Fail for now. Later, we may want to send an ObjectStatus_Nonexistent to the
/* Fail for now. Later, we may want to send an ObjectStatus::Nonexistent to the
* client. */
RAY_CHECK(0);
}
Expand All @@ -1264,15 +1267,15 @@ void process_status_request(ClientConnection *client_conn,
plasma::ObjectID object_id) {
/* Return success immediately if we already have this object. */
if (is_object_local(client_conn->manager_state, object_id)) {
int status = ObjectStatus_Local;
int status = static_cast<int>(ObjectStatus::Local);
handle_sigpipe(
plasma::SendStatusReply(client_conn->fd, &object_id, &status, 1),
client_conn->fd);
return;
}

if (client_conn->manager_state->db == NULL) {
int status = ObjectStatus_Nonexistent;
auto status = static_cast<int>(ObjectStatus::Nonexistent);
handle_sigpipe(
plasma::SendStatusReply(client_conn->fd, &object_id, &status, 1),
client_conn->fd);
Expand Down Expand Up @@ -1369,10 +1372,12 @@ void process_add_object_notification(PlasmaManagerState *state,
}

/* Update the in-progress local and remote wait requests. */
update_object_wait_requests(state, object_id, plasma::PLASMA_QUERY_LOCAL,
ObjectStatus_Local);
update_object_wait_requests(state, object_id, plasma::PLASMA_QUERY_ANYWHERE,
ObjectStatus_Local);
update_object_wait_requests(state, object_id,
plasma::ObjectRequestType::PLASMA_QUERY_LOCAL,
ObjectStatus::Local);
update_object_wait_requests(state, object_id,
plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE,
ObjectStatus::Local);
}

void process_object_notification(event_loop *loop,
Expand Down Expand Up @@ -1473,8 +1478,8 @@ void process_message(event_loop *loop,
uint8_t *data;
read_message(client_sock, &type, &length, &data);

switch (type) {
case MessageType_PlasmaDataRequest: {
switch (static_cast<MessageType>(type)) {
case MessageType::PlasmaDataRequest: {
RAY_LOG(DEBUG) << "Processing data request";
plasma::ObjectID object_id;
char *address;
Expand All @@ -1484,7 +1489,7 @@ void process_message(event_loop *loop,
process_transfer_request(loop, object_id, address, port, conn);
free(address);
} break;
case MessageType_PlasmaDataReply: {
case MessageType::PlasmaDataReply: {
RAY_LOG(DEBUG) << "Processing data reply";
plasma::ObjectID object_id;
int64_t object_size;
Expand All @@ -1494,7 +1499,7 @@ void process_message(event_loop *loop,
process_data_request(loop, client_sock, object_id, object_size,
metadata_size, conn);
} break;
case MessageType_PlasmaFetchRequest: {
case MessageType::PlasmaFetchRequest: {
RAY_LOG(DEBUG) << "Processing fetch remote";
std::vector<plasma::ObjectID> object_ids_to_fetch;
/* TODO(pcm): process_fetch_requests allocates an array of num_objects
Expand All @@ -1503,7 +1508,7 @@ void process_message(event_loop *loop,
process_fetch_requests(conn, object_ids_to_fetch.size(),
object_ids_to_fetch.data());
} break;
case MessageType_PlasmaWaitRequest: {
case MessageType::PlasmaWaitRequest: {
RAY_LOG(DEBUG) << "Processing wait";
plasma::ObjectRequestMap object_requests;
int64_t timeout_ms;
Expand All @@ -1513,13 +1518,13 @@ void process_message(event_loop *loop,
process_wait_request(conn, std::move(object_requests), timeout_ms,
num_ready_objects);
} break;
case MessageType_PlasmaStatusRequest: {
case MessageType::PlasmaStatusRequest: {
RAY_LOG(DEBUG) << "Processing status";
plasma::ObjectID object_id;
ARROW_CHECK_OK(plasma::ReadStatusRequest(data, length, &object_id, 1));
process_status_request(conn, object_id);
} break;
case static_cast<int64_t>(CommonMessageType::DISCONNECT_CLIENT): {
case static_cast<MessageType>(CommonMessageType::DISCONNECT_CLIENT): {
RAY_LOG(DEBUG) << "Disconnecting client on fd " << client_sock;
event_loop_remove_file(loop, client_sock);
ClientConnection_free(conn);
Expand Down
5 changes: 2 additions & 3 deletions src/plasma/plasma_manager.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#ifndef PLASMA_MANAGER_H
#define PLASMA_MANAGER_H

#include <poll.h>

#ifndef RAY_NUM_RETRIES
#define NUM_RETRIES 5
#else
Expand All @@ -11,6 +9,7 @@

typedef struct PlasmaManagerState PlasmaManagerState;
typedef struct ClientConnection ClientConnection;
enum class MessageType : int64_t;

/**
* Initializes the plasma manager state. This connects the manager to the local
Expand Down Expand Up @@ -156,7 +155,7 @@ ClientConnection *ClientConnection_listen(event_loop *loop,

/* Buffer for requests between plasma managers. */
typedef struct PlasmaRequestBuffer {
int type;
MessageType type;
ray::ObjectID object_id;
uint8_t *data;
int64_t data_size;
Expand Down
Loading

0 comments on commit fbde8ca

Please sign in to comment.