//! Reference counter for channels.

use std::isize;
use std::ops;
use std::process;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

/// Reference counter internals.
struct Counter<C> {
    /// The number of senders associated with the channel.
    senders: AtomicUsize,

    /// The number of receivers associated with the channel.
    receivers: AtomicUsize,

    /// Set to `true` once one side (all senders or all receivers) has disconnected;
    /// whichever side disconnects second then deallocates the channel.
    destroy: AtomicBool,

    /// The internal channel.
    chan: C,
}

/// Wraps a channel into the reference counter.
pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
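    // Both counts start at 1 because exactly one `Sender` and one `Receiver` are handed
    // out below. The counter is leaked into a raw pointer here and reclaimed in `release`
    // by whichever side disconnects last.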
    let counter = Box::into_raw(Box::new(Counter {
        senders: AtomicUsize::new(1),
        receivers: AtomicUsize::new(1),
        destroy: AtomicBool::new(false),
        chan,
    }));
    let s = Sender { counter };
    let r = Receiver { counter };
    (s, r)
}

/// The sending side.
pub(crate) struct Sender<C> {
    counter: *mut Counter<C>,
}

impl<C> Sender<C> {
    /// Returns the internal `Counter`.
    fn counter(&self) -> &Counter<C> {
        unsafe { &*self.counter }
    }

    /// Acquires another sender reference.
    pub(crate) fn acquire(&self) -> Sender<C> {
        let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);

        // Cloning senders and calling `mem::forget` on the clones could potentially overflow the
        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
        // just abort when the count becomes very large.
        if count > isize::MAX as usize {
            process::abort();
        }

        Sender {
            counter: self.counter,
        }
    }

    /// Releases the sender reference.
    ///
    /// Function `disconnect` will be called if this is the last sender reference.
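    ///
    /// # Safety
    ///
    /// Each sender reference must be released exactly once, and it must not be used after
    /// this call, because the counter may be deallocated here.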
    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
        if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
            disconnect(&self.counter().chan);

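            // Both the last sender and the last receiver attempt this swap. Whichever side
            // gets here second observes `destroy == true` and frees the counter, so the
            // allocation is reclaimed exactly once.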
            if self.counter().destroy.swap(true, Ordering::AcqRel) {
                drop(Box::from_raw(self.counter));
            }
        }
    }
}

impl<C> ops::Deref for Sender<C> {
    type Target = C;

    fn deref(&self) -> &C {
        &self.counter().chan
    }
}

impl<C> PartialEq for Sender<C> {
    fn eq(&self, other: &Sender<C>) -> bool {
        self.counter == other.counter
    }
}

/// The receiving side.
pub(crate) struct Receiver<C> {
    counter: *mut Counter<C>,
}

impl<C> Receiver<C> {
    /// Returns the internal `Counter`.
    fn counter(&self) -> &Counter<C> {
        unsafe { &*self.counter }
    }

    /// Acquires another receiver reference.
    pub(crate) fn acquire(&self) -> Receiver<C> {
        let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);

        // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
        // just abort when the count becomes very large.
        if count > isize::MAX as usize {
            process::abort();
        }

        Receiver {
            counter: self.counter,
        }
    }

    /// Releases the receiver reference.
    ///
    /// Function `disconnect` will be called if this is the last receiver reference.
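    ///
    /// # Safety
    ///
    /// Each receiver reference must be released exactly once, and it must not be used after
    /// this call, because the counter may be deallocated here.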
    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
        if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
            disconnect(&self.counter().chan);

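            // Both the last sender and the last receiver attempt this swap. Whichever side
            // gets here second observes `destroy == true` and frees the counter, so the
            // allocation is reclaimed exactly once.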
            if self.counter().destroy.swap(true, Ordering::AcqRel) {
                drop(Box::from_raw(self.counter));
            }
        }
    }
}

impl<C> ops::Deref for Receiver<C> {
    type Target = C;

    fn deref(&self) -> &C {
        &self.counter().chan
    }
}

impl<C> PartialEq for Receiver<C> {
    fn eq(&self, other: &Receiver<C>) -> bool {
        self.counter == other.counter
    }
}
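
// The sketch below (not part of the original module) illustrates the intended protocol:
// `new` hands out one sender and one receiver, `acquire` adds a reference, and every
// reference is paired with exactly one `release` call. `DummyChan` is a hypothetical
// stand-in for a real channel implementation.
#[cfg(test)]
mod tests {
    use super::*;

    struct DummyChan;

    #[test]
    fn acquire_release_roundtrip() {
        let (s, r) = new(DummyChan);
        let s2 = s.acquire();

        unsafe {
            // Not the last sender, so `disconnect` is not invoked here.
            s2.release(|_chan| true);
            // Last sender: `disconnect` runs and `destroy` flips to `true`.
            s.release(|_chan| true);
            // Last receiver: `destroy` is already `true`, so this call frees the counter.
            r.release(|_chan| true);
        }
    }
}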