1 //! Multi-producer multi-consumer channels for message passing. 2 //! 3 //! This crate is an alternative to [`std::sync::mpsc`] with more features and better performance. 4 //! 5 //! # Hello, world! 6 //! 7 //! ``` 8 //! use crossbeam_channel::unbounded; 9 //! 10 //! // Create a channel of unbounded capacity. 11 //! let (s, r) = unbounded(); 12 //! 13 //! // Send a message into the channel. 14 //! s.send("Hello, world!").unwrap(); 15 //! 16 //! // Receive the message from the channel. 17 //! assert_eq!(r.recv(), Ok("Hello, world!")); 18 //! ``` 19 //! 20 //! # Channel types 21 //! 22 //! Channels can be created using two functions: 23 //! 24 //! * [`bounded`] creates a channel of bounded capacity, i.e. there is a limit to how many messages 25 //! it can hold at a time. 26 //! 27 //! * [`unbounded`] creates a channel of unbounded capacity, i.e. it can hold any number of 28 //! messages at a time. 29 //! 30 //! Both functions return a [`Sender`] and a [`Receiver`], which represent the two opposite sides 31 //! of a channel. 32 //! 33 //! Creating a bounded channel: 34 //! 35 //! ``` 36 //! use crossbeam_channel::bounded; 37 //! 38 //! // Create a channel that can hold at most 5 messages at a time. 39 //! let (s, r) = bounded(5); 40 //! 41 //! // Can send only 5 messages without blocking. 42 //! for i in 0..5 { 43 //! s.send(i).unwrap(); 44 //! } 45 //! 46 //! // Another call to `send` would block because the channel is full. 47 //! // s.send(5).unwrap(); 48 //! ``` 49 //! 50 //! Creating an unbounded channel: 51 //! 52 //! ``` 53 //! use crossbeam_channel::unbounded; 54 //! 55 //! // Create an unbounded channel. 56 //! let (s, r) = unbounded(); 57 //! 58 //! // Can send any number of messages into the channel without blocking. 59 //! for i in 0..1000 { 60 //! s.send(i).unwrap(); 61 //! } 62 //! ``` 63 //! 64 //! A special case is zero-capacity channel, which cannot hold any messages. Instead, send and 65 //! receive operations must appear at the same time in order to pair up and pass the message over: 66 //! 67 //! ``` 68 //! use std::thread; 69 //! use crossbeam_channel::bounded; 70 //! 71 //! // Create a zero-capacity channel. 72 //! let (s, r) = bounded(0); 73 //! 74 //! // Sending blocks until a receive operation appears on the other side. 75 //! thread::spawn(move || s.send("Hi!").unwrap()); 76 //! 77 //! // Receiving blocks until a send operation appears on the other side. 78 //! assert_eq!(r.recv(), Ok("Hi!")); 79 //! ``` 80 //! 81 //! # Sharing channels 82 //! 83 //! Senders and receivers can be cloned and sent to other threads: 84 //! 85 //! ``` 86 //! use std::thread; 87 //! use crossbeam_channel::bounded; 88 //! 89 //! let (s1, r1) = bounded(0); 90 //! let (s2, r2) = (s1.clone(), r1.clone()); 91 //! 92 //! // Spawn a thread that receives a message and then sends one. 93 //! thread::spawn(move || { 94 //! r2.recv().unwrap(); 95 //! s2.send(2).unwrap(); 96 //! }); 97 //! 98 //! // Send a message and then receive one. 99 //! s1.send(1).unwrap(); 100 //! r1.recv().unwrap(); 101 //! ``` 102 //! 103 //! Note that cloning only creates a new handle to the same sending or receiving side. It does not 104 //! create a separate stream of messages in any way: 105 //! 106 //! ``` 107 //! use crossbeam_channel::unbounded; 108 //! 109 //! let (s1, r1) = unbounded(); 110 //! let (s2, r2) = (s1.clone(), r1.clone()); 111 //! let (s3, r3) = (s2.clone(), r2.clone()); 112 //! 113 //! s1.send(10).unwrap(); 114 //! s2.send(20).unwrap(); 115 //! s3.send(30).unwrap(); 116 //! 117 //! assert_eq!(r3.recv(), Ok(10)); 118 //! assert_eq!(r1.recv(), Ok(20)); 119 //! assert_eq!(r2.recv(), Ok(30)); 120 //! ``` 121 //! 122 //! It's also possible to share senders and receivers by reference: 123 //! 124 //! ``` 125 //! use crossbeam_channel::bounded; 126 //! use crossbeam_utils::thread::scope; 127 //! 128 //! let (s, r) = bounded(0); 129 //! 130 //! scope(|scope| { 131 //! // Spawn a thread that receives a message and then sends one. 132 //! scope.spawn(|_| { 133 //! r.recv().unwrap(); 134 //! s.send(2).unwrap(); 135 //! }); 136 //! 137 //! // Send a message and then receive one. 138 //! s.send(1).unwrap(); 139 //! r.recv().unwrap(); 140 //! }).unwrap(); 141 //! ``` 142 //! 143 //! # Disconnection 144 //! 145 //! When all senders or all receivers associated with a channel get dropped, the channel becomes 146 //! disconnected. No more messages can be sent, but any remaining messages can still be received. 147 //! Send and receive operations on a disconnected channel never block. 148 //! 149 //! ``` 150 //! use crossbeam_channel::{unbounded, RecvError}; 151 //! 152 //! let (s, r) = unbounded(); 153 //! s.send(1).unwrap(); 154 //! s.send(2).unwrap(); 155 //! s.send(3).unwrap(); 156 //! 157 //! // The only sender is dropped, disconnecting the channel. 158 //! drop(s); 159 //! 160 //! // The remaining messages can be received. 161 //! assert_eq!(r.recv(), Ok(1)); 162 //! assert_eq!(r.recv(), Ok(2)); 163 //! assert_eq!(r.recv(), Ok(3)); 164 //! 165 //! // There are no more messages in the channel. 166 //! assert!(r.is_empty()); 167 //! 168 //! // Note that calling `r.recv()` does not block. 169 //! // Instead, `Err(RecvError)` is returned immediately. 170 //! assert_eq!(r.recv(), Err(RecvError)); 171 //! ``` 172 //! 173 //! # Blocking operations 174 //! 175 //! Send and receive operations come in three flavors: 176 //! 177 //! * Non-blocking (returns immediately with success or failure). 178 //! * Blocking (waits until the operation succeeds or the channel becomes disconnected). 179 //! * Blocking with a timeout (blocks only for a certain duration of time). 180 //! 181 //! A simple example showing the difference between non-blocking and blocking operations: 182 //! 183 //! ``` 184 //! use crossbeam_channel::{bounded, RecvError, TryRecvError}; 185 //! 186 //! let (s, r) = bounded(1); 187 //! 188 //! // Send a message into the channel. 189 //! s.send("foo").unwrap(); 190 //! 191 //! // This call would block because the channel is full. 192 //! // s.send("bar").unwrap(); 193 //! 194 //! // Receive the message. 195 //! assert_eq!(r.recv(), Ok("foo")); 196 //! 197 //! // This call would block because the channel is empty. 198 //! // r.recv(); 199 //! 200 //! // Try receiving a message without blocking. 201 //! assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); 202 //! 203 //! // Disconnect the channel. 204 //! drop(s); 205 //! 206 //! // This call doesn't block because the channel is now disconnected. 207 //! assert_eq!(r.recv(), Err(RecvError)); 208 //! ``` 209 //! 210 //! # Iteration 211 //! 212 //! Receivers can be used as iterators. For example, method [`iter`] creates an iterator that 213 //! receives messages until the channel becomes empty and disconnected. Note that iteration may 214 //! block waiting for next message to arrive. 215 //! 216 //! ``` 217 //! use std::thread; 218 //! use crossbeam_channel::unbounded; 219 //! 220 //! let (s, r) = unbounded(); 221 //! 222 //! thread::spawn(move || { 223 //! s.send(1).unwrap(); 224 //! s.send(2).unwrap(); 225 //! s.send(3).unwrap(); 226 //! drop(s); // Disconnect the channel. 227 //! }); 228 //! 229 //! // Collect all messages from the channel. 230 //! // Note that the call to `collect` blocks until the sender is dropped. 231 //! let v: Vec<_> = r.iter().collect(); 232 //! 233 //! assert_eq!(v, [1, 2, 3]); 234 //! ``` 235 //! 236 //! A non-blocking iterator can be created using [`try_iter`], which receives all available 237 //! messages without blocking: 238 //! 239 //! ``` 240 //! use crossbeam_channel::unbounded; 241 //! 242 //! let (s, r) = unbounded(); 243 //! s.send(1).unwrap(); 244 //! s.send(2).unwrap(); 245 //! s.send(3).unwrap(); 246 //! // No need to drop the sender. 247 //! 248 //! // Receive all messages currently in the channel. 249 //! let v: Vec<_> = r.try_iter().collect(); 250 //! 251 //! assert_eq!(v, [1, 2, 3]); 252 //! ``` 253 //! 254 //! # Selection 255 //! 256 //! The [`select!`] macro allows you to define a set of channel operations, wait until any one of 257 //! them becomes ready, and finally execute it. If multiple operations are ready at the same time, 258 //! a random one among them is selected. 259 //! 260 //! It is also possible to define a `default` case that gets executed if none of the operations are 261 //! ready, either right away or for a certain duration of time. 262 //! 263 //! An operation is considered to be ready if it doesn't have to block. Note that it is ready even 264 //! when it will simply return an error because the channel is disconnected. 265 //! 266 //! An example of receiving a message from two channels: 267 //! 268 //! ``` 269 //! use std::thread; 270 //! use std::time::Duration; 271 //! use crossbeam_channel::{select, unbounded}; 272 //! 273 //! let (s1, r1) = unbounded(); 274 //! let (s2, r2) = unbounded(); 275 //! 276 //! thread::spawn(move || s1.send(10).unwrap()); 277 //! thread::spawn(move || s2.send(20).unwrap()); 278 //! 279 //! // At most one of these two receive operations will be executed. 280 //! select! { 281 //! recv(r1) -> msg => assert_eq!(msg, Ok(10)), 282 //! recv(r2) -> msg => assert_eq!(msg, Ok(20)), 283 //! default(Duration::from_secs(1)) => println!("timed out"), 284 //! } 285 //! ``` 286 //! 287 //! If you need to select over a dynamically created list of channel operations, use [`Select`] 288 //! instead. The [`select!`] macro is just a convenience wrapper around [`Select`]. 289 //! 290 //! # Extra channels 291 //! 292 //! Three functions can create special kinds of channels, all of which return just a [`Receiver`] 293 //! handle: 294 //! 295 //! * [`after`] creates a channel that delivers a single message after a certain duration of time. 296 //! * [`tick`] creates a channel that delivers messages periodically. 297 //! * [`never`](never()) creates a channel that never delivers messages. 298 //! 299 //! These channels are very efficient because messages get lazily generated on receive operations. 300 //! 301 //! An example that prints elapsed time every 50 milliseconds for the duration of 1 second: 302 //! 303 //! ``` 304 //! use std::time::{Duration, Instant}; 305 //! use crossbeam_channel::{after, select, tick}; 306 //! 307 //! let start = Instant::now(); 308 //! let ticker = tick(Duration::from_millis(50)); 309 //! let timeout = after(Duration::from_secs(1)); 310 //! 311 //! loop { 312 //! select! { 313 //! recv(ticker) -> _ => println!("elapsed: {:?}", start.elapsed()), 314 //! recv(timeout) -> _ => break, 315 //! } 316 //! } 317 //! ``` 318 //! 319 //! [`send`]: Sender::send 320 //! [`recv`]: Receiver::recv 321 //! [`iter`]: Receiver::iter 322 //! [`try_iter`]: Receiver::try_iter 323 324 #![doc(test( 325 no_crate_inject, 326 attr( 327 deny(warnings, rust_2018_idioms), 328 allow(dead_code, unused_assignments, unused_variables) 329 ) 330 ))] 331 #![warn( 332 missing_docs, 333 missing_debug_implementations, 334 rust_2018_idioms, 335 unreachable_pub 336 )] 337 #![cfg_attr(not(feature = "std"), no_std)] 338 339 use cfg_if::cfg_if; 340 341 cfg_if! { 342 if #[cfg(feature = "std")] { 343 mod channel; 344 mod context; 345 mod counter; 346 mod err; 347 mod flavors; 348 mod select; 349 mod select_macro; 350 mod utils; 351 mod waker; 352 353 /// Crate internals used by the `select!` macro. 354 #[doc(hidden)] 355 pub mod internal { 356 pub use crate::select::SelectHandle; 357 pub use crate::select::{select, select_timeout, try_select}; 358 } 359 360 pub use crate::channel::{after, at, never, tick}; 361 pub use crate::channel::{bounded, unbounded}; 362 pub use crate::channel::{IntoIter, Iter, TryIter}; 363 pub use crate::channel::{Receiver, Sender}; 364 365 pub use crate::select::{Select, SelectedOperation}; 366 367 pub use crate::err::{ReadyTimeoutError, SelectTimeoutError, TryReadyError, TrySelectError}; 368 pub use crate::err::{RecvError, RecvTimeoutError, TryRecvError}; 369 pub use crate::err::{SendError, SendTimeoutError, TrySendError}; 370 } 371 } 372