From 5cd3cef10580f070eaca728bc624cab13e7c5a88 Mon Sep 17 00:00:00 2001 From: Danielle Church Date: Fri, 6 Aug 2021 18:42:50 -0400 Subject: [PATCH 1/3] ADDED: extra_streams() option to process_create/3 This enables the Unix-style pattern of passing additional file descriptors to child processes beyond stdin, stdout, and stderr, so that parent and child can communicate on multiple streams. For example, a Prolog program could launch a fully-interactive child process, passing stdin/stdout/stderr connected to the TTY, while reserving a separate pair of descriptors for programmatic communication with the child. --- config.h.cmake | 3 +- process.c | 506 +++++++++++++++++++++++++++--------------------- process.pl | 40 +++- test_process.pl | 35 ++++ 4 files changed, 363 insertions(+), 221 deletions(-) diff --git a/config.h.cmake b/config.h.cmake index 0290c8a..33b05a7 100644 --- a/config.h.cmake +++ b/config.h.cmake @@ -26,9 +26,10 @@ #cmakedefine HAVE_MEMORY_H @HAVE_MEMORY_H@ #cmakedefine HAVE_NETINET_TCP_H @HAVE_NETINET_TCP_H@ #cmakedefine HAVE_OPEN_MEMSTREAM @HAVE_OPEN_MEMSTREAM@ -#cmakedefine HAVE_PIPE @HAVE_PIPE@2 +#cmakedefine HAVE_PIPE2 @HAVE_PIPE2@ #cmakedefine HAVE_POLL @HAVE_POLL@ #cmakedefine HAVE_POLL_H @HAVE_POLL_H@ +#cmakedefine HAVE_POSIX_SPAWN @HAVE_POSIX_SPAWN@ #cmakedefine HAVE_PRCTL @HAVE_PRCTL@ #cmakedefine HAVE_SETGROUPS @HAVE_SETGROUPS@ #cmakedefine HAVE_SETPGID @HAVE_SETPGID@ diff --git a/process.c b/process.c index ffc50d6..15a7c79 100644 --- a/process.c +++ b/process.c @@ -80,6 +80,7 @@ static atom_t ATOM_stdin; static atom_t ATOM_stdout; static atom_t ATOM_stderr; +static atom_t ATOM_extra_streams; static atom_t ATOM_std; static atom_t ATOM_null; static atom_t ATOM_process; @@ -107,6 +108,8 @@ static functor_t FUNCTOR_system_error2; static functor_t FUNCTOR_pipe1; static functor_t FUNCTOR_pipe2; static functor_t FUNCTOR_stream1; +static functor_t FUNCTOR_from_child1; +static functor_t FUNCTOR_to_child1; static functor_t FUNCTOR_exit1; static functor_t FUNCTOR_killed1; static functor_t FUNCTOR_eq2; /* =/2 */ @@ -156,7 +159,8 @@ typedef enum std_type { std_std, std_null, std_pipe, - std_stream + std_stream, + std_unbound, } std_type; @@ -164,6 +168,7 @@ typedef struct p_stream { term_t term; /* P in pipe(P) */ std_type type; /* type of stream */ IOENC encoding; /* Encoding for the stream */ + int mode; /* SIO_INPUT/SIO_OUTPUT, from parent's view */ #ifdef __WINDOWS__ HANDLE fd[2]; /* pipe handles */ #else @@ -172,6 +177,8 @@ typedef struct p_stream int cloexec; /* close on exec activated */ } p_stream; +# define PARENT_OPEN_FLAGS(mode) ((mode & SIO_INPUT) ? O_RDONLY : O_WRONLY) +# define CHILD_OPEN_FLAGS(mode) ((mode & SIO_OUTPUT) ? O_RDONLY : O_WRONLY) typedef struct ecbuf { echar *buffer; @@ -196,12 +203,17 @@ typedef struct p_options term_t pid; /* process(PID) */ int pipes; /* #pipes found */ p_stream streams[3]; + p_stream *extra_streams; /* stream definitions beyond [0,1,2] */ + int extra_stream_count; /* #extra streams past 2 */ int detached; /* create as detached */ int window; /* Show a window? */ int priority; /* Process priority */ } p_options; - +#define INFO_STREAM(info, fd) ((fd <= 2) ? &(info->streams[fd]) : &(info->extra_streams[fd - 3])) +#define FOREACH_STREAM(info, fdvar, streamvar) \ + for (p_stream *streamvar = info->streams, *__loopctr = NULL; __loopctr == NULL; __loopctr++) \ + for (int fdvar = 0; fdvar < 3 + info->extra_stream_count; fdvar++, streamvar = (fdvar == 3 ? info->extra_streams : streamvar+1)) typedef struct wait_options { double timeout; int has_timeout; @@ -439,10 +451,56 @@ get_encoding(term_t head, IOENC *enc) } +static p_stream * +find_matching_stream(p_options *info, p_stream *stream) +{ FOREACH_STREAM(info, fd2, stream2) + { if ( stream2 == stream || fd2 >= info->extra_stream_count + 3 ) + break; + if ( stream2->mode == stream->mode && stream2->term && PL_compare(stream->term, stream2->term) == 0 ) + { return stream2; + break; + } + } + return NULL; +} + + static int get_stream(term_t t, p_options *info, p_stream *stream, atom_t name) { atom_t a; - int i; + IOSTREAM *s = NULL; + + assert(stream->mode == 0); + assert(stream->term == 0); + if ( PL_is_functor(t, FUNCTOR_from_child1) ) + stream->mode = SIO_INPUT; /* we are reading, child is writing */ + else if ( PL_is_functor(t, FUNCTOR_to_child1) ) + stream->mode = SIO_OUTPUT; /* we are writing, child is reading */ + + if ( stream->mode ) + { _PL_get_arg(1, t, t); + /* Parse the argument of read/1 or write/1 as a standard stream item */ + } else + { /* determine input/output from stream name */ + if ( name == ATOM_stdin ) + stream->mode = SIO_OUTPUT; /* parent writes to child's stdin */ + else if ( name == ATOM_stdout || name == ATOM_stderr ) + stream->mode = SIO_INPUT; /* parent reads from child's stdout/stderr */ + else if ( PL_is_functor(t, FUNCTOR_stream1) ) + { /* Determine mode from provided stream */ + stream->term = PL_new_term_ref(); + _PL_get_arg(1, t, stream->term); + if ( !PL_get_stream_handle(stream->term, &s) ) + return FALSE; /* invalid stream OR stream pair (can't autodetect direction) */ + if ( s->flags & SIO_INPUT ) + stream->mode = SIO_INPUT; + else + stream->mode = SIO_OUTPUT; + } + else + return PL_type_error("explicit_rw_specification", t); + } + if ( PL_get_atom(t, &a) ) { if ( a == ATOM_null ) { stream->type = std_null; @@ -458,14 +516,8 @@ get_stream(term_t t, p_options *info, p_stream *stream, atom_t name) { stream->term = PL_new_term_ref(); stream->encoding = ENC_ANSI; _PL_get_arg(1, t, stream->term); - if ( !PL_is_variable(stream->term) ) - { for (i = 0; i < info->pipes; i++) - { if (PL_compare(info->streams[i].term, t) == 0) - break; - } - if (i == info->pipes) - return PL_uninstantiation_error(stream->term); - } + if ( !PL_is_variable(stream->term) && !find_matching_stream(info, stream) ) + return PL_uninstantiation_error(stream->term); if ( PL_is_functor(t, FUNCTOR_pipe2) ) { term_t tail = PL_new_term_ref(); term_t head = PL_new_term_ref(); @@ -490,15 +542,15 @@ get_stream(term_t t, p_options *info, p_stream *stream, atom_t name) info->pipes++; return TRUE; } else if ( PL_is_functor(t, FUNCTOR_stream1) ) - { IOSTREAM *s; - int fd; - stream->term = PL_new_term_ref(); - _PL_get_arg(1, t, stream->term); - if ( !PL_get_stream(stream->term, &s, - name == ATOM_stdin ? SIO_INPUT : SIO_OUTPUT) ) - return FALSE; + { int fd; + if ( !stream->term ) /* could have been fetched above */ + { stream->term = PL_new_term_ref(); + _PL_get_arg(1, t, stream->term); + if ( !PL_get_stream(stream->term, &s, stream->mode) ) + return FALSE; + } stream->type = std_stream; - if ( (fd = Sfileno(s)) > 0 ) + if ( (fd = Sfileno(s)) >= 0 ) { #ifdef __WINDOWS__ stream->fd[0] = stream->fd[1] = (HANDLE)_get_osfhandle(fd); @@ -514,6 +566,39 @@ get_stream(term_t t, p_options *info, p_stream *stream, atom_t name) } +static int +get_extra_streams(term_t t, p_options *info, int *count, p_stream **streams) +{ term_t tail = PL_copy_term_ref(t); + term_t head = PL_new_term_ref(); + int allocated = 0; + + assert(*count == 0); + assert(*streams == NULL); + + allocated = 8; + *streams = PL_malloc(sizeof(p_stream) * allocated); + + while( PL_get_list(tail, head, tail) ) + { if ( *count >= allocated ) + { allocated <<= 1; + *streams = PL_realloc(*streams, sizeof(p_stream) * allocated); + } + memset(&(*streams)[*count], 0, sizeof(p_stream)); + if ( PL_is_variable(head) ) + (*streams)[*count].type = std_unbound; + else if ( !get_stream(head, info, &(*streams)[*count], ATOM_extra_streams) ) + return FALSE; + (*count)++; + } + + if ( *count < allocated ) + { *streams = PL_realloc(*streams, sizeof(p_stream) * *count); + } + + return TRUE; +} + + static int parse_options(term_t options, p_options *info) { term_t tail = PL_copy_term_ref(options); @@ -539,6 +624,11 @@ parse_options(term_t options, p_options *info) } else if ( name == ATOM_stderr ) { if ( !get_stream(arg, info, &info->streams[2], name) ) return FALSE; +#ifndef __WINDOWS__ + } else if ( name == ATOM_extra_streams ) + { if ( !get_extra_streams(arg, info, &info->extra_stream_count, &info->extra_streams) ) + return FALSE; +#endif } else if ( name == ATOM_process ) { info->pid = PL_copy_term_ref(arg); } else if ( name == ATOM_detached ) @@ -644,6 +734,11 @@ free_options(p_options *info) /* TBD: close streams */ } #endif free_ecbuf(&info->envbuf); + if ( info->extra_streams ) + { PL_free(info->extra_streams); + info->extra_streams = NULL; + info->extra_stream_count = 0; + } #ifdef __WINDOWS__ if ( info->cmdline ) { PL_free(info->cmdline); @@ -672,6 +767,14 @@ free_options(p_options *info) /* TBD: close streams */ *******************************/ #define PROCESS_MAGIC 0x29498001 +#define PROCESSFD_MAGIC 0x29498002 + +struct process_context; +typedef struct process_extrafd +{ int magic; /* PROCESSFD_MAGIC */ + int fd; /* extension of pipes[] array */ + int fdnum; /* effective index into extended pipes[] array */ +} process_extrafd; typedef struct process_context { int magic; /* PROCESS_MAGIC */ @@ -680,27 +783,73 @@ typedef struct process_context #else pid_t pid; /* the process id */ #endif - int open_mask; /* Open streams */ + int open_count; /* Open streams */ int pipes[3]; /* stdin/stdout/stderr */ atom_t exe_name; /* exe as atom */ + int extra_count; /* number of entries in extra_pipes[] array */ + process_extrafd extra_pipes[]; /* extension of pipes[] for more than three fds */ } process_context; +#define PROCESS_CONTEXT_SIZE(extra_count) (sizeof(process_context) + (sizeof(process_extrafd) * extra_count)) +#define PROCESS_CONTEXT_FROM_EXTRAFD(efd) \ + (process_context*)(((void*)efd) - \ + (sizeof(process_extrafd) * (efd->fdnum - 3)) - \ + offsetof(process_context, extra_pipes)) + static int wait_for_process(process_context *pc); +static process_context* +create_process_context(p_options *info, intptr_t pid_handle) +{ process_context *pc; + + pc = PL_malloc(PROCESS_CONTEXT_SIZE(info->extra_stream_count)); + memset(pc, 0, PROCESS_CONTEXT_SIZE(info->extra_stream_count)); + *pc = (process_context) + { .magic = PROCESS_MAGIC, +#ifdef __WINDOWS__ + .handle = (HANDLE)pid_handle, +#else + .pid = (int)pid_handle, +#endif + .exe_name = info->exe_name, + .pipes = {-1, -1, -1}, + .extra_count = info->extra_stream_count, + }; + PL_register_atom(pc->exe_name); + return pc; +} + + static int -process_fd(void *handle, process_context **PC) +process_fd_ex(void *handle, process_context **PC, int **FD) { process_context *pc = (process_context*) ((uintptr_t)handle&~(uintptr_t)0x3); + process_extrafd *efd = (process_extrafd *)pc; int pipe = (int)(uintptr_t)handle & 0x3; if ( pc->magic == PROCESS_MAGIC ) { if ( PC ) *PC = pc; - return pc->pipes[pipe]; + if ( FD ) + *FD = &pc->pipes[pipe]; + return pipe; + } else if ( efd->magic == PROCESSFD_MAGIC ) + { if ( PC ) + *PC = PROCESS_CONTEXT_FROM_EXTRAFD(efd); + if ( FD ) + *FD = &efd->fd; + return efd->fdnum; } return -1; } +static int +process_fd(void *handle, process_context **PC) +{ int *FD, rval; + rval = process_fd_ex(handle, PC, &FD); + return rval >= 0 ? *FD : -1; +} + static ssize_t Sread_process(void *handle, char *buf, size_t size) @@ -721,18 +870,22 @@ Swrite_process(void *handle, char *buf, size_t size) static int Sclose_process(void *handle) { process_context *pc; - int fd = process_fd(handle, &pc); + int *FD; + int which = process_fd_ex(handle, &pc, &FD); + int fd = which >= 0 ? *FD : -1; if ( fd >= 0 ) - { int which = (int)(uintptr_t)handle & 0x3; - int rc; + { int rc; + + assert(pc->open_count > 0); rc = (*Sfilefunctions.close)((void*)(uintptr_t)fd); - pc->open_mask &= ~(1<open_count--; + *FD = -1; /* Make sure no one tries to decrement open_count again */ - DEBUG(Sdprintf("Closing fd[%d]; mask = 0x%x\n", which, pc->open_mask)); + DEBUG(Sdprintf("Closing fd[%d] (%d); count = %d\n", which, fd, pc->open_count)); - if ( !pc->open_mask ) + if ( !pc->open_count ) { int rcw = wait_for_process(pc); return rcw ? 0 : -1; @@ -775,33 +928,43 @@ static IOFUNCTIONS Sprocessfunctions = static IOSTREAM * open_process_pipe(process_context *pc, p_options *info, int which, int fdn) { void *handle; + p_stream *stream = INFO_STREAM(info, which); #ifdef __WINDOWS__ - HANDLE fd = info->streams[which].fd[fdn]; + HANDLE fd = stream->fd[fdn]; #else - int fd = info->streams[which].fd[fdn]; + int fd = stream->fd[fdn]; #endif int flags = SIO_RECORDPOS|SIO_FBUF; IOSTREAM *s; + int *FD; - pc->open_mask |= (1<pipes[which]; + handle = (void *)((uintptr_t)pc | (uintptr_t)which); + } else + { assert(which - 3 < pc->extra_count); + process_extrafd *efd = &pc->extra_pipes[which - 3]; + if ( efd->magic != PROCESSFD_MAGIC ) + { *efd = (process_extrafd){.magic = PROCESSFD_MAGIC, .fdnum = which, .fd = -1}; + } + FD = &efd->fd; + handle = efd; + } + assert(*FD == -1); + pc->open_count++; #ifdef __WINDOWS__ - pc->pipes[which] = _open_osfhandle((intptr_t)fd, _O_BINARY); + *FD = _open_osfhandle((intptr_t)fd, _O_BINARY); #else - pc->pipes[which] = fd; + *FD = fd; #endif - if ( info->streams[which].encoding != ENC_OCTET ) + if ( stream->encoding != ENC_OCTET ) flags |= SIO_TEXT; - if ( which == 0 ) - flags |= SIO_OUTPUT; - else - flags |= SIO_INPUT; - - handle = (void *)((uintptr_t)pc | (uintptr_t)which); + flags |= stream->mode; if ( (s=Snew(handle, flags, &Sprocessfunctions)) ) - s->encoding = info->streams[which].encoding; + s->encoding = stream->encoding; return s; } @@ -1159,8 +1322,7 @@ wait_for_process(process_context *pc) static int create_pipes(p_options *info) -{ int i; - SECURITY_ATTRIBUTES sa; +{ SECURITY_ATTRIBUTES sa; sa.nLength = sizeof(sa); /* Length in bytes */ sa.bInheritHandle = 1; /* the child must inherit these handles */ @@ -1322,16 +1484,10 @@ do_create_process(p_options *info) if ( info->pipes > 0 && info->pid == 0 ) { IOSTREAM *s; - process_context *pc = PL_malloc(sizeof(*pc)); + process_context *pc = create_process_context(info, (intptr_t)pi.hProcess); DEBUG(Sdprintf("Wait on pipes\n")); - memset(pc, 0, sizeof(*pc)); - pc->magic = PROCESS_MAGIC; - pc->handle = pi.hProcess; - pc->exe_name = info->exe_name; - PL_register_atom(pc->exe_name); - if ( info->streams[0].type == std_pipe ) { CloseHandle(info->streams[0].fd[0]); if ( (s = open_process_pipe(pc, info, 0, 1)) ) @@ -1488,16 +1644,12 @@ for a runtime solution. static int create_pipes(p_options *info) -{ int i; - - for(i=0; i<3; i++) - { p_stream *s = &info->streams[i]; - - if ( s->term && s->type == std_pipe ) - { if ( i == 2 && info->streams[1].term && - PL_compare(info->streams[1].term, info->streams[2].term) == 0 ) - { s->fd[0] = info->streams[1].fd[0]; - s->fd[1] = info->streams[1].fd[1]; +{ FOREACH_STREAM(info, i, s) + { if ( s->term && s->type == std_pipe ) + { p_stream *matched_stream = find_matching_stream(info, s); + if ( matched_stream ) + { s->fd[0] = matched_stream->fd[0]; + s->fd[1] = matched_stream->fd[1]; } else { int my_side; @@ -1516,7 +1668,7 @@ create_pipes(p_options *info) Sdprintf("pipe(): unexpected error: %s\n", strerror(errno)); return PL_resource_error("open_files"); } - my_side = (i == 0 ? s->fd[1] : s->fd[0]); + my_side = ((s->mode & SIO_OUTPUT) ? s->fd[1] : s->fd[0]); #ifdef F_SETFD if ( fcntl(my_side, F_SETFD, FD_CLOEXEC) == 0 ) s->cloexec = TRUE; @@ -1654,18 +1806,18 @@ close_ok(int fd) } static IOSTREAM * -p_fdopen(p_options *info, int which, int fdn, char *mode) +p_fdopen(p_options *info, p_stream *stream, int fdn) { IOSTREAM *s; char m[10]; char *mp = m; - *mp++ = mode[0]; - if ( info->streams[which].encoding == ENC_OCTET ) + *mp++ = (stream->mode & SIO_INPUT) ? 'r' : 'w'; + if ( stream->encoding == ENC_OCTET ) *mp++ = 'b'; *mp = 0; - if ( (s=Sfdopen(info->streams[which].fd[fdn], m)) ) - s->encoding = info->streams[which].encoding; + if ( (s=Sfdopen(stream->fd[fdn], m)) ) + s->encoding = stream->encoding; return s; } @@ -1674,67 +1826,31 @@ p_fdopen(p_options *info, int which, int fdn, char *mode) static int process_parent_side(p_options *info, int pid) { int rc = TRUE; + process_context *pc = NULL; if ( info->pipes > 0 && info->pid == 0 ) - { IOSTREAM *s; /* no pid(Pid): wait */ - process_context *pc = PL_malloc(sizeof(*pc)); + { /* no pid(Pid): wait */ + pc = create_process_context(info, pid); DEBUG(Sdprintf("Wait on pipes\n")); + } - memset(pc, 0, sizeof(*pc)); - pc->magic = PROCESS_MAGIC; - pc->pid = pid; - pc->exe_name = info->exe_name; - PL_register_atom(pc->exe_name); - - if ( info->streams[0].type == std_pipe ) - { close_ok(info->streams[0].fd[0]); - if ( (s = open_process_pipe(pc, info, 0, 1)) ) - rc = PL_unify_stream(info->streams[0].term, s); - else - close_ok(info->streams[0].fd[1]); - } - if ( info->streams[1].type == std_pipe ) - { close_ok(info->streams[1].fd[1]); - if ( rc && (s = open_process_pipe(pc, info, 1, 0)) ) - rc = PL_unify_stream(info->streams[1].term, s); - else - close_ok(info->streams[1].fd[0]); - } - if ( info->streams[2].type == std_pipe && - ( !info->streams[1].term || PL_compare(info->streams[1].term, info->streams[2].term) != 0 ) ) - { close_ok(info->streams[2].fd[1]); - if ( rc && (s = open_process_pipe(pc, info, 2, 0)) ) - rc = PL_unify_stream(info->streams[2].term, s); - else - close_ok(info->streams[2].fd[0]); - } - - return rc; - } else if ( info->pipes > 0 ) + if ( info->pipes > 0 ) { IOSTREAM *s; - - if ( info->streams[0].type == std_pipe ) - { close_ok(info->streams[0].fd[0]); - if ( (s = p_fdopen(info, 0, 1, "w")) ) - rc = PL_unify_stream(info->streams[0].term, s); - else - close_ok(info->streams[0].fd[1]); - } - if ( info->streams[1].type == std_pipe ) - { close_ok(info->streams[1].fd[1]); - if ( rc && (s = p_fdopen(info, 1, 0, "r")) ) - rc = PL_unify_stream(info->streams[1].term, s); - else - close_ok(info->streams[1].fd[0]); - } - if ( info->streams[2].type == std_pipe && - ( !info->streams[1].term || PL_compare(info->streams[1].term, info->streams[2].term) != 0 ) ) - { close_ok(info->streams[2].fd[1]); - if ( rc && (s = p_fdopen(info, 2, 0, "r")) ) - PL_unify_stream(info->streams[2].term, s); - else - close_ok(info->streams[2].fd[0]); + FOREACH_STREAM(info, fd, stream) + { if ( stream->type == std_pipe && !find_matching_stream(info, stream) ) + { int myside = (stream->mode & SIO_OUTPUT) ? 1 : 0; + close_ok(stream->fd[1 - myside]); + if ( (s = pc + ? open_process_pipe(pc, info, fd, myside) + : p_fdopen(info, stream, myside)) ) + rc = PL_unify_stream(stream->term, s); + else + close_ok(stream->fd[myside]); + } } + + if ( info->pid == 0 ) + return rc; } assert(rc); /* What else? */ @@ -1829,61 +1945,32 @@ do_create_process_fork(p_options *info, create_method method) } } - /* stdin */ - switch( info->streams[0].type ) - { case std_pipe: - case std_stream: - dup2(info->streams[0].fd[0], 0); - if ( !info->streams[0].cloexec ) - close(info->streams[0].fd[1]); - break; - case std_null: - if ( (fd = open("/dev/null", O_RDONLY)) >= 0 ) - dup2(fd, 0); - break; - case std_std: - { int fd = Sfileno(Suser_input); - if ( fd > 0 ) - dup2(fd, 0); - break; - } - } - /* stdout */ - switch( info->streams[1].type ) - { case std_pipe: - case std_stream: - dup2(info->streams[1].fd[1], 1); - if ( !info->streams[1].cloexec ) - close(info->streams[1].fd[0]); - break; - case std_null: - if ( (fd = open("/dev/null", O_WRONLY)) >= 0 ) - dup2(fd, 1); - break; - case std_std: - { int fd = Sfileno(Suser_output); - if ( fd >= 0 && fd != 1 ) - dup2(fd, 1); - break; - } - } - /* stderr */ - switch( info->streams[2].type ) - { case std_pipe: - case std_stream: - dup2(info->streams[2].fd[1], 2); - if ( !info->streams[2].cloexec ) - close(info->streams[2].fd[0]); - break; - case std_null: - if ( (fd = open("/dev/null", O_WRONLY)) >= 0 ) - dup2(fd, 2); - break; - case std_std: - { int fd = Sfileno(Suser_error); - if ( fd >= 0 && fd != 2 ) - dup2(fd, 2); - break; + FOREACH_STREAM(info, streamfd, stream) + { switch( stream->type ) + { case std_pipe: + case std_stream: + { int pairhalf = (stream->mode & SIO_OUTPUT) /* parent writes, child reads */ + ? 0 : 1; + dup2(stream->fd[pairhalf], streamfd); + if ( !stream->cloexec ) + close(stream->fd[1 - pairhalf]); + break; + } + case std_null: + if ( (fd = open("/dev/null", CHILD_OPEN_FLAGS(stream->mode))) >= 0 ) + dup2(fd, streamfd); + break; + case std_std: + fd = streamfd == 0 ? Sfileno(Suser_input) + : streamfd == 1 ? Sfileno(Suser_output) + : streamfd == 2 ? Sfileno(Suser_error) + : -1; + if ( fd >= 0 && fd != streamfd ) + dup2(fd, streamfd); + break; + case std_unbound: + close(streamfd); + break; } } @@ -1931,49 +2018,27 @@ do_create_process(p_options *info) if ( info->detached ) posix_spawnattr_setpgroup(&attr, 0); - switch( info->streams[0].type ) /* stdin */ - { case std_pipe: - case std_stream: - posix_spawn_file_actions_adddup2(&file_actions, info->streams[0].fd[0], 0); - if ( !info->streams[0].cloexec ) - posix_spawn_file_actions_addclose(&file_actions, info->streams[0].fd[1]); - break; - case std_null: - posix_spawn_file_actions_addopen(&file_actions, 0, - "/dev/null", O_RDONLY, 0); - break; - case std_std: - break; - } - /* stdout */ - switch( info->streams[1].type ) - { case std_pipe: - case std_stream: - posix_spawn_file_actions_adddup2(&file_actions, info->streams[1].fd[1], 1); - if ( !info->streams[1].cloexec ) - posix_spawn_file_actions_addclose(&file_actions, info->streams[1].fd[0]); - break; - case std_null: - posix_spawn_file_actions_addopen(&file_actions, 1, - "/dev/null", O_WRONLY, 0); - break; - case std_std: - break; - } - /* stderr */ - switch( info->streams[2].type ) - { case std_pipe: - case std_stream: - posix_spawn_file_actions_adddup2(&file_actions, info->streams[2].fd[1], 2); - if ( !info->streams[2].cloexec ) - posix_spawn_file_actions_addclose(&file_actions, info->streams[2].fd[0]); - break; - case std_null: - posix_spawn_file_actions_addopen(&file_actions, 2, - "/dev/null", O_WRONLY, 0); - break; - case std_std: - break; + FOREACH_STREAM(info, fd, stream) + { switch( stream->type ) + { case std_pipe: + case std_stream: + { int pairhalf = (stream->mode & SIO_OUTPUT ) /* parent writes, child reads */ + ? 0 : 1; + posix_spawn_file_actions_adddup2(&file_actions, stream->fd[pairhalf], fd); + if ( !stream->cloexec ) + posix_spawn_file_actions_addclose(&file_actions, stream->fd[1 - pairhalf]); + break; + } + case std_null: + posix_spawn_file_actions_addopen(&file_actions, fd, + "/dev/null", CHILD_OPEN_FLAGS(stream->mode), 0); + break; + case std_std: + break; + case std_unbound: + posix_spawn_file_actions_addclose(&file_actions, fd); + break; + } } rc = posix_spawn(&pid, info->exe, @@ -2244,6 +2309,7 @@ install_process() MKATOM(stdin); MKATOM(stdout); MKATOM(stderr); + MKATOM(extra_streams); MKATOM(std); MKATOM(null); MKATOM(process); @@ -2270,6 +2336,8 @@ install_process() MKFUNCTOR(type, 1); MKFUNCTOR(encoding, 1); MKFUNCTOR(stream, 1); + MKFUNCTOR(from_child, 1); + MKFUNCTOR(to_child, 1); MKFUNCTOR(error, 2); MKFUNCTOR(process_error, 2); MKFUNCTOR(system_error, 2); diff --git a/process.pl b/process.pl index a9b947c..68974e3 100644 --- a/process.pl +++ b/process.pl @@ -60,6 +60,7 @@ [ stdin(any), stdout(any), stderr(any), + extra_streams(list(any)), cwd(atom), env(list(any)), environment(list(any)), @@ -165,6 +166,35 @@ % This option is __not__ provided by the SICStus % implementation. % +% * extra_streams(+SpecList) +% Unix-only: Bind extra file descriptors in the new process, +% beyond the standard 0,1,2. Each entry in Speclist describes +% the next sequential descriptor, starting with 3. Each list +% member follows the same format as the arguments to the +% =stdin=, =stdout=, and =stderr= options, with the following +% additional or changed semantics: +% +% * _ +% Any member of SpecList which is an unbound variable will +% not be passed to the child process. This enables skipping +% file descriptors; for example, a SpecList defined as +% =[_,_,_,stream(In)]= will bind file descriptor 6 while +% leaving 3, 4, and 5 unbound. +% * from_child(+Spec) +% * to_child(+Spec) +% Defines the direction of the stream handed to the child +% process. Exactly one of these __must__ be specified for +% any additional file descriptor, unless the descriptor +% is using the =stream(+Stream)= format and the referenced +% stream is unidirectional. The Spec argument can take any +% of the forms given to the =stdin= option family. +% * std +% When specified for an extra file descriptor, this passes +% the Prolog process's file descriptor on to the child +% without modification. This is probably not very useful +% unless the Prolog program is exercising strict control +% over its own file descriptors. +% % * cwd(+Directory) % Run the new process in Directory. Directory can be a % compound specification, which is converted using @@ -201,7 +231,7 @@ % lower their own priority. Only the super-user may _raise_ it % to less-than zero. % -% If the user specifies the process(-PID) option, he *must* call +% If the user specifies the process(-PID) option, they *must* call % process_wait/2 to reclaim the process. Without this option, the % system will wait for completion of the process after the last % pipe stream is closed. @@ -226,6 +256,14 @@ % likely to support more window specific options and replace % win_exec/2. % +% While the CreateProcess() API allows for specifying the stream +% handles for the three standard file descriptors, it does not +% provide any way to specify bindings for any other numbered file +% descriptors. So, while the stream bindings themselves can be +% passed to a child process, there is no way to guarantee what +% Unix-style file descriptor they would acquire once in the child +% process's namespace. +% % *Examples* % % First, a very simple example that behaves the same as diff --git a/test_process.pl b/test_process.pl index 7e12932..cb079aa 100644 --- a/test_process.pl +++ b/test_process.pl @@ -92,6 +92,41 @@ ['-c', 'echo "error" 1>&2'], [stderr(pipe(Out))]), read_process(Out, X). +test(read_extra, [condition(has_exe(sh)),condition(\+current_prolog_flag(windows, true)),X == 'fd-3\n']) :- + process_create(path(sh), + ['-c', 'echo "fd-3" 1>&3'], + [extra_streams([from_child(pipe(Out))])]), + read_process(Out, X). +test(null_extra, [condition(has_exe(sh)),condition(\+current_prolog_flag(windows, true))]) :- + process_create(path(sh), + ['-c', 'echo "THIS IS AN ERROR" 1>&3'], + [extra_streams([from_child(null)])]). +test(write_extra, [condition(has_exe(sh)),condition(\+current_prolog_flag(windows, true)),X == 'hello fd4\n']) :- + process_create(path(sh), + ['-c', 'cat 0<&4'], + [stdout(pipe(Out)), extra_streams([_,to_child(pipe(In))])]), + format(In, 'hello fd4~n', []), + close(In), + read_process(Out, X). +test(separate_error, [condition(has_exe(sh)),condition(\+current_prolog_flag(windows, true)),X == 'separated error\n']) :- + process_create(path(sh), + ['-c', 'echo "THIS IS AN ERROR" 1>&2; echo "separated error" 1>&3'], + [stderr(null), extra_streams([from_child(pipe(Out))])]), + read_process(Out, X). +test(auto_detect_read, [condition(has_exe(sh)),condition(\+current_prolog_flag(windows, true)),X == "0"]) :- + open('/dev/null', read, In, [type(binary)]), + process_create(path(sh), + ['-c', 'wc -c 0<&3'], + [stdout(pipe(Out)), extra_streams([stream(In)])]), + close(In), + read_process(Out, X0), + split_string(X0, "", " \r\n", [X]). +test(auto_detect_write, [condition(has_exe(sh)),condition(\+current_prolog_flag(windows, true))]) :- + open('/dev/null', write, Out, [type(binary)]), + process_create(path(sh), + ['-c', 'echo "THIS IS AN ERROR" 1>&3'], + [extra_streams([stream(Out)])]), + close(Out). test(echo, [condition(has_exe(sh)), X == 'hello\n']) :- process_create(path(sh), ['-c', 'echo hello'], From c98e0eed31f49712196a9b864b921a942ff5c349 Mon Sep 17 00:00:00 2001 From: Jan Wielemaker Date: Sat, 7 Aug 2021 16:26:55 +0200 Subject: [PATCH 2/3] Modernized PlDoc comment --- process.pl | 63 +++++++++++++++++++++++++++--------------------------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/process.pl b/process.pl index 68974e3..2b5e3d1 100644 --- a/process.pl +++ b/process.pl @@ -3,9 +3,10 @@ Author: Jan Wielemaker E-mail: J.Wielemaker@vu.nl WWW: http://www.swi-prolog.org - Copyright (c) 2008-2019, University of Amsterdam + Copyright (c) 2008-2021, University of Amsterdam VU University Amsterdam CWI, Amsterdam + SWI-Prolog Solutions b.v. All rights reserved. Redistribution and use in source and binary forms, with or without @@ -83,11 +84,11 @@ In addition to the predicates, this module defines a file search path (see user:file_search_path/2 and absolute_file_name/3) named =path= that locates files on the system's search path for executables. E.g. the -following finds the executable for =ls=: +following finds the executable for ``ls``: - == + ``` ?- absolute_file_name(path(ls), Path, [access(execute)]). - == + ``` *|Incompatibilities and current limitations|* @@ -114,7 +115,7 @@ % % Create a new process running the file Exe and using arguments % from the given list. Exe is a file specification as handed to -% absolute_file_name/3. Typically one use the =path= file alias to +% absolute_file_name/3. Typically one use the `path` file alias to % specify an executable file on the current PATH. Args is a list % of arguments that are handed to the new process. On Unix % systems, each element in the list becomes a separate argument in @@ -134,9 +135,9 @@ % the terms below. If pipe(Pipe) is used, the Prolog stream is % a stream in text-mode using the encoding of the default % locale. The encoding can be changed using set_stream/2, -% or by using the two-argument form of =pipe=, which accepts an +% or by using the two-argument form of `pipe`, which accepts an % encoding(Encoding) option. -% The options =stdout= and =stderr= may use the same stream, +% The options `stdout` and `stderr` may use the same stream, % in which case both output streams are connected to the same % Prolog stream. % @@ -171,23 +172,23 @@ % beyond the standard 0,1,2. Each entry in Speclist describes % the next sequential descriptor, starting with 3. Each list % member follows the same format as the arguments to the -% =stdin=, =stdout=, and =stderr= options, with the following +% `stdin`, `stdout`, and `stderr` options, with the following % additional or changed semantics: % % * _ % Any member of SpecList which is an unbound variable will % not be passed to the child process. This enables skipping % file descriptors; for example, a SpecList defined as -% =[_,_,_,stream(In)]= will bind file descriptor 6 while +% `[_,_,_,stream(In)]` will bind file descriptor 6 while % leaving 3, 4, and 5 unbound. % * from_child(+Spec) % * to_child(+Spec) % Defines the direction of the stream handed to the child % process. Exactly one of these __must__ be specified for % any additional file descriptor, unless the descriptor -% is using the =stream(+Stream)= format and the referenced +% is using the stream(+Stream) format and the referenced % stream is unidirectional. The Spec argument can take any -% of the forms given to the =stdin= option family. +% of the forms given to the `stdin` option family. % * std % When specified for an extra file descriptor, this passes % the Prolog process's file descriptor on to the child @@ -211,10 +212,10 @@ % * process(-PID) % Unify PID with the process id of the created process. % * detached(+Bool) -% In Unix: If =true=, detach the process from the terminal +% In Unix: If `true`, detach the process from the terminal % Currently mapped to setsid(); % Also creates a new process group for the child -% In Windows: If =true=, detach the process from the current +% In Windows: If `true`, detach the process from the current % job via the CREATE_BREAKAWAY_FROM_JOB flag. In Vista and beyond, % processes launched from the shell directly have the 'compatibility % assistant' attached to them automatically unless they have a UAC @@ -222,7 +223,7 @@ % permission denied error if you try and assign the newly-created % PID to a job you create yourself. % * window(+Bool) -% If =true=, create a window for the process (Windows only) +% If `true`, create a window for the process (Windows only) % * priority(+Priority) % In Unix: specifies the process priority for the newly % created process. Priority must be an integer between -20 @@ -250,7 +251,7 @@ % is raised, otherwise double-quote are used. % % The CreateProcess() API has many options. Currently only the -% =CREATE_NO_WINDOW= options is supported through the +% ``CREATE_NO_WINDOW`` options is supported through the % window(+Bool) option. If omitted, the default is to use this % option if the application has no console. Future versions are % likely to support more window specific options and replace @@ -267,16 +268,16 @@ % *Examples* % % First, a very simple example that behaves the same as -% =|shell('ls -l')|=, except for error handling: +% shell('ls -l'), except for error handling: % -% == +% ``` % ?- process_create(path(ls), ['-l'], []). -% == +% ``` % % The following example uses grep to find all matching lines in a % file. % -% == +% ``` % grep(File, Pattern, Lines) :- % setup_call_cleanup( % process_create(path(grep), [ Pattern, file(File) ], @@ -294,7 +295,7 @@ % atom_codes(Line, Codes), % read_line_to_codes(Out, Line2), % read_lines(Line2, Out, Lines). -% == +% ``` % % @error process_error(Exe, Status) where Status is one of % exit(Code) or killed(Signal). Raised if the process @@ -414,11 +415,11 @@ % until the process is finished. Options: % % * timeout(+Timeout) -% Default: =infinite=. If this option is a number, the +% Default: `infinite`. If this option is a number, the % waits for a maximum of Timeout seconds and unifies Status -% with =timeout= if the process does not terminate within +% with `timeout` if the process does not terminate within % Timeout. In this case PID is _not_ invalidated. On Unix -% systems only timeout 0 and =infinite= are supported. A +% systems only timeout 0 and `infinite` are supported. A % 0-value can be used to poll the status of the process. % % * release(+Bool) @@ -436,13 +437,13 @@ %! process_kill(+PID) is det. %! process_kill(+PID, +Signal) is det. % -% Send signal to process PID. Default is =term=. Signal is an -% integer, Unix signal name (e.g. =SIGSTOP=) or the more Prolog -% friendly variation one gets after removing =SIG= and downcase -% the result: =stop=. On Windows systems, Signal is ignored and -% the process is terminated using the TerminateProcess() API. On -% Windows systems PID must be obtained from process_create/3, -% while any PID is allowed on Unix systems. +% Send signal to process PID. Default is `term`. Signal is an integer, +% Unix signal name (e.g. ``SIGSTOP``) or the more Prolog friendly +% variation one gets after removing ``SIG`` and downcase the result: +% `stop`. On Windows systems, Signal is ignored and the process is +% terminated using the TerminateProcess() API. On Windows systems PID +% must be obtained from process_create/3, while any PID is allowed on +% Unix systems. % % @compat SICStus does not accept the prolog friendly version. We % choose to do so for compatibility with on_signal/3. @@ -455,7 +456,7 @@ %! process_group_kill(+PID, +Signal) is det. % % Send signal to the group containing process PID. Default is -% =term=. See process_wait/1 for a description of signal +% `term`. See process_wait/1 for a description of signal % handling. In Windows, the same restriction on PID applies: it % must have been created from process_create/3, and the the group % is terminated via the TerminateJobObject API. From 356865adb84bb077fd0b89c34cbdd1d56a2cb568 Mon Sep 17 00:00:00 2001 From: Danielle Church Date: Sat, 7 Aug 2021 14:55:57 -0400 Subject: [PATCH 3/3] FIXED: fd collisions, spawn environment Fixes the issue of process_create/3 not passing environment to child when no environment variables are specified and create mode is spawn, exposed by fixing the spawn method. Also fixes some fd collision bugs in the extra_streams() functionality, and introduces unit testing for all three process creation methods (which found the above). --- process.c | 46 +++++++++++++++++++++++------------------ test_process.pl | 54 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 20 deletions(-) diff --git a/process.c b/process.c index 15a7c79..2299b6e 100644 --- a/process.c +++ b/process.c @@ -320,6 +320,19 @@ already_in_env(const char *env, int count, const char *e) } #endif +#ifndef __WINDOWS__ +static char** +get_environ(void) +{ +#ifdef HAVE__NSGETENVIRON + char **environ = *_NSGetEnviron(); +#else + extern char **environ; +#endif + return environ; +} +#endif + static int parse_environment(term_t t, p_options *info, int pass) { term_t tail = PL_copy_term_ref(t); @@ -364,16 +377,10 @@ parse_environment(term_t t, p_options *info, int pass) #ifndef __WINDOWS__ if ( pass ) - { -#ifdef HAVE__NSGETENVIRON - char **environ = *_NSGetEnviron(); -#else - extern char **environ; -#endif - char **e; + { char **e; int count0 = count; - for(e=environ; e && *e; e++) + for(e=get_environ(); e && *e; e++) { if ( !already_in_env(eb->buffer, count0, *e) ) { add_ecbuf(eb, *e, strlen(*e)); add_ecbuf(eb, ECHARS("\0"), 1); @@ -1951,14 +1958,19 @@ do_create_process_fork(p_options *info, create_method method) case std_stream: { int pairhalf = (stream->mode & SIO_OUTPUT) /* parent writes, child reads */ ? 0 : 1; - dup2(stream->fd[pairhalf], streamfd); - if ( !stream->cloexec ) - close(stream->fd[1 - pairhalf]); + fd = dup(stream->fd[pairhalf]); + close(stream->fd[0]); + close(stream->fd[1]); + dup2(fd, streamfd); + close(fd); break; } case std_null: if ( (fd = open("/dev/null", CHILD_OPEN_FLAGS(stream->mode))) >= 0 ) - dup2(fd, streamfd); + { dup2(fd, streamfd); + if ( fd != streamfd ) + close(fd); + } break; case std_std: fd = streamfd == 0 ? Sfileno(Suser_input) @@ -1977,13 +1989,7 @@ do_create_process_fork(p_options *info, create_method method) if ( info->envp ) { execve(info->exe, info->argv, info->envp); } else - { -#ifdef HAVE__NSGETENVIRON - char **environ = *_NSGetEnviron(); -#else - extern char **environ; -#endif - execve(info->exe, info->argv, environ); + { execve(info->exe, info->argv, get_environ()); } perror(info->exe); @@ -2043,7 +2049,7 @@ do_create_process(p_options *info) rc = posix_spawn(&pid, info->exe, &file_actions, &attr, - info->argv, info->envp); + info->argv, info->envp ? info->envp : get_environ()); posix_spawn_file_actions_destroy(&file_actions); posix_spawnattr_destroy(&attr); diff --git a/test_process.pl b/test_process.pl index cb079aa..b504513 100644 --- a/test_process.pl +++ b/test_process.pl @@ -50,6 +50,8 @@ test_process :- run_tests([ process_create, + 'process_create[fork]', + 'process_create[vfork]', process_wait, process_threads ]). @@ -63,7 +65,29 @@ process:exe_options(Options), absolute_file_name(path(Name), _, [file_errors(fail)|Options]). +:- dynamic recorded_clause/3. + +record_clause(Tag, Position, Clause) :- + \+ recorded_clause(Tag, Position, _), + Clause \= (:- _Directive), + assertz(recorded_clause(Tag, Position, Clause)). + +record_clauses_as(Tag) :- + nb_setval(record_clause_tag, Tag). + +stop_recording_clauses :- + nb_delete(record_clause_tag). + +term_expansion(Clause, _) :- + nb_current(record_clause_tag, Tag), + prolog_load_context(term_position, TermPos), + once(record_clause(Tag, TermPos, Clause)), + fail. +term_expansion(replay_recorded_clauses(Tag), Clauses) :- + findall(Clause, recorded_clause(Tag, _, Clause), Clauses). + :- begin_tests(process_create, [sto(rational_trees)]). +:- record_clauses_as(process_create). test(echo, [condition(has_exe(true))]) :- process_create(path(true), [], []). @@ -161,12 +185,42 @@ read_process(Out, CWD0), normalize_space(atom(CWD), CWD0), same_file(CWD, Tmp). +test(std_env, [condition((has_exe(sh), getenv('USER', _))), + forall(member(Opts,[ [], [environment([])], [environment(['TEST'=testing])] ])), X == User]) :- + getenv('USER', User), + process_create(path(sh), + ['-c', 'echo -n $USER'], + [ stdout(pipe(Out))|Opts ]), + read_process(Out, X). +test(set_env, [condition(has_exe(sh)), forall(member(Opt,[environment(['TEST'=test_set_env]), + environment(['FOO'=bar, 'TEST'=test_set_env]), + env(['TEST'=test_set_env]), + env(['FOO'=bar, 'TEST'=test_set_env]) ])), X == test_set_env]) :- + process_create(path(sh), + ['-c', 'echo -n $TEST'], + [ stdout(pipe(Out)), Opt ]), + read_process(Out, X). +test(replace_env, [condition((has_exe(sh), getenv('USER', _))), + forall(member(Opt,[ env([]), env(['TEST'=testing]) ])), X == '']) :- + process_create(path(sh), + ['-c', 'echo -n $USER'], + [ stdout(pipe(Out)), Opt ]), + read_process(Out, X). tmp_dir(Dir) :- current_prolog_flag(tmp_dir, Dir). +:- stop_recording_clauses. :- end_tests(process_create). +:- begin_tests('process_create[fork]', [sto(rational_trees), setup(process_set_method(fork)), cleanup(process_set_method(spawn))]). +replay_recorded_clauses(process_create). +:- end_tests('process_create[fork]'). + +:- begin_tests('process_create[vfork]', [sto(rational_trees), setup(process_set_method(vfork)), cleanup(process_set_method(spawn))]). +replay_recorded_clauses(process_create). +:- end_tests('process_create[vfork]'). + :- begin_tests(process_wait, [sto(rational_trees)]).