1 use super::{Inner, Runtime}; 2 3 use reactor::Reactor; 4 5 use std::io; 6 use std::sync::Mutex; 7 use std::time::Duration; 8 use std::any::Any; 9 10 use num_cpus; 11 use tokio_reactor; 12 use tokio_threadpool::Builder as ThreadPoolBuilder; 13 use tokio_timer::clock::{self, Clock}; 14 use tokio_timer::timer::{self, Timer}; 15 16 #[cfg(feature = "experimental-tracing")] 17 use tracing_core as trace; 18 19 /// Builds Tokio Runtime with custom configuration values. 20 /// 21 /// Methods can be chained in order to set the configuration values. The 22 /// Runtime is constructed by calling [`build`]. 23 /// 24 /// New instances of `Builder` are obtained via [`Builder::new`]. 25 /// 26 /// See function level documentation for details on the various configuration 27 /// settings. 28 /// 29 /// [`build`]: #method.build 30 /// [`Builder::new`]: #method.new 31 /// 32 /// # Examples 33 /// 34 /// ``` 35 /// extern crate tokio; 36 /// extern crate tokio_timer; 37 /// 38 /// use std::time::Duration; 39 /// 40 /// use tokio::runtime::Builder; 41 /// use tokio_timer::clock::Clock; 42 /// 43 /// fn main() { 44 /// // build Runtime 45 /// let mut runtime = Builder::new() 46 /// .blocking_threads(4) 47 /// .clock(Clock::system()) 48 /// .core_threads(4) 49 /// .keep_alive(Some(Duration::from_secs(60))) 50 /// .name_prefix("my-custom-name-") 51 /// .stack_size(3 * 1024 * 1024) 52 /// .build() 53 /// .unwrap(); 54 /// 55 /// // use runtime ... 56 /// } 57 /// ``` 58 #[derive(Debug)] 59 pub struct Builder { 60 /// Thread pool specific builder 61 threadpool_builder: ThreadPoolBuilder, 62 63 /// The number of worker threads 64 core_threads: usize, 65 66 /// The clock to use 67 clock: Clock, 68 } 69 70 impl Builder { 71 /// Returns a new runtime builder initialized with default configuration 72 /// values. 73 /// 74 /// Configuration methods can be chained on the return value. new() -> Builder75 pub fn new() -> Builder { 76 let core_threads = num_cpus::get().max(1); 77 78 let mut threadpool_builder = ThreadPoolBuilder::new(); 79 threadpool_builder.name_prefix("tokio-runtime-worker-"); 80 threadpool_builder.pool_size(core_threads); 81 82 Builder { 83 threadpool_builder, 84 core_threads, 85 clock: Clock::new(), 86 } 87 } 88 89 /// Set the `Clock` instance that will be used by the runtime. clock(&mut self, clock: Clock) -> &mut Self90 pub fn clock(&mut self, clock: Clock) -> &mut Self { 91 self.clock = clock; 92 self 93 } 94 95 /// Set builder to set up the thread pool instance. 96 #[deprecated( 97 since = "0.1.9", 98 note = "use the `core_threads`, `blocking_threads`, `name_prefix`, \ 99 `keep_alive`, and `stack_size` functions on `runtime::Builder`, \ 100 instead")] 101 #[doc(hidden)] threadpool_builder(&mut self, val: ThreadPoolBuilder) -> &mut Self102 pub fn threadpool_builder(&mut self, val: ThreadPoolBuilder) -> &mut Self { 103 self.threadpool_builder = val; 104 self 105 } 106 107 /// Sets a callback to handle panics in futures. 108 /// 109 /// The callback is triggered when a panic during a future bubbles up to 110 /// Tokio. By default Tokio catches these panics, and they will be ignored. 111 /// The parameter passed to this callback is the same error value returned 112 /// from `std::panic::catch_unwind()`. To abort the process on panics, use 113 /// `std::panic::resume_unwind()` in this callback as shown below. 114 /// 115 /// # Examples 116 /// 117 /// ``` 118 /// # extern crate tokio; 119 /// # extern crate futures; 120 /// # use tokio::runtime; 121 /// 122 /// # pub fn main() { 123 /// let mut rt = runtime::Builder::new() 124 /// .panic_handler(|err| std::panic::resume_unwind(err)) 125 /// .build() 126 /// .unwrap(); 127 /// # } 128 /// ``` panic_handler<F>(&mut self, f: F) -> &mut Self where F: Fn(Box<Any + Send>) + Send + Sync + 'static,129 pub fn panic_handler<F>(&mut self, f: F) -> &mut Self 130 where 131 F: Fn(Box<Any + Send>) + Send + Sync + 'static, 132 { 133 self.threadpool_builder.panic_handler(f); 134 self 135 } 136 137 138 /// Set the maximum number of worker threads for the `Runtime`'s thread pool. 139 /// 140 /// This must be a number between 1 and 32,768 though it is advised to keep 141 /// this value on the smaller side. 142 /// 143 /// The default value is the number of cores available to the system. 144 /// 145 /// # Examples 146 /// 147 /// ``` 148 /// # extern crate tokio; 149 /// # extern crate futures; 150 /// # use tokio::runtime; 151 /// 152 /// # pub fn main() { 153 /// let mut rt = runtime::Builder::new() 154 /// .core_threads(4) 155 /// .build() 156 /// .unwrap(); 157 /// # } 158 /// ``` core_threads(&mut self, val: usize) -> &mut Self159 pub fn core_threads(&mut self, val: usize) -> &mut Self { 160 self.core_threads = val; 161 self.threadpool_builder.pool_size(val); 162 self 163 } 164 165 /// Set the maximum number of concurrent blocking sections in the `Runtime`'s 166 /// thread pool. 167 /// 168 /// When the maximum concurrent `blocking` calls is reached, any further 169 /// calls to `blocking` will return `NotReady` and the task is notified once 170 /// previously in-flight calls to `blocking` return. 171 /// 172 /// This must be a number between 1 and 32,768 though it is advised to keep 173 /// this value on the smaller side. 174 /// 175 /// The default value is 100. 176 /// 177 /// # Examples 178 /// 179 /// ``` 180 /// # extern crate tokio; 181 /// # extern crate futures; 182 /// # use tokio::runtime; 183 /// 184 /// # pub fn main() { 185 /// let mut rt = runtime::Builder::new() 186 /// .blocking_threads(200) 187 /// .build(); 188 /// # } 189 /// ``` blocking_threads(&mut self, val: usize) -> &mut Self190 pub fn blocking_threads(&mut self, val: usize) -> &mut Self { 191 self.threadpool_builder.max_blocking(val); 192 self 193 } 194 195 /// Set the worker thread keep alive duration for threads in the `Runtime`'s 196 /// thread pool. 197 /// 198 /// If set, a worker thread will wait for up to the specified duration for 199 /// work, at which point the thread will shutdown. When work becomes 200 /// available, a new thread will eventually be spawned to replace the one 201 /// that shut down. 202 /// 203 /// When the value is `None`, the thread will wait for work forever. 204 /// 205 /// The default value is `None`. 206 /// 207 /// # Examples 208 /// 209 /// ``` 210 /// # extern crate tokio; 211 /// # extern crate futures; 212 /// # use tokio::runtime; 213 /// use std::time::Duration; 214 /// 215 /// # pub fn main() { 216 /// let mut rt = runtime::Builder::new() 217 /// .keep_alive(Some(Duration::from_secs(30))) 218 /// .build(); 219 /// # } 220 /// ``` keep_alive(&mut self, val: Option<Duration>) -> &mut Self221 pub fn keep_alive(&mut self, val: Option<Duration>) -> &mut Self { 222 self.threadpool_builder.keep_alive(val); 223 self 224 } 225 226 /// Set name prefix of threads spawned by the `Runtime`'s thread pool. 227 /// 228 /// Thread name prefix is used for generating thread names. For example, if 229 /// prefix is `my-pool-`, then threads in the pool will get names like 230 /// `my-pool-1` etc. 231 /// 232 /// The default prefix is "tokio-runtime-worker-". 233 /// 234 /// # Examples 235 /// 236 /// ``` 237 /// # extern crate tokio; 238 /// # extern crate futures; 239 /// # use tokio::runtime; 240 /// 241 /// # pub fn main() { 242 /// let mut rt = runtime::Builder::new() 243 /// .name_prefix("my-pool-") 244 /// .build(); 245 /// # } 246 /// ``` name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self247 pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self { 248 self.threadpool_builder.name_prefix(val); 249 self 250 } 251 252 /// Set the stack size (in bytes) for worker threads. 253 /// 254 /// The actual stack size may be greater than this value if the platform 255 /// specifies minimal stack size. 256 /// 257 /// The default stack size for spawned threads is 2 MiB, though this 258 /// particular stack size is subject to change in the future. 259 /// 260 /// # Examples 261 /// 262 /// ``` 263 /// # extern crate tokio; 264 /// # extern crate futures; 265 /// # use tokio::runtime; 266 /// 267 /// # pub fn main() { 268 /// let mut rt = runtime::Builder::new() 269 /// .stack_size(32 * 1024) 270 /// .build(); 271 /// # } 272 /// ``` stack_size(&mut self, val: usize) -> &mut Self273 pub fn stack_size(&mut self, val: usize) -> &mut Self { 274 self.threadpool_builder.stack_size(val); 275 self 276 } 277 278 /// Execute function `f` after each thread is started but before it starts 279 /// doing work. 280 /// 281 /// This is intended for bookkeeping and monitoring use cases. 282 /// 283 /// # Examples 284 /// 285 /// ``` 286 /// # extern crate tokio; 287 /// # extern crate futures; 288 /// # use tokio::runtime; 289 /// 290 /// # pub fn main() { 291 /// let thread_pool = runtime::Builder::new() 292 /// .after_start(|| { 293 /// println!("thread started"); 294 /// }) 295 /// .build(); 296 /// # } 297 /// ``` after_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static298 pub fn after_start<F>(&mut self, f: F) -> &mut Self 299 where F: Fn() + Send + Sync + 'static 300 { 301 self.threadpool_builder.after_start(f); 302 self 303 } 304 305 /// Execute function `f` before each thread stops. 306 /// 307 /// This is intended for bookkeeping and monitoring use cases. 308 /// 309 /// # Examples 310 /// 311 /// ``` 312 /// # extern crate tokio; 313 /// # extern crate futures; 314 /// # use tokio::runtime; 315 /// 316 /// # pub fn main() { 317 /// let thread_pool = runtime::Builder::new() 318 /// .before_stop(|| { 319 /// println!("thread stopping"); 320 /// }) 321 /// .build(); 322 /// # } 323 /// ``` before_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static324 pub fn before_stop<F>(&mut self, f: F) -> &mut Self 325 where F: Fn() + Send + Sync + 'static 326 { 327 self.threadpool_builder.before_stop(f); 328 self 329 } 330 331 /// Create the configured `Runtime`. 332 /// 333 /// The returned `ThreadPool` instance is ready to spawn tasks. 334 /// 335 /// # Examples 336 /// 337 /// ``` 338 /// # extern crate tokio; 339 /// # use tokio::runtime::Builder; 340 /// # pub fn main() { 341 /// let runtime = Builder::new().build().unwrap(); 342 /// // ... call runtime.run(...) 343 /// # let _ = runtime; 344 /// # } 345 /// ``` build(&mut self) -> io::Result<Runtime>346 pub fn build(&mut self) -> io::Result<Runtime> { 347 // TODO(stjepang): Once we remove the `threadpool_builder` method, remove this line too. 348 self.threadpool_builder.pool_size(self.core_threads); 349 350 let mut reactor_handles = Vec::new(); 351 let mut timer_handles = Vec::new(); 352 let mut timers = Vec::new(); 353 354 for _ in 0..self.core_threads { 355 // Create a new reactor. 356 let reactor = Reactor::new()?; 357 reactor_handles.push(reactor.handle()); 358 359 // Create a new timer. 360 let timer = Timer::new_with_now(reactor, self.clock.clone()); 361 timer_handles.push(timer.handle()); 362 timers.push(Mutex::new(Some(timer))); 363 } 364 365 // Get a handle to the clock for the runtime. 366 let clock = self.clock.clone(); 367 368 // Get the current trace dispatcher. 369 // TODO(eliza): when `tracing-core` is stable enough to take a 370 // public API dependency, we should allow users to set a custom 371 // subscriber for the runtime. 372 #[cfg(feature = "experimental-tracing")] 373 let dispatch = trace::dispatcher::get_default(trace::Dispatch::clone); 374 375 let pool = self 376 .threadpool_builder 377 .around_worker(move |w, enter| { 378 let index = w.id().to_usize(); 379 380 tokio_reactor::with_default(&reactor_handles[index], enter, |enter| { 381 clock::with_default(&clock, enter, |enter| { 382 timer::with_default(&timer_handles[index], enter, |_| { 383 384 #[cfg(feature = "experimental-tracing")] 385 trace::dispatcher::with_default(&dispatch, || { 386 w.run(); 387 }); 388 389 #[cfg(not(feature = "experimental-tracing"))] 390 w.run(); 391 }); 392 }) 393 }); 394 }) 395 .custom_park(move |worker_id| { 396 let index = worker_id.to_usize(); 397 398 timers[index] 399 .lock() 400 .unwrap() 401 .take() 402 .unwrap() 403 }) 404 .build(); 405 406 // To support deprecated `reactor()` function 407 let reactor = Reactor::new()?; 408 let reactor_handle = reactor.handle(); 409 410 Ok(Runtime { 411 inner: Some(Inner { 412 reactor_handle, 413 reactor: Mutex::new(Some(reactor)), 414 pool, 415 }), 416 }) 417 } 418 } 419