1 mod backup;
2 mod backup_stack;
3 mod state;
4 
5 pub(crate) use self::backup::{Backup, BackupId};
6 pub(crate) use self::backup_stack::MAX_BACKUP;
7 pub(crate) use self::state::{Lifecycle, State, MAX_FUTURES};
8 
9 use self::backup::Handoff;
10 use self::backup_stack::BackupStack;
11 
12 use config::Config;
13 use shutdown::ShutdownTrigger;
14 use task::{Blocking, Task};
15 use worker::{self, Worker, WorkerId};
16 
17 use futures::Poll;
18 
19 use std::cell::Cell;
20 use std::collections::hash_map::RandomState;
21 use std::hash::{BuildHasher, Hash, Hasher};
22 use std::num::Wrapping;
23 use std::sync::atomic::AtomicUsize;
24 use std::sync::atomic::Ordering::{AcqRel, Acquire};
25 use std::sync::{Arc, Weak};
26 use std::thread;
27 
28 use crossbeam_deque::Injector;
29 use crossbeam_utils::CachePadded;
30 
/// Shared state of the thread pool.
///
/// Groups the packed lifecycle state, the worker entries together with the
/// stack of sleeping workers, the global task queue, the shutdown trigger, the
/// backup thread slots together with their stack, the blocking bookkeeping and
/// the pool configuration.
#[derive(Debug)]
pub(crate) struct Pool {
    // Tracks the state of the thread pool (running, shutting down, ...).
    //
    // While workers check this field as a hint to detect shutdown, it is
    // **not** used as a primary point of coordination for workers. The sleep
    // stack is used as the primary point of coordination for workers.
    //
    // The value of this atomic is deserialized into a `pool::State` instance.
    // See comments for that type.
    pub state: CachePadded<AtomicUsize>,

    // Stack tracking sleeping workers.
    sleep_stack: CachePadded<worker::Stack>,

    // Worker state
    //
    // A worker is a thread that is processing the work queue and polling
    // futures.
    //
    // The number of workers will *usually* be small.
    pub workers: Arc<[worker::Entry]>,

    // The global MPMC queue of tasks.
    //
    // Spawned tasks are pushed into this queue. Although worker threads have
    // their own dedicated task queues, they periodically steal tasks from this
    // global queue, too.
    pub queue: Arc<Injector<Arc<Task>>>,

    // Completes the shutdown process when the `ThreadPool` and all `Worker`s
    // get dropped.
    //
    // When spawning a new `Worker`, this weak reference is upgraded and handed
    // out to the new thread. If the upgrade fails, no thread is spawned.
    pub trigger: Weak<ShutdownTrigger>,

    // Backup thread state
    //
    // In order to efficiently support `blocking`, a pool of backup threads is
    // needed. These backup threads are ready to take over a worker if the
    // future being processed requires blocking.
    //
    // Entries are addressed by `BackupId`, whose inner value is the index into
    // this slice.
    backup: Box<[Backup]>,

    // Stack of sleeping backup threads
    pub backup_stack: BackupStack,

    // State regarding coordinating blocking sections and tracking tasks that
    // are pending blocking capacity.
    blocking: Blocking,

