1 use super::*;
2 use crate::env;
3 use crate::thread;
4 use crate::time::Duration;
5 
stress_factor() -> usize6 pub fn stress_factor() -> usize {
7     match env::var("RUST_TEST_STRESS") {
8         Ok(val) => val.parse().unwrap(),
9         Err(..) => 1,
10     }
11 }
12 
13 #[test]
smoke()14 fn smoke() {
15     let (tx, rx) = sync_channel::<i32>(1);
16     tx.send(1).unwrap();
17     assert_eq!(rx.recv().unwrap(), 1);
18 }
19 
20 #[test]
drop_full()21 fn drop_full() {
22     let (tx, _rx) = sync_channel::<Box<isize>>(1);
23     tx.send(box 1).unwrap();
24 }
25 
26 #[test]
smoke_shared()27 fn smoke_shared() {
28     let (tx, rx) = sync_channel::<i32>(1);
29     tx.send(1).unwrap();
30     assert_eq!(rx.recv().unwrap(), 1);
31     let tx = tx.clone();
32     tx.send(1).unwrap();
33     assert_eq!(rx.recv().unwrap(), 1);
34 }
35 
36 #[test]
recv_timeout()37 fn recv_timeout() {
38     let (tx, rx) = sync_channel::<i32>(1);
39     assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
40     tx.send(1).unwrap();
41     assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
42 }
43 
44 #[test]
smoke_threads()45 fn smoke_threads() {
46     let (tx, rx) = sync_channel::<i32>(0);
47     let _t = thread::spawn(move || {
48         tx.send(1).unwrap();
49     });
50     assert_eq!(rx.recv().unwrap(), 1);
51 }
52 
53 #[test]
smoke_port_gone()54 fn smoke_port_gone() {
55     let (tx, rx) = sync_channel::<i32>(0);
56     drop(rx);
57     assert!(tx.send(1).is_err());
58 }
59 
60 #[test]
smoke_shared_port_gone2()61 fn smoke_shared_port_gone2() {
62     let (tx, rx) = sync_channel::<i32>(0);
63     drop(rx);
64     let tx2 = tx.clone();
65     drop(tx);
66     assert!(tx2.send(1).is_err());
67 }
68 
69 #[test]
port_gone_concurrent()70 fn port_gone_concurrent() {
71     let (tx, rx) = sync_channel::<i32>(0);
72     let _t = thread::spawn(move || {
73         rx.recv().unwrap();
74     });
75     while tx.send(1).is_ok() {}
76 }
77 
78 #[test]
port_gone_concurrent_shared()79 fn port_gone_concurrent_shared() {
80     let (tx, rx) = sync_channel::<i32>(0);
81     let tx2 = tx.clone();
82     let _t = thread::spawn(move || {
83         rx.recv().unwrap();
84     });
85     while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
86 }
87 
88 #[test]
smoke_chan_gone()89 fn smoke_chan_gone() {
90     let (tx, rx) = sync_channel::<i32>(0);
91     drop(tx);
92     assert!(rx.recv().is_err());
93 }
94 
95 #[test]
smoke_chan_gone_shared()96 fn smoke_chan_gone_shared() {
97     let (tx, rx) = sync_channel::<()>(0);
98     let tx2 = tx.clone();
99     drop(tx);
100     drop(tx2);
101     assert!(rx.recv().is_err());
102 }
103 
104 #[test]
chan_gone_concurrent()105 fn chan_gone_concurrent() {
106     let (tx, rx) = sync_channel::<i32>(0);
107     thread::spawn(move || {
108         tx.send(1).unwrap();
109         tx.send(1).unwrap();
110     });
111     while rx.recv().is_ok() {}
112 }
113 
114 #[test]
stress()115 fn stress() {
116     let (tx, rx) = sync_channel::<i32>(0);
117     thread::spawn(move || {
118         for _ in 0..10000 {
119             tx.send(1).unwrap();
120         }
121     });
122     for _ in 0..10000 {
123         assert_eq!(rx.recv().unwrap(), 1);
124     }
125 }
126 
127 #[test]
stress_recv_timeout_two_threads()128 fn stress_recv_timeout_two_threads() {
129     let (tx, rx) = sync_channel::<i32>(0);
130 
131     thread::spawn(move || {
132         for _ in 0..10000 {
133             tx.send(1).unwrap();
134         }
135     });
136 
137     let mut recv_count = 0;
138     loop {
139         match rx.recv_timeout(Duration::from_millis(1)) {
140             Ok(v) => {
141                 assert_eq!(v, 1);
142                 recv_count += 1;
143             }
144             Err(RecvTimeoutError::Timeout) => continue,
145             Err(RecvTimeoutError::Disconnected) => break,
146         }
147     }
148 
149     assert_eq!(recv_count, 10000);
150 }
151 
152 #[test]
stress_recv_timeout_shared()153 fn stress_recv_timeout_shared() {
154     const AMT: u32 = 1000;
155     const NTHREADS: u32 = 8;
156     let (tx, rx) = sync_channel::<i32>(0);
157     let (dtx, drx) = sync_channel::<()>(0);
158 
159     thread::spawn(move || {
160         let mut recv_count = 0;
161         loop {
162             match rx.recv_timeout(Duration::from_millis(10)) {
163                 Ok(v) => {
164                     assert_eq!(v, 1);
165                     recv_count += 1;
166                 }
167                 Err(RecvTimeoutError::Timeout) => continue,
168                 Err(RecvTimeoutError::Disconnected) => break,
169             }
170         }
171 
172         assert_eq!(recv_count, AMT * NTHREADS);
173         assert!(rx.try_recv().is_err());
174 
175         dtx.send(()).unwrap();
176     });
177 
178     for _ in 0..NTHREADS {
179         let tx = tx.clone();
180         thread::spawn(move || {
181             for _ in 0..AMT {
182                 tx.send(1).unwrap();
183             }
184         });
185     }
186 
187     drop(tx);
188 
189     drx.recv().unwrap();
190 }
191 
192 #[test]
stress_shared()193 fn stress_shared() {
194     const AMT: u32 = 1000;
195     const NTHREADS: u32 = 8;
196     let (tx, rx) = sync_channel::<i32>(0);
197     let (dtx, drx) = sync_channel::<()>(0);
198 
199     thread::spawn(move || {
200         for _ in 0..AMT * NTHREADS {
201             assert_eq!(rx.recv().unwrap(), 1);
202         }
203         match rx.try_recv() {
204             Ok(..) => panic!(),
205             _ => {}
206         }
207         dtx.send(()).unwrap();
208     });
209 
210     for _ in 0..NTHREADS {
211         let tx = tx.clone();
212         thread::spawn(move || {
213             for _ in 0..AMT {
214                 tx.send(1).unwrap();
215             }
216         });
217     }
218     drop(tx);
219     drx.recv().unwrap();
220 }
221 
222 #[test]
oneshot_single_thread_close_port_first()223 fn oneshot_single_thread_close_port_first() {
224     // Simple test of closing without sending
225     let (_tx, rx) = sync_channel::<i32>(0);
226     drop(rx);
227 }
228 
229 #[test]
oneshot_single_thread_close_chan_first()230 fn oneshot_single_thread_close_chan_first() {
231     // Simple test of closing without sending
232     let (tx, _rx) = sync_channel::<i32>(0);
233     drop(tx);
234 }
235 
236 #[test]
oneshot_single_thread_send_port_close()237 fn oneshot_single_thread_send_port_close() {
238     // Testing that the sender cleans up the payload if receiver is closed
239     let (tx, rx) = sync_channel::<Box<i32>>(0);
240     drop(rx);
241     assert!(tx.send(box 0).is_err());
242 }
243 
244 #[test]
oneshot_single_thread_recv_chan_close()245 fn oneshot_single_thread_recv_chan_close() {
246     // Receiving on a closed chan will panic
247     let res = thread::spawn(move || {
248         let (tx, rx) = sync_channel::<i32>(0);
249         drop(tx);
250         rx.recv().unwrap();
251     })
252     .join();
253     // What is our res?
254     assert!(res.is_err());
255 }
256 
257 #[test]
oneshot_single_thread_send_then_recv()258 fn oneshot_single_thread_send_then_recv() {
259     let (tx, rx) = sync_channel::<Box<i32>>(1);
260     tx.send(box 10).unwrap();
261     assert!(*rx.recv().unwrap() == 10);
262 }
263 
264 #[test]
oneshot_single_thread_try_send_open()265 fn oneshot_single_thread_try_send_open() {
266     let (tx, rx) = sync_channel::<i32>(1);
267     assert_eq!(tx.try_send(10), Ok(()));
268     assert!(rx.recv().unwrap() == 10);
269 }
270 
271 #[test]
oneshot_single_thread_try_send_closed()272 fn oneshot_single_thread_try_send_closed() {
273     let (tx, rx) = sync_channel::<i32>(0);
274     drop(rx);
275     assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
276 }
277 
278 #[test]
oneshot_single_thread_try_send_closed2()279 fn oneshot_single_thread_try_send_closed2() {
280     let (tx, _rx) = sync_channel::<i32>(0);
281     assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
282 }
283 
284 #[test]
oneshot_single_thread_try_recv_open()285 fn oneshot_single_thread_try_recv_open() {
286     let (tx, rx) = sync_channel::<i32>(1);
287     tx.send(10).unwrap();
288     assert!(rx.recv() == Ok(10));
289 }
290 
291 #[test]
oneshot_single_thread_try_recv_closed()292 fn oneshot_single_thread_try_recv_closed() {
293     let (tx, rx) = sync_channel::<i32>(0);
294     drop(tx);
295     assert!(rx.recv().is_err());
296 }
297 
298 #[test]
oneshot_single_thread_try_recv_closed_with_data()299 fn oneshot_single_thread_try_recv_closed_with_data() {
300     let (tx, rx) = sync_channel::<i32>(1);
301     tx.send(10).unwrap();
302     drop(tx);
303     assert_eq!(rx.try_recv(), Ok(10));
304     assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
305 }
306 
307 #[test]
oneshot_single_thread_peek_data()308 fn oneshot_single_thread_peek_data() {
309     let (tx, rx) = sync_channel::<i32>(1);
310     assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
311     tx.send(10).unwrap();
312     assert_eq!(rx.try_recv(), Ok(10));
313 }
314 
315 #[test]
oneshot_single_thread_peek_close()316 fn oneshot_single_thread_peek_close() {
317     let (tx, rx) = sync_channel::<i32>(0);
318     drop(tx);
319     assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
320     assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
321 }
322 
323 #[test]
oneshot_single_thread_peek_open()324 fn oneshot_single_thread_peek_open() {
325     let (_tx, rx) = sync_channel::<i32>(0);
326     assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
327 }
328 
329 #[test]
oneshot_multi_task_recv_then_send()330 fn oneshot_multi_task_recv_then_send() {
331     let (tx, rx) = sync_channel::<Box<i32>>(0);
332     let _t = thread::spawn(move || {
333         assert!(*rx.recv().unwrap() == 10);
334     });
335 
336     tx.send(box 10).unwrap();
337 }
338 
339 #[test]
oneshot_multi_task_recv_then_close()340 fn oneshot_multi_task_recv_then_close() {
341     let (tx, rx) = sync_channel::<Box<i32>>(0);
342     let _t = thread::spawn(move || {
343         drop(tx);
344     });
345     let res = thread::spawn(move || {
346         assert!(*rx.recv().unwrap() == 10);
347     })
348     .join();
349     assert!(res.is_err());
350 }
351 
352 #[test]
oneshot_multi_thread_close_stress()353 fn oneshot_multi_thread_close_stress() {
354     for _ in 0..stress_factor() {
355         let (tx, rx) = sync_channel::<i32>(0);
356         let _t = thread::spawn(move || {
357             drop(rx);
358         });
359         drop(tx);
360     }
361 }
362 
363 #[test]
oneshot_multi_thread_send_close_stress()364 fn oneshot_multi_thread_send_close_stress() {
365     for _ in 0..stress_factor() {
366         let (tx, rx) = sync_channel::<i32>(0);
367         let _t = thread::spawn(move || {
368             drop(rx);
369         });
370         let _ = thread::spawn(move || {
371             tx.send(1).unwrap();
372         })
373         .join();
374     }
375 }
376 
377 #[test]
oneshot_multi_thread_recv_close_stress()378 fn oneshot_multi_thread_recv_close_stress() {
379     for _ in 0..stress_factor() {
380         let (tx, rx) = sync_channel::<i32>(0);
381         let _t = thread::spawn(move || {
382             let res = thread::spawn(move || {
383                 rx.recv().unwrap();
384             })
385             .join();
386             assert!(res.is_err());
387         });
388         let _t = thread::spawn(move || {
389             thread::spawn(move || {
390                 drop(tx);
391             });
392         });
393     }
394 }
395 
396 #[test]
oneshot_multi_thread_send_recv_stress()397 fn oneshot_multi_thread_send_recv_stress() {
398     for _ in 0..stress_factor() {
399         let (tx, rx) = sync_channel::<Box<i32>>(0);
400         let _t = thread::spawn(move || {
401             tx.send(box 10).unwrap();
402         });
403         assert!(*rx.recv().unwrap() == 10);
404     }
405 }
406 
407 #[test]
stream_send_recv_stress()408 fn stream_send_recv_stress() {
409     for _ in 0..stress_factor() {
410         let (tx, rx) = sync_channel::<Box<i32>>(0);
411 
412         send(tx, 0);
413         recv(rx, 0);
414 
415         fn send(tx: SyncSender<Box<i32>>, i: i32) {
416             if i == 10 {
417                 return;
418             }
419 
420             thread::spawn(move || {
421                 tx.send(box i).unwrap();
422                 send(tx, i + 1);
423             });
424         }
425 
426         fn recv(rx: Receiver<Box<i32>>, i: i32) {
427             if i == 10 {
428                 return;
429             }
430 
431             thread::spawn(move || {
432                 assert!(*rx.recv().unwrap() == i);
433                 recv(rx, i + 1);
434             });
435         }
436     }
437 }
438 
439 #[test]
recv_a_lot()440 fn recv_a_lot() {
441     // Regression test that we don't run out of stack in scheduler context
442     let (tx, rx) = sync_channel(10000);
443     for _ in 0..10000 {
444         tx.send(()).unwrap();
445     }
446     for _ in 0..10000 {
447         rx.recv().unwrap();
448     }
449 }
450 
451 #[test]
shared_chan_stress()452 fn shared_chan_stress() {
453     let (tx, rx) = sync_channel(0);
454     let total = stress_factor() + 100;
455     for _ in 0..total {
456         let tx = tx.clone();
457         thread::spawn(move || {
458             tx.send(()).unwrap();
459         });
460     }
461 
462     for _ in 0..total {
463         rx.recv().unwrap();
464     }
465 }
466 
467 #[test]
test_nested_recv_iter()468 fn test_nested_recv_iter() {
469     let (tx, rx) = sync_channel::<i32>(0);
470     let (total_tx, total_rx) = sync_channel::<i32>(0);
471 
472     let _t = thread::spawn(move || {
473         let mut acc = 0;
474         for x in rx.iter() {
475             acc += x;
476         }
477         total_tx.send(acc).unwrap();
478     });
479 
480     tx.send(3).unwrap();
481     tx.send(1).unwrap();
482     tx.send(2).unwrap();
483     drop(tx);
484     assert_eq!(total_rx.recv().unwrap(), 6);
485 }
486 
487 #[test]
test_recv_iter_break()488 fn test_recv_iter_break() {
489     let (tx, rx) = sync_channel::<i32>(0);
490     let (count_tx, count_rx) = sync_channel(0);
491 
492     let _t = thread::spawn(move || {
493         let mut count = 0;
494         for x in rx.iter() {
495             if count >= 3 {
496                 break;
497             } else {
498                 count += x;
499             }
500         }
501         count_tx.send(count).unwrap();
502     });
503 
504     tx.send(2).unwrap();
505     tx.send(2).unwrap();
506     tx.send(2).unwrap();
507     let _ = tx.try_send(2);
508     drop(tx);
509     assert_eq!(count_rx.recv().unwrap(), 4);
510 }
511 
512 #[test]
try_recv_states()513 fn try_recv_states() {
514     let (tx1, rx1) = sync_channel::<i32>(1);
515     let (tx2, rx2) = sync_channel::<()>(1);
516     let (tx3, rx3) = sync_channel::<()>(1);
517     let _t = thread::spawn(move || {
518         rx2.recv().unwrap();
519         tx1.send(1).unwrap();
520         tx3.send(()).unwrap();
521         rx2.recv().unwrap();
522         drop(tx1);
523         tx3.send(()).unwrap();
524     });
525 
526     assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
527     tx2.send(()).unwrap();
528     rx3.recv().unwrap();
529     assert_eq!(rx1.try_recv(), Ok(1));
530     assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
531     tx2.send(()).unwrap();
532     rx3.recv().unwrap();
533     assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
534 }
535 
536 // This bug used to end up in a livelock inside of the Receiver destructor
537 // because the internal state of the Shared packet was corrupted
538 #[test]
destroy_upgraded_shared_port_when_sender_still_active()539 fn destroy_upgraded_shared_port_when_sender_still_active() {
540     let (tx, rx) = sync_channel::<()>(0);
541     let (tx2, rx2) = sync_channel::<()>(0);
542     let _t = thread::spawn(move || {
543         rx.recv().unwrap(); // wait on a oneshot
544         drop(rx); // destroy a shared
545         tx2.send(()).unwrap();
546     });
547     // make sure the other thread has gone to sleep
548     for _ in 0..5000 {
549         thread::yield_now();
550     }
551 
552     // upgrade to a shared chan and send a message
553     let t = tx.clone();
554     drop(tx);
555     t.send(()).unwrap();
556 
557     // wait for the child thread to exit before we exit
558     rx2.recv().unwrap();
559 }
560 
561 #[test]
send1()562 fn send1() {
563     let (tx, rx) = sync_channel::<i32>(0);
564     let _t = thread::spawn(move || {
565         rx.recv().unwrap();
566     });
567     assert_eq!(tx.send(1), Ok(()));
568 }
569 
570 #[test]
send2()571 fn send2() {
572     let (tx, rx) = sync_channel::<i32>(0);
573     let _t = thread::spawn(move || {
574         drop(rx);
575     });
576     assert!(tx.send(1).is_err());
577 }
578 
579 #[test]
send3()580 fn send3() {
581     let (tx, rx) = sync_channel::<i32>(1);
582     assert_eq!(tx.send(1), Ok(()));
583     let _t = thread::spawn(move || {
584         drop(rx);
585     });
586     assert!(tx.send(1).is_err());
587 }
588 
589 #[test]
send4()590 fn send4() {
591     let (tx, rx) = sync_channel::<i32>(0);
592     let tx2 = tx.clone();
593     let (done, donerx) = channel();
594     let done2 = done.clone();
595     let _t = thread::spawn(move || {
596         assert!(tx.send(1).is_err());
597         done.send(()).unwrap();
598     });
599     let _t = thread::spawn(move || {
600         assert!(tx2.send(2).is_err());
601         done2.send(()).unwrap();
602     });
603     drop(rx);
604     donerx.recv().unwrap();
605     donerx.recv().unwrap();
606 }
607 
608 #[test]
try_send1()609 fn try_send1() {
610     let (tx, _rx) = sync_channel::<i32>(0);
611     assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
612 }
613 
614 #[test]
try_send2()615 fn try_send2() {
616     let (tx, _rx) = sync_channel::<i32>(1);
617     assert_eq!(tx.try_send(1), Ok(()));
618     assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
619 }
620 
621 #[test]
try_send3()622 fn try_send3() {
623     let (tx, rx) = sync_channel::<i32>(1);
624     assert_eq!(tx.try_send(1), Ok(()));
625     drop(rx);
626     assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
627 }
628 
629 #[test]
issue_15761()630 fn issue_15761() {
631     fn repro() {
632         let (tx1, rx1) = sync_channel::<()>(3);
633         let (tx2, rx2) = sync_channel::<()>(3);
634 
635         let _t = thread::spawn(move || {
636             rx1.recv().unwrap();
637             tx2.try_send(()).unwrap();
638         });
639 
640         tx1.try_send(()).unwrap();
641         rx2.recv().unwrap();
642     }
643 
644     for _ in 0..100 {
645         repro()
646     }
647 }
648