1 /*
2    tevent event library.
3 
4    Copyright (C) Jeremy Allison 2015
5 
6      ** NOTE! The following LGPL license applies to the tevent
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9 
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14 
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19 
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23 
24 #include "replace.h"
25 #include "system/filesys.h"
26 #include "talloc.h"
27 #include "tevent.h"
28 #include "tevent_internal.h"
29 #include "tevent_util.h"
30 
31 #ifdef HAVE_PTHREAD
32 #include "system/threads.h"
33 
34 struct tevent_immediate_list {
35 	struct tevent_immediate_list *next, *prev;
36 	tevent_immediate_handler_t handler;
37 	struct tevent_immediate *im;
38 	void *private_ptr;
39 };
40 
/*
 * State shared between the thread running dest_ev_ctx and the threads
 * calling tevent_thread_proxy_schedule().
 */
struct tevent_thread_proxy {
	/*
	 * Taken by the handlers, the destructor and
	 * tevent_thread_proxy_schedule() before they touch
	 * the lists or the pipe descriptors.
	 */
	pthread_mutex_t mutex;
	/* Event context the queued immediates are scheduled on. */
	struct tevent_context *dest_ev_ctx;
	/* Wakeup pipe; -1 once closed by the destructor. */
	int read_fd;
	int write_fd;
	/* fd event watching read_fd on dest_ev_ctx. */
	struct tevent_fd *pipe_read_fde;
	/* Entries queued but not yet scheduled. */
	struct tevent_immediate_list *im_list;
	/* Entries already scheduled, waiting to be freed. */
	struct tevent_immediate_list *tofree_im_list;
	/* Immediate used to run free_list_handler(). */
	struct tevent_immediate *free_im;
};
53 
free_im_list(struct tevent_immediate_list ** pp_list_head)54 static void free_im_list(struct tevent_immediate_list **pp_list_head)
55 {
56 	struct tevent_immediate_list *im_entry = NULL;
57 	struct tevent_immediate_list *im_next = NULL;
58 
59 	for (im_entry = *pp_list_head; im_entry; im_entry = im_next) {
60 		im_next = im_entry->next;
61 		DLIST_REMOVE(*pp_list_head, im_entry);
62 		TALLOC_FREE(im_entry);
63 	}
64 }
65 
free_list_handler(struct tevent_context * ev,struct tevent_immediate * im,void * private_ptr)66 static void free_list_handler(struct tevent_context *ev,
67 				struct tevent_immediate *im,
68 				void *private_ptr)
69 {
70 	struct tevent_thread_proxy *tp =
71 		talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
72 	int ret;
73 
74 	ret = pthread_mutex_lock(&tp->mutex);
75 	if (ret != 0) {
76 		abort();
77 		/* Notreached. */
78 		return;
79 	}
80 
81 	free_im_list(&tp->tofree_im_list);
82 
83 	ret = pthread_mutex_unlock(&tp->mutex);
84 	if (ret != 0) {
85 		abort();
86 		/* Notreached. */
87 		return;
88 	}
89 }
90 
schedule_immediate_functions(struct tevent_thread_proxy * tp)91 static void schedule_immediate_functions(struct tevent_thread_proxy *tp)
92 {
93 	struct tevent_immediate_list *im_entry = NULL;
94 	struct tevent_immediate_list *im_next = NULL;
95 
96 	for (im_entry = tp->im_list; im_entry; im_entry = im_next) {
97 		im_next = im_entry->next;
98 		DLIST_REMOVE(tp->im_list, im_entry);
99 
100 		tevent_schedule_immediate(im_entry->im,
101 					tp->dest_ev_ctx,
102 					im_entry->handler,
103 					im_entry->private_ptr);
104 
105 		/* Move from pending list to free list. */
106 		DLIST_ADD(tp->tofree_im_list, im_entry);
107 	}
108 	if (tp->tofree_im_list != NULL) {
109 		/*
110 		 * Once the current immediate events
111 		 * are processed, we need to reschedule
112 		 * ourselves to free them. This works
113 		 * as tevent_schedule_immediate()
114 		 * always adds events to the *END* of
115 		 * the immediate events list.
116 		 */
117 		tevent_schedule_immediate(tp->free_im,
118 					tp->dest_ev_ctx,
119 					free_list_handler,
120 					tp);
121 	}
122 }
123 
pipe_read_handler(struct tevent_context * ev,struct tevent_fd * fde,uint16_t flags,void * private_ptr)124 static void pipe_read_handler(struct tevent_context *ev,
125 				struct tevent_fd *fde,
126 				uint16_t flags,
127 				void *private_ptr)
128 {
129 	struct tevent_thread_proxy *tp =
130 		talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
131 	ssize_t len = 64;
132 	int ret;
133 
134 	ret = pthread_mutex_lock(&tp->mutex);
135 	if (ret != 0) {
136 		abort();
137 		/* Notreached. */
138 		return;
139 	}
140 
141 	/*
142 	 * Clear out all data in the pipe. We
143 	 * don't really care if this returns -1.
144 	 */
145 	while (len == 64) {
146 		char buf[64];
147 		len = read(tp->read_fd, buf, 64);
148 	};
149 
150 	schedule_immediate_functions(tp);
151 
152 	ret = pthread_mutex_unlock(&tp->mutex);
153 	if (ret != 0) {
154 		abort();
155 		/* Notreached. */
156 		return;
157 	}
158 }
159 
tevent_thread_proxy_destructor(struct tevent_thread_proxy * tp)160 static int tevent_thread_proxy_destructor(struct tevent_thread_proxy *tp)
161 {
162 	int ret;
163 
164 	ret = pthread_mutex_lock(&tp->mutex);
165 	if (ret != 0) {
166 		abort();
167 		/* Notreached. */
168 		return 0;
169 	}
170 
171 	TALLOC_FREE(tp->pipe_read_fde);
172 
173 	if (tp->read_fd != -1) {
174 		(void)close(tp->read_fd);
175 		tp->read_fd = -1;
176 	}
177 	if (tp->write_fd != -1) {
178 		(void)close(tp->write_fd);
179 		tp->write_fd = -1;
180 	}
181 
182 	/* Hmmm. It's probably an error if we get here with
183 	   any non-NULL immediate entries.. */
184 
185 	free_im_list(&tp->im_list);
186 	free_im_list(&tp->tofree_im_list);
187 
188 	TALLOC_FREE(tp->free_im);
189 
190 	ret = pthread_mutex_unlock(&tp->mutex);
191 	if (ret != 0) {
192 		abort();
193 		/* Notreached. */
194 		return 0;
195 	}
196 
197 	ret = pthread_mutex_destroy(&tp->mutex);
198 	if (ret != 0) {
199 		abort();
200 		/* Notreached. */
201 		return 0;
202 	}
203 
204 	return 0;
205 }
206 
207 /*
208  * Create a struct that can be passed to other threads
209  * to allow them to signal the struct tevent_context *
210  * passed in.
211  */
212 
tevent_thread_proxy_create(struct tevent_context * dest_ev_ctx)213 struct tevent_thread_proxy *tevent_thread_proxy_create(
214 		struct tevent_context *dest_ev_ctx)
215 {
216 	int ret;
217 	int pipefds[2];
218 	struct tevent_thread_proxy *tp;
219 
220 	if (dest_ev_ctx->wrapper.glue != NULL) {
221 		/*
222 		 * stacking of wrappers is not supported
223 		 */
224 		tevent_debug(dest_ev_ctx->wrapper.glue->main_ev,
225 			     TEVENT_DEBUG_FATAL,
226 			     "%s() not allowed on a wrapper context\n",
227 			     __func__);
228 		errno = EINVAL;
229 		return NULL;
230 	}
231 
232 	tp = talloc_zero(dest_ev_ctx, struct tevent_thread_proxy);
233 	if (tp == NULL) {
234 		return NULL;
235 	}
236 
237 	ret = pthread_mutex_init(&tp->mutex, NULL);
238 	if (ret != 0) {
239 		goto fail;
240 	}
241 
242 	tp->dest_ev_ctx = dest_ev_ctx;
243 	tp->read_fd = -1;
244 	tp->write_fd = -1;
245 
246 	talloc_set_destructor(tp, tevent_thread_proxy_destructor);
247 
248 	ret = pipe(pipefds);
249 	if (ret == -1) {
250 		goto fail;
251 	}
252 
253 	tp->read_fd = pipefds[0];
254 	tp->write_fd = pipefds[1];
255 
256 	ret = ev_set_blocking(pipefds[0], false);
257 	if (ret != 0) {
258 		goto fail;
259 	}
260 	ret = ev_set_blocking(pipefds[1], false);
261 	if (ret != 0) {
262 		goto fail;
263 	}
264 	if (!ev_set_close_on_exec(pipefds[0])) {
265 		goto fail;
266 	}
267 	if (!ev_set_close_on_exec(pipefds[1])) {
268 		goto fail;
269 	}
270 
271 	tp->pipe_read_fde = tevent_add_fd(dest_ev_ctx,
272 				tp,
273 				tp->read_fd,
274 				TEVENT_FD_READ,
275 				pipe_read_handler,
276 				tp);
277 	if (tp->pipe_read_fde == NULL) {
278 		goto fail;
279 	}
280 
281 	/*
282 	 * Create an immediate event to free
283 	 * completed lists.
284 	 */
285 	tp->free_im = tevent_create_immediate(tp);
286 	if (tp->free_im == NULL) {
287 		goto fail;
288 	}
289 
290 	return tp;
291 
292   fail:
293 
294 	TALLOC_FREE(tp);
295 	return NULL;
296 }
297 
/*
 * This function schedules an immediate event to be called with argument
 * *pp_private_data in the thread context of dest_ev_ctx. Caller doesn't
 * wait for activation to take place, this is simply fire-and-forget.
 *
 * pp_im must be a pointer to an immediate event talloced on
 * a context owned by the calling thread, or the NULL context.
 * Ownership of *pp_im will be transferred to the tevent library.
 *
 * pp_private_data can be null, or contents of *pp_private_data must be
 * talloc'ed memory on a context owned by the calling thread
 * or the NULL context. If non-null, ownership of *pp_private_data will
 * be transferred to the tevent library.
 *
 * If you want to return a message, have the destination use the
 * same function call to send back to the caller.
 */
