1 use super::list;
2 use futures::Poll;
3 
4 use loom::{
5     futures::AtomicTask,
6     sync::atomic::AtomicUsize,
7     sync::{Arc, CausalCell},
8 };
9 
10 use std::fmt;
11 use std::process;
12 use std::sync::atomic::Ordering::{AcqRel, Relaxed};
13 
14 /// Channel sender
15 pub(crate) struct Tx<T, S: Semaphore> {
16     inner: Arc<Chan<T, S>>,
17     permit: S::Permit,
18 }
19 
20 impl<T, S: Semaphore> fmt::Debug for Tx<T, S>
21 where
22     S::Permit: fmt::Debug,
23     S: fmt::Debug,
24 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result25     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
26         fmt.debug_struct("Tx")
27             .field("inner", &self.inner)
28             .field("permit", &self.permit)
29             .finish()
30     }
31 }
32 
33 /// Channel receiver
34 pub(crate) struct Rx<T, S: Semaphore> {
35     inner: Arc<Chan<T, S>>,
36 }
37 
38 impl<T, S: Semaphore> fmt::Debug for Rx<T, S>
39 where
40     S: fmt::Debug,
41 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result42     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
43         fmt.debug_struct("Rx").field("inner", &self.inner).finish()
44     }
45 }
46 
/// Reason reported by `Semaphore::try_acquire`, and therefore by
/// `Tx::try_send`, when capacity could not be acquired.
#[derive(Debug, Eq, PartialEq)]
pub(crate) enum TrySendError {
    /// The semaphore reported that it has been closed.
    Closed,
    /// The semaphore reported that no permits are currently available.
    NoPermits,
}
52 
/// Capacity-control strategy plugged into the channel.
///
/// `Tx` calls into it before and after pushing a value; `Rx` calls into it
/// after popping a value and when closing.
pub(crate) trait Semaphore {
    /// Per-sender state handed to the acquire / release methods below.
    type Permit;

    /// Creates the permit slot stored in each `Tx` handle.
    fn new_permit() -> Self::Permit;

    /// The permit is dropped without a value being sent. In this case, the
    /// permit must be returned to the semaphore.
    fn drop_permit(&self, permit: &mut Self::Permit);

    /// Queried by `Rx::recv`: asserted when the list reports `Closed`, and
    /// combined with the `rx_closed` flag to decide whether to report the end
    /// of the stream.
    fn is_idle(&self) -> bool;

    /// Called by `Rx` once for every value it pops from the list.
    fn add_permit(&self);

    /// Backs `Tx::poll_ready`; the result is returned to the caller unchanged.
    fn poll_acquire(&self, permit: &mut Self::Permit) -> Poll<(), ()>;

    /// Backs `Tx::try_send`; on error the value is handed back to the caller
    /// together with the returned reason.
    fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>;

    /// A value was sent into the channel and the permit held by `tx` is
    /// dropped. In this case, the permit should not immediately be returned to
    /// the semaphore. Instead, the permit is returned to the semaphore once
    /// the sent value is read by the rx handle.
    fn forget(&self, permit: &mut Self::Permit);

    /// Called by `Rx::close`.
    fn close(&self);
}
78 
/// Channel state shared between handles: `channel` wraps one `Chan` in an
/// `Arc` and hands a clone of that `Arc` to the `Tx` and the original to the
/// `Rx`.
struct Chan<T, S> {
    /// Handle to the push half of the lock-free list.
    tx: list::Tx<T>,

    /// Coordinates access to channel's capacity.
    semaphore: S,

    /// Receiver task. Notified when a value is pushed into the channel.
    rx_task: AtomicTask,

    /// Tracks the number of outstanding sender handles.
    ///
    /// When this drops to zero, the send half of the channel is closed.
    tx_count: AtomicUsize,

    /// Only accessed by `Rx` handle.
    rx_fields: CausalCell<RxFields<T>>,
}
97 
98 impl<T, S> fmt::Debug for Chan<T, S>
99 where
100     S: fmt::Debug,
101 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result102     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
103         fmt.debug_struct("Chan")
104             .field("tx", &self.tx)
105             .field("semaphore", &self.semaphore)
106             .field("rx_task", &self.rx_task)
107             .field("tx_count", &self.tx_count)
108             .field("rx_fields", &"...")
109             .finish()
110     }
111 }
112 
/// Fields only accessed by `Rx` handle.
///
/// Stored in `Chan::rx_fields` inside a `CausalCell`.
struct RxFields<T> {
    /// Channel receiver. This field is only accessed by the `Receiver` type.
    list: list::Rx<T>,

