1 use std::fmt;
2 use std::fs::File;
3 use std::future::Future;
4 use std::io::{self, Cursor, Read};
5 use std::mem::{self, MaybeUninit};
6 use std::ptr;
7 
8 use bytes::Bytes;
9 
10 use crate::async_impl;
11 
12 /// The body of a `Request`.
13 ///
14 /// In most cases, this is not needed directly, as the
15 /// [`RequestBuilder.body`][builder] method uses `Into<Body>`, which allows
16 /// passing many things (like a string or vector of bytes).
17 ///
18 /// [builder]: ./struct.RequestBuilder.html#method.body
19 #[derive(Debug)]
20 pub struct Body {
21     kind: Kind,
22 }
23 
24 impl Body {
25     /// Instantiate a `Body` from a reader.
26     ///
27     /// # Note
28     ///
29     /// While allowing for many types to be used, these bodies do not have
30     /// a way to reset to the beginning and be reused. This means that when
31     /// encountering a 307 or 308 status code, instead of repeating the
32     /// request at the new location, the `Response` will be returned with
33     /// the redirect status code set.
34     ///
35     /// ```rust
36     /// # use std::fs::File;
37     /// # use reqwest::blocking::Body;
38     /// # fn run() -> Result<(), Box<std::error::Error>> {
39     /// let file = File::open("national_secrets.txt")?;
40     /// let body = Body::new(file);
41     /// # Ok(())
42     /// # }
43     /// ```
44     ///
45     /// If you have a set of bytes, like `String` or `Vec<u8>`, using the
46     /// `From` implementations for `Body` will store the data in a manner
47     /// it can be reused.
48     ///
49     /// ```rust
50     /// # use reqwest::blocking::Body;
51     /// # fn run() -> Result<(), Box<std::error::Error>> {
52     /// let s = "A stringy body";
53     /// let body = Body::from(s);
54     /// # Ok(())
55     /// # }
56     /// ```
new<R: Read + Send + 'static>(reader: R) -> Body57     pub fn new<R: Read + Send + 'static>(reader: R) -> Body {
58         Body {
59             kind: Kind::Reader(Box::from(reader), None),
60         }
61     }
62 
63     /// Create a `Body` from a `Read` where the size is known in advance
64     /// but the data should not be fully loaded into memory. This will
65     /// set the `Content-Length` header and stream from the `Read`.
66     ///
67     /// ```rust
68     /// # use std::fs::File;
69     /// # use reqwest::blocking::Body;
70     /// # fn run() -> Result<(), Box<std::error::Error>> {
71     /// let file = File::open("a_large_file.txt")?;
72     /// let file_size = file.metadata()?.len();
73     /// let body = Body::sized(file, file_size);
74     /// # Ok(())
75     /// # }
76     /// ```
sized<R: Read + Send + 'static>(reader: R, len: u64) -> Body77     pub fn sized<R: Read + Send + 'static>(reader: R, len: u64) -> Body {
78         Body {
79             kind: Kind::Reader(Box::from(reader), Some(len)),
80         }
81     }
82 
len(&self) -> Option<u64>83     pub(crate) fn len(&self) -> Option<u64> {
84         match self.kind {
85             Kind::Reader(_, len) => len,
86             Kind::Bytes(ref bytes) => Some(bytes.len() as u64),
87         }
88     }
89 
into_reader(self) -> Reader90     pub(crate) fn into_reader(self) -> Reader {
91         match self.kind {
92             Kind::Reader(r, _) => Reader::Reader(r),
93             Kind::Bytes(b) => Reader::Bytes(Cursor::new(b)),
94         }
95     }
96 
into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>)97     pub(crate) fn into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>) {
98         match self.kind {
99             Kind::Reader(read, len) => {
100                 let (tx, rx) = hyper::Body::channel();
101                 let tx = Sender {
102                     body: (read, len),
103                     tx,
104                 };
105                 (Some(tx), async_impl::Body::wrap(rx), len)
106             }
107             Kind::Bytes(chunk) => {
108                 let len = chunk.len() as u64;
109                 (None, async_impl::Body::reusable(chunk), Some(len))
110             }
111         }
112     }
113 
try_clone(&self) -> Option<Body>114     pub(crate) fn try_clone(&self) -> Option<Body> {
115         self.kind.try_clone().map(|kind| Body { kind })
116     }
117 }
118 
119 enum Kind {
120     Reader(Box<dyn Read + Send>, Option<u64>),
121     Bytes(Bytes),
122 }
123 
124 impl Kind {
try_clone(&self) -> Option<Kind>125     fn try_clone(&self) -> Option<Kind> {
126         match self {
127             Kind::Reader(..) => None,
128             Kind::Bytes(v) => Some(Kind::Bytes(v.clone())),
129         }
130     }
131 }
132 
133 impl From<Vec<u8>> for Body {
134     #[inline]
from(v: Vec<u8>) -> Body135     fn from(v: Vec<u8>) -> Body {
136         Body {
137             kind: Kind::Bytes(v.into()),
138         }
139     }
140 }
141 
142 impl From<String> for Body {
143     #[inline]
from(s: String) -> Body144     fn from(s: String) -> Body {
145         s.into_bytes().into()
146     }
147 }
148 
149 impl From<&'static [u8]> for Body {
150     #[inline]
from(s: &'static [u8]) -> Body151     fn from(s: &'static [u8]) -> Body {
152         Body {
153             kind: Kind::Bytes(Bytes::from_static(s)),
154         }
155     }
156 }
157 
158 impl From<&'static str> for Body {
159     #[inline]
from(s: &'static str) -> Body160     fn from(s: &'static str) -> Body {
161         s.as_bytes().into()
162     }
163 }
164 
165 impl From<File> for Body {
166     #[inline]
from(f: File) -> Body167     fn from(f: File) -> Body {
168         let len = f.metadata().map(|m| m.len()).ok();
169         Body {
170             kind: Kind::Reader(Box::new(f), len),
171         }
172     }
173 }
174 
175 impl fmt::Debug for Kind {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result176     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
177         match *self {
178             Kind::Reader(_, ref v) => f
179                 .debug_struct("Reader")
180                 .field("length", &DebugLength(v))
181                 .finish(),
182             Kind::Bytes(ref v) => fmt::Debug::fmt(v, f),
183         }
184     }
185 }
186 
// Debug adapter for an optional length: prints the number when present and
// the word `Unknown` otherwise.
struct DebugLength<'a>(&'a Option<u64>);

