1 //! Notify async tasks or threads.
2 //!
3 //! This is a synchronization primitive similar to [eventcounts] invented by Dmitry Vyukov.
4 //!
5 //! You can use this crate to turn non-blocking data structures into async or blocking data
6 //! structures. See a [simple mutex] implementation that exposes an async and a blocking interface
7 //! for acquiring locks.
8 //!
9 //! [eventcounts]: http://www.1024cores.net/home/lock-free-algorithms/eventcounts
10 //! [simple mutex]: https://github.com/smol-rs/event-listener/blob/master/examples/mutex.rs
11 //!
12 //! # Examples
13 //!
14 //! Wait until another thread sets a boolean flag:
15 //!
16 //! ```
17 //! use std::sync::atomic::{AtomicBool, Ordering};
18 //! use std::sync::Arc;
19 //! use std::thread;
20 //! use std::time::Duration;
21 //! use std::usize;
22 //! use event_listener::Event;
23 //!
24 //! let flag = Arc::new(AtomicBool::new(false));
25 //! let event = Arc::new(Event::new());
26 //!
27 //! // Spawn a thread that will set the flag after 1 second.
28 //! thread::spawn({
29 //!     let flag = flag.clone();
30 //!     let event = event.clone();
31 //!     move || {
32 //!         // Wait for a second.
33 //!         thread::sleep(Duration::from_secs(1));
34 //!
35 //!         // Set the flag.
36 //!         flag.store(true, Ordering::SeqCst);
37 //!
38 //!         // Notify all listeners that the flag has been set.
39 //!         event.notify(usize::MAX);
40 //!     }
41 //! });
42 //!
43 //! // Wait until the flag is set.
44 //! loop {
45 //!     // Check the flag.
46 //!     if flag.load(Ordering::SeqCst) {
47 //!         break;
48 //!     }
49 //!
50 //!     // Start listening for events.
51 //!     let listener = event.listen();
52 //!
53 //!     // Check the flag again after creating the listener.
54 //!     if flag.load(Ordering::SeqCst) {
55 //!         break;
56 //!     }
57 //!
58 //!     // Wait for a notification and continue the loop.
59 //!     listener.wait();
60 //! }
61 //! ```
62 
63 #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
64 
65 use std::cell::{Cell, UnsafeCell};
66 use std::fmt;
67 use std::future::Future;
68 use std::mem::{self, ManuallyDrop};
69 use std::ops::{Deref, DerefMut};
70 use std::panic::{RefUnwindSafe, UnwindSafe};
71 use std::pin::Pin;
72 use std::ptr::{self, NonNull};
73 use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
74 use std::sync::{Arc, Mutex, MutexGuard};
75 use std::task::{Context, Poll, Waker};
76 use std::thread::{self, Thread};
77 use std::time::{Duration, Instant};
78 use std::usize;
79 
/// Inner state of [`Event`].
///
/// It is allocated lazily by `Event::inner()` and shared, through an `Arc`, between the event
/// and every `EventListener` created from it.
struct Inner {
    /// The number of notified entries, or `usize::MAX` if all of them have been notified.
    ///
    /// If there are no entries, this value is set to `usize::MAX`.
    ///
    /// The value is refreshed from the list counters whenever a `ListGuard` is dropped, which
    /// lets the notify methods skip locking when there is nothing to do.
    notified: AtomicUsize,

    /// A linked list holding registered listeners.
    list: Mutex<List>,

