//! A backend module for implementing the iterator-like
//! [`iterator`][crate::iterator] module and the asynchronous
//! adapter crates.
4 //!
//! This module contains generic types which abstract over the concrete
//! IO type for the self-pipe. The motivation for having this abstraction
//! is the adapter crates for different asynchronous runtimes. The runtimes
//! provide their own wrappers for [`std::os::unix::net::UnixStream`]
//! which should be used as the internal self pipe. But large parts of the
//! remaining functionality don't depend directly on the IO type and can
//! be reused.
12 //!
13 //! See also the [`SignalDelivery::with_pipe`] method for more information
14 //! about requirements the IO types have to fulfill.
15 //!
16 //! As a regular user you shouldn't need to use the types in this module.
17 //! Use the [`Signals`][crate::iterator::Signals] struct or one of the types
18 //! contained in the adapter libraries instead.
19 
20 use std::borrow::{Borrow, BorrowMut};
21 use std::fmt::{Debug, Formatter, Result as FmtResult};
22 use std::io::Error;
23 use std::mem::MaybeUninit;
24 use std::os::unix::io::AsRawFd;
25 use std::ptr;
26 use std::sync::atomic::{AtomicBool, Ordering};
27 use std::sync::{Arc, Mutex};
28 
29 use libc::{self, c_int};
30 
31 use super::exfiltrator::Exfiltrator;
32 use crate::low_level::pipe::{self, WakeMethod};
33 use crate::SigId;
34 
/// Number of per-signal slots kept by this module.
///
/// Signal numbers are used directly as indices into tables of this length, so only signals
/// strictly below this value can be registered (see the assertion in
/// `PendingSignals::add_signal`).
const MAX_SIGNUM: usize = 128;
37 
/// The write end of the self-pipe, used through a trait object so the concrete IO type
/// does not leak into [`Handle`].
trait SelfPipeWrite: Debug + Send + Sync {
    /// Notifies whoever waits on the read end of the self-pipe.
    fn wake_readers(&self);
}
41 
42 impl<W: AsRawFd + Debug + Send + Sync> SelfPipeWrite for W {
wake_readers(&self)43     fn wake_readers(&self) {
44         pipe::wake(self.as_raw_fd(), WakeMethod::Send);
45     }
46 }
47 
/// State shared by all clones of one [`Handle`].
#[derive(Debug)]
struct DeliveryState {
    /// Set once [`Handle::close`] was called.
    closed: AtomicBool,
    /// Registration ids of the signals added so far, indexed by signal number. `None` means
    /// the signal was not registered through this state.
    registered_signal_ids: Mutex<Vec<Option<SigId>>>,
}
53 
54 impl DeliveryState {
new() -> Self55     fn new() -> Self {
56         let ids = (0..MAX_SIGNUM).map(|_| None).collect();
57         Self {
58             closed: AtomicBool::new(false),
59             registered_signal_ids: Mutex::new(ids),
60         }
61     }
62 }
63 
64 impl Drop for DeliveryState {
drop(&mut self)65     fn drop(&mut self) {
66         let lock = self.registered_signal_ids.lock().unwrap();
67         for id in lock.iter().filter_map(|s| *s) {
68             crate::low_level::unregister(id);
69         }
70     }
71 }
72 
/// The exfiltrator together with the storage it fills from the signal handlers.
struct PendingSignals<E: Exfiltrator> {
    /// Stores information about an arrived signal into a slot and loads it back out.
    exfiltrator: E,
    /// One storage slot per signal number; the signal number is the index.
    slots: [E::Storage; MAX_SIGNUM],
}
77 
78 impl<E: Exfiltrator> PendingSignals<E> {
new(exfiltrator: E) -> Self79     fn new(exfiltrator: E) -> Self {
80         // Unfortunately, Default is not implemented for long arrays :-(
81         //
82         // Note that if the default impl panics, the already existing instances are leaked.
83         let mut slots = MaybeUninit::<[E::Storage; MAX_SIGNUM]>::uninit();
84         for i in 0..MAX_SIGNUM {
85             unsafe {
86                 let slot: *mut E::Storage = slots.as_mut_ptr() as *mut _;
87                 let slot = slot.add(i);
88                 ptr::write(slot, E::Storage::default());
89             }
90         }
91 
92         Self {
93             exfiltrator,
94             slots: unsafe { slots.assume_init() },
95         }
96     }
97 }
98 
/// An internal trait to hide adding new signals into a Handle behind a dynamic dispatch.
///
/// Thanks to it, [`Handle`] does not need to be generic over the exfiltrator type.
trait AddSignal: Debug + Send + Sync {
    /// Registers a handler for `signal`.
    ///
    /// `write` is the write end of the self-pipe, handed over so the handler can wake the
    /// readers. Returns the id of the new registration, or the error reported while
    /// registering.
    fn add_signal(
        self: Arc<Self>,
        write: Arc<dyn SelfPipeWrite>,
        signal: c_int,
    ) -> Result<SigId, Error>;
}
107 
108 // Implemented manually because 1.36.0 doesn't yet support Debug for [X; BIG_NUMBER].
109 impl<E: Exfiltrator> Debug for PendingSignals<E> {
fmt(&self, fmt: &mut Formatter) -> FmtResult110     fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
111         fmt.debug_struct("PendingSignals")
112             .field("exfiltrator", &self.exfiltrator)
113             // While the array does not, the slice does implement Debug
114             .field("slots", &&self.slots[..])
115             .finish()
116     }
117 }
118 
119 impl<E: Exfiltrator> AddSignal for PendingSignals<E> {
add_signal( self: Arc<Self>, write: Arc<dyn SelfPipeWrite>, signal: c_int, ) -> Result<SigId, Error>120     fn add_signal(
121         self: Arc<Self>,
122         write: Arc<dyn SelfPipeWrite>,
123         signal: c_int,
124     ) -> Result<SigId, Error> {
125         assert!(signal >= 0);
126         assert!(
127             (signal as usize) < MAX_SIGNUM,
128             "Signal number {} too large. If your OS really supports such signal, file a bug",
129             signal,
130         );
131         assert!(
132             self.exfiltrator.supports_signal(signal),
133             "Signal {} not supported by exfiltrator {:?}",
134             signal,
135             self.exfiltrator,
136         );
137         self.exfiltrator.init(&self.slots[signal as usize], signal);
138 
139         let action = move |act: &_| {
140             let slot = &self.slots[signal as usize];
141             let ex = &self.exfiltrator;
142             ex.store(slot, signal, act);
143             write.wake_readers();
144         };
145         let id = unsafe { signal_hook_registry::register_sigaction(signal, action) }?;
146         Ok(id)
147     }
148 }
149 
/// A struct to control an instance of an associated type
/// (like for example [`Signals`][super::Signals]).
///
/// It allows registering more signal handlers and shutting down the signal
/// delivery. You can [`clone`][Handle::clone] this type, which isn't a
/// very expensive operation. The cloned instances can be shared between
/// multiple threads.
#[derive(Debug, Clone)]
pub struct Handle {
    /// Performs the actual registration of new signals.
    pending: Arc<dyn AddSignal>,
    /// Write end of the self-pipe, used to wake up the readers.
    write: Arc<dyn SelfPipeWrite>,
    /// The closed flag and the ids of registered signals, shared by all clones.
    delivery_state: Arc<DeliveryState>,
}
163 
164 impl Handle {
new<W>(write: W, pending: Arc<dyn AddSignal>) -> Self where W: 'static + SelfPipeWrite,165     fn new<W>(write: W, pending: Arc<dyn AddSignal>) -> Self
166     where
167         W: 'static + SelfPipeWrite,
168     {
169         Self {
170             pending,
171             write: Arc::new(write),
172             delivery_state: Arc::new(DeliveryState::new()),
173         }
174     }
175 
176     /// Registers another signal to the set watched by the associated instance.
177     ///
178     /// # Notes
179     ///
180     /// * This is safe to call concurrently from whatever thread.
181     /// * This is *not* safe to call from within a signal handler.
182     /// * If the signal number was already registered previously, this is a no-op.
183     /// * If this errors, the original set of signals is left intact.
184     ///
185     /// # Panics
186     ///
187     /// * If the given signal is [forbidden][crate::FORBIDDEN].
188     /// * If the signal number is negative or larger than internal limit. The limit should be
189     ///   larger than any supported signal the OS supports.
190     /// * If the relevant [`Exfiltrator`] does not support this particular signal. The default
191     ///   [`SignalOnly`] one supports all signals.
add_signal(&self, signal: c_int) -> Result<(), Error>192     pub fn add_signal(&self, signal: c_int) -> Result<(), Error> {
193         let mut lock = self.delivery_state.registered_signal_ids.lock().unwrap();
194         // Already registered, ignoring
195         if lock[signal as usize].is_some() {
196             return Ok(());
197         }
198 
199         let id = Arc::clone(&self.pending).add_signal(Arc::clone(&self.write), signal)?;
200 
201         lock[signal as usize] = Some(id);
202 
203         Ok(())
204     }
205 
    /// Closes the associated instance.
    ///
    /// This is meant to signal termination of the signal delivery process.
    /// After calling close:
    ///
    /// * [`is_closed`][Handle::is_closed] will return true.
    /// * All currently blocking operations of associated instances
    ///   are interrupted and terminate.
    /// * Any further operations will not block.
    /// * Further signals may or may not be returned from the iterators. However, if any are
    ///   returned, these are real signals that happened.
    ///
    /// The goal is to be able to shut down any background thread that handles only the signals.
    pub fn close(&self) {
        // Set the flag first, then wake the readers, so a reader woken by this call can
        // already observe the closed state.
        self.delivery_state.closed.store(true, Ordering::SeqCst);
        self.write.wake_readers();
    }
223 
    /// Is it closed?
    ///
    /// Returns `true` once [`close`][Handle::close] was called on this handle or on any of
    /// its clones (they share the flag).
    pub fn is_closed(&self) -> bool {
        self.delivery_state.closed.load(Ordering::SeqCst)
    }
230 }
231 
/// A struct for delivering received signals to the main program flow.
/// The self-pipe IO type is generic. See the
/// [`with_pipe`][SignalDelivery::with_pipe] method for requirements
/// for the IO type.
#[derive(Debug)]
pub struct SignalDelivery<R, E: Exfiltrator> {
    /// Read end of the self-pipe.
    read: R,
    /// Handle used to register signals and to check whether delivery was closed.
    handle: Handle,
    /// Storage of the arrived signals; the same instance the registered handlers store into.
    pending: Arc<PendingSignals<E>>,
}
242 
243 impl<R, E: Exfiltrator> SignalDelivery<R, E>
244 where
245     R: 'static + AsRawFd + Send + Sync,
246 {
247     /// Creates the `SignalDelivery` structure.
248     ///
249     /// The read and write arguments must be the ends of a suitable pipe type. These are used
250     /// for communication between the signal handler and main program flow.
251     ///
252     /// Registers all the signals listed. The same restrictions (panics, errors) apply as with
253     /// [`add_signal`][Handle::add_signal].
254     ///
255     /// # Requirements for the pipe type
256     ///
257     /// * Must support [`send`](https://man7.org/linux/man-pages/man2/send.2.html) for
258     ///   asynchronously writing bytes to the write end
259     /// * Must support [`recv`](https://man7.org/linux/man-pages/man2/recv.2.html) for
260     ///   reading bytes from the read end
261     ///
262     /// So UnixStream is a good choice for this.
with_pipe<I, S, W>(read: R, write: W, exfiltrator: E, signals: I) -> Result<Self, Error> where I: IntoIterator<Item = S>, S: Borrow<c_int>, W: 'static + AsRawFd + Debug + Send + Sync,263     pub fn with_pipe<I, S, W>(read: R, write: W, exfiltrator: E, signals: I) -> Result<Self, Error>
264     where
265         I: IntoIterator<Item = S>,
266         S: Borrow<c_int>,
267         W: 'static + AsRawFd + Debug + Send + Sync,
268     {
269         let pending = Arc::new(PendingSignals::new(exfiltrator));
270         let pending_add_signal = Arc::clone(&pending);
271         let handle = Handle::new(write, pending_add_signal);
272         let me = Self {
273             read,
274             handle,
275             pending,
276         };
277         for sig in signals {
278             me.handle.add_signal(*sig.borrow())?;
279         }
280         Ok(me)
281     }
282 
    /// Get a reference to the read end of the self pipe.
    ///
    /// You may use this method to register the underlying file descriptor
    /// with an eventing system (e.g. epoll) to get notified if there are
    /// bytes in the pipe. If the event system reports the file descriptor
    /// ready for reading, you can then call [`pending`][SignalDelivery::pending]
    /// to get the arrived signals.
    pub fn get_read(&self) -> &R {
        &self.read
    }
293 
    /// Get a mutable reference to the read end of the self pipe.
    ///
    /// See the [`get_read`][SignalDelivery::get_read] method for some additional
    /// information.
    pub fn get_read_mut(&mut self) -> &mut R {
        &mut self.read
    }
301 
302     /// Drains all data from the internal self-pipe. This method will never block.
flush(&mut self)303     fn flush(&mut self) {
304         const SIZE: usize = 1024;
305         let mut buff = [0u8; SIZE];
306 
307         unsafe {
308             // Draining the data in the self pipe. We ignore all errors on purpose. This
309             // should not be something like closed file descriptor. It could EAGAIN, but
310             // that's OK in case we say MSG_DONTWAIT. If it's EINTR, then it's OK too,
311             // it'll only create a spurious wakeup.
312             while libc::recv(
313                 self.read.as_raw_fd(),
314                 buff.as_mut_ptr() as *mut libc::c_void,
315                 SIZE,
316                 libc::MSG_DONTWAIT,
317             ) > 0
318             {}
319         }
320     }
321 
    /// Returns an iterator of already received signals.
    ///
    /// This returns an iterator over all the signal numbers of the signals received since last
    /// time they were read (out of the set registered by this `SignalDelivery` instance). Note
    /// that they are returned in arbitrary order and a signal number is returned only once even
    /// if it was received multiple times.
    ///
    /// This method returns immediately (does not block) and may produce an empty iterator if
    /// there are no signals ready.
    pub fn pending(&mut self) -> Pending<E> {
        // Drain the wake-up bytes from the self-pipe before handing out the iterator.
        self.flush();
        Pending::new(Arc::clone(&self.pending))
    }
335 
336     /// Checks the reading end of the self pipe for available signals.
337     ///
338     /// If there are no signals available or this instance was already closed it returns
339     /// [`Option::None`]. If there are some signals it returns a [`Pending`]
340     /// instance wrapped inside a [`Option::Some`]. However, due to implementation details,
341     /// this still can produce an empty iterator.
342     ///
343     /// This method doesn't check the reading end by itself but uses the passed in callback.
344     /// This method blocks if and only if the callback blocks trying to read some bytes.
poll_pending<F>(&mut self, has_signals: &mut F) -> Result<Option<Pending<E>>, Error> where F: FnMut(&mut R) -> Result<bool, Error>,345     pub fn poll_pending<F>(&mut self, has_signals: &mut F) -> Result<Option<Pending<E>>, Error>
346     where
347         F: FnMut(&mut R) -> Result<bool, Error>,
348     {
349         if self.handle.is_closed() {
350             return Ok(None);
351         }
352 
353         match has_signals(self.get_read_mut()) {
354             Ok(false) => Ok(None),
355             Ok(true) => Ok(Some(self.pending())),
356             Err(err) => Err(err),
357         }
358     }
359 
    /// Get a [`Handle`] for this `SignalDelivery` instance.
    ///
    /// This can be used to add further signals or close the whole
    /// signal delivery mechanism. The returned handle is a clone of the internal one.
    pub fn handle(&self) -> Handle {
        self.handle.clone()
    }
367 }
368 
/// The iterator of one batch of signals.
///
/// This is returned by the [`pending`][SignalDelivery::pending] method.
#[derive(Debug)]
pub struct Pending<E: Exfiltrator> {
    /// The storage the signals are loaded from.
    pending: Arc<PendingSignals<E>>,
    /// Index of the slot (that is, the signal number) to be examined next.
    position: usize,
}
377 
impl<E: Exfiltrator> Pending<E> {
    /// Creates an iterator over `pending` that starts at the first slot.
    fn new(pending: Arc<PendingSignals<E>>) -> Self {
        Self {
            pending,
            position: 0,
        }
    }
}
386 
387 impl<E: Exfiltrator> Iterator for Pending<E> {
388     type Item = E::Output;
389 
next(&mut self) -> Option<E::Output>390     fn next(&mut self) -> Option<E::Output> {
391         while self.position < self.pending.slots.len() {
392             let sig = self.position;
393             let slot = &self.pending.slots[sig];
394             let result = self.pending.exfiltrator.load(slot, sig as c_int);
395             if result.is_some() {
396                 return result;
397             } else {
398                 self.position += 1;
399             }
400         }
401 
402         None
403     }
404 }
405 
/// Possible results of the [`poll_signal`][SignalIterator::poll_signal] function.
#[derive(Debug)]
pub enum PollResult<O> {
    /// A signal arrived
    Signal(O),
    /// There are no signals yet but there may arrive some in the future
    Pending,
    /// The iterator was closed. There won't be any signals reported from now on.
    Closed,
    /// An error happened during polling for arrived signals.
    Err(Error),
}
417 
/// An infinite iterator of received signals.
pub struct SignalIterator<SD, E: Exfiltrator> {
    /// The signal delivery, either owned or mutably borrowed.
    signals: SD,
    /// The batch of signals currently being handed out.
    iter: Pending<E>,
}
423 
424 impl<SD, E: Exfiltrator> SignalIterator<SD, E> {
    /// Create a new infinite iterator for signals registered with the passed
    /// in [`SignalDelivery`] instance.
    ///
    /// The first batch of pending signals is fetched right away.
    pub fn new<R>(mut signals: SD) -> Self
    where
        SD: BorrowMut<SignalDelivery<R, E>>,
        R: 'static + AsRawFd + Send + Sync,
    {
        let iter = signals.borrow_mut().pending();
        Self { signals, iter }
    }
435 
436     /// Return a signal if there is one or tell the caller that there is none at the moment.
437     ///
438     /// You have to pass in a callback which checks the underlying reading end of the pipe if
439     /// there may be any pending signals. This callback may or may not block. If the callback
440     /// returns [`true`] this method will try to fetch the next signal and return it as a
441     /// [`PollResult::Signal`]. If the callback returns [`false`] the method will return
442     /// [`PollResult::Pending`] and assume it will be called again at a later point in time.
443     /// The callback may be called any number of times by this function.
444     ///
445     /// If the iterator was closed by the [`close`][Handle::close] method of the associtated
446     /// [`Handle`] this method will return [`PollResult::Closed`].
poll_signal<R, F>(&mut self, has_signals: &mut F) -> PollResult<E::Output> where SD: BorrowMut<SignalDelivery<R, E>>, R: 'static + AsRawFd + Send + Sync, F: FnMut(&mut R) -> Result<bool, Error>,447     pub fn poll_signal<R, F>(&mut self, has_signals: &mut F) -> PollResult<E::Output>
448     where
449         SD: BorrowMut<SignalDelivery<R, E>>,
450         R: 'static + AsRawFd + Send + Sync,
451         F: FnMut(&mut R) -> Result<bool, Error>,
452     {
453         // The loop is necessary because it is possible that a signal was already consumed
454         // by a previous pending iterator due to the asynchronous nature of signals and
455         // always moving to the end of the iterator before calling has_more.
456         while !self.signals.borrow_mut().handle.is_closed() {
457             if let Some(result) = self.iter.next() {
458                 return PollResult::Signal(result);
459             }
460 
461             match self.signals.borrow_mut().poll_pending(has_signals) {
462                 Ok(Some(pending)) => self.iter = pending,
463                 Ok(None) => return PollResult::Pending,
464                 Err(err) => return PollResult::Err(err),
465             }
466         }
467 
468         PollResult::Closed
469     }
470 
    /// Get a shareable [`Handle`] for this instance.
    ///
    /// This can be used to add further signals or terminate the whole
    /// signal iteration using the [`close`][Handle::close] method. The handle is obtained
    /// from the wrapped [`SignalDelivery`].
    pub fn handle<R>(&self) -> Handle
    where
        SD: Borrow<SignalDelivery<R, E>>,
        R: 'static + AsRawFd + Send + Sync,
    {
        self.signals.borrow().handle()
    }
482 }
483 
/// A [`SignalIterator`] which consumes a [`SignalDelivery`] instance and takes
/// ownership of it.
pub type OwningSignalIterator<R, E> = SignalIterator<SignalDelivery<R, E>, E>;
487 
/// A [`SignalIterator`] which takes a mutable reference to a [`SignalDelivery`]
/// instance instead of owning it.
pub type RefSignalIterator<'a, R, E> = SignalIterator<&'a mut SignalDelivery<R, E>, E>;
491