1 #[cfg(any(target_os = "linux", target_os = "android"))]
2 mod eventfd {
3     use crate::sys::Selector;
4     use crate::{Interest, Token};
5 
6     use std::fs::File;
7     use std::io::{self, Read, Write};
8     use std::os::unix::io::FromRawFd;
9 
10     /// Waker backed by `eventfd`.
11     ///
12     /// `eventfd` is effectively an 64 bit counter. All writes must be of 8
13     /// bytes (64 bits) and are converted (native endian) into an 64 bit
14     /// unsigned integer and added to the count. Reads must also be 8 bytes and
15     /// reset the count to 0, returning the count.
16     #[derive(Debug)]
17     pub struct Waker {
18         fd: File,
19     }
20 
21     impl Waker {
new(selector: &Selector, token: Token) -> io::Result<Waker>22         pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
23             syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK)).and_then(|fd| {
24                 // Turn the file descriptor into a file first so we're ensured
25                 // it's closed when dropped, e.g. when register below fails.
26                 let file = unsafe { File::from_raw_fd(fd) };
27                 selector
28                     .register(fd, token, Interest::READABLE)
29                     .map(|()| Waker { fd: file })
30             })
31         }
32 
wake(&self) -> io::Result<()>33         pub fn wake(&self) -> io::Result<()> {
34             let buf: [u8; 8] = 1u64.to_ne_bytes();
35             match (&self.fd).write(&buf) {
36                 Ok(_) => Ok(()),
37                 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
38                     // Writing only blocks if the counter is going to overflow.
39                     // So we'll reset the counter to 0 and wake it again.
40                     self.reset()?;
41                     self.wake()
42                 }
43                 Err(err) => Err(err),
44             }
45         }
46 
47         /// Reset the eventfd object, only need to call this if `wake` fails.
reset(&self) -> io::Result<()>48         fn reset(&self) -> io::Result<()> {
49             let mut buf: [u8; 8] = 0u64.to_ne_bytes();
50             match (&self.fd).read(&mut buf) {
51                 Ok(_) => Ok(()),
52                 // If the `Waker` hasn't been awoken yet this will return a
53                 // `WouldBlock` error which we can safely ignore.
54                 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(()),
55                 Err(err) => Err(err),
56             }
57         }
58     }
59 }
60 
61 #[cfg(any(target_os = "linux", target_os = "android"))]
62 pub use self::eventfd::Waker;
63 
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
mod kqueue {
    use crate::sys::Selector;
    use crate::Token;

    use std::io;

    /// Waker implemented with kqueue user space notifications
    /// (`EVFILT_USER`).
    ///
    /// The waker needs its own handle to the kqueue, so `new` duplicates the
    /// selector with `Selector::try_clone`. The duplicate is then prepared to
    /// receive waker events by calling `Selector::setup_waker`. After that,
    /// waking is as simple as adding an event to the kqueue.
    #[derive(Debug)]
    pub struct Waker {
        selector: Selector,
        token: Token,
    }

    impl Waker {
        /// Duplicates `selector` and sets the duplicate up to deliver waker
        /// events for `token`.
        ///
        /// # Errors
        ///
        /// Returns the error from `try_clone` or from `setup_waker`.
        pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
            let selector = selector.try_clone()?;
            selector.setup_waker(token)?;
            Ok(Waker { selector, token })
        }

        /// Wakes the selector by asking the stored handle to trigger an event
        /// for the stored token.
        pub fn wake(&self) -> io::Result<()> {
            let Waker { selector, token } = self;
            selector.wake(*token)
        }
    }
}
97 
98 #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
99 pub use self::kqueue::Waker;
100 
#[cfg(any(
    target_os = "dragonfly",
    target_os = "illumos",
    target_os = "netbsd",
    target_os = "openbsd",
    target_os = "solaris"
))]
mod pipe {
    use crate::sys::unix::Selector;
    use crate::{Interest, Token};

    use std::fs::File;
    use std::io::{self, Read, Write};
    use std::os::unix::io::FromRawFd;

    /// Waker implemented with a unix pipe.
    ///
    /// The waker owns both ends of the pipe: it writes to `sender` to wake and
    /// drains `receiver` whenever a write (a wake) cannot proceed.
    #[derive(Debug)]
    pub struct Waker {
        sender: File,
        receiver: File,
    }

    impl Waker {
        /// Creates a non-blocking, close-on-exec pipe and registers its read
        /// end with `selector` for readable events under `token`.
        ///
        /// # Errors
        ///
        /// Returns the error of the `pipe2` call or of the registration. If
        /// registration fails both descriptors are closed again because they
        /// are already owned by `File`s at that point.
        pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
            let mut fds = [-1; 2];
            syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?;
            let read_fd = fds[0];
            let write_fd = fds[1];
            // Wrap both descriptors before registering, so that an early
            // return from the registration below still closes them on drop.
            // SAFETY: both descriptors were just filled in by the `pipe2` call
            // above and have not been handed to any other owner.
            let sender = unsafe { File::from_raw_fd(write_fd) };
            let receiver = unsafe { File::from_raw_fd(read_fd) };
            selector.register(read_fd, token, Interest::READABLE)?;
            Ok(Waker { sender, receiver })
        }

        /// Wakes up the selector by writing a single byte into the pipe.
        ///
        /// # Errors
        ///
        /// Returns any write error other than `WouldBlock` and `Interrupted`;
        /// those two cause the write to be attempted again.
        pub fn wake(&self) -> io::Result<()> {
            loop {
                // The epoll emulation on some illumos systems currently
                // requires the pipe buffer to be completely empty for an
                // edge-triggered wakeup on the pipe read side, so drain it
                // before every write attempt.
                #[cfg(target_os = "illumos")]
                self.empty();

                match (&self.sender).write(&[1]) {
                    Ok(_) => return Ok(()),
                    Err(err) => match err.kind() {
                        // The pipe is full: drain the read end, then retry.
                        io::ErrorKind::WouldBlock => self.empty(),
                        // Interrupted before anything was written: retry.
                        io::ErrorKind::Interrupted => {}
                        _ => return Err(err),
                    },
                }
            }
        }

        /// Drains the pipe's buffer. Only required when `wake` could not
        /// write. Any read error simply ends the draining and is otherwise
        /// ignored.
        fn empty(&self) {
            let mut scratch = [0; 4096];
            while let Ok(n) = (&self.receiver).read(&mut scratch) {
                if n == 0 {
                    break;
                }
            }
        }
    }
}
172 
173 #[cfg(any(
174     target_os = "dragonfly",
175     target_os = "illumos",
176     target_os = "netbsd",
177     target_os = "openbsd",
178     target_os = "solaris"
179 ))]
180 pub use self::pipe::Waker;
181