//! A scheduler is initialized with a fixed number of workers. Each worker is
//! driven by a thread. Each worker has a "core" which contains data such as the
//! run queue and other state. When `block_in_place` is called, the worker's
//! "core" is handed off to a new thread allowing the scheduler to continue to
//! make progress while the originating thread blocks.
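//!
//! A minimal sketch of the hand-off from the caller's point of view. This is
//! hypothetical user code (in the public API the function is exposed as
//! `tokio::task::block_in_place`), not part of this module:
//!
//! ```ignore
//! tokio::task::block_in_place(|| {
//!     // By the time this closure runs, this worker's core has been handed
//!     // to a freshly spawned thread, so other tasks keep making progress
//!     // while the closure blocks.
//!     std::thread::sleep(std::time::Duration::from_secs(1));
//! });
//! ```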

use crate::loom::rand::seed;
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
use crate::runtime;
use crate::runtime::park::{Parker, Unparker};
use crate::runtime::thread_pool::{AtomicCell, Idle};
use crate::runtime::{queue, task};
use crate::util::linked_list::LinkedList;
use crate::util::FastRand;

use std::cell::RefCell;
use std::time::Duration;

/// A scheduler worker
pub(super) struct Worker {
    /// Reference to shared state
    shared: Arc<Shared>,

    /// Index holding this worker's remote state
    index: usize,

    /// Used to hand-off a worker's core to another thread.
    core: AtomicCell<Core>,
}

/// Core data
struct Core {
    /// Used to schedule bookkeeping tasks every so often.
    tick: u8,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task being
    /// run next (LIFO). This is an optimization for message passing patterns
    /// and helps to reduce latency.
    lifo_slot: Option<Notified>,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Worker>>,

    /// True if the worker is currently searching for more work. Searching
    /// involves attempting to steal from other workers.
    is_searching: bool,

    /// True if the scheduler is being shutdown
    is_shutdown: bool,

    /// Tasks owned by the core
    tasks: LinkedList<Task>,

    /// Parker
    ///
    /// Stored in an `Option` as the parker is added / removed to make the
    /// borrow checker happy.
    park: Option<Parker>,

    /// Fast random number generator.
    rand: FastRand,
}

/// State shared across all workers
pub(super) struct Shared {
    /// Per-worker remote state. All other workers have access to this and it
    /// is how they communicate with each other.
    remotes: Box<[Remote]>,

    /// Submit work to the scheduler while **not** currently on a worker thread.
    inject: queue::Inject<Arc<Worker>>,

    /// Coordinates idle workers
    idle: Idle,

    /// Workers that have observed the shutdown signal
    ///
    /// The core is **not** placed back in the worker to prevent it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
    shutdown_workers: Mutex<Vec<(Box<Core>, Arc<Worker>)>>,
}

/// Used to communicate with a worker from other threads.
struct Remote {
    /// Steal tasks from this worker.
    steal: queue::Steal<Arc<Worker>>,

    /// Transfers tasks to be released. Any worker pushes tasks; only the
    /// owning worker pops.
    pending_drop: task::TransferStack<Arc<Worker>>,

    /// Unparks the associated worker thread
    unpark: Unparker,
}

/// Thread-local context
struct Context {
    /// Worker
    worker: Arc<Worker>,

    /// Core data
    core: RefCell<Option<Box<Core>>>,
}

/// Starts the workers
pub(crate) struct Launch(Vec<Arc<Worker>>);

/// Running a task may consume the core. If the core is still available when
/// running the task completes, it is returned. Otherwise, the worker will need
/// to stop processing.
type RunResult = Result<Box<Core>, ()>;

/// A task handle
type Task = task::Task<Arc<Worker>>;

/// A notified task handle
type Notified = task::Notified<Arc<Worker>>;

// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);

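// A minimal sketch of how the scoped TLS is used below (illustrative only;
// the real call sites are `run` and the various `CURRENT.with` lookups):
//
//     CURRENT.set(&cx, || {
//         // Everything in this scope, including code reached through task
//         // polls, can recover the context via `CURRENT.with(|maybe_cx| ..)`.
//     });
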
pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
    let mut cores = vec![];
    let mut remotes = vec![];

    // Create the local queues
    for _ in 0..size {
        let (steal, run_queue) = queue::local();

        let park = park.clone();
        let unpark = park.unpark();

        cores.push(Box::new(Core {
            tick: 0,
            lifo_slot: None,
            run_queue,
            is_searching: false,
            is_shutdown: false,
            tasks: LinkedList::new(),
            park: Some(park),
            rand: FastRand::new(seed()),
        }));

        remotes.push(Remote {
            steal,
            pending_drop: task::TransferStack::new(),
            unpark,
        });
    }

    let shared = Arc::new(Shared {
        remotes: remotes.into_boxed_slice(),
        inject: queue::Inject::new(),
        idle: Idle::new(size),
        shutdown_workers: Mutex::new(vec![]),
    });

    let mut launch = Launch(vec![]);

    for (index, core) in cores.drain(..).enumerate() {
        launch.0.push(Arc::new(Worker {
            shared: shared.clone(),
            index,
            core: AtomicCell::new(Some(core)),
        }));
    }

    (shared, launch)
}
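
// A hypothetical usage sketch (the real wiring lives in the runtime builder,
// which supplies the `Parker` and drives `Launch::launch`):
//
//     let (shared, launch) = create(4, parker);
//     launch.launch(); // spawns one thread per worker via `spawn_blocking`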

cfg_blocking! {
    pub(crate) fn block_in_place<F, R>(f: F) -> R
    where
        F: FnOnce() -> R,
    {
        // Try to steal the worker core back
        struct Reset;

        impl Drop for Reset {
            fn drop(&mut self) {
                CURRENT.with(|maybe_cx| {
                    if let Some(cx) = maybe_cx {
                        let core = cx.worker.core.take();
                        *cx.core.borrow_mut() = core;
                    }
                });
            }
        }

        CURRENT.with(|maybe_cx| {
            let cx = maybe_cx.expect("can call blocking only when running in a spawned task");

            // Get the worker core. If none is set, then blocking is fine!
            let core = match cx.core.borrow_mut().take() {
                Some(core) => {
                    // We are effectively leaving the executor, so we need to
                    // forcibly end budgeting.
                    crate::coop::stop();
                    core
                },
                None => return,
            };

            // The parker should be set here
            assert!(core.park.is_some());

            // In order to block, the core must be sent to another thread for
            // execution.
            //
            // First, move the core back into the worker's shared core slot.
            cx.worker.core.set(core);

            // Next, clone the worker handle and send it to a new thread for
            // processing.
            //
            // Once the blocking task is done executing, we will attempt to
            // steal the core back.
            let worker = cx.worker.clone();
            runtime::spawn_blocking(move || run(worker));
        });

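        // `Reset` is a drop guard: constructed before `f()` runs, it is
        // dropped after `f()` returns *or* unwinds, so the attempt to steal
        // the core back happens in either case.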
        let _reset = Reset;

        f()
    }
}

/// After how many ticks is the global queue polled. This helps to ensure
/// fairness.
///
/// The number is fairly arbitrary. I believe this value was copied from golang.
const GLOBAL_POLL_INTERVAL: u8 = 61;
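
// Note that `tick` is a `u8` and wraps at 256, which is not a multiple of 61:
// the multiples of 61 in 0..=255 are 0, 61, 122, 183, and 244, so one interval
// per wrap is only 12 ticks (244 -> 0). The interval is thus approximately 61,
// which is all that fairness requires.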

impl Launch {
    pub(crate) fn launch(mut self) {
        for worker in self.0.drain(..) {
            runtime::spawn_blocking(move || run(worker));
        }
    }
}

fn run(worker: Arc<Worker>) {
    // Acquire a core. If this fails, then another thread is running this
    // worker and there is nothing further to do.
    let core = match worker.core.take() {
        Some(core) => core,
        None => return,
    };

    // Set the worker context.
    let cx = Context {
        worker,
        core: RefCell::new(None),
    };

    let _enter = crate::runtime::enter();

    CURRENT.set(&cx, || {
        // This should always be an error. It only returns a `Result` to
        // support using `?` to short circuit.
        assert!(cx.run(core).is_err());
    });
}

impl Context {
    fn run(&self, mut core: Box<Core>) -> RunResult {
        while !core.is_shutdown {
            // Increment the tick
            core.tick();

            // Run maintenance, if needed
            core = self.maintenance(core);

            // First, check work available to the current worker.
            if let Some(task) = core.next_task(&self.worker) {
                core = self.run_task(task, core)?;
                continue;
            }

            // There is no more **local** work to process, try to steal work
            // from other workers.
            if let Some(task) = core.steal_work(&self.worker) {
                core = self.run_task(task, core)?;
            } else {
                // Wait for work
                core = self.park(core);
            }
        }

        // Signal shutdown
        self.worker.shared.shutdown(core, self.worker.clone());
        Err(())
    }

    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
        // Make sure the worker is not in the **searching** state. This enables
        // another idle worker to try to steal work.
        core.transition_from_searching(&self.worker);

        // Make the core available to the runtime context
        *self.core.borrow_mut() = Some(core);

        // Run the task
        crate::coop::budget(|| {
            task.run();

            // As long as there is budget remaining and a task exists in the
            // `lifo_slot`, then keep running.
            loop {
                // Check if we still have the core. If not, the core was stolen
                // by another worker.
                let mut core = match self.core.borrow_mut().take() {
                    Some(core) => core,
                    None => return Err(()),
                };

                // Check for a task in the LIFO slot
                let task = match core.lifo_slot.take() {
                    Some(task) => task,
                    None => return Ok(core),
                };

                if crate::coop::has_budget_remaining() {
                    // Run the LIFO task, then loop
                    *self.core.borrow_mut() = Some(core);
                    task.run();
                } else {
                    // Not enough budget left to run the LIFO task, push it to
                    // the back of the queue and return.
                    core.run_queue.push_back(task, self.worker.inject());
                    return Ok(core);
                }
            }
        })
    }
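
    // Example of the budget interaction above: if task A sends on a channel
    // and wakes task B, B lands in the LIFO slot and runs immediately after A,
    // inside the same `budget` scope. Once the budget is exhausted, the LIFO
    // task is pushed to the back of the run queue instead, forcing the worker
    // back through the fair scheduling loop.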

    fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
        if core.tick % GLOBAL_POLL_INTERVAL == 0 {
            // Call `park` with a 0 timeout. This enables the I/O driver,
            // timer, ... to run without actually putting the thread to sleep.
            core = self.park_timeout(core, Some(Duration::from_millis(0)));

            // Run regularly scheduled maintenance
            core.maintenance(&self.worker);
        }

        core
    }

    fn park(&self, mut core: Box<Core>) -> Box<Core> {
        core.transition_to_parked(&self.worker);

        while !core.is_shutdown {
            core = self.park_timeout(core, None);

            // Run regularly scheduled maintenance
            core.maintenance(&self.worker);

            if core.transition_from_parked(&self.worker) {
                return core;
            }
        }

        core
    }

    fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
        // Take the parker out of core
        let mut park = core.park.take().expect("park missing");

        // Store `core` in context
        *self.core.borrow_mut() = Some(core);

        // Park thread
        if let Some(timeout) = duration {
            park.park_timeout(timeout).expect("park failed");
        } else {
            park.park().expect("park failed");
        }

        // Remove `core` from context
        core = self.core.borrow_mut().take().expect("core missing");

        // Place `park` back in `core`
        core.park = Some(park);

        // If there are tasks available to steal, notify a worker
        if core.run_queue.is_stealable() {
            self.worker.shared.notify_parked();
        }

        core
    }
}

impl Core {
    /// Increment the tick
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

    /// Return the next notified task available to this worker.
    fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
        if self.tick % GLOBAL_POLL_INTERVAL == 0 {
            worker.inject().pop().or_else(|| self.next_local_task())
        } else {
            self.next_local_task().or_else(|| worker.inject().pop())
        }
    }
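
    // Example: on ticks where `tick % 61 == 0`, the injection (global) queue
    // is polled *before* the local queue, so remotely submitted tasks cannot
    // be starved indefinitely by a worker whose local queue never drains.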

    fn next_local_task(&mut self) -> Option<Notified> {
        self.lifo_slot.take().or_else(|| self.run_queue.pop())
    }

    fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
        if !self.transition_to_searching(worker) {
            return None;
        }

        let num = worker.shared.remotes.len();
        // Start from a random worker
        let start = self.rand.fastrand_n(num as u32) as usize;

        for i in 0..num {
            let i = (start + i) % num;

            // Don't steal from ourself! We know we don't have work.
            if i == worker.index {
                continue;
            }

            let target = &worker.shared.remotes[i];
            if let Some(task) = target.steal.steal_into(&mut self.run_queue) {
                return Some(task);
            }
        }

        // Fallback on checking the global queue
        worker.shared.inject.pop()
    }
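
    // Example: with 4 workers and `start == 2`, worker 1 probes the remotes
    // in the order 2, 3, 0 (skipping its own index) and only then falls back
    // to the injection queue. Randomizing the start avoids all workers
    // hammering the same victim.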

    fn transition_to_searching(&mut self, worker: &Worker) -> bool {
        if !self.is_searching {
            self.is_searching = worker.shared.idle.transition_worker_to_searching();
        }

        self.is_searching
    }

    fn transition_from_searching(&mut self, worker: &Worker) {
        if !self.is_searching {
            return;
        }

        self.is_searching = false;
        worker.shared.transition_worker_from_searching();
    }

    /// Prepare the worker state for parking
    fn transition_to_parked(&mut self, worker: &Worker) {
        // When the final worker transitions **out** of searching to parked, it
        // must check all the queues one last time in case work materialized
        // between the last work scan and transitioning out of searching.
        let is_last_searcher = worker
            .shared
            .idle
            .transition_worker_to_parked(worker.index, self.is_searching);

        // The worker is no longer searching. This only updates the local
        // cache; the shared idle state was updated above.
        self.is_searching = false;

        if is_last_searcher {
            worker.shared.notify_if_work_pending();
        }
    }

    /// Returns `true` if the transition happened.
    fn transition_from_parked(&mut self, worker: &Worker) -> bool {
        // If a task is in the lifo slot, then we must unpark regardless of
        // being notified
        if self.lifo_slot.is_some() {
            worker.shared.idle.unpark_worker_by_id(worker.index);
            self.is_searching = true;
            return true;
        }

        if worker.shared.idle.is_parked(worker.index) {
            return false;
        }

        // When unparked, the worker is in the searching state.
        self.is_searching = true;
        true
    }

    /// Runs maintenance work such as freeing pending tasks and checking the
    /// pool's state.
    fn maintenance(&mut self, worker: &Worker) {
        self.drain_pending_drop(worker);

        if !self.is_shutdown {
            // Check if the scheduler has been shutdown
            self.is_shutdown = worker.inject().is_closed();
        }
    }

    // Shutdown the core
    fn shutdown(&mut self, worker: &Worker) {
        // Take the parker out of the core
        let mut park = self.park.take().expect("park missing");

        // Signal to all tasks to shut down.
        for header in self.tasks.iter() {
            header.shutdown();
        }

        loop {
            self.drain_pending_drop(worker);

            if self.tasks.is_empty() {
                break;
            }

            // Wait until signalled
            park.park().expect("park failed");
        }

        // Drain the queue
        while let Some(_) = self.next_local_task() {}
    }

    fn drain_pending_drop(&mut self, worker: &Worker) {
        use std::mem::ManuallyDrop;

        for task in worker.remote().pending_drop.drain() {
            let task = ManuallyDrop::new(task);

            // safety: tasks are only pushed into the `pending_drop` stacks that
            // are associated with the list they are inserted into. When a task
            // is pushed into `pending_drop`, the ref-inc is skipped, so we must
            // not ref-dec here.
            //
            // See `bind` and `release` implementations.
            unsafe {
                self.tasks.remove(task.header().into());
            }
        }
    }
}

impl Worker {
    /// Returns a reference to the scheduler's injection queue
    fn inject(&self) -> &queue::Inject<Arc<Worker>> {
        &self.shared.inject
    }

    /// Return a reference to this worker's remote data
    fn remote(&self) -> &Remote {
        &self.shared.remotes[self.index]
    }

    fn eq(&self, other: &Worker) -> bool {
        self.shared.ptr_eq(&other.shared) && self.index == other.index
    }
}

impl task::Schedule for Arc<Worker> {
    fn bind(task: Task) -> Arc<Worker> {
        CURRENT.with(|maybe_cx| {
            let cx = maybe_cx.expect("scheduler context missing");

            // Track the task
            cx.core
                .borrow_mut()
                .as_mut()
                .expect("scheduler core missing")
                .tasks
                .push_front(task);

            // Return a clone of the worker
            cx.worker.clone()
        })
    }

    fn release(&self, task: &Task) -> Option<Task> {
        use std::ptr::NonNull;

        CURRENT.with(|maybe_cx| {
            let cx = maybe_cx.expect("scheduler context missing");

            if self.eq(&cx.worker) {
                let mut maybe_core = cx.core.borrow_mut();

                if let Some(core) = &mut *maybe_core {
                    // Directly remove the task
                    //
                    // safety: the task is inserted in the list in `bind`.
                    unsafe {
                        let ptr = NonNull::from(task.header());
                        return core.tasks.remove(ptr);
                    }
                }
            }

            // Track the task to be released by the worker that owns it
            //
            // Safety: We get a new handle without incrementing the ref-count.
            // A ref-count is held by the "owned" linked list and it is only
            // ever removed from that list as part of the release process: this
            // method or popping the task from `pending_drop`. Thus, we can rely
            // on the ref-count held by the linked-list to keep the memory
            // alive.
            //
            // When the task is removed from the stack, it is forgotten instead
            // of dropped.
            let task = unsafe { Task::from_raw(task.header().into()) };

            self.remote().pending_drop.push(task);

            if cx.core.borrow().is_some() {
                return None;
            }

            // The worker core has been handed off to another thread. In the
            // event that the scheduler is currently shutting down, the thread
            // that owns the task may be waiting on the release to complete
            // shutdown.
            if self.inject().is_closed() {
                self.remote().unpark.unpark();
            }

            None
        })
    }

    fn schedule(&self, task: Notified) {
        self.shared.schedule(task, false);
    }

    fn yield_now(&self, task: Notified) {
        self.shared.schedule(task, true);
    }
}

impl Shared {
    pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
        CURRENT.with(|maybe_cx| {
            if let Some(cx) = maybe_cx {
                // Make sure the task is part of the **current** scheduler.
                if self.ptr_eq(&cx.worker.shared) {
                    // And the current thread still holds a core
                    if let Some(core) = cx.core.borrow_mut().as_mut() {
                        self.schedule_local(core, task, is_yield);
                        return;
                    }
                }
            }

            // Otherwise, use the inject queue
            self.inject.push(task);
            self.notify_parked();
        });
    }

    fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
        // Spawning from the worker thread. If scheduling a "yield" then the
        // task must always be pushed to the back of the queue, enabling other
        // tasks to be executed. If **not** a yield, then there is more
        // flexibility and the task may go to the front of the queue.
        let should_notify = if is_yield {
            core.run_queue.push_back(task, &self.inject);
            true
        } else {
            // Push to the LIFO slot
            let prev = core.lifo_slot.take();
            let ret = prev.is_some();

            if let Some(prev) = prev {
                core.run_queue.push_back(prev, &self.inject);
            }

            core.lifo_slot = Some(task);

            ret
        };

        // Only notify if not currently parked. If `park` is `None`, then the
        // scheduling is from a resource driver. As notifications often come in
        // batches, the notification is delayed until the park is complete.
        if should_notify && core.park.is_some() {
            self.notify_parked();
        }
    }
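
    // Example: scheduling tasks A then B (neither a yield) leaves B in the
    // LIFO slot and displaces A to the back of the run queue. Only the
    // displacement warrants a notification: B will be picked up by this
    // worker's LIFO check, but A may need another worker to steal it.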

    pub(super) fn close(&self) {
        if self.inject.close() {
            self.notify_all();
        }
    }

    fn notify_parked(&self) {
        if let Some(index) = self.idle.worker_to_notify() {
            self.remotes[index].unpark.unpark();
        }
    }

    fn notify_all(&self) {
        for remote in &self.remotes[..] {
            remote.unpark.unpark();
        }
    }

    fn notify_if_work_pending(&self) {
        for remote in &self.remotes[..] {
            if !remote.steal.is_empty() {
                self.notify_parked();
                return;
            }
        }

        if !self.inject.is_empty() {
            self.notify_parked();
        }
    }

    fn transition_worker_from_searching(&self) {
        if self.idle.transition_worker_from_searching() {
            // We are the final searching worker. Because work was found, we
            // need to notify another worker.
            self.notify_parked();
        }
    }

    /// Signals that a worker has observed the shutdown signal and has handed
    /// its core over for shutdown (the core is intentionally **not** placed
    /// back in the worker; see `shutdown_workers`).
    ///
    /// If all workers have reached this point, the final cleanup is performed.
    fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) {
        let mut workers = self.shutdown_workers.lock().unwrap();
        workers.push((core, worker));

        if workers.len() != self.remotes.len() {
            return;
        }

        for (mut core, worker) in workers.drain(..) {
            core.shutdown(&worker);
        }

        // Drain the injection queue
        while let Some(_) = self.inject.pop() {}
    }

    fn ptr_eq(&self, other: &Shared) -> bool {
        self as *const _ == other as *const _
    }
}