1 2 #ifndef PROCESSX_CONNECTION_H 3 #define PROCESSX_CONNECTION_H 4 5 #ifndef _GNU_SOURCE 6 #define _GNU_SOURCE 1 7 #endif 8 9 #ifdef __INTEL_COMPILER 10 #define _BSD_SOURCE 1 11 #define _POSIX_C_SOURCE 200809L 12 #endif 13 14 #ifdef _WIN32 15 #ifndef FD_SETSIZE 16 #define FD_SETSIZE 32767 17 #endif 18 #include <winsock2.h> 19 #include <windows.h> 20 #else 21 #include <unistd.h> 22 #endif 23 24 #include "errors.h" 25 26 #include <Rinternals.h> 27 #include <R_ext/Riconv.h> 28 29 /* --------------------------------------------------------------------- */ 30 /* Data types */ 31 /* --------------------------------------------------------------------- */ 32 33 #define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0])) 34 35 #ifdef _WIN32 36 typedef HANDLE processx_file_handle_t; 37 typedef struct { 38 HANDLE handle; 39 OVERLAPPED overlapped; 40 BOOLEAN async; 41 BOOLEAN read_pending; 42 BOOLEAN freelist; 43 } processx_i_connection_t; 44 #else 45 typedef int processx_file_handle_t; 46 typedef int processx_i_connection_t; 47 #endif 48 49 typedef enum { 50 PROCESSX_FILE_TYPE_FILE = 1, /* regular file, blocking IO */ 51 PROCESSX_FILE_TYPE_ASYNCFILE, /* regular file, async IO (well, win only) */ 52 PROCESSX_FILE_TYPE_PIPE, /* pipe, blocking IO */ 53 PROCESSX_FILE_TYPE_ASYNCPIPE /* pipe, async IO */ 54 } processx_file_type_t; 55 56 typedef struct processx_connection_s { 57 processx_file_type_t type; 58 59 int is_closed_; 60 int is_eof_; /* the UTF8 buffer */ 61 int is_eof_raw_; /* the raw file */ 62 int close_on_destroy; 63 64 char *encoding; 65 void *iconv_ctx; 66 67 processx_i_connection_t handle; 68 69 char* buffer; 70 size_t buffer_allocated_size; 71 size_t buffer_data_size; 72 73 char *utf8; 74 size_t utf8_allocated_size; 75 size_t utf8_data_size; 76 77 int poll_idx; 78 } processx_connection_t; 79 80 struct processx_pollable_s; 81 82 /* Generic poll method 83 * 84 * @param object The thing to poll. 85 * @param handle A handle can be returned here, to `poll` or wait on. 86 * If this is not needed, set it to NULL. 87 * @param timeout A timeout value can be returned here, for the next 88 * poll. If this is not needed, set it to NULL. 89 * @return The result of the pre-polling. PXCLOSED, PXREADY or PXSILENT. 90 * PXREADY: data is readily available, at least one character. 91 * (But maybe not a full line.) 92 * PXSILENT: we don't know if data is available, we need to check the 93 * operating system via `poll` or `WaitForStatus`. 94 * PXHANDLE 95 * PXPOLLFD 96 */ 97 98 typedef int (*processx_connection_pre_poll_func_t)( 99 struct processx_pollable_s *pollable); 100 101 /* Data structure for a pollable object 102 * 103 * @member pre_poll_func The function to call on the object, before 104 * the poll/wait system call. The pollable object might have data 105 * available without immediately, without poll/wait. If not, it 106 * will return the file descriptor or HANDLE to poll. 107 * @member object The object to pass to `poll_func`. 108 * @member free Whether to call `free()` on `object` when finalizing 109 * `processx_pollable_t` objects. 110 * @member event The result of the polling is stored here. Possible values: 111 * `PXSILENT` (no data), `PXREADY` (data), `PXTIMEOUT` (timeout). 112 * @member fd If the pollable is an fd, then it is stored here instead of 113 * in `object`, for simplicity. 114 */ 115 116 typedef struct processx_pollable_s { 117 processx_connection_pre_poll_func_t pre_poll_func; 118 void *object; 119 int free; 120 int event; 121 processx_file_handle_t handle; 122 SEXP fds; 123 } processx_pollable_t; 124 125 /* --------------------------------------------------------------------- */ 126 /* API from R */ 127 /* --------------------------------------------------------------------- */ 128 129 /* Create connection from fd / HANDLE */ 130 SEXP processx_connection_create(SEXP handle, SEXP encoding); 131 132 /* Create from fd, this is only different on Windows */ 133 SEXP processx_connection_create_fd(SEXP handle, SEXP encoding, SEXP close); 134 135 /* Create file connection */ 136 SEXP processx_connection_create_file(SEXP filename, SEXP read, SEXP write); 137 138 /* Read characters in a given encoding from the connection. */ 139 SEXP processx_connection_read_chars(SEXP con, SEXP nchars); 140 141 /* Read lines of characters from the connection. */ 142 SEXP processx_connection_read_lines(SEXP con, SEXP nlines); 143 144 /* Write characters */ 145 SEXP processx_connection_write_bytes(SEXP con, SEXP chars); 146 147 /* Check if the connection has ended. */ 148 SEXP processx_connection_is_eof(SEXP con); 149 150 /* Close the connection. */ 151 SEXP processx_connection_close(SEXP con); 152 SEXP processx_is_closed(SEXP con); 153 154 /* Poll connections and other pollable handles */ 155 SEXP processx_connection_poll(SEXP pollables, SEXP timeout); 156 157 /* Functions for connection inheritance */ 158 SEXP processx_connection_create_pipepair(); 159 160 SEXP processx_connection_set_stdout(SEXP con, SEXP drop); 161 162 SEXP processx_connection_set_stderr(SEXP con, SEXP drop); 163 164 SEXP processx_connection_get_fileno(SEXP con); 165 166 SEXP processx_connection_disable_inheritance(); 167 168 SEXP processx_is_valid_fd(SEXP fd); 169 170 /* --------------------------------------------------------------------- */ 171 /* API from C */ 172 /* --------------------------------------------------------------------- */ 173 174 /* Create connection object */ 175 processx_connection_t *processx_c_connection_create( 176 processx_file_handle_t os_handle, 177 processx_file_type_t type, 178 const char *encoding, 179 SEXP *r_connection); 180 181 /* Destroy connection object. We need this for the C API */ 182 void processx_c_connection_destroy(processx_connection_t *ccon); 183 184 /* Read characters */ 185 ssize_t processx_c_connection_read_chars( 186 processx_connection_t *con, 187 void *buffer, 188 size_t nbyte); 189 190 /* Read lines of characters */ 191 ssize_t processx_c_connection_read_line( 192 processx_connection_t *ccon, 193 char **linep, 194 size_t *linecapp); 195 196 /* Write characters */ 197 ssize_t processx_c_connection_write_bytes( 198 processx_connection_t *con, 199 const void *buffer, 200 size_t nbytes); 201 202 /* Check if the connection has ended */ 203 int processx_c_connection_is_eof( 204 processx_connection_t *con); 205 206 /* Close */ 207 void processx_c_connection_close( 208 processx_connection_t *con); 209 int processx_c_connection_is_closed( 210 processx_connection_t *con); 211 212 /* Poll connections and other pollable handles */ 213 int processx_c_connection_poll( 214 processx_pollable_t pollables[], 215 size_t npollables, int timeout); 216 217 /* Helper function to create pollable handles*/ 218 int processx_c_pollable_from_connection( 219 processx_pollable_t *pollable, 220 processx_connection_t *ccon); 221 222 int processx_c_pollable_from_curl( 223 processx_pollable_t *pollable, SEXP fds); 224 225 processx_file_handle_t processx_c_connection_fileno( 226 const processx_connection_t *con); 227 228 /* --------------------------------------------------------------------- */ 229 /* Internals */ 230 /* --------------------------------------------------------------------- */ 231 232 #ifndef _WIN32 233 typedef unsigned long DWORD; 234 #endif 235 236 /* Threading in Windows */ 237 238 #ifdef _WIN32 239 int processx__start_thread(); 240 extern HANDLE processx__iocp_thread; 241 extern HANDLE processx__thread_start; 242 extern HANDLE processx__thread_done; 243 244 extern fd_set processx__readfds, processx__writefds, 245 processx__exceptionfds; 246 extern SOCKET processx__notify_socket[2]; 247 extern int processx__select; 248 extern ULONG_PTR processx__key_none; 249 250 extern int processx__thread_cmd; 251 #define PROCESSX__THREAD_CMD_INIT 0 252 #define PROCESSX__THREAD_CMD_IDLE 1 253 #define PROCESSX__THREAD_CMD_READFILE 2 254 #define PROCESSX__THREAD_CMD_GETSTATUS 3 255 256 BOOL processx__thread_readfile(processx_connection_t *ccon, 257 LPVOID lpBuffer, 258 DWORD nNumberOfBytesToRead, 259 LPDWORD lpNumberOfBytesRead); 260 BOOL processx__thread_getstatus(LPDWORD lpNumberOfBytes, 261 PULONG_PTR lpCompletionKey, 262 LPOVERLAPPED *lpOverlapped, 263 DWORD dwMilliseconds); 264 BOOL processx__thread_getstatus_select(LPDWORD lpNumberOfBytes, 265 PULONG_PTR lpCompletionKey, 266 LPOVERLAPPED *lpOverlapped, 267 DWORD dwMilliseconds); 268 DWORD processx__thread_get_last_error(); 269 270 #endif 271 272 /* Free-list of connection in Windows */ 273 274 typedef struct processx__connection_freelist_s { 275 processx_connection_t *ccon; 276 struct processx__connection_freelist_s *next; 277 } processx__connection_freelist_t; 278 279 int processx__connection_freelist_add(processx_connection_t *con); 280 void processx__connection_freelist_remove(processx_connection_t *con); 281 int processx__connection_schedule_destroy(processx_connection_t *con); 282 283 #endif 284