    /// A single cached list entry to avoid allocations on the fast path of the insertion.
    ///
    /// `List::cache_used` records whether this slot is currently part of the list.
    cache: UnsafeCell<Entry>,
}
93 
94 impl Inner {
95     /// Locks the list.
lock(&self) -> ListGuard<'_>96     fn lock(&self) -> ListGuard<'_> {
97         ListGuard {
98             inner: self,
99             guard: self.list.lock().unwrap(),
100         }
101     }
102 
103     /// Returns the pointer to the single cached list entry.
104     #[inline(always)]
cache_ptr(&self) -> NonNull<Entry>105     fn cache_ptr(&self) -> NonNull<Entry> {
106         unsafe { NonNull::new_unchecked(self.cache.get()) }
107     }
108 }
109 
/// A synchronization primitive for notifying async tasks and threads.
///
/// Listeners can be registered using [`Event::listen()`]. There are two ways to notify listeners:
///
/// 1. [`Event::notify()`] notifies a number of listeners.
/// 2. [`Event::notify_additional()`] notifies a number of previously unnotified listeners.
///
/// If there are no active listeners at the time a notification is sent, it simply gets lost.
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
///
/// If a listener has been notified but is dropped before it observes that notification,
/// dropping it passes the notification on to another active listener. Whether it is passed on
/// as a regular or as an *additional* notification depends on what kind of notification was
/// originally delivered.
///
/// Listeners are registered and notified in the first-in first-out fashion, ensuring fairness.
pub struct Event {
    /// A pointer to heap-allocated inner state.
    ///
    /// This pointer is initially null and gets lazily initialized on first use. Semantically, it
    /// is an `Arc<Inner>` so it's important to keep in mind that it contributes to the [`Arc`]'s
    /// reference count. That reference is released again when the `Event` is dropped.
    inner: AtomicPtr<Inner>,
}
137 
// NOTE(review): `Event` only stores an `AtomicPtr<Inner>`. In the code visible in this file the
// listener list and its entries are only accessed while the `Mutex` inside `Inner` is held;
// these impls rely on that staying true — confirm when touching list access.
unsafe impl Send for Event {}
unsafe impl Sync for Event {}

