1 //! Multi-producer multi-consumer channels for message passing. 2 //! 3 //! This crate is an alternative to [`std::sync::mpsc`] with more features and better performance. 4 //! 5 //! # Hello, world! 6 //! 7 //! ``` 8 //! use crossbeam_channel::unbounded; 9 //! 10 //! // Create a channel of unbounded capacity. 11 //! let (s, r) = unbounded(); 12 //! 13 //! // Send a message into the channel. 14 //! s.send("Hello, world!").unwrap(); 15 //! 16 //! // Receive the message from the channel. 17 //! assert_eq!(r.recv(), Ok("Hello, world!")); 18 //! ``` 19 //! 20 //! # Channel types 21 //! 22 //! Channels can be created using two functions: 23 //! 24 //! * [`bounded`] creates a channel of bounded capacity, i.e. there is a limit to how many messages 25 //! it can hold at a time. 26 //! 27 //! * [`unbounded`] creates a channel of unbounded capacity, i.e. it can hold any number of 28 //! messages at a time. 29 //! 30 //! Both functions return a [`Sender`] and a [`Receiver`], which represent the two opposite sides 31 //! of a channel. 32 //! 33 //! Creating a bounded channel: 34 //! 35 //! ``` 36 //! use crossbeam_channel::bounded; 37 //! 38 //! // Create a channel that can hold at most 5 messages at a time. 39 //! let (s, r) = bounded(5); 40 //! 41 //! // Can send only 5 messages without blocking. 42 //! for i in 0..5 { 43 //! s.send(i).unwrap(); 44 //! } 45 //! 46 //! // Another call to `send` would block because the channel is full. 47 //! // s.send(5).unwrap(); 48 //! ``` 49 //! 50 //! Creating an unbounded channel: 51 //! 52 //! ``` 53 //! use crossbeam_channel::unbounded; 54 //! 55 //! // Create an unbounded channel. 56 //! let (s, r) = unbounded(); 57 //! 58 //! // Can send any number of messages into the channel without blocking. 59 //! for i in 0..1000 { 60 //! s.send(i).unwrap(); 61 //! } 62 //! ``` 63 //! 64 //! A special case is a zero-capacity channel, which cannot hold any messages. Instead, send and 65 //! 
receive operations must appear at the same time in order to pair up and pass the message over: 66 //! 67 //! ``` 68 //! use std::thread; 69 //! use crossbeam_channel::bounded; 70 //! 71 //! // Create a zero-capacity channel. 72 //! let (s, r) = bounded(0); 73 //! 74 //! // Sending blocks until a receive operation appears on the other side. 75 //! thread::spawn(move || s.send("Hi!").unwrap()); 76 //! 77 //! // Receiving blocks until a send operation appears on the other side. 78 //! assert_eq!(r.recv(), Ok("Hi!")); 79 //! ``` 80 //! 81 //! # Sharing channels 82 //! 83 //! Senders and receivers can be cloned and sent to other threads: 84 //! 85 //! ``` 86 //! use std::thread; 87 //! use crossbeam_channel::bounded; 88 //! 89 //! let (s1, r1) = bounded(0); 90 //! let (s2, r2) = (s1.clone(), r1.clone()); 91 //! 92 //! // Spawn a thread that receives a message and then sends one. 93 //! thread::spawn(move || { 94 //! r2.recv().unwrap(); 95 //! s2.send(2).unwrap(); 96 //! }); 97 //! 98 //! // Send a message and then receive one. 99 //! s1.send(1).unwrap(); 100 //! r1.recv().unwrap(); 101 //! ``` 102 //! 103 //! Note that cloning only creates a new handle to the same sending or receiving side. It does not 104 //! create a separate stream of messages in any way: 105 //! 106 //! ``` 107 //! use crossbeam_channel::unbounded; 108 //! 109 //! let (s1, r1) = unbounded(); 110 //! let (s2, r2) = (s1.clone(), r1.clone()); 111 //! let (s3, r3) = (s2.clone(), r2.clone()); 112 //! 113 //! s1.send(10).unwrap(); 114 //! s2.send(20).unwrap(); 115 //! s3.send(30).unwrap(); 116 //! 117 //! assert_eq!(r3.recv(), Ok(10)); 118 //! assert_eq!(r1.recv(), Ok(20)); 119 //! assert_eq!(r2.recv(), Ok(30)); 120 //! ``` 121 //! 122 //! It's also possible to share senders and receivers by reference: 123 //! 124 //! ``` 125 //! # extern crate crossbeam_channel; 126 //! # extern crate crossbeam_utils; 127 //! # fn main() { 128 //! use std::thread; 129 //! use crossbeam_channel::bounded; 130 //! 
use crossbeam_utils::thread::scope; 131 //! 132 //! let (s, r) = bounded(0); 133 //! 134 //! scope(|scope| { 135 //! // Spawn a thread that receives a message and then sends one. 136 //! scope.spawn(|_| { 137 //! r.recv().unwrap(); 138 //! s.send(2).unwrap(); 139 //! }); 140 //! 141 //! // Send a message and then receive one. 142 //! s.send(1).unwrap(); 143 //! r.recv().unwrap(); 144 //! }).unwrap(); 145 //! # } 146 //! ``` 147 //! 148 //! # Disconnection 149 //! 150 //! When all senders or all receivers associated with a channel get dropped, the channel becomes 151 //! disconnected. No more messages can be sent, but any remaining messages can still be received. 152 //! Send and receive operations on a disconnected channel never block. 153 //! 154 //! ``` 155 //! use crossbeam_channel::{unbounded, RecvError}; 156 //! 157 //! let (s, r) = unbounded(); 158 //! s.send(1).unwrap(); 159 //! s.send(2).unwrap(); 160 //! s.send(3).unwrap(); 161 //! 162 //! // The only sender is dropped, disconnecting the channel. 163 //! drop(s); 164 //! 165 //! // The remaining messages can be received. 166 //! assert_eq!(r.recv(), Ok(1)); 167 //! assert_eq!(r.recv(), Ok(2)); 168 //! assert_eq!(r.recv(), Ok(3)); 169 //! 170 //! // There are no more messages in the channel. 171 //! assert!(r.is_empty()); 172 //! 173 //! // Note that calling `r.recv()` does not block. 174 //! // Instead, `Err(RecvError)` is returned immediately. 175 //! assert_eq!(r.recv(), Err(RecvError)); 176 //! ``` 177 //! 178 //! # Blocking operations 179 //! 180 //! Send and receive operations come in three flavors: 181 //! 182 //! * Non-blocking (returns immediately with success or failure). 183 //! * Blocking (waits until the operation succeeds or the channel becomes disconnected). 184 //! * Blocking with a timeout (blocks only for a certain duration of time). 185 //! 186 //! A simple example showing the difference between non-blocking and blocking operations: 187 //! 188 //! ``` 189 //! 
use crossbeam_channel::{bounded, RecvError, TryRecvError}; 190 //! 191 //! let (s, r) = bounded(1); 192 //! 193 //! // Send a message into the channel. 194 //! s.send("foo").unwrap(); 195 //! 196 //! // This call would block because the channel is full. 197 //! // s.send("bar").unwrap(); 198 //! 199 //! // Receive the message. 200 //! assert_eq!(r.recv(), Ok("foo")); 201 //! 202 //! // This call would block because the channel is empty. 203 //! // r.recv(); 204 //! 205 //! // Try receiving a message without blocking. 206 //! assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); 207 //! 208 //! // Disconnect the channel. 209 //! drop(s); 210 //! 211 //! // This call doesn't block because the channel is now disconnected. 212 //! assert_eq!(r.recv(), Err(RecvError)); 213 //! ``` 214 //! 215 //! # Iteration 216 //! 217 //! Receivers can be used as iterators. For example, the [`iter`] method creates an iterator that 218 //! receives messages until the channel becomes empty and disconnected. Note that iteration may 219 //! block waiting for the next message to arrive. 220 //! 221 //! ``` 222 //! use std::thread; 223 //! use crossbeam_channel::unbounded; 224 //! 225 //! let (s, r) = unbounded(); 226 //! 227 //! thread::spawn(move || { 228 //! s.send(1).unwrap(); 229 //! s.send(2).unwrap(); 230 //! s.send(3).unwrap(); 231 //! drop(s); // Disconnect the channel. 232 //! }); 233 //! 234 //! // Collect all messages from the channel. 235 //! // Note that the call to `collect` blocks until the sender is dropped. 236 //! let v: Vec<_> = r.iter().collect(); 237 //! 238 //! assert_eq!(v, [1, 2, 3]); 239 //! ``` 240 //! 241 //! A non-blocking iterator can be created using [`try_iter`], which receives all available 242 //! messages without blocking: 243 //! 244 //! ``` 245 //! use crossbeam_channel::unbounded; 246 //! 247 //! let (s, r) = unbounded(); 248 //! s.send(1).unwrap(); 249 //! s.send(2).unwrap(); 250 //! s.send(3).unwrap(); 251 //! // No need to drop the sender. 252 //! 253 //! 
// Receive all messages currently in the channel. 254 //! let v: Vec<_> = r.try_iter().collect(); 255 //! 256 //! assert_eq!(v, [1, 2, 3]); 257 //! ``` 258 //! 259 //! # Selection 260 //! 261 //! The [`select!`] macro allows you to define a set of channel operations, wait until any one of 262 //! them becomes ready, and finally execute it. If multiple operations are ready at the same time, 263 //! a random one among them is selected. 264 //! 265 //! It is also possible to define a `default` case that gets executed if none of the operations are 266 //! ready, either right away or for a certain duration of time. 267 //! 268 //! An operation is considered to be ready if it doesn't have to block. Note that it is ready even 269 //! when it will simply return an error because the channel is disconnected. 270 //! 271 //! An example of receiving a message from two channels: 272 //! 273 //! ``` 274 //! # #[macro_use] 275 //! # extern crate crossbeam_channel; 276 //! # fn main() { 277 //! use std::thread; 278 //! use std::time::Duration; 279 //! use crossbeam_channel::unbounded; 280 //! 281 //! let (s1, r1) = unbounded(); 282 //! let (s2, r2) = unbounded(); 283 //! 284 //! thread::spawn(move || s1.send(10).unwrap()); 285 //! thread::spawn(move || s2.send(20).unwrap()); 286 //! 287 //! // At most one of these two receive operations will be executed. 288 //! select! { 289 //! recv(r1) -> msg => assert_eq!(msg, Ok(10)), 290 //! recv(r2) -> msg => assert_eq!(msg, Ok(20)), 291 //! default(Duration::from_secs(1)) => println!("timed out"), 292 //! } 293 //! # } 294 //! ``` 295 //! 296 //! If you need to select over a dynamically created list of channel operations, use [`Select`] 297 //! instead. The [`select!`] macro is just a convenience wrapper around [`Select`]. 298 //! 299 //! # Extra channels 300 //! 301 //! Three functions can create special kinds of channels, all of which return just a [`Receiver`] 302 //! handle: 303 //! 304 //! 
* [`after`] creates a channel that delivers a single message after a certain duration of time. 305 //! * [`tick`] creates a channel that delivers messages periodically. 306 //! * [`never`] creates a channel that never delivers messages. 307 //! 308 //! These channels are very efficient because messages get lazily generated on receive operations. 309 //! 310 //! An example that prints elapsed time every 50 milliseconds for the duration of 1 second: 311 //! 312 //! ``` 313 //! # #[macro_use] 314 //! # extern crate crossbeam_channel; 315 //! # fn main() { 316 //! use std::time::{Duration, Instant}; 317 //! use crossbeam_channel::{after, tick}; 318 //! 319 //! let start = Instant::now(); 320 //! let ticker = tick(Duration::from_millis(50)); 321 //! let timeout = after(Duration::from_secs(1)); 322 //! 323 //! loop { 324 //! select! { 325 //! recv(ticker) -> _ => println!("elapsed: {:?}", start.elapsed()), 326 //! recv(timeout) -> _ => break, 327 //! } 328 //! } 329 //! # } 330 //! ``` 331 //! 332 //! [`std::sync::mpsc`]: https://doc.rust-lang.org/std/sync/mpsc/index.html 333 //! [`unbounded`]: fn.unbounded.html 334 //! [`bounded`]: fn.bounded.html 335 //! [`after`]: fn.after.html 336 //! [`tick`]: fn.tick.html 337 //! [`never`]: fn.never.html 338 //! [`send`]: struct.Sender.html#method.send 339 //! [`recv`]: struct.Receiver.html#method.recv 340 //! [`iter`]: struct.Receiver.html#method.iter 341 //! [`try_iter`]: struct.Receiver.html#method.try_iter 342 //! [`select!`]: macro.select.html 343 //! [`Select`]: struct.Select.html 344 //! [`Sender`]: struct.Sender.html 345 //! [`Receiver`]: struct.Receiver.html 346 347 #![warn(missing_docs)] 348 #![warn(missing_debug_implementations)] 349 350 extern crate crossbeam_utils; 351 extern crate maybe_uninit; 352 353 mod channel; 354 mod context; 355 mod counter; 356 mod err; 357 mod flavors; 358 mod select; 359 mod select_macro; 360 mod utils; 361 mod waker; 362 363 /// Crate internals used by the `select!` macro. 
364 #[doc(hidden)] 365 pub mod internal { 366 pub use select::SelectHandle; 367 pub use select::{select, select_timeout, try_select}; 368 } 369 370 pub use channel::{after, never, tick}; 371 pub use channel::{bounded, unbounded}; 372 pub use channel::{IntoIter, Iter, TryIter}; 373 pub use channel::{Receiver, Sender}; 374 375 pub use select::{Select, SelectedOperation}; 376 377 pub use err::{ReadyTimeoutError, SelectTimeoutError, TryReadyError, TrySelectError}; 378 pub use err::{RecvError, RecvTimeoutError, TryRecvError}; 379 pub use err::{SendError, SendTimeoutError, TrySendError}; 380