1 use futures::channel::{mpsc, oneshot};
2 use futures::executor::{block_on, block_on_stream};
3 use futures::sink::SinkExt;
4 use futures::stream::StreamExt;
5 use std::sync::mpsc as std_mpsc;
6 use std::thread;
7 
8 #[test]
9 #[ignore] // FIXME: https://github.com/rust-lang/futures-rs/issues/1790
works()10 fn works() {
11     const N: usize = 4;
12 
13     let (mut tx, rx) = mpsc::channel(1);
14 
15     let (tx2, rx2) = std_mpsc::channel();
16     let (tx3, rx3) = std_mpsc::channel();
17     let t1 = thread::spawn(move || {
18         for _ in 0..=N {
19             let (mytx, myrx) = oneshot::channel();
20             block_on(tx.send(myrx)).unwrap();
21             tx3.send(mytx).unwrap();
22         }
23         rx2.recv().unwrap();
24         for _ in 0..N {
25             let (mytx, myrx) = oneshot::channel();
26             block_on(tx.send(myrx)).unwrap();
27             tx3.send(mytx).unwrap();
28         }
29     });
30 
31     let (tx4, rx4) = std_mpsc::channel();
32     let t2 = thread::spawn(move || {
33         for item in block_on_stream(rx.buffer_unordered(N)) {
34             tx4.send(item.unwrap()).unwrap();
35         }
36     });
37 
38     let o1 = rx3.recv().unwrap();
39     let o2 = rx3.recv().unwrap();
40     let o3 = rx3.recv().unwrap();
41     let o4 = rx3.recv().unwrap();
42     assert!(rx4.try_recv().is_err());
43 
44     o1.send(1).unwrap();
45     assert_eq!(rx4.recv(), Ok(1));
46     o3.send(3).unwrap();
47     assert_eq!(rx4.recv(), Ok(3));
48     tx2.send(()).unwrap();
49     o2.send(2).unwrap();
50     assert_eq!(rx4.recv(), Ok(2));
51     o4.send(4).unwrap();
52     assert_eq!(rx4.recv(), Ok(4));
53 
54     let o5 = rx3.recv().unwrap();
55     let o6 = rx3.recv().unwrap();
56     let o7 = rx3.recv().unwrap();
57     let o8 = rx3.recv().unwrap();
58     let o9 = rx3.recv().unwrap();
59 
60     o5.send(5).unwrap();
61     assert_eq!(rx4.recv(), Ok(5));
62     o8.send(8).unwrap();
63     assert_eq!(rx4.recv(), Ok(8));
64     o9.send(9).unwrap();
65     assert_eq!(rx4.recv(), Ok(9));
66     o7.send(7).unwrap();
67     assert_eq!(rx4.recv(), Ok(7));
68     o6.send(6).unwrap();
69     assert_eq!(rx4.recv(), Ok(6));
70 
71     t1.join().unwrap();
72     t2.join().unwrap();
73 }
74