1 use {sys, Token};
2 use event_imp::{self as event, Ready, Event, Evented, PollOpt};
3 use std::{fmt, io, ptr, usize};
4 use std::cell::UnsafeCell;
5 use std::{mem, ops, isize};
6 #[cfg(all(unix, not(target_os = "fuchsia")))]
7 use std::os::unix::io::AsRawFd;
8 #[cfg(all(unix, not(target_os = "fuchsia")))]
9 use std::os::unix::io::RawFd;
10 use std::process;
11 use std::sync::{Arc, Mutex, Condvar};
12 use std::sync::atomic::{AtomicUsize, AtomicPtr, AtomicBool};
13 use std::sync::atomic::Ordering::{self, Acquire, Release, AcqRel, Relaxed, SeqCst};
14 use std::time::{Duration, Instant};
15
16 // Poll is backed by two readiness queues. The first is a system readiness queue
17 // represented by `sys::Selector`. The system readiness queue handles events
18 // provided by the system, such as TCP and UDP. The second readiness queue is
19 // implemented in user space by `ReadinessQueue`. It provides a way to implement
20 // purely user space `Evented` types.
21 //
22 // `ReadinessQueue` is backed by a MPSC queue that supports reuse of linked
23 // list nodes. This significantly reduces the number of required allocations.
24 // Each `Registration` / `SetReadiness` pair allocates a single readiness node
25 // that is used for the lifetime of the registration.
26 //
27 // The readiness node also includes a single atomic variable, `state` that
28 // tracks most of the state associated with the registration. This includes the
29 // current readiness, interest, poll options, and internal state. When the node
30 // state is mutated, it is queued in the MPSC channel. A call to
31 // `ReadinessQueue::poll` will dequeue and process nodes. The node state can
32 // still be mutated while it is queued in the channel for processing.
33 // Intermediate state values do not matter as long as the final state is
34 // included in the call to `poll`. This is the eventually consistent nature of
35 // the readiness queue.
36 //
37 // The readiness node is ref counted using the `ref_count` field. On creation,
38 // the ref_count is initialized to 3: one `Registration` handle, one
39 // `SetReadiness` handle, and one for the readiness queue. Since the readiness queue
40 // doesn't *always* hold a handle to the node, we don't use the Arc type for
41 // managing ref counts (this is to avoid constantly incrementing and
42 // decrementing the ref count when pushing & popping from the queue). When the
43 // `Registration` handle is dropped, the `dropped` flag is set on the node, then
44 // the node is pushed into the registration queue. When Poll::poll pops the
// node, it sees the drop flag is set, and decrements its ref count.
46 //
47 // The MPSC queue is a modified version of the intrusive MPSC node based queue
48 // described by 1024cores [1].
49 //
50 // The first modification is that two markers are used instead of a single
51 // `stub`. The second marker is a `sleep_marker` which is used to signal to
52 // producers that the consumer is going to sleep. This sleep_marker is only used
53 // when the queue is empty, implying that the only node in the queue is
54 // `end_marker`.
55 //
56 // The second modification is an `until` argument passed to the dequeue
57 // function. When `poll` encounters a level-triggered node, the node will be
// immediately pushed back into the queue. In order to avoid an infinite loop,
// the pointer is saved off before `poll` pushes the node and is then passed
// again as the `until` argument. If the next node to pop is `until`, then
// `Dequeue::Empty` is returned.
62 //
63 // [1] http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
64
65
66 /// Polls for readiness events on all registered values.
67 ///
68 /// `Poll` allows a program to monitor a large number of `Evented` types,
69 /// waiting until one or more become "ready" for some class of operations; e.g.
70 /// reading and writing. An `Evented` type is considered ready if it is possible
71 /// to immediately perform a corresponding operation; e.g. [`read`] or
72 /// [`write`].
73 ///
74 /// To use `Poll`, an `Evented` type must first be registered with the `Poll`
75 /// instance using the [`register`] method, supplying readiness interest. The
76 /// readiness interest tells `Poll` which specific operations on the handle to
77 /// monitor for readiness. A `Token` is also passed to the [`register`]
78 /// function. When `Poll` returns a readiness event, it will include this token.
79 /// This associates the event with the `Evented` handle that generated the
80 /// event.
81 ///
82 /// [`read`]: tcp/struct.TcpStream.html#method.read
83 /// [`write`]: tcp/struct.TcpStream.html#method.write
84 /// [`register`]: #method.register
85 ///
86 /// # Examples
87 ///
88 /// A basic example -- establishing a `TcpStream` connection.
89 ///
90 /// ```
91 /// # use std::error::Error;
92 /// # fn try_main() -> Result<(), Box<Error>> {
93 /// use mio::{Events, Poll, Ready, PollOpt, Token};
94 /// use mio::net::TcpStream;
95 ///
96 /// use std::net::{TcpListener, SocketAddr};
97 ///
98 /// // Bind a server socket to connect to.
99 /// let addr: SocketAddr = "127.0.0.1:0".parse()?;
100 /// let server = TcpListener::bind(&addr)?;
101 ///
102 /// // Construct a new `Poll` handle as well as the `Events` we'll store into
103 /// let poll = Poll::new()?;
104 /// let mut events = Events::with_capacity(1024);
105 ///
106 /// // Connect the stream
107 /// let stream = TcpStream::connect(&server.local_addr()?)?;
108 ///
109 /// // Register the stream with `Poll`
110 /// poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
111 ///
/// // Wait for the socket to become ready. This has to happen in a loop to
113 /// // handle spurious wakeups.
114 /// loop {
115 /// poll.poll(&mut events, None)?;
116 ///
117 /// for event in &events {
118 /// if event.token() == Token(0) && event.readiness().is_writable() {
119 /// // The socket connected (probably, it could still be a spurious
120 /// // wakeup)
121 /// return Ok(());
122 /// }
123 /// }
124 /// }
125 /// # Ok(())
126 /// # }
127 /// #
128 /// # fn main() {
129 /// # try_main().unwrap();
130 /// # }
131 /// ```
132 ///
133 /// # Edge-triggered and level-triggered
134 ///
135 /// An [`Evented`] registration may request edge-triggered events or
136 /// level-triggered events. This is done by setting `register`'s
137 /// [`PollOpt`] argument to either [`edge`] or [`level`].
138 ///
/// The difference between the two can be described as follows. Suppose that
140 /// this scenario happens:
141 ///
142 /// 1. A [`TcpStream`] is registered with `Poll`.
143 /// 2. The socket receives 2kb of data.
144 /// 3. A call to [`Poll::poll`] returns the token associated with the socket
145 /// indicating readable readiness.
146 /// 4. 1kb is read from the socket.
147 /// 5. Another call to [`Poll::poll`] is made.
148 ///
149 /// If when the socket was registered with `Poll`, edge triggered events were
150 /// requested, then the call to [`Poll::poll`] done in step **5** will
151 /// (probably) hang despite there being another 1kb still present in the socket
152 /// read buffer. The reason for this is that edge-triggered mode delivers events
153 /// only when changes occur on the monitored [`Evented`]. So, in step *5* the
154 /// caller might end up waiting for some data that is already present inside the
155 /// socket buffer.
156 ///
157 /// With edge-triggered events, operations **must** be performed on the
158 /// `Evented` type until [`WouldBlock`] is returned. In other words, after
159 /// receiving an event indicating readiness for a certain operation, one should
160 /// assume that [`Poll::poll`] may never return another event for the same token
161 /// and readiness until the operation returns [`WouldBlock`].
162 ///
/// By contrast, when level-triggered notifications were requested, each call to
164 /// [`Poll::poll`] will return an event for the socket as long as data remains
165 /// in the socket buffer. Generally, level-triggered events should be avoided if
166 /// high performance is a concern.
167 ///
168 /// Since even with edge-triggered events, multiple events can be generated upon
169 /// receipt of multiple chunks of data, the caller has the option to set the
170 /// [`oneshot`] flag. This tells `Poll` to disable the associated [`Evented`]
171 /// after the event is returned from [`Poll::poll`]. The subsequent calls to
172 /// [`Poll::poll`] will no longer include events for [`Evented`] handles that
173 /// are disabled even if the readiness state changes. The handle can be
174 /// re-enabled by calling [`reregister`]. When handles are disabled, internal
175 /// resources used to monitor the handle are maintained until the handle is
176 /// dropped or deregistered. This makes re-registering the handle a fast
177 /// operation.
178 ///
179 /// For example, in the following scenario:
180 ///
181 /// 1. A [`TcpStream`] is registered with `Poll`.
182 /// 2. The socket receives 2kb of data.
183 /// 3. A call to [`Poll::poll`] returns the token associated with the socket
184 /// indicating readable readiness.
185 /// 4. 2kb is read from the socket.
186 /// 5. Another call to read is issued and [`WouldBlock`] is returned
187 /// 6. The socket receives another 2kb of data.
188 /// 7. Another call to [`Poll::poll`] is made.
189 ///
190 /// Assuming the socket was registered with `Poll` with the [`edge`] and
191 /// [`oneshot`] options, then the call to [`Poll::poll`] in step 7 would block. This
192 /// is because, [`oneshot`] tells `Poll` to disable events for the socket after
193 /// returning an event.
194 ///
195 /// In order to receive the event for the data received in step 6, the socket
196 /// would need to be reregistered using [`reregister`].
197 ///
198 /// [`PollOpt`]: struct.PollOpt.html
199 /// [`edge`]: struct.PollOpt.html#method.edge
200 /// [`level`]: struct.PollOpt.html#method.level
201 /// [`Poll::poll`]: struct.Poll.html#method.poll
202 /// [`WouldBlock`]: https://doc.rust-lang.org/std/io/enum.ErrorKind.html#variant.WouldBlock
203 /// [`Evented`]: event/trait.Evented.html
204 /// [`TcpStream`]: tcp/struct.TcpStream.html
205 /// [`reregister`]: #method.reregister
206 /// [`oneshot`]: struct.PollOpt.html#method.oneshot
207 ///
208 /// # Portability
209 ///
210 /// Using `Poll` provides a portable interface across supported platforms as
211 /// long as the caller takes the following into consideration:
212 ///
213 /// ### Spurious events
214 ///
215 /// [`Poll::poll`] may return readiness events even if the associated
216 /// [`Evented`] handle is not actually ready. Given the same code, this may
217 /// happen more on some platforms than others. It is important to never assume
218 /// that, just because a readiness notification was received, that the
219 /// associated operation will succeed as well.
220 ///
/// If the operation fails with [`WouldBlock`], then the caller should not treat
222 /// this as an error, but instead should wait until another readiness event is
223 /// received.
224 ///
225 /// ### Draining readiness
226 ///
227 /// When using edge-triggered mode, once a readiness event is received, the
228 /// corresponding operation must be performed repeatedly until it returns
229 /// [`WouldBlock`]. Unless this is done, there is no guarantee that another
230 /// readiness event will be delivered, even if further data is received for the
231 /// [`Evented`] handle.
232 ///
233 /// For example, in the first scenario described above, after step 5, even if
234 /// the socket receives more data there is no guarantee that another readiness
235 /// event will be delivered.
236 ///
237 /// ### Readiness operations
238 ///
239 /// The only readiness operations that are guaranteed to be present on all
240 /// supported platforms are [`readable`] and [`writable`]. All other readiness
241 /// operations may have false negatives and as such should be considered
242 /// **hints**. This means that if a socket is registered with [`readable`],
243 /// [`error`], and [`hup`] interest, and either an error or hup is received, a
244 /// readiness event will be generated for the socket, but it **may** only
245 /// include `readable` readiness. Also note that, given the potential for
246 /// spurious events, receiving a readiness event with `hup` or `error` doesn't
247 /// actually mean that a `read` on the socket will return a result matching the
248 /// readiness event.
249 ///
250 /// In other words, portable programs that explicitly check for [`hup`] or
251 /// [`error`] readiness should be doing so as an **optimization** and always be
252 /// able to handle an error or HUP situation when performing the actual read
253 /// operation.
254 ///
255 /// [`readable`]: struct.Ready.html#method.readable
256 /// [`writable`]: struct.Ready.html#method.writable
257 /// [`error`]: unix/struct.UnixReady.html#method.error
258 /// [`hup`]: unix/struct.UnixReady.html#method.hup
259 ///
260 /// ### Registering handles
261 ///
262 /// Unless otherwise noted, it should be assumed that types implementing
263 /// [`Evented`] will never become ready unless they are registered with `Poll`.
264 ///
265 /// For example:
266 ///
267 /// ```
268 /// # use std::error::Error;
269 /// # fn try_main() -> Result<(), Box<Error>> {
270 /// use mio::{Poll, Ready, PollOpt, Token};
271 /// use mio::net::TcpStream;
272 /// use std::time::Duration;
273 /// use std::thread;
274 ///
275 /// let sock = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
276 ///
277 /// thread::sleep(Duration::from_secs(1));
278 ///
279 /// let poll = Poll::new()?;
280 ///
281 /// // The connect is not guaranteed to have started until it is registered at
282 /// // this point
283 /// poll.register(&sock, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
284 /// # Ok(())
285 /// # }
286 /// #
287 /// # fn main() {
288 /// # try_main().unwrap();
289 /// # }
290 /// ```
291 ///
292 /// # Implementation notes
293 ///
294 /// `Poll` is backed by the selector provided by the operating system.
295 ///
296 /// | OS | Selector |
297 /// |------------|-----------|
298 /// | Linux | [epoll] |
299 /// | OS X, iOS | [kqueue] |
300 /// | Windows | [IOCP] |
301 /// | FreeBSD | [kqueue] |
302 /// | Android | [epoll] |
303 ///
304 /// On all supported platforms, socket operations are handled by using the
305 /// system selector. Platform specific extensions (e.g. [`EventedFd`]) allow
306 /// accessing other features provided by individual system selectors. For
307 /// example, Linux's [`signalfd`] feature can be used by registering the FD with
308 /// `Poll` via [`EventedFd`].
309 ///
310 /// On all platforms except windows, a call to [`Poll::poll`] is mostly just a
311 /// direct call to the system selector. However, [IOCP] uses a completion model
/// instead of a readiness model. In this case, `Poll` must adapt the completion
/// model to Mio's API. While non-trivial, the bridge layer is still quite
/// efficient. The most expensive part is that calls to `read` and `write`
/// require data to be copied into an intermediate buffer before it is passed to
/// the kernel.
317 ///
318 /// Notifications generated by [`SetReadiness`] are handled by an internal
/// readiness queue. A single call to [`Poll::poll`] will collect events from
/// both the system selector and the internal readiness queue.
321 ///
322 /// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
323 /// [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
324 /// [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx
325 /// [`signalfd`]: http://man7.org/linux/man-pages/man2/signalfd.2.html
326 /// [`EventedFd`]: unix/struct.EventedFd.html
327 /// [`SetReadiness`]: struct.SetReadiness.html
328 /// [`Poll::poll`]: struct.Poll.html#method.poll
329 pub struct Poll {
330 // Platform specific IO selector
331 selector: sys::Selector,
332
333 // Custom readiness queue
334 readiness_queue: ReadinessQueue,
335
336 // Use an atomic to first check if a full lock will be required. This is a
337 // fast-path check for single threaded cases avoiding the extra syscall
338 lock_state: AtomicUsize,
339
340 // Sequences concurrent calls to `Poll::poll`
341 lock: Mutex<()>,
342
343 // Wakeup the next waiter
344 condvar: Condvar,
345 }
346
347 /// Handle to a user space `Poll` registration.
348 ///
349 /// `Registration` allows implementing [`Evented`] for types that cannot work
350 /// with the [system selector]. A `Registration` is always paired with a
351 /// `SetReadiness`, which allows updating the registration's readiness state.
352 /// When [`set_readiness`] is called and the `Registration` is associated with a
353 /// [`Poll`] instance, a readiness event will be created and eventually returned
354 /// by [`poll`].
355 ///
356 /// A `Registration` / `SetReadiness` pair is created by calling
357 /// [`Registration::new2`]. At this point, the registration is not being
358 /// monitored by a [`Poll`] instance, so calls to `set_readiness` will not
359 /// result in any readiness notifications.
360 ///
361 /// `Registration` implements [`Evented`], so it can be used with [`Poll`] using
362 /// the same [`register`], [`reregister`], and [`deregister`] functions used
363 /// with TCP, UDP, etc... types. Once registered with [`Poll`], readiness state
364 /// changes result in readiness events being dispatched to the [`Poll`] instance
365 /// with which `Registration` is registered.
366 ///
367 /// **Note**, before using `Registration` be sure to read the
368 /// [`set_readiness`] documentation and the [portability] notes. The
369 /// guarantees offered by `Registration` may be weaker than expected.
370 ///
371 /// For high level documentation, see [`Poll`].
372 ///
373 /// # Examples
374 ///
375 /// ```
376 /// use mio::{Ready, Registration, Poll, PollOpt, Token};
377 /// use mio::event::Evented;
378 ///
379 /// use std::io;
380 /// use std::time::Instant;
381 /// use std::thread;
382 ///
383 /// pub struct Deadline {
384 /// when: Instant,
385 /// registration: Registration,
386 /// }
387 ///
388 /// impl Deadline {
389 /// pub fn new(when: Instant) -> Deadline {
390 /// let (registration, set_readiness) = Registration::new2();
391 ///
392 /// thread::spawn(move || {
393 /// let now = Instant::now();
394 ///
395 /// if now < when {
396 /// thread::sleep(when - now);
397 /// }
398 ///
399 /// set_readiness.set_readiness(Ready::readable());
400 /// });
401 ///
402 /// Deadline {
403 /// when: when,
404 /// registration: registration,
405 /// }
406 /// }
407 ///
408 /// pub fn is_elapsed(&self) -> bool {
409 /// Instant::now() >= self.when
410 /// }
411 /// }
412 ///
413 /// impl Evented for Deadline {
414 /// fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
415 /// -> io::Result<()>
416 /// {
417 /// self.registration.register(poll, token, interest, opts)
418 /// }
419 ///
420 /// fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
421 /// -> io::Result<()>
422 /// {
423 /// self.registration.reregister(poll, token, interest, opts)
424 /// }
425 ///
426 /// fn deregister(&self, poll: &Poll) -> io::Result<()> {
427 /// poll.deregister(&self.registration)
428 /// }
429 /// }
430 /// ```
431 ///
432 /// [system selector]: struct.Poll.html#implementation-notes
433 /// [`Poll`]: struct.Poll.html
434 /// [`Registration::new2`]: struct.Registration.html#method.new2
435 /// [`Evented`]: event/trait.Evented.html
436 /// [`set_readiness`]: struct.SetReadiness.html#method.set_readiness
437 /// [`register`]: struct.Poll.html#method.register
438 /// [`reregister`]: struct.Poll.html#method.reregister
439 /// [`deregister`]: struct.Poll.html#method.deregister
440 /// [portability]: struct.Poll.html#portability
441 pub struct Registration {
442 inner: RegistrationInner,
443 }
444
445 unsafe impl Send for Registration {}
446 unsafe impl Sync for Registration {}
447
448 /// Updates the readiness state of the associated `Registration`.
449 ///
450 /// See [`Registration`] for more documentation on using `SetReadiness` and
451 /// [`Poll`] for high level polling documentation.
452 ///
453 /// [`Poll`]: struct.Poll.html
454 /// [`Registration`]: struct.Registration.html
455 #[derive(Clone)]
456 pub struct SetReadiness {
457 inner: RegistrationInner,
458 }
459
460 unsafe impl Send for SetReadiness {}
461 unsafe impl Sync for SetReadiness {}
462
/// Associates an IO type with a `Selector`.
#[derive(Debug)]
pub struct SelectorId {
    // Stored atomically; presumably the id of the associated selector --
    // TODO confirm against the users of this field.
    id: AtomicUsize,
}
468
469 struct RegistrationInner {
470 // Unsafe pointer to the registration's node. The node is ref counted. This
471 // cannot "simply" be tracked by an Arc because `Poll::poll` has an implicit
472 // handle though it isn't stored anywhere. In other words, `Poll::poll`
473 // needs to decrement the ref count before the node is freed.
474 node: *mut ReadinessNode,
475 }
476
477 #[derive(Clone)]
478 struct ReadinessQueue {
479 inner: Arc<ReadinessQueueInner>,
480 }
481
482 unsafe impl Send for ReadinessQueue {}
483 unsafe impl Sync for ReadinessQueue {}
484
485 struct ReadinessQueueInner {
486 // Used to wake up `Poll` when readiness is set in another thread.
487 awakener: sys::Awakener,
488
489 // Head of the MPSC queue used to signal readiness to `Poll::poll`.
490 head_readiness: AtomicPtr<ReadinessNode>,
491
492 // Tail of the readiness queue.
493 //
494 // Only accessed by Poll::poll. Coordination will be handled by the poll fn
495 tail_readiness: UnsafeCell<*mut ReadinessNode>,
496
497 // Fake readiness node used to punctuate the end of the readiness queue.
498 // Before attempting to read from the queue, this node is inserted in order
499 // to partition the queue between nodes that are "owned" by the dequeue end
500 // and nodes that will be pushed on by producers.
501 end_marker: Box<ReadinessNode>,
502
503 // Similar to `end_marker`, but this node signals to producers that `Poll`
504 // has gone to sleep and must be woken up.
505 sleep_marker: Box<ReadinessNode>,
506
507 // Similar to `end_marker`, but the node signals that the queue is closed.
508 // This happens when `ReadyQueue` is dropped and signals to producers that
509 // the nodes should no longer be pushed into the queue.
510 closed_marker: Box<ReadinessNode>,
511 }
512
513 /// Node shared by a `Registration` / `SetReadiness` pair as well as the node
514 /// queued into the MPSC channel.
515 struct ReadinessNode {
516 // Node state, see struct docs for `ReadinessState`
517 //
518 // This variable is the primary point of coordination between all the
519 // various threads concurrently accessing the node.
520 state: AtomicState,
521
522 // The registration token cannot fit into the `state` variable, so it is
523 // broken out here. In order to atomically update both the state and token
524 // we have to jump through a few hoops.
525 //
526 // First, `state` includes `token_read_pos` and `token_write_pos`. These can
527 // either be 0, 1, or 2 which represent a token slot. `token_write_pos` is
528 // the token slot that contains the most up to date registration token.
529 // `token_read_pos` is the token slot that `poll` is currently reading from.
530 //
531 // When a call to `update` includes a different token than the one currently
532 // associated with the registration (token_write_pos), first an unused token
533 // slot is found. The unused slot is the one not represented by
534 // `token_read_pos` OR `token_write_pos`. The new token is written to this
535 // slot, then `state` is updated with the new `token_write_pos` value. This
536 // requires that there is only a *single* concurrent call to `update`.
537 //
538 // When `poll` reads a node state, it checks that `token_read_pos` matches
539 // `token_write_pos`. If they do not match, then it atomically updates
540 // `state` such that `token_read_pos` is set to `token_write_pos`. It will
541 // then read the token at the newly updated `token_read_pos`.
542 token_0: UnsafeCell<Token>,
543 token_1: UnsafeCell<Token>,
544 token_2: UnsafeCell<Token>,
545
546 // Used when the node is queued in the readiness linked list. Accessing
547 // this field requires winning the "queue" lock
548 next_readiness: AtomicPtr<ReadinessNode>,
549
550 // Ensures that there is only one concurrent call to `update`.
551 //
552 // Each call to `update` will attempt to swap `update_lock` from `false` to
553 // `true`. If the CAS succeeds, the thread has obtained the update lock. If
554 // the CAS fails, then the `update` call returns immediately and the update
555 // is discarded.
556 update_lock: AtomicBool,
557
558 // Pointer to Arc<ReadinessQueueInner>
559 readiness_queue: AtomicPtr<()>,
560
561 // Tracks the number of `ReadyRef` pointers
562 ref_count: AtomicUsize,
563 }
564
/// Wrapper around the `AtomicUsize` that holds the state of a
/// `ReadinessNode`. It takes care of encoding and decoding `ReadinessState`
/// values.
struct AtomicState {
    // The packed `ReadinessState` bits.
    inner: AtomicUsize,
}
570
// Bit offsets of the individual fields packed into the node state, from the
// lowest bit upwards.
const READINESS_SHIFT: usize = 0;
const INTEREST_SHIFT: usize = 4;
const POLL_OPT_SHIFT: usize = 8;
const TOKEN_RD_SHIFT: usize = 12;
const TOKEN_WR_SHIFT: usize = 14;
const QUEUED_SHIFT: usize = 16;
const DROPPED_SHIFT: usize = 17;

