1 //!
2 //! [Under construction](https://github.com/rayon-rs/rayon/issues/231)
3 //!
4 //! ## Restricting multiple versions
5 //!
6 //! In order to ensure proper coordination between threadpools, and especially
7 //! to make sure there's only one global threadpool, `rayon-core` is actively
8 //! restricted from building multiple versions of itself into a single target.
9 //! You may see a build error like this in violation:
10 //!
11 //! ```text
12 //! error: native library `rayon-core` is being linked to by more
13 //! than one package, and can only be linked to by one package
14 //! ```
15 //!
16 //! While we strive to keep `rayon-core` semver-compatible, it's still
17 //! possible to arrive at this situation if different crates have overly
18 //! restrictive tilde or inequality requirements for `rayon-core`. The
19 //! conflicting requirements will need to be resolved before the build will
20 //! succeed.
21
22 #![doc(html_root_url = "https://docs.rs/rayon-core/1.4")]
23 #![deny(missing_debug_implementations)]
24 #![deny(missing_docs)]
25 #![cfg_attr(test, feature(conservative_impl_trait))]
26
27 use std::any::Any;
28 use std::env;
29 use std::io;
30 use std::error::Error;
31 use std::marker::PhantomData;
32 use std::str::FromStr;
33 use std::fmt;
34
35 extern crate crossbeam_deque;
36 #[macro_use]
37 extern crate lazy_static;
38 extern crate libc;
39 extern crate num_cpus;
40 extern crate rand;
41
42 #[macro_use]
43 mod log;
44
45 mod latch;
46 mod join;
47 mod job;
48 mod registry;
49 mod scope;
50 mod sleep;
51 mod spawn;
52 mod test;
53 mod thread_pool;
54 mod unwind;
55 mod util;
56
57 #[cfg(rayon_unstable)]
58 pub mod internal;
59 pub use thread_pool::ThreadPool;
60 pub use thread_pool::current_thread_index;
61 pub use thread_pool::current_thread_has_pending_tasks;
62 pub use join::{join, join_context};
63 pub use scope::{scope, Scope};
64 pub use spawn::spawn;
65
66 /// Returns the number of threads in the current registry. If this
67 /// code is executing within a Rayon thread-pool, then this will be
68 /// the number of threads for the thread-pool of the current
69 /// thread. Otherwise, it will be the number of threads for the global
70 /// thread-pool.
71 ///
72 /// This can be useful when trying to judge how many times to split
73 /// parallel work (the parallel iterator traits use this value
74 /// internally for this purpose).
75 ///
76 /// # Future compatibility note
77 ///
78 /// Note that unless this thread-pool was created with a
79 /// builder that specifies the number of threads, then this
80 /// number may vary over time in future versions (see [the
81 /// `num_threads()` method for details][snt]).
82 ///
83 /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
current_num_threads() -> usize84 pub fn current_num_threads() -> usize {
85 ::registry::Registry::current_num_threads()
86 }
87
/// Error when initializing a thread pool.
///
/// The concrete cause is stored privately; callers observe it through
/// the `Error` and `Display` implementations of this type.
#[derive(Debug)]
pub struct ThreadPoolBuildError {
    // The specific reason the build failed.
    kind: ErrorKind,
}
93
/// The private set of reasons carried by a `ThreadPoolBuildError`.
#[derive(Debug)]
enum ErrorKind {
    /// The global thread pool has already been initialized.
    GlobalPoolAlreadyInitialized,
    /// An underlying I/O error, kept so it can be reported to the caller.
    IOError(io::Error),
}
99
/// Used to create a new [`ThreadPool`] or to configure the global rayon thread pool.
///
/// ## Creating a ThreadPool
///
/// The following creates a thread pool with 22 threads.
///
/// ```rust
/// # use rayon_core as rayon;
/// let pool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap();
/// ```
///
/// To instead configure the global thread pool, use [`build_global()`]:
///
/// ```rust
/// # use rayon_core as rayon;
/// rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap();
/// ```
///
/// [`ThreadPool`]: struct.ThreadPool.html
/// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global
#[derive(Default)]
pub struct ThreadPoolBuilder {
    /// The number of threads in the rayon thread pool.
    /// If zero, the `RAYON_NUM_THREADS` environment variable is used.
    /// If `RAYON_NUM_THREADS` is zero, the number of CPUs is used; if it
    /// is unset or invalid, the deprecated `RAYON_RS_NUM_CPUS` variable
    /// is tried before falling back to the number of CPUs.
    num_threads: usize,

