1 /*
2 Unix SMB/CIFS implementation.
3
4 testing of the events subsystem
5
6 Copyright (C) Stefan Metzmacher 2006-2009
7 Copyright (C) Jeremy Allison 2013
8
9 ** NOTE! The following LGPL license applies to the tevent
10 ** library. This does NOT imply that all of Samba is released
11 ** under the LGPL
12
13 This library is free software; you can redistribute it and/or
14 modify it under the terms of the GNU Lesser General Public
15 License as published by the Free Software Foundation; either
16 version 3 of the License, or (at your option) any later version.
17
18 This library is distributed in the hope that it will be useful,
19 but WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21 Lesser General Public License for more details.
22
23 You should have received a copy of the GNU Lesser General Public
24 License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "includes.h"
28 #define TEVENT_DEPRECATED 1
29 #include "tevent.h"
30 #include "system/filesys.h"
31 #include "system/select.h"
32 #include "system/network.h"
33 #include "torture/torture.h"
34 #include "torture/local/proto.h"
35 #ifdef HAVE_PTHREAD
36 #include "system/threads.h"
37 #include <assert.h>
38 #endif
39
40 static int fde_count;
41
do_read(int fd,void * buf,size_t count)42 static void do_read(int fd, void *buf, size_t count)
43 {
44 ssize_t ret;
45
46 do {
47 ret = read(fd, buf, count);
48 } while (ret == -1 && errno == EINTR);
49 }
50
fde_handler_read(struct tevent_context * ev_ctx,struct tevent_fd * f,uint16_t flags,void * private_data)51 static void fde_handler_read(struct tevent_context *ev_ctx, struct tevent_fd *f,
52 uint16_t flags, void *private_data)
53 {
54 int *fd = (int *)private_data;
55 char c;
56 #ifdef SA_SIGINFO
57 kill(getpid(), SIGUSR1);
58 #endif
59 kill(getpid(), SIGALRM);
60
61 do_read(fd[0], &c, 1);
62 fde_count++;
63 }
64
do_write(int fd,void * buf,size_t count)65 static void do_write(int fd, void *buf, size_t count)
66 {
67 ssize_t ret;
68
69 do {
70 ret = write(fd, buf, count);
71 } while (ret == -1 && errno == EINTR);
72 }
73
fde_handler_write(struct tevent_context * ev_ctx,struct tevent_fd * f,uint16_t flags,void * private_data)74 static void fde_handler_write(struct tevent_context *ev_ctx, struct tevent_fd *f,
75 uint16_t flags, void *private_data)
76 {
77 int *fd = (int *)private_data;
78 char c = 0;
79
80 do_write(fd[1], &c, 1);
81 }
82
83
84 /* This will only fire if the fd's returned from pipe() are bi-directional. */
fde_handler_read_1(struct tevent_context * ev_ctx,struct tevent_fd * f,uint16_t flags,void * private_data)85 static void fde_handler_read_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
86 uint16_t flags, void *private_data)
87 {
88 int *fd = (int *)private_data;
89 char c;
90 #ifdef SA_SIGINFO
91 kill(getpid(), SIGUSR1);
92 #endif
93 kill(getpid(), SIGALRM);
94
95 do_read(fd[1], &c, 1);
96 fde_count++;
97 }
98
99 /* This will only fire if the fd's returned from pipe() are bi-directional. */
fde_handler_write_1(struct tevent_context * ev_ctx,struct tevent_fd * f,uint16_t flags,void * private_data)100 static void fde_handler_write_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
101 uint16_t flags, void *private_data)
102 {
103 int *fd = (int *)private_data;
104 char c = 0;
105 do_write(fd[0], &c, 1);
106 }
107
finished_handler(struct tevent_context * ev_ctx,struct tevent_timer * te,struct timeval tval,void * private_data)108 static void finished_handler(struct tevent_context *ev_ctx, struct tevent_timer *te,
109 struct timeval tval, void *private_data)
110 {
111 int *finished = (int *)private_data;
112 (*finished) = 1;
113 }
114
count_handler(struct tevent_context * ev_ctx,struct tevent_signal * te,int signum,int count,void * info,void * private_data)115 static void count_handler(struct tevent_context *ev_ctx, struct tevent_signal *te,
116 int signum, int count, void *info, void *private_data)
117 {
118 int *countp = (int *)private_data;
119 (*countp) += count;
120 }
121
test_event_context(struct torture_context * test,const void * test_data)122 static bool test_event_context(struct torture_context *test,
123 const void *test_data)
124 {
125 struct tevent_context *ev_ctx;
126 int fd[2] = { -1, -1 };
127 const char *backend = (const char *)test_data;
128 int alarm_count=0, info_count=0;
129 struct tevent_fd *fde_read;
130 struct tevent_fd *fde_read_1;
131 struct tevent_fd *fde_write;
132 struct tevent_fd *fde_write_1;
133 #ifdef SA_RESTART
134 struct tevent_signal *se1 = NULL;
135 #endif
136 #ifdef SA_RESETHAND
137 struct tevent_signal *se2 = NULL;
138 #endif
139 #ifdef SA_SIGINFO
140 struct tevent_signal *se3 = NULL;
141 #endif
142 int finished=0;
143 struct timeval t;
144 int ret;
145
146 ev_ctx = tevent_context_init_byname(test, backend);
147 if (ev_ctx == NULL) {
148 torture_comment(test, "event backend '%s' not supported\n", backend);
149 return true;
150 }
151
152 torture_comment(test, "backend '%s' - %s\n",
153 backend, __FUNCTION__);
154
155 /* reset globals */
156 fde_count = 0;
157
158 /* create a pipe */
159 ret = pipe(fd);
160 torture_assert_int_equal(test, ret, 0, "pipe failed");
161
162 fde_read = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_READ,
163 fde_handler_read, fd);
164 fde_write_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_WRITE,
165 fde_handler_write_1, fd);
166
167 fde_write = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_WRITE,
168 fde_handler_write, fd);
169 fde_read_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_READ,
170 fde_handler_read_1, fd);
171
172 tevent_fd_set_auto_close(fde_read);
173 tevent_fd_set_auto_close(fde_write);
174
175 tevent_add_timer(ev_ctx, ev_ctx, timeval_current_ofs(2,0),
176 finished_handler, &finished);
177
178 #ifdef SA_RESTART
179 se1 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESTART, count_handler, &alarm_count);
180 torture_assert(test, se1 != NULL, "failed to setup se1");
181 #endif
182 #ifdef SA_RESETHAND
183 se2 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESETHAND, count_handler, &alarm_count);
184 torture_assert(test, se2 != NULL, "failed to setup se2");
185 #endif
186 #ifdef SA_SIGINFO
187 se3 = tevent_add_signal(ev_ctx, ev_ctx, SIGUSR1, SA_SIGINFO, count_handler, &info_count);
188 torture_assert(test, se3 != NULL, "failed to setup se3");
189 #endif
190
191 t = timeval_current();
192 while (!finished) {
193 errno = 0;
194 if (tevent_loop_once(ev_ctx) == -1) {
195 TALLOC_FREE(ev_ctx);
196 torture_fail(test, talloc_asprintf(test, "Failed event loop %s\n", strerror(errno)));
197 return false;
198 }
199 }
200
201 talloc_free(fde_read_1);
202 talloc_free(fde_write_1);
203 talloc_free(fde_read);
204 talloc_free(fde_write);
205
206 while (alarm_count < fde_count+1) {
207 if (tevent_loop_once(ev_ctx) == -1) {
208 break;
209 }
210 }
211
212 torture_comment(test, "Got %.2f pipe events/sec\n", fde_count/timeval_elapsed(&t));
213
214 #ifdef SA_RESTART
215 talloc_free(se1);
216 #endif
217
218 torture_assert_int_equal(test, alarm_count, 1+fde_count, "alarm count mismatch");
219
220 #ifdef SA_RESETHAND
221 /*
222 * we do not call talloc_free(se2)
223 * because it is already gone,
224 * after triggering the event handler.
225 */
226 #endif
227
228 #ifdef SA_SIGINFO
229 talloc_free(se3);
230 torture_assert_int_equal(test, info_count, fde_count, "info count mismatch");
231 #endif
232
233 talloc_free(ev_ctx);
234
235 return true;
236 }
237
238 struct test_event_fd1_state {
239 struct torture_context *tctx;
240 const char *backend;
241 struct tevent_context *ev;
242 int sock[2];
243 struct tevent_timer *te;
244 struct tevent_fd *fde0;
245 struct tevent_fd *fde1;
246 bool got_write;
247 bool got_read;
248 bool drain;
249 bool drain_done;
250 unsigned loop_count;
251 bool finished;
252 const char *error;
253 };
254
test_event_fd1_fde_handler(struct tevent_context * ev_ctx,struct tevent_fd * fde,uint16_t flags,void * private_data)255 static void test_event_fd1_fde_handler(struct tevent_context *ev_ctx,
256 struct tevent_fd *fde,
257 uint16_t flags,
258 void *private_data)
259 {
260 struct test_event_fd1_state *state =
261 (struct test_event_fd1_state *)private_data;
262
263 if (state->drain_done) {
264 state->finished = true;
265 state->error = __location__;
266 return;
267 }
268
269 if (state->drain) {
270 ssize_t ret;
271 uint8_t c = 0;
272
273 if (!(flags & TEVENT_FD_READ)) {
274 state->finished = true;
275 state->error = __location__;
276 return;
277 }
278
279 ret = read(state->sock[0], &c, 1);
280 if (ret == 1) {
281 return;
282 }
283
284 /*
285 * end of test...
286 */
287 tevent_fd_set_flags(fde, 0);
288 state->drain_done = true;
289 return;
290 }
291
292 if (!state->got_write) {
293 uint8_t c = 0;
294
295 if (flags != TEVENT_FD_WRITE) {
296 state->finished = true;
297 state->error = __location__;
298 return;
299 }
300 state->got_write = true;
301
302 /*
303 * we write to the other socket...
304 */
305 do_write(state->sock[1], &c, 1);
306 TEVENT_FD_NOT_WRITEABLE(fde);
307 TEVENT_FD_READABLE(fde);
308 return;
309 }
310
311 if (!state->got_read) {
312 if (flags != TEVENT_FD_READ) {
313 state->finished = true;
314 state->error = __location__;
315 return;
316 }
317 state->got_read = true;
318
319 TEVENT_FD_NOT_READABLE(fde);
320 return;
321 }
322
323 state->finished = true;
324 state->error = __location__;
325 return;
326 }
327
test_event_fd1_finished(struct tevent_context * ev_ctx,struct tevent_timer * te,struct timeval tval,void * private_data)328 static void test_event_fd1_finished(struct tevent_context *ev_ctx,
329 struct tevent_timer *te,
330 struct timeval tval,
331 void *private_data)
332 {
333 struct test_event_fd1_state *state =
334 (struct test_event_fd1_state *)private_data;
335
336 if (state->drain_done) {
337 state->finished = true;
338 return;
339 }
340
341 if (!state->got_write) {
342 state->finished = true;
343 state->error = __location__;
344 return;
345 }
346
347 if (!state->got_read) {
348 state->finished = true;
349 state->error = __location__;
350 return;
351 }
352
353 state->loop_count++;
354 if (state->loop_count > 3) {
355 state->finished = true;
356 state->error = __location__;
357 return;
358 }
359
360 state->got_write = false;
361 state->got_read = false;
362
363 tevent_fd_set_flags(state->fde0, TEVENT_FD_WRITE);
364
365 if (state->loop_count > 2) {
366 state->drain = true;
367 TALLOC_FREE(state->fde1);
368 TEVENT_FD_READABLE(state->fde0);
369 }
370
371 state->te = tevent_add_timer(state->ev, state->ev,
372 timeval_current_ofs(0,2000),
373 test_event_fd1_finished, state);
374 }
375
test_event_fd1(struct torture_context * tctx,const void * test_data)376 static bool test_event_fd1(struct torture_context *tctx,
377 const void *test_data)
378 {
379 struct test_event_fd1_state state;
380 int ret;
381
382 ZERO_STRUCT(state);
383 state.tctx = tctx;
384 state.backend = (const char *)test_data;
385
386 state.ev = tevent_context_init_byname(tctx, state.backend);
387 if (state.ev == NULL) {
388 torture_skip(tctx, talloc_asprintf(tctx,
389 "event backend '%s' not supported\n",
390 state.backend));
391 return true;
392 }
393
394 tevent_set_debug_stderr(state.ev);
395 torture_comment(tctx, "backend '%s' - %s\n",
396 state.backend, __FUNCTION__);
397
398 /*
399 * This tests the following:
400 *
401 * It monitors the state of state.sock[0]
402 * with tevent_fd, but we never read/write on state.sock[0]
403 * while state.sock[1] * is only used to write a few bytes.
404 *
405 * We have a loop:
406 * - we wait only for TEVENT_FD_WRITE on state.sock[0]
407 * - we write 1 byte to state.sock[1]
408 * - we wait only for TEVENT_FD_READ on state.sock[0]
409 * - we disable events on state.sock[0]
410 * - the timer event restarts the loop
411 * Then we close state.sock[1]
412 * We have a loop:
413 * - we wait for TEVENT_FD_READ/WRITE on state.sock[0]
414 * - we try to read 1 byte
415 * - if the read gets an error of returns 0
416 * we disable the event handler
417 * - the timer finishes the test
418 */
419 state.sock[0] = -1;
420 state.sock[1] = -1;
421
422 ret = socketpair(AF_UNIX, SOCK_STREAM, 0, state.sock);
423 torture_assert(tctx, ret == 0, "socketpair() failed");
424
425 state.te = tevent_add_timer(state.ev, state.ev,
426 timeval_current_ofs(0,1000),
427 test_event_fd1_finished, &state);
428 state.fde0 = tevent_add_fd(state.ev, state.ev,
429 state.sock[0], TEVENT_FD_WRITE,
430 test_event_fd1_fde_handler, &state);
431 /* state.fde1 is only used to auto close */
432 state.fde1 = tevent_add_fd(state.ev, state.ev,
433 state.sock[1], 0,
434 test_event_fd1_fde_handler, &state);
435
436 tevent_fd_set_auto_close(state.fde0);
437 tevent_fd_set_auto_close(state.fde1);
438
439 while (!state.finished) {
440 errno = 0;
441 if (tevent_loop_once(state.ev) == -1) {
442 talloc_free(state.ev);
443 torture_fail(tctx, talloc_asprintf(tctx,
444 "Failed event loop %s\n",
445 strerror(errno)));
446 }
447 }
448
449 talloc_free(state.ev);
450
451 torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
452 "%s", state.error));
453
454 return true;
455 }
456
457 struct test_event_fd2_state {
458 struct torture_context *tctx;
459 const char *backend;
460 struct tevent_context *ev;
461 struct tevent_timer *te;
462 struct test_event_fd2_sock {
463 struct test_event_fd2_state *state;
464 int fd;
465 struct tevent_fd *fde;
466 size_t num_written;
467 size_t num_read;
468 bool got_full;
469 } sock0, sock1;
470 bool finished;
471 const char *error;
472 };
473
test_event_fd2_sock_handler(struct tevent_context * ev_ctx,struct tevent_fd * fde,uint16_t flags,void * private_data)474 static void test_event_fd2_sock_handler(struct tevent_context *ev_ctx,
475 struct tevent_fd *fde,
476 uint16_t flags,
477 void *private_data)
478 {
479 struct test_event_fd2_sock *cur_sock =
480 (struct test_event_fd2_sock *)private_data;
481 struct test_event_fd2_state *state = cur_sock->state;
482 struct test_event_fd2_sock *oth_sock = NULL;
483 uint8_t v = 0, c;
484 ssize_t ret;
485
486 if (cur_sock == &state->sock0) {
487 oth_sock = &state->sock1;
488 } else {
489 oth_sock = &state->sock0;
490 }
491
492 if (oth_sock->num_written == 1) {
493 if (flags != (TEVENT_FD_READ | TEVENT_FD_WRITE)) {
494 state->finished = true;
495 state->error = __location__;
496 return;
497 }
498 }
499
500 if (cur_sock->num_read == oth_sock->num_written) {
501 state->finished = true;
502 state->error = __location__;
503 return;
504 }
505
506 if (!(flags & TEVENT_FD_READ)) {
507 state->finished = true;
508 state->error = __location__;
509 return;
510 }
511
512 if (oth_sock->num_read >= PIPE_BUF) {
513 /*
514 * On Linux we become writable once we've read
515 * one byte. On Solaris we only become writable
516 * again once we've read 4096 bytes. PIPE_BUF
517 * is probably a safe bet to test against.
518 *
519 * There should be room to write a byte again
520 */
521 if (!(flags & TEVENT_FD_WRITE)) {
522 state->finished = true;
523 state->error = __location__;
524 return;
525 }
526 }
527
528 if ((flags & TEVENT_FD_WRITE) && !cur_sock->got_full) {
529 v = (uint8_t)cur_sock->num_written;
530 ret = write(cur_sock->fd, &v, 1);
531 if (ret != 1) {
532 state->finished = true;
533 state->error = __location__;
534 return;
535 }
536 cur_sock->num_written++;
537 if (cur_sock->num_written > 0x80000000) {
538 state->finished = true;
539 state->error = __location__;
540 return;
541 }
542 return;
543 }
544
545 if (!cur_sock->got_full) {
546 cur_sock->got_full = true;
547
548 if (!oth_sock->got_full) {
549 /*
550 * cur_sock is full,
551 * lets wait for oth_sock
552 * to be filled
553 */
554 tevent_fd_set_flags(cur_sock->fde, 0);
555 return;
556 }
557
558 /*
559 * oth_sock waited for cur_sock,
560 * lets restart it
561 */
562 tevent_fd_set_flags(oth_sock->fde,
563 TEVENT_FD_READ|TEVENT_FD_WRITE);
564 }
565
566 ret = read(cur_sock->fd, &v, 1);
567 if (ret != 1) {
568 state->finished = true;
569 state->error = __location__;
570 return;
571 }
572 c = (uint8_t)cur_sock->num_read;
573 if (c != v) {
574 state->finished = true;
575 state->error = __location__;
576 return;
577 }
578 cur_sock->num_read++;
579
580 if (cur_sock->num_read < oth_sock->num_written) {
581 /* there is more to read */
582 return;
583 }
584 /*
585 * we read everything, we need to remove TEVENT_FD_WRITE
586 * to avoid spinning
587 */
588 TEVENT_FD_NOT_WRITEABLE(cur_sock->fde);
589
590 if (oth_sock->num_read == cur_sock->num_written) {
591 /*
592 * both directions are finished
593 */
594 state->finished = true;
595 }
596
597 return;
598 }
599
test_event_fd2_finished(struct tevent_context * ev_ctx,struct tevent_timer * te,struct timeval tval,void * private_data)600 static void test_event_fd2_finished(struct tevent_context *ev_ctx,
601 struct tevent_timer *te,
602 struct timeval tval,
603 void *private_data)
604 {
605 struct test_event_fd2_state *state =
606 (struct test_event_fd2_state *)private_data;
607
608 /*
609 * this should never be triggered
610 */
611 state->finished = true;
612 state->error = __location__;
613 }
614
test_event_fd2(struct torture_context * tctx,const void * test_data)615 static bool test_event_fd2(struct torture_context *tctx,
616 const void *test_data)
617 {
618 struct test_event_fd2_state state;
619 int sock[2];
620 uint8_t c = 0;
621
622 ZERO_STRUCT(state);
623 state.tctx = tctx;
624 state.backend = (const char *)test_data;
625
626 state.ev = tevent_context_init_byname(tctx, state.backend);
627 if (state.ev == NULL) {
628 torture_skip(tctx, talloc_asprintf(tctx,
629 "event backend '%s' not supported\n",
630 state.backend));
631 return true;
632 }
633
634 tevent_set_debug_stderr(state.ev);
635 torture_comment(tctx, "backend '%s' - %s\n",
636 state.backend, __FUNCTION__);
637
638 /*
639 * This tests the following
640 *
641 * - We write 1 byte to each socket
642 * - We wait for TEVENT_FD_READ/WRITE on both sockets
643 * - When we get TEVENT_FD_WRITE we write 1 byte
644 * until both socket buffers are full, which
645 * means both sockets only get TEVENT_FD_READ.
646 * - Then we read 1 byte until we have consumed
647 * all bytes the other end has written.
648 */
649 sock[0] = -1;
650 sock[1] = -1;
651 socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
652
653 /*
654 * the timer should never expire
655 */
656 state.te = tevent_add_timer(state.ev, state.ev,
657 timeval_current_ofs(600, 0),
658 test_event_fd2_finished, &state);
659 state.sock0.state = &state;
660 state.sock0.fd = sock[0];
661 state.sock0.fde = tevent_add_fd(state.ev, state.ev,
662 state.sock0.fd,
663 TEVENT_FD_READ | TEVENT_FD_WRITE,
664 test_event_fd2_sock_handler,
665 &state.sock0);
666 state.sock1.state = &state;
667 state.sock1.fd = sock[1];
668 state.sock1.fde = tevent_add_fd(state.ev, state.ev,
669 state.sock1.fd,
670 TEVENT_FD_READ | TEVENT_FD_WRITE,
671 test_event_fd2_sock_handler,
672 &state.sock1);
673
674 tevent_fd_set_auto_close(state.sock0.fde);
675 tevent_fd_set_auto_close(state.sock1.fde);
676
677 do_write(state.sock0.fd, &c, 1);
678 state.sock0.num_written++;
679 do_write(state.sock1.fd, &c, 1);
680 state.sock1.num_written++;
681
682 while (!state.finished) {
683 errno = 0;
684 if (tevent_loop_once(state.ev) == -1) {
685 talloc_free(state.ev);
686 torture_fail(tctx, talloc_asprintf(tctx,
687 "Failed event loop %s\n",
688 strerror(errno)));
689 }
690 }
691
692 talloc_free(state.ev);
693
694 torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
695 "%s", state.error));
696
697 return true;
698 }
699
700 struct test_wrapper_state {
701 struct torture_context *tctx;
702 int num_events;
703 int num_wrap_handlers;
704 };
705
test_wrapper_before_use(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,const char * location)706 static bool test_wrapper_before_use(struct tevent_context *wrap_ev,
707 void *private_data,
708 struct tevent_context *main_ev,
709 const char *location)
710 {
711 struct test_wrapper_state *state =
712 talloc_get_type_abort(private_data,
713 struct test_wrapper_state);
714
715 torture_comment(state->tctx, "%s\n", __func__);
716 state->num_wrap_handlers++;
717 return true;
718 }
719
test_wrapper_after_use(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,const char * location)720 static void test_wrapper_after_use(struct tevent_context *wrap_ev,
721 void *private_data,
722 struct tevent_context *main_ev,
723 const char *location)
724 {
725 struct test_wrapper_state *state =
726 talloc_get_type_abort(private_data,
727 struct test_wrapper_state);
728
729 torture_comment(state->tctx, "%s\n", __func__);
730 state->num_wrap_handlers++;
731 }
732
test_wrapper_before_fd_handler(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,struct tevent_fd * fde,uint16_t flags,const char * handler_name,const char * location)733 static void test_wrapper_before_fd_handler(struct tevent_context *wrap_ev,
734 void *private_data,
735 struct tevent_context *main_ev,
736 struct tevent_fd *fde,
737 uint16_t flags,
738 const char *handler_name,
739 const char *location)
740 {
741 struct test_wrapper_state *state =
742 talloc_get_type_abort(private_data,
743 struct test_wrapper_state);
744
745 torture_comment(state->tctx, "%s\n", __func__);
746 state->num_wrap_handlers++;
747 }
748
test_wrapper_after_fd_handler(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,struct tevent_fd * fde,uint16_t flags,const char * handler_name,const char * location)749 static void test_wrapper_after_fd_handler(struct tevent_context *wrap_ev,
750 void *private_data,
751 struct tevent_context *main_ev,
752 struct tevent_fd *fde,
753 uint16_t flags,
754 const char *handler_name,
755 const char *location)
756 {
757 struct test_wrapper_state *state =
758 talloc_get_type_abort(private_data,
759 struct test_wrapper_state);
760
761 torture_comment(state->tctx, "%s\n", __func__);
762 state->num_wrap_handlers++;
763 }
764
test_wrapper_before_timer_handler(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,struct tevent_timer * te,struct timeval requested_time,struct timeval trigger_time,const char * handler_name,const char * location)765 static void test_wrapper_before_timer_handler(struct tevent_context *wrap_ev,
766 void *private_data,
767 struct tevent_context *main_ev,
768 struct tevent_timer *te,
769 struct timeval requested_time,
770 struct timeval trigger_time,
771 const char *handler_name,
772 const char *location)
773 {
774 struct test_wrapper_state *state =
775 talloc_get_type_abort(private_data,
776 struct test_wrapper_state);
777
778 torture_comment(state->tctx, "%s\n", __func__);
779 state->num_wrap_handlers++;
780 }
781
test_wrapper_after_timer_handler(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,struct tevent_timer * te,struct timeval requested_time,struct timeval trigger_time,const char * handler_name,const char * location)782 static void test_wrapper_after_timer_handler(struct tevent_context *wrap_ev,
783 void *private_data,
784 struct tevent_context *main_ev,
785 struct tevent_timer *te,
786 struct timeval requested_time,
787 struct timeval trigger_time,
788 const char *handler_name,
789 const char *location)
790 {
791 struct test_wrapper_state *state =
792 talloc_get_type_abort(private_data,
793 struct test_wrapper_state);
794
795 torture_comment(state->tctx, "%s\n", __func__);
796 state->num_wrap_handlers++;
797 }
798
test_wrapper_before_immediate_handler(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,struct tevent_immediate * im,const char * handler_name,const char * location)799 static void test_wrapper_before_immediate_handler(struct tevent_context *wrap_ev,
800 void *private_data,
801 struct tevent_context *main_ev,
802 struct tevent_immediate *im,
803 const char *handler_name,
804 const char *location)
805 {
806 struct test_wrapper_state *state =
807 talloc_get_type_abort(private_data,
808 struct test_wrapper_state);
809
810 torture_comment(state->tctx, "%s\n", __func__);
811 state->num_wrap_handlers++;
812 }
813
test_wrapper_after_immediate_handler(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,struct tevent_immediate * im,const char * handler_name,const char * location)814 static void test_wrapper_after_immediate_handler(struct tevent_context *wrap_ev,
815 void *private_data,
816 struct tevent_context *main_ev,
817 struct tevent_immediate *im,
818 const char *handler_name,
819 const char *location)
820 {
821 struct test_wrapper_state *state =
822 talloc_get_type_abort(private_data,
823 struct test_wrapper_state);
824
825 torture_comment(state->tctx, "%s\n", __func__);
826 state->num_wrap_handlers++;
827 }
828
test_wrapper_before_signal_handler(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,struct tevent_signal * se,int signum,int count,void * siginfo,const char * handler_name,const char * location)829 static void test_wrapper_before_signal_handler(struct tevent_context *wrap_ev,
830 void *private_data,
831 struct tevent_context *main_ev,
832 struct tevent_signal *se,
833 int signum,
834 int count,
835 void *siginfo,
836 const char *handler_name,
837 const char *location)
838 {
839 struct test_wrapper_state *state =
840 talloc_get_type_abort(private_data,
841 struct test_wrapper_state);
842
843 torture_comment(state->tctx, "%s\n", __func__);
844 state->num_wrap_handlers++;
845 }
846
test_wrapper_after_signal_handler(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,struct tevent_signal * se,int signum,int count,void * siginfo,const char * handler_name,const char * location)847 static void test_wrapper_after_signal_handler(struct tevent_context *wrap_ev,
848 void *private_data,
849 struct tevent_context *main_ev,
850 struct tevent_signal *se,
851 int signum,
852 int count,
853 void *siginfo,
854 const char *handler_name,
855 const char *location)
856 {
857 struct test_wrapper_state *state =
858 talloc_get_type_abort(private_data,
859 struct test_wrapper_state);
860
861 torture_comment(state->tctx, "%s\n", __func__);
862 state->num_wrap_handlers++;
863 }
864
865 static const struct tevent_wrapper_ops test_wrapper_ops = {
866 .name = "test_wrapper",
867 .before_use = test_wrapper_before_use,
868 .after_use = test_wrapper_after_use,
869 .before_fd_handler = test_wrapper_before_fd_handler,
870 .after_fd_handler = test_wrapper_after_fd_handler,
871 .before_timer_handler = test_wrapper_before_timer_handler,
872 .after_timer_handler = test_wrapper_after_timer_handler,
873 .before_immediate_handler = test_wrapper_before_immediate_handler,
874 .after_immediate_handler = test_wrapper_after_immediate_handler,
875 .before_signal_handler = test_wrapper_before_signal_handler,
876 .after_signal_handler = test_wrapper_after_signal_handler,
877 };
878
test_wrapper_timer_handler(struct tevent_context * ev,struct tevent_timer * te,struct timeval tv,void * private_data)879 static void test_wrapper_timer_handler(struct tevent_context *ev,
880 struct tevent_timer *te,
881 struct timeval tv,
882 void *private_data)
883 {
884 struct test_wrapper_state *state =
885 (struct test_wrapper_state *)private_data;
886
887
888 torture_comment(state->tctx, "timer handler\n");
889
890 state->num_events++;
891 talloc_free(te);
892 return;
893 }
894
test_wrapper_fd_handler(struct tevent_context * ev,struct tevent_fd * fde,unsigned short fd_flags,void * private_data)895 static void test_wrapper_fd_handler(struct tevent_context *ev,
896 struct tevent_fd *fde,
897 unsigned short fd_flags,
898 void *private_data)
899 {
900 struct test_wrapper_state *state =
901 (struct test_wrapper_state *)private_data;
902
903 torture_comment(state->tctx, "fd handler\n");
904
905 state->num_events++;
906 talloc_free(fde);
907 return;
908 }
909
test_wrapper_immediate_handler(struct tevent_context * ev,struct tevent_immediate * im,void * private_data)910 static void test_wrapper_immediate_handler(struct tevent_context *ev,
911 struct tevent_immediate *im,
912 void *private_data)
913 {
914 struct test_wrapper_state *state =
915 (struct test_wrapper_state *)private_data;
916
917 state->num_events++;
918 talloc_free(im);
919
920 torture_comment(state->tctx, "immediate handler\n");
921 return;
922 }
923
test_wrapper_signal_handler(struct tevent_context * ev,struct tevent_signal * se,int signum,int count,void * siginfo,void * private_data)924 static void test_wrapper_signal_handler(struct tevent_context *ev,
925 struct tevent_signal *se,
926 int signum,
927 int count,
928 void *siginfo,
929 void *private_data)
930 {
931 struct test_wrapper_state *state =
932 (struct test_wrapper_state *)private_data;
933
934 torture_comment(state->tctx, "signal handler\n");
935
936 state->num_events++;
937 talloc_free(se);
938 return;
939 }
940
test_wrapper(struct torture_context * tctx,const void * test_data)941 static bool test_wrapper(struct torture_context *tctx,
942 const void *test_data)
943 {
944 struct test_wrapper_state *state = NULL;
945 int sock[2] = { -1, -1};
946 uint8_t c = 0;
947 const int num_events = 4;
948 const char *backend = (const char *)test_data;
949 struct tevent_context *ev = NULL;
950 struct tevent_context *wrap_ev = NULL;
951 struct tevent_fd *fde = NULL;
952 struct tevent_timer *te = NULL;
953 struct tevent_signal *se = NULL;
954 struct tevent_immediate *im = NULL;
955 int ret;
956 bool ok = false;
957 bool ret2;
958
959 ev = tevent_context_init_byname(tctx, backend);
960 if (ev == NULL) {
961 torture_skip(tctx, talloc_asprintf(tctx,
962 "event backend '%s' not supported\n",
963 backend));
964 return true;
965 }
966
967 tevent_set_debug_stderr(ev);
968 torture_comment(tctx, "tevent backend '%s'\n", backend);
969
970 wrap_ev = tevent_context_wrapper_create(
971 ev, ev, &test_wrapper_ops, &state, struct test_wrapper_state);
972 torture_assert_not_null_goto(tctx, wrap_ev, ok, done,
973 "tevent_context_wrapper_create failed\n");
974 *state = (struct test_wrapper_state) {
975 .tctx = tctx,
976 };
977
978 ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
979 torture_assert_goto(tctx, ret == 0, ok, done, "socketpair failed\n");
980
981 te = tevent_add_timer(wrap_ev, wrap_ev,
982 timeval_current_ofs(0, 0),
983 test_wrapper_timer_handler, state);
984 torture_assert_not_null_goto(tctx, te, ok, done,
985 "tevent_add_timer failed\n");
986
987 fde = tevent_add_fd(wrap_ev, wrap_ev,
988 sock[1],
989 TEVENT_FD_READ,
990 test_wrapper_fd_handler,
991 state);
992 torture_assert_not_null_goto(tctx, fde, ok, done,
993 "tevent_add_fd failed\n");
994
995 im = tevent_create_immediate(wrap_ev);
996 torture_assert_not_null_goto(tctx, im, ok, done,
997 "tevent_create_immediate failed\n");
998
999 se = tevent_add_signal(wrap_ev, wrap_ev,
1000 SIGUSR1,
1001 0,
1002 test_wrapper_signal_handler,
1003 state);
1004 torture_assert_not_null_goto(tctx, se, ok, done,
1005 "tevent_add_signal failed\n");
1006
1007 do_write(sock[0], &c, 1);
1008 kill(getpid(), SIGUSR1);
1009 tevent_schedule_immediate(im,
1010 wrap_ev,
1011 test_wrapper_immediate_handler,
1012 state);
1013
1014 ret2 = tevent_context_push_use(wrap_ev);
1015 torture_assert_goto(tctx, ret2, ok, done, "tevent_context_push_use(wrap_ev) failed\n");
1016 ret2 = tevent_context_push_use(ev);
1017 torture_assert_goto(tctx, ret2, ok, pop_use, "tevent_context_push_use(ev) failed\n");
1018 tevent_context_pop_use(ev);
1019 tevent_context_pop_use(wrap_ev);
1020
1021 ret = tevent_loop_wait(ev);
1022 torture_assert_int_equal_goto(tctx, ret, 0, ok, done, "tevent_loop_wait failed\n");
1023
1024 torture_comment(tctx, "Num events: %d\n", state->num_events);
1025 torture_comment(tctx, "Num wrap handlers: %d\n",
1026 state->num_wrap_handlers);
1027
1028 torture_assert_int_equal_goto(tctx, state->num_events, num_events, ok, done,
1029 "Wrong event count\n");
1030 torture_assert_int_equal_goto(tctx, state->num_wrap_handlers,
1031 num_events*2+2,
1032 ok, done, "Wrong wrapper count\n");
1033
1034 ok = true;
1035
1036 done:
1037 TALLOC_FREE(wrap_ev);
1038 TALLOC_FREE(ev);
1039
1040 if (sock[0] != -1) {
1041 close(sock[0]);
1042 }
1043 if (sock[1] != -1) {
1044 close(sock[1]);
1045 }
1046 return ok;
1047 pop_use:
1048 tevent_context_pop_use(wrap_ev);
1049 goto done;
1050 }
1051
test_free_wrapper_signal_handler(struct tevent_context * ev,struct tevent_signal * se,int signum,int count,void * siginfo,void * private_data)1052 static void test_free_wrapper_signal_handler(struct tevent_context *ev,
1053 struct tevent_signal *se,
1054 int signum,
1055 int count,
1056 void *siginfo,
1057 void *private_data)
1058 {
1059 struct torture_context *tctx =
1060 talloc_get_type_abort(private_data,
1061 struct torture_context);
1062
1063 torture_comment(tctx, "signal handler\n");
1064
1065 talloc_free(se);
1066
1067 /*
1068 * signal handlers have highest priority in tevent, so this signal
1069 * handler will always be started before the other handlers
1070 * below. Freeing the (wrapper) event context here tests that the
1071 * wrapper implementation correclty handles the wrapper ev going away
1072 * with pending events.
1073 */
1074 talloc_free(ev);
1075 return;
1076 }
1077
test_free_wrapper_fd_handler(struct tevent_context * ev,struct tevent_fd * fde,unsigned short fd_flags,void * private_data)1078 static void test_free_wrapper_fd_handler(struct tevent_context *ev,
1079 struct tevent_fd *fde,
1080 unsigned short fd_flags,
1081 void *private_data)
1082 {
1083 /*
1084 * This should never be called as
1085 * test_free_wrapper_signal_handler()
1086 * already destroyed the wrapper tevent_context.
1087 */
1088 abort();
1089 }
1090
test_free_wrapper_immediate_handler(struct tevent_context * ev,struct tevent_immediate * im,void * private_data)1091 static void test_free_wrapper_immediate_handler(struct tevent_context *ev,
1092 struct tevent_immediate *im,
1093 void *private_data)
1094 {
1095 /*
1096 * This should never be called as
1097 * test_free_wrapper_signal_handler()
1098 * already destroyed the wrapper tevent_context.
1099 */
1100 abort();
1101 }
1102
test_free_wrapper_timer_handler(struct tevent_context * ev,struct tevent_timer * te,struct timeval tv,void * private_data)1103 static void test_free_wrapper_timer_handler(struct tevent_context *ev,
1104 struct tevent_timer *te,
1105 struct timeval tv,
1106 void *private_data)
1107 {
1108 /*
1109 * This should never be called as
1110 * test_free_wrapper_signal_handler()
1111 * already destroyed the wrapper tevent_context.
1112 */
1113 abort();
1114 }
1115
test_free_wrapper(struct torture_context * tctx,const void * test_data)1116 static bool test_free_wrapper(struct torture_context *tctx,
1117 const void *test_data)
1118 {
1119 struct test_wrapper_state *state = NULL;
1120 int sock[2] = { -1, -1};
1121 uint8_t c = 0;
1122 const char *backend = (const char *)test_data;
1123 TALLOC_CTX *frame = talloc_stackframe();
1124 struct tevent_context *ev = NULL;
1125 struct tevent_context *wrap_ev = NULL;
1126 struct tevent_fd *fde = NULL;
1127 struct tevent_timer *te = NULL;
1128 struct tevent_signal *se = NULL;
1129 struct tevent_immediate *im = NULL;
1130 int ret;
1131 bool ok = false;
1132
1133 ev = tevent_context_init_byname(frame, backend);
1134 if (ev == NULL) {
1135 torture_skip(tctx, talloc_asprintf(tctx,
1136 "event backend '%s' not supported\n",
1137 backend));
1138 return true;
1139 }
1140
1141 tevent_set_debug_stderr(ev);
1142 torture_comment(tctx, "tevent backend '%s'\n", backend);
1143
1144 wrap_ev = tevent_context_wrapper_create(
1145 ev, ev, &test_wrapper_ops, &state, struct test_wrapper_state);
1146 torture_assert_not_null_goto(tctx, wrap_ev, ok, done,
1147 "tevent_context_wrapper_create failed\n");
1148 *state = (struct test_wrapper_state) {
1149 .tctx = tctx,
1150 };
1151
1152 ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
1153 torture_assert_goto(tctx, ret == 0, ok, done, "socketpair failed\n");
1154
1155 fde = tevent_add_fd(wrap_ev, frame,
1156 sock[1],
1157 TEVENT_FD_READ,
1158 test_free_wrapper_fd_handler,
1159 NULL);
1160 torture_assert_not_null_goto(tctx, fde, ok, done,
1161 "tevent_add_fd failed\n");
1162
1163 te = tevent_add_timer(wrap_ev, frame,
1164 timeval_current_ofs(0, 0),
1165 test_free_wrapper_timer_handler, NULL);
1166 torture_assert_not_null_goto(tctx, te, ok, done,
1167 "tevent_add_timer failed\n");
1168
1169 im = tevent_create_immediate(frame);
1170 torture_assert_not_null_goto(tctx, im, ok, done,
1171 "tevent_create_immediate failed\n");
1172
1173 se = tevent_add_signal(wrap_ev, frame,
1174 SIGUSR1,
1175 0,
1176 test_free_wrapper_signal_handler,
1177 tctx);
1178 torture_assert_not_null_goto(tctx, se, ok, done,
1179 "tevent_add_signal failed\n");
1180
1181 do_write(sock[0], &c, 1);
1182 kill(getpid(), SIGUSR1);
1183 tevent_schedule_immediate(im,
1184 wrap_ev,
1185 test_free_wrapper_immediate_handler,
1186 NULL);
1187
1188 ret = tevent_loop_wait(ev);
1189 torture_assert_goto(tctx, ret == 0, ok, done, "tevent_loop_wait failed\n");
1190
1191 ok = true;
1192
1193 done:
1194 TALLOC_FREE(frame);
1195
1196 if (sock[0] != -1) {
1197 close(sock[0]);
1198 }
1199 if (sock[1] != -1) {
1200 close(sock[1]);
1201 }
1202 return ok;
1203 }
1204
1205 #ifdef HAVE_PTHREAD
1206
1207 static pthread_mutex_t threaded_mutex = PTHREAD_MUTEX_INITIALIZER;
1208 static bool do_shutdown = false;
1209
test_event_threaded_lock(void)1210 static void test_event_threaded_lock(void)
1211 {
1212 int ret;
1213 ret = pthread_mutex_lock(&threaded_mutex);
1214 assert(ret == 0);
1215 }
1216
test_event_threaded_unlock(void)1217 static void test_event_threaded_unlock(void)
1218 {
1219 int ret;
1220 ret = pthread_mutex_unlock(&threaded_mutex);
1221 assert(ret == 0);
1222 }
1223
test_event_threaded_trace(enum tevent_trace_point point,void * private_data)1224 static void test_event_threaded_trace(enum tevent_trace_point point,
1225 void *private_data)
1226 {
1227 switch (point) {
1228 case TEVENT_TRACE_BEFORE_WAIT:
1229 test_event_threaded_unlock();
1230 break;
1231 case TEVENT_TRACE_AFTER_WAIT:
1232 test_event_threaded_lock();
1233 break;
1234 case TEVENT_TRACE_BEFORE_LOOP_ONCE:
1235 case TEVENT_TRACE_AFTER_LOOP_ONCE:
1236 break;
1237 }
1238 }
1239
test_event_threaded_timer(struct tevent_context * ev,struct tevent_timer * te,struct timeval current_time,void * private_data)1240 static void test_event_threaded_timer(struct tevent_context *ev,
1241 struct tevent_timer *te,
1242 struct timeval current_time,
1243 void *private_data)
1244 {
1245 return;
1246 }
1247
test_event_poll_thread(void * private_data)1248 static void *test_event_poll_thread(void *private_data)
1249 {
1250 struct tevent_context *ev = (struct tevent_context *)private_data;
1251
1252 test_event_threaded_lock();
1253
1254 while (true) {
1255 int ret;
1256 ret = tevent_loop_once(ev);
1257 assert(ret == 0);
1258 if (do_shutdown) {
1259 test_event_threaded_unlock();
1260 return NULL;
1261 }
1262 }
1263
1264 }
1265
test_event_threaded_read_handler(struct tevent_context * ev,struct tevent_fd * fde,uint16_t flags,void * private_data)1266 static void test_event_threaded_read_handler(struct tevent_context *ev,
1267 struct tevent_fd *fde,
1268 uint16_t flags,
1269 void *private_data)
1270 {
1271 int *pfd = (int *)private_data;
1272 char c;
1273 ssize_t nread;
1274
1275 if ((flags & TEVENT_FD_READ) == 0) {
1276 return;
1277 }
1278
1279 do {
1280 nread = read(*pfd, &c, 1);
1281 } while ((nread == -1) && (errno == EINTR));
1282
1283 assert(nread == 1);
1284 }
1285
test_event_context_threaded(struct torture_context * test,const void * test_data)1286 static bool test_event_context_threaded(struct torture_context *test,
1287 const void *test_data)
1288 {
1289 struct tevent_context *ev;
1290 struct tevent_timer *te;
1291 struct tevent_fd *fde;
1292 pthread_t poll_thread;
1293 int fds[2];
1294 int ret;
1295 char c = 0;
1296
1297 ev = tevent_context_init_byname(test, "poll_mt");
1298 torture_assert(test, ev != NULL, "poll_mt not supported");
1299
1300 tevent_set_trace_callback(ev, test_event_threaded_trace, NULL);
1301
1302 te = tevent_add_timer(ev, ev, timeval_current_ofs(5, 0),
1303 test_event_threaded_timer, NULL);
1304 torture_assert(test, te != NULL, "Could not add timer");
1305
1306 ret = pthread_create(&poll_thread, NULL, test_event_poll_thread, ev);
1307 torture_assert(test, ret == 0, "Could not create poll thread");
1308
1309 ret = pipe(fds);
1310 torture_assert(test, ret == 0, "Could not create pipe");
1311
1312 poll(NULL, 0, 100);
1313
1314 test_event_threaded_lock();
1315
1316 fde = tevent_add_fd(ev, ev, fds[0], TEVENT_FD_READ,
1317 test_event_threaded_read_handler, &fds[0]);
1318 torture_assert(test, fde != NULL, "Could not add fd event");
1319
1320 test_event_threaded_unlock();
1321
1322 poll(NULL, 0, 100);
1323
1324 do_write(fds[1], &c, 1);
1325
1326 poll(NULL, 0, 100);
1327
1328 test_event_threaded_lock();
1329 do_shutdown = true;
1330 test_event_threaded_unlock();
1331
1332 do_write(fds[1], &c, 1);
1333
1334 ret = pthread_join(poll_thread, NULL);
1335 torture_assert(test, ret == 0, "pthread_join failed");
1336
1337 return true;
1338 }
1339
1340 #define NUM_TEVENT_THREADS 100
1341
1342 /* Ugly, but needed for torture_comment... */
1343 static struct torture_context *thread_test_ctx;
1344 static pthread_t thread_map[NUM_TEVENT_THREADS];
1345 static unsigned thread_counter;
1346
1347 /* Called in master thread context */
callback_nowait(struct tevent_context * ev,struct tevent_immediate * im,void * private_ptr)1348 static void callback_nowait(struct tevent_context *ev,
1349 struct tevent_immediate *im,
1350 void *private_ptr)
1351 {
1352 pthread_t *thread_id_ptr =
1353 talloc_get_type_abort(private_ptr, pthread_t);
1354 unsigned i;
1355
1356 for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1357 if (pthread_equal(*thread_id_ptr,
1358 thread_map[i])) {
1359 break;
1360 }
1361 }
1362 torture_comment(thread_test_ctx,
1363 "Callback %u from thread %u\n",
1364 thread_counter,
1365 i);
1366 thread_counter++;
1367 }
1368
1369 /* Blast the master tevent_context with a callback, no waiting. */
thread_fn_nowait(void * private_ptr)1370 static void *thread_fn_nowait(void *private_ptr)
1371 {
1372 struct tevent_thread_proxy *master_tp =
1373 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
1374 struct tevent_immediate *im;
1375 pthread_t *thread_id_ptr;
1376
1377 im = tevent_create_immediate(NULL);
1378 if (im == NULL) {
1379 return NULL;
1380 }
1381 thread_id_ptr = talloc(NULL, pthread_t);
1382 if (thread_id_ptr == NULL) {
1383 return NULL;
1384 }
1385 *thread_id_ptr = pthread_self();
1386
1387 tevent_thread_proxy_schedule(master_tp,
1388 &im,
1389 callback_nowait,
1390 &thread_id_ptr);
1391 return NULL;
1392 }
1393
timeout_fn(struct tevent_context * ev,struct tevent_timer * te,struct timeval tv,void * p)1394 static void timeout_fn(struct tevent_context *ev,
1395 struct tevent_timer *te,
1396 struct timeval tv, void *p)
1397 {
1398 thread_counter = NUM_TEVENT_THREADS * 10;
1399 }
1400
test_multi_tevent_threaded(struct torture_context * test,const void * test_data)1401 static bool test_multi_tevent_threaded(struct torture_context *test,
1402 const void *test_data)
1403 {
1404 unsigned i;
1405 struct tevent_context *master_ev;
1406 struct tevent_thread_proxy *tp;
1407
1408 talloc_disable_null_tracking();
1409
1410 /* Ugly global stuff. */
1411 thread_test_ctx = test;
1412 thread_counter = 0;
1413
1414 master_ev = tevent_context_init(NULL);
1415 if (master_ev == NULL) {
1416 return false;
1417 }
1418 tevent_set_debug_stderr(master_ev);
1419
1420 tp = tevent_thread_proxy_create(master_ev);
1421 if (tp == NULL) {
1422 torture_fail(test,
1423 talloc_asprintf(test,
1424 "tevent_thread_proxy_create failed\n"));
1425 talloc_free(master_ev);
1426 return false;
1427 }
1428
1429 for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1430 int ret = pthread_create(&thread_map[i],
1431 NULL,
1432 thread_fn_nowait,
1433 tp);
1434 if (ret != 0) {
1435 torture_fail(test,
1436 talloc_asprintf(test,
1437 "Failed to create thread %i, %d\n",
1438 i, ret));
1439 return false;
1440 }
1441 }
1442
1443 /* Ensure we don't wait more than 10 seconds. */
1444 tevent_add_timer(master_ev,
1445 master_ev,
1446 timeval_current_ofs(10,0),
1447 timeout_fn,
1448 NULL);
1449
1450 while (thread_counter < NUM_TEVENT_THREADS) {
1451 int ret = tevent_loop_once(master_ev);
1452 torture_assert(test, ret == 0, "tevent_loop_once failed");
1453 }
1454
1455 torture_assert(test, thread_counter == NUM_TEVENT_THREADS,
1456 "thread_counter fail\n");
1457
1458 talloc_free(master_ev);
1459 return true;
1460 }
1461
1462 struct reply_state {
1463 struct tevent_thread_proxy *reply_tp;
1464 pthread_t thread_id;
1465 int *p_finished;
1466 };
1467
thread_timeout_fn(struct tevent_context * ev,struct tevent_timer * te,struct timeval tv,void * p)1468 static void thread_timeout_fn(struct tevent_context *ev,
1469 struct tevent_timer *te,
1470 struct timeval tv, void *p)
1471 {
1472 int *p_finished = (int *)p;
1473
1474 *p_finished = 2;
1475 }
1476
1477 /* Called in child-thread context */
thread_callback(struct tevent_context * ev,struct tevent_immediate * im,void * private_ptr)1478 static void thread_callback(struct tevent_context *ev,
1479 struct tevent_immediate *im,
1480 void *private_ptr)
1481 {
1482 struct reply_state *rsp =
1483 talloc_get_type_abort(private_ptr, struct reply_state);
1484
1485 talloc_steal(ev, rsp);
1486 *rsp->p_finished = 1;
1487 }
1488
1489 /* Called in master thread context */
master_callback(struct tevent_context * ev,struct tevent_immediate * im,void * private_ptr)1490 static void master_callback(struct tevent_context *ev,
1491 struct tevent_immediate *im,
1492 void *private_ptr)
1493 {
1494 struct reply_state *rsp =
1495 talloc_get_type_abort(private_ptr, struct reply_state);
1496 unsigned i;
1497
1498 talloc_steal(ev, rsp);
1499
1500 for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1501 if (pthread_equal(rsp->thread_id,
1502 thread_map[i])) {
1503 break;
1504 }
1505 }
1506 torture_comment(thread_test_ctx,
1507 "Callback %u from thread %u\n",
1508 thread_counter,
1509 i);
1510 /* Now reply to the thread ! */
1511 tevent_thread_proxy_schedule(rsp->reply_tp,
1512 &im,
1513 thread_callback,
1514 &rsp);
1515
1516 thread_counter++;
1517 }
1518
thread_fn_1(void * private_ptr)1519 static void *thread_fn_1(void *private_ptr)
1520 {
1521 struct tevent_thread_proxy *master_tp =
1522 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
1523 struct tevent_thread_proxy *tp;
1524 struct tevent_immediate *im;
1525 struct tevent_context *ev;
1526 struct reply_state *rsp;
1527 int finished = 0;
1528 int ret;
1529
1530 ev = tevent_context_init(NULL);
1531 if (ev == NULL) {
1532 return NULL;
1533 }
1534
1535 tp = tevent_thread_proxy_create(ev);
1536 if (tp == NULL) {
1537 talloc_free(ev);
1538 return NULL;
1539 }
1540
1541 im = tevent_create_immediate(ev);
1542 if (im == NULL) {
1543 talloc_free(ev);
1544 return NULL;
1545 }
1546
1547 rsp = talloc(ev, struct reply_state);
1548 if (rsp == NULL) {
1549 talloc_free(ev);
1550 return NULL;
1551 }
1552
1553 rsp->thread_id = pthread_self();
1554 rsp->reply_tp = tp;
1555 rsp->p_finished = &finished;
1556
1557 /* Introduce a little randomness into the mix.. */
1558 usleep(random() % 7000);
1559
1560 tevent_thread_proxy_schedule(master_tp,
1561 &im,
1562 master_callback,
1563 &rsp);
1564
1565 /* Ensure we don't wait more than 10 seconds. */
1566 tevent_add_timer(ev,
1567 ev,
1568 timeval_current_ofs(10,0),
1569 thread_timeout_fn,
1570 &finished);
1571
1572 while (finished == 0) {
1573 ret = tevent_loop_once(ev);
1574 assert(ret == 0);
1575 }
1576
1577 if (finished > 1) {
1578 /* Timeout ! */
1579 abort();
1580 }
1581
1582 /*
1583 * NB. We should talloc_free(ev) here, but if we do
1584 * we currently get hit by helgrind Fix #323432
1585 * "When calling pthread_cond_destroy or pthread_mutex_destroy
1586 * with initializers as argument Helgrind (incorrectly) reports errors."
1587 *
1588 * http://valgrind.10908.n7.nabble.com/Helgrind-3-9-0-false-positive-
1589 * with-pthread-mutex-destroy-td47757.html
1590 *
1591 * Helgrind doesn't understand that the request/reply
1592 * messages provide synchronization between the lock/unlock
1593 * in tevent_thread_proxy_schedule(), and the pthread_destroy()
1594 * when the struct tevent_thread_proxy object is talloc_free'd.
1595 *
1596 * As a work-around for now return ev for the parent thread to free.
1597 */
1598 return ev;
1599 }
1600
test_multi_tevent_threaded_1(struct torture_context * test,const void * test_data)1601 static bool test_multi_tevent_threaded_1(struct torture_context *test,
1602 const void *test_data)
1603 {
1604 unsigned i;
1605 struct tevent_context *master_ev;
1606 struct tevent_thread_proxy *master_tp;
1607 int ret;
1608
1609 talloc_disable_null_tracking();
1610
1611 /* Ugly global stuff. */
1612 thread_test_ctx = test;
1613 thread_counter = 0;
1614
1615 master_ev = tevent_context_init(NULL);
1616 if (master_ev == NULL) {
1617 return false;
1618 }
1619 tevent_set_debug_stderr(master_ev);
1620
1621 master_tp = tevent_thread_proxy_create(master_ev);
1622 if (master_tp == NULL) {
1623 torture_fail(test,
1624 talloc_asprintf(test,
1625 "tevent_thread_proxy_create failed\n"));
1626 talloc_free(master_ev);
1627 return false;
1628 }
1629
1630 for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1631 ret = pthread_create(&thread_map[i],
1632 NULL,
1633 thread_fn_1,
1634 master_tp);
1635 if (ret != 0) {
1636 torture_fail(test,
1637 talloc_asprintf(test,
1638 "Failed to create thread %i, %d\n",
1639 i, ret));
1640 return false;
1641 }
1642 }
1643
1644 while (thread_counter < NUM_TEVENT_THREADS) {
1645 ret = tevent_loop_once(master_ev);
1646 torture_assert(test, ret == 0, "tevent_loop_once failed");
1647 }
1648
1649 /* Wait for all the threads to finish - join 'em. */
1650 for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1651 void *retval;
1652 ret = pthread_join(thread_map[i], &retval);
1653 torture_assert(test, ret == 0, "pthread_join failed");
1654 /* Free the child thread event context. */
1655 talloc_free(retval);
1656 }
1657
1658 talloc_free(master_ev);
1659 return true;
1660 }
1661
1662 struct threaded_test_2 {
1663 struct tevent_threaded_context *tctx;
1664 struct tevent_immediate *im;
1665 pthread_t thread_id;
1666 };
1667
1668 static void master_callback_2(struct tevent_context *ev,
1669 struct tevent_immediate *im,
1670 void *private_data);
1671
thread_fn_2(void * private_data)1672 static void *thread_fn_2(void *private_data)
1673 {
1674 struct threaded_test_2 *state = private_data;
1675
1676 state->thread_id = pthread_self();
1677
1678 usleep(random() % 7000);
1679
1680 tevent_threaded_schedule_immediate(
1681 state->tctx, state->im, master_callback_2, state);
1682
1683 return NULL;
1684 }
1685
master_callback_2(struct tevent_context * ev,struct tevent_immediate * im,void * private_data)1686 static void master_callback_2(struct tevent_context *ev,
1687 struct tevent_immediate *im,
1688 void *private_data)
1689 {
1690 struct threaded_test_2 *state = private_data;
1691 int i;
1692
1693 for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1694 if (pthread_equal(state->thread_id, thread_map[i])) {
1695 break;
1696 }
1697 }
1698 torture_comment(thread_test_ctx,
1699 "Callback_2 %u from thread %u\n",
1700 thread_counter,
1701 i);
1702 thread_counter++;
1703 }
1704
test_multi_tevent_threaded_2(struct torture_context * test,const void * test_data)1705 static bool test_multi_tevent_threaded_2(struct torture_context *test,
1706 const void *test_data)
1707 {
1708 unsigned i;
1709
1710 struct tevent_context *ev;
1711 struct tevent_threaded_context *tctx;
1712 int ret;
1713
1714 thread_test_ctx = test;
1715 thread_counter = 0;
1716
1717 ev = tevent_context_init(test);
1718 torture_assert(test, ev != NULL, "tevent_context_init failed");
1719
1720 /*
1721 * tevent_re_initialise used to have a bug where it did not
1722 * re-initialise the thread support after taking it
1723 * down. Exercise that code path.
1724 */
1725 ret = tevent_re_initialise(ev);
1726 torture_assert(test, ret == 0, "tevent_re_initialise failed");
1727
1728 tctx = tevent_threaded_context_create(ev, ev);
1729 torture_assert(test, tctx != NULL,
1730 "tevent_threaded_context_create failed");
1731
1732 for (i=0; i<NUM_TEVENT_THREADS; i++) {
1733 struct threaded_test_2 *state;
1734
1735 state = talloc(ev, struct threaded_test_2);
1736 torture_assert(test, state != NULL, "talloc failed");
1737
1738 state->tctx = tctx;
1739 state->im = tevent_create_immediate(state);
1740 torture_assert(test, state->im != NULL,
1741 "tevent_create_immediate failed");
1742
1743 ret = pthread_create(&thread_map[i], NULL, thread_fn_2, state);
1744 torture_assert(test, ret == 0, "pthread_create failed");
1745 }
1746
1747 while (thread_counter < NUM_TEVENT_THREADS) {
1748 ret = tevent_loop_once(ev);
1749 torture_assert(test, ret == 0, "tevent_loop_once failed");
1750 }
1751
1752 /* Wait for all the threads to finish - join 'em. */
1753 for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1754 void *retval;
1755 ret = pthread_join(thread_map[i], &retval);
1756 torture_assert(test, ret == 0, "pthread_join failed");
1757 /* Free the child thread event context. */
1758 }
1759
1760 talloc_free(tctx);
1761 talloc_free(ev);
1762 return true;
1763 }
1764 #endif
1765
torture_local_event(TALLOC_CTX * mem_ctx)1766 struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
1767 {
1768 struct torture_suite *suite = torture_suite_create(mem_ctx, "event");
1769 const char **list = tevent_backend_list(suite);
1770 int i;
1771
1772 for (i=0;list && list[i];i++) {
1773 struct torture_suite *backend_suite;
1774
1775 backend_suite = torture_suite_create(mem_ctx, list[i]);
1776
1777 torture_suite_add_simple_tcase_const(backend_suite,
1778 "context",
1779 test_event_context,
1780 (const void *)list[i]);
1781 torture_suite_add_simple_tcase_const(backend_suite,
1782 "fd1",
1783 test_event_fd1,
1784 (const void *)list[i]);
1785 torture_suite_add_simple_tcase_const(backend_suite,
1786 "fd2",
1787 test_event_fd2,
1788 (const void *)list[i]);
1789 torture_suite_add_simple_tcase_const(backend_suite,
1790 "wrapper",
1791 test_wrapper,
1792 (const void *)list[i]);
1793 torture_suite_add_simple_tcase_const(backend_suite,
1794 "free_wrapper",
1795 test_free_wrapper,
1796 (const void *)list[i]);
1797
1798 torture_suite_add_suite(suite, backend_suite);
1799 }
1800
1801 #ifdef HAVE_PTHREAD
1802 torture_suite_add_simple_tcase_const(suite, "threaded_poll_mt",
1803 test_event_context_threaded,
1804 NULL);
1805
1806 torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded",
1807 test_multi_tevent_threaded,
1808 NULL);
1809
1810 torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_1",
1811 test_multi_tevent_threaded_1,
1812 NULL);
1813
1814 torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_2",
1815 test_multi_tevent_threaded_2,
1816 NULL);
1817
1818 #endif
1819
1820 return suite;
1821 }
1822