1 /*
2  * Copyright (C) 2010 Red Hat, Inc.
3  *
4  * Author: Angus Salkeld <asalkeld@redhat.com>
5  *
6  * This file is part of libqb.
7  *
8  * libqb is free software: you can redistribute it and/or modify
9  * it under the terms of the GNU Lesser General Public License as published by
10  * the Free Software Foundation, either version 2.1 of the License, or
11  * (at your option) any later version.
12  *
13  * libqb is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public License
19  * along with libqb.  If not, see <http://www.gnu.org/licenses/>.
20  */
21 #include "os_base.h"
22 
23 /* due to MinGW/splint emitting "< Location unknown >: Previous use of" */
24 #if defined(HAVE_SYS_RESOURCE_H) && !defined(S_SPLINT_S)
25 #include <sys/resource.h>
26 #endif
27 
28 #include <signal.h>
29 
30 #if defined(__DARWIN_NSIG)
31 #define QB_MAX_NUM_SIGNALS __DARWIN_NSIG
32 #else
33   #if defined(NSIG)
34   #define QB_MAX_NUM_SIGNALS NSIG - 1
35   #else
36   #define QB_MAX_NUM_SIGNALS 31
37   #endif
38 #endif
39 
40 #include "loop_poll_int.h"
41 
42 /*
43  * Define this to log slow (>10ms) jobs.
44  */
45 #undef DEBUG_DISPATCH_TIME
46 
47 /* logs, std(in|out|err), pipe */
48 #define POLL_FDS_USED_MISC 50
49 
50 #ifdef HAVE_EPOLL
51 #define USE_EPOLL 1
52 #else
53  #ifdef HAVE_KQUEUE
54  #define USE_KQUEUE 1
55  #else
56  #define USE_POLL 1
57  #endif /* HAVE_KQUEUE */
58 #endif /* HAVE_EPOLL */
59 
60 static int32_t _qb_signal_add_to_jobs_(struct qb_loop *l,
61 				       struct qb_poll_entry *pe);
62 
63 static void
_poll_entry_check_generate_(struct qb_poll_entry * pe)64 _poll_entry_check_generate_(struct qb_poll_entry *pe)
65 {
66 	int32_t i;
67 
68 	for (i = 0; i < 200; i++) {
69 		pe->check = random();
70 
71 		if (pe->check != 0 && pe->check != UINT32_MAX) {
72 			break;
73 		}
74 	}
75 }
76 
77 static void
_poll_entry_mark_deleted_(struct qb_poll_entry * pe)78 _poll_entry_mark_deleted_(struct qb_poll_entry *pe)
79 {
80 	pe->ufd.fd = -1;
81 	pe->state = QB_POLL_ENTRY_DELETED;
82 	pe->check = 0;
83 }
84 
85 static void
_poll_entry_empty_(struct qb_poll_entry * pe)86 _poll_entry_empty_(struct qb_poll_entry *pe)
87 {
88 	memset(pe, 0, sizeof(struct qb_poll_entry));
89 	pe->ufd.fd = -1;
90 }
91 
92 static void
_poll_dispatch_and_take_back_(struct qb_loop_item * item,enum qb_loop_priority p)93 _poll_dispatch_and_take_back_(struct qb_loop_item *item,
94 			      enum qb_loop_priority p)
95 {
96 	struct qb_poll_entry *pe = (struct qb_poll_entry *)item;
97 	int32_t res;
98 #ifdef DEBUG_DISPATCH_TIME
99 	uint64_t start;
100 	uint64_t stop;
101 	int32_t log_warn = QB_FALSE;
102 
103 	start = qb_util_nano_current_get();
104 #endif /* DEBUG_DISPATCH_TIME */
105 
106 	assert(pe->state == QB_POLL_ENTRY_JOBLIST);
107 	assert(pe->item.type == QB_LOOP_FD);
108 
109 	res = pe->poll_dispatch_fn(pe->ufd.fd,
110 				   pe->ufd.revents,
111 				   pe->item.user_data);
112 	if (res < 0) {
113 		_poll_entry_mark_deleted_(pe);
114 	} else if (pe->state != QB_POLL_ENTRY_DELETED) {
115 		pe->state = QB_POLL_ENTRY_ACTIVE;
116 		pe->ufd.revents = 0;
117 	}
118 #ifdef DEBUG_DISPATCH_TIME
119 	if (pe->state == QB_POLL_ENTRY_ACTIVE) {
120 		pe->runs++;
121 		if ((pe->runs % 50) == 0) {
122 			log_warn = QB_TRUE;
123 		}
124 		stop = qb_util_nano_current_get();
125 		if ((stop - start) > (10 * QB_TIME_NS_IN_MSEC)) {
126 			log_warn = QB_TRUE;
127 		}
128 
129 		if (log_warn && pe->item.type == QB_LOOP_FD) {
130 			qb_util_log(LOG_INFO,
131 				    "[fd:%d] dispatch:%p runs:%d duration:%d ms",
132 				    pe->ufd.fd, pe->poll_dispatch_fn,
133 				    pe->runs,
134 				    (int32_t) ((stop -
135 						start) / QB_TIME_NS_IN_MSEC));
136 		}
137 	}
138 #endif /* DEBUG_DISPATCH_TIME */
139 }
140 
141 void
qb_poll_fds_usage_check_(struct qb_poll_source * s)142 qb_poll_fds_usage_check_(struct qb_poll_source *s)
143 {
144 	struct rlimit lim;
145 	static int32_t socks_limit = 0;
146 	int32_t send_event = QB_FALSE;
147 	int32_t socks_used = 0;
148 	int32_t socks_avail = 0;
149 	struct qb_poll_entry *pe;
150 	int32_t i;
151 
152 	if (socks_limit == 0) {
153 		if (getrlimit(RLIMIT_NOFILE, &lim) == -1) {
154 			qb_util_perror(LOG_WARNING, "getrlimit");
155 			return;
156 		}
157 		socks_limit = lim.rlim_cur;
158 		socks_limit -= POLL_FDS_USED_MISC;
159 		if (socks_limit < 0) {
160 			socks_limit = 0;
161 		}
162 	}
163 
164 	for (i = 0; i < s->poll_entry_count; i++) {
165 		assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0);
166 		if ((pe->state == QB_POLL_ENTRY_ACTIVE ||
167 		     pe->state == QB_POLL_ENTRY_JOBLIST) && pe->ufd.fd != -1) {
168 			socks_used++;
169 		}
170 		if (pe->state == QB_POLL_ENTRY_DELETED) {
171 			_poll_entry_empty_(pe);
172 		}
173 	}
174 
175 	socks_avail = socks_limit - socks_used;
176 	if (socks_avail < 0) {
177 		socks_avail = 0;
178 	}
179 	send_event = QB_FALSE;
180 	if (s->not_enough_fds) {
181 		if (socks_avail > 2) {
182 			s->not_enough_fds = QB_FALSE;
183 			send_event = QB_TRUE;
184 		}
185 	} else {
186 		if (socks_avail <= 1) {
187 			s->not_enough_fds = QB_TRUE;
188 			send_event = QB_TRUE;
189 		}
190 	}
191 	if (send_event && s->low_fds_event_fn) {
192 		s->low_fds_event_fn(s->not_enough_fds, socks_avail);
193 	}
194 }
195 
196 
197 struct qb_loop_source *
qb_loop_poll_create(struct qb_loop * l)198 qb_loop_poll_create(struct qb_loop *l)
199 {
200 	struct qb_poll_source *s = malloc(sizeof(struct qb_poll_source));
201 	if (s == NULL) {
202 		return NULL;
203 	}
204 	s->s.l = l;
205 	s->s.dispatch_and_take_back = _poll_dispatch_and_take_back_;
206 
207 	s->poll_entries = qb_array_create_2(16, sizeof(struct qb_poll_entry), 16);
208 	s->poll_entry_count = 0;
209 	s->low_fds_event_fn = NULL;
210 	s->not_enough_fds = QB_FALSE;
211 
212 #ifdef USE_EPOLL
213 	(void)qb_epoll_init(s);
214 #endif
215 #ifdef USE_KQUEUE
216 	(void)qb_kqueue_init(s);
217 #endif
218 #ifdef USE_POLL
219 	(void)qb_poll_init(s);
220 #endif /* USE_POLL */
221 
222 	return (struct qb_loop_source *)s;
223 }
224 
225 void
qb_loop_poll_destroy(struct qb_loop * l)226 qb_loop_poll_destroy(struct qb_loop *l)
227 {
228 	struct qb_poll_source *s = (struct qb_poll_source *)l->fd_source;
229 	qb_array_free(s->poll_entries);
230 
231 	s->driver.fini(s);
232 
233 	free(s);
234 }
235 
236 int32_t
qb_loop_poll_low_fds_event_set(struct qb_loop * l,qb_loop_poll_low_fds_event_fn fn)237 qb_loop_poll_low_fds_event_set(struct qb_loop *l,
238 			       qb_loop_poll_low_fds_event_fn fn)
239 {
240 	struct qb_poll_source *s = (struct qb_poll_source *)l->fd_source;
241 	s->low_fds_event_fn = fn;
242 
243 	return 0;
244 }
245 
246 static int32_t
_get_empty_array_position_(struct qb_poll_source * s)247 _get_empty_array_position_(struct qb_poll_source *s)
248 {
249 	int32_t found = QB_FALSE;
250 	uint32_t install_pos;
251 	int32_t res = 0;
252 	struct qb_poll_entry *pe;
253 
254 	for (install_pos = 0;
255 	     install_pos < s->poll_entry_count; install_pos++) {
256 		assert(qb_array_index
257 		       (s->poll_entries, install_pos, (void **)&pe) == 0);
258 		if (pe->state == QB_POLL_ENTRY_EMPTY) {
259 			found = QB_TRUE;
260 			break;
261 		}
262 	}
263 
264 	if (found == QB_FALSE) {
265 #ifdef USE_POLL
266 		struct pollfd *ufds;
267 		int32_t new_size = (s->poll_entry_count + 1) * sizeof(struct pollfd);
268 		ufds = realloc(s->ufds, new_size);
269 		if (ufds == NULL) {
270 			return -ENOMEM;
271 		}
272 		s->ufds = ufds;
273 #endif /* USE_POLL */
274 		/*
275 		 * Grow pollfd list
276 		 */
277 		res = qb_array_grow(s->poll_entries, s->poll_entry_count + 1);
278 		if (res != 0) {
279 			return res;
280 		}
281 
282 		s->poll_entry_count += 1;
283 		install_pos = s->poll_entry_count - 1;
284 	}
285 	return install_pos;
286 }
287 
288 static int32_t
_poll_add_(struct qb_loop * l,enum qb_loop_priority p,int32_t fd,int32_t events,void * data,struct qb_poll_entry ** pe_pt)289 _poll_add_(struct qb_loop *l,
290 	   enum qb_loop_priority p,
291 	   int32_t fd, int32_t events, void *data, struct qb_poll_entry **pe_pt)
292 {
293 	struct qb_poll_entry *pe;
294 	uint32_t install_pos;
295 	int32_t res = 0;
296 	struct qb_poll_source *s;
297 
298 	if (l == NULL) {
299 		return -EINVAL;
300 	}
301 
302 	s = (struct qb_poll_source *)l->fd_source;
303 
304 	install_pos = _get_empty_array_position_(s);
305 
306 	assert(qb_array_index(s->poll_entries, install_pos, (void **)&pe) == 0);
307 	pe->state = QB_POLL_ENTRY_ACTIVE;
308 	pe->install_pos = install_pos;
309 	_poll_entry_check_generate_(pe);
310 	pe->ufd.fd = fd;
311 	pe->ufd.events = events;
312 	pe->ufd.revents = 0;
313 	pe->item.user_data = data;
314 	pe->item.source = (struct qb_loop_source *)l->fd_source;
315 	pe->p = p;
316 	pe->runs = 0;
317 	res = s->driver.add(s, pe, fd, events);
318 	if (res == 0) {
319 		*pe_pt = pe;
320 		return 0;
321 	} else {
322 		pe->state = QB_POLL_ENTRY_EMPTY;
323 		return res;
324 	}
325 }
326 
327 static int32_t
_qb_poll_add_to_jobs_(struct qb_loop * l,struct qb_poll_entry * pe)328 _qb_poll_add_to_jobs_(struct qb_loop *l, struct qb_poll_entry *pe)
329 {
330 	assert(pe->item.type == QB_LOOP_FD);
331 	qb_loop_level_item_add(&l->level[pe->p], &pe->item);
332 	pe->state = QB_POLL_ENTRY_JOBLIST;
333 	return 1;
334 }
335 
336 int32_t
qb_loop_poll_add(struct qb_loop * lp,enum qb_loop_priority p,int32_t fd,int32_t events,void * data,qb_loop_poll_dispatch_fn dispatch_fn)337 qb_loop_poll_add(struct qb_loop * lp,
338 		 enum qb_loop_priority p,
339 		 int32_t fd,
340 		 int32_t events,
341 		 void *data, qb_loop_poll_dispatch_fn dispatch_fn)
342 {
343 	struct qb_poll_entry *pe = NULL;
344 	int32_t size;
345 	int32_t new_size;
346 	int32_t res;
347 	struct qb_loop *l = lp;
348 
349 	if (l == NULL) {
350 		l = qb_loop_default_get();
351 	}
352 
353 	size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count;
354 	res = _poll_add_(l, p, fd, events, data, &pe);
355 	if (res != 0) {
356 		qb_util_perror(LOG_ERR,
357 			       "couldn't add poll entryfor FD %d", fd);
358 		return res;
359 	}
360 	new_size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count;
361 
362 	pe->poll_dispatch_fn = dispatch_fn;
363 	pe->item.type = QB_LOOP_FD;
364 	pe->add_to_jobs = _qb_poll_add_to_jobs_;
365 
366 	if (new_size > size) {
367 		qb_util_log(LOG_TRACE,
368 			    "grown poll array to %d for FD %d", new_size, fd);
369 	}
370 
371 	return res;
372 }
373 
374 int32_t
qb_loop_poll_mod(struct qb_loop * lp,enum qb_loop_priority p,int32_t fd,int32_t events,void * data,qb_loop_poll_dispatch_fn dispatch_fn)375 qb_loop_poll_mod(struct qb_loop * lp,
376 		 enum qb_loop_priority p,
377 		 int32_t fd,
378 		 int32_t events,
379 		 void *data, qb_loop_poll_dispatch_fn dispatch_fn)
380 {
381 	uint32_t i;
382 	int32_t res = 0;
383 	struct qb_poll_entry *pe;
384 	struct qb_poll_source *s;
385 	struct qb_loop *l = lp;
386 
387 	if (l == NULL) {
388 		l = qb_loop_default_get();
389 	}
390 	s = (struct qb_poll_source *)l->fd_source;
391 
392 	/*
393 	 * Find file descriptor to modify events and dispatch function
394 	 */
395 	for (i = 0; i < s->poll_entry_count; i++) {
396 		assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0);
397 		if (pe->ufd.fd != fd) {
398 			continue;
399 		}
400 		if (pe->state == QB_POLL_ENTRY_DELETED || pe->check == 0) {
401 			qb_util_log(LOG_ERR,
402 				    "poll_mod : can't modify entry already deleted");
403 			return -EBADF;
404 		}
405 		pe->poll_dispatch_fn = dispatch_fn;
406 		pe->item.user_data = data;
407 		pe->p = p;
408 		if (pe->ufd.events != events) {
409 			res = s->driver.mod(s, pe, fd, events);
410 			pe->ufd.events = events;
411 		}
412 		return res;
413 	}
414 
415 	return -EBADF;
416 }
417 
418 int32_t
qb_loop_poll_del(struct qb_loop * lp,int32_t fd)419 qb_loop_poll_del(struct qb_loop * lp, int32_t fd)
420 {
421 	int32_t i;
422 	int32_t res = 0;
423 	struct qb_poll_entry *pe;
424 	struct qb_poll_source *s;
425 	struct qb_loop *l = lp;
426 
427 	if (l == NULL) {
428 		l = qb_loop_default_get();
429 	}
430 	s = (struct qb_poll_source *)l->fd_source;
431 	for (i = 0; i < s->poll_entry_count; i++) {
432 		assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0);
433 		if (pe->ufd.fd != fd || pe->item.type != QB_LOOP_FD) {
434 			continue;
435 		}
436 		if (pe->state == QB_POLL_ENTRY_DELETED ||
437 		    pe->state == QB_POLL_ENTRY_EMPTY) {
438 			return 0;
439 		}
440 		if (pe->state == QB_POLL_ENTRY_JOBLIST) {
441 			qb_loop_level_item_del(&l->level[pe->p], &pe->item);
442 		}
443 		res = s->driver.del(s, pe, fd, i);
444 		_poll_entry_mark_deleted_(pe);
445 		return res;
446 	}
447 
448 	return -EBADF;
449 }
450 
451 static int32_t pipe_fds[2] = { -1, -1 };
452 
453 struct qb_signal_source {
454 	struct qb_loop_source s;
455 	struct qb_list_head sig_head;
456 	sigset_t signal_superset;
457 };
458 
459 struct qb_loop_sig {
460 	struct qb_loop_item item;
461 	int32_t signal;
462 	enum qb_loop_priority p;
463 	qb_loop_signal_dispatch_fn dispatch_fn;
464 	struct qb_loop_sig *cloned_from;
465 };
466 
467 static void
_handle_real_signal_(int signal_num,siginfo_t * si,void * context)468 _handle_real_signal_(int signal_num, siginfo_t * si, void *context)
469 {
470 	int32_t sig = signal_num;
471 	int32_t res = 0;
472 
473 	if (pipe_fds[1] > 0) {
474 try_again:
475 		res = write(pipe_fds[1], &sig, sizeof(int32_t));
476 		if (res == -1 && errno == EAGAIN) {
477 			goto try_again;
478 		} else if (res != sizeof(int32_t)) {
479 			qb_util_log(LOG_ERR,
480 				    "failed to write signal to pipe [%d]", res);
481 		}
482 	}
483 	qb_util_log(LOG_TRACE, "got real signal [%d] sent to pipe", sig);
484 }
485 
486 static void
_signal_dispatch_and_take_back_(struct qb_loop_item * item,enum qb_loop_priority p)487 _signal_dispatch_and_take_back_(struct qb_loop_item *item,
488 				enum qb_loop_priority p)
489 {
490 	struct qb_loop_sig *sig = (struct qb_loop_sig *)item;
491 	int32_t res;
492 
493 	res = sig->dispatch_fn(sig->signal, sig->item.user_data);
494 	if (res != 0) {
495 		(void)qb_loop_signal_del(sig->cloned_from->item.source->l,
496 					 sig->cloned_from);
497 	}
498 	free(sig);
499 }
500 
501 struct qb_loop_source *
qb_loop_signals_create(struct qb_loop * l)502 qb_loop_signals_create(struct qb_loop *l)
503 {
504 	int32_t res = 0;
505 	struct qb_poll_entry *pe;
506 	struct qb_signal_source *s = calloc(1, sizeof(struct qb_signal_source));
507 
508 	if (s == NULL) {
509 		return NULL;
510 	}
511 	s->s.l = l;
512 	s->s.dispatch_and_take_back = _signal_dispatch_and_take_back_;
513 	s->s.poll = NULL;
514 	qb_list_init(&s->sig_head);
515 	sigemptyset(&s->signal_superset);
516 
517 	if (pipe_fds[0] < 0) {
518 		res = pipe(pipe_fds);
519 		if (res == -1) {
520 			res = -errno;
521 			qb_util_perror(LOG_ERR, "Can't light pipe");
522 			goto error_exit;
523 		}
524 		(void)qb_sys_fd_nonblock_cloexec_set(pipe_fds[0]);
525 		(void)qb_sys_fd_nonblock_cloexec_set(pipe_fds[1]);
526 
527 		res = _poll_add_(l, QB_LOOP_HIGH,
528 				 pipe_fds[0], POLLIN, NULL, &pe);
529 		if (res == 0) {
530 			pe->poll_dispatch_fn = NULL;
531 			pe->item.type = QB_LOOP_SIG;
532 			pe->add_to_jobs = _qb_signal_add_to_jobs_;
533 		} else {
534 			qb_util_perror(LOG_ERR, "Can't smoke pipe");
535 			goto error_exit;
536 		}
537 	}
538 
539 	return (struct qb_loop_source *)s;
540 
541 error_exit:
542 	errno = -res;
543 	free(s);
544 	if (pipe_fds[0] >= 0) {
545 		close(pipe_fds[0]);
546 	}
547 	if (pipe_fds[1] >= 0) {
548 		close(pipe_fds[1]);
549 	}
550 	return NULL;
551 }
552 
553 void
qb_loop_signals_destroy(struct qb_loop * l)554 qb_loop_signals_destroy(struct qb_loop *l)
555 {
556 	struct qb_signal_source *s =
557 	    (struct qb_signal_source *)l->signal_source;
558 	struct qb_list_head *list;
559 	struct qb_list_head *n;
560 	struct qb_loop_item *item;
561 
562 	close(pipe_fds[0]);
563 	pipe_fds[0] = -1;
564 	close(pipe_fds[1]);
565 	pipe_fds[1] = -1;
566 
567 	qb_list_for_each_safe(list, n, &s->sig_head) {
568 		item = qb_list_entry(list, struct qb_loop_item, list);
569 		qb_list_del(&item->list);
570 		free(item);
571 	}
572 
573 	free(l->signal_source);
574 }
575 
576 static int32_t
_qb_signal_add_to_jobs_(struct qb_loop * l,struct qb_poll_entry * pe)577 _qb_signal_add_to_jobs_(struct qb_loop *l, struct qb_poll_entry *pe)
578 {
579 	struct qb_signal_source *s =
580 	    (struct qb_signal_source *)l->signal_source;
581 	struct qb_list_head *list;
582 	struct qb_loop_sig *sig;
583 	struct qb_loop_item *item;
584 	struct qb_loop_sig *new_sig_job;
585 	int32_t the_signal;
586 	ssize_t res;
587 	int32_t jobs_added = 0;
588 
589 	res = read(pipe_fds[0], &the_signal, sizeof(int32_t));
590 	if (res != sizeof(int32_t)) {
591 		qb_util_perror(LOG_WARNING, "failed to read pipe");
592 		return 0;
593 	}
594 	pe->ufd.revents = 0;
595 
596 	qb_list_for_each(list, &s->sig_head) {
597 		item = qb_list_entry(list, struct qb_loop_item, list);
598 		sig = (struct qb_loop_sig *)item;
599 		if (sig->signal == the_signal) {
600 			new_sig_job = calloc(1, sizeof(struct qb_loop_sig));
601 			if (new_sig_job == NULL) {
602 				return jobs_added;
603 			}
604 			memcpy(new_sig_job, sig, sizeof(struct qb_loop_sig));
605 
606 			qb_util_log(LOG_TRACE,
607 				    "adding signal [%d] to job queue %p",
608 				    the_signal, sig);
609 
610 			new_sig_job->cloned_from = sig;
611 			qb_loop_level_item_add(&l->level[sig->p],
612 					       &new_sig_job->item);
613 			jobs_added++;
614 		}
615 	}
616 	return jobs_added;
617 }
618 
619 static void
_adjust_sigactions_(struct qb_signal_source * s)620 _adjust_sigactions_(struct qb_signal_source *s)
621 {
622 	struct qb_loop_sig *sig;
623 	struct qb_loop_item *item;
624 	struct sigaction sa;
625 	int32_t i;
626 	int32_t needed;
627 
628 	sa.sa_flags = SA_SIGINFO;
629 	sa.sa_sigaction = _handle_real_signal_;
630 	sigemptyset(&s->signal_superset);
631 	sigemptyset(&sa.sa_mask);
632 
633 	/* re-set to default */
634 	for (i = 0; i < QB_MAX_NUM_SIGNALS; i++) {
635 		needed = QB_FALSE;
636 		qb_list_for_each_entry(item, &s->sig_head, list) {
637 			sig = (struct qb_loop_sig *)item;
638 			if (i == sig->signal) {
639 				needed = QB_TRUE;
640 				break;
641 			}
642 		}
643 		if (needed) {
644 			sigaddset(&s->signal_superset, i);
645 			sigaction(i, &sa, NULL);
646 		}
647 	}
648 }
649 
650 int32_t
qb_loop_signal_add(qb_loop_t * lp,enum qb_loop_priority p,int32_t the_sig,void * data,qb_loop_signal_dispatch_fn dispatch_fn,qb_loop_signal_handle * handle)651 qb_loop_signal_add(qb_loop_t * lp,
652 		   enum qb_loop_priority p,
653 		   int32_t the_sig,
654 		   void *data,
655 		   qb_loop_signal_dispatch_fn dispatch_fn,
656 		   qb_loop_signal_handle * handle)
657 {
658 	struct qb_loop_sig *sig;
659 	struct qb_signal_source *s;
660 	struct qb_loop *l = lp;
661 
662 	if (l == NULL) {
663 		l = qb_loop_default_get();
664 	}
665 	if (l == NULL || dispatch_fn == NULL) {
666 		return -EINVAL;
667 	}
668 	if (p < QB_LOOP_LOW || p > QB_LOOP_HIGH) {
669 		return -EINVAL;
670 	}
671 	s = (struct qb_signal_source *)l->signal_source;
672 	sig = calloc(1, sizeof(struct qb_loop_sig));
673 	if (sig == NULL) {
674 		return -errno;
675 	}
676 
677 	sig->dispatch_fn = dispatch_fn;
678 	sig->p = p;
679 	sig->signal = the_sig;
680 	sig->item.user_data = data;
681 	sig->item.source = l->signal_source;
682 	sig->item.type = QB_LOOP_SIG;
683 
684 	qb_list_init(&sig->item.list);
685 	qb_list_add_tail(&sig->item.list, &s->sig_head);
686 
687 	if (sigismember(&s->signal_superset, the_sig) != 1) {
688 		_adjust_sigactions_(s);
689 	}
690 	if (handle) {
691 		*handle = sig;
692 	}
693 
694 	return 0;
695 }
696 
697 int32_t
qb_loop_signal_mod(qb_loop_t * lp,enum qb_loop_priority p,int32_t the_sig,void * data,qb_loop_signal_dispatch_fn dispatch_fn,qb_loop_signal_handle handle)698 qb_loop_signal_mod(qb_loop_t * lp,
699 		   enum qb_loop_priority p,
700 		   int32_t the_sig,
701 		   void *data,
702 		   qb_loop_signal_dispatch_fn dispatch_fn,
703 		   qb_loop_signal_handle handle)
704 {
705 	struct qb_signal_source *s;
706 	struct qb_loop_sig *sig = (struct qb_loop_sig *)handle;
707 	struct qb_loop *l = lp;
708 
709 	if (l == NULL) {
710 		l = qb_loop_default_get();
711 	}
712 	if (l == NULL || dispatch_fn == NULL || handle == NULL) {
713 		return -EINVAL;
714 	}
715 	if (p < QB_LOOP_LOW || p > QB_LOOP_HIGH) {
716 		return -EINVAL;
717 	}
718 	s = (struct qb_signal_source *)l->signal_source;
719 
720 	sig->item.user_data = data;
721 	sig->item.type = QB_LOOP_SIG;
722 	sig->dispatch_fn = dispatch_fn;
723 	sig->p = p;
724 
725 	if (sig->signal != the_sig) {
726 		(void)signal(sig->signal, SIG_DFL);
727 		sig->signal = the_sig;
728 		_adjust_sigactions_(s);
729 	}
730 
731 	return 0;
732 }
733 
734 int32_t
qb_loop_signal_del(qb_loop_t * lp,qb_loop_signal_handle handle)735 qb_loop_signal_del(qb_loop_t * lp, qb_loop_signal_handle handle)
736 {
737 	struct qb_signal_source *s;
738 	struct qb_loop_sig *sig = (struct qb_loop_sig *)handle;
739 	struct qb_loop_sig *sig_clone;
740 	struct qb_loop *l = lp;
741 	struct qb_loop_item *item;
742 
743 	if (l == NULL) {
744 		l = qb_loop_default_get();
745 	}
746 	if (l == NULL || handle == NULL) {
747 		return -EINVAL;
748 	}
749 	s = (struct qb_signal_source *)l->signal_source;
750 
751 	qb_list_for_each_entry(item, &l->level[sig->p].wait_head, list) {
752 		if (item->type != QB_LOOP_SIG) {
753 			continue;
754 		}
755 		sig_clone = (struct qb_loop_sig *)item;
756 		if (sig_clone->cloned_from == sig) {
757 			qb_util_log(LOG_TRACE, "deleting sig in WAITLIST");
758 			qb_list_del(&sig_clone->item.list);
759 			free(sig_clone);
760 			break;
761 		}
762 	}
763 
764 	qb_list_for_each_entry(item, &l->level[sig->p].job_head, list) {
765 		if (item->type != QB_LOOP_SIG) {
766 			continue;
767 		}
768 		sig_clone = (struct qb_loop_sig *)item;
769 		if (sig_clone->cloned_from == sig) {
770 			qb_loop_level_item_del(&l->level[sig->p], item);
771 			qb_util_log(LOG_TRACE, "deleting sig in JOBLIST");
772 			break;
773 		}
774 	}
775 
776 	qb_list_del(&sig->item.list);
777 	(void)signal(sig->signal, SIG_DFL);
778 	free(sig);
779 	_adjust_sigactions_(s);
780 	return 0;
781 }
782