1 mod backup;
2 mod backup_stack;
3 mod state;
4
5 pub(crate) use self::backup::{Backup, BackupId};
6 pub(crate) use self::backup_stack::MAX_BACKUP;
7 pub(crate) use self::state::{Lifecycle, State, MAX_FUTURES};
8
9 use self::backup::Handoff;
10 use self::backup_stack::BackupStack;
11
12 use config::Config;
13 use shutdown::ShutdownTrigger;
14 use task::{Blocking, Task};
15 use worker::{self, Worker, WorkerId};
16
17 use futures::Poll;
18
19 use std::cell::Cell;
20 use std::collections::hash_map::RandomState;
21 use std::hash::{BuildHasher, Hash, Hasher};
22 use std::num::Wrapping;
23 use std::sync::atomic::AtomicUsize;
24 use std::sync::atomic::Ordering::{AcqRel, Acquire};
25 use std::sync::{Arc, Weak};
26 use std::thread;
27
28 use crossbeam_deque::Injector;
29 use crossbeam_utils::CachePadded;
30
/// Shared state of the thread pool.
///
/// Groups the atomic lifecycle word, the worker entries together with their
/// sleep stack, the global task queue, the backup-thread slots together with
/// their stack, the blocking coordination state and the pool configuration.
#[derive(Debug)]
pub(crate) struct Pool {
    // Tracks the state of the thread pool (running, shutting down, ...).
    //
    // While workers check this field as a hint to detect shutdown, it is
    // **not** used as a primary point of coordination for workers. The sleep
    // stack is used as the primary point of coordination for workers.
    //
    // The value of this atomic is deserialized into a `pool::State` instance.
    // See comments for that type.
    pub state: CachePadded<AtomicUsize>,

    // Stack tracking sleeping workers.
    //
    // Primed with every worker index by `Pool::new`.
    sleep_stack: CachePadded<worker::Stack>,

    // Worker state
    //
    // A worker is a thread that is processing the work queue and polling
    // futures.
    //
    // The number of workers will *usually* be small.
    pub workers: Arc<[worker::Entry]>,

    // The global MPMC queue of tasks.
    //
    // Spawned tasks are pushed into this queue. Although worker threads have
    // their own dedicated task queues, they periodically steal tasks from
    // this global queue, too.
    pub queue: Arc<Injector<Arc<Task>>>,

    // Completes the shutdown process when the `ThreadPool` and all `Worker`s
    // get dropped.
    //
    // When spawning a new `Worker`, this weak reference is upgraded and
    // handed out to the new thread.
    pub trigger: Weak<ShutdownTrigger>,

    // Backup thread state
    //
    // In order to efficiently support `blocking`, a pool of backup threads is
    // needed. These backup threads are ready to take over a worker if the
    // future being processed requires blocking.
    //
    // Holds `max_blocking + workers.len()` entries (see `Pool::new`).
    backup: Box<[Backup]>,

    // Stack of sleeping backup threads
    pub backup_stack: BackupStack,

    // State regarding coordinating blocking sections and tracking tasks that
    // are pending blocking capacity.
    blocking: Blocking,

