1 use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
2 use crate::sync::mpsc::chan;
3 use crate::sync::mpsc::error::{SendError, TrySendError};
4
5 cfg_time! {
6 use crate::sync::mpsc::error::SendTimeoutError;
7 use crate::time::Duration;
8 }
9
10 use std::fmt;
11 use std::task::{Context, Poll};
12
13 /// Send values to the associated `Receiver`.
14 ///
15 /// Instances are created by the [`channel`](channel) function.
16 ///
17 /// To use the `Sender` in a poll function, you can use the [`PollSender`]
18 /// utility.
19 ///
20 /// [`PollSender`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSender.html
21 pub struct Sender<T> {
22 chan: chan::Tx<T, Semaphore>,
23 }
24
25 /// Permit to send one value into the channel.
26 ///
27 /// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
28 /// and are used to guarantee channel capacity before generating a message to send.
29 ///
30 /// [`Sender::reserve()`]: Sender::reserve
31 /// [`Sender::try_reserve()`]: Sender::try_reserve
32 pub struct Permit<'a, T> {
33 chan: &'a chan::Tx<T, Semaphore>,
34 }
35
36 /// Owned permit to send one value into the channel.
37 ///
38 /// This is identical to the [`Permit`] type, except that it moves the sender
39 /// rather than borrowing it.
40 ///
41 /// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
42 /// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
43 /// before generating a message to send.
44 ///
45 /// [`Permit`]: Permit
46 /// [`Sender::reserve_owned()`]: Sender::reserve_owned
47 /// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
48 pub struct OwnedPermit<T> {
49 chan: Option<chan::Tx<T, Semaphore>>,
50 }
51
52 /// Receive values from the associated `Sender`.
53 ///
54 /// Instances are created by the [`channel`](channel) function.
55 ///
56 /// This receiver can be turned into a `Stream` using [`ReceiverStream`].
57 ///
58 /// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
59 pub struct Receiver<T> {
60 /// The channel receiver
61 chan: chan::Rx<T, Semaphore>,
62 }
63
64 /// Creates a bounded mpsc channel for communicating between asynchronous tasks
65 /// with backpressure.
66 ///
67 /// The channel will buffer up to the provided number of messages. Once the
68 /// buffer is full, attempts to send new messages will wait until a message is
69 /// received from the channel. The provided buffer capacity must be at least 1.
70 ///
71 /// All data sent on `Sender` will become available on `Receiver` in the same
72 /// order as it was sent.
73 ///
74 /// The `Sender` can be cloned to `send` to the same channel from multiple code
75 /// locations. Only one `Receiver` is supported.
76 ///
77 /// If the `Receiver` is disconnected while trying to `send`, the `send` method
78 /// will return a `SendError`. Similarly, if `Sender` is disconnected while
79 /// trying to `recv`, the `recv` method will return `None`.
80 ///
81 /// # Panics
82 ///
83 /// Panics if the buffer capacity is 0.
84 ///
85 /// # Examples
86 ///
87 /// ```rust
88 /// use tokio::sync::mpsc;
89 ///
90 /// #[tokio::main]
91 /// async fn main() {
92 /// let (tx, mut rx) = mpsc::channel(100);
93 ///
94 /// tokio::spawn(async move {
95 /// for i in 0..10 {
96 /// if let Err(_) = tx.send(i).await {
97 /// println!("receiver dropped");
98 /// return;
99 /// }
100 /// }
101 /// });
102 ///
103 /// while let Some(i) = rx.recv().await {
104 /// println!("got = {}", i);
105 /// }
106 /// }
107 /// ```
channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>)108 pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
109 assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
110 let semaphore = (semaphore::Semaphore::new(buffer), buffer);
111 let (tx, rx) = chan::channel(semaphore);
112
113 let tx = Sender::new(tx);
114 let rx = Receiver::new(rx);
115
116 (tx, rx)
117 }
118
119 /// Channel semaphore is a tuple of the semaphore implementation and a `usize`
120 /// representing the channel bound.
121 type Semaphore = (semaphore::Semaphore, usize);
122
123 impl<T> Receiver<T> {
new(chan: chan::Rx<T, Semaphore>) -> Receiver<T>124 pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
125 Receiver { chan }
126 }
127
128 /// Receives the next value for this receiver.
129 ///
130 /// This method returns `None` if the channel has been closed and there are
131 /// no remaining messages in the channel's buffer. This indicates that no
132 /// further values can ever be received from this `Receiver`. The channel is
133 /// closed when all senders have been dropped, or when [`close`] is called.
134 ///
135 /// If there are no messages in the channel's buffer, but the channel has
136 /// not yet been closed, this method will sleep until a message is sent or
137 /// the channel is closed. Note that if [`close`] is called, but there are
138 /// still outstanding [`Permits`] from before it was closed, the channel is
139 /// not considered closed by `recv` until the permits are released.
140 ///
141 /// # Cancel safety
142 ///
143 /// This method is cancel safe. If `recv` is used as the event in a
144 /// [`tokio::select!`](crate::select) statement and some other branch
145 /// completes first, it is guaranteed that no messages were received on this
146 /// channel.
147 ///
148 /// [`close`]: Self::close
149 /// [`Permits`]: struct@crate::sync::mpsc::Permit
150 ///
151 /// # Examples
152 ///
153 /// ```
154 /// use tokio::sync::mpsc;
155 ///
156 /// #[tokio::main]
157 /// async fn main() {
158 /// let (tx, mut rx) = mpsc::channel(100);
159 ///
160 /// tokio::spawn(async move {
161 /// tx.send("hello").await.unwrap();
162 /// });
163 ///
164 /// assert_eq!(Some("hello"), rx.recv().await);
165 /// assert_eq!(None, rx.recv().await);
166 /// }
167 /// ```
168 ///
169 /// Values are buffered:
170 ///
171 /// ```
172 /// use tokio::sync::mpsc;
173 ///
174 /// #[tokio::main]
175 /// async fn main() {
176 /// let (tx, mut rx) = mpsc::channel(100);
177 ///
178 /// tx.send("hello").await.unwrap();
179 /// tx.send("world").await.unwrap();
180 ///
181 /// assert_eq!(Some("hello"), rx.recv().await);
182 /// assert_eq!(Some("world"), rx.recv().await);
183 /// }
184 /// ```
recv(&mut self) -> Option<T>185 pub async fn recv(&mut self) -> Option<T> {
186 use crate::future::poll_fn;
187 poll_fn(|cx| self.chan.recv(cx)).await
188 }
189
190 /// Blocking receive to call outside of asynchronous contexts.
191 ///
192 /// This method returns `None` if the channel has been closed and there are
193 /// no remaining messages in the channel's buffer. This indicates that no
194 /// further values can ever be received from this `Receiver`. The channel is
195 /// closed when all senders have been dropped, or when [`close`] is called.
196 ///
197 /// If there are no messages in the channel's buffer, but the channel has
198 /// not yet been closed, this method will block until a message is sent or
199 /// the channel is closed.
200 ///
201 /// This method is intended for use cases where you are sending from
202 /// asynchronous code to synchronous code, and will work even if the sender
203 /// is not using [`blocking_send`] to send the message.
204 ///
205 /// Note that if [`close`] is called, but there are still outstanding
206 /// [`Permits`] from before it was closed, the channel is not considered
207 /// closed by `blocking_recv` until the permits are released.
208 ///
209 /// [`close`]: Self::close
210 /// [`Permits`]: struct@crate::sync::mpsc::Permit
211 /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
212 ///
213 /// # Panics
214 ///
215 /// This function panics if called within an asynchronous execution
216 /// context.
217 ///
218 /// # Examples
219 ///
220 /// ```
221 /// use std::thread;
222 /// use tokio::runtime::Runtime;
223 /// use tokio::sync::mpsc;
224 ///
225 /// fn main() {
226 /// let (tx, mut rx) = mpsc::channel::<u8>(10);
227 ///
228 /// let sync_code = thread::spawn(move || {
229 /// assert_eq!(Some(10), rx.blocking_recv());
230 /// });
231 ///
232 /// Runtime::new()
233 /// .unwrap()
234 /// .block_on(async move {
235 /// let _ = tx.send(10).await;
236 /// });
237 /// sync_code.join().unwrap()
238 /// }
239 /// ```
240 #[cfg(feature = "sync")]
blocking_recv(&mut self) -> Option<T>241 pub fn blocking_recv(&mut self) -> Option<T> {
242 crate::future::block_on(self.recv())
243 }
244
245 /// Closes the receiving half of a channel without dropping it.
246 ///
247 /// This prevents any further messages from being sent on the channel while
248 /// still enabling the receiver to drain messages that are buffered. Any
249 /// outstanding [`Permit`] values will still be able to send messages.
250 ///
251 /// To guarantee that no messages are dropped, after calling `close()`,
252 /// `recv()` must be called until `None` is returned. If there are
253 /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
254 /// not return `None` until those are released.
255 ///
256 /// [`Permit`]: Permit
257 /// [`OwnedPermit`]: OwnedPermit
258 ///
259 /// # Examples
260 ///
261 /// ```
262 /// use tokio::sync::mpsc;
263 ///
264 /// #[tokio::main]
265 /// async fn main() {
266 /// let (tx, mut rx) = mpsc::channel(20);
267 ///
268 /// tokio::spawn(async move {
269 /// let mut i = 0;
270 /// while let Ok(permit) = tx.reserve().await {
271 /// permit.send(i);
272 /// i += 1;
273 /// }
274 /// });
275 ///
276 /// rx.close();
277 ///
278 /// while let Some(msg) = rx.recv().await {
279 /// println!("got {}", msg);
280 /// }
281 ///
282 /// // Channel closed and no messages are lost.
283 /// }
284 /// ```
close(&mut self)285 pub fn close(&mut self) {
286 self.chan.close();
287 }
288
289 /// Polls to receive the next message on this channel.
290 ///
291 /// This method returns:
292 ///
293 /// * `Poll::Pending` if no messages are available but the channel is not
294 /// closed.
295 /// * `Poll::Ready(Some(message))` if a message is available.
296 /// * `Poll::Ready(None)` if the channel has been closed and all messages
297 /// sent before it was closed have been received.
298 ///
299 /// When the method returns `Poll::Pending`, the `Waker` in the provided
300 /// `Context` is scheduled to receive a wakeup when a message is sent on any
301 /// receiver, or when the channel is closed. Note that on multiple calls to
302 /// `poll_recv`, only the `Waker` from the `Context` passed to the most
303 /// recent call is scheduled to receive a wakeup.
poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>>304 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
305 self.chan.recv(cx)
306 }
307 }
308
309 impl<T> fmt::Debug for Receiver<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result310 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
311 fmt.debug_struct("Receiver")
312 .field("chan", &self.chan)
313 .finish()
314 }
315 }
316
317 impl<T> Unpin for Receiver<T> {}
318
319 impl<T> Sender<T> {
new(chan: chan::Tx<T, Semaphore>) -> Sender<T>320 pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
321 Sender { chan }
322 }
323
324 /// Sends a value, waiting until there is capacity.
325 ///
326 /// A successful send occurs when it is determined that the other end of the
327 /// channel has not hung up already. An unsuccessful send would be one where
328 /// the corresponding receiver has already been closed. Note that a return
329 /// value of `Err` means that the data will never be received, but a return
330 /// value of `Ok` does not mean that the data will be received. It is
331 /// possible for the corresponding receiver to hang up immediately after
332 /// this function returns `Ok`.
333 ///
334 /// # Errors
335 ///
336 /// If the receive half of the channel is closed, either due to [`close`]
337 /// being called or the [`Receiver`] handle dropping, the function returns
338 /// an error. The error includes the value passed to `send`.
339 ///
340 /// [`close`]: Receiver::close
341 /// [`Receiver`]: Receiver
342 ///
343 /// # Cancel safety
344 ///
345 /// If `send` is used as the event in a [`tokio::select!`](crate::select)
346 /// statement and some other branch completes first, then it is guaranteed
347 /// that the message was not sent.
348 ///
349 /// This channel uses a queue to ensure that calls to `send` and `reserve`
350 /// complete in the order they were requested. Cancelling a call to
351 /// `send` makes you lose your place in the queue.
352 ///
353 /// # Examples
354 ///
355 /// In the following example, each call to `send` will block until the
356 /// previously sent value was received.
357 ///
358 /// ```rust
359 /// use tokio::sync::mpsc;
360 ///
361 /// #[tokio::main]
362 /// async fn main() {
363 /// let (tx, mut rx) = mpsc::channel(1);
364 ///
365 /// tokio::spawn(async move {
366 /// for i in 0..10 {
367 /// if let Err(_) = tx.send(i).await {
368 /// println!("receiver dropped");
369 /// return;
370 /// }
371 /// }
372 /// });
373 ///
374 /// while let Some(i) = rx.recv().await {
375 /// println!("got = {}", i);
376 /// }
377 /// }
378 /// ```
send(&self, value: T) -> Result<(), SendError<T>>379 pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
380 match self.reserve().await {
381 Ok(permit) => {
382 permit.send(value);
383 Ok(())
384 }
385 Err(_) => Err(SendError(value)),
386 }
387 }
388
389 /// Completes when the receiver has dropped.
390 ///
391 /// This allows the producers to get notified when interest in the produced
392 /// values is canceled and immediately stop doing work.
393 ///
394 /// # Cancel safety
395 ///
396 /// This method is cancel safe. Once the channel is closed, it stays closed
397 /// forever and all future calls to `closed` will return immediately.
398 ///
399 /// # Examples
400 ///
401 /// ```
402 /// use tokio::sync::mpsc;
403 ///
404 /// #[tokio::main]
405 /// async fn main() {
406 /// let (tx1, rx) = mpsc::channel::<()>(1);
407 /// let tx2 = tx1.clone();
408 /// let tx3 = tx1.clone();
409 /// let tx4 = tx1.clone();
410 /// let tx5 = tx1.clone();
411 /// tokio::spawn(async move {
412 /// drop(rx);
413 /// });
414 ///
415 /// futures::join!(
416 /// tx1.closed(),
417 /// tx2.closed(),
418 /// tx3.closed(),
419 /// tx4.closed(),
420 /// tx5.closed()
421 /// );
422 /// println!("Receiver dropped");
423 /// }
424 /// ```
closed(&self)425 pub async fn closed(&self) {
426 self.chan.closed().await
427 }
428
429 /// Attempts to immediately send a message on this `Sender`
430 ///
431 /// This method differs from [`send`] by returning immediately if the channel's
432 /// buffer is full or no receiver is waiting to acquire some data. Compared
433 /// with [`send`], this function has two failure cases instead of one (one for
434 /// disconnection, one for a full buffer).
435 ///
436 /// # Errors
437 ///
438 /// If the channel capacity has been reached, i.e., the channel has `n`
439 /// buffered values where `n` is the argument passed to [`channel`], then an
440 /// error is returned.
441 ///
442 /// If the receive half of the channel is closed, either due to [`close`]
443 /// being called or the [`Receiver`] handle dropping, the function returns
444 /// an error. The error includes the value passed to `send`.
445 ///
446 /// [`send`]: Sender::send
447 /// [`channel`]: channel
448 /// [`close`]: Receiver::close
449 ///
450 /// # Examples
451 ///
452 /// ```
453 /// use tokio::sync::mpsc;
454 ///
455 /// #[tokio::main]
456 /// async fn main() {
457 /// // Create a channel with buffer size 1
458 /// let (tx1, mut rx) = mpsc::channel(1);
459 /// let tx2 = tx1.clone();
460 ///
461 /// tokio::spawn(async move {
462 /// tx1.send(1).await.unwrap();
463 /// tx1.send(2).await.unwrap();
464 /// // task waits until the receiver receives a value.
465 /// });
466 ///
467 /// tokio::spawn(async move {
468 /// // This will return an error and send
469 /// // no message if the buffer is full
470 /// let _ = tx2.try_send(3);
471 /// });
472 ///
473 /// let mut msg;
474 /// msg = rx.recv().await.unwrap();
475 /// println!("message {} received", msg);
476 ///
477 /// msg = rx.recv().await.unwrap();
478 /// println!("message {} received", msg);
479 ///
480 /// // Third message may have never been sent
481 /// match rx.recv().await {
482 /// Some(msg) => println!("message {} received", msg),
483 /// None => println!("the third message was never sent"),
484 /// }
485 /// }
486 /// ```
try_send(&self, message: T) -> Result<(), TrySendError<T>>487 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
488 match self.chan.semaphore().0.try_acquire(1) {
489 Ok(_) => {}
490 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
491 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
492 }
493
494 // Send the message
495 self.chan.send(message);
496 Ok(())
497 }
498
499 /// Sends a value, waiting until there is capacity, but only for a limited time.
500 ///
501 /// Shares the same success and error conditions as [`send`], adding one more
502 /// condition for an unsuccessful send, which is when the provided timeout has
503 /// elapsed, and there is no capacity available.
504 ///
505 /// [`send`]: Sender::send
506 ///
507 /// # Errors
508 ///
509 /// If the receive half of the channel is closed, either due to [`close`]
510 /// being called or the [`Receiver`] having been dropped,
511 /// the function returns an error. The error includes the value passed to `send`.
512 ///
513 /// [`close`]: Receiver::close
514 /// [`Receiver`]: Receiver
515 ///
516 /// # Examples
517 ///
518 /// In the following example, each call to `send_timeout` will block until the
519 /// previously sent value was received, unless the timeout has elapsed.
520 ///
521 /// ```rust
522 /// use tokio::sync::mpsc;
523 /// use tokio::time::{sleep, Duration};
524 ///
525 /// #[tokio::main]
526 /// async fn main() {
527 /// let (tx, mut rx) = mpsc::channel(1);
528 ///
529 /// tokio::spawn(async move {
530 /// for i in 0..10 {
531 /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
532 /// println!("send error: #{:?}", e);
533 /// return;
534 /// }
535 /// }
536 /// });
537 ///
538 /// while let Some(i) = rx.recv().await {
539 /// println!("got = {}", i);
540 /// sleep(Duration::from_millis(200)).await;
541 /// }
542 /// }
543 /// ```
544 #[cfg(feature = "time")]
545 #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
send_timeout( &self, value: T, timeout: Duration, ) -> Result<(), SendTimeoutError<T>>546 pub async fn send_timeout(
547 &self,
548 value: T,
549 timeout: Duration,
550 ) -> Result<(), SendTimeoutError<T>> {
551 let permit = match crate::time::timeout(timeout, self.reserve()).await {
552 Err(_) => {
553 return Err(SendTimeoutError::Timeout(value));
554 }
555 Ok(Err(_)) => {
556 return Err(SendTimeoutError::Closed(value));
557 }
558 Ok(Ok(permit)) => permit,
559 };
560
561 permit.send(value);
562 Ok(())
563 }
564
565 /// Blocking send to call outside of asynchronous contexts.
566 ///
567 /// This method is intended for use cases where you are sending from
568 /// synchronous code to asynchronous code, and will work even if the
569 /// receiver is not using [`blocking_recv`] to receive the message.
570 ///
571 /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
572 ///
573 /// # Panics
574 ///
575 /// This function panics if called within an asynchronous execution
576 /// context.
577 ///
578 /// # Examples
579 ///
580 /// ```
581 /// use std::thread;
582 /// use tokio::runtime::Runtime;
583 /// use tokio::sync::mpsc;
584 ///
585 /// fn main() {
586 /// let (tx, mut rx) = mpsc::channel::<u8>(1);
587 ///
588 /// let sync_code = thread::spawn(move || {
589 /// tx.blocking_send(10).unwrap();
590 /// });
591 ///
592 /// Runtime::new().unwrap().block_on(async move {
593 /// assert_eq!(Some(10), rx.recv().await);
594 /// });
595 /// sync_code.join().unwrap()
596 /// }
597 /// ```
598 #[cfg(feature = "sync")]
blocking_send(&self, value: T) -> Result<(), SendError<T>>599 pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
600 crate::future::block_on(self.send(value))
601 }
602
603 /// Checks if the channel has been closed. This happens when the
604 /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
605 /// called.
606 ///
607 /// [`Receiver`]: crate::sync::mpsc::Receiver
608 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
609 ///
610 /// ```
611 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
612 /// assert!(!tx.is_closed());
613 ///
614 /// let tx2 = tx.clone();
615 /// assert!(!tx2.is_closed());
616 ///
617 /// drop(rx);
618 /// assert!(tx.is_closed());
619 /// assert!(tx2.is_closed());
620 /// ```
is_closed(&self) -> bool621 pub fn is_closed(&self) -> bool {
622 self.chan.is_closed()
623 }
624
625 /// Wait for channel capacity. Once capacity to send one message is
626 /// available, it is reserved for the caller.
627 ///
628 /// If the channel is full, the function waits for the number of unreceived
629 /// messages to become less than the channel capacity. Capacity to send one
630 /// message is reserved for the caller. A [`Permit`] is returned to track
631 /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
632 /// reserved capacity.
633 ///
634 /// Dropping [`Permit`] without sending a message releases the capacity back
635 /// to the channel.
636 ///
637 /// [`Permit`]: Permit
638 /// [`send`]: Permit::send
639 ///
640 /// # Cancel safety
641 ///
642 /// This channel uses a queue to ensure that calls to `send` and `reserve`
643 /// complete in the order they were requested. Cancelling a call to
644 /// `reserve` makes you lose your place in the queue.
645 ///
646 /// # Examples
647 ///
648 /// ```
649 /// use tokio::sync::mpsc;
650 ///
651 /// #[tokio::main]
652 /// async fn main() {
653 /// let (tx, mut rx) = mpsc::channel(1);
654 ///
655 /// // Reserve capacity
656 /// let permit = tx.reserve().await.unwrap();
657 ///
658 /// // Trying to send directly on the `tx` will fail due to no
659 /// // available capacity.
660 /// assert!(tx.try_send(123).is_err());
661 ///
662 /// // Sending on the permit succeeds
663 /// permit.send(456);
664 ///
665 /// // The value sent on the permit is received
666 /// assert_eq!(rx.recv().await.unwrap(), 456);
667 /// }
668 /// ```
reserve(&self) -> Result<Permit<'_, T>, SendError<()>>669 pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
670 self.reserve_inner().await?;
671 Ok(Permit { chan: &self.chan })
672 }
673
674 /// Wait for channel capacity, moving the `Sender` and returning an owned
675 /// permit. Once capacity to send one message is available, it is reserved
676 /// for the caller.
677 ///
678 /// This moves the sender _by value_, and returns an owned permit that can
679 /// be used to send a message into the channel. Unlike [`Sender::reserve`],
680 /// this method may be used in cases where the permit must be valid for the
681 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
682 /// essentially a reference count increment, comparable to [`Arc::clone`]),
683 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
684 /// moved, it can be cloned prior to calling `reserve_owned`.
685 ///
686 /// If the channel is full, the function waits for the number of unreceived
687 /// messages to become less than the channel capacity. Capacity to send one
688 /// message is reserved for the caller. An [`OwnedPermit`] is returned to
689 /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
690 /// consumes the reserved capacity.
691 ///
692 /// Dropping the [`OwnedPermit`] without sending a message releases the
693 /// capacity back to the channel.
694 ///
695 /// # Cancel safety
696 ///
697 /// This channel uses a queue to ensure that calls to `send` and `reserve`
698 /// complete in the order they were requested. Cancelling a call to
699 /// `reserve_owned` makes you lose your place in the queue.
700 ///
701 /// # Examples
702 /// Sending a message using an [`OwnedPermit`]:
703 /// ```
704 /// use tokio::sync::mpsc;
705 ///
706 /// #[tokio::main]
707 /// async fn main() {
708 /// let (tx, mut rx) = mpsc::channel(1);
709 ///
710 /// // Reserve capacity, moving the sender.
711 /// let permit = tx.reserve_owned().await.unwrap();
712 ///
713 /// // Send a message, consuming the permit and returning
714 /// // the moved sender.
715 /// let tx = permit.send(123);
716 ///
717 /// // The value sent on the permit is received.
718 /// assert_eq!(rx.recv().await.unwrap(), 123);
719 ///
720 /// // The sender can now be used again.
721 /// tx.send(456).await.unwrap();
722 /// }
723 /// ```
724 ///
725 /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
726 /// by value, it can be inexpensively cloned before calling `reserve_owned`:
727 ///
728 /// ```
729 /// use tokio::sync::mpsc;
730 ///
731 /// #[tokio::main]
732 /// async fn main() {
733 /// let (tx, mut rx) = mpsc::channel(1);
734 ///
735 /// // Clone the sender and reserve capacity.
736 /// let permit = tx.clone().reserve_owned().await.unwrap();
737 ///
738 /// // Trying to send directly on the `tx` will fail due to no
739 /// // available capacity.
740 /// assert!(tx.try_send(123).is_err());
741 ///
742 /// // Sending on the permit succeeds.
743 /// permit.send(456);
744 ///
745 /// // The value sent on the permit is received
746 /// assert_eq!(rx.recv().await.unwrap(), 456);
747 /// }
748 /// ```
749 ///
750 /// [`Sender::reserve`]: Sender::reserve
751 /// [`OwnedPermit`]: OwnedPermit
752 /// [`send`]: OwnedPermit::send
753 /// [`Arc::clone`]: std::sync::Arc::clone
reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>>754 pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
755 self.reserve_inner().await?;
756 Ok(OwnedPermit {
757 chan: Some(self.chan),
758 })
759 }
760
reserve_inner(&self) -> Result<(), SendError<()>>761 async fn reserve_inner(&self) -> Result<(), SendError<()>> {
762 match self.chan.semaphore().0.acquire(1).await {
763 Ok(_) => Ok(()),
764 Err(_) => Err(SendError(())),
765 }
766 }
767
768 /// Try to acquire a slot in the channel without waiting for the slot to become
769 /// available.
770 ///
771 /// If the channel is full this function will return [`TrySendError`], otherwise
772 /// if there is a slot available it will return a [`Permit`] that will then allow you
773 /// to [`send`] on the channel with a guaranteed slot. This function is similar to
774 /// [`reserve`] except it does not await for the slot to become available.
775 ///
776 /// Dropping [`Permit`] without sending a message releases the capacity back
777 /// to the channel.
778 ///
779 /// [`Permit`]: Permit
780 /// [`send`]: Permit::send
781 /// [`reserve`]: Sender::reserve
782 ///
783 /// # Examples
784 ///
785 /// ```
786 /// use tokio::sync::mpsc;
787 ///
788 /// #[tokio::main]
789 /// async fn main() {
790 /// let (tx, mut rx) = mpsc::channel(1);
791 ///
792 /// // Reserve capacity
793 /// let permit = tx.try_reserve().unwrap();
794 ///
795 /// // Trying to send directly on the `tx` will fail due to no
796 /// // available capacity.
797 /// assert!(tx.try_send(123).is_err());
798 ///
799 /// // Trying to reserve an additional slot on the `tx` will
800 /// // fail because there is no capacity.
801 /// assert!(tx.try_reserve().is_err());
802 ///
803 /// // Sending on the permit succeeds
804 /// permit.send(456);
805 ///
806 /// // The value sent on the permit is received
807 /// assert_eq!(rx.recv().await.unwrap(), 456);
808 ///
809 /// }
810 /// ```
try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>>811 pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
812 match self.chan.semaphore().0.try_acquire(1) {
813 Ok(_) => {}
814 Err(_) => return Err(TrySendError::Full(())),
815 }
816
817 Ok(Permit { chan: &self.chan })
818 }
819
820 /// Try to acquire a slot in the channel without waiting for the slot to become
821 /// available, returning an owned permit.
822 ///
823 /// This moves the sender _by value_, and returns an owned permit that can
824 /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
825 /// this method may be used in cases where the permit must be valid for the
826 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
827 /// essentially a reference count increment, comparable to [`Arc::clone`]),
828 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
829 /// moved, it can be cloned prior to calling `try_reserve_owned`.
830 ///
831 /// If the channel is full this function will return a [`TrySendError`].
832 /// Since the sender is taken by value, the `TrySendError` returned in this
833 /// case contains the sender, so that it may be used again. Otherwise, if
834 /// there is a slot available, this method will return an [`OwnedPermit`]
835 /// that can then be used to [`send`] on the channel with a guaranteed slot.
836 /// This function is similar to [`reserve_owned`] except it does not await
837 /// for the slot to become available.
838 ///
839 /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
840 /// to the channel.
841 ///
842 /// [`OwnedPermit`]: OwnedPermit
843 /// [`send`]: OwnedPermit::send
844 /// [`reserve_owned`]: Sender::reserve_owned
845 /// [`Arc::clone`]: std::sync::Arc::clone
846 ///
847 /// # Examples
848 ///
849 /// ```
850 /// use tokio::sync::mpsc;
851 ///
852 /// #[tokio::main]
853 /// async fn main() {
854 /// let (tx, mut rx) = mpsc::channel(1);
855 ///
856 /// // Reserve capacity
857 /// let permit = tx.clone().try_reserve_owned().unwrap();
858 ///
859 /// // Trying to send directly on the `tx` will fail due to no
860 /// // available capacity.
861 /// assert!(tx.try_send(123).is_err());
862 ///
863 /// // Trying to reserve an additional slot on the `tx` will
864 /// // fail because there is no capacity.
865 /// assert!(tx.try_reserve().is_err());
866 ///
867 /// // Sending on the permit succeeds
868 /// permit.send(456);
869 ///
870 /// // The value sent on the permit is received
871 /// assert_eq!(rx.recv().await.unwrap(), 456);
872 ///
873 /// }
874 /// ```
try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>>875 pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
876 match self.chan.semaphore().0.try_acquire(1) {
877 Ok(_) => {}
878 Err(_) => return Err(TrySendError::Full(self)),
879 }
880
881 Ok(OwnedPermit {
882 chan: Some(self.chan),
883 })
884 }
885
886 /// Returns `true` if senders belong to the same channel.
887 ///
888 /// # Examples
889 ///
890 /// ```
891 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
892 /// let tx2 = tx.clone();
893 /// assert!(tx.same_channel(&tx2));
894 ///
895 /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
896 /// assert!(!tx3.same_channel(&tx2));
897 /// ```
same_channel(&self, other: &Self) -> bool898 pub fn same_channel(&self, other: &Self) -> bool {
899 self.chan.same_channel(&other.chan)
900 }
901
902 /// Returns the current capacity of the channel.
903 ///
904 /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
905 /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
906 ///
907 /// # Examples
908 ///
909 /// ```
910 /// use tokio::sync::mpsc;
911 ///
912 /// #[tokio::main]
913 /// async fn main() {
914 /// let (tx, mut rx) = mpsc::channel::<()>(5);
915 ///
916 /// assert_eq!(tx.capacity(), 5);
917 ///
918 /// // Making a reservation drops the capacity by one.
919 /// let permit = tx.reserve().await.unwrap();
920 /// assert_eq!(tx.capacity(), 4);
921 ///
922 /// // Sending and receiving a value increases the capacity by one.
923 /// permit.send(());
924 /// rx.recv().await.unwrap();
925 /// assert_eq!(tx.capacity(), 5);
926 /// }
927 /// ```
928 ///
929 /// [`send`]: Sender::send
930 /// [`reserve`]: Sender::reserve
capacity(&self) -> usize931 pub fn capacity(&self) -> usize {
932 self.chan.semaphore().0.available_permits()
933 }
934 }
935
936 impl<T> Clone for Sender<T> {
clone(&self) -> Self937 fn clone(&self) -> Self {
938 Sender {
939 chan: self.chan.clone(),
940 }
941 }
942 }
943
944 impl<T> fmt::Debug for Sender<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result945 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
946 fmt.debug_struct("Sender")
947 .field("chan", &self.chan)
948 .finish()
949 }
950 }
951
952 // ===== impl Permit =====
953
954 impl<T> Permit<'_, T> {
955 /// Sends a value using the reserved capacity.
956 ///
957 /// Capacity for the message has already been reserved. The message is sent
958 /// to the receiver and the permit is consumed. The operation will succeed
959 /// even if the receiver half has been closed. See [`Receiver::close`] for
960 /// more details on performing a clean shutdown.
961 ///
962 /// [`Receiver::close`]: Receiver::close
963 ///
964 /// # Examples
965 ///
966 /// ```
967 /// use tokio::sync::mpsc;
968 ///
969 /// #[tokio::main]
970 /// async fn main() {
971 /// let (tx, mut rx) = mpsc::channel(1);
972 ///
973 /// // Reserve capacity
974 /// let permit = tx.reserve().await.unwrap();
975 ///
976 /// // Trying to send directly on the `tx` will fail due to no
977 /// // available capacity.
978 /// assert!(tx.try_send(123).is_err());
979 ///
980 /// // Send a message on the permit
981 /// permit.send(456);
982 ///
983 /// // The value sent on the permit is received
984 /// assert_eq!(rx.recv().await.unwrap(), 456);
985 /// }
986 /// ```
send(self, value: T)987 pub fn send(self, value: T) {
988 use std::mem;
989
990 self.chan.send(value);
991
992 // Avoid the drop logic
993 mem::forget(self);
994 }
995 }
996
997 impl<T> Drop for Permit<'_, T> {
drop(&mut self)998 fn drop(&mut self) {
999 use chan::Semaphore;
1000
1001 let semaphore = self.chan.semaphore();
1002
1003 // Add the permit back to the semaphore
1004 semaphore.add_permit();
1005
1006 // If this is the last sender for this channel, wake the receiver so
1007 // that it can be notified that the channel is closed.
1008 if semaphore.is_closed() && semaphore.is_idle() {
1009 self.chan.wake_rx();
1010 }
1011 }
1012 }
1013
1014 impl<T> fmt::Debug for Permit<'_, T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1015 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1016 fmt.debug_struct("Permit")
1017 .field("chan", &self.chan)
1018 .finish()
1019 }
1020 }
1021
1022 // ===== impl Permit =====
1023
1024 impl<T> OwnedPermit<T> {
1025 /// Sends a value using the reserved capacity.
1026 ///
1027 /// Capacity for the message has already been reserved. The message is sent
1028 /// to the receiver and the permit is consumed. The operation will succeed
1029 /// even if the receiver half has been closed. See [`Receiver::close`] for
1030 /// more details on performing a clean shutdown.
1031 ///
1032 /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
1033 /// the `OwnedPermit` was reserved.
1034 ///
1035 /// [`Receiver::close`]: Receiver::close
1036 ///
1037 /// # Examples
1038 ///
1039 /// ```
1040 /// use tokio::sync::mpsc;
1041 ///
1042 /// #[tokio::main]
1043 /// async fn main() {
1044 /// let (tx, mut rx) = mpsc::channel(1);
1045 ///
1046 /// // Reserve capacity
1047 /// let permit = tx.reserve_owned().await.unwrap();
1048 ///
1049 /// // Send a message on the permit, returning the sender.
1050 /// let tx = permit.send(456);
1051 ///
1052 /// // The value sent on the permit is received
1053 /// assert_eq!(rx.recv().await.unwrap(), 456);
1054 ///
1055 /// // We may now reuse `tx` to send another message.
1056 /// tx.send(789).await.unwrap();
1057 /// }
1058 /// ```
send(mut self, value: T) -> Sender<T>1059 pub fn send(mut self, value: T) -> Sender<T> {
1060 let chan = self.chan.take().unwrap_or_else(|| {
1061 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1062 });
1063 chan.send(value);
1064
1065 Sender { chan }
1066 }
1067
1068 /// Release the reserved capacity *without* sending a message, returning the
1069 /// [`Sender`].
1070 ///
1071 /// # Examples
1072 ///
1073 /// ```
1074 /// use tokio::sync::mpsc;
1075 ///
1076 /// #[tokio::main]
1077 /// async fn main() {
1078 /// let (tx, rx) = mpsc::channel(1);
1079 ///
1080 /// // Clone the sender and reserve capacity
1081 /// let permit = tx.clone().reserve_owned().await.unwrap();
1082 ///
1083 /// // Trying to send on the original `tx` will fail, since the `permit`
1084 /// // has reserved all the available capacity.
1085 /// assert!(tx.try_send(123).is_err());
1086 ///
1087 /// // Release the permit without sending a message, returning the clone
1088 /// // of the sender.
1089 /// let tx2 = permit.release();
1090 ///
1091 /// // We may now reuse `tx` to send another message.
1092 /// tx.send(789).await.unwrap();
1093 /// # drop(rx); drop(tx2);
1094 /// }
1095 /// ```
1096 ///
1097 /// [`Sender`]: Sender
release(mut self) -> Sender<T>1098 pub fn release(mut self) -> Sender<T> {
1099 use chan::Semaphore;
1100
1101 let chan = self.chan.take().unwrap_or_else(|| {
1102 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1103 });
1104
1105 // Add the permit back to the semaphore
1106 chan.semaphore().add_permit();
1107 Sender { chan }
1108 }
1109 }
1110
1111 impl<T> Drop for OwnedPermit<T> {
drop(&mut self)1112 fn drop(&mut self) {
1113 use chan::Semaphore;
1114
1115 // Are we still holding onto the sender?
1116 if let Some(chan) = self.chan.take() {
1117 let semaphore = chan.semaphore();
1118
1119 // Add the permit back to the semaphore
1120 semaphore.add_permit();
1121
1122 // If this `OwnedPermit` is holding the last sender for this
1123 // channel, wake the receiver so that it can be notified that the
1124 // channel is closed.
1125 if semaphore.is_closed() && semaphore.is_idle() {
1126 chan.wake_rx();
1127 }
1128 }
1129
1130 // Otherwise, do nothing.
1131 }
1132 }
1133
1134 impl<T> fmt::Debug for OwnedPermit<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1135 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1136 fmt.debug_struct("OwnedPermit")
1137 .field("chan", &self.chan)
1138 .finish()
1139 }
1140 }
1141