1 //! Interface to the select mechanism.
2
3 use std::fmt;
4 use std::marker::PhantomData;
5 use std::mem;
6 use std::time::{Duration, Instant};
7
8 use crossbeam_utils::Backoff;
9
10 use crate::channel::{self, Receiver, Sender};
11 use crate::context::Context;
12 use crate::err::{ReadyTimeoutError, TryReadyError};
13 use crate::err::{RecvError, SendError};
14 use crate::err::{SelectTimeoutError, TrySelectError};
15 use crate::flavors;
16 use crate::utils;
17
/// Temporary data that gets initialized during select or a blocking operation, and is consumed by
/// `read` or `write`.
///
/// Each field contains data associated with a specific channel flavor.
#[derive(Debug, Default)]
pub struct Token {
    /// Data for the `at` channel flavor.
    pub at: flavors::at::AtToken,
    /// Data for the `array` channel flavor.
    pub array: flavors::array::ArrayToken,
    /// Data for the `list` channel flavor.
    pub list: flavors::list::ListToken,
    /// Data for the `never` channel flavor.
    pub never: flavors::never::NeverToken,
    /// Data for the `tick` channel flavor.
    pub tick: flavors::tick::TickToken,
    /// Data for the `zero` channel flavor.
    pub zero: flavors::zero::ZeroToken,
}
31
/// Identifier associated with an operation by a specific thread on a specific channel.
///
/// The wrapped `usize` is the address of a caller-provided variable (see [`Operation::hook`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Operation(usize);

impl Operation {
    /// Creates an operation identifier from a mutable reference.
    ///
    /// This function essentially just turns the address of the reference into a number. The
    /// reference should point to a variable that is specific to the thread and the operation,
    /// and is alive for the entire duration of select or blocking operation.
    ///
    /// # Panics
    ///
    /// Panics if the address is `0`, `1`, or `2`, because those numbers are reserved for the
    /// `Selected` states that do not carry an operation.
    #[inline]
    pub fn hook<T>(r: &mut T) -> Operation {
        let val = r as *mut T as usize;
        // Make sure that the pointer address doesn't equal the numerical representation of
        // `Selected::{Waiting, Aborted, Disconnected}`.
        assert!(val > 2);
        Operation(val)
    }
}
51
52 /// Current state of a select or a blocking operation.
53 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
54 pub enum Selected {
55 /// Still waiting for an operation.
56 Waiting,
57
58 /// The attempt to block the current thread has been aborted.
59 Aborted,
60
61 /// An operation became ready because a channel is disconnected.
62 Disconnected,
63
64 /// An operation became ready because a message can be sent or received.
65 Operation(Operation),
66 }
67
68 impl From<usize> for Selected {
69 #[inline]
from(val: usize) -> Selected70 fn from(val: usize) -> Selected {
71 match val {
72 0 => Selected::Waiting,
73 1 => Selected::Aborted,
74 2 => Selected::Disconnected,
75 oper => Selected::Operation(Operation(oper)),
76 }
77 }
78 }
79
80 impl Into<usize> for Selected {
81 #[inline]
into(self) -> usize82 fn into(self) -> usize {
83 match self {
84 Selected::Waiting => 0,
85 Selected::Aborted => 1,
86 Selected::Disconnected => 2,
87 Selected::Operation(Operation(val)) => val,
88 }
89 }
90 }
91
/// A receiver or a sender that can participate in select.
///
/// This is a handle that assists select in executing an operation, registration, deciding on the
/// appropriate deadline for blocking, etc.
///
/// The methods come in two groups: `register`/`unregister`/`accept` are used when an operation is
/// going to be selected, while `watch`/`unwatch`/`is_ready` are used when only readiness is
/// awaited.
pub trait SelectHandle {
    /// Attempts to select an operation and returns `true` on success.
    fn try_select(&self, token: &mut Token) -> bool;

    /// Returns a deadline for an operation, if there is one.
    fn deadline(&self) -> Option<Instant>;

    /// Registers an operation for execution and returns `true` if it is now ready.
    ///
    /// `oper` identifies the operation and `cx` is the context of the thread that will block.
    fn register(&self, oper: Operation, cx: &Context) -> bool;

    /// Unregisters an operation for execution.
    fn unregister(&self, oper: Operation);

    /// Attempts to select an operation the thread got woken up for and returns `true` on success.
    fn accept(&self, token: &mut Token, cx: &Context) -> bool;

    /// Returns `true` if an operation can be executed without blocking.
    fn is_ready(&self) -> bool;

    /// Registers an operation for readiness notification and returns `true` if it is now ready.
    ///
    /// `oper` identifies the operation and `cx` is the context of the thread that will block.
    fn watch(&self, oper: Operation, cx: &Context) -> bool;

