1
2 #include "processx-connection.h"
3
4 #include <string.h>
5 #include <stdlib.h>
6 #include <errno.h>
7 #include <sys/types.h>
8 #include <unistd.h>
9
10 #ifndef _WIN32
11 #include <sys/uio.h>
12 #include <poll.h>
13 #else
14 #include <io.h>
15 #endif
16
17 #include "processx.h"
18
19 #ifdef _WIN32
20 #include "win/processx-win.h"
21 #else
22 #include "unix/processx-unix.h"
23 #endif
24
25 /* Internal functions in this file */
26
27 static void processx__connection_find_chars(processx_connection_t *ccon,
28 ssize_t maxchars,
29 ssize_t maxbytes,
30 size_t *chars,
31 size_t *bytes);
32
33 static void processx__connection_find_lines(processx_connection_t *ccon,
34 ssize_t maxlines,
35 size_t *lines,
36 int *eof);
37
38 static void processx__connection_alloc(processx_connection_t *ccon);
39 static void processx__connection_realloc(processx_connection_t *ccon);
40 static ssize_t processx__connection_read(processx_connection_t *ccon);
41 static ssize_t processx__find_newline(processx_connection_t *ccon,
42 size_t start);
43 static ssize_t processx__connection_read_until_newline(processx_connection_t
44 *ccon);
45 static void processx__connection_xfinalizer(SEXP con);
46 static ssize_t processx__connection_to_utf8(processx_connection_t *ccon);
47 static void processx__connection_find_utf8_chars(processx_connection_t *ccon,
48 ssize_t maxchars,
49 ssize_t maxbytes,
50 size_t *chars,
51 size_t *bytes);
52
/* Validate a connection pointer before doing I/O through it.
 *
 * Throws an R error if `x` is a NULL pointer, or if its OS handle is not
 * valid any more: a NULL HANDLE on Windows, a negative fd on Unix. That is
 * the state processx_c_connection_close() leaves behind.
 *
 * The argument is fully parenthesized, so any pointer expression can be
 * passed. It is evaluated more than once, so it must not have side effects.
 */
#ifdef _WIN32
#define PROCESSX_CHECK_VALID_CONN(x) do {                                    \
    if (!(x)) R_THROW_ERROR("Invalid connection object");                    \
    if (!(x)->handle.handle) {                                               \
      R_THROW_ERROR("Invalid (uninitialized or closed?) connection object"); \
    }                                                                        \
  } while (0)
#else
#define PROCESSX_CHECK_VALID_CONN(x) do {                                    \
    if (!(x)) R_THROW_ERROR("Invalid connection object");                    \
    if ((x)->handle < 0) {                                                   \
      R_THROW_ERROR("Invalid (uninitialized or closed?) connection object"); \
    }                                                                        \
  } while (0)
#endif
68
69 /* --------------------------------------------------------------------- */
70 /* API from R */
71 /* --------------------------------------------------------------------- */
72
/* R API: wrap an OS handle, passed in an external pointer, into a new
 * asynchronous pipe connection with the given encoding.
 * Throws an R error if the external pointer is NULL. */
SEXP processx_connection_create(SEXP handle, SEXP encoding) {
  const char *enc = CHAR(STRING_ELT(encoding, 0));
  processx_file_handle_t *hnd = R_ExternalPtrAddr(handle);
  SEXP conn = R_NilValue;

  if (hnd == NULL) {
    R_THROW_ERROR("Cannot create connection, invalid handle");
  }

  /* The new R object is returned through `conn`. */
  processx_c_connection_create(
    *hnd,
    PROCESSX_FILE_TYPE_ASYNCPIPE,
    enc,
    &conn);

  return conn;
}
84
/* R API: create an asynchronous pipe connection from a C runtime file
 * descriptor.
 *
 * @param handle Integer scalar, the file descriptor.
 * @param encoding Character scalar, encoding of the data.
 * @param close Logical scalar. If FALSE, the descriptor is not closed when
 *   the connection is destroyed.
 * @return The new connection object.
 */
SEXP processx_connection_create_fd(SEXP handle, SEXP encoding, SEXP close) {
  const int fd = INTEGER(handle)[0];
  const char *enc = CHAR(STRING_ELT(encoding, 0));
  SEXP conn = R_NilValue;
  processx_connection_t *ccon;

  /* Map the CRT descriptor to the platform's native handle type. */
#ifdef _WIN32
  processx_file_handle_t native = (HANDLE) _get_osfhandle(fd);
#else
  processx_file_handle_t native = fd;
#endif

  ccon = processx_c_connection_create(native, PROCESSX_FILE_TYPE_ASYNCPIPE,
                                      enc, &conn);

  /* The caller keeps ownership of the descriptor. */
  if (LOGICAL(close)[0] == 0) {
    ccon->close_on_destroy = 0;
  }

  return conn;
}
105
/* R API: open a file and wrap it into a file connection.
 *
 * @param filename Character scalar, path of the file.
 * @param read Logical scalar, open the file for reading.
 * @param write Logical scalar, open the file for writing.
 *
 * Write-only mode creates or truncates the file. Read-only and read-write
 * modes open an existing file.
 *
 * @return The new connection object. Throws an R error if the file cannot
 *   be opened.
 */
SEXP processx_connection_create_file(SEXP filename, SEXP read, SEXP write) {
  const char *c_filename = CHAR(STRING_ELT(filename, 0));
  int c_read = LOGICAL(read)[0];
  int c_write = LOGICAL(write)[0];
  SEXP result = R_NilValue;
  processx_file_handle_t os_handle;

#ifdef _WIN32
  DWORD access = 0, create = 0;
  if (c_read) access |= GENERIC_READ;
  if (c_write) access |= GENERIC_WRITE;
  /* dwCreationDisposition is an enumeration, not a bit mask, so exactly one
     value must be chosen. Whenever the file is read, it must exist already
     (like O_RDONLY / O_RDWR below); a write-only file is created or
     truncated (like O_CREAT | O_TRUNC below). */
  if (c_read) {
    create = OPEN_EXISTING;
  } else if (c_write) {
    create = CREATE_ALWAYS;
  }
  os_handle = CreateFile(
    /* lpFilename = */ c_filename,
    /* dwDesiredAccess = */ access,
    /* dwShareMode = */ 0,
    /* lpSecurityAttributes = */ NULL,
    /* dwCreationDisposition = */ create,
    /* dwFlagsAndAttributes = */ FILE_ATTRIBUTE_NORMAL,
    /* hTemplateFile = */ NULL);
  if (os_handle == INVALID_HANDLE_VALUE) {
    R_THROW_SYSTEM_ERROR("Cannot open file `%s`", c_filename);
  }

#else
  int flags = 0;
  if ( c_read && !c_write) flags |= O_RDONLY;
  if (!c_read &&  c_write) flags |= O_WRONLY | O_CREAT | O_TRUNC;
  if ( c_read &&  c_write) flags |= O_RDWR;
  os_handle = open(c_filename, flags, 0644);
  if (os_handle == -1) {
    R_THROW_SYSTEM_ERROR("Cannot open file `%s`", c_filename);
  }
#endif

  processx_c_connection_create(os_handle, PROCESSX_FILE_TYPE_FILE,
                               "", &result);

  return result;
}
147
/* R API: read at most `nchars` UTF-8 characters from a connection.
 *
 * The returned characters are taken from the front of the connection's
 * UTF-8 buffer and removed from it. processx__connection_find_chars()
 * presumably reads and converts more data as needed; confirm there.
 *
 * @param con R external pointer to the connection.
 * @param nchars Maximum number of characters, coerced with asInteger().
 * @return Character scalar in UTF-8 encoding, possibly "".
 */
SEXP processx_connection_read_chars(SEXP con, SEXP nchars) {

  processx_connection_t *ccon = R_ExternalPtrAddr(con);
  SEXP result;
  int cnchars;
  size_t utf8_chars, utf8_bytes;

  /* Same check as the other R entry points, never dereference NULL. */
  if (!ccon) R_THROW_ERROR("Invalid connection object");

  cnchars = asInteger(nchars);
  processx__connection_find_chars(ccon, cnchars, -1, &utf8_chars,
                                  &utf8_bytes);

  result = PROTECT(ScalarString(mkCharLenCE(ccon->utf8, (int) utf8_bytes,
                                            CE_UTF8)));

  /* Drop the returned bytes from the front of the UTF-8 buffer. Nothing to
     move if the buffer became empty (it might even be a NULL pointer). */
  ccon->utf8_data_size -= utf8_bytes;
  if (ccon->utf8_data_size > 0) {
    memmove(ccon->utf8, ccon->utf8 + utf8_bytes, ccon->utf8_data_size);
  }

  UNPROTECT(1);
  return result;
}
166
/* R API: read at most `nlines` lines from a connection.
 *
 * Lines are terminated by \n; a \r right before the \n is dropped as well.
 * If processx__connection_find_lines() reports EOF, the remaining data
 * (an unterminated last line) is returned as an extra element.
 * The returned bytes are removed from the connection's UTF-8 buffer.
 *
 * @param con R external pointer to the connection.
 * @param nlines Maximum number of lines, coerced with asInteger().
 * @return Character vector of UTF-8 strings.
 */
