use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex, Weak};

#[cfg(not(feature = "runtime"))]
use std::time::{Duration, Instant};

use futures_channel::oneshot;
#[cfg(feature = "runtime")]
use tokio::time::{Duration, Instant, Interval};

use super::Ver;
use crate::common::{task, Exec, Future, Pin, Poll, Unpin};

// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct Pool<T> {
    // If the pool is disabled, this is None.
    inner: Option<Arc<Mutex<PoolInner<T>>>>,
}

// Before using a pooled connection, make sure the sender is not dead.
//
// This is a trait to allow the `client::pool::tests` to work for `i32`.
//
// See https://github.com/hyperium/hyper/issues/1429
pub(super) trait Poolable: Unpin + Send + Sized + 'static {
    /// Returns `true` if the connection is still usable (its sender is not dead).
    fn is_open(&self) -> bool;
    /// Reserve this connection.
    ///
    /// Allows for HTTP/2 to return a shared reservation.
    fn reserve(self) -> Reservation<Self>;
    /// Returns `true` if this connection can serve multiple requests at once
    /// (i.e. it may yield `Reservation::Shared`).
    fn can_share(&self) -> bool;
}

/// When checking out a pooled connection, it might be that the connection
/// only supports a single reservation, or it might be usable for many.
///
/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
/// used for multiple requests.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) enum Reservation<T> {
    /// This connection could be used multiple times, the first one will be
    /// reinserted into the `idle` pool, and the second will be given to
    /// the `Checkout`.
    Shared(T, T),
    /// This connection requires unique access. It will be returned after
    /// use is complete.
    Unique(T),
}

/// Simple type alias in case the key type needs to be adjusted.
pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>;

struct PoolInner<T> {
    // A flag that a connection is being established, and the connection
    // should be shared. This prevents making multiple HTTP/2 connections
    // to the same host.
    connecting: HashSet<Key>,
    // These are internal Conns sitting in the event loop in the KeepAlive
    // state, waiting to receive a new Request to send on the socket.
    idle: HashMap<Key, Vec<Idle<T>>>,
    // Maximum idle connections kept per key; 0 disables the pool entirely
    // (see `Config::is_enabled`).
    max_idle_per_host: usize,
    // These are outstanding Checkouts that are waiting for a socket to be
    // able to send a Request one. This is used when "racing" for a new
    // connection.
    //
    // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
    // for the Pool to receive an idle Conn. When a Conn becomes idle,
    // this list is checked for any parked Checkouts, and tries to notify
    // them that the Conn could be used instead of waiting for a brand new
    // connection.
    waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
    // A oneshot channel is used to allow the interval to be notified when
    // the Pool completely drops. That way, the interval can cancel immediately.
    #[cfg(feature = "runtime")]
    idle_interval_ref: Option<oneshot::Sender<crate::common::Never>>,
    #[cfg(feature = "runtime")]
    exec: Exec,
    // How long a connection may sit idle before eviction; None means never.
    timeout: Option<Duration>,
}

// This is because `Weak::new()` *allocates* space for `T`, even if it
// doesn't need it!
struct WeakOpt<T>(Option<Weak<T>>);

#[derive(Clone, Copy, Debug)]
pub(super) struct Config {
    pub(super) idle_timeout: Option<Duration>,
    pub(super) max_idle_per_host: usize,
}

impl Config {
    pub(super) fn is_enabled(&self) -> bool {
        self.max_idle_per_host > 0
    }
}

impl<T> Pool<T> {
    /// Create a new pool. If the config disables pooling
    /// (`max_idle_per_host == 0`), `inner` stays `None` and every pool
    /// operation becomes a no-op.
    pub fn new(config: Config, __exec: &Exec) -> Pool<T> {
        let inner = if config.is_enabled() {
            Some(Arc::new(Mutex::new(PoolInner {
                connecting: HashSet::new(),
                idle: HashMap::new(),
                #[cfg(feature = "runtime")]
                idle_interval_ref: None,
                max_idle_per_host: config.max_idle_per_host,
                waiters: HashMap::new(),
                #[cfg(feature = "runtime")]
                exec: __exec.clone(),
                timeout: config.idle_timeout,
            })))
        } else {
            None
        };

        Pool { inner }
    }