// The event is declared unwind-safe unconditionally.
impl UnwindSafe for Event {}
impl RefUnwindSafe for Event {}
143 
144 impl Event {
145     /// Creates a new [`Event`].
146     ///
147     /// # Examples
148     ///
149     /// ```
150     /// use event_listener::Event;
151     ///
152     /// let event = Event::new();
153     /// ```
154     #[inline]
new() -> Event155     pub const fn new() -> Event {
156         Event {
157             inner: AtomicPtr::new(ptr::null_mut()),
158         }
159     }
160 
161     /// Returns a guard listening for a notification.
162     ///
163     /// This method emits a `SeqCst` fence after registering a listener.
164     ///
165     /// # Examples
166     ///
167     /// ```
168     /// use event_listener::Event;
169     ///
170     /// let event = Event::new();
171     /// let listener = event.listen();
172     /// ```
173     #[cold]
listen(&self) -> EventListener174     pub fn listen(&self) -> EventListener {
175         let inner = self.inner();
176         let listener = EventListener {
177             inner: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
178             entry: unsafe { Some((*inner).lock().insert((*inner).cache_ptr())) },
179         };
180 
181         // Make sure the listener is registered before whatever happens next.
182         full_fence();
183         listener
184     }
185 
186     /// Notifies a number of active listeners.
187     ///
188     /// The number is allowed to be zero or exceed the current number of listeners.
189     ///
190     /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n`
191     /// listeners among the active ones are notified.
192     ///
193     /// This method emits a `SeqCst` fence before notifying listeners.
194     ///
195     /// # Examples
196     ///
197     /// ```
198     /// use event_listener::Event;
199     ///
200     /// let event = Event::new();
201     ///
202     /// // This notification gets lost because there are no listeners.
203     /// event.notify(1);
204     ///
205     /// let listener1 = event.listen();
206     /// let listener2 = event.listen();
207     /// let listener3 = event.listen();
208     ///
209     /// // Notifies two listeners.
210     /// //
211     /// // Listener queueing is fair, which means `listener1` and `listener2`
212     /// // get notified here since they start listening before `listener3`.
213     /// event.notify(2);
214     /// ```
215     #[inline]
notify(&self, n: usize)216     pub fn notify(&self, n: usize) {
217         // Make sure the notification comes after whatever triggered it.
218         full_fence();
219 
220         if let Some(inner) = self.try_inner() {
221             // Notify if there is at least one unnotified listener and the number of notified
222             // listeners is less than `n`.
223             if inner.notified.load(Ordering::Acquire) < n {
224                 inner.lock().notify(n);
225             }
226         }
227     }
228 
229     /// Notifies a number of active listeners without emitting a `SeqCst` fence.
230     ///
231     /// The number is allowed to be zero or exceed the current number of listeners.
232     ///
233     /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n`
234     /// listeners among the active ones are notified.
235     ///
236     /// Unlike [`Event::notify()`], this method does not emit a `SeqCst` fence.
237     ///
238     /// # Examples
239     ///
240     /// ```
241     /// use event_listener::Event;
242     /// use std::sync::atomic::{self, Ordering};
243     ///
244     /// let event = Event::new();
245     ///
246     /// // This notification gets lost because there are no listeners.
247     /// event.notify(1);
248     ///
249     /// let listener1 = event.listen();
250     /// let listener2 = event.listen();
251     /// let listener3 = event.listen();
252     ///
253     /// // We should emit a fence manually when using relaxed notifications.
254     /// atomic::fence(Ordering::SeqCst);
255     ///
256     /// // Notifies two listeners.
257     /// //
258     /// // Listener queueing is fair, which means `listener1` and `listener2`
259     /// // get notified here since they start listening before `listener3`.
    /// event.notify_relaxed(2);
261     /// ```
262     #[inline]
notify_relaxed(&self, n: usize)263     pub fn notify_relaxed(&self, n: usize) {
264         if let Some(inner) = self.try_inner() {
265             // Notify if there is at least one unnotified listener and the number of notified
266             // listeners is less than `n`.
267             if inner.notified.load(Ordering::Acquire) < n {
268                 inner.lock().notify(n);
269             }
270         }
271     }
272 
273     /// Notifies a number of active and still unnotified listeners.
274     ///
275     /// The number is allowed to be zero or exceed the current number of listeners.
276     ///
277     /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that
278     /// were previously unnotified.
279     ///
280     /// This method emits a `SeqCst` fence before notifying listeners.
281     ///
282     /// # Examples
283     ///
284     /// ```
285     /// use event_listener::Event;
286     ///
287     /// let event = Event::new();
288     ///
289     /// // This notification gets lost because there are no listeners.
290     /// event.notify(1);
291     ///
292     /// let listener1 = event.listen();
293     /// let listener2 = event.listen();
294     /// let listener3 = event.listen();
295     ///
296     /// // Notifies two listeners.
297     /// //
298     /// // Listener queueing is fair, which means `listener1` and `listener2`
299     /// // get notified here since they start listening before `listener3`.
300     /// event.notify_additional(1);
301     /// event.notify_additional(1);
302     /// ```
303     #[inline]
notify_additional(&self, n: usize)304     pub fn notify_additional(&self, n: usize) {
305         // Make sure the notification comes after whatever triggered it.
306         full_fence();
307 
308         if let Some(inner) = self.try_inner() {
309             // Notify if there is at least one unnotified listener.
310             if inner.notified.load(Ordering::Acquire) < usize::MAX {
311                 inner.lock().notify_additional(n);
312             }
313         }
314     }
315 
316     /// Notifies a number of active and still unnotified listeners without emitting a `SeqCst`
317     /// fence.
318     ///
319     /// The number is allowed to be zero or exceed the current number of listeners.
320     ///
321     /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that
322     /// were previously unnotified.
323     ///
324     /// Unlike [`Event::notify_additional()`], this method does not emit a `SeqCst` fence.
325     ///
326     /// # Examples
327     ///
328     /// ```
329     /// use event_listener::Event;
330     /// use std::sync::atomic::{self, Ordering};
331     ///
332     /// let event = Event::new();
333     ///
334     /// // This notification gets lost because there are no listeners.
335     /// event.notify(1);
336     ///
337     /// let listener1 = event.listen();
338     /// let listener2 = event.listen();
339     /// let listener3 = event.listen();
340     ///
341     /// // We should emit a fence manually when using relaxed notifications.
342     /// atomic::fence(Ordering::SeqCst);
343     ///
344     /// // Notifies two listeners.
345     /// //
346     /// // Listener queueing is fair, which means `listener1` and `listener2`
347     /// // get notified here since they start listening before `listener3`.
348     /// event.notify_additional_relaxed(1);
349     /// event.notify_additional_relaxed(1);
350     /// ```
351     #[inline]
notify_additional_relaxed(&self, n: usize)352     pub fn notify_additional_relaxed(&self, n: usize) {
353         if let Some(inner) = self.try_inner() {
354             // Notify if there is at least one unnotified listener.
355             if inner.notified.load(Ordering::Acquire) < usize::MAX {
356                 inner.lock().notify_additional(n);
357             }
358         }
359     }
360 
361     /// Returns a reference to the inner state if it was initialized.
362     #[inline]
try_inner(&self) -> Option<&Inner>363     fn try_inner(&self) -> Option<&Inner> {
364         let inner = self.inner.load(Ordering::Acquire);
365         unsafe { inner.as_ref() }
366     }
367 
368     /// Returns a raw pointer to the inner state, initializing it if necessary.
369     ///
370     /// This returns a raw pointer instead of reference because `from_raw`
371     /// requires raw/mut provenance: <https://github.com/rust-lang/rust/pull/67339>
inner(&self) -> *const Inner372     fn inner(&self) -> *const Inner {
373         let mut inner = self.inner.load(Ordering::Acquire);
374 
375         // Initialize the state if this is its first use.
376         if inner.is_null() {
377             // Allocate on the heap.
378             let new = Arc::new(Inner {
379                 notified: AtomicUsize::new(usize::MAX),
380                 list: std::sync::Mutex::new(List {
381                     head: None,
382                     tail: None,
383                     start: None,
384                     len: 0,
385                     notified: 0,
386                     cache_used: false,
387                 }),
388                 cache: UnsafeCell::new(Entry {
389                     state: Cell::new(State::Created),
390                     prev: Cell::new(None),
391                     next: Cell::new(None),
392                 }),
393             });
394             // Convert the heap-allocated state into a raw pointer.
395             let new = Arc::into_raw(new) as *mut Inner;
396 
397             // Attempt to replace the null-pointer with the new state pointer.
398             inner = self
399                 .inner
400                 .compare_exchange(inner, new, Ordering::AcqRel, Ordering::Acquire)
401                 .unwrap_or_else(|x| x);
402 
403             // Check if the old pointer value was indeed null.
404             if inner.is_null() {
405                 // If yes, then use the new state pointer.
406                 inner = new;
407             } else {
408                 // If not, that means a concurrent operation has initialized the state.
409                 // In that case, use the old pointer and deallocate the new one.
410                 unsafe {
411                     drop(Arc::from_raw(new));
412                 }
413             }
414         }
415 
416         inner
417     }
418 }
419 
420 impl Drop for Event {
421     #[inline]
drop(&mut self)422     fn drop(&mut self) {
423         let inner: *mut Inner = *self.inner.get_mut();
424 
425         // If the state pointer has been initialized, deallocate it.
426         if !inner.is_null() {
427             unsafe {
428                 drop(Arc::from_raw(inner));
429             }
430         }
431     }
432 }
433 
434 impl fmt::Debug for Event {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result435     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
436         f.pad("Event { .. }")
437     }
438 }
439 
440 impl Default for Event {
default() -> Event441     fn default() -> Event {
442         Event::new()
443     }
444 }
445 
/// A guard waiting for a notification from an [`Event`].
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
///
/// If a listener has been notified but is dropped before it observes that notification,
/// dropping it passes the notification on to another active listener. Whether it is passed on
/// as a regular or as an *additional* notification depends on what kind of notification was
/// originally delivered.
pub struct EventListener {
    /// A reference to [`Event`]'s inner state.
    inner: Arc<Inner>,

