1 /*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17 /**
18 * @file connection.c
19 * @brief Handle pools of connections (threads, sockets, etc.)
20 * @note This API must be used by all modules in the public distribution that
21 * maintain pools of connections.
22 *
23 * @copyright 2012 The FreeRADIUS server project
24 * @copyright 2012 Alan DeKok <aland@deployingradius.com>
25 */
26 RCSID("$Id: 8c8c9b2255300c8ee257286be1ec597c7c506392 $")
27
28 #include <freeradius-devel/radiusd.h>
29 #include <freeradius-devel/heap.h>
30 #include <freeradius-devel/modpriv.h>
31 #include <freeradius-devel/rad_assert.h>
32
33 typedef struct fr_connection fr_connection_t;
34
35 static int fr_connection_pool_check(fr_connection_pool_t *pool);
36
37 #ifndef NDEBUG
38 #ifdef HAVE_PTHREAD_H
39 /* #define PTHREAD_DEBUG (1) */
40 #endif
41 #endif
42
/*
 *	We don't need to pollute the logs with "open / close"
 *	connection information.  Instead, only print these messages
 *	when debugging.
 *
 *	The body is wrapped in do { } while (0) so that the macro
 *	expands to exactly one statement.  A bare "if (...) radlog(...)"
 *	would silently capture an "else" that follows the macro call.
 */
#undef INFO
#define INFO(fmt, ...) do { if (rad_debug_lvl) radlog(L_INFO, fmt, ## __VA_ARGS__); } while (0)
50
51 /** An individual connection within the connection pool
52 *
53 * Defines connection counters, timestamps, and holds a pointer to the
54 * connection handle itself.
55 *
56 * @see fr_connection_pool_t
57 */
58 struct fr_connection {
59 fr_connection_t *prev; //!< Previous connection in list.
60 fr_connection_t *next; //!< Next connection in list.
61
62 time_t created; //!< Time connection was created.
63 struct timeval last_reserved; //!< Last time the connection was reserved.
64
65 struct timeval last_released; //!< Time the connection was released.
66
67 uint32_t num_uses; //!< Number of times the connection has been reserved.
68 uint64_t number; //!< Unique ID assigned when the connection is created,
69 //!< these will monotonically increase over the
70 //!< lifetime of the connection pool.
71 void *connection; //!< Pointer to whatever the module uses for a connection
72 //!< handle.
73 bool in_use; //!< Whether the connection is currently reserved.
74
75 int heap; //!< For the next connection heap.
76
77 #ifdef PTHREAD_DEBUG
78 pthread_t pthread_id; //!< When 'in_use == true'.
79 #endif
80 };
81
82 /** A connection pool
83 *
84 * Defines the configuration of the connection pool, all the counters and
85 * timestamps related to the connection pool, the mutex that stops multiple
86 * threads leaving the pool in an inconsistent state, and the callbacks
87 * required to open, close and check the status of connections within the pool.
88 *
89 * @see fr_connection
90 */
91 struct fr_connection_pool_t {
92 int ref; //!< Reference counter to prevent connection
93 //!< pool being freed multiple times.
94 uint32_t start; //!< Number of initial connections.
95 uint32_t min; //!< Minimum number of concurrent connections to keep open.
96 uint32_t max; //!< Maximum number of concurrent connections to allow.
97 uint32_t spare; //!< Number of spare connections to try.
98 uint32_t pending; //!< Number of pending open connections.
99 uint32_t retry_delay; //!< seconds to delay re-open after a failed open.
100 uint32_t cleanup_interval; //!< Initial timer for how often we sweep the pool
101 //!< for free connections. (0 is infinite).
102 int delay_interval; //!< When we next do a cleanup. Initialized to
103 //!< cleanup_interval, and increase from there based
104 //!< on the delay.
105 int next_delay; //!< The next delay time. cleanup. Initialized to
106 //!< cleanup_interval, and decays from there.
107 uint64_t max_uses; //!< Maximum number of times a connection can be used
108 //!< before being closed.
109 uint32_t lifetime; //!< How long a connection can be open before being
110 //!< closed (irrespective of whether it's idle or not).
111 uint32_t idle_timeout; //!< How long a connection can be idle before
112 //!< being closed.
113
114 bool spread; //!< If true we spread requests over the connections,
115 //!< using the connection released longest ago, first.
116
117 time_t last_checked; //!< Last time we pruned the connection pool.
118 time_t last_spawned; //!< Last time we spawned a connection.
119 time_t last_failed; //!< Last time we tried to spawn a connection but failed.
120 time_t last_throttled; //!< Last time we refused to spawn a connection because
121 //!< the last connection failed, or we were already spawning
122 //!< a connection.
123 time_t last_at_max; //!< Last time we hit the maximum number of allowed
124 //!< connections.
125
126 uint32_t max_pending; //!< Max number of connections to open.
127
128 uint64_t count; //!< Number of connections spawned over the lifetime
129 //!< of the pool.
130 uint32_t num; //!< Number of connections in the pool.
131 uint32_t active; //!< Number of currently reserved connections.
132
133 fr_heap_t *heap; //!< For the next connection heap
134
135 fr_connection_t *head; //!< Start of the connection list.
136 fr_connection_t *tail; //!< End of the connection list.
137
138 #ifdef HAVE_PTHREAD_H
139 pthread_mutex_t mutex; //!< Mutex used to keep consistent state when making
140 //!< modifications in threaded mode.
141 #endif
142
143 CONF_SECTION *cs; //!< Configuration section holding the section of parsed
144 //!< config file that relates to this pool.
145 void *opaque; //!< Pointer to context data that will be passed to callbacks.
146
147 char const *log_prefix; //!< Log prefix to prepend to all log messages created
148 //!< by the connection pool code.
149
150 char const *trigger_prefix; //!< Prefix to prepend to names of all triggers
151 //!< fired by the connection pool code.
152
153 fr_connection_create_t create; //!< Function used to create new connections.
154 fr_connection_alive_t alive; //!< Function used to check status of connections.
155 };
156
/*
 *	Without pthreads there is no mutex in the pool, so make the
 *	lock / unlock calls expand to nothing.  This lets the rest of
 *	the file call them unconditionally.
 */
