1 use mio::{Events, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
2 use mio::event::Evented;
3 use std::time::Duration;
4
/// Basic sanity check for user-space readiness: a registered `Registration`
/// produces no event until its `SetReadiness` handle is flagged readable,
/// after which a single event carrying the registered token is returned.
#[test]
fn smoke() {
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(128);
    // Zero timeout, so every poll below returns immediately.
    let no_wait = Some(Duration::from_millis(0));

    let (registration, readiness) = Registration::new2();
    registration
        .register(&poll, Token(0), Ready::readable(), PollOpt::edge())
        .unwrap();

    // Readiness has not been set yet, so nothing may be reported.
    let before = poll.poll(&mut events, no_wait).unwrap();
    assert_eq!(before, 0);

    readiness.set_readiness(Ready::readable()).unwrap();

    // Exactly one event is expected now, tagged with our token.
    let after = poll.poll(&mut events, no_wait).unwrap();
    assert_eq!(after, 1);

    let first = events.get(0).unwrap();
    assert_eq!(first.token(), Token(0));
}
23
/// Checks that readiness flagged on a `SetReadiness` handle *before* the
/// matching `Registration` is registered with the `Poll` is still delivered
/// as an event afterwards. The scenario is repeated 5,000 times, with the
/// readiness being set from a helper thread each time.
#[test]
fn set_readiness_before_register() {
    use std::sync::{Arc, Barrier};
    use std::thread;

    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(128);

    for _ in 0..5_000 {
        let (registration, readiness) = Registration::new2();

        let main_gate = Arc::new(Barrier::new(2));
        let worker_gate = Arc::clone(&main_gate);

        let worker = thread::spawn(move || {
            // Flag readiness first, then meet the main thread at the barrier
            // so both sides can proceed.
            readiness.set_readiness(Ready::readable()).unwrap();
            worker_gate.wait();
        });

        // Once the barrier releases, the worker has already set readiness.
        main_gate.wait();

        // Only now register the handle with the poll instance.
        poll.register(&registration, Token(123), Ready::readable(), PollOpt::edge())
            .unwrap();

        // Poll without a timeout until at least one event is reported.
        let count = loop {
            let n = poll.poll(&mut events, None).unwrap();
            if n != 0 {
                break n;
            }
        };

        assert_eq!(count, 1);
        assert_eq!(events.get(0).unwrap().token(), Token(123));

        worker.join().unwrap();
    }
}
67
68 #[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
69 mod stress {
70 use mio::{Events, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
71 use mio::event::Evented;
72 use std::time::Duration;
73
/// Stress test: several threads hammer the `SetReadiness` handles of a set of
/// registrations while a single thread keeps changing interest and polling.
///
/// Each writer thread finishes by leaving every handle in the readable state,
/// so after all writers are done and the queue has been drained, the last
/// readiness recorded for every token must be exactly `Ready::readable()`.
#[test]
fn single_threaded_poll() {
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering::{Acquire, Release};
    use std::sync::Arc;
    use std::thread;

    const NUM_ATTEMPTS: usize = 30;
    const NUM_ITERS: usize = 500;
    const NUM_THREADS: usize = 4;
    const NUM_REGISTRATIONS: usize = 128;

    for _ in 0..NUM_ATTEMPTS {
        let poll = Poll::new().unwrap();
        let mut events = Events::with_capacity(NUM_REGISTRATIONS);
        let no_wait = Some(Duration::from_millis(0));

        // One (registration, setter) pair per token, initially interested in
        // readable events.
        let mut registrations = Vec::with_capacity(NUM_REGISTRATIONS);
        for i in 0..NUM_REGISTRATIONS {
            let (registration, setter) = Registration::new2();
            registration
                .register(&poll, Token(i), Ready::readable(), PollOpt::edge())
                .unwrap();
            registrations.push((registration, setter));
        }

        // Last readiness observed for each token, indexed by token value.
        let mut ready = vec![Ready::empty(); NUM_REGISTRATIONS];

        // Number of writer threads that have not finished yet.
        let remaining = Arc::new(AtomicUsize::new(NUM_THREADS));

        for _ in 0..NUM_THREADS {
            let remaining = Arc::clone(&remaining);

            let setters: Vec<SetReadiness> =
                registrations.iter().map(|pair| pair.1.clone()).collect();

            thread::spawn(move || {
                for _ in 0..NUM_ITERS {
                    for setter in &setters {
                        setter.set_readiness(Ready::readable()).unwrap();
                        setter.set_readiness(Ready::empty()).unwrap();
                        setter.set_readiness(Ready::writable()).unwrap();
                        setter
                            .set_readiness(Ready::readable() | Ready::writable())
                            .unwrap();
                        setter.set_readiness(Ready::empty()).unwrap();
                    }
                }

                // Leave every handle readable before signalling completion.
                for setter in &setters {
                    setter.set_readiness(Ready::readable()).unwrap();
                }

                remaining.fetch_sub(1, Release);
            });
        }

        // Re-registers every registration with the given interest.
        let set_interest = |interest: Ready| {
            for (i, pair) in registrations.iter().enumerate() {
                pair.0
                    .reregister(&poll, Token(i), interest, PollOpt::edge())
                    .unwrap();
            }
        };

        while remaining.load(Acquire) > 0 {
            // Switch interest to writable, poll, then switch back to readable.
            set_interest(Ready::writable());

            poll.poll(&mut events, no_wait).unwrap();

            for event in &events {
                ready[event.token().0] = event.readiness();
            }

            set_interest(Ready::readable());
        }

        // Final polls: a single poll might not return every pending event
        // (implementation dependent), so repeat until nothing is left.
        loop {
            poll.poll(&mut events, no_wait).unwrap();
            if events.is_empty() {
                break;
            }
            for event in &events {
                ready[event.token().0] = event.readiness();
            }
        }

        // Everything should be flagged as readable.
        for flags in ready {
            assert_eq!(flags, Ready::readable());
        }
    }
}
163
/// Stress test with several threads polling the same `Poll` concurrently.
///
/// Every entry is fired once up front; each time a thread sees an event for
/// an entry it bumps that entry's counter and, as long as the counter is
/// below `PER_ENTRY`, fires the entry again. The test passes when every
/// entry's counter ends at exactly `PER_ENTRY`.
#[test]
fn multi_threaded_poll() {
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering::{Relaxed, SeqCst};
    use std::sync::{Arc, Barrier};
    use std::thread;

    const ENTRIES: usize = 10_000;
    const PER_ENTRY: usize = 16;
    const THREADS: usize = 4;
    const NUM: usize = ENTRIES * PER_ENTRY;

    struct Entry {
        // Never read; stored so the registration lives as long as the entry.
        #[allow(dead_code)]
        registration: Registration,
        set_readiness: SetReadiness,
        // How many events have been claimed for this entry so far.
        num: AtomicUsize,
    }

    impl Entry {
        /// Flags this entry as readable.
        fn fire(&self) {
            self.set_readiness.set_readiness(Ready::readable()).unwrap();
        }
    }

    let poll = Arc::new(Poll::new().unwrap());
    let mut entries = vec![];

    // Create entries, using the entry index as its token.
    for i in 0..ENTRIES {
        let (registration, set_readiness) = Registration::new2();
        registration
            .register(&poll, Token(i), Ready::readable(), PollOpt::edge())
            .unwrap();

        entries.push(Entry {
            registration,
            set_readiness,
            num: AtomicUsize::new(0),
        });
    }

    // Number of events claimed across all threads.
    let total = Arc::new(AtomicUsize::new(0));
    let entries = Arc::new(entries);
    let barrier = Arc::new(Barrier::new(THREADS));

    let mut threads = vec![];

    for th in 0..THREADS {
        let poll = poll.clone();
        let total = total.clone();
        let entries = entries.clone();
        let barrier = barrier.clone();

        threads.push(thread::spawn(move || {
            let mut events = Events::with_capacity(128);

            barrier.wait();

            // Prime the registrations: thread `th` fires entries
            // th, th + THREADS, th + 2 * THREADS, ...
            let mut i = th;
            while i < ENTRIES {
                entries[i].fire();
                i += THREADS;
            }

            // Total number of events this thread received from `poll`.
            let mut n = 0;

            while total.load(SeqCst) < NUM {
                // A poll timeout is necessary here because there may be more
                // than one thread blocked in `poll` when the final wakeup
                // notification arrives (and only notifies one thread).
                n += poll
                    .poll(&mut events, Some(Duration::from_millis(100)))
                    .unwrap();

                let mut num_this_tick = 0;

                for event in &events {
                    let e = &entries[event.token().0];

                    // Try to claim one slot of this entry's budget. Retry on
                    // contention with the freshly observed value; give up
                    // once the budget of `PER_ENTRY` is used up.
                    let mut num = e.num.load(Relaxed);

                    while num < PER_ENTRY {
                        // `compare_exchange` with Relaxed/Relaxed is the
                        // documented replacement for the deprecated
                        // `compare_and_swap(.., Relaxed)`.
                        match e.num.compare_exchange(num, num + 1, Relaxed, Relaxed) {
                            Ok(_) => {
                                num_this_tick += 1;
                                e.fire();
                                break;
                            }
                            Err(actual) => num = actual,
                        }
                    }
                }

                total.fetch_add(num_this_tick, SeqCst);
            }

            n
        }));
    }

    // Wait for every poller; a panic in any thread fails the test here.
    for handle in threads {
        handle.join().unwrap();
    }

    for entry in entries.iter() {
        assert_eq!(PER_ENTRY, entry.num.load(Relaxed));
    }
}
276
/// Polls with an `Events` buffer (capacity 4) that is smaller than the number
/// of registrations (`N` = 8) while one thread per registration continuously
/// flags it readable.
///
/// After the setter threads are told to stop, each sets readiness one last
/// time; the test then expects to observe an event for every token within
/// five further polls and panics otherwise.
#[test]
fn with_small_events_collection() {
    const N: usize = 8;
    const ITER: usize = 1_000;

    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::Ordering::{Acquire, Release};
    use std::sync::{Arc, Barrier};
    use std::thread;

    let poll = Poll::new().unwrap();
    // Keeps the registrations alive until the end of the test.
    let mut registrations = vec![];

    // N setter threads plus the polling (main) thread.
    let barrier = Arc::new(Barrier::new(N + 1));
    let done = Arc::new(AtomicBool::new(false));

    for i in 0..N {
        let (registration, set_readiness) = Registration::new2();
        poll.register(&registration, Token(i), Ready::readable(), PollOpt::edge())
            .unwrap();

        registrations.push(registration);

        let barrier = barrier.clone();
        let done = done.clone();

        thread::spawn(move || {
            barrier.wait();

            // Keep flagging readiness until the main thread says stop.
            while !done.load(Acquire) {
                set_readiness.set_readiness(Ready::readable()).unwrap();
            }

            // Set one last time
            set_readiness.set_readiness(Ready::readable()).unwrap();
        });
    }

    // Deliberately smaller than N.
    let mut events = Events::with_capacity(4);

    barrier.wait();

    for _ in 0..ITER {
        poll.poll(&mut events, None).unwrap();
    }

    done.store(true, Release);

    // Which tokens have been seen since the setters were told to stop.
    let mut final_ready = vec![false; N];

    for _ in 0..5 {
        poll.poll(&mut events, None).unwrap();

        for event in &events {
            final_ready[event.token().0] = true;
        }

        if final_ready.iter().all(|v| *v) {
            return;
        }

        thread::sleep(Duration::from_millis(10));
    }

    panic!("dead lock?");
}
343 }
344
/// Creates and registers a large number of registrations on the main thread
/// and hands them, round-robin, to worker threads that set readiness and then
/// drop both halves. After each full round the main thread also creates,
/// fires and drops one registration itself and polls once.
#[test]
fn drop_registration_from_non_main_thread() {
    use std::sync::mpsc::channel;
    use std::thread;

    const THREADS: usize = 8;
    const ITERS: usize = 50_000;

    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(1024);

    // Spawn the workers; each owns the receiving end of its own channel and
    // runs until the matching sender is dropped.
    let senders: Vec<_> = (0..THREADS)
        .map(|_| {
            let (tx, rx) = channel::<(Registration, SetReadiness)>();

            thread::spawn(move || {
                while let Ok((registration, set_readiness)) = rx.recv() {
                    let _ = set_readiness.set_readiness(Ready::readable());
                    drop(registration);
                    drop(set_readiness);
                }
            });

            tx
        })
        .collect();

    // Every registration gets a fresh token value.
    let mut next_token = 0;

    for round in 0..ITERS {
        // Round-robin over the worker threads.
        let slot = round % THREADS;

        let (registration, set_readiness) = Registration::new2();
        registration
            .register(&poll, Token(next_token), Ready::readable(), PollOpt::edge())
            .unwrap();
        next_token += 1;
        let _ = senders[slot].send((registration, set_readiness));

        // Only after the last worker of a round has been fed ...
        if slot + 1 != THREADS {
            continue;
        }

        // ... fire and drop one registration on this thread as well,
        let (registration, set_readiness) = Registration::new2();
        registration
            .register(&poll, Token(next_token), Ready::readable(), PollOpt::edge())
            .unwrap();
        next_token += 1;
        let _ = set_readiness.set_readiness(Ready::readable());
        drop(registration);
        drop(set_readiness);

        // and poll once.
        thread::park_timeout(Duration::from_millis(0));
        let _ = poll.poll(&mut events, None).unwrap();
    }
}
395