Skip to content

Commit

Permalink
Windows compatibility (ray-project#57)
Browse files Browse the repository at this point in the history
* Add Python and Redis submodules, and remove old third-party modules

* Update VS projects (WARNING: references files that do not exist yet)

* Update code & add shims for APIs except AF_UNIX/{send,recv}msg()

* Minor style changes.
  • Loading branch information
mehrdadn authored and robertnishihara committed Nov 23, 2016
1 parent a93c6b7 commit 7237ec4
Show file tree
Hide file tree
Showing 65 changed files with 2,233 additions and 7,126 deletions.
8 changes: 8 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[submodule "src/common/thirdparty/redis"]
path = src/common/thirdparty/redis-windows
url = https://github.com/MSOpenTech/redis.git
ignore = all
[submodule "src/common/thirdparty/python"]
path = src/common/thirdparty/python
url = https://github.com/austinsc/python.git
ignore = all
240 changes: 62 additions & 178 deletions Ray.sln

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/common/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ const unique_id NIL_ID = {{255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
unique_id globally_unique_id(void) {
/* Use /dev/urandom for "real" randomness. */
int fd;
if ((fd = open("/dev/urandom", O_RDONLY)) == -1) {
int const flags = 0 /* for Windows compatibility */;
if ((fd = open("/dev/urandom", O_RDONLY, flags)) == -1) {
LOG_ERROR("Could not generate random number");
}
unique_id result;
Expand Down
11 changes: 10 additions & 1 deletion src/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
#include <string.h>
#include <errno.h>
#include <inttypes.h>
#ifndef _WIN32
#include <execinfo.h>
#endif

#include "utarray.h"

Expand Down Expand Up @@ -53,7 +55,7 @@

#if (RAY_COMMON_LOG_LEVEL > RAY_COMMON_FATAL)
#define LOG_FATAL(M, ...)
#else
#elif defined(_EXECINFO_H) || !defined(_WIN32)
#define LOG_FATAL(M, ...) \
do { \
fprintf(stderr, "[FATAL] (%s:%d) " M "\n", __FILE__, __LINE__, \
Expand All @@ -63,6 +65,13 @@
backtrace_symbols_fd(buffer, calls, 1); \
exit(-1); \
} while (0);
#else
#define LOG_FATAL(M, ...) \
do { \
fprintf(stderr, "[FATAL] (%s:%d) " M "\n", __FILE__, __LINE__, \
##__VA_ARGS__); \
exit(-1); \
} while (0);
#endif

#define CHECKM(COND, M, ...) \
Expand Down
7 changes: 7 additions & 0 deletions src/common/event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@
#define EVENT_LOOP_H

#include <stdint.h>

#ifdef _WIN32
/* Quirks mean that Windows version needs to be included differently */
#include <hiredis/hiredis.h>
#include <ae.h>
#else
#include "ae/ae.h"
#endif

/* Unique timer ID that will be generated when the timer is added to the
* event loop. Will not be reused later on in another call
Expand Down
3 changes: 2 additions & 1 deletion src/common/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ int bind_inet_sock(const int port, bool shall_listen) {
close(socket_fd);
return -1;
}
if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
int *const pon = (char const *) &on;
if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, pon, sizeof(on)) < 0) {
LOG_ERROR("setsockopt failed for port %d", port);
close(socket_fd);
return -1;
Expand Down
2 changes: 1 addition & 1 deletion src/common/lib/python/common_extension.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ static long PyObjectID_hash(PyObjectID *self) {
}

static PyObject *PyObjectID_repr(PyObjectID *self) {
int hex_length = 2 * UNIQUE_ID_SIZE + 1;
enum { hex_length = 2 * UNIQUE_ID_SIZE + 1 };
char hex_id[hex_length];
sha1_to_hex(self->object_id.id, hex_id);
UT_string *repr;
Expand Down
4 changes: 3 additions & 1 deletion src/common/logging.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#include "logging.h"

#include <stdint.h>
#include <inttypes.h>
#include <stdint.h>
#include <sys/time.h>

#include <hiredis/hiredis.h>
#include <utstring.h>

Expand Down
69 changes: 69 additions & 0 deletions src/common/shims/windows/getopt.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* http://stackoverflow.com/a/17195644/541686 */

#include <string.h>
#include <stdio.h>

int opterr = 1, /* if error message should be printed */
optind = 1, /* index into parent argv vector */
optopt, /* character checked for validity */
optreset; /* reset getopt */
char *optarg; /* argument associated with option */

#define BADCH (int) '?'
#define BADARG (int) ':'
#define EMSG ""

/*
* getopt --
* Parse argc/argv argument vector.
*/
int getopt(int nargc, char *const nargv[], const char *ostr) {
static char *place = EMSG; /* option letter processing */
const char *oli; /* option letter list index */

if (optreset || !*place) { /* update scanning pointer */
optreset = 0;
if (optind >= nargc || *(place = nargv[optind]) != '-') {
place = EMSG;
return (-1);
}
if (place[1] && *++place == '-') { /* found "--" */
++optind;
place = EMSG;
return (-1);
}
} /* option letter okay? */
if ((optopt = (int) *place++) == (int) ':' || !(oli = strchr(ostr, optopt))) {
/*
* if the user didn't specify '-' as an option,
* assume it means -1.
*/
if (optopt == (int) '-')
return (-1);
if (!*place)
++optind;
if (opterr && *ostr != ':')
(void) printf("illegal option -- %c\n", optopt);
return (BADCH);
}
if (*++oli != ':') { /* don't need argument */
optarg = NULL;
if (!*place)
++optind;
} else { /* need an argument */
if (*place) /* no white space */
optarg = place;
else if (nargc <= ++optind) { /* no arg */
place = EMSG;
if (*ostr == ':')
return (BADARG);
if (opterr)
(void) printf("option requires an argument -- %c\n", optopt);
return (BADCH);
} else /* white space */
optarg = nargv[optind];
place = EMSG;
++optind;
}
return (optopt); /* dump back option letter */
}
4 changes: 4 additions & 0 deletions src/common/shims/windows/getopt.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#ifndef GETOPT_H
#define GETOPT_H

#endif /* GETOPT_H */
208 changes: 208 additions & 0 deletions src/common/shims/windows/msg.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
#include <sys/socket.h>

int socketpair(int domain, int type, int protocol, int sv[2]) {
if ((domain != AF_UNIX && domain != AF_INET) || type != SOCK_STREAM) {
return INVALID_SOCKET;
}
SOCKET sockets[2];
int r = dumb_socketpair(sockets);
sv[0] = (int) sockets[0];
sv[1] = (int) sockets[1];
return r;
}

#pragma comment(lib, "IPHlpAPI.lib")

struct _MIB_TCPROW2 {
DWORD dwState, dwLocalAddr, dwLocalPort, dwRemoteAddr, dwRemotePort,
dwOwningPid;
enum _TCP_CONNECTION_OFFLOAD_STATE dwOffloadState;
};

struct _MIB_TCPTABLE2 {
DWORD dwNumEntries;
struct _MIB_TCPROW2 table[1];
};

DECLSPEC_IMPORT ULONG WINAPI GetTcpTable2(struct _MIB_TCPTABLE2 *TcpTable,
PULONG SizePointer,
BOOL Order);

static DWORD getsockpid(SOCKET client) {
/* http://stackoverflow.com/a/25431340 */
DWORD pid = 0;

struct sockaddr_in Server = {0};
int ServerSize = sizeof(Server);

struct sockaddr_in Client = {0};
int ClientSize = sizeof(Client);

if ((getsockname(client, (struct sockaddr *) &Server, &ServerSize) == 0) &&
(getpeername(client, (struct sockaddr *) &Client, &ClientSize) == 0)) {
struct _MIB_TCPTABLE2 *TcpTable = NULL;
ULONG TcpTableSize = 0;
ULONG result;
do {
result = GetTcpTable2(TcpTable, &TcpTableSize, TRUE);
if (result != ERROR_INSUFFICIENT_BUFFER) {
break;
}
free(TcpTable);
TcpTable = (struct _MIB_TCPTABLE2 *) malloc(TcpTableSize);
} while (TcpTable != NULL);

if (result == NO_ERROR) {
for (DWORD dw = 0; dw < TcpTable->dwNumEntries; ++dw) {
struct _MIB_TCPROW2 *row = &(TcpTable->table[dw]);
if ((row->dwState == 5 /* MIB_TCP_STATE_ESTAB */) &&
(row->dwLocalAddr == Client.sin_addr.s_addr) &&
((row->dwLocalPort & 0xFFFF) == Client.sin_port) &&
(row->dwRemoteAddr == Server.sin_addr.s_addr) &&
((row->dwRemotePort & 0xFFFF) == Server.sin_port)) {
pid = row->dwOwningPid;
break;
}
}
}

free(TcpTable);
}

return pid;
}

ssize_t sendmsg(int sockfd, struct msghdr *msg, int flags) {
ssize_t result = -1;
struct cmsghdr *header = CMSG_FIRSTHDR(msg);
if (header->cmsg_level == SOL_SOCKET && header->cmsg_type == SCM_RIGHTS) {
/* We're trying to send over a handle of some kind.
* We have to look up which process we're communicating with,
* open a handle to it, and then duplicate our handle into it.
* However, the first two steps cannot be done atomically.
* Therefore, this code HAS A RACE CONDITIONS and is therefore NOT SECURE.
* In the absense of a malicious actor, though, it is exceedingly unlikely
* that the child process closes AND that its process ID is reassigned
* to another existing process.
*/
struct msghdr const old_msg = *msg;
int *const pfd = (int *) CMSG_DATA(header);
msg->msg_control = NULL;
msg->msg_controllen = 0;
WSAPROTOCOL_INFO protocol_info = {0};
BOOL const is_socket = !!FDAPI_GetSocketStatePtr(*pfd);
DWORD const target_pid = getsockpid(sockfd);
HANDLE target_process = NULL;
if (target_pid) {
if (!is_socket) {
/* This is a regular handle... fit it into the same struct */
target_process = OpenProcess(PROCESS_DUP_HANDLE, FALSE, target_pid);
if (target_process) {
if (DuplicateHandle(GetCurrentProcess(), (HANDLE)(intptr_t) *pfd,
target_process, (HANDLE *) &protocol_info, 0,
TRUE, DUPLICATE_SAME_ACCESS)) {
result = 0;
}
}
} else {
/* This is a socket... */
result = FDAPI_WSADuplicateSocket(*pfd, target_pid, &protocol_info);
}
}
if (result == 0) {
int const nbufs = msg->dwBufferCount + 1;
WSABUF *const bufs =
(struct _WSABUF *) _alloca(sizeof(*msg->lpBuffers) * nbufs);
bufs[0].buf = (char *) &protocol_info;
bufs[0].len = sizeof(protocol_info);
memcpy(&bufs[1], msg->lpBuffers,
msg->dwBufferCount * sizeof(*msg->lpBuffers));
DWORD nb;
msg->lpBuffers = bufs;
msg->dwBufferCount = nbufs;
GUID const wsaid_WSASendMsg = {
0xa441e712,
0x754f,
0x43ca,
{0x84, 0xa7, 0x0d, 0xee, 0x44, 0xcf, 0x60, 0x6d}};
typedef INT PASCAL WSASendMsg_t(
SOCKET s, LPWSAMSG lpMsg, DWORD dwFlags, LPDWORD lpNumberOfBytesSent,
LPWSAOVERLAPPED lpOverlapped,
LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine);
WSASendMsg_t *WSASendMsg = NULL;
result = FDAPI_WSAIoctl(sockfd, SIO_GET_EXTENSION_FUNCTION_POINTER,
&wsaid_WSASendMsg, sizeof(wsaid_WSASendMsg),
&WSASendMsg, sizeof(WSASendMsg), &nb, NULL, 0);
if (result == 0) {
result = (*WSASendMsg)(sockfd, msg, flags, &nb, NULL, NULL) == 0
? (ssize_t)(nb - sizeof(protocol_info))
: 0;
}
}
if (result != 0 && target_process && !is_socket) {
/* we failed to send the handle, and it needs cleaning up! */
HANDLE duplicated_back = NULL;
if (DuplicateHandle(target_process, *(HANDLE *) &protocol_info,
GetCurrentProcess(), &duplicated_back, 0, FALSE,
DUPLICATE_CLOSE_SOURCE)) {
CloseHandle(duplicated_back);
}
}
if (target_process) {
CloseHandle(target_process);
}
*msg = old_msg;
}
return result;
}

ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags) {
int result = -1;
struct cmsghdr *header = CMSG_FIRSTHDR(msg);
if (msg->msg_controllen &&
flags == 0 /* We can't send flags on Windows... */) {
struct msghdr const old_msg = *msg;
msg->msg_control = NULL;
msg->msg_controllen = 0;
WSAPROTOCOL_INFO protocol_info = {0};
int const nbufs = msg->dwBufferCount + 1;
WSABUF *const bufs =
(struct _WSABUF *) _alloca(sizeof(*msg->lpBuffers) * nbufs);
bufs[0].buf = (char *) &protocol_info;
bufs[0].len = sizeof(protocol_info);
memcpy(&bufs[1], msg->lpBuffers,
msg->dwBufferCount * sizeof(*msg->lpBuffers));
typedef INT PASCAL WSARecvMsg_t(
SOCKET s, LPWSAMSG lpMsg, LPDWORD lpNumberOfBytesRecvd,
LPWSAOVERLAPPED lpOverlapped,
LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine);
WSARecvMsg_t *WSARecvMsg = NULL;
DWORD nb;
GUID const wsaid_WSARecvMsg = {
0xf689d7c8,
0x6f1f,
0x436b,
{0x8a, 0x53, 0xe5, 0x4f, 0xe3, 0x51, 0xc3, 0x22}};
result = FDAPI_WSAIoctl(sockfd, SIO_GET_EXTENSION_FUNCTION_POINTER,
&wsaid_WSARecvMsg, sizeof(wsaid_WSARecvMsg),
&WSARecvMsg, sizeof(WSARecvMsg), &nb, NULL, 0);
if (result == 0) {
result = (*WSARecvMsg)(sockfd, msg, &nb, NULL, NULL) == 0
? (ssize_t)(nb - sizeof(protocol_info))
: 0;
}
if (result == 0) {
int *const pfd = (int *) CMSG_DATA(header);
if (protocol_info.iSocketType == 0 && protocol_info.iProtocol == 0) {
*pfd = *(int *) &protocol_info;
} else {
*pfd = FDAPI_WSASocket(FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO,
FROM_PROTOCOL_INFO, &protocol_info, 0, 0);
}
header->cmsg_level = SOL_SOCKET;
header->cmsg_type = SCM_RIGHTS;
}
*msg = old_msg;
}
return result;
}
4 changes: 4 additions & 0 deletions src/common/shims/windows/netdb.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#ifndef NETDB_H
#define NETDB_H

#endif /* NETDB_H */
4 changes: 4 additions & 0 deletions src/common/shims/windows/netinet/in.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#ifndef IN_H
#define IN_H

#endif /* IN_H */
4 changes: 4 additions & 0 deletions src/common/shims/windows/poll.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#ifndef POLL_H
#define POLL_H

#endif /* POLL_H */
Loading

0 comments on commit 7237ec4

Please sign in to comment.