impl<'a> fmt::Debug for DebugLength<'a> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        if let Some(len) = self.0 {
            // Delegate so the caller's formatter flags are honoured.
            fmt::Debug::fmt(len, f)
        } else {
            f.write_str("Unknown")
        }
    }
}
197 
198 pub(crate) enum Reader {
199     Reader(Box<dyn Read + Send>),
200     Bytes(Cursor<Bytes>),
201 }
202 
203 impl Read for Reader {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>204     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
205         match *self {
206             Reader::Reader(ref mut rdr) => rdr.read(buf),
207             Reader::Bytes(ref mut rdr) => rdr.read(buf),
208         }
209     }
210 }
211 
212 pub(crate) struct Sender {
213     body: (Box<dyn Read + Send>, Option<u64>),
214     tx: hyper::body::Sender,
215 }
216 
send_future(sender: Sender) -> Result<(), crate::Error>217 async fn send_future(sender: Sender) -> Result<(), crate::Error> {
218     use bytes::{BufMut, BytesMut};
219     use std::cmp;
220 
221     let con_len = sender.body.1;
222     let cap = cmp::min(sender.body.1.unwrap_or(8192), 8192);
223     let mut written = 0;
224     let mut buf = BytesMut::with_capacity(cap as usize);
225     let mut body = sender.body.0;
226     // Put in an option so that it can be consumed on error to call abort()
227     let mut tx = Some(sender.tx);
228 
229     loop {
230         if Some(written) == con_len {
231             // Written up to content-length, so stop.
232             return Ok(());
233         }
234 
235         // The input stream is read only if the buffer is empty so
236         // that there is only one read in the buffer at any time.
237         //
238         // We need to know whether there is any data to send before
239         // we check the transmission channel (with poll_ready below)
240         // because somestimes the receiver disappears as soon as is
241         // considers the data is completely transmitted, which may
242         // be true.
243         //
244         // The use case is a web server that closes its
245         // input stream as soon as the data received is valid JSON.
246         // This behaviour is questionable, but it exists and the
247         // fact is that there is actually no remaining data to read.
248         if buf.is_empty() {
249             if buf.remaining_mut() == 0 {
250                 buf.reserve(8192);
251                 // zero out the reserved memory
252                 unsafe {
253                     let uninit = mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.bytes_mut());
254                     ptr::write_bytes(uninit.as_mut_ptr(), 0, uninit.len());
255                 }
256             }
257 
258             let bytes = unsafe {
259                 mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.bytes_mut())
260             };
261             match body.read(bytes) {
262                 Ok(0) => {
263                     // The buffer was empty and nothing's left to
264                     // read. Return.
265                     return Ok(());
266                 }
267                 Ok(n) => unsafe {
268                     buf.advance_mut(n);
269                 },
270                 Err(e) => {
271                     tx.take().expect("tx only taken on error").abort();
272                     return Err(crate::error::body(e));
273                 }
274             }
275         }
276 
277         // The only way to get here is when the buffer is not empty.
278         // We can check the transmission channel
279 
280         let buf_len = buf.len() as u64;
281         tx.as_mut()
282             .expect("tx only taken on error")
283             .send_data(buf.split().freeze())
284             .await
285             .map_err(crate::error::body)?;
286 
287         written += buf_len;
288     }
289 }
290 
291 impl Sender {
292     // A `Future` that may do blocking read calls.
293     // As a `Future`, this integrates easily with `wait::timeout`.
send(self) -> impl Future<Output = Result<(), crate::Error>>294     pub(crate) fn send(self) -> impl Future<Output = Result<(), crate::Error>> {
295         send_future(self)
296     }
297 }
298 
// Test-only helper (not publicly exposed): drains a `Body` into a `String`.
#[cfg(test)]
pub(crate) fn read_to_string(mut body: Body) -> io::Result<String> {
    let mut text = String::new();
    match &mut body.kind {
        Kind::Bytes(bytes) => {
            // `&[u8]` implements `Read`, so read straight from the buffer.
            let mut slice: &[u8] = &**bytes;
            slice.read_to_string(&mut text)?;
        }
        Kind::Reader(reader, _) => {
            reader.read_to_string(&mut text)?;
        }
    }
    Ok(text)
}
309