#ifndef HAVE_PTHREAD_H
#  define pthread_mutex_lock(_x)
#  define pthread_mutex_unlock(_x)
#endif
161
162 static const CONF_PARSER connection_config[] = {
163 { "start", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, start), "5" },
164 { "min", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, min), "5" },
165 { "max", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, max), "10" },
166 { "spare", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, spare), "3" },
167 { "uses", FR_CONF_OFFSET(PW_TYPE_INTEGER64, fr_connection_pool_t, max_uses), "0" },
168 { "lifetime", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, lifetime), "0" },
169 { "cleanup_delay", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, cleanup_interval), NULL},
170 { "cleanup_interval", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, cleanup_interval), "30" },
171 { "idle_timeout", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, idle_timeout), "60" },
172 { "retry_delay", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, retry_delay), "1" },
173 { "spread", FR_CONF_OFFSET(PW_TYPE_BOOLEAN, fr_connection_pool_t, spread), "no" },
174 CONF_PARSER_TERMINATOR
175 };
176
177 /** Order connections by reserved most recently
178 */
last_reserved_cmp(void const * one,void const * two)179 static int last_reserved_cmp(void const *one, void const *two)
180 {
181 fr_connection_t const *a = one;
182 fr_connection_t const *b = two;
183
184 if (a->last_reserved.tv_sec < b->last_reserved.tv_sec) return -1;
185 if (a->last_reserved.tv_sec > b->last_reserved.tv_sec) return +1;
186
187 if (a->last_reserved.tv_usec < b->last_reserved.tv_usec) return -1;
188 if (a->last_reserved.tv_usec > b->last_reserved.tv_usec) return +1;
189
190 return 0;
191 }
192
193 /** Order connections by released longest ago
194 */
last_released_cmp(void const * one,void const * two)195 static int last_released_cmp(void const *one, void const *two)
196 {
197 fr_connection_t const *a = one;
198 fr_connection_t const *b = two;
199
200 if (b->last_released.tv_sec < a->last_released.tv_sec) return -1;
201 if (b->last_released.tv_sec > a->last_released.tv_sec) return +1;
202
203 if (b->last_released.tv_usec < a->last_released.tv_usec) return -1;
204 if (b->last_released.tv_usec > a->last_released.tv_usec) return +1;
205
206 return 0;
207 }
208
209 /** Removes a connection from the connection list
210 *
211 * @note Must be called with the mutex held.
212 *
213 * @param[in,out] pool to modify.
214 * @param[in] this Connection to delete.
215 */
fr_connection_unlink(fr_connection_pool_t * pool,fr_connection_t * this)216 static void fr_connection_unlink(fr_connection_pool_t *pool, fr_connection_t *this)
217 {
218 if (this->prev) {
219 rad_assert(pool->head != this);
220 this->prev->next = this->next;
221 } else {
222 rad_assert(pool->head == this);
223 pool->head = this->next;
224 }
225 if (this->next) {
226 rad_assert(pool->tail != this);
227 this->next->prev = this->prev;
228 } else {
229 rad_assert(pool->tail == this);
230 pool->tail = this->prev;
231 }
232
233 this->prev = this->next = NULL;
234 }
235
236 /** Adds a connection to the head of the connection list
237 *
238 * @note Must be called with the mutex held.
239 *
240 * @param[in,out] pool to modify.
241 * @param[in] this Connection to add.
242 */
fr_connection_link_head(fr_connection_pool_t * pool,fr_connection_t * this)243 static void fr_connection_link_head(fr_connection_pool_t *pool, fr_connection_t *this)
244 {
245 rad_assert(pool != NULL);
246 rad_assert(this != NULL);
247 rad_assert(pool->head != this);
248 rad_assert(pool->tail != this);
249
250 if (pool->head) {
251 pool->head->prev = this;
252 }
253
254 this->next = pool->head;
255 this->prev = NULL;
256 pool->head = this;
257 if (!pool->tail) {
258 rad_assert(this->next == NULL);
259 pool->tail = this;
260 } else {
261 rad_assert(this->next != NULL);
262 }
263 }
264
265 /** Send a connection pool trigger.
266 *
267 * @param[in] pool to send trigger for.
268 * @param[in] name_suffix trigger name suffix.
269 */
fr_connection_exec_trigger(fr_connection_pool_t * pool,char const * name_suffix)270 static void fr_connection_exec_trigger(fr_connection_pool_t *pool, char const *name_suffix)
271 {
272 char name[64];
273 rad_assert(pool != NULL);
274 rad_assert(name_suffix != NULL);
275 snprintf(name, sizeof(name), "%s%s", pool->trigger_prefix, name_suffix);
276 exec_trigger(NULL, pool->cs, name, true);
277 }
278
279 /** Find a connection handle in the connection list
280 *
281 * Walks over the list of connections searching for a specified connection
282 * handle and returns the first connection that contains that pointer.
283 *
284 * @note Will lock mutex and only release mutex if connection handle
285 * is not found, so will usually return will mutex held.
286 * @note Must be called with the mutex free.
287 *
288 * @param[in] pool to search in.
289 * @param[in] conn handle to search for.
290 * @return
291 * - Connection containing the specified handle.
292 * - NULL if non if connection was found.
293 */
fr_connection_find(fr_connection_pool_t * pool,void * conn)294 static fr_connection_t *fr_connection_find(fr_connection_pool_t *pool, void *conn)
295 {
296 fr_connection_t *this;
297
298 if (!pool || !conn) return NULL;
299
300 pthread_mutex_lock(&pool->mutex);
301
302 /*
303 * FIXME: This loop could be avoided if we passed a 'void
304 * **connection' instead. We could use "offsetof" in
305 * order to find top of the parent structure.
306 */
307 for (this = pool->head; this != NULL; this = this->next) {
308 if (this->connection == conn) {
309 #ifdef PTHREAD_DEBUG
310 pthread_t pthread_id;
311
312 pthread_id = pthread_self();
313 rad_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
314 #endif
315
316 rad_assert(this->in_use == true);
317 return this;
318 }
319 }
320
321 pthread_mutex_unlock(&pool->mutex);
322 return NULL;
323 }
324
325 /** Spawns a new connection
326 *
327 * Spawns a new connection using the create callback, and returns it for
328 * adding to the connection list.
329 *
330 * @note Will call the 'open' trigger.
331 * @note Must be called with the mutex free.
332 *
333 * @param[in] pool to modify.
334 * @param[in] now Current time.
335 * @param[in] in_use whether the new connection should be "in_use" or not
336 * @return
337 * - New connection struct.
338 * - NULL on error.
339 */
fr_connection_spawn(fr_connection_pool_t * pool,time_t now,bool in_use)340 static fr_connection_t *fr_connection_spawn(fr_connection_pool_t *pool, time_t now, bool in_use)
341 {
342 uint64_t number;
343 uint32_t max_pending;
344 TALLOC_CTX *ctx;
345
346 fr_connection_t *this;
347 void *conn;
348
349 rad_assert(pool != NULL);
350
351 /*
352 * If we have NO connections, and we've previously failed
353 * opening connections, don't open multiple connections until
354 * we successfully open at least one.
355 */
356 if ((pool->num == 0) && pool->pending && pool->last_failed) return NULL;
357
358 pthread_mutex_lock(&pool->mutex);
359 rad_assert(pool->num <= pool->max);
360
361 /*
362 * Don't spawn too many connections at the same time.
363 */
364 if ((pool->num + pool->pending) >= pool->max) {
365 pthread_mutex_unlock(&pool->mutex);
366
367 ERROR("%s: Cannot open new connection, already at max", pool->log_prefix);
368 return NULL;
369 }
370
371 /*
372 * If the last attempt failed, wait a bit before
373 * retrying.
374 */
375 if (pool->last_failed && ((pool->last_failed + pool->retry_delay) > now)) {
376 bool complain = false;
377
378 if (pool->last_throttled != now) {
379 complain = true;
380
381 pool->last_throttled = now;
382 }
383
384 pthread_mutex_unlock(&pool->mutex);
385
386 if (!RATE_LIMIT_ENABLED || complain) {
387 ERROR("%s: Last connection attempt failed, waiting %d seconds before retrying",
388 pool->log_prefix, pool->retry_delay);
389 }
390
391 return NULL;
392 }
393
394 /*
395 * We limit the rate of new connections after a failed attempt.
396 */
397 if (pool->pending > pool->max_pending) {
398 pthread_mutex_unlock(&pool->mutex);
399 RATE_LIMIT(WARN("%s: Cannot open a new connection due to rate limit after failure",
400 pool->log_prefix));
401 return NULL;
402 }
403
404 pool->pending++;
405 number = pool->count++;
406
407 /*
408 * Unlock the mutex while we try to open a new
409 * connection. If there are issues with the back-end,
410 * opening a new connection may take a LONG time. In
411 * that case, we want the other connections to continue
412 * to be used.
413 */
414 pthread_mutex_unlock(&pool->mutex);
415
416 /*
417 * The true value for max_pending is the smaller of
418 * free connection slots, or pool->max_pending.
419 */
420 max_pending = (pool->max - pool->num);
421 if (pool->max_pending < max_pending) max_pending = pool->max_pending;
422 INFO("%s: Opening additional connection (%" PRIu64 "), %u of %u pending slots used",
423 pool->log_prefix, number, pool->pending, max_pending);
424
425 /*
426 * Allocate a new top level ctx for the create callback
427 * to hang its memory off of.
428 */
429 ctx = talloc_init("fr_connection_ctx");
430 if (!ctx) return NULL;
431
432 /*
433 * This may take a long time, which prevents other
434 * threads from releasing connections. We don't care
435 * about other threads opening new connections, as we
436 * already have no free connections.
437 */
438 conn = pool->create(ctx, pool->opaque);
439 if (!conn) {
440 ERROR("%s: Opening connection failed (%" PRIu64 ")", pool->log_prefix, number);
441
442 pool->last_failed = now;
443 pthread_mutex_lock(&pool->mutex);
444 pool->max_pending = 1;
445 pool->pending--;
446 pthread_mutex_unlock(&pool->mutex);
447
448 talloc_free(ctx);
449
450 return NULL;
451 }
452
453 /*
454 * And lock the mutex again while we link the new
455 * connection back into the pool.
456 */
457 pthread_mutex_lock(&pool->mutex);
458
459 this = talloc_zero(pool, fr_connection_t);
460 if (!this) {
461 pthread_mutex_unlock(&pool->mutex);
462 talloc_free(ctx);
463
464 return NULL;
465 }
466 fr_link_talloc_ctx_free(this, ctx);
467
468 this->created = now;
469 this->connection = conn;
470 this->in_use = in_use;
471
472 this->number = number;
473 gettimeofday(&this->last_reserved, NULL);
474 this->last_released = this->last_reserved;
475
476 /*
477 * The connection pool is starting up. Insert the
478 * connection into the heap.
479 */
480 if (!in_use) fr_heap_insert(pool->heap, this);
481
482 fr_connection_link_head(pool, this);
483
484 /*
485 * Do NOT insert the connection into the heap. That's
486 * done when the connection is released.
487 */
488
489 pool->num++;
490
491 rad_assert(pool->pending > 0);
492 pool->pending--;
493
494 /*
495 * We've successfully opened one more connection. Allow
496 * more connections to open in parallel.
497 */
498 if (pool->max_pending < pool->max) pool->max_pending++;
499
500 pool->last_spawned = time(NULL);
501 pool->delay_interval = pool->cleanup_interval;
502 pool->next_delay = pool->cleanup_interval;
503 pool->last_failed = 0;
504
505 pthread_mutex_unlock(&pool->mutex);
506
507 fr_connection_exec_trigger(pool, "open");
508
509 return this;
510 }
511
512 /** Close an existing connection.
513 *
514 * Removes the connection from the list, calls the delete callback to close
515 * the connection, then frees memory allocated to the connection.
516 *
517 * @note Will call the 'close' trigger.
518 * @note Must be called with the mutex held.
519 *
520 * @param[in,out] pool to modify.
521 * @param[in] this Connection to delete.
522 * @param[in] reason to close the connection
523 * @param[in] msg optional message
524 */
fr_connection_close_internal(fr_connection_pool_t * pool,fr_connection_t * this,char const * reason,char const * msg)525 static void fr_connection_close_internal(fr_connection_pool_t *pool, fr_connection_t *this,
526 char const *reason, char const *msg)
527 {
528 if (!msg) {
529 INFO("%s: %s (%" PRIu64 ")", pool->log_prefix, reason, this->number);
530 } else {
531 INFO("%s: %s (%" PRIu64 ") - %s", pool->log_prefix, reason, this->number, msg);
532 }
533
534
535 /*
536 * If it's in use, release it.
537 */
538 if (this->in_use) {
539 #ifdef PTHREAD_DEBUG
540 pthread_t pthread_id = pthread_self();
541 rad_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
542 #endif
543
544 this->in_use = false;
545
546 rad_assert(pool->active != 0);
547 pool->active--;
548
549 } else {
550 /*
551 * Connection isn't used, remove it from the heap.
552 */
553 fr_heap_extract(pool->heap, this);
554 }
555
556 fr_connection_exec_trigger(pool, "close");
557
558 fr_connection_unlink(pool, this);
559
560 rad_assert(pool->num > 0);
561 pool->num--;
562 talloc_free(this);
563 }
564
565 /** Check whether a connection needs to be removed from the pool
566 *
567 * Will verify that the connection is within idle_timeout, max_uses, and
568 * lifetime values. If it is not, the connection will be closed.
569 *
570 * @note Will only close connections not in use.
571 * @note Must be called with the mutex held.
572 *
573 * @param[in,out] pool to modify.
574 * @param[in,out] this Connection to manage.
575 * @param[in] now Current time.
576 * @param[in] get whether we want to get a connection
577 * @return
578 * - 0 if connection was closed.
579 * - 1 if connection handle was left open.
580 */
fr_connection_manage(fr_connection_pool_t * pool,fr_connection_t * this,time_t now,bool get)581 static int fr_connection_manage(fr_connection_pool_t *pool,
582 fr_connection_t *this,
583 time_t now, bool get)
584 {
585 rad_assert(pool != NULL);
586 rad_assert(this != NULL);
587 char const *reason = "Closing expired connection";
588 char const *msg = NULL;
589
590 /*
591 * Don't terminate in-use connections
592 */
593 if (this->in_use) return 1;
594
595 if ((pool->max_uses > 0) &&
596 (this->num_uses >= pool->max_uses)) {
597 msg = "Hit max_uses limit";
598
599 do_delete:
600 if (pool->num <= pool->min) {
601 DEBUG("%s: You probably need to lower \"min\"", pool->log_prefix);
602 }
603 fr_connection_close_internal(pool, this, reason, msg);
604 return 0;
605 }
606
607 if ((pool->lifetime > 0) &&
608 ((this->created + pool->lifetime) < now)) {
609 msg = "Hit lifetime limit";
610 goto do_delete;
611 }
612
613 /*
614 * The connection WAS idle, but the caller is interested
615 * in getting a new one. Instead of closing the old one
616 * and opening a new one, we just return the old one.
617 */
618 if (get) return 1;
619
620 if ((pool->idle_timeout > 0) &&
621 ((this->last_released.tv_sec + pool->idle_timeout) < now)) {
622 msg = "Hit idle_timeout limit";
623 goto do_delete;
624 }
625
626 return 1;
627 }
628
629
630 /** Check whether any connections need to be removed from the pool
631 *
632 * Maintains the number of connections in the pool as per the configuration
633 * parameters for the connection pool.
634 *
635 * @note Will only run checks the first time it's called in a given second,
636 * to throttle connection spawning/closing.
637 * @note Will only close connections not in use.
638 * @note Must be called with the mutex held, will release mutex before
639 * returning.
640 *
641 * @param[in,out] pool to manage.
642 * @return 1
643 */
fr_connection_pool_check(fr_connection_pool_t * pool)644 static int fr_connection_pool_check(fr_connection_pool_t *pool)
645 {
646 uint32_t num, spare;
647 time_t now = time(NULL);
648 fr_connection_t *this, *next;
649
650 if (pool->last_checked == now) {
651 pthread_mutex_unlock(&pool->mutex);
652 return 1;
653 }
654
655 /*
656 * Get "real" number of connections, and count pending
657 * connections as spare.
658 */
659 num = pool->num + pool->pending;
660 spare = pool->pending + (pool->num - pool->active);
661
662 /*
663 * The other end can close connections. If so, we'll
664 * have fewer than "min". When that happens, open more
665 * connections to enforce "min".
666 *
667 * The code for spawning connections enforces that
668 * num + pending <= max.
669 */
670 if (num < pool->min) {
671 INFO("Need %u more connections to reach min connections (%i)", pool->min - num, pool->min);
672 goto add_connection;
673 }
674
675 /*
676 * On the odd chance that we've opened too many
677 * connections, take care of that.
678 */
679 if (num > pool->max) {
680 /*
681 * Pending connections don't get closed as "spare".
682 */
683 if (pool->pending > 0) goto manage_connections;
684
685 /*
686 * Otherwise close one of the connections to
687 * bring us down to "max".
688 */
689 goto close_connection;
690 }
691
692 /*
693 * Now that we've enforced min/max connections, try to
694 * keep the "spare" connections at the correct number.
695 */
696
697 /*
698 * Nothing to do? Go check all of the connections for
699 * timeouts, etc.
700 */
701 if (spare == pool->spare) goto manage_connections;
702
703 /*
704 * Too many spare connections, delete some.
705 */
706 if (spare > pool->spare) {
707 fr_connection_t *found;
708
709 /*
710 * Pending connections don't get closed as "spare".
711 */
712 if (pool->pending > 0) goto manage_connections;
713
714 /*
715 * Don't close too many connections, even they
716 * are spare.
717 */
718 if (num <= pool->min) goto manage_connections;
719
720 /*
721 * Too many spares, go close one.
722 */
723
724 close_connection:
725 /*
726 * Don't close connections too often, in order to
727 * prevent flapping.
728 */
729 if (now < (pool->last_spawned + pool->delay_interval)) goto manage_connections;
730
731 /*
732 * Find a connection to close.
733 */
734 found = NULL;
735 for (this = pool->tail; this != NULL; this = this->prev) {
736 if (this->in_use) continue;
737
738 if (!found ||
739 timercmp(&this->last_reserved, &found->last_reserved, <)) {
740 found = this;
741 }
742 }
743
744 rad_assert(found != NULL);
745
746 fr_connection_close_internal(pool, found, "Closing connection", "Too many unused connections.");
747
748 /*
749 * Decrease the delay for the next time we clean
750 * up.
751 */
752 pool->next_delay >>= 1;
753 if (pool->next_delay == 0) pool->next_delay = 1;
754 pool->delay_interval += pool->next_delay;
755
756 goto manage_connections;
757 }
758
759 /*
760 * Too few connections, open some more.
761 */
762 if (spare < pool->spare) {
763 /*
764 * Don't open too many pending connections.
765 */
766 if (pool->pending >= pool->max_pending) goto manage_connections;
767
768 /*
769 * Don't open too many connections, even if we
770 * need more spares.
771 */
772 if (num >= pool->max) goto manage_connections;
773
774 /*
775 * Too few spares, go add one.
776 */
777
778 add_connection:
779 INFO("Need more connections to reach %i spares", pool->spare);
780
781 /*
782 * Only try to open spares if we're not already attempting to open
783 * a connection. Avoids spurious log messages.
784 */
785 pthread_mutex_unlock(&pool->mutex);
786 fr_connection_spawn(pool, now, false); /* ignore return code */
787 pthread_mutex_lock(&pool->mutex);
788 goto manage_connections;
789 }
790
791 /*
792 * Pass over all of the connections in the pool, limiting
793 * lifetime, idle time, max requests, etc.
794 */
795 manage_connections:
796 for (this = pool->head; this != NULL; this = next) {
797 next = this->next;
798 fr_connection_manage(pool, this, now, false);
799 }
800
801 pool->last_checked = now;
802 pthread_mutex_unlock(&pool->mutex);
803
804 return 1;
805 }
806
807 /** Get a connection from the connection pool
808 *
809 * @note Must be called with the mutex free.
810 *
811 * @param[in,out] pool to reserve the connection from.
812 * @param[in] spawn whether to spawn a new connection
813 * @return
814 * - A pointer to the connection handle.
815 * - NULL on error.
816 */
fr_connection_get_internal(fr_connection_pool_t * pool,bool spawn)817 static void *fr_connection_get_internal(fr_connection_pool_t *pool, bool spawn)
818 {
819 time_t now;
820 fr_connection_t *this;
821
822 if (!pool) return NULL;
823
824 /*
825 * Allow CTRL-C to kill the server in debugging mode.
826 */
827 if (main_config.exiting) return NULL;
828
829 #ifdef HAVE_PTHREAD_H
830 if (spawn) pthread_mutex_lock(&pool->mutex);
831 #endif
832
833 now = time(NULL);
834
835 /*
836 * Grab the link with the lowest latency, and check it
837 * for limits. If "connection manage" says the link is
838 * no longer usable, go grab another one.
839 */
840 do {
841 this = fr_heap_peek(pool->heap);
842 if (!this) break;
843
844 fr_assert(!this->in_use);
845 } while (!fr_connection_manage(pool, this, now, true));
846
847 /*
848 * We have a working connection. Extract it from the
849 * heap and use it.
850 */
851 if (this) {
852 fr_heap_extract(pool->heap, this);
853 goto do_return;
854 }
855
856 /*
857 * We were asked to avoid spawning a new connection, by
858 * fr_connection_reconnect_internal(). So we just return
859 * here.
860 */
861 if (!spawn) return NULL;
862
863 if (pool->num == pool->max) {
864 bool complain = false;
865
866 /*
867 * Rate-limit complaints.
868 */
869 if (pool->last_at_max != now) {
870 complain = true;
871 pool->last_at_max = now;
872 }
873
874 pthread_mutex_unlock(&pool->mutex);
875
876 if (!RATE_LIMIT_ENABLED || complain) {
877 ERROR("%s: No connections available and at max connection limit", pool->log_prefix);
878 }
879
880 return NULL;
881 }
882
883 pthread_mutex_unlock(&pool->mutex);
884
885 DEBUG("%s: %i of %u connections in use. You may need to increase \"spare\"", pool->log_prefix,
886 pool->active, pool->num);
887 this = fr_connection_spawn(pool, now, true); /* MY connection! */
888 if (!this) return NULL;
889
890 pthread_mutex_lock(&pool->mutex);
891
892 do_return:
893 pool->active++;
894 this->num_uses++;
895 gettimeofday(&this->last_reserved, NULL);
896 this->in_use = true;
897
898 #ifdef PTHREAD_DEBUG
899 this->pthread_id = pthread_self();
900 #endif
901
902 #ifdef HAVE_PTHREAD_H
903 if (spawn) pthread_mutex_unlock(&pool->mutex);
904 #endif
905
906 DEBUG("%s: Reserved connection (%" PRIu64 ")", pool->log_prefix, this->number);
907
908 return this->connection;
909 }
910
911 /** Reconnect a suspected inviable connection
912 *
913 * @note Must be called with the mutex held, will not release mutex.
914 *
915 * @see fr_connection_get
916 * @param[in,out] pool to reconnect the connection in.
917 * @param[in,out] conn to reconnect.
918 * @return new connection handle if successful else NULL.
919 */
fr_connection_reconnect_internal(fr_connection_pool_t * pool,fr_connection_t * conn)920 static fr_connection_t *fr_connection_reconnect_internal(fr_connection_pool_t *pool, fr_connection_t *conn)
921 {
922 void *new_conn;
923 uint64_t conn_number;
924 TALLOC_CTX *ctx;
925
926 conn_number = conn->number;
927
928 /*
929 * Destroy any handles associated with the fr_connection_t
930 */
931 talloc_free_children(conn);
932
933 DEBUG("%s: Reconnecting (%" PRIu64 ")", pool->log_prefix, conn_number);
934
935 /*
936 * Allocate a new top level ctx for the create callback
937 * to hang its memory off of.
938 */
939 ctx = talloc_init("fr_connection_ctx");
940 if (!ctx) return NULL;
941 fr_link_talloc_ctx_free(conn, ctx);
942
943 new_conn = pool->create(ctx, pool->opaque);
944 if (!new_conn) {
945 /*
946 * We can't create a new connection, so close the current one.
947 */
948 fr_connection_close_internal(pool, conn, "Closing connection", "Failed to reconnect");
949
950 /*
951 * Maybe there's a connection which is unused and
952 * available. If so, return it.
953 */
954 new_conn = fr_connection_get_internal(pool, false);
955 if (new_conn) return new_conn;
956
957 RATE_LIMIT(ERROR("%s: Failed to reconnect (%" PRIu64 "), no free connections are available",
958 pool->log_prefix, conn_number));
959
960 return NULL;
961 }
962
963 fr_connection_exec_trigger(pool, "close");
964 conn->connection = new_conn;
965
966 return new_conn;
967 }
968
969 /** Create a new connection pool
970 *
971 * Allocates structures used by the connection pool, initialises the various
972 * configuration options and counters, and sets the callback functions.
973 *
974 * Will also spawn the number of connections specified by the 'start'
975 * configuration options.
976 *
977 * @note Will call the 'start' trigger.
978 *
979 * @param[in] ctx Context to link pool's destruction to.
980 * @param[in] cs pool section.
981 * @param[in] opaque data pointer to pass to callbacks.
982 * @param[in] c Callback to create new connections.
983 * @param[in] a Callback to check the status of connections.
984 * @param[in] log_prefix prefix to prepend to all log messages.
985 * @param[in] trigger_prefix prefix to prepend to all trigger names.
986 * @return
987 * - New connection pool.
988 * - NULL on error.
989 */
fr_connection_pool_init(TALLOC_CTX * ctx,CONF_SECTION * cs,void * opaque,fr_connection_create_t c,fr_connection_alive_t a,char const * log_prefix,char const * trigger_prefix)990 fr_connection_pool_t *fr_connection_pool_init(TALLOC_CTX *ctx,
991 CONF_SECTION *cs,
992 void *opaque,
993 fr_connection_create_t c,
994 fr_connection_alive_t a,
995 char const *log_prefix,
996 char const *trigger_prefix)
997 {
998 uint32_t i;
999 fr_connection_pool_t *pool;
1000 fr_connection_t *this;
1001 time_t now;
1002
1003 if (!cs || !opaque || !c) return NULL;
1004
1005 now = time(NULL);
1006
1007 /*
1008 * Pool is allocated in the NULL context as
1009 * threads are likely to allocate memory
1010 * beneath the pool.
1011 */
1012 pool = talloc_zero(NULL, fr_connection_pool_t);
1013 if (!pool) return NULL;
1014
1015 /*
1016 * Ensure the pool is freed at the same time
1017 * as its parent.
1018 */
1019 if (fr_link_talloc_ctx_free(ctx, pool) < 0) {
1020 talloc_free(pool);
1021
1022 return NULL;
1023 }
1024
1025 pool->cs = cs;
1026 pool->opaque = opaque;
1027 pool->create = c;
1028 pool->alive = a;
1029
1030 pool->head = pool->tail = NULL;
1031
1032 /*
1033 * We keep a heap of connections, sorted by the last time
1034 * we STARTED using them. Newly opened connections
1035 * aren't in the heap. They're only inserted in the list
1036 * once they're released.
1037 *
1038 * We do "most recently started" instead of "most
1039 * recently used", because MRU is done as most recently
1040 * *released*. We want to order connections by
1041 * responsiveness, and MRU prioritizes high latency
1042 * connections.
1043 *
1044 * We want most recently *started*, which gives
1045 * preference to low latency links, and pushes high
1046 * latency links down in the priority heap.
1047 *
1048 * https://code.facebook.com/posts/1499322996995183/solving-the-mystery-of-link-imbalance-a-metastable-failure-state-at-scale/
1049 */
1050 if (!pool->spread) {
1051 pool->heap = fr_heap_create(last_reserved_cmp, offsetof(fr_connection_t, heap));
1052 /*
1053 * For some types of connections we need to used a different
1054 * algorithm, because load balancing benefits are secondary
1055 * to maintaining a cache of open connections.
1056 *
1057 * With libcurl's multihandle, connections can only be reused
1058 * if all handles that make up the multhandle are done processing
1059 * their requests.
1060 *
1061 * We can't tell when that's happened using libcurl, and even
1062 * if we could, blocking until all servers had responded
1063 * would have huge cost.
1064 *
1065 * The solution is to order the heap so that the connection that
1066 * was released longest ago is at the top.
1067 *
1068 * That way we maximise time between connection use.
1069 */
1070 } else {
1071 pool->heap = fr_heap_create(last_released_cmp, offsetof(fr_connection_t, heap));
1072 }
1073 if (!pool->heap) {
1074 talloc_free(pool);
1075 return NULL;
1076 }
1077
1078 pool->log_prefix = log_prefix ? talloc_typed_strdup(pool, log_prefix) : "core";
1079 pool->trigger_prefix = trigger_prefix ? talloc_typed_strdup(pool, trigger_prefix) : "";
1080
1081 #ifdef HAVE_PTHREAD_H
1082 pthread_mutex_init(&pool->mutex, NULL);
1083 #endif
1084
1085 DEBUG("%s: Initialising connection pool", pool->log_prefix);
1086
1087 if (cf_section_parse(cs, pool, connection_config) < 0) goto error;
1088
1089 /*
1090 * Some simple limits
1091 */
1092 if (pool->max == 0) {
1093 cf_log_err_cs(cs, "Cannot set 'max' to zero");
1094 goto error;
1095 }
1096 pool->max_pending = pool->max; /* can open all connections now */
1097
1098 if (pool->min > pool->max) {
1099 cf_log_err_cs(cs, "Cannot set 'min' to more than 'max'");
1100 goto error;
1101 }
1102
1103 FR_INTEGER_BOUND_CHECK("max", pool->max, <=, 1024);
1104 FR_INTEGER_BOUND_CHECK("start", pool->start, <=, pool->max);
1105 FR_INTEGER_BOUND_CHECK("spare", pool->spare, <=, (pool->max - pool->min));
1106
1107 if (pool->lifetime > 0) {
1108 FR_INTEGER_COND_CHECK("idle_timeout", pool->idle_timeout, (pool->idle_timeout <= pool->lifetime), 0);
1109 }
1110
1111 if (pool->idle_timeout > 0) {
1112 FR_INTEGER_BOUND_CHECK("cleanup_interval", pool->cleanup_interval, <=, pool->idle_timeout);
1113 }
1114
1115 /*
1116 * Don't open any connections. Instead, force the limits
1117 * to only 1 connection.
1118 *
1119 */
1120 if (check_config) {
1121 pool->start = pool->min = pool->max = 1;
1122 return pool;
1123 }
1124
1125 /*
1126 * Create all of the connections, unless the admin says
1127 * not to.
1128 */
1129 for (i = 0; i < pool->start; i++) {
1130 this = fr_connection_spawn(pool, now, false);
1131 if (!this) {
1132 error:
1133 fr_connection_pool_free(pool);
1134 return NULL;
1135 }
1136 }
1137
1138 fr_connection_exec_trigger(pool, "start");
1139
1140 return pool;
1141 }
1142
1143 /** Initialise a module specific connection pool
1144 *
1145 * @see fr_connection_pool_init
1146 *
1147 * @param[in] module section.
1148 * @param[in] opaque data pointer to pass to callbacks.
1149 * @param[in] c Callback to create new connections.
1150 * @param[in] a Callback to check the status of connections.
1151 * @param[in] log_prefix override, if NULL will be set automatically from the module CONF_SECTION.
1152 * @return
1153 * - New connection pool.
1154 * - NULL on error.
1155 */
fr_connection_pool_module_init(CONF_SECTION * module,void * opaque,fr_connection_create_t c,fr_connection_alive_t a,char const * log_prefix)1156 fr_connection_pool_t *fr_connection_pool_module_init(CONF_SECTION *module,
1157 void *opaque,
1158 fr_connection_create_t c,
1159 fr_connection_alive_t a,
1160 char const *log_prefix)
1161 {
1162 CONF_SECTION *cs, *mycs;
1163 char buff[128];
1164 char trigger_prefix[64];
1165
1166 fr_connection_pool_t *pool;
1167 char const *cs_name1, *cs_name2;
1168
1169 int ret;
1170
1171 #define CONNECTION_POOL_CF_KEY "connection_pool"
1172 #define parent_name(_x) cf_section_name(cf_item_parent(cf_section_to_item(_x)))
1173
1174 cs_name1 = cf_section_name1(module);
1175 cs_name2 = cf_section_name2(module);
1176 if (!cs_name2) cs_name2 = cs_name1;
1177
1178 snprintf(trigger_prefix, sizeof(trigger_prefix), "modules.%s.", cs_name1);
1179
1180 if (!log_prefix) {
1181 snprintf(buff, sizeof(buff), "rlm_%s (%s)", cs_name1, cs_name2);
1182 log_prefix = buff;
1183 }
1184
1185 /*
1186 * Get sibling's pool config section
1187 */
1188 ret = find_module_sibling_section(&cs, module, "pool");
1189 switch (ret) {
1190 case -1:
1191 return NULL;
1192
1193 case 1:
1194 DEBUG4("%s: Using pool section from \"%s\"", log_prefix, parent_name(cs));
1195 break;
1196
1197 case 0:
1198 DEBUG4("%s: Using local pool section", log_prefix);
1199 break;
1200 }
1201
1202 /*
1203 * Get our pool config section
1204 */
1205 mycs = cf_section_sub_find(module, "pool");
1206 if (!mycs) {
1207 DEBUG4("%s: Adding pool section to config item \"%s\" to store pool references", log_prefix,
1208 cf_section_name(module));
1209
1210 mycs = cf_section_alloc(module, "pool", NULL);
1211 cf_section_add(module, mycs);
1212 }
1213
1214 /*
1215 * Sibling didn't have a pool config section
1216 * Use our own local pool.
1217 */
1218 if (!cs) {
1219 DEBUG4("%s: \"%s.pool\" section not found, using \"%s.pool\"", log_prefix,
1220 parent_name(cs), parent_name(mycs));
1221 cs = mycs;
1222 }
1223
1224 /*
1225 * If fr_connection_pool_init has already been called
1226 * for this config section, reuse the previous instance.
1227 *
1228 * This allows modules to pass in the config sections
1229 * they would like to use the connection pool from.
1230 */
1231 pool = cf_data_find(cs, CONNECTION_POOL_CF_KEY);
1232 if (!pool) {
1233 DEBUG4("%s: No pool reference found for config item \"%s.pool\"", log_prefix, parent_name(cs));
1234 pool = fr_connection_pool_init(cs, cs, opaque, c, a, log_prefix, trigger_prefix);
1235 if (!pool) return NULL;
1236
1237 DEBUG4("%s: Adding pool reference %p to config item \"%s.pool\"", log_prefix, pool, parent_name(cs));
1238 cf_data_add(cs, CONNECTION_POOL_CF_KEY, pool, NULL);
1239 return pool;
1240 }
1241 pool->ref++;
1242
1243 DEBUG4("%s: Found pool reference %p in config item \"%s.pool\"", log_prefix, pool, parent_name(cs));
1244
1245 /*
1246 * We're reusing pool data add it to our local config
1247 * section. This allows other modules to transitively
1248 * re-use a pool through this module.
1249 */
1250 if (mycs != cs) {
1251 DEBUG4("%s: Copying pool reference %p from config item \"%s.pool\" to config item \"%s.pool\"",
1252 log_prefix, pool, parent_name(cs), parent_name(mycs));
1253 cf_data_add(mycs, CONNECTION_POOL_CF_KEY, pool, NULL);
1254 }
1255
1256 return pool;
1257 }
1258
1259 /** Allocate a new pool using an existing one as a template
1260 *
1261 * @param ctx to allocate new pool in.
1262 * @param pool to copy.
1263 * @param opaque data to pass to connection function.
1264 * @return
1265 * - New connection pool.
1266 * - NULL on error.
1267 */
fr_connection_pool_copy(TALLOC_CTX * ctx,fr_connection_pool_t * pool,void * opaque)1268 fr_connection_pool_t *fr_connection_pool_copy(TALLOC_CTX *ctx, fr_connection_pool_t *pool, void *opaque)
1269 {
1270 return fr_connection_pool_init(ctx, pool->cs, opaque, pool->create,
1271 pool->alive, pool->log_prefix, pool->trigger_prefix);
1272 }
1273
1274 /** Get the number of connections currently in the pool
1275 *
1276 * @param pool to count connections for.
1277 * @return the number of connections in the pool
1278 */
fr_connection_pool_get_num(fr_connection_pool_t * pool)1279 int fr_connection_pool_get_num(fr_connection_pool_t *pool)
1280 {
1281 return pool->num;
1282 }
1283
1284
1285 /** Delete a connection pool
1286 *
1287 * Closes, unlinks and frees all connections in the connection pool, then frees
1288 * all memory used by the connection pool.
1289 *
1290 * @note Will call the 'stop' trigger.
1291 * @note Must be called with the mutex free.
1292 *
1293 * @param[in,out] pool to delete.
1294 */
fr_connection_pool_free(fr_connection_pool_t * pool)1295 void fr_connection_pool_free(fr_connection_pool_t *pool)
1296 {
1297 fr_connection_t *this;
1298
1299 if (!pool) return;
1300
1301 /*
1302 * More modules hold a reference to this pool, don't free
1303 * it yet.
1304 */
1305 if (pool->ref > 0) {
1306 pool->ref--;
1307 return;
1308 }
1309
1310 DEBUG("%s: Removing connection pool", pool->log_prefix);
1311
1312 pthread_mutex_lock(&pool->mutex);
1313
1314 /*
1315 * Don't loop over the list. Just keep removing the head
1316 * until they're all gone.
1317 */
1318 while ((this = pool->head) != NULL) {
1319 fr_connection_close_internal(pool, this, "Closing connection", "Shutting down connection pool");
1320 }
1321
1322 fr_heap_delete(pool->heap);
1323
1324 fr_connection_exec_trigger(pool, "stop");
1325
1326 rad_assert(pool->head == NULL);
1327 rad_assert(pool->tail == NULL);
1328 rad_assert(pool->num == 0);
1329
1330 #ifdef HAVE_PTHREAD_H
1331 pthread_mutex_destroy(&pool->mutex);
1332 #endif
1333
1334 talloc_free(pool);
1335 }
1336
1337 /** Reserve a connection in the connection pool
1338 *
1339 * Will attempt to find an unused connection in the connection pool, if one is
1340 * found, will mark it as in in use increment the number of active connections
1341 * and return the connection handle.
1342 *
1343 * If no free connections are found will attempt to spawn a new one, conditional
1344 * on a connection spawning not already being in progress, and not being at the
1345 * 'max' connection limit.
1346 *
1347 * @note fr_connection_release must be called once the caller has finished
1348 * using the connection.
1349 *
1350 * @see fr_connection_release
1351 * @param[in,out] pool to reserve the connection from.
1352 * @return
1353 * - A pointer to the connection handle.
1354 * - NULL on error.
1355 */
fr_connection_get(fr_connection_pool_t * pool)1356 void *fr_connection_get(fr_connection_pool_t *pool)
1357 {
1358 return fr_connection_get_internal(pool, true);
1359 }
1360
1361 /** Release a connection
1362 *
1363 * Will mark a connection as unused and decrement the number of active
1364 * connections.
1365 *
1366 * @see fr_connection_get
1367 * @param[in,out] pool to release the connection in.
1368 * @param[in,out] conn to release.
1369 */
fr_connection_release(fr_connection_pool_t * pool,void * conn)1370 void fr_connection_release(fr_connection_pool_t *pool, void *conn)
1371 {
1372 fr_connection_t *this;
1373
1374 this = fr_connection_find(pool, conn);
1375 if (!this) return;
1376
1377 this->in_use = false;
1378
1379 /*
1380 * Record when the connection was last released
1381 */
1382 gettimeofday(&this->last_released, NULL);
1383
1384 /*
1385 * Insert the connection in the heap.
1386 *
1387 * This will either be based on when we *started* using it
1388 * (allowing fast links to be re-used, and slow links to be
1389 * gradually expired), or when we released it (allowing
1390 * the maximum amount of time between connection use).
1391 */
1392 fr_heap_insert(pool->heap, this);
1393
1394 rad_assert(pool->active != 0);
1395 pool->active--;
1396
1397 DEBUG("%s: Released connection (%" PRIu64 ")", pool->log_prefix, this->number);
1398
1399 /*
1400 * We mirror the "spawn on get" functionality by having
1401 * "delete on release". If there are too many spare
1402 * connections, go manage the pool && clean some up.
1403 */
1404 fr_connection_pool_check(pool);
1405 }
1406
1407 /** Reconnect a suspected inviable connection
1408 *
1409 * This should be called by the module if it suspects that a connection is
1410 * not viable (e.g. the server has closed it).
1411 *
1412 * Will attempt to create a new connection handle using the create callback,
1413 * and if this is successful the new handle will be assigned to the existing
1414 * pool connection.
1415 *
1416 * If this is not successful, the connection will be removed from the pool.
1417 *
1418 * When implementing a module that uses the connection pool API, it is advisable
1419 * to pass a pointer to the pointer to the handle (void **conn)
1420 * to all functions which may call reconnect. This is so that if a new handle
1421 * is created and returned, the handle pointer can be updated up the callstack,
1422 * and a function higher up the stack doesn't attempt to use a now invalid
1423 * connection handle.
1424 *
1425 * @note Will free any talloced memory hung off the context of the connection,
1426 * being reconnected.
1427 *
1428 * @warning After calling reconnect the caller *MUST NOT* attempt to use
1429 * the old handle in any other operations, as its memory will have been
1430 * freed.
1431 *
1432 * @see fr_connection_get
1433 * @param[in,out] pool to reconnect the connection in.
1434 * @param[in,out] conn to reconnect.
1435 * @return new connection handle if successful else NULL.
1436 */
fr_connection_reconnect(fr_connection_pool_t * pool,void * conn)1437 void *fr_connection_reconnect(fr_connection_pool_t *pool, void *conn)
1438 {
1439 void *new_conn;
1440 fr_connection_t *this;
1441
1442 if (!pool || !conn) return NULL;
1443
1444 /*
1445 * Don't allow opening of new connections if we're trying
1446 * to exit.
1447 */
1448 if (main_config.exiting) {
1449 fr_connection_release(pool, conn);
1450 return NULL;
1451 }
1452
1453 /*
1454 * If fr_connection_find is successful the pool is now locked
1455 */
1456 this = fr_connection_find(pool, conn);
1457 if (!this) return NULL;
1458
1459 new_conn = fr_connection_reconnect_internal(pool, this);
1460 pthread_mutex_unlock(&pool->mutex);
1461
1462 return new_conn;
1463 }
1464
1465 /** Delete a connection from the connection pool.
1466 *
1467 * Resolves the connection handle to a connection, then (if found)
1468 * closes, unlinks and frees that connection.
1469 *
1470 * @note Must be called with the mutex free.
1471 *
1472 * @param[in,out] pool Connection pool to modify.
1473 * @param[in] conn to delete.
1474 * @param[in] msg why the connection was closed.
1475 * @return
1476 * - 0 If the connection could not be found.
1477 * - 1 if the connection was deleted.
1478 */
fr_connection_close(fr_connection_pool_t * pool,void * conn,char const * msg)1479 int fr_connection_close(fr_connection_pool_t *pool, void *conn, char const *msg)
1480 {
1481 fr_connection_t *this;
1482
1483 this = fr_connection_find(pool, conn);
1484 if (!this) return 0;
1485
1486 fr_connection_close_internal(pool, this, "Deleting connection", msg);
1487 fr_connection_pool_check(pool);
1488 return 1;
1489 }
1490