1 
2 #include "processx-connection.h"
3 
4 #include <string.h>
5 #include <stdlib.h>
6 #include <errno.h>
7 #include <sys/types.h>
8 #include <unistd.h>
9 
10 #ifndef _WIN32
11 #include <sys/uio.h>
12 #include <poll.h>
13 #else
14 #include <io.h>
15 #endif
16 
17 #include "processx.h"
18 
19 #ifdef _WIN32
20 #include "win/processx-win.h"
21 #else
22 #include "unix/processx-unix.h"
23 #endif
24 
25 /* Internal functions in this file */
26 
27 static void processx__connection_find_chars(processx_connection_t *ccon,
28 					    ssize_t maxchars,
29 					    ssize_t maxbytes,
30 					    size_t *chars,
31 					    size_t *bytes);
32 
33 static void processx__connection_find_lines(processx_connection_t *ccon,
34 					    ssize_t maxlines,
35 					    size_t *lines,
36 					    int *eof);
37 
38 static void processx__connection_alloc(processx_connection_t *ccon);
39 static void processx__connection_realloc(processx_connection_t *ccon);
40 static ssize_t processx__connection_read(processx_connection_t *ccon);
41 static ssize_t processx__find_newline(processx_connection_t *ccon,
42 				      size_t start);
43 static ssize_t processx__connection_read_until_newline(processx_connection_t
44 						       *ccon);
45 static void processx__connection_xfinalizer(SEXP con);
46 static ssize_t processx__connection_to_utf8(processx_connection_t *ccon);
47 static void processx__connection_find_utf8_chars(processx_connection_t *ccon,
48 						 ssize_t maxchars,
49 						 ssize_t maxbytes,
50 						 size_t *chars,
51 						 size_t *bytes);
52 
/*
 * PROCESSX_CHECK_VALID_CONN(x)
 *
 * Validates a connection pointer: calls R_THROW_ERROR if `x` is a null
 * pointer, or if the OS handle stored in it is unset (a zero HANDLE on
 * Windows, a negative file descriptor elsewhere).
 *
 * The argument is parenthesized everywhere it is used, so an expression
 * such as `cond ? a : b` is tested as a whole. Note that `x` is evaluated
 * more than once, so it must not have side effects.
 */
#ifdef _WIN32
#define PROCESSX_CHECK_VALID_CONN(x) do {				\
    if (!(x)) R_THROW_ERROR("Invalid connection object");		\
    if (!(x)->handle.handle) {						\
      R_THROW_ERROR("Invalid (uninitialized or closed?) connection object"); \
    }									\
  } while (0)
#else
#define PROCESSX_CHECK_VALID_CONN(x) do {				\
    if (!(x)) R_THROW_ERROR("Invalid connection object");		\
    if ((x)->handle < 0) {						\
      R_THROW_ERROR("Invalid (uninitialized or closed?) connection object"); \
    }									\
  } while (0)
#endif
68 
69 /* --------------------------------------------------------------------- */
70 /* API from R                                                            */
71 /* --------------------------------------------------------------------- */
72 
/* Create an asynchronous pipe connection object from an OS handle.
 *
 * `handle` is an external pointer to a processx_file_handle_t, and the
 * first element of `encoding` is passed on as the encoding name. Returns
 * the R object filled in by processx_c_connection_create(). */
SEXP processx_connection_create(SEXP handle, SEXP encoding) {
  processx_file_handle_t *handle_ptr = R_ExternalPtrAddr(handle);
  const char *enc = CHAR(STRING_ELT(encoding, 0));
  SEXP connection = R_NilValue;

  if (handle_ptr == NULL) {
    R_THROW_ERROR("Cannot create connection, invalid handle");
  }

  processx_c_connection_create(*handle_ptr, PROCESSX_FILE_TYPE_ASYNCPIPE,
                               enc, &connection);

  return connection;
}
84 
/* Create an asynchronous pipe connection on top of a C file descriptor.
 *
 * `handle` holds the descriptor as an integer. On Windows it is mapped
 * to the underlying OS handle first. If the first element of `close` is
 * false, the new connection is marked so that destroying it does not
 * close the descriptor. */
SEXP processx_connection_create_fd(SEXP handle, SEXP encoding, SEXP close) {
  const int descriptor = INTEGER(handle)[0];
  const char *enc = CHAR(STRING_ELT(encoding, 0));
  SEXP connection = R_NilValue;
  processx_file_handle_t native;
  processx_connection_t *ccon;

#ifdef _WIN32
  native = (HANDLE) _get_osfhandle(descriptor);
#else
  native = descriptor;
#endif

  ccon = processx_c_connection_create(native, PROCESSX_FILE_TYPE_ASYNCPIPE,
                                      enc, &connection);

  if (!LOGICAL(close)[0]) {
    ccon->close_on_destroy = 0;
  }

  return connection;
}
105 
/* Open a file and wrap it in a connection of type
 * PROCESSX_FILE_TYPE_FILE, with an empty encoding string.
 *
 * `read` and `write` select the access mode. In write-only mode the
 * file is created and truncated. Throws a system error if the file
 * cannot be opened. */
SEXP processx_connection_create_file(SEXP filename, SEXP read, SEXP write) {
  const char *path = CHAR(STRING_ELT(filename, 0));
  const int want_read = LOGICAL(read)[0];
  const int want_write = LOGICAL(write)[0];
  processx_file_handle_t file_handle;
  SEXP connection = R_NilValue;

#ifdef _WIN32
  DWORD access =
    (want_read ? GENERIC_READ : 0) | (want_write ? GENERIC_WRITE : 0);
  DWORD create =
    (want_read ? OPEN_EXISTING : 0) | (want_write ? CREATE_ALWAYS : 0);

  file_handle = CreateFile(
    /* lpFilename = */ path,
    /* dwDesiredAccess = */ access,
    /* dwShareMode = */ 0,
    /* lpSecurityAttributes = */ NULL,
    /* dwCreationDisposition = */ create,
    /* dwFlagsAndAttributes = */ FILE_ATTRIBUTE_NORMAL,
    /* hTemplateFile = */ NULL);
  if (file_handle == INVALID_HANDLE_VALUE) {
    R_THROW_SYSTEM_ERROR("Cannot open file `%s`", path);
  }

#else
  int flags = 0;

  if (want_read) {
    flags |= want_write ? O_RDWR : O_RDONLY;
  } else if (want_write) {
    flags |= O_WRONLY | O_CREAT | O_TRUNC;
  }

  file_handle = open(path, flags, 0644);
  if (file_handle == -1) {
    R_THROW_SYSTEM_ERROR("Cannot open file `%s`", path);
  }
#endif

  processx_c_connection_create(file_handle, PROCESSX_FILE_TYPE_FILE,
                               "", &connection);

  return connection;
}
147 
/* Read characters from a connection and return them as a length-one
 * UTF-8 character vector.
 *
 * `nchars` is passed to processx__connection_find_chars() as the
 * character limit (with -1 as the byte limit). The bytes it reports are
 * copied out of the front of the UTF-8 buffer and then removed from it. */
SEXP processx_connection_read_chars(SEXP con, SEXP nchars) {

  processx_connection_t *ccon = R_ExternalPtrAddr(con);
  const int max_chars = asInteger(nchars);
  size_t found_chars, found_bytes;
  SEXP out;

  processx__connection_find_chars(ccon, max_chars, -1, &found_chars,
                                  &found_bytes);

  out = PROTECT(ScalarString(mkCharLenCE(ccon->utf8, (int) found_bytes,
                                         CE_UTF8)));

  /* Shift the unread remainder to the start of the buffer */
  ccon->utf8_data_size -= found_bytes;
  memmove(ccon->utf8, ccon->utf8 + found_bytes, ccon->utf8_data_size);

  UNPROTECT(1);
  return out;
}
166 
/* Read lines from a connection and return them as a character vector.
 *
 * processx__connection_find_lines() reports how many complete lines to
 * return and whether an extra, unterminated line at end of file is to be
 * appended. Line terminators (`\n`, and a `\r` right before it) are not
 * part of the returned strings. The bytes handed out are removed from
 * the front of the UTF-8 buffer. */
SEXP processx_connection_read_lines(SEXP con, SEXP nlines) {

  processx_connection_t *ccon = R_ExternalPtrAddr(con);
  const int max_lines = asInteger(nlines);
  size_t lines_read = 0, idx;
  int eof = 0;
  ssize_t start = 0;            /* offset of the first byte of this line */
  ssize_t eol = -1;             /* offset of the last byte consumed */
  SEXP result;

  processx__connection_find_lines(ccon, max_lines, &lines_read, &eof);

  result = PROTECT(allocVector(STRSXP, lines_read + eof));

  for (idx = 0; idx < lines_read; idx++) {
    int has_cr;
    eol = processx__find_newline(ccon, start);
    has_cr = eol > 0 && ccon->utf8[eol - 1] == '\r';
    SET_STRING_ELT(
      result, idx,
      mkCharLenCE(ccon->utf8 + start, (int) (eol - start - has_cr),
                  CE_UTF8));
    start = eol + 1;
  }

  if (eof) {
    /* The trailing, unterminated line runs up to the end of the data */
    eol = ccon->utf8_data_size - 1;
    SET_STRING_ELT(
      result, lines_read,
      mkCharLenCE(ccon->utf8 + start, (int) (eol - start + 1), CE_UTF8));
  }

  if (eol >= 0) {
    /* Discard everything up to and including offset `eol` */
    ccon->utf8_data_size -= eol + 1;
    memmove(ccon->utf8, ccon->utf8 + eol + 1, ccon->utf8_data_size);
  }

  UNPROTECT(1);
  return result;
}
206 
/* Write a raw vector to a connection.
 *
 * Returns a raw vector holding the bytes that were not written, i.e. the
 * tail of `bytes` after the count reported by
 * processx_c_connection_write_bytes(). It has length zero if everything
 * was written. */