    /// `true` if `Rx::close` is called.
    rx_closed: bool,
}
121 
122 impl<T> fmt::Debug for RxFields<T> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result123     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
124         fmt.debug_struct("RxFields")
125             .field("list", &self.list)
126             .field("rx_closed", &self.rx_closed)
127             .finish()
128     }
129 }
130 
// NOTE(review): `rx_fields` is a `CausalCell` that this file only reaches via
// `with_mut` from `Rx` methods taking `&mut self` and from the `Drop` impls;
// presumably that single-receiver discipline is what makes these manual impls
// sound -- confirm against `CausalCell` and `list`.
unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}
133 
channel<T, S>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) where S: Semaphore,134 pub(crate) fn channel<T, S>(semaphore: S) -> (Tx<T, S>, Rx<T, S>)
135 where
136     S: Semaphore,
137 {
138     let (tx, rx) = list::channel();
139 
140     let chan = Arc::new(Chan {
141         tx,
142         semaphore,
143         rx_task: AtomicTask::new(),
144         tx_count: AtomicUsize::new(1),
145         rx_fields: CausalCell::new(RxFields {
146             list: rx,
147             rx_closed: false,
148         }),
149     });
150 
151     (Tx::new(chan.clone()), Rx::new(chan))
152 }
153 
154 // ===== impl Tx =====
155 
156 impl<T, S> Tx<T, S>
157 where
158     S: Semaphore,
159 {
new(chan: Arc<Chan<T, S>>) -> Tx<T, S>160     fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
161         Tx {
162             inner: chan,
163             permit: S::new_permit(),
164         }
165     }
166 
167     /// TODO: Docs
poll_ready(&mut self) -> Poll<(), ()>168     pub(crate) fn poll_ready(&mut self) -> Poll<(), ()> {
169         self.inner.semaphore.poll_acquire(&mut self.permit)
170     }
171 
172     /// Send a message and notify the receiver.
try_send(&mut self, value: T) -> Result<(), (T, TrySendError)>173     pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> {
174         if let Err(e) = self.inner.semaphore.try_acquire(&mut self.permit) {
175             return Err((value, e));
176         }
177 
178         // Push the value
179         self.inner.tx.push(value);
180 
181         // Notify the rx task
182         self.inner.rx_task.notify();
183 
184         // Release the permit
185         self.inner.semaphore.forget(&mut self.permit);
186 
187         Ok(())
188     }
189 }
190 
191 impl<T, S> Clone for Tx<T, S>
192 where
193     S: Semaphore,
194 {
clone(&self) -> Tx<T, S>195     fn clone(&self) -> Tx<T, S> {
196         // Using a Relaxed ordering here is sufficient as the caller holds a
197         // strong ref to `self`, preventing a concurrent decrement to zero.
198         self.inner.tx_count.fetch_add(1, Relaxed);
199 
200         Tx {
201             inner: self.inner.clone(),
202             permit: S::new_permit(),
203         }
204     }
205 }
206 
207 impl<T, S> Drop for Tx<T, S>
208 where
209     S: Semaphore,
210 {
drop(&mut self)211     fn drop(&mut self) {
212         self.inner.semaphore.drop_permit(&mut self.permit);
213 
214         if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
215             return;
216         }
217 
218         // Close the list, which sends a `Close` message
219         self.inner.tx.close();
220 
221         // Notify the receiver
222         self.inner.rx_task.notify();
223     }
224 }
225 
226 // ===== impl Rx =====
227 
228 impl<T, S> Rx<T, S>
229 where
230     S: Semaphore,
231 {
new(chan: Arc<Chan<T, S>>) -> Rx<T, S>232     fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
233         Rx { inner: chan }
234     }
235 
close(&mut self)236     pub(crate) fn close(&mut self) {
237         self.inner.rx_fields.with_mut(|rx_fields_ptr| {
238             let rx_fields = unsafe { &mut *rx_fields_ptr };
239 
240             if rx_fields.rx_closed {
241                 return;
242             }
243 
244             rx_fields.rx_closed = true;
245         });
246 
247         self.inner.semaphore.close();
248     }
249 
250     /// Receive the next value
recv(&mut self) -> Poll<Option<T>, ()>251     pub(crate) fn recv(&mut self) -> Poll<Option<T>, ()> {
252         use super::block::Read::*;
253         use futures::Async::*;
254 
255         self.inner.rx_fields.with_mut(|rx_fields_ptr| {
256             let rx_fields = unsafe { &mut *rx_fields_ptr };
257 
258             macro_rules! try_recv {
259                 () => {
260                     match rx_fields.list.pop(&self.inner.tx) {
261                         Some(Value(value)) => {
262                             self.inner.semaphore.add_permit();
263                             return Ok(Ready(Some(value)));
264                         }
265                         Some(Closed) => {
266                             // TODO: This check may not be required as it most
267                             // likely can only return `true` at this point. A
268                             // channel is closed when all tx handles are
269                             // dropped.  Dropping a tx handle releases memory,
270                             // which ensures that if dropping the tx handle is
271                             // visible, then all messages sent are also visible.
272                             assert!(self.inner.semaphore.is_idle());
273                             return Ok(Ready(None));
274                         }
275                         None => {} // fall through
276                     }
277                 };
278             }
279 
280             try_recv!();
281 
282             self.inner.rx_task.register();
283 
284             // It is possible that a value was pushed between attempting to read
285             // and registering the task, so we have to check the channel a
286             // second time here.
287             try_recv!();
288 
289             debug!(
290                 "recv; rx_closed = {:?}; is_idle = {:?}",
291                 rx_fields.rx_closed,
292                 self.inner.semaphore.is_idle()
293             );
294 
295             if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
296                 Ok(Ready(None))
297             } else {
298                 Ok(NotReady)
299             }
300         })
301     }
302 }
303 
304 impl<T, S> Drop for Rx<T, S>
305 where
306     S: Semaphore,
307 {
drop(&mut self)308     fn drop(&mut self) {
309         use super::block::Read::Value;
310 
311         self.close();
312 
313         self.inner.rx_fields.with_mut(|rx_fields_ptr| {
314             let rx_fields = unsafe { &mut *rx_fields_ptr };
315 
316             while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
317                 self.inner.semaphore.add_permit();
318             }
319         })
320     }
321 }
322 
323 // ===== impl Chan =====
324 
325 impl<T, S> Drop for Chan<T, S> {
drop(&mut self)326     fn drop(&mut self) {
327         use super::block::Read::Value;
328 
329         // Safety: the only owner of the rx fields is Chan, and eing
330         // inside its own Drop means we're the last ones to touch it.
331         self.rx_fields.with_mut(|rx_fields_ptr| {
332             let rx_fields = unsafe { &mut *rx_fields_ptr };
333 
334             while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
335             unsafe { rx_fields.list.free_blocks() };
336         });
337     }
338 }
339 
340 use semaphore::TryAcquireError;
341 
342 impl From<TryAcquireError> for TrySendError {
from(src: TryAcquireError) -> TrySendError343     fn from(src: TryAcquireError) -> TrySendError {
344         if src.is_closed() {
345             TrySendError::Closed
346         } else if src.is_no_permits() {
347             TrySendError::NoPermits
348         } else {
349             unreachable!();
350         }
351     }
352 }
353 
354 // ===== impl Semaphore for (::Semaphore, capacity) =====
355 
356 use semaphore::Permit;
357 
358 impl Semaphore for (::semaphore::Semaphore, usize) {
359     type Permit = Permit;
360 
new_permit() -> Permit361     fn new_permit() -> Permit {
362         Permit::new()
363     }
364 
drop_permit(&self, permit: &mut Permit)365     fn drop_permit(&self, permit: &mut Permit) {
366         permit.release(&self.0);
367     }
368 
add_permit(&self)369     fn add_permit(&self) {
370         self.0.add_permits(1)
371     }
372 
is_idle(&self) -> bool373     fn is_idle(&self) -> bool {
374         self.0.available_permits() == self.1
375     }
376 
poll_acquire(&self, permit: &mut Permit) -> Poll<(), ()>377     fn poll_acquire(&self, permit: &mut Permit) -> Poll<(), ()> {
378         permit.poll_acquire(&self.0).map_err(|_| ())
379     }
380 
try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError>381     fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> {
382         permit.try_acquire(&self.0)?;
383         Ok(())
384     }
385 
forget(&self, permit: &mut Self::Permit)386     fn forget(&self, permit: &mut Self::Permit) {
387         permit.forget()
388     }
389 
close(&self)390     fn close(&self) {
391         self.0.close();
392     }
393 }
394 
395 // ===== impl Semaphore for AtomicUsize =====
396 
397 use std::sync::atomic::Ordering::{Acquire, Release};
398 use std::usize;
399 
400 impl Semaphore for AtomicUsize {
401     type Permit = ();
402 
new_permit()403     fn new_permit() {}
404 
drop_permit(&self, _permit: &mut ())405     fn drop_permit(&self, _permit: &mut ()) {}
406 
add_permit(&self)407     fn add_permit(&self) {
408         let prev = self.fetch_sub(2, Release);
409 
410         if prev >> 1 == 0 {
411             // Something went wrong
412             process::abort();
413         }
414     }
415 
is_idle(&self) -> bool416     fn is_idle(&self) -> bool {
417         self.load(Acquire) >> 1 == 0
418     }
419 
poll_acquire(&self, permit: &mut ()) -> Poll<(), ()>420     fn poll_acquire(&self, permit: &mut ()) -> Poll<(), ()> {
421         use futures::Async::Ready;
422         self.try_acquire(permit).map(Ready).map_err(|_| ())
423     }
424 
try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError>425     fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> {
426         let mut curr = self.load(Acquire);
427 
428         loop {
429             if curr & 1 == 1 {
430                 return Err(TrySendError::Closed);
431             }
432 
433             if curr == usize::MAX ^ 1 {
434                 // Overflowed the ref count. There is no safe way to recover, so
435                 // abort the process. In practice, this should never happen.
436                 process::abort()
437             }
438 
439             match self.compare_exchange(curr, curr + 2, AcqRel, Acquire) {
440                 Ok(_) => return Ok(()),
441                 Err(actual) => {
442                     curr = actual;
443                 }
444             }
445         }
446     }
447 
forget(&self, _permit: &mut ())448     fn forget(&self, _permit: &mut ()) {}
449 
close(&self)450     fn close(&self) {
451         self.fetch_or(1, Release);
452     }
453 }
454