1 
2 #ifndef PROCESSX_CONNECTION_H
3 #define PROCESSX_CONNECTION_H
4 
5 #ifndef _GNU_SOURCE
6 #define _GNU_SOURCE 1
7 #endif
8 
9 #ifdef __INTEL_COMPILER
10 #define _BSD_SOURCE 1
11 #define _POSIX_C_SOURCE  200809L
12 #endif
13 
14 #ifdef _WIN32
15 #ifndef FD_SETSIZE
16 #define FD_SETSIZE 32767
17 #endif
18 #include <winsock2.h>
19 #include <windows.h>
20 #else
21 #include <unistd.h>
22 #endif
23 
24 #include "errors.h"
25 
26 #include <Rinternals.h>
27 #include <R_ext/Riconv.h>
28 
29 /* --------------------------------------------------------------------- */
30 /* Data types                                                            */
31 /* --------------------------------------------------------------------- */
32 
/* Number of elements in the array `a`.
 *
 * Only meaningful when `a` is a real array object. For a pointer (and an
 * array-typed function parameter is a pointer) the result is the pointer
 * size divided by the element size, not an element count.
 */
#define ARRAY_SIZE(a) (sizeof(a) / sizeof(*(a)))
34 
/* Platform-specific handle types.
 *
 * processx_file_handle_t is the raw OS-level handle (a HANDLE on Windows,
 * an int everywhere else). processx_i_connection_t is what a connection
 * stores internally: on Windows the HANDLE together with extra state, on
 * other platforms just the int.
 */
#if defined(_WIN32)

typedef HANDLE processx_file_handle_t;

typedef struct {
  HANDLE handle;         /* the OS handle itself */
  OVERLAPPED overlapped; /* Win32 OVERLAPPED structure kept with the handle */
  BOOLEAN async;         /* presumably: handle is used with async IO -- TODO confirm */
  BOOLEAN read_pending;  /* presumably: a read was started and has not finished -- TODO confirm */
  BOOLEAN freelist;      /* presumably: connection is on the free-list -- TODO confirm */
} processx_i_connection_t;

#else  /* !_WIN32 */

typedef int processx_file_handle_t;
typedef int processx_i_connection_t;

#endif /* _WIN32 */
48 
/* Kind of file behind a connection, and the IO mode used for it.
 * The numbering starts at 1, so 0 is not a valid file type.
 */
typedef enum {
  /* regular file, blocking IO */
  PROCESSX_FILE_TYPE_FILE      = 1,
  /* regular file, async IO (well, win only) */
  PROCESSX_FILE_TYPE_ASYNCFILE = 2,
  /* pipe, blocking IO */
  PROCESSX_FILE_TYPE_PIPE      = 3,
  /* pipe, async IO */
  PROCESSX_FILE_TYPE_ASYNCPIPE = 4
} processx_file_type_t;
55 
56 typedef struct processx_connection_s {
57   processx_file_type_t type;
58 
59   int is_closed_;
60   int is_eof_;			/* the UTF8 buffer */
61   int is_eof_raw_;		/* the raw file */
62   int close_on_destroy;
63 
64   char *encoding;
65   void *iconv_ctx;
66 
67   processx_i_connection_t handle;
68 
69   char* buffer;
70   size_t buffer_allocated_size;
71   size_t buffer_data_size;
72 
73   char *utf8;
74   size_t utf8_allocated_size;
75   size_t utf8_data_size;
76 
77   int poll_idx;
78 } processx_connection_t;
79 
/* Defined further down; only a pointer to it is needed here. */
struct processx_pollable_s;

/* Generic pre-poll method
 *
 * Called on a pollable before the actual poll/wait system call.
 *
 * @param pollable The pollable to check.
 *   NOTE(review): earlier documentation listed separate `object`,
 *   `handle` and `timeout` arguments. The signature only takes the
 *   pollable, so presumably the callback works through the members of
 *   the pollable instead -- confirm against the implementations.
 * @return The result of the pre-polling. PXCLOSED, PXREADY or PXSILENT.
 *   PXREADY: data is readily available, at least one character.
 *     (But maybe not a full line.)
 *   PXSILENT: we don't know if data is available, we need to check the
 *     operating system via `poll` or `WaitForStatus`.
 *   PXHANDLE, PXPOLLFD: also listed as possible results, without a
 *     description.
 */