    /// Unregisters an operation for readiness notification.
    fn unwatch(&self, oper: Operation);
}
121
122 impl<T: SelectHandle> SelectHandle for &T {
try_select(&self, token: &mut Token) -> bool123 fn try_select(&self, token: &mut Token) -> bool {
124 (**self).try_select(token)
125 }
126
deadline(&self) -> Option<Instant>127 fn deadline(&self) -> Option<Instant> {
128 (**self).deadline()
129 }
130
register(&self, oper: Operation, cx: &Context) -> bool131 fn register(&self, oper: Operation, cx: &Context) -> bool {
132 (**self).register(oper, cx)
133 }
134
unregister(&self, oper: Operation)135 fn unregister(&self, oper: Operation) {
136 (**self).unregister(oper);
137 }
138
accept(&self, token: &mut Token, cx: &Context) -> bool139 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
140 (**self).accept(token, cx)
141 }
142
is_ready(&self) -> bool143 fn is_ready(&self) -> bool {
144 (**self).is_ready()
145 }
146
watch(&self, oper: Operation, cx: &Context) -> bool147 fn watch(&self, oper: Operation, cx: &Context) -> bool {
148 (**self).watch(oper, cx)
149 }
150
unwatch(&self, oper: Operation)151 fn unwatch(&self, oper: Operation) {
152 (**self).unwatch(oper)
153 }
154 }
155
/// Determines when a select operation should time out.
///
/// Passed to `run_select` and `run_ready` to choose between non-blocking, unbounded, and
/// deadline-bounded waiting.
#[derive(Clone, Copy, Eq, PartialEq)]
enum Timeout {
    /// No blocking.
    Now,

    /// Block forever.
    Never,