    fn is_enabled(&self) -> bool {
        self.inner.is_some()
    }

    #[cfg(test)]
    pub(super) fn no_timer(&self) {
        // Prevent an actual interval from being created for this pool...
        #[cfg(feature = "runtime")]
        {
            let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
            assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
            // Occupying the slot stops spawn_idle_interval() from ever
            // starting a real IdleTask for this pool.
            let (tx, _) = oneshot::channel();
            inner.idle_interval_ref = Some(tx);
        }
    }
}

impl<T: Poolable> Pool<T> {
    /// Returns a `Checkout` which is a future that resolves if an idle
    /// connection becomes available.
    pub fn checkout(&self, key: Key) -> Checkout<T> {
        Checkout {
            key,
            pool: self.clone(),
            waiter: None,
        }
    }

    /// Ensure that there is only ever 1 connecting task for HTTP/2
    /// connections. This does nothing for HTTP/1.
    pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> {
        if ver == Ver::Http2 {
            if let Some(ref enabled) = self.inner {
                let mut inner = enabled.lock().unwrap();
                // `insert` returning false means another task already holds
                // the connect "lock" for this key.
                return if inner.connecting.insert(key.clone()) {
                    let connecting = Connecting {
                        key: key.clone(),
                        pool: WeakOpt::downgrade(enabled),
                    };
                    Some(connecting)
                } else {
                    trace!("HTTP/2 connecting already in progress for {:?}", key);
                    None
                };
            }
        }

        // else
        Some(Connecting {
            key: key.clone(),
            // in HTTP/1's case, there is never a lock, so we don't
            // need to do anything in Drop.
            pool: WeakOpt::none(),
        })
    }

    #[cfg(test)]
    fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>> {
        self.inner.as_ref().expect("enabled").lock().expect("lock")
    }

    /* Used in client/tests.rs...
    #[cfg(feature = "runtime")]
    #[cfg(test)]
    pub(super) fn h1_key(&self, s: &str) -> Key {
        Arc::new(s.to_string())
    }

    #[cfg(feature = "runtime")]
    #[cfg(test)]
    pub(super) fn idle_count(&self, key: &Key) -> usize {
        self
            .locked()
            .idle
            .get(key)
            .map(|list| list.len())
            .unwrap_or(0)
    }
    */

    /// Wrap a brand-new connection in a `Pooled`. Shared (HTTP/2-style)
    /// reservations are inserted into the idle map right away; unique ones
    /// keep a weak pool reference so Drop can reinsert them later.
    pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> {
        let (value, pool_ref) = if let Some(ref enabled) = self.inner {
            match value.reserve() {
                Reservation::Shared(to_insert, to_return) => {
                    let mut inner = enabled.lock().unwrap();
                    inner.put(connecting.key.clone(), to_insert, enabled);
                    // Do this here instead of Drop for Connecting because we
                    // already have a lock, no need to lock the mutex twice.
                    inner.connected(&connecting.key);
                    // prevent the Drop of Connecting from repeating inner.connected()
                    connecting.pool = WeakOpt::none();

                    // Shared reservations don't need a reference to the pool,
                    // since the pool always keeps a copy.
                    (to_return, WeakOpt::none())
                }
                Reservation::Unique(value) => {
                    // Unique reservations must take a reference to the pool
                    // since they hope to reinsert once the reservation is
                    // completed
                    (value, WeakOpt::downgrade(enabled))
                }
            }
        } else {
            // If pool is not enabled, skip all the things...

            // The Connecting should have had no pool ref
            debug_assert!(connecting.pool.upgrade().is_none());

            (value, WeakOpt::none())
        };
        Pooled {
            key: connecting.key.clone(),
            is_reused: false,
            pool: pool_ref,
            value: Some(value),
        }
    }

