1 /*
2  *   This program is free software; you can redistribute it and/or modify
3  *   it under the terms of the GNU General Public License as published by
4  *   the Free Software Foundation; either version 2 of the License, or
5  *   (at your option) any later version.
6  *
7  *   This program is distributed in the hope that it will be useful,
8  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
9  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  *   GNU General Public License for more details.
11  *
12  *   You should have received a copy of the GNU General Public License
13  *   along with this program; if not, write to the Free Software
14  *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * @file connection.c
19  * @brief Handle pools of connections (threads, sockets, etc.)
20  * @note This API must be used by all modules in the public distribution that
21  * maintain pools of connections.
22  *
23  * @copyright 2012  The FreeRADIUS server project
24  * @copyright 2012  Alan DeKok <aland@deployingradius.com>
25  */
26 RCSID("$Id: 8c8c9b2255300c8ee257286be1ec597c7c506392 $")
27 
28 #include <freeradius-devel/radiusd.h>
29 #include <freeradius-devel/heap.h>
30 #include <freeradius-devel/modpriv.h>
31 #include <freeradius-devel/rad_assert.h>
32 
33 typedef struct fr_connection fr_connection_t;
34 
35 static int fr_connection_pool_check(fr_connection_pool_t *pool);
36 
37 #ifndef NDEBUG
38 #ifdef HAVE_PTHREAD_H
39 /* #define PTHREAD_DEBUG (1) */
40 #endif
41 #endif
42 
/*
 *	We don't need to pollute the logs with "open / close"
 *	connection information.  Instead, only print these messages
 *	when debugging.
 *
 *	The expansion is wrapped in do { ... } while (0) so that INFO()
 *	always behaves as exactly one statement.  A bare "if (...) call"
 *	expansion would silently capture the "else" of an enclosing
 *	un-braced if / else.
 */
#undef INFO
#define INFO(fmt, ...) do { if (rad_debug_lvl) radlog(L_INFO, fmt, ## __VA_ARGS__); } while (0)
50 
51 /** An individual connection within the connection pool
52  *
53  * Defines connection counters, timestamps, and holds a pointer to the
54  * connection handle itself.
55  *
56  * @see fr_connection_pool_t
57  */
58 struct fr_connection {
59 	fr_connection_t	*prev;			//!< Previous connection in list.
60 	fr_connection_t	*next;			//!< Next connection in list.
61 
62 	time_t		created;		//!< Time connection was created.
63 	struct timeval 	last_reserved;		//!< Last time the connection was reserved.
64 
65 	struct timeval	last_released;  	//!< Time the connection was released.
66 
67 	uint32_t	num_uses;		//!< Number of times the connection has been reserved.
68 	uint64_t	number;			//!< Unique ID assigned when the connection is created,
69 						//!< these will monotonically increase over the
70 						//!< lifetime of the connection pool.
71 	void		*connection;		//!< Pointer to whatever the module uses for a connection
72 						//!< handle.
73 	bool		in_use;			//!< Whether the connection is currently reserved.
74 
75 	int		heap;			//!< For the next connection heap.
76 
77 #ifdef PTHREAD_DEBUG
78 	pthread_t	pthread_id;		//!< When 'in_use == true'.
79 #endif
80 };
81 
82 /** A connection pool
83  *
84  * Defines the configuration of the connection pool, all the counters and
85  * timestamps related to the connection pool, the mutex that stops multiple
86  * threads leaving the pool in an inconsistent state, and the callbacks
87  * required to open, close and check the status of connections within the pool.
88  *
89  * @see fr_connection
90  */
91 struct fr_connection_pool_t {
92 	int		ref;			//!< Reference counter to prevent connection
93 						//!< pool being freed multiple times.
94 	uint32_t       	start;			//!< Number of initial connections.
95 	uint32_t       	min;			//!< Minimum number of concurrent connections to keep open.
96 	uint32_t       	max;			//!< Maximum number of concurrent connections to allow.
97 	uint32_t       	spare;			//!< Number of spare connections to try.
98 	uint32_t	pending;		//!< Number of pending open connections.
99 	uint32_t       	retry_delay;		//!< seconds to delay re-open after a failed open.
100 	uint32_t       	cleanup_interval; 	//!< Initial timer for how often we sweep the pool
101 						//!< for free connections. (0 is infinite).
102 	int		delay_interval;		//!< When we next do a cleanup.  Initialized to
103 						//!< cleanup_interval, and increase from there based
104 						//!< on the delay.
105 	int		next_delay;    	 	//!< The next delay time.  cleanup.  Initialized to
106 						//!< cleanup_interval, and decays from there.
107 	uint64_t	max_uses;		//!< Maximum number of times a connection can be used
108 						//!< before being closed.
109 	uint32_t	lifetime;		//!< How long a connection can be open before being
110 						//!< closed (irrespective of whether it's idle or not).
111 	uint32_t       	idle_timeout;		//!< How long a connection can be idle before
112 						//!< being closed.
113 
114 	bool		spread;			//!< If true we spread requests over the connections,
115 						//!< using the connection released longest ago, first.
116 
117 	time_t		last_checked;		//!< Last time we pruned the connection pool.
118 	time_t		last_spawned;		//!< Last time we spawned a connection.
119 	time_t		last_failed;		//!< Last time we tried to spawn a connection but failed.
120 	time_t		last_throttled;		//!< Last time we refused to spawn a connection because
121 						//!< the last connection failed, or we were already spawning
122 						//!< a connection.
123 	time_t		last_at_max;		//!< Last time we hit the maximum number of allowed
124 						//!< connections.
125 
126 	uint32_t	max_pending;		//!< Max number of connections to open.
127 
128 	uint64_t	count;			//!< Number of connections spawned over the lifetime
129 						//!< of the pool.
130 	uint32_t       	num;			//!< Number of connections in the pool.
131 	uint32_t	active;	 		//!< Number of currently reserved connections.
132 
133 	fr_heap_t	*heap;			//!< For the next connection heap
134 
135 	fr_connection_t	*head;			//!< Start of the connection list.
136 	fr_connection_t *tail;			//!< End of the connection list.
137 
138 #ifdef HAVE_PTHREAD_H
139 	pthread_mutex_t	mutex;			//!< Mutex used to keep consistent state when making
140 						//!< modifications in threaded mode.
141 #endif
142 
143 	CONF_SECTION	*cs;			//!< Configuration section holding the section of parsed
144 						//!< config file that relates to this pool.
145 	void		*opaque;		//!< Pointer to context data that will be passed to callbacks.
146 
147 	char const	*log_prefix;		//!< Log prefix to prepend to all log messages created
148 						//!< by the connection pool code.
149 
150 	char const	*trigger_prefix;	//!< Prefix to prepend to names of all triggers
151 						//!< fired by the connection pool code.
152 
153 	fr_connection_create_t	create;		//!< Function used to create new connections.
154 	fr_connection_alive_t	alive;		//!< Function used to check status of connections.
155 };
156 
/*
 *	Without pthreads the pool has no mutex member at all, so the
 *	lock / unlock calls are compiled away to nothing.  The argument
 *	is deliberately not evaluated.
 */
