use crate::job::StackJob;
use crate::latch::{LatchProbe, SpinLatch};
use crate::log::Event::*;
use crate::registry::{self, WorkerThread};
use crate::unwind;
use std::any::Any;

use crate::FnContext;

#[cfg(test)]
mod test;

/// Takes two closures and *potentially* runs them in parallel. It
/// returns a pair of the results from those closures.
///
/// Conceptually, calling `join()` is similar to spawning two threads,
/// one executing each of the two closures. However, the
/// implementation is quite different and incurs very low
/// overhead. The underlying technique is called "work stealing": the
/// Rayon runtime uses a fixed pool of worker threads and attempts to
/// only execute code in parallel when there are idle CPUs to handle
/// it.
///
/// When `join` is called from outside the thread pool, the calling
/// thread will block while the closures execute in the pool.  When
/// `join` is called within the pool, the calling thread still actively
/// participates in the thread pool. It will begin by executing closure
/// A (on the current thread). While it is doing that, it will advertise
/// closure B as being available for other threads to execute. Once closure A
/// has completed, the current thread will try to execute closure B;
/// if however closure B has been stolen, then it will look for other work
/// while waiting for the thief to fully execute closure B. (This is the
/// typical work-stealing strategy).
///
/// # Examples
///
/// This example uses join to perform a quick-sort (note this is not a
/// particularly optimized implementation: if you **actually** want to
/// sort for real, you should prefer [the `par_sort` method] offered
/// by Rayon).
///
/// [the `par_sort` method]: ../rayon/slice/trait.ParallelSliceMut.html#method.par_sort
///
/// ```rust
/// # use rayon_core as rayon;
/// let mut v = vec![5, 1, 8, 22, 0, 44];
/// quick_sort(&mut v);
/// assert_eq!(v, vec![0, 1, 5, 8, 22, 44]);
///
/// fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) {
///     if v.len() > 1 {
///         let mid = partition(v);
///         let (lo, hi) = v.split_at_mut(mid);
///         rayon::join(|| quick_sort(lo),
///                     || quick_sort(hi));
///     }
/// }
///
/// // Partition rearranges all items `<=` to the pivot
/// // item (arbitrarily selected to be the last item in the slice)
/// // to the first half of the slice. It then returns the
/// // "dividing point" where the pivot is placed.
/// fn partition<T: PartialOrd + Send>(v: &mut [T]) -> usize {
///     let pivot = v.len() - 1;
///     let mut i = 0;
///     for j in 0..pivot {
///         if v[j] <= v[pivot] {
///             v.swap(i, j);
///             i += 1;
///         }
///     }
///     v.swap(i, pivot);
///     i
/// }
/// ```
///
/// # Warning about blocking I/O
///
/// The assumption is that the closures given to `join()` are
/// CPU-bound tasks that do not perform I/O or other blocking
/// operations. If you do perform I/O, and that I/O should block
/// (e.g., waiting for a network request), the overall performance may
/// be poor.  Moreover, if you cause one closure to be blocked waiting
/// on another (for example, using a channel), that could lead to a
/// deadlock.
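///
/// For illustration, here is a sketch of the channel-based deadlock described
/// above; it assumes no other worker thread is free to steal closure B:
///
/// ```no_run
/// # use rayon_core as rayon;
/// let (tx, rx) = std::sync::mpsc::channel::<()>();
/// rayon::join(
///     // Closure A blocks waiting for a message from closure B...
///     move || { let _ = rx.recv(); },
///     // ...but if B is never stolen, it can only run after A returns: deadlock.
///     move || { let _ = tx.send(()); },
/// );
/// ```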
///
/// # Panics
///
/// No matter what happens, both closures will always be executed.  If
/// a single closure panics, whether it be the first or second
/// closure, that panic will be propagated and hence `join()` will
/// panic with the same panic value. If both closures panic, `join()`
/// will panic with the panic value from the first closure.
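///
/// For example, the following sketch panics in both closures; per the rules
/// above, the payload observed by the caller is the one from the first closure:
///
/// ```rust
/// # use rayon_core as rayon;
/// let result = std::panic::catch_unwind(|| {
///     rayon::join(
///         || panic!("closure A"),
///         || panic!("closure B"),
///     )
/// });
/// // Both closures were executed; the propagated payload is closure A's.
/// let payload = result.unwrap_err();
/// assert_eq!(*payload.downcast::<&str>().unwrap(), "closure A");
/// ```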
pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
where
    A: FnOnce() -> RA + Send,
    B: FnOnce() -> RB + Send,
    RA: Send,
    RB: Send,
{
    #[inline]
    fn call<R>(f: impl FnOnce() -> R) -> impl FnOnce(FnContext) -> R {
        move |_| f()
    }

    join_context(call(oper_a), call(oper_b))
}

/// Identical to `join`, except that the closures have a parameter
/// that provides context for the way the closure has been called,
/// especially indicating whether they're executing on a different
/// thread than where `join_context` was called.  This will occur if
/// the second job is stolen by a different thread, or if
/// `join_context` was called from outside the thread pool to begin
/// with.
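///
/// # Examples
///
/// A small sketch of inspecting the context; the exact values of `migrated()`
/// depend on scheduling, so nothing is asserted about them here:
///
/// ```rust
/// # use rayon_core as rayon;
/// let (a_migrated, b_migrated) = rayon::join_context(
///     |ctx| ctx.migrated(),
///     |ctx| ctx.migrated(),
/// );
/// // `b_migrated` is true only if the second closure ran on a different
/// // thread than the one that called `join_context`.
/// println!("a migrated: {}, b migrated: {}", a_migrated, b_migrated);
/// ```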
pub fn join_context<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
where
    A: FnOnce(FnContext) -> RA + Send,
    B: FnOnce(FnContext) -> RB + Send,
    RA: Send,
    RB: Send,
{
    #[inline]
    fn call_a<R>(f: impl FnOnce(FnContext) -> R, injected: bool) -> impl FnOnce() -> R {
        move || f(FnContext::new(injected))
    }

    #[inline]
    fn call_b<R>(f: impl FnOnce(FnContext) -> R) -> impl FnOnce(bool) -> R {
        move |migrated| f(FnContext::new(migrated))
    }

    registry::in_worker(|worker_thread, injected| unsafe {
        log!(Join {
            worker: worker_thread.index()
        });

        // Create virtual wrapper for task b; this all has to be
        // done here so that the stack frame can keep it all live
        // long enough.
        let job_b = StackJob::new(call_b(oper_b), SpinLatch::new());
        let job_b_ref = job_b.as_job_ref();
        worker_thread.push(job_b_ref);

        // Execute task a; hopefully b gets stolen in the meantime.
        let status_a = unwind::halt_unwinding(call_a(oper_a, injected));
        let result_a = match status_a {
            Ok(v) => v,
            Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err),
        };

        // Now that task A has finished, try to pop job B from the
        // local stack.  It may already have been popped by job A; it
        // may also have been stolen. There may also be some tasks
        // pushed on top of it in the stack, and we will have to pop
        // those off to get to it.
        while !job_b.latch.probe() {
            if let Some(job) = worker_thread.take_local_job() {
                if job == job_b_ref {
                    // Found it! Let's run it.
                    //
                    // Note that this could panic, but it's ok if we unwind here.
                    log!(PoppedRhs {
                        worker: worker_thread.index()
                    });
                    let result_b = job_b.run_inline(injected);
                    return (result_a, result_b);
                } else {
                    log!(PoppedJob {
                        worker: worker_thread.index()
                    });
                    worker_thread.execute(job);
                }
            } else {
                // Local deque is empty. Time to steal from other
                // threads.
                log!(LostJob {
                    worker: worker_thread.index()
                });
                worker_thread.wait_until(&job_b.latch);
                debug_assert!(job_b.latch.probe());
                break;
            }
        }

        (result_a, job_b.into_result())
    })
}

/// If job A panics, we still cannot return until we are sure that job
/// B is complete. This is because it may contain references into the
/// enclosing stack frame(s).
#[cold] // cold path
unsafe fn join_recover_from_panic(
    worker_thread: &WorkerThread,
    job_b_latch: &SpinLatch,
    err: Box<dyn Any + Send>,
) -> ! {
    worker_thread.wait_until(job_b_latch);
    unwind::resume_unwinding(err)
}