//! Notify async tasks or threads.
//!
//! This is a synchronization primitive similar to [eventcounts] invented by Dmitry Vyukov.
//!
//! You can use this crate to turn non-blocking data structures into async or blocking data
//! structures. See a [simple mutex] implementation that exposes an async and a blocking interface
//! for acquiring locks.
//!
//! [eventcounts]: http://www.1024cores.net/home/lock-free-algorithms/eventcounts
//! [simple mutex]: https://github.com/stjepang/event-listener/blob/master/examples/mutex.rs
//!
//! # Examples
//!
//! Wait until another thread sets a boolean flag:
//!
//! ```
//! use std::sync::atomic::{AtomicBool, Ordering};
//! use std::sync::Arc;
//! use std::thread;
//! use std::time::Duration;
//! use std::usize;
//! use event_listener::Event;
//!
//! let flag = Arc::new(AtomicBool::new(false));
//! let event = Arc::new(Event::new());
//!
//! // Spawn a thread that will set the flag after 1 second.
//! thread::spawn({
//!     let flag = flag.clone();
//!     let event = event.clone();
//!     move || {
//!         // Wait for a second.
//!         thread::sleep(Duration::from_secs(1));
//!
//!         // Set the flag.
//!         flag.store(true, Ordering::SeqCst);
//!
//!         // Notify all listeners that the flag has been set.
//!         event.notify(usize::MAX);
//!     }
//! });
//!
//! // Wait until the flag is set.
//! loop {
//!     // Check the flag.
//!     if flag.load(Ordering::SeqCst) {
//!         break;
//!     }
//!
//!     // Start listening for events.
//!     let listener = event.listen();
//!
//!     // Check the flag again after creating the listener.
//!     if flag.load(Ordering::SeqCst) {
//!         break;
//!     }
//!
//!     // Wait for a notification and continue the loop.
//!     listener.wait();
//! }
//! ```
62
63 #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
64
65 use std::cell::{Cell, UnsafeCell};
66 use std::fmt;
67 use std::future::Future;
68 use std::mem::{self, ManuallyDrop};
69 use std::ops::{Deref, DerefMut};
70 use std::panic::{RefUnwindSafe, UnwindSafe};
71 use std::pin::Pin;
72 use std::ptr::{self, NonNull};
73 use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
74 use std::sync::{Arc, Mutex, MutexGuard};
75 use std::task::{Context, Poll, Waker};
76 use std::thread::{self, Thread};
77 use std::time::{Duration, Instant};
78 use std::usize;
79
80 /// Inner state of [`Event`].
81 struct Inner {
82 /// The number of notified entries, or `usize::MAX` if all of them have been notified.
83 ///
84 /// If there are no entries, this value is set to `usize::MAX`.
85 notified: AtomicUsize,
86
87 /// A linked list holding registered listeners.
88 list: Mutex<List>,
89
90 /// A single cached list entry to avoid allocations on the fast path of the insertion.
91 cache: UnsafeCell<Entry>,
92 }
93
impl Inner {
    /// Locks the list.
    fn lock(&self) -> ListGuard<'_> {
        ListGuard {
            inner: self,
            // The mutex is only poisoned if a panic occurred while it was held;
            // propagating that panic via `unwrap()` is the conservative choice.
            guard: self.list.lock().unwrap(),
        }
    }

    /// Returns the pointer to the single cached list entry.
    #[inline(always)]
    fn cache_ptr(&self) -> NonNull<Entry> {
        // SAFETY: `UnsafeCell::get()` returns a pointer into `self`, which is never null.
        unsafe { NonNull::new_unchecked(self.cache.get()) }
    }
}
109
/// A synchronization primitive for notifying async tasks and threads.
///
/// Listeners can be registered using [`Event::listen()`]. There are two ways to notify listeners:
///
/// 1. [`Event::notify()`] notifies a number of listeners.
/// 2. [`Event::notify_additional()`] notifies a number of previously unnotified listeners.
///
/// If there are no active listeners at the time a notification is sent, it simply gets lost.
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
///
/// If a notified listener is dropped without receiving a notification, dropping will notify
/// another active listener. Whether one *additional* listener will be notified depends on what
/// kind of notification was delivered.
///
/// Listeners are registered and notified in the first-in first-out fashion, ensuring fairness.
pub struct Event {
    /// A pointer to heap-allocated inner state.
    ///
    /// This pointer is initially null and gets lazily initialized on first use. Semantically, it
    /// is an `Arc<Inner>` so it's important to keep in mind that it contributes to the [`Arc`]'s
    /// reference count.
    ///
    /// Storing a raw pointer instead of an `Arc` directly is what allows `Event::new()` below
    /// to be a `const fn` with no allocation.
    inner: AtomicPtr<Inner>,
}
137
// SAFETY: `Event` only exposes its `Inner` state through atomic loads of the pointer and the
// `Mutex<List>` inside it. NOTE(review): the `UnsafeCell<Entry>` cache appears to be accessed
// only while the list mutex is held — confirm this invariant when auditing these impls.
unsafe impl Send for Event {}
unsafe impl Sync for Event {}

