1 #![allow(clippy::cognitive_complexity)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4 
5 #[cfg(target_arch = "wasm32")]
6 use wasm_bindgen_test::wasm_bindgen_test as test;
7 
8 use tokio::sync::watch;
9 use tokio_test::task::spawn;
10 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
11 
// A lone receiver: pending on the initial value, woken by a send, and
// resolved with an error once the sender has been dropped.
#[test]
fn single_rx_recv() {
    let (sender, mut receiver) = watch::channel("one");

    // The initial value alone does not complete `changed()`.
    let mut waiter = spawn(receiver.changed());
    assert_pending!(waiter.poll());
    drop(waiter);
    assert_eq!(*receiver.borrow(), "one");

    // A send wakes the pending waiter, which then resolves successfully.
    let mut waiter = spawn(receiver.changed());
    assert_pending!(waiter.poll());

    sender.send("two").unwrap();

    assert!(waiter.is_woken());
    assert_ready_ok!(waiter.poll());
    drop(waiter);
    assert_eq!(*receiver.borrow(), "two");

    // Dropping the sender wakes the waiter, which then resolves with an error.
    let mut waiter = spawn(receiver.changed());
    assert_pending!(waiter.poll());

    drop(sender);

    assert!(waiter.is_woken());
    assert_ready_err!(waiter.poll());
    drop(waiter);

    // The last value is still readable after the sender is gone.
    assert_eq!(*receiver.borrow(), "two");
}
46 
// Two receivers on the same channel: both are woken by each send and both
// read the latest value.
#[test]
fn multi_rx() {
    let (sender, mut first) = watch::channel("one");
    let mut second = first.clone();

    // Neither receiver completes `changed()` on the initial value.
    {
        let mut wait_first = spawn(first.changed());
        let mut wait_second = spawn(second.changed());

        assert_pending!(wait_first.poll());
        assert_pending!(wait_second.poll());
    }
    assert_eq!(*first.borrow(), "one");
    assert_eq!(*second.borrow(), "one");

    // This waiter on the second receiver stays alive across the next two sends.
    let mut long_wait = spawn(second.changed());

    let mut wait_first = spawn(first.changed());

    assert_pending!(wait_first.poll());
    assert_pending!(long_wait.poll());

    sender.send("two").unwrap();

    assert!(wait_first.is_woken());
    assert!(long_wait.is_woken());

    // Only the first receiver's waiter is polled to completion here.
    assert_ready_ok!(wait_first.poll());
    drop(wait_first);
    assert_eq!(*first.borrow(), "two");

    let mut wait_first = spawn(first.changed());

    assert_pending!(wait_first.poll());

    sender.send("three").unwrap();

    assert!(wait_first.is_woken());
    // `long_wait` has not been polled since before "two" was sent.
    assert!(long_wait.is_woken());

    assert_ready_ok!(wait_first.poll());
    assert_ready_ok!(long_wait.poll());
    drop(wait_first);
    assert_eq!(*first.borrow(), "three");

    // Release the borrow on `second` before reading from it.
    drop(long_wait);

    assert_eq!(*second.borrow(), "three");

    // Fresh waiters on both receivers resolve after one more send.
    {
        let mut wait_first = spawn(first.changed());
        let mut wait_second = spawn(second.changed());

        assert_pending!(wait_first.poll());
        assert_pending!(wait_second.poll());

        sender.send("four").unwrap();

        assert_ready_ok!(wait_first.poll());
        assert_ready_ok!(wait_second.poll());
    }
    assert_eq!(*first.borrow(), "four");
    assert_eq!(*second.borrow(), "four");
}
113 
// A receiver can still read the last value after the sender is dropped, and a
// value sent just before the drop is reported before the closed error.
#[test]
fn rx_observes_final_value() {
    // Case 1: the sender is dropped without ever sending.
    let (orphaned_tx, mut orphaned_rx) = watch::channel("one");
    drop(orphaned_tx);

    let mut waiter = spawn(orphaned_rx.changed());
    assert_ready_err!(waiter.poll());
    drop(waiter);
    assert_eq!(*orphaned_rx.borrow(), "one");

    // Case 2: values are sent before the sender goes away.
    let (sender, mut receiver) = watch::channel("one");

    sender.send("two").unwrap();

    // A value sent before the first `changed()` call resolves it immediately.
    let mut waiter = spawn(receiver.changed());
    assert_ready_ok!(waiter.poll());
    drop(waiter);
    assert_eq!(*receiver.borrow(), "two");

    // Send and then drop the sender while a waiter is pending: the waiter
    // resolves successfully rather than with an error.
    let mut waiter = spawn(receiver.changed());
    assert_pending!(waiter.poll());

    sender.send("three").unwrap();
    drop(sender);

    assert!(waiter.is_woken());

    assert_ready_ok!(waiter.poll());
    drop(waiter);
    assert_eq!(*receiver.borrow(), "three");

    // With the final value consumed, the next wait reports the error.
    let mut waiter = spawn(receiver.changed());
    assert_ready_err!(waiter.poll());
    drop(waiter);
    assert_eq!(*receiver.borrow(), "three");
}
158 
// `Sender::closed()` stays pending while a receiver exists and resolves once
// the receiver is dropped; sending afterwards fails.
#[test]
fn poll_close() {
    let (sender, receiver) = watch::channel("one");

    let mut closed = spawn(sender.closed());
    assert_pending!(closed.poll());

    // Dropping the only receiver wakes the `closed()` waiter.
    drop(receiver);

    assert!(closed.is_woken());
    assert_ready!(closed.poll());
    drop(closed);

    // With no receivers left, `send` returns an error.
    assert!(sender.send("two").is_err());
}
175 
// Checks how `has_changed()`, `changed()` and `borrow_and_update()` interact
// as values are sent and after the sender is dropped.
#[test]
fn borrow_and_update() {
    let (sender, mut receiver) = watch::channel("one");

    // Nothing has been sent yet.
    assert!(matches!(receiver.has_changed(), Ok(false)));

    // A completed `changed()` leaves no change pending.
    sender.send("two").unwrap();
    assert!(matches!(receiver.has_changed(), Ok(true)));
    assert_ready_ok!(spawn(receiver.changed()).poll());
    assert_pending!(spawn(receiver.changed()).poll());
    assert!(matches!(receiver.has_changed(), Ok(false)));

    // `borrow_and_update()` likewise leaves no change pending.
    sender.send("three").unwrap();
    assert!(matches!(receiver.has_changed(), Ok(true)));
    assert_eq!(*receiver.borrow_and_update(), "three");
    assert_pending!(spawn(receiver.changed()).poll());
    assert!(matches!(receiver.has_changed(), Ok(false)));

    // After the sender is dropped the value is still readable, while
    // `changed()` and `has_changed()` both report an error.
    drop(sender);
    assert_eq!(*receiver.borrow_and_update(), "three");
    assert_ready_err!(spawn(receiver.changed()).poll());
    assert!(matches!(receiver.has_changed(), Err(_)));
}
199 
// `Sender::is_closed()` follows the receivers: true once the receiver is
// dropped, false again after `subscribe()`, true after that one is dropped.
#[test]
fn reopened_after_subscribe() {
    let (sender, initial_rx) = watch::channel("one");
    assert!(!sender.is_closed());

    // Dropping the only receiver closes the channel.
    drop(initial_rx);
    assert!(sender.is_closed());

    // A new subscription reopens it.
    let resubscribed_rx = sender.subscribe();
    assert!(!sender.is_closed());

    // Dropping that subscription closes it again.
    drop(resubscribed_rx);
    assert!(sender.is_closed());
}
214