1 use std::collections::{HashMap, HashSet, VecDeque};
2 use std::fmt;
3 use std::ops::{Deref, DerefMut};
4 use std::sync::{Arc, Mutex, Weak};
5 
6 #[cfg(not(feature = "runtime"))]
7 use std::time::{Duration, Instant};
8 
9 use futures_channel::oneshot;
10 #[cfg(feature = "runtime")]
11 use tokio::time::{Duration, Instant, Interval};
12 
13 use super::Ver;
14 use crate::common::{task, Exec, Future, Pin, Poll, Unpin};
15 
/// Handle to the connection pool.
///
/// Cloning the handle clones the inner `Arc`, so every clone refers to the
/// same shared `PoolInner` state.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct Pool<T> {
    // If the pool is disabled, this is None. Otherwise it is the shared,
    // mutex-guarded state of the pool.
    inner: Option<Arc<Mutex<PoolInner<T>>>>,
}
22 
// Before using a pooled connection, make sure the sender is not dead.
//
// This is a trait to allow the `client::pool::tests` to work for `i32`.
//
// See https://github.com/hyperium/hyper/issues/1429
pub(super) trait Poolable: Unpin + Send + Sized + 'static {
    /// Whether the value is still usable.
    ///
    /// The pool discards values that report `false` rather than handing
    /// them out or putting them back on the idle list.
    fn is_open(&self) -> bool;
    /// Reserve this connection.
    ///
    /// Allows for HTTP/2 to return a shared reservation.
    fn reserve(self) -> Reservation<Self>;
    /// Whether the value can be handed to several users at once.
    ///
    /// Values that cannot be shared get a reference back to the pool when
    /// they are reused, so that they can be reinserted on drop.
    fn can_share(&self) -> bool;
}
36 
/// When checking out a pooled connection, it might be that the connection
/// only supports a single reservation, or it might be usable for many.
///
/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
/// used for multiple requests.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) enum Reservation<T> {
    /// This connection could be used multiple times, the first one will be
    /// reinserted into the `idle` pool, and the second will be given to
    /// the `Checkout`.
    Shared(T, T),
    /// This connection requires unique access. It will be returned after
    /// use is complete.
    Unique(T),
}
53 
/// Simple type alias in case the key type needs to be adjusted.
///
/// Pooled connections are grouped by this (scheme, authority) pair.
pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>;
56 
// The shared state behind a `Pool`, always accessed through a `Mutex`.
struct PoolInner<T> {
    // A flag that a connection is being established, and the connection
    // should be shared. This prevents making multiple HTTP/2 connections
    // to the same host.
    connecting: HashSet<Key>,
    // These are internal Conns sitting in the event loop in the KeepAlive
    // state, waiting to receive a new Request to send on the socket.
    idle: HashMap<Key, Vec<Idle<T>>>,
    // Upper bound on the length of each per-key `idle` list. Once a list
    // has reached this length, `put` drops the value instead of pooling it.
    max_idle_per_host: usize,
    // These are outstanding Checkouts that are waiting for a socket to be
    // able to send a Request on. This is used when "racing" for a new
    // connection.
    //
    // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
    // for the Pool to receive an idle Conn. When a Conn becomes idle,
    // this list is checked for any parked Checkouts, and tries to notify
    // them that the Conn could be used instead of waiting for a brand new
    // connection.
    waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
    // A oneshot channel is used to allow the interval to be notified when
    // the Pool completely drops. That way, the interval can cancel immediately.
    #[cfg(feature = "runtime")]
    idle_interval_ref: Option<oneshot::Sender<crate::common::Never>>,
    // Executor on which the `IdleTask` is spawned.
    #[cfg(feature = "runtime")]
    exec: Exec,
    // How long a value may sit idle before it counts as expired. With
    // `None`, nothing expires and no idle interval task is spawned.
    timeout: Option<Duration>,
}
84 
// An optional `Weak` reference.
//
// This is because `Weak::new()` *allocates* space for `T`, even if it
// doesn't need it!
struct WeakOpt<T>(Option<Weak<T>>);
88 
/// Settings used to build a `Pool`.
#[derive(Clone, Copy, Debug)]
pub(super) struct Config {
    /// How long an idle connection is kept before it is treated as expired;
    /// `None` means idle connections never expire.
    pub(super) idle_timeout: Option<Duration>,
    /// Maximum number of idle connections kept per key. A value of `0`
    /// disables the pool (see `Config::is_enabled`).
    pub(super) max_idle_per_host: usize,
}
94 
95 impl Config {
is_enabled(&self) -> bool96     pub(super) fn is_enabled(&self) -> bool {
97         self.max_idle_per_host > 0
98     }
99 }
100 
101 impl<T> Pool<T> {
new(config: Config, __exec: &Exec) -> Pool<T>102     pub fn new(config: Config, __exec: &Exec) -> Pool<T> {
103         let inner = if config.is_enabled() {
104             Some(Arc::new(Mutex::new(PoolInner {
105                 connecting: HashSet::new(),
106                 idle: HashMap::new(),
107                 #[cfg(feature = "runtime")]
108                 idle_interval_ref: None,
109                 max_idle_per_host: config.max_idle_per_host,
110                 waiters: HashMap::new(),
111                 #[cfg(feature = "runtime")]
112                 exec: __exec.clone(),
113                 timeout: config.idle_timeout,
114             })))
115         } else {
116             None
117         };
118 
119         Pool { inner }
120     }
121 
is_enabled(&self) -> bool122     fn is_enabled(&self) -> bool {
123         self.inner.is_some()
124     }
125 
126     #[cfg(test)]
no_timer(&self)127     pub(super) fn no_timer(&self) {
128         // Prevent an actual interval from being created for this pool...
129         #[cfg(feature = "runtime")]
130         {
131             let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
132             assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
133             let (tx, _) = oneshot::channel();
134             inner.idle_interval_ref = Some(tx);
135         }
136     }
137 }
138 
139 impl<T: Poolable> Pool<T> {
140     /// Returns a `Checkout` which is a future that resolves if an idle
141     /// connection becomes available.
checkout(&self, key: Key) -> Checkout<T>142     pub fn checkout(&self, key: Key) -> Checkout<T> {
143         Checkout {
144             key,
145             pool: self.clone(),
146             waiter: None,
147         }
148     }
149 
150     /// Ensure that there is only ever 1 connecting task for HTTP/2
151     /// connections. This does nothing for HTTP/1.
connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>>152     pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> {
153         if ver == Ver::Http2 {
154             if let Some(ref enabled) = self.inner {
155                 let mut inner = enabled.lock().unwrap();
156                 return if inner.connecting.insert(key.clone()) {
157                     let connecting = Connecting {
158                         key: key.clone(),
159                         pool: WeakOpt::downgrade(enabled),
160                     };
161                     Some(connecting)
162                 } else {
163                     trace!("HTTP/2 connecting already in progress for {:?}", key);
164                     None
165                 };
166             }
167         }
168 
169         // else
170         Some(Connecting {
171             key: key.clone(),
172             // in HTTP/1's case, there is never a lock, so we don't
173             // need to do anything in Drop.
174             pool: WeakOpt::none(),
175         })
176     }
177 
178     #[cfg(test)]
locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>>179     fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>> {
180         self.inner.as_ref().expect("enabled").lock().expect("lock")
181     }
182 
183     /* Used in client/tests.rs...
184     #[cfg(feature = "runtime")]
185     #[cfg(test)]
186     pub(super) fn h1_key(&self, s: &str) -> Key {
187         Arc::new(s.to_string())
188     }
189 
190     #[cfg(feature = "runtime")]
191     #[cfg(test)]
192     pub(super) fn idle_count(&self, key: &Key) -> usize {
193         self
194             .locked()
195             .idle
196             .get(key)
197             .map(|list| list.len())
198             .unwrap_or(0)
199     }
200     */
201 
pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T>202     pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> {
203         let (value, pool_ref) = if let Some(ref enabled) = self.inner {
204             match value.reserve() {
205                 Reservation::Shared(to_insert, to_return) => {
206                     let mut inner = enabled.lock().unwrap();
207                     inner.put(connecting.key.clone(), to_insert, enabled);
208                     // Do this here instead of Drop for Connecting because we
209                     // already have a lock, no need to lock the mutex twice.
210                     inner.connected(&connecting.key);
211                     // prevent the Drop of Connecting from repeating inner.connected()
212                     connecting.pool = WeakOpt::none();
213 
214                     // Shared reservations don't need a reference to the pool,
215                     // since the pool always keeps a copy.
216                     (to_return, WeakOpt::none())
217                 }
218                 Reservation::Unique(value) => {
219                     // Unique reservations must take a reference to the pool
220                     // since they hope to reinsert once the reservation is
221                     // completed
222                     (value, WeakOpt::downgrade(enabled))
223                 }
224             }
225         } else {
226             // If pool is not enabled, skip all the things...
227 
228             // The Connecting should have had no pool ref
229             debug_assert!(connecting.pool.upgrade().is_none());
230 
231             (value, WeakOpt::none())
232         };
233         Pooled {
234             key: connecting.key.clone(),
235             is_reused: false,
236             pool: pool_ref,
237             value: Some(value),
238         }
239     }
240 
reuse(&self, key: &Key, value: T) -> Pooled<T>241     fn reuse(&self, key: &Key, value: T) -> Pooled<T> {
242         debug!("reuse idle connection for {:?}", key);
243         // TODO: unhack this
244         // In Pool::pooled(), which is used for inserting brand new connections,
245         // there's some code that adjusts the pool reference taken depending
246         // on if the Reservation can be shared or is unique. By the time
247         // reuse() is called, the reservation has already been made, and
248         // we just have the final value, without knowledge of if this is
249         // unique or shared. So, the hack is to just assume Ver::Http2 means
250         // shared... :(
251         let mut pool_ref = WeakOpt::none();
252         if !value.can_share() {
253             if let Some(ref enabled) = self.inner {
254                 pool_ref = WeakOpt::downgrade(enabled);
255             }
256         }
257 
258         Pooled {
259             is_reused: true,
260             key: key.clone(),
261             pool: pool_ref,
262             value: Some(value),
263         }
264     }
265 }
266 
/// Pop off this list, looking for a usable connection that hasn't expired.
struct IdlePopper<'a, T> {
    // Key the list belongs to; only used in trace output.
    key: &'a Key,
    // The idle list to pop entries from.
    list: &'a mut Vec<Idle<T>>,
}
272 
273 impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
pop(self, expiration: &Expiration) -> Option<Idle<T>>274     fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
275         while let Some(entry) = self.list.pop() {
276             // If the connection has been closed, or is older than our idle
277             // timeout, simply drop it and keep looking...
278             if !entry.value.is_open() {
279                 trace!("removing closed connection for {:?}", self.key);
280                 continue;
281             }
282             // TODO: Actually, since the `idle` list is pushed to the end always,
283             // that would imply that if *this* entry is expired, then anything
284             // "earlier" in the list would *have* to be expired also... Right?
285             //
286             // In that case, we could just break out of the loop and drop the
287             // whole list...
288             if expiration.expires(entry.idle_at) {
289                 trace!("removing expired connection for {:?}", self.key);
290                 continue;
291             }
292 
293             let value = match entry.value.reserve() {
294                 Reservation::Shared(to_reinsert, to_checkout) => {
295                     self.list.push(Idle {
296                         idle_at: Instant::now(),
297                         value: to_reinsert,
298                     });
299                     to_checkout
300                 }
301                 Reservation::Unique(unique) => unique,
302             };
303 
304             return Some(Idle {
305                 idle_at: entry.idle_at,
306                 value,
307             });
308         }
309 
310         None
311     }
312 }
313 
314 impl<T: Poolable> PoolInner<T> {
put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>)315     fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) {
316         if value.can_share() && self.idle.contains_key(&key) {
317             trace!("put; existing idle HTTP/2 connection for {:?}", key);
318             return;
319         }
320         trace!("put; add idle connection for {:?}", key);
321         let mut remove_waiters = false;
322         let mut value = Some(value);
323         if let Some(waiters) = self.waiters.get_mut(&key) {
324             while let Some(tx) = waiters.pop_front() {
325                 if !tx.is_canceled() {
326                     let reserved = value.take().expect("value already sent");
327                     let reserved = match reserved.reserve() {
328                         Reservation::Shared(to_keep, to_send) => {
329                             value = Some(to_keep);
330                             to_send
331                         }
332                         Reservation::Unique(uniq) => uniq,
333                     };
334                     match tx.send(reserved) {
335                         Ok(()) => {
336                             if value.is_none() {
337                                 break;
338                             } else {
339                                 continue;
340                             }
341                         }
342                         Err(e) => {
343                             value = Some(e);
344                         }
345                     }
346                 }
347 
348                 trace!("put; removing canceled waiter for {:?}", key);
349             }
350             remove_waiters = waiters.is_empty();
351         }
352         if remove_waiters {
353             self.waiters.remove(&key);
354         }
355 
356         match value {
357             Some(value) => {
358                 // borrow-check scope...
359                 {
360                     let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new);
361                     if self.max_idle_per_host <= idle_list.len() {
362                         trace!("max idle per host for {:?}, dropping connection", key);
363                         return;
364                     }
365 
366                     debug!("pooling idle connection for {:?}", key);
367                     idle_list.push(Idle {
368                         value,
369                         idle_at: Instant::now(),
370                     });
371                 }
372 
373                 #[cfg(feature = "runtime")]
374                 {
375                     self.spawn_idle_interval(__pool_ref);
376                 }
377             }
378             None => trace!("put; found waiter for {:?}", key),
379         }
380     }
381 
382     /// A `Connecting` task is complete. Not necessarily successfully,
383     /// but the lock is going away, so clean up.
connected(&mut self, key: &Key)384     fn connected(&mut self, key: &Key) {
385         let existed = self.connecting.remove(key);
386         debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
387         // cancel any waiters. if there are any, it's because
388         // this Connecting task didn't complete successfully.
389         // those waiters would never receive a connection.
390         self.waiters.remove(key);
391     }
392 
393     #[cfg(feature = "runtime")]
spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>)394     fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) {
395         let (dur, rx) = {
396             if self.idle_interval_ref.is_some() {
397                 return;
398             }
399 
400             if let Some(dur) = self.timeout {
401                 let (tx, rx) = oneshot::channel();
402                 self.idle_interval_ref = Some(tx);
403                 (dur, rx)
404             } else {
405                 return;
406             }
407         };
408 
409         let interval = IdleTask {
410             interval: tokio::time::interval(dur),
411             pool: WeakOpt::downgrade(pool_ref),
412             pool_drop_notifier: rx,
413         };
414 
415         self.exec.execute(interval);
416     }
417 }
418 
419 impl<T> PoolInner<T> {
420     /// Any `FutureResponse`s that were created will have made a `Checkout`,
421     /// and possibly inserted into the pool that it is waiting for an idle
422     /// connection. If a user ever dropped that future, we need to clean out
423     /// those parked senders.
clean_waiters(&mut self, key: &Key)424     fn clean_waiters(&mut self, key: &Key) {
425         let mut remove_waiters = false;
426         if let Some(waiters) = self.waiters.get_mut(key) {
427             waiters.retain(|tx| !tx.is_canceled());
428             remove_waiters = waiters.is_empty();
429         }
430         if remove_waiters {
431             self.waiters.remove(key);
432         }
433     }
434 }
435 
#[cfg(feature = "runtime")]
impl<T: Poolable> PoolInner<T> {
    /// This should *only* be called by the IdleTask
    ///
    /// Evicts every idle entry that is closed or has been idle for longer
    /// than the configured timeout, and removes keys whose idle list ends
    /// up empty.
    ///
    /// Panics if no idle timeout is configured.
    fn clear_expired(&mut self) {
        let dur = self.timeout.expect("interval assumes timeout");

        let now = Instant::now();
        //self.last_idle_check_at = now;

        self.idle.retain(|key, values| {
            values.retain(|entry| {
                if !entry.value.is_open() {
                    trace!("idle interval evicting closed for {:?}", key);
                    return false;
                }
                // `Instant - Instant` can panic if the clock is ever
                // observed going backwards; the saturating form treats that
                // case as "no time elapsed" instead.
                if now.saturating_duration_since(entry.idle_at) > dur {
                    trace!("idle interval evicting expired for {:?}", key);
                    return false;
                }

                // Otherwise, keep this value...
                true
            });

            // returning false evicts this key/val
            !values.is_empty()
        });
    }
}
465 
466 impl<T> Clone for Pool<T> {
clone(&self) -> Pool<T>467     fn clone(&self) -> Pool<T> {
468         Pool {
469             inner: self.inner.clone(),
470         }
471     }
472 }
473 
/// A wrapped poolable value that tries to reinsert to the Pool on Drop.
// Note: The bounds `T: Poolable` is needed for the Drop impl.
pub(super) struct Pooled<T: Poolable> {
    // The wrapped value. Taken out (leaving `None`) by the Drop impl.
    value: Option<T>,
    // `true` when the value came from the pool, `false` for a brand new one.
    is_reused: bool,
    // Key the value is (re)inserted under.
    key: Key,
    // Weak reference to the pool used for reinsertion on drop; empty when
    // no reinsertion is wanted.
    pool: WeakOpt<Mutex<PoolInner<T>>>,
}
482 
483 impl<T: Poolable> Pooled<T> {
is_reused(&self) -> bool484     pub fn is_reused(&self) -> bool {
485         self.is_reused
486     }
487 
is_pool_enabled(&self) -> bool488     pub fn is_pool_enabled(&self) -> bool {
489         self.pool.0.is_some()
490     }
491 
as_ref(&self) -> &T492     fn as_ref(&self) -> &T {
493         self.value.as_ref().expect("not dropped")
494     }
495 
as_mut(&mut self) -> &mut T496     fn as_mut(&mut self) -> &mut T {
497         self.value.as_mut().expect("not dropped")
498     }
499 }
500 
501 impl<T: Poolable> Deref for Pooled<T> {
502     type Target = T;
deref(&self) -> &T503     fn deref(&self) -> &T {
504         self.as_ref()
505     }
506 }
507 
508 impl<T: Poolable> DerefMut for Pooled<T> {
deref_mut(&mut self) -> &mut T509     fn deref_mut(&mut self) -> &mut T {
510         self.as_mut()
511     }
512 }
513 
514 impl<T: Poolable> Drop for Pooled<T> {
drop(&mut self)515     fn drop(&mut self) {
516         if let Some(value) = self.value.take() {
517             if !value.is_open() {
518                 // If we *already* know the connection is done here,
519                 // it shouldn't be re-inserted back into the pool.
520                 return;
521             }
522 
523             if let Some(pool) = self.pool.upgrade() {
524                 if let Ok(mut inner) = pool.lock() {
525                     inner.put(self.key.clone(), value, &pool);
526                 }
527             } else if !value.can_share() {
528                 trace!("pool dropped, dropping pooled ({:?})", self.key);
529             }
530             // Ver::Http2 is already in the Pool (or dead), so we wouldn't
531             // have an actual reference to the Pool.
532         }
533     }
534 }
535 
536 impl<T: Poolable> fmt::Debug for Pooled<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result537     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
538         f.debug_struct("Pooled").field("key", &self.key).finish()
539     }
540 }
541 
// An entry of an idle list.
struct Idle<T> {
    // When the value was placed on the idle list; compared against the
    // idle timeout to decide whether the entry has expired.
    idle_at: Instant,
    // The idle value itself.
    value: T,
}
546 
/// Future that resolves with a pooled value for `key`.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct Checkout<T> {
    // Key to check out a value for.
    key: Key,
    // Handle to the pool being checked out from.
    pool: Pool<T>,
    // Receiving half of the oneshot parked in the pool's `waiters` while
    // this checkout is waiting; `None` when not parked.
    waiter: Option<oneshot::Receiver<T>>,
}
554 
555 impl<T: Poolable> Checkout<T> {
poll_waiter( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<crate::Result<Pooled<T>>>>556     fn poll_waiter(
557         &mut self,
558         cx: &mut task::Context<'_>,
559     ) -> Poll<Option<crate::Result<Pooled<T>>>> {
560         static CANCELED: &str = "pool checkout failed";
561         if let Some(mut rx) = self.waiter.take() {
562             match Pin::new(&mut rx).poll(cx) {
563                 Poll::Ready(Ok(value)) => {
564                     if value.is_open() {
565                         Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
566                     } else {
567                         Poll::Ready(Some(Err(crate::Error::new_canceled().with(CANCELED))))
568                     }
569                 }
570                 Poll::Pending => {
571                     self.waiter = Some(rx);
572                     Poll::Pending
573                 }
574                 Poll::Ready(Err(_canceled)) => {
575                     Poll::Ready(Some(Err(crate::Error::new_canceled().with(CANCELED))))
576                 }
577             }
578         } else {
579             Poll::Ready(None)
580         }
581     }
582 
checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T>>583     fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T>> {
584         let entry = {
585             let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
586             let expiration = Expiration::new(inner.timeout);
587             let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
588                 trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
589                 // A block to end the mutable borrow on list,
590                 // so the map below can check is_empty()
591                 {
592                     let popper = IdlePopper {
593                         key: &self.key,
594                         list,
595                     };
596                     popper.pop(&expiration)
597                 }
598                 .map(|e| (e, list.is_empty()))
599             });
600 
601             let (entry, empty) = if let Some((e, empty)) = maybe_entry {
602                 (Some(e), empty)
603             } else {
604                 // No entry found means nuke the list for sure.
605                 (None, true)
606             };
607             if empty {
608                 //TODO: This could be done with the HashMap::entry API instead.
609                 inner.idle.remove(&self.key);
610             }
611 
612             if entry.is_none() && self.waiter.is_none() {
613                 let (tx, mut rx) = oneshot::channel();
614                 trace!("checkout waiting for idle connection: {:?}", self.key);
615                 inner
616                     .waiters
617                     .entry(self.key.clone())
618                     .or_insert_with(VecDeque::new)
619                     .push_back(tx);
620 
621                 // register the waker with this oneshot
622                 assert!(Pin::new(&mut rx).poll(cx).is_pending());
623                 self.waiter = Some(rx);
624             }
625 
626             entry
627         };
628 
629         entry.map(|e| self.pool.reuse(&self.key, e.value))
630     }
631 }
632 
633 impl<T: Poolable> Future for Checkout<T> {
634     type Output = crate::Result<Pooled<T>>;
635 
poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>636     fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
637         if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
638             return Poll::Ready(Ok(pooled));
639         }
640 
641         if let Some(pooled) = self.checkout(cx) {
642             Poll::Ready(Ok(pooled))
643         } else if !self.pool.is_enabled() {
644             Poll::Ready(Err(crate::Error::new_canceled().with("pool is disabled")))
645         } else {
646             // There's a new waiter, already registered in self.checkout()
647             debug_assert!(self.waiter.is_some());
648             Poll::Pending
649         }
650     }
651 }
652 
653 impl<T> Drop for Checkout<T> {
drop(&mut self)654     fn drop(&mut self) {
655         if self.waiter.take().is_some() {
656             trace!("checkout dropped for {:?}", self.key);
657             if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
658                 inner.clean_waiters(&self.key);
659             }
660         }
661     }
662 }
663 
/// Marker for a connection attempt that is in progress for `key`.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct Connecting<T: Poolable> {
    // Key the connection is being established for.
    key: Key,
    // Weak reference to the pool whose `connecting` flag must be cleared
    // on drop; empty when there is nothing to clear.
    pool: WeakOpt<Mutex<PoolInner<T>>>,
}
670 
671 impl<T: Poolable> Connecting<T> {
alpn_h2(self, pool: &Pool<T>) -> Option<Self>672     pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> {
673         debug_assert!(
674             self.pool.0.is_none(),
675             "Connecting::alpn_h2 but already Http2"
676         );
677 
678         pool.connecting(&self.key, Ver::Http2)
679     }
680 }
681 
682 impl<T: Poolable> Drop for Connecting<T> {
drop(&mut self)683     fn drop(&mut self) {
684         if let Some(pool) = self.pool.upgrade() {
685             // No need to panic on drop, that could abort!
686             if let Ok(mut inner) = pool.lock() {
687                 inner.connected(&self.key);
688             }
689         }
690     }
691 }
692 
// Decides whether an idle entry has outlived the idle timeout. `None`
// means that nothing ever expires.
struct Expiration(Option<Duration>);

