1 use std::collections::{HashMap, HashSet, VecDeque}; 2 use std::fmt; 3 use std::ops::{Deref, DerefMut}; 4 use std::sync::{Arc, Mutex, Weak}; 5 use std::time::{Duration, Instant}; 6 7 use futures::{Future, Async, Poll}; 8 use futures::sync::oneshot; 9 #[cfg(feature = "runtime")] 10 use tokio_timer::Interval; 11 12 use common::Exec; 13 use super::Ver; 14 15 // FIXME: allow() required due to `impl Trait` leaking types to this lint 16 #[allow(missing_debug_implementations)] 17 pub(super) struct Pool<T> { 18 // If the pool is disabled, this is None. 19 inner: Option<Arc<Mutex<PoolInner<T>>>>, 20 } 21 22 // Before using a pooled connection, make sure the sender is not dead. 23 // 24 // This is a trait to allow the `client::pool::tests` to work for `i32`. 25 // 26 // See https://github.com/hyperium/hyper/issues/1429 27 pub(super) trait Poolable: Send + Sized + 'static { is_open(&self) -> bool28 fn is_open(&self) -> bool; 29 /// Reserve this connection. 30 /// 31 /// Allows for HTTP/2 to return a shared reservation. reserve(self) -> Reservation<Self>32 fn reserve(self) -> Reservation<Self>; can_share(&self) -> bool33 fn can_share(&self) -> bool; 34 } 35 36 /// When checking out a pooled connection, it might be that the connection 37 /// only supports a single reservation, or it might be usable for many. 38 /// 39 /// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be 40 /// used for multiple requests. 41 // FIXME: allow() required due to `impl Trait` leaking types to this lint 42 #[allow(missing_debug_implementations)] 43 pub(super) enum Reservation<T> { 44 /// This connection could be used multiple times, the first one will be 45 /// reinserted into the `idle` pool, and the second will be given to 46 /// the `Checkout`. 47 Shared(T, T), 48 /// This connection requires unique access. It will be returned after 49 /// use is complete. 50 Unique(T), 51 } 52 53 /// Simple type alias in case the key type needs to be adjusted. 54 pub(super) type Key = Arc<String>; 55 56 struct PoolInner<T> { 57 // A flag that a connection is being estabilished, and the connection 58 // should be shared. This prevents making multiple HTTP/2 connections 59 // to the same host. 60 connecting: HashSet<Key>, 61 // These are internal Conns sitting in the event loop in the KeepAlive 62 // state, waiting to receive a new Request to send on the socket. 63 idle: HashMap<Key, Vec<Idle<T>>>, 64 max_idle_per_host: usize, 65 // These are outstanding Checkouts that are waiting for a socket to be 66 // able to send a Request one. This is used when "racing" for a new 67 // connection. 68 // 69 // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait 70 // for the Pool to receive an idle Conn. When a Conn becomes idle, 71 // this list is checked for any parked Checkouts, and tries to notify 72 // them that the Conn could be used instead of waiting for a brand new 73 // connection. 74 waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>, 75 // A oneshot channel is used to allow the interval to be notified when 76 // the Pool completely drops. That way, the interval can cancel immediately. 77 #[cfg(feature = "runtime")] 78 idle_interval_ref: Option<oneshot::Sender<::common::Never>>, 79 #[cfg(feature = "runtime")] 80 exec: Exec, 81 timeout: Option<Duration>, 82 } 83 84 // This is because `Weak::new()` *allocates* space for `T`, even if it 85 // doesn't need it! 86 struct WeakOpt<T>(Option<Weak<T>>); 87 88 #[derive(Clone, Copy, Debug)] 89 pub(super) struct Config { 90 pub(super) enabled: bool, 91 pub(super) keep_alive_timeout: Option<Duration>, 92 pub(super) max_idle_per_host: usize, 93 } 94 95 impl<T> Pool<T> { new(config: Config, __exec: &Exec) -> Pool<T>96 pub fn new(config: Config, __exec: &Exec) -> Pool<T> { 97 let inner = if config.enabled { 98 Some(Arc::new(Mutex::new(PoolInner { 99 connecting: HashSet::new(), 100 idle: HashMap::new(), 101 #[cfg(feature = "runtime")] 102 idle_interval_ref: None, 103 max_idle_per_host: config.max_idle_per_host, 104 waiters: HashMap::new(), 105 #[cfg(feature = "runtime")] 106 exec: __exec.clone(), 107 timeout: config.keep_alive_timeout, 108 }))) 109 } else { 110 None 111 }; 112 113 Pool { 114 inner, 115 } 116 } 117 is_enabled(&self) -> bool118 fn is_enabled(&self) -> bool { 119 self.inner.is_some() 120 } 121 122 #[cfg(test)] no_timer(&self)123 pub(super) fn no_timer(&self) { 124 // Prevent an actual interval from being created for this pool... 125 #[cfg(feature = "runtime")] 126 { 127 let mut inner = self.inner.as_ref().unwrap().lock().unwrap(); 128 assert!(inner.idle_interval_ref.is_none(), "timer already spawned"); 129 let (tx, _) = oneshot::channel(); 130 inner.idle_interval_ref = Some(tx); 131 } 132 } 133 } 134 135 impl<T: Poolable> Pool<T> { 136 /// Returns a `Checkout` which is a future that resolves if an idle 137 /// connection becomes available. checkout(&self, key: Key) -> Checkout<T>138 pub fn checkout(&self, key: Key) -> Checkout<T> { 139 Checkout { 140 key, 141 pool: self.clone(), 142 waiter: None, 143 } 144 } 145 146 /// Ensure that there is only ever 1 connecting task for HTTP/2 147 /// connections. This does nothing for HTTP/1. connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>>148 pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> { 149 if ver == Ver::Http2 { 150 if let Some(ref enabled) = self.inner { 151 let mut inner = enabled.lock().unwrap(); 152 return if inner.connecting.insert(key.clone()) { 153 let connecting = Connecting { 154 key: key.clone(), 155 pool: WeakOpt::downgrade(enabled), 156 }; 157 Some(connecting) 158 } else { 159 trace!("HTTP/2 connecting already in progress for {:?}", key); 160 None 161 }; 162 } 163 } 164 165 // else 166 Some(Connecting { 167 key: key.clone(), 168 // in HTTP/1's case, there is never a lock, so we don't 169 // need to do anything in Drop. 170 pool: WeakOpt::none(), 171 }) 172 } 173 174 #[cfg(test)] locked(&self) -> ::std::sync::MutexGuard<PoolInner<T>>175 fn locked(&self) -> ::std::sync::MutexGuard<PoolInner<T>> { 176 self 177 .inner 178 .as_ref() 179 .expect("enabled") 180 .lock() 181 .expect("lock") 182 } 183 184 #[cfg(feature = "runtime")] 185 #[cfg(test)] h1_key(&self, s: &str) -> Key186 pub(super) fn h1_key(&self, s: &str) -> Key { 187 Arc::new(s.to_string()) 188 } 189 190 #[cfg(feature = "runtime")] 191 #[cfg(test)] idle_count(&self, key: &Key) -> usize192 pub(super) fn idle_count(&self, key: &Key) -> usize { 193 self 194 .locked() 195 .idle 196 .get(key) 197 .map(|list| list.len()) 198 .unwrap_or(0) 199 } 200 pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T>201 pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> { 202 let (value, pool_ref) = if let Some(ref enabled) = self.inner { 203 match value.reserve() { 204 Reservation::Shared(to_insert, to_return) => { 205 let mut inner = enabled.lock().unwrap(); 206 inner.put(connecting.key.clone(), to_insert, enabled); 207 // Do this here instead of Drop for Connecting because we 208 // already have a lock, no need to lock the mutex twice. 209 inner.connected(&connecting.key); 210 // prevent the Drop of Connecting from repeating inner.connected() 211 connecting.pool = WeakOpt::none(); 212 213 // Shared reservations don't need a reference to the pool, 214 // since the pool always keeps a copy. 215 (to_return, WeakOpt::none()) 216 }, 217 Reservation::Unique(value) => { 218 // Unique reservations must take a reference to the pool 219 // since they hope to reinsert once the reservation is 220 // completed 221 (value, WeakOpt::downgrade(enabled)) 222 }, 223 } 224 } else { 225 // If pool is not enabled, skip all the things... 226 227 // The Connecting should have had no pool ref 228 debug_assert!(connecting.pool.upgrade().is_none()); 229 230 (value, WeakOpt::none()) 231 }; 232 Pooled { 233 key: connecting.key.clone(), 234 is_reused: false, 235 pool: pool_ref, 236 value: Some(value) 237 } 238 } 239 reuse(&self, key: &Key, value: T) -> Pooled<T>240 fn reuse(&self, key: &Key, value: T) -> Pooled<T> { 241 debug!("reuse idle connection for {:?}", key); 242 // TODO: unhack this 243 // In Pool::pooled(), which is used for inserting brand new connections, 244 // there's some code that adjusts the pool reference taken depending 245 // on if the Reservation can be shared or is unique. By the time 246 // reuse() is called, the reservation has already been made, and 247 // we just have the final value, without knowledge of if this is 248 // unique or shared. So, the hack is to just assume Ver::Http2 means 249 // shared... :( 250 let mut pool_ref = WeakOpt::none(); 251 if !value.can_share() { 252 if let Some(ref enabled) = self.inner { 253 pool_ref = WeakOpt::downgrade(enabled); 254 } 255 } 256 257 Pooled { 258 is_reused: true, 259 key: key.clone(), 260 pool: pool_ref, 261 value: Some(value), 262 } 263 } 264 } 265 266 /// Pop off this list, looking for a usable connection that hasn't expired. 267 struct IdlePopper<'a, T: 'a> { 268 key: &'a Key, 269 list: &'a mut Vec<Idle<T>>, 270 } 271 272 impl<'a, T: Poolable + 'a> IdlePopper<'a, T> { pop(self, expiration: &Expiration) -> Option<Idle<T>>273 fn pop(self, expiration: &Expiration) -> Option<Idle<T>> { 274 while let Some(entry) = self.list.pop() { 275 // If the connection has been closed, or is older than our idle 276 // timeout, simply drop it and keep looking... 277 if !entry.value.is_open() { 278 trace!("removing closed connection for {:?}", self.key); 279 continue; 280 } 281 // TODO: Actually, since the `idle` list is pushed to the end always, 282 // that would imply that if *this* entry is expired, then anything 283 // "earlier" in the list would *have* to be expired also... Right? 284 // 285 // In that case, we could just break out of the loop and drop the 286 // whole list... 287 if expiration.expires(entry.idle_at) { 288 trace!("removing expired connection for {:?}", self.key); 289 continue; 290 } 291 292 let value = match entry.value.reserve() { 293 Reservation::Shared(to_reinsert, to_checkout) => { 294 self.list.push(Idle { 295 idle_at: Instant::now(), 296 value: to_reinsert, 297 }); 298 to_checkout 299 }, 300 Reservation::Unique(unique) => { 301 unique 302 } 303 }; 304 305 return Some(Idle { 306 idle_at: entry.idle_at, 307 value, 308 }); 309 } 310 311 None 312 } 313 } 314 315 impl<T: Poolable> PoolInner<T> { put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>)316 fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) { 317 if value.can_share() && self.idle.contains_key(&key) { 318 trace!("put; existing idle HTTP/2 connection for {:?}", key); 319 return; 320 } 321 trace!("put; add idle connection for {:?}", key); 322 let mut remove_waiters = false; 323 let mut value = Some(value); 324 if let Some(waiters) = self.waiters.get_mut(&key) { 325 while let Some(tx) = waiters.pop_front() { 326 if !tx.is_canceled() { 327 let reserved = value.take().expect("value already sent"); 328 let reserved = match reserved.reserve() { 329 Reservation::Shared(to_keep, to_send) => { 330 value = Some(to_keep); 331 to_send 332 }, 333 Reservation::Unique(uniq) => uniq, 334 }; 335 match tx.send(reserved) { 336 Ok(()) => { 337 if value.is_none() { 338 break; 339 } else { 340 continue; 341 } 342 }, 343 Err(e) => { 344 value = Some(e); 345 } 346 } 347 } 348 349 trace!("put; removing canceled waiter for {:?}", key); 350 } 351 remove_waiters = waiters.is_empty(); 352 } 353 if remove_waiters { 354 self.waiters.remove(&key); 355 } 356 357 match value { 358 Some(value) => { 359 // borrow-check scope... 360 { 361 let idle_list = self 362 .idle 363 .entry(key.clone()) 364 .or_insert(Vec::new()); 365 if self.max_idle_per_host <= idle_list.len() { 366 trace!("max idle per host for {:?}, dropping connection", key); 367 return; 368 } 369 370 debug!("pooling idle connection for {:?}", key); 371 idle_list.push(Idle { 372 value: value, 373 idle_at: Instant::now(), 374 }); 375 } 376 377 #[cfg(feature = "runtime")] 378 { 379 self.spawn_idle_interval(__pool_ref); 380 } 381 } 382 None => trace!("put; found waiter for {:?}", key), 383 } 384 } 385 386 /// A `Connecting` task is complete. Not necessarily successfully, 387 /// but the lock is going away, so clean up. connected(&mut self, key: &Key)388 fn connected(&mut self, key: &Key) { 389 let existed = self.connecting.remove(key); 390 debug_assert!( 391 existed, 392 "Connecting dropped, key not in pool.connecting" 393 ); 394 // cancel any waiters. if there are any, it's because 395 // this Connecting task didn't complete successfully. 396 // those waiters would never receive a connection. 397 self.waiters.remove(key); 398 } 399 400 #[cfg(feature = "runtime")] spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>)401 fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) { 402 let (dur, rx) = { 403 if self.idle_interval_ref.is_some() { 404 return; 405 } 406 407 if let Some(dur) = self.timeout { 408 let (tx, rx) = oneshot::channel(); 409 self.idle_interval_ref = Some(tx); 410 (dur, rx) 411 } else { 412 return 413 } 414 }; 415 416 let start = Instant::now() + dur; 417 418 let interval = IdleInterval { 419 interval: Interval::new(start, dur), 420 pool: WeakOpt::downgrade(pool_ref), 421 pool_drop_notifier: rx, 422 }; 423 424 if let Err(err) = self.exec.execute(interval) { 425 // This task isn't critical, so simply log and ignore. 426 warn!("error spawning connection pool idle interval: {}", err); 427 } 428 } 429 } 430 431 impl<T> PoolInner<T> { 432 /// Any `FutureResponse`s that were created will have made a `Checkout`, 433 /// and possibly inserted into the pool that it is waiting for an idle 434 /// connection. If a user ever dropped that future, we need to clean out 435 /// those parked senders. clean_waiters(&mut self, key: &Key)436 fn clean_waiters(&mut self, key: &Key) { 437 let mut remove_waiters = false; 438 if let Some(waiters) = self.waiters.get_mut(key) { 439 waiters.retain(|tx| { 440 !tx.is_canceled() 441 }); 442 remove_waiters = waiters.is_empty(); 443 } 444 if remove_waiters { 445 self.waiters.remove(key); 446 } 447 } 448 } 449 450 #[cfg(feature = "runtime")] 451 impl<T: Poolable> PoolInner<T> { 452 /// This should *only* be called by the IdleInterval. clear_expired(&mut self)453 fn clear_expired(&mut self) { 454 let dur = self.timeout.expect("interval assumes timeout"); 455 456 let now = Instant::now(); 457 //self.last_idle_check_at = now; 458 459 self.idle.retain(|key, values| { 460 values.retain(|entry| { 461 if !entry.value.is_open() { 462 trace!("idle interval evicting closed for {:?}", key); 463 return false; 464 } 465 if now - entry.idle_at > dur { 466 trace!("idle interval evicting expired for {:?}", key); 467 return false; 468 } 469 470 // Otherwise, keep this value... 471 true 472 }); 473 474 // returning false evicts this key/val 475 !values.is_empty() 476 }); 477 } 478 } 479 480 impl<T> Clone for Pool<T> { clone(&self) -> Pool<T>481 fn clone(&self) -> Pool<T> { 482 Pool { 483 inner: self.inner.clone(), 484 } 485 } 486 } 487 488 /// A wrapped poolable value that tries to reinsert to the Pool on Drop. 489 // Note: The bounds `T: Poolable` is needed for the Drop impl. 490 pub(super) struct Pooled<T: Poolable> { 491 value: Option<T>, 492 is_reused: bool, 493 key: Key, 494 pool: WeakOpt<Mutex<PoolInner<T>>>, 495 } 496 497 impl<T: Poolable> Pooled<T> { is_reused(&self) -> bool498 pub fn is_reused(&self) -> bool { 499 self.is_reused 500 } 501 is_pool_enabled(&self) -> bool502 pub fn is_pool_enabled(&self) -> bool { 503 self.pool.0.is_some() 504 } 505 as_ref(&self) -> &T506 fn as_ref(&self) -> &T { 507 self.value.as_ref().expect("not dropped") 508 } 509 as_mut(&mut self) -> &mut T510 fn as_mut(&mut self) -> &mut T { 511 self.value.as_mut().expect("not dropped") 512 } 513 } 514 515 impl<T: Poolable> Deref for Pooled<T> { 516 type Target = T; deref(&self) -> &T517 fn deref(&self) -> &T { 518 self.as_ref() 519 } 520 } 521 522 impl<T: Poolable> DerefMut for Pooled<T> { deref_mut(&mut self) -> &mut T523 fn deref_mut(&mut self) -> &mut T { 524 self.as_mut() 525 } 526 } 527 528 impl<T: Poolable> Drop for Pooled<T> { drop(&mut self)529 fn drop(&mut self) { 530 if let Some(value) = self.value.take() { 531 if !value.is_open() { 532 // If we *already* know the connection is done here, 533 // it shouldn't be re-inserted back into the pool. 534 return; 535 } 536 537 if let Some(pool) = self.pool.upgrade() { 538 if let Ok(mut inner) = pool.lock() { 539 inner.put(self.key.clone(), value, &pool); 540 } 541 } else if !value.can_share() { 542 trace!("pool dropped, dropping pooled ({:?})", self.key); 543 } 544 // Ver::Http2 is already in the Pool (or dead), so we wouldn't 545 // have an actual reference to the Pool. 546 } 547 } 548 } 549 550 impl<T: Poolable> fmt::Debug for Pooled<T> { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result551 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 552 f.debug_struct("Pooled") 553 .field("key", &self.key) 554 .finish() 555 } 556 } 557 558 struct Idle<T> { 559 idle_at: Instant, 560 value: T, 561 } 562 563 // FIXME: allow() required due to `impl Trait` leaking types to this lint 564 #[allow(missing_debug_implementations)] 565 pub(super) struct Checkout<T> { 566 key: Key, 567 pool: Pool<T>, 568 waiter: Option<oneshot::Receiver<T>>, 569 } 570 571 impl<T: Poolable> Checkout<T> { poll_waiter(&mut self) -> Poll<Option<Pooled<T>>, ::Error>572 fn poll_waiter(&mut self) -> Poll<Option<Pooled<T>>, ::Error> { 573 static CANCELED: &str = "pool checkout failed"; 574 if let Some(mut rx) = self.waiter.take() { 575 match rx.poll() { 576 Ok(Async::Ready(value)) => { 577 if value.is_open() { 578 Ok(Async::Ready(Some(self.pool.reuse(&self.key, value)))) 579 } else { 580 Err(::Error::new_canceled().with(CANCELED)) 581 } 582 }, 583 Ok(Async::NotReady) => { 584 self.waiter = Some(rx); 585 Ok(Async::NotReady) 586 }, 587 Err(_canceled) => Err(::Error::new_canceled().with(CANCELED)), 588 } 589 } else { 590 Ok(Async::Ready(None)) 591 } 592 } 593 checkout(&mut self) -> Option<Pooled<T>>594 fn checkout(&mut self) -> Option<Pooled<T>> { 595 let entry = { 596 let mut inner = self.pool.inner.as_ref()?.lock().unwrap(); 597 let expiration = Expiration::new(inner.timeout); 598 let maybe_entry = inner.idle.get_mut(&self.key) 599 .and_then(|list| { 600 trace!("take? {:?}: expiration = {:?}", self.key, expiration.0); 601 // A block to end the mutable borrow on list, 602 // so the map below can check is_empty() 603 { 604 let popper = IdlePopper { 605 key: &self.key, 606 list, 607 }; 608 popper.pop(&expiration) 609 } 610 .map(|e| (e, list.is_empty())) 611 }); 612 613 let (entry, empty) = if let Some((e, empty)) = maybe_entry { 614 (Some(e), empty) 615 } else { 616 // No entry found means nuke the list for sure. 617 (None, true) 618 }; 619 if empty { 620 //TODO: This could be done with the HashMap::entry API instead. 621 inner.idle.remove(&self.key); 622 } 623 624 if entry.is_none() && self.waiter.is_none() { 625 let (tx, mut rx) = oneshot::channel(); 626 let _ = rx.poll(); // park this task 627 628 trace!("checkout waiting for idle connection: {:?}", self.key); 629 inner 630 .waiters 631 .entry(self.key.clone()) 632 .or_insert(VecDeque::new()) 633 .push_back(tx); 634 635 self.waiter = Some(rx); 636 } 637 638 entry 639 }; 640 641 entry.map(|e| self.pool.reuse(&self.key, e.value)) 642 } 643 } 644 645 impl<T: Poolable> Future for Checkout<T> { 646 type Item = Pooled<T>; 647 type Error = ::Error; 648 poll(&mut self) -> Poll<Self::Item, Self::Error>649 fn poll(&mut self) -> Poll<Self::Item, Self::Error> { 650 if let Some(pooled) = try_ready!(self.poll_waiter()) { 651 return Ok(Async::Ready(pooled)); 652 } 653 654 if let Some(pooled) = self.checkout() { 655 Ok(Async::Ready(pooled)) 656 } else if !self.pool.is_enabled() { 657 Err(::Error::new_canceled().with("pool is disabled")) 658 } else { 659 Ok(Async::NotReady) 660 } 661 } 662 } 663 664 impl<T> Drop for Checkout<T> { drop(&mut self)665 fn drop(&mut self) { 666 if self.waiter.take().is_some() { 667 trace!("checkout dropped for {:?}", self.key); 668 if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) { 669 inner.clean_waiters(&self.key); 670 } 671 } 672 } 673 } 674 675 // FIXME: allow() required due to `impl Trait` leaking types to this lint 676 #[allow(missing_debug_implementations)] 677 pub(super) struct Connecting<T: Poolable> { 678 key: Key, 679 pool: WeakOpt<Mutex<PoolInner<T>>>, 680 } 681 682 impl<T: Poolable> Connecting<T> { alpn_h2(self, pool: &Pool<T>) -> Option<Self>683 pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> { 684 debug_assert!( 685 self.pool.0.is_none(), 686 "Connecting::alpn_h2 but already Http2" 687 ); 688 689 pool.connecting(&self.key, Ver::Http2) 690 } 691 } 692 693 impl<T: Poolable> Drop for Connecting<T> { drop(&mut self)694 fn drop(&mut self) { 695 if let Some(pool) = self.pool.upgrade() { 696 // No need to panic on drop, that could abort! 697 if let Ok(mut inner) = pool.lock() { 698 inner.connected(&self.key); 699 } 700 } 701 } 702 } 703 704 struct Expiration(Option<Duration>); 705 706 impl Expiration { new(dur: Option<Duration>) -> Expiration707 fn new(dur: Option<Duration>) -> Expiration { 708 Expiration(dur) 709 } 710 expires(&self, instant: Instant) -> bool711 fn expires(&self, instant: Instant) -> bool { 712 match self.0 { 713 Some(timeout) => instant.elapsed() > timeout, 714 None => false, 715 } 716 } 717 } 718 719 #[cfg(feature = "runtime")] 720 struct IdleInterval<T> { 721 interval: Interval, 722 pool: WeakOpt<Mutex<PoolInner<T>>>, 723 // This allows the IdleInterval to be notified as soon as the entire 724 // Pool is fully dropped, and shutdown. This channel is never sent on, 725 // but Err(Canceled) will be received when the Pool is dropped. 726 pool_drop_notifier: oneshot::Receiver<::common::Never>, 727 } 728 729 #[cfg(feature = "runtime")] 730 impl<T: Poolable + 'static> Future for IdleInterval<T> { 731 type Item = (); 732 type Error = (); 733 poll(&mut self) -> Poll<Self::Item, Self::Error>734 fn poll(&mut self) -> Poll<Self::Item, Self::Error> { 735 // Interval is a Stream 736 use futures::Stream; 737 738 loop { 739 match self.pool_drop_notifier.poll() { 740 Ok(Async::Ready(n)) => match n {}, 741 Ok(Async::NotReady) => (), 742 Err(_canceled) => { 743 trace!("pool closed, canceling idle interval"); 744 return Ok(Async::Ready(())); 745 } 746 } 747 748 try_ready!(self.interval.poll().map_err(|err| { 749 error!("idle interval timer error: {}", err); 750 })); 751 752 if let Some(inner) = self.pool.upgrade() { 753 if let Ok(mut inner) = inner.lock() { 754 trace!("idle interval checking for expired"); 755 inner.clear_expired(); 756 continue; 757 } 758 } 759 return Ok(Async::Ready(())); 760 } 761 } 762 } 763 764 impl<T> WeakOpt<T> { none() -> Self765 fn none() -> Self { 766 WeakOpt(None) 767 } 768 downgrade(arc: &Arc<T>) -> Self769 fn downgrade(arc: &Arc<T>) -> Self { 770 WeakOpt(Some(Arc::downgrade(arc))) 771 } 772 upgrade(&self) -> Option<Arc<T>>773 fn upgrade(&self) -> Option<Arc<T>> { 774 self.0 775 .as_ref() 776 .and_then(Weak::upgrade) 777 } 778 } 779 780 #[cfg(test)] 781 mod tests { 782 use std::sync::Arc; 783 use std::time::Duration; 784 use futures::{Async, Future}; 785 use futures::future; 786 use common::Exec; 787 use super::{Connecting, Key, Poolable, Pool, Reservation, WeakOpt}; 788 789 /// Test unique reservations. 790 #[derive(Debug, PartialEq, Eq)] 791 struct Uniq<T>(T); 792 793 impl<T: Send + 'static> Poolable for Uniq<T> { is_open(&self) -> bool794 fn is_open(&self) -> bool { 795 true 796 } 797 reserve(self) -> Reservation<Self>798 fn reserve(self) -> Reservation<Self> { 799 Reservation::Unique(self) 800 } 801 can_share(&self) -> bool802 fn can_share(&self) -> bool { 803 false 804 } 805 } 806 c<T: Poolable>(key: Key) -> Connecting<T>807 fn c<T: Poolable>(key: Key) -> Connecting<T> { 808 Connecting { 809 key, 810 pool: WeakOpt::none(), 811 } 812 } 813 pool_no_timer<T>() -> Pool<T>814 fn pool_no_timer<T>() -> Pool<T> { 815 pool_max_idle_no_timer(::std::usize::MAX) 816 } 817 pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T>818 fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> { 819 let pool = Pool::new(super::Config { 820 enabled: true, 821 keep_alive_timeout: Some(Duration::from_millis(100)), 822 max_idle_per_host: max_idle, 823 }, 824 &Exec::Default, 825 ); 826 pool.no_timer(); 827 pool 828 } 829 830 #[test] test_pool_checkout_smoke()831 fn test_pool_checkout_smoke() { 832 let pool = pool_no_timer(); 833 let key = Arc::new("foo".to_string()); 834 let pooled = pool.pooled(c(key.clone()), Uniq(41)); 835 836 drop(pooled); 837 838 match pool.checkout(key).poll().unwrap() { 839 Async::Ready(pooled) => assert_eq!(*pooled, Uniq(41)), 840 _ => panic!("not ready"), 841 } 842 } 843 844 #[test] test_pool_checkout_returns_none_if_expired()845 fn test_pool_checkout_returns_none_if_expired() { 846 future::lazy(|| { 847 let pool = pool_no_timer(); 848 let key = Arc::new("foo".to_string()); 849 let pooled = pool.pooled(c(key.clone()), Uniq(41)); 850 drop(pooled); 851 ::std::thread::sleep(pool.locked().timeout.unwrap()); 852 assert!(pool.checkout(key).poll().unwrap().is_not_ready()); 853 ::futures::future::ok::<(), ()>(()) 854 }).wait().unwrap(); 855 } 856 857 #[test] test_pool_checkout_removes_expired()858 fn test_pool_checkout_removes_expired() { 859 future::lazy(|| { 860 let pool = pool_no_timer(); 861 let key = Arc::new("foo".to_string()); 862 863 pool.pooled(c(key.clone()), Uniq(41)); 864 pool.pooled(c(key.clone()), Uniq(5)); 865 pool.pooled(c(key.clone()), Uniq(99)); 866 867 assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3)); 868 ::std::thread::sleep(pool.locked().timeout.unwrap()); 869 870 // checkout.poll() should clean out the expired 871 pool.checkout(key.clone()).poll().unwrap(); 872 assert!(pool.locked().idle.get(&key).is_none()); 873 874 Ok::<(), ()>(()) 875 }).wait().unwrap(); 876 } 877 878 #[test] test_pool_max_idle_per_host()879 fn test_pool_max_idle_per_host() { 880 future::lazy(|| { 881 let pool = pool_max_idle_no_timer(2); 882 let key = Arc::new("foo".to_string()); 883 884 pool.pooled(c(key.clone()), Uniq(41)); 885 pool.pooled(c(key.clone()), Uniq(5)); 886 pool.pooled(c(key.clone()), Uniq(99)); 887 888 // pooled and dropped 3, max_idle should only allow 2 889 assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(2)); 890 891 Ok::<(), ()>(()) 892 }).wait().unwrap(); 893 } 894 895 #[cfg(feature = "runtime")] 896 #[test] test_pool_timer_removes_expired()897 fn test_pool_timer_removes_expired() { 898 use std::time::Instant; 899 use tokio_timer::Delay; 900 let mut rt = ::tokio::runtime::current_thread::Runtime::new().unwrap(); 901 let pool = Pool::new(super::Config { 902 enabled: true, 903 keep_alive_timeout: Some(Duration::from_millis(100)), 904 max_idle_per_host: ::std::usize::MAX, 905 }, 906 &Exec::Default, 907 ); 908 909 let key = Arc::new("foo".to_string()); 910 911 // Since pool.pooled() will be calling spawn on drop, need to be sure 912 // those drops are called while `rt` is the current executor. To do so, 913 // call those inside a future. 914 rt.block_on(::futures::future::lazy(|| { 915 pool.pooled(c(key.clone()), Uniq(41)); 916 pool.pooled(c(key.clone()), Uniq(5)); 917 pool.pooled(c(key.clone()), Uniq(99)); 918 Ok::<_, ()>(()) 919 })).unwrap(); 920 921 assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3)); 922 923 // Let the timer tick passed the expiration... 924 rt 925 .block_on(Delay::new(Instant::now() + Duration::from_millis(200))) 926 .expect("rt block_on 200ms"); 927 928 assert!(pool.locked().idle.get(&key).is_none()); 929 } 930 931 #[test] test_pool_checkout_task_unparked()932 fn test_pool_checkout_task_unparked() { 933 let pool = pool_no_timer(); 934 let key = Arc::new("foo".to_string()); 935 let pooled = pool.pooled(c(key.clone()), Uniq(41)); 936 937 let checkout = pool.checkout(key).join(future::lazy(move || { 938 // the checkout future will park first, 939 // and then this lazy future will be polled, which will insert 940 // the pooled back into the pool 941 // 942 // this test makes sure that doing so will unpark the checkout 943 drop(pooled); 944 Ok(()) 945 })).map(|(entry, _)| entry); 946 assert_eq!(*checkout.wait().unwrap(), Uniq(41)); 947 } 948 949 #[test] test_pool_checkout_drop_cleans_up_waiters()950 fn test_pool_checkout_drop_cleans_up_waiters() { 951 future::lazy(|| { 952 let pool = pool_no_timer::<Uniq<i32>>(); 953 let key = Arc::new("localhost:12345".to_string()); 954 955 let mut checkout1 = pool.checkout(key.clone()); 956 let mut checkout2 = pool.checkout(key.clone()); 957 958 // first poll needed to get into Pool's parked 959 checkout1.poll().unwrap(); 960 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1); 961 checkout2.poll().unwrap(); 962 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2); 963 964 // on drop, clean up Pool 965 drop(checkout1); 966 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1); 967 968 drop(checkout2); 969 assert!(pool.locked().waiters.get(&key).is_none()); 970 971 ::futures::future::ok::<(), ()>(()) 972 }).wait().unwrap(); 973 } 974 975 #[derive(Debug)] 976 struct CanClose { 977 val: i32, 978 closed: bool, 979 } 980 981 impl Poolable for CanClose { is_open(&self) -> bool982 fn is_open(&self) -> bool { 983 !self.closed 984 } 985 reserve(self) -> Reservation<Self>986 fn reserve(self) -> Reservation<Self> { 987 Reservation::Unique(self) 988 } 989 can_share(&self) -> bool990 fn can_share(&self) -> bool { 991 false 992 } 993 } 994 995 #[test] pooled_drop_if_closed_doesnt_reinsert()996 fn pooled_drop_if_closed_doesnt_reinsert() { 997 let pool = pool_no_timer(); 998 let key = Arc::new("localhost:12345".to_string()); 999 pool.pooled(c(key.clone()), CanClose { 1000 val: 57, 1001 closed: true, 1002 }); 1003 1004 assert!(!pool.locked().idle.contains_key(&key)); 1005 } 1006 } 1007