1 use super::list;
2 use futures::Poll;
3
4 use loom::{
5 futures::AtomicTask,
6 sync::atomic::AtomicUsize,
7 sync::{Arc, CausalCell},
8 };
9
10 use std::fmt;
11 use std::process;
12 use std::sync::atomic::Ordering::{AcqRel, Relaxed};
13
/// Channel sender
pub(crate) struct Tx<T, S: Semaphore> {
    /// Shared channel state; also referenced by `Rx` and every other `Tx`
    /// clone.
    inner: Arc<Chan<T, S>>,

    /// Capacity permit held by this handle. Acquired before a send, then
    /// either forgotten on a successful send or released back to the
    /// semaphore when the handle is dropped.
    permit: S::Permit,
}
19
20 impl<T, S: Semaphore> fmt::Debug for Tx<T, S>
21 where
22 S::Permit: fmt::Debug,
23 S: fmt::Debug,
24 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result25 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
26 fmt.debug_struct("Tx")
27 .field("inner", &self.inner)
28 .field("permit", &self.permit)
29 .finish()
30 }
31 }
32
/// Channel receiver
pub(crate) struct Rx<T, S: Semaphore> {
    /// Shared channel state. The `rx_fields` cell inside is only ever
    /// accessed through this handle.
    inner: Arc<Chan<T, S>>,
}
37
38 impl<T, S: Semaphore> fmt::Debug for Rx<T, S>
39 where
40 S: fmt::Debug,
41 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result42 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
43 fmt.debug_struct("Rx").field("inner", &self.inner).finish()
44 }
45 }
46
/// Error returned by `Tx::try_send` when the value could not be sent.
#[derive(Debug, Eq, PartialEq)]
pub(crate) enum TrySendError {
    /// The channel has been closed; no further sends will succeed.
    Closed,
    /// The channel is at capacity; no permit could be acquired right now.
    NoPermits,
}
52
/// Abstracts the capacity-tracking strategy used by the channel.
///
/// Implemented below for `(::semaphore::Semaphore, usize)` (bounded
/// channels) and for `AtomicUsize` (unbounded channels).
pub(crate) trait Semaphore {
    /// Per-sender state tracking a (possibly) acquired unit of capacity.
    type Permit;

    /// Creates a new, unacquired permit.
    fn new_permit() -> Self::Permit;

    /// The permit is dropped without a value being sent. In this case, the
    /// permit must be returned to the semaphore.
    fn drop_permit(&self, permit: &mut Self::Permit);

    /// Returns `true` when no values are in flight, i.e. all capacity is
    /// back in the semaphore.
    fn is_idle(&self) -> bool;

    /// Returns one unit of capacity to the semaphore; called after the
    /// receiver consumes a value.
    fn add_permit(&self);

    /// Attempts to acquire capacity for `permit`; may register the current
    /// task for wakeup when capacity is unavailable (see the bounded impl).
    fn poll_acquire(&self, permit: &mut Self::Permit) -> Poll<(), ()>;

    /// Attempts to acquire capacity without blocking or task registration.
    fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>;

    /// A value was sent into the channel and the permit held by `tx` is
    /// dropped. In this case, the permit should not immediately be returned to
    /// the semaphore. Instead, the permit is returned to the semaphore once
    /// the sent value is read by the rx handle.
    fn forget(&self, permit: &mut Self::Permit);

    /// Closes the semaphore, causing subsequent acquire attempts to fail.
    fn close(&self);
}
78
/// Channel state shared between all `Tx` handles and the `Rx` handle.
struct Chan<T, S> {
    /// Handle to the push half of the lock-free list.
    tx: list::Tx<T>,

    /// Coordinates access to channel's capacity.
    semaphore: S,

    /// Receiver task. Notified when a value is pushed into the channel.
    rx_task: AtomicTask,

    /// Tracks the number of outstanding sender handles.
    ///
    /// When this drops to zero, the send half of the channel is closed.
    tx_count: AtomicUsize,

