1 //! Interface to the select mechanism.
2
3 use std::fmt;
4 use std::marker::PhantomData;
5 use std::mem;
6 use std::time::{Duration, Instant};
7
8 use crossbeam_utils::Backoff;
9
10 use crate::channel::{self, Receiver, Sender};
11 use crate::context::Context;
12 use crate::err::{ReadyTimeoutError, TryReadyError};
13 use crate::err::{RecvError, SendError};
14 use crate::err::{SelectTimeoutError, TrySelectError};
15 use crate::flavors;
16 use crate::utils;
17
18 /// Temporary data that gets initialized during select or a blocking operation, and is consumed by
19 /// `read` or `write`.
20 ///
21 /// Each field contains data associated with a specific channel flavor.
22 // This is a private API that is used by the select macro.
23 #[derive(Debug, Default)]
24 pub struct Token {
25 pub(crate) at: flavors::at::AtToken,
26 pub(crate) array: flavors::array::ArrayToken,
27 pub(crate) list: flavors::list::ListToken,
28 #[allow(dead_code)]
29 pub(crate) never: flavors::never::NeverToken,
30 pub(crate) tick: flavors::tick::TickToken,
31 pub(crate) zero: flavors::zero::ZeroToken,
32 }
33
34 /// Identifier associated with an operation by a specific thread on a specific channel.
35 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
36 pub struct Operation(usize);
37
38 impl Operation {
39 /// Creates an operation identifier from a mutable reference.
40 ///
41 /// This function essentially just turns the address of the reference into a number. The
42 /// reference should point to a variable that is specific to the thread and the operation,
43 /// and is alive for the entire duration of select or blocking operation.
44 #[inline]
hook<T>(r: &mut T) -> Operation45 pub fn hook<T>(r: &mut T) -> Operation {
46 let val = r as *mut T as usize;
47 // Make sure that the pointer address doesn't equal the numerical representation of
48 // `Selected::{Waiting, Aborted, Disconnected}`.
49 assert!(val > 2);
50 Operation(val)
51 }
52 }
53
54 /// Current state of a select or a blocking operation.
55 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
56 pub enum Selected {
57 /// Still waiting for an operation.
58 Waiting,
59
60 /// The attempt to block the current thread has been aborted.
61 Aborted,
62
63 /// An operation became ready because a channel is disconnected.
64 Disconnected,
65
66 /// An operation became ready because a message can be sent or received.
67 Operation(Operation),
68 }
69
70 impl From<usize> for Selected {
71 #[inline]
from(val: usize) -> Selected72 fn from(val: usize) -> Selected {
73 match val {
74 0 => Selected::Waiting,
75 1 => Selected::Aborted,
76 2 => Selected::Disconnected,
77 oper => Selected::Operation(Operation(oper)),
78 }
79 }
80 }
81
82 impl Into<usize> for Selected {
83 #[inline]
into(self) -> usize84 fn into(self) -> usize {
85 match self {
86 Selected::Waiting => 0,
87 Selected::Aborted => 1,
88 Selected::Disconnected => 2,
89 Selected::Operation(Operation(val)) => val,
90 }
91 }
92 }
93
94 /// A receiver or a sender that can participate in select.
95 ///
96 /// This is a handle that assists select in executing an operation, registration, deciding on the
97 /// appropriate deadline for blocking, etc.
98 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
99 pub trait SelectHandle {
100 /// Attempts to select an operation and returns `true` on success.
try_select(&self, token: &mut Token) -> bool101 fn try_select(&self, token: &mut Token) -> bool;
102
103 /// Returns a deadline for an operation, if there is one.
deadline(&self) -> Option<Instant>104 fn deadline(&self) -> Option<Instant>;
105
106 /// Registers an operation for execution and returns `true` if it is now ready.
register(&self, oper: Operation, cx: &Context) -> bool107 fn register(&self, oper: Operation, cx: &Context) -> bool;
108
109 /// Unregisters an operation for execution.
unregister(&self, oper: Operation)110 fn unregister(&self, oper: Operation);
111
112 /// Attempts to select an operation the thread got woken up for and returns `true` on success.
accept(&self, token: &mut Token, cx: &Context) -> bool113 fn accept(&self, token: &mut Token, cx: &Context) -> bool;
114
115 /// Returns `true` if an operation can be executed without blocking.
is_ready(&self) -> bool116 fn is_ready(&self) -> bool;
117
118 /// Registers an operation for readiness notification and returns `true` if it is now ready.
watch(&self, oper: Operation, cx: &Context) -> bool119 fn watch(&self, oper: Operation, cx: &Context) -> bool;
120
121 /// Unregisters an operation for readiness notification.
unwatch(&self, oper: Operation)122 fn unwatch(&self, oper: Operation);
123 }
124
125 impl<T: SelectHandle> SelectHandle for &T {
try_select(&self, token: &mut Token) -> bool126 fn try_select(&self, token: &mut Token) -> bool {
127 (**self).try_select(token)
128 }
129
deadline(&self) -> Option<Instant>130 fn deadline(&self) -> Option<Instant> {
131 (**self).deadline()
132 }
133
register(&self, oper: Operation, cx: &Context) -> bool134 fn register(&self, oper: Operation, cx: &Context) -> bool {
135 (**self).register(oper, cx)
136 }
137
unregister(&self, oper: Operation)138 fn unregister(&self, oper: Operation) {
139 (**self).unregister(oper);
140 }
141
accept(&self, token: &mut Token, cx: &Context) -> bool142 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
143 (**self).accept(token, cx)
144 }
145
is_ready(&self) -> bool146 fn is_ready(&self) -> bool {
147 (**self).is_ready()
148 }
149
watch(&self, oper: Operation, cx: &Context) -> bool150 fn watch(&self, oper: Operation, cx: &Context) -> bool {
151 (**self).watch(oper, cx)
152 }
153
unwatch(&self, oper: Operation)154 fn unwatch(&self, oper: Operation) {
155 (**self).unwatch(oper)
156 }
157 }
158
159 /// Determines when a select operation should time out.
160 #[derive(Clone, Copy, Eq, PartialEq)]
161 enum Timeout {
162 /// No blocking.
163 Now,
164
165 /// Block forever.
166 Never,
167
168 /// Time out after the time instant.
169 At(Instant),
170 }
171
172 /// Runs until one of the operations is selected, potentially blocking the current thread.
173 ///
174 /// Successful receive operations will have to be followed up by `channel::read()` and successful
175 /// send operations by `channel::write()`.
run_select( handles: &mut [(&dyn SelectHandle, usize, *const u8)], timeout: Timeout, ) -> Option<(Token, usize, *const u8)>176 fn run_select(
177 handles: &mut [(&dyn SelectHandle, usize, *const u8)],
178 timeout: Timeout,
179 ) -> Option<(Token, usize, *const u8)> {
180 if handles.is_empty() {
181 // Wait until the timeout and return.
182 match timeout {
183 Timeout::Now => return None,
184 Timeout::Never => {
185 utils::sleep_until(None);
186 unreachable!();
187 }
188 Timeout::At(when) => {
189 utils::sleep_until(Some(when));
190 return None;
191 }
192 }
193 }
194
195 // Shuffle the operations for fairness.
196 utils::shuffle(handles);
197
198 // Create a token, which serves as a temporary variable that gets initialized in this function
199 // and is later used by a call to `channel::read()` or `channel::write()` that completes the
200 // selected operation.
201 let mut token = Token::default();
202
203 // Try selecting one of the operations without blocking.
204 for &(handle, i, ptr) in handles.iter() {
205 if handle.try_select(&mut token) {
206 return Some((token, i, ptr));
207 }
208 }
209
210 loop {
211 // Prepare for blocking.
212 let res = Context::with(|cx| {
213 let mut sel = Selected::Waiting;
214 let mut registered_count = 0;
215 let mut index_ready = None;
216
217 if let Timeout::Now = timeout {
218 cx.try_select(Selected::Aborted).unwrap();
219 }
220
221 // Register all operations.
222 for (handle, i, _) in handles.iter_mut() {
223 registered_count += 1;
224
225 // If registration returns `false`, that means the operation has just become ready.
226 if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
227 // Try aborting select.
228 sel = match cx.try_select(Selected::Aborted) {
229 Ok(()) => {
230 index_ready = Some(*i);
231 Selected::Aborted
232 }
233 Err(s) => s,
234 };
235 break;
236 }
237
238 // If another thread has already selected one of the operations, stop registration.
239 sel = cx.selected();
240 if sel != Selected::Waiting {
241 break;
242 }
243 }
244
245 if sel == Selected::Waiting {
246 // Check with each operation for how long we're allowed to block, and compute the
247 // earliest deadline.
248 let mut deadline: Option<Instant> = match timeout {
249 Timeout::Now => return None,
250 Timeout::Never => None,
251 Timeout::At(when) => Some(when),
252 };
253 for &(handle, _, _) in handles.iter() {
254 if let Some(x) = handle.deadline() {
255 deadline = deadline.map(|y| x.min(y)).or(Some(x));
256 }
257 }
258
259 // Block the current thread.
260 sel = cx.wait_until(deadline);
261 }
262
263 // Unregister all registered operations.
264 for (handle, _, _) in handles.iter_mut().take(registered_count) {
265 handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
266 }
267
268 match sel {
269 Selected::Waiting => unreachable!(),
270 Selected::Aborted => {
271 // If an operation became ready during registration, try selecting it.
272 if let Some(index_ready) = index_ready {
273 for &(handle, i, ptr) in handles.iter() {
274 if i == index_ready && handle.try_select(&mut token) {
275 return Some((i, ptr));
276 }
277 }
278 }
279 }
280 Selected::Disconnected => {}
281 Selected::Operation(_) => {
282 // Find the selected operation.
283 for (handle, i, ptr) in handles.iter_mut() {
284 // Is this the selected operation?
285 if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
286 {
287 // Try selecting this operation.
288 if handle.accept(&mut token, cx) {
289 return Some((*i, *ptr));
290 }
291 }
292 }
293 }
294 }
295
296 None
297 });
298
299 // Return if an operation was selected.
300 if let Some((i, ptr)) = res {
301 return Some((token, i, ptr));
302 }
303
304 // Try selecting one of the operations without blocking.
305 for &(handle, i, ptr) in handles.iter() {
306 if handle.try_select(&mut token) {
307 return Some((token, i, ptr));
308 }
309 }
310
311 match timeout {
312 Timeout::Now => return None,
313 Timeout::Never => {}
314 Timeout::At(when) => {
315 if Instant::now() >= when {
316 return None;
317 }
318 }
319 }
320 }
321 }
322
323 /// Runs until one of the operations becomes ready, potentially blocking the current thread.
run_ready( handles: &mut [(&dyn SelectHandle, usize, *const u8)], timeout: Timeout, ) -> Option<usize>324 fn run_ready(
325 handles: &mut [(&dyn SelectHandle, usize, *const u8)],
326 timeout: Timeout,
327 ) -> Option<usize> {
328 if handles.is_empty() {
329 // Wait until the timeout and return.
330 match timeout {
331 Timeout::Now => return None,
332 Timeout::Never => {
333 utils::sleep_until(None);
334 unreachable!();
335 }
336 Timeout::At(when) => {
337 utils::sleep_until(Some(when));
338 return None;
339 }
340 }
341 }
342
343 // Shuffle the operations for fairness.
344 utils::shuffle(handles);
345
346 loop {
347 let backoff = Backoff::new();
348 loop {
349 // Check operations for readiness.
350 for &(handle, i, _) in handles.iter() {
351 if handle.is_ready() {
352 return Some(i);
353 }
354 }
355
356 if backoff.is_completed() {
357 break;
358 } else {
359 backoff.snooze();
360 }
361 }
362
363 // Check for timeout.
364 match timeout {
365 Timeout::Now => return None,
366 Timeout::Never => {}
367 Timeout::At(when) => {
368 if Instant::now() >= when {
369 return None;
370 }
371 }
372 }
373
374 // Prepare for blocking.
375 let res = Context::with(|cx| {
376 let mut sel = Selected::Waiting;
377 let mut registered_count = 0;
378
379 // Begin watching all operations.
380 for (handle, _, _) in handles.iter_mut() {
381 registered_count += 1;
382 let oper = Operation::hook::<&dyn SelectHandle>(handle);
383
384 // If registration returns `false`, that means the operation has just become ready.
385 if handle.watch(oper, cx) {
386 sel = match cx.try_select(Selected::Operation(oper)) {
387 Ok(()) => Selected::Operation(oper),
388 Err(s) => s,
389 };
390 break;
391 }
392
393 // If another thread has already chosen one of the operations, stop registration.
394 sel = cx.selected();
395 if sel != Selected::Waiting {
396 break;
397 }
398 }
399
400 if sel == Selected::Waiting {
401 // Check with each operation for how long we're allowed to block, and compute the
402 // earliest deadline.
403 let mut deadline: Option<Instant> = match timeout {
404 Timeout::Now => unreachable!(),
405 Timeout::Never => None,
406 Timeout::At(when) => Some(when),
407 };
408 for &(handle, _, _) in handles.iter() {
409 if let Some(x) = handle.deadline() {
410 deadline = deadline.map(|y| x.min(y)).or(Some(x));
411 }
412 }
413
414 // Block the current thread.
415 sel = cx.wait_until(deadline);
416 }
417
418 // Unwatch all operations.
419 for (handle, _, _) in handles.iter_mut().take(registered_count) {
420 handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
421 }
422
423 match sel {
424 Selected::Waiting => unreachable!(),
425 Selected::Aborted => {}
426 Selected::Disconnected => {}
427 Selected::Operation(_) => {
428 for (handle, i, _) in handles.iter_mut() {
429 let oper = Operation::hook::<&dyn SelectHandle>(handle);
430 if sel == Selected::Operation(oper) {
431 return Some(*i);
432 }
433 }
434 }
435 }
436
437 None
438 });
439
440 // Return if an operation became ready.
441 if res.is_some() {
442 return res;
443 }
444 }
445 }
446
447 /// Attempts to select one of the operations without blocking.
448 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
449 #[inline]
try_select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], ) -> Result<SelectedOperation<'a>, TrySelectError>450 pub fn try_select<'a>(
451 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
452 ) -> Result<SelectedOperation<'a>, TrySelectError> {
453 match run_select(handles, Timeout::Now) {
454 None => Err(TrySelectError),
455 Some((token, index, ptr)) => Ok(SelectedOperation {
456 token,
457 index,
458 ptr,
459 _marker: PhantomData,
460 }),
461 }
462 }
463
464 /// Blocks until one of the operations becomes ready and selects it.
465 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
466 #[inline]
select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], ) -> SelectedOperation<'a>467 pub fn select<'a>(
468 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
469 ) -> SelectedOperation<'a> {
470 if handles.is_empty() {
471 panic!("no operations have been added to `Select`");
472 }
473
474 let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
475 SelectedOperation {
476 token,
477 index,
478 ptr,
479 _marker: PhantomData,
480 }
481 }
482
483 /// Blocks for a limited time until one of the operations becomes ready and selects it.
484 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
485 #[inline]
select_timeout<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>486 pub fn select_timeout<'a>(
487 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
488 timeout: Duration,
489 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
490 select_deadline(handles, utils::convert_timeout_to_deadline(timeout))
491 }
492
493 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
494 #[inline]
select_deadline<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], deadline: Instant, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>495 pub(crate) fn select_deadline<'a>(
496 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
497 deadline: Instant,
498 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
499 match run_select(handles, Timeout::At(deadline)) {
500 None => Err(SelectTimeoutError),
501 Some((token, index, ptr)) => Ok(SelectedOperation {
502 token,
503 index,
504 ptr,
505 _marker: PhantomData,
506 }),
507 }
508 }
509
510 /// Selects from a set of channel operations.
511 ///
512 /// `Select` allows you to define a set of channel operations, wait until any one of them becomes
513 /// ready, and finally execute it. If multiple operations are ready at the same time, a random one
514 /// among them is selected.
515 ///
516 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
517 /// when it will simply return an error because the channel is disconnected.
518 ///
519 /// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
520 /// dynamically created list of channel operations.
521 ///
522 /// Once a list of operations has been built with `Select`, there are two different ways of
523 /// proceeding:
524 ///
525 /// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
526 /// the returned selected operation has already begun and **must** be completed. If we don't
527 /// complete it, a panic will occur.
528 ///
529 /// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
530 /// successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
531 /// possible for another thread to make the operation not ready just before we try executing it,
532 /// so it's wise to use a retry loop. However, note that these methods might return with success
533 /// spuriously, so it's a good idea to always double check if the operation is really ready.
534 ///
535 /// # Examples
536 ///
537 /// Use [`select`] to receive a message from a list of receivers:
538 ///
539 /// ```
540 /// use crossbeam_channel::{Receiver, RecvError, Select};
541 ///
542 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
543 /// // Build a list of operations.
544 /// let mut sel = Select::new();
545 /// for r in rs {
546 /// sel.recv(r);
547 /// }
548 ///
549 /// // Complete the selected operation.
550 /// let oper = sel.select();
551 /// let index = oper.index();
552 /// oper.recv(&rs[index])
553 /// }
554 /// ```
555 ///
556 /// Use [`ready`] to receive a message from a list of receivers:
557 ///
558 /// ```
559 /// use crossbeam_channel::{Receiver, RecvError, Select};
560 ///
561 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
562 /// // Build a list of operations.
563 /// let mut sel = Select::new();
564 /// for r in rs {
565 /// sel.recv(r);
566 /// }
567 ///
568 /// loop {
569 /// // Wait until a receive operation becomes ready and try executing it.
570 /// let index = sel.ready();
571 /// let res = rs[index].try_recv();
572 ///
573 /// // If the operation turns out not to be ready, retry.
574 /// if let Err(e) = res {
575 /// if e.is_empty() {
576 /// continue;
577 /// }
578 /// }
579 ///
580 /// // Success!
581 /// return res.map_err(|_| RecvError);
582 /// }
583 /// }
584 /// ```
585 ///
586 /// [`try_select`]: Select::try_select
587 /// [`select`]: Select::select
588 /// [`select_timeout`]: Select::select_timeout
589 /// [`try_ready`]: Select::try_ready
590 /// [`ready`]: Select::ready
591 /// [`ready_timeout`]: Select::ready_timeout
592 pub struct Select<'a> {
593 /// A list of senders and receivers participating in selection.
594 handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,
595
596 /// The next index to assign to an operation.
597 next_index: usize,
598 }
599
600 unsafe impl Send for Select<'_> {}
601 unsafe impl Sync for Select<'_> {}
602
603 impl<'a> Select<'a> {
604 /// Creates an empty list of channel operations for selection.
605 ///
606 /// # Examples
607 ///
608 /// ```
609 /// use crossbeam_channel::Select;
610 ///
611 /// let mut sel = Select::new();
612 ///
613 /// // The list of operations is empty, which means no operation can be selected.
614 /// assert!(sel.try_select().is_err());
615 /// ```
new() -> Select<'a>616 pub fn new() -> Select<'a> {
617 Select {
618 handles: Vec::with_capacity(4),
619 next_index: 0,
620 }
621 }
622
623 /// Adds a send operation.
624 ///
625 /// Returns the index of the added operation.
626 ///
627 /// # Examples
628 ///
629 /// ```
630 /// use crossbeam_channel::{unbounded, Select};
631 ///
632 /// let (s, r) = unbounded::<i32>();
633 ///
634 /// let mut sel = Select::new();
635 /// let index = sel.send(&s);
636 /// ```
send<T>(&mut self, s: &'a Sender<T>) -> usize637 pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
638 let i = self.next_index;
639 let ptr = s as *const Sender<_> as *const u8;
640 self.handles.push((s, i, ptr));
641 self.next_index += 1;
642 i
643 }
644
645 /// Adds a receive operation.
646 ///
647 /// Returns the index of the added operation.
648 ///
649 /// # Examples
650 ///
651 /// ```
652 /// use crossbeam_channel::{unbounded, Select};
653 ///
654 /// let (s, r) = unbounded::<i32>();
655 ///
656 /// let mut sel = Select::new();
657 /// let index = sel.recv(&r);
658 /// ```
recv<T>(&mut self, r: &'a Receiver<T>) -> usize659 pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
660 let i = self.next_index;
661 let ptr = r as *const Receiver<_> as *const u8;
662 self.handles.push((r, i, ptr));
663 self.next_index += 1;
664 i
665 }
666
667 /// Removes a previously added operation.
668 ///
669 /// This is useful when an operation is selected because the channel got disconnected and we
670 /// want to try again to select a different operation instead.
671 ///
672 /// If new operations are added after removing some, the indices of removed operations will not
673 /// be reused.
674 ///
675 /// # Panics
676 ///
677 /// An attempt to remove a non-existing or already removed operation will panic.
678 ///
679 /// # Examples
680 ///
681 /// ```
682 /// use crossbeam_channel::{unbounded, Select};
683 ///
684 /// let (s1, r1) = unbounded::<i32>();
685 /// let (_, r2) = unbounded::<i32>();
686 ///
687 /// let mut sel = Select::new();
688 /// let oper1 = sel.recv(&r1);
689 /// let oper2 = sel.recv(&r2);
690 ///
691 /// // Both operations are initially ready, so a random one will be executed.
692 /// let oper = sel.select();
693 /// assert_eq!(oper.index(), oper2);
694 /// assert!(oper.recv(&r2).is_err());
695 /// sel.remove(oper2);
696 ///
697 /// s1.send(10).unwrap();
698 ///
699 /// let oper = sel.select();
700 /// assert_eq!(oper.index(), oper1);
701 /// assert_eq!(oper.recv(&r1), Ok(10));
702 /// ```
remove(&mut self, index: usize)703 pub fn remove(&mut self, index: usize) {
704 assert!(
705 index < self.next_index,
706 "index out of bounds; {} >= {}",
707 index,
708 self.next_index,
709 );
710
711 let i = self
712 .handles
713 .iter()
714 .enumerate()
715 .find(|(_, (_, i, _))| *i == index)
716 .expect("no operation with this index")
717 .0;
718
719 self.handles.swap_remove(i);
720 }
721
722 /// Attempts to select one of the operations without blocking.
723 ///
724 /// If an operation is ready, it is selected and returned. If multiple operations are ready at
725 /// the same time, a random one among them is selected. If none of the operations are ready, an
726 /// error is returned.
727 ///
728 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
729 /// even when it will simply return an error because the channel is disconnected.
730 ///
731 /// The selected operation must be completed with [`SelectedOperation::send`]
732 /// or [`SelectedOperation::recv`].
733 ///
734 /// # Examples
735 ///
736 /// ```
737 /// use crossbeam_channel::{unbounded, Select};
738 ///
739 /// let (s1, r1) = unbounded();
740 /// let (s2, r2) = unbounded();
741 ///
742 /// s1.send(10).unwrap();
743 /// s2.send(20).unwrap();
744 ///
745 /// let mut sel = Select::new();
746 /// let oper1 = sel.recv(&r1);
747 /// let oper2 = sel.recv(&r2);
748 ///
749 /// // Both operations are initially ready, so a random one will be executed.
750 /// let oper = sel.try_select();
751 /// match oper {
752 /// Err(_) => panic!("both operations should be ready"),
753 /// Ok(oper) => match oper.index() {
754 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
755 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
756 /// _ => unreachable!(),
757 /// }
758 /// }
759 /// ```
try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError>760 pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
761 try_select(&mut self.handles)
762 }
763
764 /// Blocks until one of the operations becomes ready and selects it.
765 ///
766 /// Once an operation becomes ready, it is selected and returned. If multiple operations are
767 /// ready at the same time, a random one among them is selected.
768 ///
769 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
770 /// even when it will simply return an error because the channel is disconnected.
771 ///
772 /// The selected operation must be completed with [`SelectedOperation::send`]
773 /// or [`SelectedOperation::recv`].
774 ///
775 /// # Panics
776 ///
777 /// Panics if no operations have been added to `Select`.
778 ///
779 /// # Examples
780 ///
781 /// ```
782 /// use std::thread;
783 /// use std::time::Duration;
784 /// use crossbeam_channel::{unbounded, Select};
785 ///
786 /// let (s1, r1) = unbounded();
787 /// let (s2, r2) = unbounded();
788 ///
789 /// thread::spawn(move || {
790 /// thread::sleep(Duration::from_secs(1));
791 /// s1.send(10).unwrap();
792 /// });
793 /// thread::spawn(move || s2.send(20).unwrap());
794 ///
795 /// let mut sel = Select::new();
796 /// let oper1 = sel.recv(&r1);
797 /// let oper2 = sel.recv(&r2);
798 ///
799 /// // The second operation will be selected because it becomes ready first.
800 /// let oper = sel.select();
801 /// match oper.index() {
802 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
803 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
804 /// _ => unreachable!(),
805 /// }
806 /// ```
select(&mut self) -> SelectedOperation<'a>807 pub fn select(&mut self) -> SelectedOperation<'a> {
808 select(&mut self.handles)
809 }
810
811 /// Blocks for a limited time until one of the operations becomes ready and selects it.
812 ///
813 /// If an operation becomes ready, it is selected and returned. If multiple operations are
814 /// ready at the same time, a random one among them is selected. If none of the operations
815 /// become ready for the specified duration, an error is returned.
816 ///
817 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
818 /// even when it will simply return an error because the channel is disconnected.
819 ///
820 /// The selected operation must be completed with [`SelectedOperation::send`]
821 /// or [`SelectedOperation::recv`].
822 ///
823 /// # Examples
824 ///
825 /// ```
826 /// use std::thread;
827 /// use std::time::Duration;
828 /// use crossbeam_channel::{unbounded, Select};
829 ///
830 /// let (s1, r1) = unbounded();
831 /// let (s2, r2) = unbounded();
832 ///
833 /// thread::spawn(move || {
834 /// thread::sleep(Duration::from_secs(1));
835 /// s1.send(10).unwrap();
836 /// });
837 /// thread::spawn(move || s2.send(20).unwrap());
838 ///
839 /// let mut sel = Select::new();
840 /// let oper1 = sel.recv(&r1);
841 /// let oper2 = sel.recv(&r2);
842 ///
843 /// // The second operation will be selected because it becomes ready first.
844 /// let oper = sel.select_timeout(Duration::from_millis(500));
845 /// match oper {
846 /// Err(_) => panic!("should not have timed out"),
847 /// Ok(oper) => match oper.index() {
848 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
849 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
850 /// _ => unreachable!(),
851 /// }
852 /// }
853 /// ```
select_timeout( &mut self, timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>854 pub fn select_timeout(
855 &mut self,
856 timeout: Duration,
857 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
858 select_timeout(&mut self.handles, timeout)
859 }
860
861 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
862 ///
863 /// If an operation becomes ready, it is selected and returned. If multiple operations are
864 /// ready at the same time, a random one among them is selected. If none of the operations
865 /// become ready before the given deadline, an error is returned.
866 ///
867 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
868 /// even when it will simply return an error because the channel is disconnected.
869 ///
870 /// The selected operation must be completed with [`SelectedOperation::send`]
871 /// or [`SelectedOperation::recv`].
872 ///
873 /// # Examples
874 ///
875 /// ```
876 /// use std::thread;
877 /// use std::time::{Instant, Duration};
878 /// use crossbeam_channel::{unbounded, Select};
879 ///
880 /// let (s1, r1) = unbounded();
881 /// let (s2, r2) = unbounded();
882 ///
883 /// thread::spawn(move || {
884 /// thread::sleep(Duration::from_secs(1));
885 /// s1.send(10).unwrap();
886 /// });
887 /// thread::spawn(move || s2.send(20).unwrap());
888 ///
889 /// let mut sel = Select::new();
890 /// let oper1 = sel.recv(&r1);
891 /// let oper2 = sel.recv(&r2);
892 ///
893 /// let deadline = Instant::now() + Duration::from_millis(500);
894 ///
895 /// // The second operation will be selected because it becomes ready first.
896 /// let oper = sel.select_deadline(deadline);
897 /// match oper {
898 /// Err(_) => panic!("should not have timed out"),
899 /// Ok(oper) => match oper.index() {
900 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
901 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
902 /// _ => unreachable!(),
903 /// }
904 /// }
905 /// ```
select_deadline( &mut self, deadline: Instant, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>906 pub fn select_deadline(
907 &mut self,
908 deadline: Instant,
909 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
910 select_deadline(&mut self.handles, deadline)
911 }
912
913 /// Attempts to find a ready operation without blocking.
914 ///
915 /// If an operation is ready, its index is returned. If multiple operations are ready at the
916 /// same time, a random one among them is chosen. If none of the operations are ready, an error
917 /// is returned.
918 ///
919 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
920 /// even when it will simply return an error because the channel is disconnected.
921 ///
922 /// Note that this method might return with success spuriously, so it's a good idea to always
923 /// double check if the operation is really ready.
924 ///
925 /// # Examples
926 ///
927 /// ```
928 /// use crossbeam_channel::{unbounded, Select};
929 ///
930 /// let (s1, r1) = unbounded();
931 /// let (s2, r2) = unbounded();
932 ///
933 /// s1.send(10).unwrap();
934 /// s2.send(20).unwrap();
935 ///
936 /// let mut sel = Select::new();
937 /// let oper1 = sel.recv(&r1);
938 /// let oper2 = sel.recv(&r2);
939 ///
940 /// // Both operations are initially ready, so a random one will be chosen.
941 /// match sel.try_ready() {
942 /// Err(_) => panic!("both operations should be ready"),
943 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
944 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
945 /// Ok(_) => unreachable!(),
946 /// }
947 /// ```
try_ready(&mut self) -> Result<usize, TryReadyError>948 pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
949 match run_ready(&mut self.handles, Timeout::Now) {
950 None => Err(TryReadyError),
951 Some(index) => Ok(index),
952 }
953 }
954
955 /// Blocks until one of the operations becomes ready.
956 ///
957 /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
958 /// the same time, a random one among them is chosen.
959 ///
960 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
961 /// even when it will simply return an error because the channel is disconnected.
962 ///
963 /// Note that this method might return with success spuriously, so it's a good idea to always
964 /// double check if the operation is really ready.
965 ///
966 /// # Panics
967 ///
968 /// Panics if no operations have been added to `Select`.
969 ///
970 /// # Examples
971 ///
972 /// ```
973 /// use std::thread;
974 /// use std::time::Duration;
975 /// use crossbeam_channel::{unbounded, Select};
976 ///
977 /// let (s1, r1) = unbounded();
978 /// let (s2, r2) = unbounded();
979 ///
980 /// thread::spawn(move || {
981 /// thread::sleep(Duration::from_secs(1));
982 /// s1.send(10).unwrap();
983 /// });
984 /// thread::spawn(move || s2.send(20).unwrap());
985 ///
986 /// let mut sel = Select::new();
987 /// let oper1 = sel.recv(&r1);
988 /// let oper2 = sel.recv(&r2);
989 ///
990 /// // The second operation will be selected because it becomes ready first.
991 /// match sel.ready() {
992 /// i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
993 /// i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
994 /// _ => unreachable!(),
995 /// }
996 /// ```
ready(&mut self) -> usize997 pub fn ready(&mut self) -> usize {
998 if self.handles.is_empty() {
999 panic!("no operations have been added to `Select`");
1000 }
1001
1002 run_ready(&mut self.handles, Timeout::Never).unwrap()
1003 }
1004
1005 /// Blocks for a limited time until one of the operations becomes ready.
1006 ///
1007 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1008 /// the same time, a random one among them is chosen. If none of the operations become ready
1009 /// for the specified duration, an error is returned.
1010 ///
1011 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1012 /// even when it will simply return an error because the channel is disconnected.
1013 ///
1014 /// Note that this method might return with success spuriously, so it's a good idea to double
1015 /// check if the operation is really ready.
1016 ///
1017 /// # Examples
1018 ///
1019 /// ```
1020 /// use std::thread;
1021 /// use std::time::Duration;
1022 /// use crossbeam_channel::{unbounded, Select};
1023 ///
1024 /// let (s1, r1) = unbounded();
1025 /// let (s2, r2) = unbounded();
1026 ///
1027 /// thread::spawn(move || {
1028 /// thread::sleep(Duration::from_secs(1));
1029 /// s1.send(10).unwrap();
1030 /// });
1031 /// thread::spawn(move || s2.send(20).unwrap());
1032 ///
1033 /// let mut sel = Select::new();
1034 /// let oper1 = sel.recv(&r1);
1035 /// let oper2 = sel.recv(&r2);
1036 ///
1037 /// // The second operation will be selected because it becomes ready first.
1038 /// match sel.ready_timeout(Duration::from_millis(500)) {
1039 /// Err(_) => panic!("should not have timed out"),
1040 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1041 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1042 /// Ok(_) => unreachable!(),
1043 /// }
1044 /// ```
ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError>1045 pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1046 self.ready_deadline(utils::convert_timeout_to_deadline(timeout))
1047 }
1048
1049 /// Blocks until a given deadline, or until one of the operations becomes ready.
1050 ///
1051 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1052 /// the same time, a random one among them is chosen. If none of the operations become ready
1053 /// before the deadline, an error is returned.
1054 ///
1055 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1056 /// even when it will simply return an error because the channel is disconnected.
1057 ///
1058 /// Note that this method might return with success spuriously, so it's a good idea to double
1059 /// check if the operation is really ready.
1060 ///
1061 /// # Examples
1062 ///
1063 /// ```
1064 /// use std::thread;
1065 /// use std::time::{Duration, Instant};
1066 /// use crossbeam_channel::{unbounded, Select};
1067 ///
1068 /// let deadline = Instant::now() + Duration::from_millis(500);
1069 ///
1070 /// let (s1, r1) = unbounded();
1071 /// let (s2, r2) = unbounded();
1072 ///
1073 /// thread::spawn(move || {
1074 /// thread::sleep(Duration::from_secs(1));
1075 /// s1.send(10).unwrap();
1076 /// });
1077 /// thread::spawn(move || s2.send(20).unwrap());
1078 ///
1079 /// let mut sel = Select::new();
1080 /// let oper1 = sel.recv(&r1);
1081 /// let oper2 = sel.recv(&r2);
1082 ///
1083 /// // The second operation will be selected because it becomes ready first.
1084 /// match sel.ready_deadline(deadline) {
1085 /// Err(_) => panic!("should not have timed out"),
1086 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1087 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1088 /// Ok(_) => unreachable!(),
1089 /// }
1090 /// ```
ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError>1091 pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1092 match run_ready(&mut self.handles, Timeout::At(deadline)) {
1093 None => Err(ReadyTimeoutError),
1094 Some(index) => Ok(index),
1095 }
1096 }
1097 }
1098
1099 impl<'a> Clone for Select<'a> {
clone(&self) -> Select<'a>1100 fn clone(&self) -> Select<'a> {
1101 Select {
1102 handles: self.handles.clone(),
1103 next_index: self.next_index,
1104 }
1105 }
1106 }
1107
1108 impl<'a> Default for Select<'a> {
default() -> Select<'a>1109 fn default() -> Select<'a> {
1110 Select::new()
1111 }
1112 }
1113
1114 impl fmt::Debug for Select<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1116 f.pad("Select { .. }")
1117 }
1118 }
1119
1120 /// A selected operation that needs to be completed.
1121 ///
1122 /// To complete the operation, call [`send`] or [`recv`].
1123 ///
1124 /// # Panics
1125 ///
1126 /// Forgetting to complete the operation is an error and might lead to deadlocks. If a
1127 /// `SelectedOperation` is dropped without completion, a panic occurs.
1128 ///
1129 /// [`send`]: SelectedOperation::send
1130 /// [`recv`]: SelectedOperation::recv
1131 #[must_use]
1132 pub struct SelectedOperation<'a> {
1133 /// Token needed to complete the operation.
1134 token: Token,
1135
1136 /// The index of the selected operation.
1137 index: usize,
1138
1139 /// The address of the selected `Sender` or `Receiver`.
1140 ptr: *const u8,
1141
1142 /// Indicates that `Sender`s and `Receiver`s are borrowed.
1143 _marker: PhantomData<&'a ()>,
1144 }
1145
1146 impl SelectedOperation<'_> {
1147 /// Returns the index of the selected operation.
1148 ///
1149 /// # Examples
1150 ///
1151 /// ```
1152 /// use crossbeam_channel::{bounded, Select};
1153 ///
1154 /// let (s1, r1) = bounded::<()>(0);
1155 /// let (s2, r2) = bounded::<()>(0);
1156 /// let (s3, r3) = bounded::<()>(1);
1157 ///
1158 /// let mut sel = Select::new();
1159 /// let oper1 = sel.send(&s1);
1160 /// let oper2 = sel.recv(&r2);
1161 /// let oper3 = sel.send(&s3);
1162 ///
1163 /// // Only the last operation is ready.
1164 /// let oper = sel.select();
1165 /// assert_eq!(oper.index(), 2);
1166 /// assert_eq!(oper.index(), oper3);
1167 ///
1168 /// // Complete the operation.
1169 /// oper.send(&s3, ()).unwrap();
1170 /// ```
index(&self) -> usize1171 pub fn index(&self) -> usize {
1172 self.index
1173 }
1174
1175 /// Completes the send operation.
1176 ///
1177 /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1178 /// when the operation was added.
1179 ///
1180 /// # Panics
1181 ///
1182 /// Panics if an incorrect [`Sender`] reference is passed.
1183 ///
1184 /// # Examples
1185 ///
1186 /// ```
1187 /// use crossbeam_channel::{bounded, Select, SendError};
1188 ///
1189 /// let (s, r) = bounded::<i32>(0);
1190 /// drop(r);
1191 ///
1192 /// let mut sel = Select::new();
1193 /// let oper1 = sel.send(&s);
1194 ///
1195 /// let oper = sel.select();
1196 /// assert_eq!(oper.index(), oper1);
1197 /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1198 /// ```
send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>>1199 pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1200 assert!(
1201 s as *const Sender<T> as *const u8 == self.ptr,
1202 "passed a sender that wasn't selected",
1203 );
1204 let res = unsafe { channel::write(s, &mut self.token, msg) };
1205 mem::forget(self);
1206 res.map_err(SendError)
1207 }
1208
1209 /// Completes the receive operation.
1210 ///
1211 /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1212 /// when the operation was added.
1213 ///
1214 /// # Panics
1215 ///
1216 /// Panics if an incorrect [`Receiver`] reference is passed.
1217 ///
1218 /// # Examples
1219 ///
1220 /// ```
1221 /// use crossbeam_channel::{bounded, Select, RecvError};
1222 ///
1223 /// let (s, r) = bounded::<i32>(0);
1224 /// drop(s);
1225 ///
1226 /// let mut sel = Select::new();
1227 /// let oper1 = sel.recv(&r);
1228 ///
1229 /// let oper = sel.select();
1230 /// assert_eq!(oper.index(), oper1);
1231 /// assert_eq!(oper.recv(&r), Err(RecvError));
1232 /// ```
recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError>1233 pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1234 assert!(
1235 r as *const Receiver<T> as *const u8 == self.ptr,
1236 "passed a receiver that wasn't selected",
1237 );
1238 let res = unsafe { channel::read(r, &mut self.token) };
1239 mem::forget(self);
1240 res.map_err(|_| RecvError)
1241 }
1242 }
1243
1244 impl fmt::Debug for SelectedOperation<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1245 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1246 f.pad("SelectedOperation { .. }")
1247 }
1248 }
1249
1250 impl Drop for SelectedOperation<'_> {
drop(&mut self)1251 fn drop(&mut self) {
1252 panic!("dropped `SelectedOperation` without completing the operation");
1253 }
1254 }
1255