impl Expiration {
    /// Wraps the optional idle timeout.
    fn new(dur: Option<Duration>) -> Expiration {
        Expiration(dur)
    }

    /// Returns `true` when more than the timeout has passed since `instant`.
    ///
    /// Always `false` when no timeout is set.
    fn expires(&self, instant: Instant) -> bool {
        match self.0 {
            // `Instant::elapsed` can panic if the clock is ever observed
            // going backwards; the saturating form treats that case as
            // "no time elapsed" instead.
            Some(timeout) => Instant::now().saturating_duration_since(instant) > timeout,
            None => false,
        }
    }
}
707 
// Background future that clears expired idle entries from the pool each
// time its interval ticks.
#[cfg(feature = "runtime")]
struct IdleTask<T> {
    // Ticks once per idle timeout duration.
    interval: Interval,
    // Weak reference to the pool to clean; the task finishes once the pool
    // can no longer be upgraded.
    pool: WeakOpt<Mutex<PoolInner<T>>>,
    // This allows the IdleTask to be notified as soon as the entire
    // Pool is fully dropped, and shutdown. This channel is never sent on,
    // but Err(Canceled) will be received when the Pool is dropped.
    pool_drop_notifier: oneshot::Receiver<crate::common::Never>,
}
717 
#[cfg(feature = "runtime")]
impl<T: Poolable + 'static> Future for IdleTask<T> {
    type Output = ();

    /// Clears expired idle entries on every interval tick.
    ///
    /// Completes when the pool has been dropped, or when its mutex is
    /// poisoned.
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        loop {
            if let Poll::Ready(notified) = Pin::new(&mut self.pool_drop_notifier).poll(cx) {
                match notified {
                    // The channel is never sent on.
                    Ok(never) => match never {},
                    Err(_canceled) => {
                        trace!("pool closed, canceling idle interval");
                        return Poll::Ready(());
                    }
                }
            }

            ready!(self.interval.poll_tick(cx));

            let pool = match self.pool.upgrade() {
                Some(pool) => pool,
                None => return Poll::Ready(()),
            };
            let mut inner = match pool.lock() {
                Ok(guard) => guard,
                Err(_poisoned) => return Poll::Ready(()),
            };
            trace!("idle interval checking for expired");
            inner.clear_expired();
        }
    }
}
746 
747 impl<T> WeakOpt<T> {
none() -> Self748     fn none() -> Self {
749         WeakOpt(None)
750     }
751 
downgrade(arc: &Arc<T>) -> Self752     fn downgrade(arc: &Arc<T>) -> Self {
753         WeakOpt(Some(Arc::downgrade(arc)))
754     }
755 
upgrade(&self) -> Option<Arc<T>>756     fn upgrade(&self) -> Option<Arc<T>> {
757         self.0.as_ref().and_then(Weak::upgrade)
758     }
759 }
760 
761 #[cfg(test)]
762 mod tests {
763     use std::task::Poll;
764     use std::time::Duration;
765 
766     use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
767     use crate::common::{task, Exec, Future, Pin};
768 
    /// Test unique reservations.
    #[derive(Debug, PartialEq, Eq)]
    struct Uniq<T>(T);

    impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
        // Always reports as open.
        fn is_open(&self) -> bool {
            true
        }

        // Always a unique reservation.
        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        // Never shareable.
        fn can_share(&self) -> bool {
            false
        }
    }
