1 use super::{Inner, Runtime}; 2 3 use reactor::Reactor; 4 5 use std::io; 6 use std::sync::Mutex; 7 use std::time::Duration; 8 use std::any::Any; 9 10 use num_cpus; 11 use tokio_reactor; 12 use tokio_threadpool::Builder as ThreadPoolBuilder; 13 use tokio_timer::clock::{self, Clock}; 14 use tokio_timer::timer::{self, Timer}; 15 use tokio_trace_core as trace; 16 17 /// Builds Tokio Runtime with custom configuration values. 18 /// 19 /// Methods can be chained in order to set the configuration values. The 20 /// Runtime is constructed by calling [`build`]. 21 /// 22 /// New instances of `Builder` are obtained via [`Builder::new`]. 23 /// 24 /// See function level documentation for details on the various configuration 25 /// settings. 26 /// 27 /// [`build`]: #method.build 28 /// [`Builder::new`]: #method.new 29 /// 30 /// # Examples 31 /// 32 /// ``` 33 /// extern crate tokio; 34 /// extern crate tokio_timer; 35 /// 36 /// use std::time::Duration; 37 /// 38 /// use tokio::runtime::Builder; 39 /// use tokio_timer::clock::Clock; 40 /// 41 /// fn main() { 42 /// // build Runtime 43 /// let mut runtime = Builder::new() 44 /// .blocking_threads(4) 45 /// .clock(Clock::system()) 46 /// .core_threads(4) 47 /// .keep_alive(Some(Duration::from_secs(60))) 48 /// .name_prefix("my-custom-name-") 49 /// .stack_size(3 * 1024 * 1024) 50 /// .build() 51 /// .unwrap(); 52 /// 53 /// // use runtime ... 54 /// } 55 /// ``` 56 #[derive(Debug)] 57 pub struct Builder { 58 /// Thread pool specific builder 59 threadpool_builder: ThreadPoolBuilder, 60 61 /// The number of worker threads 62 core_threads: usize, 63 64 /// The clock to use 65 clock: Clock, 66 } 67 68 impl Builder { 69 /// Returns a new runtime builder initialized with default configuration 70 /// values. 71 /// 72 /// Configuration methods can be chained on the return value. new() -> Builder73 pub fn new() -> Builder { 74 let core_threads = num_cpus::get().max(1); 75 76 let mut threadpool_builder = ThreadPoolBuilder::new(); 77 threadpool_builder.name_prefix("tokio-runtime-worker-"); 78 threadpool_builder.pool_size(core_threads); 79 80 Builder { 81 threadpool_builder, 82 core_threads, 83 clock: Clock::new(), 84 } 85 } 86 87 /// Set the `Clock` instance that will be used by the runtime. clock(&mut self, clock: Clock) -> &mut Self88 pub fn clock(&mut self, clock: Clock) -> &mut Self { 89 self.clock = clock; 90 self 91 } 92 93 /// Set builder to set up the thread pool instance. 94 #[deprecated( 95 since = "0.1.9", 96 note = "use the `core_threads`, `blocking_threads`, `name_prefix`, \ 97 `keep_alive`, and `stack_size` functions on `runtime::Builder`, \ 98 instead")] 99 #[doc(hidden)] threadpool_builder(&mut self, val: ThreadPoolBuilder) -> &mut Self100 pub fn threadpool_builder(&mut self, val: ThreadPoolBuilder) -> &mut Self { 101 self.threadpool_builder = val; 102 self 103 } 104 105 /// Sets a callback to handle panics in futures. 106 /// 107 /// The callback is triggered when a panic during a future bubbles up to 108 /// Tokio. By default Tokio catches these panics, and they will be ignored. 109 /// The parameter passed to this callback is the same error value returned 110 /// from `std::panic::catch_unwind()`. To abort the process on panics, use 111 /// `std::panic::resume_unwind()` in this callback as shown below. 112 /// 113 /// # Examples 114 /// 115 /// ``` 116 /// # extern crate tokio; 117 /// # extern crate futures; 118 /// # use tokio::runtime; 119 /// 120 /// # pub fn main() { 121 /// let mut rt = runtime::Builder::new() 122 /// .panic_handler(|err| std::panic::resume_unwind(err)) 123 /// .build() 124 /// .unwrap(); 125 /// # } 126 /// ``` panic_handler<F>(&mut self, f: F) -> &mut Self where F: Fn(Box<Any + Send>) + Send + Sync + 'static,127 pub fn panic_handler<F>(&mut self, f: F) -> &mut Self 128 where 129 F: Fn(Box<Any + Send>) + Send + Sync + 'static, 130 { 131 self.threadpool_builder.panic_handler(f); 132 self 133 } 134 135 136 /// Set the maximum number of worker threads for the `Runtime`'s thread pool. 137 /// 138 /// This must be a number between 1 and 32,768 though it is advised to keep 139 /// this value on the smaller side. 140 /// 141 /// The default value is the number of cores available to the system. 142 /// 143 /// # Examples 144 /// 145 /// ``` 146 /// # extern crate tokio; 147 /// # extern crate futures; 148 /// # use tokio::runtime; 149 /// 150 /// # pub fn main() { 151 /// let mut rt = runtime::Builder::new() 152 /// .core_threads(4) 153 /// .build() 154 /// .unwrap(); 155 /// # } 156 /// ``` core_threads(&mut self, val: usize) -> &mut Self157 pub fn core_threads(&mut self, val: usize) -> &mut Self { 158 self.core_threads = val; 159 self.threadpool_builder.pool_size(val); 160 self 161 } 162 163 /// Set the maximum number of concurrent blocking sections in the `Runtime`'s 164 /// thread pool. 165 /// 166 /// When the maximum concurrent `blocking` calls is reached, any further 167 /// calls to `blocking` will return `NotReady` and the task is notified once 168 /// previously in-flight calls to `blocking` return. 169 /// 170 /// This must be a number between 1 and 32,768 though it is advised to keep 171 /// this value on the smaller side. 172 /// 173 /// The default value is 100. 174 /// 175 /// # Examples 176 /// 177 /// ``` 178 /// # extern crate tokio; 179 /// # extern crate futures; 180 /// # use tokio::runtime; 181 /// 182 /// # pub fn main() { 183 /// let mut rt = runtime::Builder::new() 184 /// .blocking_threads(200) 185 /// .build(); 186 /// # } 187 /// ``` blocking_threads(&mut self, val: usize) -> &mut Self188 pub fn blocking_threads(&mut self, val: usize) -> &mut Self { 189 self.threadpool_builder.max_blocking(val); 190 self 191 } 192 193 /// Set the worker thread keep alive duration for threads in the `Runtime`'s 194 /// thread pool. 195 /// 196 /// If set, a worker thread will wait for up to the specified duration for 197 /// work, at which point the thread will shutdown. When work becomes 198 /// available, a new thread will eventually be spawned to replace the one 199 /// that shut down. 200 /// 201 /// When the value is `None`, the thread will wait for work forever. 202 /// 203 /// The default value is `None`. 204 /// 205 /// # Examples 206 /// 207 /// ``` 208 /// # extern crate tokio; 209 /// # extern crate futures; 210 /// # use tokio::runtime; 211 /// use std::time::Duration; 212 /// 213 /// # pub fn main() { 214 /// let mut rt = runtime::Builder::new() 215 /// .keep_alive(Some(Duration::from_secs(30))) 216 /// .build(); 217 /// # } 218 /// ``` keep_alive(&mut self, val: Option<Duration>) -> &mut Self219 pub fn keep_alive(&mut self, val: Option<Duration>) -> &mut Self { 220 self.threadpool_builder.keep_alive(val); 221 self 222 } 223 224 /// Set name prefix of threads spawned by the `Runtime`'s thread pool. 225 /// 226 /// Thread name prefix is used for generating thread names. For example, if 227 /// prefix is `my-pool-`, then threads in the pool will get names like 228 /// `my-pool-1` etc. 229 /// 230 /// The default prefix is "tokio-runtime-worker-". 231 /// 232 /// # Examples 233 /// 234 /// ``` 235 /// # extern crate tokio; 236 /// # extern crate futures; 237 /// # use tokio::runtime; 238 /// 239 /// # pub fn main() { 240 /// let mut rt = runtime::Builder::new() 241 /// .name_prefix("my-pool-") 242 /// .build(); 243 /// # } 244 /// ``` name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self245 pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self { 246 self.threadpool_builder.name_prefix(val); 247 self 248 } 249 250 /// Set the stack size (in bytes) for worker threads. 251 /// 252 /// The actual stack size may be greater than this value if the platform 253 /// specifies minimal stack size. 254 /// 255 /// The default stack size for spawned threads is 2 MiB, though this 256 /// particular stack size is subject to change in the future. 257 /// 258 /// # Examples 259 /// 260 /// ``` 261 /// # extern crate tokio; 262 /// # extern crate futures; 263 /// # use tokio::runtime; 264 /// 265 /// # pub fn main() { 266 /// let mut rt = runtime::Builder::new() 267 /// .stack_size(32 * 1024) 268 /// .build(); 269 /// # } 270 /// ``` stack_size(&mut self, val: usize) -> &mut Self271 pub fn stack_size(&mut self, val: usize) -> &mut Self { 272 self.threadpool_builder.stack_size(val); 273 self 274 } 275 276 /// Execute function `f` after each thread is started but before it starts 277 /// doing work. 278 /// 279 /// This is intended for bookkeeping and monitoring use cases. 280 /// 281 /// # Examples 282 /// 283 /// ``` 284 /// # extern crate tokio; 285 /// # extern crate futures; 286 /// # use tokio::runtime; 287 /// 288 /// # pub fn main() { 289 /// let thread_pool = runtime::Builder::new() 290 /// .after_start(|| { 291 /// println!("thread started"); 292 /// }) 293 /// .build(); 294 /// # } 295 /// ``` after_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static296 pub fn after_start<F>(&mut self, f: F) -> &mut Self 297 where F: Fn() + Send + Sync + 'static 298 { 299 self.threadpool_builder.after_start(f); 300 self 301 } 302 303 /// Execute function `f` before each thread stops. 304 /// 305 /// This is intended for bookkeeping and monitoring use cases. 306 /// 307 /// # Examples 308 /// 309 /// ``` 310 /// # extern crate tokio; 311 /// # extern crate futures; 312 /// # use tokio::runtime; 313 /// 314 /// # pub fn main() { 315 /// let thread_pool = runtime::Builder::new() 316 /// .before_stop(|| { 317 /// println!("thread stopping"); 318 /// }) 319 /// .build(); 320 /// # } 321 /// ``` before_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static322 pub fn before_stop<F>(&mut self, f: F) -> &mut Self 323 where F: Fn() + Send + Sync + 'static 324 { 325 self.threadpool_builder.before_stop(f); 326 self 327 } 328 329 /// Create the configured `Runtime`. 330 /// 331 /// The returned `ThreadPool` instance is ready to spawn tasks. 332 /// 333 /// # Examples 334 /// 335 /// ``` 336 /// # extern crate tokio; 337 /// # use tokio::runtime::Builder; 338 /// # pub fn main() { 339 /// let runtime = Builder::new().build().unwrap(); 340 /// // ... call runtime.run(...) 341 /// # let _ = runtime; 342 /// # } 343 /// ``` build(&mut self) -> io::Result<Runtime>344 pub fn build(&mut self) -> io::Result<Runtime> { 345 // TODO(stjepang): Once we remove the `threadpool_builder` method, remove this line too. 346 self.threadpool_builder.pool_size(self.core_threads); 347 348 let mut reactor_handles = Vec::new(); 349 let mut timer_handles = Vec::new(); 350 let mut timers = Vec::new(); 351 352 for _ in 0..self.core_threads { 353 // Create a new reactor. 354 let reactor = Reactor::new()?; 355 reactor_handles.push(reactor.handle()); 356 357 // Create a new timer. 358 let timer = Timer::new_with_now(reactor, self.clock.clone()); 359 timer_handles.push(timer.handle()); 360 timers.push(Mutex::new(Some(timer))); 361 } 362 363 // Get a handle to the clock for the runtime. 364 let clock = self.clock.clone(); 365 366 // Get the current trace dispatcher. 367 // TODO(eliza): when `tokio-trace-core` is stable enough to take a 368 // public API dependency, we should allow users to set a custom 369 // subscriber for the runtime. 370 let dispatch = trace::dispatcher::get_default(trace::Dispatch::clone); 371 372 let pool = self 373 .threadpool_builder 374 .around_worker(move |w, enter| { 375 let index = w.id().to_usize(); 376 377 tokio_reactor::with_default(&reactor_handles[index], enter, |enter| { 378 clock::with_default(&clock, enter, |enter| { 379 timer::with_default(&timer_handles[index], enter, |_| { 380 trace::dispatcher::with_default(&dispatch, || { 381 w.run(); 382 }) 383 }); 384 }) 385 }); 386 }) 387 .custom_park(move |worker_id| { 388 let index = worker_id.to_usize(); 389 390 timers[index] 391 .lock() 392 .unwrap() 393 .take() 394 .unwrap() 395 }) 396 .build(); 397 398 // To support deprecated `reactor()` function 399 let reactor = Reactor::new()?; 400 let reactor_handle = reactor.handle(); 401 402 Ok(Runtime { 403 inner: Some(Inner { 404 reactor_handle, 405 reactor: Mutex::new(Some(reactor)), 406 pool, 407 }), 408 }) 409 } 410 } 411