1 /*
2    Unix SMB/CIFS implementation.
3 
4    testing of the events subsystem
5 
6    Copyright (C) Stefan Metzmacher 2006-2009
7    Copyright (C) Jeremy Allison    2013
8 
9      ** NOTE! The following LGPL license applies to the tevent
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12 
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17 
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22 
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26 
27 #include "includes.h"
28 #define TEVENT_DEPRECATED 1
29 #include "tevent.h"
30 #include "system/filesys.h"
31 #include "system/select.h"
32 #include "system/network.h"
33 #include "torture/torture.h"
34 #include "torture/local/proto.h"
35 #ifdef HAVE_PTHREAD
36 #include "system/threads.h"
37 #include <assert.h>
38 #endif
39 
/*
 * Number of pipe read events seen by fde_handler_read() and
 * fde_handler_read_1(); reset at the start of test_event_context().
 */
static int fde_count;
41 
/*
 * Read up to count bytes from fd into buf, retrying for as long as
 * read() fails with EINTR. Any other outcome (success, short read,
 * other error) ends the loop; the result is not reported to the caller.
 */
static void do_read(int fd, void *buf, size_t count)
{
	for (;;) {
		ssize_t nread = read(fd, buf, count);

		if (nread != -1 || errno != EINTR) {
			break;
		}
	}
}
50 
/*
 * fd handler for the read end of the pipe. private_data is the int[2]
 * pipe array; one byte is consumed from element 0.
 *
 * Before reading, the handler signals its own process (SIGUSR1 when
 * SA_SIGINFO is available, and always SIGALRM), then counts the event
 * in fde_count.
 */
static void fde_handler_read(struct tevent_context *ev_ctx, struct tevent_fd *f,
			uint16_t flags, void *private_data)
{
	int *pipe_fds = private_data;
	char byte;

#ifdef SA_SIGINFO
	kill(getpid(), SIGUSR1);
#endif
	kill(getpid(), SIGALRM);

	do_read(pipe_fds[0], &byte, 1);
	fde_count += 1;
}
64 
/*
 * Write up to count bytes from buf to fd, retrying for as long as
 * write() fails with EINTR. Any other outcome ends the loop; the
 * result is not reported to the caller.
 */
static void do_write(int fd, void *buf, size_t count)
{
	for (;;) {
		ssize_t nwritten = write(fd, buf, count);

		if (nwritten != -1 || errno != EINTR) {
			break;
		}
	}
}
73 
/*
 * fd handler for the write end of the pipe. private_data is the int[2]
 * pipe array; a single zero byte is written to element 1.
 */
static void fde_handler_write(struct tevent_context *ev_ctx, struct tevent_fd *f,
			uint16_t flags, void *private_data)
{
	int *pipe_fds = private_data;
	char zero = 0;

	do_write(pipe_fds[1], &zero, 1);
}
82 
83 
84 /* This will only fire if the fd's returned from pipe() are bi-directional. */
fde_handler_read_1(struct tevent_context * ev_ctx,struct tevent_fd * f,uint16_t flags,void * private_data)85 static void fde_handler_read_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
86 			uint16_t flags, void *private_data)
87 {
88 	int *fd = (int *)private_data;
89 	char c;
90 #ifdef SA_SIGINFO
91 	kill(getpid(), SIGUSR1);
92 #endif
93 	kill(getpid(), SIGALRM);
94 
95 	do_read(fd[1], &c, 1);
96 	fde_count++;
97 }
98 
/*
 * Write handler registered on fd[0], the nominal read end of the pipe.
 * It will only fire if the fd's returned from pipe() are bi-directional.
 * Writes a single zero byte to fd[0].
 */
static void fde_handler_write_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
			uint16_t flags, void *private_data)
{
	int *pipe_fds = private_data;
	char zero = 0;

	do_write(pipe_fds[0], &zero, 1);
}
107 
/*
 * Timer handler: private_data points to an int flag which is set to 1
 * so the caller's event loop can stop.
 */
static void finished_handler(struct tevent_context *ev_ctx, struct tevent_timer *te,
			     struct timeval tval, void *private_data)
{
	int *done_flag = private_data;

	*done_flag = 1;
}
114 
/*
 * Signal handler: private_data points to an int accumulator to which
 * the delivered signal count is added.
 */
static void count_handler(struct tevent_context *ev_ctx, struct tevent_signal *te,
			  int signum, int count, void *info, void *private_data)
{
	int *total = private_data;

	*total = *total + count;
}
121 
test_event_context(struct torture_context * test,const void * test_data)122 static bool test_event_context(struct torture_context *test,
123 			       const void *test_data)
124 {
125 	struct tevent_context *ev_ctx;
126 	int fd[2] = { -1, -1 };
127 	const char *backend = (const char *)test_data;
128 	int alarm_count=0, info_count=0;
129 	struct tevent_fd *fde_read;
130 	struct tevent_fd *fde_read_1;
131 	struct tevent_fd *fde_write;
132 	struct tevent_fd *fde_write_1;
133 #ifdef SA_RESTART
134 	struct tevent_signal *se1 = NULL;
135 #endif
136 #ifdef SA_RESETHAND
137 	struct tevent_signal *se2 = NULL;
138 #endif
139 #ifdef SA_SIGINFO
140 	struct tevent_signal *se3 = NULL;
141 #endif
142 	int finished=0;
143 	struct timeval t;
144 	int ret;
145 
146 	ev_ctx = tevent_context_init_byname(test, backend);
147 	if (ev_ctx == NULL) {
148 		torture_comment(test, "event backend '%s' not supported\n", backend);
149 		return true;
150 	}
151 
152 	torture_comment(test, "backend '%s' - %s\n",
153 			backend, __FUNCTION__);
154 
155 	/* reset globals */
156 	fde_count = 0;
157 
158 	/* create a pipe */
159 	ret = pipe(fd);
160 	torture_assert_int_equal(test, ret, 0, "pipe failed");
161 
162 	fde_read = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_READ,
163 			    fde_handler_read, fd);
164 	fde_write_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_WRITE,
165 			    fde_handler_write_1, fd);
166 
167 	fde_write = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_WRITE,
168 			    fde_handler_write, fd);
169 	fde_read_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_READ,
170 			    fde_handler_read_1, fd);
171 
172 	tevent_fd_set_auto_close(fde_read);
173 	tevent_fd_set_auto_close(fde_write);
174 
175 	tevent_add_timer(ev_ctx, ev_ctx, timeval_current_ofs(2,0),
176 			 finished_handler, &finished);
177 
178 #ifdef SA_RESTART
179 	se1 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESTART, count_handler, &alarm_count);
180 	torture_assert(test, se1 != NULL, "failed to setup se1");
181 #endif
182 #ifdef SA_RESETHAND
183 	se2 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESETHAND, count_handler, &alarm_count);
184 	torture_assert(test, se2 != NULL, "failed to setup se2");
185 #endif
186 #ifdef SA_SIGINFO
187 	se3 = tevent_add_signal(ev_ctx, ev_ctx, SIGUSR1, SA_SIGINFO, count_handler, &info_count);
188 	torture_assert(test, se3 != NULL, "failed to setup se3");
189 #endif
190 
191 	t = timeval_current();
192 	while (!finished) {
193 		errno = 0;
194 		if (tevent_loop_once(ev_ctx) == -1) {
195 			TALLOC_FREE(ev_ctx);
196 			torture_fail(test, talloc_asprintf(test, "Failed event loop %s\n", strerror(errno)));
197 			return false;
198 		}
199 	}
200 
201 	talloc_free(fde_read_1);
202 	talloc_free(fde_write_1);
203 	talloc_free(fde_read);
204 	talloc_free(fde_write);
205 
206 	while (alarm_count < fde_count+1) {
207 		if (tevent_loop_once(ev_ctx) == -1) {
208 			break;
209 		}
210 	}
211 
212 	torture_comment(test, "Got %.2f pipe events/sec\n", fde_count/timeval_elapsed(&t));
213 
214 #ifdef SA_RESTART
215 	talloc_free(se1);
216 #endif
217 
218 	torture_assert_int_equal(test, alarm_count, 1+fde_count, "alarm count mismatch");
219 
220 #ifdef SA_RESETHAND
221 	/*
222 	 * we do not call talloc_free(se2)
223 	 * because it is already gone,
224 	 * after triggering the event handler.
225 	 */
226 #endif
227 
228 #ifdef SA_SIGINFO
229 	talloc_free(se3);
230 	torture_assert_int_equal(test, info_count, fde_count, "info count mismatch");
231 #endif
232 
233 	talloc_free(ev_ctx);
234 
235 	return true;
236 }
237 
/*
 * Shared state of test_event_fd1(), passed as private_data to
 * test_event_fd1_fde_handler() and test_event_fd1_finished().
 */
