1 //! A single-producer, multi-consumer channel that only retains the *last* sent
2 //! value.
3 //!
4 //! This channel is useful for watching for changes to a value from multiple
5 //! points in the code base, for example, changes to configuration values.
6 //!
7 //! # Usage
8 //!
9 //! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are
10 //! the producer and sender halves of the channel. The channel is
11 //! created with an initial value. [`Receiver::recv`] will always
12 //! be ready upon creation and will yield either this initial value or
13 //! the latest value that has been sent by `Sender`.
14 //!
15 //! Calls to [`Receiver::recv`] will always yield the latest value.
16 //!
17 //! # Examples
18 //!
19 //! ```
20 //! use tokio::sync::watch;
21 //!
22 //! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
23 //!     let (tx, mut rx) = watch::channel("hello");
24 //!
25 //!     tokio::spawn(async move {
26 //!         while let Some(value) = rx.recv().await {
27 //!             println!("received = {:?}", value);
28 //!         }
29 //!     });
30 //!
31 //!     tx.broadcast("world")?;
32 //! # Ok(())
33 //! # }
34 //! ```
35 //!
36 //! # Closing
37 //!
38 //! [`Sender::closed`] allows the producer to detect when all [`Receiver`]
39 //! handles have been dropped. This indicates that there is no further interest
40 //! in the values being produced and work can be stopped.
41 //!
42 //! # Thread safety
43 //!
44 //! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
45 //! threads and can be used in a concurrent environment. Clones of [`Receiver`]
46 //! handles may be moved to separate threads and also used concurrently.
47 //!
48 //! [`Sender`]: crate::sync::watch::Sender
49 //! [`Receiver`]: crate::sync::watch::Receiver
50 //! [`Receiver::recv`]: crate::sync::watch::Receiver::recv
51 //! [`channel`]: crate::sync::watch::channel
52 //! [`Sender::closed`]: crate::sync::watch::Sender::closed
53 
54 use crate::future::poll_fn;
55 use crate::sync::task::AtomicWaker;
56 
57 use fnv::FnvHashSet;
58 use std::ops;
59 use std::sync::atomic::AtomicUsize;
60 use std::sync::atomic::Ordering::{Relaxed, SeqCst};
61 use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak};
62 use std::task::Poll::{Pending, Ready};
63 use std::task::{Context, Poll};
64 
65 /// Receives values from the associated [`Sender`](struct@Sender).
66 ///
67 /// Instances are created by the [`channel`](fn@channel) function.
68 #[derive(Debug)]
69 pub struct Receiver<T> {
70     /// Pointer to the shared state
71     shared: Arc<Shared<T>>,
72 
73     /// Pointer to the watcher's internal state
74     inner: Watcher,
75 }
76 
77 /// Sends values to the associated [`Receiver`](struct@Receiver).
78 ///
79 /// Instances are created by the [`channel`](fn@channel) function.
80 #[derive(Debug)]
81 pub struct Sender<T> {
82     shared: Weak<Shared<T>>,
83 }
84 
85 /// Returns a reference to the inner value
86 ///
87 /// Outstanding borrows hold a read lock on the inner value. This means that
88 /// long lived borrows could cause the produce half to block. It is recommended
89 /// to keep the borrow as short lived as possible.
90 #[derive(Debug)]
91 pub struct Ref<'a, T> {
92     inner: RwLockReadGuard<'a, T>,
93 }
94 
95 pub mod error {
96     //! Watch error types
97 
98     use std::fmt;
99 
100     /// Error produced when sending a value fails.
101     #[derive(Debug)]
102     pub struct SendError<T> {
103         pub(crate) inner: T,
104     }
105 
106     // ===== impl SendError =====
107 
108     impl<T: fmt::Debug> fmt::Display for SendError<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result109         fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
110             write!(fmt, "channel closed")
111         }
112     }
113 
114     impl<T: fmt::Debug> std::error::Error for SendError<T> {}
115 }
116 
117 #[derive(Debug)]
118 struct Shared<T> {
119     /// The most recent value
120     value: RwLock<T>,
121 
122     /// The current version
123     ///
124     /// The lowest bit represents a "closed" state. The rest of the bits
125     /// represent the current version.
126     version: AtomicUsize,
127 
128     /// All watchers
129     watchers: Mutex<Watchers>,
130 
131     /// Task to notify when all watchers drop
132     cancel: AtomicWaker,
133 }
134 
135 type Watchers = FnvHashSet<Watcher>;
136 
137 /// The watcher's ID is based on the Arc's pointer.
138 #[derive(Clone, Debug)]
139 struct Watcher(Arc<WatchInner>);
140 
141 #[derive(Debug)]
142 struct WatchInner {
143     /// Last observed version
144     version: AtomicUsize,
145     waker: AtomicWaker,
146 }
147 
148 const CLOSED: usize = 1;
149 
150 /// Creates a new watch channel, returning the "send" and "receive" handles.
151 ///
152 /// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
153 /// Only the last value sent is made available to the [`Receiver`] half. All
154 /// intermediate values are dropped.
155 ///
156 /// # Examples
157 ///
158 /// ```
159 /// use tokio::sync::watch;
160 ///
161 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
162 ///     let (tx, mut rx) = watch::channel("hello");
163 ///
164 ///     tokio::spawn(async move {
165 ///         while let Some(value) = rx.recv().await {
166 ///             println!("received = {:?}", value);
167 ///         }
168 ///     });
169 ///
170 ///     tx.broadcast("world")?;
171 /// # Ok(())
172 /// # }
173 /// ```
174 ///
175 /// [`Sender`]: struct@Sender
176 /// [`Receiver`]: struct@Receiver
channel<T: Clone>(init: T) -> (Sender<T>, Receiver<T>)177 pub fn channel<T: Clone>(init: T) -> (Sender<T>, Receiver<T>) {
178     const VERSION_0: usize = 0;
179     const VERSION_1: usize = 2;
180 
181     // We don't start knowing VERSION_1
182     let inner = Watcher::new_version(VERSION_0);
183 
184     // Insert the watcher
185     let mut watchers = FnvHashSet::with_capacity_and_hasher(0, Default::default());
186     watchers.insert(inner.clone());
187 
188     let shared = Arc::new(Shared {
189         value: RwLock::new(init),
190         version: AtomicUsize::new(VERSION_1),
191         watchers: Mutex::new(watchers),
192         cancel: AtomicWaker::new(),
193     });
194 
195     let tx = Sender {
196         shared: Arc::downgrade(&shared),
197     };
198 
199     let rx = Receiver { shared, inner };
200 
201     (tx, rx)
202 }
203 
204 impl<T> Receiver<T> {
205     /// Returns a reference to the most recently sent value
206     ///
207     /// Outstanding borrows hold a read lock. This means that long lived borrows
208     /// could cause the send half to block. It is recommended to keep the borrow
209     /// as short lived as possible.
210     ///
211     /// # Examples
212     ///
213     /// ```
214     /// use tokio::sync::watch;
215     ///
216     /// let (_, rx) = watch::channel("hello");
217     /// assert_eq!(*rx.borrow(), "hello");
218     /// ```
borrow(&self) -> Ref<'_, T>219     pub fn borrow(&self) -> Ref<'_, T> {
220         let inner = self.shared.value.read().unwrap();
221         Ref { inner }
222     }
223 
224     // TODO: document
225     #[doc(hidden)]
poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>>226     pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>> {
227         // Make sure the task is up to date
228         self.inner.waker.register_by_ref(cx.waker());
229 
230         let state = self.shared.version.load(SeqCst);
231         let version = state & !CLOSED;
232 
233         if self.inner.version.swap(version, Relaxed) != version {
234             let inner = self.shared.value.read().unwrap();
235 
236             return Ready(Some(Ref { inner }));
237         }
238 
239         if CLOSED == state & CLOSED {
240             // The `Store` handle has been dropped.
241             return Ready(None);
242         }
243 
244         Pending
245     }
246 }
247 
248 impl<T: Clone> Receiver<T> {
249     /// Attempts to clone the latest value sent via the channel.
250     ///
251     /// If this is the first time the function is called on a `Receiver`
252     /// instance, then the function completes immediately with the **current**
253     /// value held by the channel. On the next call, the function waits until
254     /// a new value is sent in the channel.
255     ///
256     /// `None` is returned if the `Sender` half is dropped.
257     ///
258     /// # Examples
259     ///
260     /// ```
261     /// use tokio::sync::watch;
262     ///
263     /// #[tokio::main]
264     /// async fn main() {
265     ///     let (tx, mut rx) = watch::channel("hello");
266     ///
267     ///     let v = rx.recv().await.unwrap();
268     ///     assert_eq!(v, "hello");
269     ///
270     ///     tokio::spawn(async move {
271     ///         tx.broadcast("goodbye").unwrap();
272     ///     });
273     ///
274     ///     // Waits for the new task to spawn and send the value.
275     ///     let v = rx.recv().await.unwrap();
276     ///     assert_eq!(v, "goodbye");
277     ///
278     ///     let v = rx.recv().await;
279     ///     assert!(v.is_none());
280     /// }
281     /// ```
recv(&mut self) -> Option<T>282     pub async fn recv(&mut self) -> Option<T> {
283         poll_fn(|cx| {
284             let v_ref = ready!(self.poll_recv_ref(cx));
285             Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
286         })
287         .await
288     }
289 }
290 
291 #[cfg(feature = "stream")]
292 impl<T: Clone> crate::stream::Stream for Receiver<T> {
293     type Item = T;
294 
poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>>295     fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
296         let v_ref = ready!(self.poll_recv_ref(cx));
297 
298         Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
299     }
300 }
301 
302 impl<T> Clone for Receiver<T> {
clone(&self) -> Self303     fn clone(&self) -> Self {
304         let ver = self.inner.version.load(Relaxed);
305         let inner = Watcher::new_version(ver);
306         let shared = self.shared.clone();
307 
308         shared.watchers.lock().unwrap().insert(inner.clone());
309 
310         Receiver { shared, inner }
311     }
312 }
313 
314 impl<T> Drop for Receiver<T> {
drop(&mut self)315     fn drop(&mut self) {
316         self.shared.watchers.lock().unwrap().remove(&self.inner);
317     }
318 }
319 
320 impl<T> Sender<T> {
321     /// Broadcasts a new value via the channel, notifying all receivers.
broadcast(&self, value: T) -> Result<(), error::SendError<T>>322     pub fn broadcast(&self, value: T) -> Result<(), error::SendError<T>> {
323         let shared = match self.shared.upgrade() {
324             Some(shared) => shared,
325             // All `Watch` handles have been canceled
326             None => return Err(error::SendError { inner: value }),
327         };
328 
329         // Replace the value
330         {
331             let mut lock = shared.value.write().unwrap();
332             *lock = value;
333         }
334 
335         // Update the version. 2 is used so that the CLOSED bit is not set.
336         shared.version.fetch_add(2, SeqCst);
337 
338         // Notify all watchers
339         notify_all(&*shared);
340 
341         Ok(())
342     }
343 
344     /// Completes when all receivers have dropped.
345     ///
346     /// This allows the producer to get notified when interest in the produced
347     /// values is canceled and immediately stop doing work.
closed(&mut self)348     pub async fn closed(&mut self) {
349         poll_fn(|cx| self.poll_close(cx)).await
350     }
351 
poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()>352     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
353         match self.shared.upgrade() {
354             Some(shared) => {
355                 shared.cancel.register_by_ref(cx.waker());
356                 Pending
357             }
358             None => Ready(()),
359         }
360     }
361 }
362 
363 /// Notifies all watchers of a change
notify_all<T>(shared: &Shared<T>)364 fn notify_all<T>(shared: &Shared<T>) {
365     let watchers = shared.watchers.lock().unwrap();
366 
367     for watcher in watchers.iter() {
368         // Notify the task
369         watcher.waker.wake();
370     }
371 }
372 
373 impl<T> Drop for Sender<T> {
drop(&mut self)374     fn drop(&mut self) {
375         if let Some(shared) = self.shared.upgrade() {
376             shared.version.fetch_or(CLOSED, SeqCst);
377             notify_all(&*shared);
378         }
379     }
380 }
381 
382 // ===== impl Ref =====
383 
384 impl<T> ops::Deref for Ref<'_, T> {
385     type Target = T;
386 
deref(&self) -> &T387     fn deref(&self) -> &T {
388         self.inner.deref()
389     }
390 }
391 
392 // ===== impl Shared =====
393 
394 impl<T> Drop for Shared<T> {
drop(&mut self)395     fn drop(&mut self) {
396         self.cancel.wake();
397     }
398 }
399 
400 // ===== impl Watcher =====
401 
402 impl Watcher {
new_version(version: usize) -> Self403     fn new_version(version: usize) -> Self {
404         Watcher(Arc::new(WatchInner {
405             version: AtomicUsize::new(version),
406             waker: AtomicWaker::new(),
407         }))
408     }
409 }
410 
411 impl std::cmp::PartialEq for Watcher {
eq(&self, other: &Watcher) -> bool412     fn eq(&self, other: &Watcher) -> bool {
413         Arc::ptr_eq(&self.0, &other.0)
414     }
415 }
416 
417 impl std::cmp::Eq for Watcher {}
418 
419 impl std::hash::Hash for Watcher {
hash<H: std::hash::Hasher>(&self, state: &mut H)420     fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
421         (&*self.0 as *const WatchInner).hash(state)
422     }
423 }
424 
425 impl std::ops::Deref for Watcher {
426     type Target = WatchInner;
427 
deref(&self) -> &Self::Target428     fn deref(&self) -> &Self::Target {
429         &self.0
430     }
431 }
432