// A panic while using an `Event` cannot leave it in a logically broken state observable through
// its public API, so it is unwind-safe.
impl UnwindSafe for Event {}
impl RefUnwindSafe for Event {}
143
144 impl Event {
145 /// Creates a new [`Event`].
146 ///
147 /// # Examples
148 ///
149 /// ```
150 /// use event_listener::Event;
151 ///
152 /// let event = Event::new();
153 /// ```
154 #[inline]
new() -> Event155 pub const fn new() -> Event {
156 Event {
157 inner: AtomicPtr::new(ptr::null_mut()),
158 }
159 }
160
161 /// Returns a guard listening for a notification.
162 ///
163 /// This method emits a `SeqCst` fence after registering a listener.
164 ///
165 /// # Examples
166 ///
167 /// ```
168 /// use event_listener::Event;
169 ///
170 /// let event = Event::new();
171 /// let listener = event.listen();
172 /// ```
173 #[cold]
listen(&self) -> EventListener174 pub fn listen(&self) -> EventListener {
175 let inner = self.inner();
176 let listener = EventListener {
177 inner: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
178 entry: Some(inner.lock().insert(inner.cache_ptr())),
179 };
180
181 // Make sure the listener is registered before whatever happens next.
182 full_fence();
183 listener
184 }
185
186 /// Notifies a number of active listeners.
187 ///
188 /// The number is allowed to be zero or exceed the current number of listeners.
189 ///
190 /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n`
191 /// listeners among the active ones are notified.
192 ///
193 /// This method emits a `SeqCst` fence before notifying listeners.
194 ///
195 /// # Examples
196 ///
197 /// ```
198 /// use event_listener::Event;
199 ///
200 /// let event = Event::new();
201 ///
202 /// // This notification gets lost because there are no listeners.
203 /// event.notify(1);
204 ///
205 /// let listener1 = event.listen();
206 /// let listener2 = event.listen();
207 /// let listener3 = event.listen();
208 ///
209 /// // Notifies two listeners.
210 /// //
211 /// // Listener queueing is fair, which means `listener1` and `listener2`
212 /// // get notified here since they start listening before `listener3`.
213 /// event.notify(2);
214 /// ```
215 #[inline]
notify(&self, n: usize)216 pub fn notify(&self, n: usize) {
217 // Make sure the notification comes after whatever triggered it.
218 full_fence();
219
220 if let Some(inner) = self.try_inner() {
221 // Notify if there is at least one unnotified listener and the number of notified
222 // listeners is less than `n`.
223 if inner.notified.load(Ordering::Acquire) < n {
224 inner.lock().notify(n);
225 }
226 }
227 }
228
229 /// Notifies a number of active listeners without emitting a `SeqCst` fence.
230 ///
231 /// The number is allowed to be zero or exceed the current number of listeners.
232 ///
233 /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n`
234 /// listeners among the active ones are notified.
235 ///
236 /// Unlike [`Event::notify()`], this method does not emit a `SeqCst` fence.
237 ///
238 /// # Examples
239 ///
240 /// ```
241 /// use event_listener::Event;
242 /// use std::sync::atomic::{self, Ordering};
243 ///
244 /// let event = Event::new();
245 ///
246 /// // This notification gets lost because there are no listeners.
247 /// event.notify(1);
248 ///
249 /// let listener1 = event.listen();
250 /// let listener2 = event.listen();
251 /// let listener3 = event.listen();
252 ///
253 /// // We should emit a fence manually when using relaxed notifications.
254 /// atomic::fence(Ordering::SeqCst);
255 ///
256 /// // Notifies two listeners.
257 /// //
258 /// // Listener queueing is fair, which means `listener1` and `listener2`
259 /// // get notified here since they start listening before `listener3`.
260 /// event.notify(2);
261 /// ```
262 #[inline]
notify_relaxed(&self, n: usize)263 pub fn notify_relaxed(&self, n: usize) {
264 if let Some(inner) = self.try_inner() {
265 // Notify if there is at least one unnotified listener and the number of notified
266 // listeners is less than `n`.
267 if inner.notified.load(Ordering::Acquire) < n {
268 inner.lock().notify(n);
269 }
270 }
271 }
272
273 /// Notifies a number of active and still unnotified listeners.
274 ///
275 /// The number is allowed to be zero or exceed the current number of listeners.
276 ///
277 /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that
278 /// were previously unnotified.
279 ///
280 /// This method emits a `SeqCst` fence before notifying listeners.
281 ///
282 /// # Examples
283 ///
284 /// ```
285 /// use event_listener::Event;
286 ///
287 /// let event = Event::new();
288 ///
289 /// // This notification gets lost because there are no listeners.
290 /// event.notify(1);
291 ///
292 /// let listener1 = event.listen();
293 /// let listener2 = event.listen();
294 /// let listener3 = event.listen();
295 ///
296 /// // Notifies two listeners.
297 /// //
298 /// // Listener queueing is fair, which means `listener1` and `listener2`
299 /// // get notified here since they start listening before `listener3`.
300 /// event.notify_additional(1);
301 /// event.notify_additional(1);
302 /// ```
303 #[inline]
notify_additional(&self, n: usize)304 pub fn notify_additional(&self, n: usize) {
305 // Make sure the notification comes after whatever triggered it.
306 full_fence();
307
308 if let Some(inner) = self.try_inner() {
309 // Notify if there is at least one unnotified listener.
310 if inner.notified.load(Ordering::Acquire) < usize::MAX {
311 inner.lock().notify_additional(n);
312 }
313 }
314 }
315
316 /// Notifies a number of active and still unnotified listeners without emitting a `SeqCst`
317 /// fence.
318 ///
319 /// The number is allowed to be zero or exceed the current number of listeners.
320 ///
321 /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that
322 /// were previously unnotified.
323 ///
324 /// Unlike [`Event::notify_additional()`], this method does not emit a `SeqCst` fence.
325 ///
326 /// # Examples
327 ///
328 /// ```
329 /// use event_listener::Event;
330 /// use std::sync::atomic::{self, Ordering};
331 ///
332 /// let event = Event::new();
333 ///
334 /// // This notification gets lost because there are no listeners.
335 /// event.notify(1);
336 ///
337 /// let listener1 = event.listen();
338 /// let listener2 = event.listen();
339 /// let listener3 = event.listen();
340 ///
341 /// // We should emit a fence manually when using relaxed notifications.
342 /// atomic::fence(Ordering::SeqCst);
343 ///
344 /// // Notifies two listeners.
345 /// //
346 /// // Listener queueing is fair, which means `listener1` and `listener2`
347 /// // get notified here since they start listening before `listener3`.
348 /// event.notify_additional_relaxed(1);
349 /// event.notify_additional_relaxed(1);
350 /// ```
351 #[inline]
notify_additional_relaxed(&self, n: usize)352 pub fn notify_additional_relaxed(&self, n: usize) {
353 if let Some(inner) = self.try_inner() {
354 // Notify if there is at least one unnotified listener.
355 if inner.notified.load(Ordering::Acquire) < usize::MAX {
356 inner.lock().notify_additional(n);
357 }
358 }
359 }
360
361 /// Returns a reference to the inner state if it was initialized.
362 #[inline]
try_inner(&self) -> Option<&Inner>363 fn try_inner(&self) -> Option<&Inner> {
364 let inner = self.inner.load(Ordering::Acquire);
365 unsafe { inner.as_ref() }
366 }
367
368 /// Returns a reference to the inner state, initializing it if necessary.
inner(&self) -> &Inner369 fn inner(&self) -> &Inner {
370 let mut inner = self.inner.load(Ordering::Acquire);
371
372 // Initialize the state if this is its first use.
373 if inner.is_null() {
374 // Allocate on the heap.
375 let new = Arc::new(Inner {
376 notified: AtomicUsize::new(usize::MAX),
377 list: std::sync::Mutex::new(List {
378 head: None,
379 tail: None,
380 start: None,
381 len: 0,
382 notified: 0,
383 cache_used: false,
384 }),
385 cache: UnsafeCell::new(Entry {
386 state: Cell::new(State::Created),
387 prev: Cell::new(None),
388 next: Cell::new(None),
389 }),
390 });
391 // Convert the heap-allocated state into a raw pointer.
392 let new = Arc::into_raw(new) as *mut Inner;
393
394 // Attempt to replace the null-pointer with the new state pointer.
395 inner = self.inner.compare_and_swap(inner, new, Ordering::AcqRel);
396
397 // Check if the old pointer value was indeed null.
398 if inner.is_null() {
399 // If yes, then use the new state pointer.
400 inner = new;
401 } else {
402 // If not, that means a concurrent operation has initialized the state.
403 // In that case, use the old pointer and deallocate the new one.
404 unsafe {
405 drop(Arc::from_raw(new));
406 }
407 }
408 }
409
410 unsafe { &*inner }
411 }
412 }
413
414 impl Drop for Event {
415 #[inline]
drop(&mut self)416 fn drop(&mut self) {
417 let inner: *mut Inner = *self.inner.get_mut();
418
419 // If the state pointer has been initialized, deallocate it.
420 if !inner.is_null() {
421 unsafe {
422 drop(Arc::from_raw(inner));
423 }
424 }
425 }
426 }
427
impl fmt::Debug for Event {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `pad` (rather than `write!`) honors width/fill flags such as `{:10?}`.
        f.pad("Event { .. }")
    }
}
433
434 impl Default for Event {
default() -> Event435 fn default() -> Event {
436 Event::new()
437 }
438 }
439
/// A guard waiting for a notification from an [`Event`].
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
///
/// If a notified listener is dropped without receiving a notification, dropping will notify
/// another active listener. Whether one *additional* listener will be notified depends on what
/// kind of notification was delivered.
pub struct EventListener {
    /// A reference to [`Event`]'s inner state.
    ///
    /// Holding this keeps the inner state (and therefore `entry`'s allocation) alive.
    inner: Arc<Inner>,

