1 /* t-poll.c - Check the poll function
2 * Copyright (C) 2015 g10 Code GmbH
3 *
4 * This file is part of libgpg-error.
5 *
6 * libgpg-error is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public License
8 * as published by the Free Software Foundation; either version 2.1 of
9 * the License, or (at your option) any later version.
10 *
11 * libgpg-error is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this program; if not, see <https://www.gnu.org/licenses/>.
18 */
19
20 /* FIXME: We need much better tests that this very basic one. */
21
22 #if HAVE_CONFIG_H
23 # include <config.h>
24 #endif
25
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <assert.h>
30 #include <sys/types.h>
31 #include <unistd.h>
32 #include <errno.h>
33 #ifdef _WIN32
34 # include <windows.h>
35 # include <time.h>
36 #else
37 # ifdef USE_POSIX_THREADS
38 # include <pthread.h>
39 # endif
40 #endif
41
42 #define PGM "t-poll"
43
44 #include "t-common.h"
45
46 #ifdef _WIN32
47 # define THREAD_RET_TYPE DWORD WINAPI
48 # define THREAD_RET_VALUE 0
49 #else
50 # define THREAD_RET_TYPE void *
51 # define THREAD_RET_VALUE NULL
52 #endif
53
54
55 /* Object to convey data to a thread. */
56 struct thread_arg
57 {
58 const char *name;
59 estream_t stream;
60 volatile int stop_me;
61 #ifdef USE_POSIX_THREADS
62 pthread_t thread;
63 #elif _WIN32
64 HANDLE thread;
65 #endif
66 };
67
68
69 static struct thread_arg peer_stdin; /* Thread to feed the stdin. */
70 static struct thread_arg peer_stdout; /* Thread to feed the stdout. */
71 static struct thread_arg peer_stderr; /* Thread to feed the stderr. */
72
73 static estream_t test_stdin;
74 static estream_t test_stdout;
75 static estream_t test_stderr;
76
77 #if defined(_WIN32) || defined(USE_POSIX_THREADS)
78
79 /* This thread feeds data to the given stream. */
80 static THREAD_RET_TYPE
producer_thread(void * argaddr)81 producer_thread (void *argaddr)
82 {
83 struct thread_arg *arg = argaddr;
84 int i = 0;
85
86 (void)arg;
87
88 while (!arg->stop_me && i++ < 3)
89 {
90 show ("thread '%s' about to write\n", arg->name);
91 es_fprintf (arg->stream, "This is '%s' count=%d\n", arg->name, i);
92 es_fflush (arg->stream);
93 }
94 es_fclose (arg->stream);
95 return THREAD_RET_VALUE;
96 }
97
98 /* This thread eats data from the given stream. */
99 static THREAD_RET_TYPE
consumer_thread(void * argaddr)100 consumer_thread (void *argaddr)
101 {
102 struct thread_arg *arg = argaddr;
103 char buf[15];
104
105 (void)arg;
106
107 while (!arg->stop_me)
108 {
109 show ("thread '%s' ready to read\n", arg->name);
110 if (!es_fgets (buf, sizeof buf, arg->stream))
111 {
112 show ("Thread '%s' received EOF or error\n", arg->name);
113 break;
114 }
115 show ("Thread '%s' got: '%s'\n", arg->name, buf);
116 }
117 es_fclose (arg->stream);
118 return THREAD_RET_VALUE;
119 }
120
121 #endif /*_WIN32 || USE_POSIX_THREADS */
122
123
124 static void
launch_thread(THREAD_RET_TYPE (* fnc)(void *),struct thread_arg * th)125 launch_thread (THREAD_RET_TYPE (*fnc)(void *), struct thread_arg *th)
126 {
127 int fd;
128
129 th->stop_me = 0;
130 fd = es_fileno (th->stream);
131 #ifdef _WIN32
132
133 th->thread = CreateThread (NULL, 0, fnc, th, 0, NULL);
134 if (!th->thread)
135 die ("creating thread '%s' failed: rc=%d", th->name, (int)GetLastError ());
136 show ("thread '%s' launched (fd=%d)\n", th->name, fd);
137
138 #elif USE_POSIX_THREADS
139
140 if (pthread_create (&th->thread, NULL, fnc, th))
141 die ("creating thread '%s' failed: %s\n", th->name, strerror (errno));
142 show ("thread '%s' launched (fd=%d)\n", th->name, fd);
143
144 # else /* no thread support */
145
146 verbose++;
147 show ("no thread support - skipping test\n", PGM);
148 verbose--;
149
150 #endif /* no thread support */
151 }
152
153
154 static void
join_thread(struct thread_arg * th)155 join_thread (struct thread_arg *th)
156 {
157 #ifdef _WIN32
158 int rc;
159
160 rc = WaitForSingleObject (th->thread, INFINITE);
161 if (rc == WAIT_OBJECT_0)
162 show ("thread '%s' has terminated\n", th->name);
163 else
164 fail ("waiting for thread '%s' failed: %d", th->name, (int)GetLastError ());
165 CloseHandle (th->thread);
166
167 #elif USE_POSIX_THREADS
168
169 pthread_join (th->thread, NULL);
170 show ("thread '%s' has terminated\n", th->name);
171
172 #endif
173 }
174
175
176 static void
create_pipe(estream_t * r_in,estream_t * r_out)177 create_pipe (estream_t *r_in, estream_t *r_out)
178 {
179 gpg_error_t err;
180 int filedes[2];
181
182 #ifdef _WIN32
183 if (_pipe (filedes, 512, 0) == -1)
184 #else
185 if (pipe (filedes) == -1)
186 #endif
187 {
188 err = gpg_error_from_syserror ();
189 die ("error creating a pipe: %s\n", gpg_strerror (err));
190 }
191
192 show ("created pipe [%d, %d]\n", filedes[0], filedes[1]);
193
194 *r_in = es_fdopen (filedes[0], "r,pollable");
195 if (!*r_in)
196 {
197 err = gpg_error_from_syserror ();
198 die ("error creating a stream for a pipe: %s\n", gpg_strerror (err));
199 }
200
201 *r_out = es_fdopen (filedes[1], "w,pollable");
202 if (!*r_out)
203 {
204 err = gpg_error_from_syserror ();
205 die ("error creating a stream for a pipe: %s\n", gpg_strerror (err));
206 }
207 }
208
209
210 static void
test_poll(void)211 test_poll (void)
212 {
213 int ret;
214 gpgrt_poll_t fds[3];
215 char buffer[16];
216 size_t used, nwritten;
217 int c;
218
219 memset (fds, 0, sizeof fds);
220 fds[0].stream = test_stdin;
221 fds[0].want_read = 1;
222 fds[1].stream = test_stdout;
223 fds[1].want_write = 1;
224 /* FIXME: We don't use the next stream at all. */
225 fds[2].stream = test_stderr;
226 fds[2].want_write = 1;
227 fds[2].ignore = 1;
228
229
230 used = 0;
231 while (used || !fds[0].ignore)
232 {
233 ret = gpgrt_poll (fds, DIM(fds), -1);
234 if (ret == -1)
235 {
236 fail ("gpgrt_poll failed: %s\n", strerror (errno));
237 continue;
238 }
239 if (!ret)
240 {
241 fail ("gpgrt_poll unexpectedly timed out\n");
242 continue;
243 }
244
245 show ("gpgrt_poll detected %d events\n", ret);
246 if (debug)
247 show ("gpgrt_poll: r=%d"
248 " 0:%c%c%c%c%c%c%c%c%c%c%c%c"
249 " 1:%c%c%c%c%c%c%c%c%c%c%c%c"
250 " 2:%c%c%c%c%c%c%c%c%c%c%c%c"
251 "\n",
252 ret,
253 fds[0].want_read? 'r':'-',
254 fds[0].want_write? 'w':'-',
255 fds[0].want_oob? 'o':'-',
256 fds[0].want_rdhup? 'h':'-',
257 fds[0].ignore? '!':'=',
258 fds[0].got_read? 'r':'-',
259 fds[0].got_write? 'w':'-',
260 fds[0].got_oob? 'o':'-',
261 fds[0].got_rdhup? 'h':'-',
262 fds[0].got_hup? 'H':' ',
263 fds[0].got_err? 'e':' ',
264 fds[0].got_nval? 'n':' ',
265
266 fds[1].want_read? 'r':'-',
267 fds[1].want_write? 'w':'-',
268 fds[1].want_oob? 'o':'-',
269 fds[1].want_rdhup? 'h':'-',
270 fds[1].ignore? '!':'=',
271 fds[1].got_read? 'r':'-',
272 fds[1].got_write? 'w':'-',
273 fds[1].got_oob? 'o':'-',
274 fds[1].got_rdhup? 'h':'-',
275 fds[1].got_hup? 'H':' ',
276 fds[1].got_err? 'e':' ',
277 fds[1].got_nval? 'n':' ',
278
279 fds[2].want_read? 'r':'-',
280 fds[2].want_write? 'w':'-',
281 fds[2].want_oob? 'o':'-',
282 fds[2].want_rdhup? 'h':'-',
283 fds[2].ignore? '!':'=',
284 fds[2].got_read? 'r':'-',
285 fds[2].got_write? 'w':'-',
286 fds[2].got_oob? 'o':'-',
287 fds[2].got_rdhup? 'h':'-',
288 fds[2].got_hup? 'H':' ',
289 fds[2].got_err? 'e':' ',
290 fds[2].got_nval? 'n':' '
291 );
292 else
293 show ("gpgrt_poll detected %d events\n", ret);
294
295 if (fds[0].got_read)
296 {
297 /* Read from the producer. */
298 for (;;)
299 {
300 c = es_fgetc (fds[0].stream);
301 if (c == EOF)
302 {
303 if (es_feof (fds[0].stream))
304 {
305 show ("reading '%s': EOF\n", peer_stdin.name);
306 fds[0].ignore = 1; /* Not anymore needed. */
307 peer_stdin.stop_me = 1; /* Tell the thread to stop. */
308 }
309 else if (es_ferror (fds[0].stream))
310 {
311 fail ("error reading '%s': %s\n",
312 peer_stdin.name, strerror (errno));
313 fds[0].ignore = 1; /* Disable. */
314 peer_stdin.stop_me = 1; /* Tell the thread to stop. */
315 }
316 else
317 show ("reading '%s': EAGAIN\n", peer_stdin.name);
318 break;
319 }
320 else
321 {
322 if (used <= sizeof buffer -1)
323 buffer[used++] = c;
324 if (used == sizeof buffer)
325 {
326 show ("throttling reading from '%s'\n", peer_stdin.name);
327 fds[0].ignore = 1;
328 break;
329 }
330 }
331 }
332 show ("read from '%s': %zu bytes\n", peer_stdin.name, used);
333 if (used)
334 fds[1].ignore = 0; /* Data to send. */
335 }
336 if (fds[1].got_write)
337 {
338 if (used)
339 {
340 ret = es_write (fds[1].stream, buffer, used, &nwritten);
341 show ("result for writing to '%s': ret=%d, n=%zu, nwritten=%zu\n",
342 peer_stdout.name, ret, used, nwritten);
343 if (!ret)
344 {
345 assert (nwritten <= used);
346 /* Move the remaining data to the front of buffer. */
347 memmove (buffer, buffer + nwritten,
348 sizeof buffer - nwritten);
349 used -= nwritten;
350 }
351 ret = es_fflush (fds[1].stream);
352 if (ret)
353 fail ("Flushing for '%s' failed: %s\n",
354 peer_stdout.name, strerror (errno));
355 }
356 if (!used)
357 fds[1].ignore = 1; /* No need to send data. */
358 }
359
360 if (used < sizeof buffer / 2 && !peer_stdin.stop_me && fds[0].ignore)
361 {
362 show ("accelerate reading from '%s'\n", peer_stdin.name);
363 fds[0].ignore = 0;
364 }
365 }
366 }
367
368
369 int
main(int argc,char ** argv)370 main (int argc, char **argv)
371 {
372 int last_argc = -1;
373
374 if (argc)
375 {
376 argc--; argv++;
377 }
378 while (argc && last_argc != argc )
379 {
380 last_argc = argc;
381 if (!strcmp (*argv, "--help"))
382 {
383 puts (
384 "usage: ./t-poll [options]\n"
385 "\n"
386 "Options:\n"
387 " --verbose Show what is going on\n"
388 " --debug Flyswatter\n"
389 );
390 exit (0);
391 }
392 if (!strcmp (*argv, "--verbose"))
393 {
394 verbose = 1;
395 argc--; argv++;
396 }
397 else if (!strcmp (*argv, "--debug"))
398 {
399 verbose = debug = 1;
400 argc--; argv++;
401 }
402 }
403
404 if (!gpg_error_check_version (GPG_ERROR_VERSION))
405 {
406 die ("gpg_error_check_version returned an error");
407 errorcount++;
408 }
409
410 peer_stdin.name = "stdin producer";
411 create_pipe (&test_stdin, &peer_stdin.stream);
412 peer_stdout.name = "stdout consumer";
413 create_pipe (&peer_stdout.stream, &test_stdout);
414 peer_stderr.name = "stderr consumer";
415 create_pipe (&peer_stderr.stream, &test_stderr);
416
417 if (es_set_nonblock (test_stdin, 1))
418 fail ("error setting test_stdin to nonblock: %s\n", strerror (errno));
419 if (es_set_nonblock (test_stdout, 1))
420 fail ("error setting test_stdout to nonblock: %s\n", strerror (errno));
421 if (es_set_nonblock (test_stderr, 1))
422 fail ("error setting test_stderr to nonblock: %s\n", strerror (errno));
423
424 launch_thread (producer_thread, &peer_stdin );
425 launch_thread (consumer_thread, &peer_stdout);
426 launch_thread (consumer_thread, &peer_stderr);
427 test_poll ();
428 show ("Waiting for threads to terminate...\n");
429 es_fclose (test_stdin);
430 es_fclose (test_stdout);
431 es_fclose (test_stderr);
432 peer_stdin.stop_me = 1;
433 peer_stdout.stop_me = 1;
434 peer_stderr.stop_me = 1;
435 join_thread (&peer_stdin);
436 join_thread (&peer_stdout);
437 join_thread (&peer_stderr);
438
439 return errorcount ? 1 : 0;
440 }
441