1 use crate::loom::cell::CausalCell;
2 use crate::loom::future::AtomicWaker;
3 use crate::loom::sync::atomic::AtomicUsize;
4 use crate::loom::sync::Arc;
5 use crate::sync::mpsc::error::{ClosedError, TryRecvError};
6 use crate::sync::mpsc::{error, list};
7 
8 use std::fmt;
9 use std::process;
10 use std::sync::atomic::Ordering::{AcqRel, Relaxed};
11 use std::task::Poll::{Pending, Ready};
12 use std::task::{Context, Poll};
13 
/// Channel sender
///
/// Generic over the semaphore `S` that governs channel capacity. Every handle
/// owns its own permit, created with `S::new_permit()`, next to a shared
/// reference to the channel state.
pub(crate) struct Tx<T, S: Semaphore> {
    /// Channel state shared with the `Rx` handle and all other `Tx` handles.
    inner: Arc<Chan<T, S>>,
    /// Permit owned by this handle. It is passed to the semaphore when
    /// acquiring capacity and to `Semaphore::drop_permit` when the handle is
    /// dropped.
    permit: S::Permit,
}
19 
20 impl<T, S: Semaphore> fmt::Debug for Tx<T, S>
21 where
22     S::Permit: fmt::Debug,
23     S: fmt::Debug,
24 {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result25     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
26         fmt.debug_struct("Tx")
27             .field("inner", &self.inner)
28             .field("permit", &self.permit)
29             .finish()
30     }
31 }
32 
/// Channel receiver
///
/// Holds a shared reference to the channel state. The receive-side state
/// lives in `Chan::rx_fields`.
pub(crate) struct Rx<T, S: Semaphore> {
    /// Channel state shared with every `Tx` handle.
    inner: Arc<Chan<T, S>>,
}
37 
38 impl<T, S: Semaphore> fmt::Debug for Rx<T, S>
39 where
40     S: fmt::Debug,
41 {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result42     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
43         fmt.debug_struct("Rx").field("inner", &self.inner).finish()
44     }
45 }
46 
/// Reason a non-blocking send attempt failed.
///
/// This is the payload-free form produced by `Semaphore::try_acquire`. The
/// `From<(T, TrySendError)>` impls in this file pair it with the unsent value
/// to build the `error` module's error types.
#[derive(Debug, Eq, PartialEq)]
pub(crate) enum TrySendError {
    /// The semaphore reported that the channel is closed.
    Closed,
    /// The semaphore had no permits available.
    Full,
}
52 
53 impl<T> From<(T, TrySendError)> for error::SendError<T> {
from(src: (T, TrySendError)) -> error::SendError<T>54     fn from(src: (T, TrySendError)) -> error::SendError<T> {
55         match src.1 {
56             TrySendError::Closed => error::SendError(src.0),
57             TrySendError::Full => unreachable!(),
58         }
59     }
60 }
61 
62 impl<T> From<(T, TrySendError)> for error::TrySendError<T> {
from(src: (T, TrySendError)) -> error::TrySendError<T>63     fn from(src: (T, TrySendError)) -> error::TrySendError<T> {
64         match src.1 {
65             TrySendError::Closed => error::TrySendError::Closed(src.0),
66             TrySendError::Full => error::TrySendError::Full(src.0),
67         }
68     }
69 }
70 
/// Capacity-tracking strategy used by the channel.
///
/// `Chan` calls into this trait to acquire capacity before pushing a value
/// and to hand capacity back once the receiver pops a value.
pub(crate) trait Semaphore {
    /// Per-`Tx` state used when acquiring capacity.
    type Permit;

    /// Creates the permit stored in a newly created or cloned `Tx` handle.
    fn new_permit() -> Self::Permit;

    /// The permit is dropped without a value being sent. In this case, the
    /// permit must be returned to the semaphore.
    fn drop_permit(&self, permit: &mut Self::Permit);

    /// Reports whether the semaphore is idle. `Rx::recv` only reports a
    /// closed channel as finished once this returns `true`.
    fn is_idle(&self) -> bool;

    /// Returns one unit of capacity. Called by the `Rx` handle each time it
    /// pops a value from the list.
    fn add_permit(&self);

    /// Attempts to acquire capacity for `permit`; backs `Tx::poll_ready`.
    /// Failure is reported as `ClosedError`.
    fn poll_acquire(
        &self,
        cx: &mut Context<'_>,
        permit: &mut Self::Permit,
    ) -> Poll<Result<(), ClosedError>>;

    /// Attempts to acquire capacity for `permit` without waiting. Called by
    /// `Chan::try_send` before the value is pushed.
    fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>;

