1 use {expect_events, sleep_ms};
2 use mio::{channel, Events, Poll, PollOpt, Ready, Token};
3 use mio::event::Event;
4 use std::sync::mpsc::TryRecvError;
5 use std::thread;
6 use std::time::Duration;
7
8 #[test]
test_poll_channel_edge()9 pub fn test_poll_channel_edge() {
10 let poll = Poll::new().unwrap();
11 let mut events = Events::with_capacity(1024);
12 let (tx, rx) = channel::channel();
13
14 poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge()).unwrap();
15
16 // Wait, but nothing should happen
17 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
18 assert_eq!(0, num);
19
20 // Push the value
21 tx.send("hello").unwrap();
22
23 // Polling will contain the event
24 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
25 assert_eq!(1, num);
26
27 let event = events.get(0).unwrap();
28 assert_eq!(event.token(), Token(123));
29 assert_eq!(event.readiness(), Ready::readable());
30
31 // Poll again and there should be no events
32 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
33 assert_eq!(0, num);
34
35 // Read the value
36 assert_eq!("hello", rx.try_recv().unwrap());
37
38 // Poll again, nothing
39 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
40 assert_eq!(0, num);
41
42 // Push a value
43 tx.send("goodbye").unwrap();
44
45 // Have an event
46 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
47 assert_eq!(1, num);
48
49 let event = events.get(0).unwrap();
50 assert_eq!(event.token(), Token(123));
51 assert_eq!(event.readiness(), Ready::readable());
52
53 // Read the value
54 rx.try_recv().unwrap();
55
56 // Drop the sender half
57 drop(tx);
58
59 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
60 assert_eq!(1, num);
61
62 let event = events.get(0).unwrap();
63 assert_eq!(event.token(), Token(123));
64 assert_eq!(event.readiness(), Ready::readable());
65
66 match rx.try_recv() {
67 Err(TryRecvError::Disconnected) => {}
68 no => panic!("unexpected value {:?}", no),
69 }
70
71 }
72
73 #[test]
test_poll_channel_oneshot()74 pub fn test_poll_channel_oneshot() {
75 let poll = Poll::new().unwrap();
76 let mut events = Events::with_capacity(1024);
77 let (tx, rx) = channel::channel();
78
79 poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
80
81 // Wait, but nothing should happen
82 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
83 assert_eq!(0, num);
84
85 // Push the value
86 tx.send("hello").unwrap();
87
88 // Polling will contain the event
89 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
90 assert_eq!(1, num);
91
92 let event = events.get(0).unwrap();
93 assert_eq!(event.token(), Token(123));
94 assert_eq!(event.readiness(), Ready::readable());
95
96 // Poll again and there should be no events
97 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
98 assert_eq!(0, num);
99
100 // Read the value
101 assert_eq!("hello", rx.try_recv().unwrap());
102
103 // Poll again, nothing
104 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
105 assert_eq!(0, num);
106
107 // Push a value
108 tx.send("goodbye").unwrap();
109
110 // Poll again, nothing
111 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
112 assert_eq!(0, num);
113
114 // Reregistering will re-trigger the notification
115 for _ in 0..3 {
116 poll.reregister(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
117
118 // Have an event
119 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
120 assert_eq!(1, num);
121
122 let event = events.get(0).unwrap();
123 assert_eq!(event.token(), Token(123));
124 assert_eq!(event.readiness(), Ready::readable());
125 }
126
127 // Get the value
128 assert_eq!("goodbye", rx.try_recv().unwrap());
129
130 poll.reregister(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
131
132 // Have an event
133 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
134 assert_eq!(0, num);
135
136 poll.reregister(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
137
138 // Have an event
139 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
140 assert_eq!(0, num);
141 }
142
143 #[test]
test_poll_channel_level()144 pub fn test_poll_channel_level() {
145 let poll = Poll::new().unwrap();
146 let mut events = Events::with_capacity(1024);
147 let (tx, rx) = channel::channel();
148
149 poll.register(&rx, Token(123), Ready::readable(), PollOpt::level()).unwrap();
150
151 // Wait, but nothing should happen
152 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
153 assert_eq!(0, num);
154
155 // Push the value
156 tx.send("hello").unwrap();
157
158 // Polling will contain the event
159 for i in 0..5 {
160 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
161 assert!(1 == num, "actually got {} on iteration {}", num, i);
162
163 let event = events.get(0).unwrap();
164 assert_eq!(event.token(), Token(123));
165 assert_eq!(event.readiness(), Ready::readable());
166 }
167
168 // Read the value
169 assert_eq!("hello", rx.try_recv().unwrap());
170
171 // Wait, but nothing should happen
172 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
173 assert_eq!(0, num);
174 }
175
176 #[test]
test_poll_channel_writable()177 pub fn test_poll_channel_writable() {
178 let poll = Poll::new().unwrap();
179 let mut events = Events::with_capacity(1024);
180 let (tx, rx) = channel::channel();
181
182 poll.register(&rx, Token(123), Ready::writable(), PollOpt::edge()).unwrap();
183
184 // Wait, but nothing should happen
185 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
186 assert_eq!(0, num);
187
188 // Push the value
189 tx.send("hello").unwrap();
190
191 // Wait, but nothing should happen
192 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
193 assert_eq!(0, num);
194 }
195
196 #[test]
test_dropping_receive_before_poll()197 pub fn test_dropping_receive_before_poll() {
198 let poll = Poll::new().unwrap();
199 let mut events = Events::with_capacity(1024);
200 let (tx, rx) = channel::channel();
201
202 poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge()).unwrap();
203
204 // Push the value
205 tx.send("hello").unwrap();
206
207 // Drop the receive end
208 drop(rx);
209
210 // Wait, but nothing should happen
211 let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
212 assert_eq!(0, num);
213 }
214
215 #[test]
test_mixing_channel_with_socket()216 pub fn test_mixing_channel_with_socket() {
217 use mio::net::{TcpListener, TcpStream};
218
219 let poll = Poll::new().unwrap();
220 let mut events = Events::with_capacity(1024);
221 let (tx, rx) = channel::channel();
222
223 // Create the listener
224 let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
225
226 // Register the listener with `Poll`
227 poll.register(&l, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
228 poll.register(&rx, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
229
230 // Push a value onto the channel
231 tx.send("hello").unwrap();
232
233 // Connect a TCP socket
234 let s1 = TcpStream::connect(&l.local_addr().unwrap()).unwrap();
235
236 // Register the socket
237 poll.register(&s1, Token(2), Ready::readable(), PollOpt::edge()).unwrap();
238
239 // Sleep a bit to ensure it arrives at dest
240 sleep_ms(250);
241
242 expect_events(&poll, &mut events, 2, vec![
243 Event::new(Ready::empty(), Token(0)),
244 Event::new(Ready::empty(), Token(1)),
245 ]);
246 }
247
248 #[test]
test_sending_from_other_thread_while_polling()249 pub fn test_sending_from_other_thread_while_polling() {
250 const ITERATIONS: usize = 20;
251 const THREADS: usize = 5;
252
253 // Make sure to run multiple times
254 let poll = Poll::new().unwrap();
255 let mut events = Events::with_capacity(1024);
256
257 for _ in 0..ITERATIONS {
258 let (tx, rx) = channel::channel();
259 poll.register(&rx, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
260
261 for _ in 0..THREADS {
262 let tx = tx.clone();
263
264 thread::spawn(move || {
265 sleep_ms(50);
266 tx.send("ping").unwrap();
267 });
268 }
269
270 let mut recv = 0;
271
272 while recv < THREADS {
273 let num = poll.poll(&mut events, None).unwrap();
274
275 if num != 0 {
276 assert_eq!(1, num);
277 assert_eq!(events.get(0).unwrap().token(), Token(0));
278
279 while let Ok(_) = rx.try_recv() {
280 recv += 1;
281 }
282 }
283 }
284 }
285 }
286