    /// Custom closure, if any, to handle a panic that we cannot propagate
    /// anywhere else.
    panic_handler: Option<Box<PanicHandler>>,

    /// Closure, if any, that computes the name of a thread from its index.
    get_thread_name: Option<Box<FnMut(usize) -> String>>,

    /// The stack size for the created worker threads, if one was requested.
    stack_size: Option<usize>,

    /// Closure invoked on worker thread start.
    start_handler: Option<Box<StartHandler>>,

    /// Closure invoked on worker thread exit.
    exit_handler: Option<Box<ExitHandler>>,

    /// If false, worker threads will execute spawned jobs in a
    /// "depth-first" fashion. If true, they will do a "breadth-first"
    /// fashion. Depth-first is the default.
    breadth_first: bool,
}
146
/// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead.
///
/// This deprecated type only wraps a `ThreadPoolBuilder`; each of its
/// methods forwards to the builder method of the same name.
///
/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
#[deprecated(note = "Use `ThreadPoolBuilder`")]
#[derive(Default)]
pub struct Configuration {
    // The builder that actually holds every setting.
    builder: ThreadPoolBuilder,
}
155
/// The type for a panic handling closure. The closure receives the
/// boxed panic payload. Note that this same closure may be invoked
/// multiple times in parallel.
type PanicHandler = Fn(Box<Any + Send>) + Send + Sync;

/// The type for a closure that gets invoked when a thread starts. The
/// closure is passed the index of the thread on which it is invoked.
/// Note that this same closure may be invoked multiple times in parallel.
type StartHandler = Fn(usize) + Send + Sync;

