/*
   tevent event library.

   Copyright (C) Jeremy Allison 2015

   ** NOTE! The following LGPL license applies to the tevent
   ** library. This does NOT imply that all of Samba is released
   ** under the LGPL

   This library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
   License as published by the Free Software Foundation; either
   version 3 of the License, or (at your option) any later version.

   This library is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   Lesser General Public License for more details.

   You should have received a copy of the GNU Lesser General Public
   License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/

#include "replace.h"
#include "system/filesys.h"
#include "talloc.h"
#include "tevent.h"
#include "tevent_internal.h"
#include "tevent_util.h"

#ifdef HAVE_PTHREAD
#include "system/threads.h"

struct tevent_immediate_list {
        struct tevent_immediate_list *next, *prev;
        tevent_immediate_handler_t handler;
        struct tevent_immediate *im;
        void *private_ptr;
};

struct tevent_thread_proxy {
        pthread_mutex_t mutex;
        struct tevent_context *dest_ev_ctx;
        int read_fd;
        int write_fd;
        struct tevent_fd *pipe_read_fde;
        /* Pending events list. */
        struct tevent_immediate_list *im_list;
        /* Completed events list. */
        struct tevent_immediate_list *tofree_im_list;
        struct tevent_immediate *free_im;
};

static void free_im_list(struct tevent_immediate_list **pp_list_head)
{
        struct tevent_immediate_list *im_entry = NULL;
        struct tevent_immediate_list *im_next = NULL;

        for (im_entry = *pp_list_head; im_entry; im_entry = im_next) {
                im_next = im_entry->next;
                DLIST_REMOVE(*pp_list_head, im_entry);
                TALLOC_FREE(im_entry);
        }
}

static void free_list_handler(struct tevent_context *ev,
                              struct tevent_immediate *im,
                              void *private_ptr)
{
        struct tevent_thread_proxy *tp =
                talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
        int ret;

        ret = pthread_mutex_lock(&tp->mutex);
        if (ret != 0) {
                abort();
                /* Notreached. */
                return;
        }

        free_im_list(&tp->tofree_im_list);

        ret = pthread_mutex_unlock(&tp->mutex);
        if (ret != 0) {
                abort();
                /* Notreached. */
                return;
        }
}

static void schedule_immediate_functions(struct tevent_thread_proxy *tp)
{
        struct tevent_immediate_list *im_entry = NULL;
        struct tevent_immediate_list *im_next = NULL;

        for (im_entry = tp->im_list; im_entry; im_entry = im_next) {
                im_next = im_entry->next;
                DLIST_REMOVE(tp->im_list, im_entry);

                tevent_schedule_immediate(im_entry->im,
                                          tp->dest_ev_ctx,
                                          im_entry->handler,
                                          im_entry->private_ptr);

                /* Move from pending list to free list. */
                DLIST_ADD(tp->tofree_im_list, im_entry);
        }
        if (tp->tofree_im_list != NULL) {
                /*
                 * Once the current immediate events
                 * are processed, we need to reschedule
                 * ourselves to free them. This works
                 * as tevent_schedule_immediate()
                 * always adds events to the *END* of
                 * the immediate events list.
                 */
                tevent_schedule_immediate(tp->free_im,
                                          tp->dest_ev_ctx,
                                          free_list_handler,
                                          tp);
        }
}

static void pipe_read_handler(struct tevent_context *ev,
                              struct tevent_fd *fde,
                              uint16_t flags,
                              void *private_ptr)
{
        struct tevent_thread_proxy *tp =
                talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
        /* Primed to the full buffer size so the drain loop runs at least once. */
        ssize_t len = 64;
        int ret;

        ret = pthread_mutex_lock(&tp->mutex);
        if (ret != 0) {
                abort();
                /* Notreached. */
                return;
        }

        /*
         * Clear out all data in the pipe. We
         * don't really care if this returns -1.
         */
        while (len == 64) {
                char buf[64];
                len = read(tp->read_fd, buf, 64);
        }

        schedule_immediate_functions(tp);

        ret = pthread_mutex_unlock(&tp->mutex);
        if (ret != 0) {
                abort();
                /* Notreached. */
                return;
        }
}

static int tevent_thread_proxy_destructor(struct tevent_thread_proxy *tp)
{
        int ret;

        ret = pthread_mutex_lock(&tp->mutex);
        if (ret != 0) {
                abort();
                /* Notreached. */
                return 0;
        }

        TALLOC_FREE(tp->pipe_read_fde);

        if (tp->read_fd != -1) {
                (void)close(tp->read_fd);
                tp->read_fd = -1;
        }
        if (tp->write_fd != -1) {
                (void)close(tp->write_fd);
                tp->write_fd = -1;
        }

        /*
         * Hmmm. It's probably an error if we get here
         * with any non-NULL immediate entries.
         */

        free_im_list(&tp->im_list);
        free_im_list(&tp->tofree_im_list);

        TALLOC_FREE(tp->free_im);

        ret = pthread_mutex_unlock(&tp->mutex);
        if (ret != 0) {
                abort();
                /* Notreached. */
                return 0;
        }

        ret = pthread_mutex_destroy(&tp->mutex);
        if (ret != 0) {
                abort();
                /* Notreached. */
                return 0;
        }

        return 0;
}

/*
 * Create a struct that can be passed to other threads
 * to allow them to signal the struct tevent_context *
 * passed in.
 */
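
/*
 * A minimal usage sketch (illustrative only; main_ev_ctx, tid and
 * worker_fn are hypothetical application names, not part of this
 * API). The main thread creates the proxy on its own event context
 * and hands the pointer to a helper thread, which can then call
 * tevent_thread_proxy_schedule() on it:
 *
 *      struct tevent_thread_proxy *tp =
 *              tevent_thread_proxy_create(main_ev_ctx);
 *      if (tp == NULL) {
 *              ... error: errno is set (e.g. EINVAL for a
 *              wrapper context) ...
 *      }
 *      pthread_create(&tid, NULL, worker_fn, tp);
 */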

struct tevent_thread_proxy *tevent_thread_proxy_create(
                struct tevent_context *dest_ev_ctx)
{
        int ret;
        int pipefds[2];
        struct tevent_thread_proxy *tp;

        if (dest_ev_ctx->wrapper.glue != NULL) {
                /*
                 * Stacking of wrappers is not supported.
                 */
                tevent_debug(dest_ev_ctx->wrapper.glue->main_ev,
                             TEVENT_DEBUG_FATAL,
                             "%s() not allowed on a wrapper context\n",
                             __func__);
                errno = EINVAL;
                return NULL;
        }

        tp = talloc_zero(dest_ev_ctx, struct tevent_thread_proxy);
        if (tp == NULL) {
                return NULL;
        }

        ret = pthread_mutex_init(&tp->mutex, NULL);
        if (ret != 0) {
                goto fail;
        }

        tp->dest_ev_ctx = dest_ev_ctx;
        tp->read_fd = -1;
        tp->write_fd = -1;

        talloc_set_destructor(tp, tevent_thread_proxy_destructor);

        ret = pipe(pipefds);
        if (ret == -1) {
                goto fail;
        }

        tp->read_fd = pipefds[0];
        tp->write_fd = pipefds[1];

        ret = ev_set_blocking(pipefds[0], false);
        if (ret != 0) {
                goto fail;
        }
        ret = ev_set_blocking(pipefds[1], false);
        if (ret != 0) {
                goto fail;
        }
        if (!ev_set_close_on_exec(pipefds[0])) {
                goto fail;
        }
        if (!ev_set_close_on_exec(pipefds[1])) {
                goto fail;
        }

        tp->pipe_read_fde = tevent_add_fd(dest_ev_ctx,
                                          tp,
                                          tp->read_fd,
                                          TEVENT_FD_READ,
                                          pipe_read_handler,
                                          tp);
        if (tp->pipe_read_fde == NULL) {
                goto fail;
        }

        /*
         * Create an immediate event to free
         * completed lists.
         */
        tp->free_im = tevent_create_immediate(tp);
        if (tp->free_im == NULL) {
                goto fail;
        }

        return tp;

  fail:

        TALLOC_FREE(tp);
        return NULL;
}

/*
 * This function schedules an immediate event to be called with
 * argument *pp_private_data in the thread context of dest_ev_ctx.
 * The caller doesn't wait for activation to take place; this is
 * simply fire-and-forget.
 *
 * pp_im must be a pointer to an immediate event talloced on
 * a context owned by the calling thread, or the NULL context.
 * Ownership of *pp_im will be transferred to the tevent library.
 *
 * pp_private_data can be NULL; otherwise the contents of
 * *pp_private_data must be talloc'ed memory on a context owned
 * by the calling thread or the NULL context. If non-NULL,
 * ownership of *pp_private_data will be transferred to the
 * tevent library.
 *
 * If you want to return a message, have the destination use the
 * same function call to send back to the caller.
 */
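
/*
 * A minimal sketch of a worker-thread caller (my_handler and
 * struct my_msg are hypothetical names, not part of this API).
 * Both the immediate and the payload are talloc'ed on the NULL
 * context, as required above; after the call both local pointers
 * have been zeroed by talloc_move() and tevent owns the memory:
 *
 *      struct tevent_immediate *im = tevent_create_immediate(NULL);
 *      struct my_msg *msg = talloc_zero(NULL, struct my_msg);
 *
 *      if (im != NULL && msg != NULL) {
 *              msg->result = 42;
 *              tevent_thread_proxy_schedule(tp, &im, my_handler, &msg);
 *              ... here im == NULL and msg == NULL ...
 *      }
 */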

void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
                                  struct tevent_immediate **pp_im,
                                  tevent_immediate_handler_t handler,
                                  void *pp_private_data)
{
        struct tevent_immediate_list *im_entry;
        int ret;
        char c;
        ssize_t written;

        ret = pthread_mutex_lock(&tp->mutex);
        if (ret != 0) {
                abort();
                /* Notreached. */
                return;
        }

        if (tp->write_fd == -1) {
                /* In the process of being destroyed. Ignore. */
                goto end;
        }

        /* Create a new immediate_list entry. MUST BE ON THE NULL CONTEXT */
        im_entry = talloc_zero(NULL, struct tevent_immediate_list);
        if (im_entry == NULL) {
                goto end;
        }

        im_entry->handler = handler;
        im_entry->im = talloc_move(im_entry, pp_im);

        if (pp_private_data != NULL) {
                void **pptr = (void **)pp_private_data;
                im_entry->private_ptr = talloc_move(im_entry, pptr);
        }

        DLIST_ADD(tp->im_list, im_entry);

        /* And notify the dest_ev_ctx to wake up. */
        c = '\0';
        do {
                written = write(tp->write_fd, &c, 1);
        } while (written == -1 && errno == EINTR);

  end:

        ret = pthread_mutex_unlock(&tp->mutex);
        if (ret != 0) {
                abort();
                /* Notreached. */
        }
}
#else
/* !HAVE_PTHREAD */
struct tevent_thread_proxy *tevent_thread_proxy_create(
                struct tevent_context *dest_ev_ctx)
{
        errno = ENOSYS;
        return NULL;
}

void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
                                  struct tevent_immediate **pp_im,
                                  tevent_immediate_handler_t handler,
                                  void *pp_private_data)
{
        ;
}
#endif

static int tevent_threaded_context_destructor(
        struct tevent_threaded_context *tctx)
{
        struct tevent_context *main_ev = tevent_wrapper_main_ev(tctx->event_ctx);
        int ret;

        if (main_ev != NULL) {
                DLIST_REMOVE(main_ev->threaded_contexts, tctx);
        }

        /*
         * We have to coordinate with _tevent_threaded_schedule_immediate's
         * unlock of the event_ctx_mutex. We're in the main thread here,
         * and we can be scheduled before the helper thread finalizes its
         * call to _tevent_threaded_schedule_immediate. This means we
         * would pthread_mutex_destroy a locked mutex, which is illegal.
         */
        ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
        if (ret != 0) {
                abort();
        }

        ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
        if (ret != 0) {
                abort();
        }

        ret = pthread_mutex_destroy(&tctx->event_ctx_mutex);
        if (ret != 0) {
                abort();
        }

        return 0;
}

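/*
 * Create a handle that a helper thread can use with
 * tevent_threaded_schedule_immediate() to queue immediate events
 * into ev's main event context. On builds without pthread support
 * this returns NULL with errno set to ENOSYS.
 */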
struct tevent_threaded_context *tevent_threaded_context_create(
        TALLOC_CTX *mem_ctx, struct tevent_context *ev)
{
#ifdef HAVE_PTHREAD
        struct tevent_context *main_ev = tevent_wrapper_main_ev(ev);
        struct tevent_threaded_context *tctx;
        int ret;

        ret = tevent_common_wakeup_init(main_ev);
        if (ret != 0) {
                errno = ret;
                return NULL;
        }

        tctx = talloc(mem_ctx, struct tevent_threaded_context);
        if (tctx == NULL) {
                return NULL;
        }
        tctx->event_ctx = ev;

        ret = pthread_mutex_init(&tctx->event_ctx_mutex, NULL);
        if (ret != 0) {
                TALLOC_FREE(tctx);
                return NULL;
        }

        DLIST_ADD(main_ev->threaded_contexts, tctx);
        talloc_set_destructor(tctx, tevent_threaded_context_destructor);

        return tctx;
#else
        errno = ENOSYS;
        return NULL;
#endif
}

static int tevent_threaded_schedule_immediate_destructor(struct tevent_immediate *im)
{
        /*
         * While an immediate sits on ev->scheduled_immediates it must
         * not be talloc_free'd; aborting here catches that misuse.
         */
        if (im->event_ctx != NULL) {
                abort();
        }
        return 0;
}

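/*
 * Called from a helper thread, normally via the
 * tevent_threaded_schedule_immediate() macro, to queue an immediate
 * event onto the main event context behind tctx and wake the main
 * thread. The immediate must not be freed or rescheduled until its
 * handler has run; the temporary destructor set below aborts on
 * that misuse.
 */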
void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
                                         struct tevent_immediate *im,
                                         tevent_immediate_handler_t handler,
                                         void *private_data,
                                         const char *handler_name,
                                         const char *location)
{
#ifdef HAVE_PTHREAD
        const char *create_location = im->create_location;
        struct tevent_context *main_ev = NULL;
        struct tevent_wrapper_glue *glue = NULL;
        int ret, wakeup_fd;

        ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
        if (ret != 0) {
                abort();
        }

        if (tctx->event_ctx == NULL) {
                /*
                 * Our event context is already gone.
                 */
                ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
                if (ret != 0) {
                        abort();
                }
                return;
        }

        glue = tctx->event_ctx->wrapper.glue;

        if ((im->event_ctx != NULL) || (handler == NULL)) {
                abort();
        }
        if (im->destroyed) {
                abort();
        }
        if (im->busy) {
                abort();
        }

        main_ev = tevent_wrapper_main_ev(tctx->event_ctx);

        *im = (struct tevent_immediate) {
                .event_ctx = tctx->event_ctx,
                .wrapper = glue,
                .handler = handler,
                .private_data = private_data,
                .handler_name = handler_name,
                .create_location = create_location,
                .schedule_location = location,
        };

        /*
         * Make sure the event won't be destroyed while
         * it's part of the ev->scheduled_immediates list.
         * _tevent_schedule_immediate() will reset the destructor
         * in tevent_common_threaded_activate_immediate().
         */
        talloc_set_destructor(im, tevent_threaded_schedule_immediate_destructor);

        ret = pthread_mutex_lock(&main_ev->scheduled_mutex);
        if (ret != 0) {
                abort();
        }

        DLIST_ADD_END(main_ev->scheduled_immediates, im);
        wakeup_fd = main_ev->wakeup_fd;

        ret = pthread_mutex_unlock(&main_ev->scheduled_mutex);
        if (ret != 0) {
                abort();
        }

        ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
        if (ret != 0) {
                abort();
        }

        /*
         * We might want to wake up the main thread under the lock. We
         * had a slightly similar situation in pthreadpool, changed
         * with 1c4284c7395f23. This is not exactly the same, as the
         * wakeup is only a last-resort thing in case the main thread
         * is sleeping. Doing the wakeup under the lock can easily
         * lead to a contended mutex, which is much more expensive
         * than a noncontended one. So I'd opt for the lower footprint
         * initially. Maybe we have to change that later.
         */
        tevent_common_wakeup_fd(wakeup_fd);
#else
        /*
         * tevent_threaded_context_create() returned NULL with ENOSYS...
         */
        abort();
#endif
}

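/*
 * Called in the main thread, typically after a poke on main_ev's
 * wakeup fd, to move every immediate queued by helper threads from
 * ev->scheduled_immediates onto the ordinary immediate event list.
 */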
void tevent_common_threaded_activate_immediate(struct tevent_context *ev)
{
#ifdef HAVE_PTHREAD
        int ret;
        ret = pthread_mutex_lock(&ev->scheduled_mutex);
        if (ret != 0) {
                abort();
        }

        while (ev->scheduled_immediates != NULL) {
                struct tevent_immediate *im = ev->scheduled_immediates;
                struct tevent_immediate copy = *im;

                DLIST_REMOVE(ev->scheduled_immediates, im);

                tevent_debug(ev, TEVENT_DEBUG_TRACE,
                             "Schedule immediate event \"%s\": %p from thread into main\n",
                             im->handler_name, im);
                im->handler_name = NULL;
                _tevent_schedule_immediate(im,
                                           ev,
                                           copy.handler,
                                           copy.private_data,
                                           copy.handler_name,
                                           copy.schedule_location);
        }

        ret = pthread_mutex_unlock(&ev->scheduled_mutex);
        if (ret != 0) {
                abort();
        }
#else
        /*
         * tevent_threaded_context_create() returned NULL with ENOSYS...
         */
        abort();
#endif
}