1 //! The core reactor driving all I/O 2 //! 3 //! This module contains the `Core` type which is the reactor for all I/O 4 //! happening in `tokio-core`. This reactor (or event loop) is used to run 5 //! futures, schedule tasks, issue I/O requests, etc. 6 7 use std::cell::RefCell; 8 use std::cmp; 9 use std::fmt; 10 use std::io::{self, ErrorKind}; 11 use std::mem; 12 use std::rc::{Rc, Weak}; 13 use std::sync::Arc; 14 use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; 15 use std::time::{Instant, Duration}; 16 17 use futures::{Future, IntoFuture, Async}; 18 use futures::future; 19 use futures::executor::{self, Spawn, Unpark}; 20 use futures::sync::mpsc; 21 use futures::task::Task; 22 use mio; 23 use mio::event::Evented; 24 use slab::Slab; 25 26 use heap::{Heap, Slot}; 27 28 mod io_token; 29 mod timeout_token; 30 31 mod poll_evented; 32 mod timeout; 33 mod interval; 34 pub use self::poll_evented::PollEvented; 35 pub use self::timeout::Timeout; 36 pub use self::interval::Interval; 37 38 static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT; 39 scoped_thread_local!(static CURRENT_LOOP: Core); 40 41 /// An event loop. 42 /// 43 /// The event loop is the main source of blocking in an application which drives 44 /// all other I/O events and notifications happening. Each event loop can have 45 /// multiple handles pointing to it, each of which can then be used to create 46 /// various I/O objects to interact with the event loop in interesting ways. 47 // TODO: expand this 48 pub struct Core { 49 events: mio::Events, 50 tx: mpsc::UnboundedSender<Message>, 51 rx: RefCell<Spawn<mpsc::UnboundedReceiver<Message>>>, 52 _rx_registration: mio::Registration, 53 rx_readiness: Arc<MySetReadiness>, 54 55 inner: Rc<RefCell<Inner>>, 56 57 // Used for determining when the future passed to `run` is ready. Once the 58 // registration is passed to `io` above we never touch it again, just keep 59 // it alive. 60 _future_registration: mio::Registration, 61 future_readiness: Arc<MySetReadiness>, 62 } 63 64 struct Inner { 65 id: usize, 66 io: mio::Poll, 67 68 // Dispatch slabs for I/O and futures events 69 io_dispatch: Slab<ScheduledIo>, 70 task_dispatch: Slab<ScheduledTask>, 71 72 // Timer wheel keeping track of all timeouts. The `usize` stored in the 73 // timer wheel is an index into the slab below. 74 // 75 // The slab below keeps track of the timeouts themselves as well as the 76 // state of the timeout itself. The `TimeoutToken` type is an index into the 77 // `timeouts` slab. 78 timer_heap: Heap<(Instant, usize)>, 79 timeouts: Slab<(Option<Slot>, TimeoutState)>, 80 } 81 82 /// An unique ID for a Core 83 /// 84 /// An ID by which different cores may be distinguished. Can be compared and used as an index in 85 /// a `HashMap`. 86 /// 87 /// The ID is globally unique and never reused. 88 #[derive(Clone,Copy,Eq,PartialEq,Hash,Debug)] 89 pub struct CoreId(usize); 90 91 /// Handle to an event loop, used to construct I/O objects, send messages, and 92 /// otherwise interact indirectly with the event loop itself. 93 /// 94 /// Handles can be cloned, and when cloned they will still refer to the 95 /// same underlying event loop. 96 #[derive(Clone)] 97 pub struct Remote { 98 id: usize, 99 tx: mpsc::UnboundedSender<Message>, 100 } 101 102 /// A non-sendable handle to an event loop, useful for manufacturing instances 103 /// of `LoopData`. 104 #[derive(Clone)] 105 pub struct Handle { 106 remote: Remote, 107 inner: Weak<RefCell<Inner>>, 108 } 109 110 struct ScheduledIo { 111 readiness: Arc<AtomicUsize>, 112 reader: Option<Task>, 113 writer: Option<Task>, 114 } 115 116 struct ScheduledTask { 117 _registration: mio::Registration, 118 spawn: Option<Spawn<Box<Future<Item=(), Error=()>>>>, 119 wake: Arc<MySetReadiness>, 120 } 121 122 enum TimeoutState { 123 NotFired, 124 Fired, 125 Waiting(Task), 126 } 127 128 enum Direction { 129 Read, 130 Write, 131 } 132 133 enum Message { 134 DropSource(usize), 135 Schedule(usize, Task, Direction), 136 UpdateTimeout(usize, Task), 137 ResetTimeout(usize, Instant), 138 CancelTimeout(usize), 139 Run(Box<FnBox>), 140 } 141 142 #[repr(usize)] 143 #[derive(Clone, Copy, Debug, PartialEq)] 144 enum Readiness { 145 Readable = 1, 146 Writable = 2, 147 } 148 149 const TOKEN_MESSAGES: mio::Token = mio::Token(0); 150 const TOKEN_FUTURE: mio::Token = mio::Token(1); 151 const TOKEN_START: usize = 2; 152 153 impl Core { 154 /// Creates a new event loop, returning any error that happened during the 155 /// creation. new() -> io::Result<Core>156 pub fn new() -> io::Result<Core> { 157 let io = try!(mio::Poll::new()); 158 let future_pair = mio::Registration::new2(); 159 try!(io.register(&future_pair.0, 160 TOKEN_FUTURE, 161 mio::Ready::readable(), 162 mio::PollOpt::level())); 163 let (tx, rx) = mpsc::unbounded(); 164 let channel_pair = mio::Registration::new2(); 165 try!(io.register(&channel_pair.0, 166 TOKEN_MESSAGES, 167 mio::Ready::readable(), 168 mio::PollOpt::level())); 169 let rx_readiness = Arc::new(MySetReadiness(channel_pair.1)); 170 rx_readiness.unpark(); 171 172 Ok(Core { 173 events: mio::Events::with_capacity(1024), 174 tx: tx, 175 rx: RefCell::new(executor::spawn(rx)), 176 _rx_registration: channel_pair.0, 177 rx_readiness: rx_readiness, 178 179 _future_registration: future_pair.0, 180 future_readiness: Arc::new(MySetReadiness(future_pair.1)), 181 182 inner: Rc::new(RefCell::new(Inner { 183 id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed), 184 io: io, 185 io_dispatch: Slab::with_capacity(1), 186 task_dispatch: Slab::with_capacity(1), 187 timeouts: Slab::with_capacity(1), 188 timer_heap: Heap::new(), 189 })), 190 }) 191 } 192 193 /// Returns a handle to this event loop which cannot be sent across threads 194 /// but can be used as a proxy to the event loop itself. 195 /// 196 /// Handles are cloneable and clones always refer to the same event loop. 197 /// This handle is typically passed into functions that create I/O objects 198 /// to bind them to this event loop. handle(&self) -> Handle199 pub fn handle(&self) -> Handle { 200 Handle { 201 remote: self.remote(), 202 inner: Rc::downgrade(&self.inner), 203 } 204 } 205 206 /// Generates a remote handle to this event loop which can be used to spawn 207 /// tasks from other threads into this event loop. remote(&self) -> Remote208 pub fn remote(&self) -> Remote { 209 Remote { 210 id: self.inner.borrow().id, 211 tx: self.tx.clone(), 212 } 213 } 214 215 /// Runs a future until completion, driving the event loop while we're 216 /// otherwise waiting for the future to complete. 217 /// 218 /// This function will begin executing the event loop and will finish once 219 /// the provided future is resolved. Note that the future argument here 220 /// crucially does not require the `'static` nor `Send` bounds. As a result 221 /// the future will be "pinned" to not only this thread but also this stack 222 /// frame. 223 /// 224 /// This function will return the value that the future resolves to once 225 /// the future has finished. If the future never resolves then this function 226 /// will never return. 227 /// 228 /// # Panics 229 /// 230 /// This method will **not** catch panics from polling the future `f`. If 231 /// the future panics then it's the responsibility of the caller to catch 232 /// that panic and handle it as appropriate. run<F>(&mut self, f: F) -> Result<F::Item, F::Error> where F: Future,233 pub fn run<F>(&mut self, f: F) -> Result<F::Item, F::Error> 234 where F: Future, 235 { 236 let mut task = executor::spawn(f); 237 let ready = self.future_readiness.clone(); 238 let mut future_fired = true; 239 240 loop { 241 if future_fired { 242 let res = try!(CURRENT_LOOP.set(self, || { 243 task.poll_future(ready.clone()) 244 })); 245 if let Async::Ready(e) = res { 246 return Ok(e) 247 } 248 } 249 future_fired = self.poll(None); 250 } 251 } 252 253 /// Performs one iteration of the event loop, blocking on waiting for events 254 /// for at most `max_wait` (forever if `None`). 255 /// 256 /// It only makes sense to call this method if you've previously spawned 257 /// a future onto this event loop. 258 /// 259 /// `loop { lp.turn(None) }` is equivalent to calling `run` with an 260 /// empty future (one that never finishes). turn(&mut self, max_wait: Option<Duration>)261 pub fn turn(&mut self, max_wait: Option<Duration>) { 262 self.poll(max_wait); 263 } 264 poll(&mut self, max_wait: Option<Duration>) -> bool265 fn poll(&mut self, max_wait: Option<Duration>) -> bool { 266 // Given the `max_wait` variable specified, figure out the actual 267 // timeout that we're going to pass to `poll`. This involves taking a 268 // look at active timers on our heap as well. 269 let start = Instant::now(); 270 let timeout = self.inner.borrow_mut().timer_heap.peek().map(|t| { 271 if t.0 < start { 272 Duration::new(0, 0) 273 } else { 274 t.0 - start 275 } 276 }); 277 let timeout = match (max_wait, timeout) { 278 (Some(d1), Some(d2)) => Some(cmp::min(d1, d2)), 279 (max_wait, timeout) => max_wait.or(timeout), 280 }; 281 282 // Block waiting for an event to happen, peeling out how many events 283 // happened. 284 let amt = match self.inner.borrow_mut().io.poll(&mut self.events, timeout) { 285 Ok(a) => a, 286 Err(ref e) if e.kind() == ErrorKind::Interrupted => return false, 287 Err(e) => panic!("error in poll: {}", e), 288 }; 289 290 let after_poll = Instant::now(); 291 debug!("loop poll - {:?}", after_poll - start); 292 debug!("loop time - {:?}", after_poll); 293 294 // Process all timeouts that may have just occurred, updating the 295 // current time since 296 self.consume_timeouts(after_poll); 297 298 // Process all the events that came in, dispatching appropriately 299 let mut fired = false; 300 for i in 0..self.events.len() { 301 let event = self.events.get(i).unwrap(); 302 let token = event.token(); 303 trace!("event {:?} {:?}", event.readiness(), event.token()); 304 305 if token == TOKEN_MESSAGES { 306 self.rx_readiness.0.set_readiness(mio::Ready::empty()).unwrap(); 307 CURRENT_LOOP.set(&self, || self.consume_queue()); 308 } else if token == TOKEN_FUTURE { 309 self.future_readiness.0.set_readiness(mio::Ready::empty()).unwrap(); 310 fired = true; 311 } else { 312 self.dispatch(token, event.readiness()); 313 } 314 } 315 debug!("loop process - {} events, {:?}", amt, after_poll.elapsed()); 316 return fired 317 } 318 dispatch(&mut self, token: mio::Token, ready: mio::Ready)319 fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) { 320 let token = usize::from(token) - TOKEN_START; 321 if token % 2 == 0 { 322 self.dispatch_io(token / 2, ready) 323 } else { 324 self.dispatch_task(token / 2) 325 } 326 } 327 dispatch_io(&mut self, token: usize, ready: mio::Ready)328 fn dispatch_io(&mut self, token: usize, ready: mio::Ready) { 329 let mut reader = None; 330 let mut writer = None; 331 let mut inner = self.inner.borrow_mut(); 332 if let Some(io) = inner.io_dispatch.get_mut(token) { 333 if ready.is_readable() || platform::is_hup(&ready) { 334 reader = io.reader.take(); 335 io.readiness.fetch_or(Readiness::Readable as usize, 336 Ordering::Relaxed); 337 } 338 if ready.is_writable() { 339 writer = io.writer.take(); 340 io.readiness.fetch_or(Readiness::Writable as usize, 341 Ordering::Relaxed); 342 } 343 } 344 drop(inner); 345 // TODO: don't notify the same task twice 346 if let Some(reader) = reader { 347 self.notify_handle(reader); 348 } 349 if let Some(writer) = writer { 350 self.notify_handle(writer); 351 } 352 } 353 dispatch_task(&mut self, token: usize)354 fn dispatch_task(&mut self, token: usize) { 355 let mut inner = self.inner.borrow_mut(); 356 let (task, wake) = match inner.task_dispatch.get_mut(token) { 357 Some(slot) => (slot.spawn.take(), slot.wake.clone()), 358 None => return, 359 }; 360 wake.0.set_readiness(mio::Ready::empty()).unwrap(); 361 let mut task = match task { 362 Some(task) => task, 363 None => return, 364 }; 365 drop(inner); 366 let res = CURRENT_LOOP.set(self, || task.poll_future(wake)); 367 let _task_to_drop; 368 inner = self.inner.borrow_mut(); 369 match res { 370 Ok(Async::NotReady) => { 371 assert!(inner.task_dispatch[token].spawn.is_none()); 372 inner.task_dispatch[token].spawn = Some(task); 373 } 374 Ok(Async::Ready(())) | 375 Err(()) => { 376 _task_to_drop = inner.task_dispatch.remove(token).unwrap(); 377 } 378 } 379 drop(inner); 380 } 381 consume_timeouts(&mut self, now: Instant)382 fn consume_timeouts(&mut self, now: Instant) { 383 loop { 384 let mut inner = self.inner.borrow_mut(); 385 match inner.timer_heap.peek() { 386 Some(head) if head.0 <= now => {} 387 Some(_) => break, 388 None => break, 389 }; 390 let (_, slab_idx) = inner.timer_heap.pop().unwrap(); 391 392 trace!("firing timeout: {}", slab_idx); 393 inner.timeouts[slab_idx].0.take().unwrap(); 394 let handle = inner.timeouts[slab_idx].1.fire(); 395 drop(inner); 396 if let Some(handle) = handle { 397 self.notify_handle(handle); 398 } 399 } 400 } 401 402 /// Method used to notify a task handle. 403 /// 404 /// Note that this should be used instead of `handle.unpark()` to ensure 405 /// that the `CURRENT_LOOP` variable is set appropriately. notify_handle(&self, handle: Task)406 fn notify_handle(&self, handle: Task) { 407 debug!("notifying a task handle"); 408 CURRENT_LOOP.set(&self, || handle.unpark()); 409 } 410 consume_queue(&self)411 fn consume_queue(&self) { 412 debug!("consuming notification queue"); 413 // TODO: can we do better than `.unwrap()` here? 414 let unpark = self.rx_readiness.clone(); 415 loop { 416 let msg = self.rx.borrow_mut().poll_stream(unpark.clone()).unwrap(); 417 match msg { 418 Async::Ready(Some(msg)) => self.notify(msg), 419 Async::NotReady | 420 Async::Ready(None) => break, 421 } 422 } 423 } 424 notify(&self, msg: Message)425 fn notify(&self, msg: Message) { 426 match msg { 427 Message::DropSource(tok) => self.inner.borrow_mut().drop_source(tok), 428 Message::Schedule(tok, wake, dir) => { 429 let task = self.inner.borrow_mut().schedule(tok, wake, dir); 430 if let Some(task) = task { 431 self.notify_handle(task); 432 } 433 } 434 Message::UpdateTimeout(t, handle) => { 435 let task = self.inner.borrow_mut().update_timeout(t, handle); 436 if let Some(task) = task { 437 self.notify_handle(task); 438 } 439 } 440 Message::ResetTimeout(t, at) => { 441 self.inner.borrow_mut().reset_timeout(t, at); 442 } 443 Message::CancelTimeout(t) => { 444 self.inner.borrow_mut().cancel_timeout(t) 445 } 446 Message::Run(r) => r.call_box(self), 447 } 448 } 449 450 /// Get the ID of this loop id(&self) -> CoreId451 pub fn id(&self) -> CoreId { 452 CoreId(self.inner.borrow().id) 453 } 454 } 455 456 impl fmt::Debug for Core { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result457 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 458 f.debug_struct("Core") 459 .field("id", &self.id()) 460 .finish() 461 } 462 } 463 464 impl Inner { add_source(&mut self, source: &Evented) -> io::Result<(Arc<AtomicUsize>, usize)>465 fn add_source(&mut self, source: &Evented) 466 -> io::Result<(Arc<AtomicUsize>, usize)> { 467 debug!("adding a new I/O source"); 468 let sched = ScheduledIo { 469 readiness: Arc::new(AtomicUsize::new(0)), 470 reader: None, 471 writer: None, 472 }; 473 if self.io_dispatch.vacant_entry().is_none() { 474 let amt = self.io_dispatch.len(); 475 self.io_dispatch.reserve_exact(amt); 476 } 477 let entry = self.io_dispatch.vacant_entry().unwrap(); 478 try!(self.io.register(source, 479 mio::Token(TOKEN_START + entry.index() * 2), 480 mio::Ready::readable() | 481 mio::Ready::writable() | 482 platform::hup(), 483 mio::PollOpt::edge())); 484 Ok((sched.readiness.clone(), entry.insert(sched).index())) 485 } 486 deregister_source(&mut self, source: &Evented) -> io::Result<()>487 fn deregister_source(&mut self, source: &Evented) -> io::Result<()> { 488 self.io.deregister(source) 489 } 490 drop_source(&mut self, token: usize)491 fn drop_source(&mut self, token: usize) { 492 debug!("dropping I/O source: {}", token); 493 self.io_dispatch.remove(token).unwrap(); 494 } 495 schedule(&mut self, token: usize, wake: Task, dir: Direction) -> Option<Task>496 fn schedule(&mut self, token: usize, wake: Task, dir: Direction) 497 -> Option<Task> { 498 debug!("scheduling direction for: {}", token); 499 let sched = self.io_dispatch.get_mut(token).unwrap(); 500 let (slot, bit) = match dir { 501 Direction::Read => (&mut sched.reader, Readiness::Readable as usize), 502 Direction::Write => (&mut sched.writer, Readiness::Writable as usize), 503 }; 504 if sched.readiness.load(Ordering::SeqCst) & bit != 0 { 505 *slot = None; 506 Some(wake) 507 } else { 508 *slot = Some(wake); 509 None 510 } 511 } 512 add_timeout(&mut self, at: Instant) -> usize513 fn add_timeout(&mut self, at: Instant) -> usize { 514 if self.timeouts.vacant_entry().is_none() { 515 let len = self.timeouts.len(); 516 self.timeouts.reserve_exact(len); 517 } 518 let entry = self.timeouts.vacant_entry().unwrap(); 519 let slot = self.timer_heap.push((at, entry.index())); 520 let entry = entry.insert((Some(slot), TimeoutState::NotFired)); 521 debug!("added a timeout: {}", entry.index()); 522 return entry.index(); 523 } 524 update_timeout(&mut self, token: usize, handle: Task) -> Option<Task>525 fn update_timeout(&mut self, token: usize, handle: Task) -> Option<Task> { 526 debug!("updating a timeout: {}", token); 527 self.timeouts[token].1.block(handle) 528 } 529 reset_timeout(&mut self, token: usize, at: Instant)530 fn reset_timeout(&mut self, token: usize, at: Instant) { 531 let pair = &mut self.timeouts[token]; 532 // TODO: avoid remove + push and instead just do one sift of the heap? 533 // In theory we could update it in place and then do the percolation 534 // as necessary 535 if let Some(slot) = pair.0.take() { 536 self.timer_heap.remove(slot); 537 } 538 let slot = self.timer_heap.push((at, token)); 539 *pair = (Some(slot), TimeoutState::NotFired); 540 debug!("set a timeout: {}", token); 541 } 542 cancel_timeout(&mut self, token: usize)543 fn cancel_timeout(&mut self, token: usize) { 544 debug!("cancel a timeout: {}", token); 545 let pair = self.timeouts.remove(token); 546 if let Some((Some(slot), _state)) = pair { 547 self.timer_heap.remove(slot); 548 } 549 } 550 spawn(&mut self, future: Box<Future<Item=(), Error=()>>)551 fn spawn(&mut self, future: Box<Future<Item=(), Error=()>>) { 552 if self.task_dispatch.vacant_entry().is_none() { 553 let len = self.task_dispatch.len(); 554 self.task_dispatch.reserve_exact(len); 555 } 556 let entry = self.task_dispatch.vacant_entry().unwrap(); 557 let token = TOKEN_START + 2 * entry.index() + 1; 558 let pair = mio::Registration::new2(); 559 self.io.register(&pair.0, 560 mio::Token(token), 561 mio::Ready::readable(), 562 mio::PollOpt::level()) 563 .expect("cannot fail future registration with mio"); 564 let unpark = Arc::new(MySetReadiness(pair.1)); 565 let entry = entry.insert(ScheduledTask { 566 spawn: Some(executor::spawn(future)), 567 wake: unpark, 568 _registration: pair.0, 569 }); 570 entry.get().wake.clone().unpark(); 571 } 572 } 573 574 impl Remote { send(&self, msg: Message)575 fn send(&self, msg: Message) { 576 self.with_loop(|lp| { 577 match lp { 578 Some(lp) => { 579 // Need to execute all existing requests first, to ensure 580 // that our message is processed "in order" 581 lp.consume_queue(); 582 lp.notify(msg); 583 } 584 None => { 585 match mpsc::UnboundedSender::send(&self.tx, msg) { 586 Ok(()) => {} 587 588 // TODO: this error should punt upwards and we should 589 // notify the caller that the message wasn't 590 // received. This is tokio-core#17 591 Err(e) => drop(e), 592 } 593 } 594 } 595 }) 596 } 597 with_loop<F, R>(&self, f: F) -> R where F: FnOnce(Option<&Core>) -> R598 fn with_loop<F, R>(&self, f: F) -> R 599 where F: FnOnce(Option<&Core>) -> R 600 { 601 if CURRENT_LOOP.is_set() { 602 CURRENT_LOOP.with(|lp| { 603 let same = lp.inner.borrow().id == self.id; 604 if same { 605 f(Some(lp)) 606 } else { 607 f(None) 608 } 609 }) 610 } else { 611 f(None) 612 } 613 } 614 615 /// Spawns a new future into the event loop this remote is associated with. 616 /// 617 /// This function takes a closure which is executed within the context of 618 /// the I/O loop itself. The future returned by the closure will be 619 /// scheduled on the event loop an run to completion. 620 /// 621 /// Note that while the closure, `F`, requires the `Send` bound as it might 622 /// cross threads, the future `R` does not. spawn<F, R>(&self, f: F) where F: FnOnce(&Handle) -> R + Send + 'static, R: IntoFuture<Item=(), Error=()>, R::Future: 'static,623 pub fn spawn<F, R>(&self, f: F) 624 where F: FnOnce(&Handle) -> R + Send + 'static, 625 R: IntoFuture<Item=(), Error=()>, 626 R::Future: 'static, 627 { 628 self.send(Message::Run(Box::new(|lp: &Core| { 629 let f = f(&lp.handle()); 630 lp.inner.borrow_mut().spawn(Box::new(f.into_future())); 631 }))); 632 } 633 634 /// Return the ID of the represented Core id(&self) -> CoreId635 pub fn id(&self) -> CoreId { 636 CoreId(self.id) 637 } 638 639 /// Attempts to "promote" this remote to a handle, if possible. 640 /// 641 /// This function is intended for structures which typically work through a 642 /// `Remote` but want to optimize runtime when the remote doesn't actually 643 /// leave the thread of the original reactor. This will attempt to return a 644 /// handle if the `Remote` is on the same thread as the event loop and the 645 /// event loop is running. 646 /// 647 /// If this `Remote` has moved to a different thread or if the event loop is 648 /// running, then `None` may be returned. If you need to guarantee access to 649 /// a `Handle`, then you can call this function and fall back to using 650 /// `spawn` above if it returns `None`. handle(&self) -> Option<Handle>651 pub fn handle(&self) -> Option<Handle> { 652 if CURRENT_LOOP.is_set() { 653 CURRENT_LOOP.with(|lp| { 654 let same = lp.inner.borrow().id == self.id; 655 if same { 656 Some(lp.handle()) 657 } else { 658 None 659 } 660 }) 661 } else { 662 None 663 } 664 } 665 } 666 667 impl fmt::Debug for Remote { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result668 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 669 f.debug_struct("Remote") 670 .field("id", &self.id()) 671 .finish() 672 } 673 } 674 675 impl Handle { 676 /// Returns a reference to the underlying remote handle to the event loop. remote(&self) -> &Remote677 pub fn remote(&self) -> &Remote { 678 &self.remote 679 } 680 681 /// Spawns a new future on the event loop this handle is associated with. spawn<F>(&self, f: F) where F: Future<Item=(), Error=()> + 'static,682 pub fn spawn<F>(&self, f: F) 683 where F: Future<Item=(), Error=()> + 'static, 684 { 685 let inner = match self.inner.upgrade() { 686 Some(inner) => inner, 687 None => return, 688 }; 689 inner.borrow_mut().spawn(Box::new(f)); 690 } 691 692 /// Spawns a closure on this event loop. 693 /// 694 /// This function is a convenience wrapper around the `spawn` function above 695 /// for running a closure wrapped in `futures::lazy`. It will spawn the 696 /// function `f` provided onto the event loop, and continue to run the 697 /// future returned by `f` on the event loop as well. spawn_fn<F, R>(&self, f: F) where F: FnOnce() -> R + 'static, R: IntoFuture<Item=(), Error=()> + 'static,698 pub fn spawn_fn<F, R>(&self, f: F) 699 where F: FnOnce() -> R + 'static, 700 R: IntoFuture<Item=(), Error=()> + 'static, 701 { 702 self.spawn(future::lazy(f)) 703 } 704 705 /// Return the ID of the represented Core id(&self) -> CoreId706 pub fn id(&self) -> CoreId { 707 self.remote.id() 708 } 709 } 710 711 impl fmt::Debug for Handle { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result712 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 713 f.debug_struct("Handle") 714 .field("id", &self.id()) 715 .finish() 716 } 717 } 718 719 impl TimeoutState { block(&mut self, handle: Task) -> Option<Task>720 fn block(&mut self, handle: Task) -> Option<Task> { 721 match *self { 722 TimeoutState::Fired => return Some(handle), 723 _ => {} 724 } 725 *self = TimeoutState::Waiting(handle); 726 None 727 } 728 fire(&mut self) -> Option<Task>729 fn fire(&mut self) -> Option<Task> { 730 match mem::replace(self, TimeoutState::Fired) { 731 TimeoutState::NotFired => None, 732 TimeoutState::Fired => panic!("fired twice?"), 733 TimeoutState::Waiting(handle) => Some(handle), 734 } 735 } 736 } 737 738 struct MySetReadiness(mio::SetReadiness); 739 740 impl Unpark for MySetReadiness { unpark(&self)741 fn unpark(&self) { 742 self.0.set_readiness(mio::Ready::readable()) 743 .expect("failed to set readiness"); 744 } 745 } 746 747 trait FnBox: Send + 'static { call_box(self: Box<Self>, lp: &Core)748 fn call_box(self: Box<Self>, lp: &Core); 749 } 750 751 impl<F: FnOnce(&Core) + Send + 'static> FnBox for F { call_box(self: Box<Self>, lp: &Core)752 fn call_box(self: Box<Self>, lp: &Core) { 753 (*self)(lp) 754 } 755 } 756 757 #[cfg(unix)] 758 mod platform { 759 use mio::Ready; 760 use mio::unix::UnixReady; 761 is_hup(event: &Ready) -> bool762 pub fn is_hup(event: &Ready) -> bool { 763 UnixReady::from(*event).is_hup() 764 } 765 hup() -> Ready766 pub fn hup() -> Ready { 767 UnixReady::hup().into() 768 } 769 } 770 771 #[cfg(windows)] 772 mod platform { 773 use mio::Ready; 774 is_hup(_event: &Ready) -> bool775 pub fn is_hup(_event: &Ready) -> bool { 776 false 777 } 778 hup() -> Ready779 pub fn hup() -> Ready { 780 Ready::empty() 781 } 782 } 783