// Masks (unshifted) for the two bit wide and four bit wide fields.
const MASK_2: usize = (1 << 2) - 1;
const MASK_4: usize = (1 << 4) - 1;

// Masks for the single bit flags, already shifted into position.
const QUEUED_MASK: usize = 1 << QUEUED_SHIFT;
const DROPPED_MASK: usize = 1 << DROPPED_SHIFT;
583
/// The complete state of one `ReadinessNode`, packed into a single `usize`.
/// Starting at the lowest bit, the fields are:
///
/// 4 bits: current readiness of the registration
/// 4 bits: interest of the registration
/// 4 bits: poll options
/// 2 bits: token position `poll` is currently reading from
/// 2 bits: token position `update` wrote to last
/// 1 bit:  queued flag, set while the node is being pushed into the MPSC queue
/// 1 bit:  dropped flag, set once all `Registration` handles have been dropped
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
struct ReadinessState(usize);
596
597 /// Returned by `dequeue_node`. Represents the different states as described by
598 /// the queue documentation on 1024cores.net.
599 enum Dequeue {
600 Data(*mut ReadinessNode),
601 Empty,
602 Inconsistent,
603 }
604
605 const AWAKEN: Token = Token(usize::MAX);
606 const MAX_REFCOUNT: usize = (isize::MAX) as usize;
607
608 /*
609 *
610 * ===== Poll =====
611 *
612 */
613
614 impl Poll {
615 /// Return a new `Poll` handle.
616 ///
617 /// This function will make a syscall to the operating system to create the
618 /// system selector. If this syscall fails, `Poll::new` will return with the
619 /// error.
620 ///
621 /// See [struct] level docs for more details.
622 ///
623 /// [struct]: struct.Poll.html
624 ///
625 /// # Examples
626 ///
627 /// ```
628 /// # use std::error::Error;
629 /// # fn try_main() -> Result<(), Box<Error>> {
630 /// use mio::{Poll, Events};
631 /// use std::time::Duration;
632 ///
633 /// let poll = match Poll::new() {
634 /// Ok(poll) => poll,
635 /// Err(e) => panic!("failed to create Poll instance; err={:?}", e),
636 /// };
637 ///
638 /// // Create a structure to receive polled events
639 /// let mut events = Events::with_capacity(1024);
640 ///
641 /// // Wait for events, but none will be received because no `Evented`
642 /// // handles have been registered with this `Poll` instance.
643 /// let n = poll.poll(&mut events, Some(Duration::from_millis(500)))?;
644 /// assert_eq!(n, 0);
645 /// # Ok(())
646 /// # }
647 /// #
648 /// # fn main() {
649 /// # try_main().unwrap();
650 /// # }
651 /// ```
new() -> io::Result<Poll>652 pub fn new() -> io::Result<Poll> {
653 is_send::<Poll>();
654 is_sync::<Poll>();
655
656 let poll = Poll {
657 selector: sys::Selector::new()?,
658 readiness_queue: ReadinessQueue::new()?,
659 lock_state: AtomicUsize::new(0),
660 lock: Mutex::new(()),
661 condvar: Condvar::new(),
662 };
663
664 // Register the notification wakeup FD with the IO poller
665 poll.readiness_queue.inner.awakener.register(&poll, AWAKEN, Ready::readable(), PollOpt::edge())?;
666
667 Ok(poll)
668 }
669
670 /// Register an `Evented` handle with the `Poll` instance.
671 ///
672 /// Once registered, the `Poll` instance will monitor the `Evented` handle
673 /// for readiness state changes. When it notices a state change, it will
674 /// return a readiness event for the handle the next time [`poll`] is
675 /// called.
676 ///
677 /// See the [`struct`] docs for a high level overview.
678 ///
679 /// # Arguments
680 ///
681 /// `handle: &E: Evented`: This is the handle that the `Poll` instance
682 /// should monitor for readiness state changes.
683 ///
684 /// `token: Token`: The caller picks a token to associate with the socket.
685 /// When [`poll`] returns an event for the handle, this token is included.
686 /// This allows the caller to map the event to its handle. The token
687 /// associated with the `Evented` handle can be changed at any time by
688 /// calling [`reregister`].
689 ///
690 /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
691 /// usage.
692 ///
693 /// See documentation on [`Token`] for an example showing how to pick
694 /// [`Token`] values.
695 ///
696 /// `interest: Ready`: Specifies which operations `Poll` should monitor for
697 /// readiness. `Poll` will only return readiness events for operations
698 /// specified by this argument.
699 ///
700 /// If a socket is registered with readable interest and the socket becomes
701 /// writable, no event will be returned from [`poll`].
702 ///
703 /// The readiness interest for an `Evented` handle can be changed at any
704 /// time by calling [`reregister`].
705 ///
706 /// `opts: PollOpt`: Specifies the registration options. The most common
707 /// options being [`level`] for level-triggered events, [`edge`] for
708 /// edge-triggered events, and [`oneshot`].
709 ///
710 /// The registration options for an `Evented` handle can be changed at any
711 /// time by calling [`reregister`].
712 ///
713 /// # Notes
714 ///
715 /// Unless otherwise specified, the caller should assume that once an
716 /// `Evented` handle is registered with a `Poll` instance, it is bound to
717 /// that `Poll` instance for the lifetime of the `Evented` handle. This
718 /// remains true even if the `Evented` handle is deregistered from the poll
719 /// instance using [`deregister`].
720 ///
721 /// This function is **thread safe**. It can be called concurrently from
722 /// multiple threads.
723 ///
724 /// [`struct`]: #
725 /// [`reregister`]: #method.reregister
726 /// [`deregister`]: #method.deregister
727 /// [`poll`]: #method.poll
728 /// [`level`]: struct.PollOpt.html#method.level
729 /// [`edge`]: struct.PollOpt.html#method.edge
730 /// [`oneshot`]: struct.PollOpt.html#method.oneshot
731 /// [`Token`]: struct.Token.html
732 ///
733 /// # Examples
734 ///
735 /// ```
736 /// # use std::error::Error;
737 /// # fn try_main() -> Result<(), Box<Error>> {
738 /// use mio::{Events, Poll, Ready, PollOpt, Token};
739 /// use mio::net::TcpStream;
740 /// use std::time::{Duration, Instant};
741 ///
742 /// let poll = Poll::new()?;
743 /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
744 ///
745 /// // Register the socket with `poll`
746 /// poll.register(&socket, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
747 ///
748 /// let mut events = Events::with_capacity(1024);
749 /// let start = Instant::now();
750 /// let timeout = Duration::from_millis(500);
751 ///
752 /// loop {
753 /// let elapsed = start.elapsed();
754 ///
755 /// if elapsed >= timeout {
756 /// // Connection timed out
757 /// return Ok(());
758 /// }
759 ///
760 /// let remaining = timeout - elapsed;
761 /// poll.poll(&mut events, Some(remaining))?;
762 ///
763 /// for event in &events {
764 /// if event.token() == Token(0) {
765 /// // Something (probably) happened on the socket.
766 /// return Ok(());
767 /// }
768 /// }
769 /// }
770 /// # Ok(())
771 /// # }
772 /// #
773 /// # fn main() {
774 /// # try_main().unwrap();
775 /// # }
776 /// ```
register<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> where E: Evented777 pub fn register<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>
778 where E: Evented
779 {
780 validate_args(token)?;
781
782 /*
783 * Undefined behavior:
784 * - Reusing a token with a different `Evented` without deregistering
785 * (or closing) the original `Evented`.
786 */
787 trace!("registering with poller");
788
789 // Register interests for this socket
790 handle.register(self, token, interest, opts)?;
791
792 Ok(())
793 }
794
795 /// Re-register an `Evented` handle with the `Poll` instance.
796 ///
797 /// Re-registering an `Evented` handle allows changing the details of the
798 /// registration. Specifically, it allows updating the associated `token`,
799 /// `interest`, and `opts` specified in previous `register` and `reregister`
800 /// calls.
801 ///
802 /// The `reregister` arguments fully override the previous values. In other
803 /// words, if a socket is registered with [`readable`] interest and the call
804 /// to `reregister` specifies [`writable`], then read interest is no longer
805 /// requested for the handle.
806 ///
807 /// The `Evented` handle must have previously been registered with this
808 /// instance of `Poll` otherwise the call to `reregister` will return with
809 /// an error.
810 ///
811 /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
812 /// usage.
813 ///
814 /// See the [`register`] documentation for details about the function
815 /// arguments and see the [`struct`] docs for a high level overview of
816 /// polling.
817 ///
818 /// # Examples
819 ///
820 /// ```
821 /// # use std::error::Error;
822 /// # fn try_main() -> Result<(), Box<Error>> {
823 /// use mio::{Poll, Ready, PollOpt, Token};
824 /// use mio::net::TcpStream;
825 ///
826 /// let poll = Poll::new()?;
827 /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
828 ///
829 /// // Register the socket with `poll`, requesting readable
830 /// poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
831 ///
832 /// // Reregister the socket specifying a different token and write interest
833 /// // instead. `PollOpt::edge()` must be specified even though that value
834 /// // is not being changed.
835 /// poll.reregister(&socket, Token(2), Ready::writable(), PollOpt::edge())?;
836 /// # Ok(())
837 /// # }
838 /// #
839 /// # fn main() {
840 /// # try_main().unwrap();
841 /// # }
842 /// ```
843 ///
844 /// [`struct`]: #
845 /// [`register`]: #method.register
846 /// [`readable`]: struct.Ready.html#method.readable
847 /// [`writable`]: struct.Ready.html#method.writable
reregister<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> where E: Evented848 pub fn reregister<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>
849 where E: Evented
850 {
851 validate_args(token)?;
852
853 trace!("registering with poller");
854
855 // Register interests for this socket
856 handle.reregister(self, token, interest, opts)?;
857
858 Ok(())
859 }
860
861 /// Deregister an `Evented` handle with the `Poll` instance.
862 ///
863 /// When an `Evented` handle is deregistered, the `Poll` instance will
864 /// no longer monitor it for readiness state changes. Unlike disabling
865 /// handles with oneshot, deregistering clears up any internal resources
866 /// needed to track the handle.
867 ///
868 /// A handle can be passed back to `register` after it has been
869 /// deregistered; however, it must be passed back to the **same** `Poll`
870 /// instance.
871 ///
872 /// `Evented` handles are automatically deregistered when they are dropped.
873 /// It is common to never need to explicitly call `deregister`.
874 ///
875 /// # Examples
876 ///
877 /// ```
878 /// # use std::error::Error;
879 /// # fn try_main() -> Result<(), Box<Error>> {
880 /// use mio::{Events, Poll, Ready, PollOpt, Token};
881 /// use mio::net::TcpStream;
882 /// use std::time::Duration;
883 ///
884 /// let poll = Poll::new()?;
885 /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
886 ///
887 /// // Register the socket with `poll`
888 /// poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
889 ///
890 /// poll.deregister(&socket)?;
891 ///
892 /// let mut events = Events::with_capacity(1024);
893 ///
894 /// // Set a timeout because this poll should never receive any events.
895 /// let n = poll.poll(&mut events, Some(Duration::from_secs(1)))?;
896 /// assert_eq!(0, n);
897 /// # Ok(())
898 /// # }
899 /// #
900 /// # fn main() {
901 /// # try_main().unwrap();
902 /// # }
903 /// ```
deregister<E: ?Sized>(&self, handle: &E) -> io::Result<()> where E: Evented904 pub fn deregister<E: ?Sized>(&self, handle: &E) -> io::Result<()>
905 where E: Evented
906 {
907 trace!("deregistering handle with poller");
908
909 // Deregister interests for this socket
910 handle.deregister(self)?;
911
912 Ok(())
913 }
914
915 /// Wait for readiness events
916 ///
917 /// Blocks the current thread and waits for readiness events for any of the
918 /// `Evented` handles that have been registered with this `Poll` instance.
919 /// The function will block until either at least one readiness event has
920 /// been received or `timeout` has elapsed. A `timeout` of `None` means that
921 /// `poll` will block until a readiness event has been received.
922 ///
923 /// The supplied `events` will be cleared and newly received readiness events
924 /// will be pushed onto the end. At most `events.capacity()` events will be
925 /// returned. If there are further pending readiness events, they will be
926 /// returned on the next call to `poll`.
927 ///
928 /// A single call to `poll` may result in multiple readiness events being
929 /// returned for a single `Evented` handle. For example, if a TCP socket
930 /// becomes both readable and writable, it may be possible for a single
931 /// readiness event to be returned with both [`readable`] and [`writable`]
932 /// readiness **OR** two separate events may be returned, one with
933 /// [`readable`] set and one with [`writable`] set.
934 ///
935 /// Note that the `timeout` will be rounded up to the system clock
936 /// granularity (usually 1ms), and kernel scheduling delays mean that
937 /// the blocking interval may be overrun by a small amount.
938 ///
939 /// `poll` returns the number of readiness events that have been pushed into
940 /// `events` or `Err` when an error has been encountered with the system
941 /// selector. The value returned is deprecated and will be removed in 0.7.0.
942 /// Accessing the events by index is also deprecated. Events can be
943 /// inserted by other events triggering, thus making sequential access
944 /// problematic. Use the iterator API instead. See [`iter`].
945 ///
946 /// See the [struct] level documentation for a higher level discussion of
947 /// polling.
948 ///
949 /// [`readable`]: struct.Ready.html#method.readable
950 /// [`writable`]: struct.Ready.html#method.writable
951 /// [struct]: #
952 /// [`iter`]: struct.Events.html#method.iter
953 ///
954 /// # Examples
955 ///
956 /// A basic example -- establishing a `TcpStream` connection.
957 ///
958 /// ```
959 /// # use std::error::Error;
960 /// # fn try_main() -> Result<(), Box<Error>> {
961 /// use mio::{Events, Poll, Ready, PollOpt, Token};
962 /// use mio::net::TcpStream;
963 ///
964 /// use std::net::{TcpListener, SocketAddr};
965 /// use std::thread;
966 ///
967 /// // Bind a server socket to connect to.
968 /// let addr: SocketAddr = "127.0.0.1:0".parse()?;
969 /// let server = TcpListener::bind(&addr)?;
970 /// let addr = server.local_addr()?.clone();
971 ///
972 /// // Spawn a thread to accept the socket
973 /// thread::spawn(move || {
974 /// let _ = server.accept();
975 /// });
976 ///
977 /// // Construct a new `Poll` handle as well as the `Events` we'll store into
978 /// let poll = Poll::new()?;
979 /// let mut events = Events::with_capacity(1024);
980 ///
981 /// // Connect the stream
982 /// let stream = TcpStream::connect(&addr)?;
983 ///
984 /// // Register the stream with `Poll`
985 /// poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
986 ///
/// // Wait for the socket to become ready. This has to happen in a loop to
/// // handle spurious wakeups.
989 /// loop {
990 /// poll.poll(&mut events, None)?;
991 ///
992 /// for event in &events {
993 /// if event.token() == Token(0) && event.readiness().is_writable() {
994 /// // The socket connected (probably, it could still be a spurious
995 /// // wakeup)
996 /// return Ok(());
997 /// }
998 /// }
999 /// }
1000 /// # Ok(())
1001 /// # }
1002 /// #
1003 /// # fn main() {
1004 /// # try_main().unwrap();
1005 /// # }
1006 /// ```
1007 ///
1008 /// [struct]: #
poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize>1009 pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
1010 self.poll1(events, timeout, false)
1011 }
1012
1013 /// Like `poll`, but may be interrupted by a signal
1014 ///
1015 /// If `poll` is inturrupted while blocking, it will transparently retry the syscall. If you
1016 /// want to handle signals yourself, however, use `poll_interruptible`.
poll_interruptible(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize>1017 pub fn poll_interruptible(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
1018 self.poll1(events, timeout, true)
1019 }
1020
poll1(&self, events: &mut Events, mut timeout: Option<Duration>, interruptible: bool) -> io::Result<usize>1021 fn poll1(&self, events: &mut Events, mut timeout: Option<Duration>, interruptible: bool) -> io::Result<usize> {
1022 let zero = Some(Duration::from_millis(0));
1023
1024 // At a high level, the synchronization strategy is to acquire access to
1025 // the critical section by transitioning the atomic from unlocked ->
1026 // locked. If the attempt fails, the thread will wait on the condition
1027 // variable.
1028 //
1029 // # Some more detail
1030 //
1031 // The `lock_state` atomic usize combines:
1032 //
1033 // - locked flag, stored in the least significant bit
1034 // - number of waiting threads, stored in the rest of the bits.
1035 //
1036 // When a thread transitions the locked flag from 0 -> 1, it has
1037 // obtained access to the critical section.
1038 //
1039 // When entering `poll`, a compare-and-swap from 0 -> 1 is attempted.
1040 // This is a fast path for the case when there are no concurrent calls
1041 // to poll, which is very common.
1042 //
1043 // On failure, the mutex is locked, and the thread attempts to increment
1044 // the number of waiting threads component of `lock_state`. If this is
1045 // successfully done while the locked flag is set, then the thread can
1046 // wait on the condition variable.
1047 //
1048 // When a thread exits the critical section, it unsets the locked flag.
1049 // If there are any waiters, which is atomically determined while
1050 // unsetting the locked flag, then the condvar is notified.
1051
1052 let mut curr = self.lock_state.compare_and_swap(0, 1, SeqCst);
1053
1054 if 0 != curr {
1055 // Enter slower path
1056 let mut lock = self.lock.lock().unwrap();
1057 let mut inc = false;
1058
1059 loop {
1060 if curr & 1 == 0 {
1061 // The lock is currently free, attempt to grab it
1062 let mut next = curr | 1;
1063
1064 if inc {
1065 // The waiter count has previously been incremented, so
1066 // decrement it here
1067 next -= 2;
1068 }
1069
1070 let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);
1071
1072 if actual != curr {
1073 curr = actual;
1074 continue;
1075 }
1076
1077 // Lock acquired, break from the loop
1078 break;
1079 }
1080
1081 if timeout == zero {
1082 if inc {
1083 self.lock_state.fetch_sub(2, SeqCst);
1084 }
1085
1086 return Ok(0);
1087 }
1088
1089 // The lock is currently held, so wait for it to become
1090 // free. If the waiter count hasn't been incremented yet, do
1091 // so now
1092 if !inc {
1093 let next = curr.checked_add(2).expect("overflow");
1094 let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);
1095
1096 if actual != curr {
1097 curr = actual;
1098 continue;
1099 }
1100
1101 // Track that the waiter count has been incremented for
1102 // this thread and fall through to the condvar waiting
1103 inc = true;
1104 }
1105
1106 lock = match timeout {
1107 Some(to) => {
1108 let now = Instant::now();
1109
1110 // Wait to be notified
1111 let (l, _) = self.condvar.wait_timeout(lock, to).unwrap();
1112
1113 // See how much time was elapsed in the wait
1114 let elapsed = now.elapsed();
1115
1116 // Update `timeout` to reflect how much time is left to
1117 // wait.
1118 if elapsed >= to {
1119 timeout = zero;
1120 } else {
1121 // Update the timeout
1122 timeout = Some(to - elapsed);
1123 }
1124
1125 l
1126 }
1127 None => {
1128 self.condvar.wait(lock).unwrap()
1129 }
1130 };
1131
1132 // Reload the state
1133 curr = self.lock_state.load(SeqCst);
1134
1135 // Try to lock again...
1136 }
1137 }
1138
1139 let ret = self.poll2(events, timeout, interruptible);
1140
1141 // Release the lock
1142 if 1 != self.lock_state.fetch_and(!1, Release) {
1143 // Acquire the mutex
1144 let _lock = self.lock.lock().unwrap();
1145
1146 // There is at least one waiting thread, so notify one
1147 self.condvar.notify_one();
1148 }
1149
1150 ret
1151 }
1152
1153 #[inline]
1154 #[cfg_attr(feature = "cargo-clippy", allow(clippy::if_same_then_else))]
poll2(&self, events: &mut Events, mut timeout: Option<Duration>, interruptible: bool) -> io::Result<usize>1155 fn poll2(&self, events: &mut Events, mut timeout: Option<Duration>, interruptible: bool) -> io::Result<usize> {
1156 // Compute the timeout value passed to the system selector. If the
1157 // readiness queue has pending nodes, we still want to poll the system
1158 // selector for new events, but we don't want to block the thread to
1159 // wait for new events.
1160 if timeout == Some(Duration::from_millis(0)) {
1161 // If blocking is not requested, then there is no need to prepare
1162 // the queue for sleep
1163 //
1164 // The sleep_marker should be removed by readiness_queue.poll().
1165 } else if self.readiness_queue.prepare_for_sleep() {
1166 // The readiness queue is empty. The call to `prepare_for_sleep`
1167 // inserts `sleep_marker` into the queue. This signals to any
1168 // threads setting readiness that the `Poll::poll` is going to
1169 // sleep, so the awakener should be used.
1170 } else {
1171 // The readiness queue is not empty, so do not block the thread.
1172 timeout = Some(Duration::from_millis(0));
1173 }
1174
1175 loop {
1176 let now = Instant::now();
1177 // First get selector events
1178 let res = self.selector.select(&mut events.inner, AWAKEN, timeout);
1179 match res {
1180 Ok(true) => {
1181 // Some awakeners require reading from a FD.
1182 self.readiness_queue.inner.awakener.cleanup();
1183 break;
1184 }
1185 Ok(false) => break,
1186 Err(ref e) if e.kind() == io::ErrorKind::Interrupted && !interruptible => {
1187 // Interrupted by a signal; update timeout if necessary and retry
1188 if let Some(to) = timeout {
1189 let elapsed = now.elapsed();
1190 if elapsed >= to {
1191 break;
1192 } else {
1193 timeout = Some(to - elapsed);
1194 }
1195 }
1196 }
1197 Err(e) => return Err(e),
1198 }
1199 }
1200
1201 // Poll custom event queue
1202 self.readiness_queue.poll(&mut events.inner);
1203
1204 // Return number of polled events
1205 Ok(events.inner.len())
1206 }
1207 }
1208
validate_args(token: Token) -> io::Result<()>1209 fn validate_args(token: Token) -> io::Result<()> {
1210 if token == AWAKEN {
1211 return Err(io::Error::new(io::ErrorKind::Other, "invalid token"));
1212 }
1213
1214 Ok(())
1215 }
1216
1217 impl fmt::Debug for Poll {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result1218 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1219 fmt.debug_struct("Poll")
1220 .finish()
1221 }
1222 }
1223
1224 #[cfg(all(unix, not(target_os = "fuchsia")))]
1225 impl AsRawFd for Poll {
as_raw_fd(&self) -> RawFd1226 fn as_raw_fd(&self) -> RawFd {
1227 self.selector.as_raw_fd()
1228 }
1229 }
1230
1231 /// A collection of readiness events.
1232 ///
1233 /// `Events` is passed as an argument to [`Poll::poll`] and will be used to
1234 /// receive any new readiness events received since the last poll. Usually, a
1235 /// single `Events` instance is created at the same time as a [`Poll`] and
1236 /// reused on each call to [`Poll::poll`].
1237 ///
1238 /// See [`Poll`] for more documentation on polling.
1239 ///
1240 /// # Examples
1241 ///
1242 /// ```
1243 /// # use std::error::Error;
1244 /// # fn try_main() -> Result<(), Box<Error>> {
1245 /// use mio::{Events, Poll};
1246 /// use std::time::Duration;
1247 ///
1248 /// let mut events = Events::with_capacity(1024);
1249 /// let poll = Poll::new()?;
1250 ///
1251 /// assert_eq!(0, events.len());
1252 ///
1253 /// // Register `Evented` handles with `poll`
1254 ///
1255 /// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1256 ///
1257 /// for event in &events {
1258 /// println!("event={:?}", event);
1259 /// }
1260 /// # Ok(())
1261 /// # }
1262 /// #
1263 /// # fn main() {
1264 /// # try_main().unwrap();
1265 /// # }
1266 /// ```
1267 ///
1268 /// [`Poll::poll`]: struct.Poll.html#method.poll
1269 /// [`Poll`]: struct.Poll.html
1270 pub struct Events {
1271 inner: sys::Events,
1272 }
1273
1274 /// [`Events`] iterator.
1275 ///
1276 /// This struct is created by the [`iter`] method on [`Events`].
1277 ///
1278 /// # Examples
1279 ///
1280 /// ```
1281 /// # use std::error::Error;
1282 /// # fn try_main() -> Result<(), Box<Error>> {
1283 /// use mio::{Events, Poll};
1284 /// use std::time::Duration;
1285 ///
1286 /// let mut events = Events::with_capacity(1024);
1287 /// let poll = Poll::new()?;
1288 ///
1289 /// // Register handles with `poll`
1290 ///
1291 /// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1292 ///
1293 /// for event in events.iter() {
1294 /// println!("event={:?}", event);
1295 /// }
1296 /// # Ok(())
1297 /// # }
1298 /// #
1299 /// # fn main() {
1300 /// # try_main().unwrap();
1301 /// # }
1302 /// ```
1303 ///
1304 /// [`Events`]: struct.Events.html
1305 /// [`iter`]: struct.Events.html#method.iter
1306 #[derive(Debug, Clone)]
1307 pub struct Iter<'a> {
1308 inner: &'a Events,
1309 pos: usize,
1310 }
1311
1312 /// Owned [`Events`] iterator.
1313 ///
1314 /// This struct is created by the `into_iter` method on [`Events`].
1315 ///
1316 /// # Examples
1317 ///
1318 /// ```
1319 /// # use std::error::Error;
1320 /// # fn try_main() -> Result<(), Box<Error>> {
1321 /// use mio::{Events, Poll};
1322 /// use std::time::Duration;
1323 ///
1324 /// let mut events = Events::with_capacity(1024);
1325 /// let poll = Poll::new()?;
1326 ///
1327 /// // Register handles with `poll`
1328 ///
1329 /// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1330 ///
1331 /// for event in events {
1332 /// println!("event={:?}", event);
1333 /// }
1334 /// # Ok(())
1335 /// # }
1336 /// #
1337 /// # fn main() {
1338 /// # try_main().unwrap();
1339 /// # }
1340 /// ```
1341 /// [`Events`]: struct.Events.html
1342 #[derive(Debug)]
1343 pub struct IntoIter {
1344 inner: Events,
1345 pos: usize,
1346 }
1347
1348 impl Events {
1349 /// Return a new `Events` capable of holding up to `capacity` events.
1350 ///
1351 /// # Examples
1352 ///
1353 /// ```
1354 /// use mio::Events;
1355 ///
1356 /// let events = Events::with_capacity(1024);
1357 ///
1358 /// assert_eq!(1024, events.capacity());
1359 /// ```
with_capacity(capacity: usize) -> Events1360 pub fn with_capacity(capacity: usize) -> Events {
1361 Events {
1362 inner: sys::Events::with_capacity(capacity),
1363 }
1364 }
1365
1366 #[deprecated(since="0.6.10", note="Index access removed in favor of iterator only API.")]
1367 #[doc(hidden)]
get(&self, idx: usize) -> Option<Event>1368 pub fn get(&self, idx: usize) -> Option<Event> {
1369 self.inner.get(idx)
1370 }
1371
1372 #[doc(hidden)]
1373 #[deprecated(since="0.6.10", note="Index access removed in favor of iterator only API.")]
len(&self) -> usize1374 pub fn len(&self) -> usize {
1375 self.inner.len()
1376 }
1377
1378 /// Returns the number of `Event` values that `self` can hold.
1379 ///
1380 /// ```
1381 /// use mio::Events;
1382 ///
1383 /// let events = Events::with_capacity(1024);
1384 ///
1385 /// assert_eq!(1024, events.capacity());
1386 /// ```
capacity(&self) -> usize1387 pub fn capacity(&self) -> usize {
1388 self.inner.capacity()
1389 }
1390
1391 /// Returns `true` if `self` contains no `Event` values.
1392 ///
1393 /// # Examples
1394 ///
1395 /// ```
1396 /// use mio::Events;
1397 ///
1398 /// let events = Events::with_capacity(1024);
1399 ///
1400 /// assert!(events.is_empty());
1401 /// ```
is_empty(&self) -> bool1402 pub fn is_empty(&self) -> bool {
1403 self.inner.is_empty()
1404 }
1405
1406 /// Returns an iterator over the `Event` values.
1407 ///
1408 /// # Examples
1409 ///
1410 /// ```
1411 /// # use std::error::Error;
1412 /// # fn try_main() -> Result<(), Box<Error>> {
1413 /// use mio::{Events, Poll};
1414 /// use std::time::Duration;
1415 ///
1416 /// let mut events = Events::with_capacity(1024);
1417 /// let poll = Poll::new()?;
1418 ///
1419 /// // Register handles with `poll`
1420 ///
1421 /// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1422 ///
1423 /// for event in events.iter() {
1424 /// println!("event={:?}", event);
1425 /// }
1426 /// # Ok(())
1427 /// # }
1428 /// #
1429 /// # fn main() {
1430 /// # try_main().unwrap();
1431 /// # }
1432 /// ```
iter(&self) -> Iter1433 pub fn iter(&self) -> Iter {
1434 Iter {
1435 inner: self,
1436 pos: 0
1437 }
1438 }
1439
1440 /// Clearing all `Event` values from container explicitly.
1441 ///
1442 /// # Examples
1443 ///
1444 /// ```
1445 /// # use std::error::Error;
1446 /// # fn try_main() -> Result<(), Box<Error>> {
1447 /// use mio::{Events, Poll};
1448 /// use std::time::Duration;
1449 ///
1450 /// let mut events = Events::with_capacity(1024);
1451 /// let poll = Poll::new()?;
1452 ///
1453 /// // Register handles with `poll`
1454 /// for _ in 0..2 {
1455 /// events.clear();
1456 /// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
1457 ///
1458 /// for event in events.iter() {
1459 /// println!("event={:?}", event);
1460 /// }
1461 /// }
1462 /// # Ok(())
1463 /// # }
1464 /// #
1465 /// # fn main() {
1466 /// # try_main().unwrap();
1467 /// # }
1468 /// ```
clear(&mut self)1469 pub fn clear(&mut self) {
1470 self.inner.clear();
1471 }
1472 }
1473
1474 impl<'a> IntoIterator for &'a Events {
1475 type Item = Event;
1476 type IntoIter = Iter<'a>;
1477
into_iter(self) -> Self::IntoIter1478 fn into_iter(self) -> Self::IntoIter {
1479 self.iter()
1480 }
1481 }
1482
1483 impl<'a> Iterator for Iter<'a> {
1484 type Item = Event;
1485
next(&mut self) -> Option<Event>1486 fn next(&mut self) -> Option<Event> {
1487 let ret = self.inner.inner.get(self.pos);
1488 self.pos += 1;
1489 ret
1490 }
1491 }
1492
1493 impl IntoIterator for Events {
1494 type Item = Event;
1495 type IntoIter = IntoIter;
1496
into_iter(self) -> Self::IntoIter1497 fn into_iter(self) -> Self::IntoIter {
1498 IntoIter {
1499 inner: self,
1500 pos: 0,
1501 }
1502 }
1503 }
1504
1505 impl Iterator for IntoIter {
1506 type Item = Event;
1507
next(&mut self) -> Option<Event>1508 fn next(&mut self) -> Option<Event> {
1509 let ret = self.inner.inner.get(self.pos);
1510 self.pos += 1;
1511 ret
1512 }
1513 }
1514
1515 impl fmt::Debug for Events {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1516 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1517 f.debug_struct("Events")
1518 .field("capacity", &self.capacity())
1519 .finish()
1520 }
1521 }
1522
1523 // ===== Accessors for internal usage =====
1524
selector(poll: &Poll) -> &sys::Selector1525 pub fn selector(poll: &Poll) -> &sys::Selector {
1526 &poll.selector
1527 }
1528
1529 /*
1530 *
1531 * ===== Registration =====
1532 *
1533 */
1534
1535 // TODO: get rid of this, windows depends on it for now
1536 #[allow(dead_code)]
new_registration(poll: &Poll, token: Token, ready: Ready, opt: PollOpt) -> (Registration, SetReadiness)1537 pub fn new_registration(poll: &Poll, token: Token, ready: Ready, opt: PollOpt)
1538 -> (Registration, SetReadiness)
1539 {
1540 Registration::new_priv(poll, token, ready, opt)
1541 }
1542
1543 impl Registration {
1544 /// Create and return a new `Registration` and the associated
1545 /// `SetReadiness`.
1546 ///
1547 /// See [struct] documentation for more detail and [`Poll`]
1548 /// for high level documentation on polling.
1549 ///
1550 /// # Examples
1551 ///
1552 /// ```
1553 /// # use std::error::Error;
1554 /// # fn try_main() -> Result<(), Box<Error>> {
1555 /// use mio::{Events, Ready, Registration, Poll, PollOpt, Token};
1556 /// use std::thread;
1557 ///
1558 /// let (registration, set_readiness) = Registration::new2();
1559 ///
1560 /// thread::spawn(move || {
1561 /// use std::time::Duration;
1562 /// thread::sleep(Duration::from_millis(500));
1563 ///
1564 /// set_readiness.set_readiness(Ready::readable());
1565 /// });
1566 ///
1567 /// let poll = Poll::new()?;
1568 /// poll.register(®istration, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
1569 ///
1570 /// let mut events = Events::with_capacity(256);
1571 ///
1572 /// loop {
1573 /// poll.poll(&mut events, None);
1574 ///
1575 /// for event in &events {
1576 /// if event.token() == Token(0) && event.readiness().is_readable() {
1577 /// return Ok(());
1578 /// }
1579 /// }
1580 /// }
1581 /// # Ok(())
1582 /// # }
1583 /// #
1584 /// # fn main() {
1585 /// # try_main().unwrap();
1586 /// # }
1587 /// ```
1588 /// [struct]: #
1589 /// [`Poll`]: struct.Poll.html
new2() -> (Registration, SetReadiness)1590 pub fn new2() -> (Registration, SetReadiness) {
1591 // Allocate the registration node. The new node will have `ref_count`
1592 // set to 2: one SetReadiness, one Registration.
1593 let node = Box::into_raw(Box::new(ReadinessNode::new(
1594 ptr::null_mut(), Token(0), Ready::empty(), PollOpt::empty(), 2)));
1595
1596 let registration = Registration {
1597 inner: RegistrationInner {
1598 node,
1599 },
1600 };
1601
1602 let set_readiness = SetReadiness {
1603 inner: RegistrationInner {
1604 node,
1605 },
1606 };
1607
1608 (registration, set_readiness)
1609 }
1610
1611 #[deprecated(since = "0.6.5", note = "use `new2` instead")]
1612 #[cfg(feature = "with-deprecated")]
1613 #[doc(hidden)]
new(poll: &Poll, token: Token, interest: Ready, opt: PollOpt) -> (Registration, SetReadiness)1614 pub fn new(poll: &Poll, token: Token, interest: Ready, opt: PollOpt)
1615 -> (Registration, SetReadiness)
1616 {
1617 Registration::new_priv(poll, token, interest, opt)
1618 }
1619
1620 // TODO: Get rid of this (windows depends on it for now)
new_priv(poll: &Poll, token: Token, interest: Ready, opt: PollOpt) -> (Registration, SetReadiness)1621 fn new_priv(poll: &Poll, token: Token, interest: Ready, opt: PollOpt)
1622 -> (Registration, SetReadiness)
1623 {
1624 is_send::<Registration>();
1625 is_sync::<Registration>();
1626 is_send::<SetReadiness>();
1627 is_sync::<SetReadiness>();
1628
1629 // Clone handle to the readiness queue, this bumps the ref count
1630 let queue = poll.readiness_queue.inner.clone();
1631
1632 // Convert to a *mut () pointer
1633 let queue: *mut () = unsafe { mem::transmute(queue) };
1634
1635 // Allocate the registration node. The new node will have `ref_count`
1636 // set to 3: one SetReadiness, one Registration, and one Poll handle.
1637 let node = Box::into_raw(Box::new(ReadinessNode::new(
1638 queue, token, interest, opt, 3)));
1639
1640 let registration = Registration {
1641 inner: RegistrationInner {
1642 node,
1643 },
1644 };
1645
1646 let set_readiness = SetReadiness {
1647 inner: RegistrationInner {
1648 node,
1649 },
1650 };
1651
1652 (registration, set_readiness)
1653 }
1654
1655 #[deprecated(since = "0.6.5", note = "use `Evented` impl")]
1656 #[cfg(feature = "with-deprecated")]
1657 #[doc(hidden)]
update(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>1658 pub fn update(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
1659 self.inner.update(poll, token, interest, opts)
1660 }
1661
1662 #[deprecated(since = "0.6.5", note = "use `Poll::deregister` instead")]
1663 #[cfg(feature = "with-deprecated")]
1664 #[doc(hidden)]
deregister(&self, poll: &Poll) -> io::Result<()>1665 pub fn deregister(&self, poll: &Poll) -> io::Result<()> {
1666 self.inner.update(poll, Token(0), Ready::empty(), PollOpt::empty())
1667 }
1668 }
1669
1670 impl Evented for Registration {
register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>1671 fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
1672 self.inner.update(poll, token, interest, opts)
1673 }
1674
reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>1675 fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
1676 self.inner.update(poll, token, interest, opts)
1677 }
1678
deregister(&self, poll: &Poll) -> io::Result<()>1679 fn deregister(&self, poll: &Poll) -> io::Result<()> {
1680 self.inner.update(poll, Token(0), Ready::empty(), PollOpt::empty())
1681 }
1682 }
1683
1684 impl Drop for Registration {
drop(&mut self)1685 fn drop(&mut self) {
1686 // `flag_as_dropped` toggles the `dropped` flag and notifies
1687 // `Poll::poll` to release its handle (which is just decrementing
1688 // the ref count).
1689 if self.inner.state.flag_as_dropped() {
1690 // Can't do anything if the queuing fails
1691 let _ = self.inner.enqueue_with_wakeup();
1692 }
1693 }
1694 }
1695
1696 impl fmt::Debug for Registration {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result1697 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1698 fmt.debug_struct("Registration")
1699 .finish()
1700 }
1701 }
1702
impl SetReadiness {
    /// Returns the registration's current readiness.
    ///
    /// # Note
    ///
    /// There is no guarantee that `readiness` establishes any sort of memory
    /// ordering. Any concurrent data access must be synchronized using another
    /// strategy.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use mio::{Registration, Ready};
    ///
    /// let (registration, set_readiness) = Registration::new2();
    ///
    /// assert!(set_readiness.readiness().is_empty());
    ///
    /// set_readiness.set_readiness(Ready::readable())?;
    /// assert!(set_readiness.readiness().is_readable());
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    pub fn readiness(&self) -> Ready {
        self.inner.readiness()
    }

