use {sys, Token};
use event_imp::{self as event, Ready, Event, Evented, PollOpt};
use std::{fmt, io, ptr, usize};
use std::cell::UnsafeCell;
use std::{mem, ops, isize};
#[cfg(all(unix, not(target_os = "fuchsia")))]
use std::os::unix::io::AsRawFd;
#[cfg(all(unix, not(target_os = "fuchsia")))]
use std::os::unix::io::RawFd;
use std::process;
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::{AtomicUsize, AtomicPtr, AtomicBool};
use std::sync::atomic::Ordering::{self, Acquire, Release, AcqRel, Relaxed, SeqCst};
use std::time::{Duration, Instant};

// Poll is backed by two readiness queues. The first is a system readiness queue
// represented by `sys::Selector`. The system readiness queue handles events
// provided by the system, such as TCP and UDP. The second readiness queue is
// implemented in user space by `ReadinessQueue`. It provides a way to implement
// purely user space `Evented` types.
//
// `ReadinessQueue` is backed by a MPSC queue that supports reuse of linked
// list nodes. This significantly reduces the number of required allocations.
// Each `Registration` / `SetReadiness` pair allocates a single readiness node
// that is used for the lifetime of the registration.
//
// The readiness node also includes a single atomic variable, `state`, that
// tracks most of the state associated with the registration. This includes the
// current readiness, interest, poll options, and internal state. When the node
// state is mutated, it is queued in the MPSC channel. A call to
// `ReadinessQueue::poll` will dequeue and process nodes. The node state can
// still be mutated while it is queued in the channel for processing.
// Intermediate state values do not matter as long as the final state is
// included in the call to `poll`. This is the eventually consistent nature of
// the readiness queue.
//
// The readiness node is ref counted using the `ref_count` field. On creation,
// the ref_count is initialized to 3: one `Registration` handle, one
// `SetReadiness` handle, and one for the readiness queue. Since the readiness
// queue doesn't *always* hold a handle to the node, we don't use the Arc type
// for managing ref counts (this is to avoid constantly incrementing and
// decrementing the ref count when pushing & popping from the queue). When the
// `Registration` handle is dropped, the `dropped` flag is set on the node, then
// the node is pushed into the registration queue. When Poll::poll pops the
// node, it sees the drop flag is set, and decrements its ref count.
//
// The MPSC queue is a modified version of the intrusive MPSC node based queue
// described by 1024cores [1].
//
// The first modification is that two markers are used instead of a single
// `stub`. The second marker is a `sleep_marker` which is used to signal to
// producers that the consumer is going to sleep. This sleep_marker is only used
// when the queue is empty, implying that the only node in the queue is
// `end_marker`.
//
// The second modification is an `until` argument passed to the dequeue
// function. When `poll` encounters a level-triggered node, the node will be
// immediately pushed back into the queue. In order to avoid an infinite loop,
// before pushing the node, `poll` saves off the node's pointer and then passes
// it again as the `until` argument. If the next node to pop is `until`, then
// `Dequeue::Empty` is returned.
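//
// A rough sketch of that guard (the full logic lives in `dequeue_node`
// further down in this file):
//
//     if tail == until {
//         // `tail` was already popped and re-queued during this `poll`
//         // pass; treat the queue as drained to avoid spinning forever.
//         return Dequeue::Empty;
//     }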
//
// [1] http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue


/// Polls for readiness events on all registered values.
///
/// `Poll` allows a program to monitor a large number of `Evented` types,
/// waiting until one or more become "ready" for some class of operations; e.g.
/// reading and writing. An `Evented` type is considered ready if it is possible
/// to immediately perform a corresponding operation; e.g. [`read`] or
/// [`write`].
///
/// To use `Poll`, an `Evented` type must first be registered with the `Poll`
/// instance using the [`register`] method, supplying readiness interest. The
/// readiness interest tells `Poll` which specific operations on the handle to
/// monitor for readiness. A `Token` is also passed to the [`register`]
/// function. When `Poll` returns a readiness event, it will include this token.
/// This associates the event with the `Evented` handle that generated the
/// event.
///
/// [`read`]: tcp/struct.TcpStream.html#method.read
/// [`write`]: tcp/struct.TcpStream.html#method.write
/// [`register`]: #method.register
///
/// # Examples
///
/// A basic example -- establishing a `TcpStream` connection.
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use mio::{Events, Poll, Ready, PollOpt, Token};
/// use mio::net::TcpStream;
///
/// use std::net::{TcpListener, SocketAddr};
///
/// // Bind a server socket to connect to.
/// let addr: SocketAddr = "127.0.0.1:0".parse()?;
/// let server = TcpListener::bind(&addr)?;
///
/// // Construct a new `Poll` handle as well as the `Events` we'll store into
/// let poll = Poll::new()?;
/// let mut events = Events::with_capacity(1024);
///
/// // Connect the stream
/// let stream = TcpStream::connect(&server.local_addr()?)?;
///
/// // Register the stream with `Poll`
/// poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
///
/// // Wait for the socket to become ready. This has to happen in a loop to
/// // handle spurious wakeups.
/// loop {
///     poll.poll(&mut events, None)?;
///
///     for event in &events {
///         if event.token() == Token(0) && event.readiness().is_writable() {
///             // The socket connected (probably, it could still be a spurious
///             // wakeup)
///             return Ok(());
///         }
///     }
/// }
/// #     Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```
///
/// # Edge-triggered and level-triggered
///
/// An [`Evented`] registration may request edge-triggered events or
/// level-triggered events. This is done by setting `register`'s
/// [`PollOpt`] argument to either [`edge`] or [`level`].
///
/// The difference between the two can be described as follows. Suppose that
/// this scenario happens:
///
/// 1. A [`TcpStream`] is registered with `Poll`.
/// 2. The socket receives 2kb of data.
/// 3. A call to [`Poll::poll`] returns the token associated with the socket
///    indicating readable readiness.
/// 4. 1kb is read from the socket.
/// 5. Another call to [`Poll::poll`] is made.
///
/// If edge-triggered events were requested when the socket was registered
/// with `Poll`, then the call to [`Poll::poll`] in step **5** will (probably)
/// hang despite there being another 1kb still present in the socket read
/// buffer. The reason for this is that edge-triggered mode delivers events
/// only when changes occur on the monitored [`Evented`]. So, in step **5** the
/// caller might end up waiting for some data that is already present inside
/// the socket buffer.
///
/// With edge-triggered events, operations **must** be performed on the
/// `Evented` type until [`WouldBlock`] is returned. In other words, after
/// receiving an event indicating readiness for a certain operation, one should
/// assume that [`Poll::poll`] may never return another event for the same token
/// and readiness until the operation returns [`WouldBlock`].
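///
/// A minimal sketch of such a drain loop (`drain` is an illustrative helper,
/// not part of this crate):
///
/// ```no_run
/// use std::io::{self, Read};
/// use mio::net::TcpStream;
///
/// fn drain(stream: &mut TcpStream) -> io::Result<()> {
///     let mut buf = [0; 4096];
///     loop {
///         match stream.read(&mut buf) {
///             // Keep reading while data is available.
///             Ok(0) => return Ok(()), // socket closed
///             Ok(_) => {}
///             // `WouldBlock` means readiness is exhausted; wait for the
///             // next readiness event before reading again.
///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(()),
///             Err(e) => return Err(e),
///         }
///     }
/// }
/// ```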
///
/// By contrast, when level-triggered notifications were requested, each call
/// to [`Poll::poll`] will return an event for the socket as long as data
/// remains in the socket buffer. Generally, level-triggered events should be
/// avoided if high performance is a concern.
///
/// Since even with edge-triggered events, multiple events can be generated
/// upon receipt of multiple chunks of data, the caller has the option to set
/// the [`oneshot`] flag. This tells `Poll` to disable the associated
/// [`Evented`] after the event is returned from [`Poll::poll`]. Subsequent
/// calls to [`Poll::poll`] will no longer include events for [`Evented`]
/// handles that are disabled even if the readiness state changes. The handle
/// can be re-enabled by calling [`reregister`]. When handles are disabled,
/// internal resources used to monitor the handle are maintained until the
/// handle is dropped or deregistered. This makes re-registering the handle a
/// fast operation.
///
/// For example, in the following scenario:
///
/// 1. A [`TcpStream`] is registered with `Poll`.
/// 2. The socket receives 2kb of data.
/// 3. A call to [`Poll::poll`] returns the token associated with the socket
///    indicating readable readiness.
/// 4. 2kb is read from the socket.
/// 5. Another call to read is issued and [`WouldBlock`] is returned.
/// 6. The socket receives another 2kb of data.
/// 7. Another call to [`Poll::poll`] is made.
///
/// Assuming the socket was registered with `Poll` with the [`edge`] and
/// [`oneshot`] options, then the call to [`Poll::poll`] in step 7 would
/// block. This is because [`oneshot`] tells `Poll` to disable events for the
/// socket after returning an event.
///
/// In order to receive the event for the data received in step 6, the socket
/// would need to be reregistered using [`reregister`], as shown in the sketch
/// below.
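///
/// A minimal sketch of that re-arming pattern (the address and token are
/// placeholders):
///
/// ```no_run
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use mio::{Events, Poll, Ready, PollOpt, Token};
/// use mio::net::TcpStream;
///
/// let poll = Poll::new()?;
/// let stream = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
/// let mut events = Events::with_capacity(1024);
///
/// // Register with `edge` + `oneshot`: deliver one event, then disable.
/// poll.register(&stream, Token(0), Ready::readable(),
///               PollOpt::edge() | PollOpt::oneshot())?;
///
/// loop {
///     poll.poll(&mut events, None)?;
///
///     for event in &events {
///         if event.token() == Token(0) {
///             // ... read from `stream` until `WouldBlock` here ...
///
///             // Re-arm the registration so the next readiness change
///             // is delivered.
///             poll.reregister(&stream, Token(0), Ready::readable(),
///                             PollOpt::edge() | PollOpt::oneshot())?;
///         }
///     }
/// #   return Ok(());
/// }
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```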
///
/// [`PollOpt`]: struct.PollOpt.html
/// [`edge`]: struct.PollOpt.html#method.edge
/// [`level`]: struct.PollOpt.html#method.level
/// [`Poll::poll`]: struct.Poll.html#method.poll
/// [`WouldBlock`]: https://doc.rust-lang.org/std/io/enum.ErrorKind.html#variant.WouldBlock
/// [`Evented`]: event/trait.Evented.html
/// [`TcpStream`]: tcp/struct.TcpStream.html
/// [`reregister`]: #method.reregister
/// [`oneshot`]: struct.PollOpt.html#method.oneshot
///
/// # Portability
///
/// Using `Poll` provides a portable interface across supported platforms as
/// long as the caller takes the following into consideration:
///
/// ### Spurious events
///
/// [`Poll::poll`] may return readiness events even if the associated
/// [`Evented`] handle is not actually ready. Given the same code, this may
/// happen more on some platforms than others. It is important to never assume
/// that, just because a readiness notification was received, the associated
/// operation will succeed as well.
///
/// If an operation fails with [`WouldBlock`], then the caller should not treat
/// this as an error, but instead should wait until another readiness event is
/// received.
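///
/// A sketch of treating [`WouldBlock`] as "try again later" rather than as a
/// failure (`try_read` is an illustrative helper, not part of this crate):
///
/// ```no_run
/// use std::io::{self, Read};
/// use mio::net::TcpStream;
///
/// fn try_read(stream: &mut TcpStream, buf: &mut [u8]) -> io::Result<Option<usize>> {
///     match stream.read(buf) {
///         Ok(n) => Ok(Some(n)),
///         // The event was spurious; wait for the next readiness event.
///         Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(None),
///         Err(e) => Err(e),
///     }
/// }
/// ```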
///
/// ### Draining readiness
///
/// When using edge-triggered mode, once a readiness event is received, the
/// corresponding operation must be performed repeatedly until it returns
/// [`WouldBlock`]. Unless this is done, there is no guarantee that another
/// readiness event will be delivered, even if further data is received for the
/// [`Evented`] handle.
///
/// For example, in the first scenario described above, after step 5, even if
/// the socket receives more data there is no guarantee that another readiness
/// event will be delivered.
///
/// ### Readiness operations
///
/// The only readiness operations that are guaranteed to be present on all
/// supported platforms are [`readable`] and [`writable`]. All other readiness
/// operations may have false negatives and as such should be considered
/// **hints**. This means that if a socket is registered with [`readable`],
/// [`error`], and [`hup`] interest, and either an error or hup is received, a
/// readiness event will be generated for the socket, but it **may** only
/// include `readable` readiness. Also note that, given the potential for
/// spurious events, receiving a readiness event with `hup` or `error` doesn't
/// actually mean that a `read` on the socket will return a result matching the
/// readiness event.
///
/// In other words, portable programs that explicitly check for [`hup`] or
/// [`error`] readiness should be doing so as an **optimization** and always be
/// able to handle an error or HUP situation when performing the actual read
/// operation.
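///
/// A sketch of using [`hup`] purely as a hint (Unix-only, since `UnixReady`
/// lives in the `unix` module):
///
/// ```no_run
/// use mio::Ready;
/// use mio::unix::UnixReady;
///
/// # let readiness = Ready::readable();
/// if UnixReady::from(readiness).is_hup() {
///     // The peer *likely* hung up, but treat this only as a hint; the
///     // read path must still handle EOF (`Ok(0)`) and errors on its own.
/// }
/// ```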
///
/// [`readable`]: struct.Ready.html#method.readable
/// [`writable`]: struct.Ready.html#method.writable
/// [`error`]: unix/struct.UnixReady.html#method.error
/// [`hup`]: unix/struct.UnixReady.html#method.hup
///
/// ### Registering handles
///
/// Unless otherwise noted, it should be assumed that types implementing
/// [`Evented`] will never become ready unless they are registered with `Poll`.
///
/// For example:
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use mio::{Poll, Ready, PollOpt, Token};
/// use mio::net::TcpStream;
/// use std::time::Duration;
/// use std::thread;
///
/// let sock = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
///
/// thread::sleep(Duration::from_secs(1));
///
/// let poll = Poll::new()?;
///
/// // The connect is not guaranteed to have started until it is registered at
/// // this point
/// poll.register(&sock, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
/// #     Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```
///
/// # Implementation notes
///
/// `Poll` is backed by the selector provided by the operating system.
///
/// |      OS    |  Selector |
/// |------------|-----------|
/// | Linux      | [epoll]   |
/// | OS X, iOS  | [kqueue]  |
/// | Windows    | [IOCP]    |
/// | FreeBSD    | [kqueue]  |
/// | Android    | [epoll]   |
///
/// On all supported platforms, socket operations are handled by using the
/// system selector. Platform specific extensions (e.g. [`EventedFd`]) allow
/// accessing other features provided by individual system selectors. For
/// example, Linux's [`signalfd`] feature can be used by registering the FD with
/// `Poll` via [`EventedFd`].
///
/// On all platforms except Windows, a call to [`Poll::poll`] is mostly just a
/// direct call to the system selector. However, [IOCP] uses a completion model
/// instead of a readiness model. In this case, `Poll` must adapt the completion
/// model to Mio's API. While non-trivial, the bridge layer is still quite
/// efficient. The most expensive part is that calls to `read` and `write`
/// require data to be copied into an intermediate buffer before it is passed
/// to the kernel.
///
/// Notifications generated by [`SetReadiness`] are handled by an internal
/// readiness queue. A single call to [`Poll::poll`] will collect events from
/// both the system selector and the internal readiness queue.
///
/// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
/// [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
/// [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx
/// [`signalfd`]: http://man7.org/linux/man-pages/man2/signalfd.2.html
/// [`EventedFd`]: unix/struct.EventedFd.html
/// [`SetReadiness`]: struct.SetReadiness.html
/// [`Poll::poll`]: struct.Poll.html#method.poll
pub struct Poll {
    // Platform specific IO selector
    selector: sys::Selector,

    // Custom readiness queue
    readiness_queue: ReadinessQueue,

    // Use an atomic to first check if a full lock will be required. This is a
    // fast-path check for single threaded cases avoiding the extra syscall
    lock_state: AtomicUsize,

    // Sequences concurrent calls to `Poll::poll`
    lock: Mutex<()>,

    // Wakeup the next waiter
    condvar: Condvar,
}

/// Handle to a user space `Poll` registration.
///
/// `Registration` allows implementing [`Evented`] for types that cannot work
/// with the [system selector]. A `Registration` is always paired with a
/// `SetReadiness`, which allows updating the registration's readiness state.
/// When [`set_readiness`] is called and the `Registration` is associated with a
/// [`Poll`] instance, a readiness event will be created and eventually returned
/// by [`poll`].
///
/// A `Registration` / `SetReadiness` pair is created by calling
/// [`Registration::new2`]. At this point, the registration is not being
/// monitored by a [`Poll`] instance, so calls to `set_readiness` will not
/// result in any readiness notifications.
///
/// `Registration` implements [`Evented`], so it can be used with [`Poll`] using
/// the same [`register`], [`reregister`], and [`deregister`] functions used
/// with TCP, UDP, etc... types. Once registered with [`Poll`], readiness state
/// changes result in readiness events being dispatched to the [`Poll`] instance
/// with which `Registration` is registered.
///
/// **Note**, before using `Registration` be sure to read the
/// [`set_readiness`] documentation and the [portability] notes. The
/// guarantees offered by `Registration` may be weaker than expected.
///
/// For high level documentation, see [`Poll`].
///
/// # Examples
///
/// ```
/// use mio::{Ready, Registration, Poll, PollOpt, Token};
/// use mio::event::Evented;
///
/// use std::io;
/// use std::time::Instant;
/// use std::thread;
///
/// pub struct Deadline {
///     when: Instant,
///     registration: Registration,
/// }
///
/// impl Deadline {
///     pub fn new(when: Instant) -> Deadline {
///         let (registration, set_readiness) = Registration::new2();
///
///         thread::spawn(move || {
///             let now = Instant::now();
///
///             if now < when {
///                 thread::sleep(when - now);
///             }
///
///             set_readiness.set_readiness(Ready::readable());
///         });
///
///         Deadline {
///             when: when,
///             registration: registration,
///         }
///     }
///
///     pub fn is_elapsed(&self) -> bool {
///         Instant::now() >= self.when
///     }
/// }
///
/// impl Evented for Deadline {
///     fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
///         -> io::Result<()>
///     {
///         self.registration.register(poll, token, interest, opts)
///     }
///
///     fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
///         -> io::Result<()>
///     {
///         self.registration.reregister(poll, token, interest, opts)
///     }
///
///     fn deregister(&self, poll: &Poll) -> io::Result<()> {
///         poll.deregister(&self.registration)
///     }
/// }
/// ```
///
/// [system selector]: struct.Poll.html#implementation-notes
/// [`Poll`]: struct.Poll.html
/// [`Registration::new2`]: struct.Registration.html#method.new2
/// [`Evented`]: event/trait.Evented.html
/// [`set_readiness`]: struct.SetReadiness.html#method.set_readiness
/// [`register`]: struct.Poll.html#method.register
/// [`reregister`]: struct.Poll.html#method.reregister
/// [`deregister`]: struct.Poll.html#method.deregister
/// [portability]: struct.Poll.html#portability
pub struct Registration {
    inner: RegistrationInner,
}

unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}

/// Updates the readiness state of the associated `Registration`.
///
/// See [`Registration`] for more documentation on using `SetReadiness` and
/// [`Poll`] for high level polling documentation.
///
/// [`Poll`]: struct.Poll.html
/// [`Registration`]: struct.Registration.html
#[derive(Clone)]
pub struct SetReadiness {
    inner: RegistrationInner,
}

unsafe impl Send for SetReadiness {}
unsafe impl Sync for SetReadiness {}

/// Used to associate an IO type with a Selector
#[derive(Debug)]
pub struct SelectorId {
    id: AtomicUsize,
}

struct RegistrationInner {
    // Unsafe pointer to the registration's node. The node is ref counted. This
    // cannot "simply" be tracked by an Arc because `Poll::poll` has an implicit
    // handle though it isn't stored anywhere. In other words, `Poll::poll`
    // needs to decrement the ref count before the node is freed.
    node: *mut ReadinessNode,
}

#[derive(Clone)]
struct ReadinessQueue {
    inner: Arc<ReadinessQueueInner>,
}

unsafe impl Send for ReadinessQueue {}
unsafe impl Sync for ReadinessQueue {}

struct ReadinessQueueInner {
    // Used to wake up `Poll` when readiness is set in another thread.
    awakener: sys::Awakener,

    // Head of the MPSC queue used to signal readiness to `Poll::poll`.
    head_readiness: AtomicPtr<ReadinessNode>,

    // Tail of the readiness queue.
    //
    // Only accessed by Poll::poll. Coordination will be handled by the poll fn
    tail_readiness: UnsafeCell<*mut ReadinessNode>,

    // Fake readiness node used to punctuate the end of the readiness queue.
    // Before attempting to read from the queue, this node is inserted in order
    // to partition the queue between nodes that are "owned" by the dequeue end
    // and nodes that will be pushed on by producers.
    end_marker: Box<ReadinessNode>,

    // Similar to `end_marker`, but this node signals to producers that `Poll`
    // has gone to sleep and must be woken up.
    sleep_marker: Box<ReadinessNode>,

    // Similar to `end_marker`, but the node signals that the queue is closed.
    // This happens when `ReadinessQueue` is dropped and signals to producers
    // that the nodes should no longer be pushed into the queue.
    closed_marker: Box<ReadinessNode>,
}

/// Node shared by a `Registration` / `SetReadiness` pair as well as the node
/// queued into the MPSC channel.
struct ReadinessNode {
    // Node state, see struct docs for `ReadinessState`
    //
    // This variable is the primary point of coordination between all the
    // various threads concurrently accessing the node.
    state: AtomicState,

    // The registration token cannot fit into the `state` variable, so it is
    // broken out here. In order to atomically update both the state and token
    // we have to jump through a few hoops.
    //
    // First, `state` includes `token_read_pos` and `token_write_pos`. These can
    // either be 0, 1, or 2 which represent a token slot. `token_write_pos` is
    // the token slot that contains the most up to date registration token.
    // `token_read_pos` is the token slot that `poll` is currently reading from.
    //
    // When a call to `update` includes a different token than the one currently
    // associated with the registration (token_write_pos), first an unused token
    // slot is found. The unused slot is the one not represented by
    // `token_read_pos` OR `token_write_pos`. The new token is written to this
    // slot, then `state` is updated with the new `token_write_pos` value. This
    // requires that there is only a *single* concurrent call to `update`.
    //
    // When `poll` reads a node state, it checks that `token_read_pos` matches
    // `token_write_pos`. If they do not match, then it atomically updates
    // `state` such that `token_read_pos` is set to `token_write_pos`. It will
    // then read the token at the newly updated `token_read_pos`.
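    //
    // As a sketch, the unused slot can be found by probing the three
    // positions in order (`read` and `write` are the current positions):
    //
    //     let mut next = 0;
    //     if next == read || next == write { next += 1; }
    //     if next == read || next == write { next += 1; }
    //     // `next` now names a slot that neither `poll` nor a previous
    //     // `update` is using, so it is safe to overwrite.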
    token_0: UnsafeCell<Token>,
    token_1: UnsafeCell<Token>,
    token_2: UnsafeCell<Token>,

    // Used when the node is queued in the readiness linked list. Accessing
    // this field requires winning the "queue" lock
    next_readiness: AtomicPtr<ReadinessNode>,

    // Ensures that there is only one concurrent call to `update`.
    //
    // Each call to `update` will attempt to swap `update_lock` from `false` to
    // `true`. If the CAS succeeds, the thread has obtained the update lock. If
    // the CAS fails, then the `update` call returns immediately and the update
    // is discarded.
    update_lock: AtomicBool,

    // Pointer to Arc<ReadinessQueueInner>
    readiness_queue: AtomicPtr<()>,

    // Tracks the number of `ReadyRef` pointers
    ref_count: AtomicUsize,
}

/// Stores the ReadinessNode state in an AtomicUsize. This wrapper around the
/// atomic variable handles encoding / decoding `ReadinessState` values.
struct AtomicState {
    inner: AtomicUsize,
}

const MASK_2: usize = 4 - 1;
const MASK_4: usize = 16 - 1;
const QUEUED_MASK: usize = 1 << QUEUED_SHIFT;
const DROPPED_MASK: usize = 1 << DROPPED_SHIFT;

const READINESS_SHIFT: usize = 0;
const INTEREST_SHIFT: usize = 4;
const POLL_OPT_SHIFT: usize = 8;
const TOKEN_RD_SHIFT: usize = 12;
const TOKEN_WR_SHIFT: usize = 14;
const QUEUED_SHIFT: usize = 16;
const DROPPED_SHIFT: usize = 17;

/// Tracks all state for a single `ReadinessNode`. The state is packed into a
/// `usize` variable from low to high bit as follows:
///
/// 4 bits: Registration current readiness
/// 4 bits: Registration interest
/// 4 bits: Poll options
/// 2 bits: Token position currently being read from by `poll`
/// 2 bits: Token position last written to by `update`
/// 1 bit:  Queued flag, set when node is being pushed into MPSC queue.
/// 1 bit:  Dropped flag, set when all `Registration` handles have been dropped.
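///
/// For example, a sketch of decoding two of the fields with the shift and
/// mask constants above:
///
/// ```text
/// readiness = (state >> READINESS_SHIFT) & MASK_4  // low 4 bits
/// token_rd  = (state >> TOKEN_RD_SHIFT) & MASK_2   // 2-bit slot index
/// ```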
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
struct ReadinessState(usize);

/// Returned by `dequeue_node`. Represents the different states as described by
/// the queue documentation on 1024cores.net.
enum Dequeue {
    Data(*mut ReadinessNode),
    Empty,
    Inconsistent,
}

const AWAKEN: Token = Token(usize::MAX);
const MAX_REFCOUNT: usize = (isize::MAX) as usize;

/*
 *
 * ===== Poll =====
 *
 */

impl Poll {
    /// Return a new `Poll` handle.
    ///
    /// This function will make a syscall to the operating system to create the
    /// system selector. If this syscall fails, `Poll::new` will return with the
    /// error.
    ///
    /// See [struct] level docs for more details.
    ///
    /// [struct]: struct.Poll.html
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use mio::{Poll, Events};
    /// use std::time::Duration;
    ///
    /// let poll = match Poll::new() {
    ///     Ok(poll) => poll,
    ///     Err(e) => panic!("failed to create Poll instance; err={:?}", e),
    /// };
    ///
    /// // Create a structure to receive polled events
    /// let mut events = Events::with_capacity(1024);
    ///
    /// // Wait for events, but none will be received because no `Evented`
    /// // handles have been registered with this `Poll` instance.
    /// let n = poll.poll(&mut events, Some(Duration::from_millis(500)))?;
    /// assert_eq!(n, 0);
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn new() -> io::Result<Poll> {
        is_send::<Poll>();
        is_sync::<Poll>();

        let poll = Poll {
            selector: sys::Selector::new()?,
            readiness_queue: ReadinessQueue::new()?,
            lock_state: AtomicUsize::new(0),
            lock: Mutex::new(()),
            condvar: Condvar::new(),
        };

        // Register the notification wakeup FD with the IO poller
        poll.readiness_queue.inner.awakener.register(&poll, AWAKEN, Ready::readable(), PollOpt::edge())?;

        Ok(poll)
    }

    /// Register an `Evented` handle with the `Poll` instance.
    ///
    /// Once registered, the `Poll` instance will monitor the `Evented` handle
    /// for readiness state changes. When it notices a state change, it will
    /// return a readiness event for the handle the next time [`poll`] is
    /// called.
    ///
    /// See the [`struct`] docs for a high level overview.
    ///
    /// # Arguments
    ///
    /// `handle: &E: Evented`: This is the handle that the `Poll` instance
    /// should monitor for readiness state changes.
    ///
    /// `token: Token`: The caller picks a token to associate with the socket.
    /// When [`poll`] returns an event for the handle, this token is included.
    /// This allows the caller to map the event to its handle. The token
    /// associated with the `Evented` handle can be changed at any time by
    /// calling [`reregister`].
    ///
    /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
    /// usage.
    ///
    /// See documentation on [`Token`] for an example showing how to pick
    /// [`Token`] values.
    ///
    /// `interest: Ready`: Specifies which operations `Poll` should monitor for
    /// readiness. `Poll` will only return readiness events for operations
    /// specified by this argument.
    ///
    /// If a socket is registered with readable interest and the socket becomes
    /// writable, no event will be returned from [`poll`].
    ///
    /// The readiness interest for an `Evented` handle can be changed at any
    /// time by calling [`reregister`].
    ///
    /// `opts: PollOpt`: Specifies the registration options. The most common
    /// options are [`level`] for level-triggered events, [`edge`] for
    /// edge-triggered events, and [`oneshot`].
    ///
    /// The registration options for an `Evented` handle can be changed at any
    /// time by calling [`reregister`].
    ///
    /// # Notes
    ///
    /// Unless otherwise specified, the caller should assume that once an
    /// `Evented` handle is registered with a `Poll` instance, it is bound to
    /// that `Poll` instance for the lifetime of the `Evented` handle. This
    /// remains true even if the `Evented` handle is deregistered from the poll
    /// instance using [`deregister`].
    ///
    /// This function is **thread safe**. It can be called concurrently from
    /// multiple threads.
    ///
    /// [`struct`]: #
    /// [`reregister`]: #method.reregister
    /// [`deregister`]: #method.deregister
    /// [`poll`]: #method.poll
    /// [`level`]: struct.PollOpt.html#method.level
    /// [`edge`]: struct.PollOpt.html#method.edge
    /// [`oneshot`]: struct.PollOpt.html#method.oneshot
    /// [`Token`]: struct.Token.html
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use mio::{Events, Poll, Ready, PollOpt, Token};
    /// use mio::net::TcpStream;
    /// use std::time::{Duration, Instant};
    ///
    /// let poll = Poll::new()?;
    /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
    ///
    /// // Register the socket with `poll`
    /// poll.register(&socket, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
    ///
    /// let mut events = Events::with_capacity(1024);
    /// let start = Instant::now();
    /// let timeout = Duration::from_millis(500);
    ///
    /// loop {
    ///     let elapsed = start.elapsed();
    ///
    ///     if elapsed >= timeout {
    ///         // Connection timed out
    ///         return Ok(());
    ///     }
    ///
    ///     let remaining = timeout - elapsed;
    ///     poll.poll(&mut events, Some(remaining))?;
    ///
    ///     for event in &events {
    ///         if event.token() == Token(0) {
    ///             // Something (probably) happened on the socket.
    ///             return Ok(());
    ///         }
    ///     }
    /// }
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn register<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>
        where E: Evented
    {
        validate_args(token)?;

        /*
         * Undefined behavior:
         * - Reusing a token with a different `Evented` without deregistering
         * (or closing) the original `Evented`.
         */
        trace!("registering with poller");

        // Register interests for this socket
        handle.register(self, token, interest, opts)?;

        Ok(())
    }

    /// Re-register an `Evented` handle with the `Poll` instance.
    ///
    /// Re-registering an `Evented` handle allows changing the details of the
    /// registration. Specifically, it allows updating the associated `token`,
    /// `interest`, and `opts` specified in previous `register` and `reregister`
    /// calls.
    ///
    /// The `reregister` arguments fully override the previous values. In other
    /// words, if a socket is registered with [`readable`] interest and the call
    /// to `reregister` specifies [`writable`], then read interest is no longer
    /// requested for the handle.
    ///
    /// The `Evented` handle must have previously been registered with this
    /// instance of `Poll`, otherwise the call to `reregister` will return with
    /// an error.
    ///
    /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
    /// usage.
    ///
    /// See the [`register`] documentation for details about the function
    /// arguments and see the [`struct`] docs for a high level overview of
    /// polling.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use mio::{Poll, Ready, PollOpt, Token};
    /// use mio::net::TcpStream;
    ///
    /// let poll = Poll::new()?;
    /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
    ///
    /// // Register the socket with `poll`, requesting readable
    /// poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
    ///
    /// // Reregister the socket specifying a different token and write interest
    /// // instead. `PollOpt::edge()` must be specified even though that value
    /// // is not being changed.
    /// poll.reregister(&socket, Token(2), Ready::writable(), PollOpt::edge())?;
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    ///
    /// [`struct`]: #
    /// [`register`]: #method.register
    /// [`readable`]: struct.Ready.html#method.readable
    /// [`writable`]: struct.Ready.html#method.writable
    pub fn reregister<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>
        where E: Evented
    {
        validate_args(token)?;

        trace!("reregistering with poller");

        // Update the interests for this socket
        handle.reregister(self, token, interest, opts)?;

        Ok(())
    }

    /// Deregister an `Evented` handle with the `Poll` instance.
    ///
    /// When an `Evented` handle is deregistered, the `Poll` instance will
    /// no longer monitor it for readiness state changes. Unlike disabling
    /// handles with oneshot, deregistering clears up any internal resources
    /// needed to track the handle.
    ///
    /// A handle can be passed back to `register` after it has been
    /// deregistered; however, it must be passed back to the **same** `Poll`
    /// instance.
    ///
    /// `Evented` handles are automatically deregistered when they are dropped.
    /// It is common to never need to explicitly call `deregister`.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use mio::{Events, Poll, Ready, PollOpt, Token};
    /// use mio::net::TcpStream;
    /// use std::time::Duration;
    ///
    /// let poll = Poll::new()?;
    /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
    ///
    /// // Register the socket with `poll`
    /// poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
    ///
    /// poll.deregister(&socket)?;
    ///
    /// let mut events = Events::with_capacity(1024);
    ///
    /// // Set a timeout because this poll should never receive any events.
    /// let n = poll.poll(&mut events, Some(Duration::from_secs(1)))?;
    /// assert_eq!(0, n);
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn deregister<E: ?Sized>(&self, handle: &E) -> io::Result<()>
        where E: Evented
    {
        trace!("deregistering handle with poller");

        // Deregister interests for this socket
        handle.deregister(self)?;

        Ok(())
    }

    /// Wait for readiness events
    ///
    /// Blocks the current thread and waits for readiness events for any of the
    /// `Evented` handles that have been registered with this `Poll` instance.
    /// The function will block until either at least one readiness event has
    /// been received or `timeout` has elapsed. A `timeout` of `None` means that
    /// `poll` will block until a readiness event has been received.
    ///
    /// The supplied `events` will be cleared and newly received readiness events
    /// will be pushed onto the end. At most `events.capacity()` events will be
    /// returned. If there are further pending readiness events, they will be
    /// returned on the next call to `poll`.
    ///
    /// A single call to `poll` may result in multiple readiness events being
    /// returned for a single `Evented` handle. For example, if a TCP socket
    /// becomes both readable and writable, it may be possible for a single
    /// readiness event to be returned with both [`readable`] and [`writable`]
    /// readiness **OR** two separate events may be returned, one with
    /// [`readable`] set and one with [`writable`] set.
    ///
    /// Note that the `timeout` will be rounded up to the system clock
    /// granularity (usually 1ms), and kernel scheduling delays mean that
    /// the blocking interval may be overrun by a small amount.
    ///
    /// `poll` returns the number of readiness events that have been pushed into
    /// `events`, or `Err` when an error has been encountered with the system
    /// selector. The return value is deprecated and will be removed in 0.7.0.
    /// Accessing the events by index is also deprecated. Events can be
    /// inserted by other events triggering, thus making sequential access
    /// problematic. Use the iterator API instead. See [`iter`].
    ///
    /// See the [struct] level documentation for a higher level discussion of
    /// polling.
    ///
    /// [`readable`]: struct.Ready.html#method.readable
    /// [`writable`]: struct.Ready.html#method.writable
    /// [struct]: #
    /// [`iter`]: struct.Events.html#method.iter
    ///
    /// # Examples
    ///
    /// A basic example -- establishing a `TcpStream` connection.
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use mio::{Events, Poll, Ready, PollOpt, Token};
    /// use mio::net::TcpStream;
    ///
    /// use std::net::{TcpListener, SocketAddr};
    /// use std::thread;
    ///
    /// // Bind a server socket to connect to.
    /// let addr: SocketAddr = "127.0.0.1:0".parse()?;
    /// let server = TcpListener::bind(&addr)?;
    /// let addr = server.local_addr()?.clone();
    ///
    /// // Spawn a thread to accept the socket
    /// thread::spawn(move || {
    ///     let _ = server.accept();
    /// });
    ///
    /// // Construct a new `Poll` handle as well as the `Events` we'll store into
    /// let poll = Poll::new()?;
    /// let mut events = Events::with_capacity(1024);
    ///
    /// // Connect the stream
    /// let stream = TcpStream::connect(&addr)?;
    ///
    /// // Register the stream with `Poll`
    /// poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
    ///
    /// // Wait for the socket to become ready. This has to happen in a loop to
    /// // handle spurious wakeups.
    /// loop {
    ///     poll.poll(&mut events, None)?;
    ///
    ///     for event in &events {
    ///         if event.token() == Token(0) && event.readiness().is_writable() {
    ///             // The socket connected (probably, it could still be a spurious
    ///             // wakeup)
    ///             return Ok(());
    ///         }
    ///     }
    /// }
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
        self.poll1(events, timeout, false)
    }

    /// Like `poll`, but may be interrupted by a signal
    ///
    /// If `poll` is interrupted while blocking, it will transparently retry the syscall. If you
    /// want to handle signals yourself, however, use `poll_interruptible`.
    pub fn poll_interruptible(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
        self.poll1(events, timeout, true)
    }

    fn poll1(&self, events: &mut Events, mut timeout: Option<Duration>, interruptible: bool) -> io::Result<usize> {
        let zero = Some(Duration::from_millis(0));

        // At a high level, the synchronization strategy is to acquire access to
        // the critical section by transitioning the atomic from unlocked ->
        // locked. If the attempt fails, the thread will wait on the condition
        // variable.
        //
        // # Some more detail
        //
        // The `lock_state` atomic usize combines:
        //
        // - locked flag, stored in the least significant bit
        // - number of waiting threads, stored in the rest of the bits.
        //
        // When a thread transitions the locked flag from 0 -> 1, it has
        // obtained access to the critical section.
        //
        // When entering `poll`, a compare-and-swap from 0 -> 1 is attempted.
        // This is a fast path for the case when there are no concurrent calls
        // to poll, which is very common.
        //
        // On failure, the mutex is locked, and the thread attempts to increment
        // the number of waiting threads component of `lock_state`. If this is
        // successfully done while the locked flag is set, then the thread can
        // wait on the condition variable.
        //
        // When a thread exits the critical section, it unsets the locked flag.
        // If there are any waiters, which is atomically determined while
        // unsetting the locked flag, then the condvar is notified.

        let mut curr = self.lock_state.compare_and_swap(0, 1, SeqCst);

        if 0 != curr {
            // Enter slower path
            let mut lock = self.lock.lock().unwrap();
            let mut inc = false;

            loop {
                if curr & 1 == 0 {
                    // The lock is currently free, attempt to grab it
                    let mut next = curr | 1;

                    if inc {
                        // The waiter count has previously been incremented, so
                        // decrement it here
                        next -= 2;
                    }

                    let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);

                    if actual != curr {
                        curr = actual;
                        continue;
                    }

                    // Lock acquired, break from the loop
                    break;
                }

                if timeout == zero {
                    if inc {
                        self.lock_state.fetch_sub(2, SeqCst);
                    }

                    return Ok(0);
                }

                // The lock is currently held, so wait for it to become
                // free. If the waiter count hasn't been incremented yet, do
                // so now
                if !inc {
                    let next = curr.checked_add(2).expect("overflow");
                    let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);

                    if actual != curr {
                        curr = actual;
                        continue;
                    }

                    // Track that the waiter count has been incremented for
                    // this thread and fall through to the condvar waiting
                    inc = true;
                }

                lock = match timeout {
                    Some(to) => {
                        let now = Instant::now();

                        // Wait to be notified
                        let (l, _) = self.condvar.wait_timeout(lock, to).unwrap();

                        // See how much time was elapsed in the wait
                        let elapsed = now.elapsed();

                        // Update `timeout` to reflect how much time is left to
                        // wait.
                        if elapsed >= to {
                            timeout = zero;
                        } else {
                            // Update the timeout
                            timeout = Some(to - elapsed);
                        }

                        l
                    }
                    None => {
                        self.condvar.wait(lock).unwrap()
                    }
                };

                // Reload the state
                curr = self.lock_state.load(SeqCst);

                // Try to lock again...
            }
        }

        let ret = self.poll2(events, timeout, interruptible);

        // Release the lock
        if 1 != self.lock_state.fetch_and(!1, Release) {
            // Acquire the mutex
            let _lock = self.lock.lock().unwrap();

            // There is at least one waiting thread, so notify one
            self.condvar.notify_one();
        }

        ret
    }

    #[inline]
    #[cfg_attr(feature = "cargo-clippy", allow(clippy::if_same_then_else))]
    fn poll2(&self, events: &mut Events, mut timeout: Option<Duration>, interruptible: bool) -> io::Result<usize> {
        // Compute the timeout value passed to the system selector. If the
        // readiness queue has pending nodes, we still want to poll the system
        // selector for new events, but we don't want to block the thread to
        // wait for new events.
        if timeout == Some(Duration::from_millis(0)) {
            // If blocking is not requested, then there is no need to prepare
            // the queue for sleep
            //
            // The sleep_marker should be removed by readiness_queue.poll().
        } else if self.readiness_queue.prepare_for_sleep() {
            // The readiness queue is empty. The call to `prepare_for_sleep`
            // inserts `sleep_marker` into the queue. This signals to any
            // threads setting readiness that the `Poll::poll` is going to
            // sleep, so the awakener should be used.
        } else {
            // The readiness queue is not empty, so do not block the thread.
            timeout = Some(Duration::from_millis(0));
        }

        loop {
            let now = Instant::now();
            // First get selector events
            let res = self.selector.select(&mut events.inner, AWAKEN, timeout);
            match res {
                Ok(true) => {
                    // Some awakeners require reading from a FD.
                    self.readiness_queue.inner.awakener.cleanup();
                    break;
                }
                Ok(false) => break,
                Err(ref e) if e.kind() == io::ErrorKind::Interrupted && !interruptible => {
                    // Interrupted by a signal; update timeout if necessary and retry
                    if let Some(to) = timeout {
                        let elapsed = now.elapsed();
                        if elapsed >= to {
                            break;
                        } else {
                            timeout = Some(to - elapsed);
                        }
                    }
                }
                Err(e) => return Err(e),
            }
        }

        // Poll custom event queue
        self.readiness_queue.poll(&mut events.inner);

        // Return number of polled events
        Ok(events.inner.len())
    }
}

fn validate_args(token: Token) -> io::Result<()> {
    if token == AWAKEN {
        return Err(io::Error::new(io::ErrorKind::Other, "invalid token"));
    }

    Ok(())
}

impl fmt::Debug for Poll {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Poll")
            .finish()
    }
}

#[cfg(all(unix, not(target_os = "fuchsia")))]
impl AsRawFd for Poll {
    fn as_raw_fd(&self) -> RawFd {
        self.selector.as_raw_fd()
    }
}

/// A collection of readiness events.
///
/// `Events` is passed as an argument to [`Poll::poll`] and will be used to
/// receive any new readiness events received since the last poll. Usually, a
/// single `Events` instance is created at the same time as a [`Poll`] and
/// reused on each call to [`Poll::poll`].
///
/// See [`Poll`] for more documentation on polling.
///
/// # Examples
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use mio::{Events, Poll};
/// use std::time::Duration;
///
/// let mut events = Events::with_capacity(1024);
/// let poll = Poll::new()?;
///
/// assert_eq!(0, events.len());
///
/// // Register `Evented` handles with `poll`
///
/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
///
/// for event in &events {
///     println!("event={:?}", event);
/// }
/// #     Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```
///
/// [`Poll::poll`]: struct.Poll.html#method.poll
/// [`Poll`]: struct.Poll.html
pub struct Events {
    inner: sys::Events,
}

/// [`Events`] iterator.
///
/// This struct is created by the [`iter`] method on [`Events`].
///
/// # Examples
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use mio::{Events, Poll};
/// use std::time::Duration;
///
/// let mut events = Events::with_capacity(1024);
/// let poll = Poll::new()?;
///
/// // Register handles with `poll`
///
/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
///
/// for event in events.iter() {
///     println!("event={:?}", event);
/// }
/// #     Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```
///
/// [`Events`]: struct.Events.html
/// [`iter`]: struct.Events.html#method.iter
#[derive(Debug, Clone)]
pub struct Iter<'a> {
    inner: &'a Events,
    pos: usize,
}

/// Owned [`Events`] iterator.
///
/// This struct is created by the `into_iter` method on [`Events`].
///
/// # Examples
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use mio::{Events, Poll};
/// use std::time::Duration;
///
/// let mut events = Events::with_capacity(1024);
/// let poll = Poll::new()?;
///
/// // Register handles with `poll`
///
/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
///
/// for event in events {
///     println!("event={:?}", event);
/// }
/// #     Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```
///
/// [`Events`]: struct.Events.html
#[derive(Debug)]
pub struct IntoIter {
    inner: Events,
    pos: usize,
}
1347 
impl Events {
    /// Returns a new `Events` capable of holding up to `capacity` events.
    ///
    /// # Examples
    ///
    /// ```
    /// use mio::Events;
    ///
    /// let events = Events::with_capacity(1024);
    ///
    /// assert_eq!(1024, events.capacity());
    /// ```
    pub fn with_capacity(capacity: usize) -> Events {
        Events {
            inner: sys::Events::with_capacity(capacity),
        }
    }

    #[deprecated(since = "0.6.10", note = "Index access removed in favor of iterator only API.")]
    #[doc(hidden)]
    pub fn get(&self, idx: usize) -> Option<Event> {
        self.inner.get(idx)
    }

    #[doc(hidden)]
    #[deprecated(since = "0.6.10", note = "Index access removed in favor of iterator only API.")]
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns the number of `Event` values that `self` can hold.
    ///
    /// # Examples
    ///
    /// ```
    /// use mio::Events;
    ///
    /// let events = Events::with_capacity(1024);
    ///
    /// assert_eq!(1024, events.capacity());
    /// ```
    pub fn capacity(&self) -> usize {
        self.inner.capacity()
    }

    /// Returns `true` if `self` contains no `Event` values.
    ///
    /// # Examples
    ///
    /// ```
    /// use mio::Events;
    ///
    /// let events = Events::with_capacity(1024);
    ///
    /// assert!(events.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Returns an iterator over the `Event` values.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use mio::{Events, Poll};
    /// use std::time::Duration;
    ///
    /// let mut events = Events::with_capacity(1024);
    /// let poll = Poll::new()?;
    ///
    /// // Register handles with `poll`
    ///
    /// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
    ///
    /// for event in events.iter() {
    ///     println!("event={:?}", event);
    /// }
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn iter(&self) -> Iter {
        Iter {
            inner: self,
            pos: 0
        }
    }

    /// Clears all `Event` values from the container.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use mio::{Events, Poll};
    /// use std::time::Duration;
    ///
    /// let mut events = Events::with_capacity(1024);
    /// let poll = Poll::new()?;
    ///
    /// // Register handles with `poll`
    /// for _ in 0..2 {
    ///     events.clear();
    ///     poll.poll(&mut events, Some(Duration::from_millis(100)))?;
    ///
    ///     for event in events.iter() {
    ///         println!("event={:?}", event);
    ///     }
    /// }
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn clear(&mut self) {
        self.inner.clear();
    }
}

impl<'a> IntoIterator for &'a Events {
    type Item = Event;
    type IntoIter = Iter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}

impl<'a> Iterator for Iter<'a> {
    type Item = Event;

    fn next(&mut self) -> Option<Event> {
        let ret = self.inner.inner.get(self.pos);
        self.pos += 1;
        ret
    }
}

impl IntoIterator for Events {
    type Item = Event;
    type IntoIter = IntoIter;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter {
            inner: self,
            pos: 0,
        }
    }
}

impl Iterator for IntoIter {
    type Item = Event;

    fn next(&mut self) -> Option<Event> {
        let ret = self.inner.inner.get(self.pos);
        self.pos += 1;
        ret
    }
}
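
// A tiny, illustrative check (not part of the original test suite) of the
// iterator impls above: a freshly created `Events` yields no items from
// either the borrowed or the owned iterator.
#[test]
fn empty_events_yield_no_items() {
    let events = Events::with_capacity(8);

    assert!(events.iter().next().is_none());
    assert!(events.into_iter().next().is_none());
}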

impl fmt::Debug for Events {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Events")
            .field("capacity", &self.capacity())
            .finish()
    }
}

// ===== Accessors for internal usage =====

pub fn selector(poll: &Poll) -> &sys::Selector {
    &poll.selector
}

/*
 *
 * ===== Registration =====
 *
 */

// TODO: get rid of this, windows depends on it for now
#[allow(dead_code)]
pub fn new_registration(poll: &Poll, token: Token, ready: Ready, opt: PollOpt)
        -> (Registration, SetReadiness)
{
    Registration::new_priv(poll, token, ready, opt)
}

impl Registration {
    /// Create and return a new `Registration` and the associated
    /// `SetReadiness`.
    ///
    /// See [struct] documentation for more detail and [`Poll`]
    /// for high level documentation on polling.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use mio::{Events, Ready, Registration, Poll, PollOpt, Token};
    /// use std::thread;
    ///
    /// let (registration, set_readiness) = Registration::new2();
    ///
    /// thread::spawn(move || {
    ///     use std::time::Duration;
    ///     thread::sleep(Duration::from_millis(500));
    ///
    ///     set_readiness.set_readiness(Ready::readable()).unwrap();
    /// });
    ///
    /// let poll = Poll::new()?;
    /// poll.register(&registration, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
    ///
    /// let mut events = Events::with_capacity(256);
    ///
    /// loop {
    ///     poll.poll(&mut events, None)?;
    ///
    ///     for event in &events {
    ///         if event.token() == Token(0) && event.readiness().is_readable() {
    ///             return Ok(());
    ///         }
    ///     }
    /// }
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    ///
    /// [struct]: #
    /// [`Poll`]: struct.Poll.html
    pub fn new2() -> (Registration, SetReadiness) {
        // Allocate the registration node. The new node will have `ref_count`
        // set to 2: one SetReadiness, one Registration.
        let node = Box::into_raw(Box::new(ReadinessNode::new(
                    ptr::null_mut(), Token(0), Ready::empty(), PollOpt::empty(), 2)));

        let registration = Registration {
            inner: RegistrationInner {
                node,
            },
        };

        let set_readiness = SetReadiness {
            inner: RegistrationInner {
                node,
            },
        };

        (registration, set_readiness)
    }

    #[deprecated(since = "0.6.5", note = "use `new2` instead")]
    #[cfg(feature = "with-deprecated")]
    #[doc(hidden)]
    pub fn new(poll: &Poll, token: Token, interest: Ready, opt: PollOpt)
        -> (Registration, SetReadiness)
    {
        Registration::new_priv(poll, token, interest, opt)
    }

    // TODO: Get rid of this (windows depends on it for now)
    fn new_priv(poll: &Poll, token: Token, interest: Ready, opt: PollOpt)
        -> (Registration, SetReadiness)
    {
        is_send::<Registration>();
        is_sync::<Registration>();
        is_send::<SetReadiness>();
        is_sync::<SetReadiness>();

        // Clone the handle to the readiness queue; this bumps the ref count
        let queue = poll.readiness_queue.inner.clone();

        // Convert to a `*mut ()` pointer
        let queue: *mut () = unsafe { mem::transmute(queue) };

        // Allocate the registration node. The new node will have `ref_count`
        // set to 3: one SetReadiness, one Registration, and one Poll handle.
        let node = Box::into_raw(Box::new(ReadinessNode::new(
                    queue, token, interest, opt, 3)));

        let registration = Registration {
            inner: RegistrationInner {
                node,
            },
        };

        let set_readiness = SetReadiness {
            inner: RegistrationInner {
                node,
            },
        };

        (registration, set_readiness)
    }

    #[deprecated(since = "0.6.5", note = "use `Evented` impl")]
    #[cfg(feature = "with-deprecated")]
    #[doc(hidden)]
    pub fn update(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
        self.inner.update(poll, token, interest, opts)
    }

    #[deprecated(since = "0.6.5", note = "use `Poll::deregister` instead")]
    #[cfg(feature = "with-deprecated")]
    #[doc(hidden)]
    pub fn deregister(&self, poll: &Poll) -> io::Result<()> {
        self.inner.update(poll, Token(0), Ready::empty(), PollOpt::empty())
    }
}

impl Evented for Registration {
    fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
        self.inner.update(poll, token, interest, opts)
    }

    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
        self.inner.update(poll, token, interest, opts)
    }

    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        self.inner.update(poll, Token(0), Ready::empty(), PollOpt::empty())
    }
}

impl Drop for Registration {
    fn drop(&mut self) {
        // `flag_as_dropped` toggles the `dropped` flag and notifies
        // `Poll::poll` to release its handle (which is just decrementing
        // the ref count).
        if self.inner.state.flag_as_dropped() {
            // Can't do anything if the queuing fails
            let _ = self.inner.enqueue_with_wakeup();
        }
    }
}

impl fmt::Debug for Registration {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Registration")
            .finish()
    }
}

impl SetReadiness {
    /// Returns the registration's current readiness.
    ///
    /// # Note
    ///
    /// There is no guarantee that `readiness` establishes any sort of memory
    /// ordering. Any concurrent data access must be synchronized using another
    /// strategy.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use mio::{Registration, Ready};
    ///
    /// let (registration, set_readiness) = Registration::new2();
    ///
    /// assert!(set_readiness.readiness().is_empty());
    ///
    /// set_readiness.set_readiness(Ready::readable())?;
    /// assert!(set_readiness.readiness().is_readable());
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn readiness(&self) -> Ready {
        self.inner.readiness()
    }

    /// Set the registration's readiness.
    ///
    /// If the associated `Registration` is registered with a [`Poll`] instance
    /// and has requested readiness events that include `ready`, then a future
    /// call to [`Poll::poll`] will receive a readiness event representing the
    /// readiness state change.
    ///
    /// # Note
    ///
    /// There is no guarantee that `set_readiness` establishes any sort of
    /// memory ordering. Any concurrent data access must be synchronized using
    /// another strategy.
    ///
    /// There is also no guarantee as to when the readiness event will be
    /// delivered to poll. A best attempt will be made to make the delivery in a
    /// "timely" fashion. For example, the following is **not** guaranteed to
    /// work:
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use mio::{Events, Registration, Ready, Poll, PollOpt, Token};
    ///
    /// let poll = Poll::new()?;
    /// let (registration, set_readiness) = Registration::new2();
    ///
    /// poll.register(&registration,
    ///               Token(0),
    ///               Ready::readable(),
    ///               PollOpt::edge())?;
    ///
    /// // Set the readiness, then immediately poll to try to get the readiness
    /// // event
    /// set_readiness.set_readiness(Ready::readable())?;
    ///
    /// let mut events = Events::with_capacity(1024);
    /// poll.poll(&mut events, None)?;
    ///
    /// // There is NO guarantee that the following will work. It is possible
    /// // that the readiness event will be delivered at a later time.
    /// let event = events.iter().next().unwrap();
    /// assert_eq!(event.token(), Token(0));
    /// assert!(event.readiness().is_readable());
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    ///
    /// # Examples
    ///
    /// A simple example; for a more elaborate example, see the [`Evented`]
    /// documentation.
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use mio::{Registration, Ready};
    ///
    /// let (registration, set_readiness) = Registration::new2();
    ///
    /// assert!(set_readiness.readiness().is_empty());
    ///
    /// set_readiness.set_readiness(Ready::readable())?;
    /// assert!(set_readiness.readiness().is_readable());
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    ///
    /// [`Registration`]: struct.Registration.html
    /// [`Evented`]: event/trait.Evented.html#examples
    /// [`Poll`]: struct.Poll.html
    /// [`Poll::poll`]: struct.Poll.html#method.poll
    pub fn set_readiness(&self, ready: Ready) -> io::Result<()> {
        self.inner.set_readiness(ready)
    }
}

impl fmt::Debug for SetReadiness {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("SetReadiness")
            .finish()
    }
}

impl RegistrationInner {
    /// Get the registration's readiness.
    fn readiness(&self) -> Ready {
        self.state.load(Relaxed).readiness()
    }

    /// Set the registration's readiness.
    ///
    /// This function can be called concurrently by an arbitrary number of
    /// SetReadiness handles.
    fn set_readiness(&self, ready: Ready) -> io::Result<()> {
        // Load the current atomic state.
        let mut state = self.state.load(Acquire);
        let mut next;

        loop {
            next = state;

            if state.is_dropped() {
                // Node is dropped, no more notifications
                return Ok(());
            }

            // Update the readiness
            next.set_readiness(ready);

            // If the readiness is not blank, try to obtain permission to
            // push the node into the readiness queue.
            if !next.effective_readiness().is_empty() {
                next.set_queued();
            }

            let actual = self.state.compare_and_swap(state, next, AcqRel);

            if state == actual {
                break;
            }

            state = actual;
        }

        if !state.is_queued() && next.is_queued() {
            // We toggled the queued flag, making us responsible for queuing the
            // node in the MPSC readiness queue.
            self.enqueue_with_wakeup()?;
        }

        Ok(())
    }

    /// Update the registration details associated with the node
    fn update(&self, poll: &Poll, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> {
        // First, ensure poll instances match
        //
        // Load the queue pointer, `Relaxed` is sufficient here as only the
        // pointer is being operated on. The actual memory is guaranteed to be
        // visible by the `poll: &Poll` ref passed as an argument to the
        // function.
        let mut queue = self.readiness_queue.load(Relaxed);
        let other: &*mut () = unsafe {
            &*(&poll.readiness_queue.inner as *const _ as *const *mut ())
        };
        let other = *other;

        debug_assert!(mem::size_of::<Arc<ReadinessQueueInner>>() == mem::size_of::<*mut ()>());

        if queue.is_null() {
            // Attempt to set the queue pointer. `Release` ordering synchronizes
            // with `Acquire` in `ensure_with_wakeup`.
            let actual = self.readiness_queue.compare_and_swap(
                queue, other, Release);

            if actual.is_null() {
                // The CAS succeeded, this means that the node's ref count
                // should be incremented to reflect that the `poll` function
                // effectively owns the node as well.
                //
                // `Relaxed` ordering used for the same reason as in
                // RegistrationInner::clone
                self.ref_count.fetch_add(1, Relaxed);

                // Note that the `queue` reference stored in our
                // `readiness_queue` field is intended to be a strong reference,
                // so now that we've successfully claimed the reference we bump
                // the refcount here.
                //
                // Down below in `release_node` when we deallocate this
                // `RegistrationInner` is where we'll transmute this back to an
                // arc and decrement the reference count.
                mem::forget(poll.readiness_queue.clone());
            } else {
                // The CAS failed, another thread set the queue pointer, so ensure
                // that the pointer and `other` match
                if actual != other {
                    return Err(io::Error::new(io::ErrorKind::Other, "registration handle associated with another `Poll` instance"));
                }
            }

            queue = other;
        } else if queue != other {
            return Err(io::Error::new(io::ErrorKind::Other, "registration handle associated with another `Poll` instance"));
        }

        unsafe {
            let actual = &poll.readiness_queue.inner as *const _ as *const usize;
            debug_assert_eq!(queue as usize, *actual);
        }

        // The `update_lock` atomic is used as a flag ensuring only a single
        // thread concurrently enters the `update` critical section. Any
        // concurrent calls to update are discarded. If coordinated updates are
        // required, the Mio user is responsible for handling that.
        //
        // Acquire / Release ordering is used on `update_lock` to ensure that
        // data access to the `token_*` variables are scoped to the critical
        // section.

        // Acquire the update lock.
        if self.update_lock.compare_and_swap(false, true, Acquire) {
            // The lock is already held. Discard the update
            return Ok(());
        }

        // Relaxed ordering is acceptable here as the only memory that needs to
        // be visible as part of the update are the `token_*` variables, and
        // ordering has already been handled by the `update_lock` access.
        let mut state = self.state.load(Relaxed);
        let mut next;

        // Read the current token, again this memory has been ordered by the
        // acquire on `update_lock`.
        let curr_token_pos = state.token_write_pos();
        let curr_token = unsafe { self::token(self, curr_token_pos) };

        let mut next_token_pos = curr_token_pos;

        // If the `update` call is changing the token, then compute the next
        // available token slot and write the token there.
        //
        // Note that this computation is happening *outside* of the
        // compare-and-swap loop. The update lock ensures that only a single
        // thread could be mutating the write_token_position, so the
        // `next_token_pos` will never need to be recomputed even if
        // `token_read_pos` concurrently changes. This is because
        // `token_read_pos` can ONLY concurrently change to the current value of
        // `token_write_pos`, so `next_token_pos` will always remain valid.
        if token != curr_token {
            next_token_pos = state.next_token_pos();

            // Update the token
            match next_token_pos {
                0 => unsafe { *self.token_0.get() = token },
                1 => unsafe { *self.token_1.get() = token },
                2 => unsafe { *self.token_2.get() = token },
                _ => unreachable!(),
            }
        }

        // Now enter the compare-and-swap loop
        loop {
            next = state;

            // The node is only dropped once all `Registration` handles are
            // dropped. Only `Registration` can call `update`.
            debug_assert!(!state.is_dropped());

            // Update the write token position, this will also release the token
            // to Poll::poll.
            next.set_token_write_pos(next_token_pos);

            // Update readiness and poll opts
            next.set_interest(interest);
            next.set_poll_opt(opt);

            // If there is effective readiness, the node will need to be queued
            // for processing. This exact behavior is still TBD, so we are
            // conservative for now and always fire.
            //
            // See https://github.com/carllerche/mio/issues/535.
            if !next.effective_readiness().is_empty() {
                next.set_queued();
            }

            // compare-and-swap the state values. Only `Release` is needed here.
            // The `Release` ensures that `Poll::poll` will see the token
            // update and the update function doesn't care about any other
            // memory visibility.
            let actual = self.state.compare_and_swap(state, next, Release);

            if actual == state {
                break;
            }

            // CAS failed, but `curr_token_pos` should not have changed given
            // that we still hold the update lock.
            debug_assert_eq!(curr_token_pos, actual.token_write_pos());

            state = actual;
        }

        // Release the lock
        self.update_lock.store(false, Release);

        if !state.is_queued() && next.is_queued() {
            // We are responsible for enqueuing the node.
            enqueue_with_wakeup(queue, self)?;
        }

        Ok(())
    }
}

impl ops::Deref for RegistrationInner {
    type Target = ReadinessNode;

    fn deref(&self) -> &ReadinessNode {
        unsafe { &*self.node }
    }
}

impl Clone for RegistrationInner {
    fn clone(&self) -> RegistrationInner {
        // Using a relaxed ordering is alright here, as knowledge of the
        // original reference prevents other threads from erroneously deleting
        // the object.
        //
        // As explained in the [Boost documentation][1], increasing the
        // reference counter can always be done with memory_order_relaxed: new
        // references to an object can only be formed from an existing
        // reference, and passing an existing reference from one thread to
        // another must already provide any required synchronization.
        //
        // [1]: https://www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html
        let old_size = self.ref_count.fetch_add(1, Relaxed);

        // However we need to guard against massive refcounts in case someone
        // is `mem::forget`ing Arcs. If we don't do this the count can overflow
        // and users will hit a use-after-free. We racily saturate to
        // `isize::MAX` on the assumption that there aren't ~2 billion threads
        // incrementing the reference count at once. This branch will never be
        // taken in any realistic program.
        //
        // We abort because such a program is incredibly degenerate, and we
        // don't care to support it.
        if old_size & !MAX_REFCOUNT != 0 {
            process::abort();
        }

        RegistrationInner {
            node: self.node,
        }
    }
}

impl Drop for RegistrationInner {
    fn drop(&mut self) {
        // Only handles releasing from `Registration` and `SetReadiness`
        // handles. Poll has to call this itself.
        release_node(self.node);
    }
}

/*
 *
 * ===== ReadinessQueue =====
 *
 */

impl ReadinessQueue {
    /// Create a new `ReadinessQueue`.
    fn new() -> io::Result<ReadinessQueue> {
        is_send::<Self>();
        is_sync::<Self>();

        let end_marker = Box::new(ReadinessNode::marker());
        let sleep_marker = Box::new(ReadinessNode::marker());
        let closed_marker = Box::new(ReadinessNode::marker());

        let ptr = &*end_marker as *const _ as *mut _;

        Ok(ReadinessQueue {
            inner: Arc::new(ReadinessQueueInner {
                awakener: sys::Awakener::new()?,
                head_readiness: AtomicPtr::new(ptr),
                tail_readiness: UnsafeCell::new(ptr),
                end_marker,
                sleep_marker,
                closed_marker,
            })
        })
    }

    /// Poll the queue for new events
    fn poll(&self, dst: &mut sys::Events) {
        // `until` is set with the first node that gets re-enqueued due to being
        // set to have level-triggered notifications. This prevents an infinite
        // loop where `Poll::poll` will keep dequeuing nodes it enqueues.
        let mut until = ptr::null_mut();

        if dst.len() == dst.capacity() {
            // If `dst` is already full, the readiness queue won't be drained.
            // This might result in `sleep_marker` staying in the queue and
            // unnecessary pipe writes occurring.
            self.inner.clear_sleep_marker();
        }

        'outer:
        while dst.len() < dst.capacity() {
            // Dequeue a node. If the queue is in an inconsistent state, then
            // stop polling. `Poll::poll` will be called again shortly and enter
            // a syscall, which should be enough to enable the other thread to
            // finish the queuing process.
            let ptr = match unsafe { self.inner.dequeue_node(until) } {
                Dequeue::Empty | Dequeue::Inconsistent => break,
                Dequeue::Data(ptr) => ptr,
            };

            let node = unsafe { &*ptr };

            // Read the node state with Acquire ordering. This allows reading
            // the token variables.
            let mut state = node.state.load(Acquire);
            let mut next;
            let mut readiness;
            let mut opt;

            loop {
                // Build up any changes to the readiness node's state and
                // attempt the CAS at the end
                next = state;

                // Given that the node was just read from the queue, the
                // `queued` flag should still be set.
                debug_assert!(state.is_queued());

                // The dropped flag means we need to release the node and
                // perform no further processing on it.
                if state.is_dropped() {
                    // Release the node and continue
                    release_node(ptr);
                    continue 'outer;
                }

                // Process the node
                readiness = state.effective_readiness();
                opt = state.poll_opt();

                if opt.is_edge() {
                    // Mark the node as dequeued
                    next.set_dequeued();

                    if opt.is_oneshot() && !readiness.is_empty() {
                        next.disarm();
                    }
                } else if readiness.is_empty() {
                    next.set_dequeued();
                }

                // Ensure `token_read_pos` is set to `token_write_pos` so that
                // we read the most up to date token value.
                next.update_token_read_pos();

                if state == next {
                    break;
                }

                let actual = node.state.compare_and_swap(state, next, AcqRel);

                if actual == state {
                    break;
                }

                state = actual;
            }

            // If the queued flag is still set, then the node must be requeued.
            // This typically happens when using level-triggered notifications.
            if next.is_queued() {
                if until.is_null() {
                    // We never want to see the node again
                    until = ptr;
                }

                // Requeue the node
                self.inner.enqueue_node(node);
            }

            if !readiness.is_empty() {
                // Get the token
                let token = unsafe { token(node, next.token_read_pos()) };

                // Push the event
                dst.push_event(Event::new(readiness, token));
            }
        }
    }

    /// Prepare the queue for the `Poll::poll` thread to block in the system
    /// selector. This involves changing `head_readiness` to `sleep_marker`.
    /// Returns true if successful and `poll` can block.
    fn prepare_for_sleep(&self) -> bool {
        let end_marker = self.inner.end_marker();
        let sleep_marker = self.inner.sleep_marker();

        let tail = unsafe { *self.inner.tail_readiness.get() };

        // If the tail is currently set to the sleep_marker, then check if the
        // head is as well. If it is, then the queue is currently ready to
        // sleep. If it is not, then the queue is not empty and there should be
        // no sleeping.
        if tail == sleep_marker {
            return self.inner.head_readiness.load(Acquire) == sleep_marker;
        }

        // If the tail is not currently set to `end_marker`, then the queue is
        // not empty.
        if tail != end_marker {
            return false;
        }

        // The sleep marker is *not* currently in the readiness queue.
        //
        // The sleep marker is only inserted in this function. It is also only
        // inserted in the tail position. This is guaranteed by first checking
        // that the end marker is in the tail position, pushing the sleep marker
        // after the end marker, then removing the end marker.
        //
        // Before inserting a node into the queue, the next pointer has to be
        // set to null. Again, this is only safe to do when the node is not
        // currently in the queue, but we already have ensured this.
        self.inner.sleep_marker.next_readiness.store(ptr::null_mut(), Relaxed);

        let actual = self.inner.head_readiness.compare_and_swap(
            end_marker, sleep_marker, AcqRel);

        debug_assert!(actual != sleep_marker);

        if actual != end_marker {
            // The readiness queue is not empty
            return false;
        }

        // The current tail should be pointing to `end_marker`
        debug_assert!(unsafe { *self.inner.tail_readiness.get() == end_marker });
        // The `end_marker` next pointer should be null
        debug_assert!(self.inner.end_marker.next_readiness.load(Relaxed).is_null());

        // Update tail pointer.
        unsafe { *self.inner.tail_readiness.get() = sleep_marker; }
        true
    }
}
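
// A small sanity check (added for illustration; not part of the original
// test suite) of the sleep-marker protocol described above: an empty queue
// can transition to the sleeping state, and asking again while already
// sleeping still succeeds.
#[test]
fn prepare_for_sleep_on_empty_queue() {
    let queue = ReadinessQueue::new().unwrap();

    assert!(queue.prepare_for_sleep());
    assert!(queue.prepare_for_sleep());
}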

impl Drop for ReadinessQueue {
    fn drop(&mut self) {
        // Close the queue by enqueuing the closed node
        self.inner.enqueue_node(&*self.inner.closed_marker);

        loop {
            // Free any nodes that happen to be left in the readiness queue
            let ptr = match unsafe { self.inner.dequeue_node(ptr::null_mut()) } {
                Dequeue::Empty => break,
                Dequeue::Inconsistent => {
                    // This really shouldn't be possible as all other handles to
                    // `ReadinessQueueInner` are dropped, but handle this by
                    // spinning I guess?
                    continue;
                }
                Dequeue::Data(ptr) => ptr,
            };

            let node = unsafe { &*ptr };

            let state = node.state.load(Acquire);

            debug_assert!(state.is_queued());

            release_node(ptr);
        }
    }
}

impl ReadinessQueueInner {
    fn wakeup(&self) -> io::Result<()> {
        self.awakener.wakeup()
    }

    /// Push the given node onto the readiness queue, waking up the
    /// `Poll::poll` thread if it is sleeping (or about to sleep) on the
    /// system selector.
    fn enqueue_node_with_wakeup(&self, node: &ReadinessNode) -> io::Result<()> {
        if self.enqueue_node(node) {
            self.wakeup()?;
        }

        Ok(())
    }

    /// Push the node into the readiness queue
    fn enqueue_node(&self, node: &ReadinessNode) -> bool {
        // This is the 1024cores.net intrusive MPSC queue [1] "push" function.
        let node_ptr = node as *const _ as *mut _;

        // Relaxed used as the ordering is "released" when swapping
        // `head_readiness`
        node.next_readiness.store(ptr::null_mut(), Relaxed);

        unsafe {
            let mut prev = self.head_readiness.load(Acquire);

            loop {
                if prev == self.closed_marker() {
                    debug_assert!(node_ptr != self.closed_marker());
                    // debug_assert!(node_ptr != self.end_marker());
                    debug_assert!(node_ptr != self.sleep_marker());

                    if node_ptr != self.end_marker() {
                        // The readiness queue is shutdown, but the enqueue flag was
                        // set. This means that we are responsible for decrementing
                        // the ready queue's ref count
                        debug_assert!(node.ref_count.load(Relaxed) >= 2);
                        release_node(node_ptr);
                    }

                    return false;
                }

                let act = self.head_readiness.compare_and_swap(prev, node_ptr, AcqRel);

                if prev == act {
                    break;
                }

                prev = act;
            }

            debug_assert!((*prev).next_readiness.load(Relaxed).is_null());

            (*prev).next_readiness.store(node_ptr, Release);

            prev == self.sleep_marker()
        }
    }

    fn clear_sleep_marker(&self) {
        let end_marker = self.end_marker();
        let sleep_marker = self.sleep_marker();

        unsafe {
            let tail = *self.tail_readiness.get();

            if tail != self.sleep_marker() {
                return;
            }

            // The end marker is *not* currently in the readiness queue
            // (since the sleep marker is).
            self.end_marker.next_readiness.store(ptr::null_mut(), Relaxed);

            let actual = self.head_readiness.compare_and_swap(
                sleep_marker, end_marker, AcqRel);

            debug_assert!(actual != end_marker);

            if actual != sleep_marker {
                // The readiness queue is not empty, we cannot remove the sleep
                // marker
                return;
            }

            // Update the tail pointer.
            *self.tail_readiness.get() = end_marker;
        }
    }

    /// Must only be called in `poll` or `drop`
    unsafe fn dequeue_node(&self, until: *mut ReadinessNode) -> Dequeue {
        // This is the 1024cores.net intrusive MPSC queue [1] "pop" function
        // with the modifications mentioned at the top of the file.
        let mut tail = *self.tail_readiness.get();
        let mut next = (*tail).next_readiness.load(Acquire);

        if tail == self.end_marker() || tail == self.sleep_marker() || tail == self.closed_marker() {
            if next.is_null() {
                // Make sure the sleep marker is removed (as we are no longer
                // sleeping).
                self.clear_sleep_marker();

                return Dequeue::Empty;
            }

            *self.tail_readiness.get() = next;
            tail = next;
            next = (*next).next_readiness.load(Acquire);
        }

        // Only need to check `until` at this point. `until` is either null,
        // which will never match tail, OR it is a node that was pushed by
        // the current thread. This means that either:
        //
        // 1) The queue is inconsistent, which is handled explicitly
        // 2) We encounter `until` at this point in dequeue
        // 3) We will pop a different node
        if tail == until {
            return Dequeue::Empty;
        }

        if !next.is_null() {
            *self.tail_readiness.get() = next;
            return Dequeue::Data(tail);
        }

        if self.head_readiness.load(Acquire) != tail {
            return Dequeue::Inconsistent;
        }

        // Push the stub node
        self.enqueue_node(&*self.end_marker);

        next = (*tail).next_readiness.load(Acquire);

        if !next.is_null() {
            *self.tail_readiness.get() = next;
            return Dequeue::Data(tail);
        }

        Dequeue::Inconsistent
    }

    fn end_marker(&self) -> *mut ReadinessNode {
        &*self.end_marker as *const ReadinessNode as *mut ReadinessNode
    }

    fn sleep_marker(&self) -> *mut ReadinessNode {
        &*self.sleep_marker as *const ReadinessNode as *mut ReadinessNode
    }

    fn closed_marker(&self) -> *mut ReadinessNode {
        &*self.closed_marker as *const ReadinessNode as *mut ReadinessNode
    }
}
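
// An illustrative sketch (not part of the original test suite) of the
// intrusive MPSC push/pop pair above: a single marker node pushed onto an
// otherwise empty queue is handed back by `dequeue_node`. Calling
// `dequeue_node` outside of `poll`/`drop` is assumed to be fine here only
// because the test is single threaded.
#[test]
fn enqueue_then_dequeue_single_node() {
    let queue = ReadinessQueue::new().unwrap();
    let node = ReadinessNode::marker();
    let node_ptr = &node as *const ReadinessNode as *mut ReadinessNode;

    // The previous head was `end_marker`, not `sleep_marker`, so no wakeup
    // is required.
    assert!(!queue.inner.enqueue_node(&node));

    // The consumer side returns the same node.
    match unsafe { queue.inner.dequeue_node(ptr::null_mut()) } {
        Dequeue::Data(ptr) => assert_eq!(node_ptr, ptr),
        _ => panic!("expected Dequeue::Data"),
    }
}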

impl ReadinessNode {
    /// Return a new `ReadinessNode`, initialized with the provided `ref_count`
    /// (2 when created via `Registration::new2`, 3 when a `Poll` handle also
    /// holds a reference).
    fn new(queue: *mut (),
           token: Token,
           interest: Ready,
           opt: PollOpt,
           ref_count: usize) -> ReadinessNode
    {
        ReadinessNode {
            state: AtomicState::new(interest, opt),
            // Only the first token is set, the others are initialized to 0
            token_0: UnsafeCell::new(token),
            token_1: UnsafeCell::new(Token(0)),
            token_2: UnsafeCell::new(Token(0)),
            next_readiness: AtomicPtr::new(ptr::null_mut()),
            update_lock: AtomicBool::new(false),
            readiness_queue: AtomicPtr::new(queue),
            ref_count: AtomicUsize::new(ref_count),
        }
    }

    fn marker() -> ReadinessNode {
        ReadinessNode {
            state: AtomicState::new(Ready::empty(), PollOpt::empty()),
            token_0: UnsafeCell::new(Token(0)),
            token_1: UnsafeCell::new(Token(0)),
            token_2: UnsafeCell::new(Token(0)),
            next_readiness: AtomicPtr::new(ptr::null_mut()),
            update_lock: AtomicBool::new(false),
            readiness_queue: AtomicPtr::new(ptr::null_mut()),
            ref_count: AtomicUsize::new(0),
        }
    }

    fn enqueue_with_wakeup(&self) -> io::Result<()> {
        let queue = self.readiness_queue.load(Acquire);

        if queue.is_null() {
            // Not associated with a queue, nothing to do
            return Ok(());
        }

        enqueue_with_wakeup(queue, self)
    }
}

fn enqueue_with_wakeup(queue: *mut (), node: &ReadinessNode) -> io::Result<()> {
    debug_assert!(!queue.is_null());
    // This is ugly... but we don't want to bump the ref count.
    let queue: &Arc<ReadinessQueueInner> = unsafe {
        &*(&queue as *const *mut () as *const Arc<ReadinessQueueInner>)
    };
    queue.enqueue_node_with_wakeup(node)
}

unsafe fn token(node: &ReadinessNode, pos: usize) -> Token {
    match pos {
        0 => *node.token_0.get(),
        1 => *node.token_1.get(),
        2 => *node.token_2.get(),
        _ => unreachable!(),
    }
}

fn release_node(ptr: *mut ReadinessNode) {
    unsafe {
        // `AcqRel` synchronizes with other `release_node` functions and ensures
        // that the drop happens after any reads / writes on other threads.
        if (*ptr).ref_count.fetch_sub(1, AcqRel) != 1 {
            return;
        }

        let node = Box::from_raw(ptr);

        // Decrement the readiness_queue Arc
        let queue = node.readiness_queue.load(Acquire);

        if queue.is_null() {
            return;
        }

        let _: Arc<ReadinessQueueInner> = mem::transmute(queue);
    }
}
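
// A small sketch (not part of the original test suite) of the ref-count
// protocol described at the top of the file: releasing a handle decrements
// `ref_count`, and only the final release frees the node. The node here has
// a null queue pointer, so there is no `Arc` to decrement at the end.
#[test]
fn release_node_frees_on_last_handle() {
    let ptr = Box::into_raw(Box::new(ReadinessNode::new(
                ptr::null_mut(), Token(0), Ready::empty(), PollOpt::empty(), 2)));

    // First release: one handle remains, so the node must stay alive.
    release_node(ptr);
    unsafe {
        assert_eq!(1, (*ptr).ref_count.load(Relaxed));
    }

    // Second release: the last handle is gone and the node is freed, so
    // `ptr` must not be used after this point.
    release_node(ptr);
}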

impl AtomicState {
    fn new(interest: Ready, opt: PollOpt) -> AtomicState {
        let state = ReadinessState::new(interest, opt);

        AtomicState {
            inner: AtomicUsize::new(state.into()),
        }
    }

    /// Loads the current `ReadinessState`
    fn load(&self, order: Ordering) -> ReadinessState {
        self.inner.load(order).into()
    }

    /// Stores a state if the current state is the same as `current`.
    fn compare_and_swap(&self, current: ReadinessState, new: ReadinessState, order: Ordering) -> ReadinessState {
        self.inner.compare_and_swap(current.into(), new.into(), order).into()
    }

    // Returns `true` if the node should be queued
    fn flag_as_dropped(&self) -> bool {
        let prev: ReadinessState = self.inner.fetch_or(DROPPED_MASK | QUEUED_MASK, Release).into();
        // The flag should not have been previously set
        debug_assert!(!prev.is_dropped());

        !prev.is_queued()
    }
}
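
// A minimal check (added for illustration; not from the original suite) of
// `flag_as_dropped`: flagging a fresh node as dropped also sets the queued
// flag, making the caller responsible for the enqueue, and the state reads
// back as both dropped and queued.
#[test]
fn flag_as_dropped_sets_both_flags() {
    let state = AtomicState::new(Ready::empty(), PollOpt::empty());

    // The node was neither queued nor dropped, so the caller must enqueue it.
    assert!(state.flag_as_dropped());

    let loaded = state.load(Relaxed);
    assert!(loaded.is_dropped());
    assert!(loaded.is_queued());
}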

impl ReadinessState {
    // Create a `ReadinessState` initialized with the provided arguments
    #[inline]
    fn new(interest: Ready, opt: PollOpt) -> ReadinessState {
        let interest = event::ready_as_usize(interest);
        let opt = event::opt_as_usize(opt);

        debug_assert!(interest <= MASK_4);
        debug_assert!(opt <= MASK_4);

        let mut val = interest << INTEREST_SHIFT;
        val |= opt << POLL_OPT_SHIFT;

        ReadinessState(val)
    }

    #[inline]
    fn get(self, mask: usize, shift: usize) -> usize {
        (self.0 >> shift) & mask
    }

    #[inline]
    fn set(&mut self, val: usize, mask: usize, shift: usize) {
        self.0 = (self.0 & !(mask << shift)) | (val << shift)
    }

    /// Get the readiness
    #[inline]
    fn readiness(self) -> Ready {
        let v = self.get(MASK_4, READINESS_SHIFT);
        event::ready_from_usize(v)
    }

    #[inline]
    fn effective_readiness(self) -> Ready {
        self.readiness() & self.interest()
    }

    /// Set the readiness
    #[inline]
    fn set_readiness(&mut self, v: Ready) {
        self.set(event::ready_as_usize(v), MASK_4, READINESS_SHIFT);
    }

    /// Get the interest
    #[inline]
    fn interest(self) -> Ready {
        let v = self.get(MASK_4, INTEREST_SHIFT);
        event::ready_from_usize(v)
    }

    /// Set the interest
    #[inline]
    fn set_interest(&mut self, v: Ready) {
        self.set(event::ready_as_usize(v), MASK_4, INTEREST_SHIFT);
    }

    #[inline]
    fn disarm(&mut self) {
        self.set_interest(Ready::empty());
    }

    /// Get the poll options
    #[inline]
    fn poll_opt(self) -> PollOpt {
        let v = self.get(MASK_4, POLL_OPT_SHIFT);
        event::opt_from_usize(v)
    }

    /// Set the poll options
    #[inline]
    fn set_poll_opt(&mut self, v: PollOpt) {
        self.set(event::opt_as_usize(v), MASK_4, POLL_OPT_SHIFT);
    }

    #[inline]
    fn is_queued(self) -> bool {
        self.0 & QUEUED_MASK == QUEUED_MASK
    }

    /// Set the queued flag
    #[inline]
    fn set_queued(&mut self) {
        // Dropped nodes should never be queued
        debug_assert!(!self.is_dropped());
        self.0 |= QUEUED_MASK;
    }

    #[inline]
    fn set_dequeued(&mut self) {
        debug_assert!(self.is_queued());
        self.0 &= !QUEUED_MASK
    }

    #[inline]
    fn is_dropped(self) -> bool {
        self.0 & DROPPED_MASK == DROPPED_MASK
    }

    #[inline]
    fn token_read_pos(self) -> usize {
        self.get(MASK_2, TOKEN_RD_SHIFT)
    }

    #[inline]
    fn token_write_pos(self) -> usize {
        self.get(MASK_2, TOKEN_WR_SHIFT)
    }

    #[inline]
    fn next_token_pos(self) -> usize {
        let rd = self.token_read_pos();
        let wr = self.token_write_pos();

        match wr {
            0 => {
                match rd {
                    1 => 2,
                    2 => 1,
                    0 => 1,
                    _ => unreachable!(),
                }
            }
            1 => {
                match rd {
                    0 => 2,
                    2 => 0,
                    1 => 2,
                    _ => unreachable!(),
                }
            }
            2 => {
                match rd {
                    0 => 1,
                    1 => 0,
                    2 => 0,
                    _ => unreachable!(),
                }
            }
            _ => unreachable!(),
        }
    }

    #[inline]
    fn set_token_write_pos(&mut self, val: usize) {
        self.set(val, MASK_2, TOKEN_WR_SHIFT);
    }

    #[inline]
    fn update_token_read_pos(&mut self) {
        let val = self.token_write_pos();
        self.set(val, MASK_2, TOKEN_RD_SHIFT);
    }
}
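
// An illustrative test (not part of the original suite) of the triple-slot
// token rotation implemented by `next_token_pos` above: the slot chosen for
// the next write never collides with the slot `Poll::poll` may be reading
// (`token_read_pos`) or the slot holding the latest token
// (`token_write_pos`).
#[test]
fn next_token_pos_avoids_live_slots() {
    let mut state = ReadinessState::new(Ready::empty(), PollOpt::empty());

    for _ in 0..16 {
        let next = state.next_token_pos();

        assert!(next != state.token_read_pos());
        assert!(next != state.token_write_pos());

        // Simulate `update` publishing a token into the free slot, followed
        // by `Poll::poll` observing it.
        state.set_token_write_pos(next);
        state.update_token_read_pos();
    }
}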

impl From<ReadinessState> for usize {
    fn from(src: ReadinessState) -> usize {
        src.0
    }
}

impl From<usize> for ReadinessState {
    fn from(src: usize) -> ReadinessState {
        ReadinessState(src)
    }
}

fn is_send<T: Send>() {}
fn is_sync<T: Sync>() {}

impl SelectorId {
    pub fn new() -> SelectorId {
        SelectorId {
            id: AtomicUsize::new(0),
        }
    }

    pub fn associate_selector(&self, poll: &Poll) -> io::Result<()> {
        let selector_id = self.id.load(Ordering::SeqCst);

        if selector_id != 0 && selector_id != poll.selector.id() {
            Err(io::Error::new(io::ErrorKind::Other, "socket already registered"))
        } else {
            self.id.store(poll.selector.id(), Ordering::SeqCst);
            Ok(())
        }
    }
}

impl Clone for SelectorId {
    fn clone(&self) -> SelectorId {
        SelectorId {
            id: AtomicUsize::new(self.id.load(Ordering::SeqCst)),
        }
    }
}
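
// A small sketch (not from the original test suite) of the `SelectorId`
// contract relied on by `associate_selector`: it assumes each `Poll` gets a
// distinct non-zero selector id, so a handle can be re-associated with the
// same `Poll` but not moved to a different one.
#[test]
fn selector_id_is_sticky() {
    let poll_a = Poll::new().unwrap();
    let poll_b = Poll::new().unwrap();

    let id = SelectorId::new();

    // First association claims the id; repeating it is a no-op.
    assert!(id.associate_selector(&poll_a).is_ok());
    assert!(id.associate_selector(&poll_a).is_ok());

    // A different `Poll` instance is rejected.
    assert!(id.associate_selector(&poll_b).is_err());
}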

#[test]
#[cfg(all(unix, not(target_os = "fuchsia")))]
pub fn as_raw_fd() {
    let poll = Poll::new().unwrap();
    assert!(poll.as_raw_fd() > 0);
}