SEXP processx_connection_write_bytes(SEXP con, SEXP bytes) {
  processx_connection_t *ccon = R_ExternalPtrAddr(con);
  Rbyte *data = RAW(bytes);
  size_t total = LENGTH(bytes);
  ssize_t sent = processx_c_connection_write_bytes(ccon, data, total);
  size_t remaining = total - sent;
  SEXP rest = PROTECT(allocVector(RAWSXP, remaining));

  if (remaining > 0) {
    memcpy(RAW(rest), data + sent, remaining);
  }

  UNPROTECT(1);
  return rest;
}
222 
/* Return the connection's `is_eof_` flag as an R logical. Throws if the
 * external pointer does not hold a connection. */
SEXP processx_connection_is_eof(SEXP con) {
  processx_connection_t *ccon = R_ExternalPtrAddr(con);

  if (ccon == NULL) {
    R_THROW_ERROR("Invalid connection object");
  }

  return ScalarLogical(ccon->is_eof_);
}
228 
/* Close the connection held by the external pointer `con`. Throws if
 * the pointer is empty. Always returns R_NilValue. */
SEXP processx_connection_close(SEXP con) {
  processx_connection_t *ccon = R_ExternalPtrAddr(con);

  if (ccon == NULL) {
    R_THROW_ERROR("Invalid connection object");
  }

  processx_c_connection_close(ccon);
  return R_NilValue;
}
235 
/* Return, as an R logical, whether the connection has been closed.
 * Throws if the external pointer does not hold a connection. */
SEXP processx_connection_is_closed(SEXP con) {
  processx_connection_t *ccon = R_ExternalPtrAddr(con);
  int closed;

  if (ccon == NULL) {
    R_THROW_ERROR("Invalid connection object");
  }

  closed = processx_c_connection_is_closed(ccon);
  return ScalarLogical(closed);
}
241 
242 /* Poll connections and other pollable handles */
/* Placeholder for an R-level poll entry point. It is not used currently
 * and only throws a "Not implemented" error; both arguments are
 * ignored. */
SEXP processx_connection_poll(SEXP pollables, SEXP timeout) {
  (void) pollables;
  (void) timeout;

  /* TODO: this is not used currently */
  R_THROW_ERROR("Not implemented");

  return R_NilValue;
}
248 
/* Create two connected endpoints and return them as a list of two
 * connection objects.
 *
 * `nonblocking` is a logical vector with one element per endpoint: a
 * true value makes that endpoint an ASYNCPIPE connection, otherwise it
 * is a PIPE connection. On Unix the underlying descriptors are also
 * passed to processx__nonblock_fcntl() with the matching flag. */
SEXP processx_connection_create_pipepair(SEXP encoding, SEXP nonblocking) {
  const char *enc = CHAR(STRING_ELT(encoding, 0));
  int *nonblock = LOGICAL(nonblocking);
  SEXP pair, first, second;

#ifdef _WIN32
  HANDLE h1, h2;
  processx__create_pipe(0, &h1, &h2, "???");

#else
  int sv[2], h1, h2;
  processx__make_socketpair(sv, NULL);
  processx__nonblock_fcntl(sv[0], nonblock[0]);
  processx__nonblock_fcntl(sv[1], nonblock[1]);
  h1 = sv[0];
  h2 = sv[1];
#endif

  processx_c_connection_create(
    h1,
    nonblock[0] ? PROCESSX_FILE_TYPE_ASYNCPIPE : PROCESSX_FILE_TYPE_PIPE,
    enc, &first);
  PROTECT(first);

  processx_c_connection_create(
    h2,
    nonblock[1] ? PROCESSX_FILE_TYPE_ASYNCPIPE : PROCESSX_FILE_TYPE_PIPE,
    enc, &second);
  PROTECT(second);

  pair = PROTECT(allocVector(VECSXP, 2));
  SET_VECTOR_ELT(pair, 0, first);
  SET_VECTOR_ELT(pair, 1, second);

  UNPROTECT(3);
  return pair;
}
283 
/* Redirect the standard stream with descriptor number `which` to the
 * connection `con`.
 *
 * Unless `drop` is set, the current stream is duplicated first and the
 * duplicate is wrapped in a new PIPE connection, which is returned.
 * With `drop` set, R_NilValue is returned. Throws if the connection is
 * invalid, or if duplicating or rerouting the descriptor fails. */
SEXP processx__connection_set_std(SEXP con, int which, int drop) {
  processx_connection_t *ccon = R_ExternalPtrAddr(con);
  if (ccon == NULL) {
    R_THROW_ERROR("Invalid connection object");
  }
  SEXP saved_con = R_NilValue;

#ifdef _WIN32
  int new_fd, rc;
  if (!drop) {
    int backup_fd = _dup(which);
    processx_file_handle_t backup_handle;
    if (backup_fd == -1) {
      R_THROW_POSIX_ERROR("Cannot save stdout/stderr for rerouting");
    }
    backup_handle = (HANDLE) _get_osfhandle(backup_fd);
    processx_c_connection_create(backup_handle, PROCESSX_FILE_TYPE_PIPE,
                                 "", &saved_con);
  }

  new_fd = _open_osfhandle((intptr_t) ccon->handle.handle, 0);
  rc = _dup2(new_fd, which);
  if (rc != 0) {
    R_THROW_POSIX_ERROR("Cannot reroute stdout/stderr");
  }

#else
  const char *stream_names[] = { "stdin", "stdout", "stderr" };
  int rc;
  if (!drop) {
    processx_file_handle_t backup = dup(which);
    if (backup == -1) {
      R_THROW_SYSTEM_ERROR("Cannot save %s for rerouting",
                           stream_names[which]);
    }
    processx_c_connection_create(backup, PROCESSX_FILE_TYPE_PIPE,
                                 "", &saved_con);
  }

  rc = dup2(ccon->handle, which);
  if (rc == -1) {
    R_THROW_SYSTEM_ERROR("Cannot reroute %s", stream_names[which]);
  }
#endif

  return saved_con;
}
325 
/* Reroute descriptor 1 (standard output) to `con`; see
 * processx__connection_set_std() for the return value. */
SEXP processx_connection_set_stdout(SEXP con, SEXP drop) {
  const int drop_old = LOGICAL(drop)[0];
  return processx__connection_set_std(con, 1, drop_old);
}
329 
/* Reroute descriptor 2 (standard error) to `con`; see
 * processx__connection_set_std() for the return value. */
SEXP processx_connection_set_stderr(SEXP con, SEXP drop) {
  const int drop_old = LOGICAL(drop)[0];
  return processx__connection_set_std(con, 2, drop_old);
}
333 
/* Return a C file descriptor for the connection as an R integer.
 *
 * On Unix this is the stored handle itself. On Windows the OS handle is
 * passed to _open_osfhandle() and the resulting descriptor is returned. */
SEXP processx_connection_get_fileno(SEXP con) {
  processx_connection_t *ccon = R_ExternalPtrAddr(con);
  if (ccon == NULL) {
    R_THROW_ERROR("Invalid connection object");
  }

#ifdef _WIN32
  int fileno_ = _open_osfhandle((intptr_t) ccon->handle.handle, 0);
#else
  int fileno_ = ccon->handle;
#endif

  return ScalarInteger(fileno_);
}
347 
348 #ifdef _WIN32
349 
/*
 * Clear the HANDLE_FLAG_INHERIT flag from all HANDLEs that were inherited
 * from the parent process. Don't check for errors - the stdio handles may
 * not be valid, or may be closed already. There is no guarantee that this
 * function does a perfect job.
 */
