1 //! Interface to the select mechanism.
2 
3 use std::fmt;
4 use std::marker::PhantomData;
5 use std::mem;
6 use std::time::{Duration, Instant};
7 
8 use crossbeam_utils::Backoff;
9 
10 use crate::channel::{self, Receiver, Sender};
11 use crate::context::Context;
12 use crate::err::{ReadyTimeoutError, TryReadyError};
13 use crate::err::{RecvError, SendError};
14 use crate::err::{SelectTimeoutError, TrySelectError};
15 use crate::flavors;
16 use crate::utils;
17 
/// Temporary data that gets initialized during select or a blocking operation, and is consumed by
/// `read` or `write`.
///
/// Each field contains data associated with a specific channel flavor. A fresh token is obtained
/// through `Token::default()`.
#[derive(Debug, Default)]
pub struct Token {
    /// Data for the `at` flavor (`flavors::at`).
    pub at: flavors::at::AtToken,
    /// Data for the `array` flavor (`flavors::array`).
    pub array: flavors::array::ArrayToken,
    /// Data for the `list` flavor (`flavors::list`).
    pub list: flavors::list::ListToken,
    /// Data for the `never` flavor (`flavors::never`).
    pub never: flavors::never::NeverToken,
    /// Data for the `tick` flavor (`flavors::tick`).
    pub tick: flavors::tick::TickToken,
    /// Data for the `zero` flavor (`flavors::zero`).
    pub zero: flavors::zero::ZeroToken,
}
31 
/// Identifier associated with an operation by a specific thread on a specific channel.
///
/// The wrapped number is the address passed to [`Operation::hook`]; it is never `0`, `1` or `2`,
/// because those values encode the non-operation variants of [`Selected`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Operation(usize);

impl Operation {
    /// Creates an operation identifier from a mutable reference.
    ///
    /// This function essentially just turns the address of the reference into a number. The
    /// reference should point to a variable that is specific to the thread and the operation,
    /// and is alive for the entire duration of select or blocking operation.
    ///
    /// # Panics
    ///
    /// Panics if the address is not greater than `2`, since such a value would collide with the
    /// numerical representation of `Selected::{Waiting, Aborted, Disconnected}`.
    #[inline]
    pub fn hook<T>(r: &mut T) -> Operation {
        let val = r as *mut T as usize;
        // Make sure that the pointer address doesn't equal the numerical representation of
        // `Selected::{Waiting, Aborted, Disconnected}`.
        assert!(val > 2);
        Operation(val)
    }
}

/// Current state of a select or a blocking operation.
///
/// The state can be converted to and from a `usize`: `0`, `1` and `2` stand for `Waiting`,
/// `Aborted` and `Disconnected` respectively, and every other value is the identifier of an
/// [`Operation`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Selected {
    /// Still waiting for an operation.
    Waiting,

    /// The attempt to block the current thread has been aborted.
    Aborted,

    /// An operation became ready because a channel is disconnected.
    Disconnected,

    /// An operation became ready because a message can be sent or received.
    Operation(Operation),
}

impl From<usize> for Selected {
    /// Decodes a state from its numerical representation.
    #[inline]
    fn from(val: usize) -> Selected {
        match val {
            0 => Selected::Waiting,
            1 => Selected::Aborted,
            2 => Selected::Disconnected,
            oper => Selected::Operation(Operation(oper)),
        }
    }
}

// Implemented as `From` rather than `Into`: the standard blanket impl still provides
// `Into<usize> for Selected`, so existing `.into()` call sites keep working, and
// `usize::from(selected)` becomes available as well.
impl From<Selected> for usize {
    /// Encodes a state into its numerical representation (the inverse of `Selected::from`).
    #[inline]
    fn from(sel: Selected) -> usize {
        match sel {
            Selected::Waiting => 0,
            Selected::Aborted => 1,
            Selected::Disconnected => 2,
            Selected::Operation(Operation(val)) => val,
        }
    }
}
91 
/// A receiver or a sender that can participate in select.
///
/// This is a handle that assists select in executing an operation, registration, deciding on the
/// appropriate deadline for blocking, etc.
///
/// The methods fall into two groups: `try_select`, `register`, `unregister` and `accept` are used
/// by `run_select` to select an operation, while `is_ready`, `watch` and `unwatch` are used by
/// `run_ready` to wait for readiness only. `deadline` is consulted by both.
pub trait SelectHandle {
    /// Attempts to select an operation and returns `true` on success.
    fn try_select(&self, token: &mut Token) -> bool;

    /// Returns a deadline for an operation, if there is one.
    ///
    /// When blocking, the select loops wait no longer than the earliest deadline reported by any
    /// of the participating handles.
    fn deadline(&self) -> Option<Instant>;

    /// Registers an operation for execution and returns `true` if it is now ready.
    ///
    /// The select loop later calls `unregister` with an identical `oper`.
    fn register(&self, oper: Operation, cx: &Context) -> bool;

    /// Unregisters an operation for execution.
    fn unregister(&self, oper: Operation);

    /// Attempts to select an operation the thread got woken up for and returns `true` on success.
    fn accept(&self, token: &mut Token, cx: &Context) -> bool;

    /// Returns `true` if an operation can be executed without blocking.
    fn is_ready(&self) -> bool;

    /// Registers an operation for readiness notification and returns `true` if it is now ready.
    ///
    /// The readiness loop later calls `unwatch` with an identical `oper`.
    fn watch(&self, oper: Operation, cx: &Context) -> bool;

