1 use {channel, Poll, Events, Token}; 2 use event::Evented; 3 use deprecated::{Handler, NotifyError}; 4 use event_imp::{Event, Ready, PollOpt}; 5 use timer::{self, Timer, Timeout}; 6 use std::{io, fmt, usize}; 7 use std::default::Default; 8 use std::time::Duration; 9 10 #[derive(Debug, Default, Clone)] 11 pub struct EventLoopBuilder { 12 config: Config, 13 } 14 15 /// `EventLoop` configuration details 16 #[derive(Clone, Debug)] 17 struct Config { 18 // == Notifications == 19 notify_capacity: usize, 20 messages_per_tick: usize, 21 22 // == Timer == 23 timer_tick: Duration, 24 timer_wheel_size: usize, 25 timer_capacity: usize, 26 } 27 28 impl Default for Config { default() -> Config29 fn default() -> Config { 30 // Default EventLoop configuration values 31 Config { 32 notify_capacity: 4_096, 33 messages_per_tick: 256, 34 timer_tick: Duration::from_millis(100), 35 timer_wheel_size: 1_024, 36 timer_capacity: 65_536, 37 } 38 } 39 } 40 41 impl EventLoopBuilder { 42 /// Construct a new `EventLoopBuilder` with the default configuration 43 /// values. new() -> EventLoopBuilder44 pub fn new() -> EventLoopBuilder { 45 EventLoopBuilder::default() 46 } 47 48 /// Sets the maximum number of messages that can be buffered on the event 49 /// loop's notification channel before a send will fail. 50 /// 51 /// The default value for this is 4096. notify_capacity(&mut self, capacity: usize) -> &mut Self52 pub fn notify_capacity(&mut self, capacity: usize) -> &mut Self { 53 self.config.notify_capacity = capacity; 54 self 55 } 56 57 /// Sets the maximum number of messages that can be processed on any tick of 58 /// the event loop. 59 /// 60 /// The default value for this is 256. messages_per_tick(&mut self, messages: usize) -> &mut Self61 pub fn messages_per_tick(&mut self, messages: usize) -> &mut Self { 62 self.config.messages_per_tick = messages; 63 self 64 } 65 timer_tick(&mut self, val: Duration) -> &mut Self66 pub fn timer_tick(&mut self, val: Duration) -> &mut Self { 67 self.config.timer_tick = val; 68 self 69 } 70 timer_wheel_size(&mut self, size: usize) -> &mut Self71 pub fn timer_wheel_size(&mut self, size: usize) -> &mut Self { 72 self.config.timer_wheel_size = size; 73 self 74 } 75 timer_capacity(&mut self, cap: usize) -> &mut Self76 pub fn timer_capacity(&mut self, cap: usize) -> &mut Self { 77 self.config.timer_capacity = cap; 78 self 79 } 80 81 /// Constructs a new `EventLoop` using the configured values. The 82 /// `EventLoop` will not be running. build<H: Handler>(self) -> io::Result<EventLoop<H>>83 pub fn build<H: Handler>(self) -> io::Result<EventLoop<H>> { 84 EventLoop::configured(self.config) 85 } 86 } 87 88 /// Single threaded IO event loop. 89 pub struct EventLoop<H: Handler> { 90 run: bool, 91 poll: Poll, 92 events: Events, 93 timer: Timer<H::Timeout>, 94 notify_tx: channel::SyncSender<H::Message>, 95 notify_rx: channel::Receiver<H::Message>, 96 config: Config, 97 } 98 99 // Token used to represent notifications 100 const NOTIFY: Token = Token(usize::MAX - 1); 101 const TIMER: Token = Token(usize::MAX - 2); 102 103 impl<H: Handler> EventLoop<H> { 104 105 /// Constructs a new `EventLoop` using the default configuration values. 106 /// The `EventLoop` will not be running. new() -> io::Result<EventLoop<H>>107 pub fn new() -> io::Result<EventLoop<H>> { 108 EventLoop::configured(Config::default()) 109 } 110 configured(config: Config) -> io::Result<EventLoop<H>>111 fn configured(config: Config) -> io::Result<EventLoop<H>> { 112 // Create the IO poller 113 let poll = Poll::new()?; 114 115 let timer = timer::Builder::default() 116 .tick_duration(config.timer_tick) 117 .num_slots(config.timer_wheel_size) 118 .capacity(config.timer_capacity) 119 .build(); 120 121 // Create cross thread notification queue 122 let (tx, rx) = channel::sync_channel(config.notify_capacity); 123 124 // Register the notification wakeup FD with the IO poller 125 poll.register(&rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())?; 126 poll.register(&timer, TIMER, Ready::readable(), PollOpt::edge())?; 127 128 Ok(EventLoop { 129 run: true, 130 poll, 131 timer, 132 notify_tx: tx, 133 notify_rx: rx, 134 config, 135 events: Events::with_capacity(1024), 136 }) 137 } 138 139 /// Returns a sender that allows sending messages to the event loop in a 140 /// thread-safe way, waking up the event loop if needed. 141 /// 142 /// # Implementation Details 143 /// 144 /// Each [EventLoop](#) contains a lock-free queue with a pre-allocated 145 /// buffer size. The size can be changed by modifying 146 /// [EventLoopConfig.notify_capacity](struct.EventLoopConfig.html#method.notify_capacity). 147 /// When a message is sent to the EventLoop, it is first pushed on to the 148 /// queue. Then, if the EventLoop is currently running, an atomic flag is 149 /// set to indicate that the next loop iteration should be started without 150 /// waiting. 151 /// 152 /// If the loop is blocked waiting for IO events, then it is woken up. The 153 /// strategy for waking up the event loop is platform dependent. For 154 /// example, on a modern Linux OS, eventfd is used. On older OSes, a pipe 155 /// is used. 156 /// 157 /// The strategy of setting an atomic flag if the event loop is not already 158 /// sleeping allows avoiding an expensive wakeup operation if at all possible. channel(&self) -> Sender<H::Message>159 pub fn channel(&self) -> Sender<H::Message> { 160 Sender::new(self.notify_tx.clone()) 161 } 162 163 /// Schedules a timeout after the requested time interval. When the 164 /// duration has been reached, 165 /// [Handler::timeout](trait.Handler.html#method.timeout) will be invoked 166 /// passing in the supplied token. 167 /// 168 /// Returns a handle to the timeout that can be used to cancel the timeout 169 /// using [#clear_timeout](#method.clear_timeout). timeout(&mut self, token: H::Timeout, delay: Duration) -> timer::Result<Timeout>170 pub fn timeout(&mut self, token: H::Timeout, delay: Duration) -> timer::Result<Timeout> { 171 self.timer.set_timeout(delay, token) 172 } 173 174 /// If the supplied timeout has not been triggered, cancel it such that it 175 /// will not be triggered in the future. clear_timeout(&mut self, timeout: &Timeout) -> bool176 pub fn clear_timeout(&mut self, timeout: &Timeout) -> bool { 177 self.timer.cancel_timeout(&timeout).is_some() 178 } 179 180 /// Tells the event loop to exit after it is done handling all events in the 181 /// current iteration. shutdown(&mut self)182 pub fn shutdown(&mut self) { 183 self.run = false; 184 } 185 186 /// Indicates whether the event loop is currently running. If it's not it has either 187 /// stopped or is scheduled to stop on the next tick. is_running(&self) -> bool188 pub fn is_running(&self) -> bool { 189 self.run 190 } 191 192 /// Registers an IO handle with the event loop. register<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> where E: Evented193 pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> 194 where E: Evented 195 { 196 self.poll.register(io, token, interest, opt) 197 } 198 199 /// Re-Registers an IO handle with the event loop. reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> where E: Evented200 pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> 201 where E: Evented 202 { 203 self.poll.reregister(io, token, interest, opt) 204 } 205 206 /// Keep spinning the event loop indefinitely, and notify the handler whenever 207 /// any of the registered handles are ready. run(&mut self, handler: &mut H) -> io::Result<()>208 pub fn run(&mut self, handler: &mut H) -> io::Result<()> { 209 self.run = true; 210 211 while self.run { 212 // Execute ticks as long as the event loop is running 213 self.run_once(handler, None)?; 214 } 215 216 Ok(()) 217 } 218 219 /// Deregisters an IO handle with the event loop. 220 /// 221 /// Both kqueue and epoll will automatically clear any pending events when closing a 222 /// file descriptor (socket). In that case, this method does not need to be called 223 /// prior to dropping a connection from the slab. 224 /// 225 /// Warning: kqueue effectively builds in deregister when using edge-triggered mode with 226 /// oneshot. Calling `deregister()` on the socket will cause a TcpStream error. deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented227 pub fn deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented { 228 self.poll.deregister(io) 229 } 230 231 /// Spin the event loop once, with a given timeout (forever if `None`), 232 /// and notify the handler if any of the registered handles become ready 233 /// during that time. run_once(&mut self, handler: &mut H, timeout: Option<Duration>) -> io::Result<()>234 pub fn run_once(&mut self, handler: &mut H, timeout: Option<Duration>) -> io::Result<()> { 235 trace!("event loop tick"); 236 237 // Check the registered IO handles for any new events. Each poll 238 // is for one second, so a shutdown request can last as long as 239 // one second before it takes effect. 240 let events = match self.io_poll(timeout) { 241 Ok(e) => e, 242 Err(err) => { 243 if err.kind() == io::ErrorKind::Interrupted { 244 handler.interrupted(self); 245 0 246 } else { 247 return Err(err); 248 } 249 } 250 }; 251 252 self.io_process(handler, events); 253 handler.tick(self); 254 Ok(()) 255 } 256 257 #[inline] io_poll(&mut self, timeout: Option<Duration>) -> io::Result<usize>258 fn io_poll(&mut self, timeout: Option<Duration>) -> io::Result<usize> { 259 self.poll.poll(&mut self.events, timeout) 260 } 261 262 // Process IO events that have been previously polled io_process(&mut self, handler: &mut H, cnt: usize)263 fn io_process(&mut self, handler: &mut H, cnt: usize) { 264 let mut i = 0; 265 266 trace!("io_process(..); cnt={}; len={}", cnt, self.events.len()); 267 268 // Iterate over the notifications. Each event provides the token 269 // it was registered with (which usually represents, at least, the 270 // handle that the event is about) as well as information about 271 // what kind of event occurred (readable, writable, signal, etc.) 272 while i < cnt { 273 let evt = self.events.get(i).unwrap(); 274 275 trace!("event={:?}; idx={:?}", evt, i); 276 277 match evt.token() { 278 NOTIFY => self.notify(handler), 279 TIMER => self.timer_process(handler), 280 _ => self.io_event(handler, evt) 281 } 282 283 i += 1; 284 } 285 } 286 io_event(&mut self, handler: &mut H, evt: Event)287 fn io_event(&mut self, handler: &mut H, evt: Event) { 288 handler.ready(self, evt.token(), evt.readiness()); 289 } 290 notify(&mut self, handler: &mut H)291 fn notify(&mut self, handler: &mut H) { 292 for _ in 0..self.config.messages_per_tick { 293 match self.notify_rx.try_recv() { 294 Ok(msg) => handler.notify(self, msg), 295 _ => break, 296 } 297 } 298 299 // Re-register 300 let _ = self.poll.reregister(&self.notify_rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot()); 301 } 302 timer_process(&mut self, handler: &mut H)303 fn timer_process(&mut self, handler: &mut H) { 304 while let Some(t) = self.timer.poll() { 305 handler.timeout(self, t); 306 } 307 } 308 } 309 310 impl<H: Handler> fmt::Debug for EventLoop<H> { fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result311 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { 312 fmt.debug_struct("EventLoop") 313 .field("run", &self.run) 314 .field("poll", &self.poll) 315 .field("config", &self.config) 316 .finish() 317 } 318 } 319 320 /// Sends messages to the EventLoop from other threads. 321 pub struct Sender<M> { 322 tx: channel::SyncSender<M> 323 } 324 325 impl<M> fmt::Debug for Sender<M> { fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result326 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { 327 write!(fmt, "Sender<?> {{ ... }}") 328 } 329 } 330 331 impl<M> Clone for Sender <M> { clone(&self) -> Sender<M>332 fn clone(&self) -> Sender<M> { 333 Sender { tx: self.tx.clone() } 334 } 335 } 336 337 impl<M> Sender<M> { new(tx: channel::SyncSender<M>) -> Sender<M>338 fn new(tx: channel::SyncSender<M>) -> Sender<M> { 339 Sender { tx } 340 } 341 send(&self, msg: M) -> Result<(), NotifyError<M>>342 pub fn send(&self, msg: M) -> Result<(), NotifyError<M>> { 343 self.tx.try_send(msg)?; 344 Ok(()) 345 } 346 } 347