/// The type for a closure that gets invoked when a thread exits. The
/// closure is passed the index of the thread on which it is invoked.
/// Note that this same closure may be invoked multiple times in parallel.
type ExitHandler = Fn(usize) + Send + Sync;
169
170 impl ThreadPoolBuilder {
171 /// Creates and returns a valid rayon thread pool builder, but does not initialize it.
new() -> ThreadPoolBuilder172 pub fn new() -> ThreadPoolBuilder {
173 ThreadPoolBuilder::default()
174 }
175
176 /// Create a new `ThreadPool` initialized using this configuration.
build(self) -> Result<ThreadPool, ThreadPoolBuildError>177 pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
178 thread_pool::build(self)
179 }
180
181 /// Initializes the global thread pool. This initialization is
182 /// **optional**. If you do not call this function, the thread pool
183 /// will be automatically initialized with the default
184 /// configuration. Calling `build_global` is not recommended, except
185 /// in two scenarios:
186 ///
187 /// - You wish to change the default configuration.
188 /// - You are running a benchmark, in which case initializing may
189 /// yield slightly more consistent results, since the worker threads
190 /// will already be ready to go even in the first iteration. But
191 /// this cost is minimal.
192 ///
193 /// Initialization of the global thread pool happens exactly
194 /// once. Once started, the configuration cannot be
195 /// changed. Therefore, if you call `build_global` a second time, it
196 /// will return an error. An `Ok` result indicates that this
197 /// is the first initialization of the thread pool.
build_global(self) -> Result<(), ThreadPoolBuildError>198 pub fn build_global(self) -> Result<(), ThreadPoolBuildError> {
199 let registry = try!(registry::init_global_registry(self));
200 registry.wait_until_primed();
201 Ok(())
202 }
203
204 /// Get the number of threads that will be used for the thread
205 /// pool. See `num_threads()` for more information.
get_num_threads(&self) -> usize206 fn get_num_threads(&self) -> usize {
207 if self.num_threads > 0 {
208 self.num_threads
209 } else {
210 match env::var("RAYON_NUM_THREADS").ok().and_then(|s| usize::from_str(&s).ok()) {
211 Some(x) if x > 0 => return x,
212 Some(x) if x == 0 => return num_cpus::get(),
213 _ => {},
214 }
215
216 // Support for deprecated `RAYON_RS_NUM_CPUS`.
217 match env::var("RAYON_RS_NUM_CPUS").ok().and_then(|s| usize::from_str(&s).ok()) {
218 Some(x) if x > 0 => x,
219 _ => num_cpus::get(),
220 }
221 }
222 }
223
224 /// Get the thread name for the thread with the given index.
get_thread_name(&mut self, index: usize) -> Option<String>225 fn get_thread_name(&mut self, index: usize) -> Option<String> {
226 self.get_thread_name.as_mut().map(|c| c(index))
227 }
228
229 /// Set a closure which takes a thread index and returns
230 /// the thread's name.
thread_name<F>(mut self, closure: F) -> Self where F: FnMut(usize) -> String + 'static231 pub fn thread_name<F>(mut self, closure: F) -> Self
232 where F: FnMut(usize) -> String + 'static {
233 self.get_thread_name = Some(Box::new(closure));
234 self
235 }
236
237 /// Set the number of threads to be used in the rayon threadpool.
238 ///
239 /// If you specify a non-zero number of threads using this
240 /// function, then the resulting thread-pools are guaranteed to
241 /// start at most this number of threads.
242 ///
243 /// If `num_threads` is 0, or you do not call this function, then
244 /// the Rayon runtime will select the number of threads
245 /// automatically. At present, this is based on the
246 /// `RAYON_NUM_THREADS` environment variable (if set),
247 /// or the number of logical CPUs (otherwise).
248 /// In the future, however, the default behavior may
249 /// change to dynamically add or remove threads as needed.
250 ///
251 /// **Future compatibility warning:** Given the default behavior
252 /// may change in the future, if you wish to rely on a fixed
253 /// number of threads, you should use this function to specify
254 /// that number. To reproduce the current default behavior, you
255 /// may wish to use the [`num_cpus`
256 /// crate](https://crates.io/crates/num_cpus) to query the number
257 /// of CPUs dynamically.
258 ///
259 /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one
260 /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
261 /// variable. If both variables are specified, `RAYON_NUM_THREADS` will
262 /// be prefered.
num_threads(mut self, num_threads: usize) -> ThreadPoolBuilder263 pub fn num_threads(mut self, num_threads: usize) -> ThreadPoolBuilder {
264 self.num_threads = num_threads;
265 self
266 }
267
268 /// Returns a copy of the current panic handler.
take_panic_handler(&mut self) -> Option<Box<PanicHandler>>269 fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> {
270 self.panic_handler.take()
271 }
272
273 /// Normally, whenever Rayon catches a panic, it tries to
274 /// propagate it to someplace sensible, to try and reflect the
275 /// semantics of sequential execution. But in some cases,
276 /// particularly with the `spawn()` APIs, there is no
277 /// obvious place where we should propagate the panic to.
278 /// In that case, this panic handler is invoked.
279 ///
280 /// If no panic handler is set, the default is to abort the
281 /// process, under the principle that panics should not go
282 /// unobserved.
283 ///
284 /// If the panic handler itself panics, this will abort the
285 /// process. To prevent this, wrap the body of your panic handler
286 /// in a call to `std::panic::catch_unwind()`.
panic_handler<H>(mut self, panic_handler: H) -> ThreadPoolBuilder where H: Fn(Box<Any + Send>) + Send + Sync + 'static287 pub fn panic_handler<H>(mut self, panic_handler: H) -> ThreadPoolBuilder
288 where H: Fn(Box<Any + Send>) + Send + Sync + 'static
289 {
290 self.panic_handler = Some(Box::new(panic_handler));
291 self
292 }
293
294 /// Get the stack size of the worker threads
get_stack_size(&self) -> Option<usize>295 fn get_stack_size(&self) -> Option<usize>{
296 self.stack_size
297 }
298
299 /// Set the stack size of the worker threads
stack_size(mut self, stack_size: usize) -> Self300 pub fn stack_size(mut self, stack_size: usize) -> Self {
301 self.stack_size = Some(stack_size);
302 self
303 }
304
305 /// Suggest to worker threads that they execute spawned jobs in a
306 /// "breadth-first" fashion. Typically, when a worker thread is
307 /// idle or blocked, it will attempt to execute the job from the
308 /// *top* of its local deque of work (i.e., the job most recently
309 /// spawned). If this flag is set to true, however, workers will
310 /// prefer to execute in a *breadth-first* fashion -- that is,
311 /// they will search for jobs at the *bottom* of their local
312 /// deque. (At present, workers *always* steal from the bottom of
313 /// other worker's deques, regardless of the setting of this
314 /// flag.)
315 ///
316 /// If you think of the tasks as a tree, where a parent task
317 /// spawns its children in the tree, then this flag loosely
318 /// corresponds to doing a breadth-first traversal of the tree,
319 /// whereas the default would be to do a depth-first traversal.
320 ///
321 /// **Note that this is an "execution hint".** Rayon's task
322 /// execution is highly dynamic and the precise order in which
323 /// independent tasks are executed is not intended to be
324 /// guaranteed.
breadth_first(mut self) -> Self325 pub fn breadth_first(mut self) -> Self {
326 self.breadth_first = true;
327 self
328 }
329
get_breadth_first(&self) -> bool330 fn get_breadth_first(&self) -> bool {
331 self.breadth_first
332 }
333
334 /// Takes the current thread start callback, leaving `None`.
take_start_handler(&mut self) -> Option<Box<StartHandler>>335 fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
336 self.start_handler.take()
337 }
338
339 /// Set a callback to be invoked on thread start.
340 ///
341 /// The closure is passed the index of the thread on which it is invoked.
342 /// Note that this same closure may be invoked multiple times in parallel.
343 /// If this closure panics, the panic will be passed to the panic handler.
344 /// If that handler returns, then startup will continue normally.
start_handler<H>(mut self, start_handler: H) -> ThreadPoolBuilder where H: Fn(usize) + Send + Sync + 'static345 pub fn start_handler<H>(mut self, start_handler: H) -> ThreadPoolBuilder
346 where H: Fn(usize) + Send + Sync + 'static
347 {
348 self.start_handler = Some(Box::new(start_handler));
349 self
350 }
351
352 /// Returns a current thread exit callback, leaving `None`.
take_exit_handler(&mut self) -> Option<Box<ExitHandler>>353 fn take_exit_handler(&mut self) -> Option<Box<ExitHandler>> {
354 self.exit_handler.take()
355 }
356
357 /// Set a callback to be invoked on thread exit.
358 ///
359 /// The closure is passed the index of the thread on which it is invoked.
360 /// Note that this same closure may be invoked multiple times in parallel.
361 /// If this closure panics, the panic will be passed to the panic handler.
362 /// If that handler returns, then the thread will exit normally.
exit_handler<H>(mut self, exit_handler: H) -> ThreadPoolBuilder where H: Fn(usize) + Send + Sync + 'static363 pub fn exit_handler<H>(mut self, exit_handler: H) -> ThreadPoolBuilder
364 where H: Fn(usize) + Send + Sync + 'static
365 {
366 self.exit_handler = Some(Box::new(exit_handler));
367 self
368 }
369 }
370
371 #[allow(deprecated)]
372 impl Configuration {
373 /// Creates and return a valid rayon thread pool configuration, but does not initialize it.
new() -> Configuration374 pub fn new() -> Configuration {
375 Configuration { builder: ThreadPoolBuilder::new() }
376 }
377
378 /// Deprecated in favor of `ThreadPoolBuilder::build`.
build(self) -> Result<ThreadPool, Box<Error + 'static>>379 pub fn build(self) -> Result<ThreadPool, Box<Error + 'static>> {
380 self.builder.build().map_err(|e| e.into())
381 }
382
383 /// Deprecated in favor of `ThreadPoolBuilder::thread_name`.
thread_name<F>(mut self, closure: F) -> Self where F: FnMut(usize) -> String + 'static384 pub fn thread_name<F>(mut self, closure: F) -> Self
385 where F: FnMut(usize) -> String + 'static {
386 self.builder = self.builder.thread_name(closure);
387 self
388 }
389
390 /// Deprecated in favor of `ThreadPoolBuilder::num_threads`.
num_threads(mut self, num_threads: usize) -> Configuration391 pub fn num_threads(mut self, num_threads: usize) -> Configuration {
392 self.builder = self.builder.num_threads(num_threads);
393 self
394 }
395
396 /// Deprecated in favor of `ThreadPoolBuilder::panic_handler`.
panic_handler<H>(mut self, panic_handler: H) -> Configuration where H: Fn(Box<Any + Send>) + Send + Sync + 'static397 pub fn panic_handler<H>(mut self, panic_handler: H) -> Configuration
398 where H: Fn(Box<Any + Send>) + Send + Sync + 'static
399 {
400 self.builder = self.builder.panic_handler(panic_handler);
401 self
402 }
403
404 /// Deprecated in favor of `ThreadPoolBuilder::stack_size`.
stack_size(mut self, stack_size: usize) -> Self405 pub fn stack_size(mut self, stack_size: usize) -> Self {
406 self.builder = self.builder.stack_size(stack_size);
407 self
408 }
409
410 /// Deprecated in favor of `ThreadPoolBuilder::breadth_first`.
breadth_first(mut self) -> Self411 pub fn breadth_first(mut self) -> Self {
412 self.builder = self.builder.breadth_first();
413 self
414 }
415
416 /// Deprecated in favor of `ThreadPoolBuilder::start_handler`.
start_handler<H>(mut self, start_handler: H) -> Configuration where H: Fn(usize) + Send + Sync + 'static417 pub fn start_handler<H>(mut self, start_handler: H) -> Configuration
418 where H: Fn(usize) + Send + Sync + 'static
419 {
420 self.builder = self.builder.start_handler(start_handler);
421 self
422 }
423
424 /// Deprecated in favor of `ThreadPoolBuilder::exit_handler`.
exit_handler<H>(mut self, exit_handler: H) -> Configuration where H: Fn(usize) + Send + Sync + 'static425 pub fn exit_handler<H>(mut self, exit_handler: H) -> Configuration
426 where H: Fn(usize) + Send + Sync + 'static
427 {
428 self.builder = self.builder.exit_handler(exit_handler);
429 self
430 }
431
432 /// Returns a ThreadPoolBuilder with identical parameters.
into_builder(self) -> ThreadPoolBuilder433 fn into_builder(self) -> ThreadPoolBuilder {
434 self.builder
435 }
436 }
437
438 impl ThreadPoolBuildError {
new(kind: ErrorKind) -> ThreadPoolBuildError439 fn new(kind: ErrorKind) -> ThreadPoolBuildError {
440 ThreadPoolBuildError { kind: kind }
441 }
442 }
443
444 impl Error for ThreadPoolBuildError {
description(&self) -> &str445 fn description(&self) -> &str {
446 match self.kind {
447 ErrorKind::GlobalPoolAlreadyInitialized => "The global thread pool has already been initialized.",
448 ErrorKind::IOError(ref e) => e.description(),
449 }
450 }
451 }
452
453 impl fmt::Display for ThreadPoolBuildError {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result454 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
455 match self.kind {
456 ErrorKind::IOError(ref e) => e.fmt(f),
457 _ => self.description().fmt(f),
458 }
459 }
460 }
461
462 /// Deprecated in favor of `ThreadPoolBuilder::build_global`.
463 #[deprecated(note = "use `ThreadPoolBuilder::build_global`")]
464 #[allow(deprecated)]
initialize(config: Configuration) -> Result<(), Box<Error>>465 pub fn initialize(config: Configuration) -> Result<(), Box<Error>> {
466 config.into_builder().build_global().map_err(|e| e.into())
467 }
468
469 impl fmt::Debug for ThreadPoolBuilder {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result470 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
471 let ThreadPoolBuilder { ref num_threads, ref get_thread_name,
472 ref panic_handler, ref stack_size,
473 ref start_handler, ref exit_handler,
474 ref breadth_first } = *self;
475
476 // Just print `Some(<closure>)` or `None` to the debug
477 // output.
478 struct ClosurePlaceholder;
479 impl fmt::Debug for ClosurePlaceholder {
480 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
481 f.write_str("<closure>")
482 }
483 }
484 let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
485 let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
486 let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
487 let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
488
489 f.debug_struct("ThreadPoolBuilder")
490 .field("num_threads", num_threads)
491 .field("get_thread_name", &get_thread_name)
492 .field("panic_handler", &panic_handler)
493 .field("stack_size", &stack_size)
494 .field("start_handler", &start_handler)
495 .field("exit_handler", &exit_handler)
496 .field("breadth_first", &breadth_first)
497 .finish()
498 }
499 }
500
501 #[allow(deprecated)]
502 impl fmt::Debug for Configuration {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result503 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
504 self.builder.fmt(f)
505 }
506 }
507
/// Provides the calling context to a closure called by `join_context`.
#[derive(Debug)]
pub struct FnContext {
    migrated: bool,

    /// disable `Send` and `Sync`, just for a little future-proofing.
    _marker: PhantomData<*mut ()>,
}

impl FnContext {
    /// Builds a context that records whether the closure migrated.
    #[inline]
    fn new(migrated: bool) -> Self {
        FnContext {
            _marker: PhantomData,
            migrated: migrated,
        }
    }

    /// Returns `true` if the closure was called from a different thread
    /// than it was provided from.
    #[inline]
    pub fn migrated(&self) -> bool {
        self.migrated
    }
}
535