//! Reference counter for channels.
2
3 use std::isize;
4 use std::ops;
5 use std::process;
6 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
7
/// Reference counter internals.
///
/// Lives on the heap (allocated in `new`) and is shared by every `Sender` and
/// `Receiver` handle through a raw pointer. The last side to release its final
/// handle deallocates it.
struct Counter<C> {
    /// The number of senders associated with the channel.
    senders: AtomicUsize,

    /// The number of receivers associated with the channel.
    receivers: AtomicUsize,

    /// If `true`, either the sending or receiving side has been dropped.
    /// Whichever side observes this flag already set during release is the
    /// one that frees the allocation.
    disconnected: AtomicBool,

    /// The internal channel.
    chan: C,
}
22
23 /// Wraps a channel into the reference counter.
new<C>(chan: C) -> (Sender<C>, Receiver<C>)24 pub fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
25 let counter = Box::into_raw(Box::new(Counter {
26 senders: AtomicUsize::new(1),
27 receivers: AtomicUsize::new(1),
28 disconnected: AtomicBool::new(false),
29 chan,
30 }));
31 let s = Sender { counter };
32 let r = Receiver { counter };
33 (s, r)
34 }
35
/// The sending side.
pub struct Sender<C> {
    /// Raw pointer to the shared `Counter`; the matching `Receiver` holds the
    /// same pointer. Deallocated by whichever side releases last.
    counter: *mut Counter<C>,
}
40
41 impl<C> Sender<C> {
42 /// Returns the internal `Counter`.
counter(&self) -> &Counter<C>43 fn counter(&self) -> &Counter<C> {
44 unsafe { &*self.counter }
45 }
46
47 /// Acquires another sender reference.
acquire(&self) -> Sender<C>48 pub fn acquire(&self) -> Sender<C> {
49 let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);
50
51 // Cloning senders and calling `mem::forget` on the clones could potentially overflow the
52 // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
53 // just abort when the count becomes very large.
54 if count > isize::MAX as usize {
55 process::abort();
56 }
57
58 Sender {
59 counter: self.counter,
60 }
61 }
62
63 /// Releases the sender reference.
64 ///
65 /// Function `f` will be called if this is the last sender reference.
release<F: FnOnce(&C)>(&self, f: F)66 pub unsafe fn release<F: FnOnce(&C)>(&self, f: F) {
67 if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
68 f(&self.counter().chan);
69
70 if self.counter().disconnected.swap(true, Ordering::AcqRel) {
71 drop(Box::from_raw(self.counter));
72 }
73 }
74 }
75 }
76
77 impl<C> ops::Deref for Sender<C> {
78 type Target = C;
79
deref(&self) -> &C80 fn deref(&self) -> &C {
81 &self.counter().chan
82 }
83 }
84
/// The receiving side.
pub struct Receiver<C> {
    /// Raw pointer to the shared `Counter`; the matching `Sender` holds the
    /// same pointer. Deallocated by whichever side releases last.
    counter: *mut Counter<C>,
}
89
90 impl<C> Receiver<C> {
91 /// Returns the internal `Counter`.
counter(&self) -> &Counter<C>92 fn counter(&self) -> &Counter<C> {
93 unsafe { &*self.counter }
94 }
95
96 /// Acquires another receiver reference.
acquire(&self) -> Receiver<C>97 pub fn acquire(&self) -> Receiver<C> {
98 let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);
99
100 // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
101 // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
102 // just abort when the count becomes very large.
103 if count > isize::MAX as usize {
104 process::abort();
105 }
106
107 Receiver {
108 counter: self.counter,
109 }
110 }
111
112 /// Releases the receiver reference.
113 ///
114 /// Function `f` will be called if this is the last receiver reference.
release<F: FnOnce(&C)>(&self, f: F)115 pub unsafe fn release<F: FnOnce(&C)>(&self, f: F) {
116 if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
117 f(&self.counter().chan);
118
119 if self.counter().disconnected.swap(true, Ordering::AcqRel) {
120 drop(Box::from_raw(self.counter));
121 }
122 }
123 }
124 }
125
126 impl<C> ops::Deref for Receiver<C> {
127 type Target = C;
128
deref(&self) -> &C129 fn deref(&self) -> &C {
130 &self.counter().chan
131 }
132 }
133