    fn reuse(&self, key: &Key, value: T) -> Pooled<T> {
        debug!("reuse idle connection for {:?}", key);
        // TODO: unhack this
        // In Pool::pooled(), which is used for inserting brand new connections,
        // there's some code that adjusts the pool reference taken depending
        // on if the Reservation can be shared or is unique. By the time
        // reuse() is called, the reservation has already been made, and
        // we just have the final value, without knowledge of if this is
        // unique or shared. So, the hack is to just assume Ver::Http2 means
        // shared... :(
        let mut pool_ref = WeakOpt::none();
        if !value.can_share() {
            if let Some(ref enabled) = self.inner {
                pool_ref = WeakOpt::downgrade(enabled);
            }
        }

        Pooled {
            is_reused: true,
            key: key.clone(),
            pool: pool_ref,
            value: Some(value),
        }
    }
}

/// Pop off this list, looking for a usable connection that hasn't expired.
struct IdlePopper<'a, T> {
    key: &'a Key,
    list: &'a mut Vec<Idle<T>>,
}

impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
    fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
        while let Some(entry) = self.list.pop() {
            // If the connection has been closed, or is older than our idle
            // timeout, simply drop it and keep looking...
            if !entry.value.is_open() {
                trace!("removing closed connection for {:?}", self.key);
                continue;
            }
            // TODO: Actually, since the `idle` list is pushed to the end always,
            // that would imply that if *this* entry is expired, then anything
            // "earlier" in the list would *have* to be expired also... Right?
            //
            // In that case, we could just break out of the loop and drop the
            // whole list...
            if expiration.expires(entry.idle_at) {
                trace!("removing expired connection for {:?}", self.key);
                continue;
            }

            let value = match entry.value.reserve() {
                Reservation::Shared(to_reinsert, to_checkout) => {
                    // A shared connection stays in the idle list (with a
                    // refreshed timestamp) while a clone is checked out.
                    self.list.push(Idle {
                        idle_at: Instant::now(),
                        value: to_reinsert,
                    });
                    to_checkout
                }
                Reservation::Unique(unique) => unique,
            };

            return Some(Idle {
                idle_at: entry.idle_at,
                value,
            });
        }

        None
    }
}

impl<T: Poolable> PoolInner<T> {
    /// Return a connection to the pool: first try to hand it to a parked
    /// `Checkout` waiter; otherwise stash it on the idle list, respecting
    /// `max_idle_per_host`.
    fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) {
        if value.can_share() && self.idle.contains_key(&key) {
            trace!("put; existing idle HTTP/2 connection for {:?}", key);
            return;
        }
        trace!("put; add idle connection for {:?}", key);
        let mut remove_waiters = false;
        // Option dance: `value` is taken when sent to a waiter and restored
        // if the send fails (the waiter's Receiver was dropped).
        let mut value = Some(value);
        if let Some(waiters) = self.waiters.get_mut(&key) {
            while let Some(tx) = waiters.pop_front() {
                if !tx.is_canceled() {
                    let reserved = value.take().expect("value already sent");
                    let reserved = match reserved.reserve() {
                        Reservation::Shared(to_keep, to_send) => {
                            value = Some(to_keep);
                            to_send
                        }
                        Reservation::Unique(uniq) => uniq,
                    };
                    match tx.send(reserved) {
                        Ok(()) => {
                            if value.is_none() {
                                break;
                            } else {
                                continue;
                            }
                        }
                        Err(e) => {
                            value = Some(e);
                        }
                    }
                }

                trace!("put; removing canceled waiter for {:?}", key);
            }
            remove_waiters = waiters.is_empty();
        }
        if remove_waiters {
            self.waiters.remove(&key);
        }

        match value {
            Some(value) => {
                // borrow-check scope...
                {
                    let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new);
                    if self.max_idle_per_host <= idle_list.len() {
                        trace!("max idle per host for {:?}, dropping connection", key);
                        return;
                    }

                    debug!("pooling idle connection for {:?}", key);
                    idle_list.push(Idle {
                        value,
                        idle_at: Instant::now(),
                    });
                }

                #[cfg(feature = "runtime")]
                {
                    self.spawn_idle_interval(__pool_ref);
                }
            }
            None => trace!("put; found waiter for {:?}", key),
        }
    }

    /// A `Connecting` task is complete. Not necessarily successfully,
    /// but the lock is going away, so clean up.
    fn connected(&mut self, key: &Key) {
        let existed = self.connecting.remove(key);
        debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
        // cancel any waiters. if there are any, it's because
        // this Connecting task didn't complete successfully.
        // those waiters would never receive a connection.
        self.waiters.remove(key);
    }

    #[cfg(feature = "runtime")]
    fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) {
        let (dur, rx) = {
            // Only one IdleTask per pool; also skip entirely when there is
            // no idle timeout configured.
            if self.idle_interval_ref.is_some() {
                return;
            }

            if let Some(dur) = self.timeout {
                let (tx, rx) = oneshot::channel();
                self.idle_interval_ref = Some(tx);
                (dur, rx)
            } else {
                return;
            }
        };

        let interval = IdleTask {
            interval: tokio::time::interval(dur),
            pool: WeakOpt::downgrade(pool_ref),
            pool_drop_notifier: rx,
        };

        self.exec.execute(interval);
    }
}

