//! Tests for the list channel flavor.
2
3 use std::any::Any;
4 use std::sync::atomic::AtomicUsize;
5 use std::sync::atomic::Ordering;
6 use std::thread;
7 use std::time::Duration;
8
9 use crossbeam_channel::{select, unbounded, Receiver};
10 use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
11 use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
12 use crossbeam_utils::thread::scope;
13 use rand::{thread_rng, Rng};
14
/// Shorthand for building a [`Duration`] of `ms` milliseconds.
fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}
18
/// Single-threaded round trip through an unbounded channel.
#[test]
fn smoke() {
    let (s, r) = unbounded();

    // Non-blocking send, then non-blocking receive.
    s.try_send(7).unwrap();
    assert_eq!(r.try_recv(), Ok(7));

    // Blocking send, then blocking receive.
    s.send(8).unwrap();
    assert_eq!(r.recv(), Ok(8));

    // The channel is drained but `s` is still alive, so receives report
    // "empty" / "timeout" rather than "disconnected".
    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
    assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
}
31
/// Both halves of an unbounded channel report a capacity of `None`.
#[test]
fn capacity() {
    let (s, r) = unbounded::<()>();
    assert_eq!(s.capacity(), None);
    assert_eq!(r.capacity(), None);
}
38
/// Checks `len`, `is_empty` and `is_full` on both halves before a send,
/// after a send, and after the matching receive.
#[test]
fn len_empty_full() {
    let (s, r) = unbounded();

    // Freshly created: nothing queued.
    assert_eq!(s.len(), 0);
    assert!(s.is_empty());
    assert!(!s.is_full());
    assert_eq!(r.len(), 0);
    assert!(r.is_empty());
    assert!(!r.is_full());

    s.send(()).unwrap();

    // One message queued; the channel is still not reported as full.
    assert_eq!(s.len(), 1);
    assert!(!s.is_empty());
    assert!(!s.is_full());
    assert_eq!(r.len(), 1);
    assert!(!r.is_empty());
    assert!(!r.is_full());

    r.recv().unwrap();

    // Drained again: same observations as at the start.
    assert_eq!(s.len(), 0);
    assert!(s.is_empty());
    assert!(!s.is_full());
    assert_eq!(r.len(), 0);
    assert!(r.is_empty());
    assert!(!r.is_full());
}
68
/// `try_recv` reports `Empty`, then the value, then `Disconnected` as a
/// second thread sends one message and drops its sender.
#[test]
fn try_recv() {
    let (s, r) = unbounded();

    scope(|scope| {
        scope.spawn(move |_| {
            // Nothing has been sent yet.
            assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
            // Sleep past the point where the other thread sends (1000 ms).
            thread::sleep(ms(1500));
            assert_eq!(r.try_recv(), Ok(7));
            // By now the sending thread has finished and dropped `s`.
            thread::sleep(ms(500));
            assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            s.send(7).unwrap();
            // `s` was moved into this closure and is dropped when it returns.
        });
    })
    .unwrap();
}
88
/// Blocking `recv` waits for a delayed sender, yields the messages in send
/// order, and returns `RecvError` once the sender is gone.
#[test]
fn recv() {
    let (s, r) = unbounded();

    scope(|scope| {
        scope.spawn(move |_| {
            // Blocks until the other thread starts sending (after 1500 ms).
            assert_eq!(r.recv(), Ok(7));
            thread::sleep(ms(1000));
            assert_eq!(r.recv(), Ok(8));
            thread::sleep(ms(1000));
            assert_eq!(r.recv(), Ok(9));
            // Channel is drained and the sender has been dropped.
            assert_eq!(r.recv(), Err(RecvError));
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1500));
            s.send(7).unwrap();
            s.send(8).unwrap();
            s.send(9).unwrap();
            // `s` was moved into this closure and is dropped when it returns.
        });
    })
    .unwrap();
}
111
/// `recv_timeout` yields `Timeout`, then the value, then `Disconnected`
/// while a second thread sends a single message after 1500 ms.
#[test]
fn recv_timeout() {
    let (s, r) = unbounded::<i32>();

    scope(|scope| {
        scope.spawn(move |_| {
            // The first 1000 ms window ends before the send at 1500 ms.
            assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
            // The second window covers the send.
            assert_eq!(r.recv_timeout(ms(1000)), Ok(7));
            // The sender is dropped right after its only send.
            assert_eq!(
                r.recv_timeout(ms(1000)),
                Err(RecvTimeoutError::Disconnected)
            );
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1500));
            s.send(7).unwrap();
        });
    })
    .unwrap();
}
132
/// `try_send` succeeds 1000 times in a row with no receive in between, and
/// hands the message back as `Disconnected` once the receiver is dropped.
#[test]
fn try_send() {
    let (s, r) = unbounded();
    for i in 0..1000 {
        assert_eq!(s.try_send(i), Ok(()));
    }

    drop(r);
    assert_eq!(s.try_send(777), Err(TrySendError::Disconnected(777)));
}
143
/// `send` succeeds 1000 times in a row with no receive in between, and
/// returns the message inside `SendError` once the receiver is dropped.
#[test]
fn send() {
    let (s, r) = unbounded();
    for i in 0..1000 {
        assert_eq!(s.send(i), Ok(()));
    }

    drop(r);
    assert_eq!(s.send(777), Err(SendError(777)));
}
154
/// `send_timeout` succeeds for every timeout from 0 ms to 999 ms, and
/// reports `Disconnected` with the message once the receiver is dropped.
#[test]
fn send_timeout() {
    let (s, r) = unbounded();
    for i in 0..1000 {
        // The loop index doubles as the message and the timeout in milliseconds.
        assert_eq!(s.send_timeout(i, ms(i as u64)), Ok(()));
    }

    drop(r);
    assert_eq!(
        s.send_timeout(777, ms(0)),
        Err(SendTimeoutError::Disconnected(777))
    );
}
168
/// After the receiver is dropped, every send variant fails and returns the
/// rejected message to the caller.
#[test]
fn send_after_disconnect() {
    let (s, r) = unbounded();

    s.send(1).unwrap();
    s.send(2).unwrap();
    s.send(3).unwrap();

    drop(r);

    assert_eq!(s.send(4), Err(SendError(4)));
    assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5)));
    assert_eq!(
        s.send_timeout(6, ms(0)),
        Err(SendTimeoutError::Disconnected(6))
    );
}
186
/// Messages queued before the sender is dropped are still delivered in
/// order; only after that does `recv` return `RecvError`.
#[test]
fn recv_after_disconnect() {
    let (s, r) = unbounded();

    s.send(1).unwrap();
    s.send(2).unwrap();
    s.send(3).unwrap();

    drop(s);

    assert_eq!(r.recv(), Ok(1));
    assert_eq!(r.recv(), Ok(2));
    assert_eq!(r.recv(), Ok(3));
    assert_eq!(r.recv(), Err(RecvError));
}
202
/// `len` tracks the number of queued messages as 50 items are sent and then
/// received one at a time.
#[test]
fn len() {
    let (s, r) = unbounded();

    assert_eq!(s.len(), 0);
    assert_eq!(r.len(), 0);

    // Length grows by one with every send.
    for i in 0..50 {
        s.send(i).unwrap();
        assert_eq!(s.len(), i + 1);
    }

    // Length shrinks by one with every receive.
    for i in 0..50 {
        r.recv().unwrap();
        assert_eq!(r.len(), 50 - i - 1);
    }

    assert_eq!(s.len(), 0);
    assert_eq!(r.len(), 0);
}
223
/// A `recv` call on an empty channel returns `RecvError` when the sender is
/// dropped by another thread 1000 ms later.
#[test]
fn disconnect_wakes_receiver() {
    let (s, r) = unbounded::<()>();

    scope(|scope| {
        scope.spawn(move |_| {
            // Nothing is ever sent; the only way out is the disconnect.
            assert_eq!(r.recv(), Err(RecvError));
        });
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            drop(s);
        });
    })
    .unwrap();
}
239
/// Single producer, single consumer: `COUNT` messages arrive in send order,
/// followed by a disconnect.
#[test]
fn spsc() {
    // Smaller workload under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 100_000;

    let (s, r) = unbounded();

    scope(|scope| {
        scope.spawn(move |_| {
            for i in 0..COUNT {
                assert_eq!(r.recv(), Ok(i));
            }
            // The producer drops `s` when its closure returns.
            assert_eq!(r.recv(), Err(RecvError));
        });
        scope.spawn(move |_| {
            for i in 0..COUNT {
                s.send(i).unwrap();
            }
        });
    })
    .unwrap();
}
264
/// Multiple producers, multiple consumers: every value in `0..COUNT` is sent
/// once per producer and must be received exactly `THREADS` times in total.
#[test]
fn mpmc() {
    // Smaller workload under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 25_000;
    const THREADS: usize = 4;

    let (s, r) = unbounded::<usize>();
    // One receive counter per message value.
    let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();

    scope(|scope| {
        // Consumers: each takes COUNT messages and tallies what it saw.
        for _ in 0..THREADS {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    let n = r.recv().unwrap();
                    v[n].fetch_add(1, Ordering::SeqCst);
                }
            });
        }
        // Producers: each sends every value in 0..COUNT once.
        for _ in 0..THREADS {
            scope.spawn(|_| {
                for i in 0..COUNT {
                    s.send(i).unwrap();
                }
            });
        }
    })
    .unwrap();

    // THREADS * COUNT messages were sent and received; `s` is still alive,
    // so the channel is empty rather than disconnected.
    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));

    for c in v {
        assert_eq!(c.load(Ordering::SeqCst), THREADS);
    }
}
301
/// Repeatedly creates a fresh channel and passes exactly one message across
/// two threads.
#[test]
fn stress_oneshot() {
    // Smaller workload under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    for _ in 0..COUNT {
        let (s, r) = unbounded();

        scope(|scope| {
            scope.spawn(|_| r.recv().unwrap());
            scope.spawn(|_| s.send(0).unwrap());
        })
        .unwrap();
    }
}
319
/// Request/response ping-pong that exercises `try_iter` on one channel and
/// the blocking `iter` on the other.
#[test]
fn stress_iter() {
    // Smaller workload under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 100_000;

    let (request_s, request_r) = unbounded();
    let (response_s, response_r) = unbounded();

    scope(|scope| {
        // Requester: drains whatever responses are available, and asks for
        // another one until the responses add up to COUNT.
        scope.spawn(move |_| {
            let mut count = 0;
            loop {
                for x in response_r.try_iter() {
                    count += x;
                    if count == COUNT {
                        // Returning drops `request_s` and `response_r`.
                        return;
                    }
                }
                request_s.send(()).unwrap();
            }
        });

        // Responder (this thread): answers each request with `1`. The loop
        // ends when the request channel is disconnected and drained, or as
        // soon as a response can no longer be delivered.
        for _ in request_r.iter() {
            if response_s.send(1).is_err() {
                break;
            }
        }
    })
    .unwrap();
}
352
/// A sender and a receiver that both pause on every other step; the receiver
/// polls with a short `recv_timeout` and must still see the values in order.
#[test]
fn stress_timeout_two_threads() {
    const COUNT: usize = 100;

    let (s, r) = unbounded();

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                // Pause before every even-numbered send.
                if i % 2 == 0 {
                    thread::sleep(ms(50));
                }
                s.send(i).unwrap();
            }
        });

        scope.spawn(|_| {
            for i in 0..COUNT {
                // Pause before every even-numbered receive.
                if i % 2 == 0 {
                    thread::sleep(ms(50));
                }
                // Retry on any error (e.g. a timeout) until a message arrives.
                loop {
                    if let Ok(x) = r.recv_timeout(ms(10)) {
                        assert_eq!(x, i);
                        break;
                    }
                }
            }
        });
    })
    .unwrap();
}
385
/// Every message is dropped exactly once: received messages are dropped by
/// the receiving thread, and messages still queued are dropped once both
/// halves of the channel have been dropped.
#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn drops() {
    const RUNS: usize = 100;

    // Number of `DropCounter` values dropped in the current run.
    static DROPS: AtomicUsize = AtomicUsize::new(0);

    #[derive(Debug, PartialEq)]
    struct DropCounter;

    impl Drop for DropCounter {
        fn drop(&mut self) {
            DROPS.fetch_add(1, Ordering::SeqCst);
        }
    }

    let mut rng = thread_rng();

    for _ in 0..RUNS {
        // `steps` messages are sent and received; `additional` are only sent.
        let steps = rng.gen_range(0..10_000);
        let additional = rng.gen_range(0..1000);

        DROPS.store(0, Ordering::SeqCst);
        let (s, r) = unbounded::<DropCounter>();

        scope(|scope| {
            scope.spawn(|_| {
                for _ in 0..steps {
                    // The received value is discarded, i.e. dropped, right away.
                    r.recv().unwrap();
                }
            });

            scope.spawn(|_| {
                for _ in 0..steps {
                    s.send(DropCounter).unwrap();
                }
            });
        })
        .unwrap();

        // Leave some messages sitting in the channel.
        for _ in 0..additional {
            s.try_send(DropCounter).unwrap();
        }

        // Only the received messages have been dropped so far.
        assert_eq!(DROPS.load(Ordering::SeqCst), steps);
        drop(s);
        drop(r);
        // Dropping both halves releases the queued messages as well.
        assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
    }
}
436
/// Each thread sends a message and then immediately calls `try_recv`, which
/// must succeed: on every thread a completed `send` precedes each receive,
/// so the channel always holds at least one message at that point.
#[test]
fn linearizable() {
    // Smaller workload under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 25_000;
    const THREADS: usize = 4;

    let (s, r) = unbounded();

    scope(|scope| {
        for _ in 0..THREADS {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    s.send(0).unwrap();
                    r.try_recv().unwrap();
                }
            });
        }
    })
    .unwrap();
}
459
/// `select!` over two receivers that are both always ready must not starve
/// either of them.
#[test]
fn fairness() {
    // Smaller workload under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = unbounded::<()>();

    // Preload both channels so each arm stays ready for all COUNT selections.
    for _ in 0..COUNT {
        s1.send(()).unwrap();
        s2.send(()).unwrap();
    }

    // hits[k] counts how often arm k was chosen.
    let mut hits = [0usize; 2];
    for _ in 0..COUNT {
        select! {
            recv(r1) -> _ => hits[0] += 1,
            recv(r2) -> _ => hits[1] += 1,
        }
    }
    // Each arm must win at least half of an even share.
    assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
}
484
/// `select!` with five arms that all receive from the same channel must not
/// starve any of the arms.
#[test]
fn fairness_duplicates() {
    // Smaller workload under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (s, r) = unbounded();

    // Preload the channel so every selection finds a message.
    for _ in 0..COUNT {
        s.send(()).unwrap();
    }

    // hits[k] counts how often arm k was chosen.
    let mut hits = [0usize; 5];
    for _ in 0..COUNT {
        select! {
            recv(r) -> _ => hits[0] += 1,
            recv(r) -> _ => hits[1] += 1,
            recv(r) -> _ => hits[2] += 1,
            recv(r) -> _ => hits[3] += 1,
            recv(r) -> _ => hits[4] += 1,
        }
    }
    // Each arm must win at least half of an even share.
    assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
}
510
/// The message expression of a `select!` send arm may itself receive from
/// the same channel.
#[test]
fn recv_in_send() {
    let (s, r) = unbounded();
    s.send(()).unwrap();

    select! {
        // The message operand is `assert_eq!(..)`, which receives the queued
        // `()` and itself evaluates to `()`; that value is what gets sent.
        send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {}
    }
}
520
/// Sends the receiving half of a new channel through the current channel,
/// `COUNT` times in a row, with both threads hopping to the new channel after
/// every message.
#[test]
fn channel_through_channel() {
    // Smaller workload under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 1000;

    // Type-erased payload, so a channel can carry receivers of its own type.
    type T = Box<dyn Any + Send>;

    let (s, r) = unbounded::<T>();

    scope(|scope| {
        scope.spawn(move |_| {
            let mut s = s;

            for _ in 0..COUNT {
                let (new_s, new_r) = unbounded();
                // Wrapped in `Option` so the receiving side can `take()` it
                // out of the box.
                let new_r: T = Box::new(Some(new_r));

                s.send(new_r).unwrap();
                s = new_s;
            }
        });

        scope.spawn(move |_| {
            let mut r = r;

            for _ in 0..COUNT {
                // Receive the boxed `Option<Receiver<T>>`, recover its
                // concrete type, and continue with the receiver inside.
                r = r
                    .recv()
                    .unwrap()
                    .downcast_mut::<Option<Receiver<T>>>()
                    .unwrap()
                    .take()
                    .unwrap()
            }
        });
    })
    .unwrap();
}
561