    /// Set the registration's readiness
    ///
    /// If the associated `Registration` is registered with a [`Poll`] instance
    /// and has requested readiness events that include `ready`, then a future
    /// call to [`Poll::poll`] will receive a readiness event representing the
    /// readiness state change.
    ///
    /// # Note
    ///
    /// There is no guarantee that `set_readiness` establishes any sort of
    /// memory ordering. Any concurrent data access must be synchronized using
    /// another strategy.
    ///
    /// There is also no guarantee as to when the readiness event will be
    /// delivered to poll. A best attempt will be made to make the delivery in a
    /// "timely" fashion. For example, the following is **not** guaranteed to
    /// work:
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use mio::{Events, Registration, Ready, Poll, PollOpt, Token};
    ///
    /// let poll = Poll::new()?;
    /// let (registration, set_readiness) = Registration::new2();
    ///
    /// poll.register(&registration,
    ///               Token(0),
    ///               Ready::readable(),
    ///               PollOpt::edge())?;
    ///
    /// // Set the readiness, then immediately poll to try to get the readiness
    /// // event
    /// set_readiness.set_readiness(Ready::readable())?;
    ///
    /// let mut events = Events::with_capacity(1024);
    /// poll.poll(&mut events, None)?;
    ///
    /// // There is NO guarantee that the following will work. It is possible
    /// // that the readiness event will be delivered at a later time.
    /// let event = events.get(0).unwrap();
    /// assert_eq!(event.token(), Token(0));
    /// assert!(event.readiness().is_readable());
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// Propagates any error returned while enqueuing the readiness node and
    /// waking up the associated [`Poll`].
    ///
    /// # Examples
    ///
    /// A simple example, for a more elaborate example, see the [`Evented`]
    /// documentation.
    ///
    /// ```
    /// # use std::error::Error;
    /// # fn try_main() -> Result<(), Box<Error>> {
    /// use mio::{Registration, Ready};
    ///
    /// let (registration, set_readiness) = Registration::new2();
    ///
    /// assert!(set_readiness.readiness().is_empty());
    ///
    /// set_readiness.set_readiness(Ready::readable())?;
    /// assert!(set_readiness.readiness().is_readable());
    /// #     Ok(())
    /// # }
    /// #
    /// # fn main() {
    /// #     try_main().unwrap();
    /// # }
    /// ```
    ///
    /// [`Registration`]: struct.Registration.html
    /// [`Evented`]: event/trait.Evented.html#examples
    /// [`Poll`]: struct.Poll.html
    /// [`Poll::poll`]: struct.Poll.html#method.poll
    pub fn set_readiness(&self, ready: Ready) -> io::Result<()> {
        self.inner.set_readiness(ready)
    }
}
1819
1820 impl fmt::Debug for SetReadiness {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1821 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1822 f.debug_struct("SetReadiness")
1823 .finish()
1824 }
1825 }
1826
impl RegistrationInner {
    /// Get the registration's readiness.
    ///
    /// The state is loaded with `Relaxed` ordering, so this establishes no
    /// ordering with respect to other memory accesses.
    fn readiness(&self) -> Ready {
        self.state.load(Relaxed).readiness()
    }

