1 use {channel, Poll, Events, Token};
2 use event::Evented;
3 use deprecated::{Handler, NotifyError};
4 use event_imp::{Event, Ready, PollOpt};
5 use timer::{self, Timer, Timeout};
6 use std::{io, fmt, usize};
7 use std::default::Default;
8 use std::time::Duration;
9 
10 #[derive(Debug, Default, Clone)]
11 pub struct EventLoopBuilder {
12     config: Config,
13 }
14 
/// Tunable settings consumed when an `EventLoop` is constructed.
#[derive(Debug, Clone)]
struct Config {
    // ---- Notification channel ----
    /// Capacity passed to `channel::sync_channel` for the notification queue.
    notify_capacity: usize,
    /// Upper bound on the number of messages handed to the handler each time
    /// the notification channel is drained.
    messages_per_tick: usize,

    // ---- Timer ----
    /// Forwarded to the timer builder as its tick duration.
    timer_tick: Duration,
    /// Forwarded to the timer builder as its number of slots.
    timer_wheel_size: usize,
    /// Forwarded to the timer builder as its capacity.
    timer_capacity: usize,
}
27 
28 impl Default for Config {
default() -> Config29     fn default() -> Config {
30         // Default EventLoop configuration values
31         Config {
32             notify_capacity: 4_096,
33             messages_per_tick: 256,
34             timer_tick: Duration::from_millis(100),
35             timer_wheel_size: 1_024,
36             timer_capacity: 65_536,
37         }
38     }
39 }
40 
41 impl EventLoopBuilder {
42     /// Construct a new `EventLoopBuilder` with the default configuration
43     /// values.
new() -> EventLoopBuilder44     pub fn new() -> EventLoopBuilder {
45         EventLoopBuilder::default()
46     }
47 
48     /// Sets the maximum number of messages that can be buffered on the event
49     /// loop's notification channel before a send will fail.
50     ///
51     /// The default value for this is 4096.
notify_capacity(&mut self, capacity: usize) -> &mut Self52     pub fn notify_capacity(&mut self, capacity: usize) -> &mut Self {
53         self.config.notify_capacity = capacity;
54         self
55     }
56 
57     /// Sets the maximum number of messages that can be processed on any tick of
58     /// the event loop.
59     ///
60     /// The default value for this is 256.
messages_per_tick(&mut self, messages: usize) -> &mut Self61     pub fn messages_per_tick(&mut self, messages: usize) -> &mut Self {
62         self.config.messages_per_tick = messages;
63         self
64     }
65 
timer_tick(&mut self, val: Duration) -> &mut Self66     pub fn timer_tick(&mut self, val: Duration) -> &mut Self {
67         self.config.timer_tick = val;
68         self
69     }
70 
timer_wheel_size(&mut self, size: usize) -> &mut Self71     pub fn timer_wheel_size(&mut self, size: usize) -> &mut Self {
72         self.config.timer_wheel_size = size;
73         self
74     }
75 
timer_capacity(&mut self, cap: usize) -> &mut Self76     pub fn timer_capacity(&mut self, cap: usize) -> &mut Self {
77         self.config.timer_capacity = cap;
78         self
79     }
80 
81     /// Constructs a new `EventLoop` using the configured values. The
82     /// `EventLoop` will not be running.
build<H: Handler>(self) -> io::Result<EventLoop<H>>83     pub fn build<H: Handler>(self) -> io::Result<EventLoop<H>> {
84         EventLoop::configured(self.config)
85     }
86 }
87 
88 /// Single threaded IO event loop.
89 pub struct EventLoop<H: Handler> {
90     run: bool,
91     poll: Poll,
92     events: Events,
93     timer: Timer<H::Timeout>,
94     notify_tx: channel::SyncSender<H::Message>,
95     notify_rx: channel::Receiver<H::Message>,
96     config: Config,
97 }
98 
99 // Token used to represent notifications
100 const NOTIFY: Token = Token(usize::MAX - 1);
101 const TIMER: Token = Token(usize::MAX - 2);
102 
103 impl<H: Handler> EventLoop<H> {
104 
105     /// Constructs a new `EventLoop` using the default configuration values.
106     /// The `EventLoop` will not be running.
new() -> io::Result<EventLoop<H>>107     pub fn new() -> io::Result<EventLoop<H>> {
108         EventLoop::configured(Config::default())
109     }
110 
configured(config: Config) -> io::Result<EventLoop<H>>111     fn configured(config: Config) -> io::Result<EventLoop<H>> {
112         // Create the IO poller
113         let poll = Poll::new()?;
114 
115         let timer = timer::Builder::default()
116             .tick_duration(config.timer_tick)
117             .num_slots(config.timer_wheel_size)
118             .capacity(config.timer_capacity)
119             .build();
120 
121         // Create cross thread notification queue
122         let (tx, rx) = channel::sync_channel(config.notify_capacity);
123 
124         // Register the notification wakeup FD with the IO poller
125         poll.register(&rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())?;
126         poll.register(&timer, TIMER, Ready::readable(), PollOpt::edge())?;
127 
128         Ok(EventLoop {
129             run: true,
130             poll,
131             timer,
132             notify_tx: tx,
133             notify_rx: rx,
134             config,
135             events: Events::with_capacity(1024),
136         })
137     }
138 
139     /// Returns a sender that allows sending messages to the event loop in a
140     /// thread-safe way, waking up the event loop if needed.
141     ///
142     /// # Implementation Details
143     ///
144     /// Each [EventLoop](#) contains a lock-free queue with a pre-allocated
145     /// buffer size. The size can be changed by modifying
146     /// [EventLoopConfig.notify_capacity](struct.EventLoopConfig.html#method.notify_capacity).
147     /// When a message is sent to the EventLoop, it is first pushed on to the
148     /// queue. Then, if the EventLoop is currently running, an atomic flag is
149     /// set to indicate that the next loop iteration should be started without
150     /// waiting.
151     ///
152     /// If the loop is blocked waiting for IO events, then it is woken up. The
153     /// strategy for waking up the event loop is platform dependent. For
154     /// example, on a modern Linux OS, eventfd is used. On older OSes, a pipe
155     /// is used.
156     ///
157     /// The strategy of setting an atomic flag if the event loop is not already
158     /// sleeping allows avoiding an expensive wakeup operation if at all possible.
channel(&self) -> Sender<H::Message>159     pub fn channel(&self) -> Sender<H::Message> {
160         Sender::new(self.notify_tx.clone())
161     }
162 
163     /// Schedules a timeout after the requested time interval. When the
164     /// duration has been reached,
165     /// [Handler::timeout](trait.Handler.html#method.timeout) will be invoked
166     /// passing in the supplied token.
167     ///
168     /// Returns a handle to the timeout that can be used to cancel the timeout
169     /// using [#clear_timeout](#method.clear_timeout).
timeout(&mut self, token: H::Timeout, delay: Duration) -> timer::Result<Timeout>170     pub fn timeout(&mut self, token: H::Timeout, delay: Duration) -> timer::Result<Timeout> {
171         self.timer.set_timeout(delay, token)
172     }
173 
174     /// If the supplied timeout has not been triggered, cancel it such that it
175     /// will not be triggered in the future.
clear_timeout(&mut self, timeout: &Timeout) -> bool176     pub fn clear_timeout(&mut self, timeout: &Timeout) -> bool {
177         self.timer.cancel_timeout(&timeout).is_some()
178     }
179 
180     /// Tells the event loop to exit after it is done handling all events in the
181     /// current iteration.
shutdown(&mut self)182     pub fn shutdown(&mut self) {
183         self.run = false;
184     }
185 
186     /// Indicates whether the event loop is currently running. If it's not it has either
187     /// stopped or is scheduled to stop on the next tick.
is_running(&self) -> bool188     pub fn is_running(&self) -> bool {
189         self.run
190     }
191 
192     /// Registers an IO handle with the event loop.
register<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> where E: Evented193     pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>
194         where E: Evented
195     {
196         self.poll.register(io, token, interest, opt)
197     }
198 
199     /// Re-Registers an IO handle with the event loop.
reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> where E: Evented200     pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>
201         where E: Evented
202     {
203         self.poll.reregister(io, token, interest, opt)
204     }
205 
206     /// Keep spinning the event loop indefinitely, and notify the handler whenever
207     /// any of the registered handles are ready.
run(&mut self, handler: &mut H) -> io::Result<()>208     pub fn run(&mut self, handler: &mut H) -> io::Result<()> {
209         self.run = true;
210 
211         while self.run {
212             // Execute ticks as long as the event loop is running
213             self.run_once(handler, None)?;
214         }
215 
216         Ok(())
217     }
218 
219     /// Deregisters an IO handle with the event loop.
220     ///
221     /// Both kqueue and epoll will automatically clear any pending events when closing a
222     /// file descriptor (socket). In that case, this method does not need to be called
223     /// prior to dropping a connection from the slab.
224     ///
225     /// Warning: kqueue effectively builds in deregister when using edge-triggered mode with
226     /// oneshot. Calling `deregister()` on the socket will cause a TcpStream error.
deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented227     pub fn deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented {
228         self.poll.deregister(io)
229     }
230 
231     /// Spin the event loop once, with a given timeout (forever if `None`),
232     /// and notify the handler if any of the registered handles become ready
233     /// during that time.
run_once(&mut self, handler: &mut H, timeout: Option<Duration>) -> io::Result<()>234     pub fn run_once(&mut self, handler: &mut H, timeout: Option<Duration>) -> io::Result<()> {
235         trace!("event loop tick");
236 
237         // Check the registered IO handles for any new events. Each poll
238         // is for one second, so a shutdown request can last as long as
239         // one second before it takes effect.
240         let events = match self.io_poll(timeout) {
241             Ok(e) => e,
242             Err(err) => {
243                 if err.kind() == io::ErrorKind::Interrupted {
244                     handler.interrupted(self);
245                     0
246                 } else {
247                     return Err(err);
248                 }
249             }
250         };
251 
252         self.io_process(handler, events);
253         handler.tick(self);
254         Ok(())
255     }
256 
257     #[inline]
io_poll(&mut self, timeout: Option<Duration>) -> io::Result<usize>258     fn io_poll(&mut self, timeout: Option<Duration>) -> io::Result<usize> {
259         self.poll.poll(&mut self.events, timeout)
260     }
261 
262     // Process IO events that have been previously polled
io_process(&mut self, handler: &mut H, cnt: usize)263     fn io_process(&mut self, handler: &mut H, cnt: usize) {
264         let mut i = 0;
265 
266         trace!("io_process(..); cnt={}; len={}", cnt, self.events.len());
267 
268         // Iterate over the notifications. Each event provides the token
269         // it was registered with (which usually represents, at least, the
270         // handle that the event is about) as well as information about
271         // what kind of event occurred (readable, writable, signal, etc.)
272         while i < cnt {
273             let evt = self.events.get(i).unwrap();
274 
275             trace!("event={:?}; idx={:?}", evt, i);
276 
277             match evt.token() {
278                 NOTIFY => self.notify(handler),
279                 TIMER => self.timer_process(handler),
280                 _ => self.io_event(handler, evt)
281             }
282 
283             i += 1;
284         }
285     }
286 
io_event(&mut self, handler: &mut H, evt: Event)287     fn io_event(&mut self, handler: &mut H, evt: Event) {
288         handler.ready(self, evt.token(), evt.readiness());
289     }
290 
notify(&mut self, handler: &mut H)291     fn notify(&mut self, handler: &mut H) {
292         for _ in 0..self.config.messages_per_tick {
293             match self.notify_rx.try_recv() {
294                 Ok(msg) => handler.notify(self, msg),
295                 _ => break,
296             }
297         }
298 
299         // Re-register
300         let _ = self.poll.reregister(&self.notify_rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot());
301     }
302 
timer_process(&mut self, handler: &mut H)303     fn timer_process(&mut self, handler: &mut H) {
304         while let Some(t) = self.timer.poll() {
305             handler.timeout(self, t);
306         }
307     }
308 }
309 
310 impl<H: Handler> fmt::Debug for EventLoop<H> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result311     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
312         fmt.debug_struct("EventLoop")
313             .field("run", &self.run)
314             .field("poll", &self.poll)
315             .field("config", &self.config)
316             .finish()
317     }
318 }
319 
320 /// Sends messages to the EventLoop from other threads.
321 pub struct Sender<M> {
322     tx: channel::SyncSender<M>
323 }
324 
325 impl<M> fmt::Debug for Sender<M> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result326     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
327         write!(fmt, "Sender<?> {{ ... }}")
328     }
329 }
330 
331 impl<M> Clone for Sender <M> {
clone(&self) -> Sender<M>332     fn clone(&self) -> Sender<M> {
333         Sender { tx: self.tx.clone() }
334     }
335 }
336 
337 impl<M> Sender<M> {
new(tx: channel::SyncSender<M>) -> Sender<M>338     fn new(tx: channel::SyncSender<M>) -> Sender<M> {
339         Sender { tx }
340     }
341 
send(&self, msg: M) -> Result<(), NotifyError<M>>342     pub fn send(&self, msg: M) -> Result<(), NotifyError<M>> {
343         self.tx.try_send(msg)?;
344         Ok(())
345     }
346 }
347