SEXP processx_connection_read_lines(SEXP con, SEXP nlines) {

  processx_connection_t *ccon = R_ExternalPtrAddr(con);
  SEXP result;
  int cn;
  ssize_t newline, eol = -1;
  size_t lines_read = 0, l;
  int eof = 0;
  int slashr;

  /* Same check as the other R entry points, never dereference NULL. */
  if (!ccon) R_THROW_ERROR("Invalid connection object");

  cn = asInteger(nlines);
  processx__connection_find_lines(ccon, cn, &lines_read, &eof);

  result = PROTECT(allocVector(STRSXP, lines_read + eof));

  /* `newline` is the index of the previous \n (-1 before the first line),
     `eol` the index of the current one. */
  for (l = 0, newline = -1; l < lines_read; l++) {
    eol = processx__find_newline(ccon, newline + 1);
    slashr = eol > 0 && ccon->utf8[eol - 1] == '\r';
    SET_STRING_ELT(
      result, l,
      mkCharLenCE(ccon->utf8 + newline + 1,
                  (int) (eol - newline - 1 - slashr), CE_UTF8));
    newline = eol;
  }

  /* At EOF the rest of the buffer is the last, unterminated line. */
  if (eof) {
    eol = ccon->utf8_data_size - 1;
    SET_STRING_ELT(
      result, l,
      mkCharLenCE(ccon->utf8 + newline + 1,
                  (int) (eol - newline), CE_UTF8));
  }

  /* Remove everything up to and including `eol` from the buffer. */
  if (eol >= 0) {
    ccon->utf8_data_size -= eol + 1;
    memmove(ccon->utf8, ccon->utf8 + eol + 1, ccon->utf8_data_size);
  }

  UNPROTECT(1);
  return result;
}
206
/* R API: write a raw vector to a connection.
 *
 * The write may be partial, e.g. for a non-blocking connection.
 *
 * @return Raw vector with the bytes that were not written (empty if
 *   everything was written).
 */
SEXP processx_connection_write_bytes(SEXP con, SEXP bytes) {
  processx_connection_t *ccon = R_ExternalPtrAddr(con);
  Rbyte *data = RAW(bytes);
  size_t total = LENGTH(bytes);

  /* The C writer validates `ccon` and reports how much it wrote. */
  size_t done = processx_c_connection_write_bytes(ccon, data, total);
  size_t rest = total - done;

  SEXP unwritten = PROTECT(allocVector(RAWSXP, rest));
  if (rest != 0) {
    memcpy(RAW(unwritten), data + done, rest);
  }

  UNPROTECT(1);
  return unwritten;
}
222
/* R API: logical scalar, whether the connection has reached EOF.
 * Throws an R error for a NULL external pointer. */
SEXP processx_connection_is_eof(SEXP con) {
  processx_connection_t *ccon = R_ExternalPtrAddr(con);
  if (ccon == NULL) {
    R_THROW_ERROR("Invalid connection object");
  }
  int at_eof = ccon->is_eof_;
  return ScalarLogical(at_eof);
}
228
/* R API: close the OS handle of a connection.
 * Throws an R error for a NULL external pointer. Returns NULL. */
SEXP processx_connection_close(SEXP con) {
  processx_connection_t *ccon = R_ExternalPtrAddr(con);
  if (ccon == NULL) {
    R_THROW_ERROR("Invalid connection object");
  }
  processx_c_connection_close(ccon);
  return R_NilValue;
}
235
/* R API: logical scalar, whether the connection was closed.
 * Throws an R error for a NULL external pointer. */
SEXP processx_connection_is_closed(SEXP con) {
  processx_connection_t *ccon = R_ExternalPtrAddr(con);
  if (ccon == NULL) {
    R_THROW_ERROR("Invalid connection object");
  }
  int closed = processx_c_connection_is_closed(ccon);
  return ScalarLogical(closed);
}
241
242 /* Poll connections and other pollable handles */
processx_connection_poll(SEXP pollables,SEXP timeout)243 SEXP processx_connection_poll(SEXP pollables, SEXP timeout) {
244 /* TODO: this is not used currently */
245 R_THROW_ERROR("Not implemented");
246 return R_NilValue;
247 }
248
/* R API: create two connected connections.
 *
 * On Unix the two ends come from processx__make_socketpair(), and the
 * non-blocking flags are applied with processx__nonblock_fcntl(). On
 * Windows they come from processx__create_pipe().
 *
 * @param encoding Character scalar, encoding of both connections.
 * @param nonblocking Logical vector of length two: whether the first and
 *   the second end are non-blocking. Non-blocking ends get the type
 *   PROCESSX_FILE_TYPE_ASYNCPIPE, blocking ones PROCESSX_FILE_TYPE_PIPE.
 * @return List of the two connections.
 */
SEXP processx_connection_create_pipepair(SEXP encoding, SEXP nonblocking) {
  const char *c_encoding = CHAR(STRING_ELT(encoding, 0));
  int *c_nonblocking;
  SEXP result, con1, con2;

  /* Both elements are read below; a shorter vector would be read out of
     bounds. */
  if (LENGTH(nonblocking) < 2) {
    R_THROW_ERROR("`nonblocking` must be a logical vector of length two");
  }
  c_nonblocking = LOGICAL(nonblocking);

#ifdef _WIN32
  HANDLE h1, h2;
  processx__create_pipe(0, &h1, &h2, "???");

#else
  /* Named `fds` rather than `pipe`, to avoid shadowing pipe(2). */
  int fds[2], h1, h2;
  processx__make_socketpair(fds, NULL);
  processx__nonblock_fcntl(fds[0], c_nonblocking[0]);
  processx__nonblock_fcntl(fds[1], c_nonblocking[1]);
  h1 = fds[0];
  h2 = fds[1];
#endif

  processx_c_connection_create(h1, c_nonblocking[0] ?
    PROCESSX_FILE_TYPE_ASYNCPIPE : PROCESSX_FILE_TYPE_PIPE, c_encoding,
    &con1);
  PROTECT(con1);
  processx_c_connection_create(h2, c_nonblocking[1] ?
    PROCESSX_FILE_TYPE_ASYNCPIPE : PROCESSX_FILE_TYPE_PIPE, c_encoding,
    &con2);
  PROTECT(con2);

  result = PROTECT(allocVector(VECSXP, 2));
  SET_VECTOR_ELT(result, 0, con1);
  SET_VECTOR_ELT(result, 1, con2);

  UNPROTECT(3);
  return result;
}
283
/* Reroute stdout or stderr of the current process to a connection.
 *
 * The OS handle of `con` is duplicated onto file descriptor `which` with
 * dup2() (_dup2() on Windows).
 *
 * @param con R external pointer to the connection to reroute to.
 * @param which File descriptor to replace. The R-level wrappers pass 1
 *   (stdout) or 2 (stderr). On Unix it also indexes `what[]`, so it must be
 *   0, 1 or 2.
 * @param drop If zero, the current `which` descriptor is duplicated first,
 *   and the copy is wrapped into a new connection, so the caller can
 *   restore it later. If non-zero, the old descriptor is not kept.
 * @return The connection holding the saved descriptor, or R_NilValue if
 *   `drop` is non-zero. Throws an R error if saving or rerouting fails.
 */
SEXP processx__connection_set_std(SEXP con, int which, int drop) {
  processx_connection_t *ccon = R_ExternalPtrAddr(con);
  if (!ccon) R_THROW_ERROR("Invalid connection object");
  SEXP result = R_NilValue;

#ifdef _WIN32
  int fd, ret;
  if (!drop) {
    /* Save the current descriptor, wrapped into a new connection. */
    int saved = _dup(which);
    processx_file_handle_t os_handle;
    if (saved == -1) {
      R_THROW_POSIX_ERROR("Cannot save stdout/stderr for rerouting");
    }
    os_handle = (HANDLE) _get_osfhandle(saved) ;
    processx_c_connection_create(os_handle, PROCESSX_FILE_TYPE_PIPE,
                                 "", &result);
  }

  /* _dup2() works on CRT descriptors, so create one for the connection's
     HANDLE first. NOTE(review): `fd` is never closed. Closing it with
     _close() would presumably close the connection's HANDLE as well, which
     may be why it is left open; confirm whether this leak is intended. */
  fd = _open_osfhandle((intptr_t) ccon->handle.handle, 0);
  ret = _dup2(fd, which);
  if (ret) R_THROW_POSIX_ERROR("Cannot reroute stdout/stderr");

#else
  const char *what[] = { "stdin", "stdout", "stderr" };
  int ret;
  if (!drop) {
    /* Save the current descriptor, wrapped into a new connection. */
    processx_file_handle_t os_handle = dup(which);
    if (os_handle == -1) {
      R_THROW_SYSTEM_ERROR("Cannot save %s for rerouting", what[which]);
    }
    processx_c_connection_create(os_handle, PROCESSX_FILE_TYPE_PIPE,
                                 "", &result);
  }
  /* Make `which` refer to the connection's fd. */
  ret = dup2(ccon->handle, which);
  if (ret == -1) {
    R_THROW_SYSTEM_ERROR("Cannot reroute %s", what[which]);
  }
#endif

  return result;
}
325
/* R API: reroute stdout (fd 1) to `con`. If `drop` is FALSE, a connection
 * holding the previous stdout is returned. */