356 
processx_connection_disable_inheritance()357 SEXP processx_connection_disable_inheritance() {
358   HANDLE handle;
359   STARTUPINFOW si;
360 
361   /* Make the windows stdio handles non-inheritable. */
362   handle = GetStdHandle(STD_INPUT_HANDLE);
363   if (handle != NULL && handle != INVALID_HANDLE_VALUE) {
364     SetHandleInformation(handle, HANDLE_FLAG_INHERIT, 0);
365   }
366 
367   handle = GetStdHandle(STD_OUTPUT_HANDLE);
368   if (handle != NULL && handle != INVALID_HANDLE_VALUE) {
369     SetHandleInformation(handle, HANDLE_FLAG_INHERIT, 0);
370   }
371 
372   handle = GetStdHandle(STD_ERROR_HANDLE);
373   if (handle != NULL && handle != INVALID_HANDLE_VALUE) {
374     SetHandleInformation(handle, HANDLE_FLAG_INHERIT, 0);
375   }
376 
377   /* Make inherited CRT FDs non-inheritable. */
378   GetStartupInfoW(&si);
379   if (processx__stdio_verify(si.lpReserved2, si.cbReserved2)) {
380     processx__stdio_noinherit(si.lpReserved2);
381   }
382 
383   return R_NilValue;
384 }
385 
386 #else
387 
processx_connection_disable_inheritance()388 SEXP processx_connection_disable_inheritance() {
389   int fd;
390 
391   /* Set the CLOEXEC flag on all open descriptors. Unconditionally try the
392    * first 16 file descriptors. After that, bail out after the first error.
393    */
394   for (fd = 0; ; fd++) {
395     if (processx__cloexec_fcntl(fd, 1) && fd > 15) break;
396   }
397 
398   return R_NilValue;
399 }
400 
401 #endif
402 
403 /* Api from C -----------------------------------------------------------*/
404 
processx_c_connection_create(processx_file_handle_t os_handle,processx_file_type_t type,const char * encoding,SEXP * r_connection)405 processx_connection_t *processx_c_connection_create(
406   processx_file_handle_t os_handle,
407   processx_file_type_t type,
408   const char *encoding,
409   SEXP *r_connection) {
410 
411   processx_connection_t *con;
412   SEXP result, class;
413 
414   con = malloc(sizeof(processx_connection_t));
415   if (!con) R_THROW_ERROR("cannot create connection, out of memory");
416 
417   con->type = type;
418   con->is_closed_ = 0;
419   con->is_eof_  = 0;
420   con->is_eof_raw_ = 0;
421   con->close_on_destroy = 1;
422   con->iconv_ctx = 0;
423 
424   con->buffer = 0;
425   con->buffer_allocated_size = 0;
426   con->buffer_data_size = 0;
427 
428   con->utf8 = 0;
429   con->utf8_allocated_size = 0;
430   con->utf8_data_size = 0;
431 
432   con->encoding = 0;
433   if (encoding && encoding[0]) {
434     con->encoding = strdup(encoding);
435     if (!con->encoding) {
436       free(con);
437       R_THROW_ERROR("cannot create connection, out of memory");
438       return 0;			/* never reached */
439     }
440   }
441 
442 #ifdef _WIN32
443   con->handle.handle = os_handle;
444   memset(&con->handle.overlapped, 0, sizeof(OVERLAPPED));
445   con->handle.read_pending = FALSE;
446   con->handle.freelist = FALSE;
447 #else
448   con->handle = os_handle;
449 #endif
450 
451   if (r_connection) {
452     result = PROTECT(R_MakeExternalPtr(con, R_NilValue, R_NilValue));
453     R_RegisterCFinalizerEx(result, processx__connection_xfinalizer, 0);
454     class = PROTECT(ScalarString(mkChar("processx_connection")));
455     setAttrib(result, R_ClassSymbol, class);
456     *r_connection = result;
457     UNPROTECT(2);
458   }
459 
460   return con;
461 }
462 
463 /* Destroy */
/* Release a connection object and everything it owns.
 *
 * The OS handle is closed only if `close_on_destroy` is set, but the
 * connection is marked as closed either way. A NULL pointer is
 * accepted and ignored. */
void processx_c_connection_destroy(processx_connection_t *ccon) {

  if (ccon == NULL) return;

  if (ccon->close_on_destroy) {
    processx_c_connection_close(ccon);
  }

  /* From our point of view the connection is closed, even if the handle
     was left open. */
  ccon->is_closed_ = 1;

#ifdef _WIN32
  /* With a read still pending the memory cannot be freed yet. In that
     case schedule_destroy puts the connection on a free list and
     returns 1. */
  if (processx__connection_schedule_destroy(ccon)) return;
#endif

  if (ccon->iconv_ctx != NULL) {
    Riconv_close(ccon->iconv_ctx);
    ccon->iconv_ctx = NULL;
  }

  /* free(NULL) is a no-op, so no need to test the pointers first */
  free(ccon->buffer);
  ccon->buffer = NULL;
  free(ccon->utf8);
  ccon->utf8 = NULL;
  free(ccon->encoding);
  ccon->encoding = NULL;

#ifdef _WIN32
  if (ccon->handle.overlapped.hEvent) {
    CloseHandle(ccon->handle.overlapped.hEvent);
  }
  ccon->handle.overlapped.hEvent = 0;
#endif

  free(ccon);
}
498 
499 /* Read characters */
/* Copy UTF-8 data from the connection into a caller supplied buffer.
 *
 * `nbyte` is the size of `buffer` and is passed to
 * processx__connection_find_chars() as the byte limit. It must be at
 * least 4, otherwise an error is thrown. Returns the number of bytes
 * copied; these bytes are removed from the connection's UTF-8 buffer.
 * No terminating zero byte is written. */
ssize_t processx_c_connection_read_chars(processx_connection_t *ccon,
                                         void *buffer,
                                         size_t nbyte) {
  size_t found_chars, found_bytes;

  if (nbyte < 4) {
    R_THROW_ERROR("Buffer size must be at least 4 bytes, to allow multibyte "
	  "characters");
  }

  processx__connection_find_chars(ccon, -1, nbyte, &found_chars,
                                  &found_bytes);

  memcpy(buffer, ccon->utf8, found_bytes);

  /* Shift the unread remainder to the start of the buffer */
  ccon->utf8_data_size -= found_bytes;
  memmove(ccon->utf8, ccon->utf8 + found_bytes, ccon->utf8_data_size);

  return found_bytes;
}
518 
519 /**
520  * Read a single line, ending with \n
521  *
522  * The trailing \n character is not copied to the buffer.
523  *
524  * @param ccon Connection.
525  * @param linep Must point to a buffer pointer. If must not be NULL. If
526  *   the buffer pointer is NULL, it will be allocated. If it is not NULL,
527  *   it might be reallocated using `realloc`, as needed.
528  * @param linecapp Initial size of the buffer. It will be updated if the
529  *   buffer is newly allocated or reallocated.
530  * @return Number of characters read, not including the \n character.
531  *   It returns -1 on EOF. If the connection is not at EOF yet, but there
532  *   is nothing to read currently, it returns 0. If 0 is returned, `linep`
533  *   and `linecapp` are not touched.
534  *
535  */
processx_c_connection_read_line(processx_connection_t * ccon,char ** linep,size_t * linecapp)536 ssize_t processx_c_connection_read_line(processx_connection_t *ccon,
537 					char **linep, size_t *linecapp) {
538 
539   int eof = 0;
540   ssize_t newline;
541 
542   if (!linep) {
543     R_THROW_ERROR("cannot read line, linep cannot be a null pointer");
544   }
545   if (!linecapp) {
546     R_THROW_ERROR("cannot read line, linecapp cannot be a null pointer");
547   }
548 
549   if (ccon->is_eof_) return -1;
550 
551   /* Read until a newline character shows up, or there is nothing more
552      to read (at least for now). */
553   newline = processx__connection_read_until_newline(ccon);
554 
555   /* If there is no newline at the end of the file, we still add the
556      last line. */
557   if (ccon->is_eof_raw_ && ccon->utf8_data_size != 0 &&
558       ccon->buffer_data_size == 0 &&
559       ccon->utf8[ccon->utf8_data_size - 1] != '\n') {
560     eof = 1;
561   }
562 
563   /* We cannot serve a line currently. Maybe later. */
564   if (newline == -1 && ! eof) return 0;
565 
566   /* Newline will contain the end of the line now, even if EOF */
567   if (newline == -1) newline = ccon->utf8_data_size;
568   if (ccon->utf8[newline - 1] == '\r') newline--;
569 
570   if (! *linep) {
571     *linep = malloc(newline + 1);
572     *linecapp = newline + 1;
573   } else if (*linecapp < newline + 1) {
574     char *tmp = realloc(*linep, newline + 1);
575     if (!tmp) R_THROW_ERROR("cannot read line, out of memory");
576     *linep = tmp;
577     *linecapp = newline + 1;
578   }
579 
580   memcpy(*linep, ccon->utf8, newline);
581   (*linep)[newline] = '\0';
582 
583   if (!eof) {
584     ccon->utf8_data_size -= (newline + 1);
585     memmove(ccon->utf8, ccon->utf8 + newline + 1, ccon->utf8_data_size);
586   } else {
587     ccon->utf8_data_size = 0;
588   }
589 
590   return newline;
591 }
592 
593 /* Write bytes */
/**
 * Write bytes to a connection.
 *
 * @param ccon Connection, it must be valid and open, otherwise an error
 *   is thrown.
 * @param buffer Data to write.
 * @param nbytes Number of bytes in `buffer`.
 * @return Number of bytes actually written, which may be less than
 *   `nbytes`. On Unix, 0 is returned if the write would block (EAGAIN
 *   or EWOULDBLOCK). Any other failure throws a system error.
 */