315 
316 
void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
				  struct tevent_immediate **pp_im,
				  tevent_immediate_handler_t handler,
				  void *pp_private_data)
{
	struct tevent_immediate_list *im_entry = NULL;
	const char wakeup_byte = '\0';
	ssize_t written;

	if (pthread_mutex_lock(&tp->mutex) != 0) {
		abort();
	}

	/*
	 * write_fd == -1 means the proxy is in the process of
	 * being destroyed: the request is silently ignored.
	 */
	if (tp->write_fd != -1) {
		/* The list entry MUST BE ON THE NULL CONTEXT. */
		im_entry = talloc_zero(NULL, struct tevent_immediate_list);
	}

	/* Also skipped (without any report) if the allocation failed. */
	if (im_entry != NULL) {
		im_entry->handler = handler;
		/* Take ownership of *pp_im; the caller's pointer is moved away. */
		im_entry->im = talloc_move(im_entry, pp_im);

		if (pp_private_data != NULL) {
			/* pp_private_data is really a pointer to the caller's pointer. */
			void **pptr = (void **)pp_private_data;
			im_entry->private_ptr = talloc_move(im_entry, pptr);
		}

		DLIST_ADD(tp->im_list, im_entry);

		/*
		 * Wake up dest_ev_ctx with a single byte, retrying
		 * only when interrupted. Any other write failure is
		 * ignored.
		 */
		for (;;) {
			written = write(tp->write_fd, &wakeup_byte, 1);
			if (written != -1 || errno != EINTR) {
				break;
			}
		}
	}

	if (pthread_mutex_unlock(&tp->mutex) != 0) {
		abort();
	}
}
369 #else
370 /* !HAVE_PTHREAD */
/*
 * Stub used when built without pthread support: thread proxies are
 * unavailable, so this always fails with errno set to ENOSYS.
 */
