1 //! Interface to the select mechanism.
2 
3 use std::fmt;
4 use std::marker::PhantomData;
5 use std::mem;
6 use std::time::{Duration, Instant};
7 
8 use crossbeam_utils::Backoff;
9 
10 use crate::channel::{self, Receiver, Sender};
11 use crate::context::Context;
12 use crate::err::{ReadyTimeoutError, TryReadyError};
13 use crate::err::{RecvError, SendError};
14 use crate::err::{SelectTimeoutError, TrySelectError};
15 use crate::flavors;
16 use crate::utils;
17 
18 /// Temporary data that gets initialized during select or a blocking operation, and is consumed by
19 /// `read` or `write`.
20 ///
21 /// Each field contains data associated with a specific channel flavor.
22 // This is a private API that is used by the select macro.
23 #[derive(Debug, Default)]
24 pub struct Token {
25     pub(crate) at: flavors::at::AtToken,
26     pub(crate) array: flavors::array::ArrayToken,
27     pub(crate) list: flavors::list::ListToken,
28     #[allow(dead_code)]
29     pub(crate) never: flavors::never::NeverToken,
30     pub(crate) tick: flavors::tick::TickToken,
31     pub(crate) zero: flavors::zero::ZeroToken,
32 }
33 
34 /// Identifier associated with an operation by a specific thread on a specific channel.
35 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
36 pub struct Operation(usize);
37 
38 impl Operation {
39     /// Creates an operation identifier from a mutable reference.
40     ///
41     /// This function essentially just turns the address of the reference into a number. The
42     /// reference should point to a variable that is specific to the thread and the operation,
43     /// and is alive for the entire duration of select or blocking operation.
44     #[inline]
hook<T>(r: &mut T) -> Operation45     pub fn hook<T>(r: &mut T) -> Operation {
46         let val = r as *mut T as usize;
47         // Make sure that the pointer address doesn't equal the numerical representation of
48         // `Selected::{Waiting, Aborted, Disconnected}`.
49         assert!(val > 2);
50         Operation(val)
51     }
52 }
53 
54 /// Current state of a select or a blocking operation.
55 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
56 pub enum Selected {
57     /// Still waiting for an operation.
58     Waiting,
59 
60     /// The attempt to block the current thread has been aborted.
61     Aborted,
62 
63     /// An operation became ready because a channel is disconnected.
64     Disconnected,
65 
66     /// An operation became ready because a message can be sent or received.
67     Operation(Operation),
68 }
69 
70 impl From<usize> for Selected {
71     #[inline]
from(val: usize) -> Selected72     fn from(val: usize) -> Selected {
73         match val {
74             0 => Selected::Waiting,
75             1 => Selected::Aborted,
76             2 => Selected::Disconnected,
77             oper => Selected::Operation(Operation(oper)),
78         }
79     }
80 }
81 
82 impl Into<usize> for Selected {
83     #[inline]
into(self) -> usize84     fn into(self) -> usize {
85         match self {
86             Selected::Waiting => 0,
87             Selected::Aborted => 1,
88             Selected::Disconnected => 2,
89             Selected::Operation(Operation(val)) => val,
90         }
91     }
92 }
93 
94 /// A receiver or a sender that can participate in select.
95 ///
96 /// This is a handle that assists select in executing an operation, registration, deciding on the
97 /// appropriate deadline for blocking, etc.
98 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
99 pub trait SelectHandle {
100     /// Attempts to select an operation and returns `true` on success.
try_select(&self, token: &mut Token) -> bool101     fn try_select(&self, token: &mut Token) -> bool;
102 
103     /// Returns a deadline for an operation, if there is one.
deadline(&self) -> Option<Instant>104     fn deadline(&self) -> Option<Instant>;
105 
106     /// Registers an operation for execution and returns `true` if it is now ready.
register(&self, oper: Operation, cx: &Context) -> bool107     fn register(&self, oper: Operation, cx: &Context) -> bool;
108 
109     /// Unregisters an operation for execution.
unregister(&self, oper: Operation)110     fn unregister(&self, oper: Operation);
111 
112     /// Attempts to select an operation the thread got woken up for and returns `true` on success.
accept(&self, token: &mut Token, cx: &Context) -> bool113     fn accept(&self, token: &mut Token, cx: &Context) -> bool;
114 
115     /// Returns `true` if an operation can be executed without blocking.
is_ready(&self) -> bool116     fn is_ready(&self) -> bool;
117 
118     /// Registers an operation for readiness notification and returns `true` if it is now ready.
watch(&self, oper: Operation, cx: &Context) -> bool119     fn watch(&self, oper: Operation, cx: &Context) -> bool;
120 
121     /// Unregisters an operation for readiness notification.
unwatch(&self, oper: Operation)122     fn unwatch(&self, oper: Operation);
123 }
124 
125 impl<T: SelectHandle> SelectHandle for &T {
try_select(&self, token: &mut Token) -> bool126     fn try_select(&self, token: &mut Token) -> bool {
127         (**self).try_select(token)
128     }
129 
deadline(&self) -> Option<Instant>130     fn deadline(&self) -> Option<Instant> {
131         (**self).deadline()
132     }
133 
register(&self, oper: Operation, cx: &Context) -> bool134     fn register(&self, oper: Operation, cx: &Context) -> bool {
135         (**self).register(oper, cx)
136     }
137 
unregister(&self, oper: Operation)138     fn unregister(&self, oper: Operation) {
139         (**self).unregister(oper);
140     }
141 
accept(&self, token: &mut Token, cx: &Context) -> bool142     fn accept(&self, token: &mut Token, cx: &Context) -> bool {
143         (**self).accept(token, cx)
144     }
145 
is_ready(&self) -> bool146     fn is_ready(&self) -> bool {
147         (**self).is_ready()
148     }
149 
watch(&self, oper: Operation, cx: &Context) -> bool150     fn watch(&self, oper: Operation, cx: &Context) -> bool {
151         (**self).watch(oper, cx)
152     }
153 
unwatch(&self, oper: Operation)154     fn unwatch(&self, oper: Operation) {
155         (**self).unwatch(oper)
156     }
157 }
158 
159 /// Determines when a select operation should time out.
160 #[derive(Clone, Copy, Eq, PartialEq)]
161 enum Timeout {
162     /// No blocking.
163     Now,
164 
165     /// Block forever.
166     Never,
167 
168     /// Time out after the time instant.
169     At(Instant),
170 }
171 
172 /// Runs until one of the operations is selected, potentially blocking the current thread.
173 ///
174 /// Successful receive operations will have to be followed up by `channel::read()` and successful
175 /// send operations by `channel::write()`.
run_select( handles: &mut [(&dyn SelectHandle, usize, *const u8)], timeout: Timeout, ) -> Option<(Token, usize, *const u8)>176 fn run_select(
177     handles: &mut [(&dyn SelectHandle, usize, *const u8)],
178     timeout: Timeout,
179 ) -> Option<(Token, usize, *const u8)> {
180     if handles.is_empty() {
181         // Wait until the timeout and return.
182         match timeout {
183             Timeout::Now => return None,
184             Timeout::Never => {
185                 utils::sleep_until(None);
186                 unreachable!();
187             }
188             Timeout::At(when) => {
189                 utils::sleep_until(Some(when));
190                 return None;
191             }
192         }
193     }
194 
195     // Shuffle the operations for fairness.
196     utils::shuffle(handles);
197 
198     // Create a token, which serves as a temporary variable that gets initialized in this function
199     // and is later used by a call to `channel::read()` or `channel::write()` that completes the
200     // selected operation.
201     let mut token = Token::default();
202 
203     // Try selecting one of the operations without blocking.
204     for &(handle, i, ptr) in handles.iter() {
205         if handle.try_select(&mut token) {
206             return Some((token, i, ptr));
207         }
208     }
209 
210     loop {
211         // Prepare for blocking.
212         let res = Context::with(|cx| {
213             let mut sel = Selected::Waiting;
214             let mut registered_count = 0;
215             let mut index_ready = None;
216 
217             if let Timeout::Now = timeout {
218                 cx.try_select(Selected::Aborted).unwrap();
219             }
220 
221             // Register all operations.
222             for (handle, i, _) in handles.iter_mut() {
223                 registered_count += 1;
224 
225                 // If registration returns `false`, that means the operation has just become ready.
226                 if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
227                     // Try aborting select.
228                     sel = match cx.try_select(Selected::Aborted) {
229                         Ok(()) => {
230                             index_ready = Some(*i);
231                             Selected::Aborted
232                         }
233                         Err(s) => s,
234                     };
235                     break;
236                 }
237 
238                 // If another thread has already selected one of the operations, stop registration.
239                 sel = cx.selected();
240                 if sel != Selected::Waiting {
241                     break;
242                 }
243             }
244 
245             if sel == Selected::Waiting {
246                 // Check with each operation for how long we're allowed to block, and compute the
247                 // earliest deadline.
248                 let mut deadline: Option<Instant> = match timeout {
249                     Timeout::Now => return None,
250                     Timeout::Never => None,
251                     Timeout::At(when) => Some(when),
252                 };
253                 for &(handle, _, _) in handles.iter() {
254                     if let Some(x) = handle.deadline() {
255                         deadline = deadline.map(|y| x.min(y)).or(Some(x));
256                     }
257                 }
258 
259                 // Block the current thread.
260                 sel = cx.wait_until(deadline);
261             }
262 
263             // Unregister all registered operations.
264             for (handle, _, _) in handles.iter_mut().take(registered_count) {
265                 handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
266             }
267 
268             match sel {
269                 Selected::Waiting => unreachable!(),
270                 Selected::Aborted => {
271                     // If an operation became ready during registration, try selecting it.
272                     if let Some(index_ready) = index_ready {
273                         for &(handle, i, ptr) in handles.iter() {
274                             if i == index_ready && handle.try_select(&mut token) {
275                                 return Some((i, ptr));
276                             }
277                         }
278                     }
279                 }
280                 Selected::Disconnected => {}
281                 Selected::Operation(_) => {
282                     // Find the selected operation.
283                     for (handle, i, ptr) in handles.iter_mut() {
284                         // Is this the selected operation?
285                         if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
286                         {
287                             // Try selecting this operation.
288                             if handle.accept(&mut token, cx) {
289                                 return Some((*i, *ptr));
290                             }
291                         }
292                     }
293                 }
294             }
295 
296             None
297         });
298 
299         // Return if an operation was selected.
300         if let Some((i, ptr)) = res {
301             return Some((token, i, ptr));
302         }
303 
304         // Try selecting one of the operations without blocking.
305         for &(handle, i, ptr) in handles.iter() {
306             if handle.try_select(&mut token) {
307                 return Some((token, i, ptr));
308             }
309         }
310 
311         match timeout {
312             Timeout::Now => return None,
313             Timeout::Never => {}
314             Timeout::At(when) => {
315                 if Instant::now() >= when {
316                     return None;
317                 }
318             }
319         }
320     }
321 }
322 
323 /// Runs until one of the operations becomes ready, potentially blocking the current thread.
run_ready( handles: &mut [(&dyn SelectHandle, usize, *const u8)], timeout: Timeout, ) -> Option<usize>324 fn run_ready(
325     handles: &mut [(&dyn SelectHandle, usize, *const u8)],
326     timeout: Timeout,
327 ) -> Option<usize> {
328     if handles.is_empty() {
329         // Wait until the timeout and return.
330         match timeout {
331             Timeout::Now => return None,
332             Timeout::Never => {
333                 utils::sleep_until(None);
334                 unreachable!();
335             }
336             Timeout::At(when) => {
337                 utils::sleep_until(Some(when));
338                 return None;
339             }
340         }
341     }
342 
343     // Shuffle the operations for fairness.
344     utils::shuffle(handles);
345 
346     loop {
347         let backoff = Backoff::new();
348         loop {
349             // Check operations for readiness.
350             for &(handle, i, _) in handles.iter() {
351                 if handle.is_ready() {
352                     return Some(i);
353                 }
354             }
355 
356             if backoff.is_completed() {
357                 break;
358             } else {
359                 backoff.snooze();
360             }
361         }
362 
363         // Check for timeout.
364         match timeout {
365             Timeout::Now => return None,
366             Timeout::Never => {}
367             Timeout::At(when) => {
368                 if Instant::now() >= when {
369                     return None;
370                 }
371             }
372         }
373 
374         // Prepare for blocking.
375         let res = Context::with(|cx| {
376             let mut sel = Selected::Waiting;
377             let mut registered_count = 0;
378 
379             // Begin watching all operations.
380             for (handle, _, _) in handles.iter_mut() {
381                 registered_count += 1;
382                 let oper = Operation::hook::<&dyn SelectHandle>(handle);
383 
384                 // If registration returns `false`, that means the operation has just become ready.
385                 if handle.watch(oper, cx) {
386                     sel = match cx.try_select(Selected::Operation(oper)) {
387                         Ok(()) => Selected::Operation(oper),
388                         Err(s) => s,
389                     };
390                     break;
391                 }
392 
393                 // If another thread has already chosen one of the operations, stop registration.
394                 sel = cx.selected();
395                 if sel != Selected::Waiting {
396                     break;
397                 }
398             }
399 
400             if sel == Selected::Waiting {
401                 // Check with each operation for how long we're allowed to block, and compute the
402                 // earliest deadline.
403                 let mut deadline: Option<Instant> = match timeout {
404                     Timeout::Now => unreachable!(),
405                     Timeout::Never => None,
406                     Timeout::At(when) => Some(when),
407                 };
408                 for &(handle, _, _) in handles.iter() {
409                     if let Some(x) = handle.deadline() {
410                         deadline = deadline.map(|y| x.min(y)).or(Some(x));
411                     }
412                 }
413 
414                 // Block the current thread.
415                 sel = cx.wait_until(deadline);
416             }
417 
418             // Unwatch all operations.
419             for (handle, _, _) in handles.iter_mut().take(registered_count) {
420                 handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
421             }
422 
423             match sel {
424                 Selected::Waiting => unreachable!(),
425                 Selected::Aborted => {}
426                 Selected::Disconnected => {}
427                 Selected::Operation(_) => {
428                     for (handle, i, _) in handles.iter_mut() {
429                         let oper = Operation::hook::<&dyn SelectHandle>(handle);
430                         if sel == Selected::Operation(oper) {
431                             return Some(*i);
432                         }
433                     }
434                 }
435             }
436 
437             None
438         });
439 
440         // Return if an operation became ready.
441         if res.is_some() {
442             return res;
443         }
444     }
445 }
446 
447 /// Attempts to select one of the operations without blocking.
448 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
449 #[inline]
try_select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], ) -> Result<SelectedOperation<'a>, TrySelectError>450 pub fn try_select<'a>(
451     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
452 ) -> Result<SelectedOperation<'a>, TrySelectError> {
453     match run_select(handles, Timeout::Now) {
454         None => Err(TrySelectError),
455         Some((token, index, ptr)) => Ok(SelectedOperation {
456             token,
457             index,
458             ptr,
459             _marker: PhantomData,
460         }),
461     }
462 }
463 
464 /// Blocks until one of the operations becomes ready and selects it.
465 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
466 #[inline]
select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], ) -> SelectedOperation<'a>467 pub fn select<'a>(
468     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
469 ) -> SelectedOperation<'a> {
470     if handles.is_empty() {
471         panic!("no operations have been added to `Select`");
472     }
473 
474     let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
475     SelectedOperation {
476         token,
477         index,
478         ptr,
479         _marker: PhantomData,
480     }
481 }
482 
483 /// Blocks for a limited time until one of the operations becomes ready and selects it.
484 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
485 #[inline]
select_timeout<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>486 pub fn select_timeout<'a>(
487     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
488     timeout: Duration,
489 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
490     select_deadline(handles, utils::convert_timeout_to_deadline(timeout))
491 }
492 
493 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
494 #[inline]
select_deadline<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], deadline: Instant, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>495 pub(crate) fn select_deadline<'a>(
496     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
497     deadline: Instant,
498 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
499     match run_select(handles, Timeout::At(deadline)) {
500         None => Err(SelectTimeoutError),
501         Some((token, index, ptr)) => Ok(SelectedOperation {
502             token,
503             index,
504             ptr,
505             _marker: PhantomData,
506         }),
507     }
508 }
509 
510 /// Selects from a set of channel operations.
511 ///
512 /// `Select` allows you to define a set of channel operations, wait until any one of them becomes
513 /// ready, and finally execute it. If multiple operations are ready at the same time, a random one
514 /// among them is selected.
515 ///
516 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
517 /// when it will simply return an error because the channel is disconnected.
518 ///
519 /// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
520 /// dynamically created list of channel operations.
521 ///
522 /// Once a list of operations has been built with `Select`, there are two different ways of
523 /// proceeding:
524 ///
525 /// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
526 ///   the returned selected operation has already begun and **must** be completed. If we don't
527 ///   complete it, a panic will occur.
528 ///
529 /// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
530 ///   successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
531 ///   possible for another thread to make the operation not ready just before we try executing it,
532 ///   so it's wise to use a retry loop. However, note that these methods might return with success
533 ///   spuriously, so it's a good idea to always double check if the operation is really ready.
534 ///
535 /// # Examples
536 ///
537 /// Use [`select`] to receive a message from a list of receivers:
538 ///
539 /// ```
540 /// use crossbeam_channel::{Receiver, RecvError, Select};
541 ///
542 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
543 ///     // Build a list of operations.
544 ///     let mut sel = Select::new();
545 ///     for r in rs {
546 ///         sel.recv(r);
547 ///     }
548 ///
549 ///     // Complete the selected operation.
550 ///     let oper = sel.select();
551 ///     let index = oper.index();
552 ///     oper.recv(&rs[index])
553 /// }
554 /// ```
555 ///
556 /// Use [`ready`] to receive a message from a list of receivers:
557 ///
558 /// ```
559 /// use crossbeam_channel::{Receiver, RecvError, Select};
560 ///
561 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
562 ///     // Build a list of operations.
563 ///     let mut sel = Select::new();
564 ///     for r in rs {
565 ///         sel.recv(r);
566 ///     }
567 ///
568 ///     loop {
569 ///         // Wait until a receive operation becomes ready and try executing it.
570 ///         let index = sel.ready();
571 ///         let res = rs[index].try_recv();
572 ///
573 ///         // If the operation turns out not to be ready, retry.
574 ///         if let Err(e) = res {
575 ///             if e.is_empty() {
576 ///                 continue;
577 ///             }
578 ///         }
579 ///
580 ///         // Success!
581 ///         return res.map_err(|_| RecvError);
582 ///     }
583 /// }
584 /// ```
585 ///
586 /// [`try_select`]: Select::try_select
587 /// [`select`]: Select::select
588 /// [`select_timeout`]: Select::select_timeout
589 /// [`try_ready`]: Select::try_ready
590 /// [`ready`]: Select::ready
591 /// [`ready_timeout`]: Select::ready_timeout
592 pub struct Select<'a> {
593     /// A list of senders and receivers participating in selection.
594     handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,
595 
596     /// The next index to assign to an operation.
597     next_index: usize,
598 }
599 
600 unsafe impl Send for Select<'_> {}
601 unsafe impl Sync for Select<'_> {}
602 
603 impl<'a> Select<'a> {
604     /// Creates an empty list of channel operations for selection.
605     ///
606     /// # Examples
607     ///
608     /// ```
609     /// use crossbeam_channel::Select;
610     ///
611     /// let mut sel = Select::new();
612     ///
613     /// // The list of operations is empty, which means no operation can be selected.
614     /// assert!(sel.try_select().is_err());
615     /// ```
new() -> Select<'a>616     pub fn new() -> Select<'a> {
617         Select {
618             handles: Vec::with_capacity(4),
619             next_index: 0,
620         }
621     }
622 
623     /// Adds a send operation.
624     ///
625     /// Returns the index of the added operation.
626     ///
627     /// # Examples
628     ///
629     /// ```
630     /// use crossbeam_channel::{unbounded, Select};
631     ///
632     /// let (s, r) = unbounded::<i32>();
633     ///
634     /// let mut sel = Select::new();
635     /// let index = sel.send(&s);
636     /// ```
send<T>(&mut self, s: &'a Sender<T>) -> usize637     pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
638         let i = self.next_index;
639         let ptr = s as *const Sender<_> as *const u8;
640         self.handles.push((s, i, ptr));
641         self.next_index += 1;
642         i
643     }
644 
645     /// Adds a receive operation.
646     ///
647     /// Returns the index of the added operation.
648     ///
649     /// # Examples
650     ///
651     /// ```
652     /// use crossbeam_channel::{unbounded, Select};
653     ///
654     /// let (s, r) = unbounded::<i32>();
655     ///
656     /// let mut sel = Select::new();
657     /// let index = sel.recv(&r);
658     /// ```
recv<T>(&mut self, r: &'a Receiver<T>) -> usize659     pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
660         let i = self.next_index;
661         let ptr = r as *const Receiver<_> as *const u8;
662         self.handles.push((r, i, ptr));
663         self.next_index += 1;
664         i
665     }
666 
667     /// Removes a previously added operation.
668     ///
669     /// This is useful when an operation is selected because the channel got disconnected and we
670     /// want to try again to select a different operation instead.
671     ///
672     /// If new operations are added after removing some, the indices of removed operations will not
673     /// be reused.
674     ///
675     /// # Panics
676     ///
677     /// An attempt to remove a non-existing or already removed operation will panic.
678     ///
679     /// # Examples
680     ///
681     /// ```
682     /// use crossbeam_channel::{unbounded, Select};
683     ///
684     /// let (s1, r1) = unbounded::<i32>();
685     /// let (_, r2) = unbounded::<i32>();
686     ///
687     /// let mut sel = Select::new();
688     /// let oper1 = sel.recv(&r1);
689     /// let oper2 = sel.recv(&r2);
690     ///
691     /// // Both operations are initially ready, so a random one will be executed.
692     /// let oper = sel.select();
693     /// assert_eq!(oper.index(), oper2);
694     /// assert!(oper.recv(&r2).is_err());
695     /// sel.remove(oper2);
696     ///
697     /// s1.send(10).unwrap();
698     ///
699     /// let oper = sel.select();
700     /// assert_eq!(oper.index(), oper1);
701     /// assert_eq!(oper.recv(&r1), Ok(10));
702     /// ```
remove(&mut self, index: usize)703     pub fn remove(&mut self, index: usize) {
704         assert!(
705             index < self.next_index,
706             "index out of bounds; {} >= {}",
707             index,
708             self.next_index,
709         );
710 
711         let i = self
712             .handles
713             .iter()
714             .enumerate()
715             .find(|(_, (_, i, _))| *i == index)
716             .expect("no operation with this index")
717             .0;
718 
719         self.handles.swap_remove(i);
720     }
721 
722     /// Attempts to select one of the operations without blocking.
723     ///
724     /// If an operation is ready, it is selected and returned. If multiple operations are ready at
725     /// the same time, a random one among them is selected. If none of the operations are ready, an
726     /// error is returned.
727     ///
728     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
729     /// even when it will simply return an error because the channel is disconnected.
730     ///
731     /// The selected operation must be completed with [`SelectedOperation::send`]
732     /// or [`SelectedOperation::recv`].
733     ///
734     /// # Examples
735     ///
736     /// ```
737     /// use crossbeam_channel::{unbounded, Select};
738     ///
739     /// let (s1, r1) = unbounded();
740     /// let (s2, r2) = unbounded();
741     ///
742     /// s1.send(10).unwrap();
743     /// s2.send(20).unwrap();
744     ///
745     /// let mut sel = Select::new();
746     /// let oper1 = sel.recv(&r1);
747     /// let oper2 = sel.recv(&r2);
748     ///
749     /// // Both operations are initially ready, so a random one will be executed.
750     /// let oper = sel.try_select();
751     /// match oper {
752     ///     Err(_) => panic!("both operations should be ready"),
753     ///     Ok(oper) => match oper.index() {
754     ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
755     ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
756     ///         _ => unreachable!(),
757     ///     }
758     /// }
759     /// ```
try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError>760     pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
761         try_select(&mut self.handles)
762     }
763 
764     /// Blocks until one of the operations becomes ready and selects it.
765     ///
766     /// Once an operation becomes ready, it is selected and returned. If multiple operations are
767     /// ready at the same time, a random one among them is selected.
768     ///
769     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
770     /// even when it will simply return an error because the channel is disconnected.
771     ///
772     /// The selected operation must be completed with [`SelectedOperation::send`]
773     /// or [`SelectedOperation::recv`].
774     ///
775     /// # Panics
776     ///
777     /// Panics if no operations have been added to `Select`.
778     ///
779     /// # Examples
780     ///
781     /// ```
782     /// use std::thread;
783     /// use std::time::Duration;
784     /// use crossbeam_channel::{unbounded, Select};
785     ///
786     /// let (s1, r1) = unbounded();
787     /// let (s2, r2) = unbounded();
788     ///
789     /// thread::spawn(move || {
790     ///     thread::sleep(Duration::from_secs(1));
791     ///     s1.send(10).unwrap();
792     /// });
793     /// thread::spawn(move || s2.send(20).unwrap());
794     ///
795     /// let mut sel = Select::new();
796     /// let oper1 = sel.recv(&r1);
797     /// let oper2 = sel.recv(&r2);
798     ///
799     /// // The second operation will be selected because it becomes ready first.
800     /// let oper = sel.select();
801     /// match oper.index() {
802     ///     i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
803     ///     i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
804     ///     _ => unreachable!(),
805     /// }
806     /// ```
select(&mut self) -> SelectedOperation<'a>807     pub fn select(&mut self) -> SelectedOperation<'a> {
808         select(&mut self.handles)
809     }
810 
811     /// Blocks for a limited time until one of the operations becomes ready and selects it.
812     ///
813     /// If an operation becomes ready, it is selected and returned. If multiple operations are
814     /// ready at the same time, a random one among them is selected. If none of the operations
815     /// become ready for the specified duration, an error is returned.
816     ///
817     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
818     /// even when it will simply return an error because the channel is disconnected.
819     ///
820     /// The selected operation must be completed with [`SelectedOperation::send`]
821     /// or [`SelectedOperation::recv`].
822     ///
823     /// # Examples
824     ///
825     /// ```
826     /// use std::thread;
827     /// use std::time::Duration;
828     /// use crossbeam_channel::{unbounded, Select};
829     ///
830     /// let (s1, r1) = unbounded();
831     /// let (s2, r2) = unbounded();
832     ///
833     /// thread::spawn(move || {
834     ///     thread::sleep(Duration::from_secs(1));
835     ///     s1.send(10).unwrap();
836     /// });
837     /// thread::spawn(move || s2.send(20).unwrap());
838     ///
839     /// let mut sel = Select::new();
840     /// let oper1 = sel.recv(&r1);
841     /// let oper2 = sel.recv(&r2);
842     ///
843     /// // The second operation will be selected because it becomes ready first.
844     /// let oper = sel.select_timeout(Duration::from_millis(500));
845     /// match oper {
846     ///     Err(_) => panic!("should not have timed out"),
847     ///     Ok(oper) => match oper.index() {
848     ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
849     ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
850     ///         _ => unreachable!(),
851     ///     }
852     /// }
853     /// ```
select_timeout( &mut self, timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>854     pub fn select_timeout(
855         &mut self,
856         timeout: Duration,
857     ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
858         select_timeout(&mut self.handles, timeout)
859     }
860 
861     /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
862     ///
863     /// If an operation becomes ready, it is selected and returned. If multiple operations are
864     /// ready at the same time, a random one among them is selected. If none of the operations
865     /// become ready before the given deadline, an error is returned.
866     ///
867     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
868     /// even when it will simply return an error because the channel is disconnected.
869     ///
870     /// The selected operation must be completed with [`SelectedOperation::send`]
871     /// or [`SelectedOperation::recv`].
872     ///
873     /// # Examples
874     ///
875     /// ```
876     /// use std::thread;
877     /// use std::time::{Instant, Duration};
878     /// use crossbeam_channel::{unbounded, Select};
879     ///
880     /// let (s1, r1) = unbounded();
881     /// let (s2, r2) = unbounded();
882     ///
883     /// thread::spawn(move || {
884     ///     thread::sleep(Duration::from_secs(1));
885     ///     s1.send(10).unwrap();
886     /// });
887     /// thread::spawn(move || s2.send(20).unwrap());
888     ///
889     /// let mut sel = Select::new();
890     /// let oper1 = sel.recv(&r1);
891     /// let oper2 = sel.recv(&r2);
892     ///
893     /// let deadline = Instant::now() + Duration::from_millis(500);
894     ///
895     /// // The second operation will be selected because it becomes ready first.
896     /// let oper = sel.select_deadline(deadline);
897     /// match oper {
898     ///     Err(_) => panic!("should not have timed out"),
899     ///     Ok(oper) => match oper.index() {
900     ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
901     ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
902     ///         _ => unreachable!(),
903     ///     }
904     /// }
905     /// ```
select_deadline( &mut self, deadline: Instant, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>906     pub fn select_deadline(
907         &mut self,
908         deadline: Instant,
909     ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
910         select_deadline(&mut self.handles, deadline)
911     }
912 
913     /// Attempts to find a ready operation without blocking.
914     ///
915     /// If an operation is ready, its index is returned. If multiple operations are ready at the
916     /// same time, a random one among them is chosen. If none of the operations are ready, an error
917     /// is returned.
918     ///
919     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
920     /// even when it will simply return an error because the channel is disconnected.
921     ///
922     /// Note that this method might return with success spuriously, so it's a good idea to always
923     /// double check if the operation is really ready.
924     ///
925     /// # Examples
926     ///
927     /// ```
928     /// use crossbeam_channel::{unbounded, Select};
929     ///
930     /// let (s1, r1) = unbounded();
931     /// let (s2, r2) = unbounded();
932     ///
933     /// s1.send(10).unwrap();
934     /// s2.send(20).unwrap();
935     ///
936     /// let mut sel = Select::new();
937     /// let oper1 = sel.recv(&r1);
938     /// let oper2 = sel.recv(&r2);
939     ///
940     /// // Both operations are initially ready, so a random one will be chosen.
941     /// match sel.try_ready() {
942     ///     Err(_) => panic!("both operations should be ready"),
943     ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
944     ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
945     ///     Ok(_) => unreachable!(),
946     /// }
947     /// ```
try_ready(&mut self) -> Result<usize, TryReadyError>948     pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
949         match run_ready(&mut self.handles, Timeout::Now) {
950             None => Err(TryReadyError),
951             Some(index) => Ok(index),
952         }
953     }
954 
955     /// Blocks until one of the operations becomes ready.
956     ///
957     /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
958     /// the same time, a random one among them is chosen.
959     ///
960     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
961     /// even when it will simply return an error because the channel is disconnected.
962     ///
963     /// Note that this method might return with success spuriously, so it's a good idea to always
964     /// double check if the operation is really ready.
965     ///
966     /// # Panics
967     ///
968     /// Panics if no operations have been added to `Select`.
969     ///
970     /// # Examples
971     ///
972     /// ```
973     /// use std::thread;
974     /// use std::time::Duration;
975     /// use crossbeam_channel::{unbounded, Select};
976     ///
977     /// let (s1, r1) = unbounded();
978     /// let (s2, r2) = unbounded();
979     ///
980     /// thread::spawn(move || {
981     ///     thread::sleep(Duration::from_secs(1));
982     ///     s1.send(10).unwrap();
983     /// });
984     /// thread::spawn(move || s2.send(20).unwrap());
985     ///
986     /// let mut sel = Select::new();
987     /// let oper1 = sel.recv(&r1);
988     /// let oper2 = sel.recv(&r2);
989     ///
990     /// // The second operation will be selected because it becomes ready first.
991     /// match sel.ready() {
992     ///     i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
993     ///     i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
994     ///     _ => unreachable!(),
995     /// }
996     /// ```
ready(&mut self) -> usize997     pub fn ready(&mut self) -> usize {
998         if self.handles.is_empty() {
999             panic!("no operations have been added to `Select`");
1000         }
1001 
1002         run_ready(&mut self.handles, Timeout::Never).unwrap()
1003     }
1004 
1005     /// Blocks for a limited time until one of the operations becomes ready.
1006     ///
1007     /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1008     /// the same time, a random one among them is chosen. If none of the operations become ready
1009     /// for the specified duration, an error is returned.
1010     ///
1011     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1012     /// even when it will simply return an error because the channel is disconnected.
1013     ///
1014     /// Note that this method might return with success spuriously, so it's a good idea to double
1015     /// check if the operation is really ready.
1016     ///
1017     /// # Examples
1018     ///
1019     /// ```
1020     /// use std::thread;
1021     /// use std::time::Duration;
1022     /// use crossbeam_channel::{unbounded, Select};
1023     ///
1024     /// let (s1, r1) = unbounded();
1025     /// let (s2, r2) = unbounded();
1026     ///
1027     /// thread::spawn(move || {
1028     ///     thread::sleep(Duration::from_secs(1));
1029     ///     s1.send(10).unwrap();
1030     /// });
1031     /// thread::spawn(move || s2.send(20).unwrap());
1032     ///
1033     /// let mut sel = Select::new();
1034     /// let oper1 = sel.recv(&r1);
1035     /// let oper2 = sel.recv(&r2);
1036     ///
1037     /// // The second operation will be selected because it becomes ready first.
1038     /// match sel.ready_timeout(Duration::from_millis(500)) {
1039     ///     Err(_) => panic!("should not have timed out"),
1040     ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1041     ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1042     ///     Ok(_) => unreachable!(),
1043     /// }
1044     /// ```
ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError>1045     pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1046         self.ready_deadline(utils::convert_timeout_to_deadline(timeout))
1047     }
1048 
1049     /// Blocks until a given deadline, or until one of the operations becomes ready.
1050     ///
1051     /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1052     /// the same time, a random one among them is chosen. If none of the operations become ready
1053     /// before the deadline, an error is returned.
1054     ///
1055     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1056     /// even when it will simply return an error because the channel is disconnected.
1057     ///
1058     /// Note that this method might return with success spuriously, so it's a good idea to double
1059     /// check if the operation is really ready.
1060     ///
1061     /// # Examples
1062     ///
1063     /// ```
1064     /// use std::thread;
1065     /// use std::time::{Duration, Instant};
1066     /// use crossbeam_channel::{unbounded, Select};
1067     ///
1068     /// let deadline = Instant::now() + Duration::from_millis(500);
1069     ///
1070     /// let (s1, r1) = unbounded();
1071     /// let (s2, r2) = unbounded();
1072     ///
1073     /// thread::spawn(move || {
1074     ///     thread::sleep(Duration::from_secs(1));
1075     ///     s1.send(10).unwrap();
1076     /// });
1077     /// thread::spawn(move || s2.send(20).unwrap());
1078     ///
1079     /// let mut sel = Select::new();
1080     /// let oper1 = sel.recv(&r1);
1081     /// let oper2 = sel.recv(&r2);
1082     ///
1083     /// // The second operation will be selected because it becomes ready first.
1084     /// match sel.ready_deadline(deadline) {
1085     ///     Err(_) => panic!("should not have timed out"),
1086     ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1087     ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1088     ///     Ok(_) => unreachable!(),
1089     /// }
1090     /// ```
ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError>1091     pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1092         match run_ready(&mut self.handles, Timeout::At(deadline)) {
1093             None => Err(ReadyTimeoutError),
1094             Some(index) => Ok(index),
1095         }
1096     }
1097 }
1098 
1099 impl<'a> Clone for Select<'a> {
clone(&self) -> Select<'a>1100     fn clone(&self) -> Select<'a> {
1101         Select {
1102             handles: self.handles.clone(),
1103             next_index: self.next_index,
1104         }
1105     }
1106 }
1107 
1108 impl<'a> Default for Select<'a> {
default() -> Select<'a>1109     fn default() -> Select<'a> {
1110         Select::new()
1111     }
1112 }
1113 
1114 impl fmt::Debug for Select<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1115     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1116         f.pad("Select { .. }")
1117     }
1118 }
1119 
1120 /// A selected operation that needs to be completed.
1121 ///
1122 /// To complete the operation, call [`send`] or [`recv`].
1123 ///
1124 /// # Panics
1125 ///
1126 /// Forgetting to complete the operation is an error and might lead to deadlocks. If a
1127 /// `SelectedOperation` is dropped without completion, a panic occurs.
1128 ///
1129 /// [`send`]: SelectedOperation::send
1130 /// [`recv`]: SelectedOperation::recv
1131 #[must_use]
1132 pub struct SelectedOperation<'a> {
1133     /// Token needed to complete the operation.
1134     token: Token,
1135 
1136     /// The index of the selected operation.
1137     index: usize,
1138 
1139     /// The address of the selected `Sender` or `Receiver`.
1140     ptr: *const u8,
1141 
1142     /// Indicates that `Sender`s and `Receiver`s are borrowed.
1143     _marker: PhantomData<&'a ()>,
1144 }
1145 
1146 impl SelectedOperation<'_> {
1147     /// Returns the index of the selected operation.
1148     ///
1149     /// # Examples
1150     ///
1151     /// ```
1152     /// use crossbeam_channel::{bounded, Select};
1153     ///
1154     /// let (s1, r1) = bounded::<()>(0);
1155     /// let (s2, r2) = bounded::<()>(0);
1156     /// let (s3, r3) = bounded::<()>(1);
1157     ///
1158     /// let mut sel = Select::new();
1159     /// let oper1 = sel.send(&s1);
1160     /// let oper2 = sel.recv(&r2);
1161     /// let oper3 = sel.send(&s3);
1162     ///
1163     /// // Only the last operation is ready.
1164     /// let oper = sel.select();
1165     /// assert_eq!(oper.index(), 2);
1166     /// assert_eq!(oper.index(), oper3);
1167     ///
1168     /// // Complete the operation.
1169     /// oper.send(&s3, ()).unwrap();
1170     /// ```
index(&self) -> usize1171     pub fn index(&self) -> usize {
1172         self.index
1173     }
1174 
1175     /// Completes the send operation.
1176     ///
1177     /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1178     /// when the operation was added.
1179     ///
1180     /// # Panics
1181     ///
1182     /// Panics if an incorrect [`Sender`] reference is passed.
1183     ///
1184     /// # Examples
1185     ///
1186     /// ```
1187     /// use crossbeam_channel::{bounded, Select, SendError};
1188     ///
1189     /// let (s, r) = bounded::<i32>(0);
1190     /// drop(r);
1191     ///
1192     /// let mut sel = Select::new();
1193     /// let oper1 = sel.send(&s);
1194     ///
1195     /// let oper = sel.select();
1196     /// assert_eq!(oper.index(), oper1);
1197     /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1198     /// ```
send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>>1199     pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1200         assert!(
1201             s as *const Sender<T> as *const u8 == self.ptr,
1202             "passed a sender that wasn't selected",
1203         );
1204         let res = unsafe { channel::write(s, &mut self.token, msg) };
1205         mem::forget(self);
1206         res.map_err(SendError)
1207     }
1208 
1209     /// Completes the receive operation.
1210     ///
1211     /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1212     /// when the operation was added.
1213     ///
1214     /// # Panics
1215     ///
1216     /// Panics if an incorrect [`Receiver`] reference is passed.
1217     ///
1218     /// # Examples
1219     ///
1220     /// ```
1221     /// use crossbeam_channel::{bounded, Select, RecvError};
1222     ///
1223     /// let (s, r) = bounded::<i32>(0);
1224     /// drop(s);
1225     ///
1226     /// let mut sel = Select::new();
1227     /// let oper1 = sel.recv(&r);
1228     ///
1229     /// let oper = sel.select();
1230     /// assert_eq!(oper.index(), oper1);
1231     /// assert_eq!(oper.recv(&r), Err(RecvError));
1232     /// ```
recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError>1233     pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1234         assert!(
1235             r as *const Receiver<T> as *const u8 == self.ptr,
1236             "passed a receiver that wasn't selected",
1237         );
1238         let res = unsafe { channel::read(r, &mut self.token) };
1239         mem::forget(self);
1240         res.map_err(|_| RecvError)
1241     }
1242 }
1243 
1244 impl fmt::Debug for SelectedOperation<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1245     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1246         f.pad("SelectedOperation { .. }")
1247     }
1248 }
1249 
1250 impl Drop for SelectedOperation<'_> {
drop(&mut self)1251     fn drop(&mut self) {
1252         panic!("dropped `SelectedOperation` without completing the operation");
1253     }
1254 }
1255