SEXP processx_connection_set_stdout(SEXP con, SEXP drop) {
  const int stdout_fd = 1;
  int c_drop = LOGICAL(drop)[0];
  return processx__connection_set_std(con, stdout_fd, c_drop);
}
329
/* R API: reroute stderr (fd 2) to `con`. If `drop` is FALSE, a connection
 * holding the previous stderr is returned. */
SEXP processx_connection_set_stderr(SEXP con, SEXP drop) {
  const int stderr_fd = 2;
  int c_drop = LOGICAL(drop)[0];
  return processx__connection_set_std(con, stderr_fd, c_drop);
}
333
/* R API: integer scalar, a C runtime file descriptor for the connection.
 *
 * On Unix this is the connection's own fd. On Windows each call creates a
 * new CRT descriptor for the connection's HANDLE with _open_osfhandle().
 */
SEXP processx_connection_get_fileno(SEXP con) {
  processx_connection_t *ccon = R_ExternalPtrAddr(con);
  if (ccon == NULL) R_THROW_ERROR("Invalid connection object");

#ifdef _WIN32
  return ScalarInteger(_open_osfhandle((intptr_t) ccon->handle.handle, 0));
#else
  return ScalarInteger(ccon->handle);
#endif
}
347
348 #ifdef _WIN32
349
350 /*
351 * Clear the HANDLE_FLAG_INHERIT flag from all HANDLEs that were inherited
352 * the parent process. Don't check for errors - the stdio handles may not be
353 * valid, or may be closed already. There is no guarantee that this function
354 * does a perfect job.
355 */
356
processx_connection_disable_inheritance()357 SEXP processx_connection_disable_inheritance() {
358 HANDLE handle;
359 STARTUPINFOW si;
360
361 /* Make the windows stdio handles non-inheritable. */
362 handle = GetStdHandle(STD_INPUT_HANDLE);
363 if (handle != NULL && handle != INVALID_HANDLE_VALUE) {
364 SetHandleInformation(handle, HANDLE_FLAG_INHERIT, 0);
365 }
366
367 handle = GetStdHandle(STD_OUTPUT_HANDLE);
368 if (handle != NULL && handle != INVALID_HANDLE_VALUE) {
369 SetHandleInformation(handle, HANDLE_FLAG_INHERIT, 0);
370 }
371
372 handle = GetStdHandle(STD_ERROR_HANDLE);
373 if (handle != NULL && handle != INVALID_HANDLE_VALUE) {
374 SetHandleInformation(handle, HANDLE_FLAG_INHERIT, 0);
375 }
376
377 /* Make inherited CRT FDs non-inheritable. */
378 GetStartupInfoW(&si);
379 if (processx__stdio_verify(si.lpReserved2, si.cbReserved2)) {
380 processx__stdio_noinherit(si.lpReserved2);
381 }
382
383 return R_NilValue;
384 }
385
386 #else
387
processx_connection_disable_inheritance()388 SEXP processx_connection_disable_inheritance() {
389 int fd;
390
391 /* Set the CLOEXEC flag on all open descriptors. Unconditionally try the
392 * first 16 file descriptors. After that, bail out after the first error.
393 */
394 for (fd = 0; ; fd++) {
395 if (processx__cloexec_fcntl(fd, 1) && fd > 15) break;
396 }
397
398 return R_NilValue;
399 }
400
401 #endif
402
403 /* Api from C -----------------------------------------------------------*/
404
processx_c_connection_create(processx_file_handle_t os_handle,processx_file_type_t type,const char * encoding,SEXP * r_connection)405 processx_connection_t *processx_c_connection_create(
406 processx_file_handle_t os_handle,
407 processx_file_type_t type,
408 const char *encoding,
409 SEXP *r_connection) {
410
411 processx_connection_t *con;
412 SEXP result, class;
413
414 con = malloc(sizeof(processx_connection_t));
415 if (!con) R_THROW_ERROR("cannot create connection, out of memory");
416
417 con->type = type;
418 con->is_closed_ = 0;
419 con->is_eof_ = 0;
420 con->is_eof_raw_ = 0;
421 con->close_on_destroy = 1;
422 con->iconv_ctx = 0;
423
424 con->buffer = 0;
425 con->buffer_allocated_size = 0;
426 con->buffer_data_size = 0;
427
428 con->utf8 = 0;
429 con->utf8_allocated_size = 0;
430 con->utf8_data_size = 0;
431
432 con->encoding = 0;
433 if (encoding && encoding[0]) {
434 con->encoding = strdup(encoding);
435 if (!con->encoding) {
436 free(con);
437 R_THROW_ERROR("cannot create connection, out of memory");
438 return 0; /* never reached */
439 }
440 }
441
442 #ifdef _WIN32
443 con->handle.handle = os_handle;
444 memset(&con->handle.overlapped, 0, sizeof(OVERLAPPED));
445 con->handle.read_pending = FALSE;
446 con->handle.freelist = FALSE;
447 #else
448 con->handle = os_handle;
449 #endif
450
451 if (r_connection) {
452 result = PROTECT(R_MakeExternalPtr(con, R_NilValue, R_NilValue));
453 R_RegisterCFinalizerEx(result, processx__connection_xfinalizer, 0);
454 class = PROTECT(ScalarString(mkChar("processx_connection")));
455 setAttrib(result, R_ClassSymbol, class);
456 *r_connection = result;
457 UNPROTECT(2);
458 }
459
460 return con;
461 }
462
463 /* Destroy */
processx_c_connection_destroy(processx_connection_t * ccon)464 void processx_c_connection_destroy(processx_connection_t *ccon) {
465
466 if (!ccon) return;
467
468 if (ccon->close_on_destroy) processx_c_connection_close(ccon);
469
470 /* Even if not close_on_destroy, for us the connection is closed. */
471 ccon->is_closed_ = 1;
472
473 #ifdef _WIN32
474 /* Check if we can free the connection. If there is a pending read,
475 then we cannot. In this case schedule_destroy will add it to a free
476 list and return 1. */
477 if (processx__connection_schedule_destroy(ccon)) return;
478 #endif
479
480 if (ccon->iconv_ctx) {
481 Riconv_close(ccon->iconv_ctx);
482 ccon->iconv_ctx = NULL;
483 }
484
485 if (ccon->buffer) { free(ccon->buffer); ccon->buffer = NULL; }
486 if (ccon->utf8) { free(ccon->utf8); ccon->utf8 = NULL; }
487 if (ccon->encoding) { free(ccon->encoding); ccon->encoding = NULL; }
488
489 #ifdef _WIN32
490 if (ccon->handle.overlapped.hEvent) {
491 CloseHandle(ccon->handle.overlapped.hEvent);
492 }
493 ccon->handle.overlapped.hEvent = 0;
494 #endif
495
496 free(ccon);
497 }
498
499 /* Read characters */
processx_c_connection_read_chars(processx_connection_t * ccon,void * buffer,size_t nbyte)500 ssize_t processx_c_connection_read_chars(processx_connection_t *ccon,
501 void *buffer,
502 size_t nbyte) {
503 size_t utf8_chars, utf8_bytes;
504
505 if (nbyte < 4) {
506 R_THROW_ERROR("Buffer size must be at least 4 bytes, to allow multibyte "
507 "characters");
508 }
509
510 processx__connection_find_chars(ccon, -1, nbyte, &utf8_chars, &utf8_bytes);
511
512 memcpy(buffer, ccon->utf8, utf8_bytes);
513 ccon->utf8_data_size -= utf8_bytes;
514 memmove(ccon->utf8, ccon->utf8 + utf8_bytes, ccon->utf8_data_size);
515
516 return utf8_bytes;
517 }
518
519 /**
520 * Read a single line, ending with \n
521 *
522 * The trailing \n character is not copied to the buffer.
523 *
524 * @param ccon Connection.
525 * @param linep Must point to a buffer pointer. If must not be NULL. If
526 * the buffer pointer is NULL, it will be allocated. If it is not NULL,
527 * it might be reallocated using `realloc`, as needed.
528 * @param linecapp Initial size of the buffer. It will be updated if the
529 * buffer is newly allocated or reallocated.
530 * @return Number of characters read, not including the \n character.
531 * It returns -1 on EOF. If the connection is not at EOF yet, but there
532 * is nothing to read currently, it returns 0. If 0 is returned, `linep`
533 * and `linecapp` are not touched.
534 *
535 */
processx_c_connection_read_line(processx_connection_t * ccon,char ** linep,size_t * linecapp)536 ssize_t processx_c_connection_read_line(processx_connection_t *ccon,
537 char **linep, size_t *linecapp) {
538
539 int eof = 0;
540 ssize_t newline;
541
542 if (!linep) {
543 R_THROW_ERROR("cannot read line, linep cannot be a null pointer");
544 }
545 if (!linecapp) {
546 R_THROW_ERROR("cannot read line, linecapp cannot be a null pointer");
547 }
548
549 if (ccon->is_eof_) return -1;
550
551 /* Read until a newline character shows up, or there is nothing more
552 to read (at least for now). */
553 newline = processx__connection_read_until_newline(ccon);
554
555 /* If there is no newline at the end of the file, we still add the
556 last line. */
557 if (ccon->is_eof_raw_ && ccon->utf8_data_size != 0 &&
558 ccon->buffer_data_size == 0 &&
559 ccon->utf8[ccon->utf8_data_size - 1] != '\n') {
560 eof = 1;
561 }
562
563 /* We cannot serve a line currently. Maybe later. */
564 if (newline == -1 && ! eof) return 0;
565
566 /* Newline will contain the end of the line now, even if EOF */
567 if (newline == -1) newline = ccon->utf8_data_size;
568 if (ccon->utf8[newline - 1] == '\r') newline--;
569
570 if (! *linep) {
571 *linep = malloc(newline + 1);
572 *linecapp = newline + 1;
573 } else if (*linecapp < newline + 1) {
574 char *tmp = realloc(*linep, newline + 1);
575 if (!tmp) R_THROW_ERROR("cannot read line, out of memory");
576 *linep = tmp;
577 *linecapp = newline + 1;
578 }
579
580 memcpy(*linep, ccon->utf8, newline);
581 (*linep)[newline] = '\0';
582
583 if (!eof) {
584 ccon->utf8_data_size -= (newline + 1);
585 memmove(ccon->utf8, ccon->utf8 + newline + 1, ccon->utf8_data_size);
586 } else {
587 ccon->utf8_data_size = 0;
588 }
589
590 return newline;
591 }
592
593 /* Write bytes */
processx_c_connection_write_bytes(processx_connection_t * ccon,const void * buffer,size_t nbytes)594 ssize_t processx_c_connection_write_bytes(
595 processx_connection_t *ccon,
596 const void *buffer,
597 size_t nbytes) {
598
599 PROCESSX_CHECK_VALID_CONN(ccon);
600
601 #ifdef _WIN32
602 DWORD written;
603 BOOL ret = WriteFile(
604 /* hFile = */ ccon->handle.handle,
605 /* lpBuffer = */ buffer,
606 /* nNumberOfBytesToWrite = */ nbytes,
607 /* lpNumberOfBytesWritten = */ &written,
608 /* lpOverlapped = */ NULL);
609 if (!ret) R_THROW_SYSTEM_ERROR("Cannot write connection");
610 return (ssize_t) written;
611 #else
612 /* Need to ignore SIGPIPE here, otherwise R might crash */
613 struct sigaction old_handler, new_handler;
614 memset(&new_handler, 0, sizeof(new_handler));
615 sigemptyset(&new_handler.sa_mask);
616 new_handler.sa_handler = SIG_IGN;
617 sigaction(SIGPIPE, &new_handler, &old_handler );
618
619 ssize_t ret = write(ccon->handle, buffer, nbytes);
620
621 sigaction(SIGPIPE, &old_handler, NULL );
622
623 if (ret == -1) {
624 if (errno == EAGAIN || errno == EWOULDBLOCK) {
625 return 0;
626 } else {
627 R_THROW_SYSTEM_ERROR("Cannot write connection");
628 }
629 }
630 return ret;
631 #endif
632 }
633
634 /* Check if the connection has ended */
processx_c_connection_is_eof(processx_connection_t * ccon)635 int processx_c_connection_is_eof(processx_connection_t *ccon) {
636 return ccon->is_eof_;
637 }
638
639 /* Close */
processx_c_connection_close(processx_connection_t * ccon)640 void processx_c_connection_close(processx_connection_t *ccon) {
641 #ifdef _WIN32
642 if (ccon->handle.handle) {
643 CloseHandle(ccon->handle.handle);
644 }
645 ccon->handle.handle = 0;
646 #else
647 if (ccon->handle >= 0) close(ccon->handle);
648 ccon->handle = -1;
649 #endif
650 ccon->is_closed_ = 1;
651 }
652
/* Non-zero if the connection was closed (or destroyed). */
int processx_c_connection_is_closed(processx_connection_t *ccon) {
  int closed = ccon->is_closed_;
  return closed;
}
656
657 #ifdef _WIN32
658
659 /* TODO: errors */
processx__socket_pair(SOCKET fds[2])660 int processx__socket_pair(SOCKET fds[2]) {
661 struct sockaddr_in inaddr;
662 struct sockaddr addr;
663 SOCKET lst = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
664 memset(&inaddr, 0, sizeof(inaddr));
665 memset(&addr, 0, sizeof(addr));
666 inaddr.sin_family = AF_INET;
667 inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
668 inaddr.sin_port = 0;
669 int yes = 1;
670 setsockopt(lst, SOL_SOCKET, SO_REUSEADDR, (char*)&yes, sizeof(yes));
671 bind(lst, (struct sockaddr *)&inaddr, sizeof(inaddr));
672 listen(lst, 1);
673 int len = sizeof(inaddr);
674 getsockname(lst, &addr,&len);
675 fds[0] = socket(AF_INET, SOCK_STREAM, 0);
676 connect(fds[0], &addr, len);
677 fds[1] = accept(lst,0,0);
678 closesocket(lst);
679 return 0;
680 }
681
/* Poll connections and other pollable handles (Windows implementation).
 *
 * Each pollable's pre_poll_func decides how it is handled:
 * - PXNOPIPE, PXCLOSED, PXSILENT: stored as the event, not waited on.
 * - PXREADY: stored as the event, counts as data; then there is no waiting.
 * - PXHANDLE (connections): waited on via processx__thread_getstatus*();
 *   a completed read is accounted to the connection found in the
 *   completion key, and its pollable is marked PXREADY.
 * - PXSELECT (curl): the three fd vectors in `el->fds` are added to the
 *   global read/write/exception fd sets, and a notification socket pair is
 *   created (presumably to wake up the select() side); pollables with
 *   activity are marked PXEVENT.
 *
 * The wait is done in slices of at most PROCESSX_INTERRUPT_INTERVAL,
 * checking for user interrupts in between. A negative `timeout` waits
 * until something happens.
 *
 * NOTE(review): if an R error is thrown inside the wait loop (interrupt or
 * poll error), the notification sockets are not closed.
 *
 * @return Number of pollables that have data (PXREADY) or an event
 *   (PXEVENT). If nothing happened, the PXHANDLE pollables get PXTIMEOUT.
 */