struct tevent_thread_proxy *tevent_thread_proxy_create(
		struct tevent_context *dest_ev_ctx)
{
	struct tevent_thread_proxy *unsupported = NULL;

	errno = ENOSYS;
	return unsupported;
}
377 
/*
 * Stub used when built without pthread support: does nothing and
 * leaves all of its arguments untouched.
 */
void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
				  struct tevent_immediate **pp_im,
				  tevent_immediate_handler_t handler,
				  void *pp_private_data)
{
	/* Intentionally empty. */
}
385 #endif
386 
#ifdef HAVE_PTHREAD
/*
 * talloc destructor for a struct tevent_threaded_context.
 *
 * Only built with pthread support: it operates on event_ctx_mutex via
 * pthread_mutex_*(), and it is only installed by
 * tevent_threaded_context_create() in its HAVE_PTHREAD branch.
 *
 * Unlinks tctx from the main event context's threaded_contexts list
 * (when a main context can still be resolved) and destroys the mutex.
 * Always returns 0; any pthread failure aborts.
 */
static int tevent_threaded_context_destructor(
	struct tevent_threaded_context *tctx)
{
	struct tevent_context *main_ev = tevent_wrapper_main_ev(tctx->event_ctx);
	int ret;

	if (main_ev != NULL) {
		DLIST_REMOVE(main_ev->threaded_contexts, tctx);
	}

