1 //! Zero-capacity channel.
2 //!
3 //! This kind of channel is also known as *rendezvous* channel.
4 
5 use std::cell::UnsafeCell;
6 use std::marker::PhantomData;
7 use std::sync::atomic::{AtomicBool, Ordering};
8 use std::time::Instant;
9 
10 use crossbeam_utils::Backoff;
11 
12 use crate::context::Context;
13 use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
14 use crate::select::{Operation, SelectHandle, Selected, Token};
15 use crate::utils::Spinlock;
16 use crate::waker::Waker;
17 
/// Address of a `Packet<T>` stored as a plain integer.
///
/// The value `0` is used throughout this module to mean "no packet", which `read` and `write`
/// interpret as a disconnected channel.
pub(crate) type ZeroToken = usize;
20 
/// A single-message hand-off slot shared between one sender and one receiver.
struct Packet<T> {
    /// `true` when the packet was created by one of the `*_on_stack` constructors; `read` uses
    /// this flag to decide whether it must free the packet after taking the message.
    on_stack: bool,

    /// Flipped to `true` (with release ordering) by the side that has finished its half of the
    /// hand-off; the other side polls it in `wait_ready`.
    ready: AtomicBool,

    /// The message being transferred, or `None` while the slot is empty.
    msg: UnsafeCell<Option<T>>,
}
32 
33 impl<T> Packet<T> {
34     /// Creates an empty packet on the stack.
empty_on_stack() -> Packet<T>35     fn empty_on_stack() -> Packet<T> {
36         Packet {
37             on_stack: true,
38             ready: AtomicBool::new(false),
39             msg: UnsafeCell::new(None),
40         }
41     }
42 
43     /// Creates an empty packet on the heap.
empty_on_heap() -> Box<Packet<T>>44     fn empty_on_heap() -> Box<Packet<T>> {
45         Box::new(Packet {
46             on_stack: false,
47             ready: AtomicBool::new(false),
48             msg: UnsafeCell::new(None),
49         })
50     }
51 
52     /// Creates a packet on the stack, containing a message.
message_on_stack(msg: T) -> Packet<T>53     fn message_on_stack(msg: T) -> Packet<T> {
54         Packet {
55             on_stack: true,
56             ready: AtomicBool::new(false),
57             msg: UnsafeCell::new(Some(msg)),
58         }
59     }
60 
61     /// Waits until the packet becomes ready for reading or writing.
wait_ready(&self)62     fn wait_ready(&self) {
63         let backoff = Backoff::new();
64         while !self.ready.load(Ordering::Acquire) {
65             backoff.snooze();
66         }
67     }
68 }
69 
70 /// Inner representation of a zero-capacity channel.
71 struct Inner {
72     /// Senders waiting to pair up with a receive operation.
73     senders: Waker,
74 
75     /// Receivers waiting to pair up with a send operation.
76     receivers: Waker,
77 
78     /// Equals `true` when the channel is disconnected.
79     is_disconnected: bool,
80 }
81 
82 /// Zero-capacity channel.
83 pub(crate) struct Channel<T> {
84     /// Inner representation of the channel.
85     inner: Spinlock<Inner>,
86 
87     /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
88     _marker: PhantomData<T>,
89 }
90 
91 impl<T> Channel<T> {
92     /// Constructs a new zero-capacity channel.
new() -> Self93     pub(crate) fn new() -> Self {
94         Channel {
95             inner: Spinlock::new(Inner {
96                 senders: Waker::new(),
97                 receivers: Waker::new(),
98                 is_disconnected: false,
99             }),
100             _marker: PhantomData,
101         }
102     }
103 
104     /// Returns a receiver handle to the channel.
receiver(&self) -> Receiver<'_, T>105     pub(crate) fn receiver(&self) -> Receiver<'_, T> {
106         Receiver(self)
107     }
108 
109     /// Returns a sender handle to the channel.
sender(&self) -> Sender<'_, T>110     pub(crate) fn sender(&self) -> Sender<'_, T> {
111         Sender(self)
112     }
113 
114     /// Attempts to reserve a slot for sending a message.
start_send(&self, token: &mut Token) -> bool115     fn start_send(&self, token: &mut Token) -> bool {
116         let mut inner = self.inner.lock();
117 
118         // If there's a waiting receiver, pair up with it.
119         if let Some(operation) = inner.receivers.try_select() {
120             token.zero = operation.packet;
121             true
122         } else if inner.is_disconnected {
123             token.zero = 0;
124             true
125         } else {
126             false
127         }
128     }
129 
130     /// Writes a message into the packet.
write(&self, token: &mut Token, msg: T) -> Result<(), T>131     pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
132         // If there is no packet, the channel is disconnected.
133         if token.zero == 0 {
134             return Err(msg);
135         }
136 
137         let packet = &*(token.zero as *const Packet<T>);
138         packet.msg.get().write(Some(msg));
139         packet.ready.store(true, Ordering::Release);
140         Ok(())
141     }
142 
143     /// Attempts to pair up with a sender.
start_recv(&self, token: &mut Token) -> bool144     fn start_recv(&self, token: &mut Token) -> bool {
145         let mut inner = self.inner.lock();
146 
147         // If there's a waiting sender, pair up with it.
148         if let Some(operation) = inner.senders.try_select() {
149             token.zero = operation.packet;
150             true
151         } else if inner.is_disconnected {
152             token.zero = 0;
153             true
154         } else {
155             false
156         }
157     }
158 
159     /// Reads a message from the packet.
read(&self, token: &mut Token) -> Result<T, ()>160     pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
161         // If there is no packet, the channel is disconnected.
162         if token.zero == 0 {
163             return Err(());
164         }
165 
166         let packet = &*(token.zero as *const Packet<T>);
167 
168         if packet.on_stack {
169             // The message has been in the packet from the beginning, so there is no need to wait
170             // for it. However, after reading the message, we need to set `ready` to `true` in
171             // order to signal that the packet can be destroyed.
172             let msg = packet.msg.get().replace(None).unwrap();
173             packet.ready.store(true, Ordering::Release);
174             Ok(msg)
175         } else {
176             // Wait until the message becomes available, then read it and destroy the
177             // heap-allocated packet.
178             packet.wait_ready();
179             let msg = packet.msg.get().replace(None).unwrap();
180             drop(Box::from_raw(packet as *const Packet<T> as *mut Packet<T>));
181             Ok(msg)
182         }
183     }
184 
185     /// Attempts to send a message into the channel.
try_send(&self, msg: T) -> Result<(), TrySendError<T>>186     pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
187         let token = &mut Token::default();
188         let mut inner = self.inner.lock();
189 
190         // If there's a waiting receiver, pair up with it.
191         if let Some(operation) = inner.receivers.try_select() {
192             token.zero = operation.packet;
193             drop(inner);
194             unsafe {
195                 self.write(token, msg).ok().unwrap();
196             }
197             Ok(())
198         } else if inner.is_disconnected {
199             Err(TrySendError::Disconnected(msg))
200         } else {
201             Err(TrySendError::Full(msg))
202         }
203     }
204 
205     /// Sends a message into the channel.
send( &self, msg: T, deadline: Option<Instant>, ) -> Result<(), SendTimeoutError<T>>206     pub(crate) fn send(
207         &self,
208         msg: T,
209         deadline: Option<Instant>,
210     ) -> Result<(), SendTimeoutError<T>> {
211         let token = &mut Token::default();
212         let mut inner = self.inner.lock();
213 
214         // If there's a waiting receiver, pair up with it.
215         if let Some(operation) = inner.receivers.try_select() {
216             token.zero = operation.packet;
217             drop(inner);
218             unsafe {
219                 self.write(token, msg).ok().unwrap();
220             }
221             return Ok(());
222         }
223 
224         if inner.is_disconnected {
225             return Err(SendTimeoutError::Disconnected(msg));
226         }
227 
228         Context::with(|cx| {
229             // Prepare for blocking until a receiver wakes us up.
230             let oper = Operation::hook(token);
231             let packet = Packet::<T>::message_on_stack(msg);
232             inner
233                 .senders
234                 .register_with_packet(oper, &packet as *const Packet<T> as usize, cx);
235             inner.receivers.notify();
236             drop(inner);
237 
238             // Block the current thread.
239             let sel = cx.wait_until(deadline);
240 
241             match sel {
242                 Selected::Waiting => unreachable!(),
243                 Selected::Aborted => {
244                     self.inner.lock().senders.unregister(oper).unwrap();
245                     let msg = unsafe { packet.msg.get().replace(None).unwrap() };
246                     Err(SendTimeoutError::Timeout(msg))
247                 }
248                 Selected::Disconnected => {
249                     self.inner.lock().senders.unregister(oper).unwrap();
250                     let msg = unsafe { packet.msg.get().replace(None).unwrap() };
251                     Err(SendTimeoutError::Disconnected(msg))
252                 }
253                 Selected::Operation(_) => {
254                     // Wait until the message is read, then drop the packet.
255                     packet.wait_ready();
256                     Ok(())
257                 }
258             }
259         })
260     }
261 
262     /// Attempts to receive a message without blocking.
try_recv(&self) -> Result<T, TryRecvError>263     pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
264         let token = &mut Token::default();
265         let mut inner = self.inner.lock();
266 
267         // If there's a waiting sender, pair up with it.
268         if let Some(operation) = inner.senders.try_select() {
269             token.zero = operation.packet;
270             drop(inner);
271             unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
272         } else if inner.is_disconnected {
273             Err(TryRecvError::Disconnected)
274         } else {
275             Err(TryRecvError::Empty)
276         }
277     }
278 
279     /// Receives a message from the channel.
recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError>280     pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
281         let token = &mut Token::default();
282         let mut inner = self.inner.lock();
283 
284         // If there's a waiting sender, pair up with it.
285         if let Some(operation) = inner.senders.try_select() {
286             token.zero = operation.packet;
287             drop(inner);
288             unsafe {
289                 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
290             }
291         }
292 
293         if inner.is_disconnected {
294             return Err(RecvTimeoutError::Disconnected);
295         }
296 
297         Context::with(|cx| {
298             // Prepare for blocking until a sender wakes us up.
299             let oper = Operation::hook(token);
300             let packet = Packet::<T>::empty_on_stack();
301             inner
302                 .receivers
303                 .register_with_packet(oper, &packet as *const Packet<T> as usize, cx);
304             inner.senders.notify();
305             drop(inner);
306 
307             // Block the current thread.
308             let sel = cx.wait_until(deadline);
309 
310             match sel {
311                 Selected::Waiting => unreachable!(),
312                 Selected::Aborted => {
313                     self.inner.lock().receivers.unregister(oper).unwrap();
314                     Err(RecvTimeoutError::Timeout)
315                 }
316                 Selected::Disconnected => {
317                     self.inner.lock().receivers.unregister(oper).unwrap();
318                     Err(RecvTimeoutError::Disconnected)
319                 }
320                 Selected::Operation(_) => {
321                     // Wait until the message is provided, then read it.
322                     packet.wait_ready();
323                     unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
324                 }
325             }
326         })
327     }
328 
329     /// Disconnects the channel and wakes up all blocked senders and receivers.
330     ///
331     /// Returns `true` if this call disconnected the channel.
disconnect(&self) -> bool332     pub(crate) fn disconnect(&self) -> bool {
333         let mut inner = self.inner.lock();
334 
335         if !inner.is_disconnected {
336             inner.is_disconnected = true;
337             inner.senders.disconnect();
338             inner.receivers.disconnect();
339             true
340         } else {
341             false
342         }
343     }
344 
345     /// Returns the current number of messages inside the channel.
len(&self) -> usize346     pub(crate) fn len(&self) -> usize {
347         0
348     }
349 
350     /// Returns the capacity of the channel.
351     #[allow(clippy::unnecessary_wraps)] // This is intentional.
capacity(&self) -> Option<usize>352     pub(crate) fn capacity(&self) -> Option<usize> {
353         Some(0)
354     }
355 
356     /// Returns `true` if the channel is empty.
is_empty(&self) -> bool357     pub(crate) fn is_empty(&self) -> bool {
358         true
359     }
360 
361     /// Returns `true` if the channel is full.
is_full(&self) -> bool362     pub(crate) fn is_full(&self) -> bool {
363         true
364     }
365 }
366 
367 /// Receiver handle to a channel.
368 pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
369 
370 /// Sender handle to a channel.
371 pub(crate) struct Sender<'a, T>(&'a Channel<T>);
372 
373 impl<T> SelectHandle for Receiver<'_, T> {
try_select(&self, token: &mut Token) -> bool374     fn try_select(&self, token: &mut Token) -> bool {
375         self.0.start_recv(token)
376     }
377 
deadline(&self) -> Option<Instant>378     fn deadline(&self) -> Option<Instant> {
379         None
380     }
381 
register(&self, oper: Operation, cx: &Context) -> bool382     fn register(&self, oper: Operation, cx: &Context) -> bool {
383         let packet = Box::into_raw(Packet::<T>::empty_on_heap());
384 
385         let mut inner = self.0.inner.lock();
386         inner
387             .receivers
388             .register_with_packet(oper, packet as usize, cx);
389         inner.senders.notify();
390         inner.senders.can_select() || inner.is_disconnected
391     }
392 
unregister(&self, oper: Operation)393     fn unregister(&self, oper: Operation) {
394         if let Some(operation) = self.0.inner.lock().receivers.unregister(oper) {
395             unsafe {
396                 drop(Box::from_raw(operation.packet as *mut Packet<T>));
397             }
398         }
399     }
400 
accept(&self, token: &mut Token, cx: &Context) -> bool401     fn accept(&self, token: &mut Token, cx: &Context) -> bool {
402         token.zero = cx.wait_packet();
403         true
404     }
405 
is_ready(&self) -> bool406     fn is_ready(&self) -> bool {
407         let inner = self.0.inner.lock();
408         inner.senders.can_select() || inner.is_disconnected
409     }
410 
watch(&self, oper: Operation, cx: &Context) -> bool411     fn watch(&self, oper: Operation, cx: &Context) -> bool {
412         let mut inner = self.0.inner.lock();
413         inner.receivers.watch(oper, cx);
414         inner.senders.can_select() || inner.is_disconnected
415     }
416 
unwatch(&self, oper: Operation)417     fn unwatch(&self, oper: Operation) {
418         let mut inner = self.0.inner.lock();
419         inner.receivers.unwatch(oper);
420     }
421 }
422 
423 impl<T> SelectHandle for Sender<'_, T> {
try_select(&self, token: &mut Token) -> bool424     fn try_select(&self, token: &mut Token) -> bool {
425         self.0.start_send(token)
426     }
427 
deadline(&self) -> Option<Instant>428     fn deadline(&self) -> Option<Instant> {
429         None
430     }
431 
register(&self, oper: Operation, cx: &Context) -> bool432     fn register(&self, oper: Operation, cx: &Context) -> bool {
433         let packet = Box::into_raw(Packet::<T>::empty_on_heap());
434 
435         let mut inner = self.0.inner.lock();
436         inner
437             .senders
438             .register_with_packet(oper, packet as usize, cx);
439         inner.receivers.notify();
440         inner.receivers.can_select() || inner.is_disconnected
441     }
442 
unregister(&self, oper: Operation)443     fn unregister(&self, oper: Operation) {
444         if let Some(operation) = self.0.inner.lock().senders.unregister(oper) {
445             unsafe {
446                 drop(Box::from_raw(operation.packet as *mut Packet<T>));
447             }
448         }
449     }
450 
accept(&self, token: &mut Token, cx: &Context) -> bool451     fn accept(&self, token: &mut Token, cx: &Context) -> bool {
452         token.zero = cx.wait_packet();
453         true
454     }
455 
is_ready(&self) -> bool456     fn is_ready(&self) -> bool {
457         let inner = self.0.inner.lock();
458         inner.receivers.can_select() || inner.is_disconnected
459     }
460 
watch(&self, oper: Operation, cx: &Context) -> bool461     fn watch(&self, oper: Operation, cx: &Context) -> bool {
462         let mut inner = self.0.inner.lock();
463         inner.senders.watch(oper, cx);
464         inner.receivers.can_select() || inner.is_disconnected
465     }
466 
unwatch(&self, oper: Operation)467     fn unwatch(&self, oper: Operation) {
468         let mut inner = self.0.inner.lock();
469         inner.senders.unwatch(oper);
470     }
471 }
472