struct test_event_fd1_state {
	struct torture_context *tctx;	/* torture context of the running test */
	const char *backend;		/* tevent backend name under test */
	struct tevent_context *ev;	/* event context created for the backend */
	int sock[2];			/* socketpair: [0] is monitored, [1] is written to */
	struct tevent_timer *te;	/* most recently added timer */
	struct tevent_fd *fde0;		/* fd event on sock[0] */
	struct tevent_fd *fde1;		/* fd event on sock[1], only used to auto close */
	bool got_write;			/* handler saw TEVENT_FD_WRITE in this round */
	bool got_read;			/* handler saw TEVENT_FD_READ in this round */
	bool drain;			/* final phase: fde1 was freed, handler reads sock[0] */
	bool drain_done;		/* a read in the drain phase did not return 1 byte */
	unsigned loop_count;		/* number of rounds restarted by the timer */
	bool finished;			/* ends the event loop in test_event_fd1() */
	const char *error;		/* __location__ of a failure, NULL if none */
};
254 
test_event_fd1_fde_handler(struct tevent_context * ev_ctx,struct tevent_fd * fde,uint16_t flags,void * private_data)255 static void test_event_fd1_fde_handler(struct tevent_context *ev_ctx,
256 				       struct tevent_fd *fde,
257 				       uint16_t flags,
258 				       void *private_data)
259 {
260 	struct test_event_fd1_state *state =
261 		(struct test_event_fd1_state *)private_data;
262 
263 	if (state->drain_done) {
264 		state->finished = true;
265 		state->error = __location__;
266 		return;
267 	}
268 
269 	if (state->drain) {
270 		ssize_t ret;
271 		uint8_t c = 0;
272 
273 		if (!(flags & TEVENT_FD_READ)) {
274 			state->finished = true;
275 			state->error = __location__;
276 			return;
277 		}
278 
279 		ret = read(state->sock[0], &c, 1);
280 		if (ret == 1) {
281 			return;
282 		}
283 
284 		/*
285 		 * end of test...
286 		 */
287 		tevent_fd_set_flags(fde, 0);
288 		state->drain_done = true;
289 		return;
290 	}
291 
292 	if (!state->got_write) {
293 		uint8_t c = 0;
294 
295 		if (flags != TEVENT_FD_WRITE) {
296 			state->finished = true;
297 			state->error = __location__;
298 			return;
299 		}
300 		state->got_write = true;
301 
302 		/*
303 		 * we write to the other socket...
304 		 */
305 		do_write(state->sock[1], &c, 1);
306 		TEVENT_FD_NOT_WRITEABLE(fde);
307 		TEVENT_FD_READABLE(fde);
308 		return;
309 	}
310 
311 	if (!state->got_read) {
312 		if (flags != TEVENT_FD_READ) {
313 			state->finished = true;
314 			state->error = __location__;
315 			return;
316 		}
317 		state->got_read = true;
318 
319 		TEVENT_FD_NOT_READABLE(fde);
320 		return;
321 	}
322 
323 	state->finished = true;
324 	state->error = __location__;
325 	return;
326 }
327 
test_event_fd1_finished(struct tevent_context * ev_ctx,struct tevent_timer * te,struct timeval tval,void * private_data)328 static void test_event_fd1_finished(struct tevent_context *ev_ctx,
329 				    struct tevent_timer *te,
330 				    struct timeval tval,
331 				    void *private_data)
332 {
333 	struct test_event_fd1_state *state =
334 		(struct test_event_fd1_state *)private_data;
335 
336 	if (state->drain_done) {
337 		state->finished = true;
338 		return;
339 	}
340 
341 	if (!state->got_write) {
342 		state->finished = true;
343 		state->error = __location__;
344 		return;
345 	}
346 
347 	if (!state->got_read) {
348 		state->finished = true;
349 		state->error = __location__;
350 		return;
351 	}
352 
353 	state->loop_count++;
354 	if (state->loop_count > 3) {
355 		state->finished = true;
356 		state->error = __location__;
357 		return;
358 	}
359 
360 	state->got_write = false;
361 	state->got_read = false;
362 
363 	tevent_fd_set_flags(state->fde0, TEVENT_FD_WRITE);
364 
365 	if (state->loop_count > 2) {
366 		state->drain = true;
367 		TALLOC_FREE(state->fde1);
368 		TEVENT_FD_READABLE(state->fde0);
369 	}
370 
371 	state->te = tevent_add_timer(state->ev, state->ev,
372 				    timeval_current_ofs(0,2000),
373 				    test_event_fd1_finished, state);
374 }
375 
test_event_fd1(struct torture_context * tctx,const void * test_data)376 static bool test_event_fd1(struct torture_context *tctx,
377 			   const void *test_data)
378 {
379 	struct test_event_fd1_state state;
380 	int ret;
381 
382 	ZERO_STRUCT(state);
383 	state.tctx = tctx;
384 	state.backend = (const char *)test_data;
385 
386 	state.ev = tevent_context_init_byname(tctx, state.backend);
387 	if (state.ev == NULL) {
388 		torture_skip(tctx, talloc_asprintf(tctx,
389 			     "event backend '%s' not supported\n",
390 			     state.backend));
391 		return true;
392 	}
393 
394 	tevent_set_debug_stderr(state.ev);
395 	torture_comment(tctx, "backend '%s' - %s\n",
396 			state.backend, __FUNCTION__);
397 
398 	/*
399 	 * This tests the following:
400 	 *
401 	 * It monitors the state of state.sock[0]
402 	 * with tevent_fd, but we never read/write on state.sock[0]
403 	 * while state.sock[1] * is only used to write a few bytes.
404 	 *
405 	 * We have a loop:
406 	 *   - we wait only for TEVENT_FD_WRITE on state.sock[0]
407 	 *   - we write 1 byte to state.sock[1]
408 	 *   - we wait only for TEVENT_FD_READ on state.sock[0]
409 	 *   - we disable events on state.sock[0]
410 	 *   - the timer event restarts the loop
411 	 * Then we close state.sock[1]
412 	 * We have a loop:
413 	 *   - we wait for TEVENT_FD_READ/WRITE on state.sock[0]
414 	 *   - we try to read 1 byte
415 	 *   - if the read gets an error of returns 0
416 	 *     we disable the event handler
417 	 *   - the timer finishes the test
418 	 */
419 	state.sock[0] = -1;
420 	state.sock[1] = -1;
421 
422 	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, state.sock);
423 	torture_assert(tctx, ret == 0, "socketpair() failed");
424 
425 	state.te = tevent_add_timer(state.ev, state.ev,
426 				    timeval_current_ofs(0,1000),
427 				    test_event_fd1_finished, &state);
428 	state.fde0 = tevent_add_fd(state.ev, state.ev,
429 				   state.sock[0], TEVENT_FD_WRITE,
430 				   test_event_fd1_fde_handler, &state);
431 	/* state.fde1 is only used to auto close */
432 	state.fde1 = tevent_add_fd(state.ev, state.ev,
433 				   state.sock[1], 0,
434 				   test_event_fd1_fde_handler, &state);
435 
436 	tevent_fd_set_auto_close(state.fde0);
437 	tevent_fd_set_auto_close(state.fde1);
438 
439 	while (!state.finished) {
440 		errno = 0;
441 		if (tevent_loop_once(state.ev) == -1) {
442 			talloc_free(state.ev);
443 			torture_fail(tctx, talloc_asprintf(tctx,
444 				     "Failed event loop %s\n",
445 				     strerror(errno)));
446 		}
447 	}
448 
449 	talloc_free(state.ev);
450 
451 	torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
452 		       "%s", state.error));
453 
454 	return true;
455 }
456 
/*
 * Shared state of test_event_fd2(). Each end of the socketpair has its
 * own test_event_fd2_sock, which is passed as private_data to
 * test_event_fd2_sock_handler().
 */
