1 //! A generic connection pool.
2 //!
//! Opening a new database connection every time one is needed is inefficient
//! and can lead to resource exhaustion under high traffic conditions. A
//! connection pool maintains a set of open connections to a database, handing
//! them out for repeated use.
7 //!
8 //! r2d2 is agnostic to the connection type it is managing. Implementors of the
9 //! `ManageConnection` trait provide the database-specific logic to create and
10 //! check the health of connections.
11 //!
12 //! # Example
13 //!
14 //! Using an imaginary "foodb" database.
15 //!
16 //! ```rust,ignore
17 //! use std::thread;
18 //!
19 //! extern crate r2d2;
20 //! extern crate r2d2_foodb;
21 //!
//! fn main() {
//!     let manager = r2d2_foodb::FooConnectionManager::new("localhost:1234");
//!     let pool = r2d2::Pool::builder()
//!         .max_size(15)
//!         .build(manager)
//!         .unwrap();
//!
//!     for _ in 0..20 {
//!         let pool = pool.clone();
//!         thread::spawn(move || {
//!             let conn = pool.get().unwrap();
//!             // use the connection
//!             // it will be returned to the pool when it falls out of scope.
//!         });
//!     }
//! }
38 //! ```
39 #![warn(missing_docs)]
40 #![doc(html_root_url = "https://docs.rs/r2d2/0.8")]
41
42 use log::error;
43
44 use parking_lot::{Condvar, Mutex, MutexGuard};
45 use std::cmp;
46 use std::error;
47 use std::fmt;
48 use std::mem;
49 use std::ops::{Deref, DerefMut};
50 use std::sync::atomic::{AtomicUsize, Ordering};
51 use std::sync::{Arc, Weak};
52 use std::time::{Duration, Instant};
53
54 pub use crate::config::Builder;
55 use crate::config::Config;
56 use crate::event::{AcquireEvent, CheckinEvent, CheckoutEvent, ReleaseEvent, TimeoutEvent};
57 pub use crate::event::{HandleEvent, NopEventHandler};
58 pub use crate::extensions::Extensions;
59
60 mod config;
61 pub mod event;
62 mod extensions;
63
64 #[cfg(test)]
65 mod test;
66
// Global counter from which each newly established connection takes its id
// (see the `fetch_add` in `add_connection`).
static CONNECTION_ID: AtomicUsize = AtomicUsize::new(0);
68
/// A trait which provides connection-specific functionality.
///
/// The pool calls these methods to open connections and to decide whether an
/// existing connection may still be handed out or must be discarded.
pub trait ManageConnection: Send + Sync + 'static {
    /// The connection type this manager deals with.
    type Connection: Send + 'static;

    /// The error type returned by `Connection`s.
    type Error: error::Error + 'static;

    /// Attempts to create a new connection.
    ///
    /// # Errors
    ///
    /// An error returned here is recorded by the pool, passed to the
    /// configured error handler, and the attempt is retried after a delay.
    fn connect(&self) -> Result<Self::Connection, Self::Error>;

    /// Determines if the connection is still connected to the database.
    ///
    /// A standard implementation would check if a simple query like `SELECT 1`
    /// succeeds.
    ///
    /// The pool calls this on check out when `test_on_check_out` is enabled.
    ///
    /// # Errors
    ///
    /// If this method returns an error, the connection is discarded and the
    /// pool tries the next idle connection.
    fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error>;

    /// *Quickly* determines if the connection is no longer usable.
    ///
    /// This will be called synchronously every time a connection is returned
    /// to the pool, so it should *not* block. If it returns `true`, the
    /// connection will be discarded.
    ///
    /// For example, an implementation might check if the underlying TCP socket
    /// has disconnected. Implementations that do not support this kind of
    /// fast health check may simply return `false`.
    fn has_broken(&self, conn: &mut Self::Connection) -> bool;
}
97
/// A trait which handles errors reported by the `ManageConnection`.
///
/// The pool hands over errors returned by `connect`, `on_acquire` and
/// `is_valid`; the handler takes ownership of each error.
pub trait HandleError<E>: fmt::Debug + Send + Sync + 'static {
    /// Handles an error.
    fn handle_error(&self, error: E);
}
103
104 /// A `HandleError` implementation which does nothing.
105 #[derive(Copy, Clone, Debug)]
106 pub struct NopErrorHandler;
107
108 impl<E> HandleError<E> for NopErrorHandler {
handle_error(&self, _: E)109 fn handle_error(&self, _: E) {}
110 }
111
112 /// A `HandleError` implementation which logs at the error level.
113 #[derive(Copy, Clone, Debug)]
114 pub struct LoggingErrorHandler;
115
116 impl<E> HandleError<E> for LoggingErrorHandler
117 where
118 E: error::Error,
119 {
handle_error(&self, error: E)120 fn handle_error(&self, error: E) {
121 error!("{}", error);
122 }
123 }
124
/// A trait which allows for customization of connections.
///
/// Both hooks have default implementations, so an implementor only overrides
/// the ones it needs.
pub trait CustomizeConnection<C, E>: fmt::Debug + Send + Sync + 'static {
    /// Called with connections immediately after they are returned from
    /// `ManageConnection::connect`.
    ///
    /// The default implementation simply returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// If this method returns an error, the connection will be discarded.
    fn on_acquire(&self, conn: &mut C) -> Result<(), E> {
        // Nothing to customize by default; the binding only marks `conn` as used.
        let _ = conn;
        Ok(())
    }

    /// Called with connections when they are removed from the pool.
    ///
    /// The connections may be broken (as reported by `is_valid` or
    /// `has_broken`), or have simply timed out.
    ///
    /// The default implementation does nothing.
    fn on_release(&self, conn: C) {
        // The default hook has no work to do beyond letting the connection go.
        drop(conn);
    }
}
149
/// A `CustomizeConnection` which does nothing.
#[derive(Copy, Clone, Debug)]
pub struct NopConnectionCustomizer;

