1 //! The channel interface.
2 
3 use std::fmt;
4 use std::iter::FusedIterator;
5 use std::mem;
6 use std::panic::{RefUnwindSafe, UnwindSafe};
7 use std::sync::Arc;
8 use std::time::{Duration, Instant};
9 
10 use crate::context::Context;
11 use crate::counter;
12 use crate::err::{
13     RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError,
14 };
15 use crate::flavors;
16 use crate::select::{Operation, SelectHandle, Token};
17 
18 /// Creates a channel of unbounded capacity.
19 ///
20 /// This channel has a growable buffer that can hold any number of messages at a time.
21 ///
22 /// # Examples
23 ///
24 /// ```
25 /// use std::thread;
26 /// use crossbeam_channel::unbounded;
27 ///
28 /// let (s, r) = unbounded();
29 ///
30 /// // Computes the n-th Fibonacci number.
31 /// fn fib(n: i32) -> i32 {
32 ///     if n <= 1 {
33 ///         n
34 ///     } else {
35 ///         fib(n - 1) + fib(n - 2)
36 ///     }
37 /// }
38 ///
39 /// // Spawn an asynchronous computation.
40 /// thread::spawn(move || s.send(fib(20)).unwrap());
41 ///
42 /// // Print the result of the computation.
43 /// println!("{}", r.recv().unwrap());
44 /// ```
unbounded<T>() -> (Sender<T>, Receiver<T>)45 pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
46     let (s, r) = counter::new(flavors::list::Channel::new());
47     let s = Sender {
48         flavor: SenderFlavor::List(s),
49     };
50     let r = Receiver {
51         flavor: ReceiverFlavor::List(r),
52     };
53     (s, r)
54 }
55 
56 /// Creates a channel of bounded capacity.
57 ///
58 /// This channel has a buffer that can hold at most `cap` messages at a time.
59 ///
/// A special case is a zero-capacity channel, which cannot hold any messages. Instead, send and
61 /// receive operations must appear at the same time in order to pair up and pass the message over.
62 ///
63 /// # Examples
64 ///
65 /// A channel of capacity 1:
66 ///
67 /// ```
68 /// use std::thread;
69 /// use std::time::Duration;
70 /// use crossbeam_channel::bounded;
71 ///
72 /// let (s, r) = bounded(1);
73 ///
74 /// // This call returns immediately because there is enough space in the channel.
75 /// s.send(1).unwrap();
76 ///
77 /// thread::spawn(move || {
78 ///     // This call blocks the current thread because the channel is full.
79 ///     // It will be able to complete only after the first message is received.
80 ///     s.send(2).unwrap();
81 /// });
82 ///
83 /// thread::sleep(Duration::from_secs(1));
84 /// assert_eq!(r.recv(), Ok(1));
85 /// assert_eq!(r.recv(), Ok(2));
86 /// ```
87 ///
88 /// A zero-capacity channel:
89 ///
90 /// ```
91 /// use std::thread;
92 /// use std::time::Duration;
93 /// use crossbeam_channel::bounded;
94 ///
95 /// let (s, r) = bounded(0);
96 ///
97 /// thread::spawn(move || {
98 ///     // This call blocks the current thread until a receive operation appears
99 ///     // on the other side of the channel.
100 ///     s.send(1).unwrap();
101 /// });
102 ///
103 /// thread::sleep(Duration::from_secs(1));
104 /// assert_eq!(r.recv(), Ok(1));
105 /// ```
bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>)106 pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
107     if cap == 0 {
108         let (s, r) = counter::new(flavors::zero::Channel::new());
109         let s = Sender {
110             flavor: SenderFlavor::Zero(s),
111         };
112         let r = Receiver {
113             flavor: ReceiverFlavor::Zero(r),
114         };
115         (s, r)
116     } else {
117         let (s, r) = counter::new(flavors::array::Channel::with_capacity(cap));
118         let s = Sender {
119             flavor: SenderFlavor::Array(s),
120         };
121         let r = Receiver {
122             flavor: ReceiverFlavor::Array(r),
123         };
124         (s, r)
125     }
126 }
127 
128 /// Creates a receiver that delivers a message after a certain duration of time.
129 ///
130 /// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
131 /// be sent into the channel after `duration` elapses. The message is the instant at which it is
132 /// sent.
133 ///
134 /// # Examples
135 ///
136 /// Using an `after` channel for timeouts:
137 ///
138 /// ```
139 /// use std::time::Duration;
140 /// use crossbeam_channel::{after, select, unbounded};
141 ///
142 /// let (s, r) = unbounded::<i32>();
143 /// let timeout = Duration::from_millis(100);
144 ///
145 /// select! {
146 ///     recv(r) -> msg => println!("received {:?}", msg),
147 ///     recv(after(timeout)) -> _ => println!("timed out"),
148 /// }
149 /// ```
150 ///
151 /// When the message gets sent:
152 ///
153 /// ```
154 /// use std::thread;
155 /// use std::time::{Duration, Instant};
156 /// use crossbeam_channel::after;
157 ///
158 /// // Converts a number of milliseconds into a `Duration`.
159 /// let ms = |ms| Duration::from_millis(ms);
160 ///
161 /// // Returns `true` if `a` and `b` are very close `Instant`s.
162 /// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
163 ///
164 /// let start = Instant::now();
165 /// let r = after(ms(100));
166 ///
167 /// thread::sleep(ms(500));
168 ///
169 /// // This message was sent 100 ms from the start and received 500 ms from the start.
170 /// assert!(eq(r.recv().unwrap(), start + ms(100)));
171 /// assert!(eq(Instant::now(), start + ms(500)));
172 /// ```
after(duration: Duration) -> Receiver<Instant>173 pub fn after(duration: Duration) -> Receiver<Instant> {
174     Receiver {
175         flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_timeout(duration))),
176     }
177 }
178 
179 /// Creates a receiver that delivers a message at a certain instant in time.
180 ///
181 /// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
182 /// be sent into the channel at the moment in time `when`. The message is the instant at which it
183 /// is sent, which is the same as `when`. If `when` is in the past, the message will be delivered
184 /// instantly to the receiver.
185 ///
186 /// # Examples
187 ///
188 /// Using an `at` channel for timeouts:
189 ///
190 /// ```
191 /// use std::time::{Instant, Duration};
192 /// use crossbeam_channel::{at, select, unbounded};
193 ///
194 /// let (s, r) = unbounded::<i32>();
195 /// let deadline = Instant::now() + Duration::from_millis(500);
196 ///
197 /// select! {
198 ///     recv(r) -> msg => println!("received {:?}", msg),
199 ///     recv(at(deadline)) -> _ => println!("timed out"),
200 /// }
201 /// ```
202 ///
203 /// When the message gets sent:
204 ///
205 /// ```
206 /// use std::time::{Duration, Instant};
207 /// use crossbeam_channel::at;
208 ///
209 /// // Converts a number of milliseconds into a `Duration`.
210 /// let ms = |ms| Duration::from_millis(ms);
211 ///
212 /// let start = Instant::now();
213 /// let end = start + ms(100);
214 ///
215 /// let r = at(end);
216 ///
217 /// // This message was sent 100 ms from the start
218 /// assert_eq!(r.recv().unwrap(), end);
219 /// assert!(Instant::now() > start + ms(100));
220 /// ```
at(when: Instant) -> Receiver<Instant>221 pub fn at(when: Instant) -> Receiver<Instant> {
222     Receiver {
223         flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(when))),
224     }
225 }
226 
227 /// Creates a receiver that never delivers messages.
228 ///
229 /// The channel is bounded with capacity of 0 and never gets disconnected.
230 ///
231 /// # Examples
232 ///
233 /// Using a `never` channel to optionally add a timeout to [`select!`]:
234 ///
235 /// ```
236 /// use std::thread;
237 /// use std::time::Duration;
238 /// use crossbeam_channel::{after, select, never, unbounded};
239 ///
240 /// let (s, r) = unbounded();
241 ///
242 /// thread::spawn(move || {
243 ///     thread::sleep(Duration::from_secs(1));
244 ///     s.send(1).unwrap();
245 /// });
246 ///
247 /// // Suppose this duration can be a `Some` or a `None`.
248 /// let duration = Some(Duration::from_millis(100));
249 ///
250 /// // Create a channel that times out after the specified duration.
251 /// let timeout = duration
252 ///     .map(|d| after(d))
253 ///     .unwrap_or(never());
254 ///
255 /// select! {
256 ///     recv(r) -> msg => assert_eq!(msg, Ok(1)),
257 ///     recv(timeout) -> _ => println!("timed out"),
258 /// }
259 /// ```
never<T>() -> Receiver<T>260 pub fn never<T>() -> Receiver<T> {
261     Receiver {
262         flavor: ReceiverFlavor::Never(flavors::never::Channel::new()),
263     }
264 }
265 
266 /// Creates a receiver that delivers messages periodically.
267 ///
268 /// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
269 /// sent into the channel in intervals of `duration`. Each message is the instant at which it is
270 /// sent.
271 ///
272 /// # Examples
273 ///
274 /// Using a `tick` channel to periodically print elapsed time:
275 ///
276 /// ```
277 /// use std::time::{Duration, Instant};
278 /// use crossbeam_channel::tick;
279 ///
280 /// let start = Instant::now();
281 /// let ticker = tick(Duration::from_millis(100));
282 ///
283 /// for _ in 0..5 {
284 ///     ticker.recv().unwrap();
285 ///     println!("elapsed: {:?}", start.elapsed());
286 /// }
287 /// ```
288 ///
289 /// When messages get sent:
290 ///
291 /// ```
292 /// use std::thread;
293 /// use std::time::{Duration, Instant};
294 /// use crossbeam_channel::tick;
295 ///
296 /// // Converts a number of milliseconds into a `Duration`.
297 /// let ms = |ms| Duration::from_millis(ms);
298 ///
299 /// // Returns `true` if `a` and `b` are very close `Instant`s.
300 /// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
301 ///
302 /// let start = Instant::now();
303 /// let r = tick(ms(100));
304 ///
305 /// // This message was sent 100 ms from the start and received 100 ms from the start.
306 /// assert!(eq(r.recv().unwrap(), start + ms(100)));
307 /// assert!(eq(Instant::now(), start + ms(100)));
308 ///
309 /// thread::sleep(ms(500));
310 ///
311 /// // This message was sent 200 ms from the start and received 600 ms from the start.
312 /// assert!(eq(r.recv().unwrap(), start + ms(200)));
313 /// assert!(eq(Instant::now(), start + ms(600)));
314 ///
315 /// // This message was sent 700 ms from the start and received 700 ms from the start.
316 /// assert!(eq(r.recv().unwrap(), start + ms(700)));
317 /// assert!(eq(Instant::now(), start + ms(700)));
318 /// ```
tick(duration: Duration) -> Receiver<Instant>319 pub fn tick(duration: Duration) -> Receiver<Instant> {
320     Receiver {
321         flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(duration))),
322     }
323 }
324 
325 /// The sending side of a channel.
326 ///
327 /// # Examples
328 ///
329 /// ```
330 /// use std::thread;
331 /// use crossbeam_channel::unbounded;
332 ///
333 /// let (s1, r) = unbounded();
334 /// let s2 = s1.clone();
335 ///
336 /// thread::spawn(move || s1.send(1).unwrap());
337 /// thread::spawn(move || s2.send(2).unwrap());
338 ///
339 /// let msg1 = r.recv().unwrap();
340 /// let msg2 = r.recv().unwrap();
341 ///
342 /// assert_eq!(msg1 + msg2, 3);
343 /// ```
pub struct Sender<T> {
    /// The concrete channel implementation behind this sender; every method dispatches on it.
    flavor: SenderFlavor<T>,
}
347 
/// Sender flavors.
///
/// Each variant wraps a `counter::Sender` handle for one concrete channel implementation.
/// `Sender` methods match on the variant and forward to that implementation.
enum SenderFlavor<T> {
    /// Bounded channel based on a preallocated array.
    Array(counter::Sender<flavors::array::Channel<T>>),