    /// Set the registration's readiness.
    ///
    /// This function can be called concurrently by an arbitrary number of
    /// SetReadiness handles.
    ///
    /// Returns `Ok(())` without doing anything if the node has already been
    /// flagged as dropped. Otherwise the readiness is stored via a
    /// compare-and-swap loop and, if this call is the one that set the queued
    /// flag, the node is pushed onto the readiness queue. Any error from that
    /// enqueue / wakeup is returned.
    fn set_readiness(&self, ready: Ready) -> io::Result<()> {
        // Load the current atomic state.
        let mut state = self.state.load(Acquire);
        let mut next;

        loop {
            next = state;

            if state.is_dropped() {
                // Node is dropped, no more notifications
                return Ok(());
            }

            // Update the readiness
            next.set_readiness(ready);

            // If the readiness is not blank, try to obtain permission to
            // push the node into the readiness queue.
            if !next.effective_readiness().is_empty() {
                next.set_queued();
            }

            let actual = self.state.compare_and_swap(state, next, AcqRel);

            if state == actual {
                break;
            }

            // The CAS lost a race; retry against the freshly observed state.
            state = actual;
        }

        if !state.is_queued() && next.is_queued() {
            // We toggled the queued flag, making us responsible for queuing the
            // node in the MPSC readiness queue.
            self.enqueue_with_wakeup()?;
        }

        Ok(())
    }