struct test_event_fd2_state {
	struct torture_context *tctx;	/* torture context of the running test */
	const char *backend;		/* tevent backend name under test */
	struct tevent_context *ev;	/* event context created for the backend */
	struct tevent_timer *te;	/* watchdog timer, it should never expire */
	struct test_event_fd2_sock {
		struct test_event_fd2_state *state;	/* back pointer to the owning state */
		int fd;					/* this end of the socketpair */
		struct tevent_fd *fde;			/* fd event monitoring fd */
		size_t num_written;			/* bytes written to fd so far */
		size_t num_read;			/* bytes read from fd so far */
		bool got_full;				/* handler ran without being able to write */
	} sock0, sock1;
	bool finished;			/* ends the event loop in test_event_fd2() */
	const char *error;		/* __location__ of a failure, NULL if none */
};
473 
test_event_fd2_sock_handler(struct tevent_context * ev_ctx,struct tevent_fd * fde,uint16_t flags,void * private_data)474 static void test_event_fd2_sock_handler(struct tevent_context *ev_ctx,
475 					struct tevent_fd *fde,
476 					uint16_t flags,
477 					void *private_data)
478 {
479 	struct test_event_fd2_sock *cur_sock =
480 		(struct test_event_fd2_sock *)private_data;
481 	struct test_event_fd2_state *state = cur_sock->state;
482 	struct test_event_fd2_sock *oth_sock = NULL;
483 	uint8_t v = 0, c;
484 	ssize_t ret;
485 
486 	if (cur_sock == &state->sock0) {
487 		oth_sock = &state->sock1;
488 	} else {
489 		oth_sock = &state->sock0;
490 	}
491 
492 	if (oth_sock->num_written == 1) {
493 		if (flags != (TEVENT_FD_READ | TEVENT_FD_WRITE)) {
494 			state->finished = true;
495 			state->error = __location__;
496 			return;
497 		}
498 	}
499 
500 	if (cur_sock->num_read == oth_sock->num_written) {
501 		state->finished = true;
502 		state->error = __location__;
503 		return;
504 	}
505 
506 	if (!(flags & TEVENT_FD_READ)) {
507 		state->finished = true;
508 		state->error = __location__;
509 		return;
510 	}
511 
512 	if (oth_sock->num_read >= PIPE_BUF) {
513 		/*
514 		 * On Linux we become writable once we've read
515 		 * one byte. On Solaris we only become writable
516 		 * again once we've read 4096 bytes. PIPE_BUF
517 		 * is probably a safe bet to test against.
518 		 *
519 		 * There should be room to write a byte again
520 		 */
521 		if (!(flags & TEVENT_FD_WRITE)) {
522 			state->finished = true;
523 			state->error = __location__;
524 			return;
525 		}
526 	}
527 
528 	if ((flags & TEVENT_FD_WRITE) && !cur_sock->got_full) {
529 		v = (uint8_t)cur_sock->num_written;
530 		ret = write(cur_sock->fd, &v, 1);
531 		if (ret != 1) {
532 			state->finished = true;
533 			state->error = __location__;
534 			return;
535 		}
536 		cur_sock->num_written++;
537 		if (cur_sock->num_written > 0x80000000) {
538 			state->finished = true;
539 			state->error = __location__;
540 			return;
541 		}
542 		return;
543 	}
544 
545 	if (!cur_sock->got_full) {
546 		cur_sock->got_full = true;
547 
548 		if (!oth_sock->got_full) {
549 			/*
550 			 * cur_sock is full,
551 			 * lets wait for oth_sock
552 			 * to be filled
553 			 */
554 			tevent_fd_set_flags(cur_sock->fde, 0);
555 			return;
556 		}
557 
558 		/*
559 		 * oth_sock waited for cur_sock,
560 		 * lets restart it
561 		 */
562 		tevent_fd_set_flags(oth_sock->fde,
563 				    TEVENT_FD_READ|TEVENT_FD_WRITE);
564 	}
565 
566 	ret = read(cur_sock->fd, &v, 1);
567 	if (ret != 1) {
568 		state->finished = true;
569 		state->error = __location__;
570 		return;
571 	}
572 	c = (uint8_t)cur_sock->num_read;
573 	if (c != v) {
574 		state->finished = true;
575 		state->error = __location__;
576 		return;
577 	}
578 	cur_sock->num_read++;
579 
580 	if (cur_sock->num_read < oth_sock->num_written) {
581 		/* there is more to read */
582 		return;
583 	}
584 	/*
585 	 * we read everything, we need to remove TEVENT_FD_WRITE
586 	 * to avoid spinning
587 	 */
588 	TEVENT_FD_NOT_WRITEABLE(cur_sock->fde);
589 
590 	if (oth_sock->num_read == cur_sock->num_written) {
591 		/*
592 		 * both directions are finished
593 		 */
594 		state->finished = true;
595 	}
596 
597 	return;
598 }
599 
test_event_fd2_finished(struct tevent_context * ev_ctx,struct tevent_timer * te,struct timeval tval,void * private_data)600 static void test_event_fd2_finished(struct tevent_context *ev_ctx,
601 				    struct tevent_timer *te,
602 				    struct timeval tval,
603 				    void *private_data)
604 {
605 	struct test_event_fd2_state *state =
606 		(struct test_event_fd2_state *)private_data;
607 
608 	/*
609 	 * this should never be triggered
610 	 */
611 	state->finished = true;
612 	state->error = __location__;
613 }
614 
test_event_fd2(struct torture_context * tctx,const void * test_data)615 static bool test_event_fd2(struct torture_context *tctx,
616 			   const void *test_data)
617 {
618 	struct test_event_fd2_state state;
619 	int sock[2];
620 	uint8_t c = 0;
621 
622 	ZERO_STRUCT(state);
623 	state.tctx = tctx;
624 	state.backend = (const char *)test_data;
625 
626 	state.ev = tevent_context_init_byname(tctx, state.backend);
627 	if (state.ev == NULL) {
628 		torture_skip(tctx, talloc_asprintf(tctx,
629 			     "event backend '%s' not supported\n",
630 			     state.backend));
631 		return true;
632 	}
633 
634 	tevent_set_debug_stderr(state.ev);
635 	torture_comment(tctx, "backend '%s' - %s\n",
636 			state.backend, __FUNCTION__);
637 
638 	/*
639 	 * This tests the following
640 	 *
641 	 * - We write 1 byte to each socket
642 	 * - We wait for TEVENT_FD_READ/WRITE on both sockets
643 	 * - When we get TEVENT_FD_WRITE we write 1 byte
644 	 *   until both socket buffers are full, which
645 	 *   means both sockets only get TEVENT_FD_READ.
646 	 * - Then we read 1 byte until we have consumed
647 	 *   all bytes the other end has written.
648 	 */
649 	sock[0] = -1;
650 	sock[1] = -1;
651 	socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
652 
653 	/*
654 	 * the timer should never expire
655 	 */
656 	state.te = tevent_add_timer(state.ev, state.ev,
657 				    timeval_current_ofs(600, 0),
658 				    test_event_fd2_finished, &state);
659 	state.sock0.state = &state;
660 	state.sock0.fd = sock[0];
661 	state.sock0.fde = tevent_add_fd(state.ev, state.ev,
662 					state.sock0.fd,
663 					TEVENT_FD_READ | TEVENT_FD_WRITE,
664 					test_event_fd2_sock_handler,
665 					&state.sock0);
666 	state.sock1.state = &state;
667 	state.sock1.fd = sock[1];
668 	state.sock1.fde = tevent_add_fd(state.ev, state.ev,
669 					state.sock1.fd,
670 					TEVENT_FD_READ | TEVENT_FD_WRITE,
671 					test_event_fd2_sock_handler,
672 					&state.sock1);
673 
674 	tevent_fd_set_auto_close(state.sock0.fde);
675 	tevent_fd_set_auto_close(state.sock1.fde);
676 
677 	do_write(state.sock0.fd, &c, 1);
678 	state.sock0.num_written++;
679 	do_write(state.sock1.fd, &c, 1);
680 	state.sock1.num_written++;
681 
682 	while (!state.finished) {
683 		errno = 0;
684 		if (tevent_loop_once(state.ev) == -1) {
685 			talloc_free(state.ev);
686 			torture_fail(tctx, talloc_asprintf(tctx,
687 				     "Failed event loop %s\n",
688 				     strerror(errno)));
689 		}
690 	}
691 
692 	talloc_free(state.ev);
693 
694 	torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
695 		       "%s", state.error));
696 
697 	return true;
698 }
699 
/*
 * Private state of the wrapper context used by test_wrapper(); it is
 * handed to the wrapper hooks and to the event handlers.
 */