    /// Only accessed by `Rx` handle.
    rx_fields: CausalCell<RxFields<T>>,
}
97
98 impl<T, S> fmt::Debug for Chan<T, S>
99 where
100 S: fmt::Debug,
101 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result102 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
103 fmt.debug_struct("Chan")
104 .field("tx", &self.tx)
105 .field("semaphore", &self.semaphore)
106 .field("rx_task", &self.rx_task)
107 .field("tx_count", &self.tx_count)
108 .field("rx_fields", &"...")
109 .finish()
110 }
111 }
112
/// Fields only accessed by `Rx` handle.
struct RxFields<T> {
    /// Channel receiver. This field is only accessed by the `Receiver` type.
    list: list::Rx<T>,

    /// `true` if `Rx::close` is called. Once set, `recv` returns `None`
    /// after the in-flight values are drained.
    rx_closed: bool,
}
121
122 impl<T> fmt::Debug for RxFields<T> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result123 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
124 fmt.debug_struct("RxFields")
125 .field("list", &self.list)
126 .field("rx_closed", &self.rx_closed)
127 .finish()
128 }
129 }
130
// SAFETY: `Chan` can be sent/shared across threads when `T: Send`: `tx`,
// `semaphore`, `rx_task` and `tx_count` are concurrency-aware, and the
// `rx_fields` cell is only accessed through the single `Rx` handle.
// NOTE(review): soundness relies on the `CausalCell` access discipline in
// `Rx`/`Chan::drop` — confirm against the `list` module's invariants.
unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}
133
channel<T, S>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) where S: Semaphore,134 pub(crate) fn channel<T, S>(semaphore: S) -> (Tx<T, S>, Rx<T, S>)
135 where
136 S: Semaphore,
137 {
138 let (tx, rx) = list::channel();
139
140 let chan = Arc::new(Chan {
141 tx,
142 semaphore,
143 rx_task: AtomicTask::new(),
144 tx_count: AtomicUsize::new(1),
145 rx_fields: CausalCell::new(RxFields {
146 list: rx,
147 rx_closed: false,
148 }),
149 });
150
151 (Tx::new(chan.clone()), Rx::new(chan))
152 }
153
154 // ===== impl Tx =====
155
impl<T, S> Tx<T, S>
where
    S: Semaphore,
{
    /// Creates a sender handle wrapping the shared channel state, with a
    /// fresh (unacquired) permit.
    fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
        Tx {
            inner: chan,
            permit: S::new_permit(),
        }
    }

    /// Polls for channel capacity, acquiring a permit for this handle.
    ///
    /// Returns `Ready` once a permit is held (a following `try_send` will
    /// not fail with `NoPermits`) and `NotReady` when the channel is at
    /// capacity; errors are surfaced by the semaphore implementation
    /// (presumably when the channel is closed — see `Semaphore::close`).
    pub(crate) fn poll_ready(&mut self) -> Poll<(), ()> {
        self.inner.semaphore.poll_acquire(&mut self.permit)
    }

    /// Send a message and notify the receiver.
    ///
    /// On failure the value is handed back to the caller along with the
    /// reason (`Closed` or `NoPermits`).
    pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> {
        if let Err(e) = self.inner.semaphore.try_acquire(&mut self.permit) {
            return Err((value, e));
        }

        // Push the value
        self.inner.tx.push(value);

        // Notify the rx task
        self.inner.rx_task.notify();

        // Release the permit
        //
        // NB: `forget` (not release) — the capacity is returned by the
        // receiver via `add_permit` once the value is consumed.
        self.inner.semaphore.forget(&mut self.permit);

        Ok(())
    }
}
190
191 impl<T, S> Clone for Tx<T, S>
192 where
193 S: Semaphore,
194 {
clone(&self) -> Tx<T, S>195 fn clone(&self) -> Tx<T, S> {
196 // Using a Relaxed ordering here is sufficient as the caller holds a
197 // strong ref to `self`, preventing a concurrent decrement to zero.
198 self.inner.tx_count.fetch_add(1, Relaxed);
199
200 Tx {
201 inner: self.inner.clone(),
202 permit: S::new_permit(),
203 }
204 }
205 }
206
impl<T, S> Drop for Tx<T, S>
where
    S: Semaphore,
{
    /// Returns this handle's capacity and, if it is the last sender,
    /// closes the channel and wakes the receiver.
    fn drop(&mut self) {
        // Return any capacity held by this handle's permit to the semaphore.
        self.inner.semaphore.drop_permit(&mut self.permit);

        // `AcqRel` makes the final decrement synchronize with the other
        // handles' decrements; only the thread observing the count hit
        // zero performs the close below.
        if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
            return;
        }

        // Close the list, which sends a `Close` message
        self.inner.tx.close();

        // Notify the receiver
        self.inner.rx_task.notify();
    }
}
225
226 // ===== impl Rx =====
227
impl<T, S> Rx<T, S>
where
    S: Semaphore,
{
    /// Creates a receiver handle wrapping the shared channel state.
    fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
        Rx { inner: chan }
    }

    /// Closes the receive half of the channel: the semaphore is closed so
    /// senders can no longer acquire capacity, but values already sent can
    /// still be received. Idempotent.
    pub(crate) fn close(&mut self) {
        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            // Safety: `rx_fields` is only accessed through the single `Rx`
            // handle, and we hold `&mut self`.
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            if rx_fields.rx_closed {
                return;
            }

            rx_fields.rx_closed = true;
        });

        self.inner.semaphore.close();
    }

    /// Receive the next value
    ///
    /// Returns `Ready(Some(value))` when a value is available,
    /// `Ready(None)` when the channel is closed and fully drained, and
    /// `NotReady` otherwise (with the current task registered for wakeup).
    pub(crate) fn recv(&mut self) -> Poll<Option<T>, ()> {
        use super::block::Read::*;
        use futures::Async::*;

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            // Safety: only the `Rx` handle accesses `rx_fields`.
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            // Attempts one pop; on a value, returns the consumed capacity to
            // the semaphore and short-circuits out of `recv`.
            macro_rules! try_recv {
                () => {
                    match rx_fields.list.pop(&self.inner.tx) {
                        Some(Value(value)) => {
                            self.inner.semaphore.add_permit();
                            return Ok(Ready(Some(value)));
                        }
                        Some(Closed) => {
                            // TODO: This check may not be required as it most
                            // likely can only return `true` at this point. A
                            // channel is closed when all tx handles are
                            // dropped. Dropping a tx handle releases memory,
                            // which ensures that if dropping the tx handle is
                            // visible, then all messages sent are also visible.
                            assert!(self.inner.semaphore.is_idle());
                            return Ok(Ready(None));
                        }
                        None => {} // fall through
                    }
                };
            }

            try_recv!();

            self.inner.rx_task.register();

            // It is possible that a value was pushed between attempting to read
            // and registering the task, so we have to check the channel a
            // second time here.
            try_recv!();

            debug!(
                "recv; rx_closed = {:?}; is_idle = {:?}",
                rx_fields.rx_closed,
                self.inner.semaphore.is_idle()
            );

            // Terminated only once `close()` was called *and* every
            // in-flight value has been consumed.
            if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
                Ok(Ready(None))
            } else {
                Ok(NotReady)
            }
        })
    }
}
303
304 impl<T, S> Drop for Rx<T, S>
305 where
306 S: Semaphore,
307 {
drop(&mut self)308 fn drop(&mut self) {
309 use super::block::Read::Value;
310
311 self.close();
312
313 self.inner.rx_fields.with_mut(|rx_fields_ptr| {
314 let rx_fields = unsafe { &mut *rx_fields_ptr };
315
316 while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
317 self.inner.semaphore.add_permit();
318 }
319 })
320 }
321 }
322
323 // ===== impl Chan =====
324
impl<T, S> Drop for Chan<T, S> {
    /// Drains remaining values (running their destructors) and frees the
    /// list's backing blocks when the last handle goes away.
    fn drop(&mut self) {
        use super::block::Read::Value;

        // Safety: the only owner of the rx fields is Chan, and being
        // inside its own Drop means we're the last ones to touch it.
        self.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
            unsafe { rx_fields.list.free_blocks() };
        });
    }
}
339
340 use semaphore::TryAcquireError;
341
342 impl From<TryAcquireError> for TrySendError {
from(src: TryAcquireError) -> TrySendError343 fn from(src: TryAcquireError) -> TrySendError {
344 if src.is_closed() {
345 TrySendError::Closed
346 } else if src.is_no_permits() {
347 TrySendError::NoPermits
348 } else {
349 unreachable!();
350 }
351 }
352 }
353
354 // ===== impl Semaphore for (::Semaphore, capacity) =====
355
356 use semaphore::Permit;
357
/// Bounded-channel strategy: `.0` is the semaphore tracking available
/// capacity, `.1` is the channel's total capacity (used by `is_idle`).
impl Semaphore for (::semaphore::Semaphore, usize) {
    type Permit = Permit;