// Relies entirely on the trait's default `on_acquire` and `on_release`.
impl<C, E> CustomizeConnection<C, E> for NopConnectionCustomizer {}
155
/// A connection together with the pool's per-connection bookkeeping.
struct Conn<C> {
    /// The underlying connection produced by the manager.
    conn: C,
    /// Extension data attached to this connection (exposed through
    /// `PooledConnection::extensions`).
    extensions: Extensions,
    /// When the connection was added to the pool; compared against
    /// `max_lifetime` by the reaper.
    birth: Instant,
    /// Identifier reported in the pool's events.
    id: u64,
}
162
/// A connection that is currently sitting idle in the pool.
struct IdleConn<C> {
    /// The idle connection and its bookkeeping.
    conn: Conn<C>,
    /// When the connection last became idle; compared against `idle_timeout`
    /// by the reaper.
    idle_start: Instant,
}
167
/// Mutable pool state, guarded by the mutex in `SharedPool`.
struct PoolInternals<C> {
    /// Idle connections; connections are pushed to and popped from the end.
    conns: Vec<IdleConn<C>>,
    /// Number of established connections: incremented when a connect attempt
    /// succeeds, decremented in `drop_conns`.
    num_conns: u32,
    /// Number of connection attempts that have been scheduled but have not
    /// yet succeeded.
    pending_conns: u32,
    /// Text of the most recently recorded connection error, if any. It is
    /// cleared when a connect attempt succeeds and taken when a wait times out.
    last_error: Option<String>,
}
174
/// State shared by every `Pool` handle and by the pool's background jobs.
struct SharedPool<M>
where
    M: ManageConnection,
{
    /// Pool configuration (sizes, timeouts, handlers, thread pool).
    config: Config<M::Connection, M::Error>,
    /// The user-supplied connection manager.
    manager: M,
    /// Mutable bookkeeping; lock before reading or writing.
    internals: Mutex<PoolInternals<M::Connection>>,
    /// Notified when a connection is added to the idle list; waited on by
    /// callers that need a connection.
    cond: Condvar,
}
184
drop_conns<M>( shared: &Arc<SharedPool<M>>, mut internals: MutexGuard<PoolInternals<M::Connection>>, conns: Vec<Conn<M::Connection>>, ) where M: ManageConnection,185 fn drop_conns<M>(
186 shared: &Arc<SharedPool<M>>,
187 mut internals: MutexGuard<PoolInternals<M::Connection>>,
188 conns: Vec<Conn<M::Connection>>,
189 ) where
190 M: ManageConnection,
191 {
192 internals.num_conns -= conns.len() as u32;
193 establish_idle_connections(shared, &mut internals);
194 drop(internals); // make sure we run connection destructors without this locked
195
196 for conn in conns {
197 let event = ReleaseEvent {
198 id: conn.id,
199 age: conn.birth.elapsed(),
200 };
201 shared.config.event_handler.handle_release(event);
202 shared.config.connection_customizer.on_release(conn.conn);
203 }
204 }
205
establish_idle_connections<M>( shared: &Arc<SharedPool<M>>, internals: &mut PoolInternals<M::Connection>, ) where M: ManageConnection,206 fn establish_idle_connections<M>(
207 shared: &Arc<SharedPool<M>>,
208 internals: &mut PoolInternals<M::Connection>,
209 ) where
210 M: ManageConnection,
211 {
212 let min = shared.config.min_idle.unwrap_or(shared.config.max_size);
213 let idle = internals.conns.len() as u32;
214 for _ in idle..min {
215 add_connection(shared, internals);
216 }
217 }
218
add_connection<M>(shared: &Arc<SharedPool<M>>, internals: &mut PoolInternals<M::Connection>) where M: ManageConnection,219 fn add_connection<M>(shared: &Arc<SharedPool<M>>, internals: &mut PoolInternals<M::Connection>)
220 where
221 M: ManageConnection,
222 {
223 if internals.num_conns + internals.pending_conns >= shared.config.max_size {
224 return;
225 }
226
227 internals.pending_conns += 1;
228 inner(Duration::from_secs(0), shared);
229
230 fn inner<M>(delay: Duration, shared: &Arc<SharedPool<M>>)
231 where
232 M: ManageConnection,
233 {
234 let new_shared = Arc::downgrade(shared);
235 shared.config.thread_pool.execute_after(delay, move || {
236 let shared = match new_shared.upgrade() {
237 Some(shared) => shared,
238 None => return,
239 };
240
241 let conn = shared.manager.connect().and_then(|mut conn| {
242 shared
243 .config
244 .connection_customizer
245 .on_acquire(&mut conn)
246 .map(|_| conn)
247 });
248 match conn {
249 Ok(conn) => {
250 let id = CONNECTION_ID.fetch_add(1, Ordering::Relaxed) as u64;
251
252 let event = AcquireEvent { id };
253 shared.config.event_handler.handle_acquire(event);
254
255 let mut internals = shared.internals.lock();
256 internals.last_error = None;
257 let now = Instant::now();
258 let conn = IdleConn {
259 conn: Conn {
260 conn,
261 extensions: Extensions::new(),
262 birth: now,
263 id,
264 },
265 idle_start: now,
266 };
267 internals.conns.push(conn);
268 internals.pending_conns -= 1;
269 internals.num_conns += 1;
270 shared.cond.notify_one();
271 }
272 Err(err) => {
273 shared.internals.lock().last_error = Some(err.to_string());
274 shared.config.error_handler.handle_error(err);
275 let delay = cmp::max(Duration::from_millis(200), delay);
276 let delay = cmp::min(shared.config.connection_timeout / 2, delay * 2);
277 inner(delay, &shared);
278 }
279 }
280 });
281 }
282 }
283
reap_connections<M>(shared: &Weak<SharedPool<M>>) where M: ManageConnection,284 fn reap_connections<M>(shared: &Weak<SharedPool<M>>)
285 where
286 M: ManageConnection,
287 {
288 let shared = match shared.upgrade() {
289 Some(shared) => shared,
290 None => return,
291 };
292
293 let mut old = Vec::with_capacity(shared.config.max_size as usize);
294 let mut to_drop = vec![];
295
296 let mut internals = shared.internals.lock();
297 mem::swap(&mut old, &mut internals.conns);
298 let now = Instant::now();
299 for conn in old {
300 let mut reap = false;
301 if let Some(timeout) = shared.config.idle_timeout {
302 reap |= now - conn.idle_start >= timeout;
303 }
304 if let Some(lifetime) = shared.config.max_lifetime {
305 reap |= now - conn.conn.birth >= lifetime;
306 }
307 if reap {
308 to_drop.push(conn.conn);
309 } else {
310 internals.conns.push(conn);
311 }
312 }
313 drop_conns(&shared, internals, to_drop);
314 }
315
/// A generic connection pool.
///
/// A `Pool` is a handle to reference-counted shared state; cloning it yields
/// another handle to the same pool.
pub struct Pool<M>(Arc<SharedPool<M>>)
where
    M: ManageConnection;
