1 //! The channel interface.
2
3 use std::fmt;
4 use std::iter::FusedIterator;
5 use std::mem;
6 use std::panic::{RefUnwindSafe, UnwindSafe};
7 use std::sync::Arc;
8 use std::time::{Duration, Instant};
9
10 use crate::context::Context;
11 use crate::counter;
12 use crate::err::{
13 RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError,
14 };
15 use crate::flavors;
16 use crate::select::{Operation, SelectHandle, Token};
17
18 /// Creates a channel of unbounded capacity.
19 ///
20 /// This channel has a growable buffer that can hold any number of messages at a time.
21 ///
22 /// # Examples
23 ///
24 /// ```
25 /// use std::thread;
26 /// use crossbeam_channel::unbounded;
27 ///
28 /// let (s, r) = unbounded();
29 ///
30 /// // Computes the n-th Fibonacci number.
31 /// fn fib(n: i32) -> i32 {
32 /// if n <= 1 {
33 /// n
34 /// } else {
35 /// fib(n - 1) + fib(n - 2)
36 /// }
37 /// }
38 ///
39 /// // Spawn an asynchronous computation.
40 /// thread::spawn(move || s.send(fib(20)).unwrap());
41 ///
42 /// // Print the result of the computation.
43 /// println!("{}", r.recv().unwrap());
44 /// ```
unbounded<T>() -> (Sender<T>, Receiver<T>)45 pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
46 let (s, r) = counter::new(flavors::list::Channel::new());
47 let s = Sender {
48 flavor: SenderFlavor::List(s),
49 };
50 let r = Receiver {
51 flavor: ReceiverFlavor::List(r),
52 };
53 (s, r)
54 }
55
56 /// Creates a channel of bounded capacity.
57 ///
58 /// This channel has a buffer that can hold at most `cap` messages at a time.
59 ///
60 /// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
61 /// receive operations must appear at the same time in order to pair up and pass the message over.
62 ///
63 /// # Examples
64 ///
65 /// A channel of capacity 1:
66 ///
67 /// ```
68 /// use std::thread;
69 /// use std::time::Duration;
70 /// use crossbeam_channel::bounded;
71 ///
72 /// let (s, r) = bounded(1);
73 ///
74 /// // This call returns immediately because there is enough space in the channel.
75 /// s.send(1).unwrap();
76 ///
77 /// thread::spawn(move || {
78 /// // This call blocks the current thread because the channel is full.
79 /// // It will be able to complete only after the first message is received.
80 /// s.send(2).unwrap();
81 /// });
82 ///
83 /// thread::sleep(Duration::from_secs(1));
84 /// assert_eq!(r.recv(), Ok(1));
85 /// assert_eq!(r.recv(), Ok(2));
86 /// ```
87 ///
88 /// A zero-capacity channel:
89 ///
90 /// ```
91 /// use std::thread;
92 /// use std::time::Duration;
93 /// use crossbeam_channel::bounded;
94 ///
95 /// let (s, r) = bounded(0);
96 ///
97 /// thread::spawn(move || {
98 /// // This call blocks the current thread until a receive operation appears
99 /// // on the other side of the channel.
100 /// s.send(1).unwrap();
101 /// });
102 ///
103 /// thread::sleep(Duration::from_secs(1));
104 /// assert_eq!(r.recv(), Ok(1));
105 /// ```
bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>)106 pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
107 if cap == 0 {
108 let (s, r) = counter::new(flavors::zero::Channel::new());
109 let s = Sender {
110 flavor: SenderFlavor::Zero(s),
111 };
112 let r = Receiver {
113 flavor: ReceiverFlavor::Zero(r),
114 };
115 (s, r)
116 } else {
117 let (s, r) = counter::new(flavors::array::Channel::with_capacity(cap));
118 let s = Sender {
119 flavor: SenderFlavor::Array(s),
120 };
121 let r = Receiver {
122 flavor: ReceiverFlavor::Array(r),
123 };
124 (s, r)
125 }
126 }
127
128 /// Creates a receiver that delivers a message after a certain duration of time.
129 ///
130 /// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
131 /// be sent into the channel after `duration` elapses. The message is the instant at which it is
132 /// sent.
133 ///
134 /// # Examples
135 ///
136 /// Using an `after` channel for timeouts:
137 ///
138 /// ```
139 /// use std::time::Duration;
140 /// use crossbeam_channel::{after, select, unbounded};
141 ///
142 /// let (s, r) = unbounded::<i32>();
143 /// let timeout = Duration::from_millis(100);
144 ///
145 /// select! {
146 /// recv(r) -> msg => println!("received {:?}", msg),
147 /// recv(after(timeout)) -> _ => println!("timed out"),
148 /// }
149 /// ```
150 ///
151 /// When the message gets sent:
152 ///
153 /// ```
154 /// use std::thread;
155 /// use std::time::{Duration, Instant};
156 /// use crossbeam_channel::after;
157 ///
158 /// // Converts a number of milliseconds into a `Duration`.
159 /// let ms = |ms| Duration::from_millis(ms);
160 ///
161 /// // Returns `true` if `a` and `b` are very close `Instant`s.
162 /// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
163 ///
164 /// let start = Instant::now();
165 /// let r = after(ms(100));
166 ///
167 /// thread::sleep(ms(500));
168 ///
169 /// // This message was sent 100 ms from the start and received 500 ms from the start.
170 /// assert!(eq(r.recv().unwrap(), start + ms(100)));
171 /// assert!(eq(Instant::now(), start + ms(500)));
172 /// ```
after(duration: Duration) -> Receiver<Instant>173 pub fn after(duration: Duration) -> Receiver<Instant> {
174 Receiver {
175 flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_timeout(duration))),
176 }
177 }
178
179 /// Creates a receiver that delivers a message at a certain instant in time.
180 ///
181 /// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
182 /// be sent into the channel at the moment in time `when`. The message is the instant at which it
183 /// is sent, which is the same as `when`. If `when` is in the past, the message will be delivered
184 /// instantly to the receiver.
185 ///
186 /// # Examples
187 ///
188 /// Using an `at` channel for timeouts:
189 ///
190 /// ```
191 /// use std::time::{Instant, Duration};
192 /// use crossbeam_channel::{at, select, unbounded};
193 ///
194 /// let (s, r) = unbounded::<i32>();
195 /// let deadline = Instant::now() + Duration::from_millis(500);
196 ///
197 /// select! {
198 /// recv(r) -> msg => println!("received {:?}", msg),
199 /// recv(at(deadline)) -> _ => println!("timed out"),
200 /// }
201 /// ```
202 ///
203 /// When the message gets sent:
204 ///
205 /// ```
206 /// use std::time::{Duration, Instant};
207 /// use crossbeam_channel::at;
208 ///
209 /// // Converts a number of milliseconds into a `Duration`.
210 /// let ms = |ms| Duration::from_millis(ms);
211 ///
212 /// let start = Instant::now();
213 /// let end = start + ms(100);
214 ///
215 /// let r = at(end);
216 ///
217 /// // This message was sent 100 ms from the start
218 /// assert_eq!(r.recv().unwrap(), end);
219 /// assert!(Instant::now() > start + ms(100));
220 /// ```
at(when: Instant) -> Receiver<Instant>221 pub fn at(when: Instant) -> Receiver<Instant> {
222 Receiver {
223 flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(when))),
224 }
225 }
226
227 /// Creates a receiver that never delivers messages.
228 ///
229 /// The channel is bounded with capacity of 0 and never gets disconnected.
230 ///
231 /// # Examples
232 ///
233 /// Using a `never` channel to optionally add a timeout to [`select!`]:
234 ///
235 /// ```
236 /// use std::thread;
237 /// use std::time::Duration;
238 /// use crossbeam_channel::{after, select, never, unbounded};
239 ///
240 /// let (s, r) = unbounded();
241 ///
242 /// thread::spawn(move || {
243 /// thread::sleep(Duration::from_secs(1));
244 /// s.send(1).unwrap();
245 /// });
246 ///
247 /// // Suppose this duration can be a `Some` or a `None`.
248 /// let duration = Some(Duration::from_millis(100));
249 ///
250 /// // Create a channel that times out after the specified duration.
251 /// let timeout = duration
252 /// .map(|d| after(d))
253 /// .unwrap_or(never());
254 ///
255 /// select! {
256 /// recv(r) -> msg => assert_eq!(msg, Ok(1)),
257 /// recv(timeout) -> _ => println!("timed out"),
258 /// }
259 /// ```
never<T>() -> Receiver<T>260 pub fn never<T>() -> Receiver<T> {
261 Receiver {
262 flavor: ReceiverFlavor::Never(flavors::never::Channel::new()),
263 }
264 }
265
266 /// Creates a receiver that delivers messages periodically.
267 ///
268 /// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
269 /// sent into the channel in intervals of `duration`. Each message is the instant at which it is
270 /// sent.
271 ///
272 /// # Examples
273 ///
274 /// Using a `tick` channel to periodically print elapsed time:
275 ///
276 /// ```
277 /// use std::time::{Duration, Instant};
278 /// use crossbeam_channel::tick;
279 ///
280 /// let start = Instant::now();
281 /// let ticker = tick(Duration::from_millis(100));
282 ///
283 /// for _ in 0..5 {
284 /// ticker.recv().unwrap();
285 /// println!("elapsed: {:?}", start.elapsed());
286 /// }
287 /// ```
288 ///
289 /// When messages get sent:
290 ///
291 /// ```
292 /// use std::thread;
293 /// use std::time::{Duration, Instant};
294 /// use crossbeam_channel::tick;
295 ///
296 /// // Converts a number of milliseconds into a `Duration`.
297 /// let ms = |ms| Duration::from_millis(ms);
298 ///
299 /// // Returns `true` if `a` and `b` are very close `Instant`s.
300 /// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
301 ///
302 /// let start = Instant::now();
303 /// let r = tick(ms(100));
304 ///
305 /// // This message was sent 100 ms from the start and received 100 ms from the start.
306 /// assert!(eq(r.recv().unwrap(), start + ms(100)));
307 /// assert!(eq(Instant::now(), start + ms(100)));
308 ///
309 /// thread::sleep(ms(500));
310 ///
311 /// // This message was sent 200 ms from the start and received 600 ms from the start.
312 /// assert!(eq(r.recv().unwrap(), start + ms(200)));
313 /// assert!(eq(Instant::now(), start + ms(600)));
314 ///
315 /// // This message was sent 700 ms from the start and received 700 ms from the start.
316 /// assert!(eq(r.recv().unwrap(), start + ms(700)));
317 /// assert!(eq(Instant::now(), start + ms(700)));
318 /// ```
tick(duration: Duration) -> Receiver<Instant>319 pub fn tick(duration: Duration) -> Receiver<Instant> {
320 Receiver {
321 flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(duration))),
322 }
323 }
324
/// The sending side of a channel.
///
/// Senders can be cloned; every clone sends into the same channel.
///
/// # Examples
///
/// ```
/// use std::thread;
/// use crossbeam_channel::unbounded;
///
/// let (s1, r) = unbounded();
/// let s2 = s1.clone();
///
/// thread::spawn(move || s1.send(1).unwrap());
/// thread::spawn(move || s2.send(2).unwrap());
///
/// let msg1 = r.recv().unwrap();
/// let msg2 = r.recv().unwrap();
///
/// assert_eq!(msg1 + msg2, 3);
/// ```
pub struct Sender<T> {
    /// The concrete channel flavor this sender is attached to; every method dispatches on it.
    flavor: SenderFlavor<T>,
}
347
/// Sender flavors.
///
/// One variant per channel implementation a `Sender` can be attached to. `bounded` creates the
/// `Array` (non-zero capacity) and `Zero` (capacity 0) flavors, `unbounded` creates `List`.
enum SenderFlavor<T> {
    /// Bounded channel based on a preallocated array.
    Array(counter::Sender<flavors::array::Channel<T>>),

