1 use std::fmt; 2 3 use bytes::Bytes; 4 use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream}; 5 use futures::sync::{mpsc, oneshot}; 6 #[cfg(feature = "tokio-proto")] 7 use tokio_proto; 8 use std::borrow::Cow; 9 10 use common::Never; 11 use super::Chunk; 12 13 #[cfg(feature = "tokio-proto")] 14 pub type TokioBody = tokio_proto::streaming::Body<Chunk, ::Error>; 15 pub type BodySender = mpsc::Sender<Result<Chunk, ::Error>>; 16 17 /// A `Stream` for `Chunk`s used in requests and responses. 18 #[must_use = "streams do nothing unless polled"] 19 pub struct Body { 20 kind: Kind, 21 /// Allow the client to pass a future to delay the `Body` from returning 22 /// EOF. This allows the `Client` to try to put the idle connection 23 /// back into the pool before the body is "finished". 24 /// 25 /// The reason for this is so that creating a new request after finishing 26 /// streaming the body of a response could sometimes result in creating 27 /// a brand new connection, since the pool didn't know about the idle 28 /// connection yet. 29 delayed_eof: Option<DelayEof>, 30 } 31 32 #[derive(Debug)] 33 enum Kind { 34 #[cfg(feature = "tokio-proto")] 35 Tokio(TokioBody), 36 Chan { 37 close_tx: oneshot::Sender<bool>, 38 rx: mpsc::Receiver<Result<Chunk, ::Error>>, 39 }, 40 Once(Option<Chunk>), 41 Empty, 42 } 43 44 type DelayEofUntil = oneshot::Receiver<Never>; 45 46 enum DelayEof { 47 /// Initial state, stream hasn't seen EOF yet. 48 NotEof(DelayEofUntil), 49 /// Transitions to this state once we've seen `poll` try to 50 /// return EOF (`None`). This future is then polled, and 51 /// when it completes, the Body finally returns EOF (`None`). 52 Eof(DelayEofUntil), 53 } 54 55 //pub(crate) 56 #[derive(Debug)] 57 pub struct ChunkSender { 58 close_rx: oneshot::Receiver<bool>, 59 close_rx_check: bool, 60 tx: BodySender, 61 } 62 63 impl Body { 64 /// Return an empty body stream 65 #[inline] 66 pub fn empty() -> Body { 67 Body::new(Kind::Empty) 68 } 69 70 /// Return a body stream with an associated sender half 71 #[inline] 72 pub fn pair() -> (mpsc::Sender<Result<Chunk, ::Error>>, Body) { 73 let (tx, rx) = channel(); 74 (tx.tx, rx) 75 } 76 77 /// Returns if this body was constructed via `Body::empty()`. 78 /// 79 /// # Note 80 /// 81 /// This does **not** detect if the body stream may be at the end, or 82 /// if the stream will not yield any chunks, in all cases. For instance, 83 /// a streaming body using `chunked` encoding is not able to tell if 84 /// there are more chunks immediately. 85 #[inline] 86 pub fn is_empty(&self) -> bool { 87 match self.kind { 88 Kind::Empty => true, 89 _ => false, 90 } 91 } 92 93 fn new(kind: Kind) -> Body { 94 Body { 95 kind: kind, 96 delayed_eof: None, 97 } 98 } 99 100 pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { 101 self.delayed_eof = Some(DelayEof::NotEof(fut)); 102 } 103 104 fn poll_eof(&mut self) -> Poll<Option<Chunk>, ::Error> { 105 match self.delayed_eof.take() { 106 Some(DelayEof::NotEof(mut delay)) => { 107 match self.poll_inner() { 108 ok @ Ok(Async::Ready(Some(..))) | 109 ok @ Ok(Async::NotReady) => { 110 self.delayed_eof = Some(DelayEof::NotEof(delay)); 111 ok 112 }, 113 Ok(Async::Ready(None)) => match delay.poll() { 114 Ok(Async::Ready(never)) => match never {}, 115 Ok(Async::NotReady) => { 116 self.delayed_eof = Some(DelayEof::Eof(delay)); 117 Ok(Async::NotReady) 118 }, 119 Err(_done) => { 120 Ok(Async::Ready(None)) 121 }, 122 }, 123 Err(e) => Err(e), 124 } 125 }, 126 Some(DelayEof::Eof(mut delay)) => { 127 match delay.poll() { 128 Ok(Async::Ready(never)) => match never {}, 129 Ok(Async::NotReady) => { 130 self.delayed_eof = Some(DelayEof::Eof(delay)); 131 Ok(Async::NotReady) 132 }, 133 Err(_done) => { 134 Ok(Async::Ready(None)) 135 }, 136 } 137 }, 138 None => self.poll_inner(), 139 } 140 } 141 142 fn poll_inner(&mut self) -> Poll<Option<Chunk>, ::Error> { 143 match self.kind { 144 #[cfg(feature = "tokio-proto")] 145 Kind::Tokio(ref mut rx) => rx.poll(), 146 Kind::Chan { ref mut rx, .. } => match rx.poll().expect("mpsc cannot error") { 147 Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))), 148 Async::Ready(Some(Err(err))) => Err(err), 149 Async::Ready(None) => Ok(Async::Ready(None)), 150 Async::NotReady => Ok(Async::NotReady), 151 }, 152 Kind::Once(ref mut val) => Ok(Async::Ready(val.take())), 153 Kind::Empty => Ok(Async::Ready(None)), 154 } 155 } 156 } 157 158 impl Default for Body { 159 #[inline] 160 fn default() -> Body { 161 Body::empty() 162 } 163 } 164 165 impl Stream for Body { 166 type Item = Chunk; 167 type Error = ::Error; 168 169 #[inline] 170 fn poll(&mut self) -> Poll<Option<Chunk>, ::Error> { 171 self.poll_eof() 172 } 173 } 174 175 impl fmt::Debug for Body { 176 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 177 f.debug_tuple("Body") 178 .field(&self.kind) 179 .finish() 180 } 181 } 182 183 //pub(crate) 184 pub fn channel() -> (ChunkSender, Body) { 185 let (tx, rx) = mpsc::channel(0); 186 let (close_tx, close_rx) = oneshot::channel(); 187 188 let tx = ChunkSender { 189 close_rx: close_rx, 190 close_rx_check: true, 191 tx: tx, 192 }; 193 let rx = Body::new(Kind::Chan { 194 close_tx: close_tx, 195 rx: rx, 196 }); 197 198 (tx, rx) 199 } 200 201 impl ChunkSender { 202 pub fn poll_ready(&mut self) -> Poll<(), ()> { 203 if self.close_rx_check { 204 match self.close_rx.poll() { 205 Ok(Async::Ready(true)) | Err(_) => return Err(()), 206 Ok(Async::Ready(false)) => { 207 // needed to allow converting into a plain mpsc::Receiver 208 // if it has been, the tx will send false to disable this check 209 self.close_rx_check = false; 210 } 211 Ok(Async::NotReady) => (), 212 } 213 } 214 215 self.tx.poll_ready().map_err(|_| ()) 216 } 217 218 pub fn start_send(&mut self, msg: Result<Chunk, ::Error>) -> StartSend<(), ()> { 219 match self.tx.start_send(msg) { 220 Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready), 221 Ok(AsyncSink::NotReady(_)) => Ok(AsyncSink::NotReady(())), 222 Err(_) => Err(()), 223 } 224 } 225 } 226 227 feat_server_proto! { 228 impl From<Body> for tokio_proto::streaming::Body<Chunk, ::Error> { 229 fn from(b: Body) -> tokio_proto::streaming::Body<Chunk, ::Error> { 230 match b.kind { 231 Kind::Tokio(b) => b, 232 Kind::Chan { close_tx, rx } => { 233 // disable knowing if the Rx gets dropped, since we cannot 234 // pass this tx along. 235 let _ = close_tx.send(false); 236 rx.into() 237 }, 238 Kind::Once(Some(chunk)) => TokioBody::from(chunk), 239 Kind::Once(None) | 240 Kind::Empty => TokioBody::empty(), 241 } 242 } 243 } 244 245 impl From<tokio_proto::streaming::Body<Chunk, ::Error>> for Body { 246 fn from(tokio_body: tokio_proto::streaming::Body<Chunk, ::Error>) -> Body { 247 Body::new(Kind::Tokio(tokio_body)) 248 } 249 } 250 } 251 252 impl From<mpsc::Receiver<Result<Chunk, ::Error>>> for Body { 253 #[inline] 254 fn from(src: mpsc::Receiver<Result<Chunk, ::Error>>) -> Body { 255 let (tx, _) = oneshot::channel(); 256 Body::new(Kind::Chan { 257 close_tx: tx, 258 rx: src, 259 }) 260 } 261 } 262 263 impl From<Chunk> for Body { 264 #[inline] 265 fn from (chunk: Chunk) -> Body { 266 Body::new(Kind::Once(Some(chunk))) 267 } 268 } 269 270 impl From<Bytes> for Body { 271 #[inline] 272 fn from (bytes: Bytes) -> Body { 273 Body::from(Chunk::from(bytes)) 274 } 275 } 276 277 impl From<Vec<u8>> for Body { 278 #[inline] 279 fn from (vec: Vec<u8>) -> Body { 280 Body::from(Chunk::from(vec)) 281 } 282 } 283 284 impl From<&'static [u8]> for Body { 285 #[inline] 286 fn from (slice: &'static [u8]) -> Body { 287 Body::from(Chunk::from(slice)) 288 } 289 } 290 291 impl From<Cow<'static, [u8]>> for Body { 292 #[inline] 293 fn from (cow: Cow<'static, [u8]>) -> Body { 294 match cow { 295 Cow::Borrowed(b) => Body::from(b), 296 Cow::Owned(o) => Body::from(o) 297 } 298 } 299 } 300 301 impl From<String> for Body { 302 #[inline] 303 fn from (s: String) -> Body { 304 Body::from(Chunk::from(s.into_bytes())) 305 } 306 } 307 308 impl From<&'static str> for Body { 309 #[inline] 310 fn from(slice: &'static str) -> Body { 311 Body::from(Chunk::from(slice.as_bytes())) 312 } 313 } 314 315 impl From<Cow<'static, str>> for Body { 316 #[inline] 317 fn from(cow: Cow<'static, str>) -> Body { 318 match cow { 319 Cow::Borrowed(b) => Body::from(b), 320 Cow::Owned(o) => Body::from(o) 321 } 322 } 323 } 324 325 impl From<Option<Body>> for Body { 326 #[inline] 327 fn from (body: Option<Body>) -> Body { 328 body.unwrap_or_default() 329 } 330 } 331 332 fn _assert_send_sync() { 333 fn _assert_send<T: Send>() {} 334 fn _assert_sync<T: Sync>() {} 335 336 _assert_send::<Body>(); 337 _assert_send::<Chunk>(); 338 _assert_sync::<Chunk>(); 339 } 340 341 #[test] 342 fn test_body_stream_concat() { 343 use futures::{Sink, Stream, Future}; 344 let (tx, body) = Body::pair(); 345 346 ::std::thread::spawn(move || { 347 let tx = tx.send(Ok("hello ".into())).wait().unwrap(); 348 tx.send(Ok("world".into())).wait().unwrap(); 349 }); 350 351 let total = body.concat2().wait().unwrap(); 352 assert_eq!(total.as_ref(), b"hello world"); 353 354 } 355