    /// Unregisters an operation for readiness notification.
    fn unwatch(&self, oper: Operation);
}
121 
122 impl<T: SelectHandle> SelectHandle for &T {
try_select(&self, token: &mut Token) -> bool123     fn try_select(&self, token: &mut Token) -> bool {
124         (**self).try_select(token)
125     }
126 
deadline(&self) -> Option<Instant>127     fn deadline(&self) -> Option<Instant> {
128         (**self).deadline()
129     }
130 
register(&self, oper: Operation, cx: &Context) -> bool131     fn register(&self, oper: Operation, cx: &Context) -> bool {
132         (**self).register(oper, cx)
133     }
134 
unregister(&self, oper: Operation)135     fn unregister(&self, oper: Operation) {
136         (**self).unregister(oper);
137     }
138 
accept(&self, token: &mut Token, cx: &Context) -> bool139     fn accept(&self, token: &mut Token, cx: &Context) -> bool {
140         (**self).accept(token, cx)
141     }
142 
is_ready(&self) -> bool143     fn is_ready(&self) -> bool {
144         (**self).is_ready()
145     }
146 
watch(&self, oper: Operation, cx: &Context) -> bool147     fn watch(&self, oper: Operation, cx: &Context) -> bool {
148         (**self).watch(oper, cx)
149     }
150 
unwatch(&self, oper: Operation)151     fn unwatch(&self, oper: Operation) {
152         (**self).unwatch(oper)
153     }
154 }
155 
/// Determines when a select operation should time out.
///
/// This is the internal timeout policy passed to `run_select` and `run_ready`.
#[derive(Clone, Copy, Eq, PartialEq)]
enum Timeout {
    /// No blocking: give up right away if nothing can be selected immediately.
    Now,

    /// Block forever, i.e. until an operation is selected or becomes ready.
    Never,

