1 extern crate futures;
2 
3 use std::panic::{self, AssertUnwindSafe};
4 
5 use futures::prelude::*;
6 use futures::Async::*;
7 use futures::future;
8 use futures::stream::FuturesUnordered;
9 use futures::sync::oneshot;
10 
11 trait AssertSendSync: Send + Sync {}
12 impl AssertSendSync for FuturesUnordered<()> {}
13 
14 #[test]
basic_usage()15 fn basic_usage() {
16     future::lazy(move || {
17         let mut queue = FuturesUnordered::new();
18         let (tx1, rx1) = oneshot::channel();
19         let (tx2, rx2) = oneshot::channel();
20         let (tx3, rx3) = oneshot::channel();
21 
22         queue.push(rx1);
23         queue.push(rx2);
24         queue.push(rx3);
25 
26         assert!(!queue.poll().unwrap().is_ready());
27 
28         tx2.send("hello").unwrap();
29 
30         assert_eq!(Ready(Some("hello")), queue.poll().unwrap());
31         assert!(!queue.poll().unwrap().is_ready());
32 
33         tx1.send("world").unwrap();
34         tx3.send("world2").unwrap();
35 
36         assert_eq!(Ready(Some("world")), queue.poll().unwrap());
37         assert_eq!(Ready(Some("world2")), queue.poll().unwrap());
38         assert_eq!(Ready(None), queue.poll().unwrap());
39 
40         Ok::<_, ()>(())
41     }).wait().unwrap();
42 }
43 
44 #[test]
resolving_errors()45 fn resolving_errors() {
46     future::lazy(move || {
47         let mut queue = FuturesUnordered::new();
48         let (tx1, rx1) = oneshot::channel();
49         let (tx2, rx2) = oneshot::channel();
50         let (tx3, rx3) = oneshot::channel();
51 
52         queue.push(rx1);
53         queue.push(rx2);
54         queue.push(rx3);
55 
56         assert!(!queue.poll().unwrap().is_ready());
57 
58         drop(tx2);
59 
60         assert!(queue.poll().is_err());
61         assert!(!queue.poll().unwrap().is_ready());
62 
63         drop(tx1);
64         tx3.send("world2").unwrap();
65 
66         assert!(queue.poll().is_err());
67         assert_eq!(Ready(Some("world2")), queue.poll().unwrap());
68         assert_eq!(Ready(None), queue.poll().unwrap());
69 
70         Ok::<_, ()>(())
71     }).wait().unwrap();
72 }
73 
74 #[test]
dropping_ready_queue()75 fn dropping_ready_queue() {
76     future::lazy(move || {
77         let mut queue = FuturesUnordered::new();
78         let (mut tx1, rx1) = oneshot::channel::<()>();
79         let (mut tx2, rx2) = oneshot::channel::<()>();
80         let (mut tx3, rx3) = oneshot::channel::<()>();
81 
82         queue.push(rx1);
83         queue.push(rx2);
84         queue.push(rx3);
85 
86         assert!(!tx1.poll_cancel().unwrap().is_ready());
87         assert!(!tx2.poll_cancel().unwrap().is_ready());
88         assert!(!tx3.poll_cancel().unwrap().is_ready());
89 
90         drop(queue);
91 
92         assert!(tx1.poll_cancel().unwrap().is_ready());
93         assert!(tx2.poll_cancel().unwrap().is_ready());
94         assert!(tx3.poll_cancel().unwrap().is_ready());
95 
96         Ok::<_, ()>(())
97     }).wait().unwrap();
98 }
99 
100 #[test]
stress()101 fn stress() {
102     const ITER: usize = 300;
103 
104     use std::sync::{Arc, Barrier};
105     use std::thread;
106 
107     for i in 0..ITER {
108         let n = (i % 10) + 1;
109 
110         let mut queue = FuturesUnordered::new();
111 
112         for _ in 0..5 {
113             let barrier = Arc::new(Barrier::new(n + 1));
114 
115             for num in 0..n {
116                 let barrier = barrier.clone();
117                 let (tx, rx) = oneshot::channel();
118 
119                 queue.push(rx);
120 
121                 thread::spawn(move || {
122                     barrier.wait();
123                     tx.send(num).unwrap();
124                 });
125             }
126 
127             barrier.wait();
128 
129             let mut sync = queue.wait();
130 
131             let mut rx: Vec<_> = (&mut sync)
132                 .take(n)
133                 .map(|res| res.unwrap())
134                 .collect();
135 
136             assert_eq!(rx.len(), n);
137 
138             rx.sort();
139 
140             for num in 0..n {
141                 assert_eq!(rx[num], num);
142             }
143 
144             queue = sync.into_inner();
145         }
146     }
147 }
148 
149 #[test]
panicking_future_dropped()150 fn panicking_future_dropped() {
151     future::lazy(move || {
152         let mut queue = FuturesUnordered::new();
153         queue.push(future::poll_fn(|| -> Poll<i32, i32> {
154             panic!()
155         }));
156 
157         let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll()));
158         assert!(r.is_err());
159         assert!(queue.is_empty());
160         assert_eq!(Ready(None), queue.poll().unwrap());
161 
162         Ok::<_, ()>(())
163     }).wait().unwrap();
164 }
165