ssize_t processx_c_connection_write_bytes(
  processx_connection_t *ccon,
  const void *buffer,
  size_t nbytes) {

  PROCESSX_CHECK_VALID_CONN(ccon);

#ifdef _WIN32
  DWORD written;
  BOOL ret = WriteFile(
    /* hFile =                  */ ccon->handle.handle,
    /* lpBuffer =               */ buffer,
    /* nNumberOfBytesToWrite =  */ nbytes,
    /* lpNumberOfBytesWritten = */ &written,
    /* lpOverlapped =           */ NULL);
  if (!ret) R_THROW_SYSTEM_ERROR("Cannot write connection");
  return (ssize_t) written;
#else
  /* Need to ignore SIGPIPE here, otherwise R might crash */
  struct sigaction old_handler, new_handler;
  ssize_t ret;
  int write_errno;

  memset(&new_handler, 0, sizeof(new_handler));
  sigemptyset(&new_handler.sa_mask);
  new_handler.sa_handler = SIG_IGN;
  sigaction(SIGPIPE, &new_handler, &old_handler);

  ret = write(ccon->handle, buffer, nbytes);
  /* Save errno right away, before any other library call has a chance
     to change it. */
  write_errno = errno;

  sigaction(SIGPIPE, &old_handler, NULL);

  if (ret == -1) {
    if (write_errno == EAGAIN || write_errno == EWOULDBLOCK) {
      return 0;
    } else {
      /* Restore the errno of the failed write() for error reporting
         (presumably R_THROW_SYSTEM_ERROR reads errno). */
      errno = write_errno;
      R_THROW_SYSTEM_ERROR("Cannot write connection");
    }
  }
  return ret;
#endif
}
633 
634 /* Check if the connection has ended */
/* Return the connection's `is_eof_` flag. */
int processx_c_connection_is_eof(processx_connection_t *ccon) {
  const int at_eof = ccon->is_eof_;
  return at_eof;
}
638 
639 /* Close */
/* Close the OS handle of the connection, if it is still set, reset the
 * stored handle to its "no handle" value (0 on Windows, -1 otherwise)
 * and mark the connection as closed. Calling it again is harmless. */
void processx_c_connection_close(processx_connection_t *ccon) {
#ifdef _WIN32
  HANDLE old_handle = ccon->handle.handle;
  ccon->handle.handle = 0;
  if (old_handle) {
    CloseHandle(old_handle);
  }
#else
  processx_file_handle_t old_fd = ccon->handle;
  ccon->handle = -1;
  if (old_fd >= 0) {
    close(old_fd);
  }
#endif
  ccon->is_closed_ = 1;
}
652 
/* Return the connection's `is_closed_` flag. */
int processx_c_connection_is_closed(processx_connection_t *ccon) {
  const int closed = ccon->is_closed_;
  return closed;
}
656 
657 #ifdef _WIN32
658 
659 /* TODO: errors */
processx__socket_pair(SOCKET fds[2])660 int processx__socket_pair(SOCKET fds[2]) {
661   struct sockaddr_in inaddr;
662   struct sockaddr addr;
663   SOCKET lst = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
664   memset(&inaddr, 0, sizeof(inaddr));
665   memset(&addr, 0, sizeof(addr));
666   inaddr.sin_family = AF_INET;
667   inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
668   inaddr.sin_port = 0;
669   int yes = 1;
670   setsockopt(lst, SOL_SOCKET, SO_REUSEADDR, (char*)&yes, sizeof(yes));
671   bind(lst, (struct sockaddr *)&inaddr, sizeof(inaddr));
672   listen(lst, 1);
673   int len = sizeof(inaddr);
674   getsockname(lst, &addr,&len);
675   fds[0] = socket(AF_INET, SOCK_STREAM, 0);
676   connect(fds[0], &addr, len);
677   fds[1] = accept(lst,0,0);
678   closesocket(lst);
679   return 0;
680 }
681 
/**
 * Poll connections and other pollable handles (Windows).
 *
 * Each pollable's `pre_poll_func` is called first, and its result
 * decides how the pollable is handled: PXNOPIPE, PXCLOSED, PXSILENT and
 * PXREADY are reported as is, PXHANDLE pollables are waited on through
 * the completion status queue, and the sockets of PXSELECT pollables
 * are added to the global fd sets.
 *
 * The wait is done in slices of at most PROCESSX_INTERRUPT_INTERVAL,
 * checking for user interrupts between slices.
 *
 * @param pollables Array of pollables. The `event` member of each is
 *   set to the result.
 * @param npollables Length of `pollables`.
 * @param timeout Timeout, in the units of PROCESSX_INTERRUPT_INTERVAL.
 *   Negative means no timeout. If some pollable is ready already, no
 *   waiting is done.
 * @return The number of pollables that have data or an event.
 */
int processx_c_connection_poll(processx_pollable_t pollables[],
			       size_t npollables, int timeout) {

  int hasdata = 0;
  size_t i, j = 0, selj = 0;
  int *ptr;
  int timeleft = timeout;
  DWORD bytes;
  OVERLAPPED *overlapped = 0;
  ULONG_PTR key;
  int *events;

  FD_ZERO(&processx__readfds);
  FD_ZERO(&processx__writefds);
  FD_ZERO(&processx__exceptionfds);

  events = (int*) R_alloc(npollables, sizeof(int));

  /* First iteration, we call the pre-poll method, and collect the
     handles for the IOCP, and the fds for select(). */
  for (i = 0; i < npollables; i++) {
    processx_pollable_t *el = pollables + i;
    events[i] = PXSILENT;
    if (el->pre_poll_func) events[i] = el->pre_poll_func(el);
    switch (events[i]) {
    case PXHANDLE:
      j++;
      break;
    default:
      break;
    }
  }

  /* j contains the number of IOCP handles to poll */

  ptr = (int*) R_alloc(j, sizeof(int));

  for (i = 0, j = 0; i < npollables; i++) {
    processx_pollable_t *el = pollables + i;

    switch (events[i]) {
    case PXNOPIPE:
    case PXCLOSED:
    case PXSILENT:
      el->event = events[i];
      break;

    case PXREADY:
      hasdata++;
      el->event = events[i];
      break;

    case PXHANDLE:
      el->event = PXSILENT;
      ptr[j] = i;
      j++;
      break;

    case PXSELECT: {
      /* el->fds is a list of three integer vectors: the sockets to
	 watch for reading, for writing and for exceptions. */
      SEXP elem;
      el->event = PXSILENT;
      int k, n;
      elem = VECTOR_ELT(el->fds, 0);
      n = LENGTH(elem);
      selj += n;
      for (k = 0; k < n; k++) FD_SET(INTEGER(elem)[k], &processx__readfds);
      elem = VECTOR_ELT(el->fds, 1);
      n = LENGTH(elem);
      selj += n;
      for (k = 0; k < n; k++) FD_SET(INTEGER(elem)[k], &processx__writefds);
      elem = VECTOR_ELT(el->fds, 2);
      n = LENGTH(elem);
      selj += n;
      for (k = 0; k < n; k++) FD_SET(INTEGER(elem)[k], &processx__exceptionfds);
    } }
  }

  /* Nothing to wait for */
  if (j == 0 && selj == 0) return hasdata;

  /* If we already have some data, then we do not wait */
  if (hasdata) timeout = timeleft = 0;

  if (selj != 0) {
    /* The notify sockets are created for this call only, and they are
       closed before returning below. */
    processx__socket_pair(processx__notify_socket);
    FD_SET(processx__notify_socket[0], &processx__readfds);
    processx__select = 1;
  } else {
    processx__select = 0;
  }

  while (timeout < 0 || timeleft >= 0) {
    int poll_timeout;
    if (timeout < 0 || timeleft > PROCESSX_INTERRUPT_INTERVAL) {
      poll_timeout = PROCESSX_INTERRUPT_INTERVAL;
    } else {
      poll_timeout = timeleft;
    }

    BOOL sres;
    if (selj == 0) {
      sres = processx__thread_getstatus(&bytes, &key, &overlapped,
					poll_timeout);
    } else {
      sres = processx__thread_getstatus_select(&bytes, &key, &overlapped,
					       poll_timeout);
    }
    DWORD err = sres ? ERROR_SUCCESS : processx__thread_get_last_error();

    /* See if there was any data from the curl sockets */

    if (processx__select) {
      for (i = 0; i < npollables; i++) {
	if (events[i] == PXSELECT) {
	  processx_pollable_t *el = pollables + i;
	  SEXP elem;
	  int k, n;
	  int has = 0;
	  /* Record which sockets are set, and set all of them again in
	     the fd sets. */
	  elem = VECTOR_ELT(el->fds, 0);
	  n = LENGTH(elem);
	  for (k = 0; k < n; k++) {
	    if (FD_ISSET(INTEGER(elem)[k], &processx__readfds)) has = 1;
	    FD_SET(INTEGER(elem)[k], &processx__readfds);
	  }
	  elem = VECTOR_ELT(el->fds, 1);
	  n = LENGTH(elem);
	  for (k = 0; k < n; k++) {
	    if (FD_ISSET(INTEGER(elem)[k], &processx__writefds)) has = 1;
	    FD_SET(INTEGER(elem)[k], &processx__writefds);
	  }
	  elem = VECTOR_ELT(el->fds, 2);
	  n = LENGTH(elem);
	  for (k = 0; k < n; k++) {
	    if (FD_ISSET(INTEGER(elem)[k], &processx__exceptionfds)) has = 1;
	    FD_SET(INTEGER(elem)[k], &processx__exceptionfds);
	  }
	  if (has) {
	    el->event = PXEVENT;
	    hasdata++;
	  }
	}
      }
    }

    /* See if there was any data from the IOCP */

    if (overlapped) {
      /* data */
      processx_connection_t *con = (processx_connection_t*) key;
      int poll_idx = con->poll_idx;
      con->handle.read_pending = FALSE;
      con->buffer_data_size += bytes;
      if (con->buffer_data_size > 0) processx__connection_to_utf8(con);
      if (con->type == PROCESSX_FILE_TYPE_ASYNCFILE) {
	/* TODO: larger files */
	con->handle.overlapped.Offset += bytes;
      }

      /* A completed read of zero bytes means end of file */
      if (!bytes) {
	con->is_eof_raw_ = 1;
	if (con->utf8_data_size == 0 && con->buffer_data_size == 0) {
	  con->is_eof_ = 1;
	}
      }

      if (con->handle.freelist) processx__connection_freelist_remove(con);

      /* Only report the connection if it is one of the current
	 pollables. */
      if (poll_idx < npollables &&
	  pollables[poll_idx].object == con) {
	pollables[poll_idx].event = PXREADY;
	hasdata++;
      }
    } else if (err != WAIT_TIMEOUT && err != ERROR_SUCCESS) {
      R_THROW_SYSTEM_ERROR_CODE(err, "Cannot poll");
    }

    if (hasdata) break;
    R_CheckUserInterrupt();
    timeleft -= PROCESSX_INTERRUPT_INTERVAL;
  }

  if (hasdata == 0) {
    for (i = 0; i < j; i++) pollables[ptr[i]].event = PXTIMEOUT;
  }

  /* Only close the notify sockets if this call has created them.
     Otherwise they hold stale values from an earlier call, and closing
     those again could hit an unrelated socket that reuses the value. */
  if (selj != 0) {
    closesocket(processx__notify_socket[0]);
    closesocket(processx__notify_socket[1]);
  }

  return hasdata;
}
870 
871 #else
872 
processx__poll_decode(short code)873 static int processx__poll_decode(short code) {
874   if (code & POLLNVAL) return PXCLOSED;
875   if (code & POLLIN || code & POLLHUP || code & POLLOUT) return PXREADY;
876   return PXSILENT;
877 }
878 
879 /* Poll connections and other pollable handles */
/*
 * Unix implementation, based on poll(2).
 *
 * Each pollable's `pre_poll_func` is called first, and its result decides
 * how the pollable is handled: PXNOPIPE, PXCLOSED, PXSILENT and PXREADY
 * are reported as is, a PXHANDLE pollable contributes its `handle` to the
 * poll set, and a PXSELECT pollable contributes every fd in the three
 * integer vectors of its `fds` list.
 *
 * The result for each pollable is stored in its `event` member. The
 * return value counts the pollables that are PXREADY.
 * NOTE(review): PXSELECT pollables that get PXEVENT are not counted in
 * the return value here, while the Windows implementation does count
 * them - confirm which one callers expect.
 */
