1 use {expect_events, sleep_ms};
2 use mio::{channel, Events, Poll, PollOpt, Ready, Token};
3 use mio::event::Event;
4 use std::sync::mpsc::TryRecvError;
5 use std::thread;
6 use std::time::Duration;
7 
/// Exercises a channel receiver registered with `PollOpt::edge()`.
///
/// The test expects exactly one readable event per `send`, no repeated event
/// while a message is left unread, no event after the message is read, and a
/// final readable event followed by `TryRecvError::Disconnected` once the
/// sender is dropped.
#[test]
pub fn test_poll_channel_edge() {
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(1024);
    let (tx, rx) = channel::channel();

    poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge()).unwrap();

    // Nothing has been sent yet, so the poll is expected to time out empty.
    let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
    assert_eq!(0, num);

    // Push the first value.
    tx.send("hello").unwrap();

    // The send is expected to surface as exactly one readable event.
    let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
    assert_eq!(1, num);

    let event = events.get(0).unwrap();
    assert_eq!(event.token(), Token(123));
    assert_eq!(event.readiness(), Ready::readable());

    // The message is still unread, but polling again is expected to report
    // nothing.
    let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
    assert_eq!(0, num);

    // Read the value.
    assert_eq!("hello", rx.try_recv().unwrap());

    // The queue is empty now; still nothing to report.
    let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
    assert_eq!(0, num);

    // Push a second value.
    tx.send("goodbye").unwrap();

    // The new send is expected to produce a fresh event.
    let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
    assert_eq!(1, num);

    let event = events.get(0).unwrap();
    assert_eq!(event.token(), Token(123));
    assert_eq!(event.readiness(), Ready::readable());

    // Read the value and check the payload, as is done for the first message,
    // so that a wrong or stale message cannot slip through unnoticed.
    assert_eq!("goodbye", rx.try_recv().unwrap());

    // Drop the sender half.
    drop(tx);

    // The disconnect is expected to be reported as a readable event.
    let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
    assert_eq!(1, num);

    let event = events.get(0).unwrap();
    assert_eq!(event.token(), Token(123));
    assert_eq!(event.readiness(), Ready::readable());

    // With the queue drained and the sender gone, receiving must report the
    // disconnect; anything else fails the test.
    match rx.try_recv() {
        Err(TryRecvError::Disconnected) => {}
        other => panic!("unexpected value {:?}", other),
    }
}
72 
/// Exercises a channel receiver registered with
/// `PollOpt::edge() | PollOpt::oneshot()`.
///
/// After the first event the test expects no further events until the
/// receiver is reregistered. Each reregistration is expected to yield one
/// event while a message is pending and none once the queue has been drained.
#[test]
pub fn test_poll_channel_oneshot() {
    // Polls once with the 300 ms timeout used throughout this test and
    // returns the reported event count.
    fn poll_once(poll: &Poll, events: &mut Events) -> usize {
        poll.poll(events, Some(Duration::from_millis(300))).unwrap()
    }

    // Checks that the first stored event is a readable event for `Token(123)`.
    fn assert_readable(events: &Events) {
        let first = events.get(0).unwrap();
        assert_eq!(first.token(), Token(123));
        assert_eq!(first.readiness(), Ready::readable());
    }

    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(1024);
    let (tx, rx) = channel::channel();

    poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();

    // Nothing sent yet: no events expected.
    assert_eq!(0, poll_once(&poll, &mut events));

    // The first send is expected to produce exactly one readable event.
    tx.send("hello").unwrap();
    assert_eq!(1, poll_once(&poll, &mut events));
    assert_readable(&events);

    // No repeated event while the message is still unread.
    assert_eq!(0, poll_once(&poll, &mut events));

    // Consume the message; still nothing to report afterwards.
    assert_eq!("hello", rx.try_recv().unwrap());
    assert_eq!(0, poll_once(&poll, &mut events));

    // A second send without reregistering is expected to stay silent.
    tx.send("goodbye").unwrap();
    assert_eq!(0, poll_once(&poll, &mut events));

    // Each reregistration is expected to re-trigger the notification while
    // the message is still pending.
    for _ in 0..3 {
        poll.reregister(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();

        assert_eq!(1, poll_once(&poll, &mut events));
        assert_readable(&events);
    }

    // Drain the pending message.
    assert_eq!("goodbye", rx.try_recv().unwrap());

    // With the queue empty, reregistering is expected to produce no event.
    // This is checked twice.
    for _ in 0..2 {
        poll.reregister(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();

        assert_eq!(0, poll_once(&poll, &mut events));
    }
}
142 
/// Exercises a channel receiver registered with `PollOpt::level()`.
///
/// While a message is left unread, every poll is expected to report the same
/// readable event. Once the message is read, polling is expected to report
/// nothing.
#[test]
pub fn test_poll_channel_level() {
    // Timeout shared by every poll in this test.
    let wait = Some(Duration::from_millis(300));

    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(1024);
    let (tx, rx) = channel::channel();

    poll.register(&rx, Token(123), Ready::readable(), PollOpt::level()).unwrap();

    // Nothing sent yet: no events expected.
    assert_eq!(0, poll.poll(&mut events, wait).unwrap());

    // Queue one message and leave it unread.
    tx.send("hello").unwrap();

    // Repeated polls are each expected to report the pending message.
    for i in 0..5 {
        let count = poll.poll(&mut events, wait).unwrap();
        assert!(count == 1, "actually got {} on iteration {}", count, i);

        let first = events.get(0).unwrap();
        assert_eq!(first.token(), Token(123));
        assert_eq!(first.readiness(), Ready::readable());
    }

    // Consume the message.
    assert_eq!("hello", rx.try_recv().unwrap());

    // The queue is empty, so no event is expected any more.
    assert_eq!(0, poll.poll(&mut events, wait).unwrap());
}
175 
/// Registers a channel receiver for writable interest only and checks that
/// no events are reported, either before or after a message is sent.
#[test]
pub fn test_poll_channel_writable() {
    // Timeout shared by both polls in this test.
    let wait = Some(Duration::from_millis(300));

    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(1024);
    let (tx, rx) = channel::channel();

    poll.register(&rx, Token(123), Ready::writable(), PollOpt::edge()).unwrap();

    // Before any send: no events expected.
    assert_eq!(0, poll.poll(&mut events, wait).unwrap());

    // Queue a message.
    tx.send("hello").unwrap();

    // After the send: still no events expected for a writable-only interest.
    assert_eq!(0, poll.poll(&mut events, wait).unwrap());
}
195 
/// Sends a message, drops the registered receiver before polling, and checks
/// that the subsequent poll reports no events.
#[test]
pub fn test_dropping_receive_before_poll() {
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(1024);
    let (tx, rx) = channel::channel();

    poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge()).unwrap();

    // Queue a message, then discard the receiving half before polling.
    tx.send("hello").unwrap();
    drop(rx);

    // With the receiver gone, the poll is expected to time out empty.
    let reported = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
    assert_eq!(0, reported);
}
214 
/// Registers a TCP listener, a channel receiver and a connecting TCP stream
/// on one `Poll`, then expects events for the listener (`Token(0)`) and the
/// channel (`Token(1)`).
#[test]
pub fn test_mixing_channel_with_socket() {
    use mio::net::{TcpListener, TcpStream};

    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(1024);
    let (tx, rx) = channel::channel();

    // Bind the listener to port 0 on the loopback address.
    let addr = "127.0.0.1:0".parse().unwrap();
    let listener = TcpListener::bind(&addr).unwrap();

    // Register the listener and the channel receiver under distinct tokens.
    poll.register(&listener, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
    poll.register(&rx, Token(1), Ready::readable(), PollOpt::edge()).unwrap();

    // Queue a message on the channel.
    tx.send("hello").unwrap();

    // Connect a TCP stream to the listener and register it as well.
    let target = listener.local_addr().unwrap();
    let stream = TcpStream::connect(&target).unwrap();
    poll.register(&stream, Token(2), Ready::readable(), PollOpt::edge()).unwrap();

    // Sleep a bit to ensure the connection arrives at the listener.
    sleep_ms(250);

    // NOTE(review): the expectations carry `Ready::empty()`; presumably
    // `expect_events` then matches on the token alone - confirm against the
    // helper's implementation.
    let expected = vec![
        Event::new(Ready::empty(), Token(0)),
        Event::new(Ready::empty(), Token(1)),
    ];
    expect_events(&poll, &mut events, 2, expected);
}
247 
/// Repeatedly sends from several background threads while this thread is
/// polling without a timeout, and checks that every message is eventually
/// received.
///
/// Each of the `ITERATIONS` rounds creates a fresh channel, registers its
/// receiver under `Token(0)` and spawns `THREADS` senders that each sleep for
/// 50 ms before sending one `"ping"`.
#[test]
pub fn test_sending_from_other_thread_while_polling() {
    const ITERATIONS: usize = 20;
    const THREADS: usize = 5;

    // The same `Poll` and `Events` are reused across all rounds.
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(1024);

    for _ in 0..ITERATIONS {
        let (tx, rx) = channel::channel();
        poll.register(&rx, Token(0), Ready::readable(), PollOpt::edge()).unwrap();

        // Each sender thread owns its own clone of the sending half.
        for _ in 0..THREADS {
            let sender = tx.clone();

            thread::spawn(move || {
                sleep_ms(50);
                sender.send("ping").unwrap();
            });
        }

        let mut received = 0;

        // Keep polling, with no timeout, until one message per thread has
        // been drained.
        while received < THREADS {
            let count = poll.poll(&mut events, None).unwrap();

            // A poll that reports nothing just goes around again.
            if count == 0 {
                continue;
            }

            // Only the channel is registered, so a single `Token(0)` event
            // is expected.
            assert_eq!(1, count);
            assert_eq!(events.get(0).unwrap().token(), Token(0));

            // Drain everything currently queued; several sends may sit
            // behind a single event.
            while rx.try_recv().is_ok() {
                received += 1;
            }
        }
    }
}
286