    /// Unbounded channel implemented as a linked list.
    List(counter::Sender<flavors::list::Channel<T>>),

    /// Zero-capacity channel.
    Zero(counter::Sender<flavors::zero::Channel<T>>),
}
359 
// NOTE(review): these manual impls assert that a `Sender<T>` may be moved to and shared between
// threads whenever `T: Send`. Their soundness depends on how the flavor channels synchronize
// access internally, which is not visible in this part of the file — confirm against `flavors`.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

// Marker impls: a `Sender` is declared unwind-safe for every `T`, with no bound on `T`.
impl<T> UnwindSafe for Sender<T> {}
impl<T> RefUnwindSafe for Sender<T> {}
365 
366 impl<T> Sender<T> {
367     /// Attempts to send a message into the channel without blocking.
368     ///
369     /// This method will either send a message into the channel immediately or return an error if
370     /// the channel is full or disconnected. The returned error contains the original message.
371     ///
372     /// If called on a zero-capacity channel, this method will send the message only if there
373     /// happens to be a receive operation on the other side of the channel at the same time.
374     ///
375     /// # Examples
376     ///
377     /// ```
378     /// use crossbeam_channel::{bounded, TrySendError};
379     ///
380     /// let (s, r) = bounded(1);
381     ///
382     /// assert_eq!(s.try_send(1), Ok(()));
383     /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
384     ///
385     /// drop(r);
386     /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3)));
387     /// ```
try_send(&self, msg: T) -> Result<(), TrySendError<T>>388     pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
389         match &self.flavor {
390             SenderFlavor::Array(chan) => chan.try_send(msg),
391             SenderFlavor::List(chan) => chan.try_send(msg),
392             SenderFlavor::Zero(chan) => chan.try_send(msg),
393         }
394     }
395 
396     /// Blocks the current thread until a message is sent or the channel is disconnected.
397     ///
398     /// If the channel is full and not disconnected, this call will block until the send operation
399     /// can proceed. If the channel becomes disconnected, this call will wake up and return an
400     /// error. The returned error contains the original message.
401     ///
402     /// If called on a zero-capacity channel, this method will wait for a receive operation to
403     /// appear on the other side of the channel.
404     ///
405     /// # Examples
406     ///
407     /// ```
408     /// use std::thread;
409     /// use std::time::Duration;
410     /// use crossbeam_channel::{bounded, SendError};
411     ///
412     /// let (s, r) = bounded(1);
413     /// assert_eq!(s.send(1), Ok(()));
414     ///
415     /// thread::spawn(move || {
416     ///     assert_eq!(r.recv(), Ok(1));
417     ///     thread::sleep(Duration::from_secs(1));
418     ///     drop(r);
419     /// });
420     ///
421     /// assert_eq!(s.send(2), Ok(()));
422     /// assert_eq!(s.send(3), Err(SendError(3)));
423     /// ```
send(&self, msg: T) -> Result<(), SendError<T>>424     pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
425         match &self.flavor {
426             SenderFlavor::Array(chan) => chan.send(msg, None),
427             SenderFlavor::List(chan) => chan.send(msg, None),
428             SenderFlavor::Zero(chan) => chan.send(msg, None),
429         }
430         .map_err(|err| match err {
431             SendTimeoutError::Disconnected(msg) => SendError(msg),
432             SendTimeoutError::Timeout(_) => unreachable!(),
433         })
434     }
435 
436     /// Waits for a message to be sent into the channel, but only for a limited time.
437     ///
438     /// If the channel is full and not disconnected, this call will block until the send operation
439     /// can proceed or the operation times out. If the channel becomes disconnected, this call will
440     /// wake up and return an error. The returned error contains the original message.
441     ///
442     /// If called on a zero-capacity channel, this method will wait for a receive operation to
443     /// appear on the other side of the channel.
444     ///
445     /// # Examples
446     ///
447     /// ```
448     /// use std::thread;
449     /// use std::time::Duration;
450     /// use crossbeam_channel::{bounded, SendTimeoutError};
451     ///
452     /// let (s, r) = bounded(0);
453     ///
454     /// thread::spawn(move || {
455     ///     thread::sleep(Duration::from_secs(1));
456     ///     assert_eq!(r.recv(), Ok(2));
457     ///     drop(r);
458     /// });
459     ///
460     /// assert_eq!(
461     ///     s.send_timeout(1, Duration::from_millis(500)),
462     ///     Err(SendTimeoutError::Timeout(1)),
463     /// );
464     /// assert_eq!(
465     ///     s.send_timeout(2, Duration::from_secs(1)),
466     ///     Ok(()),
467     /// );
468     /// assert_eq!(
469     ///     s.send_timeout(3, Duration::from_millis(500)),
470     ///     Err(SendTimeoutError::Disconnected(3)),
471     /// );
472     /// ```
send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>>473     pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
474         self.send_deadline(msg, Instant::now() + timeout)
475     }
476 
477     /// Waits for a message to be sent into the channel, but only until a given deadline.
478     ///
479     /// If the channel is full and not disconnected, this call will block until the send operation
480     /// can proceed or the operation times out. If the channel becomes disconnected, this call will
481     /// wake up and return an error. The returned error contains the original message.
482     ///
483     /// If called on a zero-capacity channel, this method will wait for a receive operation to
484     /// appear on the other side of the channel.
485     ///
486     /// # Examples
487     ///
488     /// ```
489     /// use std::thread;
490     /// use std::time::{Duration, Instant};
491     /// use crossbeam_channel::{bounded, SendTimeoutError};
492     ///
493     /// let (s, r) = bounded(0);
494     ///
495     /// thread::spawn(move || {
496     ///     thread::sleep(Duration::from_secs(1));
497     ///     assert_eq!(r.recv(), Ok(2));
498     ///     drop(r);
499     /// });
500     ///
501     /// let now = Instant::now();
502     ///
503     /// assert_eq!(
504     ///     s.send_deadline(1, now + Duration::from_millis(500)),
505     ///     Err(SendTimeoutError::Timeout(1)),
506     /// );
507     /// assert_eq!(
508     ///     s.send_deadline(2, now + Duration::from_millis(1500)),
509     ///     Ok(()),
510     /// );
511     /// assert_eq!(
512     ///     s.send_deadline(3, now + Duration::from_millis(2000)),
513     ///     Err(SendTimeoutError::Disconnected(3)),
514     /// );
515     /// ```
send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>>516     pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
517         match &self.flavor {
518             SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
519             SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
520             SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
521         }
522     }
523 
524     /// Returns `true` if the channel is empty.
525     ///
526     /// Note: Zero-capacity channels are always empty.
527     ///
528     /// # Examples
529     ///
530     /// ```
531     /// use crossbeam_channel::unbounded;
532     ///
533     /// let (s, r) = unbounded();
534     /// assert!(s.is_empty());
535     ///
536     /// s.send(0).unwrap();
537     /// assert!(!s.is_empty());
538     /// ```
is_empty(&self) -> bool539     pub fn is_empty(&self) -> bool {
540         match &self.flavor {
541             SenderFlavor::Array(chan) => chan.is_empty(),
542             SenderFlavor::List(chan) => chan.is_empty(),
543             SenderFlavor::Zero(chan) => chan.is_empty(),
544         }
545     }
546 
547     /// Returns `true` if the channel is full.
548     ///
549     /// Note: Zero-capacity channels are always full.
550     ///
551     /// # Examples
552     ///
553     /// ```
554     /// use crossbeam_channel::bounded;
555     ///
556     /// let (s, r) = bounded(1);
557     ///
558     /// assert!(!s.is_full());
559     /// s.send(0).unwrap();
560     /// assert!(s.is_full());
561     /// ```
is_full(&self) -> bool562     pub fn is_full(&self) -> bool {
563         match &self.flavor {
564             SenderFlavor::Array(chan) => chan.is_full(),
565             SenderFlavor::List(chan) => chan.is_full(),
566             SenderFlavor::Zero(chan) => chan.is_full(),
567         }
568     }
569 
570     /// Returns the number of messages in the channel.
571     ///
572     /// # Examples
573     ///
574     /// ```
575     /// use crossbeam_channel::unbounded;
576     ///
577     /// let (s, r) = unbounded();
578     /// assert_eq!(s.len(), 0);
579     ///
580     /// s.send(1).unwrap();
581     /// s.send(2).unwrap();
582     /// assert_eq!(s.len(), 2);
583     /// ```
len(&self) -> usize584     pub fn len(&self) -> usize {
585         match &self.flavor {
586             SenderFlavor::Array(chan) => chan.len(),
587             SenderFlavor::List(chan) => chan.len(),
588             SenderFlavor::Zero(chan) => chan.len(),
589         }
590     }
591 
592     /// If the channel is bounded, returns its capacity.
593     ///
594     /// # Examples
595     ///
596     /// ```
597     /// use crossbeam_channel::{bounded, unbounded};
598     ///
599     /// let (s, _) = unbounded::<i32>();
600     /// assert_eq!(s.capacity(), None);
601     ///
602     /// let (s, _) = bounded::<i32>(5);
603     /// assert_eq!(s.capacity(), Some(5));
604     ///
605     /// let (s, _) = bounded::<i32>(0);
606     /// assert_eq!(s.capacity(), Some(0));
607     /// ```
capacity(&self) -> Option<usize>608     pub fn capacity(&self) -> Option<usize> {
609         match &self.flavor {
610             SenderFlavor::Array(chan) => chan.capacity(),
611             SenderFlavor::List(chan) => chan.capacity(),
612             SenderFlavor::Zero(chan) => chan.capacity(),
613         }
614     }
615 
616     /// Returns `true` if senders belong to the same channel.
617     ///
618     /// # Examples
619     ///
620     /// ```rust
621     /// use crossbeam_channel::unbounded;
622     ///
623     /// let (s, _) = unbounded::<usize>();
624     ///
625     /// let s2 = s.clone();
626     /// assert!(s.same_channel(&s2));
627     ///
628     /// let (s3, _) = unbounded();
629     /// assert!(!s.same_channel(&s3));
630     /// ```
same_channel(&self, other: &Sender<T>) -> bool631     pub fn same_channel(&self, other: &Sender<T>) -> bool {
632         match (&self.flavor, &other.flavor) {
633             (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
634             (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
635             (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
636             _ => false,
637         }
638     }
639 }
640 
641 impl<T> Drop for Sender<T> {
drop(&mut self)642     fn drop(&mut self) {
643         unsafe {
644             match &self.flavor {
645                 SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
646                 SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
647                 SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
648             }
649         }
650     }
651 }
652 
653 impl<T> Clone for Sender<T> {
clone(&self) -> Self654     fn clone(&self) -> Self {
655         let flavor = match &self.flavor {
656             SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
657             SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
658             SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
659         };
660 
661         Sender { flavor }
662     }
663 }
664 
665 impl<T> fmt::Debug for Sender<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result666     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
667         f.pad("Sender { .. }")
668     }
669 }
670 
671 /// The receiving side of a channel.
672 ///
673 /// # Examples
674 ///
675 /// ```
676 /// use std::thread;
677 /// use std::time::Duration;
678 /// use crossbeam_channel::unbounded;
679 ///
680 /// let (s, r) = unbounded();
681 ///
682 /// thread::spawn(move || {
683 ///     let _ = s.send(1);
684 ///     thread::sleep(Duration::from_secs(1));
685 ///     let _ = s.send(2);
686 /// });
687 ///
688 /// assert_eq!(r.recv(), Ok(1)); // Received immediately.
689 /// assert_eq!(r.recv(), Ok(2)); // Received after 1 second.
690 /// ```
pub struct Receiver<T> {
    /// The concrete channel implementation behind this receiver; every method dispatches on it.
    flavor: ReceiverFlavor<T>,
}
694 
/// Receiver flavors.
///
/// `Receiver` methods match on the variant to reach the concrete channel implementation.
enum ReceiverFlavor<T> {
    /// Bounded channel based on a preallocated array.
    Array(counter::Receiver<flavors::array::Channel<T>>),

