1 use std::collections::{HashMap, HashSet, VecDeque};
2 use std::fmt;
3 use std::ops::{Deref, DerefMut};
4 use std::sync::{Arc, Mutex, Weak};
5 use std::time::{Duration, Instant};
6 
7 use futures::{Future, Async, Poll};
8 use futures::sync::oneshot;
9 #[cfg(feature = "runtime")]
10 use tokio_timer::Interval;
11 
12 use common::Exec;
13 use super::Ver;
14 
15 // FIXME: allow() required due to `impl Trait` leaking types to this lint
16 #[allow(missing_debug_implementations)]
17 pub(super) struct Pool<T> {
18     // If the pool is disabled, this is None.
19     inner: Option<Arc<Mutex<PoolInner<T>>>>,
20 }
21 
22 // Before using a pooled connection, make sure the sender is not dead.
23 //
24 // This is a trait to allow the `client::pool::tests` to work for `i32`.
25 //
26 // See https://github.com/hyperium/hyper/issues/1429
27 pub(super) trait Poolable: Send + Sized + 'static {
is_open(&self) -> bool28     fn is_open(&self) -> bool;
29     /// Reserve this connection.
30     ///
31     /// Allows for HTTP/2 to return a shared reservation.
reserve(self) -> Reservation<Self>32     fn reserve(self) -> Reservation<Self>;
can_share(&self) -> bool33     fn can_share(&self) -> bool;
34 }
35 
36 /// When checking out a pooled connection, it might be that the connection
37 /// only supports a single reservation, or it might be usable for many.
38 ///
39 /// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
40 /// used for multiple requests.
41 // FIXME: allow() required due to `impl Trait` leaking types to this lint
42 #[allow(missing_debug_implementations)]
43 pub(super) enum Reservation<T> {
44     /// This connection could be used multiple times, the first one will be
45     /// reinserted into the `idle` pool, and the second will be given to
46     /// the `Checkout`.
47     Shared(T, T),
48     /// This connection requires unique access. It will be returned after
49     /// use is complete.
50     Unique(T),
51 }
52 
53 /// Simple type alias in case the key type needs to be adjusted.
54 pub(super) type Key = Arc<String>;
55 
56 struct PoolInner<T> {
57     // A flag that a connection is being estabilished, and the connection
58     // should be shared. This prevents making multiple HTTP/2 connections
59     // to the same host.
60     connecting: HashSet<Key>,
61     // These are internal Conns sitting in the event loop in the KeepAlive
62     // state, waiting to receive a new Request to send on the socket.
63     idle: HashMap<Key, Vec<Idle<T>>>,
64     max_idle_per_host: usize,
65     // These are outstanding Checkouts that are waiting for a socket to be
66     // able to send a Request one. This is used when "racing" for a new
67     // connection.
68     //
69     // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
70     // for the Pool to receive an idle Conn. When a Conn becomes idle,
71     // this list is checked for any parked Checkouts, and tries to notify
72     // them that the Conn could be used instead of waiting for a brand new
73     // connection.
74     waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
75     // A oneshot channel is used to allow the interval to be notified when
76     // the Pool completely drops. That way, the interval can cancel immediately.
77     #[cfg(feature = "runtime")]
78     idle_interval_ref: Option<oneshot::Sender<::common::Never>>,
79     #[cfg(feature = "runtime")]
80     exec: Exec,
81     timeout: Option<Duration>,
82 }
83 
/// An optional `Weak` pointer.
///
/// `None` is used in place of `Weak::new()` because `Weak::new()`
/// *allocates* space for `T`, even if it doesn't need it!
struct WeakOpt<T>(Option<Weak<T>>);
87 
88 #[derive(Clone, Copy, Debug)]
89 pub(super) struct Config {
90     pub(super) enabled: bool,
91     pub(super) keep_alive_timeout: Option<Duration>,
92     pub(super) max_idle_per_host: usize,
93 }
94 
95 impl<T> Pool<T> {
new(config: Config, __exec: &Exec) -> Pool<T>96     pub fn new(config: Config, __exec: &Exec) -> Pool<T> {
97         let inner = if config.enabled {
98              Some(Arc::new(Mutex::new(PoolInner {
99                 connecting: HashSet::new(),
100                 idle: HashMap::new(),
101                 #[cfg(feature = "runtime")]
102                 idle_interval_ref: None,
103                 max_idle_per_host: config.max_idle_per_host,
104                 waiters: HashMap::new(),
105                 #[cfg(feature = "runtime")]
106                 exec: __exec.clone(),
107                 timeout: config.keep_alive_timeout,
108              })))
109         } else {
110             None
111         };
112 
113         Pool {
114             inner,
115         }
116     }
117 
is_enabled(&self) -> bool118     fn is_enabled(&self) -> bool {
119         self.inner.is_some()
120     }
121 
122     #[cfg(test)]
no_timer(&self)123     pub(super) fn no_timer(&self) {
124         // Prevent an actual interval from being created for this pool...
125         #[cfg(feature = "runtime")]
126         {
127             let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
128             assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
129             let (tx, _) = oneshot::channel();
130             inner.idle_interval_ref = Some(tx);
131         }
132     }
133 }
134 
135 impl<T: Poolable> Pool<T> {
136     /// Returns a `Checkout` which is a future that resolves if an idle
137     /// connection becomes available.
checkout(&self, key: Key) -> Checkout<T>138     pub fn checkout(&self, key: Key) -> Checkout<T> {
139         Checkout {
140             key,
141             pool: self.clone(),
142             waiter: None,
143         }
144     }
145 
146     /// Ensure that there is only ever 1 connecting task for HTTP/2
147     /// connections. This does nothing for HTTP/1.
connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>>148     pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> {
149         if ver == Ver::Http2 {
150             if let Some(ref enabled) = self.inner {
151                 let mut inner = enabled.lock().unwrap();
152                 return if inner.connecting.insert(key.clone()) {
153                     let connecting = Connecting {
154                         key: key.clone(),
155                         pool: WeakOpt::downgrade(enabled),
156                     };
157                     Some(connecting)
158                 } else {
159                     trace!("HTTP/2 connecting already in progress for {:?}", key);
160                     None
161                 };
162             }
163         }
164 
165         // else
166         Some(Connecting {
167             key: key.clone(),
168             // in HTTP/1's case, there is never a lock, so we don't
169             // need to do anything in Drop.
170             pool: WeakOpt::none(),
171         })
172     }
173 
174     #[cfg(test)]
locked(&self) -> ::std::sync::MutexGuard<PoolInner<T>>175     fn locked(&self) -> ::std::sync::MutexGuard<PoolInner<T>> {
176         self
177             .inner
178             .as_ref()
179             .expect("enabled")
180             .lock()
181             .expect("lock")
182     }
183 
184     #[cfg(feature = "runtime")]
185     #[cfg(test)]
h1_key(&self, s: &str) -> Key186     pub(super) fn h1_key(&self, s: &str) -> Key {
187         Arc::new(s.to_string())
188     }
189 
190     #[cfg(feature = "runtime")]
191     #[cfg(test)]
idle_count(&self, key: &Key) -> usize192     pub(super) fn idle_count(&self, key: &Key) -> usize {
193         self
194             .locked()
195             .idle
196             .get(key)
197             .map(|list| list.len())
198             .unwrap_or(0)
199     }
200 
pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T>201     pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> {
202         let (value, pool_ref) = if let Some(ref enabled) = self.inner {
203             match value.reserve() {
204                 Reservation::Shared(to_insert, to_return) => {
205                     let mut inner = enabled.lock().unwrap();
206                     inner.put(connecting.key.clone(), to_insert, enabled);
207                     // Do this here instead of Drop for Connecting because we
208                     // already have a lock, no need to lock the mutex twice.
209                     inner.connected(&connecting.key);
210                     // prevent the Drop of Connecting from repeating inner.connected()
211                     connecting.pool = WeakOpt::none();
212 
213                     // Shared reservations don't need a reference to the pool,
214                     // since the pool always keeps a copy.
215                     (to_return, WeakOpt::none())
216                 },
217                 Reservation::Unique(value) => {
218                     // Unique reservations must take a reference to the pool
219                     // since they hope to reinsert once the reservation is
220                     // completed
221                     (value, WeakOpt::downgrade(enabled))
222                 },
223             }
224         } else {
225             // If pool is not enabled, skip all the things...
226 
227             // The Connecting should have had no pool ref
228             debug_assert!(connecting.pool.upgrade().is_none());
229 
230             (value, WeakOpt::none())
231         };
232         Pooled {
233             key: connecting.key.clone(),
234             is_reused: false,
235             pool: pool_ref,
236             value: Some(value)
237         }
238     }
239 
reuse(&self, key: &Key, value: T) -> Pooled<T>240     fn reuse(&self, key: &Key, value: T) -> Pooled<T> {
241         debug!("reuse idle connection for {:?}", key);
242         // TODO: unhack this
243         // In Pool::pooled(), which is used for inserting brand new connections,
244         // there's some code that adjusts the pool reference taken depending
245         // on if the Reservation can be shared or is unique. By the time
246         // reuse() is called, the reservation has already been made, and
247         // we just have the final value, without knowledge of if this is
248         // unique or shared. So, the hack is to just assume Ver::Http2 means
249         // shared... :(
250         let mut pool_ref = WeakOpt::none();
251         if !value.can_share() {
252             if let Some(ref enabled) = self.inner {
253                 pool_ref = WeakOpt::downgrade(enabled);
254             }
255         }
256 
257         Pooled {
258             is_reused: true,
259             key: key.clone(),
260             pool: pool_ref,
261             value: Some(value),
262         }
263     }
264 }
265 
266 /// Pop off this list, looking for a usable connection that hasn't expired.
267 struct IdlePopper<'a, T: 'a> {
268     key: &'a Key,
269     list: &'a mut Vec<Idle<T>>,
270 }
271 
272 impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
pop(self, expiration: &Expiration) -> Option<Idle<T>>273     fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
274         while let Some(entry) = self.list.pop() {
275             // If the connection has been closed, or is older than our idle
276             // timeout, simply drop it and keep looking...
277             if !entry.value.is_open() {
278                 trace!("removing closed connection for {:?}", self.key);
279                 continue;
280             }
281             // TODO: Actually, since the `idle` list is pushed to the end always,
282             // that would imply that if *this* entry is expired, then anything
283             // "earlier" in the list would *have* to be expired also... Right?
284             //
285             // In that case, we could just break out of the loop and drop the
286             // whole list...
287             if expiration.expires(entry.idle_at) {
288                 trace!("removing expired connection for {:?}", self.key);
289                 continue;
290             }
291 
292             let value = match entry.value.reserve() {
293                 Reservation::Shared(to_reinsert, to_checkout) => {
294                     self.list.push(Idle {
295                         idle_at: Instant::now(),
296                         value: to_reinsert,
297                     });
298                     to_checkout
299                 },
300                 Reservation::Unique(unique) => {
301                     unique
302                 }
303             };
304 
305             return Some(Idle {
306                 idle_at: entry.idle_at,
307                 value,
308             });
309         }
310 
311         None
312     }
313 }
314 
315 impl<T: Poolable> PoolInner<T> {
put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>)316     fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) {
317         if value.can_share() && self.idle.contains_key(&key) {
318             trace!("put; existing idle HTTP/2 connection for {:?}", key);
319             return;
320         }
321         trace!("put; add idle connection for {:?}", key);
322         let mut remove_waiters = false;
323         let mut value = Some(value);
324         if let Some(waiters) = self.waiters.get_mut(&key) {
325             while let Some(tx) = waiters.pop_front() {
326                 if !tx.is_canceled() {
327                     let reserved = value.take().expect("value already sent");
328                     let reserved = match reserved.reserve() {
329                         Reservation::Shared(to_keep, to_send) => {
330                             value = Some(to_keep);
331                             to_send
332                         },
333                         Reservation::Unique(uniq) => uniq,
334                     };
335                     match tx.send(reserved) {
336                         Ok(()) => {
337                             if value.is_none() {
338                                 break;
339                             } else {
340                                 continue;
341                             }
342                         },
343                         Err(e) => {
344                             value = Some(e);
345                         }
346                     }
347                 }
348 
349                 trace!("put; removing canceled waiter for {:?}", key);
350             }
351             remove_waiters = waiters.is_empty();
352         }
353         if remove_waiters {
354             self.waiters.remove(&key);
355         }
356 
357         match value {
358             Some(value) => {
359                 // borrow-check scope...
360                 {
361                     let idle_list = self
362                         .idle
363                         .entry(key.clone())
364                         .or_insert(Vec::new());
365                     if self.max_idle_per_host <= idle_list.len() {
366                         trace!("max idle per host for {:?}, dropping connection", key);
367                         return;
368                     }
369 
370                     debug!("pooling idle connection for {:?}", key);
371                     idle_list.push(Idle {
372                         value: value,
373                         idle_at: Instant::now(),
374                     });
375                 }
376 
377                 #[cfg(feature = "runtime")]
378                 {
379                     self.spawn_idle_interval(__pool_ref);
380                 }
381             }
382             None => trace!("put; found waiter for {:?}", key),
383         }
384     }
385 
386     /// A `Connecting` task is complete. Not necessarily successfully,
387     /// but the lock is going away, so clean up.
connected(&mut self, key: &Key)388     fn connected(&mut self, key: &Key) {
389         let existed = self.connecting.remove(key);
390         debug_assert!(
391             existed,
392             "Connecting dropped, key not in pool.connecting"
393         );
394         // cancel any waiters. if there are any, it's because
395         // this Connecting task didn't complete successfully.
396         // those waiters would never receive a connection.
397         self.waiters.remove(key);
398     }
399 
400     #[cfg(feature = "runtime")]
spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>)401     fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) {
402         let (dur, rx) = {
403             if self.idle_interval_ref.is_some() {
404                 return;
405             }
406 
407             if let Some(dur) = self.timeout {
408                 let (tx, rx) = oneshot::channel();
409                 self.idle_interval_ref = Some(tx);
410                 (dur, rx)
411             } else {
412                 return
413             }
414         };
415 
416         let start = Instant::now() + dur;
417 
418         let interval = IdleInterval {
419             interval: Interval::new(start, dur),
420             pool: WeakOpt::downgrade(pool_ref),
421             pool_drop_notifier: rx,
422         };
423 
424         if let Err(err) = self.exec.execute(interval) {
425             // This task isn't critical, so simply log and ignore.
426             warn!("error spawning connection pool idle interval: {}", err);
427         }
428     }
429 }
430 
431 impl<T> PoolInner<T> {
432     /// Any `FutureResponse`s that were created will have made a `Checkout`,
433     /// and possibly inserted into the pool that it is waiting for an idle
434     /// connection. If a user ever dropped that future, we need to clean out
435     /// those parked senders.
clean_waiters(&mut self, key: &Key)436     fn clean_waiters(&mut self, key: &Key) {
437         let mut remove_waiters = false;
438         if let Some(waiters) = self.waiters.get_mut(key) {
439             waiters.retain(|tx| {
440                 !tx.is_canceled()
441             });
442             remove_waiters = waiters.is_empty();
443         }
444         if remove_waiters {
445             self.waiters.remove(key);
446         }
447     }
448 }
449 
#[cfg(feature = "runtime")]
impl<T: Poolable> PoolInner<T> {
    /// This should *only* be called by the IdleInterval.
    ///
    /// Evicts every idle entry that is closed or has been idle for longer
    /// than the configured timeout, then removes keys whose lists became
    /// empty. Panics if no timeout is configured.
    fn clear_expired(&mut self) {
        let dur = self.timeout.expect("interval assumes timeout");

        // A single reference time is used for the whole sweep.
        let now = Instant::now();

        self.idle.retain(|key, values| {
            values.retain(|entry| {
                if !entry.value.is_open() {
                    trace!("idle interval evicting closed for {:?}", key);
                    false
                } else if now - entry.idle_at > dur {
                    trace!("idle interval evicting expired for {:?}", key);
                    false
                } else {
                    // Otherwise, keep this value...
                    true
                }
            });

            // returning false evicts this key/val
            !values.is_empty()
        });
    }
}
479 
480 impl<T> Clone for Pool<T> {
clone(&self) -> Pool<T>481     fn clone(&self) -> Pool<T> {
482         Pool {
483             inner: self.inner.clone(),
484         }
485     }
486 }
487 
488 /// A wrapped poolable value that tries to reinsert to the Pool on Drop.
489 // Note: The bounds `T: Poolable` is needed for the Drop impl.
490 pub(super) struct Pooled<T: Poolable> {
491     value: Option<T>,
492     is_reused: bool,
493     key: Key,
494     pool: WeakOpt<Mutex<PoolInner<T>>>,
495 }
496 
497 impl<T: Poolable> Pooled<T> {
is_reused(&self) -> bool498     pub fn is_reused(&self) -> bool {
499         self.is_reused
500     }
501 
is_pool_enabled(&self) -> bool502     pub fn is_pool_enabled(&self) -> bool {
503         self.pool.0.is_some()
504     }
505 
as_ref(&self) -> &T506     fn as_ref(&self) -> &T {
507         self.value.as_ref().expect("not dropped")
508     }
509 
as_mut(&mut self) -> &mut T510     fn as_mut(&mut self) -> &mut T {
511         self.value.as_mut().expect("not dropped")
512     }
513 }
514 
515 impl<T: Poolable> Deref for Pooled<T> {
516     type Target = T;
deref(&self) -> &T517     fn deref(&self) -> &T {
518         self.as_ref()
519     }
520 }
521 
522 impl<T: Poolable> DerefMut for Pooled<T> {
deref_mut(&mut self) -> &mut T523     fn deref_mut(&mut self) -> &mut T {
524         self.as_mut()
525     }
526 }
527 
528 impl<T: Poolable> Drop for Pooled<T> {
drop(&mut self)529     fn drop(&mut self) {
530         if let Some(value) = self.value.take() {
531             if !value.is_open() {
532                 // If we *already* know the connection is done here,
533                 // it shouldn't be re-inserted back into the pool.
534                 return;
535             }
536 
537             if let Some(pool) = self.pool.upgrade() {
538                 if let Ok(mut inner) = pool.lock() {
539                     inner.put(self.key.clone(), value, &pool);
540                 }
541             } else if !value.can_share() {
542                 trace!("pool dropped, dropping pooled ({:?})", self.key);
543             }
544             // Ver::Http2 is already in the Pool (or dead), so we wouldn't
545             // have an actual reference to the Pool.
546         }
547     }
548 }
549 
550 impl<T: Poolable> fmt::Debug for Pooled<T> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result551     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
552         f.debug_struct("Pooled")
553             .field("key", &self.key)
554             .finish()
555     }
556 }
557 
/// An entry in the pool's idle list.
struct Idle<T> {
    /// When the value was placed into the idle list.
    idle_at: Instant,
    /// The idle value itself.
    value: T,
}
562 
563 // FIXME: allow() required due to `impl Trait` leaking types to this lint
564 #[allow(missing_debug_implementations)]
565 pub(super) struct Checkout<T> {
566     key: Key,
567     pool: Pool<T>,
568     waiter: Option<oneshot::Receiver<T>>,
569 }
570 
571 impl<T: Poolable> Checkout<T> {
poll_waiter(&mut self) -> Poll<Option<Pooled<T>>, ::Error>572     fn poll_waiter(&mut self) -> Poll<Option<Pooled<T>>, ::Error> {
573         static CANCELED: &str = "pool checkout failed";
574         if let Some(mut rx) = self.waiter.take() {
575             match rx.poll() {
576                 Ok(Async::Ready(value)) => {
577                     if value.is_open() {
578                         Ok(Async::Ready(Some(self.pool.reuse(&self.key, value))))
579                     } else {
580                         Err(::Error::new_canceled().with(CANCELED))
581                     }
582                 },
583                 Ok(Async::NotReady) => {
584                     self.waiter = Some(rx);
585                     Ok(Async::NotReady)
586                 },
587                 Err(_canceled) => Err(::Error::new_canceled().with(CANCELED)),
588             }
589         } else {
590             Ok(Async::Ready(None))
591         }
592     }
593 
checkout(&mut self) -> Option<Pooled<T>>594     fn checkout(&mut self) -> Option<Pooled<T>> {
595         let entry = {
596             let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
597             let expiration = Expiration::new(inner.timeout);
598             let maybe_entry = inner.idle.get_mut(&self.key)
599                 .and_then(|list| {
600                     trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
601                     // A block to end the mutable borrow on list,
602                     // so the map below can check is_empty()
603                     {
604                         let popper = IdlePopper {
605                             key: &self.key,
606                             list,
607                         };
608                         popper.pop(&expiration)
609                     }
610                         .map(|e| (e, list.is_empty()))
611                 });
612 
613             let (entry, empty) = if let Some((e, empty)) = maybe_entry {
614                 (Some(e), empty)
615             } else {
616                 // No entry found means nuke the list for sure.
617                 (None, true)
618             };
619             if empty {
620                 //TODO: This could be done with the HashMap::entry API instead.
621                 inner.idle.remove(&self.key);
622             }
623 
624             if entry.is_none() && self.waiter.is_none() {
625                 let (tx, mut rx) = oneshot::channel();
626                 let _ = rx.poll(); // park this task
627 
628                 trace!("checkout waiting for idle connection: {:?}", self.key);
629                 inner
630                     .waiters
631                     .entry(self.key.clone())
632                     .or_insert(VecDeque::new())
633                     .push_back(tx);
634 
635                 self.waiter = Some(rx);
636             }
637 
638             entry
639         };
640 
641         entry.map(|e| self.pool.reuse(&self.key, e.value))
642     }
643 }
644 
645 impl<T: Poolable> Future for Checkout<T> {
646     type Item = Pooled<T>;
647     type Error = ::Error;
648 
poll(&mut self) -> Poll<Self::Item, Self::Error>649     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
650         if let Some(pooled) = try_ready!(self.poll_waiter()) {
651             return Ok(Async::Ready(pooled));
652         }
653 
654         if let Some(pooled) = self.checkout() {
655             Ok(Async::Ready(pooled))
656         } else if !self.pool.is_enabled() {
657             Err(::Error::new_canceled().with("pool is disabled"))
658         } else {
659             Ok(Async::NotReady)
660         }
661     }
662 }
663 
664 impl<T> Drop for Checkout<T> {
drop(&mut self)665     fn drop(&mut self) {
666         if self.waiter.take().is_some() {
667             trace!("checkout dropped for {:?}", self.key);
668             if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
669                 inner.clean_waiters(&self.key);
670             }
671         }
672     }
673 }
674 
675 // FIXME: allow() required due to `impl Trait` leaking types to this lint
676 #[allow(missing_debug_implementations)]
677 pub(super) struct Connecting<T: Poolable> {
678     key: Key,
679     pool: WeakOpt<Mutex<PoolInner<T>>>,
680 }
681 
682 impl<T: Poolable> Connecting<T> {
alpn_h2(self, pool: &Pool<T>) -> Option<Self>683     pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> {
684         debug_assert!(
685             self.pool.0.is_none(),
686             "Connecting::alpn_h2 but already Http2"
687         );
688 
689         pool.connecting(&self.key, Ver::Http2)
690     }
691 }
692 
693 impl<T: Poolable> Drop for Connecting<T> {
drop(&mut self)694     fn drop(&mut self) {
695         if let Some(pool) = self.pool.upgrade() {
696             // No need to panic on drop, that could abort!
697             if let Ok(mut inner) = pool.lock() {
698                 inner.connected(&self.key);
699             }
700         }
701     }
702 }
703 
/// Idle-expiration policy: an optional maximum idle duration.
struct Expiration(Option<Duration>);

