1 use mio::{Events, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
2 use mio::event::Evented;
3 use std::time::Duration;
4 
5 #[test]
smoke()6 fn smoke() {
7     let poll = Poll::new().unwrap();
8     let mut events = Events::with_capacity(128);
9 
10     let (r, set) = Registration::new2();
11     r.register(&poll, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
12 
13     let n = poll.poll(&mut events, Some(Duration::from_millis(0))).unwrap();
14     assert_eq!(n, 0);
15 
16     set.set_readiness(Ready::readable()).unwrap();
17 
18     let n = poll.poll(&mut events, Some(Duration::from_millis(0))).unwrap();
19     assert_eq!(n, 1);
20 
21     assert_eq!(events.get(0).unwrap().token(), Token(0));
22 }
23 
24 #[test]
set_readiness_before_register()25 fn set_readiness_before_register() {
26     use std::sync::{Arc, Barrier};
27     use std::thread;
28 
29     let poll = Poll::new().unwrap();
30     let mut events = Events::with_capacity(128);
31 
32     for _ in 0..5_000 {
33         let (r, set) = Registration::new2();
34 
35         let b1 = Arc::new(Barrier::new(2));
36         let b2 = b1.clone();
37 
38         let th = thread::spawn(move || {
39             // set readiness before register
40             set.set_readiness(Ready::readable()).unwrap();
41 
42             // run into barrier so both can pass
43             b2.wait();
44         });
45 
46         // wait for readiness
47         b1.wait();
48 
49         // now register
50         poll.register(&r, Token(123), Ready::readable(), PollOpt::edge()).unwrap();
51 
52         loop {
53             let n = poll.poll(&mut events, None).unwrap();
54 
55             if n == 0 {
56                 continue;
57             }
58 
59             assert_eq!(n, 1);
60             assert_eq!(events.get(0).unwrap().token(), Token(123));
61             break;
62         }
63 
64         th.join().unwrap();
65     }
66 }
67 
68 #[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
69 mod stress {
70     use mio::{Events, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
71     use mio::event::Evented;
72     use std::time::Duration;
73 
74     #[test]
single_threaded_poll()75     fn single_threaded_poll() {
76         use std::sync::Arc;
77         use std::sync::atomic::AtomicUsize;
78         use std::sync::atomic::Ordering::{Acquire, Release};
79         use std::thread;
80 
81         const NUM_ATTEMPTS: usize = 30;
82         const NUM_ITERS: usize = 500;
83         const NUM_THREADS: usize = 4;
84         const NUM_REGISTRATIONS: usize = 128;
85 
86         for _ in 0..NUM_ATTEMPTS {
87             let poll = Poll::new().unwrap();
88             let mut events = Events::with_capacity(NUM_REGISTRATIONS);
89 
90             let registrations: Vec<_> = (0..NUM_REGISTRATIONS).map(|i| {
91                 let (r, s) = Registration::new2();
92                 r.register(&poll, Token(i), Ready::readable(), PollOpt::edge()).unwrap();
93                 (r, s)
94             }).collect();
95 
96             let mut ready: Vec<_> = (0..NUM_REGISTRATIONS).map(|_| Ready::empty()).collect();
97 
98             let remaining = Arc::new(AtomicUsize::new(NUM_THREADS));
99 
100             for _ in 0..NUM_THREADS {
101                 let remaining = remaining.clone();
102 
103                 let set_readiness: Vec<SetReadiness> =
104                     registrations.iter().map(|r| r.1.clone()).collect();
105 
106                 thread::spawn(move || {
107                     for _ in 0..NUM_ITERS {
108                         for i in 0..NUM_REGISTRATIONS {
109                             set_readiness[i].set_readiness(Ready::readable()).unwrap();
110                             set_readiness[i].set_readiness(Ready::empty()).unwrap();
111                             set_readiness[i].set_readiness(Ready::writable()).unwrap();
112                             set_readiness[i].set_readiness(Ready::readable() | Ready::writable()).unwrap();
113                             set_readiness[i].set_readiness(Ready::empty()).unwrap();
114                         }
115                     }
116 
117                     for i in 0..NUM_REGISTRATIONS {
118                         set_readiness[i].set_readiness(Ready::readable()).unwrap();
119                     }
120 
121                     remaining.fetch_sub(1, Release);
122                 });
123             }
124 
125             while remaining.load(Acquire) > 0 {
126                 // Set interest
127                 for (i, &(ref r, _)) in registrations.iter().enumerate() {
128                     r.reregister(&poll, Token(i), Ready::writable(), PollOpt::edge()).unwrap();
129                 }
130 
131                 poll.poll(&mut events, Some(Duration::from_millis(0))).unwrap();
132 
133                 for event in &events {
134                     ready[event.token().0] = event.readiness();
135                 }
136 
137                 // Update registration
138                 // Set interest
139                 for (i, &(ref r, _)) in registrations.iter().enumerate() {
140                     r.reregister(&poll, Token(i), Ready::readable(), PollOpt::edge()).unwrap();
141                 }
142             }
143 
144             // Finall polls, repeat until readiness-queue empty
145             loop {
146                 // Might not read all events from custom-event-queue at once, implementation dependend
147                 poll.poll(&mut events, Some(Duration::from_millis(0))).unwrap();
148                 if events.is_empty() {
149                     // no more events in readiness queue pending
150                     break;
151                 }
152                 for event in &events {
153                     ready[event.token().0] = event.readiness();
154                 }
155             }
156 
157             // Everything should be flagged as readable
158             for ready in ready {
159                 assert_eq!(ready, Ready::readable());
160             }
161         }
162     }
163 
164     #[test]
multi_threaded_poll()165     fn multi_threaded_poll() {
166         use std::sync::{Arc, Barrier};
167         use std::sync::atomic::{AtomicUsize};
168         use std::sync::atomic::Ordering::{Relaxed, SeqCst};
169         use std::thread;
170 
171         const ENTRIES: usize = 10_000;
172         const PER_ENTRY: usize = 16;
173         const THREADS: usize = 4;
174         const NUM: usize = ENTRIES * PER_ENTRY;
175 
176         struct Entry {
177             #[allow(dead_code)]
178             registration: Registration,
179             set_readiness: SetReadiness,
180             num: AtomicUsize,
181         }
182 
183         impl Entry {
184             fn fire(&self) {
185                 self.set_readiness.set_readiness(Ready::readable()).unwrap();
186             }
187         }
188 
189         let poll = Arc::new(Poll::new().unwrap());
190         let mut entries = vec![];
191 
192         // Create entries
193         for i in 0..ENTRIES {
194             let (registration, set_readiness) = Registration::new2();
195             registration.register(&poll, Token(i), Ready::readable(), PollOpt::edge()).unwrap();
196 
197             entries.push(Entry {
198                 registration,
199                 set_readiness,
200                 num: AtomicUsize::new(0),
201             });
202         }
203 
204         let total = Arc::new(AtomicUsize::new(0));
205         let entries = Arc::new(entries);
206         let barrier = Arc::new(Barrier::new(THREADS));
207 
208         let mut threads = vec![];
209 
210         for th in 0..THREADS {
211             let poll = poll.clone();
212             let total = total.clone();
213             let entries = entries.clone();
214             let barrier = barrier.clone();
215 
216             threads.push(thread::spawn(move || {
217                 let mut events = Events::with_capacity(128);
218 
219                 barrier.wait();
220 
221                 // Prime all the registrations
222                 let mut i = th;
223                 while i < ENTRIES {
224                     entries[i].fire();
225                     i += THREADS;
226                 }
227 
228                 let mut n = 0;
229 
230 
231                 while total.load(SeqCst) < NUM {
232                     // A poll timeout is necessary here because there may be more
233                     // than one threads blocked in `poll` when the final wakeup
234                     // notification arrives (and only notifies one thread).
235                     n += poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap();
236 
237                     let mut num_this_tick = 0;
238 
239                     for event in &events {
240                         let e = &entries[event.token().0];
241 
242                         let mut num = e.num.load(Relaxed);
243 
244                         loop {
245                             if num < PER_ENTRY {
246                                 let actual = e.num.compare_and_swap(num, num + 1, Relaxed);
247 
248                                 if actual == num {
249                                     num_this_tick += 1;
250                                     e.fire();
251                                     break;
252                                 }
253 
254                                 num = actual;
255                             } else {
256                                 break;
257                             }
258                         }
259                     }
260 
261                     total.fetch_add(num_this_tick, SeqCst);
262                 }
263 
264                 n
265             }));
266         }
267 
268         let _: Vec<_> = threads.into_iter()
269             .map(|th| th.join().unwrap())
270             .collect();
271 
272         for entry in entries.iter() {
273             assert_eq!(PER_ENTRY, entry.num.load(Relaxed));
274         }
275     }
276 
277     #[test]
with_small_events_collection()278     fn with_small_events_collection() {
279         const N: usize = 8;
280         const ITER: usize = 1_000;
281 
282         use std::sync::{Arc, Barrier};
283         use std::sync::atomic::AtomicBool;
284         use std::sync::atomic::Ordering::{Acquire, Release};
285         use std::thread;
286 
287         let poll = Poll::new().unwrap();
288         let mut registrations = vec![];
289 
290         let barrier = Arc::new(Barrier::new(N + 1));
291         let done = Arc::new(AtomicBool::new(false));
292 
293         for i in 0..N {
294             let (registration, set_readiness) = Registration::new2();
295             poll.register(&registration, Token(i), Ready::readable(), PollOpt::edge()).unwrap();
296 
297             registrations.push(registration);
298 
299             let barrier = barrier.clone();
300             let done = done.clone();
301 
302             thread::spawn(move || {
303                 barrier.wait();
304 
305                 while !done.load(Acquire) {
306                     set_readiness.set_readiness(Ready::readable()).unwrap();
307                 }
308 
309                 // Set one last time
310                 set_readiness.set_readiness(Ready::readable()).unwrap();
311             });
312         }
313 
314         let mut events = Events::with_capacity(4);
315 
316         barrier.wait();
317 
318         for _ in 0..ITER {
319             poll.poll(&mut events, None).unwrap();
320         }
321 
322         done.store(true, Release);
323 
324         let mut final_ready = vec![false; N];
325 
326 
327         for _ in 0..5 {
328             poll.poll(&mut events, None).unwrap();
329 
330             for event in &events {
331                 final_ready[event.token().0] = true;
332             }
333 
334             if final_ready.iter().all(|v| *v) {
335                 return;
336             }
337 
338             thread::sleep(Duration::from_millis(10));
339         }
340 
341         panic!("dead lock?");
342     }
343 }
344 
345 #[test]
drop_registration_from_non_main_thread()346 fn drop_registration_from_non_main_thread() {
347     use std::thread;
348     use std::sync::mpsc::channel;
349 
350     const THREADS: usize = 8;
351     const ITERS: usize = 50_000;
352 
353     let poll = Poll::new().unwrap();
354     let mut events = Events::with_capacity(1024);
355     let mut senders = Vec::with_capacity(THREADS);
356     let mut token_index = 0;
357 
358     // spawn threads, which will send messages to single receiver
359     for _ in 0..THREADS {
360         let (tx, rx) = channel::<(Registration, SetReadiness)>();
361         senders.push(tx);
362 
363         thread::spawn(move || {
364             for (registration, set_readiness) in rx {
365                 let _ = set_readiness.set_readiness(Ready::readable());
366                 drop(registration);
367                 drop(set_readiness);
368             }
369         });
370     }
371 
372     let mut index: usize = 0;
373     for _ in 0..ITERS {
374         let (registration, set_readiness) = Registration::new2();
375         registration.register(&poll, Token(token_index), Ready::readable(), PollOpt::edge()).unwrap();
376         let _ = senders[index].send((registration, set_readiness));
377 
378         token_index += 1;
379         index += 1;
380         if index == THREADS {
381             index = 0;
382 
383             let (registration, set_readiness) = Registration::new2();
384             registration.register(&poll, Token(token_index), Ready::readable(), PollOpt::edge()).unwrap();
385             let _ = set_readiness.set_readiness(Ready::readable());
386             drop(registration);
387             drop(set_readiness);
388             token_index += 1;
389 
390             thread::park_timeout(Duration::from_millis(0));
391             let _ = poll.poll(&mut events, None).unwrap();
392         }
393     }
394 }
395