1 //! A single-producer, multi-consumer channel that only retains the *last* sent
2 //! value.
3 //!
4 //! This channel is useful for watching for changes to a value from multiple
5 //! points in the code base, for example, changes to configuration values.
6 //!
7 //! # Usage
8 //!
9 //! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are
10 //! the producer and sender halves of the channel. The channel is
11 //! created with an initial value. [`Receiver::recv`] will always
12 //! be ready upon creation and will yield either this initial value or
13 //! the latest value that has been sent by `Sender`.
14 //!
15 //! Calls to [`Receiver::recv`] will always yield the latest value.
16 //!
17 //! # Examples
18 //!
19 //! ```
20 //! use tokio::sync::watch;
21 //!
22 //! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
23 //! let (tx, mut rx) = watch::channel("hello");
24 //!
25 //! tokio::spawn(async move {
26 //! while let Some(value) = rx.recv().await {
27 //! println!("received = {:?}", value);
28 //! }
29 //! });
30 //!
31 //! tx.broadcast("world")?;
32 //! # Ok(())
33 //! # }
34 //! ```
35 //!
36 //! # Closing
37 //!
38 //! [`Sender::closed`] allows the producer to detect when all [`Receiver`]
39 //! handles have been dropped. This indicates that there is no further interest
40 //! in the values being produced and work can be stopped.
41 //!
42 //! # Thread safety
43 //!
44 //! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
45 //! threads and can be used in a concurrent environment. Clones of [`Receiver`]
46 //! handles may be moved to separate threads and also used concurrently.
47 //!
48 //! [`Sender`]: crate::sync::watch::Sender
49 //! [`Receiver`]: crate::sync::watch::Receiver
50 //! [`Receiver::recv`]: crate::sync::watch::Receiver::recv
51 //! [`channel`]: crate::sync::watch::channel
52 //! [`Sender::closed`]: crate::sync::watch::Sender::closed
53
54 use crate::future::poll_fn;
55 use crate::sync::task::AtomicWaker;
56
57 use fnv::FnvHashSet;
58 use std::ops;
59 use std::sync::atomic::AtomicUsize;
60 use std::sync::atomic::Ordering::{Relaxed, SeqCst};
61 use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak};
62 use std::task::Poll::{Pending, Ready};
63 use std::task::{Context, Poll};
64
65 /// Receives values from the associated [`Sender`](struct@Sender).
66 ///
67 /// Instances are created by the [`channel`](fn@channel) function.
68 #[derive(Debug)]
69 pub struct Receiver<T> {
70 /// Pointer to the shared state
71 shared: Arc<Shared<T>>,
72
73 /// Pointer to the watcher's internal state
74 inner: Watcher,
75 }
76
77 /// Sends values to the associated [`Receiver`](struct@Receiver).
78 ///
79 /// Instances are created by the [`channel`](fn@channel) function.
80 #[derive(Debug)]
81 pub struct Sender<T> {
82 shared: Weak<Shared<T>>,
83 }
84
85 /// Returns a reference to the inner value
86 ///
87 /// Outstanding borrows hold a read lock on the inner value. This means that
88 /// long lived borrows could cause the produce half to block. It is recommended
89 /// to keep the borrow as short lived as possible.
90 #[derive(Debug)]
91 pub struct Ref<'a, T> {
92 inner: RwLockReadGuard<'a, T>,
93 }
94
95 pub mod error {
96 //! Watch error types
97
98 use std::fmt;
99
100 /// Error produced when sending a value fails.
101 #[derive(Debug)]
102 pub struct SendError<T> {
103 pub(crate) inner: T,
104 }
105
106 // ===== impl SendError =====
107
108 impl<T: fmt::Debug> fmt::Display for SendError<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result109 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
110 write!(fmt, "channel closed")
111 }
112 }
113
114 impl<T: fmt::Debug> std::error::Error for SendError<T> {}
115 }
116
117 #[derive(Debug)]
118 struct Shared<T> {
119 /// The most recent value
120 value: RwLock<T>,
121
122 /// The current version
123 ///
124 /// The lowest bit represents a "closed" state. The rest of the bits
125 /// represent the current version.
126 version: AtomicUsize,
127
128 /// All watchers
129 watchers: Mutex<Watchers>,
130
131 /// Task to notify when all watchers drop
132 cancel: AtomicWaker,
133 }
134
135 type Watchers = FnvHashSet<Watcher>;
136
137 /// The watcher's ID is based on the Arc's pointer.
138 #[derive(Clone, Debug)]
139 struct Watcher(Arc<WatchInner>);
140
141 #[derive(Debug)]
142 struct WatchInner {
143 /// Last observed version
144 version: AtomicUsize,
145 waker: AtomicWaker,
146 }
147
148 const CLOSED: usize = 1;
149
150 /// Creates a new watch channel, returning the "send" and "receive" handles.
151 ///
152 /// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
153 /// Only the last value sent is made available to the [`Receiver`] half. All
154 /// intermediate values are dropped.
155 ///
156 /// # Examples
157 ///
158 /// ```
159 /// use tokio::sync::watch;
160 ///
161 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
162 /// let (tx, mut rx) = watch::channel("hello");
163 ///
164 /// tokio::spawn(async move {
165 /// while let Some(value) = rx.recv().await {
166 /// println!("received = {:?}", value);
167 /// }
168 /// });
169 ///
170 /// tx.broadcast("world")?;
171 /// # Ok(())
172 /// # }
173 /// ```
174 ///
175 /// [`Sender`]: struct@Sender
176 /// [`Receiver`]: struct@Receiver
channel<T: Clone>(init: T) -> (Sender<T>, Receiver<T>)177 pub fn channel<T: Clone>(init: T) -> (Sender<T>, Receiver<T>) {
178 const VERSION_0: usize = 0;
179 const VERSION_1: usize = 2;
180
181 // We don't start knowing VERSION_1
182 let inner = Watcher::new_version(VERSION_0);
183
184 // Insert the watcher
185 let mut watchers = FnvHashSet::with_capacity_and_hasher(0, Default::default());
186 watchers.insert(inner.clone());
187
188 let shared = Arc::new(Shared {
189 value: RwLock::new(init),
190 version: AtomicUsize::new(VERSION_1),
191 watchers: Mutex::new(watchers),
192 cancel: AtomicWaker::new(),
193 });
194
195 let tx = Sender {
196 shared: Arc::downgrade(&shared),
197 };
198
199 let rx = Receiver { shared, inner };
200
201 (tx, rx)
202 }
203
204 impl<T> Receiver<T> {
205 /// Returns a reference to the most recently sent value
206 ///
207 /// Outstanding borrows hold a read lock. This means that long lived borrows
208 /// could cause the send half to block. It is recommended to keep the borrow
209 /// as short lived as possible.
210 ///
211 /// # Examples
212 ///
213 /// ```
214 /// use tokio::sync::watch;
215 ///
216 /// let (_, rx) = watch::channel("hello");
217 /// assert_eq!(*rx.borrow(), "hello");
218 /// ```
borrow(&self) -> Ref<'_, T>219 pub fn borrow(&self) -> Ref<'_, T> {
220 let inner = self.shared.value.read().unwrap();
221 Ref { inner }
222 }
223
224 // TODO: document
225 #[doc(hidden)]
poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>>226 pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>> {
227 // Make sure the task is up to date
228 self.inner.waker.register_by_ref(cx.waker());
229
230 let state = self.shared.version.load(SeqCst);
231 let version = state & !CLOSED;
232
233 if self.inner.version.swap(version, Relaxed) != version {
234 let inner = self.shared.value.read().unwrap();
235
236 return Ready(Some(Ref { inner }));
237 }
238
239 if CLOSED == state & CLOSED {
240 // The `Store` handle has been dropped.
241 return Ready(None);
242 }
243
244 Pending
245 }
246 }
247
248 impl<T: Clone> Receiver<T> {
249 /// Attempts to clone the latest value sent via the channel.
250 ///
251 /// If this is the first time the function is called on a `Receiver`
252 /// instance, then the function completes immediately with the **current**
253 /// value held by the channel. On the next call, the function waits until
254 /// a new value is sent in the channel.
255 ///
256 /// `None` is returned if the `Sender` half is dropped.
257 ///
258 /// # Examples
259 ///
260 /// ```
261 /// use tokio::sync::watch;
262 ///
263 /// #[tokio::main]
264 /// async fn main() {
265 /// let (tx, mut rx) = watch::channel("hello");
266 ///
267 /// let v = rx.recv().await.unwrap();
268 /// assert_eq!(v, "hello");
269 ///
270 /// tokio::spawn(async move {
271 /// tx.broadcast("goodbye").unwrap();
272 /// });
273 ///
274 /// // Waits for the new task to spawn and send the value.
275 /// let v = rx.recv().await.unwrap();
276 /// assert_eq!(v, "goodbye");
277 ///
278 /// let v = rx.recv().await;
279 /// assert!(v.is_none());
280 /// }
281 /// ```
recv(&mut self) -> Option<T>282 pub async fn recv(&mut self) -> Option<T> {
283 poll_fn(|cx| {
284 let v_ref = ready!(self.poll_recv_ref(cx));
285 Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
286 })
287 .await
288 }
289 }
290
291 #[cfg(feature = "stream")]
292 impl<T: Clone> crate::stream::Stream for Receiver<T> {
293 type Item = T;
294
poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>>295 fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
296 let v_ref = ready!(self.poll_recv_ref(cx));
297
298 Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
299 }
300 }
301
302 impl<T> Clone for Receiver<T> {
clone(&self) -> Self303 fn clone(&self) -> Self {
304 let ver = self.inner.version.load(Relaxed);
305 let inner = Watcher::new_version(ver);
306 let shared = self.shared.clone();
307
308 shared.watchers.lock().unwrap().insert(inner.clone());
309
310 Receiver { shared, inner }
311 }
312 }
313
314 impl<T> Drop for Receiver<T> {
drop(&mut self)315 fn drop(&mut self) {
316 self.shared.watchers.lock().unwrap().remove(&self.inner);
317 }
318 }
319
320 impl<T> Sender<T> {
321 /// Broadcasts a new value via the channel, notifying all receivers.
broadcast(&self, value: T) -> Result<(), error::SendError<T>>322 pub fn broadcast(&self, value: T) -> Result<(), error::SendError<T>> {
323 let shared = match self.shared.upgrade() {
324 Some(shared) => shared,
325 // All `Watch` handles have been canceled
326 None => return Err(error::SendError { inner: value }),
327 };
328
329 // Replace the value
330 {
331 let mut lock = shared.value.write().unwrap();
332 *lock = value;
333 }
334
335 // Update the version. 2 is used so that the CLOSED bit is not set.
336 shared.version.fetch_add(2, SeqCst);
337
338 // Notify all watchers
339 notify_all(&*shared);
340
341 // Return the old value
342 Ok(())
343 }
344
345 /// Completes when all receivers have dropped.
346 ///
347 /// This allows the producer to get notified when interest in the produced
348 /// values is canceled and immediately stop doing work.
closed(&mut self)349 pub async fn closed(&mut self) {
350 poll_fn(|cx| self.poll_close(cx)).await
351 }
352
poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()>353 fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
354 match self.shared.upgrade() {
355 Some(shared) => {
356 shared.cancel.register_by_ref(cx.waker());
357 Pending
358 }
359 None => Ready(()),
360 }
361 }
362 }
363
364 /// Notifies all watchers of a change
notify_all<T>(shared: &Shared<T>)365 fn notify_all<T>(shared: &Shared<T>) {
366 let watchers = shared.watchers.lock().unwrap();
367
368 for watcher in watchers.iter() {
369 // Notify the task
370 watcher.waker.wake();
371 }
372 }
373
374 impl<T> Drop for Sender<T> {
drop(&mut self)375 fn drop(&mut self) {
376 if let Some(shared) = self.shared.upgrade() {
377 shared.version.fetch_or(CLOSED, SeqCst);
378 notify_all(&*shared);
379 }
380 }
381 }
382
383 // ===== impl Ref =====
384
385 impl<T> ops::Deref for Ref<'_, T> {
386 type Target = T;
387
deref(&self) -> &T388 fn deref(&self) -> &T {
389 self.inner.deref()
390 }
391 }
392
393 // ===== impl Shared =====
394
395 impl<T> Drop for Shared<T> {
drop(&mut self)396 fn drop(&mut self) {
397 self.cancel.wake();
398 }
399 }
400
401 // ===== impl Watcher =====
402
403 impl Watcher {
new_version(version: usize) -> Self404 fn new_version(version: usize) -> Self {
405 Watcher(Arc::new(WatchInner {
406 version: AtomicUsize::new(version),
407 waker: AtomicWaker::new(),
408 }))
409 }
410 }
411
412 impl std::cmp::PartialEq for Watcher {
eq(&self, other: &Watcher) -> bool413 fn eq(&self, other: &Watcher) -> bool {
414 Arc::ptr_eq(&self.0, &other.0)
415 }
416 }
417
418 impl std::cmp::Eq for Watcher {}
419
420 impl std::hash::Hash for Watcher {
hash<H: std::hash::Hasher>(&self, state: &mut H)421 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
422 (&*self.0 as *const WatchInner).hash(state)
423 }
424 }
425
426 impl std::ops::Deref for Watcher {
427 type Target = WatchInner;
428
deref(&self) -> &Self::Target429 fn deref(&self) -> &Self::Target {
430 &self.0
431 }
432 }
433