1 use std::fmt;
2 
3 use bytes::Bytes;
4 use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
5 use futures::sync::{mpsc, oneshot};
6 #[cfg(feature = "tokio-proto")]
7 use tokio_proto;
8 use std::borrow::Cow;
9 
10 use common::Never;
11 use super::Chunk;
12 
13 #[cfg(feature = "tokio-proto")]
14 pub type TokioBody = tokio_proto::streaming::Body<Chunk, ::Error>;
15 pub type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;
16 
17 /// A `Stream` for `Chunk`s used in requests and responses.
18 #[must_use = "streams do nothing unless polled"]
19 pub struct Body {
20     kind: Kind,
21     /// Allow the client to pass a future to delay the `Body` from returning
22     /// EOF. This allows the `Client` to try to put the idle connection
23     /// back into the pool before the body is "finished".
24     ///
25     /// The reason for this is so that creating a new request after finishing
26     /// streaming the body of a response could sometimes result in creating
27     /// a brand new connection, since the pool didn't know about the idle
28     /// connection yet.
29     delayed_eof: Option<DelayEof>,
30 }
31 
32 #[derive(Debug)]
33 enum Kind {
34     #[cfg(feature = "tokio-proto")]
35     Tokio(TokioBody),
36     Chan {
37         close_tx: oneshot::Sender<bool>,
38         rx: mpsc::Receiver<Result<Chunk, ::Error>>,
39     },
40     Once(Option<Chunk>),
41     Empty,
42 }
43 
44 type DelayEofUntil = oneshot::Receiver<Never>;
45 
46 enum DelayEof {
47     /// Initial state, stream hasn't seen EOF yet.
48     NotEof(DelayEofUntil),
49     /// Transitions to this state once we've seen `poll` try to
50     /// return EOF (`None`). This future is then polled, and
51     /// when it completes, the Body finally returns EOF (`None`).
52     Eof(DelayEofUntil),
53 }
54 
55 //pub(crate)
56 #[derive(Debug)]
57 pub struct ChunkSender {
58     close_rx: oneshot::Receiver<bool>,
59     close_rx_check: bool,
60     tx: BodySender,
61 }
62 
63 impl Body {
64     /// Return an empty body stream
65     #[inline]
66     pub fn empty() -> Body {
67         Body::new(Kind::Empty)
68     }
69 
70     /// Return a body stream with an associated sender half
71     #[inline]
72     pub fn pair() -> (mpsc::Sender<Result<Chunk, ::Error>>, Body) {
73         let (tx, rx) = channel();
74         (tx.tx, rx)
75     }
76 
77     /// Returns if this body was constructed via `Body::empty()`.
78     ///
79     /// # Note
80     ///
81     /// This does **not** detect if the body stream may be at the end, or
82     /// if the stream will not yield any chunks, in all cases. For instance,
83     /// a streaming body using `chunked` encoding is not able to tell if
84     /// there are more chunks immediately.
85     #[inline]
86     pub fn is_empty(&self) -> bool {
87         match self.kind {
88             Kind::Empty => true,
89             _ => false,
90         }
91     }
92 
93     fn new(kind: Kind) -> Body {
94         Body {
95             kind: kind,
96             delayed_eof: None,
97         }
98     }
99 
100     pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
101         self.delayed_eof = Some(DelayEof::NotEof(fut));
102     }
103 
104     fn poll_eof(&mut self) -> Poll<Option<Chunk>, ::Error> {
105         match self.delayed_eof.take() {
106             Some(DelayEof::NotEof(mut delay)) => {
107                 match self.poll_inner() {
108                     ok @ Ok(Async::Ready(Some(..))) |
109                     ok @ Ok(Async::NotReady) => {
110                         self.delayed_eof = Some(DelayEof::NotEof(delay));
111                         ok
112                     },
113                     Ok(Async::Ready(None)) => match delay.poll() {
114                         Ok(Async::Ready(never)) => match never {},
115                         Ok(Async::NotReady) => {
116                             self.delayed_eof = Some(DelayEof::Eof(delay));
117                             Ok(Async::NotReady)
118                         },
119                         Err(_done) => {
120                             Ok(Async::Ready(None))
121                         },
122                     },
123                     Err(e) => Err(e),
124                 }
125             },
126             Some(DelayEof::Eof(mut delay)) => {
127                 match delay.poll() {
128                     Ok(Async::Ready(never)) => match never {},
129                     Ok(Async::NotReady) => {
130                         self.delayed_eof = Some(DelayEof::Eof(delay));
131                         Ok(Async::NotReady)
132                     },
133                     Err(_done) => {
134                         Ok(Async::Ready(None))
135                     },
136                 }
137             },
138             None => self.poll_inner(),
139         }
140     }
141 
142     fn poll_inner(&mut self) -> Poll<Option<Chunk>, ::Error> {
143         match self.kind {
144             #[cfg(feature = "tokio-proto")]
145             Kind::Tokio(ref mut rx) => rx.poll(),
146             Kind::Chan { ref mut rx, .. } => match rx.poll().expect("mpsc cannot error") {
147                 Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))),
148                 Async::Ready(Some(Err(err))) => Err(err),
149                 Async::Ready(None) => Ok(Async::Ready(None)),
150                 Async::NotReady => Ok(Async::NotReady),
151             },
152             Kind::Once(ref mut val) => Ok(Async::Ready(val.take())),
153             Kind::Empty => Ok(Async::Ready(None)),
154         }
155     }
156 }
157 
158 impl Default for Body {
159     #[inline]
160     fn default() -> Body {
161         Body::empty()
162     }
163 }
164 
165 impl Stream for Body {
166     type Item = Chunk;
167     type Error = ::Error;
168 
169     #[inline]
170     fn poll(&mut self) -> Poll<Option<Chunk>, ::Error> {
171         self.poll_eof()
172     }
173 }
174 
175 impl fmt::Debug for Body {
176     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
177         f.debug_tuple("Body")
178             .field(&self.kind)
179             .finish()
180     }
181 }
182 
183 //pub(crate)
184 pub fn channel() -> (ChunkSender, Body) {
185     let (tx, rx) = mpsc::channel(0);
186     let (close_tx, close_rx) = oneshot::channel();
187 
188     let tx = ChunkSender {
189         close_rx: close_rx,
190         close_rx_check: true,
191         tx: tx,
192     };
193     let rx = Body::new(Kind::Chan {
194         close_tx: close_tx,
195         rx: rx,
196     });
197 
198     (tx, rx)
199 }
200 
201 impl ChunkSender {
202     pub fn poll_ready(&mut self) -> Poll<(), ()> {
203         if self.close_rx_check {
204             match self.close_rx.poll() {
205                 Ok(Async::Ready(true)) | Err(_) => return Err(()),
206                 Ok(Async::Ready(false)) => {
207                     // needed to allow converting into a plain mpsc::Receiver
208                     // if it has been, the tx will send false to disable this check
209                     self.close_rx_check = false;
210                 }
211                 Ok(Async::NotReady) => (),
212             }
213         }
214 
215         self.tx.poll_ready().map_err(|_| ())
216     }
217 
218     pub fn start_send(&mut self, msg: Result<Chunk, ::Error>) -> StartSend<(), ()> {
219         match self.tx.start_send(msg) {
220             Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready),
221             Ok(AsyncSink::NotReady(_)) => Ok(AsyncSink::NotReady(())),
222             Err(_) => Err(()),
223         }
224     }
225 }
226 
227 feat_server_proto! {
228     impl From<Body> for tokio_proto::streaming::Body<Chunk, ::Error> {
229         fn from(b: Body) -> tokio_proto::streaming::Body<Chunk, ::Error> {
230             match b.kind {
231                 Kind::Tokio(b) => b,
232                 Kind::Chan { close_tx, rx } => {
233                     // disable knowing if the Rx gets dropped, since we cannot
234                     // pass this tx along.
235                     let _ = close_tx.send(false);
236                     rx.into()
237                 },
238                 Kind::Once(Some(chunk)) => TokioBody::from(chunk),
239                 Kind::Once(None) |
240                 Kind::Empty => TokioBody::empty(),
241             }
242         }
243     }
244 
245     impl From<tokio_proto::streaming::Body<Chunk, ::Error>> for Body {
246         fn from(tokio_body: tokio_proto::streaming::Body<Chunk, ::Error>) -> Body {
247             Body::new(Kind::Tokio(tokio_body))
248         }
249     }
250 }
251 
252 impl From<mpsc::Receiver<Result<Chunk, ::Error>>> for Body {
253     #[inline]
254     fn from(src: mpsc::Receiver<Result<Chunk, ::Error>>) -> Body {
255         let (tx, _) = oneshot::channel();
256         Body::new(Kind::Chan {
257             close_tx: tx,
258             rx: src,
259         })
260     }
261 }
262 
263 impl From<Chunk> for Body {
264     #[inline]
265     fn from (chunk: Chunk) -> Body {
266         Body::new(Kind::Once(Some(chunk)))
267     }
268 }
269 
270 impl From<Bytes> for Body {
271     #[inline]
272     fn from (bytes: Bytes) -> Body {
273         Body::from(Chunk::from(bytes))
274     }
275 }
276 
277 impl From<Vec<u8>> for Body {
278     #[inline]
279     fn from (vec: Vec<u8>) -> Body {
280         Body::from(Chunk::from(vec))
281     }
282 }
283 
284 impl From<&'static [u8]> for Body {
285     #[inline]
286     fn from (slice: &'static [u8]) -> Body {
287         Body::from(Chunk::from(slice))
288     }
289 }
290 
291 impl From<Cow<'static, [u8]>> for Body {
292     #[inline]
293     fn from (cow: Cow<'static, [u8]>) -> Body {
294         match cow {
295             Cow::Borrowed(b) => Body::from(b),
296             Cow::Owned(o) => Body::from(o)
297         }
298     }
299 }
300 
301 impl From<String> for Body {
302     #[inline]
303     fn from (s: String) -> Body {
304         Body::from(Chunk::from(s.into_bytes()))
305     }
306 }
307 
308 impl From<&'static str> for Body {
309     #[inline]
310     fn from(slice: &'static str) -> Body {
311         Body::from(Chunk::from(slice.as_bytes()))
312     }
313 }
314 
315 impl From<Cow<'static, str>> for Body {
316     #[inline]
317     fn from(cow: Cow<'static, str>) -> Body {
318         match cow {
319             Cow::Borrowed(b) => Body::from(b),
320             Cow::Owned(o) => Body::from(o)
321         }
322     }
323 }
324 
325 impl From<Option<Body>> for Body {
326     #[inline]
327     fn from (body: Option<Body>) -> Body {
328         body.unwrap_or_default()
329     }
330 }
331 
332 fn _assert_send_sync() {
333     fn _assert_send<T: Send>() {}
334     fn _assert_sync<T: Sync>() {}
335 
336     _assert_send::<Body>();
337     _assert_send::<Chunk>();
338     _assert_sync::<Chunk>();
339 }
340 
341 #[test]
342 fn test_body_stream_concat() {
343     use futures::{Sink, Stream, Future};
344     let (tx, body) = Body::pair();
345 
346     ::std::thread::spawn(move || {
347         let tx = tx.send(Ok("hello ".into())).wait().unwrap();
348         tx.send(Ok("world".into())).wait().unwrap();
349     });
350 
351     let total = body.concat2().wait().unwrap();
352     assert_eq!(total.as_ref(), b"hello world");
353 
354 }
355