1 //! A generic connection pool.
2 //!
3 //! Opening a new database connection every time one is needed is both
4 //! inefficient and can lead to resource exhaustion under high traffic
5 //! conditions. A connection pool maintains a set of open connections to a
6 //! database, handing them out for repeated use.
7 //!
8 //! r2d2 is agnostic to the connection type it is managing. Implementors of the
9 //! `ManageConnection` trait provide the database-specific logic to create and
10 //! check the health of connections.
11 //!
12 //! # Example
13 //!
14 //! Using an imaginary "foodb" database.
15 //!
16 //! ```rust,ignore
17 //! use std::thread;
18 //!
19 //! extern crate r2d2;
20 //! extern crate r2d2_foodb;
21 //!
22 //! fn main() {
23 //!     let manager = r2d2_foodb::FooConnectionManager::new("localhost:1234");
24 //!     let pool = r2d2::Pool::builder()
25 //!         .max_size(15)
26 //!         .build(manager)
27 //!         .unwrap();
28 //!
29 //!     for _ in 0..20 {
30 //!         let pool = pool.clone();
31 //!         thread::spawn(move || {
32 //!             let conn = pool.get().unwrap();
33 //!             // use the connection
34 //!             // it will be returned to the pool when it falls out of scope.
35 //!         })
36 //!     }
37 //! }
38 //! ```
39 #![warn(missing_docs)]
40 #![doc(html_root_url = "https://docs.rs/r2d2/0.8")]
41 
42 use log::error;
43 
44 use parking_lot::{Condvar, Mutex, MutexGuard};
45 use std::cmp;
46 use std::error;
47 use std::fmt;
48 use std::mem;
49 use std::ops::{Deref, DerefMut};
50 use std::sync::atomic::{AtomicUsize, Ordering};
51 use std::sync::{Arc, Weak};
52 use std::time::{Duration, Instant};
53 
54 pub use crate::config::Builder;
55 use crate::config::Config;
56 use crate::event::{AcquireEvent, CheckinEvent, CheckoutEvent, ReleaseEvent, TimeoutEvent};
57 pub use crate::event::{HandleEvent, NopEventHandler};
58 pub use crate::extensions::Extensions;
59 
60 mod config;
61 pub mod event;
62 mod extensions;
63 
64 #[cfg(test)]
65 mod test;
66 
67 static CONNECTION_ID: AtomicUsize = AtomicUsize::new(0);
68 
69 /// A trait which provides connection-specific functionality.
70 pub trait ManageConnection: Send + Sync + 'static {
71     /// The connection type this manager deals with.
72     type Connection: Send + 'static;
73 
74     /// The error type returned by `Connection`s.
75     type Error: error::Error + 'static;
76 
77     /// Attempts to create a new connection.
connect(&self) -> Result<Self::Connection, Self::Error>78     fn connect(&self) -> Result<Self::Connection, Self::Error>;
79 
80     /// Determines if the connection is still connected to the database.
81     ///
82     /// A standard implementation would check if a simple query like `SELECT 1`
83     /// succeeds.
is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error>84     fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error>;
85 
86     /// *Quickly* determines if the connection is no longer usable.
87     ///
88     /// This will be called synchronously every time a connection is returned
89     /// to the pool, so it should *not* block. If it returns `true`, the
90     /// connection will be discarded.
91     ///
92     /// For example, an implementation might check if the underlying TCP socket
93     /// has disconnected. Implementations that do not support this kind of
94     /// fast health check may simply return `false`.
has_broken(&self, conn: &mut Self::Connection) -> bool95     fn has_broken(&self, conn: &mut Self::Connection) -> bool;
96 }
97 
98 /// A trait which handles errors reported by the `ManageConnection`.
99 pub trait HandleError<E>: fmt::Debug + Send + Sync + 'static {
100     /// Handles an error.
handle_error(&self, error: E)101     fn handle_error(&self, error: E);
102 }
103 
104 /// A `HandleError` implementation which does nothing.
105 #[derive(Copy, Clone, Debug)]
106 pub struct NopErrorHandler;
107 
108 impl<E> HandleError<E> for NopErrorHandler {
handle_error(&self, _: E)109     fn handle_error(&self, _: E) {}
110 }
111 
112 /// A `HandleError` implementation which logs at the error level.
113 #[derive(Copy, Clone, Debug)]
114 pub struct LoggingErrorHandler;
115 
116 impl<E> HandleError<E> for LoggingErrorHandler
117 where
118     E: error::Error,
119 {
handle_error(&self, error: E)120     fn handle_error(&self, error: E) {
121         error!("{}", error);
122     }
123 }
124 
125 /// A trait which allows for customization of connections.
126 pub trait CustomizeConnection<C, E>: fmt::Debug + Send + Sync + 'static {
127     /// Called with connections immediately after they are returned from
128     /// `ManageConnection::connect`.
129     ///
130     /// The default implementation simply returns `Ok(())`.
131     ///
132     /// # Errors
133     ///
134     /// If this method returns an error, the connection will be discarded.
135     #[allow(unused_variables)]
on_acquire(&self, conn: &mut C) -> Result<(), E>136     fn on_acquire(&self, conn: &mut C) -> Result<(), E> {
137         Ok(())
138     }
139 
140     /// Called with connections when they are removed from the pool.
141     ///
142     /// The connections may be broken (as reported by `is_valid` or
143     /// `has_broken`), or have simply timed out.
144     ///
145     /// The default implementation does nothing.
146     #[allow(unused_variables)]
on_release(&self, conn: C)147     fn on_release(&self, conn: C) {}
148 }
149 
150 /// A `CustomizeConnection` which does nothing.
151 #[derive(Copy, Clone, Debug)]
152 pub struct NopConnectionCustomizer;
153 
154 impl<C, E> CustomizeConnection<C, E> for NopConnectionCustomizer {}
155 
156 struct Conn<C> {
157     conn: C,
158     extensions: Extensions,
159     birth: Instant,
160     id: u64,
161 }
162 
163 struct IdleConn<C> {
164     conn: Conn<C>,
165     idle_start: Instant,
166 }
167 
168 struct PoolInternals<C> {
169     conns: Vec<IdleConn<C>>,
170     num_conns: u32,
171     pending_conns: u32,
172     last_error: Option<String>,
173 }
174 
175 struct SharedPool<M>
176 where
177     M: ManageConnection,
178 {
179     config: Config<M::Connection, M::Error>,
180     manager: M,
181     internals: Mutex<PoolInternals<M::Connection>>,
182     cond: Condvar,
183 }
184 
drop_conns<M>( shared: &Arc<SharedPool<M>>, mut internals: MutexGuard<PoolInternals<M::Connection>>, conns: Vec<Conn<M::Connection>>, ) where M: ManageConnection,185 fn drop_conns<M>(
186     shared: &Arc<SharedPool<M>>,
187     mut internals: MutexGuard<PoolInternals<M::Connection>>,
188     conns: Vec<Conn<M::Connection>>,
189 ) where
190     M: ManageConnection,
191 {
192     internals.num_conns -= conns.len() as u32;
193     establish_idle_connections(shared, &mut internals);
194     drop(internals); // make sure we run connection destructors without this locked
195 
196     for conn in conns {
197         let event = ReleaseEvent {
198             id: conn.id,
199             age: conn.birth.elapsed(),
200         };
201         shared.config.event_handler.handle_release(event);
202         shared.config.connection_customizer.on_release(conn.conn);
203     }
204 }
205 
establish_idle_connections<M>( shared: &Arc<SharedPool<M>>, internals: &mut PoolInternals<M::Connection>, ) where M: ManageConnection,206 fn establish_idle_connections<M>(
207     shared: &Arc<SharedPool<M>>,
208     internals: &mut PoolInternals<M::Connection>,
209 ) where
210     M: ManageConnection,
211 {
212     let min = shared.config.min_idle.unwrap_or(shared.config.max_size);
213     let idle = internals.conns.len() as u32;
214     for _ in idle..min {
215         add_connection(shared, internals);
216     }
217 }
218 
add_connection<M>(shared: &Arc<SharedPool<M>>, internals: &mut PoolInternals<M::Connection>) where M: ManageConnection,219 fn add_connection<M>(shared: &Arc<SharedPool<M>>, internals: &mut PoolInternals<M::Connection>)
220 where
221     M: ManageConnection,
222 {
223     if internals.num_conns + internals.pending_conns >= shared.config.max_size {
224         return;
225     }
226 
227     internals.pending_conns += 1;
228     inner(Duration::from_secs(0), shared);
229 
230     fn inner<M>(delay: Duration, shared: &Arc<SharedPool<M>>)
231     where
232         M: ManageConnection,
233     {
234         let new_shared = Arc::downgrade(shared);
235         shared.config.thread_pool.execute_after(delay, move || {
236             let shared = match new_shared.upgrade() {
237                 Some(shared) => shared,
238                 None => return,
239             };
240 
241             let conn = shared.manager.connect().and_then(|mut conn| {
242                 shared
243                     .config
244                     .connection_customizer
245                     .on_acquire(&mut conn)
246                     .map(|_| conn)
247             });
248             match conn {
249                 Ok(conn) => {
250                     let id = CONNECTION_ID.fetch_add(1, Ordering::Relaxed) as u64;
251 
252                     let event = AcquireEvent { id };
253                     shared.config.event_handler.handle_acquire(event);
254 
255                     let mut internals = shared.internals.lock();
256                     internals.last_error = None;
257                     let now = Instant::now();
258                     let conn = IdleConn {
259                         conn: Conn {
260                             conn,
261                             extensions: Extensions::new(),
262                             birth: now,
263                             id,
264                         },
265                         idle_start: now,
266                     };
267                     internals.conns.push(conn);
268                     internals.pending_conns -= 1;
269                     internals.num_conns += 1;
270                     shared.cond.notify_one();
271                 }
272                 Err(err) => {
273                     shared.internals.lock().last_error = Some(err.to_string());
274                     shared.config.error_handler.handle_error(err);
275                     let delay = cmp::max(Duration::from_millis(200), delay);
276                     let delay = cmp::min(shared.config.connection_timeout / 2, delay * 2);
277                     inner(delay, &shared);
278                 }
279             }
280         });
281     }
282 }
283 
reap_connections<M>(shared: &Weak<SharedPool<M>>) where M: ManageConnection,284 fn reap_connections<M>(shared: &Weak<SharedPool<M>>)
285 where
286     M: ManageConnection,
287 {
288     let shared = match shared.upgrade() {
289         Some(shared) => shared,
290         None => return,
291     };
292 
293     let mut old = Vec::with_capacity(shared.config.max_size as usize);
294     let mut to_drop = vec![];
295 
296     let mut internals = shared.internals.lock();
297     mem::swap(&mut old, &mut internals.conns);
298     let now = Instant::now();
299     for conn in old {
300         let mut reap = false;
301         if let Some(timeout) = shared.config.idle_timeout {
302             reap |= now - conn.idle_start >= timeout;
303         }
304         if let Some(lifetime) = shared.config.max_lifetime {
305             reap |= now - conn.conn.birth >= lifetime;
306         }
307         if reap {
308             to_drop.push(conn.conn);
309         } else {
310             internals.conns.push(conn);
311         }
312     }
313     drop_conns(&shared, internals, to_drop);
314 }
315 
316 /// A generic connection pool.
317 pub struct Pool<M>(Arc<SharedPool<M>>)
318 where
319     M: ManageConnection;
320 
321 /// Returns a new `Pool` referencing the same state as `self`.
322 impl<M> Clone for Pool<M>
323 where
324     M: ManageConnection,
325 {
clone(&self) -> Pool<M>326     fn clone(&self) -> Pool<M> {
327         Pool(self.0.clone())
328     }
329 }
330 
331 impl<M> fmt::Debug for Pool<M>
332 where
333     M: ManageConnection + fmt::Debug,
334 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result335     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
336         fmt.debug_struct("Pool")
337             .field("state", &self.state())
338             .field("config", &self.0.config)
339             .field("manager", &self.0.manager)
340             .finish()
341     }
342 }
343 
344 impl<M> Pool<M>
345 where
346     M: ManageConnection,
347 {
348     /// Creates a new connection pool with a default configuration.
new(manager: M) -> Result<Pool<M>, Error>349     pub fn new(manager: M) -> Result<Pool<M>, Error> {
350         Pool::builder().build(manager)
351     }
352 
353     /// Returns a builder type to configure a new pool.
builder() -> Builder<M>354     pub fn builder() -> Builder<M> {
355         Builder::new()
356     }
357 
358     // for testing
new_inner( config: Config<M::Connection, M::Error>, manager: M, reaper_rate: Duration, ) -> Pool<M>359     fn new_inner(
360         config: Config<M::Connection, M::Error>,
361         manager: M,
362         reaper_rate: Duration,
363     ) -> Pool<M> {
364         let internals = PoolInternals {
365             conns: Vec::with_capacity(config.max_size as usize),
366             num_conns: 0,
367             pending_conns: 0,
368             last_error: None,
369         };
370 
371         let shared = Arc::new(SharedPool {
372             config,
373             manager,
374             internals: Mutex::new(internals),
375             cond: Condvar::new(),
376         });
377 
378         establish_idle_connections(&shared, &mut shared.internals.lock());
379 
380         if shared.config.max_lifetime.is_some() || shared.config.idle_timeout.is_some() {
381             let s = Arc::downgrade(&shared);
382             shared
383                 .config
384                 .thread_pool
385                 .execute_at_fixed_rate(reaper_rate, reaper_rate, move || reap_connections(&s));
386         }
387 
388         Pool(shared)
389     }
390 
wait_for_initialization(&self) -> Result<(), Error>391     fn wait_for_initialization(&self) -> Result<(), Error> {
392         let end = Instant::now() + self.0.config.connection_timeout;
393         let mut internals = self.0.internals.lock();
394 
395         let initial_size = self.0.config.min_idle.unwrap_or(self.0.config.max_size);
396 
397         while internals.num_conns != initial_size {
398             if self.0.cond.wait_until(&mut internals, end).timed_out() {
399                 return Err(Error(internals.last_error.take()));
400             }
401         }
402 
403         Ok(())
404     }
405 
406     /// Retrieves a connection from the pool.
407     ///
408     /// Waits for at most the configured connection timeout before returning an
409     /// error.
get(&self) -> Result<PooledConnection<M>, Error>410     pub fn get(&self) -> Result<PooledConnection<M>, Error> {
411         self.get_timeout(self.0.config.connection_timeout)
412     }
413 
414     /// Retrieves a connection from the pool, waiting for at most `timeout`
415     ///
416     /// The given timeout will be used instead of the configured connection
417     /// timeout.
get_timeout(&self, timeout: Duration) -> Result<PooledConnection<M>, Error>418     pub fn get_timeout(&self, timeout: Duration) -> Result<PooledConnection<M>, Error> {
419         let start = Instant::now();
420         let end = start + timeout;
421         let mut internals = self.0.internals.lock();
422 
423         loop {
424             match self.try_get_inner(internals) {
425                 Ok(conn) => {
426                     let event = CheckoutEvent {
427                         id: conn.conn.as_ref().unwrap().id,
428                         duration: start.elapsed(),
429                     };
430                     self.0.config.event_handler.handle_checkout(event);
431                     return Ok(conn);
432                 }
433                 Err(i) => internals = i,
434             }
435 
436             add_connection(&self.0, &mut internals);
437 
438             if self.0.cond.wait_until(&mut internals, end).timed_out() {
439                 let event = TimeoutEvent { timeout };
440                 self.0.config.event_handler.handle_timeout(event);
441 
442                 return Err(Error(internals.last_error.take()));
443             }
444         }
445     }
446 
447     /// Attempts to retrieve a connection from the pool if there is one
448     /// available.
449     ///
450     /// Returns `None` if there are no idle connections available in the pool.
451     /// This method will not block waiting to establish a new connection.
try_get(&self) -> Option<PooledConnection<M>>452     pub fn try_get(&self) -> Option<PooledConnection<M>> {
453         self.try_get_inner(self.0.internals.lock()).ok()
454     }
455 
try_get_inner<'a>( &'a self, mut internals: MutexGuard<'a, PoolInternals<M::Connection>>, ) -> Result<PooledConnection<M>, MutexGuard<'a, PoolInternals<M::Connection>>>456     fn try_get_inner<'a>(
457         &'a self,
458         mut internals: MutexGuard<'a, PoolInternals<M::Connection>>,
459     ) -> Result<PooledConnection<M>, MutexGuard<'a, PoolInternals<M::Connection>>> {
460         loop {
461             if let Some(mut conn) = internals.conns.pop() {
462                 establish_idle_connections(&self.0, &mut internals);
463                 drop(internals);
464 
465                 if self.0.config.test_on_check_out {
466                     if let Err(e) = self.0.manager.is_valid(&mut conn.conn.conn) {
467                         let msg = e.to_string();
468                         self.0.config.error_handler.handle_error(e);
469                         // FIXME we shouldn't have to lock, unlock, and relock here
470                         internals = self.0.internals.lock();
471                         internals.last_error = Some(msg);
472                         drop_conns(&self.0, internals, vec![conn.conn]);
473                         internals = self.0.internals.lock();
474                         continue;
475                     }
476                 }
477 
478                 return Ok(PooledConnection {
479                     pool: self.clone(),
480                     checkout: Instant::now(),
481                     conn: Some(conn.conn),
482                 });
483             } else {
484                 return Err(internals);
485             }
486         }
487     }
488 
put_back(&self, checkout: Instant, mut conn: Conn<M::Connection>)489     fn put_back(&self, checkout: Instant, mut conn: Conn<M::Connection>) {
490         let event = CheckinEvent {
491             id: conn.id,
492             duration: checkout.elapsed(),
493         };
494         self.0.config.event_handler.handle_checkin(event);
495 
496         // This is specified to be fast, but call it before locking anyways
497         let broken = self.0.manager.has_broken(&mut conn.conn);
498 
499         let mut internals = self.0.internals.lock();
500         if broken {
501             drop_conns(&self.0, internals, vec![conn]);
502         } else {
503             let conn = IdleConn {
504                 conn,
505                 idle_start: Instant::now(),
506             };
507             internals.conns.push(conn);
508             self.0.cond.notify_one();
509         }
510     }
511 
512     /// Returns information about the current state of the pool.
state(&self) -> State513     pub fn state(&self) -> State {
514         let internals = self.0.internals.lock();
515         State {
516             connections: internals.num_conns,
517             idle_connections: internals.conns.len() as u32,
518             _p: (),
519         }
520     }
521 
522     /// Returns the configured maximum pool size.
max_size(&self) -> u32523     pub fn max_size(&self) -> u32 {
524         self.0.config.max_size
525     }
526 
527     /// Returns the configured mimimum idle connection count.
min_idle(&self) -> Option<u32>528     pub fn min_idle(&self) -> Option<u32> {
529         self.0.config.min_idle
530     }
531 
532     /// Returns if the pool is configured to test connections on check out.
test_on_check_out(&self) -> bool533     pub fn test_on_check_out(&self) -> bool {
534         self.0.config.test_on_check_out
535     }
536 
537     /// Returns the configured maximum connection lifetime.
max_lifetime(&self) -> Option<Duration>538     pub fn max_lifetime(&self) -> Option<Duration> {
539         self.0.config.max_lifetime
540     }
541 
542     /// Returns the configured idle connection timeout.
idle_timeout(&self) -> Option<Duration>543     pub fn idle_timeout(&self) -> Option<Duration> {
544         self.0.config.idle_timeout
545     }
546 
547     /// Returns the configured connection timeout.
connection_timeout(&self) -> Duration548     pub fn connection_timeout(&self) -> Duration {
549         self.0.config.connection_timeout
550     }
551 }
552 
553 /// The error type returned by methods in this crate.
554 #[derive(Debug)]
555 pub struct Error(Option<String>);
556 
557 impl fmt::Display for Error {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result558     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
559         fmt.write_str(error::Error::description(self))?;
560         if let Some(ref err) = self.0 {
561             write!(fmt, ": {}", err)?;
562         }
563         Ok(())
564     }
565 }
566 
567 impl error::Error for Error {
description(&self) -> &str568     fn description(&self) -> &str {
569         "timed out waiting for connection"
570     }
571 }
572 
573 /// Information about the state of a `Pool`.
574 pub struct State {
575     /// The number of connections currently being managed by the pool.
576     pub connections: u32,
577     /// The number of idle connections.
578     pub idle_connections: u32,
579     _p: (),
580 }
581 
582 impl fmt::Debug for State {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result583     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
584         fmt.debug_struct("State")
585             .field("connections", &self.connections)
586             .field("idle_connections", &self.idle_connections)
587             .finish()
588     }
589 }
590 
591 /// A smart pointer wrapping a connection.
592 pub struct PooledConnection<M>
593 where
594     M: ManageConnection,
595 {
596     pool: Pool<M>,
597     checkout: Instant,
598     conn: Option<Conn<M::Connection>>,
599 }
600 
601 impl<M> fmt::Debug for PooledConnection<M>
602 where
603     M: ManageConnection,
604     M::Connection: fmt::Debug,
605 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result606     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
607         fmt::Debug::fmt(&self.conn.as_ref().unwrap().conn, fmt)
608     }
609 }
610 
611 impl<M> Drop for PooledConnection<M>
612 where
613     M: ManageConnection,
614 {
drop(&mut self)615     fn drop(&mut self) {
616         self.pool.put_back(self.checkout, self.conn.take().unwrap());
617     }
618 }
619 
620 impl<M> Deref for PooledConnection<M>
621 where
622     M: ManageConnection,
623 {
624     type Target = M::Connection;
625 
deref(&self) -> &M::Connection626     fn deref(&self) -> &M::Connection {
627         &self.conn.as_ref().unwrap().conn
628     }
629 }
630 
631 impl<M> DerefMut for PooledConnection<M>
632 where
633     M: ManageConnection,
634 {
deref_mut(&mut self) -> &mut M::Connection635     fn deref_mut(&mut self) -> &mut M::Connection {
636         &mut self.conn.as_mut().unwrap().conn
637     }
638 }
639 
640 impl<M> PooledConnection<M>
641 where
642     M: ManageConnection,
643 {
644     /// Returns a shared reference to the extensions associated with this connection.
extensions(this: &Self) -> &Extensions645     pub fn extensions(this: &Self) -> &Extensions {
646         &this.conn.as_ref().unwrap().extensions
647     }
648 
649     /// Returns a mutable reference to the extensions associated with this connection.
extensions_mut(this: &mut Self) -> &mut Extensions650     pub fn extensions_mut(this: &mut Self) -> &mut Extensions {
651         &mut this.conn.as_mut().unwrap().extensions
652     }
653 }
654