    /// A pointer to this listener's entry in the linked list.
    ///
    /// `None` once the listener has completed (woken, timed out, or discarded).
    entry: Option<NonNull<Entry>>,
}
457
// SAFETY: `entry` is a raw pointer into the shared list. NOTE(review): it appears to be
// dereferenced only while `inner`'s list mutex is held (see `wait_internal`, `poll`, `drop`),
// which is what would make cross-thread use sound — confirm before changing lock scopes.
unsafe impl Send for EventListener {}
unsafe impl Sync for EventListener {}

impl UnwindSafe for EventListener {}
impl RefUnwindSafe for EventListener {}
463
impl EventListener {
    /// Blocks until a notification is received.
    ///
    /// # Examples
    ///
    /// ```
    /// use event_listener::Event;
    ///
    /// let event = Event::new();
    /// let listener = event.listen();
    ///
    /// // Notify `listener`.
    /// event.notify(1);
    ///
    /// // Receive the notification.
    /// listener.wait();
    /// ```
    pub fn wait(self) {
        // `None` deadline means "park until notified".
        self.wait_internal(None);
    }

    /// Blocks until a notification is received or a timeout is reached.
    ///
    /// Returns `true` if a notification was received.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::Duration;
    /// use event_listener::Event;
    ///
    /// let event = Event::new();
    /// let listener = event.listen();
    ///
    /// // There are no notifications so this times out.
    /// assert!(!listener.wait_timeout(Duration::from_secs(1)));
    /// ```
    pub fn wait_timeout(self, timeout: Duration) -> bool {
        self.wait_internal(Some(Instant::now() + timeout))
    }