int processx_c_connection_poll(processx_pollable_t pollables[],
			       size_t npollables, int timeout) {

  int hasdata = 0;
  size_t i, j = 0;
  struct pollfd *fds;
  int *ptr;			/* ptr[k]: index of the pollable that owns fds[k] */
  int ret;
  int *events;			/* events[i]: pre-poll result of pollable i */

  if (npollables == 0) return 0;

  /* Need to allocate this, because we need to put in the fds, maybe */
  events = (int*) R_alloc(npollables, sizeof(int));

  /* First iteration, we call the pre-poll method, and collect the
     fds to poll. */
  for (i = 0; i < npollables; i++) {
    processx_pollable_t *el = pollables + i;
    events[i] = PXSILENT;
    if (el->pre_poll_func) events[i] = el->pre_poll_func(el);
    switch (events[i]) {
    case PXHANDLE:
      j++;
      break;
    case PXSELECT: {
      /* This is three vectors of fds to poll, in an R list */
      int w;
      for (w = 0; w < 3; w++) {
        j += LENGTH(VECTOR_ELT(el->fds, w));
      } }
      /* no break here, falls through to `default`, which only breaks */
    default:
      break;
    }
  }

  /* j contains the number of fds to poll now */

  fds = (struct pollfd*) R_alloc(j, sizeof(struct pollfd));
  ptr = (int*) R_alloc(j, sizeof(int));

  /* Need to go over them again, collect the ones that we need to poll */
  for (i = 0, j = 0; i < npollables; i++) {
    processx_pollable_t *el = pollables + i;
    switch (events[i]) {
    case PXNOPIPE:
    case PXCLOSED:
    case PXSILENT:
      el->event = events[i];
      break;

    case PXREADY:
      hasdata++;
      el->event = events[i];
      break;

    case PXHANDLE:
      el->event = PXSILENT;
      fds[j].fd = el->handle;
      fds[j].events = POLLIN;
      fds[j].revents = 0;
      ptr[j] = (int) i;
      j++;
      break;

    case PXSELECT: {
      /* Requested events for the first, second and third fd vector */
      int pollevs[3] = { POLLIN, POLLOUT, POLLIN | POLLOUT };
      int w;
      el->event = PXSILENT;
      for (w = 0; w < 3; w++) {
        SEXP elem = VECTOR_ELT(el->fds, w);
        int k, n = LENGTH(elem);
        for (k = 0; k < n; k++) {
          fds[j].fd = INTEGER(elem)[k];
          fds[j].events = pollevs[w];
          fds[j].revents = 0;
          ptr[j] = (int) i;
          j++;
        }
      }
      break; }
    }
  }

  /* Nothing to poll */
  if (j == 0) return hasdata;

  /* If we already have some data, then we don't wait any more,
     just check if other connections are ready */
  ret = processx__interruptible_poll(fds, (nfds_t) j,
				     hasdata > 0 ? 0 : timeout);

  if (ret == -1) {
    R_THROW_SYSTEM_ERROR("Processx poll error");

  } else if (ret == 0) {
    /* Nothing became ready. The polled ones are only reported as timed
       out if no pollable was ready before polling either. */
    if (hasdata == 0) {
      for (i = 0; i < j; i++) pollables[ptr[i]].event = PXTIMEOUT;
    }

  } else {
    for (i = 0; i < j; i++) {
      if (events[ptr[i]] == PXSELECT) {
        /* Several fds may belong to the same pollable, the first one
           with activity sets the event. */
        if (pollables[ptr[i]].event == PXSILENT) {
          int ev = fds[i].revents;
          if (ev & (POLLNVAL | POLLIN | POLLHUP | POLLOUT)) {
            pollables[ptr[i]].event = PXEVENT;
          }
        }
      } else {
        pollables[ptr[i]].event = processx__poll_decode(fds[i].revents);
        hasdata += (pollables[ptr[i]].event == PXREADY);
      }
    }
  }

  return hasdata;
}
998 
999 #endif
1000 
1001 #ifdef _WIN32
1002 
/* Start an asynchronous read on a Windows connection, unless the
 * connection has no handle or a read is already pending.
 *
 * The read targets the free tail of the raw buffer (which is allocated
 * on first use). The outcome is recorded on the connection:
 * read_pending is set when the read was queued or completed
 * synchronously, the EOF flags are set on a broken pipe / end of file,
 * and any other failure is thrown as a system error. */
void processx__connection_start_read(processx_connection_t *ccon) {
  DWORD nread;
  DWORD code;
  BOOLEAN ok;
  size_t space;

  /* Nothing to do without a handle, or with a read already in flight */
  if (!ccon->handle.handle || ccon->handle.read_pending) return;

  if (!ccon->buffer) processx__connection_alloc(ccon);

  space = ccon->buffer_allocated_size - ccon->buffer_data_size;
  ok = processx__thread_readfile(ccon,
                                 ccon->buffer + ccon->buffer_data_size,
                                 space,
                                 &nread);

  if (ok) {
    /* Returned synchronously, but the event will be still signalled,
       so we just drop the sync data for now. */
    ccon->handle.read_pending = TRUE;
    return;
  }

  code = processx__thread_get_last_error();
  switch (code) {
  case ERROR_BROKEN_PIPE:
  case ERROR_HANDLE_EOF:
    /* The raw source is exhausted; the connection as a whole is at EOF
       only if both buffers are empty as well. */
    ccon->is_eof_raw_ = 1;
    if (ccon->utf8_data_size == 0 && ccon->buffer_data_size == 0) {
      ccon->is_eof_ = 1;
    }
    if (ccon->buffer_data_size) processx__connection_to_utf8(ccon);
    break;

  case ERROR_IO_PENDING:
    ccon->handle.read_pending = TRUE;
    break;

  default:
    ccon->handle.read_pending = FALSE;
    R_THROW_SYSTEM_ERROR_CODE(code, "reading from connection");
    break;
  }
}
1042 
1043 #endif
1044 
1045 /* Poll a connection
1046  *
1047  * Checks if there is anything in the buffer. If yes, it returns
1048  * PXREADY. Otherwise it returns the handle.
1049  *
1050  * We can read immediately (without an actual device read), potentially:
1051  * 1. if the connection is already closed, we return PXCLOSED
1052  * 2. if the connection is already EOF, we return PXREADY
1053  * 3. if there is data in the UTF8 buffer, we return PXREADY
1054  * 4. if there is data in the raw buffer, and the raw file was EOF, we
1055  *    return PXREADY, because we can surely return something, even if the
1056  *    raw buffer has incomplete UTF8 characters.
1057  * 5. otherwise, if there is something in the raw buffer, we try
1058  *    to convert it to UTF8.
1059  */
1060 
/* Early-exit helper: expects a local variable named `ccon` (which may be
 * NULL) in the enclosing function, and *returns from that function* with
 * PXNOPIPE, PXCLOSED or PXREADY whenever the answer can be determined
 * from the connection flags and buffers alone. As a side effect it may
 * convert pending raw bytes to UTF-8. If none of the conditions hold,
 * control falls through to the statement after the macro. */
