1 use crate::loom::cell::UnsafeCell;
2 use crate::loom::future::AtomicWaker;
3 use crate::loom::sync::atomic::AtomicUsize;
4 use crate::loom::sync::Arc;
5 use crate::sync::mpsc::error::{ClosedError, TryRecvError};
6 use crate::sync::mpsc::{error, list};
7
8 use std::fmt;
9 use std::process;
10 use std::sync::atomic::Ordering::{AcqRel, Relaxed};
11 use std::task::Poll::{Pending, Ready};
12 use std::task::{Context, Poll};
13
14 /// Channel sender
15 pub(crate) struct Tx<T, S: Semaphore> {
16 inner: Arc<Chan<T, S>>,
17 permit: S::Permit,
18 }
19
20 impl<T, S: Semaphore> fmt::Debug for Tx<T, S>
21 where
22 S::Permit: fmt::Debug,
23 S: fmt::Debug,
24 {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result25 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
26 fmt.debug_struct("Tx")
27 .field("inner", &self.inner)
28 .field("permit", &self.permit)
29 .finish()
30 }
31 }
32
33 /// Channel receiver
34 pub(crate) struct Rx<T, S: Semaphore> {
35 inner: Arc<Chan<T, S>>,
36 }
37
38 impl<T, S: Semaphore> fmt::Debug for Rx<T, S>
39 where
40 S: fmt::Debug,
41 {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result42 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
43 fmt.debug_struct("Rx").field("inner", &self.inner).finish()
44 }
45 }
46
47 #[derive(Debug, Eq, PartialEq)]
48 pub(crate) enum TrySendError {
49 Closed,
50 Full,
51 }
52
53 impl<T> From<(T, TrySendError)> for error::SendError<T> {
from(src: (T, TrySendError)) -> error::SendError<T>54 fn from(src: (T, TrySendError)) -> error::SendError<T> {
55 match src.1 {
56 TrySendError::Closed => error::SendError(src.0),
57 TrySendError::Full => unreachable!(),
58 }
59 }
60 }
61
62 impl<T> From<(T, TrySendError)> for error::TrySendError<T> {
from(src: (T, TrySendError)) -> error::TrySendError<T>63 fn from(src: (T, TrySendError)) -> error::TrySendError<T> {
64 match src.1 {
65 TrySendError::Closed => error::TrySendError::Closed(src.0),
66 TrySendError::Full => error::TrySendError::Full(src.0),
67 }
68 }
69 }
70
71 pub(crate) trait Semaphore {
72 type Permit;
73
new_permit() -> Self::Permit74 fn new_permit() -> Self::Permit;
75
76 /// The permit is dropped without a value being sent. In this case, the
77 /// permit must be returned to the semaphore.
drop_permit(&self, permit: &mut Self::Permit)78 fn drop_permit(&self, permit: &mut Self::Permit);
79
is_idle(&self) -> bool80 fn is_idle(&self) -> bool;
81
add_permit(&self)82 fn add_permit(&self);
83
poll_acquire( &self, cx: &mut Context<'_>, permit: &mut Self::Permit, ) -> Poll<Result<(), ClosedError>>84 fn poll_acquire(
85 &self,
86 cx: &mut Context<'_>,
87 permit: &mut Self::Permit,
88 ) -> Poll<Result<(), ClosedError>>;
89
try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>90 fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>;
91
92 /// A value was sent into the channel and the permit held by `tx` is
93 /// dropped. In this case, the permit should not immeditely be returned to
94 /// the semaphore. Instead, the permit is returnred to the semaphore once
95 /// the sent value is read by the rx handle.
forget(&self, permit: &mut Self::Permit)96 fn forget(&self, permit: &mut Self::Permit);
97
close(&self)98 fn close(&self);
99 }
100
101 struct Chan<T, S> {
102 /// Handle to the push half of the lock-free list.
103 tx: list::Tx<T>,
104
105 /// Coordinates access to channel's capacity.
106 semaphore: S,
107
108 /// Receiver waker. Notified when a value is pushed into the channel.
109 rx_waker: AtomicWaker,
110
111 /// Tracks the number of outstanding sender handles.
112 ///
113 /// When this drops to zero, the send half of the channel is closed.
114 tx_count: AtomicUsize,
115
116 /// Only accessed by `Rx` handle.
117 rx_fields: UnsafeCell<RxFields<T>>,
118 }
119
120 impl<T, S> fmt::Debug for Chan<T, S>
121 where
122 S: fmt::Debug,
123 {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result124 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
125 fmt.debug_struct("Chan")
126 .field("tx", &self.tx)
127 .field("semaphore", &self.semaphore)
128 .field("rx_waker", &self.rx_waker)
129 .field("tx_count", &self.tx_count)
130 .field("rx_fields", &"...")
131 .finish()
132 }
133 }
134
135 /// Fields only accessed by `Rx` handle.
136 struct RxFields<T> {
137 /// Channel receiver. This field is only accessed by the `Receiver` type.
138 list: list::Rx<T>,
139
140 /// `true` if `Rx::close` is called.
141 rx_closed: bool,
142 }
143
144 impl<T> fmt::Debug for RxFields<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result145 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
146 fmt.debug_struct("RxFields")
147 .field("list", &self.list)
148 .field("rx_closed", &self.rx_closed)
149 .finish()
150 }
151 }
152
153 unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
154 unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}
155
channel<T, S>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) where S: Semaphore,156 pub(crate) fn channel<T, S>(semaphore: S) -> (Tx<T, S>, Rx<T, S>)
157 where
158 S: Semaphore,
159 {
160 let (tx, rx) = list::channel();
161
162 let chan = Arc::new(Chan {
163 tx,
164 semaphore,
165 rx_waker: AtomicWaker::new(),
166 tx_count: AtomicUsize::new(1),
167 rx_fields: UnsafeCell::new(RxFields {
168 list: rx,
169 rx_closed: false,
170 }),
171 });
172
173 (Tx::new(chan.clone()), Rx::new(chan))
174 }
175
176 // ===== impl Tx =====
177
178 impl<T, S> Tx<T, S>
179 where
180 S: Semaphore,
181 {
new(chan: Arc<Chan<T, S>>) -> Tx<T, S>182 fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
183 Tx {
184 inner: chan,
185 permit: S::new_permit(),
186 }
187 }
188
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>>189 pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
190 self.inner.semaphore.poll_acquire(cx, &mut self.permit)
191 }
192
disarm(&mut self)193 pub(crate) fn disarm(&mut self) {
194 // TODO: should this error if not acquired?
195 self.inner.semaphore.drop_permit(&mut self.permit)
196 }
197
198 /// Send a message and notify the receiver.
try_send(&mut self, value: T) -> Result<(), (T, TrySendError)>199 pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> {
200 self.inner.try_send(value, &mut self.permit)
201 }
202 }
203
204 impl<T> Tx<T, (crate::sync::semaphore_ll::Semaphore, usize)> {
is_ready(&self) -> bool205 pub(crate) fn is_ready(&self) -> bool {
206 self.permit.is_acquired()
207 }
208 }
209
210 impl<T> Tx<T, AtomicUsize> {
send_unbounded(&self, value: T) -> Result<(), (T, TrySendError)>211 pub(crate) fn send_unbounded(&self, value: T) -> Result<(), (T, TrySendError)> {
212 self.inner.try_send(value, &mut ())
213 }
214 }
215
216 impl<T, S> Clone for Tx<T, S>
217 where
218 S: Semaphore,
219 {
clone(&self) -> Tx<T, S>220 fn clone(&self) -> Tx<T, S> {
221 // Using a Relaxed ordering here is sufficient as the caller holds a
222 // strong ref to `self`, preventing a concurrent decrement to zero.
223 self.inner.tx_count.fetch_add(1, Relaxed);
224
225 Tx {
226 inner: self.inner.clone(),
227 permit: S::new_permit(),
228 }
229 }
230 }
231
232 impl<T, S> Drop for Tx<T, S>
233 where
234 S: Semaphore,
235 {
drop(&mut self)236 fn drop(&mut self) {
237 self.inner.semaphore.drop_permit(&mut self.permit);
238
239 if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
240 return;
241 }
242
243 // Close the list, which sends a `Close` message
244 self.inner.tx.close();
245
246 // Notify the receiver
247 self.inner.rx_waker.wake();
248 }
249 }
250
251 // ===== impl Rx =====
252
253 impl<T, S> Rx<T, S>
254 where
255 S: Semaphore,
256 {
new(chan: Arc<Chan<T, S>>) -> Rx<T, S>257 fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
258 Rx { inner: chan }
259 }
260
close(&mut self)261 pub(crate) fn close(&mut self) {
262 self.inner.rx_fields.with_mut(|rx_fields_ptr| {
263 let rx_fields = unsafe { &mut *rx_fields_ptr };
264
265 if rx_fields.rx_closed {
266 return;
267 }
268
269 rx_fields.rx_closed = true;
270 });
271
272 self.inner.semaphore.close();
273 }
274
275 /// Receive the next value
recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>>276 pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
277 use super::block::Read::*;
278
279 // Keep track of task budget
280 ready!(crate::coop::poll_proceed(cx));
281
282 self.inner.rx_fields.with_mut(|rx_fields_ptr| {
283 let rx_fields = unsafe { &mut *rx_fields_ptr };
284
285 macro_rules! try_recv {
286 () => {
287 match rx_fields.list.pop(&self.inner.tx) {
288 Some(Value(value)) => {
289 self.inner.semaphore.add_permit();
290 return Ready(Some(value));
291 }
292 Some(Closed) => {
293 // TODO: This check may not be required as it most
294 // likely can only return `true` at this point. A
295 // channel is closed when all tx handles are
296 // dropped. Dropping a tx handle releases memory,
297 // which ensures that if dropping the tx handle is
298 // visible, then all messages sent are also visible.
299 assert!(self.inner.semaphore.is_idle());
300 return Ready(None);
301 }
302 None => {} // fall through
303 }
304 };
305 }
306
307 try_recv!();
308
309 self.inner.rx_waker.register_by_ref(cx.waker());
310
311 // It is possible that a value was pushed between attempting to read
312 // and registering the task, so we have to check the channel a
313 // second time here.
314 try_recv!();
315
316 if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
317 Ready(None)
318 } else {
319 Pending
320 }
321 })
322 }
323
324 /// Receives the next value without blocking
try_recv(&mut self) -> Result<T, TryRecvError>325 pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
326 use super::block::Read::*;
327 self.inner.rx_fields.with_mut(|rx_fields_ptr| {
328 let rx_fields = unsafe { &mut *rx_fields_ptr };
329 match rx_fields.list.pop(&self.inner.tx) {
330 Some(Value(value)) => {
331 self.inner.semaphore.add_permit();
332 Ok(value)
333 }
334 Some(Closed) => Err(TryRecvError::Closed),
335 None => Err(TryRecvError::Empty),
336 }
337 })
338 }
339 }
340
341 impl<T, S> Drop for Rx<T, S>
342 where
343 S: Semaphore,
344 {
drop(&mut self)345 fn drop(&mut self) {
346 use super::block::Read::Value;
347
348 self.close();
349
350 self.inner.rx_fields.with_mut(|rx_fields_ptr| {
351 let rx_fields = unsafe { &mut *rx_fields_ptr };
352
353 while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
354 self.inner.semaphore.add_permit();
355 }
356 })
357 }
358 }
359
360 // ===== impl Chan =====
361
362 impl<T, S> Chan<T, S>
363 where
364 S: Semaphore,
365 {
try_send(&self, value: T, permit: &mut S::Permit) -> Result<(), (T, TrySendError)>366 fn try_send(&self, value: T, permit: &mut S::Permit) -> Result<(), (T, TrySendError)> {
367 if let Err(e) = self.semaphore.try_acquire(permit) {
368 return Err((value, e));
369 }
370
371 // Push the value
372 self.tx.push(value);
373
374 // Notify the rx task
375 self.rx_waker.wake();
376
377 // Release the permit
378 self.semaphore.forget(permit);
379
380 Ok(())
381 }
382 }
383
384 impl<T, S> Drop for Chan<T, S> {
drop(&mut self)385 fn drop(&mut self) {
386 use super::block::Read::Value;
387
388 // Safety: the only owner of the rx fields is Chan, and eing
389 // inside its own Drop means we're the last ones to touch it.
390 self.rx_fields.with_mut(|rx_fields_ptr| {
391 let rx_fields = unsafe { &mut *rx_fields_ptr };
392
393 while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
394 unsafe { rx_fields.list.free_blocks() };
395 });
396 }
397 }
398
399 use crate::sync::semaphore_ll::TryAcquireError;
400
401 impl From<TryAcquireError> for TrySendError {
from(src: TryAcquireError) -> TrySendError402 fn from(src: TryAcquireError) -> TrySendError {
403 if src.is_closed() {
404 TrySendError::Closed
405 } else if src.is_no_permits() {
406 TrySendError::Full
407 } else {
408 unreachable!();
409 }
410 }
411 }
412
413 // ===== impl Semaphore for (::Semaphore, capacity) =====
414
415 use crate::sync::semaphore_ll::Permit;
416
417 impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) {
418 type Permit = Permit;
419
new_permit() -> Permit420 fn new_permit() -> Permit {
421 Permit::new()
422 }
423
drop_permit(&self, permit: &mut Permit)424 fn drop_permit(&self, permit: &mut Permit) {
425 permit.release(1, &self.0);
426 }
427
add_permit(&self)428 fn add_permit(&self) {
429 self.0.add_permits(1)
430 }
431
is_idle(&self) -> bool432 fn is_idle(&self) -> bool {
433 self.0.available_permits() == self.1
434 }
435
poll_acquire( &self, cx: &mut Context<'_>, permit: &mut Permit, ) -> Poll<Result<(), ClosedError>>436 fn poll_acquire(
437 &self,
438 cx: &mut Context<'_>,
439 permit: &mut Permit,
440 ) -> Poll<Result<(), ClosedError>> {
441 // Keep track of task budget
442 ready!(crate::coop::poll_proceed(cx));
443
444 permit
445 .poll_acquire(cx, 1, &self.0)
446 .map_err(|_| ClosedError::new())
447 }
448
try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError>449 fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> {
450 permit.try_acquire(1, &self.0)?;
451 Ok(())
452 }
453
forget(&self, permit: &mut Self::Permit)454 fn forget(&self, permit: &mut Self::Permit) {
455 permit.forget(1);
456 }
457
close(&self)458 fn close(&self) {
459 self.0.close();
460 }
461 }
462
463 // ===== impl Semaphore for AtomicUsize =====
464
465 use std::sync::atomic::Ordering::{Acquire, Release};
466 use std::usize;
467
468 impl Semaphore for AtomicUsize {
469 type Permit = ();
470
new_permit()471 fn new_permit() {}
472
drop_permit(&self, _permit: &mut ())473 fn drop_permit(&self, _permit: &mut ()) {}
474
add_permit(&self)475 fn add_permit(&self) {
476 let prev = self.fetch_sub(2, Release);
477
478 if prev >> 1 == 0 {
479 // Something went wrong
480 process::abort();
481 }
482 }
483
is_idle(&self) -> bool484 fn is_idle(&self) -> bool {
485 self.load(Acquire) >> 1 == 0
486 }
487
poll_acquire( &self, _cx: &mut Context<'_>, permit: &mut (), ) -> Poll<Result<(), ClosedError>>488 fn poll_acquire(
489 &self,
490 _cx: &mut Context<'_>,
491 permit: &mut (),
492 ) -> Poll<Result<(), ClosedError>> {
493 Ready(self.try_acquire(permit).map_err(|_| ClosedError::new()))
494 }
495
try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError>496 fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> {
497 let mut curr = self.load(Acquire);
498
499 loop {
500 if curr & 1 == 1 {
501 return Err(TrySendError::Closed);
502 }
503
504 if curr == usize::MAX ^ 1 {
505 // Overflowed the ref count. There is no safe way to recover, so
506 // abort the process. In practice, this should never happen.
507 process::abort()
508 }
509
510 match self.compare_exchange(curr, curr + 2, AcqRel, Acquire) {
511 Ok(_) => return Ok(()),
512 Err(actual) => {
513 curr = actual;
514 }
515 }
516 }
517 }
518
forget(&self, _permit: &mut ())519 fn forget(&self, _permit: &mut ()) {}
520
close(&self)521 fn close(&self) {
522 self.fetch_or(1, Release);
523 }
524 }
525