1 #![cfg(unix)]
2 
3 extern crate curl;
4 extern crate mio;
5 extern crate mio_extras;
6 
7 use std::collections::HashMap;
8 use std::io::{Cursor, Read};
9 use std::time::Duration;
10 
11 use curl::easy::{Easy, List};
12 use curl::multi::Multi;
13 
// Unwraps a `Result`, yielding the `Ok` payload. On `Err` the test panics
// with the source text of the failing expression and the error's `Debug`
// output.
macro_rules! t {
    ($e:expr) => {
        match $e {
            Err(err) => panic!("{} failed with {:?}", stringify!($e), err),
            Ok(value) => value,
        }
    };
}
22 
23 use server::Server;
24 mod server;
25 
// Drives one plain GET through a `Multi` handle until no transfers remain,
// then checks that exactly one transfer completed and that it succeeded.
#[test]
fn smoke() {
    let m = Multi::new();
    let mut e = Easy::new();

    // Script the test server: the request it should see, then its reply.
    let s = Server::new();
    s.receive(
        "\
         GET / HTTP/1.1\r\n\
         Host: 127.0.0.1:$PORT\r\n\
         Accept: */*\r\n\
         \r\n",
    );
    s.send("HTTP/1.1 200 OK\r\n\r\n");

    t!(e.url(&s.url("/")));
    // Bound to `_e` (not `_`) so the returned handle lives until the end of
    // the test instead of being dropped immediately.
    let _e = t!(m.add(e));
    while t!(m.perform()) > 0 {
        t!(m.wait(&mut [], Duration::from_secs(1)));
    }

    // `perform` reaching zero only says nothing is running any more; the
    // per-transfer outcome is reported through the message queue, so check it
    // the same way `smoke2` does.
    let mut done = 0;
    m.messages(|msg| {
        msg.result().unwrap().unwrap();
        done += 1;
    });
    assert_eq!(done, 1);
}
47 
// Runs two independent GET transfers, each against its own test server, on a
// single `Multi` handle and expects both to be reported as successful.
#[test]
fn smoke2() {
    const REQUEST: &str = "\
         GET / HTTP/1.1\r\n\
         Host: 127.0.0.1:$PORT\r\n\
         Accept: */*\r\n\
         \r\n";
    const RESPONSE: &str = "HTTP/1.1 200 OK\r\n\r\n";

    // Both servers are scripted identically: expect the GET, answer 200.
    let script = |server: &Server| {
        server.receive(REQUEST);
        server.send(RESPONSE);
    };

    let multi = Multi::new();

    let first_server = Server::new();
    script(&first_server);

    let second_server = Server::new();
    script(&second_server);

    let mut first = Easy::new();
    t!(first.url(&first_server.url("/")));
    let _first_handle = t!(multi.add(first));
    let mut second = Easy::new();
    t!(second.url(&second_server.url("/")));
    let _second_handle = t!(multi.add(second));

    // Keep pumping until `perform` reports that no transfer is still running.
    loop {
        if t!(multi.perform()) == 0 {
            break;
        }
        t!(multi.wait(&mut [], Duration::from_secs(1)));
    }

    // Every queued message must carry a successful result; count them.
    let mut finished = 0;
    multi.messages(|message| {
        message.result().unwrap().unwrap();
        finished += 1;
    });
    assert_eq!(finished, 2);
}
90 
// Uploads a 128 KiB body with PUT while the `Multi` handle is driven from an
// external mio poll loop: curl's socket and timer callbacks forward their
// requests over a channel, and the loop below turns them into mio
// registrations and feeds readiness back through `Multi::action`.
#[test]
fn upload_lots() {
    use curl::multi::{Events, Socket, SocketEvents};

    // What the curl callbacks forward to the poll loop.
    #[derive(Debug)]
    enum Message {
        // Value handed to the timer callback; used as the next poll timeout.
        Timeout(Option<Duration>),
        // Arguments handed to the socket callback: socket, requested events
        // and the token currently associated with that socket.
        Wait(Socket, SocketEvents, usize),
    }

    let mut m = Multi::new();
    let poll = t!(mio::Poll::new());
    let (tx, rx) = mio_extras::channel::channel();
    let tx2 = tx.clone();
    t!(m.socket_function(move |socket, events, token| {
        t!(tx2.send(Message::Wait(socket, events, token)));
    }));
    t!(m.timer_function(move |dur| {
        t!(tx.send(Message::Timeout(dur)));
        true
    }));

    // Expected request: headers followed by 131071 `a` bytes and one `\n`,
    // i.e. exactly the 131072 bytes announced in Content-Length.
    let s = Server::new();
    s.receive(&format!(
        "\
         PUT / HTTP/1.1\r\n\
         Host: 127.0.0.1:$PORT\r\n\
         Accept: */*\r\n\
         Content-Length: 131072\r\n\
         \r\n\
         {}\n",
        "a".repeat(128 * 1024 - 1)
    ));
    s.send(
        "\
         HTTP/1.1 200 OK\r\n\
         \r\n",
    );

    // Upload payload matching the body the server was told to expect.
    let mut data = vec![b'a'; 128 * 1024 - 1];
    data.push(b'\n');
    let mut data = Cursor::new(data);
    let mut list = List::new();
    // NOTE(review): an empty `Expect:` header presumably suppresses curl's
    // automatic `Expect: 100-continue`, which the scripted request lacks.
    t!(list.append("Expect:"));
    let mut h = Easy::new();
    t!(h.url(&s.url("/")));
    t!(h.put(true));
    t!(h.read_function(move |buf| Ok(data.read(buf).unwrap())));
    t!(h.in_filesize(128 * 1024));
    t!(h.upload(true));
    t!(h.http_headers(list));

    // Token 0 is reserved for the callback channel; sockets get tokens >= 1.
    t!(poll.register(&rx, mio::Token(0), mio::Ready::all(), mio::PollOpt::level()));

    let e = t!(m.add(h));

    assert!(t!(m.perform()) > 0);
    let mut next_token = 1;
    let mut token_map = HashMap::new();
    let mut cur_timeout = None;
    let mut events = mio::Events::with_capacity(128);
    let mut running = true;

    while running {
        let n = t!(poll.poll(&mut events, cur_timeout));

        // Nothing became ready: the poll timed out, so let curl handle its
        // timeout; stop once it reports no running transfers.
        if n == 0 && t!(m.timeout()) == 0 {
            running = false;
        }

        for event in events.iter() {
            if event.token() == mio::Token(0) {
                // Drain every request the callbacks have queued so far.
                while let Ok(msg) = rx.try_recv() {
                    match msg {
                        Message::Timeout(dur) => cur_timeout = dur,
                        Message::Wait(socket, wanted, token) => {
                            if wanted.remove() {
                                token_map.remove(&token).unwrap();
                            } else {
                                let evented = mio::unix::EventedFd(&socket);
                                let mut interest = mio::Ready::none();
                                if wanted.input() {
                                    interest = interest | mio::Ready::readable();
                                }
                                if wanted.output() {
                                    interest = interest | mio::Ready::writable();
                                }
                                if token == 0 {
                                    // A token of 0 is treated as "no token
                                    // assigned yet": allocate one, tell curl
                                    // about it and register the socket.
                                    let token = next_token;
                                    next_token += 1;
                                    t!(m.assign(socket, token));
                                    token_map.insert(token, socket);
                                    t!(poll.register(
                                        &evented,
                                        mio::Token(token),
                                        interest,
                                        mio::PollOpt::level()
                                    ));
                                } else {
                                    // Known socket: only its interest set changed.
                                    t!(poll.reregister(
                                        &evented,
                                        mio::Token(token),
                                        interest,
                                        mio::PollOpt::level()
                                    ));
                                }
                            }
                        }
                    }
                }
                continue;
            }

            // Readiness on a curl socket: translate it and hand it to curl.
            let key: usize = event.token().into();
            let socket = token_map[&key];
            let mut fired = Events::new();
            if event.kind().is_readable() {
                fired.input(true);
            }
            if event.kind().is_writable() {
                fired.output(true);
            }
            if event.kind().is_error() {
                fired.error(true);
            }
            let remaining = t!(m.action(socket, &fired));
            if remaining == 0 {
                running = false;
            }
        }
    }

    // Exactly one transfer must have completed, and it must have succeeded.
    let mut done = 0;
    m.messages(|m| {
        m.result().unwrap().unwrap();
        done += 1;
    });
    assert_eq!(done, 1);

    let mut e = t!(m.remove(e));
    assert_eq!(t!(e.response_code()), 200);
}
237 
// Exercises `Multi::wait` with caller-supplied raw file descriptors. Restricted to Linux: the
// poll(2) system call that curl uses underneath apparently behaves differently on other
// platforms, which makes the test fail there.
#[cfg(target_os = "linux")]
#[test]
fn waitfds() {
    use curl::multi::WaitFd;
    use std::fs::File;
    use std::os::unix::io::AsRawFd;

    let filenames = ["/dev/null", "/dev/zero", "/dev/urandom"];

    // Open every device; the `File`s must stay alive while their raw
    // descriptors are being waited on.
    let mut files: Vec<File> = Vec::with_capacity(filenames.len());
    for filename in filenames.iter() {
        files.push(File::open(filename).unwrap());
    }

    // One read-interest `WaitFd` per opened file.
    let mut waitfds: Vec<WaitFd> = Vec::with_capacity(files.len());
    for file in &files {
        let mut waitfd = WaitFd::new();
        waitfd.set_fd(file.as_raw_fd());
        waitfd.poll_on_read(true);
        waitfds.push(waitfd);
    }

    let m = Multi::new();
    let ready = t!(m.wait(&mut waitfds, Duration::from_secs(1)));
    assert_eq!(ready, 3);
    for waitfd in waitfds {
        assert!(waitfd.received_read());
    }
}
270