    /// A pointer to this listener's entry in the linked list.
    ///
    /// This becomes `None` once the entry has been taken out by a blocking wait, a completed
    /// poll, or `discard()`; the `Drop` impl only touches the list while it is still `Some`.
    entry: Option<NonNull<Entry>>,
}
463 
// `NonNull<Entry>` is neither `Send` nor `Sync`, so these traits have to be asserted manually.
// NOTE(review): in the code visible in this file the pointed-to entry is only accessed while
// the list lock in `Inner` is held; these impls rely on that — confirm when touching entry
// access.
unsafe impl Send for EventListener {}
unsafe impl Sync for EventListener {}

// The listener is declared unwind-safe unconditionally.
impl UnwindSafe for EventListener {}
impl RefUnwindSafe for EventListener {}
469 
470 impl EventListener {
471     /// Blocks until a notification is received.
472     ///
473     /// # Examples
474     ///
475     /// ```
476     /// use event_listener::Event;
477     ///
478     /// let event = Event::new();
479     /// let listener = event.listen();
480     ///
481     /// // Notify `listener`.
482     /// event.notify(1);
483     ///
484     /// // Receive the notification.
485     /// listener.wait();
486     /// ```
wait(self)487     pub fn wait(self) {
488         self.wait_internal(None);
489     }
490 
491     /// Blocks until a notification is received or a timeout is reached.
492     ///
493     /// Returns `true` if a notification was received.
494     ///
495     /// # Examples
496     ///
497     /// ```
498     /// use std::time::Duration;
499     /// use event_listener::Event;
500     ///
501     /// let event = Event::new();
502     /// let listener = event.listen();
503     ///
    /// // There are no notifications, so this times out.
505     /// assert!(!listener.wait_timeout(Duration::from_secs(1)));
506     /// ```
wait_timeout(self, timeout: Duration) -> bool507     pub fn wait_timeout(self, timeout: Duration) -> bool {
508         self.wait_internal(Some(Instant::now() + timeout))
509     }
510 
511     /// Blocks until a notification is received or a deadline is reached.
512     ///
513     /// Returns `true` if a notification was received.
514     ///
515     /// # Examples
516     ///
517     /// ```
518     /// use std::time::{Duration, Instant};
519     /// use event_listener::Event;
520     ///
521     /// let event = Event::new();
522     /// let listener = event.listen();
523     ///
    /// // There are no notifications, so this times out.
525     /// assert!(!listener.wait_deadline(Instant::now() + Duration::from_secs(1)));
526     /// ```
wait_deadline(self, deadline: Instant) -> bool527     pub fn wait_deadline(self, deadline: Instant) -> bool {
528         self.wait_internal(Some(deadline))
529     }
530 
531     /// Drops this listener and discards its notification (if any) without notifying another
532     /// active listener.
533     ///
534     /// Returns `true` if a notification was discarded.
535     ///
536     /// # Examples
537     /// ```
538     /// use event_listener::Event;
539     ///
540     /// let event = Event::new();
541     /// let listener1 = event.listen();
542     /// let listener2 = event.listen();
543     ///
544     /// event.notify(1);
545     ///
546     /// assert!(listener1.discard());
547     /// assert!(!listener2.discard());
548     /// ```
discard(mut self) -> bool549     pub fn discard(mut self) -> bool {
550         // If this listener has never picked up a notification...
551         if let Some(entry) = self.entry.take() {
552             let mut list = self.inner.lock();
553             // Remove the listener from the list and return `true` if it was notified.
554             if let State::Notified(_) = list.remove(entry, self.inner.cache_ptr()) {
555                 return true;
556             }
557         }
558         false
559     }
560 
561     /// Returns `true` if this listener listens to the given `Event`.
562     ///
563     /// # Examples
564     ///
565     /// ```
566     /// use event_listener::Event;
567     ///
568     /// let event = Event::new();
569     /// let listener = event.listen();
570     ///
571     /// assert!(listener.listens_to(&event));
572     /// ```
573     #[inline]
listens_to(&self, event: &Event) -> bool574     pub fn listens_to(&self, event: &Event) -> bool {
575         ptr::eq::<Inner>(&*self.inner, event.inner.load(Ordering::Acquire))
576     }
577 
578     /// Returns `true` if both listeners listen to the same `Event`.
579     ///
580     /// # Examples
581     ///
582     /// ```
583     /// use event_listener::Event;
584     ///
585     /// let event = Event::new();
586     /// let listener1 = event.listen();
587     /// let listener2 = event.listen();
588     ///
589     /// assert!(listener1.same_event(&listener2));
590     /// ```
same_event(&self, other: &EventListener) -> bool591     pub fn same_event(&self, other: &EventListener) -> bool {
592         ptr::eq::<Inner>(&*self.inner, &*other.inner)
593     }
594 
    /// Blocks the current thread until this listener is notified or `deadline` passes.
    ///
    /// With `deadline == None` the call only returns after a notification. Returns `true` if a
    /// notification was received and `false` if the deadline was reached first. In every
    /// return path the entry has been removed from the list.
    fn wait_internal(mut self, deadline: Option<Instant>) -> bool {
        // Take out the entry pointer and set it to `None`.
        //
        // Since `self.entry` is `None` from here on, the `Drop` impl will not touch the list
        // again; this function is responsible for removing the entry.
        let entry = match self.entry.take() {
            None => unreachable!("cannot wait twice on an `EventListener`"),
            Some(entry) => entry,
        };

        // Set this listener's state to `Waiting`.
        {
            let mut list = self.inner.lock();
            let e = unsafe { entry.as_ref() };

            // Do a dummy replace operation in order to take out the state.
            match e.state.replace(State::Notified(false)) {
                State::Notified(_) => {
                    // If this listener has been notified, remove it from the list and return.
                    list.remove(entry, self.inner.cache_ptr());
                    return true;
                }
                // Otherwise, set the state to `Waiting`.
                //
                // The handle of the current thread is stored so that it can be unparked later.
                _ => e.state.set(State::Waiting(thread::current())),
            }
        }

        // Wait until a notification is received or the timeout is reached.
        //
        // The state is re-checked under the lock after every wake-up, so a wake-up that was not
        // caused by a notification simply leads to another round of parking.
        loop {
            match deadline {
                None => thread::park(),

                Some(deadline) => {
                    // Check for timeout.
                    let now = Instant::now();
                    if now >= deadline {
                        // Remove the entry and check if notified.
                        //
                        // A notification may have arrived right before the deadline, which is
                        // why the removed state decides the return value.
                        return self
                            .inner
                            .lock()
                            .remove(entry, self.inner.cache_ptr())
                            .is_notified();
                    }

                    // Park until the deadline.
                    thread::park_timeout(deadline - now);
                }
            }

            let mut list = self.inner.lock();
            let e = unsafe { entry.as_ref() };

            // Do a dummy replace operation in order to take out the state.
            match e.state.replace(State::Notified(false)) {
                State::Notified(_) => {
                    // If this listener has been notified, remove it from the list and return.
                    list.remove(entry, self.inner.cache_ptr());
                    return true;
                }
                // Otherwise, set the state back to `Waiting`.
                state => e.state.set(state),
            }
        }
    }
656 }
657 
658 impl fmt::Debug for EventListener {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result659     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
660         f.pad("EventListener { .. }")
661     }
662 }
663 
664 impl Future for EventListener {
665     type Output = ();
666 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>667     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
668         let mut list = self.inner.lock();
669 
670         let entry = match self.entry {
671             None => unreachable!("cannot poll a completed `EventListener` future"),
672             Some(entry) => entry,
673         };
674         let state = unsafe { &entry.as_ref().state };
675 
676         // Do a dummy replace operation in order to take out the state.
677         match state.replace(State::Notified(false)) {
678             State::Notified(_) => {
679                 // If this listener has been notified, remove it from the list and return.
680                 list.remove(entry, self.inner.cache_ptr());
681                 drop(list);
682                 self.entry = None;
683                 return Poll::Ready(());
684             }
685             State::Created => {
686                 // If the listener was just created, put it in the `Polling` state.
687                 state.set(State::Polling(cx.waker().clone()));
688             }
689             State::Polling(w) => {
690                 // If the listener was in the `Polling` state, update the waker.
691                 if w.will_wake(cx.waker()) {
692                     state.set(State::Polling(w));
693                 } else {
694                     state.set(State::Polling(cx.waker().clone()));
695                 }
696             }
697             State::Waiting(_) => {
698                 unreachable!("cannot poll and wait on `EventListener` at the same time")
699             }
700         }
701 
702         Poll::Pending
703     }
704 }
705 
706 impl Drop for EventListener {
drop(&mut self)707     fn drop(&mut self) {
708         // If this listener has never picked up a notification...
709         if let Some(entry) = self.entry.take() {
710             let mut list = self.inner.lock();
711 
712             // But if a notification was delivered to it...
713             if let State::Notified(additional) = list.remove(entry, self.inner.cache_ptr()) {
714                 // Then pass it on to another active listener.
715                 if additional {
716                     list.notify_additional(1);
717                 } else {
718                     list.notify(1);
719                 }
720             }
721         }
722     }
723 }
724 
/// A guard holding the linked list locked.
///
/// It dereferences to the `List`. When dropped, it publishes the list's notified count into
/// `Inner::notified`.
struct ListGuard<'a> {
    /// A reference to [`Event`]'s inner state.
    inner: &'a Inner,

    /// The actual guard that acquired the linked list.
    guard: MutexGuard<'a, List>,
}
733 
734 impl Drop for ListGuard<'_> {
735     #[inline]
drop(&mut self)736     fn drop(&mut self) {
737         let list = &mut **self;
738 
739         // Update the atomic `notified` counter.
740         let notified = if list.notified < list.len {
741             list.notified
742         } else {
743             usize::MAX
744         };
745         self.inner.notified.store(notified, Ordering::Release);
746     }
747 }
748 
749 impl Deref for ListGuard<'_> {
750     type Target = List;
751 
752     #[inline]
deref(&self) -> &List753     fn deref(&self) -> &List {
754         &*self.guard
755     }
756 }
757 
758 impl DerefMut for ListGuard<'_> {
759     #[inline]
deref_mut(&mut self) -> &mut List760     fn deref_mut(&mut self) -> &mut List {
761         &mut *self.guard
762     }
763 }
764 
/// The state of a listener.
enum State {
    /// The listener exists but nobody is polling or blocking on it yet.
    Created,