typedef int (*processx_connection_pre_poll_func_t)(struct processx_pollable_s *pollable);
100 
101 /* Data structure for a pollable object
102  *
103  * @member pre_poll_func The function to call on the object, before
104  *   the poll/wait system call. The pollable object might have data
105  *   available without immediately, without poll/wait. If not, it
106  *   will return the file descriptor or HANDLE to poll.
107  * @member object The object to pass to `poll_func`.
108  * @member free Whether to call `free()` on `object` when finalizing
109  *   `processx_pollable_t` objects.
110  * @member event The result of the polling is stored here. Possible values:
111  *   `PXSILENT` (no data), `PXREADY` (data), `PXTIMEOUT` (timeout).
112  * @member fd If the pollable is an fd, then it is stored here instead of
113  *   in `object`, for simplicity.
114  */
115 
116 typedef struct processx_pollable_s {
117   processx_connection_pre_poll_func_t pre_poll_func;
118   void *object;
119   int free;
120   int event;
121   processx_file_handle_t handle;
122   SEXP fds;
123 } processx_pollable_t;
124 
125 /* --------------------------------------------------------------------- */
126 /* API from R                                                            */
127 /* --------------------------------------------------------------------- */
128 
129 /* Create connection from fd / HANDLE */
130 SEXP processx_connection_create(SEXP handle, SEXP encoding);
131 
132 /* Create from fd, this is only different on Windows */
133 SEXP processx_connection_create_fd(SEXP handle, SEXP encoding, SEXP close);
134 
135 /* Create file connection */
136 SEXP processx_connection_create_file(SEXP filename, SEXP read, SEXP write);
137 
138 /* Read characters in a given encoding from the connection. */
139 SEXP processx_connection_read_chars(SEXP con, SEXP nchars);
140 
141 /* Read lines of characters from the connection. */
142 SEXP processx_connection_read_lines(SEXP con, SEXP nlines);
143 
144 /* Write characters */
145 SEXP processx_connection_write_bytes(SEXP con, SEXP chars);
146 
147 /* Check if the connection has ended. */
148 SEXP processx_connection_is_eof(SEXP con);
149 
150 /* Close the connection. */
151 SEXP processx_connection_close(SEXP con);
152 SEXP processx_is_closed(SEXP con);
153 
154 /* Poll connections and other pollable handles */
155 SEXP processx_connection_poll(SEXP pollables, SEXP timeout);
156 
157 /* Functions for connection inheritance */
158 SEXP processx_connection_create_pipepair();
159 
160 SEXP processx_connection_set_stdout(SEXP con, SEXP drop);
161 
162 SEXP processx_connection_set_stderr(SEXP con, SEXP drop);
163 
164 SEXP processx_connection_get_fileno(SEXP con);
165 
166 SEXP processx_connection_disable_inheritance();
167 
168 SEXP processx_is_valid_fd(SEXP fd);
169 
170 /* --------------------------------------------------------------------- */
171 /* API from C                                                            */
172 /* --------------------------------------------------------------------- */
173 
174 /* Create connection object */
175 processx_connection_t *processx_c_connection_create(
176   processx_file_handle_t os_handle,
177   processx_file_type_t type,
178   const char *encoding,
179   SEXP *r_connection);
180 
181 /* Destroy connection object. We need this for the C API */
182 void processx_c_connection_destroy(processx_connection_t *ccon);
183 
184 /* Read characters */
185 ssize_t processx_c_connection_read_chars(
186   processx_connection_t *con,
187   void *buffer,
188   size_t nbyte);
189 
190 /* Read lines of characters */
191 ssize_t processx_c_connection_read_line(
192   processx_connection_t *ccon,
193   char **linep,
194   size_t *linecapp);
195 
196 /* Write characters */
197 ssize_t processx_c_connection_write_bytes(
198   processx_connection_t *con,
199   const void *buffer,
200   size_t nbytes);
201 
202 /* Check if the connection has ended */
203 int processx_c_connection_is_eof(
204   processx_connection_t *con);
205 
206 /* Close */
207 void processx_c_connection_close(
208   processx_connection_t *con);
209 int processx_c_connection_is_closed(
210   processx_connection_t *con);
211 
212 /* Poll connections and other pollable handles */
213 int processx_c_connection_poll(
214   processx_pollable_t pollables[],
215   size_t npollables, int timeout);
216 
217 /* Helper function to create pollable handles*/
218 int processx_c_pollable_from_connection(
219   processx_pollable_t *pollable,
220   processx_connection_t *ccon);
221 
222 int processx_c_pollable_from_curl(
223   processx_pollable_t *pollable, SEXP fds);
224 
225 processx_file_handle_t processx_c_connection_fileno(
226   const processx_connection_t *con);
227 
228 /* --------------------------------------------------------------------- */
229 /* Internals                                                             */
230 /* --------------------------------------------------------------------- */
231 
/* DWORD comes from the Windows headers; supply a stand-in elsewhere.
 * Note that this stand-in is `unsigned long`, so its width follows the
 * platform's `long`.
 */
