1 use std::sync::Arc; 2 use std::sync::atomic::{AtomicUsize, Ordering}; 3 use std::io; 4 5 use futures::task; 6 use mio::event::Evented; 7 8 use reactor::{Message, Remote, Handle, Direction}; 9 10 /// A token that identifies an active timeout. 11 pub struct IoToken { 12 token: usize, 13 // TODO: can we avoid this allocation? It's kind of a bummer... 14 readiness: Arc<AtomicUsize>, 15 } 16 17 impl IoToken { 18 /// Add a new source to an event loop, returning a future which will resolve 19 /// to the token that can be used to identify this source. 20 /// 21 /// When a new I/O object is created it needs to be communicated to the 22 /// event loop to ensure that it's registered and ready to receive 23 /// notifications. The event loop with then respond back with the I/O object 24 /// and a token which can be used to send more messages to the event loop. 25 /// 26 /// The token returned is then passed in turn to each of the methods below 27 /// to interact with notifications on the I/O object itself. 28 /// 29 /// # Panics 30 /// 31 /// The returned future will panic if the event loop this handle is 32 /// associated with has gone away, or if there is an error communicating 33 /// with the event loop. new(source: &Evented, handle: &Handle) -> io::Result<IoToken>34 pub fn new(source: &Evented, handle: &Handle) -> io::Result<IoToken> { 35 match handle.inner.upgrade() { 36 Some(inner) => { 37 let (ready, token) = try!(inner.borrow_mut().add_source(source)); 38 Ok(IoToken { token: token, readiness: ready }) 39 } 40 None => Err(io::Error::new(io::ErrorKind::Other, "event loop gone")), 41 } 42 } 43 44 /// Consumes the last readiness notification the token this source is for 45 /// registered. 46 /// 47 /// Currently sources receive readiness notifications on an edge-basis. That 48 /// is, once you receive a notification that an object can be read, you 49 /// won't receive any more notifications until all of that data has been 50 /// read. 51 /// 52 /// The event loop will fill in this information and then inform futures 53 /// that they're ready to go with the `schedule` method, and then the `poll` 54 /// method can use this to figure out what happened. 55 /// 56 /// > **Note**: This method should generally not be used directly, but 57 /// > rather the `ReadinessStream` type should be used instead. 58 // TODO: this should really return a proper newtype/enum, not a usize take_readiness(&self) -> usize59 pub fn take_readiness(&self) -> usize { 60 self.readiness.swap(0, Ordering::SeqCst) 61 } 62 63 /// Schedule the current future task to receive a notification when the 64 /// corresponding I/O object is readable. 65 /// 66 /// Once an I/O object has been registered with the event loop through the 67 /// `add_source` method, this method can be used with the assigned token to 68 /// notify the current future task when the next read notification comes in. 69 /// 70 /// The current task will only receive a notification **once** and to 71 /// receive further notifications it will need to call `schedule_read` 72 /// again. 73 /// 74 /// > **Note**: This method should generally not be used directly, but 75 /// > rather the `ReadinessStream` type should be used instead. 76 /// 77 /// # Panics 78 /// 79 /// This function will panic if the event loop this handle is associated 80 /// with has gone away, or if there is an error communicating with the event 81 /// loop. 82 /// 83 /// This function will also panic if there is not a currently running future 84 /// task. schedule_read(&self, handle: &Remote)85 pub fn schedule_read(&self, handle: &Remote) { 86 handle.send(Message::Schedule(self.token, task::park(), Direction::Read)); 87 } 88 89 /// Schedule the current future task to receive a notification when the 90 /// corresponding I/O object is writable. 91 /// 92 /// Once an I/O object has been registered with the event loop through the 93 /// `add_source` method, this method can be used with the assigned token to 94 /// notify the current future task when the next write notification comes 95 /// in. 96 /// 97 /// The current task will only receive a notification **once** and to 98 /// receive further notifications it will need to call `schedule_write` 99 /// again. 100 /// 101 /// > **Note**: This method should generally not be used directly, but 102 /// > rather the `ReadinessStream` type should be used instead. 103 /// 104 /// # Panics 105 /// 106 /// This function will panic if the event loop this handle is associated 107 /// with has gone away, or if there is an error communicating with the event 108 /// loop. 109 /// 110 /// This function will also panic if there is not a currently running future 111 /// task. schedule_write(&self, handle: &Remote)112 pub fn schedule_write(&self, handle: &Remote) { 113 handle.send(Message::Schedule(self.token, task::park(), Direction::Write)); 114 } 115 116 /// Unregister all information associated with a token on an event loop, 117 /// deallocating all internal resources assigned to the given token. 118 /// 119 /// This method should be called whenever a source of events is being 120 /// destroyed. This will ensure that the event loop can reuse `tok` for 121 /// another I/O object if necessary and also remove it from any poll 122 /// notifications and callbacks. 123 /// 124 /// Note that wake callbacks may still be invoked after this method is 125 /// called as it may take some time for the message to drop a source to 126 /// reach the event loop. Despite this fact, this method will attempt to 127 /// ensure that the callbacks are **not** invoked, so pending scheduled 128 /// callbacks cannot be relied upon to get called. 129 /// 130 /// > **Note**: This method should generally not be used directly, but 131 /// > rather the `ReadinessStream` type should be used instead. 132 /// 133 /// # Panics 134 /// 135 /// This function will panic if the event loop this handle is associated 136 /// with has gone away, or if there is an error communicating with the event 137 /// loop. drop_source(&self, handle: &Remote)138 pub fn drop_source(&self, handle: &Remote) { 139 handle.send(Message::DropSource(self.token)); 140 } 141 } 142