1 use pool::{self, Lifecycle, Pool, MAX_FUTURES}; 2 use task::Task; 3 4 use std::sync::atomic::Ordering::{AcqRel, Acquire}; 5 use std::sync::Arc; 6 7 use futures::{future, Future}; 8 use tokio_executor::{self, SpawnError}; 9 10 /// Submit futures to the associated thread pool for execution. 11 /// 12 /// A `Sender` instance is a handle to a single thread pool, allowing the owner 13 /// of the handle to spawn futures onto the thread pool. New futures are spawned 14 /// using [`Sender::spawn`]. 15 /// 16 /// The `Sender` handle is *only* used for spawning new futures. It does not 17 /// impact the lifecycle of the thread pool in any way. 18 /// 19 /// `Sender` instances are obtained by calling [`ThreadPool::sender`]. The 20 /// `Sender` struct implements the `Executor` trait. 21 /// 22 /// [`Sender::spawn`]: #method.spawn 23 /// [`ThreadPool::sender`]: struct.ThreadPool.html#method.sender 24 #[derive(Debug)] 25 pub struct Sender { 26 pub(crate) pool: Arc<Pool>, 27 } 28 29 impl Sender { 30 /// Spawn a future onto the thread pool 31 /// 32 /// This function takes ownership of the future and spawns it onto the 33 /// thread pool, assigning it to a worker thread. The exact strategy used to 34 /// assign a future to a worker depends on if the caller is already on a 35 /// worker thread or external to the thread pool. 36 /// 37 /// If the caller is currently on the thread pool, the spawned future will 38 /// be assigned to the same worker that the caller is on. If the caller is 39 /// external to the thread pool, the future will be assigned to a random 40 /// worker. 41 /// 42 /// If `spawn` returns `Ok`, this does not mean that the future will be 43 /// executed. The thread pool can be forcibly shutdown between the time 44 /// `spawn` is called and the future has a chance to execute. 45 /// 46 /// If `spawn` returns `Err`, then the future failed to be spawned. There 47 /// are two possible causes: 48 /// 49 /// * The thread pool is at capacity and is unable to spawn a new future. 50 /// This is a temporary failure. At some point in the future, the thread 51 /// pool might be able to spawn new futures. 52 /// * The thread pool is shutdown. This is a permanent failure indicating 53 /// that the handle will never be able to spawn new futures. 54 /// 55 /// The status of the thread pool can be queried before calling `spawn` 56 /// using the `status` function (part of the `Executor` trait). 57 /// 58 /// # Examples 59 /// 60 /// ```rust 61 /// # extern crate tokio_threadpool; 62 /// # extern crate futures; 63 /// # use tokio_threadpool::ThreadPool; 64 /// use futures::future::{Future, lazy}; 65 /// 66 /// # pub fn main() { 67 /// // Create a thread pool with default configuration values 68 /// let thread_pool = ThreadPool::new(); 69 /// 70 /// thread_pool.sender().spawn(lazy(|| { 71 /// println!("called from a worker thread"); 72 /// Ok(()) 73 /// })).unwrap(); 74 /// 75 /// // Gracefully shutdown the threadpool 76 /// thread_pool.shutdown().wait().unwrap(); 77 /// # } 78 /// ``` spawn<F>(&self, future: F) -> Result<(), SpawnError> where F: Future<Item = (), Error = ()> + Send + 'static,79 pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError> 80 where 81 F: Future<Item = (), Error = ()> + Send + 'static, 82 { 83 let mut s = self; 84 tokio_executor::Executor::spawn(&mut s, Box::new(future)) 85 } 86 87 /// Logic to prepare for spawning prepare_for_spawn(&self) -> Result<(), SpawnError>88 fn prepare_for_spawn(&self) -> Result<(), SpawnError> { 89 let mut state: pool::State = self.pool.state.load(Acquire).into(); 90 91 // Increment the number of futures spawned on the pool as well as 92 // validate that the pool is still running/ 93 loop { 94 let mut next = state; 95 96 if next.num_futures() == MAX_FUTURES { 97 // No capacity 98 return Err(SpawnError::at_capacity()); 99 } 100 101 if next.lifecycle() == Lifecycle::ShutdownNow { 102 // Cannot execute the future, executor is shutdown. 103 return Err(SpawnError::shutdown()); 104 } 105 106 next.inc_num_futures(); 107 108 let actual = self 109 .pool 110 .state 111 .compare_and_swap(state.into(), next.into(), AcqRel) 112 .into(); 113 114 if actual == state { 115 trace!("execute; count={:?}", next.num_futures()); 116 break; 117 } 118 119 state = actual; 120 } 121 122 Ok(()) 123 } 124 } 125 126 impl tokio_executor::Executor for Sender { status(&self) -> Result<(), tokio_executor::SpawnError>127 fn status(&self) -> Result<(), tokio_executor::SpawnError> { 128 let s = self; 129 tokio_executor::Executor::status(&s) 130 } 131 spawn( &mut self, future: Box<Future<Item = (), Error = ()> + Send>, ) -> Result<(), SpawnError>132 fn spawn( 133 &mut self, 134 future: Box<Future<Item = (), Error = ()> + Send>, 135 ) -> Result<(), SpawnError> { 136 let mut s = &*self; 137 tokio_executor::Executor::spawn(&mut s, future) 138 } 139 } 140 141 impl<'a> tokio_executor::Executor for &'a Sender { status(&self) -> Result<(), tokio_executor::SpawnError>142 fn status(&self) -> Result<(), tokio_executor::SpawnError> { 143 let state: pool::State = self.pool.state.load(Acquire).into(); 144 145 if state.num_futures() == MAX_FUTURES { 146 // No capacity 147 return Err(SpawnError::at_capacity()); 148 } 149 150 if state.lifecycle() == Lifecycle::ShutdownNow { 151 // Cannot execute the future, executor is shutdown. 152 return Err(SpawnError::shutdown()); 153 } 154 155 Ok(()) 156 } 157 spawn( &mut self, future: Box<Future<Item = (), Error = ()> + Send>, ) -> Result<(), SpawnError>158 fn spawn( 159 &mut self, 160 future: Box<Future<Item = (), Error = ()> + Send>, 161 ) -> Result<(), SpawnError> { 162 self.prepare_for_spawn()?; 163 164 // At this point, the pool has accepted the future, so schedule it for 165 // execution. 166 167 // Create a new task for the future 168 let task = Arc::new(Task::new(future)); 169 170 // Call `submit_external()` in order to place the task into the global 171 // queue. This way all workers have equal chance of running this task, 172 // which means IO handles will be assigned to reactors more evenly. 173 self.pool.submit_external(task, &self.pool); 174 175 Ok(()) 176 } 177 } 178 179 impl<T> tokio_executor::TypedExecutor<T> for Sender 180 where 181 T: Future<Item = (), Error = ()> + Send + 'static, 182 { status(&self) -> Result<(), tokio_executor::SpawnError>183 fn status(&self) -> Result<(), tokio_executor::SpawnError> { 184 tokio_executor::Executor::status(self) 185 } 186 spawn(&mut self, future: T) -> Result<(), SpawnError>187 fn spawn(&mut self, future: T) -> Result<(), SpawnError> { 188 tokio_executor::Executor::spawn(self, Box::new(future)) 189 } 190 } 191 192 impl<T> future::Executor<T> for Sender 193 where 194 T: Future<Item = (), Error = ()> + Send + 'static, 195 { execute(&self, future: T) -> Result<(), future::ExecuteError<T>>196 fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> { 197 if let Err(e) = tokio_executor::Executor::status(self) { 198 let kind = if e.is_at_capacity() { 199 future::ExecuteErrorKind::NoCapacity 200 } else { 201 future::ExecuteErrorKind::Shutdown 202 }; 203 204 return Err(future::ExecuteError::new(kind, future)); 205 } 206 207 let _ = self.spawn(future); 208 Ok(()) 209 } 210 } 211 212 impl Clone for Sender { 213 #[inline] clone(&self) -> Sender214 fn clone(&self) -> Sender { 215 let pool = self.pool.clone(); 216 Sender { pool } 217 } 218 } 219