    // Configuration
    pub config: Config,
}
83
84 impl Pool {
85 /// Create a new `Pool`
new( workers: Arc<[worker::Entry]>, trigger: Weak<ShutdownTrigger>, max_blocking: usize, config: Config, queue: Arc<Injector<Arc<Task>>>, ) -> Pool86 pub fn new(
87 workers: Arc<[worker::Entry]>,
88 trigger: Weak<ShutdownTrigger>,
89 max_blocking: usize,
90 config: Config,
91 queue: Arc<Injector<Arc<Task>>>,
92 ) -> Pool {
93 let pool_size = workers.len();
94 let total_size = max_blocking + pool_size;
95
96 // Create the set of backup entries
97 //
98 // This is `backup + pool_size` because the core thread pool running the
99 // workers is spawned from backup as well.
100 let backup = (0..total_size)
101 .map(|_| Backup::new())
102 .collect::<Vec<_>>()
103 .into_boxed_slice();
104
105 let backup_stack = BackupStack::new();
106
107 for i in (0..backup.len()).rev() {
108 backup_stack.push(&backup, BackupId(i)).unwrap();
109 }
110
111 // Initialize the blocking state
112 let blocking = Blocking::new(max_blocking);
113
114 let ret = Pool {
115 state: CachePadded::new(AtomicUsize::new(State::new().into())),
116 sleep_stack: CachePadded::new(worker::Stack::new()),
117 workers,
118 queue,
119 trigger,
120 backup,
121 backup_stack,
122 blocking,
123 config,
124 };
125
126 // Now, we prime the sleeper stack
127 for i in 0..pool_size {
128 ret.sleep_stack.push(&ret.workers, i).unwrap();
129 }
130
131 ret
132 }
133
134 /// Start shutting down the pool. This means that no new futures will be
135 /// accepted.
shutdown(&self, now: bool, purge_queue: bool)136 pub fn shutdown(&self, now: bool, purge_queue: bool) {
137 let mut state: State = self.state.load(Acquire).into();
138
139 trace!("shutdown; state={:?}", state);
140
141 // For now, this must be true
142 debug_assert!(!purge_queue || now);
143
144 // Start by setting the shutdown flag
145 loop {
146 let mut next = state;
147
148 let num_futures = next.num_futures();
149
150 if next.lifecycle() == Lifecycle::ShutdownNow {
151 // Already transitioned to shutting down state
152
153 if !purge_queue || num_futures == 0 {
154 // Nothing more to do
155 return;
156 }
157
158 // The queue must be purged
159 debug_assert!(purge_queue);
160 next.clear_num_futures();
161 } else {
162 next.set_lifecycle(if now || num_futures == 0 {
163 // If already idle, always transition to shutdown now.
164 Lifecycle::ShutdownNow
165 } else {
166 Lifecycle::ShutdownOnIdle
167 });
168
169 if purge_queue {
170 next.clear_num_futures();
171 }
172 }
173
174 let actual = self
175 .state
176 .compare_and_swap(state.into(), next.into(), AcqRel)
177 .into();
178
179 if state == actual {
180 state = next;
181 break;
182 }
183
184 state = actual;
185 }
186
187 trace!(" -> transitioned to shutdown");
188
189 // Only transition to terminate if there are no futures currently on the
190 // pool
191 if state.num_futures() != 0 {
192 return;
193 }
194
195 self.terminate_sleeping_workers();
196 }
197
/// Called by `Worker` as it tries to enter a sleeping state. Before it
/// sleeps, it must push itself onto the sleep stack. This enables other
/// threads to see it when signaling work.
///
/// `idx` is the worker's index into `workers`. The result of the sleep
/// stack's `push` is returned unchanged.
// NOTE(review): presumably `Err(())` means the stack no longer accepts
// sleepers (pool terminating) — confirm against `worker::Stack::push`.
pub fn push_sleeper(&self, idx: usize) -> Result<(), ()> {
    self.sleep_stack.push(&self.workers, idx)
}
204
terminate_sleeping_workers(&self)205 pub fn terminate_sleeping_workers(&self) {
206 use worker::Lifecycle::Signaled;
207
208 trace!(" -> shutting down workers");
209 // Wakeup all sleeping workers. They will wake up, see the state
210 // transition, and terminate.
211 while let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, true) {
212 self.workers[idx].signal_stop(worker_state);
213 }
214
215 // Now terminate any backup threads
216 //
217 // The call to `pop` must be successful because shutting down the pool
218 // is coordinated and at this point, this is the only thread that will
219 // attempt to transition the backup stack to "terminated".
220 while let Ok(Some(backup_id)) = self.backup_stack.pop(&self.backup, true) {
221 self.backup[backup_id.0].signal_stop();
222 }
223 }
224
/// Polls the pool's blocking state for capacity on behalf of `task`.
///
/// Pure delegation to `Blocking::poll_blocking_capacity`; the readiness and
/// error semantics are defined there.
pub fn poll_blocking_capacity(&self, task: &Arc<Task>) -> Poll<(), ::BlockingError> {
    self.blocking.poll_blocking_capacity(task)
}
228
229 /// Submit a task to the scheduler.
230 ///
231 /// Called from either inside or outside of the scheduler. If currently on
232 /// the scheduler, then a fast path is taken.
submit(&self, task: Arc<Task>, pool: &Arc<Pool>)233 pub fn submit(&self, task: Arc<Task>, pool: &Arc<Pool>) {
234 debug_assert_eq!(*self, **pool);
235
236 Worker::with_current(|worker| {
237 if let Some(worker) = worker {
238 // If the worker is in blocking mode, then even though the
239 // thread-local variable is set, the current thread does not
240 // have ownership of that worker entry. This is because the
241 // worker entry has already been handed off to another thread.
242 //
243 // The second check handles the case where the current thread is
244 // part of a different threadpool than the one being submitted
245 // to.
246 if !worker.is_blocking() && *self == *worker.pool {
247 let idx = worker.id.0;
248
249 trace!(" -> submit internal; idx={}", idx);
250
251 worker.pool.workers[idx].submit_internal(task);
252 worker.pool.signal_work(pool);
253 return;
254 }
255 }
256
257 self.submit_external(task, pool);
258 });
259 }
260
/// Submit a task to the scheduler from off worker
///
/// Called from outside of the scheduler, this function is how new tasks
/// enter the system.
///
/// The task is pushed onto the global queue and `signal_work` is then
/// called with `pool`, which must be the `Arc` that owns `self`
/// (debug-asserted).
pub fn submit_external(&self, task: Arc<Task>, pool: &Arc<Pool>) {
    debug_assert_eq!(*self, **pool);

    trace!(" -> submit external");

    // Enqueue first, then signal, so the queue already holds the task when
    // `signal_work` runs.
    self.queue.push(task);
    self.signal_work(pool);
}
273
/// Releases the backup entry `backup_id` and puts it back on the backup
/// stack.
///
/// Returns the result of the stack push unchanged.
// NOTE(review): `spawn_thread` treats an `Err` from this same `push` as
// "the pool is being shutdown"; presumably that holds here too — confirm.
pub fn release_backup(&self, backup_id: BackupId) -> Result<(), ()> {
    // First update the state, this cannot fail because the caller must have
    // exclusive access to the backup token.
    self.backup[backup_id.0].release();

    // Push the backup entry back on the stack
    self.backup_stack.push(&self.backup, backup_id)
}
282
notify_blocking_task(&self, pool: &Arc<Pool>)283 pub fn notify_blocking_task(&self, pool: &Arc<Pool>) {
284 debug_assert_eq!(*self, **pool);
285 self.blocking.notify_task(&pool);
286 }
287
/// Provision a thread to run a worker
///
/// Pops a backup slot, hands the worker `id` to it and, if the slot reports
/// that a new OS thread is needed, spawns one. The thread is configured
/// from `pool.config` (`name_prefix`, `stack_size`, `after_start`,
/// `before_stop`, `keep_alive`).
///
/// Returns without spawning when the backup stack reports an error (logged
/// as the pool shutting down), when the handoff does not require a new
/// thread, or when the shutdown trigger can no longer be upgraded.
///
/// `pool` must be the `Arc` that owns `self` (debug-asserted).
///
/// # Panics
///
/// Panics with "no thread available" if the backup stack is empty, and
/// panics if the OS thread cannot be spawned.
pub fn spawn_thread(&self, id: WorkerId, pool: &Arc<Pool>) {
    debug_assert_eq!(*self, **pool);

    // Claim a backup slot for the thread that will run the worker.
    let backup_id = match self.backup_stack.pop(&self.backup, false) {
        Ok(Some(backup_id)) => backup_id,
        Ok(None) => panic!("no thread available"),
        Err(_) => {
            debug!("failed to spawn worker thread due to the thread pool shutting down");
            return;
        }
    };

    // NOTE(review): a `false` result presumably means a thread already
    // associated with this slot picks up the handoff — confirm against
    // `Backup::worker_handoff`.
    let need_spawn = self.backup[backup_id.0].worker_handoff(id.clone());

    if !need_spawn {
        return;
    }

    let trigger = match self.trigger.upgrade() {
        None => {
            // The pool is shutting down.
            return;
        }
        Some(t) => t,
    };

    let mut th = thread::Builder::new();

    // Thread name is the configured prefix followed by the backup slot index.
    if let Some(ref prefix) = pool.config.name_prefix {
        th = th.name(format!("{}{}", prefix, backup_id.0));
    }

    if let Some(stack) = pool.config.stack_size {
        th = th.stack_size(stack);
    }

    // The spawned closure needs its own handle to the pool.
    let pool = pool.clone();

    let res = th.spawn(move || {
        if let Some(ref f) = pool.config.after_start {
            f();
        }

        let mut worker_id = id;

        pool.backup[backup_id.0].start(&worker_id);

        loop {
            // The backup token should be in the running state.
            debug_assert!(pool.backup[backup_id.0].is_running());

            // TODO: Avoid always cloning
            let worker = Worker::new(worker_id, backup_id, pool.clone(), trigger.clone());

            // Run the worker. `do_run` returns `false` when the worker shut
            // down; a `true` result means this thread carries on below as a
            // backup thread (per the comments that follow, because the
            // worker switched to blocking mode).
            if !worker.do_run() {
                // The worker shutdown, so exit the thread.
                break;
            }

            debug_assert!(!pool.backup[backup_id.0].is_pushed());

            // Push the thread back onto the backup stack. This makes it
            // available for future handoffs.
            //
            // This **must** happen before notifying the task.
            let res = pool.backup_stack.push(&pool.backup, backup_id);

            if res.is_err() {
                // The pool is being shutdown.
                break;
            }

            // The task switched the current thread to blocking mode.
            // Now that the blocking task completed, notify the blocking
            // state (presumably so that tasks pending blocking capacity can
            // proceed).
            pool.notify_blocking_task(&pool);

            debug_assert!(pool.backup[backup_id.0].is_running());

            // Wait for a handoff
            let handoff = pool.backup[backup_id.0].wait_for_handoff(pool.config.keep_alive);

            match handoff {
                // A new worker was handed to this thread: run it on the
                // next loop iteration.
                Handoff::Worker(id) => {
                    debug_assert!(pool.backup[backup_id.0].is_running());
                    worker_id = id;
                }
                // No worker was handed off: leave the loop and let the
                // thread exit.
                Handoff::Idle | Handoff::Terminated => {
                    break;
                }
            }
        }

        if let Some(ref f) = pool.config.before_stop {
            f();
        }
    });

    if let Err(e) = res {
        error!("failed to spawn worker thread; err={:?}", e);
        panic!("failed to spawn worker thread: {:?}", e);
    }
}
393
394 /// If there are any other workers currently relaxing, signal them that work
395 /// is available so that they can try to find more work to process.
signal_work(&self, pool: &Arc<Pool>)396 pub fn signal_work(&self, pool: &Arc<Pool>) {
397 debug_assert_eq!(*self, **pool);
398
399 use worker::Lifecycle::Signaled;
400
401 if let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) {
402 let entry = &self.workers[idx];
403
404 debug_assert!(
405 worker_state.lifecycle() != Signaled,
406 "actual={:?}",
407 worker_state.lifecycle(),
408 );
409
410 trace!("signal_work -- notify; idx={}", idx);
411
412 if !entry.notify(worker_state) {
413 trace!("signal_work -- spawn; idx={}", idx);
414 self.spawn_thread(WorkerId(idx), pool);
415 }
416 }
417 }
418
419 /// Generates a random number
420 ///
421 /// Uses a thread-local random number generator based on XorShift.
rand_usize(&self) -> usize422 pub fn rand_usize(&self) -> usize {
423 thread_local! {
424 static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(prng_seed()));
425 }
426
427 RNG.with(|rng| {
428 // This is the 32-bit variant of Xorshift.
429 // https://en.wikipedia.org/wiki/Xorshift
430 let mut x = rng.get();
431 x ^= x << 13;
432 x ^= x >> 17;
433 x ^= x << 5;
434 rng.set(x);
435 x.0 as usize
436 })
437 }
438 }
439
440 impl PartialEq for Pool {
eq(&self, other: &Pool) -> bool441 fn eq(&self, other: &Pool) -> bool {
442 self as *const _ == other as *const _
443 }
444 }
445
// NOTE(review): `Send` and `Sync` are asserted by hand here rather than
// derived by the compiler. Nothing in this file states which field prevents
// the automatic impls or why sharing a `Pool` across threads is sound —
// presumably the atomic state word and the stack types make concurrent
// access safe. Confirm and record the invariant as a `SAFETY:` comment.
unsafe impl Send for Pool {}
unsafe impl Sync for Pool {}
448
449 // Return a thread-specific, 32-bit, non-zero seed value suitable for a 32-bit
450 // PRNG. This uses one libstd RandomState for a default hasher and hashes on
451 // the current thread ID to obtain an unpredictable, collision resistant seed.
prng_seed() -> u32452 fn prng_seed() -> u32 {
453 // This obtains a small number of random bytes from the host system (for
454 // example, on unix via getrandom(2)) in order to seed an unpredictable and
455 // HashDoS resistant 64-bit hash function (currently: `SipHasher13` with
456 // 128-bit state). We only need one of these, to make the seeds for all
457 // process threads different via hashed IDs, collision resistant, and
458 // unpredictable.
459 lazy_static! {
460 static ref RND_STATE: RandomState = RandomState::new();
461 }
462
463 // Hash the current thread ID to produce a u32 value
464 let mut hasher = RND_STATE.build_hasher();
465 thread::current().id().hash(&mut hasher);
466 let hash: u64 = hasher.finish();
467 let seed = (hash as u32) ^ ((hash >> 32) as u32);
468
469 // Ensure non-zero seed (Xorshift yields only zero's for that seed)
470 if seed == 0 {
471 0x9b4e_6d25 // misc bits, could be any non-zero
472 } else {
473 seed
474 }
475 }
476