    /// Update the registration details associated with the node
    ///
    /// Associates the node with `poll`'s readiness queue if it is not yet
    /// associated with any queue, then stores `token`, `interest` and `opt`.
    ///
    /// # Errors
    ///
    /// Returns an error if the node is already associated with a different
    /// `Poll` instance, or if enqueuing the node / waking up `Poll` fails.
    ///
    /// If another thread currently holds the update lock, the update is
    /// discarded and `Ok(())` is returned.
    fn update(&self, poll: &Poll, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> {
        // First, ensure poll instances match
        //
        // Load the queue pointer, `Relaxed` is sufficient here as only the
        // pointer is being operated on. The actual memory is guaranteed to be
        // visible via the `poll: &Poll` ref passed as an argument to the
        // function.
        let mut queue = self.readiness_queue.load(Relaxed);
        // Reinterpret the `Arc` held by `poll` as a raw pointer value so that
        // it can be compared with / stored in `readiness_queue`. The
        // `debug_assert!` below checks that the two have the same size.
        let other: &*mut () = unsafe {
            &*(&poll.readiness_queue.inner as *const _ as *const *mut ())
        };
        let other = *other;

        debug_assert!(mem::size_of::<Arc<ReadinessQueueInner>>() == mem::size_of::<*mut ()>());

        if queue.is_null() {
            // Attempt to set the queue pointer. `Release` ordering synchronizes
            // with `Acquire` in `enqueue_with_wakeup`.
            let actual = self.readiness_queue.compare_and_swap(
                queue, other, Release);

            if actual.is_null() {
                // The CAS succeeded, this means that the node's ref count
                // should be incremented to reflect that the `poll` function
                // effectively owns the node as well.
                //
                // `Relaxed` ordering used for the same reason as in
                // RegistrationInner::clone
                self.ref_count.fetch_add(1, Relaxed);

                // Note that the `queue` reference stored in our
                // `readiness_queue` field is intended to be a strong reference,
                // so now that we've successfully claimed the reference we bump
                // the refcount here.
                //
                // Down below in `release_node` when we deallocate this
                // `RegistrationInner` is where we'll transmute this back to an
                // arc and decrement the reference count.
                mem::forget(poll.readiness_queue.clone());
            } else {
                // The CAS failed, another thread set the queue pointer, so ensure
                // that the pointer and `other` match
                if actual != other {
                    return Err(io::Error::new(io::ErrorKind::Other, "registration handle associated with another `Poll` instance"));
                }
            }

            queue = other;
        } else if queue != other {
            return Err(io::Error::new(io::ErrorKind::Other, "registration handle associated with another `Poll` instance"));
        }

        // Sanity check (debug builds only): `queue` now equals the pointer
        // value of `poll`'s readiness queue.
        unsafe {
            let actual = &poll.readiness_queue.inner as *const _ as *const usize;
            debug_assert_eq!(queue as usize, *actual);
        }

        // The `update_lock` atomic is used as a flag ensuring only a single
        // thread concurrently enters the `update` critical section. Any
        // concurrent calls to update are discarded. If coordinated updates are
        // required, the Mio user is responsible for handling that.
        //
        // Acquire / Release ordering is used on `update_lock` to ensure that
        // data access to the `token_*` variables are scoped to the critical
        // section.

        // Acquire the update lock.
        if self.update_lock.compare_and_swap(false, true, Acquire) {
            // The lock is already held. Discard the update
            return Ok(());
        }

        // Relaxed ordering is acceptable here as the only memory that needs to
        // be visible as part of the update are the `token_*` variables, and
        // ordering has already been handled by the `update_lock` access.
        let mut state = self.state.load(Relaxed);
        let mut next;

        // Read the current token, again this memory has been ordered by the
        // acquire on `update_lock`.
        let curr_token_pos = state.token_write_pos();
        let curr_token = unsafe { self::token(self, curr_token_pos) };

        let mut next_token_pos = curr_token_pos;

        // If the `update` call is changing the token, then compute the next
        // available token slot and write the token there.
        //
        // Note that this computation is happening *outside* of the
        // compare-and-swap loop. The update lock ensures that only a single
        // thread could be mutating the write_token_position, so the
        // `next_token_pos` will never need to be recomputed even if
        // `token_read_pos` concurrently changes. This is because
        // `token_read_pos` can ONLY concurrently change to the current value of
        // `token_write_pos`, so `next_token_pos` will always remain valid.
        if token != curr_token {
            next_token_pos = state.next_token_pos();

            // Update the token
            match next_token_pos {
                0 => unsafe { *self.token_0.get() = token },
                1 => unsafe { *self.token_1.get() = token },
                2 => unsafe { *self.token_2.get() = token },
                _ => unreachable!(),
            }
        }

        // Now enter the compare-and-swap loop
        loop {
            next = state;

            // The node is only dropped once all `Registration` handles are
            // dropped. Only `Registration` can call `update`.
            debug_assert!(!state.is_dropped());

            // Update the write token position, this will also release the token
            // to Poll::poll.
            next.set_token_write_pos(next_token_pos);

            // Update readiness and poll opts
            next.set_interest(interest);
            next.set_poll_opt(opt);

            // If there is effective readiness, the node will need to be queued
            // for processing. This exact behavior is still TBD, so we are
            // conservative for now and always fire.
            //
            // See https://github.com/carllerche/mio/issues/535.
            if !next.effective_readiness().is_empty() {
                next.set_queued();
            }

            // compare-and-swap the state values. Only `Release` is needed here.
            // The `Release` ensures that `Poll::poll` will see the token
            // update and the update function doesn't care about any other
            // memory visibility.
            let actual = self.state.compare_and_swap(state, next, Release);

            if actual == state {
                break;
            }

            // CAS failed, but `curr_token_pos` should not have changed given
            // that we still hold the update lock.
            debug_assert_eq!(curr_token_pos, actual.token_write_pos());

            state = actual;
        }

        // Release the lock
        self.update_lock.store(false, Release);

        if !state.is_queued() && next.is_queued() {
            // We are responsible for enqueuing the node.
            enqueue_with_wakeup(queue, self)?;
        }

        Ok(())
    }
}
2037
2038 impl ops::Deref for RegistrationInner {
2039 type Target = ReadinessNode;
2040
deref(&self) -> &ReadinessNode2041 fn deref(&self) -> &ReadinessNode {
2042 unsafe { &*self.node }
2043 }
2044 }
2045
2046 impl Clone for RegistrationInner {
clone(&self) -> RegistrationInner2047 fn clone(&self) -> RegistrationInner {
2048 // Using a relaxed ordering is alright here, as knowledge of the
2049 // original reference prevents other threads from erroneously deleting
2050 // the object.
2051 //
2052 // As explained in the [Boost documentation][1], Increasing the
2053 // reference counter can always be done with memory_order_relaxed: New
2054 // references to an object can only be formed from an existing
2055 // reference, and passing an existing reference from one thread to
2056 // another must already provide any required synchronization.
2057 //
2058 // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
2059 let old_size = self.ref_count.fetch_add(1, Relaxed);
2060
2061 // However we need to guard against massive refcounts in case someone
2062 // is `mem::forget`ing Arcs. If we don't do this the count can overflow
2063 // and users will use-after free. We racily saturate to `isize::MAX` on
2064 // the assumption that there aren't ~2 billion threads incrementing
2065 // the reference count at once. This branch will never be taken in
2066 // any realistic program.
2067 //
2068 // We abort because such a program is incredibly degenerate, and we
2069 // don't care to support it.
2070 if old_size & !MAX_REFCOUNT != 0 {
2071 process::abort();
2072 }
2073
2074 RegistrationInner {
2075 node: self.node,
2076 }
2077 }
2078 }
2079
2080 impl Drop for RegistrationInner {
drop(&mut self)2081 fn drop(&mut self) {
2082 // Only handles releasing from `Registration` and `SetReadiness`
2083 // handles. Poll has to call this itself.
2084 release_node(self.node);
2085 }
2086 }
2087
2088 /*
2089 *
2090 * ===== ReadinessQueue =====
2091 *
2092 */
2093
impl ReadinessQueue {
    /// Create a new `ReadinessQueue`.
    ///
    /// Allocates the three marker nodes (end, sleep and closed) and starts
    /// with both the head and the tail of the queue pointing at the end
    /// marker. Fails if the system awakener cannot be created.
    fn new() -> io::Result<ReadinessQueue> {
        // Compile-time checks that `ReadinessQueue` is `Send` and `Sync`.
        is_send::<Self>();
        is_sync::<Self>();

        let end_marker = Box::new(ReadinessNode::marker());
        let sleep_marker = Box::new(ReadinessNode::marker());
        let closed_marker = Box::new(ReadinessNode::marker());

        // Raw pointer to the end marker, used as the initial head and tail.
        let ptr = &*end_marker as *const _ as *mut _;

        Ok(ReadinessQueue {
            inner: Arc::new(ReadinessQueueInner {
                awakener: sys::Awakener::new()?,
                head_readiness: AtomicPtr::new(ptr),
                tail_readiness: UnsafeCell::new(ptr),
                end_marker,
                sleep_marker,
                closed_marker,
            })
        })
    }