impl<T> PoolInner<T> {
    /// Any `FutureResponse`s that were created will have made a `Checkout`,
    /// and possibly inserted into the pool that it is waiting for an idle
    /// connection. If a user ever dropped that future, we need to clean out
    /// those parked senders.
    fn clean_waiters(&mut self, key: &Key) {
        let mut remove_waiters = false;
        if let Some(waiters) = self.waiters.get_mut(key) {
            waiters.retain(|tx| !tx.is_canceled());
            remove_waiters = waiters.is_empty();
        }
        if remove_waiters {
            self.waiters.remove(key);
        }
    }
}

#[cfg(feature = "runtime")]
impl<T: Poolable> PoolInner<T> {
    /// This should *only* be called by the IdleTask
    fn clear_expired(&mut self) {
        let dur = self.timeout.expect("interval assumes timeout");

        let now = Instant::now();
        //self.last_idle_check_at = now;

        self.idle.retain(|key, values| {
            values.retain(|entry| {
                if !entry.value.is_open() {
                    trace!("idle interval evicting closed for {:?}", key);
                    return false;
                }
                if now - entry.idle_at > dur {
                    trace!("idle interval evicting expired for {:?}", key);
                    return false;
                }

                // Otherwise, keep this value...
                true
            });

            // returning false evicts this key/val
            !values.is_empty()
        });
    }
}

impl<T> Clone for Pool<T> {
    fn clone(&self) -> Pool<T> {
        Pool {
            inner: self.inner.clone(),
        }
    }
}

/// A wrapped poolable value that tries to reinsert to the Pool on Drop.
// Note: The bounds `T: Poolable` is needed for the Drop impl.
pub(super) struct Pooled<T: Poolable> {
    // Option so Drop can move the value out for reinsertion.
    value: Option<T>,
    is_reused: bool,
    key: Key,
    pool: WeakOpt<Mutex<PoolInner<T>>>,
}

impl<T: Poolable> Pooled<T> {
    pub fn is_reused(&self) -> bool {
        self.is_reused
    }

    pub fn is_pool_enabled(&self) -> bool {
        self.pool.0.is_some()
    }

    fn as_ref(&self) -> &T {
        self.value.as_ref().expect("not dropped")
    }

    fn as_mut(&mut self) -> &mut T {
        self.value.as_mut().expect("not dropped")
    }
}

impl<T: Poolable> Deref for Pooled<T> {
    type Target = T;
    fn deref(&self) -> &T {
        self.as_ref()
    }
}

impl<T: Poolable> DerefMut for Pooled<T> {
    fn deref_mut(&mut self) -> &mut T {
        self.as_mut()
    }
}

