//! Reference counter for channels.
2 
3 use std::isize;
4 use std::ops;
5 use std::process;
6 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
7 
/// Reference counter internals.
///
/// One instance is heap-allocated per channel and shared, through a raw pointer, by every
/// `Sender` and `Receiver` handle of that channel.
struct Counter<C> {
    /// The number of senders associated with the channel.
    senders: AtomicUsize,

    /// The number of receivers associated with the channel.
    receivers: AtomicUsize,

    /// Set to `true` when the last sender or the last receiver has been released.
    ///
    /// The side that finds the flag already set is the one that frees the allocation.
    disconnected: AtomicBool,

    /// The internal channel.
    chan: C,
}
22 
23 /// Wraps a channel into the reference counter.
new<C>(chan: C) -> (Sender<C>, Receiver<C>)24 pub fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
25     let counter = Box::into_raw(Box::new(Counter {
26         senders: AtomicUsize::new(1),
27         receivers: AtomicUsize::new(1),
28         disconnected: AtomicBool::new(false),
29         chan,
30     }));
31     let s = Sender { counter };
32     let r = Receiver { counter };
33     (s, r)
34 }
35 
36 /// The sending side.
37 pub struct Sender<C> {
38     counter: *mut Counter<C>,
39 }
40 
41 impl<C> Sender<C> {
42     /// Returns the internal `Counter`.
counter(&self) -> &Counter<C>43     fn counter(&self) -> &Counter<C> {
44         unsafe { &*self.counter }
45     }
46 
47     /// Acquires another sender reference.
acquire(&self) -> Sender<C>48     pub fn acquire(&self) -> Sender<C> {
49         let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);
50 
51         // Cloning senders and calling `mem::forget` on the clones could potentially overflow the
52         // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
53         // just abort when the count becomes very large.
54         if count > isize::MAX as usize {
55             process::abort();
56         }
57 
58         Sender {
59             counter: self.counter,
60         }
61     }
62 
63     /// Releases the sender reference.
64     ///
65     /// Function `f` will be called if this is the last sender reference.
release<F: FnOnce(&C)>(&self, f: F)66     pub unsafe fn release<F: FnOnce(&C)>(&self, f: F) {
67         if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
68             f(&self.counter().chan);
69 
70             if self.counter().disconnected.swap(true, Ordering::AcqRel) {
71                 drop(Box::from_raw(self.counter));
72             }
73         }
74     }
75 }
76 
77 impl<C> ops::Deref for Sender<C> {
78     type Target = C;
79 
deref(&self) -> &C80     fn deref(&self) -> &C {
81         &self.counter().chan
82     }
83 }
84 
85 /// The receiving side.
86 pub struct Receiver<C> {
87     counter: *mut Counter<C>,
88 }
89 
90 impl<C> Receiver<C> {
91     /// Returns the internal `Counter`.
counter(&self) -> &Counter<C>92     fn counter(&self) -> &Counter<C> {
93         unsafe { &*self.counter }
94     }
95 
96     /// Acquires another receiver reference.
acquire(&self) -> Receiver<C>97     pub fn acquire(&self) -> Receiver<C> {
98         let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);
99 
100         // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
101         // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
102         // just abort when the count becomes very large.
103         if count > isize::MAX as usize {
104             process::abort();
105         }
106 
107         Receiver {
108             counter: self.counter,
109         }
110     }
111 
112     /// Releases the receiver reference.
113     ///
114     /// Function `f` will be called if this is the last receiver reference.
release<F: FnOnce(&C)>(&self, f: F)115     pub unsafe fn release<F: FnOnce(&C)>(&self, f: F) {
116         if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
117             f(&self.counter().chan);
118 
119             if self.counter().disconnected.swap(true, Ordering::AcqRel) {
120                 drop(Box::from_raw(self.counter));
121             }
122         }
123     }
124 }
125 
126 impl<C> ops::Deref for Receiver<C> {
127     type Target = C;
128 
deref(&self) -> &C129     fn deref(&self) -> &C {
130         &self.counter().chan
131     }
132 }
133