//! A scheduler is initialized with a fixed number of workers. Each worker is
//! driven by a thread. Each worker has a "core" which contains data such as the
//! run queue and other state. When `block_in_place` is called, the worker's
//! "core" is handed off to a new thread allowing the scheduler to continue to
//! make progress while the originating thread blocks.
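//!
//! The hand-off works roughly as follows (see `block_in_place` and `run`
//! below): the blocking thread moves its core back into the worker's shared
//! slot, a new thread is spawned via `spawn_blocking` to drive that worker,
//! and once the blocking closure returns, the original thread attempts to
//! steal the core back.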

use crate::loom::rand::seed;
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
use crate::runtime;
use crate::runtime::park::{Parker, Unparker};
use crate::runtime::thread_pool::{AtomicCell, Idle};
use crate::runtime::{queue, task};
use crate::util::linked_list::LinkedList;
use crate::util::FastRand;

use std::cell::RefCell;
use std::time::Duration;

/// A scheduler worker
pub(super) struct Worker {
    /// Reference to shared state
    shared: Arc<Shared>,

    /// Index holding this worker's remote state
    index: usize,

    /// Used to hand off a worker's core to another thread.
    core: AtomicCell<Core>,
}

/// Core data
struct Core {
    /// Used to schedule bookkeeping tasks every so often.
    tick: u8,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task being run
    /// next (LIFO). This is an optimization for message passing patterns and
    /// helps to reduce latency.
    lifo_slot: Option<Notified>,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Worker>>,

    /// True if the worker is currently searching for more work. Searching
    /// involves attempting to steal from other workers.
    is_searching: bool,

    /// True if the scheduler is being shut down
    is_shutdown: bool,

    /// Tasks owned by the core
    tasks: LinkedList<Task>,

    /// Parker
    ///
    /// Stored in an `Option` as the parker is added / removed to make the
    /// borrow checker happy.
    park: Option<Parker>,

    /// Fast random number generator.
    rand: FastRand,
}

/// State shared across all workers
pub(super) struct Shared {
    /// Per-worker remote state. All other workers have access to this and it
    /// is how they communicate with each other.
    remotes: Box<[Remote]>,

    /// Submit work to the scheduler while **not** currently on a worker thread.
    inject: queue::Inject<Arc<Worker>>,

    /// Coordinates idle workers
    idle: Idle,

    /// Workers that have observed the shutdown signal
    ///
    /// The core is **not** placed back in the worker to prevent it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
    shutdown_workers: Mutex<Vec<(Box<Core>, Arc<Worker>)>>,
}

/// Used to communicate with a worker from other threads.
struct Remote {
    /// Steal tasks from this worker.
    steal: queue::Steal<Arc<Worker>>,

    /// Transfers tasks to be released. Any worker may push tasks; only the
    /// owning worker pops.
    pending_drop: task::TransferStack<Arc<Worker>>,

    /// Unparks the associated worker thread
    unpark: Unparker,
}

/// Thread-local context
struct Context {
    /// Worker
    worker: Arc<Worker>,

    /// Core data
    core: RefCell<Option<Box<Core>>>,
}

/// Starts the workers
pub(crate) struct Launch(Vec<Arc<Worker>>);

/// Running a task may consume the core. If the core is still available when
/// the task finishes running, it is returned. Otherwise, the worker will need
/// to stop processing.
type RunResult = Result<Box<Core>, ()>;

/// A task handle
type Task = task::Task<Arc<Worker>>;

/// A notified task handle
type Notified = task::Notified<Arc<Worker>>;

// Tracks the thread-local scheduler context. It is set for the duration of
// `Context::run` and is read by `block_in_place` and the `task::Schedule`
// impl below to find the active worker and, when present, its core.
scoped_thread_local!(static CURRENT: Context);

pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
    let mut cores = vec![];
    let mut remotes = vec![];

    // Create the local queues
    for _ in 0..size {
        let (steal, run_queue) = queue::local();

        let park = park.clone();
        let unpark = park.unpark();

        cores.push(Box::new(Core {
            tick: 0,
            lifo_slot: None,
            run_queue,
            is_searching: false,
            is_shutdown: false,
            tasks: LinkedList::new(),
            park: Some(park),
            rand: FastRand::new(seed()),
        }));

        remotes.push(Remote {
            steal,
            pending_drop: task::TransferStack::new(),
            unpark,
        });
    }

    let shared = Arc::new(Shared {
        remotes: remotes.into_boxed_slice(),
        inject: queue::Inject::new(),
        idle: Idle::new(size),
        shutdown_workers: Mutex::new(vec![]),
    });

    let mut launch = Launch(vec![]);

    for (index, core) in cores.drain(..).enumerate() {
        launch.0.push(Arc::new(Worker {
            shared: shared.clone(),
            index,
            core: AtomicCell::new(Some(core)),
        }));
    }

    (shared, launch)
}

cfg_blocking! {
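    /// An illustrative sketch of how this function is reached through the
    /// public API (assuming the `tokio::task::block_in_place` wrapper, which
    /// is not defined in this file):
    ///
    /// ```ignore
    /// tokio::task::block_in_place(|| {
    ///     // Blocking is fine here; this worker's core has been handed off
    ///     // to another thread.
    ///     std::thread::sleep(std::time::Duration::from_secs(1));
    /// });
    /// ```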
    pub(crate) fn block_in_place<F, R>(f: F) -> R
    where
        F: FnOnce() -> R,
    {
        // Try to steal the worker core back
        struct Reset;

        impl Drop for Reset {
            fn drop(&mut self) {
                CURRENT.with(|maybe_cx| {
                    if let Some(cx) = maybe_cx {
                        let core = cx.worker.core.take();
                        *cx.core.borrow_mut() = core;
                    }
                });
            }
        }

        CURRENT.with(|maybe_cx| {
            let cx = maybe_cx.expect("can call blocking only when running in a spawned task");

            // Get the worker core. If none is set, then blocking is fine!
            let core = match cx.core.borrow_mut().take() {
                Some(core) => {
                    // We are effectively leaving the executor, so we need to
                    // forcibly end budgeting.
                    crate::coop::stop();
                    core
                },
                None => return,
            };

            // The parker should be set here
            assert!(core.park.is_some());

            // In order to block, the core must be sent to another thread for
            // execution.
            //
            // First, move the core back into the worker's shared core slot.
            cx.worker.core.set(core);

            // Next, clone the worker handle and send it to a new thread for
            // processing.
            //
            // Once the blocking task is done executing, we will attempt to
            // steal the core back.
            let worker = cx.worker.clone();
            runtime::spawn_blocking(move || run(worker));
        });

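        // The guard's `Drop` impl runs once `f` returns (or panics) and
        // attempts to take the core back out of the worker's shared slot,
        // restoring it to this thread's context.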
        let _reset = Reset;

        f()
    }
}

/// After how many ticks is the global queue polled. This helps to ensure
/// fairness.
///
/// The number is fairly arbitrary. I believe this value was copied from golang.
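/// (Go's scheduler similarly checks its global run queue once every 61 ticks;
/// a prime interval makes accidental alignment with application-level
/// patterns less likely.)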
const GLOBAL_POLL_INTERVAL: u8 = 61;

impl Launch {
    pub(crate) fn launch(mut self) {
        for worker in self.0.drain(..) {
            runtime::spawn_blocking(move || run(worker));
        }
    }
}

fn run(worker: Arc<Worker>) {
    // Acquire a core. If this fails, then another thread is running this
    // worker and there is nothing further to do.
    let core = match worker.core.take() {
        Some(core) => core,
        None => return,
    };

    // Set the worker context.
    let cx = Context {
        worker,
        core: RefCell::new(None),
    };

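    // Mark this thread as being inside the runtime; among other things, this
    // guards against nested `block_on` calls on a worker thread.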
    let _enter = crate::runtime::enter();

    CURRENT.set(&cx, || {
        // This should always be an error. It only returns a `Result` to support
        // using `?` to short circuit.
        assert!(cx.run(core).is_err());
    });
}

impl Context {
    fn run(&self, mut core: Box<Core>) -> RunResult {
        while !core.is_shutdown {
            // Increment the tick
            core.tick();

            // Run maintenance, if needed
            core = self.maintenance(core);

            // First, check work available to the current worker.
            if let Some(task) = core.next_task(&self.worker) {
                core = self.run_task(task, core)?;
                continue;
            }

            // There is no more **local** work to process; try to steal work
            // from other workers.
            if let Some(task) = core.steal_work(&self.worker) {
                core = self.run_task(task, core)?;
            } else {
                // Wait for work
                core = self.park(core);
            }
        }

        // Signal shutdown
        self.worker.shared.shutdown(core, self.worker.clone());
        Err(())
    }

    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
        // Make sure the worker is not in the **searching** state. This enables
        // another idle worker to try to steal work.
        core.transition_from_searching(&self.worker);

        // Make the core available to the runtime context
        *self.core.borrow_mut() = Some(core);

        // Run the task
        crate::coop::budget(|| {
            task.run();

            // As long as there is budget remaining and a task exists in the
            // `lifo_slot`, then keep running.
            loop {
                // Check if we still have the core. If not, the core was stolen
                // by another worker.
                let mut core = match self.core.borrow_mut().take() {
                    Some(core) => core,
                    None => return Err(()),
                };

                // Check for a task in the LIFO slot
                let task = match core.lifo_slot.take() {
                    Some(task) => task,
                    None => return Ok(core),
                };

                if crate::coop::has_budget_remaining() {
                    // Run the LIFO task, then loop
                    *self.core.borrow_mut() = Some(core);
                    task.run();
                } else {
                    // Not enough budget left to run the LIFO task, push it to
                    // the back of the queue and return.
                    core.run_queue.push_back(task, self.worker.inject());
                    return Ok(core);
                }
            }
        })
    }

    fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
        if core.tick % GLOBAL_POLL_INTERVAL == 0 {
            // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
            // to run without actually putting the thread to sleep.
            core = self.park_timeout(core, Some(Duration::from_millis(0)));

            // Run regularly scheduled maintenance
            core.maintenance(&self.worker);
        }

        core
    }

    fn park(&self, mut core: Box<Core>) -> Box<Core> {
        core.transition_to_parked(&self.worker);

        while !core.is_shutdown {
            core = self.park_timeout(core, None);

            // Run regularly scheduled maintenance
            core.maintenance(&self.worker);

            if core.transition_from_parked(&self.worker) {
                return core;
            }
        }

        core
    }

    fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
        // Take the parker out of core
        let mut park = core.park.take().expect("park missing");

        // Store `core` in context
        *self.core.borrow_mut() = Some(core);

        // Park thread
        if let Some(timeout) = duration {
            park.park_timeout(timeout).expect("park failed");
        } else {
            park.park().expect("park failed");
        }

        // Remove `core` from context
        core = self.core.borrow_mut().take().expect("core missing");

        // Place `park` back in `core`
        core.park = Some(park);

        // If there are tasks available to steal, notify a worker
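        // (a resource driver may have scheduled tasks onto this queue while
        // the thread was parked; `schedule_local` defers the notification in
        // that case)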
        if core.run_queue.is_stealable() {
            self.worker.shared.notify_parked();
        }

        core
    }
}

impl Core {
    /// Increment the tick
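    ///
    /// The counter wraps; the `GLOBAL_POLL_INTERVAL` checks only require
    /// periodic firing, not exact multiples, so `u8` overflow is fine.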
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

    /// Return the next notified task available to this worker.
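    ///
    /// Once every `GLOBAL_POLL_INTERVAL` ticks, the injection (global) queue
    /// is checked first so that externally submitted tasks are not starved by
    /// a busy local queue.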
    fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
        if self.tick % GLOBAL_POLL_INTERVAL == 0 {
            worker.inject().pop().or_else(|| self.next_local_task())
        } else {
            self.next_local_task().or_else(|| worker.inject().pop())
        }
    }

    fn next_local_task(&mut self) -> Option<Notified> {
        self.lifo_slot.take().or_else(|| self.run_queue.pop())
    }

    fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
        if !self.transition_to_searching(worker) {
            return None;
        }

        let num = worker.shared.remotes.len();
        // Start from a random worker
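        // (randomizing the starting victim spreads concurrent stealers across
        // different remote queues instead of contending on the same one)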
        let start = self.rand.fastrand_n(num as u32) as usize;

        for i in 0..num {
            let i = (start + i) % num;

            // Don't steal from ourself! We know we don't have work.
            if i == worker.index {
                continue;
            }

            let target = &worker.shared.remotes[i];
            if let Some(task) = target.steal.steal_into(&mut self.run_queue) {
                return Some(task);
            }
        }

        // Fallback on checking the global queue
        worker.shared.inject.pop()
    }

    fn transition_to_searching(&mut self, worker: &Worker) -> bool {
        if !self.is_searching {
            self.is_searching = worker.shared.idle.transition_worker_to_searching();
        }

        self.is_searching
    }

    fn transition_from_searching(&mut self, worker: &Worker) {
        if !self.is_searching {
            return;
        }

        self.is_searching = false;
        worker.shared.transition_worker_from_searching();
    }

    /// Prepare the worker state for parking
    fn transition_to_parked(&mut self, worker: &Worker) {
        // When the final worker transitions **out** of searching to parked, it
        // must check all the queues one last time in case work materialized
        // between the last work scan and transitioning out of searching.
        let is_last_searcher = worker
            .shared
            .idle
            .transition_worker_to_parked(worker.index, self.is_searching);

        // The worker is no longer searching. This only updates the local
        // cache; the shared idle state was updated above.
        self.is_searching = false;

        if is_last_searcher {
            worker.shared.notify_if_work_pending();
        }
    }

    /// Returns `true` if the transition happened.
    fn transition_from_parked(&mut self, worker: &Worker) -> bool {
        // If a task is in the lifo slot, then we must unpark regardless of
        // being notified
        if self.lifo_slot.is_some() {
            worker.shared.idle.unpark_worker_by_id(worker.index);
            self.is_searching = true;
            return true;
        }

        if worker.shared.idle.is_parked(worker.index) {
            return false;
        }

        // When unparked, the worker is in the searching state.
        self.is_searching = true;
        true
    }

    /// Runs maintenance work such as freeing pending tasks and checking the
    /// pool's state.
    fn maintenance(&mut self, worker: &Worker) {
        self.drain_pending_drop(worker);

        if !self.is_shutdown {
            // Check if the scheduler has been shut down
            self.is_shutdown = worker.inject().is_closed();
        }
    }

    // Shut down the core
    fn shutdown(&mut self, worker: &Worker) {
        // Take the parker out of the core
        let mut park = self.park.take().expect("park missing");

        // Signal to all tasks to shut down.
        for header in self.tasks.iter() {
            header.shutdown();
        }

        loop {
            self.drain_pending_drop(worker);

            if self.tasks.is_empty() {
                break;
            }

            // Wait until signalled
            park.park().expect("park failed");
        }

        // Drain the queue
        while let Some(_) = self.next_local_task() {}
    }

    fn drain_pending_drop(&mut self, worker: &Worker) {
        use std::mem::ManuallyDrop;

        for task in worker.remote().pending_drop.drain() {
            let task = ManuallyDrop::new(task);

            // safety: tasks are only pushed into the `pending_drop` stacks that
            // are associated with the list they are inserted into. When a task
            // is pushed into `pending_drop`, the ref-inc is skipped, so we must
            // not ref-dec here.
            //
            // See `bind` and `release` implementations.
            unsafe {
                self.tasks.remove(task.header().into());
            }
        }
    }
}

impl Worker {
    /// Returns a reference to the scheduler's injection queue
    fn inject(&self) -> &queue::Inject<Arc<Worker>> {
        &self.shared.inject
    }

    /// Return a reference to this worker's remote data
    fn remote(&self) -> &Remote {
        &self.shared.remotes[self.index]
    }

    fn eq(&self, other: &Worker) -> bool {
        self.shared.ptr_eq(&other.shared) && self.index == other.index
    }
}

impl task::Schedule for Arc<Worker> {
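    /// Tracks a newly bound task on the current worker's list of owned tasks
    /// and returns a handle to that worker. Panics when called from outside a
    /// scheduler context.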
    fn bind(task: Task) -> Arc<Worker> {
        CURRENT.with(|maybe_cx| {
            let cx = maybe_cx.expect("scheduler context missing");

            // Track the task
            cx.core
                .borrow_mut()
                .as_mut()
                .expect("scheduler core missing")
                .tasks
                .push_front(task);

            // Return a clone of the worker
            cx.worker.clone()
        })
    }

    fn release(&self, task: &Task) -> Option<Task> {
        use std::ptr::NonNull;

        CURRENT.with(|maybe_cx| {
            let cx = maybe_cx.expect("scheduler context missing");

            if self.eq(&cx.worker) {
                let mut maybe_core = cx.core.borrow_mut();

                if let Some(core) = &mut *maybe_core {
                    // Directly remove the task
                    //
                    // safety: the task is inserted in the list in `bind`.
                    unsafe {
                        let ptr = NonNull::from(task.header());
                        return core.tasks.remove(ptr);
                    }
                }
            }

            // Track the task to be released by the worker that owns it
            //
            // Safety: We get a new handle without incrementing the ref-count.
            // A ref-count is held by the "owned" linked list and it is only
            // ever removed from that list as part of the release process: this
            // method or popping the task from `pending_drop`. Thus, we can rely
            // on the ref-count held by the linked-list to keep the memory
            // alive.
            //
            // When the task is removed from the stack, it is forgotten instead
            // of dropped.
            let task = unsafe { Task::from_raw(task.header().into()) };

            self.remote().pending_drop.push(task);

            if cx.core.borrow().is_some() {
                return None;
            }

            // The worker core has been handed off to another thread. In the
            // event that the scheduler is currently shutting down, the thread
            // that owns the task may be waiting on the release to complete
            // shutdown.
            if self.inject().is_closed() {
                self.remote().unpark.unpark();
            }

            None
        })
    }

    fn schedule(&self, task: Notified) {
        self.shared.schedule(task, false);
    }

    fn yield_now(&self, task: Notified) {
        self.shared.schedule(task, true);
    }
}

impl Shared {
    pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
        CURRENT.with(|maybe_cx| {
            if let Some(cx) = maybe_cx {
                // Make sure the task is part of the **current** scheduler.
                if self.ptr_eq(&cx.worker.shared) {
                    // And the current thread still holds a core
                    if let Some(core) = cx.core.borrow_mut().as_mut() {
                        self.schedule_local(core, task, is_yield);
                        return;
                    }
                }
            }

            // Otherwise, use the inject queue
            self.inject.push(task);
            self.notify_parked();
        });
    }

    fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
        // Spawning from the worker thread. If scheduling a "yield" then the
        // task must always be pushed to the back of the queue, enabling other
        // tasks to be executed. If **not** a yield, then there is more
        // flexibility and the task may go to the front of the queue.
        let should_notify = if is_yield {
            core.run_queue.push_back(task, &self.inject);
            true
        } else {
            // Push to the LIFO slot
            let prev = core.lifo_slot.take();
            let ret = prev.is_some();

            if let Some(prev) = prev {
                core.run_queue.push_back(prev, &self.inject);
            }

            core.lifo_slot = Some(task);

            ret
        };

        // Only notify if not currently parked. If `park` is `None`, then the
        // scheduling is from a resource driver. As notifications often come in
        // batches, the notification is delayed until the park is complete.
        if should_notify && core.park.is_some() {
            self.notify_parked();
        }
    }

    pub(super) fn close(&self) {
        if self.inject.close() {
            self.notify_all();
        }
    }

    fn notify_parked(&self) {
        if let Some(index) = self.idle.worker_to_notify() {
            self.remotes[index].unpark.unpark();
        }
    }

    fn notify_all(&self) {
        for remote in &self.remotes[..] {
            remote.unpark.unpark();
        }
    }

    fn notify_if_work_pending(&self) {
        for remote in &self.remotes[..] {
            if !remote.steal.is_empty() {
                self.notify_parked();
                return;
            }
        }

        if !self.inject.is_empty() {
            self.notify_parked();
        }
    }

    fn transition_worker_from_searching(&self) {
        if self.idle.transition_worker_from_searching() {
            // We are the final searching worker. Because work was found, we
            // need to notify another worker.
            self.notify_parked();
        }
    }

    /// Signals that a worker has observed the shutdown signal and has pushed
    /// its core onto the shared shutdown list.
    ///
    /// If all workers have reached this point, the final cleanup is performed.
    fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) {
        let mut workers = self.shutdown_workers.lock().unwrap();
        workers.push((core, worker));

        if workers.len() != self.remotes.len() {
            return;
        }

        for (mut core, worker) in workers.drain(..) {
            core.shutdown(&worker);
        }

        // Drain the injection queue
        while let Some(_) = self.inject.pop() {}
    }

    fn ptr_eq(&self, other: &Shared) -> bool {
        self as *const _ == other as *const _
    }
}