int processx_c_connection_poll(processx_pollable_t pollables[],
                               size_t npollables, int timeout) {

  int hasdata = 0;
  size_t i, j = 0, selj = 0;
  int *ptr;
  int timeleft = timeout;
  DWORD bytes;
  OVERLAPPED *overlapped = 0;
  ULONG_PTR key;
  int *events;

  FD_ZERO(&processx__readfds);
  FD_ZERO(&processx__writefds);
  FD_ZERO(&processx__exceptionfds);

  events = (int*) R_alloc(npollables, sizeof(int));

  /* First iteration, we call the pre-poll method, and collect the
     handles for the IOCP, and the fds for select(). */
  for (i = 0; i < npollables; i++) {
    processx_pollable_t *el = pollables + i;
    events[i] = PXSILENT;
    if (el->pre_poll_func) events[i] = el->pre_poll_func(el);
    switch (events[i]) {
    case PXHANDLE:
      j++;
      break;
    default:
      break;
    }
  }

  /* j contains the number of IOCP handles to poll */

  ptr = (int*) R_alloc(j, sizeof(int));

  for (i = 0, j = 0; i < npollables; i++) {
    processx_pollable_t *el = pollables + i;

    switch (events[i]) {
    case PXNOPIPE:
    case PXCLOSED:
    case PXSILENT:
      el->event = events[i];
      break;

    case PXREADY:
      hasdata++;
      el->event = events[i];
      break;

    case PXHANDLE:
      el->event = PXSILENT;
      ptr[j] = i;
      j++;
      break;

    case PXSELECT: {
      SEXP elem;
      el->event = PXSILENT;
      int k, n;
      elem = VECTOR_ELT(el->fds, 0);
      n = LENGTH(elem);
      selj += n;
      for (k = 0; k < n; k++) FD_SET(INTEGER(elem)[k], &processx__readfds);
      elem = VECTOR_ELT(el->fds, 1);
      n = LENGTH(elem);
      selj += n;
      for (k = 0; k < n; k++) FD_SET(INTEGER(elem)[k], &processx__writefds);
      elem = VECTOR_ELT(el->fds, 2);
      n = LENGTH(elem);
      selj += n;
      for (k = 0; k < n; k++) FD_SET(INTEGER(elem)[k], &processx__exceptionfds);
      break;
    }
    }
  }

  if (j == 0 && selj == 0) return hasdata;

  if (hasdata) timeout = timeleft = 0;

  if (selj != 0) {
    if (processx__socket_pair(processx__notify_socket) != 0) {
      processx__select = 0;
      R_THROW_ERROR("Cannot create socket pair for polling");
    }
    FD_SET(processx__notify_socket[0], &processx__readfds);
    processx__select = 1;
  } else {
    processx__select = 0;
  }

  while (timeout < 0 || timeleft >= 0) {
    int poll_timeout;
    if (timeout < 0 || timeleft > PROCESSX_INTERRUPT_INTERVAL) {
      poll_timeout = PROCESSX_INTERRUPT_INTERVAL;
    } else {
      poll_timeout = timeleft;
    }

    BOOL sres;
    if (selj == 0) {
      sres = processx__thread_getstatus(&bytes, &key, &overlapped,
                                        poll_timeout);
    } else {
      sres = processx__thread_getstatus_select(&bytes, &key, &overlapped,
                                               poll_timeout);
    }
    DWORD err = sres ? ERROR_SUCCESS : processx__thread_get_last_error();

    /* See if there was any data from the curl sockets */

    if (processx__select) {
      for (i = 0; i < npollables; i++) {
        if (events[i] == PXSELECT) {
          processx_pollable_t *el = pollables + i;
          SEXP elem;
          int k, n;
          int has = 0;
          elem = VECTOR_ELT(el->fds, 0);
          n = LENGTH(elem);
          for (k = 0; k < n; k++) {
            if (FD_ISSET(INTEGER(elem)[k], &processx__readfds)) has = 1;
            FD_SET(INTEGER(elem)[k], &processx__readfds);
          }
          elem = VECTOR_ELT(el->fds, 1);
          n = LENGTH(elem);
          for (k = 0; k < n; k++) {
            if (FD_ISSET(INTEGER(elem)[k], &processx__writefds)) has = 1;
            FD_SET(INTEGER(elem)[k], &processx__writefds);
          }
          elem = VECTOR_ELT(el->fds, 2);
          n = LENGTH(elem);
          for (k = 0; k < n; k++) {
            if (FD_ISSET(INTEGER(elem)[k], &processx__exceptionfds)) has = 1;
            FD_SET(INTEGER(elem)[k], &processx__exceptionfds);
          }
          if (has) {
            el->event = PXEVENT;
            hasdata++;
          }
        }
      }
    }

    /* See if there was any data from the IOCP */

    if (overlapped) {
      /* data */
      processx_connection_t *con = (processx_connection_t*) key;
      int poll_idx = con->poll_idx;
      con->handle.read_pending = FALSE;
      con->buffer_data_size += bytes;
      if (con->buffer_data_size > 0) processx__connection_to_utf8(con);
      if (con->type == PROCESSX_FILE_TYPE_ASYNCFILE) {
        /* TODO: larger files */
        con->handle.overlapped.Offset += bytes;
      }

      /* A zero-byte completion means EOF. */
      if (!bytes) {
        con->is_eof_raw_ = 1;
        if (con->utf8_data_size == 0 && con->buffer_data_size == 0) {
          con->is_eof_ = 1;
        }
      }

      if (con->handle.freelist) processx__connection_freelist_remove(con);

      if (poll_idx < npollables &&
          pollables[poll_idx].object == con) {
        pollables[poll_idx].event = PXREADY;
        hasdata++;
      }
    } else if (err != WAIT_TIMEOUT && err != ERROR_SUCCESS) {
      R_THROW_SYSTEM_ERROR_CODE(err, "Cannot poll");
    }

    if (hasdata) break;
    R_CheckUserInterrupt();
    timeleft -= PROCESSX_INTERRUPT_INTERVAL;
  }

  if (hasdata == 0) {
    for (i = 0; i < j; i++) pollables[ptr[i]].event = PXTIMEOUT;
  }

  /* Only close the notification sockets if this call created them.
     Otherwise processx__notify_socket still holds the handle values of an
     earlier call, which were closed already, and closing them again could
     close unrelated sockets that happen to reuse those values. */
  if (selj != 0) {
    closesocket(processx__notify_socket[0]);
    closesocket(processx__notify_socket[1]);
  }

  return hasdata;
}
870
871 #else
872
processx__poll_decode(short code)873 static int processx__poll_decode(short code) {
874 if (code & POLLNVAL) return PXCLOSED;
875 if (code & POLLIN || code & POLLHUP || code & POLLOUT) return PXREADY;
876 return PXSILENT;
877 }
878
879 /* Poll connections and other pollable handles */
processx_c_connection_poll(processx_pollable_t pollables[],size_t npollables,int timeout)880 int processx_c_connection_poll(processx_pollable_t pollables[],
881 size_t npollables, int timeout) {
882
883 int hasdata = 0;
884 size_t i, j = 0;
885 struct pollfd *fds;
886 int *ptr;
887 int ret;
888 int *events;
889
890 if (npollables == 0) return 0;
891
892 /* Need to allocate this, because we need to put in the fds, maybe */
893 events = (int*) R_alloc(npollables, sizeof(int));
894
895 /* First iteration, we call the pre-poll method, and collect the
896 fds to poll. */
897 for (i = 0; i < npollables; i++) {
898 processx_pollable_t *el = pollables + i;
899 events[i] = PXSILENT;
900 if (el->pre_poll_func) events[i] = el->pre_poll_func(el);
901 switch (events[i]) {
902 case PXHANDLE:
903 j++;
904 break;
905 case PXSELECT: {
906 /* This is three vectors of fds to poll, in an R list */
907 int w;
908 for (w = 0; w < 3; w++) {
909 j += LENGTH(VECTOR_ELT(el->fds, w));
910 } }
911 default:
912 break;
913 }
914 }
915
916 /* j contains the number of fds to poll now */
917
918 fds = (struct pollfd*) R_alloc(j, sizeof(struct pollfd));
919 ptr = (int*) R_alloc(j, sizeof(int));
920
921 /* Need to go over them again, collect the ones that we need to poll */
922 for (i = 0, j = 0; i < npollables; i++) {
923 processx_pollable_t *el = pollables + i;
924 switch (events[i]) {
925 case PXNOPIPE:
926 case PXCLOSED:
927 case PXSILENT:
928 el->event = events[i];
929 break;
930
931 case PXREADY:
932 hasdata++;
933 el->event = events[i];
934 break;
935
936 case PXHANDLE:
937 el->event = PXSILENT;
938 fds[j].fd = el->handle;
939 fds[j].events = POLLIN;
940 fds[j].revents = 0;
941 ptr[j] = (int) i;
942 j++;
943 break;
944
945 case PXSELECT: {
946 int pollevs[3] = { POLLIN, POLLOUT, POLLIN | POLLOUT };
947 int w;
948 el->event = PXSILENT;
949 for (w = 0; w < 3; w++) {
950 SEXP elem = VECTOR_ELT(el->fds, w);
951 int k, n = LENGTH(elem);
952 for (k = 0; k < n; k++) {
953 fds[j].fd = INTEGER(elem)[k];
954 fds[j].events = pollevs[w];
955 fds[j].revents = 0;
956 ptr[j] = (int) i;
957 j++;
958 }
959 }
960 break; }
961 }
962 }
963
964 /* Nothing to poll */
965 if (j == 0) return hasdata;
966
967 /* If we already have some data, then we don't wait any more,
968 just check if other connections are ready */
969 ret = processx__interruptible_poll(fds, (nfds_t) j,
970 hasdata > 0 ? 0 : timeout);
971
972 if (ret == -1) {
973 R_THROW_SYSTEM_ERROR("Processx poll error");
974
975 } else if (ret == 0) {
976 if (hasdata == 0) {
977 for (i = 0; i < j; i++) pollables[ptr[i]].event = PXTIMEOUT;
978 }
979
980 } else {
981 for (i = 0; i < j; i++) {
982 if (events[ptr[i]] == PXSELECT) {
983 if (pollables[ptr[i]].event == PXSILENT) {
984 int ev = fds[i].revents;
985 if (ev & (POLLNVAL | POLLIN | POLLHUP | POLLOUT)) {
986 pollables[ptr[i]].event = PXEVENT;
987 }
988 }
989 } else {
990 pollables[ptr[i]].event = processx__poll_decode(fds[i].revents);
991 hasdata += (pollables[ptr[i]].event == PXREADY);
992 }
993 }
994 }
995
996 return hasdata;
997 }
998
999 #endif
1000
1001 #ifdef _WIN32
1002
/* Start an asynchronous read into the free space at the end of the raw
 * buffer (Windows only).
 *
 * Does nothing if the connection has no handle (closed), or if a read is
 * already pending. The raw buffer is allocated on first use.
 *
 * - ERROR_BROKEN_PIPE / ERROR_HANDLE_EOF: sets is_eof_raw_, and is_eof_ as
 *   well if no buffered data is left; raw data is converted to UTF-8.
 * - ERROR_IO_PENDING: marks the read as pending; its completion is picked
 *   up by processx_c_connection_poll().
 * - Other errors throw an R error.
 *
 * NOTE(review): if the raw buffer is full, `todo` is 0 and a zero-byte read
 * is issued. processx_c_connection_poll() treats a zero-byte completion as
 * EOF (`if (!bytes)`), so confirm this state cannot occur here.
 */
