1 use std::fmt;
2 use std::fs::File;
3 use std::future::Future;
4 use std::io::{self, Cursor, Read};
5 use std::mem::{self, MaybeUninit};
6 use std::ptr;
7
8 use bytes::Bytes;
9
10 use crate::async_impl;
11
/// The body of a `Request`.
///
/// In most cases, this is not needed directly, as the
/// [`RequestBuilder.body`][builder] method uses `Into<Body>`, which allows
/// passing many things (like a string or vector of bytes).
///
/// A body is either streamed from a reader or held fully in memory as
/// bytes; only the in-memory form can be cloned.
///
/// [builder]: ./struct.RequestBuilder.html#method.body
#[derive(Debug)]
pub struct Body {
    // Streaming reader or in-memory buffer; see `Kind`.
    kind: Kind,
}
23
24 impl Body {
25 /// Instantiate a `Body` from a reader.
26 ///
27 /// # Note
28 ///
29 /// While allowing for many types to be used, these bodies do not have
30 /// a way to reset to the beginning and be reused. This means that when
31 /// encountering a 307 or 308 status code, instead of repeating the
32 /// request at the new location, the `Response` will be returned with
33 /// the redirect status code set.
34 ///
35 /// ```rust
36 /// # use std::fs::File;
37 /// # use reqwest::blocking::Body;
38 /// # fn run() -> Result<(), Box<std::error::Error>> {
39 /// let file = File::open("national_secrets.txt")?;
40 /// let body = Body::new(file);
41 /// # Ok(())
42 /// # }
43 /// ```
44 ///
45 /// If you have a set of bytes, like `String` or `Vec<u8>`, using the
46 /// `From` implementations for `Body` will store the data in a manner
47 /// it can be reused.
48 ///
49 /// ```rust
50 /// # use reqwest::blocking::Body;
51 /// # fn run() -> Result<(), Box<std::error::Error>> {
52 /// let s = "A stringy body";
53 /// let body = Body::from(s);
54 /// # Ok(())
55 /// # }
56 /// ```
new<R: Read + Send + 'static>(reader: R) -> Body57 pub fn new<R: Read + Send + 'static>(reader: R) -> Body {
58 Body {
59 kind: Kind::Reader(Box::from(reader), None),
60 }
61 }
62
63 /// Create a `Body` from a `Read` where the size is known in advance
64 /// but the data should not be fully loaded into memory. This will
65 /// set the `Content-Length` header and stream from the `Read`.
66 ///
67 /// ```rust
68 /// # use std::fs::File;
69 /// # use reqwest::blocking::Body;
70 /// # fn run() -> Result<(), Box<std::error::Error>> {
71 /// let file = File::open("a_large_file.txt")?;
72 /// let file_size = file.metadata()?.len();
73 /// let body = Body::sized(file, file_size);
74 /// # Ok(())
75 /// # }
76 /// ```
sized<R: Read + Send + 'static>(reader: R, len: u64) -> Body77 pub fn sized<R: Read + Send + 'static>(reader: R, len: u64) -> Body {
78 Body {
79 kind: Kind::Reader(Box::from(reader), Some(len)),
80 }
81 }
82
83 /// Returns the body as a byte slice if the body is already buffered in
84 /// memory. For streamed requests this method returns `None`.
as_bytes(&self) -> Option<&[u8]>85 pub fn as_bytes(&self) -> Option<&[u8]> {
86 match self.kind {
87 Kind::Reader(_, _) => None,
88 Kind::Bytes(ref bytes) => Some(bytes.as_ref()),
89 }
90 }
91
92 /// Converts streamed requests to their buffered equivalent and
93 /// returns a reference to the buffer. If the request is already
94 /// buffered, this has no effect.
95 ///
96 /// Be aware that for large requests this method is expensive
97 /// and may cause your program to run out of memory.
buffer(&mut self) -> Result<&[u8], crate::Error>98 pub fn buffer(&mut self) -> Result<&[u8], crate::Error> {
99 match self.kind {
100 Kind::Reader(ref mut reader, maybe_len) => {
101 let mut bytes = if let Some(len) = maybe_len {
102 Vec::with_capacity(len as usize)
103 } else {
104 Vec::new()
105 };
106 io::copy(reader, &mut bytes)
107 .map_err(crate::error::builder)?;
108 self.kind = Kind::Bytes(bytes.into());
109 self.buffer()
110 },
111 Kind::Bytes(ref bytes) => Ok(bytes.as_ref()),
112 }
113 }
114
len(&self) -> Option<u64>115 pub(crate) fn len(&self) -> Option<u64> {
116 match self.kind {
117 Kind::Reader(_, len) => len,
118 Kind::Bytes(ref bytes) => Some(bytes.len() as u64),
119 }
120 }
121
into_reader(self) -> Reader122 pub(crate) fn into_reader(self) -> Reader {
123 match self.kind {
124 Kind::Reader(r, _) => Reader::Reader(r),
125 Kind::Bytes(b) => Reader::Bytes(Cursor::new(b)),
126 }
127 }
128
into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>)129 pub(crate) fn into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>) {
130 match self.kind {
131 Kind::Reader(read, len) => {
132 let (tx, rx) = hyper::Body::channel();
133 let tx = Sender {
134 body: (read, len),
135 tx,
136 };
137 (Some(tx), async_impl::Body::wrap(rx), len)
138 }
139 Kind::Bytes(chunk) => {
140 let len = chunk.len() as u64;
141 (None, async_impl::Body::reusable(chunk), Some(len))
142 }
143 }
144 }
145
try_clone(&self) -> Option<Body>146 pub(crate) fn try_clone(&self) -> Option<Body> {
147 self.kind.try_clone().map(|kind| Body { kind })
148 }
149 }
150
/// Internal representation of a `Body`.
enum Kind {
    /// A streaming source together with its length, if one was supplied.
    Reader(Box<dyn Read + Send>, Option<u64>),
    /// A payload held entirely in memory.
    Bytes(Bytes),
}
155
156 impl Kind {
try_clone(&self) -> Option<Kind>157 fn try_clone(&self) -> Option<Kind> {
158 match self {
159 Kind::Reader(..) => None,
160 Kind::Bytes(v) => Some(Kind::Bytes(v.clone())),
161 }
162 }
163 }
164
165 impl From<Vec<u8>> for Body {
166 #[inline]
from(v: Vec<u8>) -> Body167 fn from(v: Vec<u8>) -> Body {
168 Body {
169 kind: Kind::Bytes(v.into()),
170 }
171 }
172 }
173
174 impl From<String> for Body {
175 #[inline]
from(s: String) -> Body176 fn from(s: String) -> Body {
177 s.into_bytes().into()
178 }
179 }
180
181 impl From<&'static [u8]> for Body {
182 #[inline]
from(s: &'static [u8]) -> Body183 fn from(s: &'static [u8]) -> Body {
184 Body {
185 kind: Kind::Bytes(Bytes::from_static(s)),
186 }
187 }
188 }
189
190 impl From<&'static str> for Body {
191 #[inline]
from(s: &'static str) -> Body192 fn from(s: &'static str) -> Body {
193 s.as_bytes().into()
194 }
195 }
196
197 impl From<File> for Body {
198 #[inline]
from(f: File) -> Body199 fn from(f: File) -> Body {
200 let len = f.metadata().map(|m| m.len()).ok();
201 Body {
202 kind: Kind::Reader(Box::new(f), len),
203 }
204 }
205 }
206
207 impl fmt::Debug for Kind {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result208 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
209 match *self {
210 Kind::Reader(_, ref v) => f
211 .debug_struct("Reader")
212 .field("length", &DebugLength(v))
213 .finish(),
214 Kind::Bytes(ref v) => fmt::Debug::fmt(v, f),
215 }
216 }
217 }
218
/// Debug adapter for an optional length: prints the number when present
/// and `Unknown` otherwise.
struct DebugLength<'a>(&'a Option<u64>);