    /// A notification has been delivered to the listener.
    ///
    /// The flag is `true` when the notification was an "additional" one.
    Notified(bool),

    /// An async task polled the listener and left its waker behind.
    Polling(Waker),

    /// A thread is blocked on the listener.
    Waiting(Thread),
}

impl State {
    /// Tells whether the listener has received a notification.
    #[inline]
    fn is_notified(&self) -> bool {
        matches!(self, State::Notified(_))
    }
}
792 
/// An entry representing a registered listener.
///
/// Entries form a doubly linked list through `prev` and `next`.
struct Entry {
    /// The state of this listener.
    state: Cell<State>,

    /// Previous entry in the linked list.
    prev: Cell<Option<NonNull<Entry>>>,

    /// Next entry in the linked list.
    next: Cell<Option<NonNull<Entry>>>,
}
804 
/// A linked list of entries.
///
/// New entries are appended at the tail and notifications are handed out starting from
/// `start`.
struct List {
    /// First entry in the list.
    head: Option<NonNull<Entry>>,

    /// Last entry in the list.
    tail: Option<NonNull<Entry>>,

    /// The first unnotified entry in the list.
    ///
    /// `None` when there is no unnotified entry.
    start: Option<NonNull<Entry>>,

    /// Total number of entries in the list.
    len: usize,

