1 #![allow(clippy::cognitive_complexity)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4
5 #[cfg(target_arch = "wasm32")]
6 use wasm_bindgen_test::wasm_bindgen_test as test;
7
8 use tokio::sync::watch;
9 use tokio_test::task::spawn;
10 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
11
12 #[test]
single_rx_recv()13 fn single_rx_recv() {
14 let (tx, mut rx) = watch::channel("one");
15
16 {
17 // Not initially notified
18 let mut t = spawn(rx.changed());
19 assert_pending!(t.poll());
20 }
21 assert_eq!(*rx.borrow(), "one");
22
23 {
24 let mut t = spawn(rx.changed());
25 assert_pending!(t.poll());
26
27 tx.send("two").unwrap();
28
29 assert!(t.is_woken());
30
31 assert_ready_ok!(t.poll());
32 }
33 assert_eq!(*rx.borrow(), "two");
34
35 {
36 let mut t = spawn(rx.changed());
37 assert_pending!(t.poll());
38
39 drop(tx);
40
41 assert!(t.is_woken());
42 assert_ready_err!(t.poll());
43 }
44 assert_eq!(*rx.borrow(), "two");
45 }
46
47 #[test]
multi_rx()48 fn multi_rx() {
49 let (tx, mut rx1) = watch::channel("one");
50 let mut rx2 = rx1.clone();
51
52 {
53 let mut t1 = spawn(rx1.changed());
54 let mut t2 = spawn(rx2.changed());
55
56 assert_pending!(t1.poll());
57 assert_pending!(t2.poll());
58 }
59 assert_eq!(*rx1.borrow(), "one");
60 assert_eq!(*rx2.borrow(), "one");
61
62 let mut t2 = spawn(rx2.changed());
63
64 {
65 let mut t1 = spawn(rx1.changed());
66
67 assert_pending!(t1.poll());
68 assert_pending!(t2.poll());
69
70 tx.send("two").unwrap();
71
72 assert!(t1.is_woken());
73 assert!(t2.is_woken());
74
75 assert_ready_ok!(t1.poll());
76 }
77 assert_eq!(*rx1.borrow(), "two");
78
79 {
80 let mut t1 = spawn(rx1.changed());
81
82 assert_pending!(t1.poll());
83
84 tx.send("three").unwrap();
85
86 assert!(t1.is_woken());
87 assert!(t2.is_woken());
88
89 assert_ready_ok!(t1.poll());
90 assert_ready_ok!(t2.poll());
91 }
92 assert_eq!(*rx1.borrow(), "three");
93
94 drop(t2);
95
96 assert_eq!(*rx2.borrow(), "three");
97
98 {
99 let mut t1 = spawn(rx1.changed());
100 let mut t2 = spawn(rx2.changed());
101
102 assert_pending!(t1.poll());
103 assert_pending!(t2.poll());
104
105 tx.send("four").unwrap();
106
107 assert_ready_ok!(t1.poll());
108 assert_ready_ok!(t2.poll());
109 }
110 assert_eq!(*rx1.borrow(), "four");
111 assert_eq!(*rx2.borrow(), "four");
112 }
113
114 #[test]
rx_observes_final_value()115 fn rx_observes_final_value() {
116 // Initial value
117
118 let (tx, mut rx) = watch::channel("one");
119 drop(tx);
120
121 {
122 let mut t1 = spawn(rx.changed());
123 assert_ready_err!(t1.poll());
124 }
125 assert_eq!(*rx.borrow(), "one");
126
127 // Sending a value
128
129 let (tx, mut rx) = watch::channel("one");
130
131 tx.send("two").unwrap();
132
133 {
134 let mut t1 = spawn(rx.changed());
135 assert_ready_ok!(t1.poll());
136 }
137 assert_eq!(*rx.borrow(), "two");
138
139 {
140 let mut t1 = spawn(rx.changed());
141 assert_pending!(t1.poll());
142
143 tx.send("three").unwrap();
144 drop(tx);
145
146 assert!(t1.is_woken());
147
148 assert_ready_ok!(t1.poll());
149 }
150 assert_eq!(*rx.borrow(), "three");
151
152 {
153 let mut t1 = spawn(rx.changed());
154 assert_ready_err!(t1.poll());
155 }
156 assert_eq!(*rx.borrow(), "three");
157 }
158
159 #[test]
poll_close()160 fn poll_close() {
161 let (tx, rx) = watch::channel("one");
162
163 {
164 let mut t = spawn(tx.closed());
165 assert_pending!(t.poll());
166
167 drop(rx);
168
169 assert!(t.is_woken());
170 assert_ready!(t.poll());
171 }
172
173 assert!(tx.send("two").is_err());
174 }
175
176 #[test]
borrow_and_update()177 fn borrow_and_update() {
178 let (tx, mut rx) = watch::channel("one");
179
180 assert!(!rx.has_changed().unwrap());
181
182 tx.send("two").unwrap();
183 assert!(rx.has_changed().unwrap());
184 assert_ready!(spawn(rx.changed()).poll()).unwrap();
185 assert_pending!(spawn(rx.changed()).poll());
186 assert!(!rx.has_changed().unwrap());
187
188 tx.send("three").unwrap();
189 assert!(rx.has_changed().unwrap());
190 assert_eq!(*rx.borrow_and_update(), "three");
191 assert_pending!(spawn(rx.changed()).poll());
192 assert!(!rx.has_changed().unwrap());
193
194 drop(tx);
195 assert_eq!(*rx.borrow_and_update(), "three");
196 assert_ready!(spawn(rx.changed()).poll()).unwrap_err();
197 assert!(rx.has_changed().is_err());
198 }
199
200 #[test]
reopened_after_subscribe()201 fn reopened_after_subscribe() {
202 let (tx, rx) = watch::channel("one");
203 assert!(!tx.is_closed());
204
205 drop(rx);
206 assert!(tx.is_closed());
207
208 let rx = tx.subscribe();
209 assert!(!tx.is_closed());
210
211 drop(rx);
212 assert!(tx.is_closed());
213 }
214