786 
c<T: Poolable>(key: Key) -> Connecting<T>787     fn c<T: Poolable>(key: Key) -> Connecting<T> {
788         Connecting {
789             key,
790             pool: WeakOpt::none(),
791         }
792     }
793 
host_key(s: &str) -> Key794     fn host_key(s: &str) -> Key {
795         (http::uri::Scheme::HTTP, s.parse().expect("host key"))
796     }
797 
pool_no_timer<T>() -> Pool<T>798     fn pool_no_timer<T>() -> Pool<T> {
799         pool_max_idle_no_timer(::std::usize::MAX)
800     }
801 
pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T>802     fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> {
803         let pool = Pool::new(
804             super::Config {
805                 idle_timeout: Some(Duration::from_millis(100)),
806                 max_idle_per_host: max_idle,
807             },
808             &Exec::Default,
809         );
810         pool.no_timer();
811         pool
812     }
813 
814     #[tokio::test]
test_pool_checkout_smoke()815     async fn test_pool_checkout_smoke() {
816         let pool = pool_no_timer();
817         let key = host_key("foo");
818         let pooled = pool.pooled(c(key.clone()), Uniq(41));
819 
820         drop(pooled);
821 
822         match pool.checkout(key).await {
823             Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
824             Err(_) => panic!("not ready"),
825         };
826     }
827 
828     /// Helper to check if the future is ready after polling once.
829     struct PollOnce<'a, F>(&'a mut F);
830 
831     impl<F, T, U> Future for PollOnce<'_, F>
832     where
833         F: Future<Output = Result<T, U>> + Unpin,
834     {
835         type Output = Option<()>;
836 
poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>837         fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
838             match Pin::new(&mut self.0).poll(cx) {
839                 Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
840                 Poll::Ready(Err(_)) => Poll::Ready(Some(())),
841                 Poll::Pending => Poll::Ready(None),
842             }
843         }
844     }
845 
846     #[tokio::test]
test_pool_checkout_returns_none_if_expired()847     async fn test_pool_checkout_returns_none_if_expired() {
848         let pool = pool_no_timer();
849         let key = host_key("foo");
850         let pooled = pool.pooled(c(key.clone()), Uniq(41));
851 
852         drop(pooled);
853         tokio::time::delay_for(pool.locked().timeout.unwrap()).await;
854         let mut checkout = pool.checkout(key);
855         let poll_once = PollOnce(&mut checkout);
856         let is_not_ready = poll_once.await.is_none();
857         assert!(is_not_ready);
858     }
859 
860     #[cfg(feature = "runtime")]
861     #[tokio::test]
test_pool_checkout_removes_expired()862     async fn test_pool_checkout_removes_expired() {
863         let pool = pool_no_timer();
864         let key = host_key("foo");
865 
866         pool.pooled(c(key.clone()), Uniq(41));
867         pool.pooled(c(key.clone()), Uniq(5));
868         pool.pooled(c(key.clone()), Uniq(99));
869 
870         assert_eq!(
871             pool.locked().idle.get(&key).map(|entries| entries.len()),
872             Some(3)
873         );
874         tokio::time::delay_for(pool.locked().timeout.unwrap()).await;
875 
876         let mut checkout = pool.checkout(key.clone());
877         let poll_once = PollOnce(&mut checkout);
878         // checkout.await should clean out the expired
879         poll_once.await;
880         assert!(pool.locked().idle.get(&key).is_none());
881     }
882 
883     #[test]
test_pool_max_idle_per_host()884     fn test_pool_max_idle_per_host() {
885         let pool = pool_max_idle_no_timer(2);
886         let key = host_key("foo");
887 
888         pool.pooled(c(key.clone()), Uniq(41));
889         pool.pooled(c(key.clone()), Uniq(5));
890         pool.pooled(c(key.clone()), Uniq(99));
891 
892         // pooled and dropped 3, max_idle should only allow 2
893         assert_eq!(
894             pool.locked().idle.get(&key).map(|entries| entries.len()),
895             Some(2)
896         );
897     }
898 
899     #[cfg(feature = "runtime")]
900     #[tokio::test]
test_pool_timer_removes_expired()901     async fn test_pool_timer_removes_expired() {
902         let _ = pretty_env_logger::try_init();
903         tokio::time::pause();
904 
905         let pool = Pool::new(
906             super::Config {
907                 idle_timeout: Some(Duration::from_millis(10)),
908                 max_idle_per_host: std::usize::MAX,
909             },
910             &Exec::Default,
911         );
912 
913         let key = host_key("foo");
914 
915         pool.pooled(c(key.clone()), Uniq(41));
916         pool.pooled(c(key.clone()), Uniq(5));
917         pool.pooled(c(key.clone()), Uniq(99));
918 
919         assert_eq!(
920             pool.locked().idle.get(&key).map(|entries| entries.len()),
921             Some(3)
922         );
923 
924         // Let the timer tick passed the expiration...
925         tokio::time::advance(Duration::from_millis(30)).await;
926         // Yield so the Interval can reap...
927         tokio::task::yield_now().await;
928 
929         assert!(pool.locked().idle.get(&key).is_none());
930     }
931 
932     #[tokio::test]
test_pool_checkout_task_unparked()933     async fn test_pool_checkout_task_unparked() {
934         use futures_util::future::join;
935         use futures_util::FutureExt;
936 
937         let pool = pool_no_timer();
938         let key = host_key("foo");
939         let pooled = pool.pooled(c(key.clone()), Uniq(41));
940 
941         let checkout = join(pool.checkout(key), async {
942             // the checkout future will park first,
943             // and then this lazy future will be polled, which will insert
944             // the pooled back into the pool
945             //
946             // this test makes sure that doing so will unpark the checkout
947             drop(pooled);
948         })
949         .map(|(entry, _)| entry);
950 
951         assert_eq!(*checkout.await.unwrap(), Uniq(41));
952     }
953 
954     #[tokio::test]
test_pool_checkout_drop_cleans_up_waiters()955     async fn test_pool_checkout_drop_cleans_up_waiters() {
956         let pool = pool_no_timer::<Uniq<i32>>();
957         let key = host_key("foo");
958 
959         let mut checkout1 = pool.checkout(key.clone());
960         let mut checkout2 = pool.checkout(key.clone());
961 
962         let poll_once1 = PollOnce(&mut checkout1);
963         let poll_once2 = PollOnce(&mut checkout2);
964 
965         // first poll needed to get into Pool's parked
966         poll_once1.await;
967         assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
968         poll_once2.await;
969         assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);
970 
971         // on drop, clean up Pool
972         drop(checkout1);
973         assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
974 
975         drop(checkout2);
976         assert!(pool.locked().waiters.get(&key).is_none());
977     }
978 
    /// Test connection whose open/closed state is controlled by the
    /// `closed` flag.
    #[derive(Debug)]
    struct CanClose {
        // Arbitrary payload; the `Poolable` impl for this type never reads it.
        val: i32,
        // When `true`, `is_open()` reports the connection as not open.
        closed: bool,
    }
984 
985     impl Poolable for CanClose {
is_open(&self) -> bool986         fn is_open(&self) -> bool {
987             !self.closed
988         }
989 
reserve(self) -> Reservation<Self>990         fn reserve(self) -> Reservation<Self> {
991             Reservation::Unique(self)
992         }
993 
can_share(&self) -> bool994         fn can_share(&self) -> bool {
995             false
996         }
997     }
998 
999     #[test]
pooled_drop_if_closed_doesnt_reinsert()1000     fn pooled_drop_if_closed_doesnt_reinsert() {
1001         let pool = pool_no_timer();
1002         let key = host_key("foo");
1003         pool.pooled(
1004             c(key.clone()),
1005             CanClose {
1006                 val: 57,
1007                 closed: true,
1008             },
1009         );
1010 
1011         assert!(!pool.locked().idle.contains_key(&key));
1012     }
1013 }
1014