    fn new_permit() -> Permit {
        Permit::new()
    }

    // Return any capacity held by `permit` back to the semaphore.
    fn drop_permit(&self, permit: &mut Permit) {
        permit.release(&self.0);
    }

    fn add_permit(&self) {
        self.0.add_permits(1)
    }

    // Idle when every unit of capacity is back in the semaphore.
    fn is_idle(&self) -> bool {
        self.0.available_permits() == self.1
    }

    fn poll_acquire(&self, permit: &mut Permit) -> Poll<(), ()> {
        // The semaphore's error (closed) is collapsed to `()`.
        permit.poll_acquire(&self.0).map_err(|_| ())
    }

    fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> {
        // `TryAcquireError` converts via the `From` impl above.
        permit.try_acquire(&self.0)?;
        Ok(())
    }

    fn forget(&self, permit: &mut Self::Permit) {
        permit.forget()
    }

    fn close(&self) {
        self.0.close();
    }
}
394
395 // ===== impl Semaphore for AtomicUsize =====
396
397 use std::sync::atomic::Ordering::{Acquire, Release};
398 use std::usize;
399
/// Unbounded-channel strategy. Bit 0 of the `AtomicUsize` is the "closed"
/// flag; the remaining bits (`value >> 1`) count values currently in the
/// channel, so the counter moves in steps of 2. Acquisition only fails
/// once the channel is closed.
impl Semaphore for AtomicUsize {
    // No per-sender state is needed — capacity is unbounded.
    type Permit = ();

