1 use crate::expect_events;
2 use mio::event::Event;
3 use mio::{Events, Poll, PollOpt, Ready, Token};
4 use mio_extras::channel;
5 use std::sync::mpsc::TryRecvError;
6 use std::thread;
7 use std::time::Duration;
8 
9 #[test]
test_poll_channel_edge()10 pub fn test_poll_channel_edge() {
11     let poll = Poll::new().unwrap();
12     let mut events = Events::with_capacity(1024);
13     let (tx, rx) = channel::channel();
14 
15     poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge())
16         .unwrap();
17 
18     // Wait, but nothing should happen
19     let num = poll
20         .poll(&mut events, Some(Duration::from_millis(300)))
21         .unwrap();
22     assert_eq!(0, num);
23 
24     // Push the value
25     tx.send("hello").unwrap();
26 
27     // Polling will contain the event
28     let num = poll
29         .poll(&mut events, Some(Duration::from_millis(300)))
30         .unwrap();
31     assert_eq!(1, num);
32 
33     let event = events.iter().next().unwrap();
34     assert_eq!(event.token(), Token(123));
35     assert_eq!(event.readiness(), Ready::readable());
36 
37     // Poll again and there should be no events
38     let num = poll
39         .poll(&mut events, Some(Duration::from_millis(300)))
40         .unwrap();
41     assert_eq!(0, num);
42 
43     // Read the value
44     assert_eq!("hello", rx.try_recv().unwrap());
45 
46     // Poll again, nothing
47     let num = poll
48         .poll(&mut events, Some(Duration::from_millis(300)))
49         .unwrap();
50     assert_eq!(0, num);
51 
52     // Push a value
53     tx.send("goodbye").unwrap();
54 
55     // Have an event
56     let num = poll
57         .poll(&mut events, Some(Duration::from_millis(300)))
58         .unwrap();
59     assert_eq!(1, num);
60 
61     let event = events.iter().next().unwrap();
62     assert_eq!(event.token(), Token(123));
63     assert_eq!(event.readiness(), Ready::readable());
64 
65     // Read the value
66     rx.try_recv().unwrap();
67 
68     // Drop the sender half
69     drop(tx);
70 
71     let num = poll
72         .poll(&mut events, Some(Duration::from_millis(300)))
73         .unwrap();
74     assert_eq!(1, num);
75 
76     let event = events.iter().next().unwrap();
77     assert_eq!(event.token(), Token(123));
78     assert_eq!(event.readiness(), Ready::readable());
79 
80     match rx.try_recv() {
81         Err(TryRecvError::Disconnected) => {}
82         no => panic!("unexpected value {:?}", no),
83     }
84 }
85 
86 #[test]
test_poll_channel_oneshot()87 pub fn test_poll_channel_oneshot() {
88     let poll = Poll::new().unwrap();
89     let mut events = Events::with_capacity(1024);
90     let (tx, rx) = channel::channel();
91 
92     poll.register(
93         &rx,
94         Token(123),
95         Ready::readable(),
96         PollOpt::edge() | PollOpt::oneshot(),
97     )
98     .unwrap();
99 
100     // Wait, but nothing should happen
101     let num = poll
102         .poll(&mut events, Some(Duration::from_millis(300)))
103         .unwrap();
104     assert_eq!(0, num);
105 
106     // Push the value
107     tx.send("hello").unwrap();
108 
109     // Polling will contain the event
110     let num = poll
111         .poll(&mut events, Some(Duration::from_millis(300)))
112         .unwrap();
113     assert_eq!(1, num);
114 
115     let event = events.iter().next().unwrap();
116     assert_eq!(event.token(), Token(123));
117     assert_eq!(event.readiness(), Ready::readable());
118 
119     // Poll again and there should be no events
120     let num = poll
121         .poll(&mut events, Some(Duration::from_millis(300)))
122         .unwrap();
123     assert_eq!(0, num);
124 
125     // Read the value
126     assert_eq!("hello", rx.try_recv().unwrap());
127 
128     // Poll again, nothing
129     let num = poll
130         .poll(&mut events, Some(Duration::from_millis(300)))
131         .unwrap();
132     assert_eq!(0, num);
133 
134     // Push a value
135     tx.send("goodbye").unwrap();
136 
137     // Poll again, nothing
138     let num = poll
139         .poll(&mut events, Some(Duration::from_millis(300)))
140         .unwrap();
141     assert_eq!(0, num);
142 
143     // Reregistering will re-trigger the notification
144     for _ in 0..3 {
145         poll.reregister(
146             &rx,
147             Token(123),
148             Ready::readable(),
149             PollOpt::edge() | PollOpt::oneshot(),
150         )
151         .unwrap();
152 
153         // Have an event
154         let num = poll
155             .poll(&mut events, Some(Duration::from_millis(300)))
156             .unwrap();
157         assert_eq!(1, num);
158 
159         let event = events.iter().next().unwrap();
160         assert_eq!(event.token(), Token(123));
161         assert_eq!(event.readiness(), Ready::readable());
162     }
163 
164     // Get the value
165     assert_eq!("goodbye", rx.try_recv().unwrap());
166 
167     poll.reregister(
168         &rx,
169         Token(123),
170         Ready::readable(),
171         PollOpt::edge() | PollOpt::oneshot(),
172     )
173     .unwrap();
174 
175     // Have an event
176     let num = poll
177         .poll(&mut events, Some(Duration::from_millis(300)))
178         .unwrap();
179     assert_eq!(0, num);
180 
181     poll.reregister(
182         &rx,
183         Token(123),
184         Ready::readable(),
185         PollOpt::edge() | PollOpt::oneshot(),
186     )
187     .unwrap();
188 
189     // Have an event
190     let num = poll
191         .poll(&mut events, Some(Duration::from_millis(300)))
192         .unwrap();
193     assert_eq!(0, num);
194 }
195 
196 #[test]
test_poll_channel_level()197 pub fn test_poll_channel_level() {
198     let poll = Poll::new().unwrap();
199     let mut events = Events::with_capacity(1024);
200     let (tx, rx) = channel::channel();
201 
202     poll.register(&rx, Token(123), Ready::readable(), PollOpt::level())
203         .unwrap();
204 
205     // Wait, but nothing should happen
206     let num = poll
207         .poll(&mut events, Some(Duration::from_millis(300)))
208         .unwrap();
209     assert_eq!(0, num);
210 
211     // Push the value
212     tx.send("hello").unwrap();
213 
214     // Polling will contain the event
215     for i in 0..5 {
216         let num = poll
217             .poll(&mut events, Some(Duration::from_millis(300)))
218             .unwrap();
219         assert!(1 == num, "actually got {} on iteration {}", num, i);
220 
221         let event = events.iter().next().unwrap();
222         assert_eq!(event.token(), Token(123));
223         assert_eq!(event.readiness(), Ready::readable());
224     }
225 
226     // Read the value
227     assert_eq!("hello", rx.try_recv().unwrap());
228 
229     // Wait, but nothing should happen
230     let num = poll
231         .poll(&mut events, Some(Duration::from_millis(300)))
232         .unwrap();
233     assert_eq!(0, num);
234 }
235 
236 #[test]
test_poll_channel_writable()237 pub fn test_poll_channel_writable() {
238     let poll = Poll::new().unwrap();
239     let mut events = Events::with_capacity(1024);
240     let (tx, rx) = channel::channel();
241 
242     poll.register(&rx, Token(123), Ready::writable(), PollOpt::edge())
243         .unwrap();
244 
245     // Wait, but nothing should happen
246     let num = poll
247         .poll(&mut events, Some(Duration::from_millis(300)))
248         .unwrap();
249     assert_eq!(0, num);
250 
251     // Push the value
252     tx.send("hello").unwrap();
253 
254     // Wait, but nothing should happen
255     let num = poll
256         .poll(&mut events, Some(Duration::from_millis(300)))
257         .unwrap();
258     assert_eq!(0, num);
259 }
260 
261 #[test]
test_dropping_receive_before_poll()262 pub fn test_dropping_receive_before_poll() {
263     let poll = Poll::new().unwrap();
264     let mut events = Events::with_capacity(1024);
265     let (tx, rx) = channel::channel();
266 
267     poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge())
268         .unwrap();
269 
270     // Push the value
271     tx.send("hello").unwrap();
272 
273     // Drop the receive end
274     drop(rx);
275 
276     // Wait, but nothing should happen
277     let num = poll
278         .poll(&mut events, Some(Duration::from_millis(300)))
279         .unwrap();
280     assert_eq!(0, num);
281 }
282 
283 #[test]
test_mixing_channel_with_socket()284 pub fn test_mixing_channel_with_socket() {
285     use mio::net::{TcpListener, TcpStream};
286 
287     let poll = Poll::new().unwrap();
288     let mut events = Events::with_capacity(1024);
289     let (tx, rx) = channel::channel();
290 
291     // Create the listener
292     let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
293 
294     // Register the listener with `Poll`
295     poll.register(&l, Token(0), Ready::readable(), PollOpt::edge())
296         .unwrap();
297     poll.register(&rx, Token(1), Ready::readable(), PollOpt::edge())
298         .unwrap();
299 
300     // Push a value onto the channel
301     tx.send("hello").unwrap();
302 
303     // Connect a TCP socket
304     let s1 = TcpStream::connect(&l.local_addr().unwrap()).unwrap();
305 
306     // Register the socket
307     poll.register(&s1, Token(2), Ready::readable(), PollOpt::edge())
308         .unwrap();
309 
310     // Sleep a bit to ensure it arrives at dest
311     thread::sleep(Duration::from_millis(250));
312 
313     expect_events(
314         &poll,
315         &mut events,
316         2,
317         vec![
318             Event::new(Ready::empty(), Token(0)),
319             Event::new(Ready::empty(), Token(1)),
320         ],
321     );
322 }
323 
324 #[test]
test_sending_from_other_thread_while_polling()325 pub fn test_sending_from_other_thread_while_polling() {
326     const ITERATIONS: usize = 20;
327     const THREADS: usize = 5;
328 
329     // Make sure to run multiple times
330     let poll = Poll::new().unwrap();
331     let mut events = Events::with_capacity(1024);
332 
333     for _ in 0..ITERATIONS {
334         let (tx, rx) = channel::channel();
335         poll.register(&rx, Token(0), Ready::readable(), PollOpt::edge())
336             .unwrap();
337 
338         for _ in 0..THREADS {
339             let tx = tx.clone();
340 
341             thread::spawn(move || {
342                 thread::sleep(Duration::from_millis(50));
343                 tx.send("ping").unwrap();
344             });
345         }
346 
347         let mut recv = 0;
348 
349         while recv < THREADS {
350             let num = poll.poll(&mut events, None).unwrap();
351 
352             if num != 0 {
353                 assert_eq!(1, num);
354                 assert_eq!(events.iter().next().unwrap().token(), Token(0));
355 
356                 while let Ok(_) = rx.try_recv() {
357                     recv += 1;
358                 }
359             }
360         }
361     }
362 }
363