struct test_wrapper_state {
	struct torture_context *tctx;	/* torture context used for logging */
	int num_events;			/* number of event handlers that ran */
	int num_wrap_handlers;		/* number of wrapper hook invocations */
};
705 
test_wrapper_before_use(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,const char * location)706 static bool test_wrapper_before_use(struct tevent_context *wrap_ev,
707 				    void *private_data,
708 				    struct tevent_context *main_ev,
709 				    const char *location)
710 {
711 	struct test_wrapper_state *state =
712 		talloc_get_type_abort(private_data,
713 		struct test_wrapper_state);
714 
715 	torture_comment(state->tctx, "%s\n", __func__);
716 	state->num_wrap_handlers++;
717 	return true;
718 }
719 
test_wrapper_after_use(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,const char * location)720 static void test_wrapper_after_use(struct tevent_context *wrap_ev,
721 				   void *private_data,
722 				   struct tevent_context *main_ev,
723 				   const char *location)
724 {
725 	struct test_wrapper_state *state =
726 		talloc_get_type_abort(private_data,
727 		struct test_wrapper_state);
728 
729 	torture_comment(state->tctx, "%s\n", __func__);
730 	state->num_wrap_handlers++;
731 }
732 
test_wrapper_before_fd_handler(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,struct tevent_fd * fde,uint16_t flags,const char * handler_name,const char * location)733 static void test_wrapper_before_fd_handler(struct tevent_context *wrap_ev,
734 					   void *private_data,
735 					   struct tevent_context *main_ev,
736 					   struct tevent_fd *fde,
737 					   uint16_t flags,
738 					   const char *handler_name,
739 					   const char *location)
740 {
741 	struct test_wrapper_state *state =
742 		talloc_get_type_abort(private_data,
743 		struct test_wrapper_state);
744 
745 	torture_comment(state->tctx, "%s\n", __func__);
746 	state->num_wrap_handlers++;
747 }
748 
test_wrapper_after_fd_handler(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,struct tevent_fd * fde,uint16_t flags,const char * handler_name,const char * location)749 static void test_wrapper_after_fd_handler(struct tevent_context *wrap_ev,
750 					  void *private_data,
751 					  struct tevent_context *main_ev,
752 					  struct tevent_fd *fde,
753 					  uint16_t flags,
754 					  const char *handler_name,
755 					  const char *location)
756 {
757 	struct test_wrapper_state *state =
758 		talloc_get_type_abort(private_data,
759 		struct test_wrapper_state);
760 
761 	torture_comment(state->tctx, "%s\n", __func__);
762 	state->num_wrap_handlers++;
763 }
764 
test_wrapper_before_timer_handler(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,struct tevent_timer * te,struct timeval requested_time,struct timeval trigger_time,const char * handler_name,const char * location)765 static void test_wrapper_before_timer_handler(struct tevent_context *wrap_ev,
766 					      void *private_data,
767 					      struct tevent_context *main_ev,
768 					      struct tevent_timer *te,
769 					      struct timeval requested_time,
770 					      struct timeval trigger_time,
771 					      const char *handler_name,
772 					      const char *location)
773 {
774 	struct test_wrapper_state *state =
775 		talloc_get_type_abort(private_data,
776 		struct test_wrapper_state);
777 
778 	torture_comment(state->tctx, "%s\n", __func__);
779 	state->num_wrap_handlers++;
780 }
781 
test_wrapper_after_timer_handler(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,struct tevent_timer * te,struct timeval requested_time,struct timeval trigger_time,const char * handler_name,const char * location)782 static void test_wrapper_after_timer_handler(struct tevent_context *wrap_ev,
783 					     void *private_data,
784 					     struct tevent_context *main_ev,
785 					     struct tevent_timer *te,
786 					     struct timeval requested_time,
787 					     struct timeval trigger_time,
788 					     const char *handler_name,
789 					     const char *location)
790 {
791 	struct test_wrapper_state *state =
792 		talloc_get_type_abort(private_data,
793 		struct test_wrapper_state);
794 
795 	torture_comment(state->tctx, "%s\n", __func__);
796 	state->num_wrap_handlers++;
797 }
798 
test_wrapper_before_immediate_handler(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,struct tevent_immediate * im,const char * handler_name,const char * location)799 static void test_wrapper_before_immediate_handler(struct tevent_context *wrap_ev,
800 						  void *private_data,
801 						  struct tevent_context *main_ev,
802 						  struct tevent_immediate *im,
803 						  const char *handler_name,
804 						  const char *location)
805 {
806 	struct test_wrapper_state *state =
807 		talloc_get_type_abort(private_data,
808 		struct test_wrapper_state);
809 
810 	torture_comment(state->tctx, "%s\n", __func__);
811 	state->num_wrap_handlers++;
812 }
813 
test_wrapper_after_immediate_handler(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,struct tevent_immediate * im,const char * handler_name,const char * location)814 static void test_wrapper_after_immediate_handler(struct tevent_context *wrap_ev,
815 						 void *private_data,
816 						 struct tevent_context *main_ev,
817 						 struct tevent_immediate *im,
818 						 const char *handler_name,
819 						 const char *location)
820 {
821 	struct test_wrapper_state *state =
822 		talloc_get_type_abort(private_data,
823 		struct test_wrapper_state);
824 
825 	torture_comment(state->tctx, "%s\n", __func__);
826 	state->num_wrap_handlers++;
827 }
828 
test_wrapper_before_signal_handler(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,struct tevent_signal * se,int signum,int count,void * siginfo,const char * handler_name,const char * location)829 static void test_wrapper_before_signal_handler(struct tevent_context *wrap_ev,
830 					       void *private_data,
831 					       struct tevent_context *main_ev,
832 					       struct tevent_signal *se,
833 					       int signum,
834 					       int count,
835 					       void *siginfo,
836 					       const char *handler_name,
837 					       const char *location)
838 {
839 	struct test_wrapper_state *state =
840 		talloc_get_type_abort(private_data,
841 		struct test_wrapper_state);
842 
843 	torture_comment(state->tctx, "%s\n", __func__);
844 	state->num_wrap_handlers++;
845 }
846 
test_wrapper_after_signal_handler(struct tevent_context * wrap_ev,void * private_data,struct tevent_context * main_ev,struct tevent_signal * se,int signum,int count,void * siginfo,const char * handler_name,const char * location)847 static void test_wrapper_after_signal_handler(struct tevent_context *wrap_ev,
848 					      void *private_data,
849 					      struct tevent_context *main_ev,
850 					      struct tevent_signal *se,
851 					      int signum,
852 					      int count,
853 					      void *siginfo,
854 					      const char *handler_name,
855 					      const char *location)
856 {
857 	struct test_wrapper_state *state =
858 		talloc_get_type_abort(private_data,
859 		struct test_wrapper_state);
860 
861 	torture_comment(state->tctx, "%s\n", __func__);
862 	state->num_wrap_handlers++;
863 }
864 
/*
 * Wrapper operations handed to tevent_context_wrapper_create() in
 * test_wrapper(). Every hook logs its own name and counts the call in
 * test_wrapper_state.num_wrap_handlers.
 */