impl Expiration {
    /// Wraps the optional timeout; `None` means nothing ever expires.
    fn new(dur: Option<Duration>) -> Expiration {
        Expiration(dur)
    }

    /// Returns `true` if strictly more than the timeout has elapsed since
    /// `instant`. Always `false` when no timeout is set.
    fn expires(&self, instant: Instant) -> bool {
        self.0.map_or(false, |timeout| instant.elapsed() > timeout)
    }
}
718 
/// Background task that periodically clears expired idle connections.
#[cfg(feature = "runtime")]
struct IdleInterval<T> {
    /// Timer stream driving the periodic sweep.
    interval: Interval,
    /// Weak reference to the pool being swept.
    pool: WeakOpt<Mutex<PoolInner<T>>>,
    // This allows the IdleInterval to be notified as soon as the entire
    // Pool is fully dropped, and shutdown. This channel is never sent on,
    // but Err(Canceled) will be received when the Pool is dropped.
    pool_drop_notifier: oneshot::Receiver<::common::Never>,
}

#[cfg(feature = "runtime")]
impl<T: Poolable + 'static> Future for IdleInterval<T> {
    type Item = ();
    type Error = ();

    /// Finishes once the pool is gone (or its lock is poisoned); fails
    /// only when the timer itself reports an error.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // Interval is a Stream
        use futures::Stream;

        loop {
            // Check for pool shutdown before waiting on the timer.
            match self.pool_drop_notifier.poll() {
                Ok(Async::NotReady) => (),
                // `Never` has no values, so this arm cannot be reached.
                Ok(Async::Ready(never)) => match never {},
                Err(_canceled) => {
                    trace!("pool closed, canceling idle interval");
                    return Ok(Async::Ready(()));
                }
            }

            try_ready!(self.interval.poll().map_err(|err| {
                error!("idle interval timer error: {}", err);
            }));

            if let Some(pool) = self.pool.upgrade() {
                if let Ok(mut inner) = pool.lock() {
                    trace!("idle interval checking for expired");
                    inner.clear_expired();
                    continue;
                }
            }
            return Ok(Async::Ready(()));
        }
    }
}
763 
764 impl<T> WeakOpt<T> {
none() -> Self765     fn none() -> Self {
766         WeakOpt(None)
767     }
768 
downgrade(arc: &Arc<T>) -> Self769     fn downgrade(arc: &Arc<T>) -> Self {
770         WeakOpt(Some(Arc::downgrade(arc)))
771     }
772 
upgrade(&self) -> Option<Arc<T>>773     fn upgrade(&self) -> Option<Arc<T>> {
774         self.0
775             .as_ref()
776             .and_then(Weak::upgrade)
777     }
778 }
779 
780 #[cfg(test)]
781 mod tests {
782     use std::sync::Arc;
783     use std::time::Duration;
784     use futures::{Async, Future};
785     use futures::future;
786     use common::Exec;
787     use super::{Connecting, Key, Poolable, Pool, Reservation, WeakOpt};
788 
789     /// Test unique reservations.
790     #[derive(Debug, PartialEq, Eq)]
791     struct Uniq<T>(T);
792 
793     impl<T: Send + 'static> Poolable for Uniq<T> {
is_open(&self) -> bool794         fn is_open(&self) -> bool {
795             true
796         }
797 
reserve(self) -> Reservation<Self>798         fn reserve(self) -> Reservation<Self> {
799             Reservation::Unique(self)
800         }
801 
can_share(&self) -> bool802         fn can_share(&self) -> bool {
803             false
804         }
805     }
806 
c<T: Poolable>(key: Key) -> Connecting<T>807     fn c<T: Poolable>(key: Key) -> Connecting<T> {
808         Connecting {
809             key,
810             pool: WeakOpt::none(),
811         }
812     }
813 
pool_no_timer<T>() -> Pool<T>814     fn pool_no_timer<T>() -> Pool<T> {
815         pool_max_idle_no_timer(::std::usize::MAX)
816     }
817 
pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T>818     fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> {
819         let pool = Pool::new(super::Config {
820                 enabled: true,
821                 keep_alive_timeout: Some(Duration::from_millis(100)),
822                 max_idle_per_host: max_idle,
823             },
824             &Exec::Default,
825         );
826         pool.no_timer();
827         pool
828     }
829 
830     #[test]
test_pool_checkout_smoke()831     fn test_pool_checkout_smoke() {
832         let pool = pool_no_timer();
833         let key = Arc::new("foo".to_string());
834         let pooled = pool.pooled(c(key.clone()), Uniq(41));
835 
836         drop(pooled);
837 
838         match pool.checkout(key).poll().unwrap() {
839             Async::Ready(pooled) => assert_eq!(*pooled, Uniq(41)),
840             _ => panic!("not ready"),
841         }
842     }
843 
844     #[test]
test_pool_checkout_returns_none_if_expired()845     fn test_pool_checkout_returns_none_if_expired() {
846         future::lazy(|| {
847             let pool = pool_no_timer();
848             let key = Arc::new("foo".to_string());
849             let pooled = pool.pooled(c(key.clone()), Uniq(41));
850             drop(pooled);
851             ::std::thread::sleep(pool.locked().timeout.unwrap());
852             assert!(pool.checkout(key).poll().unwrap().is_not_ready());
853             ::futures::future::ok::<(), ()>(())
854         }).wait().unwrap();
855     }
856 
857     #[test]
test_pool_checkout_removes_expired()858     fn test_pool_checkout_removes_expired() {
859         future::lazy(|| {
860             let pool = pool_no_timer();
861             let key = Arc::new("foo".to_string());
862 
863             pool.pooled(c(key.clone()), Uniq(41));
864             pool.pooled(c(key.clone()), Uniq(5));
865             pool.pooled(c(key.clone()), Uniq(99));
866 
867             assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3));
868             ::std::thread::sleep(pool.locked().timeout.unwrap());
869 
870             // checkout.poll() should clean out the expired
871             pool.checkout(key.clone()).poll().unwrap();
872             assert!(pool.locked().idle.get(&key).is_none());
873 
874             Ok::<(), ()>(())
875         }).wait().unwrap();
876     }
877 
878     #[test]
test_pool_max_idle_per_host()879     fn test_pool_max_idle_per_host() {
880         future::lazy(|| {
881             let pool = pool_max_idle_no_timer(2);
882             let key = Arc::new("foo".to_string());
883 
884             pool.pooled(c(key.clone()), Uniq(41));
885             pool.pooled(c(key.clone()), Uniq(5));
886             pool.pooled(c(key.clone()), Uniq(99));
887 
888             // pooled and dropped 3, max_idle should only allow 2
889             assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(2));
890 
891             Ok::<(), ()>(())
892         }).wait().unwrap();
893     }
894 
895     #[cfg(feature = "runtime")]
896     #[test]
test_pool_timer_removes_expired()897     fn test_pool_timer_removes_expired() {
898         use std::time::Instant;
899         use tokio_timer::Delay;
900         let mut rt = ::tokio::runtime::current_thread::Runtime::new().unwrap();
901         let pool = Pool::new(super::Config {
902                 enabled: true,
903                 keep_alive_timeout: Some(Duration::from_millis(100)),
904                 max_idle_per_host: ::std::usize::MAX,
905             },
906             &Exec::Default,
907         );
908 
909         let key = Arc::new("foo".to_string());
910 
911         // Since pool.pooled() will be calling spawn on drop, need to be sure
912         // those drops are called while `rt` is the current executor. To do so,
913         // call those inside a future.
914         rt.block_on(::futures::future::lazy(|| {
915             pool.pooled(c(key.clone()), Uniq(41));
916             pool.pooled(c(key.clone()), Uniq(5));
917             pool.pooled(c(key.clone()), Uniq(99));
918             Ok::<_, ()>(())
919         })).unwrap();
920 
921         assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3));
922 
923         // Let the timer tick passed the expiration...
924         rt
925             .block_on(Delay::new(Instant::now() + Duration::from_millis(200)))
926             .expect("rt block_on 200ms");
927 
928         assert!(pool.locked().idle.get(&key).is_none());
929     }
930 
931     #[test]
test_pool_checkout_task_unparked()932     fn test_pool_checkout_task_unparked() {
933         let pool = pool_no_timer();
934         let key = Arc::new("foo".to_string());
935         let pooled = pool.pooled(c(key.clone()), Uniq(41));
936 
937         let checkout = pool.checkout(key).join(future::lazy(move || {
938             // the checkout future will park first,
939             // and then this lazy future will be polled, which will insert
940             // the pooled back into the pool
941             //
942             // this test makes sure that doing so will unpark the checkout
943             drop(pooled);
944             Ok(())
945         })).map(|(entry, _)| entry);
946         assert_eq!(*checkout.wait().unwrap(), Uniq(41));
947     }
948 
949     #[test]
test_pool_checkout_drop_cleans_up_waiters()950     fn test_pool_checkout_drop_cleans_up_waiters() {
951         future::lazy(|| {
952             let pool = pool_no_timer::<Uniq<i32>>();
953             let key = Arc::new("localhost:12345".to_string());
954 
955             let mut checkout1 = pool.checkout(key.clone());
956             let mut checkout2 = pool.checkout(key.clone());
957 
958             // first poll needed to get into Pool's parked
959             checkout1.poll().unwrap();
960             assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
961             checkout2.poll().unwrap();
962             assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);
963 
964             // on drop, clean up Pool
965             drop(checkout1);
966             assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
967 
968             drop(checkout2);
969             assert!(pool.locked().waiters.get(&key).is_none());
970 
971             ::futures::future::ok::<(), ()>(())
972         }).wait().unwrap();
973     }
974 
975     #[derive(Debug)]
976     struct CanClose {
977         val: i32,
978         closed: bool,
979     }
980 
981     impl Poolable for CanClose {
is_open(&self) -> bool982         fn is_open(&self) -> bool {
983             !self.closed
984         }
985 
reserve(self) -> Reservation<Self>986         fn reserve(self) -> Reservation<Self> {
987             Reservation::Unique(self)
988         }
989 
can_share(&self) -> bool990         fn can_share(&self) -> bool {
991             false
992         }
993     }
994 
995     #[test]
pooled_drop_if_closed_doesnt_reinsert()996     fn pooled_drop_if_closed_doesnt_reinsert() {
997         let pool = pool_no_timer();
998         let key = Arc::new("localhost:12345".to_string());
999         pool.pooled(c(key.clone()), CanClose {
1000             val: 57,
1001             closed: true,
1002         });
1003 
1004         assert!(!pool.locked().idle.contains_key(&key));
1005     }
1006 }
1007