#define PROCESSX__I_PRE_POLL_FUNC_CONNECTION_READY do {			\
  if (!ccon) return PXNOPIPE;						\
  if (ccon->is_closed_) return PXCLOSED;				\
  if (ccon->is_eof_) return PXREADY;					\
  if (ccon->utf8_data_size > 0) return PXREADY;				\
  if (ccon->buffer_data_size > 0 && ccon->is_eof_raw_) return PXREADY;	\
  if (ccon->buffer_data_size > 0) {					\
    processx__connection_to_utf8(ccon);					\
    if (ccon->utf8_data_size > 0) return PXREADY;			\
  } } while (0)
1071 
int processx_i_pre_poll_func_connection(processx_pollable_t *pollable) {

  /* The readiness macro below refers to this local by name. */
  processx_connection_t *ccon = pollable->object;

  /* Answer from the flags and buffers alone, if possible. */
  PROCESSX__I_PRE_POLL_FUNC_CONNECTION_READY;

#ifndef _WIN32
  /* Hand the connection's own handle to the poller. */
  pollable->handle = ccon->handle;
#else
  /* Make sure a read has been started; doing so may already deliver
     data or an EOF, hence the second readiness check. Otherwise the
     poller waits on the event of the overlapped read. */
  processx__connection_start_read(ccon);
  PROCESSX__I_PRE_POLL_FUNC_CONNECTION_READY;
  pollable->handle = ccon->handle.overlapped.hEvent;
#endif

  return PXHANDLE;
}
1089 
/* Initialize `pollable` so that it polls the connection `ccon`.
 * Always returns 0. */
int processx_c_pollable_from_connection(
  processx_pollable_t *pollable,
  processx_connection_t *ccon) {

  /* The connection is the polled object; it carries no fd list. */
  pollable->object = ccon;
  pollable->fds = R_NilValue;
  pollable->free = 0;

  /* Readiness is decided by the connection pre-poll callback. */
  pollable->pre_poll_func = processx_i_pre_poll_func_connection;

  return 0;
}
1100 
/* Pre-poll callback for curl pollables: always asks the poller to use
 * the pollable's fd vectors (PXSELECT). The argument is not inspected. */
int processx_i_pre_poll_func_curl(processx_pollable_t *pollable) {
  (void) pollable;
  return PXSELECT;
}
1104 
/* Initialize `pollable` as a curl pollable that polls the descriptors
 * stored in `fds`. Always returns 0. */
int processx_c_pollable_from_curl(
  processx_pollable_t *pollable,
  SEXP fds) {

  /* No connection object is attached, only the fd list. */
  pollable->fds = fds;
  pollable->object = NULL;
  pollable->free = 0;

  pollable->pre_poll_func = processx_i_pre_poll_func_curl;

  return 0;
}
1115 
processx_c_connection_fileno(const processx_connection_t * con)1116 processx_file_handle_t processx_c_connection_fileno(
1117   const processx_connection_t *con) {
1118 #ifdef _WIN32
1119   return con->handle.handle;
1120 #else
1121   return con->handle;
1122 #endif
1123 }
1124 
1125 /* --------------------------------------------------------------------- */
1126 /* Internals                                                             */
1127 /* --------------------------------------------------------------------- */
1128 
1129 /**
1130  * Work out how many UTF-8 characters we can read
1131  *
1132  * It might try to read more data, but it does not modify the buffer
1133  * otherwise.
1134  *
1135  * @param ccon Connection.
1136  * @param maxchars Maximum number of characters to find.
1137  * @param maxbytes Maximum number of bytes to check while searching.
1138  * @param chars Number of characters found is stored here.
1139  * @param bytes Number of bytes the `chars` characters span.
1140  *
1141  */
1142 
static void processx__connection_find_chars(processx_connection_t *ccon,
					    ssize_t maxchars,
					    ssize_t maxbytes,
					    size_t *chars,
					    size_t *bytes) {

  int should_read_more;

  PROCESSX_CHECK_VALID_CONN(ccon);

  /* Only touch the data source if the UTF-8 buffer is empty and the
     connection has not reached EOF yet. */
  should_read_more = ! ccon->is_eof_ && ccon->utf8_data_size == 0;
  if (should_read_more) processx__connection_read(ccon);

  /* Nothing available, or nothing requested: report zero for BOTH
     outputs, so that the caller never sees an unset `*chars`. */
  if (ccon->utf8_data_size == 0 || maxchars == 0) {
    *chars = 0;
    *bytes = 0;
    return;
  }

  /* Count at most `maxchars` characters (and at most `maxbytes` bytes)
     at the start of the UTF-8 buffer. */
  processx__connection_find_utf8_chars(ccon, maxchars, maxbytes, chars,
				       bytes);
}
1162 
1163 /**
1164  * Find one or more lines in the buffer
1165  *
1166  * Since the buffer is UTF-8 encoded, `\n` is assumed as end-of-line
1167  * character.
1168  *
1169  * @param ccon Connection.
1170  * @param maxlines Maximum number of lines to find.
1171  * @param lines Number of lines found is stored here.
1172  * @param eof If the end of the file is reached, and there is no `\n`
1173  *   at the end of the file, this is set to 1.
1174  *
1175  */
1176 
static void processx__connection_find_lines(processx_connection_t *ccon,
					    ssize_t maxlines,
					    size_t *lines,
					    int *eof ) {

  ssize_t pos;
  size_t limit;

  *eof = 0;

  /* A negative `maxlines` is replaced by a cap of 1000 lines. */
  limit = (size_t) (maxlines < 0 ? 1000 : maxlines);

  PROCESSX_CHECK_VALID_CONN(ccon);

  /* Keep reading until the first newline is buffered, or until nothing
     more can be read right now. */
  pos = processx__connection_read_until_newline(ccon);

  /* `*lines` is only ever incremented here, it is not reset: the caller
     supplies the starting count. Each step looks for the next newline
     after the one just counted. */
  for (;;) {
    if (pos == -1 || *lines >= limit) break;
    *lines += 1;
    pos = processx__find_newline(ccon, /* start = */ (size_t) (pos + 1));
  }

  /* Flag an unterminated last line: the raw source is at EOF, all raw
     bytes have been converted, and the UTF-8 buffer is not empty and
     does not end in a newline. */
  if (!ccon->is_eof_raw_) return;
  if (ccon->utf8_data_size == 0 || ccon->buffer_data_size != 0) return;
  if (ccon->utf8[ccon->utf8_data_size - 1] != '\n') *eof = 1;
}
1209 
/* Finalizer for a connection external pointer: destroys the connection
 * stored at the pointer's address. */
static void processx__connection_xfinalizer(SEXP con) {
  processx_c_connection_destroy(
    (processx_connection_t *) R_ExternalPtrAddr(con));
}
1214 
/* Find the first '\n' in the UTF-8 buffer at or after byte offset
 * `start`. Returns its offset from the beginning of the buffer, or -1
 * if the buffer is empty or contains no newline from `start` on. */
static ssize_t processx__find_newline(processx_connection_t *ccon,
				     size_t start) {

  size_t used = ccon->utf8_data_size;
  const char *hit;

  /* Empty buffer, or a start offset at/after the end of the data */
  if (used == 0 || start >= used) return -1;

  hit = memchr(ccon->utf8 + start, '\n', used - start);

  return hit ? hit - ccon->utf8 : -1;
}
1226 
/* Read from the connection until a newline is present in the UTF-8
 * buffer, or until no more data can be read for now.
 *
 * Returns the byte offset of the first '\n' in the UTF-8 buffer, or -1
 * if there is none (yet). The scan works with an offset and re-reads
 * the current data size on every pass, so bytes appended by a read (or
 * a buffer moved by a reallocation) are always taken into account. */