    /// Poll the queue for new events
    ///
    /// Dequeues nodes and pushes an event into `dst` for every node that has
    /// non-empty effective readiness, until `dst` is full, the queue is
    /// empty / inconsistent, or the first re-enqueued node is reached.
    fn poll(&self, dst: &mut sys::Events) {
        // `until` is set with the first node that gets re-enqueued due to being
        // set to have level-triggered notifications. This prevents an infinite
        // loop where `Poll::poll` will keep dequeuing nodes it enqueues.
        let mut until = ptr::null_mut();

        if dst.len() == dst.capacity() {
            // If `dst` is already full, the readiness queue won't be drained.
            // This might result in `sleep_marker` staying in the queue and
            // unnecessary pipe writes occurring.
            self.inner.clear_sleep_marker();
        }

        'outer:
        while dst.len() < dst.capacity() {
            // Dequeue a node. If the queue is in an inconsistent state, then
            // stop polling. `Poll::poll` will be called again shortly and enter
            // a syscall, which should be enough to enable the other thread to
            // finish the queuing process.
            let ptr = match unsafe { self.inner.dequeue_node(until) } {
                Dequeue::Empty | Dequeue::Inconsistent => break,
                Dequeue::Data(ptr) => ptr,
            };

            let node = unsafe { &*ptr };

            // Read the node state with Acquire ordering. This allows reading
            // the token variables.
            let mut state = node.state.load(Acquire);
            let mut next;
            let mut readiness;
            let mut opt;

            loop {
                // Build up any changes to the readiness node's state and
                // attempt the CAS at the end
                next = state;

                // Given that the node was just read from the queue, the
                // `queued` flag should still be set.
                debug_assert!(state.is_queued());

                // The dropped flag means we need to release the node and
                // perform no further processing on it.
                if state.is_dropped() {
                    // Release the node and continue
                    release_node(ptr);
                    continue 'outer;
                }

                // Process the node
                readiness = state.effective_readiness();
                opt = state.poll_opt();

                if opt.is_edge() {
                    // Mark the node as dequeued
                    next.set_dequeued();

                    if opt.is_oneshot() && !readiness.is_empty() {
                        next.disarm();
                    }
                } else if readiness.is_empty() {
                    next.set_dequeued();
                }

                // Ensure `token_read_pos` is set to `token_write_pos` so that
                // we read the most up to date token value.
                next.update_token_read_pos();

                // Nothing changed, so there is no need for a CAS.
                if state == next {
                    break;
                }

                let actual = node.state.compare_and_swap(state, next, AcqRel);

                if actual == state {
                    break;
                }

                state = actual;
            }

            // If the queued flag is still set, then the node must be requeued.
            // This typically happens when using level-triggered notifications.
            if next.is_queued() {
                if until.is_null() {
                    // We never want to see the node again
                    until = ptr;
                }

                // Requeue the node
                self.inner.enqueue_node(node);
            }

            if !readiness.is_empty() {
                // Get the token
                let token = unsafe { token(node, next.token_read_pos()) };

                // Push the event
                dst.push_event(Event::new(readiness, token));
            }
        }
    }