    /// Unbounded channel implemented as a linked list.
    List(counter::Receiver<flavors::list::Channel<T>>),

    /// Zero-capacity channel.
    Zero(counter::Receiver<flavors::zero::Channel<T>>),

    /// The at flavor, created by both `at` and `after`, which return `Receiver<Instant>`.
    /// The channel type itself is not generic over `T`.
    At(Arc<flavors::at::Channel>),

    /// The tick flavor, created by `tick`, which returns `Receiver<Instant>`.
    /// The channel type itself is not generic over `T`.
    Tick(Arc<flavors::tick::Channel>),

    /// The never flavor.
    Never(flavors::never::Channel<T>),
}
715 
// NOTE(review): these manual impls assert that a `Receiver<T>` may be moved to and shared between
// threads whenever `T: Send`. Their soundness depends on how the flavor channels synchronize
// access internally, which is not visible in this part of the file — confirm against `flavors`.
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}

// Marker impls: a `Receiver` is declared unwind-safe for every `T`, with no bound on `T`.
impl<T> UnwindSafe for Receiver<T> {}
impl<T> RefUnwindSafe for Receiver<T> {}
721 
722 impl<T> Receiver<T> {
723     /// Attempts to receive a message from the channel without blocking.
724     ///
725     /// This method will either receive a message from the channel immediately or return an error
726     /// if the channel is empty.
727     ///
728     /// If called on a zero-capacity channel, this method will receive a message only if there
729     /// happens to be a send operation on the other side of the channel at the same time.
730     ///
731     /// # Examples
732     ///
733     /// ```
734     /// use crossbeam_channel::{unbounded, TryRecvError};
735     ///
736     /// let (s, r) = unbounded();
737     /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
738     ///
739     /// s.send(5).unwrap();
740     /// drop(s);
741     ///
742     /// assert_eq!(r.try_recv(), Ok(5));
743     /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
744     /// ```
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        // Dispatch to the non-blocking receive of the underlying flavor.
        match &self.flavor {
            ReceiverFlavor::Array(chan) => chan.try_recv(),
            ReceiverFlavor::List(chan) => chan.try_recv(),
            ReceiverFlavor::Zero(chan) => chan.try_recv(),
            ReceiverFlavor::At(chan) => {
                let msg = chan.try_recv();
                // The `At` channel is not generic and yields `Instant`s, so its result is
                // reinterpreted as `Result<T, _>` through a bitwise copy.
                // SAFETY (review note): sound only if `T` is `Instant` here. `after` and `at`
                // build this flavor solely as `Receiver<Instant>`; confirm that no other code
                // path creates it with a different `T`.
                unsafe {
                    mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
                        &msg,
                    )
                }
            }
            ReceiverFlavor::Tick(chan) => {
                let msg = chan.try_recv();
                // Same reinterpretation as for the `At` flavor above.
                // SAFETY (review note): sound only if `T` is `Instant` here. `tick` builds this
                // flavor solely as `Receiver<Instant>`; confirm that no other code path creates
                // it with a different `T`.
                unsafe {
                    mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
                        &msg,
                    )
                }
            }
            ReceiverFlavor::Never(chan) => chan.try_recv(),
        }
    }