    /// The number of notified entries in the list.
    notified: usize,

    /// Whether the cached entry is used.
    ///
    /// Set when `insert` places an entry in the cached slot and cleared when `remove` unlinks
    /// that entry.
    cache_used: bool,
}
825 
826 impl List {
827     /// Inserts a new entry into the list.
insert(&mut self, cache: NonNull<Entry>) -> NonNull<Entry>828     fn insert(&mut self, cache: NonNull<Entry>) -> NonNull<Entry> {
829         unsafe {
830             let entry = Entry {
831                 state: Cell::new(State::Created),
832                 prev: Cell::new(self.tail),
833                 next: Cell::new(None),
834             };
835 
836             let entry = if self.cache_used {
837                 // Allocate an entry that is going to become the new tail.
838                 NonNull::new_unchecked(Box::into_raw(Box::new(entry)))
839             } else {
840                 // No need to allocate - we can use the cached entry.
841                 self.cache_used = true;
842                 cache.as_ptr().write(entry);
843                 cache
844             };
845 
846             // Replace the tail with the new entry.
847             match mem::replace(&mut self.tail, Some(entry)) {
848                 None => self.head = Some(entry),
849                 Some(t) => t.as_ref().next.set(Some(entry)),
850             }
851 
852             // If there were no unnotified entries, this one is the first now.
853             if self.start.is_none() {
854                 self.start = self.tail;
855             }
856 
857             // Bump the entry count.
858             self.len += 1;
859 
860             entry
861         }
862     }
863 
864     /// Removes an entry from the list and returns its state.
remove(&mut self, entry: NonNull<Entry>, cache: NonNull<Entry>) -> State865     fn remove(&mut self, entry: NonNull<Entry>, cache: NonNull<Entry>) -> State {
866         unsafe {
867             let prev = entry.as_ref().prev.get();
868             let next = entry.as_ref().next.get();
869 
870             // Unlink from the previous entry.
871             match prev {
872                 None => self.head = next,
873                 Some(p) => p.as_ref().next.set(next),
874             }
875 
876             // Unlink from the next entry.
877             match next {
878                 None => self.tail = prev,
879                 Some(n) => n.as_ref().prev.set(prev),
880             }
881 
882             // If this was the first unnotified entry, move the pointer to the next one.
883             if self.start == Some(entry) {
884                 self.start = next;
885             }
886 
887             // Extract the state.
888             let state = if ptr::eq(entry.as_ptr(), cache.as_ptr()) {
889                 // Free the cached entry.
890                 self.cache_used = false;
891                 entry.as_ref().state.replace(State::Created)
892             } else {
893                 // Deallocate the entry.
894                 Box::from_raw(entry.as_ptr()).state.into_inner()
895             };
896 
897             // Update the counters.
898             if state.is_notified() {
899                 self.notified -= 1;
900             }
901             self.len -= 1;
902 
903             state
904         }
905     }
906 
907     /// Notifies a number of entries.
908     #[cold]
notify(&mut self, mut n: usize)909     fn notify(&mut self, mut n: usize) {
910         if n <= self.notified {
911             return;
912         }
913         n -= self.notified;
914 
915         while n > 0 {
916             n -= 1;
917 
918             // Notify the first unnotified entry.
919             match self.start {
920                 None => break,
921                 Some(e) => {
922                     // Get the entry and move the pointer forward.
923                     let e = unsafe { e.as_ref() };
924                     self.start = e.next.get();
925 
926                     // Set the state of this entry to `Notified` and notify.
927                     match e.state.replace(State::Notified(false)) {
928                         State::Notified(_) => {}
929                         State::Created => {}
930                         State::Polling(w) => w.wake(),
931                         State::Waiting(t) => t.unpark(),
932                     }
933 
934                     // Update the counter.
935                     self.notified += 1;
936                 }
937             }
938         }
939     }
940 
941     /// Notifies a number of additional entries.
942     #[cold]
notify_additional(&mut self, mut n: usize)943     fn notify_additional(&mut self, mut n: usize) {
944         while n > 0 {
945             n -= 1;
946 
947             // Notify the first unnotified entry.
948             match self.start {
949                 None => break,
950                 Some(e) => {
951                     // Get the entry and move the pointer forward.
952                     let e = unsafe { e.as_ref() };
953                     self.start = e.next.get();
954 
955                     // Set the state of this entry to `Notified` and notify.
956                     match e.state.replace(State::Notified(true)) {
957                         State::Notified(_) => {}
958                         State::Created => {}
959                         State::Polling(w) => w.wake(),
960                         State::Waiting(t) => t.unpark(),
961                     }
962 
963                     // Update the counter.
964                     self.notified += 1;
965                 }
966             }
967         }
968     }
969 }
970 
/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
///
/// On x86/x86_64 (outside of Miri) the fence is implemented with a `SeqCst` compare-exchange on
/// a temporary atomic, bracketed by compiler fences; everywhere else a plain
/// `atomic::fence(Ordering::SeqCst)` is issued.
#[inline]
fn full_fence() {
    if cfg!(all(
        any(target_arch = "x86", target_arch = "x86_64"),
        not(miri)
    )) {
        // HACK(stjepang): On x86 architectures there are two different ways of executing
        // a `SeqCst` fence.
        //
        // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
        // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg` instruction.
        //
        // Both instructions have the effect of a full barrier, but empirical benchmarks have shown
        // that the second one is sometimes a bit faster.
        //
        // The ideal solution here would be to use inline assembly, but we're instead creating a
        // temporary atomic variable and compare-and-exchanging its value. No sane compiler for
        // x86 platforms is going to optimize this away.
        //
        // The temporary atomic never escapes this function, so on its own the operation gives
        // the compiler no reason to keep surrounding memory accesses in place. The compiler
        // fences on both sides forbid compile-time reordering across this point, while the
        // `lock`-prefixed instruction provides the hardware barrier.
        //
        // Miri is excluded because it does not treat this trick as a fence; it takes the
        // portable branch below instead.
        atomic::compiler_fence(Ordering::SeqCst);
        let a = AtomicUsize::new(0);
        let _ = a.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst);
        atomic::compiler_fence(Ordering::SeqCst);
    } else {
        atomic::fence(Ordering::SeqCst);
    }
}
993