1 use std::fmt;
2 use std::fs::File;
3 use std::future::Future;
4 use std::io::{self, Cursor, Read};
5 use std::mem::{self, MaybeUninit};
6 use std::ptr;
7
8 use bytes::Bytes;
9
10 use crate::async_impl;
11
12 /// The body of a `Request`.
13 ///
14 /// In most cases, this is not needed directly, as the
15 /// [`RequestBuilder.body`][builder] method uses `Into<Body>`, which allows
16 /// passing many things (like a string or vector of bytes).
17 ///
18 /// [builder]: ./struct.RequestBuilder.html#method.body
19 #[derive(Debug)]
20 pub struct Body {
21 kind: Kind,
22 }
23
24 impl Body {
25 /// Instantiate a `Body` from a reader.
26 ///
27 /// # Note
28 ///
29 /// While allowing for many types to be used, these bodies do not have
30 /// a way to reset to the beginning and be reused. This means that when
31 /// encountering a 307 or 308 status code, instead of repeating the
32 /// request at the new location, the `Response` will be returned with
33 /// the redirect status code set.
34 ///
35 /// ```rust
36 /// # use std::fs::File;
37 /// # use reqwest::blocking::Body;
38 /// # fn run() -> Result<(), Box<std::error::Error>> {
39 /// let file = File::open("national_secrets.txt")?;
40 /// let body = Body::new(file);
41 /// # Ok(())
42 /// # }
43 /// ```
44 ///
45 /// If you have a set of bytes, like `String` or `Vec<u8>`, using the
46 /// `From` implementations for `Body` will store the data in a manner
47 /// it can be reused.
48 ///
49 /// ```rust
50 /// # use reqwest::blocking::Body;
51 /// # fn run() -> Result<(), Box<std::error::Error>> {
52 /// let s = "A stringy body";
53 /// let body = Body::from(s);
54 /// # Ok(())
55 /// # }
56 /// ```
new<R: Read + Send + 'static>(reader: R) -> Body57 pub fn new<R: Read + Send + 'static>(reader: R) -> Body {
58 Body {
59 kind: Kind::Reader(Box::from(reader), None),
60 }
61 }
62
63 /// Create a `Body` from a `Read` where the size is known in advance
64 /// but the data should not be fully loaded into memory. This will
65 /// set the `Content-Length` header and stream from the `Read`.
66 ///
67 /// ```rust
68 /// # use std::fs::File;
69 /// # use reqwest::blocking::Body;
70 /// # fn run() -> Result<(), Box<std::error::Error>> {
71 /// let file = File::open("a_large_file.txt")?;
72 /// let file_size = file.metadata()?.len();
73 /// let body = Body::sized(file, file_size);
74 /// # Ok(())
75 /// # }
76 /// ```
sized<R: Read + Send + 'static>(reader: R, len: u64) -> Body77 pub fn sized<R: Read + Send + 'static>(reader: R, len: u64) -> Body {
78 Body {
79 kind: Kind::Reader(Box::from(reader), Some(len)),
80 }
81 }
82
len(&self) -> Option<u64>83 pub(crate) fn len(&self) -> Option<u64> {
84 match self.kind {
85 Kind::Reader(_, len) => len,
86 Kind::Bytes(ref bytes) => Some(bytes.len() as u64),
87 }
88 }
89
into_reader(self) -> Reader90 pub(crate) fn into_reader(self) -> Reader {
91 match self.kind {
92 Kind::Reader(r, _) => Reader::Reader(r),
93 Kind::Bytes(b) => Reader::Bytes(Cursor::new(b)),
94 }
95 }
96
into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>)97 pub(crate) fn into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>) {
98 match self.kind {
99 Kind::Reader(read, len) => {
100 let (tx, rx) = hyper::Body::channel();
101 let tx = Sender {
102 body: (read, len),
103 tx,
104 };
105 (Some(tx), async_impl::Body::wrap(rx), len)
106 }
107 Kind::Bytes(chunk) => {
108 let len = chunk.len() as u64;
109 (None, async_impl::Body::reusable(chunk), Some(len))
110 }
111 }
112 }
113
try_clone(&self) -> Option<Body>114 pub(crate) fn try_clone(&self) -> Option<Body> {
115 self.kind.try_clone().map(|kind| Body { kind })
116 }
117 }
118
119 enum Kind {
120 Reader(Box<dyn Read + Send>, Option<u64>),
121 Bytes(Bytes),
122 }
123
124 impl Kind {
try_clone(&self) -> Option<Kind>125 fn try_clone(&self) -> Option<Kind> {
126 match self {
127 Kind::Reader(..) => None,
128 Kind::Bytes(v) => Some(Kind::Bytes(v.clone())),
129 }
130 }
131 }
132
133 impl From<Vec<u8>> for Body {
134 #[inline]
from(v: Vec<u8>) -> Body135 fn from(v: Vec<u8>) -> Body {
136 Body {
137 kind: Kind::Bytes(v.into()),
138 }
139 }
140 }
141
142 impl From<String> for Body {
143 #[inline]
from(s: String) -> Body144 fn from(s: String) -> Body {
145 s.into_bytes().into()
146 }
147 }
148
149 impl From<&'static [u8]> for Body {
150 #[inline]
from(s: &'static [u8]) -> Body151 fn from(s: &'static [u8]) -> Body {
152 Body {
153 kind: Kind::Bytes(Bytes::from_static(s)),
154 }
155 }
156 }
157
158 impl From<&'static str> for Body {
159 #[inline]
from(s: &'static str) -> Body160 fn from(s: &'static str) -> Body {
161 s.as_bytes().into()
162 }
163 }
164
165 impl From<File> for Body {
166 #[inline]
from(f: File) -> Body167 fn from(f: File) -> Body {
168 let len = f.metadata().map(|m| m.len()).ok();
169 Body {
170 kind: Kind::Reader(Box::new(f), len),
171 }
172 }
173 }
174
175 impl fmt::Debug for Kind {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result176 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
177 match *self {
178 Kind::Reader(_, ref v) => f
179 .debug_struct("Reader")
180 .field("length", &DebugLength(v))
181 .finish(),
182 Kind::Bytes(ref v) => fmt::Debug::fmt(v, f),
183 }
184 }
185 }
186
187 struct DebugLength<'a>(&'a Option<u64>);
188
189 impl<'a> fmt::Debug for DebugLength<'a> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result190 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
191 match *self.0 {
192 Some(ref len) => fmt::Debug::fmt(len, f),
193 None => f.write_str("Unknown"),
194 }
195 }
196 }
197
198 pub(crate) enum Reader {
199 Reader(Box<dyn Read + Send>),
200 Bytes(Cursor<Bytes>),
201 }
202
203 impl Read for Reader {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>204 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
205 match *self {
206 Reader::Reader(ref mut rdr) => rdr.read(buf),
207 Reader::Bytes(ref mut rdr) => rdr.read(buf),
208 }
209 }
210 }
211
212 pub(crate) struct Sender {
213 body: (Box<dyn Read + Send>, Option<u64>),
214 tx: hyper::body::Sender,
215 }
216
send_future(sender: Sender) -> Result<(), crate::Error>217 async fn send_future(sender: Sender) -> Result<(), crate::Error> {
218 use bytes::{BufMut, BytesMut};
219 use std::cmp;
220
221 let con_len = sender.body.1;
222 let cap = cmp::min(sender.body.1.unwrap_or(8192), 8192);
223 let mut written = 0;
224 let mut buf = BytesMut::with_capacity(cap as usize);
225 let mut body = sender.body.0;
226 // Put in an option so that it can be consumed on error to call abort()
227 let mut tx = Some(sender.tx);
228
229 loop {
230 if Some(written) == con_len {
231 // Written up to content-length, so stop.
232 return Ok(());
233 }
234
235 // The input stream is read only if the buffer is empty so
236 // that there is only one read in the buffer at any time.
237 //
238 // We need to know whether there is any data to send before
239 // we check the transmission channel (with poll_ready below)
240 // because somestimes the receiver disappears as soon as is
241 // considers the data is completely transmitted, which may
242 // be true.
243 //
244 // The use case is a web server that closes its
245 // input stream as soon as the data received is valid JSON.
246 // This behaviour is questionable, but it exists and the
247 // fact is that there is actually no remaining data to read.
248 if buf.is_empty() {
249 if buf.remaining_mut() == 0 {
250 buf.reserve(8192);
251 // zero out the reserved memory
252 unsafe {
253 let uninit = mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.bytes_mut());
254 ptr::write_bytes(uninit.as_mut_ptr(), 0, uninit.len());
255 }
256 }
257
258 let bytes = unsafe {
259 mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.bytes_mut())
260 };
261 match body.read(bytes) {
262 Ok(0) => {
263 // The buffer was empty and nothing's left to
264 // read. Return.
265 return Ok(());
266 }
267 Ok(n) => unsafe {
268 buf.advance_mut(n);
269 },
270 Err(e) => {
271 tx.take().expect("tx only taken on error").abort();
272 return Err(crate::error::body(e));
273 }
274 }
275 }
276
277 // The only way to get here is when the buffer is not empty.
278 // We can check the transmission channel
279
280 let buf_len = buf.len() as u64;
281 tx.as_mut()
282 .expect("tx only taken on error")
283 .send_data(buf.split().freeze())
284 .await
285 .map_err(crate::error::body)?;
286
287 written += buf_len;
288 }
289 }
290
291 impl Sender {
292 // A `Future` that may do blocking read calls.
293 // As a `Future`, this integrates easily with `wait::timeout`.
send(self) -> impl Future<Output = Result<(), crate::Error>>294 pub(crate) fn send(self) -> impl Future<Output = Result<(), crate::Error>> {
295 send_future(self)
296 }
297 }
298
299 // useful for tests, but not publicly exposed
300 #[cfg(test)]
read_to_string(mut body: Body) -> io::Result<String>301 pub(crate) fn read_to_string(mut body: Body) -> io::Result<String> {
302 let mut s = String::new();
303 match body.kind {
304 Kind::Reader(ref mut reader, _) => reader.read_to_string(&mut s),
305 Kind::Bytes(ref mut bytes) => (&**bytes).read_to_string(&mut s),
306 }
307 .map(|_| s)
308 }
309