impl fmt::Debug for DebugLength<'_> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        if let Some(len) = self.0 {
            fmt::Debug::fmt(len, f)
        } else {
            f.write_str("Unknown")
        }
    }
}
229
/// A readable view over a consumed `Body`.
pub(crate) enum Reader {
    /// Reads straight from the boxed streaming source.
    Reader(Box<dyn Read + Send>),
    /// Reads from an in-memory buffer through a cursor.
    Bytes(Cursor<Bytes>),
}
234
235 impl Read for Reader {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>236 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
237 match *self {
238 Reader::Reader(ref mut rdr) => rdr.read(buf),
239 Reader::Bytes(ref mut rdr) => rdr.read(buf),
240 }
241 }
242 }
243
/// Pumps a reader-backed body into a hyper body channel.
pub(crate) struct Sender {
    // The source to read from and its declared length, if any.
    body: (Box<dyn Read + Send>, Option<u64>),
    // Sending half of the hyper body channel.
    tx: hyper::body::Sender,
}
248
send_future(sender: Sender) -> Result<(), crate::Error>249 async fn send_future(sender: Sender) -> Result<(), crate::Error> {
250 use bytes::{BufMut, BytesMut};
251 use std::cmp;
252
253 let con_len = sender.body.1;
254 let cap = cmp::min(sender.body.1.unwrap_or(8192), 8192);
255 let mut written = 0;
256 let mut buf = BytesMut::with_capacity(cap as usize);
257 let mut body = sender.body.0;
258 // Put in an option so that it can be consumed on error to call abort()
259 let mut tx = Some(sender.tx);
260
261 loop {
262 if Some(written) == con_len {
263 // Written up to content-length, so stop.
264 return Ok(());
265 }
266
267 // The input stream is read only if the buffer is empty so
268 // that there is only one read in the buffer at any time.
269 //
270 // We need to know whether there is any data to send before
271 // we check the transmission channel (with poll_ready below)
272 // because somestimes the receiver disappears as soon as is
273 // considers the data is completely transmitted, which may
274 // be true.
275 //
276 // The use case is a web server that closes its
277 // input stream as soon as the data received is valid JSON.
278 // This behaviour is questionable, but it exists and the
279 // fact is that there is actually no remaining data to read.
280 if buf.is_empty() {
281 if buf.remaining_mut() == 0 {
282 buf.reserve(8192);
283 // zero out the reserved memory
284 unsafe {
285 let uninit = mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.bytes_mut());
286 ptr::write_bytes(uninit.as_mut_ptr(), 0, uninit.len());
287 }
288 }
289
290 let bytes = unsafe {
291 mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.bytes_mut())
292 };
293 match body.read(bytes) {
294 Ok(0) => {
295 // The buffer was empty and nothing's left to
296 // read. Return.
297 return Ok(());
298 }
299 Ok(n) => unsafe {
300 buf.advance_mut(n);
301 },
302 Err(e) => {
303 tx.take().expect("tx only taken on error").abort();
304 return Err(crate::error::body(e));
305 }
306 }
307 }
308
309 // The only way to get here is when the buffer is not empty.
310 // We can check the transmission channel
311
312 let buf_len = buf.len() as u64;
313 tx.as_mut()
314 .expect("tx only taken on error")
315 .send_data(buf.split().freeze())
316 .await
317 .map_err(crate::error::body)?;
318
319 written += buf_len;
320 }
321 }
322
impl Sender {
    /// Consumes the sender and returns the future that forwards its body
    /// over the channel (see `send_future`).
    // A `Future` that may do blocking read calls.
    // As a `Future`, this integrates easily with `wait::timeout`.
    pub(crate) fn send(self) -> impl Future<Output = Result<(), crate::Error>> {
        send_future(self)
    }
}
330
// Test-only helper, not publicly exposed: drains a body into a `String`.
#[cfg(test)]
pub(crate) fn read_to_string(mut body: Body) -> io::Result<String> {
    let mut text = String::new();
    let outcome = match body.kind {
        Kind::Reader(ref mut reader, _) => reader.read_to_string(&mut text),
        Kind::Bytes(ref bytes) => {
            // A byte slice is itself a reader over its contents.
            let mut slice: &[u8] = bytes.as_ref();
            slice.read_to_string(&mut text)
        }
    };
    outcome.map(|_| text)
}
341