    /// Blocks until a notification is received or a deadline is reached.
    ///
    /// Returns `true` if a notification was received.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::{Duration, Instant};
    /// use event_listener::Event;
    ///
    /// let event = Event::new();
    /// let listener = event.listen();
    ///
    /// // There are no notifications so this times out.
    /// assert!(!listener.wait_deadline(Instant::now() + Duration::from_secs(1)));
    /// ```
    pub fn wait_deadline(self, deadline: Instant) -> bool {
        self.wait_internal(Some(deadline))
    }

    /// Drops this listener and discards its notification (if any) without notifying another
    /// active listener.
    ///
    /// Returns `true` if a notification was discarded.
    ///
    /// # Examples
    /// ```
    /// use event_listener::Event;
    ///
    /// let event = Event::new();
    /// let listener1 = event.listen();
    /// let listener2 = event.listen();
    ///
    /// event.notify(1);
    ///
    /// assert!(listener1.discard());
    /// assert!(!listener2.discard());
    /// ```
    pub fn discard(mut self) -> bool {
        // If this listener has never picked up a notification...
        // (`take()` also ensures the `Drop` impl won't process the entry a second time.)
        if let Some(entry) = self.entry.take() {
            let mut list = self.inner.lock();
            // Remove the listener from the list and return `true` if it was notified.
            if let State::Notified(_) = list.remove(entry, self.inner.cache_ptr()) {
                return true;
            }
        }
        false
    }