#if !defined(_WIN32)
typedef unsigned long DWORD;
#endif /* !_WIN32 */
235 
/* Threading in Windows
 *
 * Declarations for the helper thread: its handles, the shared state for
 * select(), the command codes stored in `processx__thread_cmd`, and the
 * wrappers for reading and for getting the status.
 *
 * Functions without arguments are declared with `(void)`. An empty
 * parameter list is not a prototype before C23, so the compiler would
 * not check the arguments of a call.
 */

#ifdef _WIN32
int processx__start_thread(void);
extern HANDLE processx__iocp_thread;
extern HANDLE processx__thread_start;
extern HANDLE processx__thread_done;

extern fd_set processx__readfds, processx__writefds,
  processx__exceptionfds;
extern SOCKET processx__notify_socket[2];
extern int processx__select;
extern ULONG_PTR processx__key_none;

/* One of the PROCESSX__THREAD_CMD_* values below */
extern int processx__thread_cmd;
#define PROCESSX__THREAD_CMD_INIT 0
#define PROCESSX__THREAD_CMD_IDLE 1
#define PROCESSX__THREAD_CMD_READFILE 2
#define PROCESSX__THREAD_CMD_GETSTATUS 3

/* NOTE(review): the argument names mirror Win32 `ReadFile()` and
 * `GetQueuedCompletionStatus()`; presumably these run the call on the
 * helper thread -- confirm against the implementation.
 */
BOOL processx__thread_readfile(processx_connection_t *ccon,
			       LPVOID lpBuffer,
			       DWORD nNumberOfBytesToRead,
			       LPDWORD lpNumberOfBytesRead);
BOOL processx__thread_getstatus(LPDWORD lpNumberOfBytes,
				PULONG_PTR lpCompletionKey,
				LPOVERLAPPED *lpOverlapped,
				DWORD dwMilliseconds);
BOOL processx__thread_getstatus_select(LPDWORD lpNumberOfBytes,
				       PULONG_PTR lpCompletionKey,
				       LPOVERLAPPED *lpOverlapped,
				       DWORD dwMilliseconds);
DWORD processx__thread_get_last_error(void);

#endif
271 
272 /* Free-list of connection in Windows */
273 
274 typedef struct processx__connection_freelist_s {
275   processx_connection_t *ccon;
276   struct processx__connection_freelist_s *next;
277 } processx__connection_freelist_t;
278 
279 int processx__connection_freelist_add(processx_connection_t *con);
280 void processx__connection_freelist_remove(processx_connection_t *con);
281 int processx__connection_schedule_destroy(processx_connection_t *con);
282 
283 #endif
284