	/*
	 * We have to coordinate with _tevent_threaded_schedule_immediate's
	 * unlock of the event_ctx_mutex. We're in the main thread here,
	 * and we can be scheduled before the helper thread finalizes its
	 * call _tevent_threaded_schedule_immediate. This means we would
	 * pthread_mutex_destroy a locked mutex, which is illegal.
	 * Taking and releasing the mutex once waits for the helper
	 * thread to drop it first.
	 */
	ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
	if (ret != 0) {
		abort();
	}

	ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
	if (ret != 0) {
		abort();
	}

	ret = pthread_mutex_destroy(&tctx->event_ctx_mutex);
	if (ret != 0) {
		abort();
	}

	return 0;
}
#endif /* HAVE_PTHREAD */
421 
/*
 * Create a threaded context for ev, allocated on mem_ctx.
 *
 * With pthread support: initializes the wakeup mechanism of ev's main
 * event context, creates the context's mutex and links the new
 * context into main_ev->threaded_contexts. Returns NULL on failure;
 * errno carries the error code when tevent_common_wakeup_init() or
 * pthread_mutex_init() fails.
 *
 * Without pthread support: always returns NULL with errno = ENOSYS.
 */
struct tevent_threaded_context *tevent_threaded_context_create(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev)
{
#ifdef HAVE_PTHREAD
	struct tevent_context *main_ev = tevent_wrapper_main_ev(ev);
	struct tevent_threaded_context *tctx;
	int ret;

	ret = tevent_common_wakeup_init(main_ev);
	if (ret != 0) {
		errno = ret;
		return NULL;
	}

	tctx = talloc(mem_ctx, struct tevent_threaded_context);
	if (tctx == NULL) {
		return NULL;
	}
	tctx->event_ctx = ev;

	ret = pthread_mutex_init(&tctx->event_ctx_mutex, NULL);
	if (ret != 0) {
		/*
		 * pthread_mutex_init() returns the error number
		 * instead of setting errno; report it the same way
		 * as the wakeup-init failure above. The destructor
		 * is not installed yet, so the free is a plain free.
		 */
		TALLOC_FREE(tctx);
		errno = ret;
		return NULL;
	}

	DLIST_ADD(main_ev->threaded_contexts, tctx);
	talloc_set_destructor(tctx, tevent_threaded_context_destructor);

	return tctx;
#else
	errno = ENOSYS;
	return NULL;
#endif
}
457 
tevent_threaded_schedule_immediate_destructor(struct tevent_immediate * im)458 static int tevent_threaded_schedule_immediate_destructor(struct tevent_immediate *im)
459 {
460 	if (im->event_ctx != NULL) {
461 		abort();
462 	}
463 	return 0;
464 }
465 
/*
 * Hand the immediate event im over to the event context behind tctx.
 *
 * With pthread support: under tctx->event_ctx_mutex, does nothing if
 * the event context is already gone; otherwise re-initializes *im
 * (keeping only its create_location), appends it to the main event
 * context's scheduled_immediates list under scheduled_mutex, and
 * finally pokes the main context's wakeup fd.
 *
 * Aborts if im is already scheduled, destroyed or busy, if handler is
 * NULL, or on any pthread failure. Without pthread support it always
 * aborts.
 */
