1 use std::collections::{HashMap, HashSet, VecDeque};
2 use std::error::Error as StdError;
3 use std::fmt;
4 use std::ops::{Deref, DerefMut};
5 use std::sync::{Arc, Mutex, Weak};
6 
7 #[cfg(not(feature = "runtime"))]
8 use std::time::{Duration, Instant};
9 
10 use futures_channel::oneshot;
11 #[cfg(feature = "runtime")]
12 use tokio::time::{Duration, Instant, Interval};
13 use tracing::{debug, trace};
14 
15 use super::client::Ver;
16 use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin};
17 
18 // FIXME: allow() required due to `impl Trait` leaking types to this lint
19 #[allow(missing_debug_implementations)]
20 pub(super) struct Pool<T> {
21     // If the pool is disabled, this is None.
22     inner: Option<Arc<Mutex<PoolInner<T>>>>,
23 }
24 
25 // Before using a pooled connection, make sure the sender is not dead.
26 //
27 // This is a trait to allow the `client::pool::tests` to work for `i32`.
28 //
29 // See https://github.com/hyperium/hyper/issues/1429
30 pub(super) trait Poolable: Unpin + Send + Sized + 'static {
is_open(&self) -> bool31     fn is_open(&self) -> bool;
32     /// Reserve this connection.
33     ///
34     /// Allows for HTTP/2 to return a shared reservation.
reserve(self) -> Reservation<Self>35     fn reserve(self) -> Reservation<Self>;
can_share(&self) -> bool36     fn can_share(&self) -> bool;
37 }
38 
39 /// When checking out a pooled connection, it might be that the connection
40 /// only supports a single reservation, or it might be usable for many.
41 ///
42 /// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
43 /// used for multiple requests.
44 // FIXME: allow() required due to `impl Trait` leaking types to this lint
45 #[allow(missing_debug_implementations)]
46 pub(super) enum Reservation<T> {
47     /// This connection could be used multiple times, the first one will be
48     /// reinserted into the `idle` pool, and the second will be given to
49     /// the `Checkout`.
50     #[cfg(feature = "http2")]
51     Shared(T, T),
52     /// This connection requires unique access. It will be returned after
53     /// use is complete.
54     Unique(T),
55 }
56 
57 /// Simple type alias in case the key type needs to be adjusted.
58 pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>;
59 
60 struct PoolInner<T> {
61     // A flag that a connection is being established, and the connection
62     // should be shared. This prevents making multiple HTTP/2 connections
63     // to the same host.
64     connecting: HashSet<Key>,
65     // These are internal Conns sitting in the event loop in the KeepAlive
66     // state, waiting to receive a new Request to send on the socket.
67     idle: HashMap<Key, Vec<Idle<T>>>,
68     max_idle_per_host: usize,
69     // These are outstanding Checkouts that are waiting for a socket to be
70     // able to send a Request one. This is used when "racing" for a new
71     // connection.
72     //
73     // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
74     // for the Pool to receive an idle Conn. When a Conn becomes idle,
75     // this list is checked for any parked Checkouts, and tries to notify
76     // them that the Conn could be used instead of waiting for a brand new
77     // connection.
78     waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
79     // A oneshot channel is used to allow the interval to be notified when
80     // the Pool completely drops. That way, the interval can cancel immediately.
81     #[cfg(feature = "runtime")]
82     idle_interval_ref: Option<oneshot::Sender<crate::common::Never>>,
83     #[cfg(feature = "runtime")]
84     exec: Exec,
85     timeout: Option<Duration>,
86 }
87 
88 // This is because `Weak::new()` *allocates* space for `T`, even if it
89 // doesn't need it!
90 struct WeakOpt<T>(Option<Weak<T>>);
91 
92 #[derive(Clone, Copy, Debug)]
93 pub(super) struct Config {
94     pub(super) idle_timeout: Option<Duration>,
95     pub(super) max_idle_per_host: usize,
96 }
97 
98 impl Config {
is_enabled(&self) -> bool99     pub(super) fn is_enabled(&self) -> bool {
100         self.max_idle_per_host > 0
101     }
102 }
103 
104 impl<T> Pool<T> {
new(config: Config, __exec: &Exec) -> Pool<T>105     pub(super) fn new(config: Config, __exec: &Exec) -> Pool<T> {
106         let inner = if config.is_enabled() {
107             Some(Arc::new(Mutex::new(PoolInner {
108                 connecting: HashSet::new(),
109                 idle: HashMap::new(),
110                 #[cfg(feature = "runtime")]
111                 idle_interval_ref: None,
112                 max_idle_per_host: config.max_idle_per_host,
113                 waiters: HashMap::new(),
114                 #[cfg(feature = "runtime")]
115                 exec: __exec.clone(),
116                 timeout: config.idle_timeout,
117             })))
118         } else {
119             None
120         };
121 
122         Pool { inner }
123     }
124 
is_enabled(&self) -> bool125     fn is_enabled(&self) -> bool {
126         self.inner.is_some()
127     }
128 
129     #[cfg(test)]
no_timer(&self)130     pub(super) fn no_timer(&self) {
131         // Prevent an actual interval from being created for this pool...
132         #[cfg(feature = "runtime")]
133         {
134             let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
135             assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
136             let (tx, _) = oneshot::channel();
137             inner.idle_interval_ref = Some(tx);
138         }
139     }
140 }
141 
142 impl<T: Poolable> Pool<T> {
143     /// Returns a `Checkout` which is a future that resolves if an idle
144     /// connection becomes available.
checkout(&self, key: Key) -> Checkout<T>145     pub(super) fn checkout(&self, key: Key) -> Checkout<T> {
146         Checkout {
147             key,
148             pool: self.clone(),
149             waiter: None,
150         }
151     }
152 
153     /// Ensure that there is only ever 1 connecting task for HTTP/2
154     /// connections. This does nothing for HTTP/1.
connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>>155     pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> {
156         if ver == Ver::Http2 {
157             if let Some(ref enabled) = self.inner {
158                 let mut inner = enabled.lock().unwrap();
159                 return if inner.connecting.insert(key.clone()) {
160                     let connecting = Connecting {
161                         key: key.clone(),
162                         pool: WeakOpt::downgrade(enabled),
163                     };
164                     Some(connecting)
165                 } else {
166                     trace!("HTTP/2 connecting already in progress for {:?}", key);
167                     None
168                 };
169             }
170         }
171 
172         // else
173         Some(Connecting {
174             key: key.clone(),
175             // in HTTP/1's case, there is never a lock, so we don't
176             // need to do anything in Drop.
177             pool: WeakOpt::none(),
178         })
179     }
180 
181     #[cfg(test)]
locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>>182     fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>> {
183         self.inner.as_ref().expect("enabled").lock().expect("lock")
184     }
185 
186     /* Used in client/tests.rs...
187     #[cfg(feature = "runtime")]
188     #[cfg(test)]
189     pub(super) fn h1_key(&self, s: &str) -> Key {
190         Arc::new(s.to_string())
191     }
192 
193     #[cfg(feature = "runtime")]
194     #[cfg(test)]
195     pub(super) fn idle_count(&self, key: &Key) -> usize {
196         self
197             .locked()
198             .idle
199             .get(key)
200             .map(|list| list.len())
201             .unwrap_or(0)
202     }
203     */
204 
pooled( &self, #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>, value: T, ) -> Pooled<T>205     pub(super) fn pooled(
206         &self,
207         #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>,
208         value: T,
209     ) -> Pooled<T> {
210         let (value, pool_ref) = if let Some(ref enabled) = self.inner {
211             match value.reserve() {
212                 #[cfg(feature = "http2")]
213                 Reservation::Shared(to_insert, to_return) => {
214                     let mut inner = enabled.lock().unwrap();
215                     inner.put(connecting.key.clone(), to_insert, enabled);
216                     // Do this here instead of Drop for Connecting because we
217                     // already have a lock, no need to lock the mutex twice.
218                     inner.connected(&connecting.key);
219                     // prevent the Drop of Connecting from repeating inner.connected()
220                     connecting.pool = WeakOpt::none();
221 
222                     // Shared reservations don't need a reference to the pool,
223                     // since the pool always keeps a copy.
224                     (to_return, WeakOpt::none())
225                 }
226                 Reservation::Unique(value) => {
227                     // Unique reservations must take a reference to the pool
228                     // since they hope to reinsert once the reservation is
229                     // completed
230                     (value, WeakOpt::downgrade(enabled))
231                 }
232             }
233         } else {
234             // If pool is not enabled, skip all the things...
235 
236             // The Connecting should have had no pool ref
237             debug_assert!(connecting.pool.upgrade().is_none());
238 
239             (value, WeakOpt::none())
240         };
241         Pooled {
242             key: connecting.key.clone(),
243             is_reused: false,
244             pool: pool_ref,
245             value: Some(value),
246         }
247     }
248 
reuse(&self, key: &Key, value: T) -> Pooled<T>249     fn reuse(&self, key: &Key, value: T) -> Pooled<T> {
250         debug!("reuse idle connection for {:?}", key);
251         // TODO: unhack this
252         // In Pool::pooled(), which is used for inserting brand new connections,
253         // there's some code that adjusts the pool reference taken depending
254         // on if the Reservation can be shared or is unique. By the time
255         // reuse() is called, the reservation has already been made, and
256         // we just have the final value, without knowledge of if this is
257         // unique or shared. So, the hack is to just assume Ver::Http2 means
258         // shared... :(
259         let mut pool_ref = WeakOpt::none();
260         if !value.can_share() {
261             if let Some(ref enabled) = self.inner {
262                 pool_ref = WeakOpt::downgrade(enabled);
263             }
264         }
265 
266         Pooled {
267             is_reused: true,
268             key: key.clone(),
269             pool: pool_ref,
270             value: Some(value),
271         }
272     }
273 }
274 
275 /// Pop off this list, looking for a usable connection that hasn't expired.
276 struct IdlePopper<'a, T> {
277     key: &'a Key,
278     list: &'a mut Vec<Idle<T>>,
279 }
280 
281 impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
pop(self, expiration: &Expiration) -> Option<Idle<T>>282     fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
283         while let Some(entry) = self.list.pop() {
284             // If the connection has been closed, or is older than our idle
285             // timeout, simply drop it and keep looking...
286             if !entry.value.is_open() {
287                 trace!("removing closed connection for {:?}", self.key);
288                 continue;
289             }
290             // TODO: Actually, since the `idle` list is pushed to the end always,
291             // that would imply that if *this* entry is expired, then anything
292             // "earlier" in the list would *have* to be expired also... Right?
293             //
294             // In that case, we could just break out of the loop and drop the
295             // whole list...
296             if expiration.expires(entry.idle_at) {
297                 trace!("removing expired connection for {:?}", self.key);
298                 continue;
299             }
300 
301             let value = match entry.value.reserve() {
302                 #[cfg(feature = "http2")]
303                 Reservation::Shared(to_reinsert, to_checkout) => {
304                     self.list.push(Idle {
305                         idle_at: Instant::now(),
306                         value: to_reinsert,
307                     });
308                     to_checkout
309                 }
310                 Reservation::Unique(unique) => unique,
311             };
312 
313             return Some(Idle {
314                 idle_at: entry.idle_at,
315                 value,
316             });
317         }
318 
319         None
320     }
321 }
322 
323 impl<T: Poolable> PoolInner<T> {
put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>)324     fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) {
325         if value.can_share() && self.idle.contains_key(&key) {
326             trace!("put; existing idle HTTP/2 connection for {:?}", key);
327             return;
328         }
329         trace!("put; add idle connection for {:?}", key);
330         let mut remove_waiters = false;
331         let mut value = Some(value);
332         if let Some(waiters) = self.waiters.get_mut(&key) {
333             while let Some(tx) = waiters.pop_front() {
334                 if !tx.is_canceled() {
335                     let reserved = value.take().expect("value already sent");
336                     let reserved = match reserved.reserve() {
337                         #[cfg(feature = "http2")]
338                         Reservation::Shared(to_keep, to_send) => {
339                             value = Some(to_keep);
340                             to_send
341                         }
342                         Reservation::Unique(uniq) => uniq,
343                     };
344                     match tx.send(reserved) {
345                         Ok(()) => {
346                             if value.is_none() {
347                                 break;
348                             } else {
349                                 continue;
350                             }
351                         }
352                         Err(e) => {
353                             value = Some(e);
354                         }
355                     }
356                 }
357 
358                 trace!("put; removing canceled waiter for {:?}", key);
359             }
360             remove_waiters = waiters.is_empty();
361         }
362         if remove_waiters {
363             self.waiters.remove(&key);
364         }
365 
366         match value {
367             Some(value) => {
368                 // borrow-check scope...
369                 {
370                     let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new);
371                     if self.max_idle_per_host <= idle_list.len() {
372                         trace!("max idle per host for {:?}, dropping connection", key);
373                         return;
374                     }
375 
376                     debug!("pooling idle connection for {:?}", key);
377                     idle_list.push(Idle {
378                         value,
379                         idle_at: Instant::now(),
380                     });
381                 }
382 
383                 #[cfg(feature = "runtime")]
384                 {
385                     self.spawn_idle_interval(__pool_ref);
386                 }
387             }
388             None => trace!("put; found waiter for {:?}", key),
389         }
390     }
391 
392     /// A `Connecting` task is complete. Not necessarily successfully,
393     /// but the lock is going away, so clean up.
connected(&mut self, key: &Key)394     fn connected(&mut self, key: &Key) {
395         let existed = self.connecting.remove(key);
396         debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
397         // cancel any waiters. if there are any, it's because
398         // this Connecting task didn't complete successfully.
399         // those waiters would never receive a connection.
400         self.waiters.remove(key);
401     }
402 
403     #[cfg(feature = "runtime")]
spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>)404     fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) {
405         let (dur, rx) = {
406             if self.idle_interval_ref.is_some() {
407                 return;
408             }
409 
410             if let Some(dur) = self.timeout {
411                 let (tx, rx) = oneshot::channel();
412                 self.idle_interval_ref = Some(tx);
413                 (dur, rx)
414             } else {
415                 return;
416             }
417         };
418 
419         let interval = IdleTask {
420             interval: tokio::time::interval(dur),
421             pool: WeakOpt::downgrade(pool_ref),
422             pool_drop_notifier: rx,
423         };
424 
425         self.exec.execute(interval);
426     }
427 }
428 
429 impl<T> PoolInner<T> {
430     /// Any `FutureResponse`s that were created will have made a `Checkout`,
431     /// and possibly inserted into the pool that it is waiting for an idle
432     /// connection. If a user ever dropped that future, we need to clean out
433     /// those parked senders.
clean_waiters(&mut self, key: &Key)434     fn clean_waiters(&mut self, key: &Key) {
435         let mut remove_waiters = false;
436         if let Some(waiters) = self.waiters.get_mut(key) {
437             waiters.retain(|tx| !tx.is_canceled());
438             remove_waiters = waiters.is_empty();
439         }
440         if remove_waiters {
441             self.waiters.remove(key);
442         }
443     }
444 }
445 
446 #[cfg(feature = "runtime")]
447 impl<T: Poolable> PoolInner<T> {
448     /// This should *only* be called by the IdleTask
clear_expired(&mut self)449     fn clear_expired(&mut self) {
450         let dur = self.timeout.expect("interval assumes timeout");
451 
452         let now = Instant::now();
453         //self.last_idle_check_at = now;
454 
455         self.idle.retain(|key, values| {
456             values.retain(|entry| {
457                 if !entry.value.is_open() {
458                     trace!("idle interval evicting closed for {:?}", key);
459                     return false;
460                 }
461                 if now - entry.idle_at > dur {
462                     trace!("idle interval evicting expired for {:?}", key);
463                     return false;
464                 }
465 
466                 // Otherwise, keep this value...
467                 true
468             });
469 
470             // returning false evicts this key/val
471             !values.is_empty()
472         });
473     }
474 }
475 
476 impl<T> Clone for Pool<T> {
clone(&self) -> Pool<T>477     fn clone(&self) -> Pool<T> {
478         Pool {
479             inner: self.inner.clone(),
480         }
481     }
482 }
483 
484 /// A wrapped poolable value that tries to reinsert to the Pool on Drop.
485 // Note: The bounds `T: Poolable` is needed for the Drop impl.
486 pub(super) struct Pooled<T: Poolable> {
487     value: Option<T>,
488     is_reused: bool,
489     key: Key,
490     pool: WeakOpt<Mutex<PoolInner<T>>>,
491 }
492 
493 impl<T: Poolable> Pooled<T> {
is_reused(&self) -> bool494     pub(super) fn is_reused(&self) -> bool {
495         self.is_reused
496     }
497 
is_pool_enabled(&self) -> bool498     pub(super) fn is_pool_enabled(&self) -> bool {
499         self.pool.0.is_some()
500     }
501 
as_ref(&self) -> &T502     fn as_ref(&self) -> &T {
503         self.value.as_ref().expect("not dropped")
504     }
505 
as_mut(&mut self) -> &mut T506     fn as_mut(&mut self) -> &mut T {
507         self.value.as_mut().expect("not dropped")
508     }
509 }
510 
511 impl<T: Poolable> Deref for Pooled<T> {
512     type Target = T;
deref(&self) -> &T513     fn deref(&self) -> &T {
514         self.as_ref()
515     }
516 }
517 
518 impl<T: Poolable> DerefMut for Pooled<T> {
deref_mut(&mut self) -> &mut T519     fn deref_mut(&mut self) -> &mut T {
520         self.as_mut()
521     }
522 }
523 
524 impl<T: Poolable> Drop for Pooled<T> {
drop(&mut self)525     fn drop(&mut self) {
526         if let Some(value) = self.value.take() {
527             if !value.is_open() {
528                 // If we *already* know the connection is done here,
529                 // it shouldn't be re-inserted back into the pool.
530                 return;
531             }
532 
533             if let Some(pool) = self.pool.upgrade() {
534                 if let Ok(mut inner) = pool.lock() {
535                     inner.put(self.key.clone(), value, &pool);
536                 }
537             } else if !value.can_share() {
538                 trace!("pool dropped, dropping pooled ({:?})", self.key);
539             }
540             // Ver::Http2 is already in the Pool (or dead), so we wouldn't
541             // have an actual reference to the Pool.
542         }
543     }
544 }
545 
546 impl<T: Poolable> fmt::Debug for Pooled<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result547     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
548         f.debug_struct("Pooled").field("key", &self.key).finish()
549     }
550 }
551 
552 struct Idle<T> {
553     idle_at: Instant,
554     value: T,
555 }
556 
557 // FIXME: allow() required due to `impl Trait` leaking types to this lint
558 #[allow(missing_debug_implementations)]
559 pub(super) struct Checkout<T> {
560     key: Key,
561     pool: Pool<T>,
562     waiter: Option<oneshot::Receiver<T>>,
563 }
564 
565 #[derive(Debug)]
566 pub(super) struct CheckoutIsClosedError;
567 
568 impl StdError for CheckoutIsClosedError {}
569 
570 impl fmt::Display for CheckoutIsClosedError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result571     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
572         f.write_str("checked out connection was closed")
573     }
574 }
575 
576 impl<T: Poolable> Checkout<T> {
poll_waiter( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<crate::Result<Pooled<T>>>>577     fn poll_waiter(
578         &mut self,
579         cx: &mut task::Context<'_>,
580     ) -> Poll<Option<crate::Result<Pooled<T>>>> {
581         if let Some(mut rx) = self.waiter.take() {
582             match Pin::new(&mut rx).poll(cx) {
583                 Poll::Ready(Ok(value)) => {
584                     if value.is_open() {
585                         Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
586                     } else {
587                         Poll::Ready(Some(Err(
588                             crate::Error::new_canceled().with(CheckoutIsClosedError)
589                         )))
590                     }
591                 }
592                 Poll::Pending => {
593                     self.waiter = Some(rx);
594                     Poll::Pending
595                 }
596                 Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err(
597                     crate::Error::new_canceled().with("request has been canceled")
598                 ))),
599             }
600         } else {
601             Poll::Ready(None)
602         }
603     }
604 
checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T>>605     fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T>> {
606         let entry = {
607             let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
608             let expiration = Expiration::new(inner.timeout);
609             let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
610                 trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
611                 // A block to end the mutable borrow on list,
612                 // so the map below can check is_empty()
613                 {
614                     let popper = IdlePopper {
615                         key: &self.key,
616                         list,
617                     };
618                     popper.pop(&expiration)
619                 }
620                 .map(|e| (e, list.is_empty()))
621             });
622 
623             let (entry, empty) = if let Some((e, empty)) = maybe_entry {
624                 (Some(e), empty)
625             } else {
626                 // No entry found means nuke the list for sure.
627                 (None, true)
628             };
629             if empty {
630                 //TODO: This could be done with the HashMap::entry API instead.
631                 inner.idle.remove(&self.key);
632             }
633 
634             if entry.is_none() && self.waiter.is_none() {
635                 let (tx, mut rx) = oneshot::channel();
636                 trace!("checkout waiting for idle connection: {:?}", self.key);
637                 inner
638                     .waiters
639                     .entry(self.key.clone())
640                     .or_insert_with(VecDeque::new)
641                     .push_back(tx);
642 
643                 // register the waker with this oneshot
644                 assert!(Pin::new(&mut rx).poll(cx).is_pending());
645                 self.waiter = Some(rx);
646             }
647 
648             entry
649         };
650 
651         entry.map(|e| self.pool.reuse(&self.key, e.value))
652     }
653 }
654 
655 impl<T: Poolable> Future for Checkout<T> {
656     type Output = crate::Result<Pooled<T>>;
657 
poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>658     fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
659         if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
660             return Poll::Ready(Ok(pooled));
661         }
662 
663         if let Some(pooled) = self.checkout(cx) {
664             Poll::Ready(Ok(pooled))
665         } else if !self.pool.is_enabled() {
666             Poll::Ready(Err(crate::Error::new_canceled().with("pool is disabled")))
667         } else {
668             // There's a new waiter, already registered in self.checkout()
669             debug_assert!(self.waiter.is_some());
670             Poll::Pending
671         }
672     }
673 }
674 
675 impl<T> Drop for Checkout<T> {
drop(&mut self)676     fn drop(&mut self) {
677         if self.waiter.take().is_some() {
678             trace!("checkout dropped for {:?}", self.key);
679             if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
680                 inner.clean_waiters(&self.key);
681             }
682         }
683     }
684 }
685 
686 // FIXME: allow() required due to `impl Trait` leaking types to this lint
687 #[allow(missing_debug_implementations)]
688 pub(super) struct Connecting<T: Poolable> {
689     key: Key,
690     pool: WeakOpt<Mutex<PoolInner<T>>>,
691 }
692 
693 impl<T: Poolable> Connecting<T> {
alpn_h2(self, pool: &Pool<T>) -> Option<Self>694     pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> {
695         debug_assert!(
696             self.pool.0.is_none(),
697             "Connecting::alpn_h2 but already Http2"
698         );
699 
700         pool.connecting(&self.key, Ver::Http2)
701     }
702 }
703 
704 impl<T: Poolable> Drop for Connecting<T> {
drop(&mut self)705     fn drop(&mut self) {
706         if let Some(pool) = self.pool.upgrade() {
707             // No need to panic on drop, that could abort!
708             if let Ok(mut inner) = pool.lock() {
709                 inner.connected(&self.key);
710             }
711         }
712     }
713 }
714 
715 struct Expiration(Option<Duration>);
716 
717 impl Expiration {
new(dur: Option<Duration>) -> Expiration718     fn new(dur: Option<Duration>) -> Expiration {
719         Expiration(dur)
720     }
721 
expires(&self, instant: Instant) -> bool722     fn expires(&self, instant: Instant) -> bool {
723         match self.0 {
724             Some(timeout) => instant.elapsed() > timeout,
725             None => false,
726         }
727     }
728 }
729 
730 #[cfg(feature = "runtime")]
731 pin_project_lite::pin_project! {
732     struct IdleTask<T> {
733         #[pin]
734         interval: Interval,
735         pool: WeakOpt<Mutex<PoolInner<T>>>,
736         // This allows the IdleTask to be notified as soon as the entire
737         // Pool is fully dropped, and shutdown. This channel is never sent on,
738         // but Err(Canceled) will be received when the Pool is dropped.
739         #[pin]
740         pool_drop_notifier: oneshot::Receiver<crate::common::Never>,
741     }
742 }
743 
744 #[cfg(feature = "runtime")]
745 impl<T: Poolable + 'static> Future for IdleTask<T> {
746     type Output = ();
747 
poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>748     fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
749         let mut this = self.project();
750         loop {
751             match this.pool_drop_notifier.as_mut().poll(cx) {
752                 Poll::Ready(Ok(n)) => match n {},
753                 Poll::Pending => (),
754                 Poll::Ready(Err(_canceled)) => {
755                     trace!("pool closed, canceling idle interval");
756                     return Poll::Ready(());
757                 }
758             }
759 
760             ready!(this.interval.as_mut().poll_tick(cx));
761 
762             if let Some(inner) = this.pool.upgrade() {
763                 if let Ok(mut inner) = inner.lock() {
764                     trace!("idle interval checking for expired");
765                     inner.clear_expired();
766                     continue;
767                 }
768             }
769             return Poll::Ready(());
770         }
771     }
772 }
773 
774 impl<T> WeakOpt<T> {
none() -> Self775     fn none() -> Self {
776         WeakOpt(None)
777     }
778 
downgrade(arc: &Arc<T>) -> Self779     fn downgrade(arc: &Arc<T>) -> Self {
780         WeakOpt(Some(Arc::downgrade(arc)))
781     }
782 
upgrade(&self) -> Option<Arc<T>>783     fn upgrade(&self) -> Option<Arc<T>> {
784         self.0.as_ref().and_then(Weak::upgrade)
785     }
786 }
787 
788 #[cfg(test)]
789 mod tests {
790     use std::task::Poll;
791     use std::time::Duration;
792 
793     use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
794     use crate::common::{exec::Exec, task, Future, Pin};
795 
796     /// Test unique reservations.
797     #[derive(Debug, PartialEq, Eq)]
798     struct Uniq<T>(T);
799 
800     impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
is_open(&self) -> bool801         fn is_open(&self) -> bool {
802             true
803         }
804 
reserve(self) -> Reservation<Self>805         fn reserve(self) -> Reservation<Self> {
806             Reservation::Unique(self)
807         }
808 
can_share(&self) -> bool809         fn can_share(&self) -> bool {
810             false
811         }
812     }
813 
c<T: Poolable>(key: Key) -> Connecting<T>814     fn c<T: Poolable>(key: Key) -> Connecting<T> {
815         Connecting {
816             key,
817             pool: WeakOpt::none(),
818         }
819     }
820 
host_key(s: &str) -> Key821     fn host_key(s: &str) -> Key {
822         (http::uri::Scheme::HTTP, s.parse().expect("host key"))
823     }
824 
pool_no_timer<T>() -> Pool<T>825     fn pool_no_timer<T>() -> Pool<T> {
826         pool_max_idle_no_timer(::std::usize::MAX)
827     }
828 
pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T>829     fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> {
830         let pool = Pool::new(
831             super::Config {
832                 idle_timeout: Some(Duration::from_millis(100)),
833                 max_idle_per_host: max_idle,
834             },
835             &Exec::Default,
836         );
837         pool.no_timer();
838         pool
839     }
840 
841     #[tokio::test]
test_pool_checkout_smoke()842     async fn test_pool_checkout_smoke() {
843         let pool = pool_no_timer();
844         let key = host_key("foo");
845         let pooled = pool.pooled(c(key.clone()), Uniq(41));
846 
847         drop(pooled);
848 
849         match pool.checkout(key).await {
850             Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
851             Err(_) => panic!("not ready"),
852         };
853     }
854 
855     /// Helper to check if the future is ready after polling once.
856     struct PollOnce<'a, F>(&'a mut F);
857 
858     impl<F, T, U> Future for PollOnce<'_, F>
859     where
860         F: Future<Output = Result<T, U>> + Unpin,
861     {
862         type Output = Option<()>;
863 
poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>864         fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
865             match Pin::new(&mut self.0).poll(cx) {
866                 Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
867                 Poll::Ready(Err(_)) => Poll::Ready(Some(())),
868                 Poll::Pending => Poll::Ready(None),
869             }
870         }
871     }
872 
873     #[tokio::test]
test_pool_checkout_returns_none_if_expired()874     async fn test_pool_checkout_returns_none_if_expired() {
875         let pool = pool_no_timer();
876         let key = host_key("foo");
877         let pooled = pool.pooled(c(key.clone()), Uniq(41));
878 
879         drop(pooled);
880         tokio::time::sleep(pool.locked().timeout.unwrap()).await;
881         let mut checkout = pool.checkout(key);
882         let poll_once = PollOnce(&mut checkout);
883         let is_not_ready = poll_once.await.is_none();
884         assert!(is_not_ready);
885     }
886 
887     #[cfg(feature = "runtime")]
888     #[tokio::test]
test_pool_checkout_removes_expired()889     async fn test_pool_checkout_removes_expired() {
890         let pool = pool_no_timer();
891         let key = host_key("foo");
892 
893         pool.pooled(c(key.clone()), Uniq(41));
894         pool.pooled(c(key.clone()), Uniq(5));
895         pool.pooled(c(key.clone()), Uniq(99));
896 
897         assert_eq!(
898             pool.locked().idle.get(&key).map(|entries| entries.len()),
899             Some(3)
900         );
901         tokio::time::sleep(pool.locked().timeout.unwrap()).await;
902 
903         let mut checkout = pool.checkout(key.clone());
904         let poll_once = PollOnce(&mut checkout);
905         // checkout.await should clean out the expired
906         poll_once.await;
907         assert!(pool.locked().idle.get(&key).is_none());
908     }
909 
910     #[test]
test_pool_max_idle_per_host()911     fn test_pool_max_idle_per_host() {
912         let pool = pool_max_idle_no_timer(2);
913         let key = host_key("foo");
914 
915         pool.pooled(c(key.clone()), Uniq(41));
916         pool.pooled(c(key.clone()), Uniq(5));
917         pool.pooled(c(key.clone()), Uniq(99));
918 
919         // pooled and dropped 3, max_idle should only allow 2
920         assert_eq!(
921             pool.locked().idle.get(&key).map(|entries| entries.len()),
922             Some(2)
923         );
924     }
925 
926     #[cfg(feature = "runtime")]
927     #[tokio::test]
test_pool_timer_removes_expired()928     async fn test_pool_timer_removes_expired() {
929         let _ = pretty_env_logger::try_init();
930         tokio::time::pause();
931 
932         let pool = Pool::new(
933             super::Config {
934                 idle_timeout: Some(Duration::from_millis(10)),
935                 max_idle_per_host: std::usize::MAX,
936             },
937             &Exec::Default,
938         );
939 
940         let key = host_key("foo");
941 
942         pool.pooled(c(key.clone()), Uniq(41));
943         pool.pooled(c(key.clone()), Uniq(5));
944         pool.pooled(c(key.clone()), Uniq(99));
945 
946         assert_eq!(
947             pool.locked().idle.get(&key).map(|entries| entries.len()),
948             Some(3)
949         );
950 
951         // Let the timer tick passed the expiration...
952         tokio::time::advance(Duration::from_millis(30)).await;
953         // Yield so the Interval can reap...
954         tokio::task::yield_now().await;
955 
956         assert!(pool.locked().idle.get(&key).is_none());
957     }
958 
959     #[tokio::test]
test_pool_checkout_task_unparked()960     async fn test_pool_checkout_task_unparked() {
961         use futures_util::future::join;
962         use futures_util::FutureExt;
963 
964         let pool = pool_no_timer();
965         let key = host_key("foo");
966         let pooled = pool.pooled(c(key.clone()), Uniq(41));
967 
968         let checkout = join(pool.checkout(key), async {
969             // the checkout future will park first,
970             // and then this lazy future will be polled, which will insert
971             // the pooled back into the pool
972             //
973             // this test makes sure that doing so will unpark the checkout
974             drop(pooled);
975         })
976         .map(|(entry, _)| entry);
977 
978         assert_eq!(*checkout.await.unwrap(), Uniq(41));
979     }
980 
981     #[tokio::test]
test_pool_checkout_drop_cleans_up_waiters()982     async fn test_pool_checkout_drop_cleans_up_waiters() {
983         let pool = pool_no_timer::<Uniq<i32>>();
984         let key = host_key("foo");
985 
986         let mut checkout1 = pool.checkout(key.clone());
987         let mut checkout2 = pool.checkout(key.clone());
988 
989         let poll_once1 = PollOnce(&mut checkout1);
990         let poll_once2 = PollOnce(&mut checkout2);
991 
992         // first poll needed to get into Pool's parked
993         poll_once1.await;
994         assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
995         poll_once2.await;
996         assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);
997 
998         // on drop, clean up Pool
999         drop(checkout1);
1000         assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1001 
1002         drop(checkout2);
1003         assert!(pool.locked().waiters.get(&key).is_none());
1004     }
1005 
1006     #[derive(Debug)]
1007     struct CanClose {
1008         #[allow(unused)]
1009         val: i32,
1010         closed: bool,
1011     }
1012 
1013     impl Poolable for CanClose {
is_open(&self) -> bool1014         fn is_open(&self) -> bool {
1015             !self.closed
1016         }
1017 
reserve(self) -> Reservation<Self>1018         fn reserve(self) -> Reservation<Self> {
1019             Reservation::Unique(self)
1020         }
1021 
can_share(&self) -> bool1022         fn can_share(&self) -> bool {
1023             false
1024         }
1025     }
1026 
1027     #[test]
pooled_drop_if_closed_doesnt_reinsert()1028     fn pooled_drop_if_closed_doesnt_reinsert() {
1029         let pool = pool_no_timer();
1030         let key = host_key("foo");
1031         pool.pooled(
1032             c(key.clone()),
1033             CanClose {
1034                 val: 57,
1035                 closed: true,
1036             },
1037         );
1038 
1039         assert!(!pool.locked().idle.contains_key(&key));
1040     }
1041 }
1042