1 #[cfg(any(target_os = "linux", target_os = "android"))] 2 mod eventfd { 3 use crate::sys::Selector; 4 use crate::{Interest, Token}; 5 6 use std::fs::File; 7 use std::io::{self, Read, Write}; 8 use std::os::unix::io::FromRawFd; 9 10 /// Waker backed by `eventfd`. 11 /// 12 /// `eventfd` is effectively an 64 bit counter. All writes must be of 8 13 /// bytes (64 bits) and are converted (native endian) into an 64 bit 14 /// unsigned integer and added to the count. Reads must also be 8 bytes and 15 /// reset the count to 0, returning the count. 16 #[derive(Debug)] 17 pub struct Waker { 18 fd: File, 19 } 20 21 impl Waker { new(selector: &Selector, token: Token) -> io::Result<Waker>22 pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> { 23 syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK)).and_then(|fd| { 24 // Turn the file descriptor into a file first so we're ensured 25 // it's closed when dropped, e.g. when register below fails. 26 let file = unsafe { File::from_raw_fd(fd) }; 27 selector 28 .register(fd, token, Interest::READABLE) 29 .map(|()| Waker { fd: file }) 30 }) 31 } 32 wake(&self) -> io::Result<()>33 pub fn wake(&self) -> io::Result<()> { 34 let buf: [u8; 8] = 1u64.to_ne_bytes(); 35 match (&self.fd).write(&buf) { 36 Ok(_) => Ok(()), 37 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { 38 // Writing only blocks if the counter is going to overflow. 39 // So we'll reset the counter to 0 and wake it again. 40 self.reset()?; 41 self.wake() 42 } 43 Err(err) => Err(err), 44 } 45 } 46 47 /// Reset the eventfd object, only need to call this if `wake` fails. reset(&self) -> io::Result<()>48 fn reset(&self) -> io::Result<()> { 49 let mut buf: [u8; 8] = 0u64.to_ne_bytes(); 50 match (&self.fd).read(&mut buf) { 51 Ok(_) => Ok(()), 52 // If the `Waker` hasn't been awoken yet this will return a 53 // `WouldBlock` error which we can safely ignore. 54 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(()), 55 Err(err) => Err(err), 56 } 57 } 58 } 59 } 60 61 #[cfg(any(target_os = "linux", target_os = "android"))] 62 pub use self::eventfd::Waker; 63 64 #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] 65 mod kqueue { 66 use crate::sys::Selector; 67 use crate::Token; 68 69 use std::io; 70 71 /// Waker backed by kqueue user space notifications (`EVFILT_USER`). 72 /// 73 /// The implementation is fairly simple, first the kqueue must be setup to 74 /// receive waker events this done by calling `Selector.setup_waker`. Next 75 /// we need access to kqueue, thus we need to duplicate the file descriptor. 76 /// Now waking is as simple as adding an event to the kqueue. 77 #[derive(Debug)] 78 pub struct Waker { 79 selector: Selector, 80 token: Token, 81 } 82 83 impl Waker { new(selector: &Selector, token: Token) -> io::Result<Waker>84 pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> { 85 selector.try_clone().and_then(|selector| { 86 selector 87 .setup_waker(token) 88 .map(|()| Waker { selector, token }) 89 }) 90 } 91 wake(&self) -> io::Result<()>92 pub fn wake(&self) -> io::Result<()> { 93 self.selector.wake(self.token) 94 } 95 } 96 } 97 98 #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] 99 pub use self::kqueue::Waker; 100 101 #[cfg(any( 102 target_os = "dragonfly", 103 target_os = "illumos", 104 target_os = "netbsd", 105 target_os = "openbsd", 106 target_os = "solaris" 107 ))] 108 mod pipe { 109 use crate::sys::unix::Selector; 110 use crate::{Interest, Token}; 111 112 use std::fs::File; 113 use std::io::{self, Read, Write}; 114 use std::os::unix::io::FromRawFd; 115 116 /// Waker backed by a unix pipe. 117 /// 118 /// Waker controls both the sending and receiving ends and empties the pipe 119 /// if writing to it (waking) fails. 120 #[derive(Debug)] 121 pub struct Waker { 122 sender: File, 123 receiver: File, 124 } 125 126 impl Waker { new(selector: &Selector, token: Token) -> io::Result<Waker>127 pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> { 128 let mut fds = [-1; 2]; 129 syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?; 130 // Turn the file descriptors into files first so we're ensured 131 // they're closed when dropped, e.g. when register below fails. 132 let sender = unsafe { File::from_raw_fd(fds[1]) }; 133 let receiver = unsafe { File::from_raw_fd(fds[0]) }; 134 selector 135 .register(fds[0], token, Interest::READABLE) 136 .map(|()| Waker { sender, receiver }) 137 } 138 wake(&self) -> io::Result<()>139 pub fn wake(&self) -> io::Result<()> { 140 // The epoll emulation on some illumos systems currently requires 141 // the pipe buffer to be completely empty for an edge-triggered 142 // wakeup on the pipe read side. 143 #[cfg(target_os = "illumos")] 144 self.empty(); 145 146 match (&self.sender).write(&[1]) { 147 Ok(_) => Ok(()), 148 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { 149 // The reading end is full so we'll empty the buffer and try 150 // again. 151 self.empty(); 152 self.wake() 153 } 154 Err(ref err) if err.kind() == io::ErrorKind::Interrupted => self.wake(), 155 Err(err) => Err(err), 156 } 157 } 158 159 /// Empty the pipe's buffer, only need to call this if `wake` fails. 160 /// This ignores any errors. empty(&self)161 fn empty(&self) { 162 let mut buf = [0; 4096]; 163 loop { 164 match (&self.receiver).read(&mut buf) { 165 Ok(n) if n > 0 => continue, 166 _ => return, 167 } 168 } 169 } 170 } 171 } 172 173 #[cfg(any( 174 target_os = "dragonfly", 175 target_os = "illumos", 176 target_os = "netbsd", 177 target_os = "openbsd", 178 target_os = "solaris" 179 ))] 180 pub use self::pipe::Waker; 181