1 #![allow(clippy::cognitive_complexity)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4 
5 use tokio::sync::watch;
6 use tokio_test::task::spawn;
7 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
8 
/// A single receiver: `changed()` is pending until the sender publishes a new
/// value, resolves `Ok` after a send, and resolves `Err` once the sender is
/// dropped. The last published value stays readable through `borrow()`.
#[test]
fn single_rx_recv() {
    let (tx, mut rx) = watch::channel("one");

    // The initial value does not count as a change.
    let mut waiter = spawn(rx.changed());
    assert_pending!(waiter.poll());
    // Release the mutable borrow of `rx` before reading the value.
    drop(waiter);
    assert_eq!(*rx.borrow(), "one");

    // A send wakes the pending task, which then completes successfully.
    let mut waiter = spawn(rx.changed());
    assert_pending!(waiter.poll());

    tx.send("two").unwrap();

    assert!(waiter.is_woken());

    assert_ready_ok!(waiter.poll());
    drop(waiter);
    assert_eq!(*rx.borrow(), "two");

    // Dropping the sender also wakes the task, but it completes with an error.
    let mut waiter = spawn(rx.changed());
    assert_pending!(waiter.poll());

    drop(tx);

    assert!(waiter.is_woken());
    assert_ready_err!(waiter.poll());
    drop(waiter);

    // The value sent before the sender went away is still observable.
    assert_eq!(*rx.borrow(), "two");
}
43 
/// Two receivers on the same channel: each send wakes both pending
/// `changed()` tasks, and each receiver reads the latest value via `borrow()`.
/// One wait on `rx2` is deliberately kept alive across two consecutive sends.
#[test]
fn multi_rx() {
    let (tx, mut rx1) = watch::channel("one");
    let mut rx2 = rx1.clone();

    // Nothing has been sent yet, so both receivers are pending.
    let mut wait1 = spawn(rx1.changed());
    let mut wait2 = spawn(rx2.changed());

    assert_pending!(wait1.poll());
    assert_pending!(wait2.poll());

    // Drop in reverse declaration order, releasing the borrows of rx1/rx2.
    drop(wait2);
    drop(wait1);
    assert_eq!(*rx1.borrow(), "one");
    assert_eq!(*rx2.borrow(), "one");

    // This wait on `rx2` outlives the next two sends.
    let mut lingering = spawn(rx2.changed());

    let mut wait1 = spawn(rx1.changed());

    assert_pending!(wait1.poll());
    assert_pending!(lingering.poll());

    tx.send("two").unwrap();

    assert!(wait1.is_woken());
    assert!(lingering.is_woken());

    // Only the rx1 task is polled here; the rx2 task is left un-polled.
    assert_ready_ok!(wait1.poll());
    drop(wait1);
    assert_eq!(*rx1.borrow(), "two");

    let mut wait1 = spawn(rx1.changed());

    assert_pending!(wait1.poll());

    tx.send("three").unwrap();

    assert!(wait1.is_woken());
    assert!(lingering.is_woken());

    assert_ready_ok!(wait1.poll());
    assert_ready_ok!(lingering.poll());
    drop(wait1);
    assert_eq!(*rx1.borrow(), "three");

    // Finishing the long-lived wait frees `rx2` for a direct read.
    drop(lingering);

    assert_eq!(*rx2.borrow(), "three");

    // Fresh waits on both receivers complete after one more send.
    let mut wait1 = spawn(rx1.changed());
    let mut wait2 = spawn(rx2.changed());

    assert_pending!(wait1.poll());
    assert_pending!(wait2.poll());

    tx.send("four").unwrap();

    assert_ready_ok!(wait1.poll());
    assert_ready_ok!(wait2.poll());

    drop(wait2);
    drop(wait1);
    assert_eq!(*rx1.borrow(), "four");
    assert_eq!(*rx2.borrow(), "four");
}
110 
/// After the sender is dropped, the receiver can still read the last value,
/// and a value sent just before the drop is reported by `changed()` as `Ok`
/// before later calls report `Err`.
#[test]
fn rx_observes_final_value() {
    // Case 1: the sender goes away without ever sending.

    let (initial_tx, mut initial_rx) = watch::channel("one");
    drop(initial_tx);

    let mut changed = spawn(initial_rx.changed());
    assert_ready_err!(changed.poll());
    drop(changed);
    assert_eq!(*initial_rx.borrow(), "one");

    // Case 2: values are sent before the sender goes away.

    let (tx, mut rx) = watch::channel("one");

    tx.send("two").unwrap();

    // The send happened before the first wait, so it is ready immediately.
    let mut changed = spawn(rx.changed());
    assert_ready_ok!(changed.poll());
    drop(changed);
    assert_eq!(*rx.borrow(), "two");

    let mut changed = spawn(rx.changed());
    assert_pending!(changed.poll());

    // Send a final value and close the channel straight afterwards.
    tx.send("three").unwrap();
    drop(tx);

    assert!(changed.is_woken());

    // The final value is delivered as a normal change, not as an error.
    assert_ready_ok!(changed.poll());
    drop(changed);
    assert_eq!(*rx.borrow(), "three");

    // With the final value seen, the closed channel now yields an error.
    let mut changed = spawn(rx.changed());
    assert_ready_err!(changed.poll());
    drop(changed);
    assert_eq!(*rx.borrow(), "three");
}
155 
/// `Sender::closed()` stays pending while a receiver exists and completes
/// once the receiver is dropped; sending afterwards fails.
#[test]
fn poll_close() {
    let (tx, rx) = watch::channel("one");

    let mut closed = spawn(tx.closed());
    assert_pending!(closed.poll());

    // Dropping the only receiver wakes the task waiting on `closed()`.
    drop(rx);

    assert!(closed.is_woken());
    assert_ready!(closed.poll());
    drop(closed);

    // With no receivers left, a send returns an error.
    assert!(tx.send("two").is_err());
}
172 
/// After `borrow_and_update()` reads a newly sent value, a following
/// `changed()` is pending; once the sender is dropped, `changed()` resolves
/// with an error while the last value remains readable.
#[test]
fn borrow_and_update() {
    let (tx, mut rx) = watch::channel("one");

    tx.send("two").unwrap();
    {
        // The first wait after the send is ready with `Ok`.
        let mut changed = spawn(rx.changed());
        assert_ready!(changed.poll()).unwrap();
    }
    {
        // A second wait with no new send in between is pending.
        let mut changed = spawn(rx.changed());
        assert_pending!(changed.poll());
    }

    tx.send("three").unwrap();
    assert_eq!(*rx.borrow_and_update(), "three");
    {
        // The value was already read above, so this wait is pending.
        let mut changed = spawn(rx.changed());
        assert_pending!(changed.poll());
    }

    drop(tx);
    assert_eq!(*rx.borrow_and_update(), "three");
    {
        // The sender is gone: the wait is ready with an error.
        let mut changed = spawn(rx.changed());
        assert_ready!(changed.poll()).unwrap_err();
    }
}
189 
/// `Sender::is_closed()` follows the receivers: true once the last one is
/// dropped, false again after `subscribe()` creates a new one, and true once
/// that one is dropped too.
#[test]
fn reopened_after_subscribe() {
    let (tx, first_rx) = watch::channel("one");
    assert!(!tx.is_closed());

    drop(first_rx);
    assert!(tx.is_closed());

    {
        // A new subscription reopens the channel for as long as it lives.
        let _second_rx = tx.subscribe();
        assert!(!tx.is_closed());
    }

    // The scoped receiver has been dropped, so the channel is closed again.
    assert!(tx.is_closed());
}
204