impl<T: Poolable> Drop for Pooled<T> {
    fn drop(&mut self) {
        if let Some(value) = self.value.take() {
            if !value.is_open() {
                // If we *already* know the connection is done here,
                // it shouldn't be re-inserted back into the pool.
                return;
            }

            if let Some(pool) = self.pool.upgrade() {
                if let Ok(mut inner) = pool.lock() {
                    inner.put(self.key.clone(), value, &pool);
                }
            } else if !value.can_share() {
                trace!("pool dropped, dropping pooled ({:?})", self.key);
            }
            // Ver::Http2 is already in the Pool (or dead), so we wouldn't
            // have an actual reference to the Pool.
        }
    }
}

impl<T: Poolable> fmt::Debug for Pooled<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Pooled").field("key", &self.key).finish()
    }
}

struct Idle<T> {
    idle_at: Instant,
    value: T,
}

// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct Checkout<T> {
    key: Key,
    pool: Pool<T>,
    waiter: Option<oneshot::Receiver<T>>,
}

impl<T: Poolable> Checkout<T> {
    /// Poll the parked waiter (if any) for a connection handed over by
    /// `PoolInner::put`. `Ready(None)` means there is no waiter registered.
    fn poll_waiter(
        &mut self,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<crate::Result<Pooled<T>>>> {
        static CANCELED: &str = "pool checkout failed";
        if let Some(mut rx) = self.waiter.take() {
            match Pin::new(&mut rx).poll(cx) {
                Poll::Ready(Ok(value)) => {
                    if value.is_open() {
                        Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
                    } else {
                        Poll::Ready(Some(Err(crate::Error::new_canceled().with(CANCELED))))
                    }
                }
                Poll::Pending => {
                    self.waiter = Some(rx);
                    Poll::Pending
                }
                Poll::Ready(Err(_canceled)) => {
                    Poll::Ready(Some(Err(crate::Error::new_canceled().with(CANCELED))))
                }
            }
        } else {
            Poll::Ready(None)
        }
    }

    /// Try to pop a usable idle connection for our key; if none is found,
    /// park a waiter so `put` can hand us one later.
    fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T>> {
        let entry = {
            let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
            let expiration = Expiration::new(inner.timeout);
            let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
                trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
                // A block to end the mutable borrow on list,
                // so the map below can check is_empty()
                {
                    let popper = IdlePopper {
                        key: &self.key,
                        list,
                    };
                    popper.pop(&expiration)
                }
                .map(|e| (e, list.is_empty()))
            });

            let (entry, empty) = if let Some((e, empty)) = maybe_entry {
                (Some(e), empty)
            } else {
                // No entry found means nuke the list for sure.
                (None, true)
            };
            if empty {
                //TODO: This could be done with the HashMap::entry API instead.
                inner.idle.remove(&self.key);
            }

            if entry.is_none() && self.waiter.is_none() {
                let (tx, mut rx) = oneshot::channel();
                trace!("checkout waiting for idle connection: {:?}", self.key);
                inner
                    .waiters
                    .entry(self.key.clone())
                    .or_insert_with(VecDeque::new)
                    .push_back(tx);

                // register the waker with this oneshot
                assert!(Pin::new(&mut rx).poll(cx).is_pending());
                self.waiter = Some(rx);
            }

            entry
        };

        entry.map(|e| self.pool.reuse(&self.key, e.value))
    }
}

impl<T: Poolable> Future for Checkout<T> {
    type Output = crate::Result<Pooled<T>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
            return Poll::Ready(Ok(pooled));
        }

        if let Some(pooled) = self.checkout(cx) {
            Poll::Ready(Ok(pooled))
        } else if !self.pool.is_enabled() {
            Poll::Ready(Err(crate::Error::new_canceled().with("pool is disabled")))
        } else {
            // There's a new waiter, already registered in self.checkout()
            debug_assert!(self.waiter.is_some());
            Poll::Pending
        }
    }
}