static const struct tevent_wrapper_ops test_wrapper_ops = {
	.name				= "test_wrapper",
	.before_use			= test_wrapper_before_use,
	.after_use			= test_wrapper_after_use,
	.before_fd_handler		= test_wrapper_before_fd_handler,
	.after_fd_handler		= test_wrapper_after_fd_handler,
	.before_timer_handler		= test_wrapper_before_timer_handler,
	.after_timer_handler		= test_wrapper_after_timer_handler,
	.before_immediate_handler	= test_wrapper_before_immediate_handler,
	.after_immediate_handler	= test_wrapper_after_immediate_handler,
	.before_signal_handler		= test_wrapper_before_signal_handler,
	.after_signal_handler		= test_wrapper_after_signal_handler,
};
878 
test_wrapper_timer_handler(struct tevent_context * ev,struct tevent_timer * te,struct timeval tv,void * private_data)879 static void test_wrapper_timer_handler(struct tevent_context *ev,
880 				       struct tevent_timer *te,
881 				       struct timeval tv,
882 				       void *private_data)
883 {
884 	struct test_wrapper_state *state =
885 		(struct test_wrapper_state *)private_data;
886 
887 
888 	torture_comment(state->tctx, "timer handler\n");
889 
890 	state->num_events++;
891 	talloc_free(te);
892 	return;
893 }
894 
test_wrapper_fd_handler(struct tevent_context * ev,struct tevent_fd * fde,unsigned short fd_flags,void * private_data)895 static void test_wrapper_fd_handler(struct tevent_context *ev,
896 				    struct tevent_fd *fde,
897 				    unsigned short fd_flags,
898 				    void *private_data)
899 {
900 	struct test_wrapper_state *state =
901 		(struct test_wrapper_state *)private_data;
902 
903 	torture_comment(state->tctx, "fd handler\n");
904 
905 	state->num_events++;
906 	talloc_free(fde);
907 	return;
908 }
909 
test_wrapper_immediate_handler(struct tevent_context * ev,struct tevent_immediate * im,void * private_data)910 static void test_wrapper_immediate_handler(struct tevent_context *ev,
911 					   struct tevent_immediate *im,
912 					   void *private_data)
913 {
914 	struct test_wrapper_state *state =
915 		(struct test_wrapper_state *)private_data;
916 
917 	state->num_events++;
918 	talloc_free(im);
919 
920 	torture_comment(state->tctx, "immediate handler\n");
921 	return;
922 }
923 
test_wrapper_signal_handler(struct tevent_context * ev,struct tevent_signal * se,int signum,int count,void * siginfo,void * private_data)924 static void test_wrapper_signal_handler(struct tevent_context *ev,
925 					struct tevent_signal *se,
926 					int signum,
927 					int count,
928 					void *siginfo,
929 					void *private_data)
930 {
931 	struct test_wrapper_state *state =
932 		(struct test_wrapper_state *)private_data;
933 
934 	torture_comment(state->tctx, "signal handler\n");
935 
936 	state->num_events++;
937 	talloc_free(se);
938 	return;
939 }
940 
test_wrapper(struct torture_context * tctx,const void * test_data)941 static bool test_wrapper(struct torture_context *tctx,
942 			 const void *test_data)
943 {
944 	struct test_wrapper_state *state = NULL;
945 	int sock[2] = { -1, -1};
946 	uint8_t c = 0;
947 	const int num_events = 4;
948 	const char *backend = (const char *)test_data;
949 	struct tevent_context *ev = NULL;
950 	struct tevent_context *wrap_ev = NULL;
951 	struct tevent_fd *fde = NULL;
952 	struct tevent_timer *te = NULL;
953 	struct tevent_signal *se = NULL;
954 	struct tevent_immediate *im = NULL;
955 	int ret;
956 	bool ok = false;
957 	bool ret2;
958 
959 	ev = tevent_context_init_byname(tctx, backend);
960 	if (ev == NULL) {
961 		torture_skip(tctx, talloc_asprintf(tctx,
962 			     "event backend '%s' not supported\n",
963 			     backend));
964 		return true;
965 	}
966 
967 	tevent_set_debug_stderr(ev);
968 	torture_comment(tctx, "tevent backend '%s'\n", backend);
969 
970 	wrap_ev = tevent_context_wrapper_create(
971 		ev, ev,	&test_wrapper_ops, &state, struct test_wrapper_state);
972 	torture_assert_not_null_goto(tctx, wrap_ev, ok, done,
973 				     "tevent_context_wrapper_create failed\n");
974 	*state = (struct test_wrapper_state) {
975 		.tctx = tctx,
976 	};
977 
978 	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
979 	torture_assert_goto(tctx, ret == 0, ok, done, "socketpair failed\n");
980 
981 	te = tevent_add_timer(wrap_ev, wrap_ev,
982 			      timeval_current_ofs(0, 0),
983 			      test_wrapper_timer_handler, state);
984 	torture_assert_not_null_goto(tctx, te, ok, done,
985 				     "tevent_add_timer failed\n");
986 
987 	fde = tevent_add_fd(wrap_ev, wrap_ev,
988 			    sock[1],
989 			    TEVENT_FD_READ,
990 			    test_wrapper_fd_handler,
991 			    state);
992 	torture_assert_not_null_goto(tctx, fde, ok, done,
993 				     "tevent_add_fd failed\n");
994 
995 	im = tevent_create_immediate(wrap_ev);
996 	torture_assert_not_null_goto(tctx, im, ok, done,
997 				     "tevent_create_immediate failed\n");
998 
999 	se = tevent_add_signal(wrap_ev, wrap_ev,
1000 			       SIGUSR1,
1001 			       0,
1002 			       test_wrapper_signal_handler,
1003 			       state);
1004 	torture_assert_not_null_goto(tctx, se, ok, done,
1005 				     "tevent_add_signal failed\n");
1006 
1007 	do_write(sock[0], &c, 1);
1008 	kill(getpid(), SIGUSR1);
1009 	tevent_schedule_immediate(im,
1010 				  wrap_ev,
1011 				  test_wrapper_immediate_handler,
1012 				  state);
1013 
1014 	ret2 = tevent_context_push_use(wrap_ev);
1015 	torture_assert_goto(tctx, ret2, ok, done, "tevent_context_push_use(wrap_ev) failed\n");
1016 	ret2 = tevent_context_push_use(ev);
1017 	torture_assert_goto(tctx, ret2, ok, pop_use, "tevent_context_push_use(ev) failed\n");
1018 	tevent_context_pop_use(ev);
1019 	tevent_context_pop_use(wrap_ev);
1020 
1021 	ret = tevent_loop_wait(ev);
1022 	torture_assert_int_equal_goto(tctx, ret, 0, ok, done, "tevent_loop_wait failed\n");
1023 
1024 	torture_comment(tctx, "Num events: %d\n", state->num_events);
1025 	torture_comment(tctx, "Num wrap handlers: %d\n",
1026 			state->num_wrap_handlers);
1027 
1028 	torture_assert_int_equal_goto(tctx, state->num_events, num_events, ok, done,
1029 				      "Wrong event count\n");
1030 	torture_assert_int_equal_goto(tctx, state->num_wrap_handlers,
1031 				      num_events*2+2,
1032 				      ok, done, "Wrong wrapper count\n");
1033 
1034 	ok = true;
1035 
1036 done:
1037 	TALLOC_FREE(wrap_ev);
1038 	TALLOC_FREE(ev);
1039 
1040 	if (sock[0] != -1) {
1041 		close(sock[0]);
1042 	}
1043 	if (sock[1] != -1) {
1044 		close(sock[1]);
1045 	}
1046 	return ok;
1047 pop_use:
1048 	tevent_context_pop_use(wrap_ev);
1049 	goto done;
1050 }
1051 
test_free_wrapper_signal_handler(struct tevent_context * ev,struct tevent_signal * se,int signum,int count,void * siginfo,void * private_data)1052 static void test_free_wrapper_signal_handler(struct tevent_context *ev,
1053 					struct tevent_signal *se,
1054 					int signum,
1055 					int count,
1056 					void *siginfo,
1057 					void *private_data)
1058 {
1059 	struct torture_context *tctx =
1060 		talloc_get_type_abort(private_data,
1061 		struct torture_context);
1062 
1063 	torture_comment(tctx, "signal handler\n");
1064 
1065 	talloc_free(se);
1066 
1067 	/*
1068 	 * signal handlers have highest priority in tevent, so this signal
1069 	 * handler will always be started before the other handlers
1070 	 * below. Freeing the (wrapper) event context here tests that the
1071 	 * wrapper implementation correclty handles the wrapper ev going away
1072 	 * with pending events.
1073 	 */
1074 	talloc_free(ev);
1075 	return;
1076 }
1077 
/*
 * fd callback registered by test_free_wrapper().
 *
 * Reaching this handler is a test failure:
 * test_free_wrapper_signal_handler() has already destroyed the wrapper
 * tevent_context by the time it could run, so the process is aborted.
 */
static void test_free_wrapper_fd_handler(struct tevent_context *ev,
					 struct tevent_fd *fde,
					 unsigned short fd_flags,
					 void *private_data)
{
	abort();
}
1090 
/*
 * Immediate callback registered by test_free_wrapper().
 *
 * Reaching this handler is a test failure:
 * test_free_wrapper_signal_handler() has already destroyed the wrapper
 * tevent_context by the time it could run, so the process is aborted.
 */
static void test_free_wrapper_immediate_handler(struct tevent_context *ev,
					   struct tevent_immediate *im,
					   void *private_data)
{
	abort();
}
1102 
/*
 * Timer callback registered by test_free_wrapper().
 *
 * Reaching this handler is a test failure:
 * test_free_wrapper_signal_handler() has already destroyed the wrapper
 * tevent_context by the time it could run, so the process is aborted.
 */
