1 extern crate futures;
2 extern crate rand;
3 extern crate tokio_executor;
4 extern crate tokio_timer;
5 
6 use tokio_executor::park::{Park, Unpark, UnparkThread};
7 use tokio_timer::*;
8 
9 use futures::stream::FuturesUnordered;
10 use futures::{Future, Stream};
11 use rand::Rng;
12 
13 use std::cmp;
14 use std::sync::atomic::AtomicUsize;
15 use std::sync::atomic::Ordering::SeqCst;
16 use std::sync::{Arc, Barrier};
17 use std::thread;
18 use std::time::{Duration, Instant};
19 
20 struct Signal {
21     rem: AtomicUsize,
22     unpark: UnparkThread,
23 }
24 
25 #[test]
hammer_complete()26 fn hammer_complete() {
27     const ITERS: usize = 5;
28     const THREADS: usize = 4;
29     const PER_THREAD: usize = 40;
30     const MIN_DELAY: u64 = 1;
31     const MAX_DELAY: u64 = 5_000;
32 
33     for _ in 0..ITERS {
34         let mut timer = Timer::default();
35         let handle = timer.handle();
36         let barrier = Arc::new(Barrier::new(THREADS));
37 
38         let done = Arc::new(Signal {
39             rem: AtomicUsize::new(THREADS),
40             unpark: timer.get_park().unpark(),
41         });
42 
43         for _ in 0..THREADS {
44             let handle = handle.clone();
45             let barrier = barrier.clone();
46             let done = done.clone();
47 
48             thread::spawn(move || {
49                 let mut exec = FuturesUnordered::new();
50                 let mut rng = rand::thread_rng();
51 
52                 barrier.wait();
53 
54                 for _ in 0..PER_THREAD {
55                     let deadline =
56                         Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
57 
58                     exec.push({
59                         handle.delay(deadline).and_then(move |_| {
60                             let now = Instant::now();
61                             assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
62                             Ok(())
63                         })
64                     });
65                 }
66 
67                 // Run the logic
68                 exec.for_each(|_| Ok(())).wait().unwrap();
69 
70                 if 1 == done.rem.fetch_sub(1, SeqCst) {
71                     done.unpark.unpark();
72                 }
73             });
74         }
75 
76         while done.rem.load(SeqCst) > 0 {
77             timer.turn(None).unwrap();
78         }
79     }
80 }
81 
82 #[test]
hammer_cancel()83 fn hammer_cancel() {
84     const ITERS: usize = 5;
85     const THREADS: usize = 4;
86     const PER_THREAD: usize = 40;
87     const MIN_DELAY: u64 = 1;
88     const MAX_DELAY: u64 = 5_000;
89 
90     for _ in 0..ITERS {
91         let mut timer = Timer::default();
92         let handle = timer.handle();
93         let barrier = Arc::new(Barrier::new(THREADS));
94 
95         let done = Arc::new(Signal {
96             rem: AtomicUsize::new(THREADS),
97             unpark: timer.get_park().unpark(),
98         });
99 
100         for _ in 0..THREADS {
101             let handle = handle.clone();
102             let barrier = barrier.clone();
103             let done = done.clone();
104 
105             thread::spawn(move || {
106                 let mut exec = FuturesUnordered::new();
107                 let mut rng = rand::thread_rng();
108 
109                 barrier.wait();
110 
111                 for _ in 0..PER_THREAD {
112                     let deadline1 =
113                         Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
114 
115                     let deadline2 =
116                         Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
117 
118                     let deadline = cmp::min(deadline1, deadline2);
119 
120                     let delay = handle.delay(deadline1);
121                     let join = handle.timeout(delay, deadline2);
122 
123                     exec.push({
124                         join.and_then(move |_| {
125                             let now = Instant::now();
126                             assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
127                             Ok(())
128                         })
129                     });
130                 }
131 
132                 // Run the logic
133                 exec.or_else(|e| {
134                     assert!(e.is_elapsed());
135                     Ok::<_, ()>(())
136                 })
137                 .for_each(|_| Ok(()))
138                 .wait()
139                 .unwrap();
140 
141                 if 1 == done.rem.fetch_sub(1, SeqCst) {
142                     done.unpark.unpark();
143                 }
144             });
145         }
146 
147         while done.rem.load(SeqCst) > 0 {
148             timer.turn(None).unwrap();
149         }
150     }
151 }
152 
153 #[test]
hammer_reset()154 fn hammer_reset() {
155     const ITERS: usize = 5;
156     const THREADS: usize = 4;
157     const PER_THREAD: usize = 40;
158     const MIN_DELAY: u64 = 1;
159     const MAX_DELAY: u64 = 250;
160 
161     for _ in 0..ITERS {
162         let mut timer = Timer::default();
163         let handle = timer.handle();
164         let barrier = Arc::new(Barrier::new(THREADS));
165 
166         let done = Arc::new(Signal {
167             rem: AtomicUsize::new(THREADS),
168             unpark: timer.get_park().unpark(),
169         });
170 
171         for _ in 0..THREADS {
172             let handle = handle.clone();
173             let barrier = barrier.clone();
174             let done = done.clone();
175 
176             thread::spawn(move || {
177                 let mut exec = FuturesUnordered::new();
178                 let mut rng = rand::thread_rng();
179 
180                 barrier.wait();
181 
182                 for _ in 0..PER_THREAD {
183                     let deadline1 =
184                         Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
185 
186                     let deadline2 =
187                         deadline1 + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
188 
189                     let deadline3 =
190                         deadline2 + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
191 
192                     exec.push({
193                         handle
194                             .delay(deadline1)
195                             // Select over a second delay
196                             .select2(handle.delay(deadline2))
197                             .map_err(|e| panic!("boom; err={:?}", e))
198                             .and_then(move |res| {
199                                 use futures::future::Either::*;
200 
201                                 let now = Instant::now();
202                                 assert!(
203                                     now >= deadline1,
204                                     "deadline greater by {:?}",
205                                     deadline1 - now
206                                 );
207 
208                                 let mut other = match res {
209                                     A((_, other)) => other,
210                                     B((_, other)) => other,
211                                 };
212 
213                                 other.reset(deadline3);
214                                 other
215                             })
216                             .and_then(move |_| {
217                                 let now = Instant::now();
218                                 assert!(
219                                     now >= deadline3,
220                                     "deadline greater by {:?}",
221                                     deadline3 - now
222                                 );
223                                 Ok(())
224                             })
225                     });
226                 }
227 
228                 // Run the logic
229                 exec.for_each(|_| Ok(())).wait().unwrap();
230 
231                 if 1 == done.rem.fetch_sub(1, SeqCst) {
232                     done.unpark.unpark();
233                 }
234             });
235         }
236 
237         while done.rem.load(SeqCst) > 0 {
238             timer.turn(None).unwrap();
239         }
240     }
241 }
242