1 use std::io;
2
3 use futures_util::future;
4 use tokio::net::TcpStream;
5
6 use super::Client;
7
8 #[tokio::test]
client_connect_uri_argument()9 async fn client_connect_uri_argument() {
10 let connector = tower::service_fn(|dst: http::Uri| {
11 assert_eq!(dst.scheme(), Some(&http::uri::Scheme::HTTP));
12 assert_eq!(dst.host(), Some("example.local"));
13 assert_eq!(dst.port(), None);
14 assert_eq!(dst.path(), "/", "path should be removed");
15
16 future::err::<TcpStream, _>(io::Error::new(io::ErrorKind::Other, "expect me"))
17 });
18
19 let client = Client::builder().build::<_, crate::Body>(connector);
20 let _ = client
21 .get("http://example.local/and/a/path".parse().unwrap())
22 .await
23 .expect_err("response should fail");
24 }
25
26 /*
27 // FIXME: re-implement tests with `async/await`
28 #[test]
29 fn retryable_request() {
30 let _ = pretty_env_logger::try_init();
31
32 let mut rt = Runtime::new().expect("new rt");
33 let mut connector = MockConnector::new();
34
35 let sock1 = connector.mock("http://mock.local");
36 let sock2 = connector.mock("http://mock.local");
37
38 let client = Client::builder()
39 .build::<_, crate::Body>(connector);
40
41 client.pool.no_timer();
42
43 {
44
45 let req = Request::builder()
46 .uri("http://mock.local/a")
47 .body(Default::default())
48 .unwrap();
49 let res1 = client.request(req);
50 let srv1 = poll_fn(|| {
51 try_ready!(sock1.read(&mut [0u8; 512]));
52 try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
53 Ok(Async::Ready(()))
54 }).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e));
55 rt.block_on(res1.join(srv1)).expect("res1");
56 }
57 drop(sock1);
58
59 let req = Request::builder()
60 .uri("http://mock.local/b")
61 .body(Default::default())
62 .unwrap();
63 let res2 = client.request(req)
64 .map(|res| {
65 assert_eq!(res.status().as_u16(), 222);
66 });
67 let srv2 = poll_fn(|| {
68 try_ready!(sock2.read(&mut [0u8; 512]));
69 try_ready!(sock2.write(b"HTTP/1.1 222 OK\r\nContent-Length: 0\r\n\r\n"));
70 Ok(Async::Ready(()))
71 }).map_err(|e: std::io::Error| panic!("srv2 poll_fn error: {}", e));
72
73 rt.block_on(res2.join(srv2)).expect("res2");
74 }
75
76 #[test]
77 fn conn_reset_after_write() {
78 let _ = pretty_env_logger::try_init();
79
80 let mut rt = Runtime::new().expect("new rt");
81 let mut connector = MockConnector::new();
82
83 let sock1 = connector.mock("http://mock.local");
84
85 let client = Client::builder()
86 .build::<_, crate::Body>(connector);
87
88 client.pool.no_timer();
89
90 {
91 let req = Request::builder()
92 .uri("http://mock.local/a")
93 .body(Default::default())
94 .unwrap();
95 let res1 = client.request(req);
96 let srv1 = poll_fn(|| {
97 try_ready!(sock1.read(&mut [0u8; 512]));
98 try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
99 Ok(Async::Ready(()))
100 }).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e));
101 rt.block_on(res1.join(srv1)).expect("res1");
102 }
103
104 let req = Request::builder()
105 .uri("http://mock.local/a")
106 .body(Default::default())
107 .unwrap();
108 let res2 = client.request(req);
109 let mut sock1 = Some(sock1);
110 let srv2 = poll_fn(|| {
111 // We purposefully keep the socket open until the client
112 // has written the second request, and THEN disconnect.
113 //
114 // Not because we expect servers to be jerks, but to trigger
115 // state where we write on an assumedly good connection, and
116 // only reset the close AFTER we wrote bytes.
117 try_ready!(sock1.as_mut().unwrap().read(&mut [0u8; 512]));
118 sock1.take();
119 Ok(Async::Ready(()))
120 }).map_err(|e: std::io::Error| panic!("srv2 poll_fn error: {}", e));
121 let err = rt.block_on(res2.join(srv2)).expect_err("res2");
122 assert!(err.is_incomplete_message(), "{:?}", err);
123 }
124
125 #[test]
126 fn checkout_win_allows_connect_future_to_be_pooled() {
127 let _ = pretty_env_logger::try_init();
128
129 let mut rt = Runtime::new().expect("new rt");
130 let mut connector = MockConnector::new();
131
132
133 let (tx, rx) = oneshot::channel::<()>();
134 let sock1 = connector.mock("http://mock.local");
135 let sock2 = connector.mock_fut("http://mock.local", rx);
136
137 let client = Client::builder()
138 .build::<_, crate::Body>(connector);
139
140 client.pool.no_timer();
141
142 let uri = "http://mock.local/a".parse::<crate::Uri>().expect("uri parse");
143
144 // First request just sets us up to have a connection able to be put
145 // back in the pool. *However*, it doesn't insert immediately. The
146 // body has 1 pending byte, and we will only drain in request 2, once
147 // the connect future has been started.
148 let mut body = {
149 let res1 = client.get(uri.clone())
150 .map(|res| res.into_body().concat2());
151 let srv1 = poll_fn(|| {
152 try_ready!(sock1.read(&mut [0u8; 512]));
153 // Chunked is used so as to force 2 body reads.
154 try_ready!(sock1.write(b"\
155 HTTP/1.1 200 OK\r\n\
156 transfer-encoding: chunked\r\n\
157 \r\n\
158 1\r\nx\r\n\
159 0\r\n\r\n\
160 "));
161 Ok(Async::Ready(()))
162 }).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e));
163
164 rt.block_on(res1.join(srv1)).expect("res1").0
165 };
166
167
168 // The second request triggers the only mocked connect future, but then
169 // the drained body allows the first socket to go back to the pool,
170 // "winning" the checkout race.
171 {
172 let res2 = client.get(uri.clone());
173 let drain = poll_fn(move || {
174 body.poll()
175 });
176 let srv2 = poll_fn(|| {
177 try_ready!(sock1.read(&mut [0u8; 512]));
178 try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nConnection: close\r\n\r\nx"));
179 Ok(Async::Ready(()))
180 }).map_err(|e: std::io::Error| panic!("srv2 poll_fn error: {}", e));
181
182 rt.block_on(res2.join(drain).join(srv2)).expect("res2");
183 }
184
185 // "Release" the mocked connect future, and let the runtime spin once so
186 // it's all setup...
187 {
188 let mut tx = Some(tx);
189 let client = &client;
190 let key = client.pool.h1_key("http://mock.local");
191 let mut tick_cnt = 0;
192 let fut = poll_fn(move || {
193 tx.take();
194
195 if client.pool.idle_count(&key) == 0 {
196 tick_cnt += 1;
197 assert!(tick_cnt < 10, "ticked too many times waiting for idle");
198 trace!("no idle yet; tick count: {}", tick_cnt);
199 ::futures::task::current().notify();
200 Ok(Async::NotReady)
201 } else {
202 Ok::<_, ()>(Async::Ready(()))
203 }
204 });
205 rt.block_on(fut).unwrap();
206 }
207
208 // Third request just tests out that the "loser" connection was pooled. If
209 // it isn't, this will panic since the MockConnector doesn't have any more
210 // mocks to give out.
211 {
212 let res3 = client.get(uri);
213 let srv3 = poll_fn(|| {
214 try_ready!(sock2.read(&mut [0u8; 512]));
215 try_ready!(sock2.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
216 Ok(Async::Ready(()))
217 }).map_err(|e: std::io::Error| panic!("srv3 poll_fn error: {}", e));
218
219 rt.block_on(res3.join(srv3)).expect("res3");
220 }
221 }
222
223 #[cfg(feature = "nightly")]
224 #[bench]
225 fn bench_http1_get_0b(b: &mut test::Bencher) {
226 let _ = pretty_env_logger::try_init();
227
228 let mut rt = Runtime::new().expect("new rt");
229 let mut connector = MockConnector::new();
230
231
232 let client = Client::builder()
233 .build::<_, crate::Body>(connector.clone());
234
235 client.pool.no_timer();
236
237 let uri = Uri::from_static("http://mock.local/a");
238
239 b.iter(move || {
240 let sock1 = connector.mock("http://mock.local");
241 let res1 = client
242 .get(uri.clone())
243 .and_then(|res| {
244 res.into_body().for_each(|_| Ok(()))
245 });
246 let srv1 = poll_fn(|| {
247 try_ready!(sock1.read(&mut [0u8; 512]));
248 try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
249 Ok(Async::Ready(()))
250 }).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e));
251 rt.block_on(res1.join(srv1)).expect("res1");
252 });
253 }
254
255 #[cfg(feature = "nightly")]
256 #[bench]
257 fn bench_http1_get_10b(b: &mut test::Bencher) {
258 let _ = pretty_env_logger::try_init();
259
260 let mut rt = Runtime::new().expect("new rt");
261 let mut connector = MockConnector::new();
262
263
264 let client = Client::builder()
265 .build::<_, crate::Body>(connector.clone());
266
267 client.pool.no_timer();
268
269 let uri = Uri::from_static("http://mock.local/a");
270
271 b.iter(move || {
272 let sock1 = connector.mock("http://mock.local");
273 let res1 = client
274 .get(uri.clone())
275 .and_then(|res| {
276 res.into_body().for_each(|_| Ok(()))
277 });
278 let srv1 = poll_fn(|| {
279 try_ready!(sock1.read(&mut [0u8; 512]));
280 try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n0123456789"));
281 Ok(Async::Ready(()))
282 }).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e));
283 rt.block_on(res1.join(srv1)).expect("res1");
284 });
285 }
286 */
287