1 //! The core reactor driving all I/O
2 //!
3 //! This module contains the `Core` type which is the reactor for all I/O
4 //! happening in `tokio-core`. This reactor (or event loop) is used to run
5 //! futures, schedule tasks, issue I/O requests, etc.
6 
7 use std::cell::RefCell;
8 use std::cmp;
9 use std::fmt;
10 use std::io::{self, ErrorKind};
11 use std::mem;
12 use std::rc::{Rc, Weak};
13 use std::sync::Arc;
14 use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
15 use std::time::{Instant, Duration};
16 
17 use futures::{Future, IntoFuture, Async};
18 use futures::future;
19 use futures::executor::{self, Spawn, Unpark};
20 use futures::sync::mpsc;
21 use futures::task::Task;
22 use mio;
23 use mio::event::Evented;
24 use slab::Slab;
25 
26 use heap::{Heap, Slot};
27 
28 mod io_token;
29 mod timeout_token;
30 
31 mod poll_evented;
32 mod timeout;
33 mod interval;
34 pub use self::poll_evented::PollEvented;
35 pub use self::timeout::Timeout;
36 pub use self::interval::Interval;
37 
38 static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT;
39 scoped_thread_local!(static CURRENT_LOOP: Core);
40 
41 /// An event loop.
42 ///
43 /// The event loop is the main source of blocking in an application which drives
44 /// all other I/O events and notifications happening. Each event loop can have
45 /// multiple handles pointing to it, each of which can then be used to create
46 /// various I/O objects to interact with the event loop in interesting ways.
47 // TODO: expand this
48 pub struct Core {
49     events: mio::Events,
50     tx: mpsc::UnboundedSender<Message>,
51     rx: RefCell<Spawn<mpsc::UnboundedReceiver<Message>>>,
52     _rx_registration: mio::Registration,
53     rx_readiness: Arc<MySetReadiness>,
54 
55     inner: Rc<RefCell<Inner>>,
56 
57     // Used for determining when the future passed to `run` is ready. Once the
58     // registration is passed to `io` above we never touch it again, just keep
59     // it alive.
60     _future_registration: mio::Registration,
61     future_readiness: Arc<MySetReadiness>,
62 }
63 
64 struct Inner {
65     id: usize,
66     io: mio::Poll,
67 
68     // Dispatch slabs for I/O and futures events
69     io_dispatch: Slab<ScheduledIo>,
70     task_dispatch: Slab<ScheduledTask>,
71 
72     // Timer wheel keeping track of all timeouts. The `usize` stored in the
73     // timer wheel is an index into the slab below.
74     //
75     // The slab below keeps track of the timeouts themselves as well as the
76     // state of the timeout itself. The `TimeoutToken` type is an index into the
77     // `timeouts` slab.
78     timer_heap: Heap<(Instant, usize)>,
79     timeouts: Slab<(Option<Slot>, TimeoutState)>,
80 }
81 
82 /// An unique ID for a Core
83 ///
84 /// An ID by which different cores may be distinguished. Can be compared and used as an index in
85 /// a `HashMap`.
86 ///
87 /// The ID is globally unique and never reused.
88 #[derive(Clone,Copy,Eq,PartialEq,Hash,Debug)]
89 pub struct CoreId(usize);
90 
91 /// Handle to an event loop, used to construct I/O objects, send messages, and
92 /// otherwise interact indirectly with the event loop itself.
93 ///
94 /// Handles can be cloned, and when cloned they will still refer to the
95 /// same underlying event loop.
96 #[derive(Clone)]
97 pub struct Remote {
98     id: usize,
99     tx: mpsc::UnboundedSender<Message>,
100 }
101 
102 /// A non-sendable handle to an event loop, useful for manufacturing instances
103 /// of `LoopData`.
104 #[derive(Clone)]
105 pub struct Handle {
106     remote: Remote,
107     inner: Weak<RefCell<Inner>>,
108 }
109 
110 struct ScheduledIo {
111     readiness: Arc<AtomicUsize>,
112     reader: Option<Task>,
113     writer: Option<Task>,
114 }
115 
116 struct ScheduledTask {
117     _registration: mio::Registration,
118     spawn: Option<Spawn<Box<Future<Item=(), Error=()>>>>,
119     wake: Arc<MySetReadiness>,
120 }
121 
122 enum TimeoutState {
123     NotFired,
124     Fired,
125     Waiting(Task),
126 }
127 
128 enum Direction {
129     Read,
130     Write,
131 }
132 
133 enum Message {
134     DropSource(usize),
135     Schedule(usize, Task, Direction),
136     UpdateTimeout(usize, Task),
137     ResetTimeout(usize, Instant),
138     CancelTimeout(usize),
139     Run(Box<FnBox>),
140 }
141 
142 #[repr(usize)]
143 #[derive(Clone, Copy, Debug, PartialEq)]
144 enum Readiness {
145     Readable = 1,
146     Writable = 2,
147 }
148 
149 const TOKEN_MESSAGES: mio::Token = mio::Token(0);
150 const TOKEN_FUTURE: mio::Token = mio::Token(1);
151 const TOKEN_START: usize = 2;
152 
153 impl Core {
154     /// Creates a new event loop, returning any error that happened during the
155     /// creation.
new() -> io::Result<Core>156     pub fn new() -> io::Result<Core> {
157         let io = try!(mio::Poll::new());
158         let future_pair = mio::Registration::new2();
159         try!(io.register(&future_pair.0,
160                          TOKEN_FUTURE,
161                          mio::Ready::readable(),
162                          mio::PollOpt::level()));
163         let (tx, rx) = mpsc::unbounded();
164         let channel_pair = mio::Registration::new2();
165         try!(io.register(&channel_pair.0,
166                          TOKEN_MESSAGES,
167                          mio::Ready::readable(),
168                          mio::PollOpt::level()));
169         let rx_readiness = Arc::new(MySetReadiness(channel_pair.1));
170         rx_readiness.unpark();
171 
172         Ok(Core {
173             events: mio::Events::with_capacity(1024),
174             tx: tx,
175             rx: RefCell::new(executor::spawn(rx)),
176             _rx_registration: channel_pair.0,
177             rx_readiness: rx_readiness,
178 
179             _future_registration: future_pair.0,
180             future_readiness: Arc::new(MySetReadiness(future_pair.1)),
181 
182             inner: Rc::new(RefCell::new(Inner {
183                 id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed),
184                 io: io,
185                 io_dispatch: Slab::with_capacity(1),
186                 task_dispatch: Slab::with_capacity(1),
187                 timeouts: Slab::with_capacity(1),
188                 timer_heap: Heap::new(),
189             })),
190         })
191     }
192 
193     /// Returns a handle to this event loop which cannot be sent across threads
194     /// but can be used as a proxy to the event loop itself.
195     ///
196     /// Handles are cloneable and clones always refer to the same event loop.
197     /// This handle is typically passed into functions that create I/O objects
198     /// to bind them to this event loop.
handle(&self) -> Handle199     pub fn handle(&self) -> Handle {
200         Handle {
201             remote: self.remote(),
202             inner: Rc::downgrade(&self.inner),
203         }
204     }
205 
206     /// Generates a remote handle to this event loop which can be used to spawn
207     /// tasks from other threads into this event loop.
remote(&self) -> Remote208     pub fn remote(&self) -> Remote {
209         Remote {
210             id: self.inner.borrow().id,
211             tx: self.tx.clone(),
212         }
213     }
214 
215     /// Runs a future until completion, driving the event loop while we're
216     /// otherwise waiting for the future to complete.
217     ///
218     /// This function will begin executing the event loop and will finish once
219     /// the provided future is resolved. Note that the future argument here
220     /// crucially does not require the `'static` nor `Send` bounds. As a result
221     /// the future will be "pinned" to not only this thread but also this stack
222     /// frame.
223     ///
224     /// This function will return the value that the future resolves to once
225     /// the future has finished. If the future never resolves then this function
226     /// will never return.
227     ///
228     /// # Panics
229     ///
230     /// This method will **not** catch panics from polling the future `f`. If
231     /// the future panics then it's the responsibility of the caller to catch
232     /// that panic and handle it as appropriate.
run<F>(&mut self, f: F) -> Result<F::Item, F::Error> where F: Future,233     pub fn run<F>(&mut self, f: F) -> Result<F::Item, F::Error>
234         where F: Future,
235     {
236         let mut task = executor::spawn(f);
237         let ready = self.future_readiness.clone();
238         let mut future_fired = true;
239 
240         loop {
241             if future_fired {
242                 let res = try!(CURRENT_LOOP.set(self, || {
243                     task.poll_future(ready.clone())
244                 }));
245                 if let Async::Ready(e) = res {
246                     return Ok(e)
247                 }
248             }
249             future_fired = self.poll(None);
250         }
251     }
252 
253     /// Performs one iteration of the event loop, blocking on waiting for events
254     /// for at most `max_wait` (forever if `None`).
255     ///
256     /// It only makes sense to call this method if you've previously spawned
257     /// a future onto this event loop.
258     ///
259     /// `loop { lp.turn(None) }` is equivalent to calling `run` with an
260     /// empty future (one that never finishes).
turn(&mut self, max_wait: Option<Duration>)261     pub fn turn(&mut self, max_wait: Option<Duration>) {
262         self.poll(max_wait);
263     }
264 
poll(&mut self, max_wait: Option<Duration>) -> bool265     fn poll(&mut self, max_wait: Option<Duration>) -> bool {
266         // Given the `max_wait` variable specified, figure out the actual
267         // timeout that we're going to pass to `poll`. This involves taking a
268         // look at active timers on our heap as well.
269         let start = Instant::now();
270         let timeout = self.inner.borrow_mut().timer_heap.peek().map(|t| {
271             if t.0 < start {
272                 Duration::new(0, 0)
273             } else {
274                 t.0 - start
275             }
276         });
277         let timeout = match (max_wait, timeout) {
278             (Some(d1), Some(d2)) => Some(cmp::min(d1, d2)),
279             (max_wait, timeout) => max_wait.or(timeout),
280         };
281 
282         // Block waiting for an event to happen, peeling out how many events
283         // happened.
284         let amt = match self.inner.borrow_mut().io.poll(&mut self.events, timeout) {
285             Ok(a) => a,
286             Err(ref e) if e.kind() == ErrorKind::Interrupted => return false,
287             Err(e) => panic!("error in poll: {}", e),
288         };
289 
290         let after_poll = Instant::now();
291         debug!("loop poll - {:?}", after_poll - start);
292         debug!("loop time - {:?}", after_poll);
293 
294         // Process all timeouts that may have just occurred, updating the
295         // current time since
296         self.consume_timeouts(after_poll);
297 
298         // Process all the events that came in, dispatching appropriately
299         let mut fired = false;
300         for i in 0..self.events.len() {
301             let event = self.events.get(i).unwrap();
302             let token = event.token();
303             trace!("event {:?} {:?}", event.readiness(), event.token());
304 
305             if token == TOKEN_MESSAGES {
306                 self.rx_readiness.0.set_readiness(mio::Ready::empty()).unwrap();
307                 CURRENT_LOOP.set(&self, || self.consume_queue());
308             } else if token == TOKEN_FUTURE {
309                 self.future_readiness.0.set_readiness(mio::Ready::empty()).unwrap();
310                 fired = true;
311             } else {
312                 self.dispatch(token, event.readiness());
313             }
314         }
315         debug!("loop process - {} events, {:?}", amt, after_poll.elapsed());
316         return fired
317     }
318 
dispatch(&mut self, token: mio::Token, ready: mio::Ready)319     fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
320         let token = usize::from(token) - TOKEN_START;
321         if token % 2 == 0 {
322             self.dispatch_io(token / 2, ready)
323         } else {
324             self.dispatch_task(token / 2)
325         }
326     }
327 
dispatch_io(&mut self, token: usize, ready: mio::Ready)328     fn dispatch_io(&mut self, token: usize, ready: mio::Ready) {
329         let mut reader = None;
330         let mut writer = None;
331         let mut inner = self.inner.borrow_mut();
332         if let Some(io) = inner.io_dispatch.get_mut(token) {
333             if ready.is_readable() || platform::is_hup(&ready) {
334                 reader = io.reader.take();
335                 io.readiness.fetch_or(Readiness::Readable as usize,
336                     Ordering::Relaxed);
337             }
338             if ready.is_writable() {
339                 writer = io.writer.take();
340                 io.readiness.fetch_or(Readiness::Writable as usize,
341                     Ordering::Relaxed);
342             }
343         }
344         drop(inner);
345         // TODO: don't notify the same task twice
346         if let Some(reader) = reader {
347             self.notify_handle(reader);
348         }
349         if let Some(writer) = writer {
350             self.notify_handle(writer);
351         }
352     }
353 
dispatch_task(&mut self, token: usize)354     fn dispatch_task(&mut self, token: usize) {
355         let mut inner = self.inner.borrow_mut();
356         let (task, wake) = match inner.task_dispatch.get_mut(token) {
357             Some(slot) => (slot.spawn.take(), slot.wake.clone()),
358             None => return,
359         };
360         wake.0.set_readiness(mio::Ready::empty()).unwrap();
361         let mut task = match task {
362             Some(task) => task,
363             None => return,
364         };
365         drop(inner);
366         let res = CURRENT_LOOP.set(self, || task.poll_future(wake));
367         let _task_to_drop;
368         inner = self.inner.borrow_mut();
369         match res {
370             Ok(Async::NotReady) => {
371                 assert!(inner.task_dispatch[token].spawn.is_none());
372                 inner.task_dispatch[token].spawn = Some(task);
373             }
374             Ok(Async::Ready(())) |
375             Err(()) => {
376                 _task_to_drop = inner.task_dispatch.remove(token).unwrap();
377             }
378         }
379         drop(inner);
380     }
381 
consume_timeouts(&mut self, now: Instant)382     fn consume_timeouts(&mut self, now: Instant) {
383         loop {
384             let mut inner = self.inner.borrow_mut();
385             match inner.timer_heap.peek() {
386                 Some(head) if head.0 <= now => {}
387                 Some(_) => break,
388                 None => break,
389             };
390             let (_, slab_idx) = inner.timer_heap.pop().unwrap();
391 
392             trace!("firing timeout: {}", slab_idx);
393             inner.timeouts[slab_idx].0.take().unwrap();
394             let handle = inner.timeouts[slab_idx].1.fire();
395             drop(inner);
396             if let Some(handle) = handle {
397                 self.notify_handle(handle);
398             }
399         }
400     }
401 
402     /// Method used to notify a task handle.
403     ///
404     /// Note that this should be used instead of `handle.unpark()` to ensure
405     /// that the `CURRENT_LOOP` variable is set appropriately.
notify_handle(&self, handle: Task)406     fn notify_handle(&self, handle: Task) {
407         debug!("notifying a task handle");
408         CURRENT_LOOP.set(&self, || handle.unpark());
409     }
410 
consume_queue(&self)411     fn consume_queue(&self) {
412         debug!("consuming notification queue");
413         // TODO: can we do better than `.unwrap()` here?
414         let unpark = self.rx_readiness.clone();
415         loop {
416             let msg = self.rx.borrow_mut().poll_stream(unpark.clone()).unwrap();
417             match msg {
418                 Async::Ready(Some(msg)) => self.notify(msg),
419                 Async::NotReady |
420                 Async::Ready(None) => break,
421             }
422         }
423     }
424 
notify(&self, msg: Message)425     fn notify(&self, msg: Message) {
426         match msg {
427             Message::DropSource(tok) => self.inner.borrow_mut().drop_source(tok),
428             Message::Schedule(tok, wake, dir) => {
429                 let task = self.inner.borrow_mut().schedule(tok, wake, dir);
430                 if let Some(task) = task {
431                     self.notify_handle(task);
432                 }
433             }
434             Message::UpdateTimeout(t, handle) => {
435                 let task = self.inner.borrow_mut().update_timeout(t, handle);
436                 if let Some(task) = task {
437                     self.notify_handle(task);
438                 }
439             }
440             Message::ResetTimeout(t, at) => {
441                 self.inner.borrow_mut().reset_timeout(t, at);
442             }
443             Message::CancelTimeout(t) => {
444                 self.inner.borrow_mut().cancel_timeout(t)
445             }
446             Message::Run(r) => r.call_box(self),
447         }
448     }
449 
450     /// Get the ID of this loop
id(&self) -> CoreId451     pub fn id(&self) -> CoreId {
452         CoreId(self.inner.borrow().id)
453     }
454 }
455 
456 impl fmt::Debug for Core {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result457     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
458         f.debug_struct("Core")
459          .field("id", &self.id())
460          .finish()
461     }
462 }
463 
464 impl Inner {
add_source(&mut self, source: &Evented) -> io::Result<(Arc<AtomicUsize>, usize)>465     fn add_source(&mut self, source: &Evented)
466                   -> io::Result<(Arc<AtomicUsize>, usize)> {
467         debug!("adding a new I/O source");
468         let sched = ScheduledIo {
469             readiness: Arc::new(AtomicUsize::new(0)),
470             reader: None,
471             writer: None,
472         };
473         if self.io_dispatch.vacant_entry().is_none() {
474             let amt = self.io_dispatch.len();
475             self.io_dispatch.reserve_exact(amt);
476         }
477         let entry = self.io_dispatch.vacant_entry().unwrap();
478         try!(self.io.register(source,
479                               mio::Token(TOKEN_START + entry.index() * 2),
480                               mio::Ready::readable() |
481                                 mio::Ready::writable() |
482                                 platform::hup(),
483                               mio::PollOpt::edge()));
484         Ok((sched.readiness.clone(), entry.insert(sched).index()))
485     }
486 
deregister_source(&mut self, source: &Evented) -> io::Result<()>487     fn deregister_source(&mut self, source: &Evented) -> io::Result<()> {
488         self.io.deregister(source)
489     }
490 
drop_source(&mut self, token: usize)491     fn drop_source(&mut self, token: usize) {
492         debug!("dropping I/O source: {}", token);
493         self.io_dispatch.remove(token).unwrap();
494     }
495 
schedule(&mut self, token: usize, wake: Task, dir: Direction) -> Option<Task>496     fn schedule(&mut self, token: usize, wake: Task, dir: Direction)
497                 -> Option<Task> {
498         debug!("scheduling direction for: {}", token);
499         let sched = self.io_dispatch.get_mut(token).unwrap();
500         let (slot, bit) = match dir {
501             Direction::Read => (&mut sched.reader, Readiness::Readable as usize),
502             Direction::Write => (&mut sched.writer, Readiness::Writable as usize),
503         };
504         if sched.readiness.load(Ordering::SeqCst) & bit != 0 {
505             *slot = None;
506             Some(wake)
507         } else {
508             *slot = Some(wake);
509             None
510         }
511     }
512 
add_timeout(&mut self, at: Instant) -> usize513     fn add_timeout(&mut self, at: Instant) -> usize {
514         if self.timeouts.vacant_entry().is_none() {
515             let len = self.timeouts.len();
516             self.timeouts.reserve_exact(len);
517         }
518         let entry = self.timeouts.vacant_entry().unwrap();
519         let slot = self.timer_heap.push((at, entry.index()));
520         let entry = entry.insert((Some(slot), TimeoutState::NotFired));
521         debug!("added a timeout: {}", entry.index());
522         return entry.index();
523     }
524 
update_timeout(&mut self, token: usize, handle: Task) -> Option<Task>525     fn update_timeout(&mut self, token: usize, handle: Task) -> Option<Task> {
526         debug!("updating a timeout: {}", token);
527         self.timeouts[token].1.block(handle)
528     }
529 
reset_timeout(&mut self, token: usize, at: Instant)530     fn reset_timeout(&mut self, token: usize, at: Instant) {
531         let pair = &mut self.timeouts[token];
532         // TODO: avoid remove + push and instead just do one sift of the heap?
533         // In theory we could update it in place and then do the percolation
534         // as necessary
535         if let Some(slot) = pair.0.take() {
536             self.timer_heap.remove(slot);
537         }
538         let slot = self.timer_heap.push((at, token));
539         *pair = (Some(slot), TimeoutState::NotFired);
540         debug!("set a timeout: {}", token);
541     }
542 
cancel_timeout(&mut self, token: usize)543     fn cancel_timeout(&mut self, token: usize) {
544         debug!("cancel a timeout: {}", token);
545         let pair = self.timeouts.remove(token);
546         if let Some((Some(slot), _state)) = pair {
547             self.timer_heap.remove(slot);
548         }
549     }
550 
spawn(&mut self, future: Box<Future<Item=(), Error=()>>)551     fn spawn(&mut self, future: Box<Future<Item=(), Error=()>>) {
552         if self.task_dispatch.vacant_entry().is_none() {
553             let len = self.task_dispatch.len();
554             self.task_dispatch.reserve_exact(len);
555         }
556         let entry = self.task_dispatch.vacant_entry().unwrap();
557         let token = TOKEN_START + 2 * entry.index() + 1;
558         let pair = mio::Registration::new2();
559         self.io.register(&pair.0,
560                          mio::Token(token),
561                          mio::Ready::readable(),
562                          mio::PollOpt::level())
563             .expect("cannot fail future registration with mio");
564         let unpark = Arc::new(MySetReadiness(pair.1));
565         let entry = entry.insert(ScheduledTask {
566             spawn: Some(executor::spawn(future)),
567             wake: unpark,
568             _registration: pair.0,
569         });
570         entry.get().wake.clone().unpark();
571     }
572 }
573 
574 impl Remote {
send(&self, msg: Message)575     fn send(&self, msg: Message) {
576         self.with_loop(|lp| {
577             match lp {
578                 Some(lp) => {
579                     // Need to execute all existing requests first, to ensure
580                     // that our message is processed "in order"
581                     lp.consume_queue();
582                     lp.notify(msg);
583                 }
584                 None => {
585                     match mpsc::UnboundedSender::send(&self.tx, msg) {
586                         Ok(()) => {}
587 
588                         // TODO: this error should punt upwards and we should
589                         //       notify the caller that the message wasn't
590                         //       received. This is tokio-core#17
591                         Err(e) => drop(e),
592                     }
593                 }
594             }
595         })
596     }
597 
with_loop<F, R>(&self, f: F) -> R where F: FnOnce(Option<&Core>) -> R598     fn with_loop<F, R>(&self, f: F) -> R
599         where F: FnOnce(Option<&Core>) -> R
600     {
601         if CURRENT_LOOP.is_set() {
602             CURRENT_LOOP.with(|lp| {
603                 let same = lp.inner.borrow().id == self.id;
604                 if same {
605                     f(Some(lp))
606                 } else {
607                     f(None)
608                 }
609             })
610         } else {
611             f(None)
612         }
613     }
614 
615     /// Spawns a new future into the event loop this remote is associated with.
616     ///
617     /// This function takes a closure which is executed within the context of
618     /// the I/O loop itself. The future returned by the closure will be
619     /// scheduled on the event loop an run to completion.
620     ///
621     /// Note that while the closure, `F`, requires the `Send` bound as it might
622     /// cross threads, the future `R` does not.
spawn<F, R>(&self, f: F) where F: FnOnce(&Handle) -> R + Send + 'static, R: IntoFuture<Item=(), Error=()>, R::Future: 'static,623     pub fn spawn<F, R>(&self, f: F)
624         where F: FnOnce(&Handle) -> R + Send + 'static,
625               R: IntoFuture<Item=(), Error=()>,
626               R::Future: 'static,
627     {
628         self.send(Message::Run(Box::new(|lp: &Core| {
629             let f = f(&lp.handle());
630             lp.inner.borrow_mut().spawn(Box::new(f.into_future()));
631         })));
632     }
633 
634     /// Return the ID of the represented Core
id(&self) -> CoreId635     pub fn id(&self) -> CoreId {
636         CoreId(self.id)
637     }
638 
639     /// Attempts to "promote" this remote to a handle, if possible.
640     ///
641     /// This function is intended for structures which typically work through a
642     /// `Remote` but want to optimize runtime when the remote doesn't actually
643     /// leave the thread of the original reactor. This will attempt to return a
644     /// handle if the `Remote` is on the same thread as the event loop and the
645     /// event loop is running.
646     ///
647     /// If this `Remote` has moved to a different thread or if the event loop is
648     /// running, then `None` may be returned. If you need to guarantee access to
649     /// a `Handle`, then you can call this function and fall back to using
650     /// `spawn` above if it returns `None`.
handle(&self) -> Option<Handle>651     pub fn handle(&self) -> Option<Handle> {
652         if CURRENT_LOOP.is_set() {
653             CURRENT_LOOP.with(|lp| {
654                 let same = lp.inner.borrow().id == self.id;
655                 if same {
656                     Some(lp.handle())
657                 } else {
658                     None
659                 }
660             })
661         } else {
662             None
663         }
664     }
665 }
666 
667 impl fmt::Debug for Remote {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result668     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
669         f.debug_struct("Remote")
670          .field("id", &self.id())
671          .finish()
672     }
673 }
674 
675 impl Handle {
676     /// Returns a reference to the underlying remote handle to the event loop.
remote(&self) -> &Remote677     pub fn remote(&self) -> &Remote {
678         &self.remote
679     }
680 
681     /// Spawns a new future on the event loop this handle is associated with.
spawn<F>(&self, f: F) where F: Future<Item=(), Error=()> + 'static,682     pub fn spawn<F>(&self, f: F)
683         where F: Future<Item=(), Error=()> + 'static,
684     {
685         let inner = match self.inner.upgrade() {
686             Some(inner) => inner,
687             None => return,
688         };
689         inner.borrow_mut().spawn(Box::new(f));
690     }
691 
692     /// Spawns a closure on this event loop.
693     ///
694     /// This function is a convenience wrapper around the `spawn` function above
695     /// for running a closure wrapped in `futures::lazy`. It will spawn the
696     /// function `f` provided onto the event loop, and continue to run the
697     /// future returned by `f` on the event loop as well.
spawn_fn<F, R>(&self, f: F) where F: FnOnce() -> R + 'static, R: IntoFuture<Item=(), Error=()> + 'static,698     pub fn spawn_fn<F, R>(&self, f: F)
699         where F: FnOnce() -> R + 'static,
700               R: IntoFuture<Item=(), Error=()> + 'static,
701     {
702         self.spawn(future::lazy(f))
703     }
704 
705     /// Return the ID of the represented Core
id(&self) -> CoreId706     pub fn id(&self) -> CoreId {
707         self.remote.id()
708     }
709 }
710 
711 impl fmt::Debug for Handle {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result712     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
713         f.debug_struct("Handle")
714          .field("id", &self.id())
715          .finish()
716     }
717 }
718 
719 impl TimeoutState {
block(&mut self, handle: Task) -> Option<Task>720     fn block(&mut self, handle: Task) -> Option<Task> {
721         match *self {
722             TimeoutState::Fired => return Some(handle),
723             _ => {}
724         }
725         *self = TimeoutState::Waiting(handle);
726         None
727     }
728 
fire(&mut self) -> Option<Task>729     fn fire(&mut self) -> Option<Task> {
730         match mem::replace(self, TimeoutState::Fired) {
731             TimeoutState::NotFired => None,
732             TimeoutState::Fired => panic!("fired twice?"),
733             TimeoutState::Waiting(handle) => Some(handle),
734         }
735     }
736 }
737 
738 struct MySetReadiness(mio::SetReadiness);
739 
740 impl Unpark for MySetReadiness {
unpark(&self)741     fn unpark(&self) {
742         self.0.set_readiness(mio::Ready::readable())
743               .expect("failed to set readiness");
744     }
745 }
746 
747 trait FnBox: Send + 'static {
call_box(self: Box<Self>, lp: &Core)748     fn call_box(self: Box<Self>, lp: &Core);
749 }
750 
751 impl<F: FnOnce(&Core) + Send + 'static> FnBox for F {
call_box(self: Box<Self>, lp: &Core)752     fn call_box(self: Box<Self>, lp: &Core) {
753         (*self)(lp)
754     }
755 }
756 
757 #[cfg(unix)]
758 mod platform {
759     use mio::Ready;
760     use mio::unix::UnixReady;
761 
is_hup(event: &Ready) -> bool762     pub fn is_hup(event: &Ready) -> bool {
763         UnixReady::from(*event).is_hup()
764     }
765 
hup() -> Ready766     pub fn hup() -> Ready {
767         UnixReady::hup().into()
768     }
769 }
770 
771 #[cfg(windows)]
772 mod platform {
773     use mio::Ready;
774 
is_hup(_event: &Ready) -> bool775     pub fn is_hup(_event: &Ready) -> bool {
776         false
777     }
778 
hup() -> Ready779     pub fn hup() -> Ready {
780         Ready::empty()
781     }
782 }
783