void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
					 struct tevent_immediate *im,
					 tevent_immediate_handler_t handler,
					 void *private_data,
					 const char *handler_name,
					 const char *location)
{
#ifdef HAVE_PTHREAD
	/* Saved up front: the struct assignment below wipes every field. */
	const char *orig_create_location = im->create_location;
	struct tevent_context *target_ev = NULL;
	struct tevent_context *main_ev = NULL;
	struct tevent_wrapper_glue *wrapper_glue = NULL;
	int wake_fd;

	if (pthread_mutex_lock(&tctx->event_ctx_mutex) != 0) {
		abort();
	}

	/* Stable while event_ctx_mutex is held. */
	target_ev = tctx->event_ctx;
	if (target_ev == NULL) {
		/*
		 * Our event context is already gone.
		 */
		if (pthread_mutex_unlock(&tctx->event_ctx_mutex) != 0) {
			abort();
		}
		return;
	}

	wrapper_glue = target_ev->wrapper.glue;

	/* Refuse a missing handler or an immediate that is in use. */
	if ((im->event_ctx != NULL) ||
	    (handler == NULL) ||
	    im->destroyed ||
	    im->busy) {
		abort();
	}

	main_ev = tevent_wrapper_main_ev(target_ev);

	*im = (struct tevent_immediate) {
		.event_ctx		= target_ev,
		.wrapper		= wrapper_glue,
		.handler		= handler,
		.private_data		= private_data,
		.handler_name		= handler_name,
		.create_location	= orig_create_location,
		.schedule_location	= location,
	};

	/*
	 * Guard against the event being freed while it sits on
	 * the scheduled_immediates list: the destructor aborts
	 * as long as event_ctx is set. Per the original note,
	 * _tevent_schedule_immediate() resets the destructor
	 * when called from
	 * tevent_common_threaded_activate_immediate().
	 */
	talloc_set_destructor(im, tevent_threaded_schedule_immediate_destructor);

	if (pthread_mutex_lock(&main_ev->scheduled_mutex) != 0) {
		abort();
	}

	DLIST_ADD_END(main_ev->scheduled_immediates, im);
	/* Read under the lock, used after both locks are dropped. */
	wake_fd = main_ev->wakeup_fd;

	if (pthread_mutex_unlock(&main_ev->scheduled_mutex) != 0) {
		abort();
	}

	if (pthread_mutex_unlock(&tctx->event_ctx_mutex) != 0) {
		abort();
	}

	/*
	 * We might want to wake up the main thread under the lock. We
	 * had a slightly similar situation in pthreadpool, changed
	 * with 1c4284c7395f23. This is not exactly the same, as the
	 * wakeup is only a last-resort thing in case the main thread
	 * is sleeping. Doing the wakeup under the lock can easily
	 * lead to a contended mutex, which is much more expensive
	 * than a noncontended one. So I'd opt for the lower footprint
	 * initially. Maybe we have to change that later.
	 */
	tevent_common_wakeup_fd(wake_fd);
#else
	/*
	 * tevent_threaded_context_create() returned NULL with ENOSYS...
	 */
	abort();
#endif
}
563 
/*
 * Move every immediate queued on ev->scheduled_immediates (by
 * _tevent_threaded_schedule_immediate()) into ev via
 * _tevent_schedule_immediate(), holding ev->scheduled_mutex for the
 * whole drain. Aborts on pthread failure, and always aborts when
 * built without pthread support.
 */
void tevent_common_threaded_activate_immediate(struct tevent_context *ev)
{
#ifdef HAVE_PTHREAD
	struct tevent_immediate *im = NULL;

	if (pthread_mutex_lock(&ev->scheduled_mutex) != 0) {
		abort();
	}

	/* Always take the current head until the list is empty. */
	for (im = ev->scheduled_immediates;
	     im != NULL;
	     im = ev->scheduled_immediates) {
		/* Snapshot of the request before im is modified below. */
		struct tevent_immediate snapshot = *im;

		DLIST_REMOVE(ev->scheduled_immediates, im);

		tevent_debug(ev, TEVENT_DEBUG_TRACE,
			     "Schedule immediate event \"%s\": %p from thread into main\n",
			     im->handler_name, im);

		/*
		 * Cleared before rescheduling; the name is passed
		 * on from the snapshot instead.
		 */
		im->handler_name = NULL;
		_tevent_schedule_immediate(im,
					   ev,
					   snapshot.handler,
					   snapshot.private_data,
					   snapshot.handler_name,
					   snapshot.schedule_location);
	}

	if (pthread_mutex_unlock(&ev->scheduled_mutex) != 0) {
		abort();
	}
#else
	/*
	 * tevent_threaded_context_create() returned NULL with ENOSYS...
	 */
	abort();
#endif
}
602