    // Configuration
    pub config: Config,
}
83 
84 impl Pool {
85     /// Create a new `Pool`
new( workers: Arc<[worker::Entry]>, trigger: Weak<ShutdownTrigger>, max_blocking: usize, config: Config, queue: Arc<Injector<Arc<Task>>>, ) -> Pool86     pub fn new(
87         workers: Arc<[worker::Entry]>,
88         trigger: Weak<ShutdownTrigger>,
89         max_blocking: usize,
90         config: Config,
91         queue: Arc<Injector<Arc<Task>>>,
92     ) -> Pool {
93         let pool_size = workers.len();
94         let total_size = max_blocking + pool_size;
95 
96         // Create the set of backup entries
97         //
98         // This is `backup + pool_size` because the core thread pool running the
99         // workers is spawned from backup as well.
100         let backup = (0..total_size)
101             .map(|_| Backup::new())
102             .collect::<Vec<_>>()
103             .into_boxed_slice();
104 
105         let backup_stack = BackupStack::new();
106 
107         for i in (0..backup.len()).rev() {
108             backup_stack.push(&backup, BackupId(i)).unwrap();
109         }
110 
111         // Initialize the blocking state
112         let blocking = Blocking::new(max_blocking);
113 
114         let ret = Pool {
115             state: CachePadded::new(AtomicUsize::new(State::new().into())),
116             sleep_stack: CachePadded::new(worker::Stack::new()),
117             workers,
118             queue,
119             trigger,
120             backup,
121             backup_stack,
122             blocking,
123             config,
124         };
125 
126         // Now, we prime the sleeper stack
127         for i in 0..pool_size {
128             ret.sleep_stack.push(&ret.workers, i).unwrap();
129         }
130 
131         ret
132     }
133 
134     /// Start shutting down the pool. This means that no new futures will be
135     /// accepted.
shutdown(&self, now: bool, purge_queue: bool)136     pub fn shutdown(&self, now: bool, purge_queue: bool) {
137         let mut state: State = self.state.load(Acquire).into();
138 
139         trace!("shutdown; state={:?}", state);
140 
141         // For now, this must be true
142         debug_assert!(!purge_queue || now);
143 
144         // Start by setting the shutdown flag
145         loop {
146             let mut next = state;
147 
148             let num_futures = next.num_futures();
149 
150             if next.lifecycle() == Lifecycle::ShutdownNow {
151                 // Already transitioned to shutting down state
152 
153                 if !purge_queue || num_futures == 0 {
154                     // Nothing more to do
155                     return;
156                 }
157 
158                 // The queue must be purged
159                 debug_assert!(purge_queue);
160                 next.clear_num_futures();
161             } else {
162                 next.set_lifecycle(if now || num_futures == 0 {
163                     // If already idle, always transition to shutdown now.
164                     Lifecycle::ShutdownNow
165                 } else {
166                     Lifecycle::ShutdownOnIdle
167                 });
168 
169                 if purge_queue {
170                     next.clear_num_futures();
171                 }
172             }
173 
174             let actual = self
175                 .state
176                 .compare_and_swap(state.into(), next.into(), AcqRel)
177                 .into();
178 
179             if state == actual {
180                 state = next;
181                 break;
182             }
183 
184             state = actual;
185         }
186 
187         trace!("  -> transitioned to shutdown");
188 
189         // Only transition to terminate if there are no futures currently on the
190         // pool
191         if state.num_futures() != 0 {
192             return;
193         }
194 
195         self.terminate_sleeping_workers();
196     }
197 
198     /// Called by `Worker` as it tries to enter a sleeping state. Before it
199     /// sleeps, it must push itself onto the sleep stack. This enables other
200     /// threads to see it when signaling work.
push_sleeper(&self, idx: usize) -> Result<(), ()>201     pub fn push_sleeper(&self, idx: usize) -> Result<(), ()> {
202         self.sleep_stack.push(&self.workers, idx)
203     }
204 
terminate_sleeping_workers(&self)205     pub fn terminate_sleeping_workers(&self) {
206         use worker::Lifecycle::Signaled;
207 
208         trace!("  -> shutting down workers");
209         // Wakeup all sleeping workers. They will wake up, see the state
210         // transition, and terminate.
211         while let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, true) {
212             self.workers[idx].signal_stop(worker_state);
213         }
214 
215         // Now terminate any backup threads
216         //
217         // The call to `pop` must be successful because shutting down the pool
218         // is coordinated and at this point, this is the only thread that will
219         // attempt to transition the backup stack to "terminated".
220         while let Ok(Some(backup_id)) = self.backup_stack.pop(&self.backup, true) {
221             self.backup[backup_id.0].signal_stop();
222         }
223     }
224 
poll_blocking_capacity(&self, task: &Arc<Task>) -> Poll<(), ::BlockingError>225     pub fn poll_blocking_capacity(&self, task: &Arc<Task>) -> Poll<(), ::BlockingError> {
226         self.blocking.poll_blocking_capacity(task)
227     }
228 
229     /// Submit a task to the scheduler.
230     ///
231     /// Called from either inside or outside of the scheduler. If currently on
232     /// the scheduler, then a fast path is taken.
submit(&self, task: Arc<Task>, pool: &Arc<Pool>)233     pub fn submit(&self, task: Arc<Task>, pool: &Arc<Pool>) {
234         debug_assert_eq!(*self, **pool);
235 
236         Worker::with_current(|worker| {
237             if let Some(worker) = worker {
238                 // If the worker is in blocking mode, then even though the
239                 // thread-local variable is set, the current thread does not
240                 // have ownership of that worker entry. This is because the
241                 // worker entry has already been handed off to another thread.
242                 //
243                 // The second check handles the case where the current thread is
244                 // part of a different threadpool than the one being submitted
245                 // to.
246                 if !worker.is_blocking() && *self == *worker.pool {
247                     let idx = worker.id.0;
248 
249                     trace!("    -> submit internal; idx={}", idx);
250 
251                     worker.pool.workers[idx].submit_internal(task);
252                     worker.pool.signal_work(pool);
253                     return;
254                 }
255             }
256 
257             self.submit_external(task, pool);
258         });
259     }
260 
261     /// Submit a task to the scheduler from off worker
262     ///
263     /// Called from outside of the scheduler, this function is how new tasks
264     /// enter the system.
submit_external(&self, task: Arc<Task>, pool: &Arc<Pool>)265     pub fn submit_external(&self, task: Arc<Task>, pool: &Arc<Pool>) {
266         debug_assert_eq!(*self, **pool);
267 
268         trace!("    -> submit external");
269 
270         self.queue.push(task);
271         self.signal_work(pool);
272     }
273 
release_backup(&self, backup_id: BackupId) -> Result<(), ()>274     pub fn release_backup(&self, backup_id: BackupId) -> Result<(), ()> {
275         // First update the state, this cannot fail because the caller must have
276         // exclusive access to the backup token.
277         self.backup[backup_id.0].release();
278 
279         // Push the backup entry back on the stack
280         self.backup_stack.push(&self.backup, backup_id)
281     }
282 
notify_blocking_task(&self, pool: &Arc<Pool>)283     pub fn notify_blocking_task(&self, pool: &Arc<Pool>) {
284         debug_assert_eq!(*self, **pool);
285         self.blocking.notify_task(&pool);
286     }
287 
288     /// Provision a thread to run a worker
spawn_thread(&self, id: WorkerId, pool: &Arc<Pool>)289     pub fn spawn_thread(&self, id: WorkerId, pool: &Arc<Pool>) {
290         debug_assert_eq!(*self, **pool);
291 
292         let backup_id = match self.backup_stack.pop(&self.backup, false) {
293             Ok(Some(backup_id)) => backup_id,
294             Ok(None) => panic!("no thread available"),
295             Err(_) => {
296                 debug!("failed to spawn worker thread due to the thread pool shutting down");
297                 return;
298             }
299         };
300 
301         let need_spawn = self.backup[backup_id.0].worker_handoff(id.clone());
302 
303         if !need_spawn {
304             return;
305         }
306 
307         let trigger = match self.trigger.upgrade() {
308             None => {
309                 // The pool is shutting down.
310                 return;
311             }
312             Some(t) => t,
313         };
314 
315         let mut th = thread::Builder::new();
316 
317         if let Some(ref prefix) = pool.config.name_prefix {
318             th = th.name(format!("{}{}", prefix, backup_id.0));
319         }
320 
321         if let Some(stack) = pool.config.stack_size {
322             th = th.stack_size(stack);
323         }
324 
325         let pool = pool.clone();
326 
327         let res = th.spawn(move || {
328             if let Some(ref f) = pool.config.after_start {
329                 f();
330             }
331 
332             let mut worker_id = id;
333 
334             pool.backup[backup_id.0].start(&worker_id);
335 
336             loop {
337                 // The backup token should be in the running state.
338                 debug_assert!(pool.backup[backup_id.0].is_running());
339 
340                 // TODO: Avoid always cloning
341                 let worker = Worker::new(worker_id, backup_id, pool.clone(), trigger.clone());
342 
343                 // Run the worker. If the worker transitioned to a "blocking"
344                 // state, then `is_blocking` will be true.
345                 if !worker.do_run() {
346                     // The worker shutdown, so exit the thread.
347                     break;
348                 }
349 
350                 debug_assert!(!pool.backup[backup_id.0].is_pushed());
351 
352                 // Push the thread back onto the backup stack. This makes it
353                 // available for future handoffs.
354                 //
355                 // This **must** happen before notifying the task.
356                 let res = pool.backup_stack.push(&pool.backup, backup_id);
357 
358                 if res.is_err() {
359                     // The pool is being shutdown.
360                     break;
361                 }
362 
363                 // The task switched the current thread to blocking mode.
364                 // Now that the blocking task completed, any tasks
365                 pool.notify_blocking_task(&pool);
366 
367                 debug_assert!(pool.backup[backup_id.0].is_running());
368 
369                 // Wait for a handoff
370                 let handoff = pool.backup[backup_id.0].wait_for_handoff(pool.config.keep_alive);
371 
372                 match handoff {
373                     Handoff::Worker(id) => {
374                         debug_assert!(pool.backup[backup_id.0].is_running());
375                         worker_id = id;
376                     }
377                     Handoff::Idle | Handoff::Terminated => {
378                         break;
379                     }
380                 }
381             }
382 
383             if let Some(ref f) = pool.config.before_stop {
384                 f();
385             }
386         });
387 
388         if let Err(e) = res {
389             error!("failed to spawn worker thread; err={:?}", e);
390             panic!("failed to spawn worker thread: {:?}", e);
391         }
392     }
393 
394     /// If there are any other workers currently relaxing, signal them that work
395     /// is available so that they can try to find more work to process.
signal_work(&self, pool: &Arc<Pool>)396     pub fn signal_work(&self, pool: &Arc<Pool>) {
397         debug_assert_eq!(*self, **pool);
398 
399         use worker::Lifecycle::Signaled;
400 
401         if let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) {
402             let entry = &self.workers[idx];
403 
404             debug_assert!(
405                 worker_state.lifecycle() != Signaled,
406                 "actual={:?}",
407                 worker_state.lifecycle(),
408             );
409 
410             trace!("signal_work -- notify; idx={}", idx);
411 
412             if !entry.notify(worker_state) {
413                 trace!("signal_work -- spawn; idx={}", idx);
414                 self.spawn_thread(WorkerId(idx), pool);
415             }
416         }
417     }
418 
419     /// Generates a random number
420     ///
421     /// Uses a thread-local random number generator based on XorShift.
rand_usize(&self) -> usize422     pub fn rand_usize(&self) -> usize {
423         thread_local! {
424             static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(prng_seed()));
425         }
426 
427         RNG.with(|rng| {
428             // This is the 32-bit variant of Xorshift.
429             // https://en.wikipedia.org/wiki/Xorshift
430             let mut x = rng.get();
431             x ^= x << 13;
432             x ^= x >> 17;
433             x ^= x << 5;
434             rng.set(x);
435             x.0 as usize
436         })
437     }
438 }
439 
440 impl PartialEq for Pool {
eq(&self, other: &Pool) -> bool441     fn eq(&self, other: &Pool) -> bool {
442         self as *const _ == other as *const _
443     }
444 }
445 
// These impls assert that a `Pool` may be moved to and shared between
// threads.
//
// NOTE(review): the soundness argument is not visible in this file.
// Presumably every field is either immutable after construction or
// internally synchronized - confirm before adding or changing fields.
unsafe impl Send for Pool {}
unsafe impl Sync for Pool {}
448 
449 // Return a thread-specific, 32-bit, non-zero seed value suitable for a 32-bit
450 // PRNG. This uses one libstd RandomState for a default hasher and hashes on
451 // the current thread ID to obtain an unpredictable, collision resistant seed.
prng_seed() -> u32452 fn prng_seed() -> u32 {
453     // This obtains a small number of random bytes from the host system (for
454     // example, on unix via getrandom(2)) in order to seed an unpredictable and
455     // HashDoS resistant 64-bit hash function (currently: `SipHasher13` with
456     // 128-bit state). We only need one of these, to make the seeds for all
457     // process threads different via hashed IDs, collision resistant, and
458     // unpredictable.
459     lazy_static! {
460         static ref RND_STATE: RandomState = RandomState::new();
461     }
462 
463     // Hash the current thread ID to produce a u32 value
464     let mut hasher = RND_STATE.build_hasher();
465     thread::current().id().hash(&mut hasher);
466     let hash: u64 = hasher.finish();
467     let seed = (hash as u32) ^ ((hash >> 32) as u32);
468 
469     // Ensure non-zero seed (Xorshift yields only zero's for that seed)
470     if seed == 0 {
471         0x9b4e_6d25 // misc bits, could be any non-zero
472     } else {
473         seed
474     }
475 }
476