1 use std::collections::{HashMap, HashSet, VecDeque}; 2 use std::error::Error as StdError; 3 use std::fmt; 4 use std::ops::{Deref, DerefMut}; 5 use std::sync::{Arc, Mutex, Weak}; 6 7 #[cfg(not(feature = "runtime"))] 8 use std::time::{Duration, Instant}; 9 10 use futures_channel::oneshot; 11 #[cfg(feature = "runtime")] 12 use tokio::time::{Duration, Instant, Interval}; 13 use tracing::{debug, trace}; 14 15 use super::client::Ver; 16 use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin}; 17 18 // FIXME: allow() required due to `impl Trait` leaking types to this lint 19 #[allow(missing_debug_implementations)] 20 pub(super) struct Pool<T> { 21 // If the pool is disabled, this is None. 22 inner: Option<Arc<Mutex<PoolInner<T>>>>, 23 } 24 25 // Before using a pooled connection, make sure the sender is not dead. 26 // 27 // This is a trait to allow the `client::pool::tests` to work for `i32`. 28 // 29 // See https://github.com/hyperium/hyper/issues/1429 30 pub(super) trait Poolable: Unpin + Send + Sized + 'static { is_open(&self) -> bool31 fn is_open(&self) -> bool; 32 /// Reserve this connection. 33 /// 34 /// Allows for HTTP/2 to return a shared reservation. reserve(self) -> Reservation<Self>35 fn reserve(self) -> Reservation<Self>; can_share(&self) -> bool36 fn can_share(&self) -> bool; 37 } 38 39 /// When checking out a pooled connection, it might be that the connection 40 /// only supports a single reservation, or it might be usable for many. 41 /// 42 /// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be 43 /// used for multiple requests. 44 // FIXME: allow() required due to `impl Trait` leaking types to this lint 45 #[allow(missing_debug_implementations)] 46 pub(super) enum Reservation<T> { 47 /// This connection could be used multiple times, the first one will be 48 /// reinserted into the `idle` pool, and the second will be given to 49 /// the `Checkout`. 50 #[cfg(feature = "http2")] 51 Shared(T, T), 52 /// This connection requires unique access. It will be returned after 53 /// use is complete. 54 Unique(T), 55 } 56 57 /// Simple type alias in case the key type needs to be adjusted. 58 pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>; 59 60 struct PoolInner<T> { 61 // A flag that a connection is being established, and the connection 62 // should be shared. This prevents making multiple HTTP/2 connections 63 // to the same host. 64 connecting: HashSet<Key>, 65 // These are internal Conns sitting in the event loop in the KeepAlive 66 // state, waiting to receive a new Request to send on the socket. 67 idle: HashMap<Key, Vec<Idle<T>>>, 68 max_idle_per_host: usize, 69 // These are outstanding Checkouts that are waiting for a socket to be 70 // able to send a Request one. This is used when "racing" for a new 71 // connection. 72 // 73 // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait 74 // for the Pool to receive an idle Conn. When a Conn becomes idle, 75 // this list is checked for any parked Checkouts, and tries to notify 76 // them that the Conn could be used instead of waiting for a brand new 77 // connection. 78 waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>, 79 // A oneshot channel is used to allow the interval to be notified when 80 // the Pool completely drops. That way, the interval can cancel immediately. 81 #[cfg(feature = "runtime")] 82 idle_interval_ref: Option<oneshot::Sender<crate::common::Never>>, 83 #[cfg(feature = "runtime")] 84 exec: Exec, 85 timeout: Option<Duration>, 86 } 87 88 // This is because `Weak::new()` *allocates* space for `T`, even if it 89 // doesn't need it! 90 struct WeakOpt<T>(Option<Weak<T>>); 91 92 #[derive(Clone, Copy, Debug)] 93 pub(super) struct Config { 94 pub(super) idle_timeout: Option<Duration>, 95 pub(super) max_idle_per_host: usize, 96 } 97 98 impl Config { is_enabled(&self) -> bool99 pub(super) fn is_enabled(&self) -> bool { 100 self.max_idle_per_host > 0 101 } 102 } 103 104 impl<T> Pool<T> { new(config: Config, __exec: &Exec) -> Pool<T>105 pub(super) fn new(config: Config, __exec: &Exec) -> Pool<T> { 106 let inner = if config.is_enabled() { 107 Some(Arc::new(Mutex::new(PoolInner { 108 connecting: HashSet::new(), 109 idle: HashMap::new(), 110 #[cfg(feature = "runtime")] 111 idle_interval_ref: None, 112 max_idle_per_host: config.max_idle_per_host, 113 waiters: HashMap::new(), 114 #[cfg(feature = "runtime")] 115 exec: __exec.clone(), 116 timeout: config.idle_timeout, 117 }))) 118 } else { 119 None 120 }; 121 122 Pool { inner } 123 } 124 is_enabled(&self) -> bool125 fn is_enabled(&self) -> bool { 126 self.inner.is_some() 127 } 128 129 #[cfg(test)] no_timer(&self)130 pub(super) fn no_timer(&self) { 131 // Prevent an actual interval from being created for this pool... 132 #[cfg(feature = "runtime")] 133 { 134 let mut inner = self.inner.as_ref().unwrap().lock().unwrap(); 135 assert!(inner.idle_interval_ref.is_none(), "timer already spawned"); 136 let (tx, _) = oneshot::channel(); 137 inner.idle_interval_ref = Some(tx); 138 } 139 } 140 } 141 142 impl<T: Poolable> Pool<T> { 143 /// Returns a `Checkout` which is a future that resolves if an idle 144 /// connection becomes available. checkout(&self, key: Key) -> Checkout<T>145 pub(super) fn checkout(&self, key: Key) -> Checkout<T> { 146 Checkout { 147 key, 148 pool: self.clone(), 149 waiter: None, 150 } 151 } 152 153 /// Ensure that there is only ever 1 connecting task for HTTP/2 154 /// connections. This does nothing for HTTP/1. connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>>155 pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> { 156 if ver == Ver::Http2 { 157 if let Some(ref enabled) = self.inner { 158 let mut inner = enabled.lock().unwrap(); 159 return if inner.connecting.insert(key.clone()) { 160 let connecting = Connecting { 161 key: key.clone(), 162 pool: WeakOpt::downgrade(enabled), 163 }; 164 Some(connecting) 165 } else { 166 trace!("HTTP/2 connecting already in progress for {:?}", key); 167 None 168 }; 169 } 170 } 171 172 // else 173 Some(Connecting { 174 key: key.clone(), 175 // in HTTP/1's case, there is never a lock, so we don't 176 // need to do anything in Drop. 177 pool: WeakOpt::none(), 178 }) 179 } 180 181 #[cfg(test)] locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>>182 fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>> { 183 self.inner.as_ref().expect("enabled").lock().expect("lock") 184 } 185 186 /* Used in client/tests.rs... 187 #[cfg(feature = "runtime")] 188 #[cfg(test)] 189 pub(super) fn h1_key(&self, s: &str) -> Key { 190 Arc::new(s.to_string()) 191 } 192 193 #[cfg(feature = "runtime")] 194 #[cfg(test)] 195 pub(super) fn idle_count(&self, key: &Key) -> usize { 196 self 197 .locked() 198 .idle 199 .get(key) 200 .map(|list| list.len()) 201 .unwrap_or(0) 202 } 203 */ 204 pooled( &self, #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>, value: T, ) -> Pooled<T>205 pub(super) fn pooled( 206 &self, 207 #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>, 208 value: T, 209 ) -> Pooled<T> { 210 let (value, pool_ref) = if let Some(ref enabled) = self.inner { 211 match value.reserve() { 212 #[cfg(feature = "http2")] 213 Reservation::Shared(to_insert, to_return) => { 214 let mut inner = enabled.lock().unwrap(); 215 inner.put(connecting.key.clone(), to_insert, enabled); 216 // Do this here instead of Drop for Connecting because we 217 // already have a lock, no need to lock the mutex twice. 218 inner.connected(&connecting.key); 219 // prevent the Drop of Connecting from repeating inner.connected() 220 connecting.pool = WeakOpt::none(); 221 222 // Shared reservations don't need a reference to the pool, 223 // since the pool always keeps a copy. 224 (to_return, WeakOpt::none()) 225 } 226 Reservation::Unique(value) => { 227 // Unique reservations must take a reference to the pool 228 // since they hope to reinsert once the reservation is 229 // completed 230 (value, WeakOpt::downgrade(enabled)) 231 } 232 } 233 } else { 234 // If pool is not enabled, skip all the things... 235 236 // The Connecting should have had no pool ref 237 debug_assert!(connecting.pool.upgrade().is_none()); 238 239 (value, WeakOpt::none()) 240 }; 241 Pooled { 242 key: connecting.key.clone(), 243 is_reused: false, 244 pool: pool_ref, 245 value: Some(value), 246 } 247 } 248 reuse(&self, key: &Key, value: T) -> Pooled<T>249 fn reuse(&self, key: &Key, value: T) -> Pooled<T> { 250 debug!("reuse idle connection for {:?}", key); 251 // TODO: unhack this 252 // In Pool::pooled(), which is used for inserting brand new connections, 253 // there's some code that adjusts the pool reference taken depending 254 // on if the Reservation can be shared or is unique. By the time 255 // reuse() is called, the reservation has already been made, and 256 // we just have the final value, without knowledge of if this is 257 // unique or shared. So, the hack is to just assume Ver::Http2 means 258 // shared... :( 259 let mut pool_ref = WeakOpt::none(); 260 if !value.can_share() { 261 if let Some(ref enabled) = self.inner { 262 pool_ref = WeakOpt::downgrade(enabled); 263 } 264 } 265 266 Pooled { 267 is_reused: true, 268 key: key.clone(), 269 pool: pool_ref, 270 value: Some(value), 271 } 272 } 273 } 274 275 /// Pop off this list, looking for a usable connection that hasn't expired. 276 struct IdlePopper<'a, T> { 277 key: &'a Key, 278 list: &'a mut Vec<Idle<T>>, 279 } 280 281 impl<'a, T: Poolable + 'a> IdlePopper<'a, T> { pop(self, expiration: &Expiration) -> Option<Idle<T>>282 fn pop(self, expiration: &Expiration) -> Option<Idle<T>> { 283 while let Some(entry) = self.list.pop() { 284 // If the connection has been closed, or is older than our idle 285 // timeout, simply drop it and keep looking... 286 if !entry.value.is_open() { 287 trace!("removing closed connection for {:?}", self.key); 288 continue; 289 } 290 // TODO: Actually, since the `idle` list is pushed to the end always, 291 // that would imply that if *this* entry is expired, then anything 292 // "earlier" in the list would *have* to be expired also... Right? 293 // 294 // In that case, we could just break out of the loop and drop the 295 // whole list... 296 if expiration.expires(entry.idle_at) { 297 trace!("removing expired connection for {:?}", self.key); 298 continue; 299 } 300 301 let value = match entry.value.reserve() { 302 #[cfg(feature = "http2")] 303 Reservation::Shared(to_reinsert, to_checkout) => { 304 self.list.push(Idle { 305 idle_at: Instant::now(), 306 value: to_reinsert, 307 }); 308 to_checkout 309 } 310 Reservation::Unique(unique) => unique, 311 }; 312 313 return Some(Idle { 314 idle_at: entry.idle_at, 315 value, 316 }); 317 } 318 319 None 320 } 321 } 322 323 impl<T: Poolable> PoolInner<T> { put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>)324 fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) { 325 if value.can_share() && self.idle.contains_key(&key) { 326 trace!("put; existing idle HTTP/2 connection for {:?}", key); 327 return; 328 } 329 trace!("put; add idle connection for {:?}", key); 330 let mut remove_waiters = false; 331 let mut value = Some(value); 332 if let Some(waiters) = self.waiters.get_mut(&key) { 333 while let Some(tx) = waiters.pop_front() { 334 if !tx.is_canceled() { 335 let reserved = value.take().expect("value already sent"); 336 let reserved = match reserved.reserve() { 337 #[cfg(feature = "http2")] 338 Reservation::Shared(to_keep, to_send) => { 339 value = Some(to_keep); 340 to_send 341 } 342 Reservation::Unique(uniq) => uniq, 343 }; 344 match tx.send(reserved) { 345 Ok(()) => { 346 if value.is_none() { 347 break; 348 } else { 349 continue; 350 } 351 } 352 Err(e) => { 353 value = Some(e); 354 } 355 } 356 } 357 358 trace!("put; removing canceled waiter for {:?}", key); 359 } 360 remove_waiters = waiters.is_empty(); 361 } 362 if remove_waiters { 363 self.waiters.remove(&key); 364 } 365 366 match value { 367 Some(value) => { 368 // borrow-check scope... 369 { 370 let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new); 371 if self.max_idle_per_host <= idle_list.len() { 372 trace!("max idle per host for {:?}, dropping connection", key); 373 return; 374 } 375 376 debug!("pooling idle connection for {:?}", key); 377 idle_list.push(Idle { 378 value, 379 idle_at: Instant::now(), 380 }); 381 } 382 383 #[cfg(feature = "runtime")] 384 { 385 self.spawn_idle_interval(__pool_ref); 386 } 387 } 388 None => trace!("put; found waiter for {:?}", key), 389 } 390 } 391 392 /// A `Connecting` task is complete. Not necessarily successfully, 393 /// but the lock is going away, so clean up. connected(&mut self, key: &Key)394 fn connected(&mut self, key: &Key) { 395 let existed = self.connecting.remove(key); 396 debug_assert!(existed, "Connecting dropped, key not in pool.connecting"); 397 // cancel any waiters. if there are any, it's because 398 // this Connecting task didn't complete successfully. 399 // those waiters would never receive a connection. 400 self.waiters.remove(key); 401 } 402 403 #[cfg(feature = "runtime")] spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>)404 fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) { 405 let (dur, rx) = { 406 if self.idle_interval_ref.is_some() { 407 return; 408 } 409 410 if let Some(dur) = self.timeout { 411 let (tx, rx) = oneshot::channel(); 412 self.idle_interval_ref = Some(tx); 413 (dur, rx) 414 } else { 415 return; 416 } 417 }; 418 419 let interval = IdleTask { 420 interval: tokio::time::interval(dur), 421 pool: WeakOpt::downgrade(pool_ref), 422 pool_drop_notifier: rx, 423 }; 424 425 self.exec.execute(interval); 426 } 427 } 428 429 impl<T> PoolInner<T> { 430 /// Any `FutureResponse`s that were created will have made a `Checkout`, 431 /// and possibly inserted into the pool that it is waiting for an idle 432 /// connection. If a user ever dropped that future, we need to clean out 433 /// those parked senders. clean_waiters(&mut self, key: &Key)434 fn clean_waiters(&mut self, key: &Key) { 435 let mut remove_waiters = false; 436 if let Some(waiters) = self.waiters.get_mut(key) { 437 waiters.retain(|tx| !tx.is_canceled()); 438 remove_waiters = waiters.is_empty(); 439 } 440 if remove_waiters { 441 self.waiters.remove(key); 442 } 443 } 444 } 445 446 #[cfg(feature = "runtime")] 447 impl<T: Poolable> PoolInner<T> { 448 /// This should *only* be called by the IdleTask clear_expired(&mut self)449 fn clear_expired(&mut self) { 450 let dur = self.timeout.expect("interval assumes timeout"); 451 452 let now = Instant::now(); 453 //self.last_idle_check_at = now; 454 455 self.idle.retain(|key, values| { 456 values.retain(|entry| { 457 if !entry.value.is_open() { 458 trace!("idle interval evicting closed for {:?}", key); 459 return false; 460 } 461 if now - entry.idle_at > dur { 462 trace!("idle interval evicting expired for {:?}", key); 463 return false; 464 } 465 466 // Otherwise, keep this value... 467 true 468 }); 469 470 // returning false evicts this key/val 471 !values.is_empty() 472 }); 473 } 474 } 475 476 impl<T> Clone for Pool<T> { clone(&self) -> Pool<T>477 fn clone(&self) -> Pool<T> { 478 Pool { 479 inner: self.inner.clone(), 480 } 481 } 482 } 483 484 /// A wrapped poolable value that tries to reinsert to the Pool on Drop. 485 // Note: The bounds `T: Poolable` is needed for the Drop impl. 486 pub(super) struct Pooled<T: Poolable> { 487 value: Option<T>, 488 is_reused: bool, 489 key: Key, 490 pool: WeakOpt<Mutex<PoolInner<T>>>, 491 } 492 493 impl<T: Poolable> Pooled<T> { is_reused(&self) -> bool494 pub(super) fn is_reused(&self) -> bool { 495 self.is_reused 496 } 497 is_pool_enabled(&self) -> bool498 pub(super) fn is_pool_enabled(&self) -> bool { 499 self.pool.0.is_some() 500 } 501 as_ref(&self) -> &T502 fn as_ref(&self) -> &T { 503 self.value.as_ref().expect("not dropped") 504 } 505 as_mut(&mut self) -> &mut T506 fn as_mut(&mut self) -> &mut T { 507 self.value.as_mut().expect("not dropped") 508 } 509 } 510 511 impl<T: Poolable> Deref for Pooled<T> { 512 type Target = T; deref(&self) -> &T513 fn deref(&self) -> &T { 514 self.as_ref() 515 } 516 } 517 518 impl<T: Poolable> DerefMut for Pooled<T> { deref_mut(&mut self) -> &mut T519 fn deref_mut(&mut self) -> &mut T { 520 self.as_mut() 521 } 522 } 523 524 impl<T: Poolable> Drop for Pooled<T> { drop(&mut self)525 fn drop(&mut self) { 526 if let Some(value) = self.value.take() { 527 if !value.is_open() { 528 // If we *already* know the connection is done here, 529 // it shouldn't be re-inserted back into the pool. 530 return; 531 } 532 533 if let Some(pool) = self.pool.upgrade() { 534 if let Ok(mut inner) = pool.lock() { 535 inner.put(self.key.clone(), value, &pool); 536 } 537 } else if !value.can_share() { 538 trace!("pool dropped, dropping pooled ({:?})", self.key); 539 } 540 // Ver::Http2 is already in the Pool (or dead), so we wouldn't 541 // have an actual reference to the Pool. 542 } 543 } 544 } 545 546 impl<T: Poolable> fmt::Debug for Pooled<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result547 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 548 f.debug_struct("Pooled").field("key", &self.key).finish() 549 } 550 } 551 552 struct Idle<T> { 553 idle_at: Instant, 554 value: T, 555 } 556 557 // FIXME: allow() required due to `impl Trait` leaking types to this lint 558 #[allow(missing_debug_implementations)] 559 pub(super) struct Checkout<T> { 560 key: Key, 561 pool: Pool<T>, 562 waiter: Option<oneshot::Receiver<T>>, 563 } 564 565 #[derive(Debug)] 566 pub(super) struct CheckoutIsClosedError; 567 568 impl StdError for CheckoutIsClosedError {} 569 570 impl fmt::Display for CheckoutIsClosedError { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result571 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 572 f.write_str("checked out connection was closed") 573 } 574 } 575 576 impl<T: Poolable> Checkout<T> { poll_waiter( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<crate::Result<Pooled<T>>>>577 fn poll_waiter( 578 &mut self, 579 cx: &mut task::Context<'_>, 580 ) -> Poll<Option<crate::Result<Pooled<T>>>> { 581 if let Some(mut rx) = self.waiter.take() { 582 match Pin::new(&mut rx).poll(cx) { 583 Poll::Ready(Ok(value)) => { 584 if value.is_open() { 585 Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value)))) 586 } else { 587 Poll::Ready(Some(Err( 588 crate::Error::new_canceled().with(CheckoutIsClosedError) 589 ))) 590 } 591 } 592 Poll::Pending => { 593 self.waiter = Some(rx); 594 Poll::Pending 595 } 596 Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err( 597 crate::Error::new_canceled().with("request has been canceled") 598 ))), 599 } 600 } else { 601 Poll::Ready(None) 602 } 603 } 604 checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T>>605 fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T>> { 606 let entry = { 607 let mut inner = self.pool.inner.as_ref()?.lock().unwrap(); 608 let expiration = Expiration::new(inner.timeout); 609 let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| { 610 trace!("take? {:?}: expiration = {:?}", self.key, expiration.0); 611 // A block to end the mutable borrow on list, 612 // so the map below can check is_empty() 613 { 614 let popper = IdlePopper { 615 key: &self.key, 616 list, 617 }; 618 popper.pop(&expiration) 619 } 620 .map(|e| (e, list.is_empty())) 621 }); 622 623 let (entry, empty) = if let Some((e, empty)) = maybe_entry { 624 (Some(e), empty) 625 } else { 626 // No entry found means nuke the list for sure. 627 (None, true) 628 }; 629 if empty { 630 //TODO: This could be done with the HashMap::entry API instead. 631 inner.idle.remove(&self.key); 632 } 633 634 if entry.is_none() && self.waiter.is_none() { 635 let (tx, mut rx) = oneshot::channel(); 636 trace!("checkout waiting for idle connection: {:?}", self.key); 637 inner 638 .waiters 639 .entry(self.key.clone()) 640 .or_insert_with(VecDeque::new) 641 .push_back(tx); 642 643 // register the waker with this oneshot 644 assert!(Pin::new(&mut rx).poll(cx).is_pending()); 645 self.waiter = Some(rx); 646 } 647 648 entry 649 }; 650 651 entry.map(|e| self.pool.reuse(&self.key, e.value)) 652 } 653 } 654 655 impl<T: Poolable> Future for Checkout<T> { 656 type Output = crate::Result<Pooled<T>>; 657 poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>658 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { 659 if let Some(pooled) = ready!(self.poll_waiter(cx)?) { 660 return Poll::Ready(Ok(pooled)); 661 } 662 663 if let Some(pooled) = self.checkout(cx) { 664 Poll::Ready(Ok(pooled)) 665 } else if !self.pool.is_enabled() { 666 Poll::Ready(Err(crate::Error::new_canceled().with("pool is disabled"))) 667 } else { 668 // There's a new waiter, already registered in self.checkout() 669 debug_assert!(self.waiter.is_some()); 670 Poll::Pending 671 } 672 } 673 } 674 675 impl<T> Drop for Checkout<T> { drop(&mut self)676 fn drop(&mut self) { 677 if self.waiter.take().is_some() { 678 trace!("checkout dropped for {:?}", self.key); 679 if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) { 680 inner.clean_waiters(&self.key); 681 } 682 } 683 } 684 } 685 686 // FIXME: allow() required due to `impl Trait` leaking types to this lint 687 #[allow(missing_debug_implementations)] 688 pub(super) struct Connecting<T: Poolable> { 689 key: Key, 690 pool: WeakOpt<Mutex<PoolInner<T>>>, 691 } 692 693 impl<T: Poolable> Connecting<T> { alpn_h2(self, pool: &Pool<T>) -> Option<Self>694 pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> { 695 debug_assert!( 696 self.pool.0.is_none(), 697 "Connecting::alpn_h2 but already Http2" 698 ); 699 700 pool.connecting(&self.key, Ver::Http2) 701 } 702 } 703 704 impl<T: Poolable> Drop for Connecting<T> { drop(&mut self)705 fn drop(&mut self) { 706 if let Some(pool) = self.pool.upgrade() { 707 // No need to panic on drop, that could abort! 708 if let Ok(mut inner) = pool.lock() { 709 inner.connected(&self.key); 710 } 711 } 712 } 713 } 714 715 struct Expiration(Option<Duration>); 716 717 impl Expiration { new(dur: Option<Duration>) -> Expiration718 fn new(dur: Option<Duration>) -> Expiration { 719 Expiration(dur) 720 } 721 expires(&self, instant: Instant) -> bool722 fn expires(&self, instant: Instant) -> bool { 723 match self.0 { 724 Some(timeout) => instant.elapsed() > timeout, 725 None => false, 726 } 727 } 728 } 729 730 #[cfg(feature = "runtime")] 731 pin_project_lite::pin_project! { 732 struct IdleTask<T> { 733 #[pin] 734 interval: Interval, 735 pool: WeakOpt<Mutex<PoolInner<T>>>, 736 // This allows the IdleTask to be notified as soon as the entire 737 // Pool is fully dropped, and shutdown. This channel is never sent on, 738 // but Err(Canceled) will be received when the Pool is dropped. 739 #[pin] 740 pool_drop_notifier: oneshot::Receiver<crate::common::Never>, 741 } 742 } 743 744 #[cfg(feature = "runtime")] 745 impl<T: Poolable + 'static> Future for IdleTask<T> { 746 type Output = (); 747 poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>748 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { 749 let mut this = self.project(); 750 loop { 751 match this.pool_drop_notifier.as_mut().poll(cx) { 752 Poll::Ready(Ok(n)) => match n {}, 753 Poll::Pending => (), 754 Poll::Ready(Err(_canceled)) => { 755 trace!("pool closed, canceling idle interval"); 756 return Poll::Ready(()); 757 } 758 } 759 760 ready!(this.interval.as_mut().poll_tick(cx)); 761 762 if let Some(inner) = this.pool.upgrade() { 763 if let Ok(mut inner) = inner.lock() { 764 trace!("idle interval checking for expired"); 765 inner.clear_expired(); 766 continue; 767 } 768 } 769 return Poll::Ready(()); 770 } 771 } 772 } 773 774 impl<T> WeakOpt<T> { none() -> Self775 fn none() -> Self { 776 WeakOpt(None) 777 } 778 downgrade(arc: &Arc<T>) -> Self779 fn downgrade(arc: &Arc<T>) -> Self { 780 WeakOpt(Some(Arc::downgrade(arc))) 781 } 782 upgrade(&self) -> Option<Arc<T>>783 fn upgrade(&self) -> Option<Arc<T>> { 784 self.0.as_ref().and_then(Weak::upgrade) 785 } 786 } 787 788 #[cfg(test)] 789 mod tests { 790 use std::task::Poll; 791 use std::time::Duration; 792 793 use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt}; 794 use crate::common::{exec::Exec, task, Future, Pin}; 795 796 /// Test unique reservations. 797 #[derive(Debug, PartialEq, Eq)] 798 struct Uniq<T>(T); 799 800 impl<T: Send + 'static + Unpin> Poolable for Uniq<T> { is_open(&self) -> bool801 fn is_open(&self) -> bool { 802 true 803 } 804 reserve(self) -> Reservation<Self>805 fn reserve(self) -> Reservation<Self> { 806 Reservation::Unique(self) 807 } 808 can_share(&self) -> bool809 fn can_share(&self) -> bool { 810 false 811 } 812 } 813 c<T: Poolable>(key: Key) -> Connecting<T>814 fn c<T: Poolable>(key: Key) -> Connecting<T> { 815 Connecting { 816 key, 817 pool: WeakOpt::none(), 818 } 819 } 820 host_key(s: &str) -> Key821 fn host_key(s: &str) -> Key { 822 (http::uri::Scheme::HTTP, s.parse().expect("host key")) 823 } 824 pool_no_timer<T>() -> Pool<T>825 fn pool_no_timer<T>() -> Pool<T> { 826 pool_max_idle_no_timer(::std::usize::MAX) 827 } 828 pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T>829 fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> { 830 let pool = Pool::new( 831 super::Config { 832 idle_timeout: Some(Duration::from_millis(100)), 833 max_idle_per_host: max_idle, 834 }, 835 &Exec::Default, 836 ); 837 pool.no_timer(); 838 pool 839 } 840 841 #[tokio::test] test_pool_checkout_smoke()842 async fn test_pool_checkout_smoke() { 843 let pool = pool_no_timer(); 844 let key = host_key("foo"); 845 let pooled = pool.pooled(c(key.clone()), Uniq(41)); 846 847 drop(pooled); 848 849 match pool.checkout(key).await { 850 Ok(pooled) => assert_eq!(*pooled, Uniq(41)), 851 Err(_) => panic!("not ready"), 852 }; 853 } 854 855 /// Helper to check if the future is ready after polling once. 856 struct PollOnce<'a, F>(&'a mut F); 857 858 impl<F, T, U> Future for PollOnce<'_, F> 859 where 860 F: Future<Output = Result<T, U>> + Unpin, 861 { 862 type Output = Option<()>; 863 poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>864 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { 865 match Pin::new(&mut self.0).poll(cx) { 866 Poll::Ready(Ok(_)) => Poll::Ready(Some(())), 867 Poll::Ready(Err(_)) => Poll::Ready(Some(())), 868 Poll::Pending => Poll::Ready(None), 869 } 870 } 871 } 872 873 #[tokio::test] test_pool_checkout_returns_none_if_expired()874 async fn test_pool_checkout_returns_none_if_expired() { 875 let pool = pool_no_timer(); 876 let key = host_key("foo"); 877 let pooled = pool.pooled(c(key.clone()), Uniq(41)); 878 879 drop(pooled); 880 tokio::time::sleep(pool.locked().timeout.unwrap()).await; 881 let mut checkout = pool.checkout(key); 882 let poll_once = PollOnce(&mut checkout); 883 let is_not_ready = poll_once.await.is_none(); 884 assert!(is_not_ready); 885 } 886 887 #[cfg(feature = "runtime")] 888 #[tokio::test] test_pool_checkout_removes_expired()889 async fn test_pool_checkout_removes_expired() { 890 let pool = pool_no_timer(); 891 let key = host_key("foo"); 892 893 pool.pooled(c(key.clone()), Uniq(41)); 894 pool.pooled(c(key.clone()), Uniq(5)); 895 pool.pooled(c(key.clone()), Uniq(99)); 896 897 assert_eq!( 898 pool.locked().idle.get(&key).map(|entries| entries.len()), 899 Some(3) 900 ); 901 tokio::time::sleep(pool.locked().timeout.unwrap()).await; 902 903 let mut checkout = pool.checkout(key.clone()); 904 let poll_once = PollOnce(&mut checkout); 905 // checkout.await should clean out the expired 906 poll_once.await; 907 assert!(pool.locked().idle.get(&key).is_none()); 908 } 909 910 #[test] test_pool_max_idle_per_host()911 fn test_pool_max_idle_per_host() { 912 let pool = pool_max_idle_no_timer(2); 913 let key = host_key("foo"); 914 915 pool.pooled(c(key.clone()), Uniq(41)); 916 pool.pooled(c(key.clone()), Uniq(5)); 917 pool.pooled(c(key.clone()), Uniq(99)); 918 919 // pooled and dropped 3, max_idle should only allow 2 920 assert_eq!( 921 pool.locked().idle.get(&key).map(|entries| entries.len()), 922 Some(2) 923 ); 924 } 925 926 #[cfg(feature = "runtime")] 927 #[tokio::test] test_pool_timer_removes_expired()928 async fn test_pool_timer_removes_expired() { 929 let _ = pretty_env_logger::try_init(); 930 tokio::time::pause(); 931 932 let pool = Pool::new( 933 super::Config { 934 idle_timeout: Some(Duration::from_millis(10)), 935 max_idle_per_host: std::usize::MAX, 936 }, 937 &Exec::Default, 938 ); 939 940 let key = host_key("foo"); 941 942 pool.pooled(c(key.clone()), Uniq(41)); 943 pool.pooled(c(key.clone()), Uniq(5)); 944 pool.pooled(c(key.clone()), Uniq(99)); 945 946 assert_eq!( 947 pool.locked().idle.get(&key).map(|entries| entries.len()), 948 Some(3) 949 ); 950 951 // Let the timer tick passed the expiration... 952 tokio::time::advance(Duration::from_millis(30)).await; 953 // Yield so the Interval can reap... 954 tokio::task::yield_now().await; 955 956 assert!(pool.locked().idle.get(&key).is_none()); 957 } 958 959 #[tokio::test] test_pool_checkout_task_unparked()960 async fn test_pool_checkout_task_unparked() { 961 use futures_util::future::join; 962 use futures_util::FutureExt; 963 964 let pool = pool_no_timer(); 965 let key = host_key("foo"); 966 let pooled = pool.pooled(c(key.clone()), Uniq(41)); 967 968 let checkout = join(pool.checkout(key), async { 969 // the checkout future will park first, 970 // and then this lazy future will be polled, which will insert 971 // the pooled back into the pool 972 // 973 // this test makes sure that doing so will unpark the checkout 974 drop(pooled); 975 }) 976 .map(|(entry, _)| entry); 977 978 assert_eq!(*checkout.await.unwrap(), Uniq(41)); 979 } 980 981 #[tokio::test] test_pool_checkout_drop_cleans_up_waiters()982 async fn test_pool_checkout_drop_cleans_up_waiters() { 983 let pool = pool_no_timer::<Uniq<i32>>(); 984 let key = host_key("foo"); 985 986 let mut checkout1 = pool.checkout(key.clone()); 987 let mut checkout2 = pool.checkout(key.clone()); 988 989 let poll_once1 = PollOnce(&mut checkout1); 990 let poll_once2 = PollOnce(&mut checkout2); 991 992 // first poll needed to get into Pool's parked 993 poll_once1.await; 994 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1); 995 poll_once2.await; 996 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2); 997 998 // on drop, clean up Pool 999 drop(checkout1); 1000 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1); 1001 1002 drop(checkout2); 1003 assert!(pool.locked().waiters.get(&key).is_none()); 1004 } 1005 1006 #[derive(Debug)] 1007 struct CanClose { 1008 #[allow(unused)] 1009 val: i32, 1010 closed: bool, 1011 } 1012 1013 impl Poolable for CanClose { is_open(&self) -> bool1014 fn is_open(&self) -> bool { 1015 !self.closed 1016 } 1017 reserve(self) -> Reservation<Self>1018 fn reserve(self) -> Reservation<Self> { 1019 Reservation::Unique(self) 1020 } 1021 can_share(&self) -> bool1022 fn can_share(&self) -> bool { 1023 false 1024 } 1025 } 1026 1027 #[test] pooled_drop_if_closed_doesnt_reinsert()1028 fn pooled_drop_if_closed_doesnt_reinsert() { 1029 let pool = pool_no_timer(); 1030 let key = host_key("foo"); 1031 pool.pooled( 1032 c(key.clone()), 1033 CanClose { 1034 val: 57, 1035 closed: true, 1036 }, 1037 ); 1038 1039 assert!(!pool.locked().idle.contains_key(&key)); 1040 } 1041 } 1042