1 use crate::stream::Fuse; 2 use futures_core::stream::{Stream, FusedStream}; 3 use futures_core::task::{Context, Poll}; 4 #[cfg(feature = "sink")] 5 use futures_sink::Sink; 6 use pin_project::{pin_project, project}; 7 use core::mem; 8 use core::pin::Pin; 9 use alloc::vec::Vec; 10 11 /// Stream for the [`ready_chunks`](super::StreamExt::ready_chunks) method. 12 #[pin_project] 13 #[derive(Debug)] 14 #[must_use = "streams do nothing unless polled"] 15 pub struct ReadyChunks<St: Stream> { 16 #[pin] 17 stream: Fuse<St>, 18 items: Vec<St::Item>, 19 cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 20 } 21 22 impl<St: Stream> ReadyChunks<St> where St: Stream { new(stream: St, capacity: usize) -> ReadyChunks<St>23 pub(super) fn new(stream: St, capacity: usize) -> ReadyChunks<St> { 24 assert!(capacity > 0); 25 26 ReadyChunks { 27 stream: super::Fuse::new(stream), 28 items: Vec::with_capacity(capacity), 29 cap: capacity, 30 } 31 } 32 33 delegate_access_inner!(stream, St, (.)); 34 } 35 36 impl<St: Stream> Stream for ReadyChunks<St> { 37 type Item = Vec<St::Item>; 38 39 #[project] poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>40 fn poll_next( 41 self: Pin<&mut Self>, 42 cx: &mut Context<'_>, 43 ) -> Poll<Option<Self::Item>> { 44 #[project] 45 let ReadyChunks { items, cap, mut stream } = self.project(); 46 47 loop { 48 match stream.as_mut().poll_next(cx) { 49 // Flush all collected data if underlying stream doesn't contain 50 // more ready values 51 Poll::Pending => { 52 return if items.is_empty() { 53 Poll::Pending 54 } else { 55 Poll::Ready(Some(mem::replace(items, Vec::with_capacity(*cap)))) 56 } 57 } 58 59 // Push the ready item into the buffer and check whether it is full. 60 // If so, replace our buffer with a new and empty one and return 61 // the full one. 62 Poll::Ready(Some(item)) => { 63 items.push(item); 64 if items.len() >= *cap { 65 return Poll::Ready(Some(mem::replace(items, Vec::with_capacity(*cap)))) 66 } 67 } 68 69 // Since the underlying stream ran out of values, return what we 70 // have buffered, if we have anything. 71 Poll::Ready(None) => { 72 let last = if items.is_empty() { 73 None 74 } else { 75 let full_buf = mem::replace(items, Vec::new()); 76 Some(full_buf) 77 }; 78 79 return Poll::Ready(last); 80 } 81 } 82 } 83 } 84 size_hint(&self) -> (usize, Option<usize>)85 fn size_hint(&self) -> (usize, Option<usize>) { 86 let chunk_len = if self.items.is_empty() { 0 } else { 1 }; 87 let (lower, upper) = self.stream.size_hint(); 88 let lower = lower.saturating_add(chunk_len); 89 let upper = match upper { 90 Some(x) => x.checked_add(chunk_len), 91 None => None, 92 }; 93 (lower, upper) 94 } 95 } 96 97 impl<St: FusedStream> FusedStream for ReadyChunks<St> { is_terminated(&self) -> bool98 fn is_terminated(&self) -> bool { 99 self.stream.is_terminated() && self.items.is_empty() 100 } 101 } 102 103 // Forwarding impl of Sink from the underlying stream 104 #[cfg(feature = "sink")] 105 impl<S, Item> Sink<Item> for ReadyChunks<S> 106 where 107 S: Stream + Sink<Item>, 108 { 109 type Error = S::Error; 110 111 delegate_sink!(stream, Item); 112 } 113