static ssize_t processx__connection_read_until_newline
  (processx_connection_t *ccon) {

  size_t pos = 0;

  /* Make sure we try to have something, unless EOF */
  if (ccon->utf8_data_size == 0) processx__connection_read(ccon);
  if (ccon->utf8_data_size == 0) return -1;

  /* We have something in the UTF-8 buffer at this point */

  for (;;) {
    /* Scan only the part that has not been looked at yet */
    while (pos < ccon->utf8_data_size && ccon->utf8[pos] != '\n') pos++;

    /* Have we found a newline? */
    if (pos < ccon->utf8_data_size) return (ssize_t) pos;

    /* No newline, but EOF? */
    if (ccon->is_eof_) return -1;

    /* Maybe we can read more, but might need a bigger utf8.
     * The 8 bytes is definitely more than what we need for a UTF8
     * character, and this makes sure that we don't stop just because
     * no more UTF8 characters fit in the UTF8 buffer. */
    if (ccon->utf8_data_size >= ccon->utf8_allocated_size - 8) {
      processx__connection_realloc(ccon);
    }

    /* If we cannot read now, then we give up. Otherwise loop again and
       scan the freshly appended bytes. */
    if (processx__connection_read(ccon) == 0) return -1;
  }
}
1267 
1268 /* Allocate buffer for reading */
1269 
static void processx__connection_alloc(processx_connection_t *ccon) {
  /* Initial size, in bytes, of both the raw and the UTF-8 buffer */
  enum { PROCESSX__INITIAL_BUFFER_SIZE = 64 * 1024 };

  ccon->buffer = malloc(PROCESSX__INITIAL_BUFFER_SIZE);
  if (!ccon->buffer) R_THROW_ERROR("Cannot allocate memory for processx buffer");
  ccon->buffer_allocated_size = PROCESSX__INITIAL_BUFFER_SIZE;
  ccon->buffer_data_size = 0;

  ccon->utf8 = malloc(PROCESSX__INITIAL_BUFFER_SIZE);
  if (!ccon->utf8) {
    /* Roll back the first allocation completely before throwing: the
       connection object outlives the error, so it must not be left
       holding a pointer to freed memory. */
    free(ccon->buffer);
    ccon->buffer = NULL;
    ccon->buffer_allocated_size = 0;
    R_THROW_ERROR("Cannot allocate memory for processx buffer");
  }
  ccon->utf8_allocated_size = PROCESSX__INITIAL_BUFFER_SIZE;
  ccon->utf8_data_size = 0;
}
1284 
1285 /* We only really need to re-alloc the UTF8 buffer, because the
1286    other buffer is transient, even if there are no newline characters. */
1287 
static void processx__connection_realloc(processx_connection_t *ccon) {
  size_t old_size = ccon->utf8_allocated_size;
  /* Grow by 20%; if that rounds down to no growth at all, double. */
  size_t grown = (size_t) (old_size * 1.2);
  void *bigger;

  if (grown == old_size) grown *= 2;

  /* Keep the old pointer until the reallocation is known to succeed */
  bigger = realloc(ccon->utf8, grown);
  if (bigger == NULL) R_THROW_ERROR("Cannot allocate memory for processx line");

  ccon->utf8_allocated_size = grown;
  ccon->utf8 = bigger;
}
1297 
1298 /* Read as much as we can. This is the only function that explicitly
1299    works with the raw buffer. It is also the only function that actually
1300    reads from the data source.
1301 
1302    When this is called, the UTF8 buffer is probably empty, but the raw
1303    buffer might not be. */
1304 
1305 #ifdef _WIN32
1306 
/* Windows variant.
 *
 * Makes sure a read has been started on `ccon`, then drains the read
 * completions that are available right now. A completion can belong to
 * a different connection (the connection is recovered from the
 * completion key); its buffers and flags are updated as well.
 *
 * Returns the value computed for `ccon`'s own completion (see the notes
 * on `bytes` below), or 0 if no completion for `ccon` was dequeued, if
 * the connection is already exhausted, or the result of the UTF-8
 * conversion if the raw buffer has no free space. */
ssize_t processx__connection_read(processx_connection_t *ccon) {
  DWORD todo, bytes_read = 0;

  /* Nothing to read, nothing to convert to UTF8 */
  if (ccon->is_eof_raw_ && ccon->buffer_data_size == 0) {
    if (ccon->utf8_data_size == 0) ccon->is_eof_ = 1;
    return 0;
  }

  if (!ccon->buffer) processx__connection_alloc(ccon);

  /* If cannot read anything more, then try to convert to UTF8 */
  todo = ccon->buffer_allocated_size - ccon->buffer_data_size;
  if (todo == 0) return processx__connection_to_utf8(ccon);

  /* Otherwise we read. If there is no read pending, we start one. */
  processx__connection_start_read(ccon);

  /* A read might be pending at this point. See if it has finished. */
  if (ccon->handle.read_pending) {
    ULONG_PTR key;
    DWORD bytes;
    OVERLAPPED *overlapped = 0;

    while (1) {
      /* The last argument is 0; presumably a zero timeout, i.e. a
	 non-blocking check -- TODO confirm against
	 processx__thread_getstatus. */
      BOOL sres = processx__thread_getstatus(&bytes, &key, &overlapped, 0);
      DWORD err = sres ? ERROR_SUCCESS : processx__thread_get_last_error();
      if (overlapped) {
	/* A read has completed; the key identifies its connection,
	   which is not necessarily `ccon`. */
	processx_connection_t *con = (processx_connection_t *) key;
	con->handle.read_pending = FALSE;
	con->buffer_data_size += bytes;
	if (con->buffer && con->buffer_data_size > 0) {
	  /* From here on `bytes` holds the return value of the UTF-8
	     conversion, not the number of raw bytes just read. */
	  bytes = processx__connection_to_utf8(con);
	}
	if (con->type == PROCESSX_FILE_TYPE_ASYNCFILE) {
	  /* TODO: large files */
	  /* NOTE(review): the file offset advances by the conversion
	     result rather than by the raw byte count of the read --
	     confirm this is intended when the two can differ. */
	  con->handle.overlapped.Offset += bytes;
	}
	/* NOTE(review): this EOF branch is also taken when raw bytes
	   were read but the conversion returned 0 -- confirm. */
	if (!bytes) {
	  con->is_eof_raw_ = 1;
	  if (con->utf8_data_size == 0 && con->buffer_data_size == 0) {
	    con->is_eof_ = 1;
	  }
	}

	/* The pending read is done, so the connection no longer needs
	   to be kept on the freelist. */
	if (con->handle.freelist) processx__connection_freelist_remove(con);

	/* Stop as soon as our own read has completed; completions of
	   other connections just keep the loop going. */
	if (con == ccon) {
	  bytes_read = bytes;
	  break;
	}

      } else if (err != WAIT_TIMEOUT) {
	R_THROW_SYSTEM_ERROR_CODE(err, "Read error");

      } else {
	/* No completion available right now */
	break;
      }
    }
  }

  return bytes_read;
}
1370 
1371 #else
1372 
/* Unix variant.
 *
 * Reads once from the connection's file descriptor into the free part
 * of the raw buffer and then converts what it can to UTF-8.
 *
 * Returns the number of bytes appended to the UTF-8 buffer by this
 * call; 0 means that nothing new is available right now (no data, EOF,
 * or nothing convertible). Throws an R error if read() fails for any
 * reason other than EAGAIN. A read() interrupted by a signal (EINTR) is
 * retried instead of being reported as an error. */
static ssize_t processx__connection_read(processx_connection_t *ccon) {
  ssize_t todo, bytes_read;

  /* Nothing to read, nothing to convert to UTF8 */
  if (ccon->is_eof_raw_ && ccon->buffer_data_size == 0) {
    if (ccon->utf8_data_size == 0) ccon->is_eof_ = 1;
    return 0;
  }

  if (!ccon->buffer) processx__connection_alloc(ccon);

  /* If cannot read anything more, then try to convert to UTF8 */
  todo = ccon->buffer_allocated_size - ccon->buffer_data_size;
  if (todo == 0) return processx__connection_to_utf8(ccon);

  /* Otherwise we read. An interrupted system call is not a failure of
     the connection, so simply try again. */
  do {
    bytes_read = read(ccon->handle, ccon->buffer + ccon->buffer_data_size,
		      todo);
  } while (bytes_read == -1 && errno == EINTR);

  if (bytes_read == 0) {
    /* EOF on the raw source; the connection as a whole is at EOF only
       if both buffers are empty as well. */
    ccon->is_eof_raw_ = 1;
    if (ccon->utf8_data_size == 0 && ccon->buffer_data_size == 0) {
      ccon->is_eof_ = 1;
    }

  } else if (bytes_read == -1 && errno == EAGAIN) {
    /* There is still data to read, potentially */
    bytes_read = 0;

  } else if (bytes_read == -1) {
    /* Proper error  */
    R_THROW_SYSTEM_ERROR("Cannot read from processx connection");
  }

  ccon->buffer_data_size += bytes_read;

  /* If there is anything to convert to UTF8, try converting. From here
     on the return value counts UTF-8 bytes, not raw bytes. */
  if (ccon->buffer_data_size > 0) {
    bytes_read = processx__connection_to_utf8(ccon);
  } else {
    bytes_read = 0;
  }

  return bytes_read;
}
1418 #endif
1419 
/* Convert as much of the raw buffer as possible to UTF-8, appending the
 * result to the UTF-8 buffer, and remove the consumed bytes from the
 * front of the raw buffer.
 *
 * The converter is opened lazily on the first call, from the
 * connection's encoding (or "" if none is set). An R error is thrown
 * if the converter cannot be opened.
 *
 * Returns the number of bytes appended to the UTF-8 buffer. This is 0
 * if there is nothing to convert or the UTF-8 buffer has no free space,
 * and can also be 0 if the raw buffer only holds an incomplete
 * multi-byte character. */