    /// A value was sent into the channel and the permit held by `tx` is
    /// dropped. In this case, the permit should not immediately be returned to
    /// the semaphore. Instead, the permit is returned to the semaphore once
    /// the sent value is read by the rx handle.
    fn forget(&self, permit: &mut Self::Permit);

    /// Closes the semaphore. Invoked from `Rx::close`.
    fn close(&self);
}
100 
/// Channel state shared, through an `Arc`, by the `Rx` handle and every `Tx`
/// handle.
struct Chan<T, S> {
    /// Handle to the push half of the lock-free list.
    tx: list::Tx<T>,

    /// Coordinates access to channel's capacity.
    semaphore: S,

    /// Receiver waker. Notified when a value is pushed into the channel.
    ///
    /// Also notified when the last `Tx` handle is dropped.
    rx_waker: AtomicWaker,

    /// Tracks the number of outstanding sender handles.
    ///
    /// When this drops to zero, the send half of the channel is closed.
    tx_count: AtomicUsize,

    /// Only accessed by `Rx` handle.
    ///
    /// `Chan`'s own `Drop` impl also accesses it to drain and free the list.
    rx_fields: CausalCell<RxFields<T>>,
}
119 
120 impl<T, S> fmt::Debug for Chan<T, S>
121 where
122     S: fmt::Debug,
123 {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result124     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
125         fmt.debug_struct("Chan")
126             .field("tx", &self.tx)
127             .field("semaphore", &self.semaphore)
128             .field("rx_waker", &self.rx_waker)
129             .field("tx_count", &self.tx_count)
130             .field("rx_fields", &"...")
131             .finish()
132     }
133 }
134 
/// Fields only accessed by `Rx` handle.
///
/// Stored inside `Chan::rx_fields` and reached through a raw pointer handed
/// out by `CausalCell::with_mut`.
struct RxFields<T> {
    /// Channel receiver. This field is only accessed by the `Receiver` type.
    list: list::Rx<T>,

