1 //! Notify async tasks or threads.
2 //!
3 //! This is a synchronization primitive similar to [eventcounts] invented by Dmitry Vyukov.
4 //!
5 //! You can use this crate to turn non-blocking data structures into async or blocking data
6 //! structures. See a [simple mutex] implementation that exposes an async and a blocking interface
7 //! for acquiring locks.
8 //!
9 //! [eventcounts]: http://www.1024cores.net/home/lock-free-algorithms/eventcounts
10 //! [simple mutex]: https://github.com/stjepang/event-listener/blob/master/examples/mutex.rs
11 //!
12 //! # Examples
13 //!
14 //! Wait until another thread sets a boolean flag:
15 //!
16 //! ```
17 //! use std::sync::atomic::{AtomicBool, Ordering};
18 //! use std::sync::Arc;
19 //! use std::thread;
20 //! use std::time::Duration;
21 //! use std::usize;
22 //! use event_listener::Event;
23 //!
24 //! let flag = Arc::new(AtomicBool::new(false));
25 //! let event = Arc::new(Event::new());
26 //!
27 //! // Spawn a thread that will set the flag after 1 second.
28 //! thread::spawn({
29 //!     let flag = flag.clone();
30 //!     let event = event.clone();
31 //!     move || {
32 //!         // Wait for a second.
33 //!         thread::sleep(Duration::from_secs(1));
34 //!
35 //!         // Set the flag.
36 //!         flag.store(true, Ordering::SeqCst);
37 //!
38 //!         // Notify all listeners that the flag has been set.
39 //!         event.notify(usize::MAX);
40 //!     }
41 //! });
42 //!
43 //! // Wait until the flag is set.
44 //! loop {
45 //!     // Check the flag.
46 //!     if flag.load(Ordering::SeqCst) {
47 //!         break;
48 //!     }
49 //!
50 //!     // Start listening for events.
51 //!     let listener = event.listen();
52 //!
53 //!     // Check the flag again after creating the listener.
54 //!     if flag.load(Ordering::SeqCst) {
55 //!         break;
56 //!     }
57 //!
58 //!     // Wait for a notification and continue the loop.
59 //!     listener.wait();
60 //! }
61 //! ```
62 
63 #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
64 
65 use std::cell::{Cell, UnsafeCell};
66 use std::fmt;
67 use std::future::Future;
68 use std::mem::{self, ManuallyDrop};
69 use std::ops::{Deref, DerefMut};
70 use std::panic::{RefUnwindSafe, UnwindSafe};
71 use std::pin::Pin;
72 use std::ptr::{self, NonNull};
73 use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
74 use std::sync::{Arc, Mutex, MutexGuard};
75 use std::task::{Context, Poll, Waker};
76 use std::thread::{self, Thread};
77 use std::time::{Duration, Instant};
78 use std::usize;
79 
80 /// Inner state of [`Event`].
81 struct Inner {
82     /// The number of notified entries, or `usize::MAX` if all of them have been notified.
83     ///
84     /// If there are no entries, this value is set to `usize::MAX`.
85     notified: AtomicUsize,
86 
87     /// A linked list holding registered listeners.
88     list: Mutex<List>,
89 
90     /// A single cached list entry to avoid allocations on the fast path of the insertion.
91     cache: UnsafeCell<Entry>,
92 }
93 
94 impl Inner {
95     /// Locks the list.
lock(&self) -> ListGuard<'_>96     fn lock(&self) -> ListGuard<'_> {
97         ListGuard {
98             inner: self,
99             guard: self.list.lock().unwrap(),
100         }
101     }
102 
103     /// Returns the pointer to the single cached list entry.
104     #[inline(always)]
cache_ptr(&self) -> NonNull<Entry>105     fn cache_ptr(&self) -> NonNull<Entry> {
106         unsafe { NonNull::new_unchecked(self.cache.get()) }
107     }
108 }
109 
110 /// A synchronization primitive for notifying async tasks and threads.
111 ///
112 /// Listeners can be registered using [`Event::listen()`]. There are two ways to notify listeners:
113 ///
114 /// 1. [`Event::notify()`] notifies a number of listeners.
115 /// 2. [`Event::notify_additional()`] notifies a number of previously unnotified listeners.
116 ///
117 /// If there are no active listeners at the time a notification is sent, it simply gets lost.
118 ///
119 /// There are two ways for a listener to wait for a notification:
120 ///
121 /// 1. In an asynchronous manner using `.await`.
122 /// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
123 ///
124 /// If a notified listener is dropped without receiving a notification, dropping will notify
125 /// another active listener. Whether one *additional* listener will be notified depends on what
126 /// kind of notification was delivered.
127 ///
128 /// Listeners are registered and notified in the first-in first-out fashion, ensuring fairness.
129 pub struct Event {
130     /// A pointer to heap-allocated inner state.
131     ///
132     /// This pointer is initially null and gets lazily initialized on first use. Semantically, it
133     /// is an `Arc<Inner>` so it's important to keep in mind that it contributes to the [`Arc`]'s
134     /// reference count.
135     inner: AtomicPtr<Inner>,
136 }
137 
138 unsafe impl Send for Event {}
139 unsafe impl Sync for Event {}
140 
141 impl UnwindSafe for Event {}
142 impl RefUnwindSafe for Event {}
143 
144 impl Event {
145     /// Creates a new [`Event`].
146     ///
147     /// # Examples
148     ///
149     /// ```
150     /// use event_listener::Event;
151     ///
152     /// let event = Event::new();
153     /// ```
154     #[inline]
new() -> Event155     pub const fn new() -> Event {
156         Event {
157             inner: AtomicPtr::new(ptr::null_mut()),
158         }
159     }
160 
161     /// Returns a guard listening for a notification.
162     ///
163     /// This method emits a `SeqCst` fence after registering a listener.
164     ///
165     /// # Examples
166     ///
167     /// ```
168     /// use event_listener::Event;
169     ///
170     /// let event = Event::new();
171     /// let listener = event.listen();
172     /// ```
173     #[cold]
listen(&self) -> EventListener174     pub fn listen(&self) -> EventListener {
175         let inner = self.inner();
176         let listener = EventListener {
177             inner: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
178             entry: Some(inner.lock().insert(inner.cache_ptr())),
179         };
180 
181         // Make sure the listener is registered before whatever happens next.
182         full_fence();
183         listener
184     }
185 
186     /// Notifies a number of active listeners.
187     ///
188     /// The number is allowed to be zero or exceed the current number of listeners.
189     ///
190     /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n`
191     /// listeners among the active ones are notified.
192     ///
193     /// This method emits a `SeqCst` fence before notifying listeners.
194     ///
195     /// # Examples
196     ///
197     /// ```
198     /// use event_listener::Event;
199     ///
200     /// let event = Event::new();
201     ///
202     /// // This notification gets lost because there are no listeners.
203     /// event.notify(1);
204     ///
205     /// let listener1 = event.listen();
206     /// let listener2 = event.listen();
207     /// let listener3 = event.listen();
208     ///
209     /// // Notifies two listeners.
210     /// //
211     /// // Listener queueing is fair, which means `listener1` and `listener2`
212     /// // get notified here since they start listening before `listener3`.
213     /// event.notify(2);
214     /// ```
215     #[inline]
notify(&self, n: usize)216     pub fn notify(&self, n: usize) {
217         // Make sure the notification comes after whatever triggered it.
218         full_fence();
219 
220         if let Some(inner) = self.try_inner() {
221             // Notify if there is at least one unnotified listener and the number of notified
222             // listeners is less than `n`.
223             if inner.notified.load(Ordering::Acquire) < n {
224                 inner.lock().notify(n);
225             }
226         }
227     }
228 
229     /// Notifies a number of active listeners without emitting a `SeqCst` fence.
230     ///
231     /// The number is allowed to be zero or exceed the current number of listeners.
232     ///
233     /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n`
234     /// listeners among the active ones are notified.
235     ///
236     /// Unlike [`Event::notify()`], this method does not emit a `SeqCst` fence.
237     ///
238     /// # Examples
239     ///
240     /// ```
241     /// use event_listener::Event;
242     /// use std::sync::atomic::{self, Ordering};
243     ///
244     /// let event = Event::new();
245     ///
246     /// // This notification gets lost because there are no listeners.
247     /// event.notify(1);
248     ///
249     /// let listener1 = event.listen();
250     /// let listener2 = event.listen();
251     /// let listener3 = event.listen();
252     ///
253     /// // We should emit a fence manually when using relaxed notifications.
254     /// atomic::fence(Ordering::SeqCst);
255     ///
256     /// // Notifies two listeners.
257     /// //
258     /// // Listener queueing is fair, which means `listener1` and `listener2`
259     /// // get notified here since they start listening before `listener3`.
260     /// event.notify(2);
261     /// ```
262     #[inline]
notify_relaxed(&self, n: usize)263     pub fn notify_relaxed(&self, n: usize) {
264         if let Some(inner) = self.try_inner() {
265             // Notify if there is at least one unnotified listener and the number of notified
266             // listeners is less than `n`.
267             if inner.notified.load(Ordering::Acquire) < n {
268                 inner.lock().notify(n);
269             }
270         }
271     }
272 
273     /// Notifies a number of active and still unnotified listeners.
274     ///
275     /// The number is allowed to be zero or exceed the current number of listeners.
276     ///
277     /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that
278     /// were previously unnotified.
279     ///
280     /// This method emits a `SeqCst` fence before notifying listeners.
281     ///
282     /// # Examples
283     ///
284     /// ```
285     /// use event_listener::Event;
286     ///
287     /// let event = Event::new();
288     ///
289     /// // This notification gets lost because there are no listeners.
290     /// event.notify(1);
291     ///
292     /// let listener1 = event.listen();
293     /// let listener2 = event.listen();
294     /// let listener3 = event.listen();
295     ///
296     /// // Notifies two listeners.
297     /// //
298     /// // Listener queueing is fair, which means `listener1` and `listener2`
299     /// // get notified here since they start listening before `listener3`.
300     /// event.notify_additional(1);
301     /// event.notify_additional(1);
302     /// ```
303     #[inline]
notify_additional(&self, n: usize)304     pub fn notify_additional(&self, n: usize) {
305         // Make sure the notification comes after whatever triggered it.
306         full_fence();
307 
308         if let Some(inner) = self.try_inner() {
309             // Notify if there is at least one unnotified listener.
310             if inner.notified.load(Ordering::Acquire) < usize::MAX {
311                 inner.lock().notify_additional(n);
312             }
313         }
314     }
315 
316     /// Notifies a number of active and still unnotified listeners without emitting a `SeqCst`
317     /// fence.
318     ///
319     /// The number is allowed to be zero or exceed the current number of listeners.
320     ///
321     /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that
322     /// were previously unnotified.
323     ///
324     /// Unlike [`Event::notify_additional()`], this method does not emit a `SeqCst` fence.
325     ///
326     /// # Examples
327     ///
328     /// ```
329     /// use event_listener::Event;
330     /// use std::sync::atomic::{self, Ordering};
331     ///
332     /// let event = Event::new();
333     ///
334     /// // This notification gets lost because there are no listeners.
335     /// event.notify(1);
336     ///
337     /// let listener1 = event.listen();
338     /// let listener2 = event.listen();
339     /// let listener3 = event.listen();
340     ///
341     /// // We should emit a fence manually when using relaxed notifications.
342     /// atomic::fence(Ordering::SeqCst);
343     ///
344     /// // Notifies two listeners.
345     /// //
346     /// // Listener queueing is fair, which means `listener1` and `listener2`
347     /// // get notified here since they start listening before `listener3`.
348     /// event.notify_additional_relaxed(1);
349     /// event.notify_additional_relaxed(1);
350     /// ```
351     #[inline]
notify_additional_relaxed(&self, n: usize)352     pub fn notify_additional_relaxed(&self, n: usize) {
353         if let Some(inner) = self.try_inner() {
354             // Notify if there is at least one unnotified listener.
355             if inner.notified.load(Ordering::Acquire) < usize::MAX {
356                 inner.lock().notify_additional(n);
357             }
358         }
359     }
360 
361     /// Returns a reference to the inner state if it was initialized.
362     #[inline]
try_inner(&self) -> Option<&Inner>363     fn try_inner(&self) -> Option<&Inner> {
364         let inner = self.inner.load(Ordering::Acquire);
365         unsafe { inner.as_ref() }
366     }
367 
368     /// Returns a reference to the inner state, initializing it if necessary.
inner(&self) -> &Inner369     fn inner(&self) -> &Inner {
370         let mut inner = self.inner.load(Ordering::Acquire);
371 
372         // Initialize the state if this is its first use.
373         if inner.is_null() {
374             // Allocate on the heap.
375             let new = Arc::new(Inner {
376                 notified: AtomicUsize::new(usize::MAX),
377                 list: std::sync::Mutex::new(List {
378                     head: None,
379                     tail: None,
380                     start: None,
381                     len: 0,
382                     notified: 0,
383                     cache_used: false,
384                 }),
385                 cache: UnsafeCell::new(Entry {
386                     state: Cell::new(State::Created),
387                     prev: Cell::new(None),
388                     next: Cell::new(None),
389                 }),
390             });
391             // Convert the heap-allocated state into a raw pointer.
392             let new = Arc::into_raw(new) as *mut Inner;
393 
394             // Attempt to replace the null-pointer with the new state pointer.
395             inner = self.inner.compare_and_swap(inner, new, Ordering::AcqRel);
396 
397             // Check if the old pointer value was indeed null.
398             if inner.is_null() {
399                 // If yes, then use the new state pointer.
400                 inner = new;
401             } else {
402                 // If not, that means a concurrent operation has initialized the state.
403                 // In that case, use the old pointer and deallocate the new one.
404                 unsafe {
405                     drop(Arc::from_raw(new));
406                 }
407             }
408         }
409 
410         unsafe { &*inner }
411     }
412 }
413 
414 impl Drop for Event {
415     #[inline]
drop(&mut self)416     fn drop(&mut self) {
417         let inner: *mut Inner = *self.inner.get_mut();
418 
419         // If the state pointer has been initialized, deallocate it.
420         if !inner.is_null() {
421             unsafe {
422                 drop(Arc::from_raw(inner));
423             }
424         }
425     }
426 }
427 
428 impl fmt::Debug for Event {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result429     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
430         f.pad("Event { .. }")
431     }
432 }
433 
434 impl Default for Event {
default() -> Event435     fn default() -> Event {
436         Event::new()
437     }
438 }
439 
440 /// A guard waiting for a notification from an [`Event`].
441 ///
442 /// There are two ways for a listener to wait for a notification:
443 ///
444 /// 1. In an asynchronous manner using `.await`.
445 /// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
446 ///
447 /// If a notified listener is dropped without receiving a notification, dropping will notify
448 /// another active listener. Whether one *additional* listener will be notified depends on what
449 /// kind of notification was delivered.
450 pub struct EventListener {
451     /// A reference to [`Event`]'s inner state.
452     inner: Arc<Inner>,
453 
454     /// A pointer to this listener's entry in the linked list.
455     entry: Option<NonNull<Entry>>,
456 }
457 
458 unsafe impl Send for EventListener {}
459 unsafe impl Sync for EventListener {}
460 
461 impl UnwindSafe for EventListener {}
462 impl RefUnwindSafe for EventListener {}
463 
464 impl EventListener {
465     /// Blocks until a notification is received.
466     ///
467     /// # Examples
468     ///
469     /// ```
470     /// use event_listener::Event;
471     ///
472     /// let event = Event::new();
473     /// let listener = event.listen();
474     ///
475     /// // Notify `listener`.
476     /// event.notify(1);
477     ///
478     /// // Receive the notification.
479     /// listener.wait();
480     /// ```
wait(self)481     pub fn wait(self) {
482         self.wait_internal(None);
483     }
484 
485     /// Blocks until a notification is received or a timeout is reached.
486     ///
487     /// Returns `true` if a notification was received.
488     ///
489     /// # Examples
490     ///
491     /// ```
492     /// use std::time::Duration;
493     /// use event_listener::Event;
494     ///
495     /// let event = Event::new();
496     /// let listener = event.listen();
497     ///
498     /// // There are no notification so this times out.
499     /// assert!(!listener.wait_timeout(Duration::from_secs(1)));
500     /// ```
wait_timeout(self, timeout: Duration) -> bool501     pub fn wait_timeout(self, timeout: Duration) -> bool {
502         self.wait_internal(Some(Instant::now() + timeout))
503     }
504 
505     /// Blocks until a notification is received or a deadline is reached.
506     ///
507     /// Returns `true` if a notification was received.
508     ///
509     /// # Examples
510     ///
511     /// ```
512     /// use std::time::{Duration, Instant};
513     /// use event_listener::Event;
514     ///
515     /// let event = Event::new();
516     /// let listener = event.listen();
517     ///
518     /// // There are no notification so this times out.
519     /// assert!(!listener.wait_deadline(Instant::now() + Duration::from_secs(1)));
520     /// ```
wait_deadline(self, deadline: Instant) -> bool521     pub fn wait_deadline(self, deadline: Instant) -> bool {
522         self.wait_internal(Some(deadline))
523     }
524 
525     /// Drops this listener and discards its notification (if any) without notifying another
526     /// active listener.
527     ///
528     /// Returns `true` if a notification was discarded.
529     ///
530     /// # Examples
531     /// ```
532     /// use event_listener::Event;
533     ///
534     /// let event = Event::new();
535     /// let listener1 = event.listen();
536     /// let listener2 = event.listen();
537     ///
538     /// event.notify(1);
539     ///
540     /// assert!(listener1.discard());
541     /// assert!(!listener2.discard());
542     /// ```
discard(mut self) -> bool543     pub fn discard(mut self) -> bool {
544         // If this listener has never picked up a notification...
545         if let Some(entry) = self.entry.take() {
546             let mut list = self.inner.lock();
547             // Remove the listener from the list and return `true` if it was notified.
548             if let State::Notified(_) = list.remove(entry, self.inner.cache_ptr()) {
549                 return true;
550             }
551         }
552         false
553     }
554 
555     /// Returns `true` if this listener listens to the given `Event`.
556     ///
557     /// # Examples
558     ///
559     /// ```
560     /// use event_listener::Event;
561     ///
562     /// let event = Event::new();
563     /// let listener = event.listen();
564     ///
565     /// assert!(listener.listens_to(&event));
566     /// ```
567     #[inline]
listens_to(&self, event: &Event) -> bool568     pub fn listens_to(&self, event: &Event) -> bool {
569         ptr::eq::<Inner>(&*self.inner, event.inner.load(Ordering::Acquire))
570     }
571 
572     /// Returns `true` if both listeners listen to the same `Event`.
573     ///
574     /// # Examples
575     ///
576     /// ```
577     /// use event_listener::Event;
578     ///
579     /// let event = Event::new();
580     /// let listener1 = event.listen();
581     /// let listener2 = event.listen();
582     ///
583     /// assert!(listener1.same_event(&listener2));
584     /// ```
same_event(&self, other: &EventListener) -> bool585     pub fn same_event(&self, other: &EventListener) -> bool {
586         ptr::eq::<Inner>(&*self.inner, &*other.inner)
587     }
588 
wait_internal(mut self, deadline: Option<Instant>) -> bool589     fn wait_internal(mut self, deadline: Option<Instant>) -> bool {
590         // Take out the entry pointer and set it to `None`.
591         let entry = match self.entry.take() {
592             None => unreachable!("cannot wait twice on an `EventListener`"),
593             Some(entry) => entry,
594         };
595 
596         // Set this listener's state to `Waiting`.
597         {
598             let mut list = self.inner.lock();
599             let e = unsafe { entry.as_ref() };
600 
601             // Do a dummy replace operation in order to take out the state.
602             match e.state.replace(State::Notified(false)) {
603                 State::Notified(_) => {
604                     // If this listener has been notified, remove it from the list and return.
605                     list.remove(entry, self.inner.cache_ptr());
606                     return true;
607                 }
608                 // Otherwise, set the state to `Waiting`.
609                 _ => e.state.set(State::Waiting(thread::current())),
610             }
611         }
612 
613         // Wait until a notification is received or the timeout is reached.
614         loop {
615             match deadline {
616                 None => thread::park(),
617 
618                 Some(deadline) => {
619                     // Check for timeout.
620                     let now = Instant::now();
621                     if now >= deadline {
622                         // Remove the entry and check if notified.
623                         return self
624                             .inner
625                             .lock()
626                             .remove(entry, self.inner.cache_ptr())
627                             .is_notified();
628                     }
629 
630                     // Park until the deadline.
631                     thread::park_timeout(deadline - now);
632                 }
633             }
634 
635             let mut list = self.inner.lock();
636             let e = unsafe { entry.as_ref() };
637 
638             // Do a dummy replace operation in order to take out the state.
639             match e.state.replace(State::Notified(false)) {
640                 State::Notified(_) => {
641                     // If this listener has been notified, remove it from the list and return.
642                     list.remove(entry, self.inner.cache_ptr());
643                     return true;
644                 }
645                 // Otherwise, set the state back to `Waiting`.
646                 state => e.state.set(state),
647             }
648         }
649     }
650 }
651 
652 impl fmt::Debug for EventListener {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result653     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
654         f.pad("EventListener { .. }")
655     }
656 }
657 
658 impl Future for EventListener {
659     type Output = ();
660 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>661     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
662         let mut list = self.inner.lock();
663 
664         let entry = match self.entry {
665             None => unreachable!("cannot poll a completed `EventListener` future"),
666             Some(entry) => entry,
667         };
668         let state = unsafe { &entry.as_ref().state };
669 
670         // Do a dummy replace operation in order to take out the state.
671         match state.replace(State::Notified(false)) {
672             State::Notified(_) => {
673                 // If this listener has been notified, remove it from the list and return.
674                 list.remove(entry, self.inner.cache_ptr());
675                 drop(list);
676                 self.entry = None;
677                 return Poll::Ready(());
678             }
679             State::Created => {
680                 // If the listener was just created, put it in the `Polling` state.
681                 state.set(State::Polling(cx.waker().clone()));
682             }
683             State::Polling(w) => {
684                 // If the listener was in the `Polling` state, update the waker.
685                 if w.will_wake(cx.waker()) {
686                     state.set(State::Polling(w));
687                 } else {
688                     state.set(State::Polling(cx.waker().clone()));
689                 }
690             }
691             State::Waiting(_) => {
692                 unreachable!("cannot poll and wait on `EventListener` at the same time")
693             }
694         }
695 
696         Poll::Pending
697     }
698 }
699 
700 impl Drop for EventListener {
drop(&mut self)701     fn drop(&mut self) {
702         // If this listener has never picked up a notification...
703         if let Some(entry) = self.entry.take() {
704             let mut list = self.inner.lock();
705 
706             // But if a notification was delivered to it...
707             if let State::Notified(additional) = list.remove(entry, self.inner.cache_ptr()) {
708                 // Then pass it on to another active listener.
709                 if additional {
710                     list.notify_additional(1);
711                 } else {
712                     list.notify(1);
713                 }
714             }
715         }
716     }
717 }
718 
719 /// A guard holding the linked list locked.
720 struct ListGuard<'a> {
721     /// A reference to [`Event`]'s inner state.
722     inner: &'a Inner,
723 
724     /// The actual guard that acquired the linked list.
725     guard: MutexGuard<'a, List>,
726 }
727 
728 impl Drop for ListGuard<'_> {
729     #[inline]
drop(&mut self)730     fn drop(&mut self) {
731         let list = &mut **self;
732 
733         // Update the atomic `notified` counter.
734         let notified = if list.notified < list.len {
735             list.notified
736         } else {
737             usize::MAX
738         };
739         self.inner.notified.store(notified, Ordering::Release);
740     }
741 }
742 
743 impl Deref for ListGuard<'_> {
744     type Target = List;
745 
746     #[inline]
deref(&self) -> &List747     fn deref(&self) -> &List {
748         &*self.guard
749     }
750 }
751 
752 impl DerefMut for ListGuard<'_> {
753     #[inline]
deref_mut(&mut self) -> &mut List754     fn deref_mut(&mut self) -> &mut List {
755         &mut *self.guard
756     }
757 }
758 
759 /// The state of a listener.
760 enum State {
761     /// It has just been created.
762     Created,
763 
764     /// It has received a notification.
765     ///
766     /// The `bool` is `true` if this was an "additional" notification.
767     Notified(bool),
768 
769     /// An async task is polling it.
770     Polling(Waker),
771 
772     /// A thread is blocked on it.
773     Waiting(Thread),
774 }
775 
776 impl State {
777     /// Returns `true` if this is the `Notified` state.
778     #[inline]
is_notified(&self) -> bool779     fn is_notified(&self) -> bool {
780         match self {
781             State::Notified(_) => true,
782             State::Created | State::Polling(_) | State::Waiting(_) => false,
783         }
784     }
785 }
786 
787 /// An entry representing a registered listener.
788 struct Entry {
789     /// THe state of this listener.
790     state: Cell<State>,
791 
792     /// Previous entry in the linked list.
793     prev: Cell<Option<NonNull<Entry>>>,
794 
795     /// Next entry in the linked list.
796     next: Cell<Option<NonNull<Entry>>>,
797 }
798 
799 /// A linked list of entries.
800 struct List {
801     /// First entry in the list.
802     head: Option<NonNull<Entry>>,
803 
804     /// Last entry in the list.
805     tail: Option<NonNull<Entry>>,
806 
807     /// The first unnotified entry in the list.
808     start: Option<NonNull<Entry>>,
809 
810     /// Total number of entries in the list.
811     len: usize,
812 
813     /// The number of notified entries in the list.
814     notified: usize,
815 
816     /// Whether the cached entry is used.
817     cache_used: bool,
818 }
819 
820 impl List {
821     /// Inserts a new entry into the list.
insert(&mut self, cache: NonNull<Entry>) -> NonNull<Entry>822     fn insert(&mut self, cache: NonNull<Entry>) -> NonNull<Entry> {
823         unsafe {
824             let entry = Entry {
825                 state: Cell::new(State::Created),
826                 prev: Cell::new(self.tail),
827                 next: Cell::new(None),
828             };
829 
830             let entry = if self.cache_used {
831                 // Allocate an entry that is going to become the new tail.
832                 NonNull::new_unchecked(Box::into_raw(Box::new(entry)))
833             } else {
834                 // No need to allocate - we can use the cached entry.
835                 self.cache_used = true;
836                 cache.as_ptr().write(entry);
837                 cache
838             };
839 
840             // Replace the tail with the new entry.
841             match mem::replace(&mut self.tail, Some(entry)) {
842                 None => self.head = Some(entry),
843                 Some(t) => t.as_ref().next.set(Some(entry)),
844             }
845 
846             // If there were no unnotified entries, this one is the first now.
847             if self.start.is_none() {
848                 self.start = self.tail;
849             }
850 
851             // Bump the entry count.
852             self.len += 1;
853 
854             entry
855         }
856     }
857 
858     /// Removes an entry from the list and returns its state.
remove(&mut self, entry: NonNull<Entry>, cache: NonNull<Entry>) -> State859     fn remove(&mut self, entry: NonNull<Entry>, cache: NonNull<Entry>) -> State {
860         unsafe {
861             let prev = entry.as_ref().prev.get();
862             let next = entry.as_ref().next.get();
863 
864             // Unlink from the previous entry.
865             match prev {
866                 None => self.head = next,
867                 Some(p) => p.as_ref().next.set(next),
868             }
869 
870             // Unlink from the next entry.
871             match next {
872                 None => self.tail = prev,
873                 Some(n) => n.as_ref().prev.set(prev),
874             }
875 
876             // If this was the first unnotified entry, move the pointer to the next one.
877             if self.start == Some(entry) {
878                 self.start = next;
879             }
880 
881             // Extract the state.
882             let state = if ptr::eq(entry.as_ptr(), cache.as_ptr()) {
883                 // Free the cached entry.
884                 self.cache_used = false;
885                 entry.as_ref().state.replace(State::Created)
886             } else {
887                 // Deallocate the entry.
888                 Box::from_raw(entry.as_ptr()).state.into_inner()
889             };
890 
891             // Update the counters.
892             if state.is_notified() {
893                 self.notified -= 1;
894             }
895             self.len -= 1;
896 
897             state
898         }
899     }
900 
901     /// Notifies a number of entries.
902     #[cold]
notify(&mut self, mut n: usize)903     fn notify(&mut self, mut n: usize) {
904         if n <= self.notified {
905             return;
906         }
907         n -= self.notified;
908 
909         while n > 0 {
910             n -= 1;
911 
912             // Notify the first unnotified entry.
913             match self.start {
914                 None => break,
915                 Some(e) => {
916                     // Get the entry and move the pointer forward.
917                     let e = unsafe { e.as_ref() };
918                     self.start = e.next.get();
919 
920                     // Set the state of this entry to `Notified` and notify.
921                     match e.state.replace(State::Notified(false)) {
922                         State::Notified(_) => {}
923                         State::Created => {}
924                         State::Polling(w) => w.wake(),
925                         State::Waiting(t) => t.unpark(),
926                     }
927 
928                     // Update the counter.
929                     self.notified += 1;
930                 }
931             }
932         }
933     }
934 
935     /// Notifies a number of additional entries.
936     #[cold]
notify_additional(&mut self, mut n: usize)937     fn notify_additional(&mut self, mut n: usize) {
938         while n > 0 {
939             n -= 1;
940 
941             // Notify the first unnotified entry.
942             match self.start {
943                 None => break,
944                 Some(e) => {
945                     // Get the entry and move the pointer forward.
946                     let e = unsafe { e.as_ref() };
947                     self.start = e.next.get();
948 
949                     // Set the state of this entry to `Notified` and notify.
950                     match e.state.replace(State::Notified(true)) {
951                         State::Notified(_) => {}
952                         State::Created => {}
953                         State::Polling(w) => w.wake(),
954                         State::Waiting(t) => t.unpark(),
955                     }
956 
957                     // Update the counter.
958                     self.notified += 1;
959                 }
960             }
961         }
962     }
963 }
964 
965 /// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
966 #[inline]
full_fence()967 fn full_fence() {
968     if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
969         // HACK(stjepang): On x86 architectures there are two different ways of executing
970         // a `SeqCst` fence.
971         //
972         // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
973         // 2. `_.compare_and_swap(_, _, SeqCst)`, which compiles into a `lock cmpxchg` instruction.
974         //
975         // Both instructions have the effect of a full barrier, but empirical benchmarks have shown
976         // that the second one is sometimes a bit faster.
977         //
978         // The ideal solution here would be to use inline assembly, but we're instead creating a
979         // temporary atomic variable and compare-and-exchanging its value. No sane compiler to
980         // x86 platforms is going to optimize this away.
981         let a = AtomicUsize::new(0);
982         a.compare_and_swap(0, 1, Ordering::SeqCst);
983     } else {
984         atomic::fence(Ordering::SeqCst);
985     }
986 }
987