    /// Time out after the time instant.
    At(Instant),
}
168
/// Runs until one of the operations is selected, potentially blocking the current thread.
///
/// Successful receive operations will have to be followed up by `channel::read()` and successful
/// send operations by `channel::write()`.
///
/// Each entry of `handles` is a `(handle, index, ptr)` triple; the slice is shuffled before use.
/// Returns the initialized token together with the `index` and `ptr` of the selected entry, or
/// `None` if `timeout` elapsed before any operation could be selected. If `handles` is empty,
/// the function only sleeps until the timeout.
fn run_select(
    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
    timeout: Timeout,
) -> Option<(Token, usize, *const u8)> {
    if handles.is_empty() {
        // Wait until the timeout and return.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {
                // Sleeping without a deadline is not expected to return.
                utils::sleep_until(None);
                unreachable!();
            }
            Timeout::At(when) => {
                utils::sleep_until(Some(when));
                return None;
            }
        }
    }

    // Shuffle the operations for fairness.
    utils::shuffle(handles);

    // Create a token, which serves as a temporary variable that gets initialized in this function
    // and is later used by a call to `channel::read()` or `channel::write()` that completes the
    // selected operation.
    let mut token = Token::default();

    // Try selecting one of the operations without blocking.
    for &(handle, i, ptr) in handles.iter() {
        if handle.try_select(&mut token) {
            return Some((token, i, ptr));
        }
    }

    loop {
        // Prepare for blocking.
        let res = Context::with(|cx| {
            let mut sel = Selected::Waiting;
            let mut registered_count = 0;
            let mut index_ready = None;

            // NOTE(review): for a non-blocking select the context is marked as aborted up front,
            // presumably so that the registration loop below stops immediately and the thread
            // never blocks — confirm against `Context`.
            if let Timeout::Now = timeout {
                cx.try_select(Selected::Aborted).unwrap();
            }

            // Register all operations.
            //
            // The operation identifier is derived from the address of the entry's handle slot in
            // `handles`, so the same entry yields the same identifier in the loops further down.
            for (handle, i, _) in handles.iter_mut() {
                registered_count += 1;

                // If registration returns `true`, that means the operation has just become ready.
                if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
                    // Try aborting select.
                    sel = match cx.try_select(Selected::Aborted) {
                        Ok(()) => {
                            index_ready = Some(*i);
                            Selected::Aborted
                        }
                        Err(s) => s,
                    };
                    break;
                }

                // If another thread has already selected one of the operations, stop registration.
                sel = cx.selected();
                if sel != Selected::Waiting {
                    break;
                }
            }

            if sel == Selected::Waiting {
                // Check with each operation for how long we're allowed to block, and compute the
                // earliest deadline.
                let mut deadline: Option<Instant> = match timeout {
                    Timeout::Now => return None,
                    Timeout::Never => None,
                    Timeout::At(when) => Some(when),
                };
                for &(handle, _, _) in handles.iter() {
                    if let Some(x) = handle.deadline() {
                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
                    }
                }

                // Block the current thread.
                sel = cx.wait_until(deadline);
            }

            // Unregister all registered operations.
            for (handle, _, _) in handles.iter_mut().take(registered_count) {
                handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
            }

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {
                    // If an operation became ready during registration, try selecting it.
                    if let Some(index_ready) = index_ready {
                        for &(handle, i, ptr) in handles.iter() {
                            if i == index_ready && handle.try_select(&mut token) {
                                return Some((i, ptr));
                            }
                        }
                    }
                }
                Selected::Disconnected => {}
                Selected::Operation(_) => {
                    // Find the selected operation.
                    for (handle, i, ptr) in handles.iter_mut() {
                        // Is this the selected operation?
                        if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
                        {
                            // Try selecting this operation.
                            if handle.accept(&mut token, cx) {
                                return Some((*i, *ptr));
                            }
                        }
                    }
                }
            }

            None
        });

        // Return if an operation was selected.
        if let Some((i, ptr)) = res {
            return Some((token, i, ptr));
        }

        // Try selecting one of the operations without blocking.
        for &(handle, i, ptr) in handles.iter() {
            if handle.try_select(&mut token) {
                return Some((token, i, ptr));
            }
        }

        // Give up once the timeout has elapsed; otherwise go around and block again.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {}
            Timeout::At(when) => {
                if Instant::now() >= when {
                    return None;
                }
            }
        }
    }
}
319
/// Runs until one of the operations becomes ready, potentially blocking the current thread.
///
/// Each entry of `handles` is a `(handle, index, ptr)` triple; the slice is shuffled before use.
/// Returns the `index` of an entry that was reported ready, or `None` if `timeout` elapsed first.
/// If `handles` is empty, the function only sleeps until the timeout.
fn run_ready(
    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
    timeout: Timeout,
) -> Option<usize> {
    if handles.is_empty() {
        // Wait until the timeout and return.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {
                // Sleeping without a deadline is not expected to return.
                utils::sleep_until(None);
                unreachable!();
            }
            Timeout::At(when) => {
                utils::sleep_until(Some(when));
                return None;
            }
        }
    }

    // Shuffle the operations for fairness.
    utils::shuffle(handles);

    loop {
        // Poll for readiness with backoff for a while before resorting to blocking.
        let backoff = Backoff::new();
        loop {
            // Check operations for readiness.
            for &(handle, i, _) in handles.iter() {
                if handle.is_ready() {
                    return Some(i);
                }
            }

            if backoff.is_completed() {
                break;
            } else {
                backoff.snooze();
            }
        }

        // Check for timeout.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {}
            Timeout::At(when) => {
                if Instant::now() >= when {
                    return None;
                }
            }
        }

        // Prepare for blocking.
        let res = Context::with(|cx| {
            let mut sel = Selected::Waiting;
            let mut registered_count = 0;

            // Begin watching all operations.
            //
            // The operation identifier is derived from the address of the entry's handle slot in
            // `handles`, so the same entry yields the same identifier in the loops further down.
            for (handle, _, _) in handles.iter_mut() {
                registered_count += 1;
                let oper = Operation::hook::<&dyn SelectHandle>(handle);

                // If `watch` returns `true`, that means the operation has just become ready.
                if handle.watch(oper, cx) {
                    sel = match cx.try_select(Selected::Operation(oper)) {
                        Ok(()) => Selected::Operation(oper),
                        Err(s) => s,
                    };
                    break;
                }

                // If another thread has already chosen one of the operations, stop registration.
                sel = cx.selected();
                if sel != Selected::Waiting {
                    break;
                }
            }

            if sel == Selected::Waiting {
                // Check with each operation for how long we're allowed to block, and compute the
                // earliest deadline.
                let mut deadline: Option<Instant> = match timeout {
                    // `Timeout::Now` has already returned in the timeout check above.
                    Timeout::Now => unreachable!(),
                    Timeout::Never => None,
                    Timeout::At(when) => Some(when),
                };
                for &(handle, _, _) in handles.iter() {
                    if let Some(x) = handle.deadline() {
                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
                    }
                }

                // Block the current thread.
                sel = cx.wait_until(deadline);
            }

            // Unwatch all operations.
            for (handle, _, _) in handles.iter_mut().take(registered_count) {
                handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
            }

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {}
                Selected::Disconnected => {}
                Selected::Operation(_) => {
                    // Map the selected operation identifier back to the caller's index.
                    for (handle, i, _) in handles.iter_mut() {
                        let oper = Operation::hook::<&dyn SelectHandle>(handle);
                        if sel == Selected::Operation(oper) {
                            return Some(*i);
                        }
                    }
                }
            }

            None
        });

        // Return if an operation became ready.
        if res.is_some() {
            return res;
        }
    }
}
443
444 /// Attempts to select one of the operations without blocking.
445 #[inline]
try_select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], ) -> Result<SelectedOperation<'a>, TrySelectError>446 pub fn try_select<'a>(
447 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
448 ) -> Result<SelectedOperation<'a>, TrySelectError> {
449 match run_select(handles, Timeout::Now) {
450 None => Err(TrySelectError),
451 Some((token, index, ptr)) => Ok(SelectedOperation {
452 token,
453 index,
454 ptr,
455 _marker: PhantomData,
456 }),
457 }
458 }
459
460 /// Blocks until one of the operations becomes ready and selects it.
461 #[inline]
select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], ) -> SelectedOperation<'a>462 pub fn select<'a>(
463 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
464 ) -> SelectedOperation<'a> {
465 if handles.is_empty() {
466 panic!("no operations have been added to `Select`");
467 }
468
469 let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
470 SelectedOperation {
471 token,
472 index,
473 ptr,
474 _marker: PhantomData,
475 }
476 }
477
478 /// Blocks for a limited time until one of the operations becomes ready and selects it.
479 #[inline]
select_timeout<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>480 pub fn select_timeout<'a>(
481 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
482 timeout: Duration,
483 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
484 select_deadline(handles, Instant::now() + timeout)
485 }
486
487 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
488 #[inline]
select_deadline<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], deadline: Instant, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>489 pub(crate) fn select_deadline<'a>(
490 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
491 deadline: Instant,
492 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
493 match run_select(handles, Timeout::At(deadline)) {
494 None => Err(SelectTimeoutError),
495 Some((token, index, ptr)) => Ok(SelectedOperation {
496 token,
497 index,
498 ptr,
499 _marker: PhantomData,
500 }),
501 }
502 }
503
504 /// Selects from a set of channel operations.
505 ///
506 /// `Select` allows you to define a set of channel operations, wait until any one of them becomes
507 /// ready, and finally execute it. If multiple operations are ready at the same time, a random one
508 /// among them is selected.
509 ///
510 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
511 /// when it will simply return an error because the channel is disconnected.
512 ///
513 /// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
514 /// dynamically created list of channel operations.
515 ///
516 /// Once a list of operations has been built with `Select`, there are two different ways of
517 /// proceeding:
518 ///
519 /// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
520 /// the returned selected operation has already begun and **must** be completed. If we don't
521 /// complete it, a panic will occur.
522 ///
523 /// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
524 /// successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
525 /// possible for another thread to make the operation not ready just before we try executing it,
526 /// so it's wise to use a retry loop. However, note that these methods might return with success
527 /// spuriously, so it's a good idea to always double check if the operation is really ready.
528 ///
529 /// # Examples
530 ///
531 /// Use [`select`] to receive a message from a list of receivers:
532 ///
533 /// ```
534 /// use crossbeam_channel::{Receiver, RecvError, Select};
535 ///
536 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
537 /// // Build a list of operations.
538 /// let mut sel = Select::new();
539 /// for r in rs {
540 /// sel.recv(r);
541 /// }
542 ///
543 /// // Complete the selected operation.
544 /// let oper = sel.select();
545 /// let index = oper.index();
546 /// oper.recv(&rs[index])
547 /// }
548 /// ```
549 ///
550 /// Use [`ready`] to receive a message from a list of receivers:
551 ///
552 /// ```
553 /// use crossbeam_channel::{Receiver, RecvError, Select};
554 ///
555 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
556 /// // Build a list of operations.
557 /// let mut sel = Select::new();
558 /// for r in rs {
559 /// sel.recv(r);
560 /// }
561 ///
562 /// loop {
563 /// // Wait until a receive operation becomes ready and try executing it.
564 /// let index = sel.ready();
565 /// let res = rs[index].try_recv();
566 ///
567 /// // If the operation turns out not to be ready, retry.
568 /// if let Err(e) = res {
569 /// if e.is_empty() {
570 /// continue;
571 /// }
572 /// }
573 ///
574 /// // Success!
575 /// return res.map_err(|_| RecvError);
576 /// }
577 /// }
578 /// ```
579 ///
580 /// [`try_select`]: Select::try_select
581 /// [`select`]: Select::select
582 /// [`select_timeout`]: Select::select_timeout
583 /// [`try_ready`]: Select::try_ready
584 /// [`ready`]: Select::ready
585 /// [`ready_timeout`]: Select::ready_timeout
pub struct Select<'a> {
    /// A list of senders and receivers participating in selection.
    ///
    /// Each entry is `(handle, index, ptr)`: the channel handle, the index that was returned to
    /// the caller when the operation was added, and the address of the sender or receiver as an
    /// untyped pointer.
    handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,

    /// The next index to assign to an operation.
    next_index: usize,
}