    fn new_permit() {}

    fn drop_permit(&self, _permit: &mut ()) {}

    fn add_permit(&self) {
        // A value was consumed: decrement the in-flight count. Subtracting
        // 2 leaves bit 0 (the closed flag) intact.
        let prev = self.fetch_sub(2, Release);

        if prev >> 1 == 0 {
            // Something went wrong: the count underflowed.
            process::abort();
        }
    }

    fn is_idle(&self) -> bool {
        // Idle when no values are in flight (the closed bit is ignored).
        self.load(Acquire) >> 1 == 0
    }

    fn poll_acquire(&self, permit: &mut ()) -> Poll<(), ()> {
        use futures::Async::Ready;
        // Unbounded: acquisition never has to wait, so this is just
        // `try_acquire` lifted into `Poll` (no task registration needed).
        self.try_acquire(permit).map(Ready).map_err(|_| ())
    }

    fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> {
        let mut curr = self.load(Acquire);

        loop {
            // Bit 0 set means the channel has been closed.
            if curr & 1 == 1 {
                return Err(TrySendError::Closed);
            }

            // `usize::MAX ^ 1` is the largest even counter value; adding 2
            // to it would wrap.
            if curr == usize::MAX ^ 1 {
                // Overflowed the ref count. There is no safe way to recover, so
                // abort the process. In practice, this should never happen.
                process::abort()
            }

            // Record one more in-flight value (step of 2 preserves bit 0).
            match self.compare_exchange(curr, curr + 2, AcqRel, Acquire) {
                Ok(_) => return Ok(()),
                Err(actual) => {
                    // Lost the race; retry with the freshly observed value.
                    curr = actual;
                }
            }
        }
    }

    fn forget(&self, _permit: &mut ()) {}

    fn close(&self) {
        // Set the closed bit; the in-flight count is left untouched so the
        // receiver can still drain pending values.
        self.fetch_or(1, Release);
    }
}
454