    /// Time out after the time instant.
    At(Instant),
}
168 
/// Runs until one of the operations is selected, potentially blocking the current thread.
///
/// Each entry of `handles` is a `(handle, index, ptr)` triple. On success, the initialized
/// `Token` is returned together with the `index` and `ptr` of the selected entry. `None` is
/// returned if nothing could be selected within the limits given by `timeout`. If `handles` is
/// empty, the function only sleeps as dictated by `timeout`.
///
/// Note that `handles` is shuffled in place.
///
/// Successful receive operations will have to be followed up by `channel::read()` and successful
/// send operations by `channel::write()`.
fn run_select(
    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
    timeout: Timeout,
) -> Option<(Token, usize, *const u8)> {
    if handles.is_empty() {
        // Wait until the timeout and return.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {
                // Sleeping without a deadline is expected never to return.
                utils::sleep_until(None);
                unreachable!();
            }
            Timeout::At(when) => {
                utils::sleep_until(Some(when));
                return None;
            }
        }
    }

    // Shuffle the operations for fairness.
    utils::shuffle(handles);

    // Create a token, which serves as a temporary variable that gets initialized in this function
    // and is later used by a call to `channel::read()` or `channel::write()` that completes the
    // selected operation.
    let mut token = Token::default();

    // Try selecting one of the operations without blocking.
    for &(handle, i, ptr) in handles.iter() {
        if handle.try_select(&mut token) {
            return Some((token, i, ptr));
        }
    }

    loop {
        // Prepare for blocking.
        let res = Context::with(|cx| {
            let mut sel = Selected::Waiting;
            let mut registered_count = 0;
            let mut index_ready = None;

            // In non-blocking mode the context is put into the `Aborted` state before any
            // operation is registered.
            // NOTE(review): presumably this keeps other threads from selecting an operation on
            // our behalf during registration; confirm against `Context::try_select`.
            if let Timeout::Now = timeout {
                cx.try_select(Selected::Aborted).unwrap();
            }

            // Register all operations.
            //
            // The operation identifier is derived from the address of the handle's slot inside
            // `handles`, so the same slot must be hooked again for unregistering and for finding
            // the selected operation below.
            for (handle, i, _) in handles.iter_mut() {
                registered_count += 1;

                // If registration returns `true`, that means the operation has just become ready.
                if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
                    // Try aborting select.
                    sel = match cx.try_select(Selected::Aborted) {
                        Ok(()) => {
                            // Remember which operation reported readiness so that it can be
                            // retried after unregistering.
                            index_ready = Some(*i);
                            Selected::Aborted
                        }
                        // The context is already in another state; continue with that one.
                        Err(s) => s,
                    };
                    break;
                }

                // If another thread has already selected one of the operations, stop registration.
                sel = cx.selected();
                if sel != Selected::Waiting {
                    break;
                }
            }

            if sel == Selected::Waiting {
                // Check with each operation for how long we're allowed to block, and compute the
                // earliest deadline.
                let mut deadline: Option<Instant> = match timeout {
                    Timeout::Now => return None,
                    Timeout::Never => None,
                    Timeout::At(when) => Some(when),
                };
                for &(handle, _, _) in handles.iter() {
                    if let Some(x) = handle.deadline() {
                        // Keep the earlier of the two instants, or `x` if there was none so far.
                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
                    }
                }

                // Block the current thread.
                sel = cx.wait_until(deadline);
            }

            // Unregister all registered operations.
            //
            // Registration may have stopped early, so only the first `registered_count` handles
            // are visited.
            for (handle, _, _) in handles.iter_mut().take(registered_count) {
                handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
            }

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {
                    // If an operation became ready during registration, try selecting it.
                    if let Some(index_ready) = index_ready {
                        for &(handle, i, ptr) in handles.iter() {
                            if i == index_ready && handle.try_select(&mut token) {
                                return Some((i, ptr));
                            }
                        }
                    }
                }
                Selected::Disconnected => {}
                Selected::Operation(_) => {
                    // Find the selected operation.
                    for (handle, i, ptr) in handles.iter_mut() {
                        // Is this the selected operation?
                        if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
                        {
                            // Try selecting this operation.
                            if handle.accept(&mut token, cx) {
                                return Some((*i, *ptr));
                            }
                        }
                    }
                }
            }

            // Nothing was selected in this round.
            None
        });

        // Return if an operation was selected.
        if let Some((i, ptr)) = res {
            return Some((token, i, ptr));
        }

        // Try selecting one of the operations without blocking.
        for &(handle, i, ptr) in handles.iter() {
            if handle.try_select(&mut token) {
                return Some((token, i, ptr));
            }
        }

        // Give up if the timeout has expired; otherwise go for another round.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {}
            Timeout::At(when) => {
                if Instant::now() >= when {
                    return None;
                }
            }
        }
    }
}
319 
/// Runs until one of the operations becomes ready, potentially blocking the current thread.
///
/// Each entry of `handles` is a `(handle, index, ptr)` triple; the `ptr` component is not used
/// here. Returns the `index` of an operation that reported readiness, or `None` if none did
/// within the limits given by `timeout`. If `handles` is empty, the function only sleeps as
/// dictated by `timeout`.
///
/// Note that `handles` is shuffled in place.
fn run_ready(
    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
    timeout: Timeout,
) -> Option<usize> {
    if handles.is_empty() {
        // Wait until the timeout and return.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {
                // Sleeping without a deadline is expected never to return.
                utils::sleep_until(None);
                unreachable!();
            }
            Timeout::At(when) => {
                utils::sleep_until(Some(when));
                return None;
            }
        }
    }

    // Shuffle the operations for fairness.
    utils::shuffle(handles);

    loop {
        // Poll for readiness with backoff before resorting to blocking.
        let backoff = Backoff::new();
        loop {
            // Check operations for readiness.
            for &(handle, i, _) in handles.iter() {
                if handle.is_ready() {
                    return Some(i);
                }
            }

            if backoff.is_completed() {
                break;
            } else {
                backoff.snooze();
            }
        }

        // Check for timeout.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {}
            Timeout::At(when) => {
                if Instant::now() >= when {
                    return None;
                }
            }
        }

        // Prepare for blocking.
        let res = Context::with(|cx| {
            let mut sel = Selected::Waiting;
            let mut registered_count = 0;

            // Begin watching all operations.
            //
            // The operation identifier is derived from the address of the handle's slot inside
            // `handles`, so the same slot must be hooked again for unwatching and for finding the
            // ready operation below.
            for (handle, _, _) in handles.iter_mut() {
                registered_count += 1;
                let oper = Operation::hook::<&dyn SelectHandle>(handle);

                // If registration returns `true`, that means the operation has just become ready.
                if handle.watch(oper, cx) {
                    sel = match cx.try_select(Selected::Operation(oper)) {
                        Ok(()) => Selected::Operation(oper),
                        // The context is already in another state; continue with that one.
                        Err(s) => s,
                    };
                    break;
                }

                // If another thread has already chosen one of the operations, stop registration.
                sel = cx.selected();
                if sel != Selected::Waiting {
                    break;
                }
            }

            if sel == Selected::Waiting {
                // Check with each operation for how long we're allowed to block, and compute the
                // earliest deadline.
                let mut deadline: Option<Instant> = match timeout {
                    // `Timeout::Now` has already returned in the timeout check above.
                    Timeout::Now => unreachable!(),
                    Timeout::Never => None,
                    Timeout::At(when) => Some(when),
                };
                for &(handle, _, _) in handles.iter() {
                    if let Some(x) = handle.deadline() {
                        // Keep the earlier of the two instants, or `x` if there was none so far.
                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
                    }
                }

                // Block the current thread.
                sel = cx.wait_until(deadline);
            }

            // Unwatch all operations.
            //
            // Watching may have stopped early, so only the first `registered_count` handles are
            // visited.
            for (handle, _, _) in handles.iter_mut().take(registered_count) {
                handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
            }

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {}
                Selected::Disconnected => {}
                Selected::Operation(_) => {
                    // Translate the operation identifier back into the caller-visible index.
                    for (handle, i, _) in handles.iter_mut() {
                        let oper = Operation::hook::<&dyn SelectHandle>(handle);
                        if sel == Selected::Operation(oper) {
                            return Some(*i);
                        }
                    }
                }
            }

            // No operation was reported ready in this round.
            None
        });

        // Return if an operation became ready.
        if res.is_some() {
            return res;
        }
    }
}
443 
444 /// Attempts to select one of the operations without blocking.
445 #[inline]
try_select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], ) -> Result<SelectedOperation<'a>, TrySelectError>446 pub fn try_select<'a>(
447     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
448 ) -> Result<SelectedOperation<'a>, TrySelectError> {
449     match run_select(handles, Timeout::Now) {
450         None => Err(TrySelectError),
451         Some((token, index, ptr)) => Ok(SelectedOperation {
452             token,
453             index,
454             ptr,
455             _marker: PhantomData,
456         }),
457     }
458 }
459 
460 /// Blocks until one of the operations becomes ready and selects it.
461 #[inline]
select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], ) -> SelectedOperation<'a>462 pub fn select<'a>(
463     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
464 ) -> SelectedOperation<'a> {
465     if handles.is_empty() {
466         panic!("no operations have been added to `Select`");
467     }
468 
469     let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
470     SelectedOperation {
471         token,
472         index,
473         ptr,
474         _marker: PhantomData,
475     }
476 }
477 
478 /// Blocks for a limited time until one of the operations becomes ready and selects it.
479 #[inline]
select_timeout<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>480 pub fn select_timeout<'a>(
481     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
482     timeout: Duration,
483 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
484     select_deadline(handles, Instant::now() + timeout)
485 }
486 
487 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
488 #[inline]
select_deadline<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], deadline: Instant, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>489 pub(crate) fn select_deadline<'a>(
490     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
491     deadline: Instant,
492 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
493     match run_select(handles, Timeout::At(deadline)) {
494         None => Err(SelectTimeoutError),
495         Some((token, index, ptr)) => Ok(SelectedOperation {
496             token,
497             index,
498             ptr,
499             _marker: PhantomData,
500         }),
501     }
502 }
503 
504 /// Selects from a set of channel operations.
505 ///
506 /// `Select` allows you to define a set of channel operations, wait until any one of them becomes
507 /// ready, and finally execute it. If multiple operations are ready at the same time, a random one
508 /// among them is selected.
509 ///
510 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
511 /// when it will simply return an error because the channel is disconnected.
512 ///
513 /// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
514 /// dynamically created list of channel operations.
515 ///
516 /// Once a list of operations has been built with `Select`, there are two different ways of
517 /// proceeding:
518 ///
519 /// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
520 ///   the returned selected operation has already begun and **must** be completed. If we don't
521 ///   complete it, a panic will occur.
522 ///
523 /// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
524 ///   successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
525 ///   possible for another thread to make the operation not ready just before we try executing it,
526 ///   so it's wise to use a retry loop. However, note that these methods might return with success
527 ///   spuriously, so it's a good idea to always double check if the operation is really ready.
528 ///
529 /// # Examples
530 ///
531 /// Use [`select`] to receive a message from a list of receivers:
532 ///
533 /// ```
534 /// use crossbeam_channel::{Receiver, RecvError, Select};
535 ///
536 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
537 ///     // Build a list of operations.
538 ///     let mut sel = Select::new();
539 ///     for r in rs {
540 ///         sel.recv(r);
541 ///     }
542 ///
543 ///     // Complete the selected operation.
544 ///     let oper = sel.select();
545 ///     let index = oper.index();
546 ///     oper.recv(&rs[index])
547 /// }
548 /// ```
549 ///
550 /// Use [`ready`] to receive a message from a list of receivers:
551 ///
552 /// ```
553 /// use crossbeam_channel::{Receiver, RecvError, Select};
554 ///
555 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
556 ///     // Build a list of operations.
557 ///     let mut sel = Select::new();
558 ///     for r in rs {
559 ///         sel.recv(r);
560 ///     }
561 ///
562 ///     loop {
563 ///         // Wait until a receive operation becomes ready and try executing it.
564 ///         let index = sel.ready();
565 ///         let res = rs[index].try_recv();
566 ///
567 ///         // If the operation turns out not to be ready, retry.
568 ///         if let Err(e) = res {
569 ///             if e.is_empty() {
570 ///                 continue;
571 ///             }
572 ///         }
573 ///
574 ///         // Success!
575 ///         return res.map_err(|_| RecvError);
576 ///     }
577 /// }
578 /// ```
579 ///
580 /// [`try_select`]: Select::try_select
581 /// [`select`]: Select::select
582 /// [`select_timeout`]: Select::select_timeout
583 /// [`try_ready`]: Select::try_ready
584 /// [`ready`]: Select::ready
585 /// [`ready_timeout`]: Select::ready_timeout
pub struct Select<'a> {
    /// A list of senders and receivers participating in selection.
    ///
    /// Each entry is `(handle, index, ptr)`: the handle itself, the index that was returned to the
    /// caller when the operation was added, and the address of the `Sender`/`Receiver` cast to
    /// `*const u8`.
    handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,

    /// The next index to assign to an operation.
    ///
    /// Removing an operation does not decrease this counter, so indices of removed operations are
    /// not handed out again.
    next_index: usize,
}