void processx__connection_start_read(processx_connection_t *ccon) {
  DWORD bytes_read;
  BOOLEAN res;
  size_t todo;

  if (! ccon->handle.handle) return;

  if (ccon->handle.read_pending) return;

  if (!ccon->buffer) processx__connection_alloc(ccon);

  /* Free space at the end of the raw buffer. */
  todo = ccon->buffer_allocated_size - ccon->buffer_data_size;

  res = processx__thread_readfile(
    ccon,
    ccon->buffer + ccon->buffer_data_size,
    todo,
    &bytes_read);

  if (!res) {
    DWORD err = processx__thread_get_last_error();
    if (err == ERROR_BROKEN_PIPE || err == ERROR_HANDLE_EOF) {
      ccon->is_eof_raw_ = 1;
      if (ccon->utf8_data_size == 0 && ccon->buffer_data_size == 0) {
        ccon->is_eof_ = 1;
      }
      if (ccon->buffer_data_size) processx__connection_to_utf8(ccon);
    } else if (err == ERROR_IO_PENDING) {
      ccon->handle.read_pending = TRUE;
    } else {
      ccon->handle.read_pending = FALSE;
      R_THROW_SYSTEM_ERROR_CODE(err, "reading from connection");
    }
  } else {
    /* Returned synchronously, but the event will be still signalled,
       so we just drop the sync data for now. */
    ccon->handle.read_pending = TRUE;
  }
}
1042
1043 #endif
1044
1045 /* Poll a connection
1046 *
1047 * Checks if there is anything in the buffer. If yes, it returns
1048 * PXREADY. Otherwise it returns the handle.
1049 *
1050 * We can read immediately (without an actual device read), potentially:
1051 * 1. if the connection is already closed, we return PXCLOSED
1052 * 2. if the connection is already EOF, we return PXREADY
1053 * 3. if there is data in the UTF8 buffer, we return PXREADY
1054 * 4. if there is data in the raw buffer, and the raw file was EOF, we
1055 * return PXREADY, because we can surely return something, even if the
1056 * raw buffer has incomplete UTF8 characters.
1057 * 5. otherwise, if there is something in the raw buffer, we try
1058 * to convert it to UTF8.
1059 */
1060
/* Returns early from the enclosing function (which must have a local
   named `ccon`) whenever the connection state can be reported without
   waiting on the OS handle. May convert buffered raw data to UTF-8. */