    /// Returns `true` if this listener listens to the given `Event`.
    ///
    /// # Examples
    ///
    /// ```
    /// use event_listener::Event;
    ///
    /// let event = Event::new();
    /// let listener = event.listen();
    ///
    /// assert!(listener.listens_to(&event));
    /// ```
    #[inline]
    pub fn listens_to(&self, event: &Event) -> bool {
        // Compare the shared `Inner` allocations by address.
        ptr::eq::<Inner>(&*self.inner, event.inner.load(Ordering::Acquire))
    }

    /// Returns `true` if both listeners listen to the same `Event`.
    ///
    /// # Examples
    ///
    /// ```
    /// use event_listener::Event;
    ///
    /// let event = Event::new();
    /// let listener1 = event.listen();
    /// let listener2 = event.listen();
    ///
    /// assert!(listener1.same_event(&listener2));
    /// ```
    pub fn same_event(&self, other: &EventListener) -> bool {
        ptr::eq::<Inner>(&*self.inner, &*other.inner)
    }

    /// Shared implementation of `wait`, `wait_timeout` and `wait_deadline`.
    ///
    /// Returns `true` if a notification was received before the (optional) deadline.
    fn wait_internal(mut self, deadline: Option<Instant>) -> bool {
        // Take out the entry pointer and set it to `None`.
        // (This also prevents the `Drop` impl from touching the entry again.)
        let entry = match self.entry.take() {
            None => unreachable!("cannot wait twice on an `EventListener`"),
            Some(entry) => entry,
        };

        // Set this listener's state to `Waiting`.
        {
            let mut list = self.inner.lock();
            // The entry is only accessed while the list is locked.
            let e = unsafe { entry.as_ref() };

            // Do a dummy replace operation in order to take out the state.
            match e.state.replace(State::Notified(false)) {
                State::Notified(_) => {
                    // If this listener has been notified, remove it from the list and return.
                    list.remove(entry, self.inner.cache_ptr());
                    return true;
                }
                // Otherwise, set the state to `Waiting` so `List::notify*` can unpark us.
                _ => e.state.set(State::Waiting(thread::current())),
            }
        }

        // Wait until a notification is received or the timeout is reached.
        loop {
            match deadline {
                None => thread::park(),

                Some(deadline) => {
                    // Check for timeout.
                    let now = Instant::now();
                    if now >= deadline {
                        // Remove the entry and check if notified.
                        // (A notification may have raced with the timeout; honor it if so.)
                        return self
                            .inner
                            .lock()
                            .remove(entry, self.inner.cache_ptr())
                            .is_notified();
                    }

                    // Park until the deadline.
                    thread::park_timeout(deadline - now);
                }
            }

            // `park` may wake spuriously, so re-check the state under the lock.
            let mut list = self.inner.lock();
            let e = unsafe { entry.as_ref() };

            // Do a dummy replace operation in order to take out the state.
            match e.state.replace(State::Notified(false)) {
                State::Notified(_) => {
                    // If this listener has been notified, remove it from the list and return.
                    list.remove(entry, self.inner.cache_ptr());
                    return true;
                }
                // Otherwise, set the state back to `Waiting` and park again.
                state => e.state.set(state),
            }
        }
    }
}
651
impl fmt::Debug for EventListener {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `pad` (rather than `write!`) honors width/fill flags such as `{:10?}`.
        f.pad("EventListener { .. }")
    }
}
657
impl Future for EventListener {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut list = self.inner.lock();