static ssize_t processx__connection_to_utf8(processx_connection_t *ccon) {

  const char *inbuf, *inbufold;
  char *outbuf, *outbufold;
  size_t inbytesleft = ccon->buffer_data_size;
  size_t outbytesleft = ccon->utf8_allocated_size - ccon->utf8_data_size;
  size_t r, indone = 0, outdone = 0;
  int moved = 0;
  const char *encoding = ccon->encoding ? ccon->encoding : "";

  inbuf = inbufold = ccon->buffer;
  outbuf = outbufold = ccon->utf8 + ccon->utf8_data_size;

  /* If this is the first time we are here, open the converter.
     Riconv_open() signals failure with (void *) -1, which must neither
     be stored nor passed on to Riconv(). */
  if (! ccon->iconv_ctx) {
    void *ctx = Riconv_open("UTF-8", encoding);
    if (ctx == (void *) -1) {
      R_THROW_ERROR("Cannot convert connection data to UTF-8, unsupported encoding");
    }
    ccon->iconv_ctx = ctx;
  }

  /* If nothing to do, or no space to do more, just return */
  if (inbytesleft == 0 || outbytesleft == 0) return 0;

  while (!moved) {
    r = Riconv(ccon->iconv_ctx, &inbuf, &inbytesleft, &outbuf,
	       &outbytesleft);
    moved = 1;

    if (r == (size_t) -1) {
      /* Error */
      if (errno == E2BIG) {
	/* Output buffer is full, that's fine, we'll try later.
	   Just use what we have done so far. */

      } else if (errno == EILSEQ) {
	/* Invalid characters in encoding, *inbuf points to the beginning
	   of the invalid sequence. Skip one byte, and convert again if
	   there is any input left. */
	inbuf++; inbytesleft--;
	if (inbytesleft > 0) moved = 0;

      } else if (errno == EINVAL) {
	/* Does not end with a complete multi-byte character */
	/* This is fine, we'll handle it later, unless we are at the end */
	if (ccon->is_eof_raw_) {
	  warning("Invalid multi-byte character at end of stream ignored");
	  inbuf += inbytesleft; inbytesleft = 0;
	}
      }
    }
  }

  /* The converter advanced the in/out pointers; work out how many bytes
     were consumed and produced, and update the buffers accordingly. */
  indone = inbuf - inbufold;
  outdone = outbuf - outbufold;
  if (outdone > 0 || indone > 0) {
    ccon->buffer_data_size -= indone;
    /* Move the unconverted remainder to the front of the raw buffer */
    memmove(ccon->buffer, ccon->buffer + indone, ccon->buffer_data_size);
    ccon->utf8_data_size += outdone;
  }

  return outdone;
}
1480 
1481 /* Try to get at max 'max' UTF8 characters from the buffer. Return the
1482  * number of characters found, and also the corresponding number of
1483  * bytes. */
1484 
/* Total length in bytes (lead byte included) of a multi-byte UTF-8
   sequence, indexed by the low six bits of its lead byte
   (`lead & 0x3f`, i.e. lead bytes 0xC0..0xFF). */
static const unsigned char processx__utf8_length[] = {
  2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,
  2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,
  3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,
  4,4,4,4,4,4,4,4,5,5,5,5,6,6,6,6 };
1491 
/* Count whole UTF-8 characters at the start of the UTF-8 buffer.
 *
 * Stops after `maxchars` characters or `maxbytes` bytes, whichever comes
 * first; a negative limit is never decremented, so it does not restrict
 * the count. A multi-byte character that would exceed `maxbytes` is not
 * counted. The number of characters and the number of bytes they span
 * are stored in `*chars` and `*bytes`. Throws an R error on a byte that
 * cannot start a character, or on a sequence that is cut short by the
 * end of the buffer. */
static void processx__connection_find_utf8_chars(processx_connection_t *ccon,
						 ssize_t maxchars,
						 ssize_t maxbytes,
						 size_t *chars,
						 size_t *bytes) {

  const char *cur = ccon->utf8;
  size_t remaining = ccon->utf8_data_size;
  const char *stop = cur + remaining;

  *chars = 0;
  *bytes = 0;

  for (;;) {
    int lead, width;

    /* Limits exhausted, or end of data: normal exit */
    if (maxchars == 0 || maxbytes == 0 || cur >= stop) return;

    lead = (unsigned char) *cur;

    if (lead < 128) {
      /* ASCII byte */
      width = 1;

    } else {
      /* Continuation bytes and 0xfe/0xff cannot start a character */
      if (lead < 0xc0 || lead >= 0xfe) break;

      width = processx__utf8_length[lead & 0x3f];

      /* Sequence runs past the end of the buffer */
      if (remaining < (size_t) width) break;

      /* The whole character does not fit into the byte budget */
      if (maxbytes > 0 && width > maxbytes) return;
    }

    *chars += 1;
    *bytes += width;
    cur += width;
    remaining -= width;
    if (maxchars > 0) maxchars--;
    if (maxbytes > 0) maxbytes -= width;
  }

  /* Only reached through the `break`s above */
  R_THROW_ERROR("Invalid UTF-8 string, internal error");
}
1531 
1532 #ifndef _WIN32
1533 
processx__interruptible_poll(struct pollfd fds[],nfds_t nfds,int timeout)1534 int processx__interruptible_poll(struct pollfd fds[],
1535 				 nfds_t nfds, int timeout) {
1536   int ret = 0;
1537   int timeleft = timeout;
1538 
1539   while (timeout < 0 || timeleft > PROCESSX_INTERRUPT_INTERVAL) {
1540     do {
1541       ret = poll(fds, nfds, PROCESSX_INTERRUPT_INTERVAL);
1542     } while (ret == -1 && errno == EINTR);
1543 
1544     /* If not a timeout, then return */
1545     if (ret != 0) return ret;
1546 
1547     R_CheckUserInterrupt();
1548     timeleft -= PROCESSX_INTERRUPT_INTERVAL;
1549   }
1550 
1551   /* Maybe we are not done, and there is a little left from the timeout */
1552   if (timeleft >= 0) {
1553     do {
1554       ret = poll(fds, nfds, timeleft);
1555     } while (ret == -1 && errno == EINTR);
1556   }
1557 
1558   return ret;
1559 }
1560 
1561 #endif
1562 
1563 #ifdef _WIN32
1564 
/* Singly linked list of connections that were scheduled for destruction
   while a read was still pending on them. `freelist_head` is a dummy
   first node that never refers to a connection; the real entries hang
   off its `next` pointer. `freelist` points to that dummy node. */
processx__connection_freelist_t freelist_head = { 0, 0 };
processx__connection_freelist_t *freelist = &freelist_head;
1567 
/* Put `ccon` on the freelist, unless it is already flagged as being on
 * it. The new entry is inserted right after the list head. Throws an R
 * error if the list node cannot be allocated. Always returns 0. */
int processx__connection_freelist_add(processx_connection_t *ccon) {
  processx__connection_freelist_t *entry;

  if (ccon->handle.freelist) return 0;

  entry = calloc(1, sizeof *entry);
  if (entry == NULL) {
    R_THROW_ERROR("Cannot add to connection freelist, this is a leak");
  }

  /* Link in at the front, then mark the connection as listed */
  entry->ccon = ccon;
  entry->next = freelist->next;
  freelist->next = entry;
  ccon->handle.freelist = TRUE;

  return 0;
}
1581 
/* Unlink and free the first freelist node that refers to `ccon`. Does
 * nothing if there is no such node. Only the list node is freed here,
 * the connection itself is not touched. */
void processx__connection_freelist_remove(processx_connection_t *ccon) {
  /* `link` is the pointer that leads to the node being examined, so
     unlinking is a single assignment. */
  processx__connection_freelist_t **link = &freelist->next;

  while (*link != NULL) {
    processx__connection_freelist_t *node = *link;

    if (node->ccon == ccon) {
      *link = node->next;
      free(node);
      return;
    }

    link = &node->next;
  }
}
1594 
/* Decide whether the destruction of `ccon` has to be postponed.
 *
 * Returns 0 if no read is pending on the connection. Otherwise the
 * connection is put on the freelist and 1 is returned. */
int processx__connection_schedule_destroy(processx_connection_t *ccon) {
  /* The connection is already closed here, but a read might still be
     pending on it. */
  if (!ccon->handle.read_pending) return 0;

  processx__connection_freelist_add(ccon);
  return 1;
}
1607 
1608 #endif
1609 
1610 #ifdef _WIN32
1611 
/* Windows variant: returns a logical scalar telling whether the C
 * runtime file descriptor in the first element of `fd` maps to an OS
 * handle other than INVALID_HANDLE_VALUE, NULL and (HANDLE) -2. */
SEXP processx_is_valid_fd(SEXP fd) {
  HANDLE os_handle = (HANDLE) _get_osfhandle(INTEGER(fd)[0]);
  int ok = 1;

  if (os_handle == INVALID_HANDLE_VALUE) {
    ok = 0;
  } else if (os_handle == NULL) {
    ok = 0;
  } else if (os_handle == (HANDLE) (-2)) {
    ok = 0;
  }

  return ScalarLogical(ok);
}
1621 
1622 #else
1623 
/* Unix variant: returns a logical scalar telling whether the file
 * descriptor in the first element of `fd` is valid. The descriptor
 * counts as valid unless querying its flags fails with EBADF. */
SEXP processx_is_valid_fd(SEXP fd) {
  int descriptor = INTEGER(fd)[0];
  int usable;

  errno = 0;
  if (fcntl(descriptor, F_GETFD) != -1) {
    usable = 1;
  } else {
    /* Any failure other than EBADF still counts as a valid descriptor */
    usable = (errno != EBADF);
  }

  return ScalarLogical(usable);
}
1630 
1631 #endif
1632 
1633 #undef PROCESSX_CHECK_VALID_CONN
1634