#define PROCESSX__I_PRE_POLL_FUNC_CONNECTION_READY do {                \
    if (!ccon) return PXNOPIPE;                                         \
    if (ccon->is_closed_) return PXCLOSED;                              \
    if (ccon->is_eof_ || ccon->utf8_data_size > 0) return PXREADY;      \
    if (ccon->buffer_data_size > 0) {                                   \
      if (ccon->is_eof_raw_) return PXREADY;                            \
      processx__connection_to_utf8(ccon);                               \
      if (ccon->utf8_data_size > 0) return PXREADY;                     \
    }                                                                   \
  } while (0)
1071
processx_i_pre_poll_func_connection(processx_pollable_t * pollable)1072 int processx_i_pre_poll_func_connection(processx_pollable_t *pollable) {
1073
1074 processx_connection_t *ccon = pollable->object;
1075
1076 PROCESSX__I_PRE_POLL_FUNC_CONNECTION_READY;
1077
1078 #ifdef _WIN32
1079 processx__connection_start_read(ccon);
1080 /* Starting to read may actually get some data, or an EOF, so check again */
1081 PROCESSX__I_PRE_POLL_FUNC_CONNECTION_READY;
1082 pollable->handle = ccon->handle.overlapped.hEvent;
1083 #else
1084 pollable->handle = ccon->handle;
1085 #endif
1086
1087 return PXHANDLE;
1088 }
1089
processx_c_pollable_from_connection(processx_pollable_t * pollable,processx_connection_t * ccon)1090 int processx_c_pollable_from_connection(
1091 processx_pollable_t *pollable,
1092 processx_connection_t *ccon) {
1093
1094 pollable->pre_poll_func = processx_i_pre_poll_func_connection;
1095 pollable->object = ccon;
1096 pollable->free = 0;
1097 pollable->fds = R_NilValue;
1098 return 0;
1099 }
1100
processx_i_pre_poll_func_curl(processx_pollable_t * pollable)1101 int processx_i_pre_poll_func_curl(processx_pollable_t *pollable) {
1102 return PXSELECT;
1103 }
1104
processx_c_pollable_from_curl(processx_pollable_t * pollable,SEXP fds)1105 int processx_c_pollable_from_curl(
1106 processx_pollable_t *pollable,
1107 SEXP fds) {
1108
1109 pollable->pre_poll_func = processx_i_pre_poll_func_curl;
1110 pollable->object = NULL;
1111 pollable->free = 0;
1112 pollable->fds = fds;
1113 return 0;
1114 }
1115
processx_c_connection_fileno(const processx_connection_t * con)1116 processx_file_handle_t processx_c_connection_fileno(
1117 const processx_connection_t *con) {
1118 #ifdef _WIN32
1119 return con->handle.handle;
1120 #else
1121 return con->handle;
1122 #endif
1123 }
1124
1125 /* --------------------------------------------------------------------- */
1126 /* Internals */
1127 /* --------------------------------------------------------------------- */
1128
1129 /**
1130 * Work out how many UTF-8 characters we can read
1131 *
1132 * It might try to read more data, but it does not modify the buffer
1133 * otherwise.
1134 *
1135 * @param ccon Connection.
1136 * @param maxchars Maximum number of characters to find.
1137 * @param maxbytes Maximum number of bytes to check while searching.
1138 * @param chars Number of characters found is stored here.
1139 * @param bytes Number of bytes the `chars` characters span.
1140 *
1141 */
1142
processx__connection_find_chars(processx_connection_t * ccon,ssize_t maxchars,ssize_t maxbytes,size_t * chars,size_t * bytes)1143 static void processx__connection_find_chars(processx_connection_t *ccon,
1144 ssize_t maxchars,
1145 ssize_t maxbytes,
1146 size_t *chars,
1147 size_t *bytes) {
1148
1149 int should_read_more;
1150
1151 PROCESSX_CHECK_VALID_CONN(ccon);
1152
1153 should_read_more = ! ccon->is_eof_ && ccon->utf8_data_size == 0;
1154 if (should_read_more) processx__connection_read(ccon);
1155
1156 if (ccon->utf8_data_size == 0 || maxchars == 0) { *bytes = 0; return; }
1157
1158 /* At at most cnchars characters from the UTF8 buffer */
1159 processx__connection_find_utf8_chars(ccon, maxchars, maxbytes, chars,
1160 bytes);
1161 }
1162
1163 /**
1164 * Find one or more lines in the buffer
1165 *
1166 * Since the buffer is UTF-8 encoded, `\n` is assumed as end-of-line
1167 * character.
1168 *
1169 * @param ccon Connection.
1170 * @param maxlines Maximum number of lines to find.
1171 * @param lines Number of lines found is stored here.
1172 * @param eof If the end of the file is reached, and there is no `\n`
1173 * at the end of the file, this is set to 1.
1174 *
1175 */
1176
processx__connection_find_lines(processx_connection_t * ccon,ssize_t maxlines,size_t * lines,int * eof)1177 static void processx__connection_find_lines(processx_connection_t *ccon,
1178 ssize_t maxlines,
1179 size_t *lines,
1180 int *eof ) {
1181
1182 ssize_t newline;
1183
1184 *eof = 0;
1185
1186 if (maxlines < 0) maxlines = 1000;
1187
1188 PROCESSX_CHECK_VALID_CONN(ccon);
1189
1190 /* Read until a newline character shows up, or there is nothing more
1191 to read (at least for now). */
1192 newline = processx__connection_read_until_newline(ccon);
1193
1194 /* Count the number of lines we got. */
1195 while (newline != -1 && *lines < maxlines) {
1196 (*lines) ++;
1197 newline = processx__find_newline(ccon, /* start = */ newline + 1);
1198 }
1199
1200 /* If there is no newline at the end of the file, we still add the
1201 last line. */
1202 if (ccon->is_eof_raw_ && ccon->utf8_data_size != 0 &&
1203 ccon->buffer_data_size == 0 &&
1204 ccon->utf8[ccon->utf8_data_size - 1] != '\n') {
1205 *eof = 1;
1206 }
1207
1208 }
1209
processx__connection_xfinalizer(SEXP con)1210 static void processx__connection_xfinalizer(SEXP con) {
1211 processx_connection_t *ccon = R_ExternalPtrAddr(con);
1212 processx_c_connection_destroy(ccon);
1213 }
1214
processx__find_newline(processx_connection_t * ccon,size_t start)1215 static ssize_t processx__find_newline(processx_connection_t *ccon,
1216 size_t start) {
1217
1218 if (ccon->utf8_data_size == 0) return -1;
1219 const char *ret = ccon->utf8 + start;
1220 const char *end = ccon->utf8 + ccon->utf8_data_size;
1221
1222 while (ret < end && *ret != '\n') ret++;
1223
1224 if (ret < end) return ret - ccon->utf8; else return -1;
1225 }
1226
processx__connection_read_until_newline(processx_connection_t * ccon)1227 static ssize_t processx__connection_read_until_newline
1228 (processx_connection_t *ccon) {
1229
1230 char *ptr, *end;
1231
1232 /* Make sure we try to have something, unless EOF */
1233 if (ccon->utf8_data_size == 0) processx__connection_read(ccon);
1234 if (ccon->utf8_data_size == 0) return -1;
1235
1236 /* We have sg in the utf8 at this point */
1237
1238 ptr = ccon->utf8;
1239 end = ccon->utf8 + ccon->utf8_data_size;
1240 while (1) {
1241 ssize_t new_bytes;
1242 while (ptr < end && *ptr != '\n') ptr++;
1243
1244 /* Have we found a newline? */
1245 if (ptr < end) return ptr - ccon->utf8;
1246
1247 /* No newline, but EOF? */
1248 if (ccon->is_eof_) return -1;
1249
1250 /* Maybe we can read more, but might need a bigger utf8.
1251 * The 8 bytes is definitely more than what we need for a UTF8
1252 * character, and this makes sure that we don't stop just because
1253 * no more UTF8 characters fit in the UTF8 buffer. */
1254 if (ccon->utf8_data_size >= ccon->utf8_allocated_size - 8) {
1255 size_t ptrnum = ptr - ccon->utf8;
1256 size_t endnum = end - ccon->utf8;
1257 processx__connection_realloc(ccon);
1258 ptr = ccon->utf8 + ptrnum;
1259 end = ccon->utf8 + endnum;
1260 }
1261 new_bytes = processx__connection_read(ccon);
1262
1263 /* If we cannot read now, then we give up */
1264 if (new_bytes == 0) return -1;
1265 }
1266 }
1267
1268 /* Allocate buffer for reading */
1269
processx__connection_alloc(processx_connection_t * ccon)1270 static void processx__connection_alloc(processx_connection_t *ccon) {
1271 ccon->buffer = malloc(64 * 1024);
1272 if (!ccon->buffer) R_THROW_ERROR("Cannot allocate memory for processx buffer");
1273 ccon->buffer_allocated_size = 64 * 1024;
1274 ccon->buffer_data_size = 0;
1275
1276 ccon->utf8 = malloc(64 * 1024);
1277 if (!ccon->utf8) {
1278 free(ccon->buffer);
1279 R_THROW_ERROR("Cannot allocate memory for processx buffer");
1280 }
1281 ccon->utf8_allocated_size = 64 * 1024;
1282 ccon->utf8_data_size = 0;
1283 }
1284
1285 /* We only really need to re-alloc the UTF8 buffer, because the
1286 other buffer is transient, even if there are no newline characters. */
1287
processx__connection_realloc(processx_connection_t * ccon)1288 static void processx__connection_realloc(processx_connection_t *ccon) {
1289 size_t new_size = (size_t) (ccon->utf8_allocated_size * 1.2);
1290 void *nb;
1291 if (new_size == ccon->utf8_allocated_size) new_size = 2 * new_size;
1292 nb = realloc(ccon->utf8, new_size);
1293 if (!nb) R_THROW_ERROR("Cannot allocate memory for processx line");
1294 ccon->utf8 = nb;
1295 ccon->utf8_allocated_size = new_size;
1296 }
1297
/* Read as much as we can, and convert it to UTF-8. Apart from the UTF-8
   conversion itself (and, on Windows, the asynchronous read started by
   processx__connection_start_read()), this is the only code that works
   with the raw buffer and reads from the data source.

   When this is called, the UTF8 buffer is probably empty, but the raw
   buffer might not be. */