static void test_free_wrapper_timer_handler(struct tevent_context *ev,
				       struct tevent_timer *te,
				       struct timeval tv,
				       void *private_data)
{
	abort();
}
1115 
test_free_wrapper(struct torture_context * tctx,const void * test_data)1116 static bool test_free_wrapper(struct torture_context *tctx,
1117 			      const void *test_data)
1118 {
1119 	struct test_wrapper_state *state = NULL;
1120 	int sock[2] = { -1, -1};
1121 	uint8_t c = 0;
1122 	const char *backend = (const char *)test_data;
1123 	TALLOC_CTX *frame = talloc_stackframe();
1124 	struct tevent_context *ev = NULL;
1125 	struct tevent_context *wrap_ev = NULL;
1126 	struct tevent_fd *fde = NULL;
1127 	struct tevent_timer *te = NULL;
1128 	struct tevent_signal *se = NULL;
1129 	struct tevent_immediate *im = NULL;
1130 	int ret;
1131 	bool ok = false;
1132 
1133 	ev = tevent_context_init_byname(frame, backend);
1134 	if (ev == NULL) {
1135 		torture_skip(tctx, talloc_asprintf(tctx,
1136 			     "event backend '%s' not supported\n",
1137 			     backend));
1138 		return true;
1139 	}
1140 
1141 	tevent_set_debug_stderr(ev);
1142 	torture_comment(tctx, "tevent backend '%s'\n", backend);
1143 
1144 	wrap_ev = tevent_context_wrapper_create(
1145 		ev, ev,	&test_wrapper_ops, &state, struct test_wrapper_state);
1146 	torture_assert_not_null_goto(tctx, wrap_ev, ok, done,
1147 				     "tevent_context_wrapper_create failed\n");
1148 	*state = (struct test_wrapper_state) {
1149 		.tctx = tctx,
1150 	};
1151 
1152 	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
1153 	torture_assert_goto(tctx, ret == 0, ok, done, "socketpair failed\n");
1154 
1155 	fde = tevent_add_fd(wrap_ev, frame,
1156 			    sock[1],
1157 			    TEVENT_FD_READ,
1158 			    test_free_wrapper_fd_handler,
1159 			    NULL);
1160 	torture_assert_not_null_goto(tctx, fde, ok, done,
1161 				     "tevent_add_fd failed\n");
1162 
1163 	te = tevent_add_timer(wrap_ev, frame,
1164 			      timeval_current_ofs(0, 0),
1165 			      test_free_wrapper_timer_handler, NULL);
1166 	torture_assert_not_null_goto(tctx, te, ok, done,
1167 				     "tevent_add_timer failed\n");
1168 
1169 	im = tevent_create_immediate(frame);
1170 	torture_assert_not_null_goto(tctx, im, ok, done,
1171 				     "tevent_create_immediate failed\n");
1172 
1173 	se = tevent_add_signal(wrap_ev, frame,
1174 			       SIGUSR1,
1175 			       0,
1176 			       test_free_wrapper_signal_handler,
1177 			       tctx);
1178 	torture_assert_not_null_goto(tctx, se, ok, done,
1179 				     "tevent_add_signal failed\n");
1180 
1181 	do_write(sock[0], &c, 1);
1182 	kill(getpid(), SIGUSR1);
1183 	tevent_schedule_immediate(im,
1184 				  wrap_ev,
1185 				  test_free_wrapper_immediate_handler,
1186 				  NULL);
1187 
1188 	ret = tevent_loop_wait(ev);
1189 	torture_assert_goto(tctx, ret == 0, ok, done, "tevent_loop_wait failed\n");
1190 
1191 	ok = true;
1192 
1193 done:
1194 	TALLOC_FREE(frame);
1195 
1196 	if (sock[0] != -1) {
1197 		close(sock[0]);
1198 	}
1199 	if (sock[1] != -1) {
1200 		close(sock[1]);
1201 	}
1202 	return ok;
1203 }
1204 
1205 #ifdef HAVE_PTHREAD
1206 
/*
 * Set by the main test thread (with threaded_mutex held) to ask the poll
 * thread to leave its event loop.
 */
static bool do_shutdown;

/*
 * Mutex taken by the poll thread whenever it is not waiting for events,
 * and by the main test thread around its accesses to the event context
 * and to do_shutdown.
 */
static pthread_mutex_t threaded_mutex = PTHREAD_MUTEX_INITIALIZER;
1209 
test_event_threaded_lock(void)1210 static void test_event_threaded_lock(void)
1211 {
1212 	int ret;
1213 	ret = pthread_mutex_lock(&threaded_mutex);
1214 	assert(ret == 0);
1215 }
1216 
test_event_threaded_unlock(void)1217 static void test_event_threaded_unlock(void)
1218 {
1219 	int ret;
1220 	ret = pthread_mutex_unlock(&threaded_mutex);
1221 	assert(ret == 0);
1222 }
1223 
test_event_threaded_trace(enum tevent_trace_point point,void * private_data)1224 static void test_event_threaded_trace(enum tevent_trace_point point,
1225 				      void *private_data)
1226 {
1227 	switch (point) {
1228 	case TEVENT_TRACE_BEFORE_WAIT:
1229 		test_event_threaded_unlock();
1230 		break;
1231 	case TEVENT_TRACE_AFTER_WAIT:
1232 		test_event_threaded_lock();
1233 		break;
1234 	case TEVENT_TRACE_BEFORE_LOOP_ONCE:
1235 	case TEVENT_TRACE_AFTER_LOOP_ONCE:
1236 		break;
1237 	}
1238 }
1239 
/*
 * Timer callback for the threaded poll test. Intentionally empty: the
 * test only needs a timer to be registered on the event context.
 */
static void test_event_threaded_timer(struct tevent_context *ev,
				      struct tevent_timer *te,
				      struct timeval current_time,
				      void *private_data)
{
}
1247 
test_event_poll_thread(void * private_data)1248 static void *test_event_poll_thread(void *private_data)
1249 {
1250 	struct tevent_context *ev = (struct tevent_context *)private_data;
1251 
1252 	test_event_threaded_lock();
1253 
1254 	while (true) {
1255 		int ret;
1256 		ret = tevent_loop_once(ev);
1257 		assert(ret == 0);
1258 		if (do_shutdown) {
1259 			test_event_threaded_unlock();
1260 			return NULL;
1261 		}
1262 	}
1263 
1264 }
1265 
test_event_threaded_read_handler(struct tevent_context * ev,struct tevent_fd * fde,uint16_t flags,void * private_data)1266 static void test_event_threaded_read_handler(struct tevent_context *ev,
1267 					     struct tevent_fd *fde,
1268 					     uint16_t flags,
1269 					     void *private_data)
1270 {
1271 	int *pfd = (int *)private_data;
1272 	char c;
1273 	ssize_t nread;
1274 
1275 	if ((flags & TEVENT_FD_READ) == 0) {
1276 		return;
1277 	}
1278 
1279 	do {
1280 		nread = read(*pfd, &c, 1);
1281 	} while ((nread == -1) && (errno == EINTR));
1282 
1283 	assert(nread == 1);
1284 }
1285 
test_event_context_threaded(struct torture_context * test,const void * test_data)1286 static bool test_event_context_threaded(struct torture_context *test,
1287 					const void *test_data)
1288 {
1289 	struct tevent_context *ev;
1290 	struct tevent_timer *te;
1291 	struct tevent_fd *fde;
1292 	pthread_t poll_thread;
1293 	int fds[2];
1294 	int ret;
1295 	char c = 0;
1296 
1297 	ev = tevent_context_init_byname(test, "poll_mt");
1298 	torture_assert(test, ev != NULL, "poll_mt not supported");
1299 
1300 	tevent_set_trace_callback(ev, test_event_threaded_trace, NULL);
1301 
1302 	te = tevent_add_timer(ev, ev, timeval_current_ofs(5, 0),
1303 			      test_event_threaded_timer, NULL);
1304 	torture_assert(test, te != NULL, "Could not add timer");
1305 
1306 	ret = pthread_create(&poll_thread, NULL, test_event_poll_thread, ev);
1307 	torture_assert(test, ret == 0, "Could not create poll thread");
1308 
1309 	ret = pipe(fds);
1310 	torture_assert(test, ret == 0, "Could not create pipe");
1311 
1312 	poll(NULL, 0, 100);
1313 
1314 	test_event_threaded_lock();
1315 
1316 	fde = tevent_add_fd(ev, ev, fds[0], TEVENT_FD_READ,
1317 			    test_event_threaded_read_handler, &fds[0]);
1318 	torture_assert(test, fde != NULL, "Could not add fd event");
1319 
1320 	test_event_threaded_unlock();
1321 
1322 	poll(NULL, 0, 100);
1323 
1324 	do_write(fds[1], &c, 1);
1325 
1326 	poll(NULL, 0, 100);
1327 
1328 	test_event_threaded_lock();
1329 	do_shutdown = true;
1330 	test_event_threaded_unlock();
1331 
1332 	do_write(fds[1], &c, 1);
1333 
1334 	ret = pthread_join(poll_thread, NULL);
1335 	torture_assert(test, ret == 0, "pthread_join failed");
1336 
1337 	return true;
1338 }
1339 
/* Number of worker threads started by each multi-threaded test. */
#define NUM_TEVENT_THREADS 100

/*
 * File-scope state shared between the multi-threaded tests and their
 * callbacks. Ugly, but the callbacks need the torture context for
 * torture_comment().
 */
