1 #![allow(clippy::cognitive_complexity)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4
5 use tokio::sync::watch;
6 use tokio_test::task::spawn;
7 use tokio_test::{assert_pending, assert_ready};
8
9 #[test]
single_rx_recv()10 fn single_rx_recv() {
11 let (tx, mut rx) = watch::channel("one");
12
13 {
14 let mut t = spawn(rx.recv());
15 let v = assert_ready!(t.poll()).unwrap();
16 assert_eq!(v, "one");
17 }
18
19 {
20 let mut t = spawn(rx.recv());
21
22 assert_pending!(t.poll());
23
24 tx.broadcast("two").unwrap();
25
26 assert!(t.is_woken());
27
28 let v = assert_ready!(t.poll()).unwrap();
29 assert_eq!(v, "two");
30 }
31
32 {
33 let mut t = spawn(rx.recv());
34
35 assert_pending!(t.poll());
36
37 drop(tx);
38
39 let res = assert_ready!(t.poll());
40 assert!(res.is_none());
41 }
42 }
43
44 #[test]
multi_rx()45 fn multi_rx() {
46 let (tx, mut rx1) = watch::channel("one");
47 let mut rx2 = rx1.clone();
48
49 {
50 let mut t1 = spawn(rx1.recv());
51 let mut t2 = spawn(rx2.recv());
52
53 let res = assert_ready!(t1.poll());
54 assert_eq!(res.unwrap(), "one");
55
56 let res = assert_ready!(t2.poll());
57 assert_eq!(res.unwrap(), "one");
58 }
59
60 let mut t2 = spawn(rx2.recv());
61
62 {
63 let mut t1 = spawn(rx1.recv());
64
65 assert_pending!(t1.poll());
66 assert_pending!(t2.poll());
67
68 tx.broadcast("two").unwrap();
69
70 assert!(t1.is_woken());
71 assert!(t2.is_woken());
72
73 let res = assert_ready!(t1.poll());
74 assert_eq!(res.unwrap(), "two");
75 }
76
77 {
78 let mut t1 = spawn(rx1.recv());
79
80 assert_pending!(t1.poll());
81
82 tx.broadcast("three").unwrap();
83
84 assert!(t1.is_woken());
85 assert!(t2.is_woken());
86
87 let res = assert_ready!(t1.poll());
88 assert_eq!(res.unwrap(), "three");
89
90 let res = assert_ready!(t2.poll());
91 assert_eq!(res.unwrap(), "three");
92 }
93
94 drop(t2);
95
96 {
97 let mut t1 = spawn(rx1.recv());
98 let mut t2 = spawn(rx2.recv());
99
100 assert_pending!(t1.poll());
101 assert_pending!(t2.poll());
102
103 tx.broadcast("four").unwrap();
104
105 let res = assert_ready!(t1.poll());
106 assert_eq!(res.unwrap(), "four");
107 drop(t1);
108
109 let mut t1 = spawn(rx1.recv());
110 assert_pending!(t1.poll());
111
112 drop(tx);
113
114 assert!(t1.is_woken());
115 let res = assert_ready!(t1.poll());
116 assert!(res.is_none());
117
118 let res = assert_ready!(t2.poll());
119 assert_eq!(res.unwrap(), "four");
120
121 drop(t2);
122 let mut t2 = spawn(rx2.recv());
123 let res = assert_ready!(t2.poll());
124 assert!(res.is_none());
125 }
126 }
127
128 #[test]
rx_observes_final_value()129 fn rx_observes_final_value() {
130 // Initial value
131
132 let (tx, mut rx) = watch::channel("one");
133 drop(tx);
134
135 {
136 let mut t1 = spawn(rx.recv());
137 let res = assert_ready!(t1.poll());
138 assert_eq!(res.unwrap(), "one");
139 }
140
141 {
142 let mut t1 = spawn(rx.recv());
143 let res = assert_ready!(t1.poll());
144 assert!(res.is_none());
145 }
146
147 // Sending a value
148
149 let (tx, mut rx) = watch::channel("one");
150
151 tx.broadcast("two").unwrap();
152
153 {
154 let mut t1 = spawn(rx.recv());
155 let res = assert_ready!(t1.poll());
156 assert_eq!(res.unwrap(), "two");
157 }
158
159 {
160 let mut t1 = spawn(rx.recv());
161 assert_pending!(t1.poll());
162
163 tx.broadcast("three").unwrap();
164 drop(tx);
165
166 assert!(t1.is_woken());
167
168 let res = assert_ready!(t1.poll());
169 assert_eq!(res.unwrap(), "three");
170 }
171
172 {
173 let mut t1 = spawn(rx.recv());
174 let res = assert_ready!(t1.poll());
175 assert!(res.is_none());
176 }
177 }
178
179 #[test]
poll_close()180 fn poll_close() {
181 let (mut tx, rx) = watch::channel("one");
182
183 {
184 let mut t = spawn(tx.closed());
185 assert_pending!(t.poll());
186
187 drop(rx);
188
189 assert!(t.is_woken());
190 assert_ready!(t.poll());
191 }
192
193 assert!(tx.broadcast("two").is_err());
194 }
195
196 #[test]
stream_impl()197 fn stream_impl() {
198 use tokio::stream::StreamExt;
199
200 let (tx, mut rx) = watch::channel("one");
201
202 {
203 let mut t = spawn(rx.next());
204 let v = assert_ready!(t.poll()).unwrap();
205 assert_eq!(v, "one");
206 }
207
208 {
209 let mut t = spawn(rx.next());
210
211 assert_pending!(t.poll());
212
213 tx.broadcast("two").unwrap();
214
215 assert!(t.is_woken());
216
217 let v = assert_ready!(t.poll()).unwrap();
218 assert_eq!(v, "two");
219 }
220
221 {
222 let mut t = spawn(rx.next());
223
224 assert_pending!(t.poll());
225
226 drop(tx);
227
228 let res = assert_ready!(t.poll());
229 assert!(res.is_none());
230 }
231 }
232