1 use {channel, Evented, Poll, Events, Token}; 2 use deprecated::{Handler, NotifyError}; 3 use event_imp::{Event, Ready, PollOpt}; 4 use timer::{self, Timer, Timeout}; 5 use std::{io, fmt, usize}; 6 use std::default::Default; 7 use std::time::Duration; 8 9 #[derive(Debug, Default, Clone)] 10 pub struct EventLoopBuilder { 11 config: Config, 12 } 13 14 /// `EventLoop` configuration details 15 #[derive(Clone, Debug)] 16 struct Config { 17 // == Notifications == 18 notify_capacity: usize, 19 messages_per_tick: usize, 20 21 // == Timer == 22 timer_tick: Duration, 23 timer_wheel_size: usize, 24 timer_capacity: usize, 25 } 26 27 impl Default for Config { default() -> Config28 fn default() -> Config { 29 // Default EventLoop configuration values 30 Config { 31 notify_capacity: 4_096, 32 messages_per_tick: 256, 33 timer_tick: Duration::from_millis(100), 34 timer_wheel_size: 1_024, 35 timer_capacity: 65_536, 36 } 37 } 38 } 39 40 impl EventLoopBuilder { 41 /// Construct a new `EventLoopBuilder` with the default configuration 42 /// values. new() -> EventLoopBuilder43 pub fn new() -> EventLoopBuilder { 44 EventLoopBuilder::default() 45 } 46 47 /// Sets the maximum number of messages that can be buffered on the event 48 /// loop's notification channel before a send will fail. 49 /// 50 /// The default value for this is 4096. notify_capacity(&mut self, capacity: usize) -> &mut Self51 pub fn notify_capacity(&mut self, capacity: usize) -> &mut Self { 52 self.config.notify_capacity = capacity; 53 self 54 } 55 56 /// Sets the maximum number of messages that can be processed on any tick of 57 /// the event loop. 58 /// 59 /// The default value for this is 256. messages_per_tick(&mut self, messages: usize) -> &mut Self60 pub fn messages_per_tick(&mut self, messages: usize) -> &mut Self { 61 self.config.messages_per_tick = messages; 62 self 63 } 64 timer_tick(&mut self, val: Duration) -> &mut Self65 pub fn timer_tick(&mut self, val: Duration) -> &mut Self { 66 self.config.timer_tick = val; 67 self 68 } 69 timer_wheel_size(&mut self, size: usize) -> &mut Self70 pub fn timer_wheel_size(&mut self, size: usize) -> &mut Self { 71 self.config.timer_wheel_size = size; 72 self 73 } 74 timer_capacity(&mut self, cap: usize) -> &mut Self75 pub fn timer_capacity(&mut self, cap: usize) -> &mut Self { 76 self.config.timer_capacity = cap; 77 self 78 } 79 80 /// Constructs a new `EventLoop` using the configured values. The 81 /// `EventLoop` will not be running. build<H: Handler>(self) -> io::Result<EventLoop<H>>82 pub fn build<H: Handler>(self) -> io::Result<EventLoop<H>> { 83 EventLoop::configured(self.config) 84 } 85 } 86 87 /// Single threaded IO event loop. 88 pub struct EventLoop<H: Handler> { 89 run: bool, 90 poll: Poll, 91 events: Events, 92 timer: Timer<H::Timeout>, 93 notify_tx: channel::SyncSender<H::Message>, 94 notify_rx: channel::Receiver<H::Message>, 95 config: Config, 96 } 97 98 // Token used to represent notifications 99 const NOTIFY: Token = Token(usize::MAX - 1); 100 const TIMER: Token = Token(usize::MAX - 2); 101 102 impl<H: Handler> EventLoop<H> { 103 104 /// Constructs a new `EventLoop` using the default configuration values. 105 /// The `EventLoop` will not be running. new() -> io::Result<EventLoop<H>>106 pub fn new() -> io::Result<EventLoop<H>> { 107 EventLoop::configured(Config::default()) 108 } 109 configured(config: Config) -> io::Result<EventLoop<H>>110 fn configured(config: Config) -> io::Result<EventLoop<H>> { 111 // Create the IO poller 112 let poll = try!(Poll::new()); 113 114 let timer = timer::Builder::default() 115 .tick_duration(config.timer_tick) 116 .num_slots(config.timer_wheel_size) 117 .capacity(config.timer_capacity) 118 .build(); 119 120 // Create cross thread notification queue 121 let (tx, rx) = channel::sync_channel(config.notify_capacity); 122 123 // Register the notification wakeup FD with the IO poller 124 try!(poll.register(&rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())); 125 try!(poll.register(&timer, TIMER, Ready::readable(), PollOpt::edge())); 126 127 Ok(EventLoop { 128 run: true, 129 poll: poll, 130 timer: timer, 131 notify_tx: tx, 132 notify_rx: rx, 133 config: config, 134 events: Events::with_capacity(1024), 135 }) 136 } 137 138 /// Returns a sender that allows sending messages to the event loop in a 139 /// thread-safe way, waking up the event loop if needed. 140 /// 141 /// # Example 142 /// ``` 143 /// use std::thread; 144 /// use mio::deprecated::{EventLoop, Handler}; 145 /// 146 /// struct MyHandler; 147 /// 148 /// impl Handler for MyHandler { 149 /// type Timeout = (); 150 /// type Message = u32; 151 /// 152 /// fn notify(&mut self, event_loop: &mut EventLoop<MyHandler>, msg: u32) { 153 /// assert_eq!(msg, 123); 154 /// event_loop.shutdown(); 155 /// } 156 /// } 157 /// 158 /// let mut event_loop = EventLoop::new().unwrap(); 159 /// let sender = event_loop.channel(); 160 /// 161 /// // Send the notification from another thread 162 /// thread::spawn(move || { 163 /// let _ = sender.send(123); 164 /// }); 165 /// 166 /// let _ = event_loop.run(&mut MyHandler); 167 /// ``` 168 /// 169 /// # Implementation Details 170 /// 171 /// Each [EventLoop](#) contains a lock-free queue with a pre-allocated 172 /// buffer size. The size can be changed by modifying 173 /// [EventLoopConfig.notify_capacity](struct.EventLoopConfig.html#method.notify_capacity). 174 /// When a message is sent to the EventLoop, it is first pushed on to the 175 /// queue. Then, if the EventLoop is currently running, an atomic flag is 176 /// set to indicate that the next loop iteration should be started without 177 /// waiting. 178 /// 179 /// If the loop is blocked waiting for IO events, then it is woken up. The 180 /// strategy for waking up the event loop is platform dependent. For 181 /// example, on a modern Linux OS, eventfd is used. On older OSes, a pipe 182 /// is used. 183 /// 184 /// The strategy of setting an atomic flag if the event loop is not already 185 /// sleeping allows avoiding an expensive wakeup operation if at all possible. channel(&self) -> Sender<H::Message>186 pub fn channel(&self) -> Sender<H::Message> { 187 Sender::new(self.notify_tx.clone()) 188 } 189 190 /// Schedules a timeout after the requested time interval. When the 191 /// duration has been reached, 192 /// [Handler::timeout](trait.Handler.html#method.timeout) will be invoked 193 /// passing in the supplied token. 194 /// 195 /// Returns a handle to the timeout that can be used to cancel the timeout 196 /// using [#clear_timeout](#method.clear_timeout). 197 /// 198 /// # Example 199 /// ``` 200 /// use mio::deprecated::{EventLoop, Handler}; 201 /// use std::time::Duration; 202 /// 203 /// struct MyHandler; 204 /// 205 /// impl Handler for MyHandler { 206 /// type Timeout = u32; 207 /// type Message = (); 208 /// 209 /// fn timeout(&mut self, event_loop: &mut EventLoop<MyHandler>, timeout: u32) { 210 /// assert_eq!(timeout, 123); 211 /// event_loop.shutdown(); 212 /// } 213 /// } 214 /// 215 /// 216 /// let mut event_loop = EventLoop::new().unwrap(); 217 /// let timeout = event_loop.timeout(123, Duration::from_millis(300)).unwrap(); 218 /// let _ = event_loop.run(&mut MyHandler); 219 /// ``` timeout(&mut self, token: H::Timeout, delay: Duration) -> timer::Result<Timeout>220 pub fn timeout(&mut self, token: H::Timeout, delay: Duration) -> timer::Result<Timeout> { 221 self.timer.set_timeout(delay, token) 222 } 223 224 /// If the supplied timeout has not been triggered, cancel it such that it 225 /// will not be triggered in the future. clear_timeout(&mut self, timeout: &Timeout) -> bool226 pub fn clear_timeout(&mut self, timeout: &Timeout) -> bool { 227 self.timer.cancel_timeout(&timeout).is_some() 228 } 229 230 /// Tells the event loop to exit after it is done handling all events in the 231 /// current iteration. shutdown(&mut self)232 pub fn shutdown(&mut self) { 233 self.run = false; 234 } 235 236 /// Indicates whether the event loop is currently running. If it's not it has either 237 /// stopped or is scheduled to stop on the next tick. is_running(&self) -> bool238 pub fn is_running(&self) -> bool { 239 self.run 240 } 241 242 /// Registers an IO handle with the event loop. register<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> where E: Evented243 pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> 244 where E: Evented 245 { 246 self.poll.register(io, token, interest, opt) 247 } 248 249 /// Re-Registers an IO handle with the event loop. reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> where E: Evented250 pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> 251 where E: Evented 252 { 253 self.poll.reregister(io, token, interest, opt) 254 } 255 256 /// Keep spinning the event loop indefinitely, and notify the handler whenever 257 /// any of the registered handles are ready. run(&mut self, handler: &mut H) -> io::Result<()>258 pub fn run(&mut self, handler: &mut H) -> io::Result<()> { 259 self.run = true; 260 261 while self.run { 262 // Execute ticks as long as the event loop is running 263 try!(self.run_once(handler, None)); 264 } 265 266 Ok(()) 267 } 268 269 /// Deregisters an IO handle with the event loop. 270 /// 271 /// Both kqueue and epoll will automatically clear any pending events when closing a 272 /// file descriptor (socket). In that case, this method does not need to be called 273 /// prior to dropping a connection from the slab. 274 /// 275 /// Warning: kqueue effectively builds in deregister when using edge-triggered mode with 276 /// oneshot. Calling `deregister()` on the socket will cause a TcpStream error. deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented277 pub fn deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented { 278 self.poll.deregister(io) 279 } 280 281 /// Spin the event loop once, with a given timeout (forever if `None`), 282 /// and notify the handler if any of the registered handles become ready 283 /// during that time. run_once(&mut self, handler: &mut H, timeout: Option<Duration>) -> io::Result<()>284 pub fn run_once(&mut self, handler: &mut H, timeout: Option<Duration>) -> io::Result<()> { 285 trace!("event loop tick"); 286 287 // Check the registered IO handles for any new events. Each poll 288 // is for one second, so a shutdown request can last as long as 289 // one second before it takes effect. 290 let events = match self.io_poll(timeout) { 291 Ok(e) => e, 292 Err(err) => { 293 if err.kind() == io::ErrorKind::Interrupted { 294 handler.interrupted(self); 295 0 296 } else { 297 return Err(err); 298 } 299 } 300 }; 301 302 self.io_process(handler, events); 303 handler.tick(self); 304 Ok(()) 305 } 306 307 #[inline] io_poll(&mut self, timeout: Option<Duration>) -> io::Result<usize>308 fn io_poll(&mut self, timeout: Option<Duration>) -> io::Result<usize> { 309 self.poll.poll(&mut self.events, timeout) 310 } 311 312 // Process IO events that have been previously polled io_process(&mut self, handler: &mut H, cnt: usize)313 fn io_process(&mut self, handler: &mut H, cnt: usize) { 314 let mut i = 0; 315 316 trace!("io_process(..); cnt={}; len={}", cnt, self.events.len()); 317 318 // Iterate over the notifications. Each event provides the token 319 // it was registered with (which usually represents, at least, the 320 // handle that the event is about) as well as information about 321 // what kind of event occurred (readable, writable, signal, etc.) 322 while i < cnt { 323 let evt = self.events.get(i).unwrap(); 324 325 trace!("event={:?}; idx={:?}", evt, i); 326 327 match evt.token() { 328 NOTIFY => self.notify(handler), 329 TIMER => self.timer_process(handler), 330 _ => self.io_event(handler, evt) 331 } 332 333 i += 1; 334 } 335 } 336 io_event(&mut self, handler: &mut H, evt: Event)337 fn io_event(&mut self, handler: &mut H, evt: Event) { 338 handler.ready(self, evt.token(), evt.kind()); 339 } 340 notify(&mut self, handler: &mut H)341 fn notify(&mut self, handler: &mut H) { 342 for _ in 0..self.config.messages_per_tick { 343 match self.notify_rx.try_recv() { 344 Ok(msg) => handler.notify(self, msg), 345 _ => break, 346 } 347 } 348 349 // Re-register 350 let _ = self.poll.reregister(&self.notify_rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot()); 351 } 352 timer_process(&mut self, handler: &mut H)353 fn timer_process(&mut self, handler: &mut H) { 354 while let Some(t) = self.timer.poll() { 355 handler.timeout(self, t); 356 } 357 } 358 } 359 360 impl<H: Handler> fmt::Debug for EventLoop<H> { fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result361 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { 362 fmt.debug_struct("EventLoop") 363 .field("run", &self.run) 364 .field("poll", &self.poll) 365 .field("config", &self.config) 366 .finish() 367 } 368 } 369 370 /// Sends messages to the EventLoop from other threads. 371 pub struct Sender<M> { 372 tx: channel::SyncSender<M> 373 } 374 375 impl<M> fmt::Debug for Sender<M> { fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result376 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { 377 write!(fmt, "Sender<?> {{ ... }}") 378 } 379 } 380 381 impl<M> Clone for Sender <M> { clone(&self) -> Sender<M>382 fn clone(&self) -> Sender<M> { 383 Sender { tx: self.tx.clone() } 384 } 385 } 386 387 impl<M> Sender<M> { new(tx: channel::SyncSender<M>) -> Sender<M>388 fn new(tx: channel::SyncSender<M>) -> Sender<M> { 389 Sender { tx: tx } 390 } 391 send(&self, msg: M) -> Result<(), NotifyError<M>>392 pub fn send(&self, msg: M) -> Result<(), NotifyError<M>> { 393 try!(self.tx.try_send(msg)); 394 Ok(()) 395 } 396 } 397