320
321 /// Returns a new `Pool` referencing the same state as `self`.
322 impl<M> Clone for Pool<M>
323 where
324 M: ManageConnection,
325 {
clone(&self) -> Pool<M>326 fn clone(&self) -> Pool<M> {
327 Pool(self.0.clone())
328 }
329 }
330
331 impl<M> fmt::Debug for Pool<M>
332 where
333 M: ManageConnection + fmt::Debug,
334 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result335 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
336 fmt.debug_struct("Pool")
337 .field("state", &self.state())
338 .field("config", &self.0.config)
339 .field("manager", &self.0.manager)
340 .finish()
341 }
342 }
343
344 impl<M> Pool<M>
345 where
346 M: ManageConnection,
347 {
348 /// Creates a new connection pool with a default configuration.
new(manager: M) -> Result<Pool<M>, Error>349 pub fn new(manager: M) -> Result<Pool<M>, Error> {
350 Pool::builder().build(manager)
351 }
352
353 /// Returns a builder type to configure a new pool.
builder() -> Builder<M>354 pub fn builder() -> Builder<M> {
355 Builder::new()
356 }
357
358 // for testing
new_inner( config: Config<M::Connection, M::Error>, manager: M, reaper_rate: Duration, ) -> Pool<M>359 fn new_inner(
360 config: Config<M::Connection, M::Error>,
361 manager: M,
362 reaper_rate: Duration,
363 ) -> Pool<M> {
364 let internals = PoolInternals {
365 conns: Vec::with_capacity(config.max_size as usize),
366 num_conns: 0,
367 pending_conns: 0,
368 last_error: None,
369 };
370
371 let shared = Arc::new(SharedPool {
372 config,
373 manager,
374 internals: Mutex::new(internals),
375 cond: Condvar::new(),
376 });
377
378 establish_idle_connections(&shared, &mut shared.internals.lock());
379
380 if shared.config.max_lifetime.is_some() || shared.config.idle_timeout.is_some() {
381 let s = Arc::downgrade(&shared);
382 shared
383 .config
384 .thread_pool
385 .execute_at_fixed_rate(reaper_rate, reaper_rate, move || reap_connections(&s));
386 }
387
388 Pool(shared)
389 }
390
wait_for_initialization(&self) -> Result<(), Error>391 fn wait_for_initialization(&self) -> Result<(), Error> {
392 let end = Instant::now() + self.0.config.connection_timeout;
393 let mut internals = self.0.internals.lock();
394
395 let initial_size = self.0.config.min_idle.unwrap_or(self.0.config.max_size);
396
397 while internals.num_conns != initial_size {
398 if self.0.cond.wait_until(&mut internals, end).timed_out() {
399 return Err(Error(internals.last_error.take()));
400 }
401 }
402
403 Ok(())
404 }
405
406 /// Retrieves a connection from the pool.
407 ///
408 /// Waits for at most the configured connection timeout before returning an
409 /// error.
get(&self) -> Result<PooledConnection<M>, Error>410 pub fn get(&self) -> Result<PooledConnection<M>, Error> {
411 self.get_timeout(self.0.config.connection_timeout)
412 }
413
414 /// Retrieves a connection from the pool, waiting for at most `timeout`
415 ///
416 /// The given timeout will be used instead of the configured connection
417 /// timeout.
get_timeout(&self, timeout: Duration) -> Result<PooledConnection<M>, Error>418 pub fn get_timeout(&self, timeout: Duration) -> Result<PooledConnection<M>, Error> {
419 let start = Instant::now();
420 let end = start + timeout;
421 let mut internals = self.0.internals.lock();
422
423 loop {
424 match self.try_get_inner(internals) {
425 Ok(conn) => {
426 let event = CheckoutEvent {
427 id: conn.conn.as_ref().unwrap().id,
428 duration: start.elapsed(),
429 };
430 self.0.config.event_handler.handle_checkout(event);
431 return Ok(conn);
432 }
433 Err(i) => internals = i,
434 }
435
436 add_connection(&self.0, &mut internals);
437
438 if self.0.cond.wait_until(&mut internals, end).timed_out() {
439 let event = TimeoutEvent { timeout };
440 self.0.config.event_handler.handle_timeout(event);
441
442 return Err(Error(internals.last_error.take()));
443 }
444 }
445 }
446
447 /// Attempts to retrieve a connection from the pool if there is one
448 /// available.
449 ///
450 /// Returns `None` if there are no idle connections available in the pool.
451 /// This method will not block waiting to establish a new connection.
try_get(&self) -> Option<PooledConnection<M>>452 pub fn try_get(&self) -> Option<PooledConnection<M>> {
453 self.try_get_inner(self.0.internals.lock()).ok()
454 }
455
try_get_inner<'a>( &'a self, mut internals: MutexGuard<'a, PoolInternals<M::Connection>>, ) -> Result<PooledConnection<M>, MutexGuard<'a, PoolInternals<M::Connection>>>456 fn try_get_inner<'a>(
457 &'a self,
458 mut internals: MutexGuard<'a, PoolInternals<M::Connection>>,
459 ) -> Result<PooledConnection<M>, MutexGuard<'a, PoolInternals<M::Connection>>> {
460 loop {
461 if let Some(mut conn) = internals.conns.pop() {
462 establish_idle_connections(&self.0, &mut internals);
463 drop(internals);
464
465 if self.0.config.test_on_check_out {
466 if let Err(e) = self.0.manager.is_valid(&mut conn.conn.conn) {
467 let msg = e.to_string();
468 self.0.config.error_handler.handle_error(e);
469 // FIXME we shouldn't have to lock, unlock, and relock here
470 internals = self.0.internals.lock();
471 internals.last_error = Some(msg);
472 drop_conns(&self.0, internals, vec![conn.conn]);
473 internals = self.0.internals.lock();
474 continue;
475 }
476 }
477
478 return Ok(PooledConnection {
479 pool: self.clone(),
480 checkout: Instant::now(),
481 conn: Some(conn.conn),
482 });
483 } else {
484 return Err(internals);
485 }
486 }
487 }
488
put_back(&self, checkout: Instant, mut conn: Conn<M::Connection>)489 fn put_back(&self, checkout: Instant, mut conn: Conn<M::Connection>) {
490 let event = CheckinEvent {
491 id: conn.id,
492 duration: checkout.elapsed(),
493 };
494 self.0.config.event_handler.handle_checkin(event);
495
496 // This is specified to be fast, but call it before locking anyways
497 let broken = self.0.manager.has_broken(&mut conn.conn);
498
499 let mut internals = self.0.internals.lock();
500 if broken {
501 drop_conns(&self.0, internals, vec![conn]);
502 } else {
503 let conn = IdleConn {
504 conn,
505 idle_start: Instant::now(),
506 };
507 internals.conns.push(conn);
508 self.0.cond.notify_one();
509 }
510 }
511
512 /// Returns information about the current state of the pool.
state(&self) -> State513 pub fn state(&self) -> State {
514 let internals = self.0.internals.lock();
515 State {
516 connections: internals.num_conns,
517 idle_connections: internals.conns.len() as u32,
518 _p: (),
519 }
520 }
521
522 /// Returns the configured maximum pool size.
max_size(&self) -> u32523 pub fn max_size(&self) -> u32 {
524 self.0.config.max_size
525 }
526
527 /// Returns the configured mimimum idle connection count.
min_idle(&self) -> Option<u32>528 pub fn min_idle(&self) -> Option<u32> {
529 self.0.config.min_idle
530 }
531
532 /// Returns if the pool is configured to test connections on check out.
test_on_check_out(&self) -> bool533 pub fn test_on_check_out(&self) -> bool {
534 self.0.config.test_on_check_out
535 }
536
537 /// Returns the configured maximum connection lifetime.
max_lifetime(&self) -> Option<Duration>538 pub fn max_lifetime(&self) -> Option<Duration> {
539 self.0.config.max_lifetime
540 }
541
542 /// Returns the configured idle connection timeout.
idle_timeout(&self) -> Option<Duration>543 pub fn idle_timeout(&self) -> Option<Duration> {
544 self.0.config.idle_timeout
545 }
546
547 /// Returns the configured connection timeout.
connection_timeout(&self) -> Duration548 pub fn connection_timeout(&self) -> Duration {
549 self.0.config.connection_timeout
550 }
551 }
552
/// The error type returned by methods in this crate.
///
/// Produced when waiting for a connection times out. If the pool recorded an
/// error message while trying to obtain a connection, that message is
/// appended to the `Display` output after a `": "` separator.
#[derive(Debug)]
pub struct Error(Option<String>);

