1 use {channel, Evented, Poll, Events, Token};
2 use deprecated::{Handler, NotifyError};
3 use event_imp::{Event, Ready, PollOpt};
4 use timer::{self, Timer, Timeout};
5 use std::{io, fmt, usize};
6 use std::default::Default;
7 use std::time::Duration;
8 
9 #[derive(Debug, Default, Clone)]
10 pub struct EventLoopBuilder {
11     config: Config,
12 }
13 
/// Tunable settings for an `EventLoop`.
#[derive(Debug, Clone)]
struct Config {
    // == Notifications ==
    // Capacity the notification channel is created with.
    notify_capacity: usize,
    // Upper bound on the number of messages drained from the notification
    // channel in one pass.
    messages_per_tick: usize,

    // == Timer ==
    // Tick duration handed to the timer builder.
    timer_tick: Duration,
    // Number of slots handed to the timer builder.
    timer_wheel_size: usize,
    // Capacity handed to the timer builder.
    timer_capacity: usize,
}

impl Default for Config {
    /// Returns the stock `EventLoop` settings.
    fn default() -> Config {
        // Notification channel defaults.
        let channel_capacity = 4_096;
        let drain_limit = 256;

        // Timer defaults.
        let tick = Duration::from_millis(100);
        let wheel_slots = 1_024;
        let timer_entries = 65_536;

        Config {
            notify_capacity: channel_capacity,
            messages_per_tick: drain_limit,
            timer_tick: tick,
            timer_wheel_size: wheel_slots,
            timer_capacity: timer_entries,
        }
    }
}
39 
40 impl EventLoopBuilder {
41     /// Construct a new `EventLoopBuilder` with the default configuration
42     /// values.
new() -> EventLoopBuilder43     pub fn new() -> EventLoopBuilder {
44         EventLoopBuilder::default()
45     }
46 
47     /// Sets the maximum number of messages that can be buffered on the event
48     /// loop's notification channel before a send will fail.
49     ///
50     /// The default value for this is 4096.
notify_capacity(&mut self, capacity: usize) -> &mut Self51     pub fn notify_capacity(&mut self, capacity: usize) -> &mut Self {
52         self.config.notify_capacity = capacity;
53         self
54     }
55 
56     /// Sets the maximum number of messages that can be processed on any tick of
57     /// the event loop.
58     ///
59     /// The default value for this is 256.
messages_per_tick(&mut self, messages: usize) -> &mut Self60     pub fn messages_per_tick(&mut self, messages: usize) -> &mut Self {
61         self.config.messages_per_tick = messages;
62         self
63     }
64 
timer_tick(&mut self, val: Duration) -> &mut Self65     pub fn timer_tick(&mut self, val: Duration) -> &mut Self {
66         self.config.timer_tick = val;
67         self
68     }
69 
timer_wheel_size(&mut self, size: usize) -> &mut Self70     pub fn timer_wheel_size(&mut self, size: usize) -> &mut Self {
71         self.config.timer_wheel_size = size;
72         self
73     }
74 
timer_capacity(&mut self, cap: usize) -> &mut Self75     pub fn timer_capacity(&mut self, cap: usize) -> &mut Self {
76         self.config.timer_capacity = cap;
77         self
78     }
79 
80     /// Constructs a new `EventLoop` using the configured values. The
81     /// `EventLoop` will not be running.
build<H: Handler>(self) -> io::Result<EventLoop<H>>82     pub fn build<H: Handler>(self) -> io::Result<EventLoop<H>> {
83         EventLoop::configured(self.config)
84     }
85 }
86 
87 /// Single threaded IO event loop.
88 pub struct EventLoop<H: Handler> {
89     run: bool,
90     poll: Poll,
91     events: Events,
92     timer: Timer<H::Timeout>,
93     notify_tx: channel::SyncSender<H::Message>,
94     notify_rx: channel::Receiver<H::Message>,
95     config: Config,
96 }
97 
98 // Token used to represent notifications
99 const NOTIFY: Token = Token(usize::MAX - 1);
100 const TIMER: Token = Token(usize::MAX - 2);
101 
102 impl<H: Handler> EventLoop<H> {
103 
104     /// Constructs a new `EventLoop` using the default configuration values.
105     /// The `EventLoop` will not be running.
new() -> io::Result<EventLoop<H>>106     pub fn new() -> io::Result<EventLoop<H>> {
107         EventLoop::configured(Config::default())
108     }
109 
configured(config: Config) -> io::Result<EventLoop<H>>110     fn configured(config: Config) -> io::Result<EventLoop<H>> {
111         // Create the IO poller
112         let poll = try!(Poll::new());
113 
114         let timer = timer::Builder::default()
115             .tick_duration(config.timer_tick)
116             .num_slots(config.timer_wheel_size)
117             .capacity(config.timer_capacity)
118             .build();
119 
120         // Create cross thread notification queue
121         let (tx, rx) = channel::sync_channel(config.notify_capacity);
122 
123         // Register the notification wakeup FD with the IO poller
124         try!(poll.register(&rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot()));
125         try!(poll.register(&timer, TIMER, Ready::readable(), PollOpt::edge()));
126 
127         Ok(EventLoop {
128             run: true,
129             poll: poll,
130             timer: timer,
131             notify_tx: tx,
132             notify_rx: rx,
133             config: config,
134             events: Events::with_capacity(1024),
135         })
136     }
137 
138     /// Returns a sender that allows sending messages to the event loop in a
139     /// thread-safe way, waking up the event loop if needed.
140     ///
141     /// # Example
142     /// ```
143     /// use std::thread;
144     /// use mio::deprecated::{EventLoop, Handler};
145     ///
146     /// struct MyHandler;
147     ///
148     /// impl Handler for MyHandler {
149     ///     type Timeout = ();
150     ///     type Message = u32;
151     ///
152     ///     fn notify(&mut self, event_loop: &mut EventLoop<MyHandler>, msg: u32) {
153     ///         assert_eq!(msg, 123);
154     ///         event_loop.shutdown();
155     ///     }
156     /// }
157     ///
158     /// let mut event_loop = EventLoop::new().unwrap();
159     /// let sender = event_loop.channel();
160     ///
161     /// // Send the notification from another thread
162     /// thread::spawn(move || {
163     ///     let _ = sender.send(123);
164     /// });
165     ///
166     /// let _ = event_loop.run(&mut MyHandler);
167     /// ```
168     ///
169     /// # Implementation Details
170     ///
171     /// Each [EventLoop](#) contains a lock-free queue with a pre-allocated
172     /// buffer size. The size can be changed by modifying
173     /// [EventLoopConfig.notify_capacity](struct.EventLoopConfig.html#method.notify_capacity).
174     /// When a message is sent to the EventLoop, it is first pushed on to the
175     /// queue. Then, if the EventLoop is currently running, an atomic flag is
176     /// set to indicate that the next loop iteration should be started without
177     /// waiting.
178     ///
179     /// If the loop is blocked waiting for IO events, then it is woken up. The
180     /// strategy for waking up the event loop is platform dependent. For
181     /// example, on a modern Linux OS, eventfd is used. On older OSes, a pipe
182     /// is used.
183     ///
184     /// The strategy of setting an atomic flag if the event loop is not already
185     /// sleeping allows avoiding an expensive wakeup operation if at all possible.
channel(&self) -> Sender<H::Message>186     pub fn channel(&self) -> Sender<H::Message> {
187         Sender::new(self.notify_tx.clone())
188     }
189 
190     /// Schedules a timeout after the requested time interval. When the
191     /// duration has been reached,
192     /// [Handler::timeout](trait.Handler.html#method.timeout) will be invoked
193     /// passing in the supplied token.
194     ///
195     /// Returns a handle to the timeout that can be used to cancel the timeout
196     /// using [#clear_timeout](#method.clear_timeout).
197     ///
198     /// # Example
199     /// ```
200     /// use mio::deprecated::{EventLoop, Handler};
201     /// use std::time::Duration;
202     ///
203     /// struct MyHandler;
204     ///
205     /// impl Handler for MyHandler {
206     ///     type Timeout = u32;
207     ///     type Message = ();
208     ///
209     ///     fn timeout(&mut self, event_loop: &mut EventLoop<MyHandler>, timeout: u32) {
210     ///         assert_eq!(timeout, 123);
211     ///         event_loop.shutdown();
212     ///     }
213     /// }
214     ///
215     ///
216     /// let mut event_loop = EventLoop::new().unwrap();
217     /// let timeout = event_loop.timeout(123, Duration::from_millis(300)).unwrap();
218     /// let _ = event_loop.run(&mut MyHandler);
219     /// ```
timeout(&mut self, token: H::Timeout, delay: Duration) -> timer::Result<Timeout>220     pub fn timeout(&mut self, token: H::Timeout, delay: Duration) -> timer::Result<Timeout> {
221         self.timer.set_timeout(delay, token)
222     }
223 
224     /// If the supplied timeout has not been triggered, cancel it such that it
225     /// will not be triggered in the future.
clear_timeout(&mut self, timeout: &Timeout) -> bool226     pub fn clear_timeout(&mut self, timeout: &Timeout) -> bool {
227         self.timer.cancel_timeout(&timeout).is_some()
228     }
229 
230     /// Tells the event loop to exit after it is done handling all events in the
231     /// current iteration.
shutdown(&mut self)232     pub fn shutdown(&mut self) {
233         self.run = false;
234     }
235 
236     /// Indicates whether the event loop is currently running. If it's not it has either
237     /// stopped or is scheduled to stop on the next tick.
is_running(&self) -> bool238     pub fn is_running(&self) -> bool {
239         self.run
240     }
241 
242     /// Registers an IO handle with the event loop.
register<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> where E: Evented243     pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>
244         where E: Evented
245     {
246         self.poll.register(io, token, interest, opt)
247     }
248 
249     /// Re-Registers an IO handle with the event loop.
reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> where E: Evented250     pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>
251         where E: Evented
252     {
253         self.poll.reregister(io, token, interest, opt)
254     }
255 
256     /// Keep spinning the event loop indefinitely, and notify the handler whenever
257     /// any of the registered handles are ready.
run(&mut self, handler: &mut H) -> io::Result<()>258     pub fn run(&mut self, handler: &mut H) -> io::Result<()> {
259         self.run = true;
260 
261         while self.run {
262             // Execute ticks as long as the event loop is running
263             try!(self.run_once(handler, None));
264         }
265 
266         Ok(())
267     }
268 
269     /// Deregisters an IO handle with the event loop.
270     ///
271     /// Both kqueue and epoll will automatically clear any pending events when closing a
272     /// file descriptor (socket). In that case, this method does not need to be called
273     /// prior to dropping a connection from the slab.
274     ///
275     /// Warning: kqueue effectively builds in deregister when using edge-triggered mode with
276     /// oneshot. Calling `deregister()` on the socket will cause a TcpStream error.
deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented277     pub fn deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented {
278         self.poll.deregister(io)
279     }
280 
281     /// Spin the event loop once, with a given timeout (forever if `None`),
282     /// and notify the handler if any of the registered handles become ready
283     /// during that time.
run_once(&mut self, handler: &mut H, timeout: Option<Duration>) -> io::Result<()>284     pub fn run_once(&mut self, handler: &mut H, timeout: Option<Duration>) -> io::Result<()> {
285         trace!("event loop tick");
286 
287         // Check the registered IO handles for any new events. Each poll
288         // is for one second, so a shutdown request can last as long as
289         // one second before it takes effect.
290         let events = match self.io_poll(timeout) {
291             Ok(e) => e,
292             Err(err) => {
293                 if err.kind() == io::ErrorKind::Interrupted {
294                     handler.interrupted(self);
295                     0
296                 } else {
297                     return Err(err);
298                 }
299             }
300         };
301 
302         self.io_process(handler, events);
303         handler.tick(self);
304         Ok(())
305     }
306 
307     #[inline]
io_poll(&mut self, timeout: Option<Duration>) -> io::Result<usize>308     fn io_poll(&mut self, timeout: Option<Duration>) -> io::Result<usize> {
309         self.poll.poll(&mut self.events, timeout)
310     }
311 
312     // Process IO events that have been previously polled
io_process(&mut self, handler: &mut H, cnt: usize)313     fn io_process(&mut self, handler: &mut H, cnt: usize) {
314         let mut i = 0;
315 
316         trace!("io_process(..); cnt={}; len={}", cnt, self.events.len());
317 
318         // Iterate over the notifications. Each event provides the token
319         // it was registered with (which usually represents, at least, the
320         // handle that the event is about) as well as information about
321         // what kind of event occurred (readable, writable, signal, etc.)
322         while i < cnt {
323             let evt = self.events.get(i).unwrap();
324 
325             trace!("event={:?}; idx={:?}", evt, i);
326 
327             match evt.token() {
328                 NOTIFY => self.notify(handler),
329                 TIMER => self.timer_process(handler),
330                 _ => self.io_event(handler, evt)
331             }
332 
333             i += 1;
334         }
335     }
336 
io_event(&mut self, handler: &mut H, evt: Event)337     fn io_event(&mut self, handler: &mut H, evt: Event) {
338         handler.ready(self, evt.token(), evt.kind());
339     }
340 
notify(&mut self, handler: &mut H)341     fn notify(&mut self, handler: &mut H) {
342         for _ in 0..self.config.messages_per_tick {
343             match self.notify_rx.try_recv() {
344                 Ok(msg) => handler.notify(self, msg),
345                 _ => break,
346             }
347         }
348 
349         // Re-register
350         let _ = self.poll.reregister(&self.notify_rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot());
351     }
352 
timer_process(&mut self, handler: &mut H)353     fn timer_process(&mut self, handler: &mut H) {
354         while let Some(t) = self.timer.poll() {
355             handler.timeout(self, t);
356         }
357     }
358 }
359 
360 impl<H: Handler> fmt::Debug for EventLoop<H> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result361     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
362         fmt.debug_struct("EventLoop")
363             .field("run", &self.run)
364             .field("poll", &self.poll)
365             .field("config", &self.config)
366             .finish()
367     }
368 }
369 
370 /// Sends messages to the EventLoop from other threads.
371 pub struct Sender<M> {
372     tx: channel::SyncSender<M>
373 }
374 
375 impl<M> fmt::Debug for Sender<M> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result376     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
377         write!(fmt, "Sender<?> {{ ... }}")
378     }
379 }
380 
381 impl<M> Clone for Sender <M> {
clone(&self) -> Sender<M>382     fn clone(&self) -> Sender<M> {
383         Sender { tx: self.tx.clone() }
384     }
385 }
386 
387 impl<M> Sender<M> {
new(tx: channel::SyncSender<M>) -> Sender<M>388     fn new(tx: channel::SyncSender<M>) -> Sender<M> {
389         Sender { tx: tx }
390     }
391 
send(&self, msg: M) -> Result<(), NotifyError<M>>392     pub fn send(&self, msg: M) -> Result<(), NotifyError<M>> {
393         try!(self.tx.try_send(msg));
394         Ok(())
395     }
396 }
397