// NOTE(review): `handles` contains a raw pointer and a `dyn SelectHandle` reference without a
// `Send`/`Sync` bound, so these impls are asserted manually. The soundness argument is not
// visible in this part of the file; presumably the pointer is only used to identify the channel
// and is never dereferenced. Confirm before relying on this.
unsafe impl Send for Select<'_> {}
unsafe impl Sync for Select<'_> {}
596 
597 impl<'a> Select<'a> {
598     /// Creates an empty list of channel operations for selection.
599     ///
600     /// # Examples
601     ///
602     /// ```
603     /// use crossbeam_channel::Select;
604     ///
605     /// let mut sel = Select::new();
606     ///
607     /// // The list of operations is empty, which means no operation can be selected.
608     /// assert!(sel.try_select().is_err());
609     /// ```
new() -> Select<'a>610     pub fn new() -> Select<'a> {
611         Select {
612             handles: Vec::with_capacity(4),
613             next_index: 0,
614         }
615     }
616 
617     /// Adds a send operation.
618     ///
619     /// Returns the index of the added operation.
620     ///
621     /// # Examples
622     ///
623     /// ```
624     /// use crossbeam_channel::{unbounded, Select};
625     ///
626     /// let (s, r) = unbounded::<i32>();
627     ///
628     /// let mut sel = Select::new();
629     /// let index = sel.send(&s);
630     /// ```
send<T>(&mut self, s: &'a Sender<T>) -> usize631     pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
632         let i = self.next_index;
633         let ptr = s as *const Sender<_> as *const u8;
634         self.handles.push((s, i, ptr));
635         self.next_index += 1;
636         i
637     }
638 
639     /// Adds a receive operation.
640     ///
641     /// Returns the index of the added operation.
642     ///
643     /// # Examples
644     ///
645     /// ```
646     /// use crossbeam_channel::{unbounded, Select};
647     ///
648     /// let (s, r) = unbounded::<i32>();
649     ///
650     /// let mut sel = Select::new();
651     /// let index = sel.recv(&r);
652     /// ```
recv<T>(&mut self, r: &'a Receiver<T>) -> usize653     pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
654         let i = self.next_index;
655         let ptr = r as *const Receiver<_> as *const u8;
656         self.handles.push((r, i, ptr));
657         self.next_index += 1;
658         i
659     }
660 
661     /// Removes a previously added operation.
662     ///
663     /// This is useful when an operation is selected because the channel got disconnected and we
664     /// want to try again to select a different operation instead.
665     ///
666     /// If new operations are added after removing some, the indices of removed operations will not
667     /// be reused.
668     ///
669     /// # Panics
670     ///
671     /// An attempt to remove a non-existing or already removed operation will panic.
672     ///
673     /// # Examples
674     ///
675     /// ```
676     /// use crossbeam_channel::{unbounded, Select};
677     ///
678     /// let (s1, r1) = unbounded::<i32>();
679     /// let (_, r2) = unbounded::<i32>();
680     ///
681     /// let mut sel = Select::new();
682     /// let oper1 = sel.recv(&r1);
683     /// let oper2 = sel.recv(&r2);
684     ///
    /// // Only the second operation is ready (its channel is disconnected), so it is selected.
686     /// let oper = sel.select();
687     /// assert_eq!(oper.index(), oper2);
688     /// assert!(oper.recv(&r2).is_err());
689     /// sel.remove(oper2);
690     ///
691     /// s1.send(10).unwrap();
692     ///
693     /// let oper = sel.select();
694     /// assert_eq!(oper.index(), oper1);
695     /// assert_eq!(oper.recv(&r1), Ok(10));
696     /// ```
remove(&mut self, index: usize)697     pub fn remove(&mut self, index: usize) {
698         assert!(
699             index < self.next_index,
700             "index out of bounds; {} >= {}",
701             index,
702             self.next_index,
703         );
704 
705         let i = self
706             .handles
707             .iter()
708             .enumerate()
709             .find(|(_, (_, i, _))| *i == index)
710             .expect("no operation with this index")
711             .0;
712 
713         self.handles.swap_remove(i);
714     }
715 
716     /// Attempts to select one of the operations without blocking.
717     ///
718     /// If an operation is ready, it is selected and returned. If multiple operations are ready at
719     /// the same time, a random one among them is selected. If none of the operations are ready, an
720     /// error is returned.
721     ///
722     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
723     /// even when it will simply return an error because the channel is disconnected.
724     ///
725     /// The selected operation must be completed with [`SelectedOperation::send`]
726     /// or [`SelectedOperation::recv`].
727     ///
728     /// # Examples
729     ///
730     /// ```
731     /// use crossbeam_channel::{unbounded, Select};
732     ///
733     /// let (s1, r1) = unbounded();
734     /// let (s2, r2) = unbounded();
735     ///
736     /// s1.send(10).unwrap();
737     /// s2.send(20).unwrap();
738     ///
739     /// let mut sel = Select::new();
740     /// let oper1 = sel.recv(&r1);
741     /// let oper2 = sel.recv(&r2);
742     ///
743     /// // Both operations are initially ready, so a random one will be executed.
744     /// let oper = sel.try_select();
745     /// match oper {
746     ///     Err(_) => panic!("both operations should be ready"),
747     ///     Ok(oper) => match oper.index() {
748     ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
749     ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
750     ///         _ => unreachable!(),
751     ///     }
752     /// }
753     /// ```
try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError>754     pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
755         try_select(&mut self.handles)
756     }
757 
758     /// Blocks until one of the operations becomes ready and selects it.
759     ///
760     /// Once an operation becomes ready, it is selected and returned. If multiple operations are
761     /// ready at the same time, a random one among them is selected.
762     ///
763     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
764     /// even when it will simply return an error because the channel is disconnected.
765     ///
766     /// The selected operation must be completed with [`SelectedOperation::send`]
767     /// or [`SelectedOperation::recv`].
768     ///
769     /// # Panics
770     ///
771     /// Panics if no operations have been added to `Select`.
772     ///
773     /// # Examples
774     ///
775     /// ```
776     /// use std::thread;
777     /// use std::time::Duration;
778     /// use crossbeam_channel::{unbounded, Select};
779     ///
780     /// let (s1, r1) = unbounded();
781     /// let (s2, r2) = unbounded();
782     ///
783     /// thread::spawn(move || {
784     ///     thread::sleep(Duration::from_secs(1));
785     ///     s1.send(10).unwrap();
786     /// });
787     /// thread::spawn(move || s2.send(20).unwrap());
788     ///
789     /// let mut sel = Select::new();
790     /// let oper1 = sel.recv(&r1);
791     /// let oper2 = sel.recv(&r2);
792     ///
793     /// // The second operation will be selected because it becomes ready first.
794     /// let oper = sel.select();
795     /// match oper.index() {
796     ///     i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
797     ///     i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
798     ///     _ => unreachable!(),
799     /// }
800     /// ```
select(&mut self) -> SelectedOperation<'a>801     pub fn select(&mut self) -> SelectedOperation<'a> {
802         select(&mut self.handles)
803     }
804 
805     /// Blocks for a limited time until one of the operations becomes ready and selects it.
806     ///
807     /// If an operation becomes ready, it is selected and returned. If multiple operations are
808     /// ready at the same time, a random one among them is selected. If none of the operations
809     /// become ready for the specified duration, an error is returned.
810     ///
811     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
812     /// even when it will simply return an error because the channel is disconnected.
813     ///
814     /// The selected operation must be completed with [`SelectedOperation::send`]
815     /// or [`SelectedOperation::recv`].
816     ///
817     /// # Examples
818     ///
819     /// ```
820     /// use std::thread;
821     /// use std::time::Duration;
822     /// use crossbeam_channel::{unbounded, Select};
823     ///
824     /// let (s1, r1) = unbounded();
825     /// let (s2, r2) = unbounded();
826     ///
827     /// thread::spawn(move || {
828     ///     thread::sleep(Duration::from_secs(1));
829     ///     s1.send(10).unwrap();
830     /// });
831     /// thread::spawn(move || s2.send(20).unwrap());
832     ///
833     /// let mut sel = Select::new();
834     /// let oper1 = sel.recv(&r1);
835     /// let oper2 = sel.recv(&r2);
836     ///
837     /// // The second operation will be selected because it becomes ready first.
838     /// let oper = sel.select_timeout(Duration::from_millis(500));
839     /// match oper {
840     ///     Err(_) => panic!("should not have timed out"),
841     ///     Ok(oper) => match oper.index() {
842     ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
843     ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
844     ///         _ => unreachable!(),
845     ///     }
846     /// }
847     /// ```
select_timeout( &mut self, timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>848     pub fn select_timeout(
849         &mut self,
850         timeout: Duration,
851     ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
852         select_timeout(&mut self.handles, timeout)
853     }
854 
855     /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
856     ///
857     /// If an operation becomes ready, it is selected and returned. If multiple operations are
858     /// ready at the same time, a random one among them is selected. If none of the operations
859     /// become ready before the given deadline, an error is returned.
860     ///
861     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
862     /// even when it will simply return an error because the channel is disconnected.
863     ///
864     /// The selected operation must be completed with [`SelectedOperation::send`]
865     /// or [`SelectedOperation::recv`].
866     ///
867     /// # Examples
868     ///
869     /// ```
870     /// use std::thread;
871     /// use std::time::{Instant, Duration};
872     /// use crossbeam_channel::{unbounded, Select};
873     ///
874     /// let (s1, r1) = unbounded();
875     /// let (s2, r2) = unbounded();
876     ///
877     /// thread::spawn(move || {
878     ///     thread::sleep(Duration::from_secs(1));
879     ///     s1.send(10).unwrap();
880     /// });
881     /// thread::spawn(move || s2.send(20).unwrap());
882     ///
883     /// let mut sel = Select::new();
884     /// let oper1 = sel.recv(&r1);
885     /// let oper2 = sel.recv(&r2);
886     ///
887     /// let deadline = Instant::now() + Duration::from_millis(500);
888     ///
889     /// // The second operation will be selected because it becomes ready first.
890     /// let oper = sel.select_deadline(deadline);
891     /// match oper {
892     ///     Err(_) => panic!("should not have timed out"),
893     ///     Ok(oper) => match oper.index() {
894     ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
895     ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
896     ///         _ => unreachable!(),
897     ///     }
898     /// }
899     /// ```
select_deadline( &mut self, deadline: Instant, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>900     pub fn select_deadline(
901         &mut self,
902         deadline: Instant,
903     ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
904         select_deadline(&mut self.handles, deadline)
905     }
906 
907     /// Attempts to find a ready operation without blocking.
908     ///
909     /// If an operation is ready, its index is returned. If multiple operations are ready at the
910     /// same time, a random one among them is chosen. If none of the operations are ready, an error
911     /// is returned.
912     ///
913     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
914     /// even when it will simply return an error because the channel is disconnected.
915     ///
916     /// Note that this method might return with success spuriously, so it's a good idea to always
917     /// double check if the operation is really ready.
918     ///
919     /// # Examples
920     ///
921     /// ```
922     /// use crossbeam_channel::{unbounded, Select};
923     ///
924     /// let (s1, r1) = unbounded();
925     /// let (s2, r2) = unbounded();
926     ///
927     /// s1.send(10).unwrap();
928     /// s2.send(20).unwrap();
929     ///
930     /// let mut sel = Select::new();
931     /// let oper1 = sel.recv(&r1);
932     /// let oper2 = sel.recv(&r2);
933     ///
934     /// // Both operations are initially ready, so a random one will be chosen.
935     /// match sel.try_ready() {
936     ///     Err(_) => panic!("both operations should be ready"),
937     ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
938     ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
939     ///     Ok(_) => unreachable!(),
940     /// }
941     /// ```
try_ready(&mut self) -> Result<usize, TryReadyError>942     pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
943         match run_ready(&mut self.handles, Timeout::Now) {
944             None => Err(TryReadyError),
945             Some(index) => Ok(index),
946         }
947     }
948 
949     /// Blocks until one of the operations becomes ready.
950     ///
951     /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
952     /// the same time, a random one among them is chosen.
953     ///
954     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
955     /// even when it will simply return an error because the channel is disconnected.
956     ///
957     /// Note that this method might return with success spuriously, so it's a good idea to always
958     /// double check if the operation is really ready.
959     ///
960     /// # Panics
961     ///
962     /// Panics if no operations have been added to `Select`.
963     ///
964     /// # Examples
965     ///
966     /// ```
967     /// use std::thread;
968     /// use std::time::Duration;
969     /// use crossbeam_channel::{unbounded, Select};
970     ///
971     /// let (s1, r1) = unbounded();
972     /// let (s2, r2) = unbounded();
973     ///
974     /// thread::spawn(move || {
975     ///     thread::sleep(Duration::from_secs(1));
976     ///     s1.send(10).unwrap();
977     /// });
978     /// thread::spawn(move || s2.send(20).unwrap());
979     ///
980     /// let mut sel = Select::new();
981     /// let oper1 = sel.recv(&r1);
982     /// let oper2 = sel.recv(&r2);
983     ///
984     /// // The second operation will be selected because it becomes ready first.
985     /// match sel.ready() {
986     ///     i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
987     ///     i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
988     ///     _ => unreachable!(),
989     /// }
990     /// ```
ready(&mut self) -> usize991     pub fn ready(&mut self) -> usize {
992         if self.handles.is_empty() {
993             panic!("no operations have been added to `Select`");
994         }
995 
996         run_ready(&mut self.handles, Timeout::Never).unwrap()
997     }
998 
999     /// Blocks for a limited time until one of the operations becomes ready.
1000     ///
1001     /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1002     /// the same time, a random one among them is chosen. If none of the operations become ready
1003     /// for the specified duration, an error is returned.
1004     ///
1005     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1006     /// even when it will simply return an error because the channel is disconnected.
1007     ///
1008     /// Note that this method might return with success spuriously, so it's a good idea to double
1009     /// check if the operation is really ready.
1010     ///
1011     /// # Examples
1012     ///
1013     /// ```
1014     /// use std::thread;
1015     /// use std::time::Duration;
1016     /// use crossbeam_channel::{unbounded, Select};
1017     ///
1018     /// let (s1, r1) = unbounded();
1019     /// let (s2, r2) = unbounded();
1020     ///
1021     /// thread::spawn(move || {
1022     ///     thread::sleep(Duration::from_secs(1));
1023     ///     s1.send(10).unwrap();
1024     /// });
1025     /// thread::spawn(move || s2.send(20).unwrap());
1026     ///
1027     /// let mut sel = Select::new();
1028     /// let oper1 = sel.recv(&r1);
1029     /// let oper2 = sel.recv(&r2);
1030     ///
1031     /// // The second operation will be selected because it becomes ready first.
1032     /// match sel.ready_timeout(Duration::from_millis(500)) {
1033     ///     Err(_) => panic!("should not have timed out"),
1034     ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1035     ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1036     ///     Ok(_) => unreachable!(),
1037     /// }
1038     /// ```
ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError>1039     pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1040         self.ready_deadline(Instant::now() + timeout)
1041     }
1042 
1043     /// Blocks until a given deadline, or until one of the operations becomes ready.
1044     ///
1045     /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1046     /// the same time, a random one among them is chosen. If none of the operations become ready
1047     /// before the deadline, an error is returned.
1048     ///
1049     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1050     /// even when it will simply return an error because the channel is disconnected.
1051     ///
1052     /// Note that this method might return with success spuriously, so it's a good idea to double
1053     /// check if the operation is really ready.
1054     ///
1055     /// # Examples
1056     ///
1057     /// ```
1058     /// use std::thread;
1059     /// use std::time::{Duration, Instant};
1060     /// use crossbeam_channel::{unbounded, Select};
1061     ///
1062     /// let deadline = Instant::now() + Duration::from_millis(500);
1063     ///
1064     /// let (s1, r1) = unbounded();
1065     /// let (s2, r2) = unbounded();
1066     ///
1067     /// thread::spawn(move || {
1068     ///     thread::sleep(Duration::from_secs(1));
1069     ///     s1.send(10).unwrap();
1070     /// });
1071     /// thread::spawn(move || s2.send(20).unwrap());
1072     ///
1073     /// let mut sel = Select::new();
1074     /// let oper1 = sel.recv(&r1);
1075     /// let oper2 = sel.recv(&r2);
1076     ///
1077     /// // The second operation will be selected because it becomes ready first.
1078     /// match sel.ready_deadline(deadline) {
1079     ///     Err(_) => panic!("should not have timed out"),
1080     ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1081     ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1082     ///     Ok(_) => unreachable!(),
1083     /// }
1084     /// ```
ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError>1085     pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1086         match run_ready(&mut self.handles, Timeout::At(deadline)) {
1087             None => Err(ReadyTimeoutError),
1088             Some(index) => Ok(index),
1089         }
1090     }
1091 }
1092 
1093 impl<'a> Clone for Select<'a> {
clone(&self) -> Select<'a>1094     fn clone(&self) -> Select<'a> {
1095         Select {
1096             handles: self.handles.clone(),
1097             next_index: self.next_index,
1098         }
1099     }
1100 }
1101 
1102 impl<'a> Default for Select<'a> {
default() -> Select<'a>1103     fn default() -> Select<'a> {
1104         Select::new()
1105     }
1106 }
1107 
1108 impl fmt::Debug for Select<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1109     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1110         f.pad("Select { .. }")
1111     }
1112 }
1113 
/// A selected operation that needs to be completed.
///
/// To complete the operation, call [`send`] or [`recv`] with the same channel handle that
/// was selected; both methods consume this value.
///
/// # Panics
///
/// Forgetting to complete the operation is an error and might lead to deadlocks. If a
/// `SelectedOperation` is dropped without completion, a panic occurs.
///
/// [`send`]: SelectedOperation::send
/// [`recv`]: SelectedOperation::recv
#[must_use]
pub struct SelectedOperation<'a> {
    /// Token needed to complete the operation.
    ///
    /// `send` and `recv` hand it to the channel's write/read routine.
    token: Token,

    /// The index of the selected operation, as returned by `index`.
    index: usize,

    /// The address of the selected `Sender` or `Receiver`.
    ///
    /// `send` and `recv` compare it with the address of the reference they are given and
    /// panic on a mismatch.
    ptr: *const u8,

    /// Indicates that `Sender`s and `Receiver`s are borrowed.
    ///
    /// Zero-sized; it only carries the `'a` lifetime.
    _marker: PhantomData<&'a ()>,
}
1139 
1140 impl SelectedOperation<'_> {
1141     /// Returns the index of the selected operation.
1142     ///
1143     /// # Examples
1144     ///
1145     /// ```
1146     /// use crossbeam_channel::{bounded, Select};
1147     ///
1148     /// let (s1, r1) = bounded::<()>(0);
1149     /// let (s2, r2) = bounded::<()>(0);
1150     /// let (s3, r3) = bounded::<()>(1);
1151     ///
1152     /// let mut sel = Select::new();
1153     /// let oper1 = sel.send(&s1);
1154     /// let oper2 = sel.recv(&r2);
1155     /// let oper3 = sel.send(&s3);
1156     ///
1157     /// // Only the last operation is ready.
1158     /// let oper = sel.select();
1159     /// assert_eq!(oper.index(), 2);
1160     /// assert_eq!(oper.index(), oper3);
1161     ///
1162     /// // Complete the operation.
1163     /// oper.send(&s3, ()).unwrap();
1164     /// ```
index(&self) -> usize1165     pub fn index(&self) -> usize {
1166         self.index
1167     }
1168 
1169     /// Completes the send operation.
1170     ///
1171     /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1172     /// when the operation was added.
1173     ///
1174     /// # Panics
1175     ///
1176     /// Panics if an incorrect [`Sender`] reference is passed.
1177     ///
1178     /// # Examples
1179     ///
1180     /// ```
1181     /// use crossbeam_channel::{bounded, Select, SendError};
1182     ///
1183     /// let (s, r) = bounded::<i32>(0);
1184     /// drop(r);
1185     ///
1186     /// let mut sel = Select::new();
1187     /// let oper1 = sel.send(&s);
1188     ///
1189     /// let oper = sel.select();
1190     /// assert_eq!(oper.index(), oper1);
1191     /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1192     /// ```
send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>>1193     pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1194         assert!(
1195             s as *const Sender<T> as *const u8 == self.ptr,
1196             "passed a sender that wasn't selected",
1197         );
1198         let res = unsafe { channel::write(s, &mut self.token, msg) };
1199         mem::forget(self);
1200         res.map_err(SendError)
1201     }
1202 
1203     /// Completes the receive operation.
1204     ///
1205     /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1206     /// when the operation was added.
1207     ///
1208     /// # Panics
1209     ///
1210     /// Panics if an incorrect [`Receiver`] reference is passed.
1211     ///
1212     /// # Examples
1213     ///
1214     /// ```
1215     /// use crossbeam_channel::{bounded, Select, RecvError};
1216     ///
1217     /// let (s, r) = bounded::<i32>(0);
1218     /// drop(s);
1219     ///
1220     /// let mut sel = Select::new();
1221     /// let oper1 = sel.recv(&r);
1222     ///
1223     /// let oper = sel.select();
1224     /// assert_eq!(oper.index(), oper1);
1225     /// assert_eq!(oper.recv(&r), Err(RecvError));
1226     /// ```
recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError>1227     pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1228         assert!(
1229             r as *const Receiver<T> as *const u8 == self.ptr,
1230             "passed a receiver that wasn't selected",
1231         );
1232         let res = unsafe { channel::read(r, &mut self.token) };
1233         mem::forget(self);
1234         res.map_err(|_| RecvError)
1235     }
1236 }
1237 
1238 impl fmt::Debug for SelectedOperation<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1239     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1240         f.pad("SelectedOperation { .. }")
1241     }
1242 }
1243 
1244 impl Drop for SelectedOperation<'_> {
drop(&mut self)1245     fn drop(&mut self) {
1246         panic!("dropped `SelectedOperation` without completing the operation");
1247     }
1248 }
1249