1304
1305 #ifdef _WIN32
1306
processx__connection_read(processx_connection_t * ccon)1307 ssize_t processx__connection_read(processx_connection_t *ccon) {
1308 DWORD todo, bytes_read = 0;
1309
1310 /* Nothing to read, nothing to convert to UTF8 */
1311 if (ccon->is_eof_raw_ && ccon->buffer_data_size == 0) {
1312 if (ccon->utf8_data_size == 0) ccon->is_eof_ = 1;
1313 return 0;
1314 }
1315
1316 if (!ccon->buffer) processx__connection_alloc(ccon);
1317
1318 /* If cannot read anything more, then try to convert to UTF8 */
1319 todo = ccon->buffer_allocated_size - ccon->buffer_data_size;
1320 if (todo == 0) return processx__connection_to_utf8(ccon);
1321
1322 /* Otherwise we read. If there is no read pending, we start one. */
1323 processx__connection_start_read(ccon);
1324
1325 /* A read might be pending at this point. See if it has finished. */
1326 if (ccon->handle.read_pending) {
1327 ULONG_PTR key;
1328 DWORD bytes;
1329 OVERLAPPED *overlapped = 0;
1330
1331 while (1) {
1332 BOOL sres = processx__thread_getstatus(&bytes, &key, &overlapped, 0);
1333 DWORD err = sres ? ERROR_SUCCESS : processx__thread_get_last_error();
1334 if (overlapped) {
1335 processx_connection_t *con = (processx_connection_t *) key;
1336 con->handle.read_pending = FALSE;
1337 con->buffer_data_size += bytes;
1338 if (con->buffer && con->buffer_data_size > 0) {
1339 bytes = processx__connection_to_utf8(con);
1340 }
1341 if (con->type == PROCESSX_FILE_TYPE_ASYNCFILE) {
1342 /* TODO: large files */
1343 con->handle.overlapped.Offset += bytes;
1344 }
1345 if (!bytes) {
1346 con->is_eof_raw_ = 1;
1347 if (con->utf8_data_size == 0 && con->buffer_data_size == 0) {
1348 con->is_eof_ = 1;
1349 }
1350 }
1351
1352 if (con->handle.freelist) processx__connection_freelist_remove(con);
1353
1354 if (con == ccon) {
1355 bytes_read = bytes;
1356 break;
1357 }
1358
1359 } else if (err != WAIT_TIMEOUT) {
1360 R_THROW_SYSTEM_ERROR_CODE(err, "Read error");
1361
1362 } else {
1363 break;
1364 }
1365 }
1366 }
1367
1368 return bytes_read;
1369 }
1370
1371 #else
1372
processx__connection_read(processx_connection_t * ccon)1373 static ssize_t processx__connection_read(processx_connection_t *ccon) {
1374 ssize_t todo, bytes_read;
1375
1376 /* Nothing to read, nothing to convert to UTF8 */
1377 if (ccon->is_eof_raw_ && ccon->buffer_data_size == 0) {
1378 if (ccon->utf8_data_size == 0) ccon->is_eof_ = 1;
1379 return 0;
1380 }
1381
1382 if (!ccon->buffer) processx__connection_alloc(ccon);
1383
1384 /* If cannot read anything more, then try to convert to UTF8 */
1385 todo = ccon->buffer_allocated_size - ccon->buffer_data_size;
1386 if (todo == 0) return processx__connection_to_utf8(ccon);
1387
1388 /* Otherwise we read */
1389 bytes_read = read(ccon->handle, ccon->buffer + ccon->buffer_data_size, todo);
1390
1391 if (bytes_read == 0) {
1392 /* EOF */
1393 ccon->is_eof_raw_ = 1;
1394 if (ccon->utf8_data_size == 0 && ccon->buffer_data_size == 0) {
1395 ccon->is_eof_ = 1;
1396 }
1397
1398 } else if (bytes_read == -1 && errno == EAGAIN) {
1399 /* There is still data to read, potentially */
1400 bytes_read = 0;
1401
1402 } else if (bytes_read == -1) {
1403 /* Proper error */
1404 R_THROW_SYSTEM_ERROR("Cannot read from processx connection");
1405 }
1406
1407 ccon->buffer_data_size += bytes_read;
1408
1409 /* If there is anything to convert to UTF8, try converting */
1410 if (ccon->buffer_data_size > 0) {
1411 bytes_read = processx__connection_to_utf8(ccon);
1412 } else {
1413 bytes_read = 0;
1414 }
1415
1416 return bytes_read;
1417 }
1418 #endif
1419
processx__connection_to_utf8(processx_connection_t * ccon)1420 static ssize_t processx__connection_to_utf8(processx_connection_t *ccon) {
1421
1422 const char *inbuf, *inbufold;
1423 char *outbuf, *outbufold;
1424 size_t inbytesleft = ccon->buffer_data_size;
1425 size_t outbytesleft = ccon->utf8_allocated_size - ccon->utf8_data_size;
1426 size_t r, indone = 0, outdone = 0;
1427 int moved = 0;
1428 const char *emptystr = "";
1429 const char *encoding = ccon->encoding ? ccon->encoding : emptystr;
1430
1431 inbuf = inbufold = ccon->buffer;
1432 outbuf = outbufold = ccon->utf8 + ccon->utf8_data_size;
1433
1434 /* If we this is the first time we are here. */
1435 if (! ccon->iconv_ctx) ccon->iconv_ctx = Riconv_open("UTF-8", encoding);
1436
1437 /* If nothing to do, or no space to do more, just return */
1438 if (inbytesleft == 0 || outbytesleft == 0) return 0;
1439
1440 while (!moved) {
1441 r = Riconv(ccon->iconv_ctx, &inbuf, &inbytesleft, &outbuf,
1442 &outbytesleft);
1443 moved = 1;
1444
1445 if (r == (size_t) -1) {
1446 /* Error */
1447 if (errno == E2BIG) {
1448 /* Output buffer is full, that's fine, we'll try later.
1449 Just use what we have done so far. */
1450
1451 } else if (errno == EILSEQ) {
1452 /* Invalid characters in encoding, *inbuf points to the beginning
1453 of the invalid sequence. We can just try to remove this, and
1454 convert again? */
1455 inbuf++; inbytesleft--;
1456 if (inbytesleft > 0) moved = 0;
1457
1458 } else if (errno == EINVAL) {
1459 /* Does not end with a complete multi-byte character */
1460 /* This is fine, we'll handle it later, unless we are at the end */
1461 if (ccon->is_eof_raw_) {
1462 warning("Invalid multi-byte character at end of stream ignored");
1463 inbuf += inbytesleft; inbytesleft = 0;
1464 }
1465 }
1466 }
1467 }
1468
1469 /* We converted 'r' bytes, update the buffer structure accordingly */
1470 indone = inbuf - inbufold;
1471 outdone = outbuf - outbufold;
1472 if (outdone > 0 || indone > 0) {
1473 ccon->buffer_data_size -= indone;
1474 memmove(ccon->buffer, ccon->buffer + indone, ccon->buffer_data_size);
1475 ccon->utf8_data_size += outdone;
1476 }
1477
1478 return outdone;
1479 }
1480
/* Try to get at most 'maxchars' UTF8 characters (spanning at most
 * 'maxbytes' bytes) from the buffer. The number of characters found,
 * and the corresponding number of bytes, are stored in '*chars' and
 * '*bytes'. */