impl Error {
    /// Fixed summary shared by `Display` and the legacy `description` method.
    const SUMMARY: &'static str = "timed out waiting for connection";
}

impl fmt::Display for Error {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        // Write the summary directly instead of calling the deprecated
        // `Error::description`, which raises a deprecation warning.
        fmt.write_str(Error::SUMMARY)?;
        if let Some(ref err) = self.0 {
            write!(fmt, ": {}", err)?;
        }
        Ok(())
    }
}

impl error::Error for Error {
    // Kept so existing callers of `description` still get the same text.
    #[allow(deprecated)]
    fn description(&self) -> &str {
        Error::SUMMARY
    }
}
572
/// Information about the state of a `Pool`.
pub struct State {
    /// The number of connections currently being managed by the pool.
    pub connections: u32,
    /// The number of idle connections.
    pub idle_connections: u32,
    // Private zero-sized field; it is deliberately left out of the `Debug`
    // output.
    _p: (),
}

impl fmt::Debug for State {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let State {
            connections,
            idle_connections,
            ..
        } = self;
        let mut out = fmt.debug_struct("State");
        out.field("connections", connections);
        out.field("idle_connections", idle_connections);
        out.finish()
    }
}
590
/// A smart pointer wrapping a connection.
///
/// Dereferences to the underlying connection and returns it to the pool when
/// dropped.
pub struct PooledConnection<M>
where
    M: ManageConnection,
{
    /// Handle to the pool the connection is returned to on drop.
    pool: Pool<M>,
    /// When the connection was checked out; used for the check-in event.
    checkout: Instant,
    /// The checked-out connection. It is `Some` until `Drop` takes it out.
    conn: Option<Conn<M::Connection>>,
}
600
601 impl<M> fmt::Debug for PooledConnection<M>
602 where
603 M: ManageConnection,
604 M::Connection: fmt::Debug,
605 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result606 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
607 fmt::Debug::fmt(&self.conn.as_ref().unwrap().conn, fmt)
608 }
609 }
610
611 impl<M> Drop for PooledConnection<M>
612 where
613 M: ManageConnection,
614 {
drop(&mut self)615 fn drop(&mut self) {
616 self.pool.put_back(self.checkout, self.conn.take().unwrap());
617 }
618 }
619
620 impl<M> Deref for PooledConnection<M>
621 where
622 M: ManageConnection,
623 {
624 type Target = M::Connection;
625
deref(&self) -> &M::Connection626 fn deref(&self) -> &M::Connection {
627 &self.conn.as_ref().unwrap().conn
628 }
629 }
630
631 impl<M> DerefMut for PooledConnection<M>
632 where
633 M: ManageConnection,
634 {
deref_mut(&mut self) -> &mut M::Connection635 fn deref_mut(&mut self) -> &mut M::Connection {
636 &mut self.conn.as_mut().unwrap().conn
637 }
638 }
639
640 impl<M> PooledConnection<M>
641 where
642 M: ManageConnection,
643 {
644 /// Returns a shared reference to the extensions associated with this connection.
extensions(this: &Self) -> &Extensions645 pub fn extensions(this: &Self) -> &Extensions {
646 &this.conn.as_ref().unwrap().extensions
647 }
648
649 /// Returns a mutable reference to the extensions associated with this connection.
extensions_mut(this: &mut Self) -> &mut Extensions650 pub fn extensions_mut(this: &mut Self) -> &mut Extensions {
651 &mut this.conn.as_mut().unwrap().extensions
652 }
653 }
654