1 //! A backend module for implementing the iterator like 2 //! [`iterator`][crate::iterator] module and the asynchronous 3 //! adapter crates. 4 //! 5 //! This module contains generic types which abstract over the concrete 6 //! IO type for the self-pipe. The motivation for having this abstraction 7 //! are the adapter crates for different asynchronous runtimes. The runtimes 8 //! provide their own wrappers for [`std::os::unix::net::UnixStream`] 9 //! which should be used as the internal self pipe. But large parts of the 10 //! remaining functionality doesn't depend directly onto the IO type and can 11 //! be reused. 12 //! 13 //! See also the [`SignalDelivery::with_pipe`] method for more information 14 //! about requirements the IO types have to fulfill. 15 //! 16 //! As a regular user you shouldn't need to use the types in this module. 17 //! Use the [`Signals`][crate::iterator::Signals] struct or one of the types 18 //! contained in the adapter libraries instead. 19 20 use std::borrow::{Borrow, BorrowMut}; 21 use std::fmt::{Debug, Formatter, Result as FmtResult}; 22 use std::io::Error; 23 use std::mem::MaybeUninit; 24 use std::os::unix::io::AsRawFd; 25 use std::ptr; 26 use std::sync::atomic::{AtomicBool, Ordering}; 27 use std::sync::{Arc, Mutex}; 28 29 use libc::{self, c_int}; 30 31 use super::exfiltrator::Exfiltrator; 32 use crate::low_level::pipe::{self, WakeMethod}; 33 use crate::SigId; 34 35 /// Maximal signal number we support. 36 const MAX_SIGNUM: usize = 128; 37 38 trait SelfPipeWrite: Debug + Send + Sync { wake_readers(&self)39 fn wake_readers(&self); 40 } 41 42 impl<W: AsRawFd + Debug + Send + Sync> SelfPipeWrite for W { wake_readers(&self)43 fn wake_readers(&self) { 44 pipe::wake(self.as_raw_fd(), WakeMethod::Send); 45 } 46 } 47 48 #[derive(Debug)] 49 struct DeliveryState { 50 closed: AtomicBool, 51 registered_signal_ids: Mutex<Vec<Option<SigId>>>, 52 } 53 54 impl DeliveryState { new() -> Self55 fn new() -> Self { 56 let ids = (0..MAX_SIGNUM).map(|_| None).collect(); 57 Self { 58 closed: AtomicBool::new(false), 59 registered_signal_ids: Mutex::new(ids), 60 } 61 } 62 } 63 64 impl Drop for DeliveryState { drop(&mut self)65 fn drop(&mut self) { 66 let lock = self.registered_signal_ids.lock().unwrap(); 67 for id in lock.iter().filter_map(|s| *s) { 68 crate::low_level::unregister(id); 69 } 70 } 71 } 72 73 struct PendingSignals<E: Exfiltrator> { 74 exfiltrator: E, 75 slots: [E::Storage; MAX_SIGNUM], 76 } 77 78 impl<E: Exfiltrator> PendingSignals<E> { new(exfiltrator: E) -> Self79 fn new(exfiltrator: E) -> Self { 80 // Unfortunately, Default is not implemented for long arrays :-( 81 // 82 // Note that if the default impl panics, the already existing instances are leaked. 83 let mut slots = MaybeUninit::<[E::Storage; MAX_SIGNUM]>::uninit(); 84 for i in 0..MAX_SIGNUM { 85 unsafe { 86 let slot: *mut E::Storage = slots.as_mut_ptr() as *mut _; 87 let slot = slot.add(i); 88 ptr::write(slot, E::Storage::default()); 89 } 90 } 91 92 Self { 93 exfiltrator, 94 slots: unsafe { slots.assume_init() }, 95 } 96 } 97 } 98 99 /// An internal trait to hide adding new signals into a Handle behind a dynamic dispatch. 100 trait AddSignal: Debug + Send + Sync { add_signal( self: Arc<Self>, write: Arc<dyn SelfPipeWrite>, signal: c_int, ) -> Result<SigId, Error>101 fn add_signal( 102 self: Arc<Self>, 103 write: Arc<dyn SelfPipeWrite>, 104 signal: c_int, 105 ) -> Result<SigId, Error>; 106 } 107 108 // Implemented manually because 1.36.0 doesn't yet support Debug for [X; BIG_NUMBER]. 109 impl<E: Exfiltrator> Debug for PendingSignals<E> { fmt(&self, fmt: &mut Formatter) -> FmtResult110 fn fmt(&self, fmt: &mut Formatter) -> FmtResult { 111 fmt.debug_struct("PendingSignals") 112 .field("exfiltrator", &self.exfiltrator) 113 // While the array does not, the slice does implement Debug 114 .field("slots", &&self.slots[..]) 115 .finish() 116 } 117 } 118 119 impl<E: Exfiltrator> AddSignal for PendingSignals<E> { add_signal( self: Arc<Self>, write: Arc<dyn SelfPipeWrite>, signal: c_int, ) -> Result<SigId, Error>120 fn add_signal( 121 self: Arc<Self>, 122 write: Arc<dyn SelfPipeWrite>, 123 signal: c_int, 124 ) -> Result<SigId, Error> { 125 assert!(signal >= 0); 126 assert!( 127 (signal as usize) < MAX_SIGNUM, 128 "Signal number {} too large. If your OS really supports such signal, file a bug", 129 signal, 130 ); 131 assert!( 132 self.exfiltrator.supports_signal(signal), 133 "Signal {} not supported by exfiltrator {:?}", 134 signal, 135 self.exfiltrator, 136 ); 137 self.exfiltrator.init(&self.slots[signal as usize], signal); 138 139 let action = move |act: &_| { 140 let slot = &self.slots[signal as usize]; 141 let ex = &self.exfiltrator; 142 ex.store(slot, signal, act); 143 write.wake_readers(); 144 }; 145 let id = unsafe { signal_hook_registry::register_sigaction(signal, action) }?; 146 Ok(id) 147 } 148 } 149 150 /// A struct to control an instance of an associated type 151 /// (like for example [`Signals`][super::Signals]). 152 /// 153 /// It allows to register more signal handlers and to shutdown the signal 154 /// delivery. You can [`clone`][Handle::clone] this type which isn't a 155 /// very expensive operation. The cloned instances can be shared between 156 /// multiple threads. 157 #[derive(Debug, Clone)] 158 pub struct Handle { 159 pending: Arc<dyn AddSignal>, 160 write: Arc<dyn SelfPipeWrite>, 161 delivery_state: Arc<DeliveryState>, 162 } 163 164 impl Handle { new<W>(write: W, pending: Arc<dyn AddSignal>) -> Self where W: 'static + SelfPipeWrite,165 fn new<W>(write: W, pending: Arc<dyn AddSignal>) -> Self 166 where 167 W: 'static + SelfPipeWrite, 168 { 169 Self { 170 pending, 171 write: Arc::new(write), 172 delivery_state: Arc::new(DeliveryState::new()), 173 } 174 } 175 176 /// Registers another signal to the set watched by the associated instance. 177 /// 178 /// # Notes 179 /// 180 /// * This is safe to call concurrently from whatever thread. 181 /// * This is *not* safe to call from within a signal handler. 182 /// * If the signal number was already registered previously, this is a no-op. 183 /// * If this errors, the original set of signals is left intact. 184 /// 185 /// # Panics 186 /// 187 /// * If the given signal is [forbidden][crate::FORBIDDEN]. 188 /// * If the signal number is negative or larger than internal limit. The limit should be 189 /// larger than any supported signal the OS supports. 190 /// * If the relevant [`Exfiltrator`] does not support this particular signal. The default 191 /// [`SignalOnly`] one supports all signals. add_signal(&self, signal: c_int) -> Result<(), Error>192 pub fn add_signal(&self, signal: c_int) -> Result<(), Error> { 193 let mut lock = self.delivery_state.registered_signal_ids.lock().unwrap(); 194 // Already registered, ignoring 195 if lock[signal as usize].is_some() { 196 return Ok(()); 197 } 198 199 let id = Arc::clone(&self.pending).add_signal(Arc::clone(&self.write), signal)?; 200 201 lock[signal as usize] = Some(id); 202 203 Ok(()) 204 } 205 206 /// Closes the associated instance. 207 /// 208 /// This is meant to signalize termination of the signal delivery process. 209 /// After calling close: 210 /// 211 /// * [`is_closed`][Handle::is_closed] will return true. 212 /// * All currently blocking operations of associated instances 213 /// are interrupted and terminate. 214 /// * Any further operations will not block. 215 /// * Further signals may or may not be returned from the iterators. However, if any are 216 /// returned, these are real signals that happened. 217 /// 218 /// The goal is to be able to shut down any background thread that handles only the signals. close(&self)219 pub fn close(&self) { 220 self.delivery_state.closed.store(true, Ordering::SeqCst); 221 self.write.wake_readers(); 222 } 223 224 /// Is it closed? 225 /// 226 /// See [`close`][Handle::close]. is_closed(&self) -> bool227 pub fn is_closed(&self) -> bool { 228 self.delivery_state.closed.load(Ordering::SeqCst) 229 } 230 } 231 232 /// A struct for delivering received signals to the main program flow. 233 /// The self-pipe IO type is generic. See the 234 /// [`with_pipe`][SignalDelivery::with_pipe] method for requirements 235 /// for the IO type. 236 #[derive(Debug)] 237 pub struct SignalDelivery<R, E: Exfiltrator> { 238 read: R, 239 handle: Handle, 240 pending: Arc<PendingSignals<E>>, 241 } 242 243 impl<R, E: Exfiltrator> SignalDelivery<R, E> 244 where 245 R: 'static + AsRawFd + Send + Sync, 246 { 247 /// Creates the `SignalDelivery` structure. 248 /// 249 /// The read and write arguments must be the ends of a suitable pipe type. These are used 250 /// for communication between the signal handler and main program flow. 251 /// 252 /// Registers all the signals listed. The same restrictions (panics, errors) apply as with 253 /// [`add_signal`][Handle::add_signal]. 254 /// 255 /// # Requirements for the pipe type 256 /// 257 /// * Must support [`send`](https://man7.org/linux/man-pages/man2/send.2.html) for 258 /// asynchronously writing bytes to the write end 259 /// * Must support [`recv`](https://man7.org/linux/man-pages/man2/recv.2.html) for 260 /// reading bytes from the read end 261 /// 262 /// So UnixStream is a good choice for this. with_pipe<I, S, W>(read: R, write: W, exfiltrator: E, signals: I) -> Result<Self, Error> where I: IntoIterator<Item = S>, S: Borrow<c_int>, W: 'static + AsRawFd + Debug + Send + Sync,263 pub fn with_pipe<I, S, W>(read: R, write: W, exfiltrator: E, signals: I) -> Result<Self, Error> 264 where 265 I: IntoIterator<Item = S>, 266 S: Borrow<c_int>, 267 W: 'static + AsRawFd + Debug + Send + Sync, 268 { 269 let pending = Arc::new(PendingSignals::new(exfiltrator)); 270 let pending_add_signal = Arc::clone(&pending); 271 let handle = Handle::new(write, pending_add_signal); 272 let me = Self { 273 read, 274 handle, 275 pending, 276 }; 277 for sig in signals { 278 me.handle.add_signal(*sig.borrow())?; 279 } 280 Ok(me) 281 } 282 283 /// Get a reference to the read end of the self pipe 284 /// 285 /// You may use this method to register the underlying file descriptor 286 /// with an eventing system (e. g. epoll) to get notified if there are 287 /// bytes in the pipe. If the event system reports the file descriptor 288 /// ready for reading you can then call [`pending`][SignalDelivery::pending] 289 /// to get the arrived signals. get_read(&self) -> &R290 pub fn get_read(&self) -> &R { 291 &self.read 292 } 293 294 /// Get a mutable reference to the read end of the self pipe 295 /// 296 /// See the [`get_read`][SignalDelivery::get_read] method for some additional 297 /// information. get_read_mut(&mut self) -> &mut R298 pub fn get_read_mut(&mut self) -> &mut R { 299 &mut self.read 300 } 301 302 /// Drains all data from the internal self-pipe. This method will never block. flush(&mut self)303 fn flush(&mut self) { 304 const SIZE: usize = 1024; 305 let mut buff = [0u8; SIZE]; 306 307 unsafe { 308 // Draining the data in the self pipe. We ignore all errors on purpose. This 309 // should not be something like closed file descriptor. It could EAGAIN, but 310 // that's OK in case we say MSG_DONTWAIT. If it's EINTR, then it's OK too, 311 // it'll only create a spurious wakeup. 312 while libc::recv( 313 self.read.as_raw_fd(), 314 buff.as_mut_ptr() as *mut libc::c_void, 315 SIZE, 316 libc::MSG_DONTWAIT, 317 ) > 0 318 {} 319 } 320 } 321 322 /// Returns an iterator of already received signals. 323 /// 324 /// This returns an iterator over all the signal numbers of the signals received since last 325 /// time they were read (out of the set registered by this `SignalDelivery` instance). Note 326 /// that they are returned in arbitrary order and a signal number is returned only once even 327 /// if it was received multiple times. 328 /// 329 /// This method returns immediately (does not block) and may produce an empty iterator if 330 /// there are no signals ready. pending(&mut self) -> Pending<E>331 pub fn pending(&mut self) -> Pending<E> { 332 self.flush(); 333 Pending::new(Arc::clone(&self.pending)) 334 } 335 336 /// Checks the reading end of the self pipe for available signals. 337 /// 338 /// If there are no signals available or this instance was already closed it returns 339 /// [`Option::None`]. If there are some signals it returns a [`Pending`] 340 /// instance wrapped inside a [`Option::Some`]. However, due to implementation details, 341 /// this still can produce an empty iterator. 342 /// 343 /// This method doesn't check the reading end by itself but uses the passed in callback. 344 /// This method blocks if and only if the callback blocks trying to read some bytes. poll_pending<F>(&mut self, has_signals: &mut F) -> Result<Option<Pending<E>>, Error> where F: FnMut(&mut R) -> Result<bool, Error>,345 pub fn poll_pending<F>(&mut self, has_signals: &mut F) -> Result<Option<Pending<E>>, Error> 346 where 347 F: FnMut(&mut R) -> Result<bool, Error>, 348 { 349 if self.handle.is_closed() { 350 return Ok(None); 351 } 352 353 match has_signals(self.get_read_mut()) { 354 Ok(false) => Ok(None), 355 Ok(true) => Ok(Some(self.pending())), 356 Err(err) => Err(err), 357 } 358 } 359 360 /// Get a [`Handle`] for this `SignalDelivery` instance. 361 /// 362 /// This can be used to add further signals or close the whole 363 /// signal delivery mechanism. handle(&self) -> Handle364 pub fn handle(&self) -> Handle { 365 self.handle.clone() 366 } 367 } 368 369 /// The iterator of one batch of signals. 370 /// 371 /// This is returned by the [`pending`][SignalDelivery::pending] method. 372 #[derive(Debug)] 373 pub struct Pending<E: Exfiltrator> { 374 pending: Arc<PendingSignals<E>>, 375 position: usize, 376 } 377 378 impl<E: Exfiltrator> Pending<E> { new(pending: Arc<PendingSignals<E>>) -> Self379 fn new(pending: Arc<PendingSignals<E>>) -> Self { 380 Self { 381 pending, 382 position: 0, 383 } 384 } 385 } 386 387 impl<E: Exfiltrator> Iterator for Pending<E> { 388 type Item = E::Output; 389 next(&mut self) -> Option<E::Output>390 fn next(&mut self) -> Option<E::Output> { 391 while self.position < self.pending.slots.len() { 392 let sig = self.position; 393 let slot = &self.pending.slots[sig]; 394 let result = self.pending.exfiltrator.load(slot, sig as c_int); 395 if result.is_some() { 396 return result; 397 } else { 398 self.position += 1; 399 } 400 } 401 402 None 403 } 404 } 405 406 /// Possible results of the [`poll_signal`][SignalIterator::poll_signal] function. 407 pub enum PollResult<O> { 408 /// A signal arrived 409 Signal(O), 410 /// There are no signals yet but there may arrive some in the future 411 Pending, 412 /// The iterator was closed. There won't be any signals reported from now on. 413 Closed, 414 /// An error happened during polling for arrived signals. 415 Err(Error), 416 } 417 418 /// An infinite iterator of received signals. 419 pub struct SignalIterator<SD, E: Exfiltrator> { 420 signals: SD, 421 iter: Pending<E>, 422 } 423 424 impl<SD, E: Exfiltrator> SignalIterator<SD, E> { 425 /// Create a new infinite iterator for signals registered with the passed 426 /// in [`SignalDelivery`] instance. new<R>(mut signals: SD) -> Self where SD: BorrowMut<SignalDelivery<R, E>>, R: 'static + AsRawFd + Send + Sync,427 pub fn new<R>(mut signals: SD) -> Self 428 where 429 SD: BorrowMut<SignalDelivery<R, E>>, 430 R: 'static + AsRawFd + Send + Sync, 431 { 432 let iter = signals.borrow_mut().pending(); 433 Self { signals, iter } 434 } 435 436 /// Return a signal if there is one or tell the caller that there is none at the moment. 437 /// 438 /// You have to pass in a callback which checks the underlying reading end of the pipe if 439 /// there may be any pending signals. This callback may or may not block. If the callback 440 /// returns [`true`] this method will try to fetch the next signal and return it as a 441 /// [`PollResult::Signal`]. If the callback returns [`false`] the method will return 442 /// [`PollResult::Pending`] and assume it will be called again at a later point in time. 443 /// The callback may be called any number of times by this function. 444 /// 445 /// If the iterator was closed by the [`close`][Handle::close] method of the associtated 446 /// [`Handle`] this method will return [`PollResult::Closed`]. poll_signal<R, F>(&mut self, has_signals: &mut F) -> PollResult<E::Output> where SD: BorrowMut<SignalDelivery<R, E>>, R: 'static + AsRawFd + Send + Sync, F: FnMut(&mut R) -> Result<bool, Error>,447 pub fn poll_signal<R, F>(&mut self, has_signals: &mut F) -> PollResult<E::Output> 448 where 449 SD: BorrowMut<SignalDelivery<R, E>>, 450 R: 'static + AsRawFd + Send + Sync, 451 F: FnMut(&mut R) -> Result<bool, Error>, 452 { 453 // The loop is necessary because it is possible that a signal was already consumed 454 // by a previous pending iterator due to the asynchronous nature of signals and 455 // always moving to the end of the iterator before calling has_more. 456 while !self.signals.borrow_mut().handle.is_closed() { 457 if let Some(result) = self.iter.next() { 458 return PollResult::Signal(result); 459 } 460 461 match self.signals.borrow_mut().poll_pending(has_signals) { 462 Ok(Some(pending)) => self.iter = pending, 463 Ok(None) => return PollResult::Pending, 464 Err(err) => return PollResult::Err(err), 465 } 466 } 467 468 PollResult::Closed 469 } 470 471 /// Get a shareable [`Handle`] for this instance. 472 /// 473 /// This can be used to add further signals or terminate the whole 474 /// signal iteration using the [`close`][Handle::close] method. handle<R>(&self) -> Handle where SD: Borrow<SignalDelivery<R, E>>, R: 'static + AsRawFd + Send + Sync,475 pub fn handle<R>(&self) -> Handle 476 where 477 SD: Borrow<SignalDelivery<R, E>>, 478 R: 'static + AsRawFd + Send + Sync, 479 { 480 self.signals.borrow().handle() 481 } 482 } 483 484 /// A signal iterator which consumes a [`SignalDelivery`] instance and takes 485 /// ownership of it. 486 pub type OwningSignalIterator<R, E> = SignalIterator<SignalDelivery<R, E>, E>; 487 488 /// A signal iterator which takes a mutable reference to a [`SignalDelivery`] 489 /// instance. 490 pub type RefSignalIterator<'a, R, E> = SignalIterator<&'a mut SignalDelivery<R, E>, E>; 491