1 extern crate futures;
2 extern crate rand;
3 extern crate tokio_executor;
4 extern crate tokio_timer;
5
6 use tokio_executor::park::{Park, Unpark, UnparkThread};
7 use tokio_timer::*;
8
9 use futures::stream::FuturesUnordered;
10 use futures::{Future, Stream};
11 use rand::Rng;
12
13 use std::cmp;
14 use std::sync::atomic::AtomicUsize;
15 use std::sync::atomic::Ordering::SeqCst;
16 use std::sync::{Arc, Barrier};
17 use std::thread;
18 use std::time::{Duration, Instant};
19
20 struct Signal {
21 rem: AtomicUsize,
22 unpark: UnparkThread,
23 }
24
25 #[test]
hammer_complete()26 fn hammer_complete() {
27 const ITERS: usize = 5;
28 const THREADS: usize = 4;
29 const PER_THREAD: usize = 40;
30 const MIN_DELAY: u64 = 1;
31 const MAX_DELAY: u64 = 5_000;
32
33 for _ in 0..ITERS {
34 let mut timer = Timer::default();
35 let handle = timer.handle();
36 let barrier = Arc::new(Barrier::new(THREADS));
37
38 let done = Arc::new(Signal {
39 rem: AtomicUsize::new(THREADS),
40 unpark: timer.get_park().unpark(),
41 });
42
43 for _ in 0..THREADS {
44 let handle = handle.clone();
45 let barrier = barrier.clone();
46 let done = done.clone();
47
48 thread::spawn(move || {
49 let mut exec = FuturesUnordered::new();
50 let mut rng = rand::thread_rng();
51
52 barrier.wait();
53
54 for _ in 0..PER_THREAD {
55 let deadline =
56 Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
57
58 exec.push({
59 handle.delay(deadline).and_then(move |_| {
60 let now = Instant::now();
61 assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
62 Ok(())
63 })
64 });
65 }
66
67 // Run the logic
68 exec.for_each(|_| Ok(())).wait().unwrap();
69
70 if 1 == done.rem.fetch_sub(1, SeqCst) {
71 done.unpark.unpark();
72 }
73 });
74 }
75
76 while done.rem.load(SeqCst) > 0 {
77 timer.turn(None).unwrap();
78 }
79 }
80 }
81
82 #[test]
hammer_cancel()83 fn hammer_cancel() {
84 const ITERS: usize = 5;
85 const THREADS: usize = 4;
86 const PER_THREAD: usize = 40;
87 const MIN_DELAY: u64 = 1;
88 const MAX_DELAY: u64 = 5_000;
89
90 for _ in 0..ITERS {
91 let mut timer = Timer::default();
92 let handle = timer.handle();
93 let barrier = Arc::new(Barrier::new(THREADS));
94
95 let done = Arc::new(Signal {
96 rem: AtomicUsize::new(THREADS),
97 unpark: timer.get_park().unpark(),
98 });
99
100 for _ in 0..THREADS {
101 let handle = handle.clone();
102 let barrier = barrier.clone();
103 let done = done.clone();
104
105 thread::spawn(move || {
106 let mut exec = FuturesUnordered::new();
107 let mut rng = rand::thread_rng();
108
109 barrier.wait();
110
111 for _ in 0..PER_THREAD {
112 let deadline1 =
113 Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
114
115 let deadline2 =
116 Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
117
118 let deadline = cmp::min(deadline1, deadline2);
119
120 let delay = handle.delay(deadline1);
121 let join = handle.timeout(delay, deadline2);
122
123 exec.push({
124 join.and_then(move |_| {
125 let now = Instant::now();
126 assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
127 Ok(())
128 })
129 });
130 }
131
132 // Run the logic
133 exec.or_else(|e| {
134 assert!(e.is_elapsed());
135 Ok::<_, ()>(())
136 })
137 .for_each(|_| Ok(()))
138 .wait()
139 .unwrap();
140
141 if 1 == done.rem.fetch_sub(1, SeqCst) {
142 done.unpark.unpark();
143 }
144 });
145 }
146
147 while done.rem.load(SeqCst) > 0 {
148 timer.turn(None).unwrap();
149 }
150 }
151 }
152
153 #[test]
hammer_reset()154 fn hammer_reset() {
155 const ITERS: usize = 5;
156 const THREADS: usize = 4;
157 const PER_THREAD: usize = 40;
158 const MIN_DELAY: u64 = 1;
159 const MAX_DELAY: u64 = 250;
160
161 for _ in 0..ITERS {
162 let mut timer = Timer::default();
163 let handle = timer.handle();
164 let barrier = Arc::new(Barrier::new(THREADS));
165
166 let done = Arc::new(Signal {
167 rem: AtomicUsize::new(THREADS),
168 unpark: timer.get_park().unpark(),
169 });
170
171 for _ in 0..THREADS {
172 let handle = handle.clone();
173 let barrier = barrier.clone();
174 let done = done.clone();
175
176 thread::spawn(move || {
177 let mut exec = FuturesUnordered::new();
178 let mut rng = rand::thread_rng();
179
180 barrier.wait();
181
182 for _ in 0..PER_THREAD {
183 let deadline1 =
184 Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
185
186 let deadline2 =
187 deadline1 + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
188
189 let deadline3 =
190 deadline2 + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
191
192 exec.push({
193 handle
194 .delay(deadline1)
195 // Select over a second delay
196 .select2(handle.delay(deadline2))
197 .map_err(|e| panic!("boom; err={:?}", e))
198 .and_then(move |res| {
199 use futures::future::Either::*;
200
201 let now = Instant::now();
202 assert!(
203 now >= deadline1,
204 "deadline greater by {:?}",
205 deadline1 - now
206 );
207
208 let mut other = match res {
209 A((_, other)) => other,
210 B((_, other)) => other,
211 };
212
213 other.reset(deadline3);
214 other
215 })
216 .and_then(move |_| {
217 let now = Instant::now();
218 assert!(
219 now >= deadline3,
220 "deadline greater by {:?}",
221 deadline3 - now
222 );
223 Ok(())
224 })
225 });
226 }
227
228 // Run the logic
229 exec.for_each(|_| Ok(())).wait().unwrap();
230
231 if 1 == done.rem.fetch_sub(1, SeqCst) {
232 done.unpark.unpark();
233 }
234 });
235 }
236
237 while done.rem.load(SeqCst) > 0 {
238 timer.turn(None).unwrap();
239 }
240 }
241 }
242