1 extern crate futures;
2
3 use std::panic::{self, AssertUnwindSafe};
4
5 use futures::prelude::*;
6 use futures::Async::*;
7 use futures::future;
8 use futures::stream::FuturesUnordered;
9 use futures::sync::oneshot;
10
// Compile-time assertion: `AssertSendSync` has `Send + Sync` as supertraits,
// so the impl below is only accepted by the compiler if
// `FuturesUnordered<()>` is itself both `Send` and `Sync`.
// Nothing here runs at test time.
trait AssertSendSync: Send + Sync {}
impl AssertSendSync for FuturesUnordered<()> {}
13
/// Pushes three oneshot receivers into a `FuturesUnordered` and checks that
/// values come out as their senders fire, followed by `Ready(None)` once
/// every receiver has resolved.
#[test]
fn basic_usage() {
    // The scenario runs inside a lazy future that is driven by `wait()`.
    let scenario = future::lazy(move || {
        let mut pending = FuturesUnordered::new();
        let (first_tx, first_rx) = oneshot::channel();
        let (second_tx, second_rx) = oneshot::channel();
        let (third_tx, third_rx) = oneshot::channel();

        for receiver in vec![first_rx, second_rx, third_rx] {
            pending.push(receiver);
        }

        // Nothing has been sent yet.
        let before_any_send = pending.poll().unwrap();
        assert!(!before_any_send.is_ready());

        second_tx.send("hello").unwrap();

        // Only the second channel has fired: one item, then not ready again.
        let after_second = pending.poll().unwrap();
        assert_eq!(Ready(Some("hello")), after_second);
        let drained_again = pending.poll().unwrap();
        assert!(!drained_again.is_ready());

        first_tx.send("world").unwrap();
        third_tx.send("world2").unwrap();

        // The remaining two values are expected in the order they were sent,
        // after which the stream terminates.
        let after_first = pending.poll().unwrap();
        assert_eq!(Ready(Some("world")), after_first);
        let after_third = pending.poll().unwrap();
        assert_eq!(Ready(Some("world2")), after_third);
        let finished = pending.poll().unwrap();
        assert_eq!(Ready(None), finished);

        Ok::<_, ()>(())
    });

    scenario.wait().unwrap();
}
43
/// Checks that a dropped sender surfaces as an `Err` from `poll()`, and that
/// the queue keeps working afterwards: later polls still yield remaining
/// values and finally `Ready(None)`.
#[test]
fn resolving_errors() {
    let scenario = future::lazy(move || {
        let mut pending = FuturesUnordered::new();
        let (first_tx, first_rx) = oneshot::channel();
        let (second_tx, second_rx) = oneshot::channel();
        let (third_tx, third_rx) = oneshot::channel();

        for receiver in vec![first_rx, second_rx, third_rx] {
            pending.push(receiver);
        }

        // No sender has acted yet.
        let before_any_event = pending.poll().unwrap();
        assert!(!before_any_event.is_ready());

        // Dropping a sender without sending makes its receiver fail.
        drop(second_tx);

        let second_outcome = pending.poll();
        assert!(second_outcome.is_err());
        let after_error = pending.poll().unwrap();
        assert!(!after_error.is_ready());

        drop(first_tx);
        third_tx.send("world2").unwrap();

        // The error from the first channel is expected before the value
        // from the third; then the stream is exhausted.
        let first_outcome = pending.poll();
        assert!(first_outcome.is_err());
        let third_value = pending.poll().unwrap();
        assert_eq!(Ready(Some("world2")), third_value);
        let finished = pending.poll().unwrap();
        assert_eq!(Ready(None), finished);

        Ok::<_, ()>(())
    });

    scenario.wait().unwrap();
}
73
/// Checks that dropping a `FuturesUnordered` holding oneshot receivers is
/// visible to the senders: `poll_cancel()` is not ready while the queue is
/// alive and becomes ready once the queue has been dropped.
#[test]
fn dropping_ready_queue() {
    let scenario = future::lazy(move || {
        let mut pending = FuturesUnordered::new();
        let mut senders = Vec::new();
        let mut receivers = Vec::new();

        // Create all three channels before handing any receiver to the queue.
        for _ in 0..3 {
            let (sender, receiver) = oneshot::channel::<()>();
            senders.push(sender);
            receivers.push(receiver);
        }

        for receiver in receivers {
            pending.push(receiver);
        }

        // While the queue owns the receivers, no sender sees a cancellation.
        for sender in senders.iter_mut() {
            let cancel_state = sender.poll_cancel().unwrap();
            assert!(!cancel_state.is_ready());
        }

        drop(pending);

        // After the queue is gone every sender reports cancellation.
        for sender in senders.iter_mut() {
            let cancel_state = sender.poll_cancel().unwrap();
            assert!(cancel_state.is_ready());
        }

        Ok::<_, ()>(())
    });

    scenario.wait().unwrap();
}
99
/// Multi-threaded stress test: repeatedly fills one `FuturesUnordered` with
/// oneshot receivers whose senders fire from freshly spawned threads, then
/// checks that exactly the expected set of values is received.
#[test]
fn stress() {
    use std::sync::{Arc, Barrier};
    use std::thread;

    const ITER: usize = 300;
    // Number of send/receive rounds performed on the same queue per iteration.
    const ROUNDS: usize = 5;

    for iteration in 0..ITER {
        // Worker count cycles through 1..=10.
        let workers = (iteration % 10) + 1;

        let mut queue = FuturesUnordered::new();

        for _ in 0..ROUNDS {
            // One slot per worker plus one for this thread, so all sends are
            // released together once this thread reaches the barrier.
            let start_line = Arc::new(Barrier::new(workers + 1));

            for id in 0..workers {
                let (sender, receiver) = oneshot::channel();
                let gate = Arc::clone(&start_line);

                queue.push(receiver);

                thread::spawn(move || {
                    gate.wait();
                    sender.send(id).unwrap();
                });
            }

            start_line.wait();

            // Block on the queue as an iterator and pull exactly one result
            // per worker.
            let mut blocking = queue.wait();

            let mut received: Vec<_> = blocking
                .by_ref()
                .take(workers)
                .map(|item| item.unwrap())
                .collect();

            assert_eq!(received.len(), workers);

            // Arrival order is not asserted; after sorting, the values must
            // be exactly 0..workers.
            received.sort();

            for (expected, actual) in received.iter().enumerate() {
                assert_eq!(*actual, expected);
            }

            // Recover the queue so the next round reuses it.
            queue = blocking.into_inner();
        }
    }
}
148
/// Checks that a future which panics while being polled does not stay in
/// the queue: after the panic is caught the queue is empty and polling it
/// yields `Ready(None)`.
#[test]
fn panicking_future_dropped() {
    let scenario = future::lazy(move || {
        let mut pending = FuturesUnordered::new();

        // A future whose poll always panics.
        let exploding = future::poll_fn(|| -> Poll<i32, i32> {
            panic!()
        });
        pending.push(exploding);

        // Polling the queue propagates the panic; catch it so the test can
        // inspect the queue afterwards.
        let outcome = panic::catch_unwind(AssertUnwindSafe(|| pending.poll()));
        assert!(outcome.is_err());

        assert!(pending.is_empty());
        let after_panic = pending.poll().unwrap();
        assert_eq!(Ready(None), after_panic);

        Ok::<_, ()>(())
    });

    scenario.wait().unwrap();
}
165