769 
770     /// Blocks the current thread until a message is received or the channel is empty and
771     /// disconnected.
772     ///
773     /// If the channel is empty and not disconnected, this call will block until the receive
774     /// operation can proceed. If the channel is empty and becomes disconnected, this call will
775     /// wake up and return an error.
776     ///
777     /// If called on a zero-capacity channel, this method will wait for a send operation to appear
778     /// on the other side of the channel.
779     ///
780     /// # Examples
781     ///
782     /// ```
783     /// use std::thread;
784     /// use std::time::Duration;
785     /// use crossbeam_channel::{unbounded, RecvError};
786     ///
787     /// let (s, r) = unbounded();
788     ///
789     /// thread::spawn(move || {
790     ///     thread::sleep(Duration::from_secs(1));
791     ///     s.send(5).unwrap();
792     ///     drop(s);
793     /// });
794     ///
795     /// assert_eq!(r.recv(), Ok(5));
796     /// assert_eq!(r.recv(), Err(RecvError));
797     /// ```
    pub fn recv(&self) -> Result<T, RecvError> {
        // Passing `None` as the deadline asks the flavor to receive without any time limit.
        match &self.flavor {
            ReceiverFlavor::Array(chan) => chan.recv(None),
            ReceiverFlavor::List(chan) => chan.recv(None),
            ReceiverFlavor::Zero(chan) => chan.recv(None),
            ReceiverFlavor::At(chan) => {
                let msg = chan.recv(None);
                // The `At` channel is not generic and yields `Instant`s, so its result is
                // reinterpreted as `Result<T, _>` through a bitwise copy.
                // SAFETY (review note): sound only if `T` is `Instant` here. `after` and `at`
                // build this flavor solely as `Receiver<Instant>`; confirm that no other code
                // path creates it with a different `T`.
                unsafe {
                    mem::transmute_copy::<
                        Result<Instant, RecvTimeoutError>,
                        Result<T, RecvTimeoutError>,
                    >(&msg)
                }
            }
            ReceiverFlavor::Tick(chan) => {
                let msg = chan.recv(None);
                // Same reinterpretation as for the `At` flavor above.
                // SAFETY (review note): sound only if `T` is `Instant` here. `tick` builds this
                // flavor solely as `Receiver<Instant>`; confirm that no other code path creates
                // it with a different `T`.
                unsafe {
                    mem::transmute_copy::<
                        Result<Instant, RecvTimeoutError>,
                        Result<T, RecvTimeoutError>,
                    >(&msg)
                }
            }
            ReceiverFlavor::Never(chan) => chan.recv(None),
        }
        // Every flavor error is collapsed into the single `RecvError` value.
        .map_err(|_| RecvError)
    }
