1 use crate::expect_events;
2 use mio::event::Event;
3 use mio::{Events, Poll, PollOpt, Ready, Token};
4 use mio_extras::channel;
5 use std::sync::mpsc::TryRecvError;
6 use std::thread;
7 use std::time::Duration;
8
/// Checks an edge-triggered, readable registration of a channel receiver.
///
/// The test expects exactly one readable event per send (and one when the
/// sender is dropped), and no events from repeated polls in between.
#[test]
pub fn test_poll_channel_edge() {
    // Polls once with a 300ms timeout and returns the number of events.
    fn poll_once(poll: &Poll, events: &mut Events) -> usize {
        poll.poll(events, Some(Duration::from_millis(300))).unwrap()
    }

    // Asserts that the first buffered event is readable for `Token(123)`.
    fn assert_first_is_readable(events: &Events) {
        let first = events.iter().next().unwrap();
        assert_eq!(first.token(), Token(123));
        assert_eq!(first.readiness(), Ready::readable());
    }

    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(1024);
    let (sender, receiver) = channel::channel();

    poll.register(&receiver, Token(123), Ready::readable(), PollOpt::edge())
        .unwrap();

    // Nothing has been sent yet, so the poll is expected to time out empty.
    assert_eq!(0, poll_once(&poll, &mut events));

    // The first message must surface as a single readable event.
    sender.send("hello").unwrap();
    assert_eq!(1, poll_once(&poll, &mut events));
    assert_first_is_readable(&events);

    // Polling again without any new send must not report anything.
    assert_eq!(0, poll_once(&poll, &mut events));

    // Take the message out of the channel.
    assert_eq!("hello", receiver.try_recv().unwrap());

    // Still nothing to report after the read.
    assert_eq!(0, poll_once(&poll, &mut events));

    // A second message produces a fresh event.
    sender.send("goodbye").unwrap();
    assert_eq!(1, poll_once(&poll, &mut events));
    assert_first_is_readable(&events);

    // Drain the second message.
    receiver.try_recv().unwrap();

    // Dropping the sending half is expected to wake the receiver once more.
    drop(sender);
    assert_eq!(1, poll_once(&poll, &mut events));
    assert_first_is_readable(&events);

    // With the sender gone and the queue empty, only `Disconnected` is valid.
    match receiver.try_recv() {
        Err(TryRecvError::Disconnected) => {}
        other => panic!("unexpected value {:?}", other),
    }
}
85
/// Checks an edge-triggered, oneshot, readable registration of a channel
/// receiver.
///
/// After the first event, the test expects no further events until the
/// receiver is reregistered, and expects a reregistration to produce an event
/// only while a message is still queued.
#[test]
pub fn test_poll_channel_oneshot() {
    // Polls once with a 300ms timeout and returns the number of events.
    fn poll_once(poll: &Poll, events: &mut Events) -> usize {
        poll.poll(events, Some(Duration::from_millis(300))).unwrap()
    }

    // Asserts that the first buffered event is readable for `Token(123)`.
    fn assert_first_is_readable(events: &Events) {
        let first = events.iter().next().unwrap();
        assert_eq!(first.token(), Token(123));
        assert_eq!(first.readiness(), Ready::readable());
    }

    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(1024);
    let (sender, receiver) = channel::channel();

    let oneshot_edge = PollOpt::edge() | PollOpt::oneshot();

    // Reregisters the receiver with the same token, interest and options.
    let rearm = || {
        poll.reregister(&receiver, Token(123), Ready::readable(), oneshot_edge)
            .unwrap()
    };

    poll.register(&receiver, Token(123), Ready::readable(), oneshot_edge)
        .unwrap();

    // Nothing has been sent yet, so the poll is expected to time out empty.
    assert_eq!(0, poll_once(&poll, &mut events));

    // The first message must surface as a single readable event.
    sender.send("hello").unwrap();
    assert_eq!(1, poll_once(&poll, &mut events));
    assert_first_is_readable(&events);

    // Polling again without rearming must not report anything.
    assert_eq!(0, poll_once(&poll, &mut events));

    // Take the message out of the channel.
    assert_eq!("hello", receiver.try_recv().unwrap());

    // Still nothing to report after the read.
    assert_eq!(0, poll_once(&poll, &mut events));

    // A new message alone must not produce an event: the registration has
    // not been rearmed since it fired.
    sender.send("goodbye").unwrap();
    assert_eq!(0, poll_once(&poll, &mut events));

    // Each reregistration must re-trigger the notification while "goodbye"
    // is still sitting in the queue.
    for _ in 0..3 {
        rearm();
        assert_eq!(1, poll_once(&poll, &mut events));
        assert_first_is_readable(&events);
    }

    // Drain the queue.
    assert_eq!("goodbye", receiver.try_recv().unwrap());

    // With the queue empty, rearming must not produce an event.
    rearm();
    assert_eq!(0, poll_once(&poll, &mut events));

    // Same expectation on a second rearm.
    rearm();
    assert_eq!(0, poll_once(&poll, &mut events));
}
195
/// Checks a level-triggered, readable registration of a channel receiver.
///
/// While a message is queued the test expects every poll to report one
/// readable event; once the message is received it expects none.
#[test]
pub fn test_poll_channel_level() {
    // Polls once with a 300ms timeout and returns the number of events.
    fn poll_once(poll: &Poll, events: &mut Events) -> usize {
        poll.poll(events, Some(Duration::from_millis(300))).unwrap()
    }

    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(1024);
    let (sender, receiver) = channel::channel();

    poll.register(&receiver, Token(123), Ready::readable(), PollOpt::level())
        .unwrap();

    // Nothing has been sent yet, so the poll is expected to time out empty.
    assert_eq!(0, poll_once(&poll, &mut events));

    sender.send("hello").unwrap();

    // The unread message must be reported on every one of these polls.
    for round in 0..5 {
        let count = poll_once(&poll, &mut events);
        assert!(1 == count, "actually got {} on iteration {}", count, round);

        let first = events.iter().next().unwrap();
        assert_eq!(first.token(), Token(123));
        assert_eq!(first.readiness(), Ready::readable());
    }

    // Take the message out of the channel.
    assert_eq!("hello", receiver.try_recv().unwrap());

    // With the queue empty the poll is expected to time out again.
    assert_eq!(0, poll_once(&poll, &mut events));
}
235
/// Checks that a receiver registered only for writable interest produces no
/// events, both before and after a message is sent.
#[test]
pub fn test_poll_channel_writable() {
    // Polls once with a 300ms timeout and returns the number of events.
    fn poll_once(poll: &Poll, events: &mut Events) -> usize {
        poll.poll(events, Some(Duration::from_millis(300))).unwrap()
    }

    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(1024);
    let (sender, receiver) = channel::channel();

    poll.register(&receiver, Token(123), Ready::writable(), PollOpt::edge())
        .unwrap();

    // Nothing has been sent yet, so the poll is expected to time out empty.
    assert_eq!(0, poll_once(&poll, &mut events));

    sender.send("hello").unwrap();

    // The send must not show up as an event under writable-only interest.
    assert_eq!(0, poll_once(&poll, &mut events));
}
260
/// Checks that no event is reported when the receiver is dropped after a
/// message was sent but before the first poll.
#[test]
pub fn test_dropping_receive_before_poll() {
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(1024);
    let (sender, receiver) = channel::channel();

    poll.register(&receiver, Token(123), Ready::readable(), PollOpt::edge())
        .unwrap();

    // Queue a message, then discard the receiving half before polling.
    sender.send("hello").unwrap();
    drop(receiver);

    // The poll is expected to time out without reporting anything.
    let timeout = Some(Duration::from_millis(300));
    let count = poll.poll(&mut events, timeout).unwrap();
    assert_eq!(0, count);
}
282
/// Registers a TCP listener, a channel receiver and a connected TCP stream on
/// one `Poll`, then expects events for the listener (`Token(0)`) and the
/// channel (`Token(1)`).
#[test]
pub fn test_mixing_channel_with_socket() {
    use mio::net::{TcpListener, TcpStream};

    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(1024);
    let (sender, receiver) = channel::channel();

    // Bind a listener on a loopback address with port 0.
    let bind_addr = "127.0.0.1:0".parse().unwrap();
    let listener = TcpListener::bind(&bind_addr).unwrap();

    // Register the listener and the channel receiver with `Poll`.
    poll.register(&listener, Token(0), Ready::readable(), PollOpt::edge())
        .unwrap();
    poll.register(&receiver, Token(1), Ready::readable(), PollOpt::edge())
        .unwrap();

    // Queue a value on the channel.
    sender.send("hello").unwrap();

    // Connect a TCP socket to the listener and register it as well.
    let listen_addr = listener.local_addr().unwrap();
    let stream = TcpStream::connect(&listen_addr).unwrap();
    poll.register(&stream, Token(2), Ready::readable(), PollOpt::edge())
        .unwrap();

    // Give the connection a moment to reach the listener.
    thread::sleep(Duration::from_millis(250));

    // One expected entry per token; the readiness passed here is empty.
    let expected = vec![
        Event::new(Ready::empty(), Token(0)),
        Event::new(Ready::empty(), Token(1)),
    ];
    expect_events(&poll, &mut events, 2, expected);
}
323
/// Sends on the channel from several threads while the main thread is
/// blocked in `poll`, and checks that every message is eventually received.
///
/// Each iteration creates a fresh channel, registers its receiver under
/// `Token(0)`, spawns `THREADS` senders that each send one message after a
/// short sleep, and polls until all messages have been drained.
///
/// The sender threads are joined at the end of every iteration so that a
/// panic inside a sender (for example a failed `send`) fails the test instead
/// of being lost in a detached thread, and so that no sender outlives the
/// receiver it targets.
#[test]
pub fn test_sending_from_other_thread_while_polling() {
    const ITERATIONS: usize = 20;
    const THREADS: usize = 5;

    // Make sure to run multiple times
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(1024);

    for _ in 0..ITERATIONS {
        let (tx, rx) = channel::channel();
        poll.register(&rx, Token(0), Ready::readable(), PollOpt::edge())
            .unwrap();

        // Keep the join handles so sender failures can be observed below.
        let senders: Vec<_> = (0..THREADS)
            .map(|_| {
                let tx = tx.clone();

                thread::spawn(move || {
                    thread::sleep(Duration::from_millis(50));
                    tx.send("ping").unwrap();
                })
            })
            .collect();

        let mut recv = 0;

        while recv < THREADS {
            // No timeout: block until the channel reports readiness.
            let num = poll.poll(&mut events, None).unwrap();

            if num != 0 {
                assert_eq!(1, num);
                assert_eq!(events.iter().next().unwrap().token(), Token(0));

                // Drain everything currently queued; one event may cover
                // several messages.
                while rx.try_recv().is_ok() {
                    recv += 1;
                }
            }
        }

        // Every message has arrived, so each sender is done or about to
        // finish; propagate any panic that happened inside one.
        for sender in senders {
            sender.join().unwrap();
        }
    }
}
363