1 #![allow(clippy::cognitive_complexity)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4 
5 use tokio::sync::watch;
6 use tokio_test::task::spawn;
7 use tokio_test::{assert_pending, assert_ready};
8 
9 #[test]
single_rx_recv()10 fn single_rx_recv() {
11     let (tx, mut rx) = watch::channel("one");
12 
13     {
14         let mut t = spawn(rx.recv());
15         let v = assert_ready!(t.poll()).unwrap();
16         assert_eq!(v, "one");
17     }
18 
19     {
20         let mut t = spawn(rx.recv());
21 
22         assert_pending!(t.poll());
23 
24         tx.broadcast("two").unwrap();
25 
26         assert!(t.is_woken());
27 
28         let v = assert_ready!(t.poll()).unwrap();
29         assert_eq!(v, "two");
30     }
31 
32     {
33         let mut t = spawn(rx.recv());
34 
35         assert_pending!(t.poll());
36 
37         drop(tx);
38 
39         let res = assert_ready!(t.poll());
40         assert!(res.is_none());
41     }
42 }
43 
44 #[test]
multi_rx()45 fn multi_rx() {
46     let (tx, mut rx1) = watch::channel("one");
47     let mut rx2 = rx1.clone();
48 
49     {
50         let mut t1 = spawn(rx1.recv());
51         let mut t2 = spawn(rx2.recv());
52 
53         let res = assert_ready!(t1.poll());
54         assert_eq!(res.unwrap(), "one");
55 
56         let res = assert_ready!(t2.poll());
57         assert_eq!(res.unwrap(), "one");
58     }
59 
60     let mut t2 = spawn(rx2.recv());
61 
62     {
63         let mut t1 = spawn(rx1.recv());
64 
65         assert_pending!(t1.poll());
66         assert_pending!(t2.poll());
67 
68         tx.broadcast("two").unwrap();
69 
70         assert!(t1.is_woken());
71         assert!(t2.is_woken());
72 
73         let res = assert_ready!(t1.poll());
74         assert_eq!(res.unwrap(), "two");
75     }
76 
77     {
78         let mut t1 = spawn(rx1.recv());
79 
80         assert_pending!(t1.poll());
81 
82         tx.broadcast("three").unwrap();
83 
84         assert!(t1.is_woken());
85         assert!(t2.is_woken());
86 
87         let res = assert_ready!(t1.poll());
88         assert_eq!(res.unwrap(), "three");
89 
90         let res = assert_ready!(t2.poll());
91         assert_eq!(res.unwrap(), "three");
92     }
93 
94     drop(t2);
95 
96     {
97         let mut t1 = spawn(rx1.recv());
98         let mut t2 = spawn(rx2.recv());
99 
100         assert_pending!(t1.poll());
101         assert_pending!(t2.poll());
102 
103         tx.broadcast("four").unwrap();
104 
105         let res = assert_ready!(t1.poll());
106         assert_eq!(res.unwrap(), "four");
107         drop(t1);
108 
109         let mut t1 = spawn(rx1.recv());
110         assert_pending!(t1.poll());
111 
112         drop(tx);
113 
114         assert!(t1.is_woken());
115         let res = assert_ready!(t1.poll());
116         assert!(res.is_none());
117 
118         let res = assert_ready!(t2.poll());
119         assert_eq!(res.unwrap(), "four");
120 
121         drop(t2);
122         let mut t2 = spawn(rx2.recv());
123         let res = assert_ready!(t2.poll());
124         assert!(res.is_none());
125     }
126 }
127 
128 #[test]
rx_observes_final_value()129 fn rx_observes_final_value() {
130     // Initial value
131 
132     let (tx, mut rx) = watch::channel("one");
133     drop(tx);
134 
135     {
136         let mut t1 = spawn(rx.recv());
137         let res = assert_ready!(t1.poll());
138         assert_eq!(res.unwrap(), "one");
139     }
140 
141     {
142         let mut t1 = spawn(rx.recv());
143         let res = assert_ready!(t1.poll());
144         assert!(res.is_none());
145     }
146 
147     // Sending a value
148 
149     let (tx, mut rx) = watch::channel("one");
150 
151     tx.broadcast("two").unwrap();
152 
153     {
154         let mut t1 = spawn(rx.recv());
155         let res = assert_ready!(t1.poll());
156         assert_eq!(res.unwrap(), "two");
157     }
158 
159     {
160         let mut t1 = spawn(rx.recv());
161         assert_pending!(t1.poll());
162 
163         tx.broadcast("three").unwrap();
164         drop(tx);
165 
166         assert!(t1.is_woken());
167 
168         let res = assert_ready!(t1.poll());
169         assert_eq!(res.unwrap(), "three");
170     }
171 
172     {
173         let mut t1 = spawn(rx.recv());
174         let res = assert_ready!(t1.poll());
175         assert!(res.is_none());
176     }
177 }
178 
179 #[test]
poll_close()180 fn poll_close() {
181     let (mut tx, rx) = watch::channel("one");
182 
183     {
184         let mut t = spawn(tx.closed());
185         assert_pending!(t.poll());
186 
187         drop(rx);
188 
189         assert!(t.is_woken());
190         assert_ready!(t.poll());
191     }
192 
193     assert!(tx.broadcast("two").is_err());
194 }
195 
196 #[test]
stream_impl()197 fn stream_impl() {
198     use tokio::stream::StreamExt;
199 
200     let (tx, mut rx) = watch::channel("one");
201 
202     {
203         let mut t = spawn(rx.next());
204         let v = assert_ready!(t.poll()).unwrap();
205         assert_eq!(v, "one");
206     }
207 
208     {
209         let mut t = spawn(rx.next());
210 
211         assert_pending!(t.poll());
212 
213         tx.broadcast("two").unwrap();
214 
215         assert!(t.is_woken());
216 
217         let v = assert_ready!(t.poll()).unwrap();
218         assert_eq!(v, "two");
219     }
220 
221     {
222         let mut t = spawn(rx.next());
223 
224         assert_pending!(t.poll());
225 
226         drop(tx);
227 
228         let res = assert_ready!(t.poll());
229         assert!(res.is_none());
230     }
231 }
232