1 //! Threadpool 2 3 mod atomic_cell; 4 use atomic_cell::AtomicCell; 5 6 mod idle; 7 use self::idle::Idle; 8 9 mod worker; 10 pub(crate) use worker::Launch; 11 12 pub(crate) use worker::block_in_place; 13 14 use crate::loom::sync::Arc; 15 use crate::runtime::stats::RuntimeStats; 16 use crate::runtime::task::JoinHandle; 17 use crate::runtime::{Callback, Parker}; 18 19 use std::fmt; 20 use std::future::Future; 21 22 /// Work-stealing based thread pool for executing futures. 23 pub(crate) struct ThreadPool { 24 spawner: Spawner, 25 } 26 27 /// Submits futures to the associated thread pool for execution. 28 /// 29 /// A `Spawner` instance is a handle to a single thread pool that allows the owner 30 /// of the handle to spawn futures onto the thread pool. 31 /// 32 /// The `Spawner` handle is *only* used for spawning new futures. It does not 33 /// impact the lifecycle of the thread pool in any way. The thread pool may 34 /// shut down while there are outstanding `Spawner` instances. 35 /// 36 /// `Spawner` instances are obtained by calling [`ThreadPool::spawner`]. 37 /// 38 /// [`ThreadPool::spawner`]: method@ThreadPool::spawner 39 #[derive(Clone)] 40 pub(crate) struct Spawner { 41 shared: Arc<worker::Shared>, 42 } 43 44 // ===== impl ThreadPool ===== 45 46 impl ThreadPool { new( size: usize, parker: Parker, before_park: Option<Callback>, after_unpark: Option<Callback>, ) -> (ThreadPool, Launch)47 pub(crate) fn new( 48 size: usize, 49 parker: Parker, 50 before_park: Option<Callback>, 51 after_unpark: Option<Callback>, 52 ) -> (ThreadPool, Launch) { 53 let (shared, launch) = worker::create(size, parker, before_park, after_unpark); 54 let spawner = Spawner { shared }; 55 let thread_pool = ThreadPool { spawner }; 56 57 (thread_pool, launch) 58 } 59 60 /// Returns reference to `Spawner`. 61 /// 62 /// The `Spawner` handle can be cloned and enables spawning tasks from other 63 /// threads. spawner(&self) -> &Spawner64 pub(crate) fn spawner(&self) -> &Spawner { 65 &self.spawner 66 } 67 68 /// Blocks the current thread waiting for the future to complete. 69 /// 70 /// The future will execute on the current thread, but all spawned tasks 71 /// will be executed on the thread pool. block_on<F>(&self, future: F) -> F::Output where F: Future,72 pub(crate) fn block_on<F>(&self, future: F) -> F::Output 73 where 74 F: Future, 75 { 76 let mut enter = crate::runtime::enter(true); 77 enter.block_on(future).expect("failed to park thread") 78 } 79 } 80 81 impl fmt::Debug for ThreadPool { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result82 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 83 fmt.debug_struct("ThreadPool").finish() 84 } 85 } 86 87 impl Drop for ThreadPool { drop(&mut self)88 fn drop(&mut self) { 89 self.spawner.shutdown(); 90 } 91 } 92 93 // ==== impl Spawner ===== 94 95 impl Spawner { 96 /// Spawns a future onto the thread pool spawn<F>(&self, future: F) -> JoinHandle<F::Output> where F: crate::future::Future + Send + 'static, F::Output: Send + 'static,97 pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> 98 where 99 F: crate::future::Future + Send + 'static, 100 F::Output: Send + 'static, 101 { 102 worker::Shared::bind_new_task(&self.shared, future) 103 } 104 shutdown(&mut self)105 pub(crate) fn shutdown(&mut self) { 106 self.shared.close(); 107 } 108 stats(&self) -> &RuntimeStats109 pub(crate) fn stats(&self) -> &RuntimeStats { 110 self.shared.stats() 111 } 112 } 113 114 impl fmt::Debug for Spawner { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result115 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 116 fmt.debug_struct("Spawner").finish() 117 } 118 } 119