1 /*
2  * threads.c	request threading support
3  *
4  * Version:	$Id: a9bd63bb57484df65fe716a31ec46568c596936c $
5  *
6  *   This program is free software; you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with this program; if not, write to the Free Software
18  *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  *
20  * Copyright 2000,2006  The FreeRADIUS server project
21  * Copyright 2000  Alan DeKok <aland@ox.org>
22  */
23 
24 RCSID("$Id: a9bd63bb57484df65fe716a31ec46568c596936c $")
25 USES_APPLE_DEPRECATED_API	/* OpenSSL API has been deprecated by Apple */
26 
27 #include <freeradius-devel/radiusd.h>
28 #include <freeradius-devel/process.h>
29 
30 #ifdef HAVE_STDATOMIC_H
31 #include <freeradius-devel/atomic_queue.h>
32 #endif
33 
34 #include <freeradius-devel/rad_assert.h>
35 
36 /*
37  *	Other OSes have sem_init; OS X doesn't.
38  */
39 #ifdef HAVE_SEMAPHORE_H
40 #include <semaphore.h>
41 #endif
42 
43 #ifdef __APPLE__
44 #ifdef WITH_GCD
45 #include <dispatch/dispatch.h>
46 #endif
47 #include <mach/task.h>
48 #include <mach/mach_init.h>
49 #include <mach/semaphore.h>
50 
51 #ifndef WITH_GCD
52 #undef sem_t
53 #define sem_t semaphore_t
54 #undef sem_init
55 #define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
56 #undef sem_wait
57 #define sem_wait(s) semaphore_wait(*s)
58 #undef sem_post
59 #define sem_post(s) semaphore_signal(*s)
60 #endif	/* WITH_GCD */
61 #endif	/* __APPLE__ */
62 
63 #ifdef HAVE_SYS_WAIT_H
64 #include <sys/wait.h>
65 #endif
66 
67 #ifdef HAVE_PTHREAD_H
68 
69 #ifdef HAVE_OPENSSL_CRYPTO_H
70 #include <openssl/crypto.h>
71 #endif
72 #ifdef HAVE_OPENSSL_ERR_H
73 #include <openssl/err.h>
74 #endif
75 #ifdef HAVE_OPENSSL_EVP_H
76 #include <openssl/evp.h>
77 #endif
78 
79 #ifndef WITH_GCD
80 #define SEMAPHORE_LOCKED	(0)
81 
82 #define THREAD_RUNNING		(1)
83 #define THREAD_CANCELLED	(2)
84 #define THREAD_EXITED		(3)
85 
86 #define NUM_FIFOS	       RAD_LISTEN_MAX
87 
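/*
 *	The lock-free queue code presumably needs C11 alignment
 *	support as well, so without <stdalign.h> we pretend that
 *	<stdatomic.h> is unavailable and fall back to the
 *	mutex-protected FIFOs.
 */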
88 #ifndef HAVE_STDALIGN_H
89 #undef HAVE_STDATOMIC_H
90 #endif
91 
92 #ifdef HAVE_STDATOMIC_H
93 #define CAS_INCR(_x) do { uint32_t num; \
94 			  num = load(_x); \
95 			  if (cas_incr(_x, num)) break; \
96                      } while (true)
97 
98 #define CAS_DECR(_x) do { uint32_t num; \
99 			  num = load(_x); \
100 			  if (cas_decr(_x, num)) break; \
101                      } while (true)
102 #endif
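/*
 *	A minimal standalone sketch of the same compare-and-swap retry
 *	loop, written against C11 <stdatomic.h> directly.  The load(),
 *	cas_incr() and cas_decr() helpers used above come from
 *	atomic_queue.h; the names below are hypothetical and not part
 *	of this file.
 */
#if 0
#include <stdatomic.h>
#include <stdint.h>

static _Atomic uint32_t example_counter;

static void example_incr(void)
{
	uint32_t num = atomic_load(&example_counter);

	/*
	 *	If another thread changed the counter between the load
	 *	and the compare-and-swap, "num" is refreshed with the
	 *	current value and we simply try again.
	 */
	while (!atomic_compare_exchange_weak(&example_counter, &num, num + 1)) {
		/* retry */
	}
}
#endif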
103 
104 /*
105  *  A data structure which contains the information about
106  *  the current thread.
107  */
108 typedef struct THREAD_HANDLE {
109 	struct THREAD_HANDLE	*prev;		//!< Previous thread handle (in the linked list).
110 	struct THREAD_HANDLE	*next;		//!< Next thread handle (in the linked list).
111 	pthread_t		pthread_id;	//!< pthread_id.
112 	int			thread_num;	//!< Server thread number, 1...number of threads.
113 	int			status;		//!< Is the thread running or exited?
114 	unsigned int		request_count;	//!< The number of requests that this thread has handled.
115 	time_t			timestamp;	//!< When the thread started executing.
116 	REQUEST			*request;
117 } THREAD_HANDLE;
118 
119 #endif	/* WITH_GCD */
120 
121 #ifdef WNOHANG
122 typedef struct thread_fork_t {
123 	pid_t		pid;
124 	int		status;
125 	int		exited;
126 } thread_fork_t;
127 #endif
128 
129 
130 #ifdef WITH_STATS
131 typedef struct fr_pps_t {
132 	uint32_t	pps_old;
133 	uint32_t	pps_now;
134 	uint32_t	pps;
135 	time_t		time_old;
136 } fr_pps_t;
137 #endif
138 
139 
140 /*
141  *	A data structure to manage the thread pool.  There's no real
142  *	need for a data structure, but it makes things conceptually
143  *	easier.
144  */
145 typedef struct THREAD_POOL {
146 #ifndef WITH_GCD
147 	THREAD_HANDLE	*head;
148 	THREAD_HANDLE	*tail;
149 
150 	uint32_t	total_threads;
151 
152 	uint32_t	max_thread_num;
153 	uint32_t	start_threads;
154 	uint32_t	max_threads;
155 	uint32_t	min_spare_threads;
156 	uint32_t	max_spare_threads;
157 	uint32_t	max_requests_per_thread;
158 	uint32_t	request_count;
159 	time_t		time_last_spawned;
160 	uint32_t	cleanup_delay;
161 	bool		stop_flag;
162 #endif	/* WITH_GCD */
163 	bool		spawn_flag;
164 
165 #ifdef WNOHANG
166 	pthread_mutex_t	wait_mutex;
167 	fr_hash_table_t *waiters;
168 #endif
169 
170 #ifdef WITH_GCD
171 	dispatch_queue_t	queue;
172 #else
173 
174 #ifdef WITH_STATS
175 	fr_pps_t	pps_in, pps_out;
176 #ifdef WITH_ACCOUNTING
177 	bool		auto_limit_acct;
178 #endif
179 #endif
180 
181 	/*
182 	 *	All threads wait on this semaphore, for requests
183 	 *	to enter the queue.
184 	 */
185 	sem_t		semaphore;
186 
187 	uint32_t	max_queue_size;
188 
189 #ifndef HAVE_STDATOMIC_H
190 	/*
191 	 *	To ensure only one thread at a time touches the queue.
192 	 */
193 	pthread_mutex_t	queue_mutex;
194 
195 	uint32_t	active_threads;	/* protected by queue_mutex */
196 	uint32_t	exited_threads;
197 	uint32_t	num_queued;
198 	fr_fifo_t	*fifo[NUM_FIFOS];
199 #else
200 	atomic_uint32_t	  active_threads;
201 	atomic_uint32_t	  exited_threads;
202 	fr_atomic_queue_t *queue[NUM_FIFOS];
203 #endif	/* STDATOMIC */
204 #endif	/* WITH_GCD */
205 } THREAD_POOL;
206 
207 static THREAD_POOL thread_pool;
208 static bool pool_initialized = false;
209 
210 #ifndef WITH_GCD
211 static time_t last_cleaned = 0;
212 
213 static void thread_pool_manage(time_t now);
214 #endif
215 
216 #ifndef WITH_GCD
217 /*
218  *	A mapping of configuration file names to internal integers
219  */
220 static const CONF_PARSER thread_config[] = {
221 	{ "start_servers", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.start_threads), "5" },
222 	{ "max_servers", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.max_threads), "32" },
223 	{ "min_spare_servers", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.min_spare_threads), "3" },
224 	{ "max_spare_servers", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.max_spare_threads), "10" },
225 	{ "max_requests_per_server", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.max_requests_per_thread), "0" },
226 	{ "cleanup_delay", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.cleanup_delay), "5" },
227 	{ "max_queue_size", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.max_queue_size), "65536" },
228 #ifdef WITH_STATS
229 #ifdef WITH_ACCOUNTING
230 	{ "auto_limit_acct", FR_CONF_POINTER(PW_TYPE_BOOLEAN, &thread_pool.auto_limit_acct), NULL },
231 #endif
232 #endif
233 	CONF_PARSER_TERMINATOR
234 };
235 #endif
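/*
 *	For reference, an illustrative radiusd.conf fragment matching
 *	the parser table above, shown with the default values.  The
 *	exact section name and file layout live in the server
 *	configuration, not in this file:
 *
 *	thread pool {
 *		start_servers = 5
 *		max_servers = 32
 *		min_spare_servers = 3
 *		max_spare_servers = 10
 *		max_requests_per_server = 0
 *		cleanup_delay = 5
 *		max_queue_size = 65536
 *	}
 */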
236 
237 #ifdef HAVE_OPENSSL_CRYPTO_H
238 
239 /*
240  *	If we're linking against OpenSSL, then it is the
241  *	duty of the application, if it is multithreaded,
242  *	to provide OpenSSL with appropriate thread id
243  *	and mutex locking functions
244  *
245  *	Note: this only implements static callbacks.
246  *	OpenSSL does not use dynamic locking callbacks
247  *	right now, but may in the future, so we will have
248  *	to add them at some point.
249  */
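/*
 *	Note that OpenSSL 1.1.0 and later manage locking internally,
 *	so the callbacks below only matter when building against older
 *	OpenSSL versions which still expose CRYPTO_set_locking_callback().
 */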
250 
251 static pthread_mutex_t *ssl_mutexes = NULL;
252 
253 #ifdef HAVE_CRYPTO_SET_ID_CALLBACK
254 static unsigned long get_ssl_id(void)
255 {
256 	unsigned long ret;
257 	pthread_t thread = pthread_self();
258 
259 	if (sizeof(ret) >= sizeof(thread)) {
260 		memcpy(&ret, &thread, sizeof(thread));
261 	} else {
262 		memcpy(&ret, &thread, sizeof(ret));
263 	}
264 
265 	return ret;
266 }
267 
268 /*
269  *	Use preprocessor magic to get the right function and argument
270  *	to use.  This avoids ifdef's through the rest of the code.
271  */
272 #if OPENSSL_VERSION_NUMBER < 0x10000000L
273 #define ssl_id_function get_ssl_id
274 #define set_id_callback CRYPTO_set_id_callback
275 
276 #else
277 static void ssl_id_function(CRYPTO_THREADID *id)
278 {
279 	CRYPTO_THREADID_set_numeric(id, get_ssl_id());
280 }
281 #define set_id_callback CRYPTO_THREADID_set_callback
282 #endif
283 #endif
284 
285 #ifdef HAVE_CRYPTO_SET_LOCKING_CALLBACK
286 static void ssl_locking_function(int mode, int n, UNUSED char const *file, UNUSED int line)
287 {
288 	if (mode & CRYPTO_LOCK) {
289 		pthread_mutex_lock(&(ssl_mutexes[n]));
290 	} else {
291 		pthread_mutex_unlock(&(ssl_mutexes[n]));
292 	}
293 }
294 #endif
295 
296 /*
297  *	Create the TLS mutexes.
298  */
299 int tls_mutexes_init(void)
300 {
301 	int i;
302 
303 	ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
304 	if (!ssl_mutexes) {
305 		ERROR("Error allocating memory for SSL mutexes!");
306 		return -1;
307 	}
308 
309 	for (i = 0; i < CRYPTO_num_locks(); i++) {
310 		pthread_mutex_init(&(ssl_mutexes[i]), NULL);
311 	}
312 
313 #ifdef HAVE_CRYPTO_SET_ID_CALLBACK
314 	set_id_callback(ssl_id_function);
315 #endif
316 #ifdef HAVE_CRYPTO_SET_LOCKING_CALLBACK
317 	CRYPTO_set_locking_callback(ssl_locking_function);
318 #endif
319 
320 	return 0;
321 }
322 #endif
323 
324 #ifdef WNOHANG
325 /*
326  *	We don't want to catch SIGCHLD for a host of reasons.
327  *
328  *	- exec_wait means that someone, somewhere, somewhen, will
329  *	call waitpid(), and catch the child.
330  *
331  *	- SIGCHLD is delivered to a random thread, not the one that
332  *	forked.
333  *
334  *	- if another thread catches the child, we have to coordinate
335  *	with the thread doing the waiting.
336  *
337  *	- if we don't waitpid() for non-wait children, they'll be zombies,
338  *	and will hang around forever.
339  *
340  */
341 static void reap_children(void)
342 {
343 	pid_t pid;
344 	int status;
345 	thread_fork_t mytf, *tf;
346 
347 
348 	pthread_mutex_lock(&thread_pool.wait_mutex);
349 
350 	do {
351 	retry:
352 		pid = waitpid(0, &status, WNOHANG);
353 		if (pid <= 0) break;
354 
355 		mytf.pid = pid;
356 		tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
357 		if (!tf) goto retry;
358 
359 		tf->status = status;
360 		tf->exited = 1;
361 	} while (fr_hash_table_num_elements(thread_pool.waiters) > 0);
362 
363 	pthread_mutex_unlock(&thread_pool.wait_mutex);
364 }
365 #else
366 #define reap_children()
367 #endif /* WNOHANG */
368 
369 #ifndef WITH_GCD
370 /*
371  *	Add a request to the list of waiting requests.
372  *	This function gets called ONLY from the main handler thread...
373  *
374  *	This function should never fail.
375  */
376 int request_enqueue(REQUEST *request)
377 {
378 	bool managed = false;
379 
380 	rad_assert(pool_initialized == true);
381 
382 	/*
383 	 *	If we haven't checked the number of child threads
384 	 *	in a while, OR if the thread pool appears to be full,
385 	 *	go manage it.
386 	 */
387 	if (last_cleaned < request->timestamp) {
388 		thread_pool_manage(request->timestamp);
389 		managed = true;
390 	}
391 
392 #ifdef HAVE_STDATOMIC_H
393 	if (!managed) {
394 		uint32_t num;
395 
396 		num = load(thread_pool.active_threads);
397 		if (num == thread_pool.total_threads) {
398 			thread_pool_manage(request->timestamp);
399 			managed = true;
400 		}
401 
402 		if (!managed) {
403 			num = load(thread_pool.exited_threads);
404 			if (num > 0) {
405 				thread_pool_manage(request->timestamp);
406 			}
407 		}
408 	}
409 
410 	/*
411 	 *	Use atomic queues where possible.  They're substantially faster than mutexes.
412 	 */
413 	request->component = "<core>";
414 	request->module = "<queue>";
415 	request->child_state = REQUEST_QUEUED;
416 
417 	/*
418 	 *	Push the request onto the queue for its priority.
419 	 */
420 	if (!fr_atomic_queue_push(thread_pool.queue[request->priority], request)) {
421 		ERROR("!!! ERROR !!! Failed inserting request %d into the queue", request->number);
422 		return 0;
423 	}
424 
425 #else  /* no atomic queues */
426 
427 	if (!managed &&
428 	    ((thread_pool.active_threads == thread_pool.total_threads) ||
429 	     (thread_pool.exited_threads > 0))) {
430 		thread_pool_manage(request->timestamp);
431 	}
432 
433 	pthread_mutex_lock(&thread_pool.queue_mutex);
434 
435 #ifdef WITH_STATS
436 #ifdef WITH_ACCOUNTING
437 	if (thread_pool.auto_limit_acct) {
438 		struct timeval now;
439 
440 		/*
441 		 *	Throw away accounting requests if we're too
442 		 *	busy.  The NAS should retransmit these, and no
443 		 *	one should notice.
444 		 *
445 		 *	In contrast, we always try to process
446 		 *	authentication requests.  Those are more time
447 		 *	critical, and it's harder to determine which
448 		 *	we can throw away, and which we can keep.
449 		 *
450 		 *	We allow the queue to get half full before we
451 		 *	start worrying.  Even then, we still require
452 		 *	that the rate of input packets is higher than
453 		 *	the rate of outgoing packets.  i.e. the queue
454 		 *	is growing.
455 		 *
456 		 *	Once that happens, we roll a die to see where
457 		 *	the barrier is for "keep" versus "toss".  If
458 		 *	the queue is smaller than the barrier, we
459 		 *	keep the packet.  If the queue is larger
460 		 *	than the barrier, we throw the packet
461 		 *	away.
462 		 *
463 		 *	i.e. the probability of throwing the packet
464 		 *	away increases from 0 (queue is half full), to
465 		 *	100 percent (queue is completely full).
466 		 *
467 		 *	A probabilistic approach allows us to process
468 		 *	SOME of the new accounting packets.
469 		 */
470 		if ((request->packet->code == PW_CODE_ACCOUNTING_REQUEST) &&
471 		    (thread_pool.num_queued > (thread_pool.max_queue_size / 2)) &&
472 		    (thread_pool.pps_in.pps_now > thread_pool.pps_out.pps_now)) {
473 			uint32_t prob;
474 			uint32_t keep;
475 
476 			/*
477 			 *	Take a random value of how full we
478 			 *	want the queue to be.  It's OK to be
479 			 *	half full, but we get excited over
480 			 *	anything more than that.
481 			 */
482 			keep = (thread_pool.max_queue_size / 2);
483 			prob = fr_rand() & ((1 << 10) - 1);
484 			keep *= prob;
485 			keep >>= 10;
486 			keep += (thread_pool.max_queue_size / 2);
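			/*
			 *	Worked example (illustrative numbers):
			 *	with max_queue_size = 65536, keep
			 *	starts at 32768 and prob is uniform in
			 *	[0, 1023], so after the multiply,
			 *	shift and add, keep is roughly uniform
			 *	in [32768, 65504].  A queue just over
			 *	half full is therefore almost never
			 *	above the barrier, while a nearly full
			 *	queue almost always is.
			 */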
487 
488 			/*
489 			 *	If the queue is larger than our dice
490 			 *	roll, we throw the packet away.
491 			 */
492 			if (thread_pool.num_queued > keep) {
493 				pthread_mutex_unlock(&thread_pool.queue_mutex);
494 				return 0;
495 			}
496 		}
497 
498 		gettimeofday(&now, NULL);
499 
500 		/*
501 		 *	Calculate the instantaneous arrival rate into
502 		 *	the queue.
503 		 */
504 		thread_pool.pps_in.pps = rad_pps(&thread_pool.pps_in.pps_old,
505 						 &thread_pool.pps_in.pps_now,
506 						 &thread_pool.pps_in.time_old,
507 						 &now);
508 
509 		thread_pool.pps_in.pps_now++;
510 	}
511 #endif	/* WITH_ACCOUNTING */
512 #endif
513 
514 	thread_pool.request_count++;
515 
516 	if (thread_pool.num_queued >= thread_pool.max_queue_size) {
517 		pthread_mutex_unlock(&thread_pool.queue_mutex);
518 
519 		/*
520 		 *	Mark the request as done.
521 		 */
522 		RATE_LIMIT(ERROR("Something is blocking the server.  There are %d packets in the queue, "
523 				 "waiting to be processed.  Ignoring the new request.", thread_pool.num_queued));
524 		return 0;
525 	}
526 
527 	request->component = "<core>";
528 	request->module = "<queue>";
529 	request->child_state = REQUEST_QUEUED;
530 
531 	/*
532 	 *	Push the request onto the fifo for its priority.
533 	 */
534 	if (!fr_fifo_push(thread_pool.fifo[request->priority], request)) {
535 		pthread_mutex_unlock(&thread_pool.queue_mutex);
536 		ERROR("!!! ERROR !!! Failed inserting request %d into the queue", request->number);
537 		return 0;
538 	}
539 
540 	thread_pool.num_queued++;
541 
542 	pthread_mutex_unlock(&thread_pool.queue_mutex);
543 #endif
544 
545 	/*
546 	 *	There's one more request in the queue.
547 	 *
548 	 *	Note that we're not touching the queue any more, so
549 	 *	the semaphore post is outside of the mutex.  This also
550 	 *	means that when the thread wakes up and tries to lock
551 	 *	the mutex, it will be unlocked, and there won't be
552 	 *	contention.
553 	 */
554 	sem_post(&thread_pool.semaphore);
555 
556 	return 1;
557 }
558 
559 /*
560  *	Remove a request from the queue.
561  */
562 static int request_dequeue(REQUEST **prequest)
563 {
564 	time_t blocked;
565 	static time_t last_complained = 0;
566 	static time_t total_blocked = 0;
567 	int num_blocked = 0;
568 #ifndef HAVE_STDATOMIC_H
569 	RAD_LISTEN_TYPE start;
570 #endif
571 	RAD_LISTEN_TYPE i;
572 	REQUEST *request = NULL;
573 	reap_children();
574 
575 	rad_assert(pool_initialized == true);
576 
577 #ifdef HAVE_STDATOMIC_H
578 retry:
579 	for (i = 0; i < NUM_FIFOS; i++) {
580 		if (!fr_atomic_queue_pop(thread_pool.queue[i], (void **) &request)) continue;
581 
582 		rad_assert(request != NULL);
583 
584 		VERIFY_REQUEST(request);
585 
586 		if (request->master_state != REQUEST_STOP_PROCESSING) {
587 			break;
588 		}
589 
590 		/*
591 		 *	This entry was marked to be stopped.  Acknowledge it.
592 		 */
593 		request->child_state = REQUEST_DONE;
594 	}
595 
596 	/*
597 	 *	Popping might fail.  If so, return.
598 	 */
599 	if (!request) return 0;
600 
601 #else
602 	pthread_mutex_lock(&thread_pool.queue_mutex);
603 
604 #ifdef WITH_STATS
605 #ifdef WITH_ACCOUNTING
606 	if (thread_pool.auto_limit_acct) {
607 		struct timeval now;
608 
609 		gettimeofday(&now, NULL);
610 
611 		/*
612 		 *	Calculate the instantaneous departure rate
613 		 *	from the queue.
614 		 */
615 		thread_pool.pps_out.pps  = rad_pps(&thread_pool.pps_out.pps_old,
616 						   &thread_pool.pps_out.pps_now,
617 						   &thread_pool.pps_out.time_old,
618 						   &now);
619 		thread_pool.pps_out.pps_now++;
620 	}
621 #endif
622 #endif
623 
624 	/*
625 	 *	Clear old requests from all queues.
626 	 *
627 	 *	We only do one pass over the queue, in order to
628 	 *	amortize the work across the child threads.  Since we
629 	 *	do N checks for one request de-queued, the old
630 	 *	requests will be quickly cleared.
631 	 */
632 	for (i = 0; i < NUM_FIFOS; i++) {
633 		request = fr_fifo_peek(thread_pool.fifo[i]);
634 		if (!request) continue;
635 
636 		VERIFY_REQUEST(request);
637 
638 		if (request->master_state != REQUEST_STOP_PROCESSING) {
639 			continue;
640 		}
641 
642 		/*
643 		 *	This entry was marked to be stopped.  Acknowledge it.
644 		 */
645 		request = fr_fifo_pop(thread_pool.fifo[i]);
646 		rad_assert(request != NULL);
647 		VERIFY_REQUEST(request);
648 		request->child_state = REQUEST_DONE;
649 		thread_pool.num_queued--;
650 	}
651 
652 	start = 0;
653  retry:
654 	/*
655 	 *	Pop results from the top of the queue
656 	 */
657 	for (i = start; i < NUM_FIFOS; i++) {
658 		request = fr_fifo_pop(thread_pool.fifo[i]);
659 		if (request) {
660 			VERIFY_REQUEST(request);
661 			start = i;
662 			break;
663 		}
664 	}
665 
666 	if (!request) {
667 		pthread_mutex_unlock(&thread_pool.queue_mutex);
668 		*prequest = NULL;
669 		return 0;
670 	}
671 
672 	rad_assert(thread_pool.num_queued > 0);
673 	thread_pool.num_queued--;
674 #endif	/* HAVE_STDATOMIC_H */
675 
676 	*prequest = request;
677 
678 	rad_assert(*prequest != NULL);
679 	rad_assert(request->magic == REQUEST_MAGIC);
680 
681 	request->component = "<core>";
682 	request->module = "";
683 	request->child_state = REQUEST_RUNNING;
684 
685 	/*
686 	 *	If the request has sat in the queue for too long,
687 	 *	kill it.
688 	 *
689 	 *	The main clean-up code can't delete the request from
690 	 *	the queue, and therefore won't clean it up until we
691 	 *	have acknowledged it as "done".
692 	 */
693 	if (request->master_state == REQUEST_STOP_PROCESSING) {
694 		request->module = "<done>";
695 		request->child_state = REQUEST_DONE;
696 		goto retry;
697 	}
698 
699 	/*
700 	 *	The thread is currently processing a request.
701 	 */
702 #ifdef HAVE_STDATOMIC_H
703 	CAS_INCR(thread_pool.active_threads);
704 #else
705 	thread_pool.active_threads++;
706 #endif
707 
708 	blocked = time(NULL);
709 	if (!request->proxy && (blocked - request->timestamp) > 5) {
710 		total_blocked++;
711 		if (last_complained < blocked) {
712 			last_complained = blocked;
713 			blocked -= request->timestamp;
714 			num_blocked = total_blocked;
715 		} else {
716 			blocked = 0;
717 		}
718 	} else {
719 		total_blocked = 0;
720 		blocked = 0;
721 	}
722 
723 #ifndef HAVE_STDATOMIC_H
724 	pthread_mutex_unlock(&thread_pool.queue_mutex);
725 #endif
726 
727 	if (blocked) {
728 		ERROR("%d requests have been waiting in the processing queue for %d seconds.  Check that all databases are running properly!",
729 		      num_blocked, (int) blocked);
730 	}
731 
732 	return 1;
733 }
734 
735 
736 /*
737  *	The main thread handler for requests.
738  *
739  *	Wait on the semaphore until we have it, and process the request.
740  */
741 static void *request_handler_thread(void *arg)
742 {
743 	THREAD_HANDLE *self = (THREAD_HANDLE *) arg;
744 
745 	/*
746 	 *	Loop forever, until told to exit.
747 	 */
748 	do {
749 		/*
750 		 *	Wait to be signalled.
751 		 */
752 		DEBUG2("Thread %d waiting to be assigned a request",
753 		       self->thread_num);
754 	re_wait:
755 		if (sem_wait(&thread_pool.semaphore) != 0) {
756 			/*
757 			 *	Interrupted system call.  Go back to
758 			 *	waiting, but DON'T print out any more
759 			 *	text.
760 			 */
761 			if ((errno == EINTR) || (errno == EAGAIN)) {
762 				DEBUG2("Re-wait %d", self->thread_num);
763 				goto re_wait;
764 			}
765 			ERROR("Thread %d failed waiting for semaphore: %s: Exiting\n",
766 			       self->thread_num, fr_syserror(errno));
767 			break;
768 		}
769 
770 		DEBUG2("Thread %d got semaphore", self->thread_num);
771 
772 #ifdef HAVE_OPENSSL_ERR_H
773 		/*
774 		 *	Clear the error queue for the current thread.
775 		 */
776 		ERR_clear_error();
777 #endif
778 
779 		/*
780 		 *	The server is exiting.  Don't dequeue any
781 		 *	requests.
782 		 */
783 		if (thread_pool.stop_flag) break;
784 
785 		/*
786 		 *	Try to grab a request from the queue.
787 		 *
788 		 *	It may be empty, in which case we fail
789 		 *	gracefully.
790 		 */
791 		if (!request_dequeue(&self->request)) continue;
792 
793 		self->request->child_pid = self->pthread_id;
794 		self->request_count++;
795 
796 		DEBUG2("Thread %d handling request %d, (%d handled so far)",
797 		       self->thread_num, self->request->number,
798 		       self->request_count);
799 
800 #ifndef HAVE_STDATOMIC_H
801 #ifdef WITH_ACCOUNTING
802 		if ((self->request->packet->code == PW_CODE_ACCOUNTING_REQUEST) &&
803 		    thread_pool.auto_limit_acct) {
804 			VALUE_PAIR *vp;
805 			REQUEST *request = self->request;
806 
807 			vp = radius_pair_create(request, &request->config,
808 					       181, VENDORPEC_FREERADIUS);
809 			if (vp) vp->vp_integer = thread_pool.pps_in.pps;
810 
811 			vp = radius_pair_create(request, &request->config,
812 					       182, VENDORPEC_FREERADIUS);
813 			if (vp) vp->vp_integer = thread_pool.pps_in.pps;
814 
815 			vp = radius_pair_create(request, &request->config,
816 					       183, VENDORPEC_FREERADIUS);
817 			if (vp) {
818 				vp->vp_integer = thread_pool.max_queue_size - thread_pool.num_queued;
819 				vp->vp_integer *= 100;
820 				vp->vp_integer /= thread_pool.max_queue_size;
821 			}
822 		}
823 #endif
824 #endif
825 
826 		self->request->process(self->request, FR_ACTION_RUN);
827 		self->request = NULL;
828 
829 #ifdef HAVE_STDATOMIC_H
830 		CAS_DECR(thread_pool.active_threads);
831 #else
832 		/*
833 		 *	Update the active threads.
834 		 */
835 		pthread_mutex_lock(&thread_pool.queue_mutex);
836 		rad_assert(thread_pool.active_threads > 0);
837 		thread_pool.active_threads--;
838 		pthread_mutex_unlock(&thread_pool.queue_mutex);
839 #endif
840 
841 		/*
842 		 *	If the thread has handled too many requests, then make it
843 		 *	exit.
844 		 */
845 		if ((thread_pool.max_requests_per_thread > 0) &&
846 		    (self->request_count >= thread_pool.max_requests_per_thread)) {
847 			DEBUG2("Thread %d handled too many requests",
848 			       self->thread_num);
849 			break;
850 		}
851 	} while (self->status != THREAD_CANCELLED);
852 
853 	DEBUG2("Thread %d exiting...", self->thread_num);
854 
855 #ifdef HAVE_OPENSSL_ERR_H
856 	/*
857 	 *	If we linked with OpenSSL, the application
858 	 *	must remove the thread's error queue before
859 	 *	exiting to prevent memory leaks.
860 	 */
861 #if OPENSSL_VERSION_NUMBER < 0x10000000L
862 	ERR_remove_state(0);
863 #elif OPENSSL_VERSION_NUMBER < 0x10100000L || defined(LIBRESSL_VERSION_NUMBER)
864 	ERR_remove_thread_state(NULL);
865 #endif
866 #endif
867 
868 #ifdef HAVE_STDATOMIC_H
869 	CAS_INCR(thread_pool.exited_threads);
870 #else
871 	pthread_mutex_lock(&thread_pool.queue_mutex);
872 	thread_pool.exited_threads++;
873 	pthread_mutex_unlock(&thread_pool.queue_mutex);
874 #endif
875 
876 	/*
877 	 *  Do this as the LAST thing before exiting.
878 	 */
879 	self->request = NULL;
880 	self->status = THREAD_EXITED;
881 	exec_trigger(NULL, NULL, "server.thread.stop", true);
882 
883 	return NULL;
884 }
885 
886 /*
887  *	Take a THREAD_HANDLE, delete it from the thread pool and
888  *	free its resources.
889  *
890  *	This function is called ONLY from the main server thread,
891  *	ONLY after the thread has exited.
892  */
893 static void delete_thread(THREAD_HANDLE *handle)
894 {
895 	THREAD_HANDLE *prev;
896 	THREAD_HANDLE *next;
897 
898 	rad_assert(handle->request == NULL);
899 
900 	DEBUG2("Deleting thread %d", handle->thread_num);
901 
902 	prev = handle->prev;
903 	next = handle->next;
904 	rad_assert(thread_pool.total_threads > 0);
905 	thread_pool.total_threads--;
906 
907 	/*
908 	 *	Remove the handle from the list.
909 	 */
910 	if (prev == NULL) {
911 		rad_assert(thread_pool.head == handle);
912 		thread_pool.head = next;
913 	} else {
914 		prev->next = next;
915 	}
916 
917 	if (next == NULL) {
918 		rad_assert(thread_pool.tail == handle);
919 		thread_pool.tail = prev;
920 	} else {
921 		next->prev = prev;
922 	}
923 
924 	/*
925 	 *	Free the handle, now that it's no longer referenceable.
926 	 */
927 	free(handle);
928 }
929 
930 
931 /*
932  *	Spawn a new thread, and place it in the thread pool.
933  *
934  *	The thread is started initially in the blocked state, waiting
935  *	for the semaphore.
936  */
937 static THREAD_HANDLE *spawn_thread(time_t now, int do_trigger)
938 {
939 	int rcode;
940 	THREAD_HANDLE *handle;
941 
942 	/*
943 	 *	Ensure that we don't spawn too many threads.
944 	 */
945 	if (thread_pool.total_threads >= thread_pool.max_threads) {
946 		DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
947 		return NULL;
948 	}
949 
950 	/*
951 	 *	Allocate a new thread handle.
952 	 */
953 	handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
954 	memset(handle, 0, sizeof(THREAD_HANDLE));
955 	handle->prev = NULL;
956 	handle->next = NULL;
957 	handle->thread_num = thread_pool.max_thread_num++;
958 	handle->request_count = 0;
959 	handle->status = THREAD_RUNNING;
960 	handle->timestamp = time(NULL);
961 
962 	/*
963 	 *	Create the thread joinable, so that it can be cleaned up
964 	 *	using pthread_join().
965 	 *
966 	 *	Note that the function returns non-zero on error, NOT
967 	 *	-1.  The return code is the error, and errno isn't set.
968 	 */
969 	rcode = pthread_create(&handle->pthread_id, 0, request_handler_thread, handle);
970 	if (rcode != 0) {
971 		free(handle);
972 		ERROR("Thread create failed: %s",
973 		       fr_syserror(rcode));
974 		return NULL;
975 	}
976 
977 	/*
978 	 *	One more thread to go into the list.
979 	 */
980 	thread_pool.total_threads++;
981 	DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
982 			handle->thread_num, thread_pool.total_threads);
983 	if (do_trigger) exec_trigger(NULL, NULL, "server.thread.start", true);
984 
985 	/*
986 	 *	Add the thread handle to the tail of the thread pool list.
987 	 */
988 	if (thread_pool.tail) {
989 		thread_pool.tail->next = handle;
990 		handle->prev = thread_pool.tail;
991 		thread_pool.tail = handle;
992 	} else {
993 		rad_assert(thread_pool.head == NULL);
994 		thread_pool.head = thread_pool.tail = handle;
995 	}
996 
997 	/*
998 	 *	Update the time we last spawned a thread.
999 	 */
1000 	thread_pool.time_last_spawned = now;
1001 
1002 	/*
1003 	 * Fire trigger if maximum number of threads reached
1004 	 */
1005 	if (thread_pool.total_threads >= thread_pool.max_threads)
1006 		exec_trigger(NULL, NULL, "server.thread.max_threads", true);
1007 
1008 	/*
1009 	 *	And return the new handle to the caller.
1010 	 */
1011 	return handle;
1012 }
1013 #endif	/* WITH_GCD */
1014 
1015 
1016 #ifdef WNOHANG
1017 static uint32_t pid_hash(void const *data)
1018 {
1019 	thread_fork_t const *tf = data;
1020 
1021 	return fr_hash(&tf->pid, sizeof(tf->pid));
1022 }
1023 
1024 static int pid_cmp(void const *one, void const *two)
1025 {
1026 	thread_fork_t const *a = one;
1027 	thread_fork_t const *b = two;
1028 
1029 	return (a->pid - b->pid);
1030 }
1031 #endif
1032 
1033 /*
1034  *	Allocate the thread pool, and seed it with an initial number
1035  *	of threads.
1036  *
1037  *	FIXME: What to do on a SIGHUP???
1038  */
1039 int thread_pool_init(CONF_SECTION *cs, bool *spawn_flag)
1040 {
1041 #ifndef WITH_GCD
1042 	uint32_t	i;
1043 	int		rcode;
1044 #endif
1045 	CONF_SECTION	*pool_cf;
1046 	time_t		now;
1047 #ifdef HAVE_STDATOMIC_H
1048 	int num;
1049 #endif
1050 
1051 	now = time(NULL);
1052 
1053 	rad_assert(spawn_flag != NULL);
1054 	rad_assert(*spawn_flag == true);
1055 	rad_assert(pool_initialized == false); /* not called on HUP */
1056 
1057 	pool_cf = cf_subsection_find_next(cs, NULL, "thread");
1058 #ifdef WITH_GCD
1059 	if (pool_cf) WARN("Built with Grand Central Dispatch.  Ignoring 'thread' subsection");
1060 #else
1061 	if (!pool_cf) *spawn_flag = false;
1062 #endif
1063 
1064 	/*
1065 	 *	Initialize the thread pool to some reasonable values.
1066 	 */
1067 	memset(&thread_pool, 0, sizeof(THREAD_POOL));
1068 #ifndef WITH_GCD
1069 	thread_pool.head = NULL;
1070 	thread_pool.tail = NULL;
1071 	thread_pool.total_threads = 0;
1072 	thread_pool.max_thread_num = 1;
1073 	thread_pool.cleanup_delay = 5;
1074 	thread_pool.stop_flag = false;
1075 #endif
1076 	thread_pool.spawn_flag = *spawn_flag;
1077 
1078 	/*
1079 	 *	Don't bother initializing the mutexes or
1080 	 *	creating the hash tables.  They won't be used.
1081 	 */
1082 	if (!*spawn_flag) return 0;
1083 
1084 #ifdef WNOHANG
1085 	if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
1086 		ERROR("FATAL: Failed to initialize wait mutex: %s",
1087 		       fr_syserror(errno));
1088 		return -1;
1089 	}
1090 
1091 	/*
1092 	 *	Create the hash table of child PID's
1093 	 */
1094 	thread_pool.waiters = fr_hash_table_create(pid_hash,
1095 						   pid_cmp,
1096 						   free);
1097 	if (!thread_pool.waiters) {
1098 		ERROR("FATAL: Failed to set up wait hash");
1099 		return -1;
1100 	}
1101 #endif
1102 
1103 #ifndef WITH_GCD
1104 	if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
1105 		return -1;
1106 	}
1107 
1108 	/*
1109 	 *	Catch corner cases.
1110 	 */
1111 	if (thread_pool.min_spare_threads < 1)
1112 		thread_pool.min_spare_threads = 1;
1113 	if (thread_pool.max_spare_threads < 1)
1114 		thread_pool.max_spare_threads = 1;
1115 	if (thread_pool.max_spare_threads < thread_pool.min_spare_threads)
1116 		thread_pool.max_spare_threads = thread_pool.min_spare_threads;
1117 	if (thread_pool.max_threads == 0)
1118 		thread_pool.max_threads = 256;
1119 	if ((thread_pool.max_queue_size < 2) || (thread_pool.max_queue_size > 1024*1024)) {
1120 		ERROR("FATAL: max_queue_size value must be in range 2-1048576");
1121 		return -1;
1122 	}
1123 
1124 	if (thread_pool.start_threads > thread_pool.max_threads) {
1125 		ERROR("FATAL: start_servers (%i) must be <= max_servers (%i)",
1126 		      thread_pool.start_threads, thread_pool.max_threads);
1127 		return -1;
1128 	}
1129 #endif	/* WITH_GCD */
1130 
1131 	/*
1132 	 *	The pool has already been initialized.  Don't spawn
1133 	 *	new threads, and don't forget about forked children.
1134 	 */
1135 	if (pool_initialized) {
1136 		return 0;
1137 	}
1138 
1139 #ifndef WITH_GCD
1140 	/*
1141 	 *	Initialize the queue of requests.
1142 	 */
1143 	memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
1144 	rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
1145 	if (rcode != 0) {
1146 		ERROR("FATAL: Failed to initialize semaphore: %s",
1147 		       fr_syserror(errno));
1148 		return -1;
1149 	}
1150 
1151 #ifndef HAVE_STDATOMIC_H
1152 	rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
1153 	if (rcode != 0) {
1154 		ERROR("FATAL: Failed to initialize queue mutex: %s",
1155 		       fr_syserror(errno));
1156 		return -1;
1157 	}
1158 #else
1159 	num = 0;
1160 	store(thread_pool.active_threads, num);
1161 	store(thread_pool.exited_threads, num);
1162 #endif
1163 
1164 	/*
1165 	 *	Allocate multiple fifos.
1166 	 */
1167 	for (i = 0; i < NUM_FIFOS; i++) {
1168 #ifdef HAVE_STDATOMIC_H
1169 		thread_pool.queue[i] = fr_atomic_queue_create(NULL, thread_pool.max_queue_size);
1170 		if (!thread_pool.queue[i]) {
1171 			ERROR("FATAL: Failed to set up request fifo");
1172 			return -1;
1173 		}
1174 #else
1175 		thread_pool.fifo[i] = fr_fifo_create(NULL, thread_pool.max_queue_size, NULL);
1176 		if (!thread_pool.fifo[i]) {
1177 			ERROR("FATAL: Failed to set up request fifo");
1178 			return -1;
1179 		}
1180 #endif
1181 	}
1182 #endif
1183 
1184 #ifndef WITH_GCD
1185 	/*
1186 	 *	Create a number of waiting threads.
1187 	 *
1188 	 *	If we fail while creating them, do something intelligent.
1189 	 */
1190 	for (i = 0; i < thread_pool.start_threads; i++) {
1191 		if (spawn_thread(now, 0) == NULL) {
1192 			return -1;
1193 		}
1194 	}
1195 #else
1196 	thread_pool.queue = dispatch_queue_create("org.freeradius.threads", NULL);
1197 	if (!thread_pool.queue) {
1198 		ERROR("Failed creating dispatch queue: %s", fr_syserror(errno));
1199 		fr_exit(1);
1200 	}
1201 #endif
1202 
1203 	DEBUG2("Thread pool initialized");
1204 	pool_initialized = true;
1205 	return 0;
1206 }
1207 
1208 
1209 /*
1210  *	Stop all threads in the pool.
1211  */
1212 void thread_pool_stop(void)
1213 {
1214 #ifndef WITH_GCD
1215 	int i;
1216 	int total_threads;
1217 	THREAD_HANDLE *handle;
1218 	THREAD_HANDLE *next;
1219 
1220 	if (!pool_initialized) return;
1221 
1222 	/*
1223 	 *	Set pool stop flag.
1224 	 */
1225 	thread_pool.stop_flag = true;
1226 
1227 	/*
1228 	 *	Wakeup all threads to make them see stop flag.
1229 	 */
1230 	total_threads = thread_pool.total_threads;
1231 	for (i = 0; i != total_threads; i++) {
1232 		sem_post(&thread_pool.semaphore);
1233 	}
1234 
1235 	/*
1236 	 *	Join and free all threads.
1237 	 */
1238 	for (handle = thread_pool.head; handle; handle = next) {
1239 		next = handle->next;
1240 		pthread_join(handle->pthread_id, NULL);
1241 		delete_thread(handle);
1242 	}
1243 
1244 	for (i = 0; i < NUM_FIFOS; i++) {
1245 #ifdef HAVE_STDATOMIC_H
1246 		talloc_free(thread_pool.queue[i]);
1247 #else
1248 		fr_fifo_free(thread_pool.fifo[i]);
1249 #endif
1250 	}
1251 
1252 #ifdef WNOHANG
1253 	fr_hash_table_free(thread_pool.waiters);
1254 #endif
1255 
1256 #ifdef HAVE_OPENSSL_CRYPTO_H
1257 	/*
1258 	 *	We're no longer threaded.  Remove the mutexes and free
1259 	 *	the memory.
1260 	 */
1261 #ifdef HAVE_CRYPTO_SET_ID_CALLBACK
1262 	set_id_callback(NULL);
1263 #endif
1264 #ifdef HAVE_CRYPTO_SET_LOCKING_CALLBACK
1265 	CRYPTO_set_locking_callback(NULL);
1266 #endif
1267 
1268 	free(ssl_mutexes);
1269 #endif
1270 
1271 #endif
1272 }
1273 
1274 
1275 #ifdef WITH_GCD
1276 int request_enqueue(REQUEST *request)
1277 {
1278 	dispatch_block_t block;
1279 
1280 	block = ^{
1281 		request->process(request, FR_ACTION_RUN);
1282 	};
1283 
1284 	dispatch_async(thread_pool.queue, block);
1285 
1286 	return 1;
1287 }
1288 #endif
1289 
1290 #ifndef WITH_GCD
1291 /*
1292  *	Check the min_spare_threads and max_spare_threads.
1293  *
1294  *	If there are too many or too few threads waiting, then we
1295  *	either create some more, or delete some.
1296  */
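/*
 *	For example (illustrative numbers): with min_spare_servers = 3
 *	and max_spare_servers = 10, a pool of 10 threads with 8 active
 *	has only 2 spares, so one more thread is spawned.  If later
 *	only 1 thread is active there are 9 spares, which is still
 *	within bounds, so nothing is deleted.
 */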
1297 static void thread_pool_manage(time_t now)
1298 {
1299 	uint32_t spare;
1300 	int i, total;
1301 	THREAD_HANDLE *handle, *next;
1302 	uint32_t active_threads;
1303 
1304 	/*
1305 	 *	Loop over the thread pool, deleting exited threads.
1306 	 */
1307 	for (handle = thread_pool.head; handle; handle = next) {
1308 		next = handle->next;
1309 
1310 		/*
1311 		 *	Maybe we've asked the thread to exit, and it
1312 		 *	has agreed.
1313 		 */
1314 		if (handle->status == THREAD_EXITED) {
1315 			pthread_join(handle->pthread_id, NULL);
1316 			delete_thread(handle);
1317 
1318 #ifdef HAVE_STDATOMIC_H
1319 			CAS_DECR(thread_pool.exited_threads);
1320 #else
1321 			pthread_mutex_lock(&thread_pool.queue_mutex);
1322 			thread_pool.exited_threads--;
1323 			pthread_mutex_unlock(&thread_pool.queue_mutex);
1324 #endif
1325 		}
1326 	}
1327 
1328 	/*
1329 	 *	We don't need a mutex lock here, as we're reading
1330 	 *	active_threads, and not modifying it.  We want a close
1331 	 *	approximation of the number of active threads, and this
1332 	 *	is good enough.
1333 	 */
1334 #ifdef HAVE_STDATOMIC_H
1335 	active_threads = load(thread_pool.active_threads);
1336 #else
1337 	active_threads = thread_pool.active_threads;
1338 #endif
1339 	spare = thread_pool.total_threads - active_threads;
1340 	if (rad_debug_lvl) {
1341 		static uint32_t old_total = 0;
1342 		static uint32_t old_active = 0;
1343 
1344 		if ((old_total != thread_pool.total_threads) || (old_active != active_threads)) {
1345 			DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
1346 			       thread_pool.total_threads, active_threads, spare);
1347 			old_total = thread_pool.total_threads;
1348 			old_active = active_threads;
1349 		}
1350 	}
1351 
1352 	/*
1353 	 *	If there are too few spare threads.  Go create some more.
1354 	 */
1355 	if ((thread_pool.total_threads < thread_pool.max_threads) &&
1356 	    (spare < thread_pool.min_spare_threads)) {
1357 		total = thread_pool.min_spare_threads - spare;
1358 
1359 		if ((total + thread_pool.total_threads) > thread_pool.max_threads) {
1360 			total = thread_pool.max_threads - thread_pool.total_threads;
1361 		}
1362 
1363 		DEBUG2("Threads: Spawning %d spares", total);
1364 
1365 		/*
1366 		 *	Create a number of spare threads.
1367 		 */
1368 		for (i = 0; i < total; i++) {
1369 			handle = spawn_thread(now, 1);
1370 			if (handle == NULL) {
1371 				return;
1372 			}
1373 		}
1374 
1375 		return;		/* there aren't too many spare threads */
1376 	}
1377 
1378 	/*
1379 	 *	Only delete spare threads if we haven't already done
1380 	 *	so this second.
1381 	 */
1382 	if (now == last_cleaned) {
1383 		return;
1384 	}
1385 	last_cleaned = now;
1386 
1387 	/*
1388 	 *	Only delete the spare threads if sufficient time has
1389 	 *	passed since we last created one.  This helps to minimize
1390 	 *	the amount of create/delete cycles.
1391 	 */
1392 	if ((now - thread_pool.time_last_spawned) < (int)thread_pool.cleanup_delay) {
1393 		return;
1394 	}
1395 
1396 	/*
1397 	 *	If there are too many spare threads, delete one.
1398 	 *
1399 	 *	Note that we only delete ONE at a time, instead of
1400 	 *	wiping out many.  This allows the excess servers to
1401 	 *	be slowly reaped, just in case the load spike comes again.
1402 	 */
1403 	if (spare > thread_pool.max_spare_threads) {
1404 
1405 		spare -= thread_pool.max_spare_threads;
1406 
1407 		DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
1408 
1409 		/*
1410 		 *	Walk through the thread pool, deleting the
1411 		 *	first idle thread we come across.
1412 		 */
1413 		for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
1414 			next = handle->next;
1415 
1416 			/*
1417 			 *	If the thread is not handling a
1418 			 *	request, but still live, then tell it
1419 			 *	to exit.
1420 			 *
1421 			 *	It will eventually wake up, and realize
1422 			 *	it's been told to commit suicide.
1423 			 */
1424 			if ((handle->request == NULL) &&
1425 			    (handle->status == THREAD_RUNNING)) {
1426 				handle->status = THREAD_CANCELLED;
1427 				/*
1428 				 *	Post an extra semaphore, as a
1429 				 *	signal to wake up, and exit.
1430 				 */
1431 				sem_post(&thread_pool.semaphore);
1432 				spare--;
1433 				break;
1434 			}
1435 		}
1436 	}
1437 
1438 	/*
1439 	 *	Otherwise everything's kosher.  There are not too few,
1440 	 *	or too many spare threads.  Exit happily.
1441 	 */
1442 	return;
1443 }
1444 #endif	/* WITH_GCD */
1445 
1446 #ifdef WNOHANG
1447 /*
1448  *	Thread wrapper for fork().
1449  */
1450 pid_t rad_fork(void)
1451 {
1452 	pid_t child_pid;
1453 
1454 	if (!pool_initialized) return fork();
1455 
1456 	reap_children();	/* be nice to non-wait thingies */
1457 
1458 	if (fr_hash_table_num_elements(thread_pool.waiters) >= 1024) {
1459 		return -1;
1460 	}
1461 
1462 	/*
1463 	 *	Fork & save the PID for later reaping.
1464 	 */
1465 	child_pid = fork();
1466 	if (child_pid > 0) {
1467 		int rcode;
1468 		thread_fork_t *tf;
1469 
1470 		tf = rad_malloc(sizeof(*tf));
1471 		memset(tf, 0, sizeof(*tf));
1472 
1473 		tf->pid = child_pid;
1474 
1475 		pthread_mutex_lock(&thread_pool.wait_mutex);
1476 		rcode = fr_hash_table_insert(thread_pool.waiters, tf);
1477 		pthread_mutex_unlock(&thread_pool.wait_mutex);
1478 
1479 		if (!rcode) {
1480 			ERROR("Failed to store PID, creating what will be a zombie process %d",
1481 			       (int) child_pid);
1482 			free(tf);
1483 		}
1484 	}
1485 
1486 	/*
1487 	 *	Return whatever we were told.
1488 	 */
1489 	return child_pid;
1490 }
1491 
1492 
1493 /*
1494  *	Wait 10 seconds at most for a child to exit, then give up.
1495  */
1496 pid_t rad_waitpid(pid_t pid, int *status)
1497 {
1498 	int i;
1499 	thread_fork_t mytf, *tf;
1500 
1501 	if (!pool_initialized) return waitpid(pid, status, 0);
1502 
1503 	if (pid <= 0) return -1;
1504 
1505 	mytf.pid = pid;
1506 
1507 	pthread_mutex_lock(&thread_pool.wait_mutex);
1508 	tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
1509 	pthread_mutex_unlock(&thread_pool.wait_mutex);
1510 
1511 	if (!tf) return -1;
1512 
1513 	for (i = 0; i < 100; i++) {
1514 		reap_children();
1515 
1516 		if (tf->exited) {
1517 			*status = tf->status;
1518 
1519 			pthread_mutex_lock(&thread_pool.wait_mutex);
1520 			fr_hash_table_delete(thread_pool.waiters, &mytf);
1521 			pthread_mutex_unlock(&thread_pool.wait_mutex);
1522 			return pid;
1523 		}
1524 		usleep(100000);	/* sleep for 1/10 of a second */
1525 	}
1526 
1527 	/*
1528 	 *	10 seconds have passed, give up on the child.
1529 	 */
1530 	pthread_mutex_lock(&thread_pool.wait_mutex);
1531 	fr_hash_table_delete(thread_pool.waiters, &mytf);
1532 	pthread_mutex_unlock(&thread_pool.wait_mutex);
1533 
1534 	return 0;
1535 }
1536 #else
1537 /*
1538  *	No rad_fork or rad_waitpid
1539  */
1540 #endif
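/*
 *	A minimal sketch of how callers are expected to pair
 *	rad_fork() with rad_waitpid().  The helper below is
 *	hypothetical and not part of this file:
 */
#if 0
static int example_run_child(void)
{
	pid_t pid;
	int status = 0;

	pid = rad_fork();
	if (pid == 0) {
		/* child: exec the external program here */
		_exit(0);
	}
	if (pid < 0) return -1;

	/*
	 *	Parent: rad_waitpid() polls the reaper for up to 10
	 *	seconds, then gives up and returns 0.
	 */
	if (rad_waitpid(pid, &status) <= 0) return -1;

	return WEXITSTATUS(status);
}
#endif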
1541 
1542 void thread_pool_queue_stats(int array[RAD_LISTEN_MAX], int pps[2])
1543 {
1544 	int i;
1545 
1546 #ifndef WITH_GCD
1547 	if (pool_initialized) {
1548 		struct timeval now;
1549 
1550 		for (i = 0; i < RAD_LISTEN_MAX; i++) {
1551 #ifndef HAVE_STDATOMIC_H
1552 			array[i] = fr_fifo_num_elements(thread_pool.fifo[i]);
1553 #else
1554 			array[i] = 0;
1555 #endif
1556 		}
1557 
1558 		gettimeofday(&now, NULL);
1559 
1560 		pps[0] = rad_pps(&thread_pool.pps_in.pps_old,
1561 				 &thread_pool.pps_in.pps_now,
1562 				 &thread_pool.pps_in.time_old,
1563 				 &now);
1564 		pps[1] = rad_pps(&thread_pool.pps_out.pps_old,
1565 				 &thread_pool.pps_out.pps_now,
1566 				 &thread_pool.pps_out.time_old,
1567 				 &now);
1568 
1569 	} else
1570 #endif	/* WITH_GCD */
1571 	{
1572 		for (i = 0; i < RAD_LISTEN_MAX; i++) {
1573 			array[i] = 0;
1574 		}
1575 
1576 		pps[0] = pps[1] = 0;
1577 	}
1578 }
1579 #endif /* HAVE_PTHREAD_H */
1580 
1581 static void time_free(void *data)
1582 {
1583 	free(data);
1584 }
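/*
 *	An illustrative trigger configuration fragment (the layout and
 *	paths are assumptions; the shipped file is raddb/trigger.conf).
 *	Trigger names used in this file, such as "server.thread.start",
 *	are resolved as dotted references inside the "trigger" section:
 *
 *	trigger {
 *		server {
 *			thread {
 *				start = "/path/to/notify-script start"
 *				stop = "/path/to/notify-script stop"
 *				max_threads = "/path/to/notify-script max"
 *			}
 *		}
 *	}
 */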
1585 
1586 void exec_trigger(REQUEST *request, CONF_SECTION *cs, char const *name, int quench)
1587 {
1588 	CONF_SECTION *subcs;
1589 	CONF_ITEM *ci;
1590 	CONF_PAIR *cp;
1591 	char const *attr;
1592 	char const *value;
1593 	VALUE_PAIR *vp;
1594 	bool alloc = false;
1595 
1596 	/*
1597 	 *	Use global "trigger" section if no local config is given.
1598 	 */
1599 	if (!cs) {
1600 		cs = main_config.config;
1601 		attr = name;
1602 	} else {
1603 		/*
1604 		 *	Try to use pair name, rather than reference.
1605 		 */
1606 		attr = strrchr(name, '.');
1607 		if (attr) {
1608 			attr++;
1609 		} else {
1610 			attr = name;
1611 		}
1612 	}
1613 
1614 	/*
1615 	 *	Find local "trigger" subsection.  If it isn't found,
1616 	 *	try using the global "trigger" section, and reset the
1617 	 *	reference to the full path, rather than the sub-path.
1618 	 */
1619 	subcs = cf_section_sub_find(cs, "trigger");
1620 	if (!subcs && (cs != main_config.config)) {
1621 		subcs = cf_section_sub_find(main_config.config, "trigger");
1622 		attr = name;
1623 	}
1624 
1625 	if (!subcs) return;
1626 
1627 	ci = cf_reference_item(subcs, main_config.config, attr);
1628 	if (!ci) {
1629 		ERROR("No such item in trigger section: %s", attr);
1630 		return;
1631 	}
1632 
1633 	if (!cf_item_is_pair(ci)) {
1634 		ERROR("Trigger is not a configuration variable: %s", attr);
1635 		return;
1636 	}
1637 
1638 	cp = cf_item_to_pair(ci);
1639 	if (!cp) return;
1640 
1641 	value = cf_pair_value(cp);
1642 	if (!value) {
1643 		ERROR("Trigger has no value: %s", name);
1644 		return;
1645 	}
1646 
1647 	/*
1648 	 *	May be called for Status-Server packets.
1649 	 */
1650 	vp = NULL;
1651 	if (request && request->packet) vp = request->packet->vps;
1652 
1653 	/*
1654 	 *	Perform periodic quenching.
1655 	 */
1656 	if (quench) {
1657 		time_t *last_time;
1658 
1659 		last_time = cf_data_find(cs, value);
1660 		if (!last_time) {
1661 			last_time = rad_malloc(sizeof(*last_time));
1662 			*last_time = 0;
1663 
1664 			if (cf_data_add(cs, value, last_time, time_free) < 0) {
1665 				free(last_time);
1666 				last_time = NULL;
1667 			}
1668 		}
1669 
1670 		/*
1671 		 *	Send the quenched traps at most once per second.
1672 		 */
1673 		if (last_time) {
1674 			time_t now = time(NULL);
1675 			if (*last_time == now) return;
1676 
1677 			*last_time = now;
1678 		}
1679 	}
1680 
1681 	/*
1682 	 *	radius_exec_program always needs a request.
1683 	 */
1684 	if (!request) {
1685 		request = request_alloc(NULL);
1686 		alloc = true;
1687 	}
1688 
1689 	DEBUG("Trigger %s -> %s", name, value);
1690 
1691 	radius_exec_program(request, NULL, 0, NULL, request, value, vp, false, true, 0);
1692 
1693 	if (alloc) talloc_free(request);
1694 }
1695