1 //! Tests for the zero channel flavor.
2
3 use std::any::Any;
4 use std::sync::atomic::AtomicUsize;
5 use std::sync::atomic::Ordering;
6 use std::thread;
7 use std::time::Duration;
8
9 use crossbeam_channel::{bounded, select, Receiver};
10 use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
11 use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
12 use crossbeam_utils::thread::scope;
13 use rand::{thread_rng, Rng};
14
/// Shorthand used throughout these tests: builds a [`Duration`] of `ms` whole milliseconds.
fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}
18
/// Non-blocking operations on a `bounded(0)` channel with no peer waiting on the other side:
/// `try_send` must hand the value back as `Full`, and `try_recv` must report `Empty`.
#[test]
fn smoke() {
    let (tx, rx) = bounded(0);

    let send_result = tx.try_send(7);
    assert_eq!(send_result, Err(TrySendError::Full(7)));

    let recv_result = rx.try_recv();
    assert_eq!(recv_result, Err(TryRecvError::Empty));
}
25
/// Both halves of a `bounded(0)` channel must report a capacity of `Some(0)`.
#[test]
fn capacity() {
    let (tx, rx) = bounded::<()>(0);
    let expected = Some(0);

    assert_eq!(tx.capacity(), expected);
    assert_eq!(rx.capacity(), expected);
}
32
/// Checks `len`, `is_empty` and `is_full` on both halves of a `bounded(0)` channel.
///
/// The expected answers are `len() == 0`, `is_empty() == true` and `is_full() == true`, and they
/// must be the same before and after one message has been handed from a sender thread to a
/// receiver thread.
#[test]
fn len_empty_full() {
    let (s, r) = bounded(0);

    // Fresh channel, nothing has been transferred yet.
    assert_eq!(s.len(), 0);
    assert!(s.is_empty());
    assert!(s.is_full());
    assert_eq!(r.len(), 0);
    assert!(r.is_empty());
    assert!(r.is_full());

    // Pass a single message through; the scope joins both threads before returning.
    scope(|scope| {
        scope.spawn(|_| s.send(0).unwrap());
        scope.spawn(|_| r.recv().unwrap());
    })
    .unwrap();

    // The completed transfer must leave no observable state behind.
    assert_eq!(s.len(), 0);
    assert!(s.is_empty());
    assert!(s.is_full());
    assert_eq!(r.len(), 0);
    assert!(r.is_empty());
    assert!(r.is_full());
}
57
/// Exercises `try_recv` against a sender that sleeps 1000 ms, sends `7`, and then goes away.
///
/// The receiver polls three times (at roughly 0 ms, 1500 ms and 2000 ms) and expects `Empty`,
/// then `Ok(7)`, then `Disconnected`.
#[test]
fn try_recv() {
    let (tx, rx) = bounded(0);

    scope(|scope| {
        // Receiving side: three non-blocking polls separated by sleeps.
        scope.spawn(move |_| {
            let first = rx.try_recv();
            assert_eq!(first, Err(TryRecvError::Empty));

            thread::sleep(ms(1500));
            let second = rx.try_recv();
            assert_eq!(second, Ok(7));

            thread::sleep(ms(500));
            let third = rx.try_recv();
            assert_eq!(third, Err(TryRecvError::Disconnected));
        });

        // Sending side: `tx` is moved in, so it is dropped when this closure returns.
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            tx.send(7).unwrap();
        });
    })
    .unwrap();
}
77
/// Exercises blocking `recv`: the receiver must obtain `7`, `8`, `9` in order and then see
/// `RecvError` once the sending thread has finished and dropped its `Sender`.
///
/// The sender starts 1500 ms late and the receiver sleeps 1000 ms between receives, so each
/// side spends some time waiting on the other.
#[test]
fn recv() {
    let (tx, rx) = bounded(0);

    scope(|scope| {
        // Receiving side.
        scope.spawn(move |_| {
            let first = rx.recv();
            assert_eq!(first, Ok(7));
            thread::sleep(ms(1000));

            let second = rx.recv();
            assert_eq!(second, Ok(8));
            thread::sleep(ms(1000));

            let third = rx.recv();
            assert_eq!(third, Ok(9));

            let after_disconnect = rx.recv();
            assert_eq!(after_disconnect, Err(RecvError));
        });

        // Sending side: `tx` is moved in and dropped when the closure returns.
        scope.spawn(move |_| {
            thread::sleep(ms(1500));
            tx.send(7).unwrap();
            tx.send(8).unwrap();
            tx.send(9).unwrap();
        });
    })
    .unwrap();
}
100
/// Exercises `recv_timeout` with a 1000 ms deadline against a sender that sends `7` after
/// 1500 ms and is then dropped.
///
/// Expected sequence on the receiver: `Timeout`, then `Ok(7)`, then `Disconnected`.
#[test]
fn recv_timeout() {
    let (tx, rx) = bounded::<i32>(0);

    scope(|scope| {
        // Receiving side: three consecutive timed receives.
        scope.spawn(move |_| {
            let first = rx.recv_timeout(ms(1000));
            assert_eq!(first, Err(RecvTimeoutError::Timeout));

            let second = rx.recv_timeout(ms(1000));
            assert_eq!(second, Ok(7));

            let third = rx.recv_timeout(ms(1000));
            assert_eq!(third, Err(RecvTimeoutError::Disconnected));
        });

        // Sending side: `tx` is dropped when the closure returns.
        scope.spawn(move |_| {
            thread::sleep(ms(1500));
            tx.send(7).unwrap();
        });
    })
    .unwrap();
}
121
/// Exercises `try_send` against a receiver that sleeps 1000 ms, performs one blocking `recv`
/// (expecting `8`), and is then dropped.
///
/// The sender tries three times (at roughly 0 ms, 1500 ms and 2000 ms) and expects `Full(7)`,
/// then `Ok(())`, then `Disconnected(9)`.
#[test]
fn try_send() {
    let (tx, rx) = bounded(0);

    scope(|scope| {
        // Sending side: three non-blocking attempts separated by sleeps.
        scope.spawn(move |_| {
            let first = tx.try_send(7);
            assert_eq!(first, Err(TrySendError::Full(7)));

            thread::sleep(ms(1500));
            let second = tx.try_send(8);
            assert_eq!(second, Ok(()));

            thread::sleep(ms(500));
            let third = tx.try_send(9);
            assert_eq!(third, Err(TrySendError::Disconnected(9)));
        });

        // Receiving side: `rx` is dropped when the closure returns.
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            assert_eq!(rx.recv(), Ok(8));
        });
    })
    .unwrap();
}
141
/// Exercises blocking `send`: the sender pushes `7`, `8`, `9` with 1000 ms pauses in between,
/// while the receiver starts 1500 ms late and must read the values back in the same order.
#[test]
fn send() {
    let (tx, rx) = bounded(0);

    scope(|scope| {
        // Sending side.
        scope.spawn(move |_| {
            tx.send(7).unwrap();
            thread::sleep(ms(1000));

            tx.send(8).unwrap();
            thread::sleep(ms(1000));

            tx.send(9).unwrap();
        });

        // Receiving side.
        scope.spawn(move |_| {
            thread::sleep(ms(1500));

            let first = rx.recv();
            assert_eq!(first, Ok(7));

            let second = rx.recv();
            assert_eq!(second, Ok(8));

            let third = rx.recv();
            assert_eq!(third, Ok(9));
        });
    })
    .unwrap();
}
163
/// Exercises `send_timeout` with a 1000 ms deadline against a receiver that waits 1500 ms,
/// receives once (expecting `8`), and is then dropped.
///
/// Expected sequence on the sender: `Timeout(7)`, then `Ok(())`, then `Disconnected(9)`.
#[test]
fn send_timeout() {
    let (tx, rx) = bounded(0);

    scope(|scope| {
        // Sending side: three consecutive timed sends.
        scope.spawn(move |_| {
            let first = tx.send_timeout(7, ms(1000));
            assert_eq!(first, Err(SendTimeoutError::Timeout(7)));

            let second = tx.send_timeout(8, ms(1000));
            assert_eq!(second, Ok(()));

            let third = tx.send_timeout(9, ms(1000));
            assert_eq!(third, Err(SendTimeoutError::Disconnected(9)));
        });

        // Receiving side: `rx` is dropped when the closure returns.
        scope.spawn(move |_| {
            thread::sleep(ms(1500));
            assert_eq!(rx.recv(), Ok(8));
        });
    })
    .unwrap();
}
187
/// Asserts that `len()` is `0` on both halves at every observation point: before any traffic,
/// after each of `COUNT` individual sends and receives, and after all traffic has finished.
#[test]
fn len() {
    const COUNT: usize = 25_000;

    let (tx, rx) = bounded(0);

    assert_eq!(tx.len(), 0);
    assert_eq!(rx.len(), 0);

    scope(|scope| {
        // Consumer: values must arrive in the order they were sent.
        scope.spawn(|_| {
            for expected in 0..COUNT {
                assert_eq!(rx.recv(), Ok(expected));
                assert_eq!(rx.len(), 0);
            }
        });

        // Producer.
        scope.spawn(|_| {
            for value in 0..COUNT {
                tx.send(value).unwrap();
                assert_eq!(tx.len(), 0);
            }
        });
    })
    .unwrap();

    assert_eq!(tx.len(), 0);
    assert_eq!(rx.len(), 0);
}
217
/// A `send` that is started while no receiver is receiving must return `Err(SendError(..))`
/// carrying the original value once the `Receiver` is dropped (here after 1000 ms).
#[test]
fn disconnect_wakes_sender() {
    let (tx, rx) = bounded(0);

    scope(|scope| {
        // Sender: its `send` is expected to fail with the value handed back.
        scope.spawn(move |_| {
            let outcome = tx.send(());
            assert_eq!(outcome, Err(SendError(())));
        });

        // Disconnect the channel from the receiving side after a delay.
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            drop(rx);
        });
    })
    .unwrap();
}
233
/// A `recv` that is started while no sender is sending must return `Err(RecvError)` once the
/// `Sender` is dropped (here after 1000 ms).
#[test]
fn disconnect_wakes_receiver() {
    let (tx, rx) = bounded::<()>(0);

    scope(|scope| {
        // Receiver: its `recv` is expected to fail.
        scope.spawn(move |_| {
            let outcome = rx.recv();
            assert_eq!(outcome, Err(RecvError));
        });

        // Disconnect the channel from the sending side after a delay.
        scope.spawn(move |_| {
            thread::sleep(ms(1000));
            drop(tx);
        });
    })
    .unwrap();
}
249
/// Single producer, single consumer: `COUNT` integers are sent in ascending order and must be
/// received in that same order, followed by `RecvError` after the producer thread has finished
/// and dropped its `Sender`.
#[test]
fn spsc() {
    const COUNT: usize = 100_000;

    let (tx, rx) = bounded(0);

    scope(|scope| {
        // Consumer.
        scope.spawn(move |_| {
            (0..COUNT).for_each(|expected| assert_eq!(rx.recv(), Ok(expected)));
            assert_eq!(rx.recv(), Err(RecvError));
        });

        // Producer: `tx` is moved in and dropped when the closure returns.
        scope.spawn(move |_| {
            (0..COUNT).for_each(|value| tx.send(value).unwrap());
        });
    })
    .unwrap();
}
271
/// Multiple producers, multiple consumers: `THREADS` producers each send every value in
/// `0..COUNT`, and `THREADS` consumers each receive `COUNT` messages, bumping a per-value
/// counter. Afterwards every value must have been seen exactly `THREADS` times.
#[test]
fn mpmc() {
    const COUNT: usize = 25_000;
    const THREADS: usize = 4;

    let (tx, rx) = bounded::<usize>(0);
    // One counter per possible message value.
    let seen: Vec<AtomicUsize> = (0..COUNT).map(|_| AtomicUsize::new(0)).collect();

    scope(|scope| {
        // Consumers.
        for _ in 0..THREADS {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    let value = rx.recv().unwrap();
                    seen[value].fetch_add(1, Ordering::SeqCst);
                }
            });
        }

        // Producers.
        for _ in 0..THREADS {
            scope.spawn(|_| {
                for value in 0..COUNT {
                    tx.send(value).unwrap();
                }
            });
        }
    })
    .unwrap();

    for counter in &seen {
        assert_eq!(counter.load(Ordering::SeqCst), THREADS);
    }
}
303
/// Repeatedly creates a fresh channel and passes exactly one message through it, with the
/// receive and the send running on separate scoped threads.
///
/// The channel is created with capacity `0` so that this test exercises the same channel
/// flavor as the rest of this file.
#[test]
fn stress_oneshot() {
    const COUNT: usize = 10_000;

    for _ in 0..COUNT {
        let (s, r) = bounded(0);

        // The scope joins both threads, so each iteration completes its single transfer
        // before the channel is dropped.
        scope(|scope| {
            scope.spawn(|_| r.recv().unwrap());
            scope.spawn(|_| s.send(0).unwrap());
        })
        .unwrap();
    }
}
318
/// Request/response ping-pong driven through the `try_iter` and `iter` iterators.
///
/// A worker thread drains `try_iter()` on the response channel, summing what it gets, and
/// issues a non-blocking request whenever the drain ends; it returns once the sum equals
/// `COUNT`. The scope body answers each request seen through `iter()` with a `1` and stops as
/// soon as a response can no longer be sent.
#[test]
fn stress_iter() {
    const COUNT: usize = 1000;

    let (req_tx, req_rx) = bounded(0);
    let (resp_tx, resp_rx) = bounded(0);

    scope(|scope| {
        // Worker: owns the request sender and the response receiver; both are dropped when it
        // returns.
        scope.spawn(move |_| {
            let mut total = 0;
            loop {
                for amount in resp_rx.try_iter() {
                    total += amount;
                    if total == COUNT {
                        return;
                    }
                }
                // A failed `try_send` is deliberately ignored; the loop simply tries again.
                let _ = req_tx.try_send(());
            }
        });

        // Responder, running on the thread that owns the scope.
        for _ in req_rx.iter() {
            let delivered = resp_tx.send(1);
            if delivered.is_err() {
                break;
            }
        }
    })
    .unwrap();
}
348
/// Two threads exchange `COUNT` integers using only `send_timeout` / `recv_timeout` with a
/// 10 ms deadline, retrying until each operation succeeds. Both sides sleep 50 ms before every
/// even-numbered item. Values must arrive in order.
#[test]
fn stress_timeout_two_threads() {
    const COUNT: usize = 100;

    let (tx, rx) = bounded(0);

    scope(|scope| {
        // Producer.
        scope.spawn(|_| {
            for value in 0..COUNT {
                if value % 2 == 0 {
                    thread::sleep(ms(50));
                }
                // Keep retrying until the timed send goes through.
                while tx.send_timeout(value, ms(10)).is_err() {}
            }
        });

        // Consumer.
        scope.spawn(|_| {
            for expected in 0..COUNT {
                if expected % 2 == 0 {
                    thread::sleep(ms(50));
                }
                // Keep retrying until the timed receive yields a value.
                let received = loop {
                    if let Ok(value) = rx.recv_timeout(ms(10)) {
                        break value;
                    }
                };
                assert_eq!(received, expected);
            }
        });
    })
    .unwrap();
}
385
/// Verifies that every message sent through the channel is dropped exactly once.
///
/// For 100 rounds, a random number of `DropCounter` values (`0..3_000`) is sent and received;
/// the global drop count must equal that number after the transfer and must not change when the
/// channel halves themselves are dropped.
#[test]
fn drops() {
    // Number of `DropCounter` values dropped in the current round.
    static DROPS: AtomicUsize = AtomicUsize::new(0);

    #[derive(Debug, PartialEq)]
    struct DropCounter;

    impl Drop for DropCounter {
        fn drop(&mut self) {
            DROPS.fetch_add(1, Ordering::SeqCst);
        }
    }

    let mut rng = thread_rng();

    for _ in 0..100 {
        let steps = rng.gen_range(0..3_000);

        DROPS.store(0, Ordering::SeqCst);
        let (tx, rx) = bounded::<DropCounter>(0);

        scope(|scope| {
            // Consumer: each received value is discarded (and so dropped) right away.
            scope.spawn(|_| {
                (0..steps).for_each(|_| {
                    rx.recv().unwrap();
                });
            });

            // Producer.
            scope.spawn(|_| {
                (0..steps).for_each(|_| tx.send(DropCounter).unwrap());
            });
        })
        .unwrap();

        let after_transfer = DROPS.load(Ordering::SeqCst);
        assert_eq!(after_transfer, steps);

        // Tearing down the channel must not drop anything further.
        drop(tx);
        drop(rx);
        let after_teardown = DROPS.load(Ordering::SeqCst);
        assert_eq!(after_teardown, steps);
    }
}
428
/// Checks that `select!` spreads its choices across two channels.
///
/// One thread selects over two `recv` operations and the scope body selects over the two
/// matching `send` operations, `COUNT` times each. On both sides every arm must have been
/// chosen at least `COUNT / 2 / 2` times.
#[test]
fn fairness() {
    const COUNT: usize = 10_000;

    let (tx_a, rx_a) = bounded::<()>(0);
    let (tx_b, rx_b) = bounded::<()>(0);

    scope(|scope| {
        // Receiving side: tally which arm fired.
        scope.spawn(|_| {
            let mut hits = [0usize; 2];
            for _ in 0..COUNT {
                select! {
                    recv(rx_a) -> _ => hits[0] += 1,
                    recv(rx_b) -> _ => hits[1] += 1,
                }
            }
            assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
        });

        // Sending side, on the thread that owns the scope.
        let mut hits = [0usize; 2];
        for _ in 0..COUNT {
            select! {
                send(tx_a, ()) -> _ => hits[0] += 1,
                send(tx_b, ()) -> _ => hits[1] += 1,
            }
        }
        assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
    })
    .unwrap();
}
459
/// Checks that `select!` spreads its choices across five arms that all name the same channel.
///
/// One thread selects over five identical `recv` arms and the scope body over five identical
/// `send` arms, `COUNT` times each. On both sides every arm must have been chosen at least
/// `COUNT / 5 / 2` times.
#[test]
fn fairness_duplicates() {
    const COUNT: usize = 10_000;

    let (tx, rx) = bounded::<()>(0);

    scope(|scope| {
        // Receiving side: tally which of the duplicate arms fired.
        scope.spawn(|_| {
            let mut hits = [0usize; 5];
            for _ in 0..COUNT {
                select! {
                    recv(rx) -> _ => hits[0] += 1,
                    recv(rx) -> _ => hits[1] += 1,
                    recv(rx) -> _ => hits[2] += 1,
                    recv(rx) -> _ => hits[3] += 1,
                    recv(rx) -> _ => hits[4] += 1,
                }
            }
            assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
        });

        // Sending side, on the thread that owns the scope.
        let mut hits = [0usize; 5];
        for _ in 0..COUNT {
            select! {
                send(tx, ()) -> _ => hits[0] += 1,
                send(tx, ()) -> _ => hits[1] += 1,
                send(tx, ()) -> _ => hits[2] += 1,
                send(tx, ()) -> _ => hits[3] += 1,
                send(tx, ()) -> _ => hits[4] += 1,
            }
        }
        assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
    })
    .unwrap();
}
495
/// Uses a `recv()` on the channel as the message expression of a `send` arm in `select!` on
/// that same channel.
///
/// Two helper threads take part: one receives after 100 ms, the other sends `()` after 500 ms.
/// The test passes if the scope completes without any thread panicking.
#[test]
fn recv_in_send() {
    let (tx, rx) = bounded(0);

    scope(|scope| {
        // Helper receiver; its result is returned from the closure and not inspected.
        scope.spawn(|_| {
            thread::sleep(ms(100));
            rx.recv()
        });

        // Helper sender.
        scope.spawn(|_| {
            thread::sleep(ms(500));
            tx.send(()).unwrap();
        });

        // The message to send is itself obtained by receiving from the channel.
        select! {
            send(tx, rx.recv().unwrap()) -> _ => {}
        }
    })
    .unwrap();
}
517
/// Sends channels through channels: in each of `COUNT` rounds the producer creates a new
/// `bounded(0)` channel, ships its `Receiver` (boxed as `dyn Any + Send`) over the current
/// channel, and switches to the new `Sender`. The consumer downcasts each received box back to
/// a `Receiver` and switches to it.
#[test]
fn channel_through_channel() {
    const COUNT: usize = 1000;

    type T = Box<dyn Any + Send>;

    let (tx, rx) = bounded::<T>(0);

    scope(|scope| {
        // Producer: always sends on the most recently created channel.
        scope.spawn(move |_| {
            let mut current_tx = tx;

            for _ in 0..COUNT {
                let (next_tx, next_rx) = bounded(0);
                // Wrapped in `Option` so the consumer can move the receiver out of the box.
                let payload: T = Box::new(Some(next_rx));

                current_tx.send(payload).unwrap();
                current_tx = next_tx;
            }
        });

        // Consumer: always receives on the most recently delivered channel.
        scope.spawn(move |_| {
            let mut current_rx = rx;

            for _ in 0..COUNT {
                let mut payload = current_rx.recv().unwrap();
                let slot = payload.downcast_mut::<Option<Receiver<T>>>().unwrap();
                current_rx = slot.take().unwrap();
            }
        });
    })
    .unwrap();
}
555