1 use super::*;
2 use crate::env;
3 use crate::thread;
4 use crate::time::Duration;
5
/// Returns the iteration multiplier used by the stress tests.
///
/// The value is read from the `RUST_TEST_STRESS` environment variable. When
/// the variable cannot be read the factor is `1`.
///
/// # Panics
///
/// Panics if the variable is readable but does not parse as a `usize`.
pub fn stress_factor() -> usize {
    env::var("RUST_TEST_STRESS").map_or(1, |raw| raw.parse().unwrap())
}
12
// A value sent into a one-slot channel comes back out unchanged.
#[test]
fn smoke() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(1).unwrap();
    let got = receiver.recv().unwrap();
    assert_eq!(got, 1);
}
19
// Both channel halves go out of scope while a boxed value is still sitting
// in the one-slot buffer.
#[test]
fn drop_full() {
    let (tx, _rx) = sync_channel::<Box<isize>>(1);
    // `Box::new` is used instead of the unstable `box` expression syntax.
    tx.send(Box::new(1)).unwrap();
}
25
// Send/receive works through the original sender and through a clone of it.
#[test]
fn smoke_shared() {
    let (first_tx, rx) = sync_channel::<i32>(1);
    first_tx.send(1).unwrap();
    assert_eq!(rx.recv().unwrap(), 1);

    // The original sender stays alive while the clone is used.
    let second_tx = first_tx.clone();
    second_tx.send(1).unwrap();
    assert_eq!(rx.recv().unwrap(), 1);
}
35
// `recv_timeout` reports `Timeout` on an empty channel and yields the value
// once one has been sent.
#[test]
fn recv_timeout() {
    let (tx, rx) = sync_channel::<i32>(1);
    let wait = Duration::from_millis(1);

    assert_eq!(rx.recv_timeout(wait), Err(RecvTimeoutError::Timeout));
    tx.send(1).unwrap();
    assert_eq!(rx.recv_timeout(wait), Ok(1));
}
43
// A zero-capacity channel hands a value from a spawned thread to this one.
#[test]
fn smoke_threads() {
    let (tx, rx) = sync_channel::<i32>(0);
    let _producer = thread::spawn(move || {
        tx.send(1).unwrap();
    });
    let received = rx.recv().unwrap();
    assert_eq!(received, 1);
}
52
// Sending after the receiver has been dropped fails.
#[test]
fn smoke_port_gone() {
    let (tx, rx) = sync_channel::<i32>(0);
    drop(rx);
    let outcome = tx.send(1);
    assert!(outcome.is_err());
}
59
// A sender cloned after the receiver was dropped also fails to send, even
// once the original sender is gone.
#[test]
fn smoke_shared_port_gone2() {
    let (original_tx, rx) = sync_channel::<i32>(0);
    drop(rx);
    let cloned_tx = original_tx.clone();
    drop(original_tx);
    let outcome = cloned_tx.send(1);
    assert!(outcome.is_err());
}
68
// Keeps sending until the receiving thread, which takes a single value and
// then exits, has gone away and `send` starts failing.
#[test]
fn port_gone_concurrent() {
    let (tx, rx) = sync_channel::<i32>(0);
    let _consumer = thread::spawn(move || {
        rx.recv().unwrap();
    });
    loop {
        if tx.send(1).is_err() {
            break;
        }
    }
}
77
// Alternates sends between two senders until one of them observes that the
// receiving thread (which takes a single value) is gone.
#[test]
fn port_gone_concurrent_shared() {
    let (tx_a, rx) = sync_channel::<i32>(0);
    let tx_b = tx_a.clone();
    let _consumer = thread::spawn(move || {
        rx.recv().unwrap();
    });
    loop {
        // Same short-circuit order as `a.is_ok() && b.is_ok()`.
        if tx_a.send(1).is_err() {
            break;
        }
        if tx_b.send(1).is_err() {
            break;
        }
    }
}
87
// Receiving after the only sender has been dropped fails.
#[test]
fn smoke_chan_gone() {
    let (tx, rx) = sync_channel::<i32>(0);
    drop(tx);
    let outcome = rx.recv();
    assert!(outcome.is_err());
}
94
// Receiving fails once every sender (the original and its clone) is dropped.
#[test]
fn smoke_chan_gone_shared() {
    let (original_tx, rx) = sync_channel::<()>(0);
    let cloned_tx = original_tx.clone();
    drop(original_tx);
    drop(cloned_tx);
    let outcome = rx.recv();
    assert!(outcome.is_err());
}
103
// Drains the channel until the sending thread, which sends two values and
// then exits, has dropped its sender.
#[test]
fn chan_gone_concurrent() {
    let (tx, rx) = sync_channel::<i32>(0);
    thread::spawn(move || {
        for _ in 0..2 {
            tx.send(1).unwrap();
        }
    });
    // The iterator ends at the first failed `recv`.
    for _msg in rx.iter() {}
}
113
// Passes 10000 values one at a time from a spawned thread to this one
// through a zero-capacity channel.
#[test]
fn stress() {
    const ROUNDS: usize = 10000;

    let (tx, rx) = sync_channel::<i32>(0);
    thread::spawn(move || {
        (0..ROUNDS).for_each(|_| tx.send(1).unwrap());
    });

    for _ in 0..ROUNDS {
        let value = rx.recv().unwrap();
        assert_eq!(value, 1);
    }
}
126
// One producer thread sends 10000 values; this thread polls with a 1 ms
// `recv_timeout` until the channel disconnects and checks that nothing was
// lost.
#[test]
fn stress_recv_timeout_two_threads() {
    let (tx, rx) = sync_channel::<i32>(0);

    thread::spawn(move || {
        for _ in 0..10000 {
            tx.send(1).unwrap();
        }
    });

    let poll_interval = Duration::from_millis(1);
    let mut received = 0;
    loop {
        match rx.recv_timeout(poll_interval) {
            // Nothing arrived in time: poll again.
            Err(RecvTimeoutError::Timeout) => {}
            // The producer finished and dropped its sender.
            Err(RecvTimeoutError::Disconnected) => break,
            Ok(value) => {
                assert_eq!(value, 1);
                received += 1;
            }
        }
    }

    assert_eq!(received, 10000);
}
151
// Eight producer threads each send 1000 values through clones of one sender.
// A consumer thread polls with a 10 ms `recv_timeout` until disconnection,
// verifies the total, and signals completion on a second channel.
#[test]
fn stress_recv_timeout_shared() {
    const AMT: u32 = 1000;
    const NTHREADS: u32 = 8;
    let (tx, rx) = sync_channel::<i32>(0);
    let (done_tx, done_rx) = sync_channel::<()>(0);

    thread::spawn(move || {
        let poll_interval = Duration::from_millis(10);
        let mut received = 0;
        loop {
            match rx.recv_timeout(poll_interval) {
                // Nothing arrived in time: poll again.
                Err(RecvTimeoutError::Timeout) => {}
                // Every sender has been dropped.
                Err(RecvTimeoutError::Disconnected) => break,
                Ok(value) => {
                    assert_eq!(value, 1);
                    received += 1;
                }
            }
        }

        assert_eq!(received, AMT * NTHREADS);
        assert!(rx.try_recv().is_err());

        done_tx.send(()).unwrap();
    });

    for _ in 0..NTHREADS {
        let producer_tx = tx.clone();
        thread::spawn(move || {
            for _ in 0..AMT {
                producer_tx.send(1).unwrap();
            }
        });
    }

    // Only the producers' clones remain, so the consumer sees a disconnect
    // once they finish.
    drop(tx);

    done_rx.recv().unwrap();
}
191
// Eight producer threads each send 1000 values through clones of one sender.
// A consumer thread receives exactly that many, checks nothing extra is
// available, and signals completion on a second channel.
#[test]
fn stress_shared() {
    const AMT: u32 = 1000;
    const NTHREADS: u32 = 8;
    let (tx, rx) = sync_channel::<i32>(0);
    let (done_tx, done_rx) = sync_channel::<()>(0);

    thread::spawn(move || {
        let expected = AMT * NTHREADS;
        for _ in 0..expected {
            let value = rx.recv().unwrap();
            assert_eq!(value, 1);
        }
        // No further message may be pending after the expected count.
        if rx.try_recv().is_ok() {
            panic!();
        }
        done_tx.send(()).unwrap();
    });

    for _ in 0..NTHREADS {
        let producer_tx = tx.clone();
        thread::spawn(move || {
            for _ in 0..AMT {
                producer_tx.send(1).unwrap();
            }
        });
    }
    drop(tx);
    done_rx.recv().unwrap();
}
221
// Closing without sending: the receiver is dropped before the sender.
#[test]
fn oneshot_single_thread_close_port_first() {
    let (_sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
}
228
// Closing without sending: the sender is dropped before the receiver.
#[test]
fn oneshot_single_thread_close_chan_first() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    drop(sender);
}
235
// Testing that the sender cleans up the payload if the receiver is closed:
// sending a boxed value after the receiver was dropped must return an error.
#[test]
fn oneshot_single_thread_send_port_close() {
    let (tx, rx) = sync_channel::<Box<i32>>(0);
    drop(rx);
    // `Box::new` is used instead of the unstable `box` expression syntax.
    assert!(tx.send(Box::new(0)).is_err());
}
243
// Unwrapping a `recv` on a channel whose sender is gone panics; the panic is
// observed through the spawned thread's join result.
#[test]
fn oneshot_single_thread_recv_chan_close() {
    let worker = thread::spawn(move || {
        let (tx, rx) = sync_channel::<i32>(0);
        drop(tx);
        rx.recv().unwrap();
    });
    let joined = worker.join();
    // The worker must have panicked.
    assert!(joined.is_err());
}
256
// A boxed value sent into a one-slot channel is received intact on the same
// thread.
#[test]
fn oneshot_single_thread_send_then_recv() {
    let (tx, rx) = sync_channel::<Box<i32>>(1);
    // `Box::new` is used instead of the unstable `box` expression syntax.
    tx.send(Box::new(10)).unwrap();
    assert!(*rx.recv().unwrap() == 10);
}
263
// `try_send` succeeds when the one-slot buffer is free, and the value can
// then be received.
#[test]
fn oneshot_single_thread_try_send_open() {
    let (tx, rx) = sync_channel::<i32>(1);
    let attempt = tx.try_send(10);
    assert_eq!(attempt, Ok(()));
    let received = rx.recv().unwrap();
    assert!(received == 10);
}
270
// `try_send` after the receiver was dropped returns the value back inside
// `Disconnected`.
#[test]
fn oneshot_single_thread_try_send_closed() {
    let (tx, rx) = sync_channel::<i32>(0);
    drop(rx);
    let attempt = tx.try_send(10);
    assert_eq!(attempt, Err(TrySendError::Disconnected(10)));
}
277
// `try_send` on a zero-capacity channel whose receiver is alive but not
// receiving returns the value back inside `Full`.
#[test]
fn oneshot_single_thread_try_send_closed2() {
    let (tx, _rx) = sync_channel::<i32>(0);
    let attempt = tx.try_send(10);
    assert_eq!(attempt, Err(TrySendError::Full(10)));
}
283
// A buffered value is received while the sender is still open.
// NOTE: despite the `try_recv` in the name, this test calls the blocking
// `recv`.
#[test]
fn oneshot_single_thread_try_recv_open() {
    let (tx, rx) = sync_channel::<i32>(1);
    tx.send(10).unwrap();
    let outcome = rx.recv();
    assert!(outcome == Ok(10));
}
290
// Receiving on an empty channel whose sender was dropped fails.
// NOTE: despite the `try_recv` in the name, this test calls the blocking
// `recv`.
#[test]
fn oneshot_single_thread_try_recv_closed() {
    let (tx, rx) = sync_channel::<i32>(0);
    drop(tx);
    let outcome = rx.recv();
    assert!(outcome.is_err());
}
297
// A value buffered before the sender was dropped is still delivered by
// `try_recv`; only the following call reports `Disconnected`.
#[test]
fn oneshot_single_thread_try_recv_closed_with_data() {
    let (tx, rx) = sync_channel::<i32>(1);
    tx.send(10).unwrap();
    drop(tx);

    let first = rx.try_recv();
    assert_eq!(first, Ok(10));
    let second = rx.try_recv();
    assert_eq!(second, Err(TryRecvError::Disconnected));
}
306
// `try_recv` reports `Empty` before anything is sent and yields the value
// afterwards.
#[test]
fn oneshot_single_thread_peek_data() {
    let (tx, rx) = sync_channel::<i32>(1);

    let before = rx.try_recv();
    assert_eq!(before, Err(TryRecvError::Empty));

    tx.send(10).unwrap();
    let after = rx.try_recv();
    assert_eq!(after, Ok(10));
}
314
// After the sender is dropped, `try_recv` reports `Disconnected` on every
// call (checked twice).
#[test]
fn oneshot_single_thread_peek_close() {
    let (tx, rx) = sync_channel::<i32>(0);
    drop(tx);
    for _ in 0..2 {
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
    }
}
322
// With the sender alive and nothing sent, `try_recv` reports `Empty`.
#[test]
fn oneshot_single_thread_peek_open() {
    let (_tx, rx) = sync_channel::<i32>(0);
    let outcome = rx.try_recv();
    assert_eq!(outcome, Err(TryRecvError::Empty));
}
328
// A spawned thread receives on a zero-capacity channel while this thread
// sends a boxed value to it.
#[test]
fn oneshot_multi_task_recv_then_send() {
    let (tx, rx) = sync_channel::<Box<i32>>(0);
    let _t = thread::spawn(move || {
        assert!(*rx.recv().unwrap() == 10);
    });

    // `Box::new` is used instead of the unstable `box` expression syntax.
    tx.send(Box::new(10)).unwrap();
}
338
// One thread drops the sender without sending; the thread that unwraps
// `recv` must therefore panic, which is observed through its join result.
#[test]
fn oneshot_multi_task_recv_then_close() {
    let (tx, rx) = sync_channel::<Box<i32>>(0);
    let _closer = thread::spawn(move || {
        drop(tx);
    });
    let receiver_thread = thread::spawn(move || {
        assert!(*rx.recv().unwrap() == 10);
    });
    let joined = receiver_thread.join();
    assert!(joined.is_err());
}
351
// Repeatedly drops the receiver on a spawned thread while this thread drops
// the sender. The repeat count comes from `stress_factor()`.
#[test]
fn oneshot_multi_thread_close_stress() {
    let rounds = stress_factor();
    for _ in 0..rounds {
        let (sender, receiver) = sync_channel::<i32>(0);
        let _closer = thread::spawn(move || {
            drop(receiver);
        });
        drop(sender);
    }
}
362
// Repeatedly races a thread that drops the receiver against a thread that
// sends and unwraps. The sending thread's join result is deliberately
// ignored. The repeat count comes from `stress_factor()`.
#[test]
fn oneshot_multi_thread_send_close_stress() {
    let rounds = stress_factor();
    for _ in 0..rounds {
        let (tx, rx) = sync_channel::<i32>(0);
        let _closer = thread::spawn(move || {
            drop(rx);
        });
        let sender_thread = thread::spawn(move || {
            tx.send(1).unwrap();
        });
        let _ = sender_thread.join();
    }
}
376
// Repeatedly drops the sender from a nested thread while another nested
// thread unwraps `recv`; the receiving thread is expected to panic. The
// repeat count comes from `stress_factor()`.
#[test]
fn oneshot_multi_thread_recv_close_stress() {
    let rounds = stress_factor();
    for _ in 0..rounds {
        let (tx, rx) = sync_channel::<i32>(0);

        let _recv_side = thread::spawn(move || {
            let inner = thread::spawn(move || {
                rx.recv().unwrap();
            });
            let joined = inner.join();
            assert!(joined.is_err());
        });

        let _send_side = thread::spawn(move || {
            thread::spawn(move || {
                drop(tx);
            });
        });
    }
}
395
// Repeatedly sends one boxed value from a spawned thread and receives it on
// this thread. The repeat count comes from `stress_factor()`.
#[test]
fn oneshot_multi_thread_send_recv_stress() {
    for _ in 0..stress_factor() {
        let (tx, rx) = sync_channel::<Box<i32>>(0);
        let _t = thread::spawn(move || {
            // `Box::new` is used instead of the unstable `box` expression syntax.
            tx.send(Box::new(10)).unwrap();
        });
        assert!(*rx.recv().unwrap() == 10);
    }
}
406
// Sends the values 0..10 in order, each from a freshly spawned thread that
// takes over the sender, and receives them the same way on a chain of
// receiver threads. The repeat count comes from `stress_factor()`.
#[test]
fn stream_send_recv_stress() {
    for _ in 0..stress_factor() {
        let (tx, rx) = sync_channel::<Box<i32>>(0);

        send(tx, 0);
        recv(rx, 0);

        // Sends `i` on a new thread, then recurses with `i + 1` until 10.
        fn send(tx: SyncSender<Box<i32>>, i: i32) {
            if i == 10 {
                return;
            }

            thread::spawn(move || {
                // `Box::new` is used instead of the unstable `box` expression syntax.
                tx.send(Box::new(i)).unwrap();
                send(tx, i + 1);
            });
        }

        // Receives on a new thread, checks the value is `i`, then recurses
        // with `i + 1` until 10.
        fn recv(rx: Receiver<Box<i32>>, i: i32) {
            if i == 10 {
                return;
            }

            thread::spawn(move || {
                assert!(*rx.recv().unwrap() == i);
                recv(rx, i + 1);
            });
        }
    }
}
438
// Regression test that we don't run out of stack in scheduler context:
// fills a 10000-slot buffer and then drains it on the same thread.
#[test]
fn recv_a_lot() {
    const COUNT: usize = 10000;

    let (tx, rx) = sync_channel(COUNT);
    (0..COUNT).for_each(|_| tx.send(()).unwrap());
    (0..COUNT).for_each(|_| rx.recv().unwrap());
}
450
// Spawns `stress_factor() + 100` threads that each send one unit value
// through a clone of the same sender, then receives that many values.
#[test]
fn shared_chan_stress() {
    let (tx, rx) = sync_channel(0);
    let thread_count = stress_factor() + 100;

    for _ in 0..thread_count {
        let worker_tx = tx.clone();
        thread::spawn(move || {
            worker_tx.send(()).unwrap();
        });
    }

    (0..thread_count).for_each(|_| rx.recv().unwrap());
}
466
// A spawned thread sums everything yielded by `rx.iter()` and reports the
// total on a second channel once the sender is dropped.
#[test]
fn test_nested_recv_iter() {
    let (tx, rx) = sync_channel::<i32>(0);
    let (total_tx, total_rx) = sync_channel::<i32>(0);

    let _summer = thread::spawn(move || {
        let total: i32 = rx.iter().sum();
        total_tx.send(total).unwrap();
    });

    for value in [3, 1, 2] {
        tx.send(value).unwrap();
    }
    // Dropping the sender ends the iterator on the other thread.
    drop(tx);
    assert_eq!(total_rx.recv().unwrap(), 6);
}
486
// A spawned thread accumulates values from `rx.iter()` and breaks out of the
// loop on the first item seen after the running total has reached 3. The
// resulting total is reported on a second channel.
#[test]
fn test_recv_iter_break() {
    let (tx, rx) = sync_channel::<i32>(0);
    let (count_tx, count_rx) = sync_channel(0);

    let _counter = thread::spawn(move || {
        let mut running = 0;
        for item in rx.iter() {
            if running >= 3 {
                break;
            }
            running += item;
        }
        count_tx.send(running).unwrap();
    });

    for _ in 0..3 {
        tx.send(2).unwrap();
    }
    // Best-effort extra send; its result is intentionally ignored.
    let _ = tx.try_send(2);
    drop(tx);
    assert_eq!(count_rx.recv().unwrap(), 4);
}
511
// Walks `try_recv` through Empty -> Ok -> Empty -> Disconnected. Two helper
// channels step the spawned thread forward (`go_*`) and report back when
// each step is complete (`ack_*`).
#[test]
fn try_recv_states() {
    let (data_tx, data_rx) = sync_channel::<i32>(1);
    let (go_tx, go_rx) = sync_channel::<()>(1);
    let (ack_tx, ack_rx) = sync_channel::<()>(1);
    let _stepper = thread::spawn(move || {
        // Step 1: publish a value.
        go_rx.recv().unwrap();
        data_tx.send(1).unwrap();
        ack_tx.send(()).unwrap();
        // Step 2: hang up the data channel.
        go_rx.recv().unwrap();
        drop(data_tx);
        ack_tx.send(()).unwrap();
    });

    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));
    go_tx.send(()).unwrap();
    ack_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Ok(1));
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));
    go_tx.send(()).unwrap();
    ack_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Disconnected));
}
535
// This bug used to end up in a livelock inside of the Receiver destructor
// because the internal state of the Shared packet was corrupted.
#[test]
fn destroy_upgraded_shared_port_when_sender_still_active() {
    let (tx, rx) = sync_channel::<()>(0);
    let (exit_tx, exit_rx) = sync_channel::<()>(0);
    let _child = thread::spawn(move || {
        rx.recv().unwrap(); // wait on a oneshot
        drop(rx); // destroy a shared
        exit_tx.send(()).unwrap();
    });

    // Yield repeatedly to make sure the other thread has gone to sleep.
    (0..5000).for_each(|_| thread::yield_now());

    // Upgrade to a shared chan (clone, then drop the original) and send a
    // message through the clone.
    let shared_tx = tx.clone();
    drop(tx);
    shared_tx.send(()).unwrap();

    // Wait for the child thread to exit before we exit.
    exit_rx.recv().unwrap();
}
560
// `send` returns `Ok(())` when a spawned thread receives the value.
#[test]
fn send1() {
    let (tx, rx) = sync_channel::<i32>(0);
    let _consumer = thread::spawn(move || {
        rx.recv().unwrap();
    });
    let outcome = tx.send(1);
    assert_eq!(outcome, Ok(()));
}
569
// `send` on a zero-capacity channel fails when the receiver is dropped on
// another thread without receiving.
#[test]
fn send2() {
    let (tx, rx) = sync_channel::<i32>(0);
    let _closer = thread::spawn(move || {
        drop(rx);
    });
    let outcome = tx.send(1);
    assert!(outcome.is_err());
}
578
// With the one-slot buffer already full, a second `send` fails once the
// receiver is dropped on another thread.
#[test]
fn send3() {
    let (tx, rx) = sync_channel::<i32>(1);
    let first = tx.send(1);
    assert_eq!(first, Ok(()));

    let _closer = thread::spawn(move || {
        drop(rx);
    });
    let second = tx.send(1);
    assert!(second.is_err());
}
588
// Two threads send through clones of the same sender; both sends must fail
// after the receiver is dropped. Each thread reports completion on a
// separate `channel()`.
#[test]
fn send4() {
    let (tx_a, rx) = sync_channel::<i32>(0);
    let tx_b = tx_a.clone();
    let (done_a, done_rx) = channel();
    let done_b = done_a.clone();

    let _first = thread::spawn(move || {
        assert!(tx_a.send(1).is_err());
        done_a.send(()).unwrap();
    });
    let _second = thread::spawn(move || {
        assert!(tx_b.send(2).is_err());
        done_b.send(()).unwrap();
    });

    drop(rx);
    // Wait for both senders to report.
    for _ in 0..2 {
        done_rx.recv().unwrap();
    }
}
607
// `try_send` on a zero-capacity channel with no receiver waiting returns
// `Full` carrying the value.
#[test]
fn try_send1() {
    let (tx, _rx) = sync_channel::<i32>(0);
    let attempt = tx.try_send(1);
    assert_eq!(attempt, Err(TrySendError::Full(1)));
}
613
// The first `try_send` fills the one-slot buffer; the second returns `Full`.
#[test]
fn try_send2() {
    let (tx, _rx) = sync_channel::<i32>(1);
    let first = tx.try_send(1);
    assert_eq!(first, Ok(()));
    let second = tx.try_send(1);
    assert_eq!(second, Err(TrySendError::Full(1)));
}
620
// After the receiver is dropped, `try_send` returns `Disconnected` even
// though the buffer is already full.
#[test]
fn try_send3() {
    let (tx, rx) = sync_channel::<i32>(1);
    let first = tx.try_send(1);
    assert_eq!(first, Ok(()));
    drop(rx);
    let second = tx.try_send(1);
    assert_eq!(second, Err(TrySendError::Disconnected(1)));
}
628
// Runs a ping-pong between two buffered channels 100 times: this thread
// `try_send`s a ping, the spawned thread receives it and `try_send`s a pong
// back.
#[test]
fn issue_15761() {
    fn repro() {
        let (ping_tx, ping_rx) = sync_channel::<()>(3);
        let (pong_tx, pong_rx) = sync_channel::<()>(3);

        let _responder = thread::spawn(move || {
            ping_rx.recv().unwrap();
            pong_tx.try_send(()).unwrap();
        });

        ping_tx.try_send(()).unwrap();
        pong_rx.recv().unwrap();
    }

    (0..100).for_each(|_| repro());
}
648