    /// Prepare the queue for the `Poll::poll` thread to block in the system
    /// selector. This involves changing `head_readiness` to `sleep_marker`.
    /// Returns true if successful and `poll` can block.
    fn prepare_for_sleep(&self) -> bool {
        let end_marker = self.inner.end_marker();
        let sleep_marker = self.inner.sleep_marker();

        let tail = unsafe { *self.inner.tail_readiness.get() };

        // If the tail is currently set to the sleep_marker, then check if the
        // head is as well. If it is, then the queue is currently ready to
        // sleep. If it is not, then the queue is not empty and there should be
        // no sleeping.
        if tail == sleep_marker {
            return self.inner.head_readiness.load(Acquire) == sleep_marker;
        }

        // If the tail is not currently set to `end_marker`, then the queue is
        // not empty.
        if tail != end_marker {
            return false;
        }

        // The sleep marker is *not* currently in the readiness queue.
        //
        // The sleep marker is only inserted in this function. It is also only
        // inserted in the tail position. This is guaranteed by first checking
        // that the end marker is in the tail position, pushing the sleep marker
        // after the end marker, then removing the end marker.
        //
        // Before inserting a node into the queue, the next pointer has to be
        // set to null. Again, this is only safe to do when the node is not
        // currently in the queue, but we already have ensured this.
        self.inner.sleep_marker.next_readiness.store(ptr::null_mut(), Relaxed);

        let actual = self.inner.head_readiness.compare_and_swap(
            end_marker, sleep_marker, AcqRel);

        debug_assert!(actual != sleep_marker);

        if actual != end_marker {
            // The readiness queue is not empty
            return false;
        }

        // The current tail should be pointing to `end_marker`
        debug_assert!(unsafe { *self.inner.tail_readiness.get() == end_marker });
        // The `end_marker` next pointer should be null
        debug_assert!(self.inner.end_marker.next_readiness.load(Relaxed).is_null());

        // Update tail pointer.
        unsafe { *self.inner.tail_readiness.get() = sleep_marker; }
        true
    }
}
2278
2279 impl Drop for ReadinessQueue {
drop(&mut self)2280 fn drop(&mut self) {
2281 // Close the queue by enqueuing the closed node
2282 self.inner.enqueue_node(&*self.inner.closed_marker);
2283
2284 loop {
2285 // Free any nodes that happen to be left in the readiness queue
2286 let ptr = match unsafe { self.inner.dequeue_node(ptr::null_mut()) } {
2287 Dequeue::Empty => break,
2288 Dequeue::Inconsistent => {
2289 // This really shouldn't be possible as all other handles to
2290 // `ReadinessQueueInner` are dropped, but handle this by
2291 // spinning I guess?
2292 continue;
2293 }
2294 Dequeue::Data(ptr) => ptr,
2295 };
2296
2297 let node = unsafe { &*ptr };
2298
2299 let state = node.state.load(Acquire);
2300
2301 debug_assert!(state.is_queued());
2302
2303 release_node(ptr);
2304 }
2305 }
2306 }
2307
impl ReadinessQueueInner {
    /// Wake up the awakener, returning whatever `Awakener::wakeup` returns.
    fn wakeup(&self) -> io::Result<()> {
        self.awakener.wakeup()
    }

    /// Push the given node into the readiness queue and, if `enqueue_node`
    /// reports that a wakeup is required, wake up the awakener.
    ///
    /// Returns the error from the wakeup, if any.
    fn enqueue_node_with_wakeup(&self, node: &ReadinessNode) -> io::Result<()> {
        if self.enqueue_node(node) {
            self.wakeup()?;
        }

        Ok(())
    }

    /// Push the node into the readiness queue
    ///
    /// Returns `true` if the previous head of the queue was the sleep marker.
    /// If the queue has been closed (the head is the closed marker), the node
    /// is not pushed and `false` is returned.
    fn enqueue_node(&self, node: &ReadinessNode) -> bool {
        // This is the 1024cores.net intrusive MPSC queue [1] "push" function.
        let node_ptr = node as *const _ as *mut _;

        // Relaxed used as the ordering is "released" when swapping
        // `head_readiness`
        node.next_readiness.store(ptr::null_mut(), Relaxed);

        unsafe {
            let mut prev = self.head_readiness.load(Acquire);

            loop {
                if prev == self.closed_marker() {
                    debug_assert!(node_ptr != self.closed_marker());
                    // debug_assert!(node_ptr != self.end_marker());
                    debug_assert!(node_ptr != self.sleep_marker());

                    if node_ptr != self.end_marker() {
                        // The readiness queue is shutdown, but the enqueue flag was
                        // set. This means that we are responsible for decrementing
                        // the ready queue's ref count
                        debug_assert!(node.ref_count.load(Relaxed) >= 2);
                        release_node(node_ptr);
                    }

                    return false;
                }

                let act = self.head_readiness.compare_and_swap(prev, node_ptr, AcqRel);

                if prev == act {
                    break;
                }

                prev = act;
            }

            debug_assert!((*prev).next_readiness.load(Relaxed).is_null());

            // Link the previous head to the new node.
            (*prev).next_readiness.store(node_ptr, Release);

            prev == self.sleep_marker()
        }
    }

    /// Replace the sleep marker with the end marker, provided the sleep marker
    /// is the only node in the queue (it is both the tail and the head).
    /// Otherwise the queue is left untouched.
    fn clear_sleep_marker(&self) {
        let end_marker = self.end_marker();
        let sleep_marker = self.sleep_marker();

        unsafe {
            let tail = *self.tail_readiness.get();

            if tail != self.sleep_marker() {
                return;
            }

            // The end marker is *not* currently in the readiness queue
            // (since the sleep marker is).
            self.end_marker.next_readiness.store(ptr::null_mut(), Relaxed);

            let actual = self.head_readiness.compare_and_swap(
                sleep_marker, end_marker, AcqRel);

            debug_assert!(actual != end_marker);

            if actual != sleep_marker {
                // The readiness queue is not empty, we cannot remove the sleep
                // marker
                return;
            }

            // Update the tail pointer.
            *self.tail_readiness.get() = end_marker;
        }
    }

    /// Pop a node from the tail of the readiness queue.
    ///
    /// Returns `Dequeue::Empty` when there is nothing to pop or when the tail
    /// is `until`, `Dequeue::Inconsistent` when a concurrent push has not
    /// finished linking its node yet, and `Dequeue::Data` with the popped node
    /// otherwise.
    ///
    /// Must only be called in `poll` or `drop`
    unsafe fn dequeue_node(&self, until: *mut ReadinessNode) -> Dequeue {
        // This is the 1024cores.net intrusive MPSC queue [1] "pop" function
        // with the modifications mentioned at the top of the file.
        let mut tail = *self.tail_readiness.get();
        let mut next = (*tail).next_readiness.load(Acquire);

        // Marker nodes carry no data; skip over a marker sitting at the tail.
        if tail == self.end_marker() || tail == self.sleep_marker() || tail == self.closed_marker() {
            if next.is_null() {
                // Make sure the sleep marker is removed (as we are no longer
                // sleeping)
                self.clear_sleep_marker();

                return Dequeue::Empty;
            }

            *self.tail_readiness.get() = next;
            tail = next;
            next = (*next).next_readiness.load(Acquire);
        }

        // Only need to check `until` at this point. `until` is either null,
        // which will never match tail OR it is a node that was pushed by
        // the current thread. This means that either:
        //
        // 1) The queue is inconsistent, which is handled explicitly
        // 2) We encounter `until` at this point in dequeue
        // 3) we will pop a different node
        if tail == until {
            return Dequeue::Empty;
        }

        if !next.is_null() {
            *self.tail_readiness.get() = next;
            return Dequeue::Data(tail);
        }

        if self.head_readiness.load(Acquire) != tail {
            return Dequeue::Inconsistent;
        }

        // Push the stub node
        self.enqueue_node(&*self.end_marker);

        next = (*tail).next_readiness.load(Acquire);

        if !next.is_null() {
            *self.tail_readiness.get() = next;
            return Dequeue::Data(tail);
        }

        Dequeue::Inconsistent
    }

    /// Raw pointer to the end marker node.
    fn end_marker(&self) -> *mut ReadinessNode {
        &*self.end_marker as *const ReadinessNode as *mut ReadinessNode
    }

    /// Raw pointer to the sleep marker node.
    fn sleep_marker(&self) -> *mut ReadinessNode {
        &*self.sleep_marker as *const ReadinessNode as *mut ReadinessNode
    }