    /// Unbounded channel implemented as a linked list.
    List(counter::Sender<flavors::list::Channel<T>>),

    /// Zero-capacity channel.
    Zero(counter::Sender<flavors::zero::Channel<T>>),
}
359
// NOTE(review): these impls assert that moving or sharing a `Sender<T>` across threads is sound
// whenever `T: Send`. That presumably relies on the flavor channels doing their own
// synchronization, which is not visible in this file — confirm against `crate::flavors`.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

// A `Sender` is declared unwind-safe for every `T`, both by value and behind a reference.
impl<T> UnwindSafe for Sender<T> {}
impl<T> RefUnwindSafe for Sender<T> {}
365
366 impl<T> Sender<T> {
367 /// Attempts to send a message into the channel without blocking.
368 ///
369 /// This method will either send a message into the channel immediately or return an error if
370 /// the channel is full or disconnected. The returned error contains the original message.
371 ///
372 /// If called on a zero-capacity channel, this method will send the message only if there
373 /// happens to be a receive operation on the other side of the channel at the same time.
374 ///
375 /// # Examples
376 ///
377 /// ```
378 /// use crossbeam_channel::{bounded, TrySendError};
379 ///
380 /// let (s, r) = bounded(1);
381 ///
382 /// assert_eq!(s.try_send(1), Ok(()));
383 /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
384 ///
385 /// drop(r);
386 /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3)));
387 /// ```
try_send(&self, msg: T) -> Result<(), TrySendError<T>>388 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
389 match &self.flavor {
390 SenderFlavor::Array(chan) => chan.try_send(msg),
391 SenderFlavor::List(chan) => chan.try_send(msg),
392 SenderFlavor::Zero(chan) => chan.try_send(msg),
393 }
394 }
395
396 /// Blocks the current thread until a message is sent or the channel is disconnected.
397 ///
398 /// If the channel is full and not disconnected, this call will block until the send operation
399 /// can proceed. If the channel becomes disconnected, this call will wake up and return an
400 /// error. The returned error contains the original message.
401 ///
402 /// If called on a zero-capacity channel, this method will wait for a receive operation to
403 /// appear on the other side of the channel.
404 ///
405 /// # Examples
406 ///
407 /// ```
408 /// use std::thread;
409 /// use std::time::Duration;
410 /// use crossbeam_channel::{bounded, SendError};
411 ///
412 /// let (s, r) = bounded(1);
413 /// assert_eq!(s.send(1), Ok(()));
414 ///
415 /// thread::spawn(move || {
416 /// assert_eq!(r.recv(), Ok(1));
417 /// thread::sleep(Duration::from_secs(1));
418 /// drop(r);
419 /// });
420 ///
421 /// assert_eq!(s.send(2), Ok(()));
422 /// assert_eq!(s.send(3), Err(SendError(3)));
423 /// ```
send(&self, msg: T) -> Result<(), SendError<T>>424 pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
425 match &self.flavor {
426 SenderFlavor::Array(chan) => chan.send(msg, None),
427 SenderFlavor::List(chan) => chan.send(msg, None),
428 SenderFlavor::Zero(chan) => chan.send(msg, None),
429 }
430 .map_err(|err| match err {
431 SendTimeoutError::Disconnected(msg) => SendError(msg),
432 SendTimeoutError::Timeout(_) => unreachable!(),
433 })
434 }
435
436 /// Waits for a message to be sent into the channel, but only for a limited time.
437 ///
438 /// If the channel is full and not disconnected, this call will block until the send operation
439 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
440 /// wake up and return an error. The returned error contains the original message.
441 ///
442 /// If called on a zero-capacity channel, this method will wait for a receive operation to
443 /// appear on the other side of the channel.
444 ///
445 /// # Examples
446 ///
447 /// ```
448 /// use std::thread;
449 /// use std::time::Duration;
450 /// use crossbeam_channel::{bounded, SendTimeoutError};
451 ///
452 /// let (s, r) = bounded(0);
453 ///
454 /// thread::spawn(move || {
455 /// thread::sleep(Duration::from_secs(1));
456 /// assert_eq!(r.recv(), Ok(2));
457 /// drop(r);
458 /// });
459 ///
460 /// assert_eq!(
461 /// s.send_timeout(1, Duration::from_millis(500)),
462 /// Err(SendTimeoutError::Timeout(1)),
463 /// );
464 /// assert_eq!(
465 /// s.send_timeout(2, Duration::from_secs(1)),
466 /// Ok(()),
467 /// );
468 /// assert_eq!(
469 /// s.send_timeout(3, Duration::from_millis(500)),
470 /// Err(SendTimeoutError::Disconnected(3)),
471 /// );
472 /// ```
send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>>473 pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
474 self.send_deadline(msg, Instant::now() + timeout)
475 }
476
477 /// Waits for a message to be sent into the channel, but only until a given deadline.
478 ///
479 /// If the channel is full and not disconnected, this call will block until the send operation
480 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
481 /// wake up and return an error. The returned error contains the original message.
482 ///
483 /// If called on a zero-capacity channel, this method will wait for a receive operation to
484 /// appear on the other side of the channel.
485 ///
486 /// # Examples
487 ///
488 /// ```
489 /// use std::thread;
490 /// use std::time::{Duration, Instant};
491 /// use crossbeam_channel::{bounded, SendTimeoutError};
492 ///
493 /// let (s, r) = bounded(0);
494 ///
495 /// thread::spawn(move || {
496 /// thread::sleep(Duration::from_secs(1));
497 /// assert_eq!(r.recv(), Ok(2));
498 /// drop(r);
499 /// });
500 ///
501 /// let now = Instant::now();
502 ///
503 /// assert_eq!(
504 /// s.send_deadline(1, now + Duration::from_millis(500)),
505 /// Err(SendTimeoutError::Timeout(1)),
506 /// );
507 /// assert_eq!(
508 /// s.send_deadline(2, now + Duration::from_millis(1500)),
509 /// Ok(()),
510 /// );
511 /// assert_eq!(
512 /// s.send_deadline(3, now + Duration::from_millis(2000)),
513 /// Err(SendTimeoutError::Disconnected(3)),
514 /// );
515 /// ```
send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>>516 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
517 match &self.flavor {
518 SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
519 SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
520 SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
521 }
522 }
523
524 /// Returns `true` if the channel is empty.
525 ///
526 /// Note: Zero-capacity channels are always empty.
527 ///
528 /// # Examples
529 ///
530 /// ```
531 /// use crossbeam_channel::unbounded;
532 ///
533 /// let (s, r) = unbounded();
534 /// assert!(s.is_empty());
535 ///
536 /// s.send(0).unwrap();
537 /// assert!(!s.is_empty());
538 /// ```
is_empty(&self) -> bool539 pub fn is_empty(&self) -> bool {
540 match &self.flavor {
541 SenderFlavor::Array(chan) => chan.is_empty(),
542 SenderFlavor::List(chan) => chan.is_empty(),
543 SenderFlavor::Zero(chan) => chan.is_empty(),
544 }
545 }
546
547 /// Returns `true` if the channel is full.
548 ///
549 /// Note: Zero-capacity channels are always full.
550 ///
551 /// # Examples
552 ///
553 /// ```
554 /// use crossbeam_channel::bounded;
555 ///
556 /// let (s, r) = bounded(1);
557 ///
558 /// assert!(!s.is_full());
559 /// s.send(0).unwrap();
560 /// assert!(s.is_full());
561 /// ```
is_full(&self) -> bool562 pub fn is_full(&self) -> bool {
563 match &self.flavor {
564 SenderFlavor::Array(chan) => chan.is_full(),
565 SenderFlavor::List(chan) => chan.is_full(),
566 SenderFlavor::Zero(chan) => chan.is_full(),
567 }
568 }
569
570 /// Returns the number of messages in the channel.
571 ///
572 /// # Examples
573 ///
574 /// ```
575 /// use crossbeam_channel::unbounded;
576 ///
577 /// let (s, r) = unbounded();
578 /// assert_eq!(s.len(), 0);
579 ///
580 /// s.send(1).unwrap();
581 /// s.send(2).unwrap();
582 /// assert_eq!(s.len(), 2);
583 /// ```
len(&self) -> usize584 pub fn len(&self) -> usize {
585 match &self.flavor {
586 SenderFlavor::Array(chan) => chan.len(),
587 SenderFlavor::List(chan) => chan.len(),
588 SenderFlavor::Zero(chan) => chan.len(),
589 }
590 }
591
592 /// If the channel is bounded, returns its capacity.
593 ///
594 /// # Examples
595 ///
596 /// ```
597 /// use crossbeam_channel::{bounded, unbounded};
598 ///
599 /// let (s, _) = unbounded::<i32>();
600 /// assert_eq!(s.capacity(), None);
601 ///
602 /// let (s, _) = bounded::<i32>(5);
603 /// assert_eq!(s.capacity(), Some(5));
604 ///
605 /// let (s, _) = bounded::<i32>(0);
606 /// assert_eq!(s.capacity(), Some(0));
607 /// ```
capacity(&self) -> Option<usize>608 pub fn capacity(&self) -> Option<usize> {
609 match &self.flavor {
610 SenderFlavor::Array(chan) => chan.capacity(),
611 SenderFlavor::List(chan) => chan.capacity(),
612 SenderFlavor::Zero(chan) => chan.capacity(),
613 }
614 }
615
616 /// Returns `true` if senders belong to the same channel.
617 ///
618 /// # Examples
619 ///
620 /// ```rust
621 /// use crossbeam_channel::unbounded;
622 ///
623 /// let (s, _) = unbounded::<usize>();
624 ///
625 /// let s2 = s.clone();
626 /// assert!(s.same_channel(&s2));
627 ///
628 /// let (s3, _) = unbounded();
629 /// assert!(!s.same_channel(&s3));
630 /// ```
same_channel(&self, other: &Sender<T>) -> bool631 pub fn same_channel(&self, other: &Sender<T>) -> bool {
632 match (&self.flavor, &other.flavor) {
633 (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
634 (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
635 (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
636 _ => false,
637 }
638 }
639 }
640
641 impl<T> Drop for Sender<T> {
drop(&mut self)642 fn drop(&mut self) {
643 unsafe {
644 match &self.flavor {
645 SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
646 SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
647 SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
648 }
649 }
650 }
651 }
652
653 impl<T> Clone for Sender<T> {
clone(&self) -> Self654 fn clone(&self) -> Self {
655 let flavor = match &self.flavor {
656 SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
657 SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
658 SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
659 };
660
661 Sender { flavor }
662 }
663 }
664
665 impl<T> fmt::Debug for Sender<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result666 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
667 f.pad("Sender { .. }")
668 }
669 }
670
/// The receiving side of a channel.
///
/// # Examples
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_channel::unbounded;
///
/// let (s, r) = unbounded();
///
/// thread::spawn(move || {
///     let _ = s.send(1);
///     thread::sleep(Duration::from_secs(1));
///     let _ = s.send(2);
/// });
///
/// assert_eq!(r.recv(), Ok(1)); // Received immediately.
/// assert_eq!(r.recv(), Ok(2)); // Received after 1 second.
/// ```
pub struct Receiver<T> {
    /// The concrete channel flavor this receiver is attached to; every method dispatches on it.
    flavor: ReceiverFlavor<T>,
}
694
/// Receiver flavors.
///
/// Besides the three flavors shared with senders, a receiver can be one of the sender-less
/// flavors created by `after`/`at`, `tick` and `never`.
enum ReceiverFlavor<T> {
    /// Bounded channel based on a preallocated array.
    Array(counter::Receiver<flavors::array::Channel<T>>),