    /// `true` if `Rx::close` is called.
    rx_closed: bool,
}
143 
144 impl<T> fmt::Debug for RxFields<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result145     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
146         fmt.debug_struct("RxFields")
147             .field("list", &self.list)
148             .field("rx_closed", &self.rx_closed)
149             .finish()
150     }
151 }
152 
// `Chan` is shared between the `Tx` and `Rx` handles through an `Arc`, so it
// must be both `Send` and `Sync` for the handles to move across threads.
//
// NOTE(review): soundness presumably rests on `rx_fields` being touched only
// by the single `Rx` handle (and by `Chan::drop`), with all other fields
// being safe to share for `T: Send` — confirm against `list` and
// `CausalCell`.
unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}
155 
channel<T, S>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) where S: Semaphore,156 pub(crate) fn channel<T, S>(semaphore: S) -> (Tx<T, S>, Rx<T, S>)
157 where
158     S: Semaphore,
159 {
160     let (tx, rx) = list::channel();
161 
162     let chan = Arc::new(Chan {
163         tx,
164         semaphore,
165         rx_waker: AtomicWaker::new(),
166         tx_count: AtomicUsize::new(1),
167         rx_fields: CausalCell::new(RxFields {
168             list: rx,
169             rx_closed: false,
170         }),
171     });
172 
173     (Tx::new(chan.clone()), Rx::new(chan))
174 }
175 
176 // ===== impl Tx =====
177 
178 impl<T, S> Tx<T, S>
179 where
180     S: Semaphore,
181 {
new(chan: Arc<Chan<T, S>>) -> Tx<T, S>182     fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
183         Tx {
184             inner: chan,
185             permit: S::new_permit(),
186         }
187     }
188 
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>>189     pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
190         self.inner.semaphore.poll_acquire(cx, &mut self.permit)
191     }
192 
193     /// Send a message and notify the receiver.
try_send(&mut self, value: T) -> Result<(), (T, TrySendError)>194     pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> {
195         self.inner.try_send(value, &mut self.permit)
196     }
197 }
198 
199 impl<T> Tx<T, AtomicUsize> {
send_unbounded(&self, value: T) -> Result<(), (T, TrySendError)>200     pub(crate) fn send_unbounded(&self, value: T) -> Result<(), (T, TrySendError)> {
201         self.inner.try_send(value, &mut ())
202     }
203 }
204 
205 impl<T, S> Clone for Tx<T, S>
206 where
207     S: Semaphore,
208 {
clone(&self) -> Tx<T, S>209     fn clone(&self) -> Tx<T, S> {
210         // Using a Relaxed ordering here is sufficient as the caller holds a
211         // strong ref to `self`, preventing a concurrent decrement to zero.
212         self.inner.tx_count.fetch_add(1, Relaxed);
213 
214         Tx {
215             inner: self.inner.clone(),
216             permit: S::new_permit(),
217         }
218     }
219 }
220 
221 impl<T, S> Drop for Tx<T, S>
222 where
223     S: Semaphore,
224 {
drop(&mut self)225     fn drop(&mut self) {
226         self.inner.semaphore.drop_permit(&mut self.permit);
227 
228         if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
229             return;
230         }
231 
232         // Close the list, which sends a `Close` message
233         self.inner.tx.close();
234 
235         // Notify the receiver
236         self.inner.rx_waker.wake();
237     }
238 }
239 
240 // ===== impl Rx =====
241 
impl<T, S> Rx<T, S>
where
    S: Semaphore,
{
    /// Wraps the shared channel state in a receiver handle.
    fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
        Rx { inner: chan }
    }

    /// Marks the receive half as closed and closes the semaphore.
    ///
    /// Note that the early `return` below only leaves the closure, so
    /// `semaphore.close()` is invoked on every call, including repeated ones.
    pub(crate) fn close(&mut self) {
        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            // The pointer is supplied by `CausalCell::with_mut`; `rx_fields`
            // is documented as only being accessed by the `Rx` handle.
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            if rx_fields.rx_closed {
                return;
            }

            rx_fields.rx_closed = true;
        });

        self.inner.semaphore.close();
    }

    /// Receive the next value
    ///
    /// Returns `Ready(Some(value))` when a value is popped, `Ready(None)`
    /// when the list reports `Closed` or when the receiver was closed and the
    /// semaphore is idle, and `Pending` otherwise. Before returning `Pending`
    /// the task's waker is registered with `rx_waker`.
    pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        use super::block::Read::*;

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            // The pointer is supplied by `CausalCell::with_mut`; `rx_fields`
            // is documented as only being accessed by the `Rx` handle.
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            // One pop attempt. The `return`s inside leave the enclosing
            // closure, so a hit ends `recv` immediately; a miss falls through
            // to the code following the macro invocation.
            macro_rules! try_recv {
                () => {
                    match rx_fields.list.pop(&self.inner.tx) {
                        Some(Value(value)) => {
                            // The value left the channel: give its capacity
                            // back to the senders.
                            self.inner.semaphore.add_permit();
                            return Ready(Some(value));
                        }
                        Some(Closed) => {
                            // TODO: This check may not be required as it most
                            // likely can only return `true` at this point. A
                            // channel is closed when all tx handles are
                            // dropped. Dropping a tx handle releases memory,
                            // which ensures that if dropping the tx handle is
                            // visible, then all messages sent are also visible.
                            assert!(self.inner.semaphore.is_idle());
                            return Ready(None);
                        }
                        None => {} // fall through
                    }
                };
            }

            try_recv!();

            self.inner.rx_waker.register_by_ref(cx.waker());

            // It is possible that a value was pushed between attempting to read
            // and registering the task, so we have to check the channel a
            // second time here.
            try_recv!();

            // Nothing to read. After `Rx::close`, the stream only ends once
            // the semaphore reports idle.
            if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
                Ready(None)
            } else {
                Pending
            }
        })
    }

    /// Receives the next value without blocking
    ///
    /// Returns `TryRecvError::Closed` when the list reports `Closed` and
    /// `TryRecvError::Empty` when nothing could be popped. No waker is
    /// registered.
    pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
        use super::block::Read::*;
        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            // The pointer is supplied by `CausalCell::with_mut`; `rx_fields`
            // is documented as only being accessed by the `Rx` handle.
            let rx_fields = unsafe { &mut *rx_fields_ptr };
            match rx_fields.list.pop(&self.inner.tx) {
                Some(Value(value)) => {
                    // The value left the channel: give its capacity back.
                    self.inner.semaphore.add_permit();
                    Ok(value)
                }
                Some(Closed) => Err(TryRecvError::Closed),
                None => Err(TryRecvError::Empty),
            }
        })
    }
}
326 
327 impl<T, S> Drop for Rx<T, S>
328 where
329     S: Semaphore,
330 {
drop(&mut self)331     fn drop(&mut self) {
332         use super::block::Read::Value;
333 
334         self.close();
335 
336         self.inner.rx_fields.with_mut(|rx_fields_ptr| {
337             let rx_fields = unsafe { &mut *rx_fields_ptr };
338 
339             while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
340                 self.inner.semaphore.add_permit();
341             }
342         })
343     }
344 }
345 
346 // ===== impl Chan =====
347 
348 impl<T, S> Chan<T, S>
349 where
350     S: Semaphore,
351 {
try_send(&self, value: T, permit: &mut S::Permit) -> Result<(), (T, TrySendError)>352     fn try_send(&self, value: T, permit: &mut S::Permit) -> Result<(), (T, TrySendError)> {
353         if let Err(e) = self.semaphore.try_acquire(permit) {
354             return Err((value, e));
355         }
356 
357         // Push the value
358         self.tx.push(value);
359 
360         // Notify the rx task
361         self.rx_waker.wake();
362 
363         // Release the permit
364         self.semaphore.forget(permit);
365 
366         Ok(())
367     }
368 }
369 
370 impl<T, S> Drop for Chan<T, S> {
drop(&mut self)371     fn drop(&mut self) {
372         use super::block::Read::Value;
373 
374         // Safety: the only owner of the rx fields is Chan, and eing
375         // inside its own Drop means we're the last ones to touch it.
376         self.rx_fields.with_mut(|rx_fields_ptr| {
377             let rx_fields = unsafe { &mut *rx_fields_ptr };
378 
379             while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
380             unsafe { rx_fields.list.free_blocks() };
381         });
382     }
383 }
384 
385 use crate::sync::semaphore_ll::TryAcquireError;
386 
387 impl From<TryAcquireError> for TrySendError {
from(src: TryAcquireError) -> TrySendError388     fn from(src: TryAcquireError) -> TrySendError {
389         if src.is_closed() {
390             TrySendError::Closed
391         } else if src.is_no_permits() {
392             TrySendError::Full
393         } else {
394             unreachable!();
395         }
396     }
397 }
398 
399 // ===== impl Semaphore for (::Semaphore, capacity) =====
400 
401 use crate::sync::semaphore_ll::Permit;
402 
403 impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) {
404     type Permit = Permit;
405 
new_permit() -> Permit406     fn new_permit() -> Permit {
407         Permit::new()
408     }
409 
drop_permit(&self, permit: &mut Permit)410     fn drop_permit(&self, permit: &mut Permit) {
411         permit.release(1, &self.0);
412     }
413 
add_permit(&self)414     fn add_permit(&self) {
415         self.0.add_permits(1)
416     }
417 
is_idle(&self) -> bool418     fn is_idle(&self) -> bool {
419         self.0.available_permits() == self.1
420     }
421 
poll_acquire( &self, cx: &mut Context<'_>, permit: &mut Permit, ) -> Poll<Result<(), ClosedError>>422     fn poll_acquire(
423         &self,
424         cx: &mut Context<'_>,
425         permit: &mut Permit,
426     ) -> Poll<Result<(), ClosedError>> {
427         permit
428             .poll_acquire(cx, 1, &self.0)
429             .map_err(|_| ClosedError::new())
430     }
431 
try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError>432     fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> {
433         permit.try_acquire(1, &self.0)?;
434         Ok(())
435     }
436 
forget(&self, permit: &mut Self::Permit)437     fn forget(&self, permit: &mut Self::Permit) {
438         permit.forget(1);
439     }
440 
close(&self)441     fn close(&self) {
442         self.0.close();
443     }
444 }
445 
446 // ===== impl Semaphore for AtomicUsize =====
447 
448 use std::sync::atomic::Ordering::{Acquire, Release};
449 use std::usize;
450 
451 impl Semaphore for AtomicUsize {
452     type Permit = ();
453 
new_permit()454     fn new_permit() {}
455 
drop_permit(&self, _permit: &mut ())456     fn drop_permit(&self, _permit: &mut ()) {}
457 
add_permit(&self)458     fn add_permit(&self) {
459         let prev = self.fetch_sub(2, Release);
460 
461         if prev >> 1 == 0 {
462             // Something went wrong
463             process::abort();
464         }
465     }
466 
is_idle(&self) -> bool467     fn is_idle(&self) -> bool {
468         self.load(Acquire) >> 1 == 0
469     }
470 
poll_acquire( &self, _cx: &mut Context<'_>, permit: &mut (), ) -> Poll<Result<(), ClosedError>>471     fn poll_acquire(
472         &self,
473         _cx: &mut Context<'_>,
474         permit: &mut (),
475     ) -> Poll<Result<(), ClosedError>> {
476         Ready(self.try_acquire(permit).map_err(|_| ClosedError::new()))
477     }
478 
try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError>479     fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> {
480         let mut curr = self.load(Acquire);
481 
482         loop {
483             if curr & 1 == 1 {
484                 return Err(TrySendError::Closed);
485             }
486 
487             if curr == usize::MAX ^ 1 {
488                 // Overflowed the ref count. There is no safe way to recover, so
489                 // abort the process. In practice, this should never happen.
490                 process::abort()
491             }
492 
493             match self.compare_exchange(curr, curr + 2, AcqRel, Acquire) {
494                 Ok(_) => return Ok(()),
495                 Err(actual) => {
496                     curr = actual;
497                 }
498             }
499         }
500     }
501 
forget(&self, _permit: &mut ())502     fn forget(&self, _permit: &mut ()) {}
503 
close(&self)504     fn close(&self) {
505         self.fetch_or(1, Release);
506     }
507 }
508