impl<T> Drop for Checkout<T> {
    fn drop(&mut self) {
        if self.waiter.take().is_some() {
            trace!("checkout dropped for {:?}", self.key);
            if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
                inner.clean_waiters(&self.key);
            }
        }
    }
}

// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct Connecting<T: Poolable> {
    key: Key,
    pool: WeakOpt<Mutex<PoolInner<T>>>,
}

impl<T: Poolable> Connecting<T> {
    pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> {
        debug_assert!(
            self.pool.0.is_none(),
            "Connecting::alpn_h2 but already Http2"
        );

        pool.connecting(&self.key, Ver::Http2)
    }
}

impl<T: Poolable> Drop for Connecting<T> {
    fn drop(&mut self) {
        if let Some(pool) = self.pool.upgrade() {
            // No need to panic on drop, that could abort!
            if let Ok(mut inner) = pool.lock() {
                inner.connected(&self.key);
            }
        }
    }
}

struct Expiration(Option<Duration>);

impl Expiration {
    fn new(dur: Option<Duration>) -> Expiration {
        Expiration(dur)
    }

    fn expires(&self, instant: Instant) -> bool {
        match self.0 {
            Some(timeout) => instant.elapsed() > timeout,
            None => false,
        }
    }
}

#[cfg(feature = "runtime")]
struct IdleTask<T> {
    interval: Interval,
    pool: WeakOpt<Mutex<PoolInner<T>>>,
    // This allows the IdleTask to be notified as soon as the entire
    // Pool is fully dropped, and shutdown. This channel is never sent on,
    // but Err(Canceled) will be received when the Pool is dropped.
    pool_drop_notifier: oneshot::Receiver<crate::common::Never>,
}

#[cfg(feature = "runtime")]
impl<T: Poolable + 'static> Future for IdleTask<T> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        loop {
            match Pin::new(&mut self.pool_drop_notifier).poll(cx) {
                Poll::Ready(Ok(n)) => match n {},
                Poll::Pending => (),
                Poll::Ready(Err(_canceled)) => {
                    trace!("pool closed, canceling idle interval");
                    return Poll::Ready(());
                }
            }

            ready!(self.interval.poll_tick(cx));

            if let Some(inner) = self.pool.upgrade() {
                if let Ok(mut inner) = inner.lock() {
                    trace!("idle interval checking for expired");
                    inner.clear_expired();
                    continue;
                }
            }
            // Pool is gone (or poisoned); stop the task.
            return Poll::Ready(());
        }
    }
}

impl<T> WeakOpt<T> {
    fn none() -> Self {
        WeakOpt(None)
    }

    fn downgrade(arc: &Arc<T>) -> Self {
        WeakOpt(Some(Arc::downgrade(arc)))
    }

    fn upgrade(&self) -> Option<Arc<T>> {
        self.0.as_ref().and_then(Weak::upgrade)
    }
}
#[cfg(test)]
mod tests {
    use std::task::Poll;
    use std::time::Duration;

    use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
    use crate::common::{task, Exec, Future, Pin};

    /// Test unique reservations.
    #[derive(Debug, PartialEq, Eq)]
    struct Uniq<T>(T);

    impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
        fn is_open(&self) -> bool {
            true
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    // Build a Connecting with no pool reference (HTTP/1-style).
    fn c<T: Poolable>(key: Key) -> Connecting<T> {
        Connecting {
            key,
            pool: WeakOpt::none(),
        }
    }

    fn host_key(s: &str) -> Key {
        (http::uri::Scheme::HTTP, s.parse().expect("host key"))
    }

    fn pool_no_timer<T>() -> Pool<T> {
        pool_max_idle_no_timer(::std::usize::MAX)
    }

    fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> {
        let pool = Pool::new(
            super::Config {
                idle_timeout: Some(Duration::from_millis(100)),
                max_idle_per_host: max_idle,
            },
            &Exec::Default,
        );
        pool.no_timer();
        pool
    }

    #[tokio::test]
    async fn test_pool_checkout_smoke() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        drop(pooled);

        match pool.checkout(key).await {
            Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
            Err(_) => panic!("not ready"),
        };
    }

