use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::sync::mpsc::error::{ClosedError, TryRecvError};
use crate::sync::mpsc::{error, list};

use std::fmt;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Relaxed};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};

/// Channel sender
pub(crate) struct Tx<T, S: Semaphore> {
    inner: Arc<Chan<T, S>>,
    permit: S::Permit,
}

impl<T, S: Semaphore> fmt::Debug for Tx<T, S>
where
    S::Permit: fmt::Debug,
    S: fmt::Debug,
{
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Tx")
            .field("inner", &self.inner)
            .field("permit", &self.permit)
            .finish()
    }
}

/// Channel receiver
pub(crate) struct Rx<T, S: Semaphore> {
    inner: Arc<Chan<T, S>>,
}

impl<T, S: Semaphore> fmt::Debug for Rx<T, S>
where
    S: fmt::Debug,
{
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Rx").field("inner", &self.inner).finish()
    }
}

#[derive(Debug, Eq, PartialEq)]
pub(crate) enum TrySendError {
    Closed,
    Full,
}

impl<T> From<(T, TrySendError)> for error::SendError<T> {
    fn from(src: (T, TrySendError)) -> error::SendError<T> {
        match src.1 {
            TrySendError::Closed => error::SendError(src.0),
            TrySendError::Full => unreachable!(),
        }
    }
}

impl<T> From<(T, TrySendError)> for error::TrySendError<T> {
    fn from(src: (T, TrySendError)) -> error::TrySendError<T> {
        match src.1 {
            TrySendError::Closed => error::TrySendError::Closed(src.0),
            TrySendError::Full => error::TrySendError::Full(src.0),
        }
    }
}

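// Abstraction over the channel's capacity handling. Two implementations live
// at the bottom of this file: the bounded flavor uses
// `(semaphore_ll::Semaphore, capacity)` and the unbounded flavor uses a bare
// `AtomicUsize` that only counts in-flight messages.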
pub(crate) trait Semaphore {
    type Permit;

    fn new_permit() -> Self::Permit;

    /// The permit is dropped without a value being sent. In this case, the
    /// permit must be returned to the semaphore.
    fn drop_permit(&self, permit: &mut Self::Permit);

    fn is_idle(&self) -> bool;

    fn add_permit(&self);

    fn poll_acquire(
        &self,
        cx: &mut Context<'_>,
        permit: &mut Self::Permit,
    ) -> Poll<Result<(), ClosedError>>;

    fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>;

    /// A value was sent into the channel and the permit held by `tx` is
    /// dropped. In this case, the permit should not immediately be returned to
    /// the semaphore. Instead, the permit is returned to the semaphore once
    /// the sent value is read by the rx handle.
    fn forget(&self, permit: &mut Self::Permit);

    fn close(&self);
}

struct Chan<T, S> {
    /// Handle to the push half of the lock-free list.
    tx: list::Tx<T>,

    /// Coordinates access to channel's capacity.
    semaphore: S,

    /// Receiver waker. Notified when a value is pushed into the channel.
    rx_waker: AtomicWaker,

    /// Tracks the number of outstanding sender handles.
    ///
    /// When this drops to zero, the send half of the channel is closed.
    tx_count: AtomicUsize,

    /// Only accessed by `Rx` handle.
    rx_fields: UnsafeCell<RxFields<T>>,
}

impl<T, S> fmt::Debug for Chan<T, S>
where
    S: fmt::Debug,
{
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Chan")
            .field("tx", &self.tx)
            .field("semaphore", &self.semaphore)
            .field("rx_waker", &self.rx_waker)
            .field("tx_count", &self.tx_count)
            .field("rx_fields", &"...")
            .finish()
    }
}

/// Fields only accessed by `Rx` handle.
struct RxFields<T> {
    /// Channel receiver. This field is only accessed by the `Receiver` type.
    list: list::Rx<T>,

    /// `true` if `Rx::close` is called.
    rx_closed: bool,
}

impl<T> fmt::Debug for RxFields<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("RxFields")
            .field("list", &self.list)
            .field("rx_closed", &self.rx_closed)
            .finish()
    }
}

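// Safety: `rx_fields` sits in an `UnsafeCell`, but it is only touched through
// the single `Rx` handle (and by `Chan::drop` once no handles remain), so
// sharing the `Chan` across threads is sound given `T: Send` and a shareable
// semaphore.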
unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}

pub(crate) fn channel<T, S>(semaphore: S) -> (Tx<T, S>, Rx<T, S>)
where
    S: Semaphore,
{
    let (tx, rx) = list::channel();

    let chan = Arc::new(Chan {
        tx,
        semaphore,
        rx_waker: AtomicWaker::new(),
        tx_count: AtomicUsize::new(1),
        rx_fields: UnsafeCell::new(RxFields {
            list: rx,
            rx_closed: false,
        }),
    });

    (Tx::new(chan.clone()), Rx::new(chan))
}
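
// A rough sketch of how the public constructors are expected to call
// `channel` (illustrative only, not the exact call sites):
//
//     // bounded: pair the low-level semaphore with its capacity
//     let (tx, rx) = channel((semaphore_ll::Semaphore::new(buffer), buffer));
//
//     // unbounded: start with zero in-flight messages
//     let (tx, rx) = channel(AtomicUsize::new(0));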

// ===== impl Tx =====

impl<T, S> Tx<T, S>
where
    S: Semaphore,
{
    fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
        Tx {
            inner: chan,
            permit: S::new_permit(),
        }
    }

    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
        self.inner.semaphore.poll_acquire(cx, &mut self.permit)
    }

    pub(crate) fn disarm(&mut self) {
        // TODO: should this error if not acquired?
        self.inner.semaphore.drop_permit(&mut self.permit)
    }

    /// Send a message and notify the receiver.
    pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> {
        self.inner.try_send(value, &mut self.permit)
    }
}

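// `is_ready` is only provided for the bounded flavor, whose permit type
// (`semaphore_ll::Permit`) can report whether it has been acquired.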
impl<T> Tx<T, (crate::sync::semaphore_ll::Semaphore, usize)> {
    pub(crate) fn is_ready(&self) -> bool {
        self.permit.is_acquired()
    }
}

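// For the unbounded flavor the permit type is `()` (see the `Semaphore` impl
// for `AtomicUsize` below), so a fresh `&mut ()` can be handed to `try_send`
// without tracking any per-sender state.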
impl<T> Tx<T, AtomicUsize> {
    pub(crate) fn send_unbounded(&self, value: T) -> Result<(), (T, TrySendError)> {
        self.inner.try_send(value, &mut ())
    }
}

impl<T, S> Clone for Tx<T, S>
where
    S: Semaphore,
{
    fn clone(&self) -> Tx<T, S> {
        // Using a Relaxed ordering here is sufficient as the caller holds a
        // strong ref to `self`, preventing a concurrent decrement to zero.
        self.inner.tx_count.fetch_add(1, Relaxed);

        Tx {
            inner: self.inner.clone(),
            permit: S::new_permit(),
        }
    }
}

impl<T, S> Drop for Tx<T, S>
where
    S: Semaphore,
{
    fn drop(&mut self) {
        self.inner.semaphore.drop_permit(&mut self.permit);

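        // The Release half of this AcqRel publishes this sender's pushes to
        // whichever thread performs the final decrement; the Acquire half lets
        // that final thread observe every other sender's pushes before it
        // closes the list below.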
        if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
            return;
        }

        // Close the list, which sends a `Close` message
        self.inner.tx.close();

        // Notify the receiver
        self.inner.rx_waker.wake();
    }
}

// ===== impl Rx =====

impl<T, S> Rx<T, S>
where
    S: Semaphore,
{
    fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
        Rx { inner: chan }
    }

    pub(crate) fn close(&mut self) {
        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            if rx_fields.rx_closed {
                return;
            }

            rx_fields.rx_closed = true;
        });

        self.inner.semaphore.close();
    }

    /// Receive the next value
    pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        use super::block::Read::*;

        // Keep track of task budget
        ready!(crate::coop::poll_proceed(cx));

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            macro_rules! try_recv {
                () => {
                    match rx_fields.list.pop(&self.inner.tx) {
                        Some(Value(value)) => {
                            self.inner.semaphore.add_permit();
                            return Ready(Some(value));
                        }
                        Some(Closed) => {
                            // TODO: This check may not be required as it most
                            // likely can only return `true` at this point. A
                            // channel is closed when all tx handles are
                            // dropped. Dropping a tx handle releases memory,
                            // which ensures that if dropping the tx handle is
                            // visible, then all messages sent are also visible.
                            assert!(self.inner.semaphore.is_idle());
                            return Ready(None);
                        }
                        None => {} // fall through
                    }
                };
            }

            try_recv!();

            self.inner.rx_waker.register_by_ref(cx.waker());

            // It is possible that a value was pushed between attempting to read
            // and registering the task, so we have to check the channel a
            // second time here.
            try_recv!();

            if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
                Ready(None)
            } else {
                Pending
            }
        })
    }

    /// Receives the next value without blocking
    pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
        use super::block::Read::*;
        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };
            match rx_fields.list.pop(&self.inner.tx) {
                Some(Value(value)) => {
                    self.inner.semaphore.add_permit();
                    Ok(value)
                }
                Some(Closed) => Err(TryRecvError::Closed),
                None => Err(TryRecvError::Empty),
            }
        })
    }
}

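// Dropping the receiver closes the channel, then drains any values that were
// already buffered, returning their permits so the semaphore's accounting
// stays consistent while the remaining `Tx` handles shut down.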
impl<T, S> Drop for Rx<T, S>
where
    S: Semaphore,
{
    fn drop(&mut self) {
        use super::block::Read::Value;

        self.close();

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
                self.inner.semaphore.add_permit();
            }
        })
    }
}

// ===== impl Chan =====

impl<T, S> Chan<T, S>
where
    S: Semaphore,
{
    fn try_send(&self, value: T, permit: &mut S::Permit) -> Result<(), (T, TrySendError)> {
        if let Err(e) = self.semaphore.try_acquire(permit) {
            return Err((value, e));
        }

        // Push the value
        self.tx.push(value);

        // Notify the rx task
        self.rx_waker.wake();

        // Release the permit
        self.semaphore.forget(permit);

        Ok(())
    }
}

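// Dropping the `Chan` itself (i.e. the last `Arc` reference, whichever handle
// held it) drops any values still in the list and frees its backing blocks.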
impl<T, S> Drop for Chan<T, S> {
    fn drop(&mut self) {
        use super::block::Read::Value;

        // Safety: the only owner of the rx fields is Chan, and being
        // inside its own Drop means we're the last ones to touch it.
        self.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
            unsafe { rx_fields.list.free_blocks() };
        });
    }
}

use crate::sync::semaphore_ll::TryAcquireError;

impl From<TryAcquireError> for TrySendError {
    fn from(src: TryAcquireError) -> TrySendError {
        if src.is_closed() {
            TrySendError::Closed
        } else if src.is_no_permits() {
            TrySendError::Full
        } else {
            unreachable!();
        }
    }
}

// ===== impl Semaphore for (::Semaphore, capacity) =====

use crate::sync::semaphore_ll::Permit;

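// The bounded semaphore is the pair `(semaphore, capacity)`: one permit is
// held per in-flight message, and the channel is idle when every permit is
// back, i.e. `available_permits() == capacity`.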
impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) {
    type Permit = Permit;

    fn new_permit() -> Permit {
        Permit::new()
    }

    fn drop_permit(&self, permit: &mut Permit) {
        permit.release(1, &self.0);
    }

    fn add_permit(&self) {
        self.0.add_permits(1)
    }

    fn is_idle(&self) -> bool {
        self.0.available_permits() == self.1
    }

    fn poll_acquire(
        &self,
        cx: &mut Context<'_>,
        permit: &mut Permit,
    ) -> Poll<Result<(), ClosedError>> {
        // Keep track of task budget
        ready!(crate::coop::poll_proceed(cx));

        permit
            .poll_acquire(cx, 1, &self.0)
            .map_err(|_| ClosedError::new())
    }

    fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> {
        permit.try_acquire(1, &self.0)?;
        Ok(())
    }

    fn forget(&self, permit: &mut Self::Permit) {
        permit.forget(1);
    }

    fn close(&self) {
        self.0.close();
    }
}

// ===== impl Semaphore for AtomicUsize =====

use std::sync::atomic::Ordering::{Acquire, Release};
use std::usize;

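// The unbounded "semaphore" is a plain counter: bit 0 is the closed flag and
// the remaining bits count in-flight messages, so a send adds 2, a receive
// subtracts 2, and `close` sets the low bit.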
impl Semaphore for AtomicUsize {
    type Permit = ();

    fn new_permit() {}

    fn drop_permit(&self, _permit: &mut ()) {}

    fn add_permit(&self) {
        let prev = self.fetch_sub(2, Release);

        if prev >> 1 == 0 {
            // Something went wrong
            process::abort();
        }
    }

    fn is_idle(&self) -> bool {
        self.load(Acquire) >> 1 == 0
    }

    fn poll_acquire(
        &self,
        _cx: &mut Context<'_>,
        permit: &mut (),
    ) -> Poll<Result<(), ClosedError>> {
        Ready(self.try_acquire(permit).map_err(|_| ClosedError::new()))
    }

    fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> {
        let mut curr = self.load(Acquire);

        loop {
            if curr & 1 == 1 {
                return Err(TrySendError::Closed);
            }

            if curr == usize::MAX ^ 1 {
                // Overflowed the ref count. There is no safe way to recover, so
                // abort the process. In practice, this should never happen.
                process::abort()
            }

            match self.compare_exchange(curr, curr + 2, AcqRel, Acquire) {
                Ok(_) => return Ok(()),
                Err(actual) => {
                    curr = actual;
                }
            }
        }
    }

    fn forget(&self, _permit: &mut ()) {}

    fn close(&self) {
        self.fetch_or(1, Release);
    }
}
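
// A minimal sketch (not part of the original module) exercising the unbounded
// `Semaphore` impl on `AtomicUsize` directly; it assumes a non-loom build in
// which the loom `AtomicUsize` dereferences to the std atomic.
#[cfg(test)]
mod unbounded_semaphore_sketch {
    use super::{Semaphore, TrySendError};
    use crate::loom::sync::atomic::AtomicUsize;

    #[test]
    fn counts_messages_and_closes() {
        let s = AtomicUsize::new(0);
        assert!(s.is_idle());

        // A successful acquire bumps the counter by 2 (bit 0 is the closed flag).
        s.try_acquire(&mut ()).unwrap();
        assert!(!s.is_idle());

        // Receiving the value hands the "permit" back.
        s.add_permit();
        assert!(s.is_idle());

        // Once closed, further sends fail with `Closed`.
        s.close();
        assert_eq!(s.try_acquire(&mut ()), Err(TrySendError::Closed));
    }
}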