// NOTE(review): these impls are written by hand because the `*const u8` in `handles` (and the
// unbounded `dyn SelectHandle`) prevent the auto traits. In the code visible here the pointer is
// only created from a reference's address and passed along, never dereferenced — presumably it is
// just an identity tag; confirm that before relying on the soundness of these impls.
unsafe impl Send for Select<'_> {}
unsafe impl Sync for Select<'_> {}
596
597 impl<'a> Select<'a> {
598 /// Creates an empty list of channel operations for selection.
599 ///
600 /// # Examples
601 ///
602 /// ```
603 /// use crossbeam_channel::Select;
604 ///
605 /// let mut sel = Select::new();
606 ///
607 /// // The list of operations is empty, which means no operation can be selected.
608 /// assert!(sel.try_select().is_err());
609 /// ```
new() -> Select<'a>610 pub fn new() -> Select<'a> {
611 Select {
612 handles: Vec::with_capacity(4),
613 next_index: 0,
614 }
615 }
616
617 /// Adds a send operation.
618 ///
619 /// Returns the index of the added operation.
620 ///
621 /// # Examples
622 ///
623 /// ```
624 /// use crossbeam_channel::{unbounded, Select};
625 ///
626 /// let (s, r) = unbounded::<i32>();
627 ///
628 /// let mut sel = Select::new();
629 /// let index = sel.send(&s);
630 /// ```
send<T>(&mut self, s: &'a Sender<T>) -> usize631 pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
632 let i = self.next_index;
633 let ptr = s as *const Sender<_> as *const u8;
634 self.handles.push((s, i, ptr));
635 self.next_index += 1;
636 i
637 }
638
639 /// Adds a receive operation.
640 ///
641 /// Returns the index of the added operation.
642 ///
643 /// # Examples
644 ///
645 /// ```
646 /// use crossbeam_channel::{unbounded, Select};
647 ///
648 /// let (s, r) = unbounded::<i32>();
649 ///
650 /// let mut sel = Select::new();
651 /// let index = sel.recv(&r);
652 /// ```
recv<T>(&mut self, r: &'a Receiver<T>) -> usize653 pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
654 let i = self.next_index;
655 let ptr = r as *const Receiver<_> as *const u8;
656 self.handles.push((r, i, ptr));
657 self.next_index += 1;
658 i
659 }
660
661 /// Removes a previously added operation.
662 ///
663 /// This is useful when an operation is selected because the channel got disconnected and we
664 /// want to try again to select a different operation instead.
665 ///
666 /// If new operations are added after removing some, the indices of removed operations will not
667 /// be reused.
668 ///
669 /// # Panics
670 ///
671 /// An attempt to remove a non-existing or already removed operation will panic.
672 ///
673 /// # Examples
674 ///
675 /// ```
676 /// use crossbeam_channel::{unbounded, Select};
677 ///
678 /// let (s1, r1) = unbounded::<i32>();
679 /// let (_, r2) = unbounded::<i32>();
680 ///
681 /// let mut sel = Select::new();
682 /// let oper1 = sel.recv(&r1);
683 /// let oper2 = sel.recv(&r2);
684 ///
/// // Only the second operation is ready (its channel is disconnected), so it is selected.
686 /// let oper = sel.select();
687 /// assert_eq!(oper.index(), oper2);
688 /// assert!(oper.recv(&r2).is_err());
689 /// sel.remove(oper2);
690 ///
691 /// s1.send(10).unwrap();
692 ///
693 /// let oper = sel.select();
694 /// assert_eq!(oper.index(), oper1);
695 /// assert_eq!(oper.recv(&r1), Ok(10));
696 /// ```
remove(&mut self, index: usize)697 pub fn remove(&mut self, index: usize) {
698 assert!(
699 index < self.next_index,
700 "index out of bounds; {} >= {}",
701 index,
702 self.next_index,
703 );
704
705 let i = self
706 .handles
707 .iter()
708 .enumerate()
709 .find(|(_, (_, i, _))| *i == index)
710 .expect("no operation with this index")
711 .0;
712
713 self.handles.swap_remove(i);
714 }
715
716 /// Attempts to select one of the operations without blocking.
717 ///
718 /// If an operation is ready, it is selected and returned. If multiple operations are ready at
719 /// the same time, a random one among them is selected. If none of the operations are ready, an
720 /// error is returned.
721 ///
722 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
723 /// even when it will simply return an error because the channel is disconnected.
724 ///
725 /// The selected operation must be completed with [`SelectedOperation::send`]
726 /// or [`SelectedOperation::recv`].
727 ///
728 /// # Examples
729 ///
730 /// ```
731 /// use crossbeam_channel::{unbounded, Select};
732 ///
733 /// let (s1, r1) = unbounded();
734 /// let (s2, r2) = unbounded();
735 ///
736 /// s1.send(10).unwrap();
737 /// s2.send(20).unwrap();
738 ///
739 /// let mut sel = Select::new();
740 /// let oper1 = sel.recv(&r1);
741 /// let oper2 = sel.recv(&r2);
742 ///
743 /// // Both operations are initially ready, so a random one will be executed.
744 /// let oper = sel.try_select();
745 /// match oper {
746 /// Err(_) => panic!("both operations should be ready"),
747 /// Ok(oper) => match oper.index() {
748 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
749 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
750 /// _ => unreachable!(),
751 /// }
752 /// }
753 /// ```
try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError>754 pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
755 try_select(&mut self.handles)
756 }
757
758 /// Blocks until one of the operations becomes ready and selects it.
759 ///
760 /// Once an operation becomes ready, it is selected and returned. If multiple operations are
761 /// ready at the same time, a random one among them is selected.
762 ///
763 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
764 /// even when it will simply return an error because the channel is disconnected.
765 ///
766 /// The selected operation must be completed with [`SelectedOperation::send`]
767 /// or [`SelectedOperation::recv`].
768 ///
769 /// # Panics
770 ///
771 /// Panics if no operations have been added to `Select`.
772 ///
773 /// # Examples
774 ///
775 /// ```
776 /// use std::thread;
777 /// use std::time::Duration;
778 /// use crossbeam_channel::{unbounded, Select};
779 ///
780 /// let (s1, r1) = unbounded();
781 /// let (s2, r2) = unbounded();
782 ///
783 /// thread::spawn(move || {
784 /// thread::sleep(Duration::from_secs(1));
785 /// s1.send(10).unwrap();
786 /// });
787 /// thread::spawn(move || s2.send(20).unwrap());
788 ///
789 /// let mut sel = Select::new();
790 /// let oper1 = sel.recv(&r1);
791 /// let oper2 = sel.recv(&r2);
792 ///
793 /// // The second operation will be selected because it becomes ready first.
794 /// let oper = sel.select();
795 /// match oper.index() {
796 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
797 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
798 /// _ => unreachable!(),
799 /// }
800 /// ```
select(&mut self) -> SelectedOperation<'a>801 pub fn select(&mut self) -> SelectedOperation<'a> {
802 select(&mut self.handles)
803 }
804
805 /// Blocks for a limited time until one of the operations becomes ready and selects it.
806 ///
807 /// If an operation becomes ready, it is selected and returned. If multiple operations are
808 /// ready at the same time, a random one among them is selected. If none of the operations
809 /// become ready for the specified duration, an error is returned.
810 ///
811 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
812 /// even when it will simply return an error because the channel is disconnected.
813 ///
814 /// The selected operation must be completed with [`SelectedOperation::send`]
815 /// or [`SelectedOperation::recv`].
816 ///
817 /// # Examples
818 ///
819 /// ```
820 /// use std::thread;
821 /// use std::time::Duration;
822 /// use crossbeam_channel::{unbounded, Select};
823 ///
824 /// let (s1, r1) = unbounded();
825 /// let (s2, r2) = unbounded();
826 ///
827 /// thread::spawn(move || {
828 /// thread::sleep(Duration::from_secs(1));
829 /// s1.send(10).unwrap();
830 /// });
831 /// thread::spawn(move || s2.send(20).unwrap());
832 ///
833 /// let mut sel = Select::new();
834 /// let oper1 = sel.recv(&r1);
835 /// let oper2 = sel.recv(&r2);
836 ///
837 /// // The second operation will be selected because it becomes ready first.
838 /// let oper = sel.select_timeout(Duration::from_millis(500));
839 /// match oper {
840 /// Err(_) => panic!("should not have timed out"),
841 /// Ok(oper) => match oper.index() {
842 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
843 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
844 /// _ => unreachable!(),
845 /// }
846 /// }
847 /// ```
select_timeout( &mut self, timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>848 pub fn select_timeout(
849 &mut self,
850 timeout: Duration,
851 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
852 select_timeout(&mut self.handles, timeout)
853 }
854
855 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
856 ///
857 /// If an operation becomes ready, it is selected and returned. If multiple operations are
858 /// ready at the same time, a random one among them is selected. If none of the operations
859 /// become ready before the given deadline, an error is returned.
860 ///
861 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
862 /// even when it will simply return an error because the channel is disconnected.
863 ///
864 /// The selected operation must be completed with [`SelectedOperation::send`]
865 /// or [`SelectedOperation::recv`].
866 ///
867 /// # Examples
868 ///
869 /// ```
870 /// use std::thread;
871 /// use std::time::{Instant, Duration};
872 /// use crossbeam_channel::{unbounded, Select};
873 ///
874 /// let (s1, r1) = unbounded();
875 /// let (s2, r2) = unbounded();
876 ///
877 /// thread::spawn(move || {
878 /// thread::sleep(Duration::from_secs(1));
879 /// s1.send(10).unwrap();
880 /// });
881 /// thread::spawn(move || s2.send(20).unwrap());
882 ///
883 /// let mut sel = Select::new();
884 /// let oper1 = sel.recv(&r1);
885 /// let oper2 = sel.recv(&r2);
886 ///
887 /// let deadline = Instant::now() + Duration::from_millis(500);
888 ///
889 /// // The second operation will be selected because it becomes ready first.
890 /// let oper = sel.select_deadline(deadline);
891 /// match oper {
892 /// Err(_) => panic!("should not have timed out"),
893 /// Ok(oper) => match oper.index() {
894 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
895 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
896 /// _ => unreachable!(),
897 /// }
898 /// }
899 /// ```
select_deadline( &mut self, deadline: Instant, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>900 pub fn select_deadline(
901 &mut self,
902 deadline: Instant,
903 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
904 select_deadline(&mut self.handles, deadline)
905 }
906
907 /// Attempts to find a ready operation without blocking.
908 ///
909 /// If an operation is ready, its index is returned. If multiple operations are ready at the
910 /// same time, a random one among them is chosen. If none of the operations are ready, an error
911 /// is returned.
912 ///
913 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
914 /// even when it will simply return an error because the channel is disconnected.
915 ///
916 /// Note that this method might return with success spuriously, so it's a good idea to always
917 /// double check if the operation is really ready.
918 ///
919 /// # Examples
920 ///
921 /// ```
922 /// use crossbeam_channel::{unbounded, Select};
923 ///
924 /// let (s1, r1) = unbounded();
925 /// let (s2, r2) = unbounded();
926 ///
927 /// s1.send(10).unwrap();
928 /// s2.send(20).unwrap();
929 ///
930 /// let mut sel = Select::new();
931 /// let oper1 = sel.recv(&r1);
932 /// let oper2 = sel.recv(&r2);
933 ///
934 /// // Both operations are initially ready, so a random one will be chosen.
935 /// match sel.try_ready() {
936 /// Err(_) => panic!("both operations should be ready"),
937 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
938 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
939 /// Ok(_) => unreachable!(),
940 /// }
941 /// ```
try_ready(&mut self) -> Result<usize, TryReadyError>942 pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
943 match run_ready(&mut self.handles, Timeout::Now) {
944 None => Err(TryReadyError),
945 Some(index) => Ok(index),
946 }
947 }
948
949 /// Blocks until one of the operations becomes ready.
950 ///
951 /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
952 /// the same time, a random one among them is chosen.
953 ///
954 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
955 /// even when it will simply return an error because the channel is disconnected.
956 ///
957 /// Note that this method might return with success spuriously, so it's a good idea to always
958 /// double check if the operation is really ready.
959 ///
960 /// # Panics
961 ///
962 /// Panics if no operations have been added to `Select`.
963 ///
964 /// # Examples
965 ///
966 /// ```
967 /// use std::thread;
968 /// use std::time::Duration;
969 /// use crossbeam_channel::{unbounded, Select};
970 ///
971 /// let (s1, r1) = unbounded();
972 /// let (s2, r2) = unbounded();
973 ///
974 /// thread::spawn(move || {
975 /// thread::sleep(Duration::from_secs(1));
976 /// s1.send(10).unwrap();
977 /// });
978 /// thread::spawn(move || s2.send(20).unwrap());
979 ///
980 /// let mut sel = Select::new();
981 /// let oper1 = sel.recv(&r1);
982 /// let oper2 = sel.recv(&r2);
983 ///
984 /// // The second operation will be selected because it becomes ready first.
985 /// match sel.ready() {
986 /// i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
987 /// i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
988 /// _ => unreachable!(),
989 /// }
990 /// ```
ready(&mut self) -> usize991 pub fn ready(&mut self) -> usize {
992 if self.handles.is_empty() {
993 panic!("no operations have been added to `Select`");
994 }
995
996 run_ready(&mut self.handles, Timeout::Never).unwrap()
997 }
998
999 /// Blocks for a limited time until one of the operations becomes ready.
1000 ///
1001 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1002 /// the same time, a random one among them is chosen. If none of the operations become ready
1003 /// for the specified duration, an error is returned.
1004 ///
1005 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1006 /// even when it will simply return an error because the channel is disconnected.
1007 ///
1008 /// Note that this method might return with success spuriously, so it's a good idea to double
1009 /// check if the operation is really ready.
1010 ///
1011 /// # Examples
1012 ///
1013 /// ```
1014 /// use std::thread;
1015 /// use std::time::Duration;
1016 /// use crossbeam_channel::{unbounded, Select};
1017 ///
1018 /// let (s1, r1) = unbounded();
1019 /// let (s2, r2) = unbounded();
1020 ///
1021 /// thread::spawn(move || {
1022 /// thread::sleep(Duration::from_secs(1));
1023 /// s1.send(10).unwrap();
1024 /// });
1025 /// thread::spawn(move || s2.send(20).unwrap());
1026 ///
1027 /// let mut sel = Select::new();
1028 /// let oper1 = sel.recv(&r1);
1029 /// let oper2 = sel.recv(&r2);
1030 ///
1031 /// // The second operation will be selected because it becomes ready first.
1032 /// match sel.ready_timeout(Duration::from_millis(500)) {
1033 /// Err(_) => panic!("should not have timed out"),
1034 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1035 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1036 /// Ok(_) => unreachable!(),
1037 /// }
1038 /// ```
ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError>1039 pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1040 self.ready_deadline(Instant::now() + timeout)
1041 }
1042
1043 /// Blocks until a given deadline, or until one of the operations becomes ready.
1044 ///
1045 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1046 /// the same time, a random one among them is chosen. If none of the operations become ready
1047 /// before the deadline, an error is returned.
1048 ///
1049 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1050 /// even when it will simply return an error because the channel is disconnected.
1051 ///
1052 /// Note that this method might return with success spuriously, so it's a good idea to double
1053 /// check if the operation is really ready.
1054 ///
1055 /// # Examples
1056 ///
1057 /// ```
1058 /// use std::thread;
1059 /// use std::time::{Duration, Instant};
1060 /// use crossbeam_channel::{unbounded, Select};
1061 ///
1062 /// let deadline = Instant::now() + Duration::from_millis(500);
1063 ///
1064 /// let (s1, r1) = unbounded();
1065 /// let (s2, r2) = unbounded();
1066 ///
1067 /// thread::spawn(move || {
1068 /// thread::sleep(Duration::from_secs(1));
1069 /// s1.send(10).unwrap();
1070 /// });
1071 /// thread::spawn(move || s2.send(20).unwrap());
1072 ///
1073 /// let mut sel = Select::new();
1074 /// let oper1 = sel.recv(&r1);
1075 /// let oper2 = sel.recv(&r2);
1076 ///
1077 /// // The second operation will be selected because it becomes ready first.
1078 /// match sel.ready_deadline(deadline) {
1079 /// Err(_) => panic!("should not have timed out"),
1080 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1081 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1082 /// Ok(_) => unreachable!(),
1083 /// }
1084 /// ```
ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError>1085 pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1086 match run_ready(&mut self.handles, Timeout::At(deadline)) {
1087 None => Err(ReadyTimeoutError),
1088 Some(index) => Ok(index),
1089 }
1090 }
1091 }
1092
1093 impl<'a> Clone for Select<'a> {
clone(&self) -> Select<'a>1094 fn clone(&self) -> Select<'a> {
1095 Select {
1096 handles: self.handles.clone(),
1097 next_index: self.next_index,
1098 }
1099 }
1100 }
1101
1102 impl<'a> Default for Select<'a> {
default() -> Select<'a>1103 fn default() -> Select<'a> {
1104 Select::new()
1105 }
1106 }
1107
1108 impl fmt::Debug for Select<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1109 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1110 f.pad("Select { .. }")
1111 }
1112 }
1113
/// A selected operation that needs to be completed.
///
/// To complete the operation, call [`send`] or [`recv`]. Both take the value by `self`, so an
/// operation can be completed at most once.
///
/// # Panics
///
/// Forgetting to complete the operation is an error and might lead to deadlocks. If a
/// `SelectedOperation` is dropped without completion, a panic occurs.
///
/// [`send`]: SelectedOperation::send
/// [`recv`]: SelectedOperation::recv
#[must_use]
pub struct SelectedOperation<'a> {
    /// Token needed to complete the operation.
    ///
    /// `send` and `recv` pass it mutably to `channel::write` and `channel::read` respectively.
    token: Token,

    /// The index of the selected operation, as returned by `index()`.
    index: usize,

    /// The address of the selected `Sender` or `Receiver`.
    ///
    /// `send` and `recv` compare it with the address of the reference they are given and panic
    /// when the two differ.
    ptr: *const u8,

    /// Indicates that `Sender`s and `Receiver`s are borrowed.
    ///
    /// Carries no data; it only makes the struct use the lifetime `'a`.
    _marker: PhantomData<&'a ()>,
}
1139
1140 impl SelectedOperation<'_> {
1141 /// Returns the index of the selected operation.
1142 ///
1143 /// # Examples
1144 ///
1145 /// ```
1146 /// use crossbeam_channel::{bounded, Select};
1147 ///
1148 /// let (s1, r1) = bounded::<()>(0);
1149 /// let (s2, r2) = bounded::<()>(0);
1150 /// let (s3, r3) = bounded::<()>(1);
1151 ///
1152 /// let mut sel = Select::new();
1153 /// let oper1 = sel.send(&s1);
1154 /// let oper2 = sel.recv(&r2);
1155 /// let oper3 = sel.send(&s3);
1156 ///
1157 /// // Only the last operation is ready.
1158 /// let oper = sel.select();
1159 /// assert_eq!(oper.index(), 2);
1160 /// assert_eq!(oper.index(), oper3);
1161 ///
1162 /// // Complete the operation.
1163 /// oper.send(&s3, ()).unwrap();
1164 /// ```
index(&self) -> usize1165 pub fn index(&self) -> usize {
1166 self.index
1167 }
1168
1169 /// Completes the send operation.
1170 ///
1171 /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1172 /// when the operation was added.
1173 ///
1174 /// # Panics
1175 ///
1176 /// Panics if an incorrect [`Sender`] reference is passed.
1177 ///
1178 /// # Examples
1179 ///
1180 /// ```
1181 /// use crossbeam_channel::{bounded, Select, SendError};
1182 ///
1183 /// let (s, r) = bounded::<i32>(0);
1184 /// drop(r);
1185 ///
1186 /// let mut sel = Select::new();
1187 /// let oper1 = sel.send(&s);
1188 ///
1189 /// let oper = sel.select();
1190 /// assert_eq!(oper.index(), oper1);
1191 /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1192 /// ```
send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>>1193 pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1194 assert!(
1195 s as *const Sender<T> as *const u8 == self.ptr,
1196 "passed a sender that wasn't selected",
1197 );
1198 let res = unsafe { channel::write(s, &mut self.token, msg) };
1199 mem::forget(self);
1200 res.map_err(SendError)
1201 }
1202
1203 /// Completes the receive operation.
1204 ///
1205 /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1206 /// when the operation was added.
1207 ///
1208 /// # Panics
1209 ///
1210 /// Panics if an incorrect [`Receiver`] reference is passed.
1211 ///
1212 /// # Examples
1213 ///
1214 /// ```
1215 /// use crossbeam_channel::{bounded, Select, RecvError};
1216 ///
1217 /// let (s, r) = bounded::<i32>(0);
1218 /// drop(s);
1219 ///
1220 /// let mut sel = Select::new();
1221 /// let oper1 = sel.recv(&r);
1222 ///
1223 /// let oper = sel.select();
1224 /// assert_eq!(oper.index(), oper1);
1225 /// assert_eq!(oper.recv(&r), Err(RecvError));
1226 /// ```
recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError>1227 pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1228 assert!(
1229 r as *const Receiver<T> as *const u8 == self.ptr,
1230 "passed a receiver that wasn't selected",
1231 );
1232 let res = unsafe { channel::read(r, &mut self.token) };
1233 mem::forget(self);
1234 res.map_err(|_| RecvError)
1235 }
1236 }
1237
1238 impl fmt::Debug for SelectedOperation<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1239 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1240 f.pad("SelectedOperation { .. }")
1241 }
1242 }
1243
impl Drop for SelectedOperation<'_> {
    /// Panics unconditionally: running this destructor means the operation was never completed.
    ///
    /// `send` and `recv` call `mem::forget` on the value once the channel operation has been
    /// carried out, so a completed operation never reaches this code.
    fn drop(&mut self) {
        // NOTE(review): the sender/receiver check in `send`/`recv` runs before their
        // `mem::forget`, so a failed check unwinds with the value still alive and this panic
        // would then fire during unwinding; confirm that is the intended outcome.
        panic!("dropped `SelectedOperation` without completing the operation");
    }
}
1249