1484
/* Total length, in bytes, of a UTF-8 sequence, indexed by the low six
   bits of its lead byte. Lead bytes 0xC0-0xDF start 2-byte sequences,
   0xE0-0xEF 3-byte, 0xF0-0xF7 4-byte, 0xF8-0xFB 5-byte and 0xFC-0xFF
   6-byte ones (5 and 6 byte forms are the obsolete RFC 2279 ones). */
static const unsigned char processx__utf8_length[] = {
  /* 0xC0 - 0xDF */
  2, 2, 2, 2, 2, 2, 2, 2,
  2, 2, 2, 2, 2, 2, 2, 2,
  2, 2, 2, 2, 2, 2, 2, 2,
  2, 2, 2, 2, 2, 2, 2, 2,
  /* 0xE0 - 0xEF */
  3, 3, 3, 3, 3, 3, 3, 3,
  3, 3, 3, 3, 3, 3, 3, 3,
  /* 0xF0 - 0xF7 */
  4, 4, 4, 4, 4, 4, 4, 4,
  /* 0xF8 - 0xFB, then 0xFC - 0xFF */
  5, 5, 5, 5, 6, 6, 6, 6
};
1491
processx__connection_find_utf8_chars(processx_connection_t * ccon,ssize_t maxchars,ssize_t maxbytes,size_t * chars,size_t * bytes)1492 static void processx__connection_find_utf8_chars(processx_connection_t *ccon,
1493 ssize_t maxchars,
1494 ssize_t maxbytes,
1495 size_t *chars,
1496 size_t *bytes) {
1497
1498 char *ptr = ccon->utf8;
1499 char *end = ccon->utf8 + ccon->utf8_data_size;
1500 size_t length = ccon->utf8_data_size;
1501 *chars = *bytes = 0;
1502
1503 while (maxchars != 0 && maxbytes != 0 && ptr < end) {
1504 int clen, c = (unsigned char) *ptr;
1505
1506 /* ASCII byte */
1507 if (c < 128) {
1508 (*chars) ++; (*bytes) ++; ptr++; length--;
1509 if (maxchars > 0) maxchars--;
1510 if (maxbytes > 0) maxbytes--;
1511 continue;
1512 }
1513
1514 /* Catch some errors */
1515 if (c < 0xc0) goto invalid;
1516 if (c >= 0xfe) goto invalid;
1517
1518 clen = processx__utf8_length[c & 0x3f];
1519 if (length < clen) goto invalid;
1520 if (maxbytes > 0 && clen > maxbytes) break;
1521 (*chars) ++; (*bytes) += clen; ptr += clen; length -= clen;
1522 if (maxchars > 0) maxchars--;
1523 if (maxbytes > 0) maxbytes -= clen;
1524 }
1525
1526 return;
1527
1528 invalid:
1529 R_THROW_ERROR("Invalid UTF-8 string, internal error");
1530 }
1531
1532 #ifndef _WIN32
1533
processx__interruptible_poll(struct pollfd fds[],nfds_t nfds,int timeout)1534 int processx__interruptible_poll(struct pollfd fds[],
1535 nfds_t nfds, int timeout) {
1536 int ret = 0;
1537 int timeleft = timeout;
1538
1539 while (timeout < 0 || timeleft > PROCESSX_INTERRUPT_INTERVAL) {
1540 do {
1541 ret = poll(fds, nfds, PROCESSX_INTERRUPT_INTERVAL);
1542 } while (ret == -1 && errno == EINTR);
1543
1544 /* If not a timeout, then return */
1545 if (ret != 0) return ret;
1546
1547 R_CheckUserInterrupt();
1548 timeleft -= PROCESSX_INTERRUPT_INTERVAL;
1549 }
1550
1551 /* Maybe we are not done, and there is a little left from the timeout */
1552 if (timeleft >= 0) {
1553 do {
1554 ret = poll(fds, nfds, timeleft);
1555 } while (ret == -1 && errno == EINTR);
1556 }
1557
1558 return ret;
1559 }
1560
1561 #endif
1562
1563 #ifdef _WIN32
1564
1565 processx__connection_freelist_t freelist_head = { 0, 0 };
1566 processx__connection_freelist_t *freelist = &freelist_head;
1567
processx__connection_freelist_add(processx_connection_t * ccon)1568 int processx__connection_freelist_add(processx_connection_t *ccon) {
1569 if (ccon->handle.freelist) return 0;
1570 processx__connection_freelist_t *node =
1571 calloc(1, sizeof(processx__connection_freelist_t));
1572 if (!node) R_THROW_ERROR("Cannot add to connection freelist, this is a leak");
1573
1574 node->ccon = ccon;
1575 node->next = freelist->next;
1576 freelist->next = node;
1577 ccon->handle.freelist = TRUE;
1578
1579 return 0;
1580 }
1581
processx__connection_freelist_remove(processx_connection_t * ccon)1582 void processx__connection_freelist_remove(processx_connection_t *ccon) {
1583 processx__connection_freelist_t *prev = freelist, *ptr = freelist->next;
1584 while (ptr) {
1585 if (ptr->ccon == ccon) {
1586 prev->next = ptr->next;
1587 free(ptr);
1588 return;
1589 }
1590 prev = ptr;
1591 ptr = ptr->next;
1592 }
1593 }
1594
processx__connection_schedule_destroy(processx_connection_t * ccon)1595 int processx__connection_schedule_destroy(processx_connection_t *ccon) {
1596 /* The connection is already closed here, but reads might still be
1597 pending... if this is the case, then we add the connection to the
1598 free list. */
1599 if (ccon->handle.read_pending) {
1600 processx__connection_freelist_add(ccon);
1601 return 1;
1602
1603 } else {
1604 return 0;
1605 }
1606 }
1607
1608 #endif
1609
1610 #ifdef _WIN32
1611
processx_is_valid_fd(SEXP fd)1612 SEXP processx_is_valid_fd(SEXP fd) {
1613 int cfd = INTEGER(fd)[0];
1614 HANDLE hnd = (HANDLE) _get_osfhandle(cfd);
1615 int valid =
1616 hnd != INVALID_HANDLE_VALUE &&
1617 hnd != NULL &&
1618 hnd != (HANDLE) (-2);
1619 return ScalarLogical(valid);
1620 }
1621
1622 #else
1623
processx_is_valid_fd(SEXP fd)1624 SEXP processx_is_valid_fd(SEXP fd) {
1625 int cfd = INTEGER(fd)[0];
1626 errno = 0;
1627 int valid = fcntl(cfd, F_GETFD) != -1 || errno != EBADF;
1628 return ScalarLogical(valid);
1629 }
1630
1631 #endif
1632
1633 #undef PROCESSX_CHECK_VALID_CONN
1634