1 //!
2 //! [Under construction](https://github.com/rayon-rs/rayon/issues/231)
3 //!
4 //! ## Restricting multiple versions
5 //!
6 //! In order to ensure proper coordination between threadpools, and especially
7 //! to make sure there's only one global threadpool, `rayon-core` is actively
8 //! restricted from building multiple versions of itself into a single target.
9 //! You may see a build error like this in violation:
10 //!
11 //! ```text
12 //! error: native library `rayon-core` is being linked to by more
13 //! than one package, and can only be linked to by one package
14 //! ```
15 //!
16 //! While we strive to keep `rayon-core` semver-compatible, it's still
17 //! possible to arrive at this situation if different crates have overly
18 //! restrictive tilde or inequality requirements for `rayon-core`.  The
19 //! conflicting requirements will need to be resolved before the build will
20 //! succeed.
21 
22 #![doc(html_root_url = "https://docs.rs/rayon-core/1.4")]
23 #![deny(missing_debug_implementations)]
24 #![deny(missing_docs)]
25 #![cfg_attr(test, feature(conservative_impl_trait))]
26 
27 use std::any::Any;
28 use std::env;
29 use std::io;
30 use std::error::Error;
31 use std::marker::PhantomData;
32 use std::str::FromStr;
33 use std::fmt;
34 
35 extern crate crossbeam_deque;
36 #[macro_use]
37 extern crate lazy_static;
38 extern crate libc;
39 extern crate num_cpus;
40 extern crate rand;
41 
42 #[macro_use]
43 mod log;
44 
45 mod latch;
46 mod join;
47 mod job;
48 mod registry;
49 mod scope;
50 mod sleep;
51 mod spawn;
52 mod test;
53 mod thread_pool;
54 mod unwind;
55 mod util;
56 
57 #[cfg(rayon_unstable)]
58 pub mod internal;
59 pub use thread_pool::ThreadPool;
60 pub use thread_pool::current_thread_index;
61 pub use thread_pool::current_thread_has_pending_tasks;
62 pub use join::{join, join_context};
63 pub use scope::{scope, Scope};
64 pub use spawn::spawn;
65 
66 /// Returns the number of threads in the current registry. If this
67 /// code is executing within a Rayon thread-pool, then this will be
68 /// the number of threads for the thread-pool of the current
69 /// thread. Otherwise, it will be the number of threads for the global
70 /// thread-pool.
71 ///
72 /// This can be useful when trying to judge how many times to split
73 /// parallel work (the parallel iterator traits use this value
74 /// internally for this purpose).
75 ///
76 /// # Future compatibility note
77 ///
78 /// Note that unless this thread-pool was created with a
79 /// builder that specifies the number of threads, then this
80 /// number may vary over time in future versions (see [the
81 /// `num_threads()` method for details][snt]).
82 ///
83 /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
current_num_threads() -> usize84 pub fn current_num_threads() -> usize {
85     ::registry::Registry::current_num_threads()
86 }
87 
88 /// Error when initializing a thread pool.
89 #[derive(Debug)]
90 pub struct ThreadPoolBuildError {
91     kind: ErrorKind,
92 }
93 
94 #[derive(Debug)]
95 enum ErrorKind {
96     GlobalPoolAlreadyInitialized,
97     IOError(io::Error),
98 }
99 
100 /// Used to create a new [`ThreadPool`] or to configure the global rayon thread pool.
101 /// ## Creating a ThreadPool
102 /// The following creates a thread pool with 22 threads.
103 ///
104 /// ```rust
105 /// # use rayon_core as rayon;
106 /// let pool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap();
107 /// ```
108 ///
109 /// To instead configure the global thread pool, use [`build_global()`]:
110 ///
111 /// ```rust
112 /// # use rayon_core as rayon;
113 /// rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap();
114 /// ```
115 ///
116 /// [`ThreadPool`]: struct.ThreadPool.html
117 /// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global
118 #[derive(Default)]
119 pub struct ThreadPoolBuilder {
120     /// The number of threads in the rayon thread pool.
121     /// If zero will use the RAYON_NUM_THREADS environment variable.
122     /// If RAYON_NUM_THREADS is invalid or zero will use the default.
123     num_threads: usize,
124 
125     /// Custom closure, if any, to handle a panic that we cannot propagate
126     /// anywhere else.
127     panic_handler: Option<Box<PanicHandler>>,
128 
129     /// Closure to compute the name of a thread.
130     get_thread_name: Option<Box<FnMut(usize) -> String>>,
131 
132     /// The stack size for the created worker threads
133     stack_size: Option<usize>,
134 
135     /// Closure invoked on worker thread start.
136     start_handler: Option<Box<StartHandler>>,
137 
138     /// Closure invoked on worker thread exit.
139     exit_handler: Option<Box<ExitHandler>>,
140 
141     /// If false, worker threads will execute spawned jobs in a
142     /// "depth-first" fashion. If true, they will do a "breadth-first"
143     /// fashion. Depth-first is the default.
144     breadth_first: bool,
145 }
146 
147 /// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead.
148 ///
149 /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
150 #[deprecated(note = "Use `ThreadPoolBuilder`")]
151 #[derive(Default)]
152 pub struct Configuration {
153     builder: ThreadPoolBuilder,
154 }
155 
156 /// The type for a panic handling closure. Note that this same closure
157 /// may be invoked multiple times in parallel.
158 type PanicHandler = Fn(Box<Any + Send>) + Send + Sync;
159 
160 /// The type for a closure that gets invoked when a thread starts. The
161 /// closure is passed the index of the thread on which it is invoked.
162 /// Note that this same closure may be invoked multiple times in parallel.
163 type StartHandler = Fn(usize) + Send + Sync;
164 
165 /// The type for a closure that gets invoked when a thread exits. The
166 /// closure is passed the index of the thread on which is is invoked.
167 /// Note that this same closure may be invoked multiple times in parallel.
168 type ExitHandler = Fn(usize) + Send + Sync;
169 
170 impl ThreadPoolBuilder {
171     /// Creates and returns a valid rayon thread pool builder, but does not initialize it.
new() -> ThreadPoolBuilder172     pub fn new() -> ThreadPoolBuilder {
173         ThreadPoolBuilder::default()
174     }
175 
176     /// Create a new `ThreadPool` initialized using this configuration.
build(self) -> Result<ThreadPool, ThreadPoolBuildError>177     pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
178         thread_pool::build(self)
179     }
180 
181     /// Initializes the global thread pool. This initialization is
182     /// **optional**.  If you do not call this function, the thread pool
183     /// will be automatically initialized with the default
184     /// configuration. Calling `build_global` is not recommended, except
185     /// in two scenarios:
186     ///
187     /// - You wish to change the default configuration.
188     /// - You are running a benchmark, in which case initializing may
189     ///   yield slightly more consistent results, since the worker threads
190     ///   will already be ready to go even in the first iteration.  But
191     ///   this cost is minimal.
192     ///
193     /// Initialization of the global thread pool happens exactly
194     /// once. Once started, the configuration cannot be
195     /// changed. Therefore, if you call `build_global` a second time, it
196     /// will return an error. An `Ok` result indicates that this
197     /// is the first initialization of the thread pool.
build_global(self) -> Result<(), ThreadPoolBuildError>198     pub fn build_global(self) -> Result<(), ThreadPoolBuildError> {
199         let registry = try!(registry::init_global_registry(self));
200         registry.wait_until_primed();
201         Ok(())
202     }
203 
204     /// Get the number of threads that will be used for the thread
205     /// pool. See `num_threads()` for more information.
get_num_threads(&self) -> usize206     fn get_num_threads(&self) -> usize {
207         if self.num_threads > 0 {
208             self.num_threads
209         } else {
210             match env::var("RAYON_NUM_THREADS").ok().and_then(|s| usize::from_str(&s).ok()) {
211                 Some(x) if x > 0 => return x,
212                 Some(x) if x == 0 => return num_cpus::get(),
213                 _ => {},
214             }
215 
216             // Support for deprecated `RAYON_RS_NUM_CPUS`.
217             match env::var("RAYON_RS_NUM_CPUS").ok().and_then(|s| usize::from_str(&s).ok()) {
218                 Some(x) if x > 0 => x,
219                 _ => num_cpus::get(),
220             }
221         }
222     }
223 
224     /// Get the thread name for the thread with the given index.
get_thread_name(&mut self, index: usize) -> Option<String>225     fn get_thread_name(&mut self, index: usize) -> Option<String> {
226         self.get_thread_name.as_mut().map(|c| c(index))
227     }
228 
229     /// Set a closure which takes a thread index and returns
230     /// the thread's name.
thread_name<F>(mut self, closure: F) -> Self where F: FnMut(usize) -> String + 'static231     pub fn thread_name<F>(mut self, closure: F) -> Self
232     where F: FnMut(usize) -> String + 'static {
233         self.get_thread_name = Some(Box::new(closure));
234         self
235     }
236 
237     /// Set the number of threads to be used in the rayon threadpool.
238     ///
239     /// If you specify a non-zero number of threads using this
240     /// function, then the resulting thread-pools are guaranteed to
241     /// start at most this number of threads.
242     ///
243     /// If `num_threads` is 0, or you do not call this function, then
244     /// the Rayon runtime will select the number of threads
245     /// automatically. At present, this is based on the
246     /// `RAYON_NUM_THREADS` environment variable (if set),
247     /// or the number of logical CPUs (otherwise).
248     /// In the future, however, the default behavior may
249     /// change to dynamically add or remove threads as needed.
250     ///
251     /// **Future compatibility warning:** Given the default behavior
252     /// may change in the future, if you wish to rely on a fixed
253     /// number of threads, you should use this function to specify
254     /// that number. To reproduce the current default behavior, you
255     /// may wish to use the [`num_cpus`
256     /// crate](https://crates.io/crates/num_cpus) to query the number
257     /// of CPUs dynamically.
258     ///
259     /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one
260     /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
261     /// variable. If both variables are specified, `RAYON_NUM_THREADS` will
262     /// be prefered.
num_threads(mut self, num_threads: usize) -> ThreadPoolBuilder263     pub fn num_threads(mut self, num_threads: usize) -> ThreadPoolBuilder {
264         self.num_threads = num_threads;
265         self
266     }
267 
268     /// Returns a copy of the current panic handler.
take_panic_handler(&mut self) -> Option<Box<PanicHandler>>269     fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> {
270         self.panic_handler.take()
271     }
272 
273     /// Normally, whenever Rayon catches a panic, it tries to
274     /// propagate it to someplace sensible, to try and reflect the
275     /// semantics of sequential execution. But in some cases,
276     /// particularly with the `spawn()` APIs, there is no
277     /// obvious place where we should propagate the panic to.
278     /// In that case, this panic handler is invoked.
279     ///
280     /// If no panic handler is set, the default is to abort the
281     /// process, under the principle that panics should not go
282     /// unobserved.
283     ///
284     /// If the panic handler itself panics, this will abort the
285     /// process. To prevent this, wrap the body of your panic handler
286     /// in a call to `std::panic::catch_unwind()`.
panic_handler<H>(mut self, panic_handler: H) -> ThreadPoolBuilder where H: Fn(Box<Any + Send>) + Send + Sync + 'static287     pub fn panic_handler<H>(mut self, panic_handler: H) -> ThreadPoolBuilder
288         where H: Fn(Box<Any + Send>) + Send + Sync + 'static
289     {
290         self.panic_handler = Some(Box::new(panic_handler));
291         self
292     }
293 
294     /// Get the stack size of the worker threads
get_stack_size(&self) -> Option<usize>295     fn get_stack_size(&self) -> Option<usize>{
296         self.stack_size
297     }
298 
299     /// Set the stack size of the worker threads
stack_size(mut self, stack_size: usize) -> Self300     pub fn stack_size(mut self, stack_size: usize) -> Self {
301         self.stack_size = Some(stack_size);
302         self
303     }
304 
305     /// Suggest to worker threads that they execute spawned jobs in a
306     /// "breadth-first" fashion. Typically, when a worker thread is
307     /// idle or blocked, it will attempt to execute the job from the
308     /// *top* of its local deque of work (i.e., the job most recently
309     /// spawned). If this flag is set to true, however, workers will
310     /// prefer to execute in a *breadth-first* fashion -- that is,
311     /// they will search for jobs at the *bottom* of their local
312     /// deque. (At present, workers *always* steal from the bottom of
313     /// other worker's deques, regardless of the setting of this
314     /// flag.)
315     ///
316     /// If you think of the tasks as a tree, where a parent task
317     /// spawns its children in the tree, then this flag loosely
318     /// corresponds to doing a breadth-first traversal of the tree,
319     /// whereas the default would be to do a depth-first traversal.
320     ///
321     /// **Note that this is an "execution hint".** Rayon's task
322     /// execution is highly dynamic and the precise order in which
323     /// independent tasks are executed is not intended to be
324     /// guaranteed.
breadth_first(mut self) -> Self325     pub fn breadth_first(mut self) -> Self {
326         self.breadth_first = true;
327         self
328     }
329 
get_breadth_first(&self) -> bool330     fn get_breadth_first(&self) -> bool {
331         self.breadth_first
332     }
333 
334     /// Takes the current thread start callback, leaving `None`.
take_start_handler(&mut self) -> Option<Box<StartHandler>>335     fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
336         self.start_handler.take()
337     }
338 
339     /// Set a callback to be invoked on thread start.
340     ///
341     /// The closure is passed the index of the thread on which it is invoked.
342     /// Note that this same closure may be invoked multiple times in parallel.
343     /// If this closure panics, the panic will be passed to the panic handler.
344     /// If that handler returns, then startup will continue normally.
start_handler<H>(mut self, start_handler: H) -> ThreadPoolBuilder where H: Fn(usize) + Send + Sync + 'static345     pub fn start_handler<H>(mut self, start_handler: H) -> ThreadPoolBuilder
346         where H: Fn(usize) + Send + Sync + 'static
347     {
348         self.start_handler = Some(Box::new(start_handler));
349         self
350     }
351 
352     /// Returns a current thread exit callback, leaving `None`.
take_exit_handler(&mut self) -> Option<Box<ExitHandler>>353     fn take_exit_handler(&mut self) -> Option<Box<ExitHandler>> {
354         self.exit_handler.take()
355     }
356 
357     /// Set a callback to be invoked on thread exit.
358     ///
359     /// The closure is passed the index of the thread on which it is invoked.
360     /// Note that this same closure may be invoked multiple times in parallel.
361     /// If this closure panics, the panic will be passed to the panic handler.
362     /// If that handler returns, then the thread will exit normally.
exit_handler<H>(mut self, exit_handler: H) -> ThreadPoolBuilder where H: Fn(usize) + Send + Sync + 'static363     pub fn exit_handler<H>(mut self, exit_handler: H) -> ThreadPoolBuilder
364         where H: Fn(usize) + Send + Sync + 'static
365     {
366         self.exit_handler = Some(Box::new(exit_handler));
367         self
368     }
369 }
370 
371 #[allow(deprecated)]
372 impl Configuration {
373     /// Creates and return a valid rayon thread pool configuration, but does not initialize it.
new() -> Configuration374     pub fn new() -> Configuration {
375         Configuration { builder: ThreadPoolBuilder::new() }
376     }
377 
378     /// Deprecated in favor of `ThreadPoolBuilder::build`.
build(self) -> Result<ThreadPool, Box<Error + 'static>>379     pub fn build(self) -> Result<ThreadPool, Box<Error + 'static>> {
380         self.builder.build().map_err(|e| e.into())
381     }
382 
383     /// Deprecated in favor of `ThreadPoolBuilder::thread_name`.
thread_name<F>(mut self, closure: F) -> Self where F: FnMut(usize) -> String + 'static384     pub fn thread_name<F>(mut self, closure: F) -> Self
385     where F: FnMut(usize) -> String + 'static {
386         self.builder = self.builder.thread_name(closure);
387         self
388     }
389 
390     /// Deprecated in favor of `ThreadPoolBuilder::num_threads`.
num_threads(mut self, num_threads: usize) -> Configuration391     pub fn num_threads(mut self, num_threads: usize) -> Configuration {
392         self.builder = self.builder.num_threads(num_threads);
393         self
394     }
395 
396     /// Deprecated in favor of `ThreadPoolBuilder::panic_handler`.
panic_handler<H>(mut self, panic_handler: H) -> Configuration where H: Fn(Box<Any + Send>) + Send + Sync + 'static397     pub fn panic_handler<H>(mut self, panic_handler: H) -> Configuration
398         where H: Fn(Box<Any + Send>) + Send + Sync + 'static
399     {
400         self.builder = self.builder.panic_handler(panic_handler);
401         self
402     }
403 
404     /// Deprecated in favor of `ThreadPoolBuilder::stack_size`.
stack_size(mut self, stack_size: usize) -> Self405     pub fn stack_size(mut self, stack_size: usize) -> Self {
406         self.builder = self.builder.stack_size(stack_size);
407         self
408     }
409 
410     /// Deprecated in favor of `ThreadPoolBuilder::breadth_first`.
breadth_first(mut self) -> Self411     pub fn breadth_first(mut self) -> Self {
412         self.builder = self.builder.breadth_first();
413         self
414     }
415 
416     /// Deprecated in favor of `ThreadPoolBuilder::start_handler`.
start_handler<H>(mut self, start_handler: H) -> Configuration where H: Fn(usize) + Send + Sync + 'static417     pub fn start_handler<H>(mut self, start_handler: H) -> Configuration
418         where H: Fn(usize) + Send + Sync + 'static
419     {
420         self.builder = self.builder.start_handler(start_handler);
421         self
422     }
423 
424     /// Deprecated in favor of `ThreadPoolBuilder::exit_handler`.
exit_handler<H>(mut self, exit_handler: H) -> Configuration where H: Fn(usize) + Send + Sync + 'static425     pub fn exit_handler<H>(mut self, exit_handler: H) -> Configuration
426         where H: Fn(usize) + Send + Sync + 'static
427     {
428         self.builder = self.builder.exit_handler(exit_handler);
429         self
430     }
431 
432     /// Returns a ThreadPoolBuilder with identical parameters.
into_builder(self) -> ThreadPoolBuilder433     fn into_builder(self) -> ThreadPoolBuilder {
434         self.builder
435     }
436 }
437 
438 impl ThreadPoolBuildError {
new(kind: ErrorKind) -> ThreadPoolBuildError439     fn new(kind: ErrorKind) -> ThreadPoolBuildError {
440         ThreadPoolBuildError { kind: kind }
441     }
442 }
443 
444 impl Error for ThreadPoolBuildError {
description(&self) -> &str445     fn description(&self) -> &str {
446         match self.kind {
447             ErrorKind::GlobalPoolAlreadyInitialized => "The global thread pool has already been initialized.",
448             ErrorKind::IOError(ref e) => e.description(),
449         }
450     }
451 }
452 
453 impl fmt::Display for ThreadPoolBuildError {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result454     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
455         match self.kind {
456             ErrorKind::IOError(ref e) => e.fmt(f),
457             _ => self.description().fmt(f),
458         }
459     }
460 }
461 
462 /// Deprecated in favor of `ThreadPoolBuilder::build_global`.
463 #[deprecated(note = "use `ThreadPoolBuilder::build_global`")]
464 #[allow(deprecated)]
initialize(config: Configuration) -> Result<(), Box<Error>>465 pub fn initialize(config: Configuration) -> Result<(), Box<Error>> {
466     config.into_builder().build_global().map_err(|e| e.into())
467 }
468 
469 impl fmt::Debug for ThreadPoolBuilder {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result470     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
471         let ThreadPoolBuilder { ref num_threads, ref get_thread_name,
472                                 ref panic_handler, ref stack_size,
473                                 ref start_handler, ref exit_handler,
474                                 ref breadth_first } = *self;
475 
476         // Just print `Some(<closure>)` or `None` to the debug
477         // output.
478         struct ClosurePlaceholder;
479         impl fmt::Debug for ClosurePlaceholder {
480             fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
481                 f.write_str("<closure>")
482             }
483         }
484         let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
485         let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
486         let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
487         let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
488 
489         f.debug_struct("ThreadPoolBuilder")
490          .field("num_threads", num_threads)
491          .field("get_thread_name", &get_thread_name)
492          .field("panic_handler", &panic_handler)
493          .field("stack_size", &stack_size)
494          .field("start_handler", &start_handler)
495          .field("exit_handler", &exit_handler)
496          .field("breadth_first", &breadth_first)
497          .finish()
498     }
499 }
500 
501 #[allow(deprecated)]
502 impl fmt::Debug for Configuration {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result503     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
504         self.builder.fmt(f)
505     }
506 }
507 
508 /// Provides the calling context to a closure called by `join_context`.
509 #[derive(Debug)]
510 pub struct FnContext {
511     migrated: bool,
512 
513     /// disable `Send` and `Sync`, just for a little future-proofing.
514     _marker: PhantomData<*mut ()>,
515 }
516 
517 impl FnContext {
518     #[inline]
new(migrated: bool) -> Self519     fn new(migrated: bool) -> Self {
520         FnContext {
521             migrated: migrated,
522             _marker: PhantomData,
523         }
524     }
525 }
526 
527 impl FnContext {
528     /// Returns `true` if the closure was called from a different thread
529     /// than it was provided from.
530     #[inline]
migrated(&self) -> bool531     pub fn migrated(&self) -> bool {
532         self.migrated
533     }
534 }
535