1 /*
2 * threads.c request threading support
3 *
4 * Version: $Id: a9bd63bb57484df65fe716a31ec46568c596936c $
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19 *
20 * Copyright 2000,2006 The FreeRADIUS server project
21 * Copyright 2000 Alan DeKok <aland@ox.org>
22 */
23
24 RCSID("$Id: a9bd63bb57484df65fe716a31ec46568c596936c $")
25 USES_APPLE_DEPRECATED_API /* OpenSSL API has been deprecated by Apple */
26
27 #include <freeradius-devel/radiusd.h>
28 #include <freeradius-devel/process.h>
29
30 #ifdef HAVE_STDATOMIC_H
31 #include <freeradius-devel/atomic_queue.h>
32 #endif
33
34 #include <freeradius-devel/rad_assert.h>
35
36 /*
37 * Other OSes have sem_init; OS X doesn't.
38 */
39 #ifdef HAVE_SEMAPHORE_H
40 #include <semaphore.h>
41 #endif
42
43 #ifdef __APPLE__
44 #ifdef WITH_GCD
45 #include <dispatch/dispatch.h>
46 #endif
47 #include <mach/task.h>
48 #include <mach/mach_init.h>
49 #include <mach/semaphore.h>
50
51 #ifndef WITH_GCD
52 #undef sem_t
53 #define sem_t semaphore_t
54 #undef sem_init
55 #define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
56 #undef sem_wait
57 #define sem_wait(s) semaphore_wait(*s)
58 #undef sem_post
59 #define sem_post(s) semaphore_signal(*s)
60 #endif /* WITH_GCD */
61 #endif /* __APPLE__ */
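/*
 *	With the mappings above, the POSIX-style semaphore calls used
 *	later in this file compile unchanged on OS X.  For example,
 *
 *		sem_post(&thread_pool.semaphore);
 *
 *	expands to
 *
 *		semaphore_signal(*&thread_pool.semaphore);
 *
 *	i.e. the Mach semaphore is signalled instead of a POSIX sem_t.
 */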
62
63 #ifdef HAVE_SYS_WAIT_H
64 #include <sys/wait.h>
65 #endif
66
67 #ifdef HAVE_PTHREAD_H
68
69 #ifdef HAVE_OPENSSL_CRYPTO_H
70 #include <openssl/crypto.h>
71 #endif
72 #ifdef HAVE_OPENSSL_ERR_H
73 #include <openssl/err.h>
74 #endif
75 #ifdef HAVE_OPENSSL_EVP_H
76 #include <openssl/evp.h>
77 #endif
78
79 #ifndef WITH_GCD
80 #define SEMAPHORE_LOCKED (0)
81
82 #define THREAD_RUNNING (1)
83 #define THREAD_CANCELLED (2)
84 #define THREAD_EXITED (3)
85
86 #define NUM_FIFOS RAD_LISTEN_MAX
87
88 #ifndef HAVE_STDALIGN_H
89 #undef HAVE_STDATOMIC_H
90 #endif
91
92 #ifdef HAVE_STDATOMIC_H
93 #define CAS_INCR(_x) do { uint32_t num; \
94 num = load(_x); \
95 if (cas_incr(_x, num)) break; \
96 } while (true)
97
98 #define CAS_DECR(_x) do { uint32_t num; \
99 num = load(_x); \
100 if (cas_decr(_x, num)) break; \
101 } while (true)
102 #endif
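/*
 *	For readers unfamiliar with the load()/cas_incr()/cas_decr()
 *	wrappers from atomic_queue.h, CAS_INCR() is (assuming those
 *	wrappers map onto the C11 <stdatomic.h> primitives) roughly
 *	equivalent to the sketch below: retry the compare-and-swap
 *	until no other thread has raced us.
 *
 *		#include <stdatomic.h>
 *		#include <stdint.h>
 *
 *		static void example_cas_incr(_Atomic uint32_t *x)
 *		{
 *			uint32_t num = atomic_load(x);
 *
 *			while (!atomic_compare_exchange_weak(x, &num, num + 1)) {
 *				// on failure, num is reloaded with the current value
 *			}
 *		}
 */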
103
104 /*
105 * A data structure which contains the information about
106 * the current thread.
107 */
108 typedef struct THREAD_HANDLE {
109 struct THREAD_HANDLE *prev; //!< Previous thread handle (in the linked list).
110 struct THREAD_HANDLE *next; //!< Next thread handle (in the linked list).
111 pthread_t pthread_id; //!< pthread_id.
112 int thread_num; //!< Server thread number, 1...number of threads.
113 int status; //!< Is the thread running or exited?
114 unsigned int request_count; //!< The number of requests that this thread has handled.
115 time_t timestamp; //!< When the thread started executing.
116 REQUEST *request;
117 } THREAD_HANDLE;
118
119 #endif /* WITH_GCD */
120
121 #ifdef WNOHANG
122 typedef struct thread_fork_t {
123 pid_t pid;
124 int status;
125 int exited;
126 } thread_fork_t;
127 #endif
128
129
130 #ifdef WITH_STATS
131 typedef struct fr_pps_t {
132 uint32_t pps_old;
133 uint32_t pps_now;
134 uint32_t pps;
135 time_t time_old;
136 } fr_pps_t;
137 #endif
138
139
140 /*
141 * A data structure to manage the thread pool. There's no real
142 * need for a data structure, but it makes things conceptually
143 * easier.
144 */
145 typedef struct THREAD_POOL {
146 #ifndef WITH_GCD
147 THREAD_HANDLE *head;
148 THREAD_HANDLE *tail;
149
150 uint32_t total_threads;
151
152 uint32_t max_thread_num;
153 uint32_t start_threads;
154 uint32_t max_threads;
155 uint32_t min_spare_threads;
156 uint32_t max_spare_threads;
157 uint32_t max_requests_per_thread;
158 uint32_t request_count;
159 time_t time_last_spawned;
160 uint32_t cleanup_delay;
161 bool stop_flag;
162 #endif /* WITH_GCD */
163 bool spawn_flag;
164
165 #ifdef WNOHANG
166 pthread_mutex_t wait_mutex;
167 fr_hash_table_t *waiters;
168 #endif
169
170 #ifdef WITH_GCD
171 dispatch_queue_t queue;
172 #else
173
174 #ifdef WITH_STATS
175 fr_pps_t pps_in, pps_out;
176 #ifdef WITH_ACCOUNTING
177 bool auto_limit_acct;
178 #endif
179 #endif
180
181 /*
182 * All threads wait on this semaphore, for requests
183 * to enter the queue.
184 */
185 sem_t semaphore;
186
187 uint32_t max_queue_size;
188
189 #ifndef HAVE_STDATOMIC_H
190 /*
191 * To ensure only one thread at a time touches the queue.
192 */
193 pthread_mutex_t queue_mutex;
194
195 uint32_t active_threads; /* protected by queue_mutex */
196 uint32_t exited_threads;
197 uint32_t num_queued;
198 fr_fifo_t *fifo[NUM_FIFOS];
199 #else
200 atomic_uint32_t active_threads;
201 atomic_uint32_t exited_threads;
202 fr_atomic_queue_t *queue[NUM_FIFOS];
203 #endif /* STDATOMIC */
204 #endif /* WITH_GCD */
205 } THREAD_POOL;
206
207 static THREAD_POOL thread_pool;
208 static bool pool_initialized = false;
209
210 #ifndef WITH_GCD
211 static time_t last_cleaned = 0;
212
213 static void thread_pool_manage(time_t now);
214 #endif
215
216 #ifndef WITH_GCD
217 /*
218 * A mapping of configuration file names to internal integers
219 */
220 static const CONF_PARSER thread_config[] = {
221 { "start_servers", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.start_threads), "5" },
222 { "max_servers", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.max_threads), "32" },
223 { "min_spare_servers", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.min_spare_threads), "3" },
224 { "max_spare_servers", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.max_spare_threads), "10" },
225 { "max_requests_per_server", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.max_requests_per_thread), "0" },
226 { "cleanup_delay", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.cleanup_delay), "5" },
227 { "max_queue_size", FR_CONF_POINTER(PW_TYPE_INTEGER, &thread_pool.max_queue_size), "65536" },
228 #ifdef WITH_STATS
229 #ifdef WITH_ACCOUNTING
230 { "auto_limit_acct", FR_CONF_POINTER(PW_TYPE_BOOLEAN, &thread_pool.auto_limit_acct), NULL },
231 #endif
232 #endif
233 CONF_PARSER_TERMINATOR
234 };
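/*
 *	For reference, the radiusd.conf fragment that feeds this table
 *	looks roughly like the sketch below (the values shown are the
 *	defaults defined above):
 *
 *		thread pool {
 *			start_servers           = 5
 *			max_servers             = 32
 *			min_spare_servers       = 3
 *			max_spare_servers       = 10
 *			max_requests_per_server = 0
 *			cleanup_delay           = 5
 *			max_queue_size          = 65536
 *		}
 */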
235 #endif
236
237 #ifdef HAVE_OPENSSL_CRYPTO_H
238
239 /*
240 * If we're linking against OpenSSL, then it is the
241 * duty of the application, if it is multithreaded,
242 * to provide OpenSSL with appropriate thread id
243 * and mutex locking functions
244 *
245 * Note: this only implements static callbacks.
246 * OpenSSL does not use dynamic locking callbacks
247 * right now, but may in the future, so we will have
248 * to add them at some point.
249 */
250
251 static pthread_mutex_t *ssl_mutexes = NULL;
252
253 #ifdef HAVE_CRYPTO_SET_ID_CALLBACK
254 static unsigned long get_ssl_id(void)
255 {
256 unsigned long ret;
257 pthread_t thread = pthread_self();
258
259 if (sizeof(ret) >= sizeof(thread)) {
260 memcpy(&ret, &thread, sizeof(thread));
261 } else {
262 memcpy(&ret, &thread, sizeof(ret));
263 }
264
265 return ret;
266 }
267
268 /*
269 * Use preprocessor magic to get the right function and argument
270 * to use. This avoids ifdef's through the rest of the code.
271 */
272 #if OPENSSL_VERSION_NUMBER < 0x10000000L
273 #define ssl_id_function get_ssl_id
274 #define set_id_callback CRYPTO_set_id_callback
275
276 #else
277 static void ssl_id_function(CRYPTO_THREADID *id)
278 {
279 CRYPTO_THREADID_set_numeric(id, get_ssl_id());
280 }
281 #define set_id_callback CRYPTO_THREADID_set_callback
282 #endif
283 #endif
284
285 #ifdef HAVE_CRYPTO_SET_LOCKING_CALLBACK
286 static void ssl_locking_function(int mode, int n, UNUSED char const *file, UNUSED int line)
287 {
288 if (mode & CRYPTO_LOCK) {
289 pthread_mutex_lock(&(ssl_mutexes[n]));
290 } else {
291 pthread_mutex_unlock(&(ssl_mutexes[n]));
292 }
293 }
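/*
 *	For context: pre-1.1.0 OpenSSL calls the function above with
 *	mode = CRYPTO_LOCK | CRYPTO_READ or CRYPTO_LOCK | CRYPTO_WRITE
 *	to take lock number n (0 .. CRYPTO_num_locks() - 1), and with
 *	the CRYPTO_UNLOCK bit set to release it.  Only the CRYPTO_LOCK
 *	bit matters here, because plain mutexes don't distinguish
 *	readers from writers.
 */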
294 #endif
295
296 /*
297 * Create the TLS mutexes.
298 */
299 int tls_mutexes_init(void)
300 {
301 int i;
302
303 ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
304 if (!ssl_mutexes) {
305 ERROR("Error allocating memory for SSL mutexes!");
306 return -1;
307 }
308
309 for (i = 0; i < CRYPTO_num_locks(); i++) {
310 pthread_mutex_init(&(ssl_mutexes[i]), NULL);
311 }
312
313 #ifdef HAVE_CRYPTO_SET_ID_CALLBACK
314 set_id_callback(ssl_id_function);
315 #endif
316 #ifdef HAVE_CRYPTO_SET_LOCKING_CALLBACK
317 CRYPTO_set_locking_callback(ssl_locking_function);
318 #endif
319
320 return 0;
321 }
322 #endif
323
324 #ifdef WNOHANG
325 /*
326 * We don't want to catch SIGCHLD for a host of reasons.
327 *
328 * - exec_wait means that someone, somewhere, somewhen, will
329 * call waitpid(), and catch the child.
330 *
331 * - SIGCHLD is delivered to a random thread, not the one that
332 * forked.
333 *
334 * - if another thread catches the child, we have to coordinate
335 * with the thread doing the waiting.
336 *
337 * - if we don't waitpid() for non-wait children, they'll be zombies,
338 * and will hang around forever.
339 *
340 */
341 static void reap_children(void)
342 {
343 pid_t pid;
344 int status;
345 thread_fork_t mytf, *tf;
346
347
348 pthread_mutex_lock(&thread_pool.wait_mutex);
349
350 do {
351 retry:
352 pid = waitpid(0, &status, WNOHANG);
353 if (pid <= 0) break;
354
355 mytf.pid = pid;
356 tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
357 if (!tf) goto retry;
358
359 tf->status = status;
360 tf->exited = 1;
361 } while (fr_hash_table_num_elements(thread_pool.waiters) > 0);
362
363 pthread_mutex_unlock(&thread_pool.wait_mutex);
364 }
365 #else
366 #define reap_children()
367 #endif /* WNOHANG */
368
369 #ifndef WITH_GCD
370 /*
371 * Add a request to the list of waiting requests.
372 * This function gets called ONLY from the main handler thread...
373 *
374 * This function should never fail.
375 */
376 int request_enqueue(REQUEST *request)
377 {
378 bool managed = false;
379
380 rad_assert(pool_initialized == true);
381
382 /*
383 * If we haven't checked the number of child threads
384 * in a while, OR if the thread pool appears to be full,
385 * go manage it.
386 */
387 if (last_cleaned < request->timestamp) {
388 thread_pool_manage(request->timestamp);
389 managed = true;
390 }
391
392 #ifdef HAVE_STDATOMIC_H
393 if (!managed) {
394 uint32_t num;
395
396 num = load(thread_pool.active_threads);
397 if (num == thread_pool.total_threads) {
398 thread_pool_manage(request->timestamp);
399 managed = true;
400 }
401
402 if (!managed) {
403 num = load(thread_pool.exited_threads);
404 if (num > 0) {
405 thread_pool_manage(request->timestamp);
406 }
407 }
408 }
409
410 /*
411 * Use atomic queues where possible. They're substantially faster than mutexes.
412 */
413 request->component = "<core>";
414 request->module = "<queue>";
415 request->child_state = REQUEST_QUEUED;
416
417 /*
418 * Push the request onto the appropriate fifo for that priority.
419 */
420 if (!fr_atomic_queue_push(thread_pool.queue[request->priority], request)) {
421 ERROR("!!! ERROR !!! Failed inserting request %d into the queue", request->number);
422 return 0;
423 }
424
425 #else /* no atomic queues */
426
427 if (!managed &&
428 ((thread_pool.active_threads == thread_pool.total_threads) ||
429 (thread_pool.exited_threads > 0))) {
430 thread_pool_manage(request->timestamp);
431 }
432
433 pthread_mutex_lock(&thread_pool.queue_mutex);
434
435 #ifdef WITH_STATS
436 #ifdef WITH_ACCOUNTING
437 if (thread_pool.auto_limit_acct) {
438 struct timeval now;
439
440 /*
441 * Throw away accounting requests if we're too
442 * busy. The NAS should retransmit these, and no
443 * one should notice.
444 *
445 * In contrast, we always try to process
446 * authentication requests. Those are more time
447 * critical, and it's harder to determine which
448 * we can throw away, and which we can keep.
449 *
450 * We allow the queue to get half full before we
451 * start worrying. Even then, we still require
452 * that the rate of input packets is higher than
453 * the rate of outgoing packets. i.e. the queue
454 * is growing.
455 *
456 * Once that happens, we roll the dice to see where
457 * the barrier is for "keep" versus "toss". If
458 * the queue is smaller than the barrier, we
459 * allow it. If the queue is larger than the
460 * barrier, we throw the packet away. Otherwise,
461 * we keep it.
462 *
463 * i.e. the probability of throwing the packet
464 * away increases from 0 (queue is half full), to
465 * 100 percent (queue is completely full).
466 *
467 * A probabilistic approach allows us to process
468 * SOME of the new accounting packets.
469 */
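/*
 *	Worked example of the dice roll below, with the default
 *	max_queue_size of 65536: "keep" starts at 32768 (half full),
 *	prob is a random value in [0, 1023], so "keep" ends up
 *	somewhere in [32768, 65504].  A queue that is just over half
 *	full therefore almost never drops an accounting packet, while
 *	a nearly full queue drops almost every new one.
 */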
470 if ((request->packet->code == PW_CODE_ACCOUNTING_REQUEST) &&
471 (thread_pool.num_queued > (thread_pool.max_queue_size / 2)) &&
472 (thread_pool.pps_in.pps_now > thread_pool.pps_out.pps_now)) {
473 uint32_t prob;
474 uint32_t keep;
475
476 /*
477 * Take a random value of how full we
478 * want the queue to be. It's OK to be
479 * half full, but we get excited over
480 * anything more than that.
481 */
482 keep = (thread_pool.max_queue_size / 2);
483 prob = fr_rand() & ((1 << 10) - 1);
484 keep *= prob;
485 keep >>= 10;
486 keep += (thread_pool.max_queue_size / 2);
487
488 /*
489 * If the queue is larger than our dice
490 * roll, we throw the packet away.
491 */
492 if (thread_pool.num_queued > keep) {
493 pthread_mutex_unlock(&thread_pool.queue_mutex);
494 return 0;
495 }
496 }
497
498 gettimeofday(&now, NULL);
499
500 /*
501 * Calculate the instantaneous arrival rate into
502 * the queue.
503 */
504 thread_pool.pps_in.pps = rad_pps(&thread_pool.pps_in.pps_old,
505 &thread_pool.pps_in.pps_now,
506 &thread_pool.pps_in.time_old,
507 &now);
508
509 thread_pool.pps_in.pps_now++;
510 }
511 #endif /* WITH_ACCOUNTING */
512 #endif
513
514 thread_pool.request_count++;
515
516 if (thread_pool.num_queued >= thread_pool.max_queue_size) {
517 pthread_mutex_unlock(&thread_pool.queue_mutex);
518
519 /*
520 * Mark the request as done.
521 */
522 RATE_LIMIT(ERROR("Something is blocking the server. There are %d packets in the queue, "
523 "waiting to be processed. Ignoring the new request.", thread_pool.num_queued));
524 return 0;
525 }
526
527 request->component = "<core>";
528 request->module = "<queue>";
529 request->child_state = REQUEST_QUEUED;
530
531 /*
532 * Push the request onto the appropriate fifo for that priority.
533 */
534 if (!fr_fifo_push(thread_pool.fifo[request->priority], request)) {
535 pthread_mutex_unlock(&thread_pool.queue_mutex);
536 ERROR("!!! ERROR !!! Failed inserting request %d into the queue", request->number);
537 return 0;
538 }
539
540 thread_pool.num_queued++;
541
542 pthread_mutex_unlock(&thread_pool.queue_mutex);
543 #endif
544
545 /*
546 * There's one more request in the queue.
547 *
548 * Note that we're not touching the queue any more, so
549 * the semaphore post is outside of the mutex. This also
550 * means that when the thread wakes up and tries to lock
551 * the mutex, it will be unlocked, and there won't be
552 * contention.
553 */
554 sem_post(&thread_pool.semaphore);
555
556 return 1;
557 }
558
559 /*
560 * Remove a request from the queue.
561 */
562 static int request_dequeue(REQUEST **prequest)
563 {
564 time_t blocked;
565 static time_t last_complained = 0;
566 static time_t total_blocked = 0;
567 int num_blocked = 0;
568 #ifndef HAVE_STDATOMIC_H
569 RAD_LISTEN_TYPE start;
570 #endif
571 RAD_LISTEN_TYPE i;
572 REQUEST *request = NULL;
573 reap_children();
574
575 rad_assert(pool_initialized == true);
576
577 #ifdef HAVE_STDATOMIC_H
578 retry:
579 for (i = 0; i < NUM_FIFOS; i++) {
580 if (!fr_atomic_queue_pop(thread_pool.queue[i], (void **) &request)) continue;
581
582 rad_assert(request != NULL);
583
584 VERIFY_REQUEST(request);
585
586 if (request->master_state != REQUEST_STOP_PROCESSING) {
587 break;
588 }
589
590 /*
591 * This entry was marked to be stopped. Acknowledge it.
592 */
593 request->child_state = REQUEST_DONE;
594 }
595
596 /*
597 * Popping might fail. If so, return.
598 */
599 if (!request) return 0;
600
601 #else
602 pthread_mutex_lock(&thread_pool.queue_mutex);
603
604 #ifdef WITH_STATS
605 #ifdef WITH_ACCOUNTING
606 if (thread_pool.auto_limit_acct) {
607 struct timeval now;
608
609 gettimeofday(&now, NULL);
610
611 /*
612 * Calculate the instantaneous departure rate
613 * from the queue.
614 */
615 thread_pool.pps_out.pps = rad_pps(&thread_pool.pps_out.pps_old,
616 &thread_pool.pps_out.pps_now,
617 &thread_pool.pps_out.time_old,
618 &now);
619 thread_pool.pps_out.pps_now++;
620 }
621 #endif
622 #endif
623
624 /*
625 * Clear old requests from all queues.
626 *
627 * We only do one pass over the queue, in order to
628 * amortize the work across the child threads. Since we
629 * do N checks for one request de-queued, the old
630 * requests will be quickly cleared.
631 */
632 for (i = 0; i < NUM_FIFOS; i++) {
633 request = fr_fifo_peek(thread_pool.fifo[i]);
634 if (!request) continue;
635
636 VERIFY_REQUEST(request);
637
638 if (request->master_state != REQUEST_STOP_PROCESSING) {
639 continue;
640 }
641
642 /*
643 * This entry was marked to be stopped. Acknowledge it.
644 */
645 request = fr_fifo_pop(thread_pool.fifo[i]);
646 rad_assert(request != NULL);
647 VERIFY_REQUEST(request);
648 request->child_state = REQUEST_DONE;
649 thread_pool.num_queued--;
650 }
651
652 start = 0;
653 retry:
654 /*
655 * Pop results from the top of the queue
656 */
657 for (i = start; i < NUM_FIFOS; i++) {
658 request = fr_fifo_pop(thread_pool.fifo[i]);
659 if (request) {
660 VERIFY_REQUEST(request);
661 start = i;
662 break;
663 }
664 }
665
666 if (!request) {
667 pthread_mutex_unlock(&thread_pool.queue_mutex);
668 *prequest = NULL;
669 return 0;
670 }
671
672 rad_assert(thread_pool.num_queued > 0);
673 thread_pool.num_queued--;
674 #endif /* HAVE_STDATOMIC_H */
675
676 *prequest = request;
677
678 rad_assert(*prequest != NULL);
679 rad_assert(request->magic == REQUEST_MAGIC);
680
681 request->component = "<core>";
682 request->module = "";
683 request->child_state = REQUEST_RUNNING;
684
685 /*
686 * If the request has sat in the queue for too long,
687 * kill it.
688 *
689 * The main clean-up code can't delete the request from
690 * the queue, and therefore won't clean it up until we
691 * have acknowledged it as "done".
692 */
693 if (request->master_state == REQUEST_STOP_PROCESSING) {
694 request->module = "<done>";
695 request->child_state = REQUEST_DONE;
696 goto retry;
697 }
698
699 /*
700 * The thread is currently processing a request.
701 */
702 #ifdef HAVE_STDATOMIC_H
703 CAS_INCR(thread_pool.active_threads);
704 #else
705 thread_pool.active_threads++;
706 #endif
707
708 blocked = time(NULL);
709 if (!request->proxy && (blocked - request->timestamp) > 5) {
710 total_blocked++;
711 if (last_complained < blocked) {
712 last_complained = blocked;
713 blocked -= request->timestamp;
714 num_blocked = total_blocked;
715 } else {
716 blocked = 0;
717 }
718 } else {
719 total_blocked = 0;
720 blocked = 0;
721 }
722
723 #ifndef HAVE_STDATOMIC_H
724 pthread_mutex_unlock(&thread_pool.queue_mutex);
725 #endif
726
727 if (blocked) {
728 ERROR("%d requests have been waiting in the processing queue for %d seconds. Check that all databases are running properly!",
729 num_blocked, (int) blocked);
730 }
731
732 return 1;
733 }
734
735
736 /*
737 * The main thread handler for requests.
738 *
739 * Wait on the semaphore until we have it, and process the request.
740 */
741 static void *request_handler_thread(void *arg)
742 {
743 THREAD_HANDLE *self = (THREAD_HANDLE *) arg;
744
745 /*
746 * Loop forever, until told to exit.
747 */
748 do {
749 /*
750 * Wait to be signalled.
751 */
752 DEBUG2("Thread %d waiting to be assigned a request",
753 self->thread_num);
754 re_wait:
755 if (sem_wait(&thread_pool.semaphore) != 0) {
756 /*
757 * Interrupted system call. Go back to
758 * waiting, but DON'T print out any more
759 * text.
760 */
761 if ((errno == EINTR) || (errno == EAGAIN)) {
762 DEBUG2("Re-wait %d", self->thread_num);
763 goto re_wait;
764 }
765 ERROR("Thread %d failed waiting for semaphore: %s: Exiting\n",
766 self->thread_num, fr_syserror(errno));
767 break;
768 }
769
770 DEBUG2("Thread %d got semaphore", self->thread_num);
771
772 #ifdef HAVE_OPENSSL_ERR_H
773 /*
774 * Clear the error queue for the current thread.
775 */
776 ERR_clear_error();
777 #endif
778
779 /*
780 * The server is exiting. Don't dequeue any
781 * requests.
782 */
783 if (thread_pool.stop_flag) break;
784
785 /*
786 * Try to grab a request from the queue.
787 *
788 * It may be empty, in which case we fail
789 * gracefully.
790 */
791 if (!request_dequeue(&self->request)) continue;
792
793 self->request->child_pid = self->pthread_id;
794 self->request_count++;
795
796 DEBUG2("Thread %d handling request %d, (%d handled so far)",
797 self->thread_num, self->request->number,
798 self->request_count);
799
800 #ifndef HAVE_STDATOMIC_H
801 #ifdef WITH_ACCOUNTING
802 if ((self->request->packet->code == PW_CODE_ACCOUNTING_REQUEST) &&
803 thread_pool.auto_limit_acct) {
804 VALUE_PAIR *vp;
805 REQUEST *request = self->request;
806
807 vp = radius_pair_create(request, &request->config,
808 181, VENDORPEC_FREERADIUS);
809 if (vp) vp->vp_integer = thread_pool.pps_in.pps;
810
811 vp = radius_pair_create(request, &request->config,
812 182, VENDORPEC_FREERADIUS);
813 if (vp) vp->vp_integer = thread_pool.pps_out.pps;
814
815 vp = radius_pair_create(request, &request->config,
816 183, VENDORPEC_FREERADIUS);
817 if (vp) {
818 vp->vp_integer = thread_pool.max_queue_size - thread_pool.num_queued;
819 vp->vp_integer *= 100;
820 vp->vp_integer /= thread_pool.max_queue_size;
821 }
822 }
823 #endif
824 #endif
825
826 self->request->process(self->request, FR_ACTION_RUN);
827 self->request = NULL;
828
829 #ifdef HAVE_STDATOMIC_H
830 CAS_DECR(thread_pool.active_threads);
831 #else
832 /*
833 * Update the active threads.
834 */
835 pthread_mutex_lock(&thread_pool.queue_mutex);
836 rad_assert(thread_pool.active_threads > 0);
837 thread_pool.active_threads--;
838 pthread_mutex_unlock(&thread_pool.queue_mutex);
839 #endif
840
841 /*
842 * If the thread has handled too many requests, then make it
843 * exit.
844 */
845 if ((thread_pool.max_requests_per_thread > 0) &&
846 (self->request_count >= thread_pool.max_requests_per_thread)) {
847 DEBUG2("Thread %d handled too many requests",
848 self->thread_num);
849 break;
850 }
851 } while (self->status != THREAD_CANCELLED);
852
853 DEBUG2("Thread %d exiting...", self->thread_num);
854
855 #ifdef HAVE_OPENSSL_ERR_H
856 /*
857 * If we linked with OpenSSL, the application
858 * must remove the thread's error queue before
859 * exiting to prevent memory leaks.
860 */
861 #if OPENSSL_VERSION_NUMBER < 0x10000000L
862 ERR_remove_state(0);
863 #elif OPENSSL_VERSION_NUMBER < 0x10100000L || defined(LIBRESSL_VERSION_NUMBER)
864 ERR_remove_thread_state(NULL);
865 #endif
866 #endif
867
868 #ifdef HAVE_STDATOMIC_H
869 CAS_INCR(thread_pool.exited_threads);
870 #else
871 pthread_mutex_lock(&thread_pool.queue_mutex);
872 thread_pool.exited_threads++;
873 pthread_mutex_unlock(&thread_pool.queue_mutex);
874 #endif
875
876 /*
877 * Do this as the LAST thing before exiting.
878 */
879 self->request = NULL;
880 self->status = THREAD_EXITED;
881 exec_trigger(NULL, NULL, "server.thread.stop", true);
882
883 return NULL;
884 }
885
886 /*
887 * Take a THREAD_HANDLE, delete it from the thread pool and
888 * free its resources.
889 *
890 * This function is called ONLY from the main server thread,
891 * ONLY after the thread has exited.
892 */
893 static void delete_thread(THREAD_HANDLE *handle)
894 {
895 THREAD_HANDLE *prev;
896 THREAD_HANDLE *next;
897
898 rad_assert(handle->request == NULL);
899
900 DEBUG2("Deleting thread %d", handle->thread_num);
901
902 prev = handle->prev;
903 next = handle->next;
904 rad_assert(thread_pool.total_threads > 0);
905 thread_pool.total_threads--;
906
907 /*
908 * Remove the handle from the list.
909 */
910 if (prev == NULL) {
911 rad_assert(thread_pool.head == handle);
912 thread_pool.head = next;
913 } else {
914 prev->next = next;
915 }
916
917 if (next == NULL) {
918 rad_assert(thread_pool.tail == handle);
919 thread_pool.tail = prev;
920 } else {
921 next->prev = prev;
922 }
923
924 /*
925 * Free the handle, now that it's no longer referenceable.
926 */
927 free(handle);
928 }
929
930
931 /*
932 * Spawn a new thread, and place it in the thread pool.
933 *
934 * The thread is started initially in the blocked state, waiting
935 * for the semaphore.
936 */
937 static THREAD_HANDLE *spawn_thread(time_t now, int do_trigger)
938 {
939 int rcode;
940 THREAD_HANDLE *handle;
941
942 /*
943 * Ensure that we don't spawn too many threads.
944 */
945 if (thread_pool.total_threads >= thread_pool.max_threads) {
946 DEBUG2("Thread spawn failed. Maximum number of threads (%d) already running.", thread_pool.max_threads);
947 return NULL;
948 }
949
950 /*
951 * Allocate a new thread handle.
952 */
953 handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
954 memset(handle, 0, sizeof(THREAD_HANDLE));
955 handle->prev = NULL;
956 handle->next = NULL;
957 handle->thread_num = thread_pool.max_thread_num++;
958 handle->request_count = 0;
959 handle->status = THREAD_RUNNING;
960 handle->timestamp = time(NULL);
961
962 /*
963 * Create the thread joinable, so that it can be cleaned up
964 * using pthread_join().
965 *
966 * Note that the function returns non-zero on error, NOT
967 * -1. The return code is the error, and errno isn't set.
968 */
969 rcode = pthread_create(&handle->pthread_id, 0, request_handler_thread, handle);
970 if (rcode != 0) {
971 free(handle);
972 ERROR("Thread create failed: %s",
973 fr_syserror(rcode));
974 return NULL;
975 }
976
977 /*
978 * One more thread to go into the list.
979 */
980 thread_pool.total_threads++;
981 DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
982 handle->thread_num, thread_pool.total_threads);
983 if (do_trigger) exec_trigger(NULL, NULL, "server.thread.start", true);
984
985 /*
986 * Add the thread handle to the tail of the thread pool list.
987 */
988 if (thread_pool.tail) {
989 thread_pool.tail->next = handle;
990 handle->prev = thread_pool.tail;
991 thread_pool.tail = handle;
992 } else {
993 rad_assert(thread_pool.head == NULL);
994 thread_pool.head = thread_pool.tail = handle;
995 }
996
997 /*
998 * Update the time we last spawned a thread.
999 */
1000 thread_pool.time_last_spawned = now;
1001
1002 /*
1003 * Fire trigger if maximum number of threads reached
1004 */
1005 if (thread_pool.total_threads >= thread_pool.max_threads)
1006 exec_trigger(NULL, NULL, "server.thread.max_threads", true);
1007
1008 /*
1009 * And return the new handle to the caller.
1010 */
1011 return handle;
1012 }
1013 #endif /* WITH_GCD */
1014
1015
1016 #ifdef WNOHANG
1017 static uint32_t pid_hash(void const *data)
1018 {
1019 thread_fork_t const *tf = data;
1020
1021 return fr_hash(&tf->pid, sizeof(tf->pid));
1022 }
1023
1024 static int pid_cmp(void const *one, void const *two)
1025 {
1026 thread_fork_t const *a = one;
1027 thread_fork_t const *b = two;
1028
1029 return (a->pid - b->pid);
1030 }
1031 #endif
1032
1033 /*
1034 * Allocate the thread pool, and seed it with an initial number
1035 * of threads.
1036 *
1037 * FIXME: What to do on a SIGHUP???
1038 */
1039 int thread_pool_init(CONF_SECTION *cs, bool *spawn_flag)
1040 {
1041 #ifndef WITH_GCD
1042 uint32_t i;
1043 int rcode;
1044 #endif
1045 CONF_SECTION *pool_cf;
1046 time_t now;
1047 #ifdef HAVE_STDATOMIC_H
1048 int num;
1049 #endif
1050
1051 now = time(NULL);
1052
1053 rad_assert(spawn_flag != NULL);
1054 rad_assert(*spawn_flag == true);
1055 rad_assert(pool_initialized == false); /* not called on HUP */
1056
1057 pool_cf = cf_subsection_find_next(cs, NULL, "thread");
1058 #ifdef WITH_GCD
1059 if (pool_cf) WARN("Built with Grand Central Dispatch. Ignoring 'thread' subsection");
1060 #else
1061 if (!pool_cf) *spawn_flag = false;
1062 #endif
1063
1064 /*
1065 * Initialize the thread pool to some reasonable values.
1066 */
1067 memset(&thread_pool, 0, sizeof(THREAD_POOL));
1068 #ifndef WITH_GCD
1069 thread_pool.head = NULL;
1070 thread_pool.tail = NULL;
1071 thread_pool.total_threads = 0;
1072 thread_pool.max_thread_num = 1;
1073 thread_pool.cleanup_delay = 5;
1074 thread_pool.stop_flag = false;
1075 #endif
1076 thread_pool.spawn_flag = *spawn_flag;
1077
1078 /*
1079 * Don't bother initializing the mutexes or
1080 * creating the hash tables. They won't be used.
1081 */
1082 if (!*spawn_flag) return 0;
1083
1084 #ifdef WNOHANG
1085 if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
1086 ERROR("FATAL: Failed to initialize wait mutex: %s",
1087 fr_syserror(errno));
1088 return -1;
1089 }
1090
1091 /*
1092 * Create the hash table of child PID's
1093 */
1094 thread_pool.waiters = fr_hash_table_create(pid_hash,
1095 pid_cmp,
1096 free);
1097 if (!thread_pool.waiters) {
1098 ERROR("FATAL: Failed to set up wait hash");
1099 return -1;
1100 }
1101 #endif
1102
1103 #ifndef WITH_GCD
1104 if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
1105 return -1;
1106 }
1107
1108 /*
1109 * Catch corner cases.
1110 */
1111 if (thread_pool.min_spare_threads < 1)
1112 thread_pool.min_spare_threads = 1;
1113 if (thread_pool.max_spare_threads < 1)
1114 thread_pool.max_spare_threads = 1;
1115 if (thread_pool.max_spare_threads < thread_pool.min_spare_threads)
1116 thread_pool.max_spare_threads = thread_pool.min_spare_threads;
1117 if (thread_pool.max_threads == 0)
1118 thread_pool.max_threads = 256;
1119 if ((thread_pool.max_queue_size < 2) || (thread_pool.max_queue_size > 1024*1024)) {
1120 ERROR("FATAL: max_queue_size value must be in range 2-1048576");
1121 return -1;
1122 }
1123
1124 if (thread_pool.start_threads > thread_pool.max_threads) {
1125 ERROR("FATAL: start_servers (%i) must be <= max_servers (%i)",
1126 thread_pool.start_threads, thread_pool.max_threads);
1127 return -1;
1128 }
1129 #endif /* WITH_GCD */
1130
1131 /*
1132 * The pool has already been initialized. Don't spawn
1133 * new threads, and don't forget about forked children.
1134 */
1135 if (pool_initialized) {
1136 return 0;
1137 }
1138
1139 #ifndef WITH_GCD
1140 /*
1141 * Initialize the queue of requests.
1142 */
1143 memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
1144 rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
1145 if (rcode != 0) {
1146 ERROR("FATAL: Failed to initialize semaphore: %s",
1147 fr_syserror(errno));
1148 return -1;
1149 }
1150
1151 #ifndef HAVE_STDATOMIC_H
1152 rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
1153 if (rcode != 0) {
1154 ERROR("FATAL: Failed to initialize queue mutex: %s",
1155 fr_syserror(errno));
1156 return -1;
1157 }
1158 #else
1159 num = 0;
1160 store(thread_pool.active_threads, num);
1161 store(thread_pool.exited_threads, num);
1162 #endif
1163
1164 /*
1165 * Allocate multiple fifos.
1166 */
1167 for (i = 0; i < NUM_FIFOS; i++) {
1168 #ifdef HAVE_STDATOMIC_H
1169 thread_pool.queue[i] = fr_atomic_queue_create(NULL, thread_pool.max_queue_size);
1170 if (!thread_pool.queue[i]) {
1171 ERROR("FATAL: Failed to set up request fifo");
1172 return -1;
1173 }
1174 #else
1175 thread_pool.fifo[i] = fr_fifo_create(NULL, thread_pool.max_queue_size, NULL);
1176 if (!thread_pool.fifo[i]) {
1177 ERROR("FATAL: Failed to set up request fifo");
1178 return -1;
1179 }
1180 #endif
1181 }
1182 #endif
1183
1184 #ifndef WITH_GCD
1185 /*
1186 * Create a number of waiting threads.
1187 *
1188 * If we fail while creating them, do something intelligent.
1189 */
1190 for (i = 0; i < thread_pool.start_threads; i++) {
1191 if (spawn_thread(now, 0) == NULL) {
1192 return -1;
1193 }
1194 }
1195 #else
1196 thread_pool.queue = dispatch_queue_create("org.freeradius.threads", NULL);
1197 if (!thread_pool.queue) {
1198 ERROR("Failed creating dispatch queue: %s", fr_syserror(errno));
1199 fr_exit(1);
1200 }
1201 #endif
1202
1203 DEBUG2("Thread pool initialized");
1204 pool_initialized = true;
1205 return 0;
1206 }
1207
1208
1209 /*
1210 * Stop all threads in the pool.
1211 */
1212 void thread_pool_stop(void)
1213 {
1214 #ifndef WITH_GCD
1215 int i;
1216 int total_threads;
1217 THREAD_HANDLE *handle;
1218 THREAD_HANDLE *next;
1219
1220 if (!pool_initialized) return;
1221
1222 /*
1223 * Set pool stop flag.
1224 */
1225 thread_pool.stop_flag = true;
1226
1227 /*
1228 * Wakeup all threads to make them see stop flag.
1229 */
1230 total_threads = thread_pool.total_threads;
1231 for (i = 0; i != total_threads; i++) {
1232 sem_post(&thread_pool.semaphore);
1233 }
1234
1235 /*
1236 * Join and free all threads.
1237 */
1238 for (handle = thread_pool.head; handle; handle = next) {
1239 next = handle->next;
1240 pthread_join(handle->pthread_id, NULL);
1241 delete_thread(handle);
1242 }
1243
1244 for (i = 0; i < NUM_FIFOS; i++) {
1245 #ifdef HAVE_STDATOMIC_H
1246 talloc_free(thread_pool.queue[i]);
1247 #else
1248 fr_fifo_free(thread_pool.fifo[i]);
1249 #endif
1250 }
1251
1252 #ifdef WNOHANG
1253 fr_hash_table_free(thread_pool.waiters);
1254 #endif
1255
1256 #ifdef HAVE_OPENSSL_CRYPTO_H
1257 /*
1258 * We're no longer threaded. Remove the mutexes and free
1259 * the memory.
1260 */
1261 #ifdef HAVE_CRYPTO_SET_ID_CALLBACK
1262 set_id_callback(NULL);
1263 #endif
1264 #ifdef HAVE_CRYPTO_SET_LOCKING_CALLBACK
1265 CRYPTO_set_locking_callback(NULL);
1266 #endif
1267
1268 free(ssl_mutexes);
1269 #endif
1270
1271 #endif
1272 }
1273
1274
1275 #ifdef WITH_GCD
1276 int request_enqueue(REQUEST *request)
1277 {
1278 dispatch_block_t block;
1279
1280 block = ^{
1281 request->process(request, FR_ACTION_RUN);
1282 };
1283
1284 dispatch_async(thread_pool.queue, block);
1285
1286 return 1;
1287 }
1288 #endif
1289
1290 #ifndef WITH_GCD
1291 /*
1292 * Check the min_spare_threads and max_spare_threads.
1293 *
1294 * If there are too many or too few threads waiting, then we
1295 * either create some more, or delete some.
1296 */
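/*
 *	Worked example with the default settings (min_spare_servers = 3,
 *	max_spare_servers = 10, cleanup_delay = 5): with 8 threads total
 *	and 6 of them busy, spare = 2, which is below the minimum, so one
 *	new thread is spawned.  If the load later drops and 20 threads sit
 *	idle, the pool is trimmed by at most one thread per second, and
 *	only once 5 seconds have passed since the last spawn, so a brief
 *	load spike doesn't trigger a create/delete storm.
 */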
1297 static void thread_pool_manage(time_t now)
1298 {
1299 uint32_t spare;
1300 int i, total;
1301 THREAD_HANDLE *handle, *next;
1302 uint32_t active_threads;
1303
1304 /*
1305 * Loop over the thread pool, deleting exited threads.
1306 */
1307 for (handle = thread_pool.head; handle; handle = next) {
1308 next = handle->next;
1309
1310 /*
1311 * Maybe we've asked the thread to exit, and it
1312 * has agreed.
1313 */
1314 if (handle->status == THREAD_EXITED) {
1315 pthread_join(handle->pthread_id, NULL);
1316 delete_thread(handle);
1317
1318 #ifdef HAVE_STDATOMIC_H
1319 CAS_DECR(thread_pool.exited_threads);
1320 #else
1321 pthread_mutex_lock(&thread_pool.queue_mutex);
1322 thread_pool.exited_threads--;
1323 pthread_mutex_unlock(&thread_pool.queue_mutex);
1324 #endif
1325 }
1326 }
1327
1328 /*
1329 * We don't need a mutex lock here, as we're reading
1330 * active_threads, and not modifying it. We want a close
1331 * approximation of the number of active threads, and this
1332 * is good enough.
1333 */
1334 #ifdef HAVE_STDATOMIC_H
1335 active_threads = load(thread_pool.active_threads);
1336 #else
1337 active_threads = thread_pool.active_threads;
1338 #endif
1339 spare = thread_pool.total_threads - active_threads;
1340 if (rad_debug_lvl) {
1341 static uint32_t old_total = 0;
1342 static uint32_t old_active = 0;
1343
1344 if ((old_total != thread_pool.total_threads) || (old_active != active_threads)) {
1345 DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
1346 thread_pool.total_threads, active_threads, spare);
1347 old_total = thread_pool.total_threads;
1348 old_active = active_threads;
1349 }
1350 }
1351
1352 /*
1353 * If there are too few spare threads. Go create some more.
1354 */
1355 if ((thread_pool.total_threads < thread_pool.max_threads) &&
1356 (spare < thread_pool.min_spare_threads)) {
1357 total = thread_pool.min_spare_threads - spare;
1358
1359 if ((total + thread_pool.total_threads) > thread_pool.max_threads) {
1360 total = thread_pool.max_threads - thread_pool.total_threads;
1361 }
1362
1363 DEBUG2("Threads: Spawning %d spares", total);
1364
1365 /*
1366 * Create a number of spare threads.
1367 */
1368 for (i = 0; i < total; i++) {
1369 handle = spawn_thread(now, 1);
1370 if (handle == NULL) {
1371 return;
1372 }
1373 }
1374
1375 return; /* there aren't too many spare threads */
1376 }
1377
1378 /*
1379 * Only delete spare threads if we haven't already done
1380 * so this second.
1381 */
1382 if (now == last_cleaned) {
1383 return;
1384 }
1385 last_cleaned = now;
1386
1387 /*
1388 * Only delete the spare threads if sufficient time has
1389 * passed since we last created one. This helps to minimize
1390 * the amount of create/delete cycles.
1391 */
1392 if ((now - thread_pool.time_last_spawned) < (int)thread_pool.cleanup_delay) {
1393 return;
1394 }
1395
1396 /*
1397 * If there are too many spare threads, delete one.
1398 *
1399 * Note that we only delete ONE at a time, instead of
1400 * wiping out many. This allows the excess servers to
1401 * be slowly reaped, just in case the load spike comes again.
1402 */
1403 if (spare > thread_pool.max_spare_threads) {
1404
1405 spare -= thread_pool.max_spare_threads;
1406
1407 DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
1408
1409 /*
1410 * Walk through the thread pool, deleting the
1411 * first idle thread we come across.
1412 */
1413 for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
1414 next = handle->next;
1415
1416 /*
1417 * If the thread is not handling a
1418 * request, but still live, then tell it
1419 * to exit.
1420 *
1421 * It will eventually wake up, and realize
1422 * it's been told to commit suicide.
1423 */
1424 if ((handle->request == NULL) &&
1425 (handle->status == THREAD_RUNNING)) {
1426 handle->status = THREAD_CANCELLED;
1427 /*
1428 * Post an extra semaphore, as a
1429 * signal to wake up, and exit.
1430 */
1431 sem_post(&thread_pool.semaphore);
1432 spare--;
1433 break;
1434 }
1435 }
1436 }
1437
1438 /*
1439 * Otherwise everything's kosher. There are not too few,
1440 * or too many spare threads. Exit happily.
1441 */
1442 return;
1443 }
1444 #endif /* WITH_GCD */
1445
1446 #ifdef WNOHANG
1447 /*
1448 * Thread wrapper for fork().
1449 */
1450 pid_t rad_fork(void)
1451 {
1452 pid_t child_pid;
1453
1454 if (!pool_initialized) return fork();
1455
1456 reap_children(); /* be nice to non-wait thingies */
1457
1458 if (fr_hash_table_num_elements(thread_pool.waiters) >= 1024) {
1459 return -1;
1460 }
1461
1462 /*
1463 * Fork & save the PID for later reaping.
1464 */
1465 child_pid = fork();
1466 if (child_pid > 0) {
1467 int rcode;
1468 thread_fork_t *tf;
1469
1470 tf = rad_malloc(sizeof(*tf));
1471 memset(tf, 0, sizeof(*tf));
1472
1473 tf->pid = child_pid;
1474
1475 pthread_mutex_lock(&thread_pool.wait_mutex);
1476 rcode = fr_hash_table_insert(thread_pool.waiters, tf);
1477 pthread_mutex_unlock(&thread_pool.wait_mutex);
1478
1479 if (!rcode) {
1480 ERROR("Failed to store PID, creating what will be a zombie process %d",
1481 (int) child_pid);
1482 free(tf);
1483 }
1484 }
1485
1486 /*
1487 * Return whatever we were told.
1488 */
1489 return child_pid;
1490 }
1491
1492
1493 /*
1494 * Wait 10 seconds at most for a child to exit, then give up.
1495 */
1496 pid_t rad_waitpid(pid_t pid, int *status)
1497 {
1498 int i;
1499 thread_fork_t mytf, *tf;
1500
1501 if (!pool_initialized) return waitpid(pid, status, 0);
1502
1503 if (pid <= 0) return -1;
1504
1505 mytf.pid = pid;
1506
1507 pthread_mutex_lock(&thread_pool.wait_mutex);
1508 tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
1509 pthread_mutex_unlock(&thread_pool.wait_mutex);
1510
1511 if (!tf) return -1;
1512
1513 for (i = 0; i < 100; i++) {
1514 reap_children();
1515
1516 if (tf->exited) {
1517 *status = tf->status;
1518
1519 pthread_mutex_lock(&thread_pool.wait_mutex);
1520 fr_hash_table_delete(thread_pool.waiters, &mytf);
1521 pthread_mutex_unlock(&thread_pool.wait_mutex);
1522 return pid;
1523 }
1524 usleep(100000); /* sleep for 1/10 of a second */
1525 }
1526
1527 /*
1528 * 10 seconds have passed, give up on the child.
1529 */
1530 pthread_mutex_lock(&thread_pool.wait_mutex);
1531 fr_hash_table_delete(thread_pool.waiters, &mytf);
1532 pthread_mutex_unlock(&thread_pool.wait_mutex);
1533
1534 return 0;
1535 }
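/*
 *	Illustrative sketch (not part of this file) of how callers are
 *	expected to pair the two wrappers above.  child_work() is a
 *	hypothetical helper; the point is that the parent uses rad_fork()
 *	instead of fork(), so the PID is registered in thread_pool.waiters,
 *	and rad_waitpid() instead of waitpid(), so reap_children() can hand
 *	the exit status back to this thread.
 *
 *		pid_t pid;
 *		int status;
 *
 *		pid = rad_fork();
 *		if (pid == 0) {		// child
 *			child_work();	// hypothetical child-side work
 *			_exit(0);
 *		}
 *		if (pid < 0) return -1;	// fork failed, or too many waiters
 *
 *		if (rad_waitpid(pid, &status) <= 0) {
 *			// child did not exit within ~10 seconds, or was never registered
 *		}
 */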
1536 #else
1537 /*
1538 * No rad_fork or rad_waitpid
1539 */
1540 #endif
1541
1542 void thread_pool_queue_stats(int array[RAD_LISTEN_MAX], int pps[2])
1543 {
1544 int i;
1545
1546 #ifndef WITH_GCD
1547 if (pool_initialized) {
1548 struct timeval now;
1549
1550 for (i = 0; i < RAD_LISTEN_MAX; i++) {
1551 #ifndef HAVE_STDATOMIC_H
1552 array[i] = fr_fifo_num_elements(thread_pool.fifo[i]);
1553 #else
1554 array[i] = 0;
1555 #endif
1556 }
1557
1558 gettimeofday(&now, NULL);
1559
1560 pps[0] = rad_pps(&thread_pool.pps_in.pps_old,
1561 &thread_pool.pps_in.pps_now,
1562 &thread_pool.pps_in.time_old,
1563 &now);
1564 pps[1] = rad_pps(&thread_pool.pps_out.pps_old,
1565 &thread_pool.pps_out.pps_now,
1566 &thread_pool.pps_out.time_old,
1567 &now);
1568
1569 } else
1570 #endif /* WITH_GCD */
1571 {
1572 for (i = 0; i < RAD_LISTEN_MAX; i++) {
1573 array[i] = 0;
1574 }
1575
1576 pps[0] = pps[1] = 0;
1577 }
1578 }
1579 #endif /* HAVE_PTHREAD_H */
1580
1581 static void time_free(void *data)
1582 {
1583 free(data);
1584 }
1585
1586 void exec_trigger(REQUEST *request, CONF_SECTION *cs, char const *name, int quench)
1587 {
1588 CONF_SECTION *subcs;
1589 CONF_ITEM *ci;
1590 CONF_PAIR *cp;
1591 char const *attr;
1592 char const *value;
1593 VALUE_PAIR *vp;
1594 bool alloc = false;
1595
1596 /*
1597 * Use global "trigger" section if no local config is given.
1598 */
1599 if (!cs) {
1600 cs = main_config.config;
1601 attr = name;
1602 } else {
1603 /*
1604 * Try to use pair name, rather than reference.
1605 */
1606 attr = strrchr(name, '.');
1607 if (attr) {
1608 attr++;
1609 } else {
1610 attr = name;
1611 }
1612 }
1613
1614 /*
1615 * Find local "trigger" subsection. If it isn't found,
1616 * try using the global "trigger" section, and reset the
1617 * reference to the full path, rather than the sub-path.
1618 */
1619 subcs = cf_section_sub_find(cs, "trigger");
1620 if (!subcs && (cs != main_config.config)) {
1621 subcs = cf_section_sub_find(main_config.config, "trigger");
1622 attr = name;
1623 }
1624
1625 if (!subcs) return;
1626
1627 ci = cf_reference_item(subcs, main_config.config, attr);
1628 if (!ci) {
1629 ERROR("No such item in trigger section: %s", attr);
1630 return;
1631 }
1632
1633 if (!cf_item_is_pair(ci)) {
1634 ERROR("Trigger is not a configuration variable: %s", attr);
1635 return;
1636 }
1637
1638 cp = cf_item_to_pair(ci);
1639 if (!cp) return;
1640
1641 value = cf_pair_value(cp);
1642 if (!value) {
1643 ERROR("Trigger has no value: %s", name);
1644 return;
1645 }
1646
1647 /*
1648 * May be called for Status-Server packets.
1649 */
1650 vp = NULL;
1651 if (request && request->packet) vp = request->packet->vps;
1652
1653 /*
1654 * Perform periodic quenching.
1655 */
1656 if (quench) {
1657 time_t *last_time;
1658
1659 last_time = cf_data_find(cs, value);
1660 if (!last_time) {
1661 last_time = rad_malloc(sizeof(*last_time));
1662 *last_time = 0;
1663
1664 if (cf_data_add(cs, value, last_time, time_free) < 0) {
1665 free(last_time);
1666 last_time = NULL;
1667 }
1668 }
1669
1670 /*
1671 * Send the quenched traps at most once per second.
1672 */
1673 if (last_time) {
1674 time_t now = time(NULL);
1675 if (*last_time == now) return;
1676
1677 *last_time = now;
1678 }
1679 }
1680
1681 /*
1682 * radius_exec_program always needs a request.
1683 */
1684 if (!request) {
1685 request = request_alloc(NULL);
1686 alloc = true;
1687 }
1688
1689 DEBUG("Trigger %s -> %s", name, value);
1690
1691 radius_exec_program(request, NULL, 0, NULL, request, value, vp, false, true, 0);
1692
1693 if (alloc) talloc_free(request);
1694 }
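/*
 *	For reference, a "trigger" section that the names used in this
 *	file would resolve against might look like the sketch below.
 *	The echo commands are placeholders; any external program can be
 *	configured, and the nested sections mirror the dotted names
 *	(e.g. "server.thread.start") passed to exec_trigger().
 *
 *		trigger {
 *			server {
 *				thread {
 *					start       = "/bin/echo thread started"
 *					stop        = "/bin/echo thread stopped"
 *					max_threads = "/bin/echo thread pool is full"
 *				}
 *			}
 *		}
 */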
1695