1 use pool::{self, Lifecycle, Pool, MAX_FUTURES};
2 use task::Task;
3 
4 use std::sync::atomic::Ordering::{AcqRel, Acquire};
5 use std::sync::Arc;
6 
7 use futures::{future, Future};
8 use tokio_executor::{self, SpawnError};
9 
10 /// Submit futures to the associated thread pool for execution.
11 ///
12 /// A `Sender` instance is a handle to a single thread pool, allowing the owner
13 /// of the handle to spawn futures onto the thread pool. New futures are spawned
14 /// using [`Sender::spawn`].
15 ///
16 /// The `Sender` handle is *only* used for spawning new futures. It does not
17 /// impact the lifecycle of the thread pool in any way.
18 ///
19 /// `Sender` instances are obtained by calling [`ThreadPool::sender`]. The
20 /// `Sender` struct implements the `Executor` trait.
21 ///
22 /// [`Sender::spawn`]: #method.spawn
23 /// [`ThreadPool::sender`]: struct.ThreadPool.html#method.sender
24 #[derive(Debug)]
25 pub struct Sender {
26     pub(crate) pool: Arc<Pool>,
27 }
28 
29 impl Sender {
30     /// Spawn a future onto the thread pool
31     ///
32     /// This function takes ownership of the future and spawns it onto the
33     /// thread pool, assigning it to a worker thread. The exact strategy used to
34     /// assign a future to a worker depends on if the caller is already on a
35     /// worker thread or external to the thread pool.
36     ///
37     /// If the caller is currently on the thread pool, the spawned future will
38     /// be assigned to the same worker that the caller is on. If the caller is
39     /// external to the thread pool, the future will be assigned to a random
40     /// worker.
41     ///
42     /// If `spawn` returns `Ok`, this does not mean that the future will be
43     /// executed. The thread pool can be forcibly shutdown between the time
44     /// `spawn` is called and the future has a chance to execute.
45     ///
46     /// If `spawn` returns `Err`, then the future failed to be spawned. There
47     /// are two possible causes:
48     ///
49     /// * The thread pool is at capacity and is unable to spawn a new future.
50     ///   This is a temporary failure. At some point in the future, the thread
51     ///   pool might be able to spawn new futures.
52     /// * The thread pool is shutdown. This is a permanent failure indicating
53     ///   that the handle will never be able to spawn new futures.
54     ///
55     /// The status of the thread pool can be queried before calling `spawn`
56     /// using the `status` function (part of the `Executor` trait).
57     ///
58     /// # Examples
59     ///
60     /// ```rust
61     /// # extern crate tokio_threadpool;
62     /// # extern crate futures;
63     /// # use tokio_threadpool::ThreadPool;
64     /// use futures::future::{Future, lazy};
65     ///
66     /// # pub fn main() {
67     /// // Create a thread pool with default configuration values
68     /// let thread_pool = ThreadPool::new();
69     ///
70     /// thread_pool.sender().spawn(lazy(|| {
71     ///     println!("called from a worker thread");
72     ///     Ok(())
73     /// })).unwrap();
74     ///
75     /// // Gracefully shutdown the threadpool
76     /// thread_pool.shutdown().wait().unwrap();
77     /// # }
78     /// ```
spawn<F>(&self, future: F) -> Result<(), SpawnError> where F: Future<Item = (), Error = ()> + Send + 'static,79     pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError>
80     where
81         F: Future<Item = (), Error = ()> + Send + 'static,
82     {
83         let mut s = self;
84         tokio_executor::Executor::spawn(&mut s, Box::new(future))
85     }
86 
87     /// Logic to prepare for spawning
prepare_for_spawn(&self) -> Result<(), SpawnError>88     fn prepare_for_spawn(&self) -> Result<(), SpawnError> {
89         let mut state: pool::State = self.pool.state.load(Acquire).into();
90 
91         // Increment the number of futures spawned on the pool as well as
92         // validate that the pool is still running/
93         loop {
94             let mut next = state;
95 
96             if next.num_futures() == MAX_FUTURES {
97                 // No capacity
98                 return Err(SpawnError::at_capacity());
99             }
100 
101             if next.lifecycle() == Lifecycle::ShutdownNow {
102                 // Cannot execute the future, executor is shutdown.
103                 return Err(SpawnError::shutdown());
104             }
105 
106             next.inc_num_futures();
107 
108             let actual = self
109                 .pool
110                 .state
111                 .compare_and_swap(state.into(), next.into(), AcqRel)
112                 .into();
113 
114             if actual == state {
115                 trace!("execute; count={:?}", next.num_futures());
116                 break;
117             }
118 
119             state = actual;
120         }
121 
122         Ok(())
123     }
124 }
125 
126 impl tokio_executor::Executor for Sender {
status(&self) -> Result<(), tokio_executor::SpawnError>127     fn status(&self) -> Result<(), tokio_executor::SpawnError> {
128         let s = self;
129         tokio_executor::Executor::status(&s)
130     }
131 
spawn( &mut self, future: Box<Future<Item = (), Error = ()> + Send>, ) -> Result<(), SpawnError>132     fn spawn(
133         &mut self,
134         future: Box<Future<Item = (), Error = ()> + Send>,
135     ) -> Result<(), SpawnError> {
136         let mut s = &*self;
137         tokio_executor::Executor::spawn(&mut s, future)
138     }
139 }
140 
141 impl<'a> tokio_executor::Executor for &'a Sender {
status(&self) -> Result<(), tokio_executor::SpawnError>142     fn status(&self) -> Result<(), tokio_executor::SpawnError> {
143         let state: pool::State = self.pool.state.load(Acquire).into();
144 
145         if state.num_futures() == MAX_FUTURES {
146             // No capacity
147             return Err(SpawnError::at_capacity());
148         }
149 
150         if state.lifecycle() == Lifecycle::ShutdownNow {
151             // Cannot execute the future, executor is shutdown.
152             return Err(SpawnError::shutdown());
153         }
154 
155         Ok(())
156     }
157 
spawn( &mut self, future: Box<Future<Item = (), Error = ()> + Send>, ) -> Result<(), SpawnError>158     fn spawn(
159         &mut self,
160         future: Box<Future<Item = (), Error = ()> + Send>,
161     ) -> Result<(), SpawnError> {
162         self.prepare_for_spawn()?;
163 
164         // At this point, the pool has accepted the future, so schedule it for
165         // execution.
166 
167         // Create a new task for the future
168         let task = Arc::new(Task::new(future));
169 
170         // Call `submit_external()` in order to place the task into the global
171         // queue. This way all workers have equal chance of running this task,
172         // which means IO handles will be assigned to reactors more evenly.
173         self.pool.submit_external(task, &self.pool);
174 
175         Ok(())
176     }
177 }
178 
179 impl<T> tokio_executor::TypedExecutor<T> for Sender
180 where
181     T: Future<Item = (), Error = ()> + Send + 'static,
182 {
status(&self) -> Result<(), tokio_executor::SpawnError>183     fn status(&self) -> Result<(), tokio_executor::SpawnError> {
184         tokio_executor::Executor::status(self)
185     }
186 
spawn(&mut self, future: T) -> Result<(), SpawnError>187     fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
188         tokio_executor::Executor::spawn(self, Box::new(future))
189     }
190 }
191 
192 impl<T> future::Executor<T> for Sender
193 where
194     T: Future<Item = (), Error = ()> + Send + 'static,
195 {
execute(&self, future: T) -> Result<(), future::ExecuteError<T>>196     fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> {
197         if let Err(e) = tokio_executor::Executor::status(self) {
198             let kind = if e.is_at_capacity() {
199                 future::ExecuteErrorKind::NoCapacity
200             } else {
201                 future::ExecuteErrorKind::Shutdown
202             };
203 
204             return Err(future::ExecuteError::new(kind, future));
205         }
206 
207         let _ = self.spawn(future);
208         Ok(())
209     }
210 }
211 
212 impl Clone for Sender {
213     #[inline]
clone(&self) -> Sender214     fn clone(&self) -> Sender {
215         let pool = self.pool.clone();
216         Sender { pool }
217     }
218 }
219