    /// Raw pointer to the closed marker node.
    fn closed_marker(&self) -> *mut ReadinessNode {
        &*self.closed_marker as *const ReadinessNode as *mut ReadinessNode
    }
}
2466
2467 impl ReadinessNode {
2468 /// Return a new `ReadinessNode`, initialized with a ref_count of 3.
new(queue: *mut (), token: Token, interest: Ready, opt: PollOpt, ref_count: usize) -> ReadinessNode2469 fn new(queue: *mut (),
2470 token: Token,
2471 interest: Ready,
2472 opt: PollOpt,
2473 ref_count: usize) -> ReadinessNode
2474 {
2475 ReadinessNode {
2476 state: AtomicState::new(interest, opt),
2477 // Only the first token is set, the others are initialized to 0
2478 token_0: UnsafeCell::new(token),
2479 token_1: UnsafeCell::new(Token(0)),
2480 token_2: UnsafeCell::new(Token(0)),
2481 next_readiness: AtomicPtr::new(ptr::null_mut()),
2482 update_lock: AtomicBool::new(false),
2483 readiness_queue: AtomicPtr::new(queue),
2484 ref_count: AtomicUsize::new(ref_count),
2485 }
2486 }
2487
marker() -> ReadinessNode2488 fn marker() -> ReadinessNode {
2489 ReadinessNode {
2490 state: AtomicState::new(Ready::empty(), PollOpt::empty()),
2491 token_0: UnsafeCell::new(Token(0)),
2492 token_1: UnsafeCell::new(Token(0)),
2493 token_2: UnsafeCell::new(Token(0)),
2494 next_readiness: AtomicPtr::new(ptr::null_mut()),
2495 update_lock: AtomicBool::new(false),
2496 readiness_queue: AtomicPtr::new(ptr::null_mut()),
2497 ref_count: AtomicUsize::new(0),
2498 }
2499 }
2500
enqueue_with_wakeup(&self) -> io::Result<()>2501 fn enqueue_with_wakeup(&self) -> io::Result<()> {
2502 let queue = self.readiness_queue.load(Acquire);
2503
2504 if queue.is_null() {
2505 // Not associated with a queue, nothing to do
2506 return Ok(());
2507 }
2508
2509 enqueue_with_wakeup(queue, self)
2510 }
2511 }
2512
enqueue_with_wakeup(queue: *mut (), node: &ReadinessNode) -> io::Result<()>2513 fn enqueue_with_wakeup(queue: *mut (), node: &ReadinessNode) -> io::Result<()> {
2514 debug_assert!(!queue.is_null());
2515 // This is ugly... but we don't want to bump the ref count.
2516 let queue: &Arc<ReadinessQueueInner> = unsafe {
2517 &*(&queue as *const *mut () as *const Arc<ReadinessQueueInner>)
2518 };
2519 queue.enqueue_node_with_wakeup(node)
2520 }
2521
token(node: &ReadinessNode, pos: usize) -> Token2522 unsafe fn token(node: &ReadinessNode, pos: usize) -> Token {
2523 match pos {
2524 0 => *node.token_0.get(),
2525 1 => *node.token_1.get(),
2526 2 => *node.token_2.get(),
2527 _ => unreachable!(),
2528 }
2529 }
2530
release_node(ptr: *mut ReadinessNode)2531 fn release_node(ptr: *mut ReadinessNode) {
2532 unsafe {
2533 // `AcqRel` synchronizes with other `release_node` functions and ensures
2534 // that the drop happens after any reads / writes on other threads.
2535 if (*ptr).ref_count.fetch_sub(1, AcqRel) != 1 {
2536 return;
2537 }
2538
2539 let node = Box::from_raw(ptr);
2540
2541 // Decrement the readiness_queue Arc
2542 let queue = node.readiness_queue.load(Acquire);
2543
2544 if queue.is_null() {
2545 return;
2546 }
2547
2548 let _: Arc<ReadinessQueueInner> = mem::transmute(queue);
2549 }
2550 }
2551
2552 impl AtomicState {
new(interest: Ready, opt: PollOpt) -> AtomicState2553 fn new(interest: Ready, opt: PollOpt) -> AtomicState {
2554 let state = ReadinessState::new(interest, opt);
2555
2556 AtomicState {
2557 inner: AtomicUsize::new(state.into()),
2558 }
2559 }
2560
2561 /// Loads the current `ReadinessState`
load(&self, order: Ordering) -> ReadinessState2562 fn load(&self, order: Ordering) -> ReadinessState {
2563 self.inner.load(order).into()
2564 }
2565
2566 /// Stores a state if the current state is the same as `current`.
compare_and_swap(&self, current: ReadinessState, new: ReadinessState, order: Ordering) -> ReadinessState2567 fn compare_and_swap(&self, current: ReadinessState, new: ReadinessState, order: Ordering) -> ReadinessState {
2568 self.inner.compare_and_swap(current.into(), new.into(), order).into()
2569 }
2570
2571 // Returns `true` if the node should be queued
flag_as_dropped(&self) -> bool2572 fn flag_as_dropped(&self) -> bool {
2573 let prev: ReadinessState = self.inner.fetch_or(DROPPED_MASK | QUEUED_MASK, Release).into();
2574 // The flag should not have been previously set
2575 debug_assert!(!prev.is_dropped());
2576
2577 !prev.is_queued()
2578 }
2579 }
2580
2581 impl ReadinessState {
2582 // Create a `ReadinessState` initialized with the provided arguments
2583 #[inline]
new(interest: Ready, opt: PollOpt) -> ReadinessState2584 fn new(interest: Ready, opt: PollOpt) -> ReadinessState {
2585 let interest = event::ready_as_usize(interest);
2586 let opt = event::opt_as_usize(opt);
2587
2588 debug_assert!(interest <= MASK_4);
2589 debug_assert!(opt <= MASK_4);
2590
2591 let mut val = interest << INTEREST_SHIFT;
2592 val |= opt << POLL_OPT_SHIFT;
2593
2594 ReadinessState(val)
2595 }
2596
2597 #[inline]
get(self, mask: usize, shift: usize) -> usize2598 fn get(self, mask: usize, shift: usize) -> usize{
2599 (self.0 >> shift) & mask
2600 }
2601
2602 #[inline]
set(&mut self, val: usize, mask: usize, shift: usize)2603 fn set(&mut self, val: usize, mask: usize, shift: usize) {
2604 self.0 = (self.0 & !(mask << shift)) | (val << shift)
2605 }
2606
2607 /// Get the readiness
2608 #[inline]
readiness(self) -> Ready2609 fn readiness(self) -> Ready {
2610 let v = self.get(MASK_4, READINESS_SHIFT);
2611 event::ready_from_usize(v)
2612 }
2613
2614 #[inline]
effective_readiness(self) -> Ready2615 fn effective_readiness(self) -> Ready {
2616 self.readiness() & self.interest()
2617 }
2618
2619 /// Set the readiness
2620 #[inline]
set_readiness(&mut self, v: Ready)2621 fn set_readiness(&mut self, v: Ready) {
2622 self.set(event::ready_as_usize(v), MASK_4, READINESS_SHIFT);
2623 }
2624
2625 /// Get the interest
2626 #[inline]
interest(self) -> Ready2627 fn interest(self) -> Ready {
2628 let v = self.get(MASK_4, INTEREST_SHIFT);
2629 event::ready_from_usize(v)
2630 }
2631
2632 /// Set the interest
2633 #[inline]
set_interest(&mut self, v: Ready)2634 fn set_interest(&mut self, v: Ready) {
2635 self.set(event::ready_as_usize(v), MASK_4, INTEREST_SHIFT);
2636 }
2637
2638 #[inline]
disarm(&mut self)2639 fn disarm(&mut self) {
2640 self.set_interest(Ready::empty());
2641 }
2642
2643 /// Get the poll options
2644 #[inline]
poll_opt(self) -> PollOpt2645 fn poll_opt(self) -> PollOpt {
2646 let v = self.get(MASK_4, POLL_OPT_SHIFT);
2647 event::opt_from_usize(v)
2648 }
2649
2650 /// Set the poll options
2651 #[inline]
set_poll_opt(&mut self, v: PollOpt)2652 fn set_poll_opt(&mut self, v: PollOpt) {
2653 self.set(event::opt_as_usize(v), MASK_4, POLL_OPT_SHIFT);
2654 }
2655
2656 #[inline]
is_queued(self) -> bool2657 fn is_queued(self) -> bool {
2658 self.0 & QUEUED_MASK == QUEUED_MASK
2659 }
2660
2661 /// Set the queued flag
2662 #[inline]
set_queued(&mut self)2663 fn set_queued(&mut self) {
2664 // Dropped nodes should never be queued
2665 debug_assert!(!self.is_dropped());
2666 self.0 |= QUEUED_MASK;
2667 }
2668
2669 #[inline]
set_dequeued(&mut self)2670 fn set_dequeued(&mut self) {
2671 debug_assert!(self.is_queued());
2672 self.0 &= !QUEUED_MASK
2673 }
2674
2675 #[inline]
is_dropped(self) -> bool2676 fn is_dropped(self) -> bool {
2677 self.0 & DROPPED_MASK == DROPPED_MASK
2678 }
2679
2680 #[inline]
token_read_pos(self) -> usize2681 fn token_read_pos(self) -> usize {
2682 self.get(MASK_2, TOKEN_RD_SHIFT)
2683 }
2684
2685 #[inline]
token_write_pos(self) -> usize2686 fn token_write_pos(self) -> usize {
2687 self.get(MASK_2, TOKEN_WR_SHIFT)
2688 }
2689
2690 #[inline]
next_token_pos(self) -> usize2691 fn next_token_pos(self) -> usize {
2692 let rd = self.token_read_pos();
2693 let wr = self.token_write_pos();
2694
2695 match wr {
2696 0 => {
2697 match rd {
2698 1 => 2,
2699 2 => 1,
2700 0 => 1,
2701 _ => unreachable!(),
2702 }
2703 }
2704 1 => {
2705 match rd {
2706 0 => 2,
2707 2 => 0,
2708 1 => 2,
2709 _ => unreachable!(),
2710 }
2711 }
2712 2 => {
2713 match rd {
2714 0 => 1,
2715 1 => 0,
2716 2 => 0,
2717 _ => unreachable!(),
2718 }
2719 }
2720 _ => unreachable!(),
2721 }
2722 }
2723
2724 #[inline]
set_token_write_pos(&mut self, val: usize)2725 fn set_token_write_pos(&mut self, val: usize) {
2726 self.set(val, MASK_2, TOKEN_WR_SHIFT);
2727 }
2728
2729 #[inline]
update_token_read_pos(&mut self)2730 fn update_token_read_pos(&mut self) {
2731 let val = self.token_write_pos();
2732 self.set(val, MASK_2, TOKEN_RD_SHIFT);
2733 }
2734 }
2735
2736 impl From<ReadinessState> for usize {
from(src: ReadinessState) -> usize2737 fn from(src: ReadinessState) -> usize {
2738 src.0
2739 }
2740 }
2741
impl From<usize> for ReadinessState {
    /// Wrap raw bits as a `ReadinessState` without any validation.
    fn from(src: usize) -> ReadinessState {
        ReadinessState(src)
    }
}
2747
/// Compile-time assertion that `T` is `Send`; does nothing at runtime.
fn is_send<T>()
where
    T: Send,
{
}
/// Compile-time assertion that `T` is `Sync`; does nothing at runtime.
fn is_sync<T>()
where
    T: Sync,
{
}
2750
2751 impl SelectorId {
new() -> SelectorId2752 pub fn new() -> SelectorId {
2753 SelectorId {
2754 id: AtomicUsize::new(0),
2755 }
2756 }
2757
associate_selector(&self, poll: &Poll) -> io::Result<()>2758 pub fn associate_selector(&self, poll: &Poll) -> io::Result<()> {
2759 let selector_id = self.id.load(Ordering::SeqCst);
2760
2761 if selector_id != 0 && selector_id != poll.selector.id() {
2762 Err(io::Error::new(io::ErrorKind::Other, "socket already registered"))
2763 } else {
2764 self.id.store(poll.selector.id(), Ordering::SeqCst);
2765 Ok(())
2766 }
2767 }
2768 }
2769
2770 impl Clone for SelectorId {
clone(&self) -> SelectorId2771 fn clone(&self) -> SelectorId {
2772 SelectorId {
2773 id: AtomicUsize::new(self.id.load(Ordering::SeqCst)),
2774 }
2775 }
2776 }
2777
// Checks that a freshly created `Poll` exposes a positive raw file descriptor.
// NOTE(review): descriptor 0 is rejected by this check, so the test presumably
// relies on stdin being open when it runs -- confirm.
#[test]
#[cfg(all(unix, not(target_os = "fuchsia")))]
pub fn as_raw_fd() {
    let poll = Poll::new().unwrap();
    assert!(poll.as_raw_fd() > 0);
}
2784