    /// Helper to check if the future is ready after polling once.
    struct PollOnce<'a, F>(&'a mut F);

    impl<F, T, U> Future for PollOnce<'_, F>
    where
        F: Future<Output = Result<T, U>> + Unpin,
    {
        type Output = Option<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
            match Pin::new(&mut self.0).poll(cx) {
                Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
                Poll::Ready(Err(_)) => Poll::Ready(Some(())),
                Poll::Pending => Poll::Ready(None),
            }
        }
    }

    #[tokio::test]
    async fn test_pool_checkout_returns_none_if_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        drop(pooled);
        // Wait past the idle timeout so the entry is considered expired.
        tokio::time::delay_for(pool.locked().timeout.unwrap()).await;
        let mut checkout = pool.checkout(key);
        let poll_once = PollOnce(&mut checkout);
        let is_not_ready = poll_once.await.is_none();
        assert!(is_not_ready);
    }

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_pool_checkout_removes_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(3)
        );
        tokio::time::delay_for(pool.locked().timeout.unwrap()).await;

        let mut checkout = pool.checkout(key.clone());
        let poll_once = PollOnce(&mut checkout);
        // checkout.await should clean out the expired
        poll_once.await;
        assert!(pool.locked().idle.get(&key).is_none());
    }

    #[test]
    fn test_pool_max_idle_per_host() {
        let pool = pool_max_idle_no_timer(2);
        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        // pooled and dropped 3, max_idle should only allow 2
        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(2)
        );
    }

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_pool_timer_removes_expired() {
        let _ = pretty_env_logger::try_init();
        tokio::time::pause();

        let pool = Pool::new(
            super::Config {
                idle_timeout: Some(Duration::from_millis(10)),
                max_idle_per_host: std::usize::MAX,
            },
            &Exec::Default,
        );

        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(3)
        );

        // Let the timer tick passed the expiration...
        tokio::time::advance(Duration::from_millis(30)).await;
        // Yield so the Interval can reap...
        tokio::task::yield_now().await;

        assert!(pool.locked().idle.get(&key).is_none());
    }

    #[tokio::test]
    async fn test_pool_checkout_task_unparked() {
        use futures_util::future::join;
        use futures_util::FutureExt;

        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        let checkout = join(pool.checkout(key), async {
            // the checkout future will park first,
            // and then this lazy future will be polled, which will insert
            // the pooled back into the pool
            //
            // this test makes sure that doing so will unpark the checkout
            drop(pooled);
        })
        .map(|(entry, _)| entry);

        assert_eq!(*checkout.await.unwrap(), Uniq(41));
    }

    #[tokio::test]
    async fn test_pool_checkout_drop_cleans_up_waiters() {
        let pool = pool_no_timer::<Uniq<i32>>();
        let key = host_key("foo");

        let mut checkout1 = pool.checkout(key.clone());
        let mut checkout2 = pool.checkout(key.clone());

        let poll_once1 = PollOnce(&mut checkout1);
        let poll_once2 = PollOnce(&mut checkout2);

        // first poll needed to get into Pool's parked
        poll_once1.await;
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
        poll_once2.await;
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);

        // on drop, clean up Pool
        drop(checkout1);
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);

        drop(checkout2);
        assert!(pool.locked().waiters.get(&key).is_none());
    }

    /// Poolable impl whose openness is controlled by a flag, to test that
    /// closed connections are not reinserted on Drop.
    #[derive(Debug)]
    struct CanClose {
        val: i32,
        closed: bool,
    }

    impl Poolable for CanClose {
        fn is_open(&self) -> bool {
            !self.closed
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    #[test]
    fn pooled_drop_if_closed_doesnt_reinsert() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        pool.pooled(
            c(key.clone()),
            CanClose {
                val: 57,
                closed: true,
            },
        );

        assert!(!pool.locked().idle.contains_key(&key));
    }
}