        let entry = match self.entry {
            None => unreachable!("cannot poll a completed `EventListener` future"),
            Some(entry) => entry,
        };
        // The entry is only accessed while the list is locked.
        let state = unsafe { &entry.as_ref().state };

        // Do a dummy replace operation in order to take out the state.
        match state.replace(State::Notified(false)) {
            State::Notified(_) => {
                // If this listener has been notified, remove it from the list and return.
                list.remove(entry, self.inner.cache_ptr());
                // Release the lock before mutating `self.entry`.
                drop(list);
                // Mark the future as completed so `Drop` does not touch the entry again.
                self.entry = None;
                return Poll::Ready(());
            }
            State::Created => {
                // If the listener was just created, put it in the `Polling` state.
                state.set(State::Polling(cx.waker().clone()));
            }
            State::Polling(w) => {
                // If the listener was in the `Polling` state, update the waker.
                // Reuse the stored waker when it would wake the same task, avoiding a clone.
                if w.will_wake(cx.waker()) {
                    state.set(State::Polling(w));
                } else {
                    state.set(State::Polling(cx.waker().clone()));
                }
            }
            State::Waiting(_) => {
                // `wait*()` consumes `self`, so a `Waiting` state can never be observed here.
                unreachable!("cannot poll and wait on `EventListener` at the same time")
            }
        }

        Poll::Pending
    }
}
699
impl Drop for EventListener {
    fn drop(&mut self) {
        // If this listener has never picked up a notification...
        // (`entry` is `None` when the listener already completed via `wait*`, `poll` or
        // `discard`, in which case there is nothing left to clean up.)
        if let Some(entry) = self.entry.take() {
            let mut list = self.inner.lock();

            // But if a notification was delivered to it...
            if let State::Notified(additional) = list.remove(entry, self.inner.cache_ptr()) {
                // Then pass it on to another active listener so the notification is not lost.
                // The kind of notification (`notify` vs `notify_additional`) is preserved.
                if additional {
                    list.notify_additional(1);
                } else {
                    list.notify(1);
                }
            }
        }
    }
}
718
/// A guard holding the linked list locked.
///
/// On drop it synchronizes `Inner::notified` with `List::notified` (see the `Drop` impl).
struct ListGuard<'a> {
    /// A reference to [`Event`]'s inner state.
    inner: &'a Inner,

    /// The actual guard that acquired the linked list.
    guard: MutexGuard<'a, List>,
}
727
728 impl Drop for ListGuard<'_> {
729 #[inline]
drop(&mut self)730 fn drop(&mut self) {
731 let list = &mut **self;
732
733 // Update the atomic `notified` counter.
734 let notified = if list.notified < list.len {
735 list.notified
736 } else {
737 usize::MAX
738 };
739 self.inner.notified.store(notified, Ordering::Release);
740 }
741 }
742
743 impl Deref for ListGuard<'_> {
744 type Target = List;
745
746 #[inline]
deref(&self) -> &List747 fn deref(&self) -> &List {
748 &*self.guard
749 }
750 }
751
752 impl DerefMut for ListGuard<'_> {
753 #[inline]
deref_mut(&mut self) -> &mut List754 fn deref_mut(&mut self) -> &mut List {
755 &mut *self.guard
756 }
757 }
758
/// The state of a listener.
enum State {
    /// It has just been created.
    Created,

