1 //! Methods for custom fork-join scopes, created by the [`scope()`]
2 //! function. These are a more flexible alternative to [`join()`].
3 //!
4 //! [`scope()`]: fn.scope.html
5 //! [`join()`]: ../join/join.fn.html
6 
7 use crate::job::{HeapJob, JobFifo};
8 use crate::latch::{CountLatch, Latch};
9 use crate::log::Event::*;
10 use crate::registry::{in_worker, Registry, WorkerThread};
11 use crate::unwind;
12 use std::any::Any;
13 use std::fmt;
14 use std::marker::PhantomData;
15 use std::mem;
16 use std::ptr;
17 use std::sync::atomic::{AtomicPtr, Ordering};
18 use std::sync::Arc;
19 
20 #[cfg(test)]
21 mod test;
22 
23 /// Represents a fork-join scope which can be used to spawn any number of tasks.
24 /// See [`scope()`] for more information.
25 ///
26 ///[`scope()`]: fn.scope.html
27 pub struct Scope<'scope> {
28     base: ScopeBase<'scope>,
29 }
30 
31 /// Represents a fork-join scope which can be used to spawn any number of tasks.
32 /// Those spawned from the same thread are prioritized in relative FIFO order.
33 /// See [`scope_fifo()`] for more information.
34 ///
35 ///[`scope_fifo()`]: fn.scope_fifo.html
36 pub struct ScopeFifo<'scope> {
37     base: ScopeBase<'scope>,
38     fifos: Vec<JobFifo>,
39 }
40 
41 struct ScopeBase<'scope> {
42     /// thread where `scope()` was executed (note that individual jobs
43     /// may be executing on different worker threads, though they
44     /// should always be within the same pool of threads)
45     owner_thread_index: usize,
46 
47     /// thread registry where `scope()` was executed.
48     registry: Arc<Registry>,
49 
50     /// if some job panicked, the error is stored here; it will be
51     /// propagated to the one who created the scope
52     panic: AtomicPtr<Box<dyn Any + Send + 'static>>,
53 
54     /// latch to set when the counter drops to zero (and hence this scope is complete)
55     job_completed_latch: CountLatch,
56 
57     /// You can think of a scope as containing a list of closures to execute,
58     /// all of which outlive `'scope`.  They're not actually required to be
59     /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
60     /// the closures are only *moved* across threads to be executed.
61     marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
62 }
63 
64 /// Creates a "fork-join" scope `s` and invokes the closure with a
65 /// reference to `s`. This closure can then spawn asynchronous tasks
66 /// into `s`. Those tasks may run asynchronously with respect to the
67 /// closure; they may themselves spawn additional tasks into `s`. When
68 /// the closure returns, it will block until all tasks that have been
69 /// spawned into `s` complete.
70 ///
71 /// `scope()` is a more flexible building block compared to `join()`,
72 /// since a loop can be used to spawn any number of tasks without
73 /// recursing. However, that flexibility comes at a performance price:
74 /// tasks spawned using `scope()` must be allocated onto the heap,
75 /// whereas `join()` can make exclusive use of the stack. **Prefer
76 /// `join()` (or, even better, parallel iterators) where possible.**
77 ///
78 /// # Example
79 ///
80 /// The Rayon `join()` function launches two closures and waits for them
81 /// to stop. One could implement `join()` using a scope like so, although
82 /// it would be less efficient than the real implementation:
83 ///
84 /// ```rust
85 /// # use rayon_core as rayon;
86 /// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB)
87 ///     where A: FnOnce() -> RA + Send,
88 ///           B: FnOnce() -> RB + Send,
89 ///           RA: Send,
90 ///           RB: Send,
91 /// {
92 ///     let mut result_a: Option<RA> = None;
93 ///     let mut result_b: Option<RB> = None;
94 ///     rayon::scope(|s| {
95 ///         s.spawn(|_| result_a = Some(oper_a()));
96 ///         s.spawn(|_| result_b = Some(oper_b()));
97 ///     });
98 ///     (result_a.unwrap(), result_b.unwrap())
99 /// }
100 /// ```
101 ///
102 /// # A note on threading
103 ///
104 /// The closure given to `scope()` executes in the Rayon thread-pool,
105 /// as do those given to `spawn()`. This means that you can't access
106 /// thread-local variables (well, you can, but they may have
107 /// unexpected values).
108 ///
109 /// # Task execution
110 ///
111 /// Task execution potentially starts as soon as `spawn()` is called.
112 /// The task will end sometime before `scope()` returns. Note that the
113 /// *closure* given to scope may return much earlier. In general
114 /// the lifetime of a scope created like `scope(body)` goes something like this:
115 ///
116 /// - Scope begins when `scope(body)` is called
117 /// - Scope body `body()` is invoked
118 ///     - Scope tasks may be spawned
119 /// - Scope body returns
120 /// - Scope tasks execute, possibly spawning more tasks
121 /// - Once all tasks are done, scope ends and `scope()` returns
122 ///
123 /// To see how and when tasks are joined, consider this example:
124 ///
125 /// ```rust
126 /// # use rayon_core as rayon;
127 /// // point start
128 /// rayon::scope(|s| {
129 ///     s.spawn(|s| { // task s.1
130 ///         s.spawn(|s| { // task s.1.1
131 ///             rayon::scope(|t| {
132 ///                 t.spawn(|_| ()); // task t.1
133 ///                 t.spawn(|_| ()); // task t.2
134 ///             });
135 ///         });
136 ///     });
137 ///     s.spawn(|s| { // task s.2
138 ///     });
139 ///     // point mid
140 /// });
141 /// // point end
142 /// ```
143 ///
144 /// The various tasks that are run will execute roughly like so:
145 ///
146 /// ```notrust
147 /// | (start)
148 /// |
149 /// | (scope `s` created)
150 /// +-----------------------------------------------+ (task s.2)
151 /// +-------+ (task s.1)                            |
152 /// |       |                                       |
153 /// |       +---+ (task s.1.1)                      |
154 /// |       |   |                                   |
155 /// |       |   | (scope `t` created)               |
156 /// |       |   +----------------+ (task t.2)       |
157 /// |       |   +---+ (task t.1) |                  |
158 /// | (mid) |   |   |            |                  |
159 /// :       |   + <-+------------+ (scope `t` ends) |
160 /// :       |   |                                   |
161 /// |<------+---+-----------------------------------+ (scope `s` ends)
162 /// |
163 /// | (end)
164 /// ```
165 ///
166 /// The point here is that everything spawned into scope `s` will
167 /// terminate (at latest) at the same point -- right before the
168 /// original call to `rayon::scope` returns. This includes new
169 /// subtasks created by other subtasks (e.g., task `s.1.1`). If a new
170 /// scope is created (such as `t`), the things spawned into that scope
171 /// will be joined before that scope returns, which in turn occurs
172 /// before the creating task (task `s.1.1` in this case) finishes.
173 ///
174 /// There is no guaranteed order of execution for spawns in a scope,
175 /// given that other threads may steal tasks at any time. However, they
176 /// are generally prioritized in a LIFO order on the thread from which
177 /// they were spawned. So in this example, absent any stealing, we can
178 /// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other
179 /// threads always steal from the other end of the deque, like FIFO
180 /// order.  The idea is that "recent" tasks are most likely to be fresh
181 /// in the local CPU's cache, while other threads can steal older
182 /// "stale" tasks.  For an alternate approach, consider
183 /// [`scope_fifo()`] instead.
184 ///
185 /// [`scope_fifo()`]: fn.scope_fifo.html
186 ///
187 /// # Accessing stack data
188 ///
189 /// In general, spawned tasks may access stack data in place that
190 /// outlives the scope itself. Other data must be fully owned by the
191 /// spawned task.
192 ///
193 /// ```rust
194 /// # use rayon_core as rayon;
195 /// let ok: Vec<i32> = vec![1, 2, 3];
196 /// rayon::scope(|s| {
197 ///     let bad: Vec<i32> = vec![4, 5, 6];
198 ///     s.spawn(|_| {
199 ///         // We can access `ok` because it outlives the scope `s`.
200 ///         println!("ok: {:?}", ok);
201 ///
202 ///         // If we just try to use `bad` here, the closure will borrow `bad`
203 ///         // (because we are just printing it out, and that only requires a
204 ///         // borrow), which will result in a compilation error. Read on
205 ///         // for options.
206 ///         // println!("bad: {:?}", bad);
207 ///    });
208 /// });
209 /// ```
210 ///
211 /// As the comments in the example above suggest, to reference `bad` we must
212 /// take ownership of it. One way to do this is to detach the closure
213 /// from the surrounding stack frame, using the `move` keyword. This
214 /// will cause it to take ownership of *all* the variables it touches,
215 /// in this case including both `ok` *and* `bad`:
216 ///
217 /// ```rust
218 /// # use rayon_core as rayon;
219 /// let ok: Vec<i32> = vec![1, 2, 3];
220 /// rayon::scope(|s| {
221 ///     let bad: Vec<i32> = vec![4, 5, 6];
222 ///     s.spawn(move |_| {
223 ///         println!("ok: {:?}", ok);
224 ///         println!("bad: {:?}", bad);
225 ///     });
226 ///
227 ///     // That closure is fine, but now we can't use `ok` anywhere else,
228 ///     // since it is owned by the previous task:
229 ///     // s.spawn(|_| println!("ok: {:?}", ok));
230 /// });
231 /// ```
232 ///
233 /// While this works, it could be a problem if we want to use `ok` elsewhere.
234 /// There are two choices. We can keep the closure as a `move` closure, but
235 /// instead of referencing the variable `ok`, we create a shadowed variable that
236 /// is a borrow of `ok` and capture *that*:
237 ///
238 /// ```rust
239 /// # use rayon_core as rayon;
240 /// let ok: Vec<i32> = vec![1, 2, 3];
241 /// rayon::scope(|s| {
242 ///     let bad: Vec<i32> = vec![4, 5, 6];
243 ///     let ok: &Vec<i32> = &ok; // shadow the original `ok`
244 ///     s.spawn(move |_| {
245 ///         println!("ok: {:?}", ok); // captures the shadowed version
246 ///         println!("bad: {:?}", bad);
247 ///     });
248 ///
249 ///     // Now we too can use the shadowed `ok`, since `&Vec<i32>` references
250 ///     // can be shared freely. Note that we need a `move` closure here though,
251 ///     // because otherwise we'd be trying to borrow the shadowed `ok`,
252 ///     // and that doesn't outlive `scope`.
253 ///     s.spawn(move |_| println!("ok: {:?}", ok));
254 /// });
255 /// ```
256 ///
257 /// Another option is not to use the `move` keyword but instead to take ownership
258 /// of individual variables:
259 ///
260 /// ```rust
261 /// # use rayon_core as rayon;
262 /// let ok: Vec<i32> = vec![1, 2, 3];
263 /// rayon::scope(|s| {
264 ///     let bad: Vec<i32> = vec![4, 5, 6];
265 ///     s.spawn(|_| {
266 ///         // Transfer ownership of `bad` into a local variable (also named `bad`).
267 ///         // This will force the closure to take ownership of `bad` from the environment.
268 ///         let bad = bad;
269 ///         println!("ok: {:?}", ok); // `ok` is only borrowed.
270 ///         println!("bad: {:?}", bad); // refers to our local variable, above.
271 ///     });
272 ///
273 ///     s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok`
274 /// });
275 /// ```
276 ///
277 /// # Panics
278 ///
279 /// If a panic occurs, either in the closure given to `scope()` or in
280 /// any of the spawned jobs, that panic will be propagated and the
281 /// call to `scope()` will panic. If multiple panics occur, it is
282 /// non-deterministic which of their panic values will propagate.
283 /// Regardless, once a task is spawned using `scope.spawn()`, it will
284 /// execute, even if the spawning task should later panic. `scope()`
285 /// returns once all spawned jobs have completed, and any panics are
286 /// propagated at that point.
scope<'scope, OP, R>(op: OP) -> R where OP: for<'s> FnOnce(&'s Scope<'scope>) -> R + 'scope + Send, R: Send,287 pub fn scope<'scope, OP, R>(op: OP) -> R
288 where
289     OP: for<'s> FnOnce(&'s Scope<'scope>) -> R + 'scope + Send,
290     R: Send,
291 {
292     in_worker(|owner_thread, _| {
293         let scope = Scope::<'scope>::new(owner_thread);
294         unsafe { scope.base.complete(owner_thread, || op(&scope)) }
295     })
296 }
297 
298 /// Creates a "fork-join" scope `s` with FIFO order, and invokes the
299 /// closure with a reference to `s`. This closure can then spawn
300 /// asynchronous tasks into `s`. Those tasks may run asynchronously with
301 /// respect to the closure; they may themselves spawn additional tasks
302 /// into `s`. When the closure returns, it will block until all tasks
303 /// that have been spawned into `s` complete.
304 ///
305 /// # Task execution
306 ///
307 /// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a
308 /// difference in the order of execution. Consider a similar example:
309 ///
310 /// [`scope()`]: fn.scope.html
311 ///
312 /// ```rust
313 /// # use rayon_core as rayon;
314 /// // point start
315 /// rayon::scope_fifo(|s| {
316 ///     s.spawn_fifo(|s| { // task s.1
317 ///         s.spawn_fifo(|s| { // task s.1.1
318 ///             rayon::scope_fifo(|t| {
319 ///                 t.spawn_fifo(|_| ()); // task t.1
320 ///                 t.spawn_fifo(|_| ()); // task t.2
321 ///             });
322 ///         });
323 ///     });
324 ///     s.spawn_fifo(|s| { // task s.2
325 ///     });
326 ///     // point mid
327 /// });
328 /// // point end
329 /// ```
330 ///
331 /// The various tasks that are run will execute roughly like so:
332 ///
333 /// ```notrust
334 /// | (start)
335 /// |
336 /// | (FIFO scope `s` created)
337 /// +--------------------+ (task s.1)
338 /// +-------+ (task s.2) |
339 /// |       |            +---+ (task s.1.1)
340 /// |       |            |   |
341 /// |       |            |   | (FIFO scope `t` created)
342 /// |       |            |   +----------------+ (task t.1)
343 /// |       |            |   +---+ (task t.2) |
344 /// | (mid) |            |   |   |            |
345 /// :       |            |   + <-+------------+ (scope `t` ends)
346 /// :       |            |   |
347 /// |<------+------------+---+ (scope `s` ends)
348 /// |
349 /// | (end)
350 /// ```
351 ///
352 /// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on
353 /// the thread from which they were spawned, as opposed to `scope()`'s
354 /// LIFO.  So in this example, we can expect `s.1` to execute before
355 /// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in
356 /// FIFO order, as usual. Overall, this has roughly the same order as
357 /// the now-deprecated [`breadth_first`] option, except the effect is
358 /// isolated to a particular scope. If spawns are intermingled from any
359 /// combination of `scope()` and `scope_fifo()`, or from different
360 /// threads, their order is only specified with respect to spawns in the
361 /// same scope and thread.
362 ///
363 /// For more details on this design, see Rayon [RFC #1].
364 ///
365 /// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
366 /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
367 ///
368 /// # Panics
369 ///
370 /// If a panic occurs, either in the closure given to `scope_fifo()` or
371 /// in any of the spawned jobs, that panic will be propagated and the
372 /// call to `scope_fifo()` will panic. If multiple panics occur, it is
373 /// non-deterministic which of their panic values will propagate.
374 /// Regardless, once a task is spawned using `scope.spawn_fifo()`, it
375 /// will execute, even if the spawning task should later panic.
376 /// `scope_fifo()` returns once all spawned jobs have completed, and any
377 /// panics are propagated at that point.
scope_fifo<'scope, OP, R>(op: OP) -> R where OP: for<'s> FnOnce(&'s ScopeFifo<'scope>) -> R + 'scope + Send, R: Send,378 pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
379 where
380     OP: for<'s> FnOnce(&'s ScopeFifo<'scope>) -> R + 'scope + Send,
381     R: Send,
382 {
383     in_worker(|owner_thread, _| {
384         let scope = ScopeFifo::<'scope>::new(owner_thread);
385         unsafe { scope.base.complete(owner_thread, || op(&scope)) }
386     })
387 }
388 
389 impl<'scope> Scope<'scope> {
new(owner_thread: &WorkerThread) -> Self390     fn new(owner_thread: &WorkerThread) -> Self {
391         Scope {
392             base: ScopeBase::new(owner_thread),
393         }
394     }
395 
396     /// Spawns a job into the fork-join scope `self`. This job will
397     /// execute sometime before the fork-join scope completes.  The
398     /// job is specified as a closure, and this closure receives its
399     /// own reference to the scope `self` as argument. This can be
400     /// used to inject new jobs into `self`.
401     ///
402     /// # Returns
403     ///
404     /// Nothing. The spawned closures cannot pass back values to the
405     /// caller directly, though they can write to local variables on
406     /// the stack (if those variables outlive the scope) or
407     /// communicate through shared channels.
408     ///
409     /// (The intention is to eventualy integrate with Rust futures to
410     /// support spawns of functions that compute a value.)
411     ///
412     /// # Examples
413     ///
414     /// ```rust
415     /// # use rayon_core as rayon;
416     /// let mut value_a = None;
417     /// let mut value_b = None;
418     /// let mut value_c = None;
419     /// rayon::scope(|s| {
420     ///     s.spawn(|s1| {
421     ///           // ^ this is the same scope as `s`; this handle `s1`
422     ///           //   is intended for use by the spawned task,
423     ///           //   since scope handles cannot cross thread boundaries.
424     ///
425     ///         value_a = Some(22);
426     ///
427     ///         // the scope `s` will not end until all these tasks are done
428     ///         s1.spawn(|_| {
429     ///             value_b = Some(44);
430     ///         });
431     ///     });
432     ///
433     ///     s.spawn(|_| {
434     ///         value_c = Some(66);
435     ///     });
436     /// });
437     /// assert_eq!(value_a, Some(22));
438     /// assert_eq!(value_b, Some(44));
439     /// assert_eq!(value_c, Some(66));
440     /// ```
441     ///
442     /// # See also
443     ///
444     /// The [`scope` function] has more extensive documentation about
445     /// task spawning.
446     ///
447     /// [`scope` function]: fn.scope.html
spawn<BODY>(&self, body: BODY) where BODY: FnOnce(&Scope<'scope>) + Send + 'scope,448     pub fn spawn<BODY>(&self, body: BODY)
449     where
450         BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
451     {
452         self.base.increment();
453         unsafe {
454             let job_ref = Box::new(HeapJob::new(move || {
455                 self.base.execute_job(move || body(self))
456             }))
457             .as_job_ref();
458 
459             // Since `Scope` implements `Sync`, we can't be sure that we're still in a
460             // thread of this pool, so we can't just push to the local worker thread.
461             self.base.registry.inject_or_push(job_ref);
462         }
463     }
464 }
465 
466 impl<'scope> ScopeFifo<'scope> {
new(owner_thread: &WorkerThread) -> Self467     fn new(owner_thread: &WorkerThread) -> Self {
468         let num_threads = owner_thread.registry().num_threads();
469         ScopeFifo {
470             base: ScopeBase::new(owner_thread),
471             fifos: (0..num_threads).map(|_| JobFifo::new()).collect(),
472         }
473     }
474 
475     /// Spawns a job into the fork-join scope `self`. This job will
476     /// execute sometime before the fork-join scope completes.  The
477     /// job is specified as a closure, and this closure receives its
478     /// own reference to the scope `self` as argument. This can be
479     /// used to inject new jobs into `self`.
480     ///
481     /// # See also
482     ///
483     /// This method is akin to [`Scope::spawn()`], but with a FIFO
484     /// priority.  The [`scope_fifo` function] has more details about
485     /// this distinction.
486     ///
487     /// [`Scope::spawn()`]: struct.Scope.html#method.spawn
488     /// [`scope_fifo` function]: fn.scope_fifo.html
spawn_fifo<BODY>(&self, body: BODY) where BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,489     pub fn spawn_fifo<BODY>(&self, body: BODY)
490     where
491         BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
492     {
493         self.base.increment();
494         unsafe {
495             let job_ref = Box::new(HeapJob::new(move || {
496                 self.base.execute_job(move || body(self))
497             }))
498             .as_job_ref();
499 
500             // If we're in the pool, use our scope's private fifo for this thread to execute
501             // in a locally-FIFO order.  Otherwise, just use the pool's global injector.
502             match self.base.registry.current_thread() {
503                 Some(worker) => {
504                     let fifo = &self.fifos[worker.index()];
505                     worker.push(fifo.push(job_ref));
506                 }
507                 None => self.base.registry.inject(&[job_ref]),
508             }
509         }
510     }
511 }
512 
513 impl<'scope> ScopeBase<'scope> {
514     /// Creates the base of a new scope for the given worker thread
new(owner_thread: &WorkerThread) -> Self515     fn new(owner_thread: &WorkerThread) -> Self {
516         ScopeBase {
517             owner_thread_index: owner_thread.index(),
518             registry: owner_thread.registry().clone(),
519             panic: AtomicPtr::new(ptr::null_mut()),
520             job_completed_latch: CountLatch::new(),
521             marker: PhantomData,
522         }
523     }
524 
increment(&self)525     fn increment(&self) {
526         self.job_completed_latch.increment();
527     }
528 
529     /// Executes `func` as a job, either aborting or executing as
530     /// appropriate.
531     ///
532     /// Unsafe because it must be executed on a worker thread.
complete<FUNC, R>(&self, owner_thread: &WorkerThread, func: FUNC) -> R where FUNC: FnOnce() -> R,533     unsafe fn complete<FUNC, R>(&self, owner_thread: &WorkerThread, func: FUNC) -> R
534     where
535         FUNC: FnOnce() -> R,
536     {
537         let result = self.execute_job_closure(func);
538         self.steal_till_jobs_complete(owner_thread);
539         result.unwrap() // only None if `op` panicked, and that would have been propagated
540     }
541 
542     /// Executes `func` as a job, either aborting or executing as
543     /// appropriate.
544     ///
545     /// Unsafe because it must be executed on a worker thread.
execute_job<FUNC>(&self, func: FUNC) where FUNC: FnOnce(),546     unsafe fn execute_job<FUNC>(&self, func: FUNC)
547     where
548         FUNC: FnOnce(),
549     {
550         let _: Option<()> = self.execute_job_closure(func);
551     }
552 
553     /// Executes `func` as a job in scope. Adjusts the "job completed"
554     /// counters and also catches any panic and stores it into
555     /// `scope`.
556     ///
557     /// Unsafe because this must be executed on a worker thread.
execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R> where FUNC: FnOnce() -> R,558     unsafe fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R>
559     where
560         FUNC: FnOnce() -> R,
561     {
562         match unwind::halt_unwinding(func) {
563             Ok(r) => {
564                 self.job_completed_ok();
565                 Some(r)
566             }
567             Err(err) => {
568                 self.job_panicked(err);
569                 None
570             }
571         }
572     }
573 
job_panicked(&self, err: Box<dyn Any + Send + 'static>)574     unsafe fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
575         // capture the first error we see, free the rest
576         let nil = ptr::null_mut();
577         let mut err = Box::new(err); // box up the fat ptr
578         if self
579             .panic
580             .compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed)
581             .is_ok()
582         {
583             log!(JobPanickedErrorStored {
584                 owner_thread: self.owner_thread_index
585             });
586             mem::forget(err); // ownership now transferred into self.panic
587         } else {
588             log!(JobPanickedErrorNotStored {
589                 owner_thread: self.owner_thread_index
590             });
591         }
592 
593         self.job_completed_latch.set();
594     }
595 
job_completed_ok(&self)596     unsafe fn job_completed_ok(&self) {
597         log!(JobCompletedOk {
598             owner_thread: self.owner_thread_index
599         });
600         self.job_completed_latch.set();
601     }
602 
steal_till_jobs_complete(&self, owner_thread: &WorkerThread)603     unsafe fn steal_till_jobs_complete(&self, owner_thread: &WorkerThread) {
604         // wait for job counter to reach 0:
605         owner_thread.wait_until(&self.job_completed_latch);
606 
607         // propagate panic, if any occurred; at this point, all
608         // outstanding jobs have completed, so we can use a relaxed
609         // ordering:
610         let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
611         if !panic.is_null() {
612             log!(ScopeCompletePanicked {
613                 owner_thread: owner_thread.index()
614             });
615             let value: Box<Box<dyn Any + Send + 'static>> = mem::transmute(panic);
616             unwind::resume_unwinding(*value);
617         } else {
618             log!(ScopeCompleteNoPanic {
619                 owner_thread: owner_thread.index()
620             });
621         }
622     }
623 }
624 
625 impl<'scope> fmt::Debug for Scope<'scope> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result626     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
627         fmt.debug_struct("Scope")
628             .field("pool_id", &self.base.registry.id())
629             .field("owner_thread_index", &self.base.owner_thread_index)
630             .field("panic", &self.base.panic)
631             .field("job_completed_latch", &self.base.job_completed_latch)
632             .finish()
633     }
634 }
635 
636 impl<'scope> fmt::Debug for ScopeFifo<'scope> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result637     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
638         fmt.debug_struct("ScopeFifo")
639             .field("num_fifos", &self.fifos.len())
640             .field("pool_id", &self.base.registry.id())
641             .field("owner_thread_index", &self.base.owner_thread_index)
642             .field("panic", &self.base.panic)
643             .field("job_completed_latch", &self.base.job_completed_latch)
644             .finish()
645     }
646 }
647