1 use crate::stream::Fuse; 2 use futures_core::stream::{Stream, FusedStream}; 3 use futures_core::task::{Context, Poll}; 4 #[cfg(feature = "sink")] 5 use futures_sink::Sink; 6 use pin_project::{pin_project, project}; 7 use core::mem; 8 use core::pin::Pin; 9 use alloc::vec::Vec; 10 11 /// Stream for the [`chunks`](super::StreamExt::chunks) method. 12 #[pin_project] 13 #[derive(Debug)] 14 #[must_use = "streams do nothing unless polled"] 15 pub struct Chunks<St: Stream> { 16 #[pin] 17 stream: Fuse<St>, 18 items: Vec<St::Item>, 19 cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 20 } 21 22 impl<St: Stream> Chunks<St> where St: Stream { new(stream: St, capacity: usize) -> Chunks<St>23 pub(super) fn new(stream: St, capacity: usize) -> Chunks<St> { 24 assert!(capacity > 0); 25 26 Chunks { 27 stream: super::Fuse::new(stream), 28 items: Vec::with_capacity(capacity), 29 cap: capacity, 30 } 31 } 32 take(self: Pin<&mut Self>) -> Vec<St::Item>33 fn take(self: Pin<&mut Self>) -> Vec<St::Item> { 34 let cap = self.cap; 35 mem::replace(self.project().items, Vec::with_capacity(cap)) 36 } 37 38 delegate_access_inner!(stream, St, (.)); 39 } 40 41 impl<St: Stream> Stream for Chunks<St> { 42 type Item = Vec<St::Item>; 43 44 #[project] poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>45 fn poll_next( 46 mut self: Pin<&mut Self>, 47 cx: &mut Context<'_>, 48 ) -> Poll<Option<Self::Item>> { 49 #[project] 50 let Chunks { mut stream, items, cap } = self.as_mut().project(); 51 loop { 52 match ready!(stream.as_mut().poll_next(cx)) { 53 // Push the item into the buffer and check whether it is full. 54 // If so, replace our buffer with a new and empty one and return 55 // the full one. 56 Some(item) => { 57 items.push(item); 58 if items.len() >= *cap { 59 return Poll::Ready(Some(self.take())) 60 } 61 } 62 63 // Since the underlying stream ran out of values, return what we 64 // have buffered, if we have anything. 65 None => { 66 let last = if items.is_empty() { 67 None 68 } else { 69 let full_buf = mem::replace(items, Vec::new()); 70 Some(full_buf) 71 }; 72 73 return Poll::Ready(last); 74 } 75 } 76 } 77 } 78 size_hint(&self) -> (usize, Option<usize>)79 fn size_hint(&self) -> (usize, Option<usize>) { 80 let chunk_len = if self.items.is_empty() { 0 } else { 1 }; 81 let (lower, upper) = self.stream.size_hint(); 82 let lower = lower.saturating_add(chunk_len); 83 let upper = match upper { 84 Some(x) => x.checked_add(chunk_len), 85 None => None, 86 }; 87 (lower, upper) 88 } 89 } 90 91 impl<St: FusedStream> FusedStream for Chunks<St> { is_terminated(&self) -> bool92 fn is_terminated(&self) -> bool { 93 self.stream.is_terminated() && self.items.is_empty() 94 } 95 } 96 97 // Forwarding impl of Sink from the underlying stream 98 #[cfg(feature = "sink")] 99 impl<S, Item> Sink<Item> for Chunks<S> 100 where 101 S: Stream + Sink<Item>, 102 { 103 type Error = S::Error; 104 105 delegate_sink!(stream, Item); 106 } 107