1 //! Tests for the list channel flavor.
2
3 use std::any::Any;
4 use std::sync::atomic::AtomicUsize;
5 use std::sync::atomic::Ordering;
6 use std::thread;
7 use std::time::Duration;
8
9 use crossbeam_channel::{select, unbounded, Receiver};
10 use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
11 use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
12 use crossbeam_utils::thread::scope;
13 use rand::{thread_rng, Rng};
14
ms(ms: u64) -> Duration15 fn ms(ms: u64) -> Duration {
16 Duration::from_millis(ms)
17 }
18
19 #[test]
smoke()20 fn smoke() {
21 let (s, r) = unbounded();
22 s.try_send(7).unwrap();
23 assert_eq!(r.try_recv(), Ok(7));
24
25 s.send(8).unwrap();
26 assert_eq!(r.recv(), Ok(8));
27
28 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
29 assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
30 }
31
32 #[test]
capacity()33 fn capacity() {
34 let (s, r) = unbounded::<()>();
35 assert_eq!(s.capacity(), None);
36 assert_eq!(r.capacity(), None);
37 }
38
39 #[test]
len_empty_full()40 fn len_empty_full() {
41 let (s, r) = unbounded();
42
43 assert_eq!(s.len(), 0);
44 assert_eq!(s.is_empty(), true);
45 assert_eq!(s.is_full(), false);
46 assert_eq!(r.len(), 0);
47 assert_eq!(r.is_empty(), true);
48 assert_eq!(r.is_full(), false);
49
50 s.send(()).unwrap();
51
52 assert_eq!(s.len(), 1);
53 assert_eq!(s.is_empty(), false);
54 assert_eq!(s.is_full(), false);
55 assert_eq!(r.len(), 1);
56 assert_eq!(r.is_empty(), false);
57 assert_eq!(r.is_full(), false);
58
59 r.recv().unwrap();
60
61 assert_eq!(s.len(), 0);
62 assert_eq!(s.is_empty(), true);
63 assert_eq!(s.is_full(), false);
64 assert_eq!(r.len(), 0);
65 assert_eq!(r.is_empty(), true);
66 assert_eq!(r.is_full(), false);
67 }
68
69 #[test]
try_recv()70 fn try_recv() {
71 let (s, r) = unbounded();
72
73 scope(|scope| {
74 scope.spawn(move |_| {
75 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
76 thread::sleep(ms(1500));
77 assert_eq!(r.try_recv(), Ok(7));
78 thread::sleep(ms(500));
79 assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
80 });
81 scope.spawn(move |_| {
82 thread::sleep(ms(1000));
83 s.send(7).unwrap();
84 });
85 })
86 .unwrap();
87 }
88
89 #[test]
recv()90 fn recv() {
91 let (s, r) = unbounded();
92
93 scope(|scope| {
94 scope.spawn(move |_| {
95 assert_eq!(r.recv(), Ok(7));
96 thread::sleep(ms(1000));
97 assert_eq!(r.recv(), Ok(8));
98 thread::sleep(ms(1000));
99 assert_eq!(r.recv(), Ok(9));
100 assert_eq!(r.recv(), Err(RecvError));
101 });
102 scope.spawn(move |_| {
103 thread::sleep(ms(1500));
104 s.send(7).unwrap();
105 s.send(8).unwrap();
106 s.send(9).unwrap();
107 });
108 })
109 .unwrap();
110 }
111
112 #[test]
recv_timeout()113 fn recv_timeout() {
114 let (s, r) = unbounded::<i32>();
115
116 scope(|scope| {
117 scope.spawn(move |_| {
118 assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
119 assert_eq!(r.recv_timeout(ms(1000)), Ok(7));
120 assert_eq!(
121 r.recv_timeout(ms(1000)),
122 Err(RecvTimeoutError::Disconnected)
123 );
124 });
125 scope.spawn(move |_| {
126 thread::sleep(ms(1500));
127 s.send(7).unwrap();
128 });
129 })
130 .unwrap();
131 }
132
133 #[test]
try_send()134 fn try_send() {
135 let (s, r) = unbounded();
136 for i in 0..1000 {
137 assert_eq!(s.try_send(i), Ok(()));
138 }
139
140 drop(r);
141 assert_eq!(s.try_send(777), Err(TrySendError::Disconnected(777)));
142 }
143
144 #[test]
send()145 fn send() {
146 let (s, r) = unbounded();
147 for i in 0..1000 {
148 assert_eq!(s.send(i), Ok(()));
149 }
150
151 drop(r);
152 assert_eq!(s.send(777), Err(SendError(777)));
153 }
154
155 #[test]
send_timeout()156 fn send_timeout() {
157 let (s, r) = unbounded();
158 for i in 0..1000 {
159 assert_eq!(s.send_timeout(i, ms(i as u64)), Ok(()));
160 }
161
162 drop(r);
163 assert_eq!(
164 s.send_timeout(777, ms(0)),
165 Err(SendTimeoutError::Disconnected(777))
166 );
167 }
168
169 #[test]
send_after_disconnect()170 fn send_after_disconnect() {
171 let (s, r) = unbounded();
172
173 s.send(1).unwrap();
174 s.send(2).unwrap();
175 s.send(3).unwrap();
176
177 drop(r);
178
179 assert_eq!(s.send(4), Err(SendError(4)));
180 assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5)));
181 assert_eq!(
182 s.send_timeout(6, ms(0)),
183 Err(SendTimeoutError::Disconnected(6))
184 );
185 }
186
187 #[test]
recv_after_disconnect()188 fn recv_after_disconnect() {
189 let (s, r) = unbounded();
190
191 s.send(1).unwrap();
192 s.send(2).unwrap();
193 s.send(3).unwrap();
194
195 drop(s);
196
197 assert_eq!(r.recv(), Ok(1));
198 assert_eq!(r.recv(), Ok(2));
199 assert_eq!(r.recv(), Ok(3));
200 assert_eq!(r.recv(), Err(RecvError));
201 }
202
203 #[test]
len()204 fn len() {
205 let (s, r) = unbounded();
206
207 assert_eq!(s.len(), 0);
208 assert_eq!(r.len(), 0);
209
210 for i in 0..50 {
211 s.send(i).unwrap();
212 assert_eq!(s.len(), i + 1);
213 }
214
215 for i in 0..50 {
216 r.recv().unwrap();
217 assert_eq!(r.len(), 50 - i - 1);
218 }
219
220 assert_eq!(s.len(), 0);
221 assert_eq!(r.len(), 0);
222 }
223
224 #[test]
disconnect_wakes_receiver()225 fn disconnect_wakes_receiver() {
226 let (s, r) = unbounded::<()>();
227
228 scope(|scope| {
229 scope.spawn(move |_| {
230 assert_eq!(r.recv(), Err(RecvError));
231 });
232 scope.spawn(move |_| {
233 thread::sleep(ms(1000));
234 drop(s);
235 });
236 })
237 .unwrap();
238 }
239
240 #[test]
spsc()241 fn spsc() {
242 const COUNT: usize = 100_000;
243
244 let (s, r) = unbounded();
245
246 scope(|scope| {
247 scope.spawn(move |_| {
248 for i in 0..COUNT {
249 assert_eq!(r.recv(), Ok(i));
250 }
251 assert_eq!(r.recv(), Err(RecvError));
252 });
253 scope.spawn(move |_| {
254 for i in 0..COUNT {
255 s.send(i).unwrap();
256 }
257 });
258 })
259 .unwrap();
260 }
261
262 #[test]
mpmc()263 fn mpmc() {
264 const COUNT: usize = 25_000;
265 const THREADS: usize = 4;
266
267 let (s, r) = unbounded::<usize>();
268 let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
269
270 scope(|scope| {
271 for _ in 0..THREADS {
272 scope.spawn(|_| {
273 for _ in 0..COUNT {
274 let n = r.recv().unwrap();
275 v[n].fetch_add(1, Ordering::SeqCst);
276 }
277 });
278 }
279 for _ in 0..THREADS {
280 scope.spawn(|_| {
281 for i in 0..COUNT {
282 s.send(i).unwrap();
283 }
284 });
285 }
286 })
287 .unwrap();
288
289 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
290
291 for c in v {
292 assert_eq!(c.load(Ordering::SeqCst), THREADS);
293 }
294 }
295
296 #[test]
stress_oneshot()297 fn stress_oneshot() {
298 const COUNT: usize = 10_000;
299
300 for _ in 0..COUNT {
301 let (s, r) = unbounded();
302
303 scope(|scope| {
304 scope.spawn(|_| r.recv().unwrap());
305 scope.spawn(|_| s.send(0).unwrap());
306 })
307 .unwrap();
308 }
309 }
310
311 #[test]
stress_iter()312 fn stress_iter() {
313 const COUNT: usize = 100_000;
314
315 let (request_s, request_r) = unbounded();
316 let (response_s, response_r) = unbounded();
317
318 scope(|scope| {
319 scope.spawn(move |_| {
320 let mut count = 0;
321 loop {
322 for x in response_r.try_iter() {
323 count += x;
324 if count == COUNT {
325 return;
326 }
327 }
328 request_s.send(()).unwrap();
329 }
330 });
331
332 for _ in request_r.iter() {
333 if response_s.send(1).is_err() {
334 break;
335 }
336 }
337 })
338 .unwrap();
339 }
340
341 #[test]
stress_timeout_two_threads()342 fn stress_timeout_two_threads() {
343 const COUNT: usize = 100;
344
345 let (s, r) = unbounded();
346
347 scope(|scope| {
348 scope.spawn(|_| {
349 for i in 0..COUNT {
350 if i % 2 == 0 {
351 thread::sleep(ms(50));
352 }
353 s.send(i).unwrap();
354 }
355 });
356
357 scope.spawn(|_| {
358 for i in 0..COUNT {
359 if i % 2 == 0 {
360 thread::sleep(ms(50));
361 }
362 loop {
363 if let Ok(x) = r.recv_timeout(ms(10)) {
364 assert_eq!(x, i);
365 break;
366 }
367 }
368 }
369 });
370 })
371 .unwrap();
372 }
373
374 #[test]
drops()375 fn drops() {
376 static DROPS: AtomicUsize = AtomicUsize::new(0);
377
378 #[derive(Debug, PartialEq)]
379 struct DropCounter;
380
381 impl Drop for DropCounter {
382 fn drop(&mut self) {
383 DROPS.fetch_add(1, Ordering::SeqCst);
384 }
385 }
386
387 let mut rng = thread_rng();
388
389 for _ in 0..100 {
390 let steps = rng.gen_range(0..10_000);
391 let additional = rng.gen_range(0..1000);
392
393 DROPS.store(0, Ordering::SeqCst);
394 let (s, r) = unbounded::<DropCounter>();
395
396 scope(|scope| {
397 scope.spawn(|_| {
398 for _ in 0..steps {
399 r.recv().unwrap();
400 }
401 });
402
403 scope.spawn(|_| {
404 for _ in 0..steps {
405 s.send(DropCounter).unwrap();
406 }
407 });
408 })
409 .unwrap();
410
411 for _ in 0..additional {
412 s.try_send(DropCounter).unwrap();
413 }
414
415 assert_eq!(DROPS.load(Ordering::SeqCst), steps);
416 drop(s);
417 drop(r);
418 assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
419 }
420 }
421
422 #[test]
linearizable()423 fn linearizable() {
424 const COUNT: usize = 25_000;
425 const THREADS: usize = 4;
426
427 let (s, r) = unbounded();
428
429 scope(|scope| {
430 for _ in 0..THREADS {
431 scope.spawn(|_| {
432 for _ in 0..COUNT {
433 s.send(0).unwrap();
434 r.try_recv().unwrap();
435 }
436 });
437 }
438 })
439 .unwrap();
440 }
441
442 #[test]
fairness()443 fn fairness() {
444 const COUNT: usize = 10_000;
445
446 let (s1, r1) = unbounded::<()>();
447 let (s2, r2) = unbounded::<()>();
448
449 for _ in 0..COUNT {
450 s1.send(()).unwrap();
451 s2.send(()).unwrap();
452 }
453
454 let mut hits = [0usize; 2];
455 for _ in 0..COUNT {
456 select! {
457 recv(r1) -> _ => hits[0] += 1,
458 recv(r2) -> _ => hits[1] += 1,
459 }
460 }
461 assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
462 }
463
464 #[test]
fairness_duplicates()465 fn fairness_duplicates() {
466 const COUNT: usize = 10_000;
467
468 let (s, r) = unbounded();
469
470 for _ in 0..COUNT {
471 s.send(()).unwrap();
472 }
473
474 let mut hits = [0usize; 5];
475 for _ in 0..COUNT {
476 select! {
477 recv(r) -> _ => hits[0] += 1,
478 recv(r) -> _ => hits[1] += 1,
479 recv(r) -> _ => hits[2] += 1,
480 recv(r) -> _ => hits[3] += 1,
481 recv(r) -> _ => hits[4] += 1,
482 }
483 }
484 assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
485 }
486
487 #[test]
recv_in_send()488 fn recv_in_send() {
489 let (s, r) = unbounded();
490 s.send(()).unwrap();
491
492 select! {
493 send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {}
494 }
495 }
496
497 #[test]
channel_through_channel()498 fn channel_through_channel() {
499 const COUNT: usize = 1000;
500
501 type T = Box<dyn Any + Send>;
502
503 let (s, r) = unbounded::<T>();
504
505 scope(|scope| {
506 scope.spawn(move |_| {
507 let mut s = s;
508
509 for _ in 0..COUNT {
510 let (new_s, new_r) = unbounded();
511 let new_r: T = Box::new(Some(new_r));
512
513 s.send(new_r).unwrap();
514 s = new_s;
515 }
516 });
517
518 scope.spawn(move |_| {
519 let mut r = r;
520
521 for _ in 0..COUNT {
522 r = r
523 .recv()
524 .unwrap()
525 .downcast_mut::<Option<Receiver<T>>>()
526 .unwrap()
527 .take()
528 .unwrap()
529 }
530 });
531 })
532 .unwrap();
533 }
534