#if !defined(HAVE_PTHREAD_H)
#  define pthread_mutex_lock(_x)
#  define pthread_mutex_unlock(_x)
#endif
161 
162 static const CONF_PARSER connection_config[] = {
163 	{ "start", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, start), "5" },
164 	{ "min", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, min), "5" },
165 	{ "max", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, max), "10" },
166 	{ "spare", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, spare), "3" },
167 	{ "uses", FR_CONF_OFFSET(PW_TYPE_INTEGER64, fr_connection_pool_t, max_uses), "0" },
168 	{ "lifetime", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, lifetime), "0" },
169 	{ "cleanup_delay", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, cleanup_interval), NULL},
170 	{ "cleanup_interval", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, cleanup_interval), "30" },
171 	{ "idle_timeout", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, idle_timeout), "60" },
172 	{ "retry_delay", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, retry_delay), "1" },
173 	{ "spread", FR_CONF_OFFSET(PW_TYPE_BOOLEAN, fr_connection_pool_t, spread), "no" },
174 	CONF_PARSER_TERMINATOR
175 };
176 
177 /** Order connections by reserved most recently
178  */
last_reserved_cmp(void const * one,void const * two)179 static int last_reserved_cmp(void const *one, void const *two)
180 {
181 	fr_connection_t const *a = one;
182 	fr_connection_t const *b = two;
183 
184 	if (a->last_reserved.tv_sec < b->last_reserved.tv_sec) return -1;
185 	if (a->last_reserved.tv_sec > b->last_reserved.tv_sec) return +1;
186 
187 	if (a->last_reserved.tv_usec < b->last_reserved.tv_usec) return -1;
188 	if (a->last_reserved.tv_usec > b->last_reserved.tv_usec) return +1;
189 
190 	return 0;
191 }
192 
193 /** Order connections by released longest ago
194  */
last_released_cmp(void const * one,void const * two)195 static int last_released_cmp(void const *one, void const *two)
196 {
197 	fr_connection_t const *a = one;
198 	fr_connection_t const *b = two;
199 
200 	if (b->last_released.tv_sec < a->last_released.tv_sec) return -1;
201 	if (b->last_released.tv_sec > a->last_released.tv_sec) return +1;
202 
203 	if (b->last_released.tv_usec < a->last_released.tv_usec) return -1;
204 	if (b->last_released.tv_usec > a->last_released.tv_usec) return +1;
205 
206 	return 0;
207 }
208 
209 /** Removes a connection from the connection list
210  *
211  * @note Must be called with the mutex held.
212  *
213  * @param[in,out] pool to modify.
214  * @param[in] this Connection to delete.
215  */
fr_connection_unlink(fr_connection_pool_t * pool,fr_connection_t * this)216 static void fr_connection_unlink(fr_connection_pool_t *pool, fr_connection_t *this)
217 {
218 	if (this->prev) {
219 		rad_assert(pool->head != this);
220 		this->prev->next = this->next;
221 	} else {
222 		rad_assert(pool->head == this);
223 		pool->head = this->next;
224 	}
225 	if (this->next) {
226 		rad_assert(pool->tail != this);
227 		this->next->prev = this->prev;
228 	} else {
229 		rad_assert(pool->tail == this);
230 		pool->tail = this->prev;
231 	}
232 
233 	this->prev = this->next = NULL;
234 }
235 
236 /** Adds a connection to the head of the connection list
237  *
238  * @note Must be called with the mutex held.
239  *
240  * @param[in,out] pool to modify.
241  * @param[in] this Connection to add.
242  */
fr_connection_link_head(fr_connection_pool_t * pool,fr_connection_t * this)243 static void fr_connection_link_head(fr_connection_pool_t *pool, fr_connection_t *this)
244 {
245 	rad_assert(pool != NULL);
246 	rad_assert(this != NULL);
247 	rad_assert(pool->head != this);
248 	rad_assert(pool->tail != this);
249 
250 	if (pool->head) {
251 		pool->head->prev = this;
252 	}
253 
254 	this->next = pool->head;
255 	this->prev = NULL;
256 	pool->head = this;
257 	if (!pool->tail) {
258 		rad_assert(this->next == NULL);
259 		pool->tail = this;
260 	} else {
261 		rad_assert(this->next != NULL);
262 	}
263 }
264 
265 /** Send a connection pool trigger.
266  *
267  * @param[in] pool to send trigger for.
268  * @param[in] name_suffix trigger name suffix.
269  */
fr_connection_exec_trigger(fr_connection_pool_t * pool,char const * name_suffix)270 static void fr_connection_exec_trigger(fr_connection_pool_t *pool, char const *name_suffix)
271 {
272 	char name[64];
273 	rad_assert(pool != NULL);
274 	rad_assert(name_suffix != NULL);
275 	snprintf(name, sizeof(name), "%s%s", pool->trigger_prefix, name_suffix);
276 	exec_trigger(NULL, pool->cs, name, true);
277 }
278 
279 /** Find a connection handle in the connection list
280  *
281  * Walks over the list of connections searching for a specified connection
282  * handle and returns the first connection that contains that pointer.
283  *
284  * @note Will lock mutex and only release mutex if connection handle
285  * is not found, so will usually return will mutex held.
286  * @note Must be called with the mutex free.
287  *
288  * @param[in] pool to search in.
289  * @param[in] conn handle to search for.
290  * @return
291  *	- Connection containing the specified handle.
292  *	- NULL if non if connection was found.
293  */
fr_connection_find(fr_connection_pool_t * pool,void * conn)294 static fr_connection_t *fr_connection_find(fr_connection_pool_t *pool, void *conn)
295 {
296 	fr_connection_t *this;
297 
298 	if (!pool || !conn) return NULL;
299 
300 	pthread_mutex_lock(&pool->mutex);
301 
302 	/*
303 	 *	FIXME: This loop could be avoided if we passed a 'void
304 	 *	**connection' instead.  We could use "offsetof" in
305 	 *	order to find top of the parent structure.
306 	 */
307 	for (this = pool->head; this != NULL; this = this->next) {
308 		if (this->connection == conn) {
309 #ifdef PTHREAD_DEBUG
310 			pthread_t pthread_id;
311 
312 			pthread_id = pthread_self();
313 			rad_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
314 #endif
315 
316 			rad_assert(this->in_use == true);
317 			return this;
318 		}
319 	}
320 
321 	pthread_mutex_unlock(&pool->mutex);
322 	return NULL;
323 }
324 
325 /** Spawns a new connection
326  *
327  * Spawns a new connection using the create callback, and returns it for
328  * adding to the connection list.
329  *
330  * @note Will call the 'open' trigger.
331  * @note Must be called with the mutex free.
332  *
333  * @param[in] pool to modify.
334  * @param[in] now Current time.
335  * @param[in] in_use whether the new connection should be "in_use" or not
336  * @return
337  *	- New connection struct.
338  *	- NULL on error.
339  */
fr_connection_spawn(fr_connection_pool_t * pool,time_t now,bool in_use)340 static fr_connection_t *fr_connection_spawn(fr_connection_pool_t *pool, time_t now, bool in_use)
341 {
342 	uint64_t	number;
343 	uint32_t	max_pending;
344 	TALLOC_CTX	*ctx;
345 
346 	fr_connection_t	*this;
347 	void		*conn;
348 
349 	rad_assert(pool != NULL);
350 
351 	/*
352 	 *	If we have NO connections, and we've previously failed
353 	 *	opening connections, don't open multiple connections until
354 	 *	we successfully open at least one.
355 	 */
356 	if ((pool->num == 0) && pool->pending && pool->last_failed) return NULL;
357 
358 	pthread_mutex_lock(&pool->mutex);
359 	rad_assert(pool->num <= pool->max);
360 
361 	/*
362 	 *	Don't spawn too many connections at the same time.
363 	 */
364 	if ((pool->num + pool->pending) >= pool->max) {
365 		pthread_mutex_unlock(&pool->mutex);
366 
367 		ERROR("%s: Cannot open new connection, already at max", pool->log_prefix);
368 		return NULL;
369 	}
370 
371 	/*
372 	 *	If the last attempt failed, wait a bit before
373 	 *	retrying.
374 	 */
375 	if (pool->last_failed && ((pool->last_failed + pool->retry_delay) > now)) {
376 		bool complain = false;
377 
378 		if (pool->last_throttled != now) {
379 			complain = true;
380 
381 			pool->last_throttled = now;
382 		}
383 
384 		pthread_mutex_unlock(&pool->mutex);
385 
386 		if (!RATE_LIMIT_ENABLED || complain) {
387 			ERROR("%s: Last connection attempt failed, waiting %d seconds before retrying",
388 			      pool->log_prefix, pool->retry_delay);
389 		}
390 
391 		return NULL;
392 	}
393 
394 	/*
395 	 *	We limit the rate of new connections after a failed attempt.
396 	 */
397 	if (pool->pending > pool->max_pending) {
398 		pthread_mutex_unlock(&pool->mutex);
399 		RATE_LIMIT(WARN("%s: Cannot open a new connection due to rate limit after failure",
400 				pool->log_prefix));
401 		return NULL;
402 	}
403 
404 	pool->pending++;
405 	number = pool->count++;
406 
407 	/*
408 	 *	Unlock the mutex while we try to open a new
409 	 *	connection.  If there are issues with the back-end,
410 	 *	opening a new connection may take a LONG time.  In
411 	 *	that case, we want the other connections to continue
412 	 *	to be used.
413 	 */
414 	pthread_mutex_unlock(&pool->mutex);
415 
416 	/*
417 	 *	The true value for max_pending is the smaller of
418 	 *	free connection slots, or pool->max_pending.
419 	 */
420 	max_pending = (pool->max - pool->num);
421 	if (pool->max_pending < max_pending) max_pending = pool->max_pending;
422 	INFO("%s: Opening additional connection (%" PRIu64 "), %u of %u pending slots used",
423 	     pool->log_prefix, number, pool->pending, max_pending);
424 
425 	/*
426 	 *	Allocate a new top level ctx for the create callback
427 	 *	to hang its memory off of.
428 	 */
429 	ctx = talloc_init("fr_connection_ctx");
430 	if (!ctx) return NULL;
431 
432 	/*
433 	 *	This may take a long time, which prevents other
434 	 *	threads from releasing connections.  We don't care
435 	 *	about other threads opening new connections, as we
436 	 *	already have no free connections.
437 	 */
438 	conn = pool->create(ctx, pool->opaque);
439 	if (!conn) {
440 		ERROR("%s: Opening connection failed (%" PRIu64 ")", pool->log_prefix, number);
441 
442 		pool->last_failed = now;
443 		pthread_mutex_lock(&pool->mutex);
444 		pool->max_pending = 1;
445 		pool->pending--;
446 		pthread_mutex_unlock(&pool->mutex);
447 
448 		talloc_free(ctx);
449 
450 		return NULL;
451 	}
452 
453 	/*
454 	 *	And lock the mutex again while we link the new
455 	 *	connection back into the pool.
456 	 */
457 	pthread_mutex_lock(&pool->mutex);
458 
459 	this = talloc_zero(pool, fr_connection_t);
460 	if (!this) {
461 		pthread_mutex_unlock(&pool->mutex);
462 		talloc_free(ctx);
463 
464 		return NULL;
465 	}
466 	fr_link_talloc_ctx_free(this, ctx);
467 
468 	this->created = now;
469 	this->connection = conn;
470 	this->in_use = in_use;
471 
472 	this->number = number;
473 	gettimeofday(&this->last_reserved, NULL);
474 	this->last_released = this->last_reserved;
475 
476 	/*
477 	 *	The connection pool is starting up.  Insert the
478 	 *	connection into the heap.
479 	 */
480 	if (!in_use) fr_heap_insert(pool->heap, this);
481 
482 	fr_connection_link_head(pool, this);
483 
484 	/*
485 	 *	Do NOT insert the connection into the heap.  That's
486 	 *	done when the connection is released.
487 	 */
488 
489 	pool->num++;
490 
491 	rad_assert(pool->pending > 0);
492 	pool->pending--;
493 
494 	/*
495 	 *	We've successfully opened one more connection.  Allow
496 	 *	more connections to open in parallel.
497 	 */
498 	if (pool->max_pending < pool->max) pool->max_pending++;
499 
500 	pool->last_spawned = time(NULL);
501 	pool->delay_interval = pool->cleanup_interval;
502 	pool->next_delay = pool->cleanup_interval;
503 	pool->last_failed = 0;
504 
505 	pthread_mutex_unlock(&pool->mutex);
506 
507 	fr_connection_exec_trigger(pool, "open");
508 
509 	return this;
510 }
511 
512 /** Close an existing connection.
513  *
514  * Removes the connection from the list, calls the delete callback to close
515  * the connection, then frees memory allocated to the connection.
516  *
517  * @note Will call the 'close' trigger.
518  * @note Must be called with the mutex held.
519  *
520  * @param[in,out] pool to modify.
521  * @param[in] this Connection to delete.
522  * @param[in] reason to close the connection
523  * @param[in] msg optional message
524  */
fr_connection_close_internal(fr_connection_pool_t * pool,fr_connection_t * this,char const * reason,char const * msg)525 static void fr_connection_close_internal(fr_connection_pool_t *pool, fr_connection_t *this,
526 					 char const *reason, char const *msg)
527 {
528 	if (!msg) {
529 		INFO("%s: %s (%" PRIu64 ")", pool->log_prefix, reason, this->number);
530 	} else {
531 		INFO("%s: %s (%" PRIu64 ") - %s", pool->log_prefix, reason, this->number, msg);
532 	}
533 
534 
535 	/*
536 	 *	If it's in use, release it.
537 	 */
538 	if (this->in_use) {
539 #ifdef PTHREAD_DEBUG
540 		pthread_t pthread_id = pthread_self();
541 		rad_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
542 #endif
543 
544 		this->in_use = false;
545 
546 		rad_assert(pool->active != 0);
547 		pool->active--;
548 
549 	} else {
550 		/*
551 		 *	Connection isn't used, remove it from the heap.
552 		 */
553 		fr_heap_extract(pool->heap, this);
554 	}
555 
556 	fr_connection_exec_trigger(pool, "close");
557 
558 	fr_connection_unlink(pool, this);
559 
560 	rad_assert(pool->num > 0);
561 	pool->num--;
562 	talloc_free(this);
563 }
564 
565 /** Check whether a connection needs to be removed from the pool
566  *
567  * Will verify that the connection is within idle_timeout, max_uses, and
568  * lifetime values. If it is not, the connection will be closed.
569  *
570  * @note Will only close connections not in use.
571  * @note Must be called with the mutex held.
572  *
573  * @param[in,out] pool to modify.
574  * @param[in,out] this Connection to manage.
575  * @param[in] now Current time.
576  * @param[in] get whether we want to get a connection
577  * @return
578  *	- 0 if connection was closed.
579  *	- 1 if connection handle was left open.
580  */
fr_connection_manage(fr_connection_pool_t * pool,fr_connection_t * this,time_t now,bool get)581 static int fr_connection_manage(fr_connection_pool_t *pool,
582 				fr_connection_t *this,
583 				time_t now, bool get)
584 {
585 	rad_assert(pool != NULL);
586 	rad_assert(this != NULL);
587 	char const *reason = "Closing expired connection";
588 	char const *msg = NULL;
589 
590 	/*
591 	 *	Don't terminate in-use connections
592 	 */
593 	if (this->in_use) return 1;
594 
595 	if ((pool->max_uses > 0) &&
596 	    (this->num_uses >= pool->max_uses)) {
597 		msg = "Hit max_uses limit";
598 
599 	do_delete:
600 		if (pool->num <= pool->min) {
601 			DEBUG("%s: You probably need to lower \"min\"", pool->log_prefix);
602 		}
603 		fr_connection_close_internal(pool, this, reason, msg);
604 		return 0;
605 	}
606 
607 	if ((pool->lifetime > 0) &&
608 	    ((this->created + pool->lifetime) < now)) {
609 		msg = "Hit lifetime limit";
610 		goto do_delete;
611 	}
612 
613 	/*
614 	 *	The connection WAS idle, but the caller is interested
615 	 *	in getting a new one.  Instead of closing the old one
616 	 *	and opening a new one, we just return the old one.
617 	 */
618 	if (get) return 1;
619 
620 	if ((pool->idle_timeout > 0) &&
621 	    ((this->last_released.tv_sec + pool->idle_timeout) < now)) {
622 		msg = "Hit idle_timeout limit";
623 		goto do_delete;
624 	}
625 
626 	return 1;
627 }
628 
629 
630 /** Check whether any connections need to be removed from the pool
631  *
632  * Maintains the number of connections in the pool as per the configuration
633  * parameters for the connection pool.
634  *
635  * @note Will only run checks the first time it's called in a given second,
636  * to throttle connection spawning/closing.
637  * @note Will only close connections not in use.
638  * @note Must be called with the mutex held, will release mutex before
639  * returning.
640  *
641  * @param[in,out] pool to manage.
642  * @return 1
643  */
fr_connection_pool_check(fr_connection_pool_t * pool)644 static int fr_connection_pool_check(fr_connection_pool_t *pool)
645 {
646 	uint32_t num, spare;
647 	time_t now = time(NULL);
648 	fr_connection_t *this, *next;
649 
650 	if (pool->last_checked == now) {
651 		pthread_mutex_unlock(&pool->mutex);
652 		return 1;
653 	}
654 
655 	/*
656 	 *	Get "real" number of connections, and count pending
657 	 *	connections as spare.
658 	 */
659 	num = pool->num + pool->pending;
660 	spare = pool->pending + (pool->num - pool->active);
661 
662 	/*
663 	 *	The other end can close connections.  If so, we'll
664 	 *	have fewer than "min".  When that happens, open more
665 	 *	connections to enforce "min".
666 	 *
667 	 *	The code for spawning connections enforces that
668 	 *	num + pending <= max.
669 	 */
670 	if (num < pool->min) {
671 		INFO("Need %u more connections to reach min connections (%i)", pool->min - num, pool->min);
672 		goto add_connection;
673 	}
674 
675 	/*
676 	 *	On the odd chance that we've opened too many
677 	 *	connections, take care of that.
678 	 */
679 	if (num > pool->max) {
680 		/*
681 		 *	Pending connections don't get closed as "spare".
682 		 */
683 		if (pool->pending > 0) goto manage_connections;
684 
685 		/*
686 		 *	Otherwise close one of the connections to
687 		 *	bring us down to "max".
688 		 */
689 		goto close_connection;
690 	}
691 
692 	/*
693 	 *	Now that we've enforced min/max connections, try to
694 	 *	keep the "spare" connections at the correct number.
695 	 */
696 
697 	/*
698 	 *	Nothing to do?  Go check all of the connections for
699 	 *	timeouts, etc.
700 	 */
701 	if (spare == pool->spare) goto manage_connections;
702 
703 	/*
704 	 *	Too many spare connections, delete some.
705 	 */
706 	if (spare > pool->spare) {
707 		fr_connection_t *found;
708 
709 		/*
710 		 *	Pending connections don't get closed as "spare".
711 		 */
712 		if (pool->pending > 0) goto manage_connections;
713 
714 		/*
715 		 *	Don't close too many connections, even they
716 		 *	are spare.
717 		 */
718 		if (num <= pool->min) goto manage_connections;
719 
720 		/*
721 		 *	Too many spares, go close one.
722 		 */
723 
724 	close_connection:
725 		/*
726 		 *	Don't close connections too often, in order to
727 		 *	prevent flapping.
728 		 */
729 		if (now < (pool->last_spawned + pool->delay_interval)) goto manage_connections;
730 
731 		/*
732 		 *	Find a connection to close.
733 		 */
734 		found = NULL;
735 		for (this = pool->tail; this != NULL; this = this->prev) {
736 			if (this->in_use) continue;
737 
738 			if (!found ||
739 			    timercmp(&this->last_reserved, &found->last_reserved, <)) {
740 				found = this;
741 			}
742 		}
743 
744 		rad_assert(found != NULL);
745 
746 		fr_connection_close_internal(pool, found, "Closing connection", "Too many unused connections.");
747 
748 		/*
749 		 *	Decrease the delay for the next time we clean
750 		 *	up.
751 		 */
752 		pool->next_delay >>= 1;
753 		if (pool->next_delay == 0) pool->next_delay = 1;
754 		pool->delay_interval += pool->next_delay;
755 
756 		goto manage_connections;
757 	}
758 
759 	/*
760 	 *	Too few connections, open some more.
761 	 */
762 	if (spare < pool->spare) {
763 		/*
764 		 *	Don't open too many pending connections.
765 		 */
766 		if (pool->pending >= pool->max_pending) goto manage_connections;
767 
768 		/*
769 		 *	Don't open too many connections, even if we
770 		 *	need more spares.
771 		 */
772 		if (num >= pool->max) goto manage_connections;
773 
774 		/*
775 		 *	Too few spares, go add one.
776 		 */
777 
778 	add_connection:
779 		INFO("Need more connections to reach %i spares", pool->spare);
780 
781 		/*
782 		 *	Only try to open spares if we're not already attempting to open
783 		 *	a connection. Avoids spurious log messages.
784 		 */
785 		pthread_mutex_unlock(&pool->mutex);
786 		fr_connection_spawn(pool, now, false); /* ignore return code */
787 		pthread_mutex_lock(&pool->mutex);
788 		goto manage_connections;
789 	}
790 
791 	/*
792 	 *	Pass over all of the connections in the pool, limiting
793 	 *	lifetime, idle time, max requests, etc.
794 	 */
795 manage_connections:
796 	for (this = pool->head; this != NULL; this = next) {
797 		next = this->next;
798 		fr_connection_manage(pool, this, now, false);
799 	}
800 
801 	pool->last_checked = now;
802 	pthread_mutex_unlock(&pool->mutex);
803 
804 	return 1;
805 }
806 
807 /** Get a connection from the connection pool
808  *
809  * @note Must be called with the mutex free.
810  *
811  * @param[in,out] pool to reserve the connection from.
812  * @param[in] spawn whether to spawn a new connection
813  * @return
814  *	- A pointer to the connection handle.
815  *	- NULL on error.
816  */
fr_connection_get_internal(fr_connection_pool_t * pool,bool spawn)817 static void *fr_connection_get_internal(fr_connection_pool_t *pool, bool spawn)
818 {
819 	time_t now;
820 	fr_connection_t *this;
821 
822 	if (!pool) return NULL;
823 
824 	/*
825 	 *	Allow CTRL-C to kill the server in debugging mode.
826 	 */
827 	if (main_config.exiting) return NULL;
828 
829 #ifdef HAVE_PTHREAD_H
830 	if (spawn) pthread_mutex_lock(&pool->mutex);
831 #endif
832 
833 	now = time(NULL);
834 
835 	/*
836 	 *	Grab the link with the lowest latency, and check it
837 	 *	for limits.  If "connection manage" says the link is
838 	 *	no longer usable, go grab another one.
839 	 */
840 	do {
841 		this = fr_heap_peek(pool->heap);
842 		if (!this) break;
843 
844 		fr_assert(!this->in_use);
845 	} while (!fr_connection_manage(pool, this, now, true));
846 
847 	/*
848 	 *	We have a working connection.  Extract it from the
849 	 *	heap and use it.
850 	 */
851 	if (this) {
852 		fr_heap_extract(pool->heap, this);
853 		goto do_return;
854 	}
855 
856 	/*
857 	 *	We were asked to avoid spawning a new connection, by
858 	 *	fr_connection_reconnect_internal().  So we just return
859 	 *	here.
860 	 */
861 	if (!spawn) return NULL;
862 
863 	if (pool->num == pool->max) {
864 		bool complain = false;
865 
866 		/*
867 		 *	Rate-limit complaints.
868 		 */
869 		if (pool->last_at_max != now) {
870 			complain = true;
871 			pool->last_at_max = now;
872 		}
873 
874 		pthread_mutex_unlock(&pool->mutex);
875 
876 		if (!RATE_LIMIT_ENABLED || complain) {
877 			ERROR("%s: No connections available and at max connection limit", pool->log_prefix);
878 		}
879 
880 		return NULL;
881 	}
882 
883 	pthread_mutex_unlock(&pool->mutex);
884 
885 	DEBUG("%s: %i of %u connections in use.  You  may need to increase \"spare\"", pool->log_prefix,
886 	      pool->active, pool->num);
887 	this = fr_connection_spawn(pool, now, true); /* MY connection! */
888 	if (!this) return NULL;
889 
890 	pthread_mutex_lock(&pool->mutex);
891 
892 do_return:
893 	pool->active++;
894 	this->num_uses++;
895 	gettimeofday(&this->last_reserved, NULL);
896 	this->in_use = true;
897 
898 #ifdef PTHREAD_DEBUG
899 	this->pthread_id = pthread_self();
900 #endif
901 
902 #ifdef HAVE_PTHREAD_H
903 	if (spawn) pthread_mutex_unlock(&pool->mutex);
904 #endif
905 
906 	DEBUG("%s: Reserved connection (%" PRIu64 ")", pool->log_prefix, this->number);
907 
908 	return this->connection;
909 }
910 
911 /** Reconnect a suspected inviable connection
912  *
913  * @note Must be called with the mutex held, will not release mutex.
914  *
915  * @see fr_connection_get
916  * @param[in,out] pool to reconnect the connection in.
917  * @param[in,out] conn to reconnect.
918  * @return new connection handle if successful else NULL.
919  */
fr_connection_reconnect_internal(fr_connection_pool_t * pool,fr_connection_t * conn)920 static fr_connection_t *fr_connection_reconnect_internal(fr_connection_pool_t *pool, fr_connection_t *conn)
921 {
922 	void		*new_conn;
923 	uint64_t	conn_number;
924 	TALLOC_CTX	*ctx;
925 
926 	conn_number = conn->number;
927 
928 	/*
929 	 *	Destroy any handles associated with the fr_connection_t
930 	 */
931 	talloc_free_children(conn);
932 
933 	DEBUG("%s: Reconnecting (%" PRIu64 ")", pool->log_prefix, conn_number);
934 
935 	/*
936 	 *	Allocate a new top level ctx for the create callback
937 	 *	to hang its memory off of.
938 	 */
939 	ctx = talloc_init("fr_connection_ctx");
940 	if (!ctx) return NULL;
941 	fr_link_talloc_ctx_free(conn, ctx);
942 
943 	new_conn = pool->create(ctx, pool->opaque);
944 	if (!new_conn) {
945 		/*
946 		 *	We can't create a new connection, so close the current one.
947 		 */
948 		fr_connection_close_internal(pool, conn, "Closing connection", "Failed to reconnect");
949 
950 		/*
951 		 *	Maybe there's a connection which is unused and
952 		 *	available.  If so, return it.
953 		 */
954 		new_conn = fr_connection_get_internal(pool, false);
955 		if (new_conn) return new_conn;
956 
957 		RATE_LIMIT(ERROR("%s: Failed to reconnect (%" PRIu64 "), no free connections are available",
958 				 pool->log_prefix, conn_number));
959 
960 		return NULL;
961 	}
962 
963 	fr_connection_exec_trigger(pool, "close");
964 	conn->connection = new_conn;
965 
966 	return new_conn;
967 }
968 
969 /** Create a new connection pool
970  *
971  * Allocates structures used by the connection pool, initialises the various
972  * configuration options and counters, and sets the callback functions.
973  *
974  * Will also spawn the number of connections specified by the 'start'
975  * configuration options.
976  *
977  * @note Will call the 'start' trigger.
978  *
979  * @param[in] ctx Context to link pool's destruction to.
980  * @param[in] cs pool section.
981  * @param[in] opaque data pointer to pass to callbacks.
982  * @param[in] c Callback to create new connections.
983  * @param[in] a Callback to check the status of connections.
984  * @param[in] log_prefix prefix to prepend to all log messages.
985  * @param[in] trigger_prefix prefix to prepend to all trigger names.
986  * @return
987  *	- New connection pool.
988  *	- NULL on error.
989  */
fr_connection_pool_init(TALLOC_CTX * ctx,CONF_SECTION * cs,void * opaque,fr_connection_create_t c,fr_connection_alive_t a,char const * log_prefix,char const * trigger_prefix)990 fr_connection_pool_t *fr_connection_pool_init(TALLOC_CTX *ctx,
991 					      CONF_SECTION *cs,
992 					      void *opaque,
993 					      fr_connection_create_t c,
994 					      fr_connection_alive_t a,
995 					      char const *log_prefix,
996 					      char const *trigger_prefix)
997 {
998 	uint32_t i;
999 	fr_connection_pool_t *pool;
1000 	fr_connection_t *this;
1001 	time_t now;
1002 
1003 	if (!cs || !opaque || !c) return NULL;
1004 
1005 	now = time(NULL);
1006 
1007 	/*
1008 	 *	Pool is allocated in the NULL context as
1009 	 *	threads are likely to allocate memory
1010 	 *	beneath the pool.
1011 	 */
1012 	pool = talloc_zero(NULL, fr_connection_pool_t);
1013 	if (!pool) return NULL;
1014 
1015 	/*
1016 	 *	Ensure the pool is freed at the same time
1017 	 *	as its parent.
1018 	 */
1019 	if (fr_link_talloc_ctx_free(ctx, pool) < 0) {
1020 		talloc_free(pool);
1021 
1022 		return NULL;
1023 	}
1024 
1025 	pool->cs = cs;
1026 	pool->opaque = opaque;
1027 	pool->create = c;
1028 	pool->alive = a;
1029 
1030 	pool->head = pool->tail = NULL;
1031 
1032 	/*
1033 	 *	We keep a heap of connections, sorted by the last time
1034 	 *	we STARTED using them.  Newly opened connections
1035 	 *	aren't in the heap.  They're only inserted in the list
1036 	 *	once they're released.
1037 	 *
1038 	 *	We do "most recently started" instead of "most
1039 	 *	recently used", because MRU is done as most recently
1040 	 *	*released*.  We want to order connections by
1041 	 *	responsiveness, and MRU prioritizes high latency
1042 	 *	connections.
1043 	 *
1044 	 *	We want most recently *started*, which gives
1045 	 *	preference to low latency links, and pushes high
1046 	 *	latency links down in the priority heap.
1047 	 *
1048 	 *	https://code.facebook.com/posts/1499322996995183/solving-the-mystery-of-link-imbalance-a-metastable-failure-state-at-scale/
1049 	 */
1050 	if (!pool->spread) {
1051 		pool->heap = fr_heap_create(last_reserved_cmp, offsetof(fr_connection_t, heap));
1052 	/*
1053 	 *	For some types of connections we need to used a different
1054 	 *	algorithm, because load balancing benefits are secondary
1055 	 *	to maintaining a cache of open connections.
1056 	 *
1057 	 *	With libcurl's multihandle, connections can only be reused
1058 	 *	if all handles that make up the multhandle are done processing
1059 	 *	their requests.
1060 	 *
1061 	 *	We can't tell when that's happened using libcurl, and even
1062 	 *	if we could, blocking until all servers had responded
1063 	 *	would have huge cost.
1064 	 *
1065 	 *	The solution is to order the heap so that the connection that
1066 	 *	was released longest ago is at the top.
1067 	 *
1068 	 *	That way we maximise time between connection use.
1069 	 */
1070 	} else {
1071 		pool->heap = fr_heap_create(last_released_cmp, offsetof(fr_connection_t, heap));
1072 	}
1073 	if (!pool->heap) {
1074 		talloc_free(pool);
1075 		return NULL;
1076 	}
1077 
1078 	pool->log_prefix = log_prefix ? talloc_typed_strdup(pool, log_prefix) : "core";
1079 	pool->trigger_prefix = trigger_prefix ? talloc_typed_strdup(pool, trigger_prefix) : "";
1080 
1081 #ifdef HAVE_PTHREAD_H
1082 	pthread_mutex_init(&pool->mutex, NULL);
1083 #endif
1084 
1085 	DEBUG("%s: Initialising connection pool", pool->log_prefix);
1086 
1087 	if (cf_section_parse(cs, pool, connection_config) < 0) goto error;
1088 
1089 	/*
1090 	 *	Some simple limits
1091 	 */
1092 	if (pool->max == 0) {
1093 		cf_log_err_cs(cs, "Cannot set 'max' to zero");
1094 		goto error;
1095 	}
1096 	pool->max_pending = pool->max; /* can open all connections now */
1097 
1098 	if (pool->min > pool->max) {
1099 		cf_log_err_cs(cs, "Cannot set 'min' to more than 'max'");
1100 		goto error;
1101 	}
1102 
1103 	FR_INTEGER_BOUND_CHECK("max", pool->max, <=, 1024);
1104 	FR_INTEGER_BOUND_CHECK("start", pool->start, <=, pool->max);
1105 	FR_INTEGER_BOUND_CHECK("spare", pool->spare, <=, (pool->max - pool->min));
1106 
1107 	if (pool->lifetime > 0) {
1108 		FR_INTEGER_COND_CHECK("idle_timeout", pool->idle_timeout, (pool->idle_timeout <= pool->lifetime), 0);
1109 	}
1110 
1111 	if (pool->idle_timeout > 0) {
1112 		FR_INTEGER_BOUND_CHECK("cleanup_interval", pool->cleanup_interval, <=, pool->idle_timeout);
1113 	}
1114 
1115 	/*
1116 	 *	Don't open any connections.  Instead, force the limits
1117 	 *	to only 1 connection.
1118 	 *
1119 	 */
1120 	if (check_config) {
1121 		pool->start = pool->min = pool->max = 1;
1122 		return pool;
1123 	}
1124 
1125 	/*
1126 	 *	Create all of the connections, unless the admin says
1127 	 *	not to.
1128 	 */
1129 	for (i = 0; i < pool->start; i++) {
1130 		this = fr_connection_spawn(pool, now, false);
1131 		if (!this) {
1132 		error:
1133 			fr_connection_pool_free(pool);
1134 			return NULL;
1135 		}
1136 	}
1137 
1138 	fr_connection_exec_trigger(pool, "start");
1139 
1140 	return pool;
1141 }
1142 
1143 /** Initialise a module specific connection pool
1144  *
1145  * @see fr_connection_pool_init
1146  *
1147  * @param[in] module section.
1148  * @param[in] opaque data pointer to pass to callbacks.
1149  * @param[in] c Callback to create new connections.
1150  * @param[in] a Callback to check the status of connections.
1151  * @param[in] log_prefix override, if NULL will be set automatically from the module CONF_SECTION.
1152  * @return
1153  *	- New connection pool.
1154  *	- NULL on error.
1155  */
fr_connection_pool_module_init(CONF_SECTION * module,void * opaque,fr_connection_create_t c,fr_connection_alive_t a,char const * log_prefix)1156 fr_connection_pool_t *fr_connection_pool_module_init(CONF_SECTION *module,
1157 						     void *opaque,
1158 						     fr_connection_create_t c,
1159 						     fr_connection_alive_t a,
1160 						     char const *log_prefix)
1161 {
1162 	CONF_SECTION *cs, *mycs;
1163 	char buff[128];
1164 	char trigger_prefix[64];
1165 
1166 	fr_connection_pool_t *pool;
1167 	char const *cs_name1, *cs_name2;
1168 
1169 	int ret;
1170 
1171 #define CONNECTION_POOL_CF_KEY "connection_pool"
1172 #define parent_name(_x) cf_section_name(cf_item_parent(cf_section_to_item(_x)))
1173 
1174 	cs_name1 = cf_section_name1(module);
1175 	cs_name2 = cf_section_name2(module);
1176 	if (!cs_name2) cs_name2 = cs_name1;
1177 
1178 	snprintf(trigger_prefix, sizeof(trigger_prefix), "modules.%s.", cs_name1);
1179 
1180 	if (!log_prefix) {
1181 		snprintf(buff, sizeof(buff), "rlm_%s (%s)", cs_name1, cs_name2);
1182 		log_prefix = buff;
1183 	}
1184 
1185 	/*
1186 	 *	Get sibling's pool config section
1187 	 */
1188 	ret = find_module_sibling_section(&cs, module, "pool");
1189 	switch (ret) {
1190 	case -1:
1191 		return NULL;
1192 
1193 	case 1:
1194 		DEBUG4("%s: Using pool section from \"%s\"", log_prefix, parent_name(cs));
1195 		break;
1196 
1197 	case 0:
1198 		DEBUG4("%s: Using local pool section", log_prefix);
1199 		break;
1200 	}
1201 
1202 	/*
1203 	 *	Get our pool config section
1204 	 */
1205 	mycs = cf_section_sub_find(module, "pool");
1206 	if (!mycs) {
1207 		DEBUG4("%s: Adding pool section to config item \"%s\" to store pool references", log_prefix,
1208 		       cf_section_name(module));
1209 
1210 		mycs = cf_section_alloc(module, "pool", NULL);
1211 		cf_section_add(module, mycs);
1212 	}
1213 
1214 	/*
1215 	 *	Sibling didn't have a pool config section
1216 	 *	Use our own local pool.
1217 	 */
1218 	if (!cs) {
1219 		DEBUG4("%s: \"%s.pool\" section not found, using \"%s.pool\"", log_prefix,
1220 		       parent_name(cs), parent_name(mycs));
1221 		cs = mycs;
1222 	}
1223 
1224 	/*
1225 	 *	If fr_connection_pool_init has already been called
1226 	 *	for this config section, reuse the previous instance.
1227 	 *
1228 	 *	This allows modules to pass in the config sections
1229 	 *	they would like to use the connection pool from.
1230 	 */
1231 	pool = cf_data_find(cs, CONNECTION_POOL_CF_KEY);
1232 	if (!pool) {
1233 		DEBUG4("%s: No pool reference found for config item \"%s.pool\"", log_prefix, parent_name(cs));
1234 		pool = fr_connection_pool_init(cs, cs, opaque, c, a, log_prefix, trigger_prefix);
1235 		if (!pool) return NULL;
1236 
1237 		DEBUG4("%s: Adding pool reference %p to config item \"%s.pool\"", log_prefix, pool, parent_name(cs));
1238 		cf_data_add(cs, CONNECTION_POOL_CF_KEY, pool, NULL);
1239 		return pool;
1240 	}
1241 	pool->ref++;
1242 
1243 	DEBUG4("%s: Found pool reference %p in config item \"%s.pool\"", log_prefix, pool, parent_name(cs));
1244 
1245 	/*
1246 	 *	We're reusing pool data add it to our local config
1247 	 *	section. This allows other modules to transitively
1248 	 *	re-use a pool through this module.
1249 	 */
1250 	if (mycs != cs) {
1251 		DEBUG4("%s: Copying pool reference %p from config item \"%s.pool\" to config item \"%s.pool\"",
1252 		       log_prefix, pool, parent_name(cs), parent_name(mycs));
1253 		cf_data_add(mycs, CONNECTION_POOL_CF_KEY, pool, NULL);
1254 	}
1255 
1256 	return pool;
1257 }
1258 
1259 /** Allocate a new pool using an existing one as a template
1260  *
1261  * @param ctx to allocate new pool in.
1262  * @param pool to copy.
1263  * @param opaque data to pass to connection function.
1264  * @return
1265  *	- New connection pool.
1266  *	- NULL on error.
1267  */
fr_connection_pool_copy(TALLOC_CTX * ctx,fr_connection_pool_t * pool,void * opaque)1268 fr_connection_pool_t *fr_connection_pool_copy(TALLOC_CTX *ctx, fr_connection_pool_t *pool, void *opaque)
1269 {
1270 	return fr_connection_pool_init(ctx, pool->cs, opaque, pool->create,
1271 				       pool->alive, pool->log_prefix, pool->trigger_prefix);
1272 }
1273 
1274 /** Get the number of connections currently in the pool
1275  *
1276  * @param pool to count connections for.
1277  * @return the number of connections in the pool
1278  */
fr_connection_pool_get_num(fr_connection_pool_t * pool)1279 int fr_connection_pool_get_num(fr_connection_pool_t *pool)
1280 {
1281 	return pool->num;
1282 }
1283 
1284 
1285 /** Delete a connection pool
1286  *
1287  * Closes, unlinks and frees all connections in the connection pool, then frees
1288  * all memory used by the connection pool.
1289  *
1290  * @note Will call the 'stop' trigger.
1291  * @note Must be called with the mutex free.
1292  *
1293  * @param[in,out] pool to delete.
1294  */
fr_connection_pool_free(fr_connection_pool_t * pool)1295 void fr_connection_pool_free(fr_connection_pool_t *pool)
1296 {
1297 	fr_connection_t *this;
1298 
1299 	if (!pool) return;
1300 
1301 	/*
1302 	 *	More modules hold a reference to this pool, don't free
1303 	 *	it yet.
1304 	 */
1305 	if (pool->ref > 0) {
1306 		pool->ref--;
1307 		return;
1308 	}
1309 
1310 	DEBUG("%s: Removing connection pool", pool->log_prefix);
1311 
1312 	pthread_mutex_lock(&pool->mutex);
1313 
1314 	/*
1315 	 *	Don't loop over the list.  Just keep removing the head
1316 	 *	until they're all gone.
1317 	 */
1318 	while ((this = pool->head) != NULL) {
1319 		fr_connection_close_internal(pool, this, "Closing connection", "Shutting down connection pool");
1320 	}
1321 
1322 	fr_heap_delete(pool->heap);
1323 
1324 	fr_connection_exec_trigger(pool, "stop");
1325 
1326 	rad_assert(pool->head == NULL);
1327 	rad_assert(pool->tail == NULL);
1328 	rad_assert(pool->num == 0);
1329 
1330 #ifdef HAVE_PTHREAD_H
1331 	pthread_mutex_destroy(&pool->mutex);
1332 #endif
1333 
1334 	talloc_free(pool);
1335 }
1336 
1337 /** Reserve a connection in the connection pool
1338  *
1339  * Will attempt to find an unused connection in the connection pool, if one is
1340  * found, will mark it as in in use increment the number of active connections
1341  * and return the connection handle.
1342  *
1343  * If no free connections are found will attempt to spawn a new one, conditional
1344  * on a connection spawning not already being in progress, and not being at the
1345  * 'max' connection limit.
1346  *
1347  * @note fr_connection_release must be called once the caller has finished
1348  * using the connection.
1349  *
1350  * @see fr_connection_release
1351  * @param[in,out] pool to reserve the connection from.
1352  * @return
1353  *	- A pointer to the connection handle.
1354  *	- NULL on error.
1355  */
fr_connection_get(fr_connection_pool_t * pool)1356 void *fr_connection_get(fr_connection_pool_t *pool)
1357 {
1358 	return fr_connection_get_internal(pool, true);
1359 }
1360 
1361 /** Release a connection
1362  *
1363  * Will mark a connection as unused and decrement the number of active
1364  * connections.
1365  *
1366  * @see fr_connection_get
1367  * @param[in,out] pool to release the connection in.
1368  * @param[in,out] conn to release.
1369  */
fr_connection_release(fr_connection_pool_t * pool,void * conn)1370 void fr_connection_release(fr_connection_pool_t *pool, void *conn)
1371 {
1372 	fr_connection_t *this;
1373 
1374 	this = fr_connection_find(pool, conn);
1375 	if (!this) return;
1376 
1377 	this->in_use = false;
1378 
1379 	/*
1380 	 *	Record when the connection was last released
1381 	 */
1382 	gettimeofday(&this->last_released, NULL);
1383 
1384 	/*
1385 	 *	Insert the connection in the heap.
1386 	 *
1387 	 *	This will either be based on when we *started* using it
1388 	 *	(allowing fast links to be re-used, and slow links to be
1389 	 *	gradually expired), or when we released it (allowing
1390 	 *	the maximum amount of time between connection use).
1391 	 */
1392 	fr_heap_insert(pool->heap, this);
1393 
1394 	rad_assert(pool->active != 0);
1395 	pool->active--;
1396 
1397 	DEBUG("%s: Released connection (%" PRIu64 ")", pool->log_prefix, this->number);
1398 
1399 	/*
1400 	 *	We mirror the "spawn on get" functionality by having
1401 	 *	"delete on release".  If there are too many spare
1402 	 *	connections, go manage the pool && clean some up.
1403 	 */
1404 	fr_connection_pool_check(pool);
1405 }
1406 
1407 /** Reconnect a suspected inviable connection
1408  *
1409  * This should be called by the module if it suspects that a connection is
1410  * not viable (e.g. the server has closed it).
1411  *
1412  * Will attempt to create a new connection handle using the create callback,
1413  * and if this is successful the new handle will be assigned to the existing
1414  * pool connection.
1415  *
1416  * If this is not successful, the connection will be removed from the pool.
1417  *
1418  * When implementing a module that uses the connection pool API, it is advisable
1419  * to pass a pointer to the pointer to the handle (void **conn)
1420  * to all functions which may call reconnect. This is so that if a new handle
1421  * is created and returned, the handle pointer can be updated up the callstack,
1422  * and a function higher up the stack doesn't attempt to use a now invalid
1423  * connection handle.
1424  *
1425  * @note Will free any talloced memory hung off the context of the connection,
1426  *	being reconnected.
1427  *
1428  * @warning After calling reconnect the caller *MUST NOT* attempt to use
1429  *	the old handle in any other operations, as its memory will have been
1430  *	freed.
1431  *
1432  * @see fr_connection_get
1433  * @param[in,out] pool to reconnect the connection in.
1434  * @param[in,out] conn to reconnect.
1435  * @return new connection handle if successful else NULL.
1436  */
fr_connection_reconnect(fr_connection_pool_t * pool,void * conn)1437 void *fr_connection_reconnect(fr_connection_pool_t *pool, void *conn)
1438 {
1439 	void		*new_conn;
1440 	fr_connection_t	*this;
1441 
1442 	if (!pool || !conn) return NULL;
1443 
1444 	/*
1445 	 *	Don't allow opening of new connections if we're trying
1446 	 *	to exit.
1447 	 */
1448 	if (main_config.exiting) {
1449 		fr_connection_release(pool, conn);
1450 		return NULL;
1451 	}
1452 
1453 	/*
1454 	 *	If fr_connection_find is successful the pool is now locked
1455 	 */
1456 	this = fr_connection_find(pool, conn);
1457 	if (!this) return NULL;
1458 
1459 	new_conn = fr_connection_reconnect_internal(pool, this);
1460 	pthread_mutex_unlock(&pool->mutex);
1461 
1462 	return new_conn;
1463 }
1464 
1465 /** Delete a connection from the connection pool.
1466  *
1467  * Resolves the connection handle to a connection, then (if found)
1468  * closes, unlinks and frees that connection.
1469  *
1470  * @note Must be called with the mutex free.
1471  *
1472  * @param[in,out] pool Connection pool to modify.
1473  * @param[in] conn to delete.
1474  * @param[in] msg why the connection was closed.
1475  * @return
1476  *	- 0 If the connection could not be found.
1477  *	- 1 if the connection was deleted.
1478  */
fr_connection_close(fr_connection_pool_t * pool,void * conn,char const * msg)1479 int fr_connection_close(fr_connection_pool_t *pool, void *conn, char const *msg)
1480 {
1481 	fr_connection_t *this;
1482 
1483 	this = fr_connection_find(pool, conn);
1484 	if (!this) return 0;
1485 
1486 	fr_connection_close_internal(pool, this, "Deleting connection", msg);
1487 	fr_connection_pool_check(pool);
1488 	return 1;
1489 }
1490