825 
826     /// Waits for a message to be received from the channel, but only for a limited time.
827     ///
828     /// If the channel is empty and not disconnected, this call will block until the receive
829     /// operation can proceed or the operation times out. If the channel is empty and becomes
830     /// disconnected, this call will wake up and return an error.
831     ///
832     /// If called on a zero-capacity channel, this method will wait for a send operation to appear
833     /// on the other side of the channel.
834     ///
835     /// # Examples
836     ///
837     /// ```
838     /// use std::thread;
839     /// use std::time::Duration;
840     /// use crossbeam_channel::{unbounded, RecvTimeoutError};
841     ///
842     /// let (s, r) = unbounded();
843     ///
844     /// thread::spawn(move || {
845     ///     thread::sleep(Duration::from_secs(1));
846     ///     s.send(5).unwrap();
847     ///     drop(s);
848     /// });
849     ///
850     /// assert_eq!(
851     ///     r.recv_timeout(Duration::from_millis(500)),
852     ///     Err(RecvTimeoutError::Timeout),
853     /// );
854     /// assert_eq!(
855     ///     r.recv_timeout(Duration::from_secs(1)),
856     ///     Ok(5),
857     /// );
858     /// assert_eq!(
859     ///     r.recv_timeout(Duration::from_secs(1)),
860     ///     Err(RecvTimeoutError::Disconnected),
861     /// );
862     /// ```
recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>863     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
864         self.recv_deadline(Instant::now() + timeout)
865     }
866 
867     /// Waits for a message to be received from the channel, but only before a given deadline.
868     ///
869     /// If the channel is empty and not disconnected, this call will block until the receive
870     /// operation can proceed or the operation times out. If the channel is empty and becomes
871     /// disconnected, this call will wake up and return an error.
872     ///
873     /// If called on a zero-capacity channel, this method will wait for a send operation to appear
874     /// on the other side of the channel.
875     ///
876     /// # Examples
877     ///
878     /// ```
879     /// use std::thread;
880     /// use std::time::{Instant, Duration};
881     /// use crossbeam_channel::{unbounded, RecvTimeoutError};
882     ///
883     /// let (s, r) = unbounded();
884     ///
885     /// thread::spawn(move || {
886     ///     thread::sleep(Duration::from_secs(1));
887     ///     s.send(5).unwrap();
888     ///     drop(s);
889     /// });
890     ///
891     /// let now = Instant::now();
892     ///
893     /// assert_eq!(
894     ///     r.recv_deadline(now + Duration::from_millis(500)),
895     ///     Err(RecvTimeoutError::Timeout),
896     /// );
897     /// assert_eq!(
898     ///     r.recv_deadline(now + Duration::from_millis(1500)),
899     ///     Ok(5),
900     /// );
901     /// assert_eq!(
902     ///     r.recv_deadline(now + Duration::from_secs(5)),
903     ///     Err(RecvTimeoutError::Disconnected),
904     /// );
905     /// ```
recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError>906     pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
907         match &self.flavor {
908             ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
909             ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
910             ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
911             ReceiverFlavor::At(chan) => {
912                 let msg = chan.recv(Some(deadline));
913                 unsafe {
914                     mem::transmute_copy::<
915                         Result<Instant, RecvTimeoutError>,
916                         Result<T, RecvTimeoutError>,
917                     >(&msg)
918                 }
919             }
920             ReceiverFlavor::Tick(chan) => {
921                 let msg = chan.recv(Some(deadline));
922                 unsafe {
923                     mem::transmute_copy::<
924                         Result<Instant, RecvTimeoutError>,
925                         Result<T, RecvTimeoutError>,
926                     >(&msg)
927                 }
928             }
929             ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)),
930         }
931     }
932 
933     /// Returns `true` if the channel is empty.
934     ///
935     /// Note: Zero-capacity channels are always empty.
936     ///
937     /// # Examples
938     ///
939     /// ```
940     /// use crossbeam_channel::unbounded;
941     ///
942     /// let (s, r) = unbounded();
943     ///
944     /// assert!(r.is_empty());
945     /// s.send(0).unwrap();
946     /// assert!(!r.is_empty());
947     /// ```
is_empty(&self) -> bool948     pub fn is_empty(&self) -> bool {
949         match &self.flavor {
950             ReceiverFlavor::Array(chan) => chan.is_empty(),
951             ReceiverFlavor::List(chan) => chan.is_empty(),
952             ReceiverFlavor::Zero(chan) => chan.is_empty(),
953             ReceiverFlavor::At(chan) => chan.is_empty(),
954             ReceiverFlavor::Tick(chan) => chan.is_empty(),
955             ReceiverFlavor::Never(chan) => chan.is_empty(),
956         }
957     }
958 
959     /// Returns `true` if the channel is full.
960     ///
961     /// Note: Zero-capacity channels are always full.
962     ///
963     /// # Examples
964     ///
965     /// ```
966     /// use crossbeam_channel::bounded;
967     ///
968     /// let (s, r) = bounded(1);
969     ///
970     /// assert!(!r.is_full());
971     /// s.send(0).unwrap();
972     /// assert!(r.is_full());
973     /// ```
is_full(&self) -> bool974     pub fn is_full(&self) -> bool {
975         match &self.flavor {
976             ReceiverFlavor::Array(chan) => chan.is_full(),
977             ReceiverFlavor::List(chan) => chan.is_full(),
978             ReceiverFlavor::Zero(chan) => chan.is_full(),
979             ReceiverFlavor::At(chan) => chan.is_full(),
980             ReceiverFlavor::Tick(chan) => chan.is_full(),
981             ReceiverFlavor::Never(chan) => chan.is_full(),
982         }
983     }
984 
985     /// Returns the number of messages in the channel.
986     ///
987     /// # Examples
988     ///
989     /// ```
990     /// use crossbeam_channel::unbounded;
991     ///
992     /// let (s, r) = unbounded();
993     /// assert_eq!(r.len(), 0);
994     ///
995     /// s.send(1).unwrap();
996     /// s.send(2).unwrap();
997     /// assert_eq!(r.len(), 2);
998     /// ```
len(&self) -> usize999     pub fn len(&self) -> usize {
1000         match &self.flavor {
1001             ReceiverFlavor::Array(chan) => chan.len(),
1002             ReceiverFlavor::List(chan) => chan.len(),
1003             ReceiverFlavor::Zero(chan) => chan.len(),
1004             ReceiverFlavor::At(chan) => chan.len(),
1005             ReceiverFlavor::Tick(chan) => chan.len(),
1006             ReceiverFlavor::Never(chan) => chan.len(),
1007         }
1008     }
1009 
1010     /// If the channel is bounded, returns its capacity.
1011     ///
1012     /// # Examples
1013     ///
1014     /// ```
1015     /// use crossbeam_channel::{bounded, unbounded};
1016     ///
1017     /// let (_, r) = unbounded::<i32>();
1018     /// assert_eq!(r.capacity(), None);
1019     ///
1020     /// let (_, r) = bounded::<i32>(5);
1021     /// assert_eq!(r.capacity(), Some(5));
1022     ///
1023     /// let (_, r) = bounded::<i32>(0);
1024     /// assert_eq!(r.capacity(), Some(0));
1025     /// ```
capacity(&self) -> Option<usize>1026     pub fn capacity(&self) -> Option<usize> {
1027         match &self.flavor {
1028             ReceiverFlavor::Array(chan) => chan.capacity(),
1029             ReceiverFlavor::List(chan) => chan.capacity(),
1030             ReceiverFlavor::Zero(chan) => chan.capacity(),
1031             ReceiverFlavor::At(chan) => chan.capacity(),
1032             ReceiverFlavor::Tick(chan) => chan.capacity(),
1033             ReceiverFlavor::Never(chan) => chan.capacity(),
1034         }
1035     }
1036 
1037     /// A blocking iterator over messages in the channel.
1038     ///
1039     /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if
1040     /// the channel becomes empty and disconnected, it returns [`None`] without blocking.
1041     ///
1042     /// [`next`]: Iterator::next
1043     ///
1044     /// # Examples
1045     ///
1046     /// ```
1047     /// use std::thread;
1048     /// use crossbeam_channel::unbounded;
1049     ///
1050     /// let (s, r) = unbounded();
1051     ///
1052     /// thread::spawn(move || {
1053     ///     s.send(1).unwrap();
1054     ///     s.send(2).unwrap();
1055     ///     s.send(3).unwrap();
1056     ///     drop(s); // Disconnect the channel.
1057     /// });
1058     ///
1059     /// // Collect all messages from the channel.
1060     /// // Note that the call to `collect` blocks until the sender is dropped.
1061     /// let v: Vec<_> = r.iter().collect();
1062     ///
1063     /// assert_eq!(v, [1, 2, 3]);
1064     /// ```
iter(&self) -> Iter<'_, T>1065     pub fn iter(&self) -> Iter<'_, T> {
1066         Iter { receiver: self }
1067     }
1068 
1069     /// A non-blocking iterator over messages in the channel.
1070     ///
1071     /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1072     /// never blocks waiting for the next message.
1073     ///
1074     /// [`next`]: Iterator::next
1075     ///
1076     /// # Examples
1077     ///
1078     /// ```
1079     /// use std::thread;
1080     /// use std::time::Duration;
1081     /// use crossbeam_channel::unbounded;
1082     ///
1083     /// let (s, r) = unbounded::<i32>();
1084     ///
1085     /// thread::spawn(move || {
1086     ///     s.send(1).unwrap();
1087     ///     thread::sleep(Duration::from_secs(1));
1088     ///     s.send(2).unwrap();
1089     ///     thread::sleep(Duration::from_secs(2));
1090     ///     s.send(3).unwrap();
1091     /// });
1092     ///
1093     /// thread::sleep(Duration::from_secs(2));
1094     ///
1095     /// // Collect all messages from the channel without blocking.
1096     /// // The third message hasn't been sent yet so we'll collect only the first two.
1097     /// let v: Vec<_> = r.try_iter().collect();
1098     ///
1099     /// assert_eq!(v, [1, 2]);
1100     /// ```
try_iter(&self) -> TryIter<'_, T>1101     pub fn try_iter(&self) -> TryIter<'_, T> {
1102         TryIter { receiver: self }
1103     }
1104 
1105     /// Returns `true` if receivers belong to the same channel.
1106     ///
1107     /// # Examples
1108     ///
1109     /// ```rust
1110     /// use crossbeam_channel::unbounded;
1111     ///
1112     /// let (_, r) = unbounded::<usize>();
1113     ///
1114     /// let r2 = r.clone();
1115     /// assert!(r.same_channel(&r2));
1116     ///
1117     /// let (_, r3) = unbounded();
1118     /// assert!(!r.same_channel(&r3));
1119     /// ```
same_channel(&self, other: &Receiver<T>) -> bool1120     pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1121         match (&self.flavor, &other.flavor) {
1122             (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
1123             (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
1124             (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
1125             (ReceiverFlavor::At(a), ReceiverFlavor::At(b)) => Arc::ptr_eq(a, b),
1126             (ReceiverFlavor::Tick(a), ReceiverFlavor::Tick(b)) => Arc::ptr_eq(a, b),
1127             (ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true,
1128             _ => false,
1129         }
1130     }
1131 }
1132 
1133 impl<T> Drop for Receiver<T> {
drop(&mut self)1134     fn drop(&mut self) {
1135         unsafe {
1136             match &self.flavor {
1137                 ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
1138                 ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
1139                 ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
1140                 ReceiverFlavor::At(_) => {}
1141                 ReceiverFlavor::Tick(_) => {}
1142                 ReceiverFlavor::Never(_) => {}
1143             }
1144         }
1145     }
1146 }
1147 
1148 impl<T> Clone for Receiver<T> {
clone(&self) -> Self1149     fn clone(&self) -> Self {
1150         let flavor = match &self.flavor {
1151             ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
1152             ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
1153             ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
1154             ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()),
1155             ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()),
1156             ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
1157         };
1158 
1159         Receiver { flavor }
1160     }
1161 }
1162 
1163 impl<T> fmt::Debug for Receiver<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1164     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1165         f.pad("Receiver { .. }")
1166     }
1167 }
1168 
1169 impl<'a, T> IntoIterator for &'a Receiver<T> {
1170     type Item = T;
1171     type IntoIter = Iter<'a, T>;
1172 
into_iter(self) -> Self::IntoIter1173     fn into_iter(self) -> Self::IntoIter {
1174         self.iter()
1175     }
1176 }
1177 
1178 impl<T> IntoIterator for Receiver<T> {
1179     type Item = T;
1180     type IntoIter = IntoIter<T>;
1181 
into_iter(self) -> Self::IntoIter1182     fn into_iter(self) -> Self::IntoIter {
1183         IntoIter { receiver: self }
1184     }
1185 }
1186 
1187 /// A blocking iterator over messages in a channel.
1188 ///
1189 /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1190 /// channel becomes empty and disconnected, it returns [`None`] without blocking.
1191 ///
1192 /// [`next`]: Iterator::next
1193 ///
1194 /// # Examples
1195 ///
1196 /// ```
1197 /// use std::thread;
1198 /// use crossbeam_channel::unbounded;
1199 ///
1200 /// let (s, r) = unbounded();
1201 ///
1202 /// thread::spawn(move || {
1203 ///     s.send(1).unwrap();
1204 ///     s.send(2).unwrap();
1205 ///     s.send(3).unwrap();
1206 ///     drop(s); // Disconnect the channel.
1207 /// });
1208 ///
1209 /// // Collect all messages from the channel.
1210 /// // Note that the call to `collect` blocks until the sender is dropped.
1211 /// let v: Vec<_> = r.iter().collect();
1212 ///
1213 /// assert_eq!(v, [1, 2, 3]);
1214 /// ```
pub struct Iter<'a, T> {
    /// The borrowed receiver that messages are pulled from.
    receiver: &'a Receiver<T>,
}
1218 
// Marker impl: promises that once `next` has returned `None` it keeps returning `None`.
// NOTE(review): presumably holds because an empty, disconnected channel stays that way --
// confirm.
impl<T> FusedIterator for Iter<'_, T> {}
1220 
1221 impl<T> Iterator for Iter<'_, T> {
1222     type Item = T;
1223 
next(&mut self) -> Option<Self::Item>1224     fn next(&mut self) -> Option<Self::Item> {
1225         self.receiver.recv().ok()
1226     }
1227 }
1228 
1229 impl<T> fmt::Debug for Iter<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1230     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1231         f.pad("Iter { .. }")
1232     }
1233 }
1234 
1235 /// A non-blocking iterator over messages in a channel.
1236 ///
1237 /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1238 /// never blocks waiting for the next message.
1239 ///
1240 /// [`next`]: Iterator::next
1241 ///
1242 /// # Examples
1243 ///
1244 /// ```
1245 /// use std::thread;
1246 /// use std::time::Duration;
1247 /// use crossbeam_channel::unbounded;
1248 ///
1249 /// let (s, r) = unbounded::<i32>();
1250 ///
1251 /// thread::spawn(move || {
1252 ///     s.send(1).unwrap();
1253 ///     thread::sleep(Duration::from_secs(1));
1254 ///     s.send(2).unwrap();
1255 ///     thread::sleep(Duration::from_secs(2));
1256 ///     s.send(3).unwrap();
1257 /// });
1258 ///
1259 /// thread::sleep(Duration::from_secs(2));
1260 ///
1261 /// // Collect all messages from the channel without blocking.
1262 /// // The third message hasn't been sent yet so we'll collect only the first two.
1263 /// let v: Vec<_> = r.try_iter().collect();
1264 ///
1265 /// assert_eq!(v, [1, 2]);
1266 /// ```
pub struct TryIter<'a, T> {
    /// The borrowed receiver that messages are pulled from.
    receiver: &'a Receiver<T>,
}
1270 
1271 impl<T> Iterator for TryIter<'_, T> {
1272     type Item = T;
1273 
next(&mut self) -> Option<Self::Item>1274     fn next(&mut self) -> Option<Self::Item> {
1275         self.receiver.try_recv().ok()
1276     }
1277 }
1278 
1279 impl<T> fmt::Debug for TryIter<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1280     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1281         f.pad("TryIter { .. }")
1282     }
1283 }
1284 
1285 /// A blocking iterator over messages in a channel.
1286 ///
1287 /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1288 /// channel becomes empty and disconnected, it returns [`None`] without blocking.
1289 ///
1290 /// [`next`]: Iterator::next
1291 ///
1292 /// # Examples
1293 ///
1294 /// ```
1295 /// use std::thread;
1296 /// use crossbeam_channel::unbounded;
1297 ///
1298 /// let (s, r) = unbounded();
1299 ///
1300 /// thread::spawn(move || {
1301 ///     s.send(1).unwrap();
1302 ///     s.send(2).unwrap();
1303 ///     s.send(3).unwrap();
1304 ///     drop(s); // Disconnect the channel.
1305 /// });
1306 ///
1307 /// // Collect all messages from the channel.
1308 /// // Note that the call to `collect` blocks until the sender is dropped.
1309 /// let v: Vec<_> = r.into_iter().collect();
1310 ///
1311 /// assert_eq!(v, [1, 2, 3]);
1312 /// ```
pub struct IntoIter<T> {
    /// The owned receiver that messages are pulled from.
    receiver: Receiver<T>,
}
1316 
// Marker impl: promises that once `next` has returned `None` it keeps returning `None`.
// NOTE(review): presumably holds because an empty, disconnected channel stays that way --
// confirm.
impl<T> FusedIterator for IntoIter<T> {}
1318 
1319 impl<T> Iterator for IntoIter<T> {
1320     type Item = T;
1321 
next(&mut self) -> Option<Self::Item>1322     fn next(&mut self) -> Option<Self::Item> {
1323         self.receiver.recv().ok()
1324     }
1325 }
1326 
1327 impl<T> fmt::Debug for IntoIter<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1328     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1329         f.pad("IntoIter { .. }")
1330     }
1331 }
1332 
1333 impl<T> SelectHandle for Sender<T> {
try_select(&self, token: &mut Token) -> bool1334     fn try_select(&self, token: &mut Token) -> bool {
1335         match &self.flavor {
1336             SenderFlavor::Array(chan) => chan.sender().try_select(token),
1337             SenderFlavor::List(chan) => chan.sender().try_select(token),
1338             SenderFlavor::Zero(chan) => chan.sender().try_select(token),
1339         }
1340     }
1341 
deadline(&self) -> Option<Instant>1342     fn deadline(&self) -> Option<Instant> {
1343         None
1344     }
1345 
register(&self, oper: Operation, cx: &Context) -> bool1346     fn register(&self, oper: Operation, cx: &Context) -> bool {
1347         match &self.flavor {
1348             SenderFlavor::Array(chan) => chan.sender().register(oper, cx),
1349             SenderFlavor::List(chan) => chan.sender().register(oper, cx),
1350             SenderFlavor::Zero(chan) => chan.sender().register(oper, cx),
1351         }
1352     }
1353 
unregister(&self, oper: Operation)1354     fn unregister(&self, oper: Operation) {
1355         match &self.flavor {
1356             SenderFlavor::Array(chan) => chan.sender().unregister(oper),
1357             SenderFlavor::List(chan) => chan.sender().unregister(oper),
1358             SenderFlavor::Zero(chan) => chan.sender().unregister(oper),
1359         }
1360     }
1361 
accept(&self, token: &mut Token, cx: &Context) -> bool1362     fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1363         match &self.flavor {
1364             SenderFlavor::Array(chan) => chan.sender().accept(token, cx),
1365             SenderFlavor::List(chan) => chan.sender().accept(token, cx),
1366             SenderFlavor::Zero(chan) => chan.sender().accept(token, cx),
1367         }
1368     }
1369 
is_ready(&self) -> bool1370     fn is_ready(&self) -> bool {
1371         match &self.flavor {
1372             SenderFlavor::Array(chan) => chan.sender().is_ready(),
1373             SenderFlavor::List(chan) => chan.sender().is_ready(),
1374             SenderFlavor::Zero(chan) => chan.sender().is_ready(),
1375         }
1376     }
1377 
watch(&self, oper: Operation, cx: &Context) -> bool1378     fn watch(&self, oper: Operation, cx: &Context) -> bool {
1379         match &self.flavor {
1380             SenderFlavor::Array(chan) => chan.sender().watch(oper, cx),
1381             SenderFlavor::List(chan) => chan.sender().watch(oper, cx),
1382             SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx),
1383         }
1384     }
1385 
unwatch(&self, oper: Operation)1386     fn unwatch(&self, oper: Operation) {
1387         match &self.flavor {
1388             SenderFlavor::Array(chan) => chan.sender().unwatch(oper),
1389             SenderFlavor::List(chan) => chan.sender().unwatch(oper),
1390             SenderFlavor::Zero(chan) => chan.sender().unwatch(oper),
1391         }
1392     }
1393 }
1394 
1395 impl<T> SelectHandle for Receiver<T> {
try_select(&self, token: &mut Token) -> bool1396     fn try_select(&self, token: &mut Token) -> bool {
1397         match &self.flavor {
1398             ReceiverFlavor::Array(chan) => chan.receiver().try_select(token),
1399             ReceiverFlavor::List(chan) => chan.receiver().try_select(token),
1400             ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token),
1401             ReceiverFlavor::At(chan) => chan.try_select(token),
1402             ReceiverFlavor::Tick(chan) => chan.try_select(token),
1403             ReceiverFlavor::Never(chan) => chan.try_select(token),
1404         }
1405     }
1406 
deadline(&self) -> Option<Instant>1407     fn deadline(&self) -> Option<Instant> {
1408         match &self.flavor {
1409             ReceiverFlavor::Array(_) => None,
1410             ReceiverFlavor::List(_) => None,
1411             ReceiverFlavor::Zero(_) => None,
1412             ReceiverFlavor::At(chan) => chan.deadline(),
1413             ReceiverFlavor::Tick(chan) => chan.deadline(),
1414             ReceiverFlavor::Never(chan) => chan.deadline(),
1415         }
1416     }
1417 
register(&self, oper: Operation, cx: &Context) -> bool1418     fn register(&self, oper: Operation, cx: &Context) -> bool {
1419         match &self.flavor {
1420             ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx),
1421             ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx),
1422             ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx),
1423             ReceiverFlavor::At(chan) => chan.register(oper, cx),
1424             ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
1425             ReceiverFlavor::Never(chan) => chan.register(oper, cx),
1426         }
1427     }
1428 
unregister(&self, oper: Operation)1429     fn unregister(&self, oper: Operation) {
1430         match &self.flavor {
1431             ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper),
1432             ReceiverFlavor::List(chan) => chan.receiver().unregister(oper),
1433             ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper),
1434             ReceiverFlavor::At(chan) => chan.unregister(oper),
1435             ReceiverFlavor::Tick(chan) => chan.unregister(oper),
1436             ReceiverFlavor::Never(chan) => chan.unregister(oper),
1437         }
1438     }
1439 
accept(&self, token: &mut Token, cx: &Context) -> bool1440     fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1441         match &self.flavor {
1442             ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx),
1443             ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx),
1444             ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx),
1445             ReceiverFlavor::At(chan) => chan.accept(token, cx),
1446             ReceiverFlavor::Tick(chan) => chan.accept(token, cx),
1447             ReceiverFlavor::Never(chan) => chan.accept(token, cx),
1448         }
1449     }
1450 
is_ready(&self) -> bool1451     fn is_ready(&self) -> bool {
1452         match &self.flavor {
1453             ReceiverFlavor::Array(chan) => chan.receiver().is_ready(),
1454             ReceiverFlavor::List(chan) => chan.receiver().is_ready(),
1455             ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(),
1456             ReceiverFlavor::At(chan) => chan.is_ready(),
1457             ReceiverFlavor::Tick(chan) => chan.is_ready(),
1458             ReceiverFlavor::Never(chan) => chan.is_ready(),
1459         }
1460     }
1461 
watch(&self, oper: Operation, cx: &Context) -> bool1462     fn watch(&self, oper: Operation, cx: &Context) -> bool {
1463         match &self.flavor {
1464             ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx),
1465             ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx),
1466             ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx),
1467             ReceiverFlavor::At(chan) => chan.watch(oper, cx),
1468             ReceiverFlavor::Tick(chan) => chan.watch(oper, cx),
1469             ReceiverFlavor::Never(chan) => chan.watch(oper, cx),
1470         }
1471     }
1472 
unwatch(&self, oper: Operation)1473     fn unwatch(&self, oper: Operation) {
1474         match &self.flavor {
1475             ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper),
1476             ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper),
1477             ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper),
1478             ReceiverFlavor::At(chan) => chan.unwatch(oper),
1479             ReceiverFlavor::Tick(chan) => chan.unwatch(oper),
1480             ReceiverFlavor::Never(chan) => chan.unwatch(oper),
1481         }
1482     }
1483 }
1484 
1485 /// Writes a message into the channel.
write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T>1486 pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
1487     match &s.flavor {
1488         SenderFlavor::Array(chan) => chan.write(token, msg),
1489         SenderFlavor::List(chan) => chan.write(token, msg),
1490         SenderFlavor::Zero(chan) => chan.write(token, msg),
1491     }
1492 }
1493 
1494 /// Reads a message from the channel.
read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()>1495 pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> {
1496     match &r.flavor {
1497         ReceiverFlavor::Array(chan) => chan.read(token),
1498         ReceiverFlavor::List(chan) => chan.read(token),
1499         ReceiverFlavor::Zero(chan) => chan.read(token),
1500         ReceiverFlavor::At(chan) => {
1501             mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1502         }
1503         ReceiverFlavor::Tick(chan) => {
1504             mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1505         }
1506         ReceiverFlavor::Never(chan) => chan.read(token),
1507     }
1508 }
1509