static unsigned int thread_counter;
static pthread_t thread_map[NUM_TEVENT_THREADS];
static struct torture_context *thread_test_ctx;
1346 
1347 /* Called in master thread context */
callback_nowait(struct tevent_context * ev,struct tevent_immediate * im,void * private_ptr)1348 static void callback_nowait(struct tevent_context *ev,
1349 				struct tevent_immediate *im,
1350 				void *private_ptr)
1351 {
1352 	pthread_t *thread_id_ptr =
1353 		talloc_get_type_abort(private_ptr, pthread_t);
1354 	unsigned i;
1355 
1356 	for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1357 		if (pthread_equal(*thread_id_ptr,
1358 				thread_map[i])) {
1359 			break;
1360 		}
1361 	}
1362 	torture_comment(thread_test_ctx,
1363 			"Callback %u from thread %u\n",
1364 			thread_counter,
1365 			i);
1366 	thread_counter++;
1367 }
1368 
1369 /* Blast the master tevent_context with a callback, no waiting. */
thread_fn_nowait(void * private_ptr)1370 static void *thread_fn_nowait(void *private_ptr)
1371 {
1372 	struct tevent_thread_proxy *master_tp =
1373 		talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
1374 	struct tevent_immediate *im;
1375 	pthread_t *thread_id_ptr;
1376 
1377 	im = tevent_create_immediate(NULL);
1378 	if (im == NULL) {
1379 		return NULL;
1380 	}
1381 	thread_id_ptr = talloc(NULL, pthread_t);
1382 	if (thread_id_ptr == NULL) {
1383 		return NULL;
1384 	}
1385 	*thread_id_ptr = pthread_self();
1386 
1387 	tevent_thread_proxy_schedule(master_tp,
1388 				&im,
1389 				callback_nowait,
1390 				&thread_id_ptr);
1391 	return NULL;
1392 }
1393 
timeout_fn(struct tevent_context * ev,struct tevent_timer * te,struct timeval tv,void * p)1394 static void timeout_fn(struct tevent_context *ev,
1395 			struct tevent_timer *te,
1396 			struct timeval tv, void *p)
1397 {
1398 	thread_counter = NUM_TEVENT_THREADS * 10;
1399 }
1400 
test_multi_tevent_threaded(struct torture_context * test,const void * test_data)1401 static bool test_multi_tevent_threaded(struct torture_context *test,
1402 					const void *test_data)
1403 {
1404 	unsigned i;
1405 	struct tevent_context *master_ev;
1406 	struct tevent_thread_proxy *tp;
1407 
1408 	talloc_disable_null_tracking();
1409 
1410 	/* Ugly global stuff. */
1411 	thread_test_ctx = test;
1412 	thread_counter = 0;
1413 
1414 	master_ev = tevent_context_init(NULL);
1415 	if (master_ev == NULL) {
1416 		return false;
1417 	}
1418 	tevent_set_debug_stderr(master_ev);
1419 
1420 	tp = tevent_thread_proxy_create(master_ev);
1421 	if (tp == NULL) {
1422 		torture_fail(test,
1423 			talloc_asprintf(test,
1424 				"tevent_thread_proxy_create failed\n"));
1425 		talloc_free(master_ev);
1426 		return false;
1427 	}
1428 
1429 	for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1430 		int ret = pthread_create(&thread_map[i],
1431 				NULL,
1432 				thread_fn_nowait,
1433 				tp);
1434 		if (ret != 0) {
1435 			torture_fail(test,
1436 				talloc_asprintf(test,
1437 					"Failed to create thread %i, %d\n",
1438 					i, ret));
1439 			return false;
1440 		}
1441 	}
1442 
1443 	/* Ensure we don't wait more than 10 seconds. */
1444 	tevent_add_timer(master_ev,
1445 			master_ev,
1446 			timeval_current_ofs(10,0),
1447 			timeout_fn,
1448 			NULL);
1449 
1450 	while (thread_counter < NUM_TEVENT_THREADS) {
1451 		int ret = tevent_loop_once(master_ev);
1452 		torture_assert(test, ret == 0, "tevent_loop_once failed");
1453 	}
1454 
1455 	torture_assert(test, thread_counter == NUM_TEVENT_THREADS,
1456 		"thread_counter fail\n");
1457 
1458 	talloc_free(master_ev);
1459 	return true;
1460 }
1461 
/*
 * Request/reply payload passed between a thread_fn_1() child thread and
 * the master in the multi_tevent_threaded_1 test.
 */
struct reply_state {
	/* Proxy of the child's event context; master_callback() replies through it. */
	struct tevent_thread_proxy *reply_tp;
	/* Id of the child thread, set from pthread_self() in thread_fn_1(). */
	pthread_t thread_id;
	/* Child's completion flag: thread_callback() stores 1, thread_timeout_fn() stores 2. */
	int *p_finished;
};
1467 
/*
 * Timer callback armed by thread_fn_1(): p points at the thread's
 * completion flag, which is set to 2 to signal a timeout.
 */
static void thread_timeout_fn(struct tevent_context *ev,
			struct tevent_timer *te,
			struct timeval tv, void *p)
{
	*(int *)p = 2;
}
1476 
1477 /* Called in child-thread context */
thread_callback(struct tevent_context * ev,struct tevent_immediate * im,void * private_ptr)1478 static void thread_callback(struct tevent_context *ev,
1479 				struct tevent_immediate *im,
1480 				void *private_ptr)
1481 {
1482 	struct reply_state *rsp =
1483 		talloc_get_type_abort(private_ptr, struct reply_state);
1484 
1485 	talloc_steal(ev, rsp);
1486 	*rsp->p_finished = 1;
1487 }
1488 
1489 /* Called in master thread context */
master_callback(struct tevent_context * ev,struct tevent_immediate * im,void * private_ptr)1490 static void master_callback(struct tevent_context *ev,
1491 				struct tevent_immediate *im,
1492 				void *private_ptr)
1493 {
1494 	struct reply_state *rsp =
1495 		talloc_get_type_abort(private_ptr, struct reply_state);
1496 	unsigned i;
1497 
1498 	talloc_steal(ev, rsp);
1499 
1500 	for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1501 		if (pthread_equal(rsp->thread_id,
1502 				thread_map[i])) {
1503 			break;
1504 		}
1505 	}
1506 	torture_comment(thread_test_ctx,
1507 			"Callback %u from thread %u\n",
1508 			thread_counter,
1509 			i);
1510 	/* Now reply to the thread ! */
1511 	tevent_thread_proxy_schedule(rsp->reply_tp,
1512 				&im,
1513 				thread_callback,
1514 				&rsp);
1515 
1516 	thread_counter++;
1517 }
1518 
thread_fn_1(void * private_ptr)1519 static void *thread_fn_1(void *private_ptr)
1520 {
1521 	struct tevent_thread_proxy *master_tp =
1522 		talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
1523 	struct tevent_thread_proxy *tp;
1524 	struct tevent_immediate *im;
1525 	struct tevent_context *ev;
1526 	struct reply_state *rsp;
1527 	int finished = 0;
1528 	int ret;
1529 
1530 	ev = tevent_context_init(NULL);
1531 	if (ev == NULL) {
1532 		return NULL;
1533 	}
1534 
1535 	tp = tevent_thread_proxy_create(ev);
1536 	if (tp == NULL) {
1537 		talloc_free(ev);
1538 		return NULL;
1539 	}
1540 
1541 	im = tevent_create_immediate(ev);
1542 	if (im == NULL) {
1543 		talloc_free(ev);
1544 		return NULL;
1545 	}
1546 
1547 	rsp = talloc(ev, struct reply_state);
1548 	if (rsp == NULL) {
1549 		talloc_free(ev);
1550 		return NULL;
1551 	}
1552 
1553 	rsp->thread_id = pthread_self();
1554 	rsp->reply_tp = tp;
1555 	rsp->p_finished = &finished;
1556 
1557 	/* Introduce a little randomness into the mix.. */
1558 	usleep(random() % 7000);
1559 
1560 	tevent_thread_proxy_schedule(master_tp,
1561 				&im,
1562 				master_callback,
1563 				&rsp);
1564 
1565 	/* Ensure we don't wait more than 10 seconds. */
1566 	tevent_add_timer(ev,
1567 			ev,
1568 			timeval_current_ofs(10,0),
1569 			thread_timeout_fn,
1570 			&finished);
1571 
1572 	while (finished == 0) {
1573 		ret = tevent_loop_once(ev);
1574 		assert(ret == 0);
1575 	}
1576 
1577 	if (finished > 1) {
1578 		/* Timeout ! */
1579 		abort();
1580 	}
1581 
1582 	/*
1583 	 * NB. We should talloc_free(ev) here, but if we do
1584 	 * we currently get hit by helgrind Fix #323432
1585 	 * "When calling pthread_cond_destroy or pthread_mutex_destroy
1586 	 * with initializers as argument Helgrind (incorrectly) reports errors."
1587 	 *
1588 	 * http://valgrind.10908.n7.nabble.com/Helgrind-3-9-0-false-positive-
1589 	 * with-pthread-mutex-destroy-td47757.html
1590 	 *
1591 	 * Helgrind doesn't understand that the request/reply
1592 	 * messages provide synchronization between the lock/unlock
1593 	 * in tevent_thread_proxy_schedule(), and the pthread_destroy()
1594 	 * when the struct tevent_thread_proxy object is talloc_free'd.
1595 	 *
1596 	 * As a work-around for now return ev for the parent thread to free.
1597 	 */
1598 	return ev;
1599 }
1600 
test_multi_tevent_threaded_1(struct torture_context * test,const void * test_data)1601 static bool test_multi_tevent_threaded_1(struct torture_context *test,
1602 					const void *test_data)
1603 {
1604 	unsigned i;
1605 	struct tevent_context *master_ev;
1606 	struct tevent_thread_proxy *master_tp;
1607 	int ret;
1608 
1609 	talloc_disable_null_tracking();
1610 
1611 	/* Ugly global stuff. */
1612 	thread_test_ctx = test;
1613 	thread_counter = 0;
1614 
1615 	master_ev = tevent_context_init(NULL);
1616 	if (master_ev == NULL) {
1617 		return false;
1618 	}
1619 	tevent_set_debug_stderr(master_ev);
1620 
1621 	master_tp = tevent_thread_proxy_create(master_ev);
1622 	if (master_tp == NULL) {
1623 		torture_fail(test,
1624 			talloc_asprintf(test,
1625 				"tevent_thread_proxy_create failed\n"));
1626 		talloc_free(master_ev);
1627 		return false;
1628 	}
1629 
1630 	for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1631 		ret = pthread_create(&thread_map[i],
1632 				NULL,
1633 				thread_fn_1,
1634 				master_tp);
1635 		if (ret != 0) {
1636 			torture_fail(test,
1637 				talloc_asprintf(test,
1638 					"Failed to create thread %i, %d\n",
1639 					i, ret));
1640 				return false;
1641 		}
1642 	}
1643 
1644 	while (thread_counter < NUM_TEVENT_THREADS) {
1645 		ret = tevent_loop_once(master_ev);
1646 		torture_assert(test, ret == 0, "tevent_loop_once failed");
1647 	}
1648 
1649 	/* Wait for all the threads to finish - join 'em. */
1650 	for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1651 		void *retval;
1652 		ret = pthread_join(thread_map[i], &retval);
1653 		torture_assert(test, ret == 0, "pthread_join failed");
1654 		/* Free the child thread event context. */
1655 		talloc_free(retval);
1656 	}
1657 
1658 	talloc_free(master_ev);
1659 	return true;
1660 }
1661 
/* Per-thread state for the multi_tevent_threaded_2 test. */
struct threaded_test_2 {
	/* Threaded context the thread schedules its immediate on. */
	struct tevent_threaded_context *tctx;
	/* Immediate event handed to tevent_threaded_schedule_immediate(). */
	struct tevent_immediate *im;
	/* Id of the worker thread, set from pthread_self() in thread_fn_2(). */
	pthread_t thread_id;
};
1667 
/*
 * Forward declaration: thread_fn_2() passes this handler to
 * tevent_threaded_schedule_immediate() ahead of its definition.
 */
