1 /* t-poll.c - Check the poll function
2  * Copyright (C) 2015 g10 Code GmbH
3  *
4  * This file is part of libgpg-error.
5  *
6  * libgpg-error is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public License
8  * as published by the Free Software Foundation; either version 2.1 of
9  * the License, or (at your option) any later version.
10  *
11  * libgpg-error is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this program; if not, see <https://www.gnu.org/licenses/>.
18  */
19 
20 /* FIXME: We need much better tests that this very basic one.  */
21 
22 #if HAVE_CONFIG_H
23 # include <config.h>
24 #endif
25 
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <assert.h>
30 #include <sys/types.h>
31 #include <unistd.h>
32 #include <errno.h>
33 #ifdef _WIN32
34 # include <windows.h>
35 # include <time.h>
36 #else
37 # ifdef USE_POSIX_THREADS
38 #  include <pthread.h>
39 # endif
40 #endif
41 
42 #define PGM "t-poll"
43 
44 #include "t-common.h"
45 
46 #ifdef _WIN32
47 # define THREAD_RET_TYPE  DWORD WINAPI
48 # define THREAD_RET_VALUE 0
49 #else
50 # define THREAD_RET_TYPE  void *
51 # define THREAD_RET_VALUE NULL
52 #endif
53 
54 
55 /* Object to convey data to a thread.  */
56 struct thread_arg
57 {
58   const char *name;
59   estream_t stream;
60   volatile int stop_me;
61 #ifdef USE_POSIX_THREADS
62   pthread_t thread;
63 #elif _WIN32
64   HANDLE thread;
65 #endif
66 };
67 
68 
69 static struct thread_arg peer_stdin;  /* Thread to feed the stdin.  */
70 static struct thread_arg peer_stdout; /* Thread to feed the stdout. */
71 static struct thread_arg peer_stderr; /* Thread to feed the stderr. */
72 
73 static estream_t test_stdin;
74 static estream_t test_stdout;
75 static estream_t test_stderr;
76 
77 #if defined(_WIN32) || defined(USE_POSIX_THREADS)
78 
79 /* This thread feeds data to the given stream.  */
80 static THREAD_RET_TYPE
producer_thread(void * argaddr)81 producer_thread (void *argaddr)
82 {
83   struct thread_arg *arg = argaddr;
84   int i = 0;
85 
86   (void)arg;
87 
88   while (!arg->stop_me && i++ < 3)
89     {
90       show ("thread '%s' about to write\n", arg->name);
91       es_fprintf (arg->stream, "This is '%s' count=%d\n", arg->name, i);
92       es_fflush (arg->stream);
93     }
94   es_fclose (arg->stream);
95   return THREAD_RET_VALUE;
96 }
97 
98 /* This thread eats data from the given stream.  */
99 static THREAD_RET_TYPE
consumer_thread(void * argaddr)100 consumer_thread (void *argaddr)
101 {
102   struct thread_arg *arg = argaddr;
103   char buf[15];
104 
105   (void)arg;
106 
107   while (!arg->stop_me)
108     {
109       show ("thread '%s' ready to read\n", arg->name);
110       if (!es_fgets (buf, sizeof buf, arg->stream))
111         {
112           show ("Thread '%s' received EOF or error\n", arg->name);
113           break;
114         }
115       show ("Thread '%s' got: '%s'\n", arg->name, buf);
116     }
117   es_fclose (arg->stream);
118   return THREAD_RET_VALUE;
119 }
120 
121 #endif /*_WIN32 || USE_POSIX_THREADS */
122 
123 
124 static void
launch_thread(THREAD_RET_TYPE (* fnc)(void *),struct thread_arg * th)125 launch_thread (THREAD_RET_TYPE (*fnc)(void *), struct thread_arg *th)
126 {
127   int fd;
128 
129   th->stop_me = 0;
130   fd = es_fileno (th->stream);
131 #ifdef _WIN32
132 
133   th->thread = CreateThread (NULL, 0, fnc, th, 0, NULL);
134   if (!th->thread)
135     die ("creating thread '%s' failed: rc=%d", th->name, (int)GetLastError ());
136   show ("thread '%s' launched (fd=%d)\n", th->name, fd);
137 
138 #elif USE_POSIX_THREADS
139 
140   if (pthread_create (&th->thread, NULL, fnc, th))
141     die ("creating thread '%s' failed: %s\n", th->name, strerror (errno));
142   show ("thread '%s' launched (fd=%d)\n", th->name, fd);
143 
144 # else /* no thread support */
145 
146   verbose++;
147   show ("no thread support - skipping test\n", PGM);
148   verbose--;
149 
150 #endif /* no thread support */
151 }
152 
153 
154 static void
join_thread(struct thread_arg * th)155 join_thread (struct thread_arg *th)
156 {
157 #ifdef _WIN32
158   int rc;
159 
160   rc = WaitForSingleObject (th->thread, INFINITE);
161   if (rc == WAIT_OBJECT_0)
162     show ("thread '%s' has terminated\n", th->name);
163   else
164     fail ("waiting for thread '%s' failed: %d", th->name, (int)GetLastError ());
165   CloseHandle (th->thread);
166 
167 #elif USE_POSIX_THREADS
168 
169   pthread_join (th->thread, NULL);
170   show ("thread '%s' has terminated\n", th->name);
171 
172 #endif
173 }
174 
175 
176 static void
create_pipe(estream_t * r_in,estream_t * r_out)177 create_pipe (estream_t *r_in, estream_t *r_out)
178 {
179   gpg_error_t err;
180   int filedes[2];
181 
182 #ifdef _WIN32
183   if (_pipe (filedes, 512, 0) == -1)
184 #else
185   if (pipe (filedes) == -1)
186 #endif
187     {
188       err = gpg_error_from_syserror ();
189       die ("error creating a pipe: %s\n", gpg_strerror (err));
190     }
191 
192   show ("created pipe [%d, %d]\n", filedes[0], filedes[1]);
193 
194   *r_in = es_fdopen (filedes[0], "r,pollable");
195   if (!*r_in)
196     {
197       err = gpg_error_from_syserror ();
198       die ("error creating a stream for a pipe: %s\n", gpg_strerror (err));
199     }
200 
201   *r_out = es_fdopen (filedes[1], "w,pollable");
202   if (!*r_out)
203     {
204       err = gpg_error_from_syserror ();
205       die ("error creating a stream for a pipe: %s\n", gpg_strerror (err));
206     }
207 }
208 
209 
210 static void
test_poll(void)211 test_poll (void)
212 {
213   int ret;
214   gpgrt_poll_t fds[3];
215   char buffer[16];
216   size_t used, nwritten;
217   int c;
218 
219   memset (fds, 0, sizeof fds);
220   fds[0].stream = test_stdin;
221   fds[0].want_read = 1;
222   fds[1].stream = test_stdout;
223   fds[1].want_write = 1;
224   /* FIXME: We don't use the next stream at all.  */
225   fds[2].stream = test_stderr;
226   fds[2].want_write = 1;
227   fds[2].ignore = 1;
228 
229 
230   used = 0;
231   while (used || !fds[0].ignore)
232     {
233       ret = gpgrt_poll (fds, DIM(fds), -1);
234       if (ret == -1)
235         {
236           fail ("gpgrt_poll failed: %s\n", strerror (errno));
237           continue;
238         }
239       if (!ret)
240         {
241           fail ("gpgrt_poll unexpectedly timed out\n");
242           continue;
243         }
244 
245       show ("gpgrt_poll detected %d events\n", ret);
246       if (debug)
247         show ("gpgrt_poll: r=%d"
248               " 0:%c%c%c%c%c%c%c%c%c%c%c%c"
249               " 1:%c%c%c%c%c%c%c%c%c%c%c%c"
250               " 2:%c%c%c%c%c%c%c%c%c%c%c%c"
251               "\n",
252               ret,
253               fds[0].want_read?  'r':'-',
254               fds[0].want_write? 'w':'-',
255               fds[0].want_oob?   'o':'-',
256               fds[0].want_rdhup? 'h':'-',
257               fds[0].ignore?     '!':'=',
258               fds[0].got_read?   'r':'-',
259               fds[0].got_write?  'w':'-',
260               fds[0].got_oob?    'o':'-',
261               fds[0].got_rdhup?  'h':'-',
262               fds[0].got_hup?    'H':' ',
263               fds[0].got_err?    'e':' ',
264               fds[0].got_nval?   'n':' ',
265 
266               fds[1].want_read?  'r':'-',
267               fds[1].want_write? 'w':'-',
268               fds[1].want_oob?   'o':'-',
269               fds[1].want_rdhup? 'h':'-',
270               fds[1].ignore?     '!':'=',
271               fds[1].got_read?   'r':'-',
272               fds[1].got_write?  'w':'-',
273               fds[1].got_oob?    'o':'-',
274               fds[1].got_rdhup?  'h':'-',
275               fds[1].got_hup?    'H':' ',
276               fds[1].got_err?    'e':' ',
277               fds[1].got_nval?   'n':' ',
278 
279               fds[2].want_read?  'r':'-',
280               fds[2].want_write? 'w':'-',
281               fds[2].want_oob?   'o':'-',
282               fds[2].want_rdhup? 'h':'-',
283               fds[2].ignore?     '!':'=',
284               fds[2].got_read?   'r':'-',
285               fds[2].got_write?  'w':'-',
286               fds[2].got_oob?    'o':'-',
287               fds[2].got_rdhup?  'h':'-',
288               fds[2].got_hup?    'H':' ',
289               fds[2].got_err?    'e':' ',
290               fds[2].got_nval?   'n':' '
291               );
292       else
293         show ("gpgrt_poll detected %d events\n", ret);
294 
295       if (fds[0].got_read)
296         {
297           /* Read from the producer.  */
298           for (;;)
299             {
300               c = es_fgetc (fds[0].stream);
301               if (c == EOF)
302                 {
303                   if (es_feof (fds[0].stream))
304                     {
305                       show ("reading '%s': EOF\n", peer_stdin.name);
306                       fds[0].ignore = 1; /* Not anymore needed.  */
307                       peer_stdin.stop_me = 1; /* Tell the thread to stop.  */
308                     }
309                   else if (es_ferror (fds[0].stream))
310                     {
311                       fail ("error reading '%s': %s\n",
312                             peer_stdin.name, strerror (errno));
313                       fds[0].ignore = 1;    /* Disable.  */
314                       peer_stdin.stop_me = 1; /* Tell the thread to stop.  */
315                     }
316                   else
317                     show ("reading '%s': EAGAIN\n", peer_stdin.name);
318                   break;
319                 }
320               else
321                 {
322                   if (used <= sizeof buffer -1)
323                     buffer[used++] = c;
324                   if (used == sizeof buffer)
325                     {
326                       show ("throttling reading from '%s'\n", peer_stdin.name);
327                       fds[0].ignore = 1;
328                       break;
329                     }
330                 }
331             }
332           show ("read from '%s': %zu bytes\n", peer_stdin.name, used);
333           if (used)
334             fds[1].ignore = 0; /* Data to send.  */
335         }
336       if (fds[1].got_write)
337         {
338           if (used)
339             {
340               ret = es_write (fds[1].stream, buffer, used, &nwritten);
341               show ("result for writing to '%s': ret=%d, n=%zu, nwritten=%zu\n",
342                     peer_stdout.name, ret, used, nwritten);
343               if (!ret)
344                 {
345                   assert (nwritten <= used);
346                   /* Move the remaining data to the front of buffer.  */
347                   memmove (buffer, buffer + nwritten,
348                            sizeof buffer - nwritten);
349                   used -= nwritten;
350                 }
351               ret = es_fflush (fds[1].stream);
352               if (ret)
353                 fail ("Flushing for '%s' failed: %s\n",
354                       peer_stdout.name, strerror (errno));
355             }
356           if (!used)
357             fds[1].ignore = 1; /* No need to send data.  */
358         }
359 
360       if (used < sizeof buffer / 2 && !peer_stdin.stop_me && fds[0].ignore)
361         {
362           show ("accelerate reading from '%s'\n", peer_stdin.name);
363           fds[0].ignore = 0;
364         }
365     }
366 }
367 
368 
369 int
main(int argc,char ** argv)370 main (int argc, char **argv)
371 {
372   int last_argc = -1;
373 
374   if (argc)
375     {
376       argc--; argv++;
377     }
378   while (argc && last_argc != argc )
379     {
380       last_argc = argc;
381       if (!strcmp (*argv, "--help"))
382         {
383           puts (
384 "usage: ./t-poll [options]\n"
385 "\n"
386 "Options:\n"
387 "  --verbose      Show what is going on\n"
388 "  --debug        Flyswatter\n"
389 );
390           exit (0);
391         }
392       if (!strcmp (*argv, "--verbose"))
393         {
394           verbose = 1;
395           argc--; argv++;
396         }
397       else if (!strcmp (*argv, "--debug"))
398         {
399           verbose = debug = 1;
400           argc--; argv++;
401         }
402     }
403 
404   if (!gpg_error_check_version (GPG_ERROR_VERSION))
405     {
406       die ("gpg_error_check_version returned an error");
407       errorcount++;
408     }
409 
410   peer_stdin.name  = "stdin producer";
411   create_pipe (&test_stdin, &peer_stdin.stream);
412   peer_stdout.name = "stdout consumer";
413   create_pipe (&peer_stdout.stream, &test_stdout);
414   peer_stderr.name = "stderr consumer";
415   create_pipe (&peer_stderr.stream, &test_stderr);
416 
417   if (es_set_nonblock (test_stdin, 1))
418     fail ("error setting test_stdin to nonblock: %s\n", strerror (errno));
419   if (es_set_nonblock (test_stdout, 1))
420     fail ("error setting test_stdout to nonblock: %s\n", strerror (errno));
421   if (es_set_nonblock (test_stderr, 1))
422     fail ("error setting test_stderr to nonblock: %s\n", strerror (errno));
423 
424   launch_thread (producer_thread, &peer_stdin );
425   launch_thread (consumer_thread, &peer_stdout);
426   launch_thread (consumer_thread, &peer_stderr);
427   test_poll ();
428   show ("Waiting for threads to terminate...\n");
429   es_fclose (test_stdin);
430   es_fclose (test_stdout);
431   es_fclose (test_stderr);
432   peer_stdin.stop_me = 1;
433   peer_stdout.stop_me = 1;
434   peer_stderr.stop_me = 1;
435   join_thread (&peer_stdin);
436   join_thread (&peer_stdout);
437   join_thread (&peer_stderr);
438 
439   return errorcount ? 1 : 0;
440 }
441