    /// It has received a notification.
    ///
    /// The `bool` is `true` if this was an "additional" notification
    /// (i.e. delivered by `notify_additional*` rather than `notify*`).
    Notified(bool),

    /// An async task is polling it.
    ///
    /// Holds the waker to invoke when a notification arrives.
    Polling(Waker),

    /// A thread is blocked on it.
    ///
    /// Holds the thread handle to unpark when a notification arrives.
    Waiting(Thread),
}
775
776 impl State {
777 /// Returns `true` if this is the `Notified` state.
778 #[inline]
is_notified(&self) -> bool779 fn is_notified(&self) -> bool {
780 match self {
781 State::Notified(_) => true,
782 State::Created | State::Polling(_) | State::Waiting(_) => false,
783 }
784 }
785 }
786
/// An entry representing a registered listener.
///
/// Entries form an intrusive doubly-linked list; they live either on the heap or in the
/// single cached slot inside `Inner`.
struct Entry {
    /// The state of this listener.
    state: Cell<State>,

    /// Previous entry in the linked list.
    prev: Cell<Option<NonNull<Entry>>>,

    /// Next entry in the linked list.
    next: Cell<Option<NonNull<Entry>>>,
}
798
/// A linked list of entries.
///
/// Entries are kept in registration order; notified entries form a prefix of the list,
/// and `start` marks the first entry that has not been notified yet.
struct List {
    /// First entry in the list.
    head: Option<NonNull<Entry>>,

    /// Last entry in the list.
    tail: Option<NonNull<Entry>>,

    /// The first unnotified entry in the list.
    start: Option<NonNull<Entry>>,

    /// Total number of entries in the list.
    len: usize,

    /// The number of notified entries in the list.
    notified: usize,

    /// Whether the cached entry (stored in `Inner::cache`) is used.
    cache_used: bool,
}
819
impl List {
    /// Inserts a new entry into the list.
    ///
    /// `cache` points at the pre-allocated entry inside `Inner`; it is used instead of
    /// a fresh heap allocation whenever it is free.
    fn insert(&mut self, cache: NonNull<Entry>) -> NonNull<Entry> {
        unsafe {
            let entry = Entry {
                state: Cell::new(State::Created),
                prev: Cell::new(self.tail),
                next: Cell::new(None),
            };

            let entry = if self.cache_used {
                // Allocate an entry that is going to become the new tail.
                NonNull::new_unchecked(Box::into_raw(Box::new(entry)))
            } else {
                // No need to allocate - we can use the cached entry.
                self.cache_used = true;
                cache.as_ptr().write(entry);
                cache
            };

            // Replace the tail with the new entry.
            match mem::replace(&mut self.tail, Some(entry)) {
                None => self.head = Some(entry),
                Some(t) => t.as_ref().next.set(Some(entry)),
            }

            // If there were no unnotified entries, this one is the first now.
            if self.start.is_none() {
                self.start = self.tail;
            }

            // Bump the entry count.
            self.len += 1;

            entry
        }
    }