    /// Unbounded channel implemented as a linked list.
    List(counter::Receiver<flavors::list::Channel<T>>),

    /// Zero-capacity channel.
    Zero(counter::Receiver<flavors::zero::Channel<T>>),

    /// The at flavor, created by both `after` and `at`.
    At(Arc<flavors::at::Channel>),

    /// The tick flavor.
    Tick(Arc<flavors::tick::Channel>),

    /// The never flavor.
    Never(flavors::never::Channel<T>),
}
715
// NOTE(review): these impls assert that moving or sharing a `Receiver<T>` across threads is sound
// whenever `T: Send`. That presumably relies on the flavor channels doing their own
// synchronization, which is not visible in this file — confirm against `crate::flavors`.
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}

// A `Receiver` is declared unwind-safe for every `T`, both by value and behind a reference.
impl<T> UnwindSafe for Receiver<T> {}
impl<T> RefUnwindSafe for Receiver<T> {}
721
722 impl<T> Receiver<T> {
723 /// Attempts to receive a message from the channel without blocking.
724 ///
725 /// This method will either receive a message from the channel immediately or return an error
726 /// if the channel is empty.
727 ///
728 /// If called on a zero-capacity channel, this method will receive a message only if there
729 /// happens to be a send operation on the other side of the channel at the same time.
730 ///
731 /// # Examples
732 ///
733 /// ```
734 /// use crossbeam_channel::{unbounded, TryRecvError};
735 ///
736 /// let (s, r) = unbounded();
737 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
738 ///
739 /// s.send(5).unwrap();
740 /// drop(s);
741 ///
742 /// assert_eq!(r.try_recv(), Ok(5));
743 /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
744 /// ```
try_recv(&self) -> Result<T, TryRecvError>745 pub fn try_recv(&self) -> Result<T, TryRecvError> {
746 match &self.flavor {
747 ReceiverFlavor::Array(chan) => chan.try_recv(),
748 ReceiverFlavor::List(chan) => chan.try_recv(),
749 ReceiverFlavor::Zero(chan) => chan.try_recv(),
750 ReceiverFlavor::At(chan) => {
751 let msg = chan.try_recv();
752 unsafe {
753 mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
754 &msg,
755 )
756 }
757 }
758 ReceiverFlavor::Tick(chan) => {
759 let msg = chan.try_recv();
760 unsafe {
761 mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
762 &msg,
763 )
764 }
765 }
766 ReceiverFlavor::Never(chan) => chan.try_recv(),
767 }
768 }
769
770 /// Blocks the current thread until a message is received or the channel is empty and
771 /// disconnected.
772 ///
773 /// If the channel is empty and not disconnected, this call will block until the receive
774 /// operation can proceed. If the channel is empty and becomes disconnected, this call will
775 /// wake up and return an error.
776 ///
777 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
778 /// on the other side of the channel.
779 ///
780 /// # Examples
781 ///
782 /// ```
783 /// use std::thread;
784 /// use std::time::Duration;
785 /// use crossbeam_channel::{unbounded, RecvError};
786 ///
787 /// let (s, r) = unbounded();
788 ///
789 /// thread::spawn(move || {
790 /// thread::sleep(Duration::from_secs(1));
791 /// s.send(5).unwrap();
792 /// drop(s);
793 /// });
794 ///
795 /// assert_eq!(r.recv(), Ok(5));
796 /// assert_eq!(r.recv(), Err(RecvError));
797 /// ```
recv(&self) -> Result<T, RecvError>798 pub fn recv(&self) -> Result<T, RecvError> {
799 match &self.flavor {
800 ReceiverFlavor::Array(chan) => chan.recv(None),
801 ReceiverFlavor::List(chan) => chan.recv(None),
802 ReceiverFlavor::Zero(chan) => chan.recv(None),
803 ReceiverFlavor::At(chan) => {
804 let msg = chan.recv(None);
805 unsafe {
806 mem::transmute_copy::<
807 Result<Instant, RecvTimeoutError>,
808 Result<T, RecvTimeoutError>,
809 >(&msg)
810 }
811 }
812 ReceiverFlavor::Tick(chan) => {
813 let msg = chan.recv(None);
814 unsafe {
815 mem::transmute_copy::<
816 Result<Instant, RecvTimeoutError>,
817 Result<T, RecvTimeoutError>,
818 >(&msg)
819 }
820 }
821 ReceiverFlavor::Never(chan) => chan.recv(None),
822 }
823 .map_err(|_| RecvError)
824 }
825
826 /// Waits for a message to be received from the channel, but only for a limited time.
827 ///
828 /// If the channel is empty and not disconnected, this call will block until the receive
829 /// operation can proceed or the operation times out. If the channel is empty and becomes
830 /// disconnected, this call will wake up and return an error.
831 ///
832 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
833 /// on the other side of the channel.
834 ///
835 /// # Examples
836 ///
837 /// ```
838 /// use std::thread;
839 /// use std::time::Duration;
840 /// use crossbeam_channel::{unbounded, RecvTimeoutError};
841 ///
842 /// let (s, r) = unbounded();
843 ///
844 /// thread::spawn(move || {
845 /// thread::sleep(Duration::from_secs(1));
846 /// s.send(5).unwrap();
847 /// drop(s);
848 /// });
849 ///
850 /// assert_eq!(
851 /// r.recv_timeout(Duration::from_millis(500)),
852 /// Err(RecvTimeoutError::Timeout),
853 /// );
854 /// assert_eq!(
855 /// r.recv_timeout(Duration::from_secs(1)),
856 /// Ok(5),
857 /// );
858 /// assert_eq!(
859 /// r.recv_timeout(Duration::from_secs(1)),
860 /// Err(RecvTimeoutError::Disconnected),
861 /// );
862 /// ```
recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>863 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
864 self.recv_deadline(Instant::now() + timeout)
865 }
866
867 /// Waits for a message to be received from the channel, but only before a given deadline.
868 ///
869 /// If the channel is empty and not disconnected, this call will block until the receive
870 /// operation can proceed or the operation times out. If the channel is empty and becomes
871 /// disconnected, this call will wake up and return an error.
872 ///
873 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
874 /// on the other side of the channel.
875 ///
876 /// # Examples
877 ///
878 /// ```
879 /// use std::thread;
880 /// use std::time::{Instant, Duration};
881 /// use crossbeam_channel::{unbounded, RecvTimeoutError};
882 ///
883 /// let (s, r) = unbounded();
884 ///
885 /// thread::spawn(move || {
886 /// thread::sleep(Duration::from_secs(1));
887 /// s.send(5).unwrap();
888 /// drop(s);
889 /// });
890 ///
891 /// let now = Instant::now();
892 ///
893 /// assert_eq!(
894 /// r.recv_deadline(now + Duration::from_millis(500)),
895 /// Err(RecvTimeoutError::Timeout),
896 /// );
897 /// assert_eq!(
898 /// r.recv_deadline(now + Duration::from_millis(1500)),
899 /// Ok(5),
900 /// );
901 /// assert_eq!(
902 /// r.recv_deadline(now + Duration::from_secs(5)),
903 /// Err(RecvTimeoutError::Disconnected),
904 /// );
905 /// ```
recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError>906 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
907 match &self.flavor {
908 ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
909 ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
910 ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
911 ReceiverFlavor::At(chan) => {
912 let msg = chan.recv(Some(deadline));
913 unsafe {
914 mem::transmute_copy::<
915 Result<Instant, RecvTimeoutError>,
916 Result<T, RecvTimeoutError>,
917 >(&msg)
918 }
919 }
920 ReceiverFlavor::Tick(chan) => {
921 let msg = chan.recv(Some(deadline));
922 unsafe {
923 mem::transmute_copy::<
924 Result<Instant, RecvTimeoutError>,
925 Result<T, RecvTimeoutError>,
926 >(&msg)
927 }
928 }
929 ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)),
930 }
931 }
932
933 /// Returns `true` if the channel is empty.
934 ///
935 /// Note: Zero-capacity channels are always empty.
936 ///
937 /// # Examples
938 ///
939 /// ```
940 /// use crossbeam_channel::unbounded;
941 ///
942 /// let (s, r) = unbounded();
943 ///
944 /// assert!(r.is_empty());
945 /// s.send(0).unwrap();
946 /// assert!(!r.is_empty());
947 /// ```
is_empty(&self) -> bool948 pub fn is_empty(&self) -> bool {
949 match &self.flavor {
950 ReceiverFlavor::Array(chan) => chan.is_empty(),
951 ReceiverFlavor::List(chan) => chan.is_empty(),
952 ReceiverFlavor::Zero(chan) => chan.is_empty(),
953 ReceiverFlavor::At(chan) => chan.is_empty(),
954 ReceiverFlavor::Tick(chan) => chan.is_empty(),
955 ReceiverFlavor::Never(chan) => chan.is_empty(),
956 }
957 }
958
959 /// Returns `true` if the channel is full.
960 ///
961 /// Note: Zero-capacity channels are always full.
962 ///
963 /// # Examples
964 ///
965 /// ```
966 /// use crossbeam_channel::bounded;
967 ///
968 /// let (s, r) = bounded(1);
969 ///
970 /// assert!(!r.is_full());
971 /// s.send(0).unwrap();
972 /// assert!(r.is_full());
973 /// ```
is_full(&self) -> bool974 pub fn is_full(&self) -> bool {
975 match &self.flavor {
976 ReceiverFlavor::Array(chan) => chan.is_full(),
977 ReceiverFlavor::List(chan) => chan.is_full(),
978 ReceiverFlavor::Zero(chan) => chan.is_full(),
979 ReceiverFlavor::At(chan) => chan.is_full(),
980 ReceiverFlavor::Tick(chan) => chan.is_full(),
981 ReceiverFlavor::Never(chan) => chan.is_full(),
982 }
983 }
984
985 /// Returns the number of messages in the channel.
986 ///
987 /// # Examples
988 ///
989 /// ```
990 /// use crossbeam_channel::unbounded;
991 ///
992 /// let (s, r) = unbounded();
993 /// assert_eq!(r.len(), 0);
994 ///
995 /// s.send(1).unwrap();
996 /// s.send(2).unwrap();
997 /// assert_eq!(r.len(), 2);
998 /// ```
len(&self) -> usize999 pub fn len(&self) -> usize {
1000 match &self.flavor {
1001 ReceiverFlavor::Array(chan) => chan.len(),
1002 ReceiverFlavor::List(chan) => chan.len(),
1003 ReceiverFlavor::Zero(chan) => chan.len(),
1004 ReceiverFlavor::At(chan) => chan.len(),
1005 ReceiverFlavor::Tick(chan) => chan.len(),
1006 ReceiverFlavor::Never(chan) => chan.len(),
1007 }
1008 }
1009
1010 /// If the channel is bounded, returns its capacity.
1011 ///
1012 /// # Examples
1013 ///
1014 /// ```
1015 /// use crossbeam_channel::{bounded, unbounded};
1016 ///
1017 /// let (_, r) = unbounded::<i32>();
1018 /// assert_eq!(r.capacity(), None);
1019 ///
1020 /// let (_, r) = bounded::<i32>(5);
1021 /// assert_eq!(r.capacity(), Some(5));
1022 ///
1023 /// let (_, r) = bounded::<i32>(0);
1024 /// assert_eq!(r.capacity(), Some(0));
1025 /// ```
capacity(&self) -> Option<usize>1026 pub fn capacity(&self) -> Option<usize> {
1027 match &self.flavor {
1028 ReceiverFlavor::Array(chan) => chan.capacity(),
1029 ReceiverFlavor::List(chan) => chan.capacity(),
1030 ReceiverFlavor::Zero(chan) => chan.capacity(),
1031 ReceiverFlavor::At(chan) => chan.capacity(),
1032 ReceiverFlavor::Tick(chan) => chan.capacity(),
1033 ReceiverFlavor::Never(chan) => chan.capacity(),
1034 }
1035 }
1036
1037 /// A blocking iterator over messages in the channel.
1038 ///
1039 /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if
1040 /// the channel becomes empty and disconnected, it returns [`None`] without blocking.
1041 ///
1042 /// [`next`]: Iterator::next
1043 ///
1044 /// # Examples
1045 ///
1046 /// ```
1047 /// use std::thread;
1048 /// use crossbeam_channel::unbounded;
1049 ///
1050 /// let (s, r) = unbounded();
1051 ///
1052 /// thread::spawn(move || {
1053 /// s.send(1).unwrap();
1054 /// s.send(2).unwrap();
1055 /// s.send(3).unwrap();
1056 /// drop(s); // Disconnect the channel.
1057 /// });
1058 ///
1059 /// // Collect all messages from the channel.
1060 /// // Note that the call to `collect` blocks until the sender is dropped.
1061 /// let v: Vec<_> = r.iter().collect();
1062 ///
1063 /// assert_eq!(v, [1, 2, 3]);
1064 /// ```
iter(&self) -> Iter<'_, T>1065 pub fn iter(&self) -> Iter<'_, T> {
1066 Iter { receiver: self }
1067 }
1068
1069 /// A non-blocking iterator over messages in the channel.
1070 ///
1071 /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1072 /// never blocks waiting for the next message.
1073 ///
1074 /// [`next`]: Iterator::next
1075 ///
1076 /// # Examples
1077 ///
1078 /// ```
1079 /// use std::thread;
1080 /// use std::time::Duration;
1081 /// use crossbeam_channel::unbounded;
1082 ///
1083 /// let (s, r) = unbounded::<i32>();
1084 ///
1085 /// thread::spawn(move || {
1086 /// s.send(1).unwrap();
1087 /// thread::sleep(Duration::from_secs(1));
1088 /// s.send(2).unwrap();
1089 /// thread::sleep(Duration::from_secs(2));
1090 /// s.send(3).unwrap();
1091 /// });
1092 ///
1093 /// thread::sleep(Duration::from_secs(2));
1094 ///
1095 /// // Collect all messages from the channel without blocking.
1096 /// // The third message hasn't been sent yet so we'll collect only the first two.
1097 /// let v: Vec<_> = r.try_iter().collect();
1098 ///
1099 /// assert_eq!(v, [1, 2]);
1100 /// ```
try_iter(&self) -> TryIter<'_, T>1101 pub fn try_iter(&self) -> TryIter<'_, T> {
1102 TryIter { receiver: self }
1103 }
1104
1105 /// Returns `true` if receivers belong to the same channel.
1106 ///
1107 /// # Examples
1108 ///
1109 /// ```rust
1110 /// use crossbeam_channel::unbounded;
1111 ///
1112 /// let (_, r) = unbounded::<usize>();
1113 ///
1114 /// let r2 = r.clone();
1115 /// assert!(r.same_channel(&r2));
1116 ///
1117 /// let (_, r3) = unbounded();
1118 /// assert!(!r.same_channel(&r3));
1119 /// ```
same_channel(&self, other: &Receiver<T>) -> bool1120 pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1121 match (&self.flavor, &other.flavor) {
1122 (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
1123 (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
1124 (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
1125 (ReceiverFlavor::At(a), ReceiverFlavor::At(b)) => Arc::ptr_eq(a, b),
1126 (ReceiverFlavor::Tick(a), ReceiverFlavor::Tick(b)) => Arc::ptr_eq(a, b),
1127 (ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true,
1128 _ => false,
1129 }
1130 }
1131 }
1132
1133 impl<T> Drop for Receiver<T> {
drop(&mut self)1134 fn drop(&mut self) {
1135 unsafe {
1136 match &self.flavor {
1137 ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
1138 ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
1139 ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
1140 ReceiverFlavor::At(_) => {}
1141 ReceiverFlavor::Tick(_) => {}
1142 ReceiverFlavor::Never(_) => {}
1143 }
1144 }
1145 }
1146 }
1147
1148 impl<T> Clone for Receiver<T> {
clone(&self) -> Self1149 fn clone(&self) -> Self {
1150 let flavor = match &self.flavor {
1151 ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
1152 ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
1153 ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
1154 ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()),
1155 ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()),
1156 ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
1157 };
1158
1159 Receiver { flavor }
1160 }
1161 }
1162
1163 impl<T> fmt::Debug for Receiver<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1164 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1165 f.pad("Receiver { .. }")
1166 }
1167 }
1168
1169 impl<'a, T> IntoIterator for &'a Receiver<T> {
1170 type Item = T;
1171 type IntoIter = Iter<'a, T>;
1172
into_iter(self) -> Self::IntoIter1173 fn into_iter(self) -> Self::IntoIter {
1174 self.iter()
1175 }
1176 }
1177
1178 impl<T> IntoIterator for Receiver<T> {
1179 type Item = T;
1180 type IntoIter = IntoIter<T>;
1181
into_iter(self) -> Self::IntoIter1182 fn into_iter(self) -> Self::IntoIter {
1183 IntoIter { receiver: self }
1184 }
1185 }
1186
/// A blocking iterator over messages in a channel.
///
/// This struct is created by [`Receiver::iter`] and borrows the receiver it reads from.
///
/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
/// channel becomes empty and disconnected, it returns [`None`] without blocking.
///
/// [`next`]: Iterator::next
///
/// # Examples
///
/// ```
/// use std::thread;
/// use crossbeam_channel::unbounded;
///
/// let (s, r) = unbounded();
///
/// thread::spawn(move || {
///     s.send(1).unwrap();
///     s.send(2).unwrap();
///     s.send(3).unwrap();
///     drop(s); // Disconnect the channel.
/// });
///
/// // Collect all messages from the channel.
/// // Note that the call to `collect` blocks until the sender is dropped.
/// let v: Vec<_> = r.iter().collect();
///
/// assert_eq!(v, [1, 2, 3]);
/// ```
pub struct Iter<'a, T> {
    // The borrowed receiver that `next` pulls messages from.
    receiver: &'a Receiver<T>,
}
1218
1219 impl<T> FusedIterator for Iter<'_, T> {}
1220
1221 impl<T> Iterator for Iter<'_, T> {
1222 type Item = T;
1223
next(&mut self) -> Option<Self::Item>1224 fn next(&mut self) -> Option<Self::Item> {
1225 self.receiver.recv().ok()
1226 }
1227 }
1228
1229 impl<T> fmt::Debug for Iter<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1230 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1231 f.pad("Iter { .. }")
1232 }
1233 }
1234
/// A non-blocking iterator over messages in a channel.
///
/// This struct is created by [`Receiver::try_iter`] and borrows the receiver it reads from.
///
/// Each call to [`next`] returns a message if there is one ready to be received. The iterator
/// never blocks waiting for the next message.
///
/// [`next`]: Iterator::next
///
/// # Examples
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_channel::unbounded;
///
/// let (s, r) = unbounded::<i32>();
///
/// thread::spawn(move || {
///     s.send(1).unwrap();
///     thread::sleep(Duration::from_secs(1));
///     s.send(2).unwrap();
///     thread::sleep(Duration::from_secs(2));
///     s.send(3).unwrap();
/// });
///
/// thread::sleep(Duration::from_secs(2));
///
/// // Collect all messages from the channel without blocking.
/// // The third message hasn't been sent yet so we'll collect only the first two.
/// let v: Vec<_> = r.try_iter().collect();
///
/// assert_eq!(v, [1, 2]);
/// ```
pub struct TryIter<'a, T> {
    // The borrowed receiver that `next` polls for messages.
    receiver: &'a Receiver<T>,
}
1270
1271 impl<T> Iterator for TryIter<'_, T> {
1272 type Item = T;
1273
next(&mut self) -> Option<Self::Item>1274 fn next(&mut self) -> Option<Self::Item> {
1275 self.receiver.try_recv().ok()
1276 }
1277 }
1278
1279 impl<T> fmt::Debug for TryIter<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1280 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1281 f.pad("TryIter { .. }")
1282 }
1283 }
1284
/// An owning blocking iterator over messages in a channel.
///
/// This struct is created by calling `into_iter` on a [`Receiver`] and holds that receiver by
/// value.
///
/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
/// channel becomes empty and disconnected, it returns [`None`] without blocking.
///
/// [`next`]: Iterator::next
///
/// # Examples
///
/// ```
/// use std::thread;
/// use crossbeam_channel::unbounded;
///
/// let (s, r) = unbounded();
///
/// thread::spawn(move || {
///     s.send(1).unwrap();
///     s.send(2).unwrap();
///     s.send(3).unwrap();
///     drop(s); // Disconnect the channel.
/// });
///
/// // Collect all messages from the channel.
/// // Note that the call to `collect` blocks until the sender is dropped.
/// let v: Vec<_> = r.into_iter().collect();
///
/// assert_eq!(v, [1, 2, 3]);
/// ```
pub struct IntoIter<T> {
    // The owned receiver that `next` pulls messages from.
    receiver: Receiver<T>,
}
1316
1317 impl<T> FusedIterator for IntoIter<T> {}
1318
1319 impl<T> Iterator for IntoIter<T> {
1320 type Item = T;
1321
next(&mut self) -> Option<Self::Item>1322 fn next(&mut self) -> Option<Self::Item> {
1323 self.receiver.recv().ok()
1324 }
1325 }
1326
1327 impl<T> fmt::Debug for IntoIter<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1328 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1329 f.pad("IntoIter { .. }")
1330 }
1331 }
1332
1333 impl<T> SelectHandle for Sender<T> {
try_select(&self, token: &mut Token) -> bool1334 fn try_select(&self, token: &mut Token) -> bool {
1335 match &self.flavor {
1336 SenderFlavor::Array(chan) => chan.sender().try_select(token),
1337 SenderFlavor::List(chan) => chan.sender().try_select(token),
1338 SenderFlavor::Zero(chan) => chan.sender().try_select(token),
1339 }
1340 }
1341
deadline(&self) -> Option<Instant>1342 fn deadline(&self) -> Option<Instant> {
1343 None
1344 }
1345
register(&self, oper: Operation, cx: &Context) -> bool1346 fn register(&self, oper: Operation, cx: &Context) -> bool {
1347 match &self.flavor {
1348 SenderFlavor::Array(chan) => chan.sender().register(oper, cx),
1349 SenderFlavor::List(chan) => chan.sender().register(oper, cx),
1350 SenderFlavor::Zero(chan) => chan.sender().register(oper, cx),
1351 }
1352 }
1353
unregister(&self, oper: Operation)1354 fn unregister(&self, oper: Operation) {
1355 match &self.flavor {
1356 SenderFlavor::Array(chan) => chan.sender().unregister(oper),
1357 SenderFlavor::List(chan) => chan.sender().unregister(oper),
1358 SenderFlavor::Zero(chan) => chan.sender().unregister(oper),
1359 }
1360 }
1361
accept(&self, token: &mut Token, cx: &Context) -> bool1362 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1363 match &self.flavor {
1364 SenderFlavor::Array(chan) => chan.sender().accept(token, cx),
1365 SenderFlavor::List(chan) => chan.sender().accept(token, cx),
1366 SenderFlavor::Zero(chan) => chan.sender().accept(token, cx),
1367 }
1368 }
1369
is_ready(&self) -> bool1370 fn is_ready(&self) -> bool {
1371 match &self.flavor {
1372 SenderFlavor::Array(chan) => chan.sender().is_ready(),
1373 SenderFlavor::List(chan) => chan.sender().is_ready(),
1374 SenderFlavor::Zero(chan) => chan.sender().is_ready(),
1375 }
1376 }
1377
watch(&self, oper: Operation, cx: &Context) -> bool1378 fn watch(&self, oper: Operation, cx: &Context) -> bool {
1379 match &self.flavor {
1380 SenderFlavor::Array(chan) => chan.sender().watch(oper, cx),
1381 SenderFlavor::List(chan) => chan.sender().watch(oper, cx),
1382 SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx),
1383 }
1384 }
1385
unwatch(&self, oper: Operation)1386 fn unwatch(&self, oper: Operation) {
1387 match &self.flavor {
1388 SenderFlavor::Array(chan) => chan.sender().unwatch(oper),
1389 SenderFlavor::List(chan) => chan.sender().unwatch(oper),
1390 SenderFlavor::Zero(chan) => chan.sender().unwatch(oper),
1391 }
1392 }
1393 }
1394
1395 impl<T> SelectHandle for Receiver<T> {
try_select(&self, token: &mut Token) -> bool1396 fn try_select(&self, token: &mut Token) -> bool {
1397 match &self.flavor {
1398 ReceiverFlavor::Array(chan) => chan.receiver().try_select(token),
1399 ReceiverFlavor::List(chan) => chan.receiver().try_select(token),
1400 ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token),
1401 ReceiverFlavor::At(chan) => chan.try_select(token),
1402 ReceiverFlavor::Tick(chan) => chan.try_select(token),
1403 ReceiverFlavor::Never(chan) => chan.try_select(token),
1404 }
1405 }
1406
deadline(&self) -> Option<Instant>1407 fn deadline(&self) -> Option<Instant> {
1408 match &self.flavor {
1409 ReceiverFlavor::Array(_) => None,
1410 ReceiverFlavor::List(_) => None,
1411 ReceiverFlavor::Zero(_) => None,
1412 ReceiverFlavor::At(chan) => chan.deadline(),
1413 ReceiverFlavor::Tick(chan) => chan.deadline(),
1414 ReceiverFlavor::Never(chan) => chan.deadline(),
1415 }
1416 }
1417
register(&self, oper: Operation, cx: &Context) -> bool1418 fn register(&self, oper: Operation, cx: &Context) -> bool {
1419 match &self.flavor {
1420 ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx),
1421 ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx),
1422 ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx),
1423 ReceiverFlavor::At(chan) => chan.register(oper, cx),
1424 ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
1425 ReceiverFlavor::Never(chan) => chan.register(oper, cx),
1426 }
1427 }
1428
unregister(&self, oper: Operation)1429 fn unregister(&self, oper: Operation) {
1430 match &self.flavor {
1431 ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper),
1432 ReceiverFlavor::List(chan) => chan.receiver().unregister(oper),
1433 ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper),
1434 ReceiverFlavor::At(chan) => chan.unregister(oper),
1435 ReceiverFlavor::Tick(chan) => chan.unregister(oper),
1436 ReceiverFlavor::Never(chan) => chan.unregister(oper),
1437 }
1438 }
1439
accept(&self, token: &mut Token, cx: &Context) -> bool1440 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1441 match &self.flavor {
1442 ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx),
1443 ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx),
1444 ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx),
1445 ReceiverFlavor::At(chan) => chan.accept(token, cx),
1446 ReceiverFlavor::Tick(chan) => chan.accept(token, cx),
1447 ReceiverFlavor::Never(chan) => chan.accept(token, cx),
1448 }
1449 }
1450
is_ready(&self) -> bool1451 fn is_ready(&self) -> bool {
1452 match &self.flavor {
1453 ReceiverFlavor::Array(chan) => chan.receiver().is_ready(),
1454 ReceiverFlavor::List(chan) => chan.receiver().is_ready(),
1455 ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(),
1456 ReceiverFlavor::At(chan) => chan.is_ready(),
1457 ReceiverFlavor::Tick(chan) => chan.is_ready(),
1458 ReceiverFlavor::Never(chan) => chan.is_ready(),
1459 }
1460 }
1461
watch(&self, oper: Operation, cx: &Context) -> bool1462 fn watch(&self, oper: Operation, cx: &Context) -> bool {
1463 match &self.flavor {
1464 ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx),
1465 ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx),
1466 ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx),
1467 ReceiverFlavor::At(chan) => chan.watch(oper, cx),
1468 ReceiverFlavor::Tick(chan) => chan.watch(oper, cx),
1469 ReceiverFlavor::Never(chan) => chan.watch(oper, cx),
1470 }
1471 }
1472
unwatch(&self, oper: Operation)1473 fn unwatch(&self, oper: Operation) {
1474 match &self.flavor {
1475 ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper),
1476 ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper),
1477 ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper),
1478 ReceiverFlavor::At(chan) => chan.unwatch(oper),
1479 ReceiverFlavor::Tick(chan) => chan.unwatch(oper),
1480 ReceiverFlavor::Never(chan) => chan.unwatch(oper),
1481 }
1482 }
1483 }
1484
1485 /// Writes a message into the channel.
write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T>1486 pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
1487 match &s.flavor {
1488 SenderFlavor::Array(chan) => chan.write(token, msg),
1489 SenderFlavor::List(chan) => chan.write(token, msg),
1490 SenderFlavor::Zero(chan) => chan.write(token, msg),
1491 }
1492 }
1493
/// Reads a message from the channel.
///
/// Hands `token` to the `read` method of whichever flavor backs `r`. For the `Array`, `List`,
/// `Zero` and `Never` flavors the result is returned as is; for `At` and `Tick` the flavor's
/// `Result<Instant, ()>` is reinterpreted as `Result<T, ()>`.
///
/// # Safety
///
/// NOTE(review): the flavor-level `read` contract is not visible from here. Presumably
/// `token` must come from a successful selection of a receive operation on this same
/// receiver; confirm against the flavor implementations. In addition, the `At` and `Tick`
/// arms are only sound when `T` is `Instant`.
pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> {
    match &r.flavor {
        ReceiverFlavor::Array(chan) => chan.read(token),
        ReceiverFlavor::List(chan) => chan.read(token),
        ReceiverFlavor::Zero(chan) => chan.read(token),
        ReceiverFlavor::At(chan) => {
            // The flavor yields `Result<Instant, ()>`; its bytes are copied out as
            // `Result<T, ()>`. NOTE(review): presumably `At` receivers are only ever built
            // with `T = Instant`, which would make this an identity conversion; confirm.
            mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
        }
        ReceiverFlavor::Tick(chan) => {
            // Same reinterpretation as for `At`, with the same `T = Instant` assumption.
            mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
        }
        ReceiverFlavor::Never(chan) => chan.read(token),
    }
}
1509