static void master_callback_2(struct tevent_context *ev,
			      struct tevent_immediate *im,
			      void *private_data);
1671 
thread_fn_2(void * private_data)1672 static void *thread_fn_2(void *private_data)
1673 {
1674 	struct threaded_test_2 *state = private_data;
1675 
1676 	state->thread_id = pthread_self();
1677 
1678 	usleep(random() % 7000);
1679 
1680 	tevent_threaded_schedule_immediate(
1681 		state->tctx, state->im, master_callback_2, state);
1682 
1683 	return NULL;
1684 }
1685 
master_callback_2(struct tevent_context * ev,struct tevent_immediate * im,void * private_data)1686 static void master_callback_2(struct tevent_context *ev,
1687 			      struct tevent_immediate *im,
1688 			      void *private_data)
1689 {
1690 	struct threaded_test_2 *state = private_data;
1691 	int i;
1692 
1693 	for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1694 		if (pthread_equal(state->thread_id, thread_map[i])) {
1695 			break;
1696 		}
1697 	}
1698 	torture_comment(thread_test_ctx,
1699 			"Callback_2 %u from thread %u\n",
1700 			thread_counter,
1701 			i);
1702 	thread_counter++;
1703 }
1704 
test_multi_tevent_threaded_2(struct torture_context * test,const void * test_data)1705 static bool test_multi_tevent_threaded_2(struct torture_context *test,
1706 					 const void *test_data)
1707 {
1708 	unsigned i;
1709 
1710 	struct tevent_context *ev;
1711 	struct tevent_threaded_context *tctx;
1712 	int ret;
1713 
1714 	thread_test_ctx = test;
1715 	thread_counter = 0;
1716 
1717 	ev = tevent_context_init(test);
1718 	torture_assert(test, ev != NULL, "tevent_context_init failed");
1719 
1720 	/*
1721 	 * tevent_re_initialise used to have a bug where it did not
1722 	 * re-initialise the thread support after taking it
1723 	 * down. Exercise that code path.
1724 	 */
1725 	ret = tevent_re_initialise(ev);
1726 	torture_assert(test, ret == 0, "tevent_re_initialise failed");
1727 
1728 	tctx = tevent_threaded_context_create(ev, ev);
1729 	torture_assert(test, tctx != NULL,
1730 		       "tevent_threaded_context_create failed");
1731 
1732 	for (i=0; i<NUM_TEVENT_THREADS; i++) {
1733 		struct threaded_test_2 *state;
1734 
1735 		state = talloc(ev, struct threaded_test_2);
1736 		torture_assert(test, state != NULL, "talloc failed");
1737 
1738 		state->tctx = tctx;
1739 		state->im = tevent_create_immediate(state);
1740 		torture_assert(test, state->im != NULL,
1741 			       "tevent_create_immediate failed");
1742 
1743 		ret = pthread_create(&thread_map[i], NULL, thread_fn_2, state);
1744 		torture_assert(test, ret == 0, "pthread_create failed");
1745 	}
1746 
1747 	while (thread_counter < NUM_TEVENT_THREADS) {
1748 		ret = tevent_loop_once(ev);
1749 		torture_assert(test, ret == 0, "tevent_loop_once failed");
1750 	}
1751 
1752 	/* Wait for all the threads to finish - join 'em. */
1753 	for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1754 		void *retval;
1755 		ret = pthread_join(thread_map[i], &retval);
1756 		torture_assert(test, ret == 0, "pthread_join failed");
1757 		/* Free the child thread event context. */
1758 	}
1759 
1760 	talloc_free(tctx);
1761 	talloc_free(ev);
1762 	return true;
1763 }
1764 #endif
1765 
/*
 * Build the "event" torture suite.
 *
 * One sub-suite is created per backend reported by tevent_backend_list();
 * each gets the context, fd1, fd2, wrapper and free_wrapper tests with
 * the backend name as test data. When built with HAVE_PTHREAD the
 * threaded tests are added directly to the top-level suite.
 *
 * Returns the top-level suite.
 */
struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
{
	/* Tests run once per backend, in registration order. */
	static const struct {
		const char *name;
		bool (*run)(struct torture_context *tctx,
			    const void *test_data);
	} backend_tests[] = {
		{ "context", test_event_context },
		{ "fd1", test_event_fd1 },
		{ "fd2", test_event_fd2 },
		{ "wrapper", test_wrapper },
		{ "free_wrapper", test_free_wrapper },
	};
#ifdef HAVE_PTHREAD
	/* Backend-independent threaded tests, in registration order. */
	static const struct {
		const char *name;
		bool (*run)(struct torture_context *tctx,
			    const void *test_data);
	} threaded_tests[] = {
		{ "threaded_poll_mt", test_event_context_threaded },
		{ "multi_tevent_threaded", test_multi_tevent_threaded },
		{ "multi_tevent_threaded_1", test_multi_tevent_threaded_1 },
		{ "multi_tevent_threaded_2", test_multi_tevent_threaded_2 },
	};
#endif
	struct torture_suite *suite = torture_suite_create(mem_ctx, "event");
	const char **backends = tevent_backend_list(suite);
	size_t b;
	size_t t;

	for (b = 0; backends != NULL && backends[b] != NULL; b++) {
		struct torture_suite *backend_suite =
			torture_suite_create(mem_ctx, backends[b]);

		for (t = 0;
		     t < sizeof(backend_tests) / sizeof(backend_tests[0]);
		     t++) {
			torture_suite_add_simple_tcase_const(
				backend_suite,
				backend_tests[t].name,
				backend_tests[t].run,
				(const void *)backends[b]);
		}

		torture_suite_add_suite(suite, backend_suite);
	}

#ifdef HAVE_PTHREAD
	for (t = 0;
	     t < sizeof(threaded_tests) / sizeof(threaded_tests[0]);
	     t++) {
		torture_suite_add_simple_tcase_const(suite,
						     threaded_tests[t].name,
						     threaded_tests[t].run,
						     NULL);
	}
#endif

	return suite;
}
1822