1 use std::fmt;
2 use std::fs::File;
3 use std::future::Future;
4 #[cfg(feature = "multipart")]
5 use std::io::Cursor;
6 use std::io::{self, Read};
7 use std::mem;
8 use std::ptr;
9 
10 use bytes::buf::UninitSlice;
11 use bytes::Bytes;
12 
13 use crate::async_impl;
14 
15 /// The body of a `Request`.
16 ///
17 /// In most cases, this is not needed directly, as the
18 /// [`RequestBuilder.body`][builder] method uses `Into<Body>`, which allows
19 /// passing many things (like a string or vector of bytes).
20 ///
21 /// [builder]: ./struct.RequestBuilder.html#method.body
22 #[derive(Debug)]
23 pub struct Body {
24     kind: Kind,
25 }
26 
27 impl Body {
28     /// Instantiate a `Body` from a reader.
29     ///
30     /// # Note
31     ///
32     /// While allowing for many types to be used, these bodies do not have
33     /// a way to reset to the beginning and be reused. This means that when
34     /// encountering a 307 or 308 status code, instead of repeating the
35     /// request at the new location, the `Response` will be returned with
36     /// the redirect status code set.
37     ///
38     /// ```rust
39     /// # use std::fs::File;
40     /// # use reqwest::blocking::Body;
41     /// # fn run() -> Result<(), Box<std::error::Error>> {
42     /// let file = File::open("national_secrets.txt")?;
43     /// let body = Body::new(file);
44     /// # Ok(())
45     /// # }
46     /// ```
47     ///
48     /// If you have a set of bytes, like `String` or `Vec<u8>`, using the
49     /// `From` implementations for `Body` will store the data in a manner
50     /// it can be reused.
51     ///
52     /// ```rust
53     /// # use reqwest::blocking::Body;
54     /// # fn run() -> Result<(), Box<std::error::Error>> {
55     /// let s = "A stringy body";
56     /// let body = Body::from(s);
57     /// # Ok(())
58     /// # }
59     /// ```
new<R: Read + Send + 'static>(reader: R) -> Body60     pub fn new<R: Read + Send + 'static>(reader: R) -> Body {
61         Body {
62             kind: Kind::Reader(Box::from(reader), None),
63         }
64     }
65 
66     /// Create a `Body` from a `Read` where the size is known in advance
67     /// but the data should not be fully loaded into memory. This will
68     /// set the `Content-Length` header and stream from the `Read`.
69     ///
70     /// ```rust
71     /// # use std::fs::File;
72     /// # use reqwest::blocking::Body;
73     /// # fn run() -> Result<(), Box<std::error::Error>> {
74     /// let file = File::open("a_large_file.txt")?;
75     /// let file_size = file.metadata()?.len();
76     /// let body = Body::sized(file, file_size);
77     /// # Ok(())
78     /// # }
79     /// ```
sized<R: Read + Send + 'static>(reader: R, len: u64) -> Body80     pub fn sized<R: Read + Send + 'static>(reader: R, len: u64) -> Body {
81         Body {
82             kind: Kind::Reader(Box::from(reader), Some(len)),
83         }
84     }
85 
86     /// Returns the body as a byte slice if the body is already buffered in
87     /// memory. For streamed requests this method returns `None`.
as_bytes(&self) -> Option<&[u8]>88     pub fn as_bytes(&self) -> Option<&[u8]> {
89         match self.kind {
90             Kind::Reader(_, _) => None,
91             Kind::Bytes(ref bytes) => Some(bytes.as_ref()),
92         }
93     }
94 
95     /// Converts streamed requests to their buffered equivalent and
96     /// returns a reference to the buffer. If the request is already
97     /// buffered, this has no effect.
98     ///
99     /// Be aware that for large requests this method is expensive
100     /// and may cause your program to run out of memory.
buffer(&mut self) -> Result<&[u8], crate::Error>101     pub fn buffer(&mut self) -> Result<&[u8], crate::Error> {
102         match self.kind {
103             Kind::Reader(ref mut reader, maybe_len) => {
104                 let mut bytes = if let Some(len) = maybe_len {
105                     Vec::with_capacity(len as usize)
106                 } else {
107                     Vec::new()
108                 };
109                 io::copy(reader, &mut bytes).map_err(crate::error::builder)?;
110                 self.kind = Kind::Bytes(bytes.into());
111                 self.buffer()
112             }
113             Kind::Bytes(ref bytes) => Ok(bytes.as_ref()),
114         }
115     }
116 
117     #[cfg(feature = "multipart")]
len(&self) -> Option<u64>118     pub(crate) fn len(&self) -> Option<u64> {
119         match self.kind {
120             Kind::Reader(_, len) => len,
121             Kind::Bytes(ref bytes) => Some(bytes.len() as u64),
122         }
123     }
124 
125     #[cfg(feature = "multipart")]
into_reader(self) -> Reader126     pub(crate) fn into_reader(self) -> Reader {
127         match self.kind {
128             Kind::Reader(r, _) => Reader::Reader(r),
129             Kind::Bytes(b) => Reader::Bytes(Cursor::new(b)),
130         }
131     }
132 
into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>)133     pub(crate) fn into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>) {
134         match self.kind {
135             Kind::Reader(read, len) => {
136                 let (tx, rx) = hyper::Body::channel();
137                 let tx = Sender {
138                     body: (read, len),
139                     tx,
140                 };
141                 (Some(tx), async_impl::Body::wrap(rx), len)
142             }
143             Kind::Bytes(chunk) => {
144                 let len = chunk.len() as u64;
145                 (None, async_impl::Body::reusable(chunk), Some(len))
146             }
147         }
148     }
149 
try_clone(&self) -> Option<Body>150     pub(crate) fn try_clone(&self) -> Option<Body> {
151         self.kind.try_clone().map(|kind| Body { kind })
152     }
153 }
154 
155 enum Kind {
156     Reader(Box<dyn Read + Send>, Option<u64>),
157     Bytes(Bytes),
158 }
159 
160 impl Kind {
try_clone(&self) -> Option<Kind>161     fn try_clone(&self) -> Option<Kind> {
162         match self {
163             Kind::Reader(..) => None,
164             Kind::Bytes(v) => Some(Kind::Bytes(v.clone())),
165         }
166     }
167 }
168 
169 impl From<Vec<u8>> for Body {
170     #[inline]
from(v: Vec<u8>) -> Body171     fn from(v: Vec<u8>) -> Body {
172         Body {
173             kind: Kind::Bytes(v.into()),
174         }
175     }
176 }
177 
178 impl From<String> for Body {
179     #[inline]
from(s: String) -> Body180     fn from(s: String) -> Body {
181         s.into_bytes().into()
182     }
183 }
184 
185 impl From<&'static [u8]> for Body {
186     #[inline]
from(s: &'static [u8]) -> Body187     fn from(s: &'static [u8]) -> Body {
188         Body {
189             kind: Kind::Bytes(Bytes::from_static(s)),
190         }
191     }
192 }
193 
194 impl From<&'static str> for Body {
195     #[inline]
from(s: &'static str) -> Body196     fn from(s: &'static str) -> Body {
197         s.as_bytes().into()
198     }
199 }
200 
201 impl From<File> for Body {
202     #[inline]
from(f: File) -> Body203     fn from(f: File) -> Body {
204         let len = f.metadata().map(|m| m.len()).ok();
205         Body {
206             kind: Kind::Reader(Box::new(f), len),
207         }
208     }
209 }
210 impl From<Bytes> for Body {
211     #[inline]
from(b: Bytes) -> Body212     fn from(b: Bytes) -> Body {
213         Body {
214             kind: Kind::Bytes(b),
215         }
216     }
217 }
218 
219 impl fmt::Debug for Kind {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result220     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
221         match *self {
222             Kind::Reader(_, ref v) => f
223                 .debug_struct("Reader")
224                 .field("length", &DebugLength(v))
225                 .finish(),
226             Kind::Bytes(ref v) => fmt::Debug::fmt(v, f),
227         }
228     }
229 }
230 
231 struct DebugLength<'a>(&'a Option<u64>);
232 
233 impl<'a> fmt::Debug for DebugLength<'a> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result234     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
235         match *self.0 {
236             Some(ref len) => fmt::Debug::fmt(len, f),
237             None => f.write_str("Unknown"),
238         }
239     }
240 }
241 
242 #[cfg(feature = "multipart")]
243 pub(crate) enum Reader {
244     Reader(Box<dyn Read + Send>),
245     Bytes(Cursor<Bytes>),
246 }
247 
248 #[cfg(feature = "multipart")]
249 impl Read for Reader {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>250     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
251         match *self {
252             Reader::Reader(ref mut rdr) => rdr.read(buf),
253             Reader::Bytes(ref mut rdr) => rdr.read(buf),
254         }
255     }
256 }
257 
258 pub(crate) struct Sender {
259     body: (Box<dyn Read + Send>, Option<u64>),
260     tx: hyper::body::Sender,
261 }
262 
send_future(sender: Sender) -> Result<(), crate::Error>263 async fn send_future(sender: Sender) -> Result<(), crate::Error> {
264     use bytes::{BufMut, BytesMut};
265     use std::cmp;
266 
267     let con_len = sender.body.1;
268     let cap = cmp::min(sender.body.1.unwrap_or(8192), 8192);
269     let mut written = 0;
270     let mut buf = BytesMut::with_capacity(cap as usize);
271     let mut body = sender.body.0;
272     // Put in an option so that it can be consumed on error to call abort()
273     let mut tx = Some(sender.tx);
274 
275     loop {
276         if Some(written) == con_len {
277             // Written up to content-length, so stop.
278             return Ok(());
279         }
280 
281         // The input stream is read only if the buffer is empty so
282         // that there is only one read in the buffer at any time.
283         //
284         // We need to know whether there is any data to send before
285         // we check the transmission channel (with poll_ready below)
286         // because somestimes the receiver disappears as soon as is
287         // considers the data is completely transmitted, which may
288         // be true.
289         //
290         // The use case is a web server that closes its
291         // input stream as soon as the data received is valid JSON.
292         // This behaviour is questionable, but it exists and the
293         // fact is that there is actually no remaining data to read.
294         if buf.is_empty() {
295             if buf.remaining_mut() == 0 {
296                 buf.reserve(8192);
297                 // zero out the reserved memory
298                 let uninit = buf.chunk_mut();
299                 unsafe {
300                     ptr::write_bytes(uninit.as_mut_ptr(), 0, uninit.len());
301                 }
302             }
303 
304             let bytes = unsafe { mem::transmute::<&mut UninitSlice, &mut [u8]>(buf.chunk_mut()) };
305             match body.read(bytes) {
306                 Ok(0) => {
307                     // The buffer was empty and nothing's left to
308                     // read. Return.
309                     return Ok(());
310                 }
311                 Ok(n) => unsafe {
312                     buf.advance_mut(n);
313                 },
314                 Err(e) => {
315                     tx.take().expect("tx only taken on error").abort();
316                     return Err(crate::error::body(e));
317                 }
318             }
319         }
320 
321         // The only way to get here is when the buffer is not empty.
322         // We can check the transmission channel
323 
324         let buf_len = buf.len() as u64;
325         tx.as_mut()
326             .expect("tx only taken on error")
327             .send_data(buf.split().freeze())
328             .await
329             .map_err(crate::error::body)?;
330 
331         written += buf_len;
332     }
333 }
334 
335 impl Sender {
336     // A `Future` that may do blocking read calls.
337     // As a `Future`, this integrates easily with `wait::timeout`.
send(self) -> impl Future<Output = Result<(), crate::Error>>338     pub(crate) fn send(self) -> impl Future<Output = Result<(), crate::Error>> {
339         send_future(self)
340     }
341 }
342 
343 // useful for tests, but not publicly exposed
344 #[cfg(test)]
read_to_string(mut body: Body) -> io::Result<String>345 pub(crate) fn read_to_string(mut body: Body) -> io::Result<String> {
346     let mut s = String::new();
347     match body.kind {
348         Kind::Reader(ref mut reader, _) => reader.read_to_string(&mut s),
349         Kind::Bytes(ref mut bytes) => (&**bytes).read_to_string(&mut s),
350     }
351     .map(|_| s)
352 }
353