    /// Removes an entry from the list and returns its state.
    ///
    /// Frees the entry: either deallocates its box or, if it is the cached entry,
    /// releases the cache slot for reuse.
    fn remove(&mut self, entry: NonNull<Entry>, cache: NonNull<Entry>) -> State {
        unsafe {
            let prev = entry.as_ref().prev.get();
            let next = entry.as_ref().next.get();

            // Unlink from the previous entry.
            match prev {
                None => self.head = next,
                Some(p) => p.as_ref().next.set(next),
            }

            // Unlink from the next entry.
            match next {
                None => self.tail = prev,
                Some(n) => n.as_ref().prev.set(prev),
            }

            // If this was the first unnotified entry, move the pointer to the next one.
            if self.start == Some(entry) {
                self.start = next;
            }

            // Extract the state.
            let state = if ptr::eq(entry.as_ptr(), cache.as_ptr()) {
                // Free the cached entry: its storage lives inside `Inner` and must not be
                // boxed, so only reset its state.
                self.cache_used = false;
                entry.as_ref().state.replace(State::Created)
            } else {
                // Deallocate the heap entry and take its state.
                Box::from_raw(entry.as_ptr()).state.into_inner()
            };

            // Update the counters.
            if state.is_notified() {
                self.notified -= 1;
            }
            self.len -= 1;

            state
        }
    }

    /// Notifies a number of entries.
    ///
    /// Ensures *at least* `n` entries are in the notified state (counting already
    /// notified ones), waking each newly notified listener.
    #[cold]
    fn notify(&mut self, mut n: usize) {
        // Entries already notified count toward `n`.
        if n <= self.notified {
            return;
        }
        n -= self.notified;

        while n > 0 {
            n -= 1;

            // Notify the first unnotified entry.
            match self.start {
                None => break,
                Some(e) => {
                    // Get the entry and move the pointer forward.
                    let e = unsafe { e.as_ref() };
                    self.start = e.next.get();

                    // Set the state of this entry to `Notified` and notify.
                    // `false` marks this as a non-additional notification.
                    match e.state.replace(State::Notified(false)) {
                        State::Notified(_) => {}
                        State::Created => {}
                        State::Polling(w) => w.wake(),
                        State::Waiting(t) => t.unpark(),
                    }

                    // Update the counter.
                    self.notified += 1;
                }
            }
        }
    }

    /// Notifies a number of additional entries.
    ///
    /// Unlike `notify()`, already-notified entries do not count toward `n`: exactly
    /// `n` more entries are notified (or fewer, if the list runs out).
    #[cold]
    fn notify_additional(&mut self, mut n: usize) {
        while n > 0 {
            n -= 1;

            // Notify the first unnotified entry.
            match self.start {
                None => break,
                Some(e) => {
                    // Get the entry and move the pointer forward.
                    let e = unsafe { e.as_ref() };
                    self.start = e.next.get();

                    // Set the state of this entry to `Notified` and notify.
                    // `true` marks this as an "additional" notification.
                    match e.state.replace(State::Notified(true)) {
                        State::Notified(_) => {}
                        State::Created => {}
                        State::Polling(w) => w.wake(),
                        State::Waiting(t) => t.unpark(),
                    }

                    // Update the counter.
                    self.notified += 1;
                }
            }
        }
    }
}
964
/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
#[inline]
fn full_fence() {
    if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
        // HACK(stjepang): On x86 architectures there are two different ways of executing
        // a `SeqCst` fence.
        //
        // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
        // 2. A `SeqCst` compare-exchange, which compiles into a `lock cmpxchg` instruction.
        //
        // Both instructions have the effect of a full barrier, but empirical benchmarks have shown
        // that the second one is sometimes a bit faster.
        //
        // The ideal solution here would be to use inline assembly, but we're instead creating a
        // temporary atomic variable and compare-and-exchanging its value. No sane compiler to
        // x86 platforms is going to optimize this away.
        //
        // `compare_exchange` replaces the deprecated `compare_and_swap`; the result is ignored
        // because only the full-barrier side effect is wanted.
        let a = AtomicUsize::new(0);
        let _ = a.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst);
    } else {
        atomic::fence(Ordering::SeqCst);
    }
}
987