1 use crate::stream::Fuse; 2 use alloc::vec::Vec; 3 use core::mem; 4 use core::pin::Pin; 5 use futures_core::ready; 6 use futures_core::stream::{FusedStream, Stream}; 7 use futures_core::task::{Context, Poll}; 8 #[cfg(feature = "sink")] 9 use futures_sink::Sink; 10 use pin_project_lite::pin_project; 11 12 pin_project! { 13 /// Stream for the [`chunks`](super::StreamExt::chunks) method. 14 #[derive(Debug)] 15 #[must_use = "streams do nothing unless polled"] 16 pub struct Chunks<St: Stream> { 17 #[pin] 18 stream: Fuse<St>, 19 items: Vec<St::Item>, 20 cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 21 } 22 } 23 24 impl<St: Stream> Chunks<St> 25 where 26 St: Stream, 27 { new(stream: St, capacity: usize) -> Self28 pub(super) fn new(stream: St, capacity: usize) -> Self { 29 assert!(capacity > 0); 30 31 Self { 32 stream: super::Fuse::new(stream), 33 items: Vec::with_capacity(capacity), 34 cap: capacity, 35 } 36 } 37 take(self: Pin<&mut Self>) -> Vec<St::Item>38 fn take(self: Pin<&mut Self>) -> Vec<St::Item> { 39 let cap = self.cap; 40 mem::replace(self.project().items, Vec::with_capacity(cap)) 41 } 42 43 delegate_access_inner!(stream, St, (.)); 44 } 45 46 impl<St: Stream> Stream for Chunks<St> { 47 type Item = Vec<St::Item>; 48 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>49 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 50 let mut this = self.as_mut().project(); 51 loop { 52 match ready!(this.stream.as_mut().poll_next(cx)) { 53 // Push the item into the buffer and check whether it is full. 54 // If so, replace our buffer with a new and empty one and return 55 // the full one. 56 Some(item) => { 57 this.items.push(item); 58 if this.items.len() >= *this.cap { 59 return Poll::Ready(Some(self.take())); 60 } 61 } 62 63 // Since the underlying stream ran out of values, return what we 64 // have buffered, if we have anything. 65 None => { 66 let last = if this.items.is_empty() { 67 None 68 } else { 69 let full_buf = mem::replace(this.items, Vec::new()); 70 Some(full_buf) 71 }; 72 73 return Poll::Ready(last); 74 } 75 } 76 } 77 } 78 size_hint(&self) -> (usize, Option<usize>)79 fn size_hint(&self) -> (usize, Option<usize>) { 80 let chunk_len = if self.items.is_empty() { 0 } else { 1 }; 81 let (lower, upper) = self.stream.size_hint(); 82 let lower = lower.saturating_add(chunk_len); 83 let upper = match upper { 84 Some(x) => x.checked_add(chunk_len), 85 None => None, 86 }; 87 (lower, upper) 88 } 89 } 90 91 impl<St: FusedStream> FusedStream for Chunks<St> { is_terminated(&self) -> bool92 fn is_terminated(&self) -> bool { 93 self.stream.is_terminated() && self.items.is_empty() 94 } 95 } 96 97 // Forwarding impl of Sink from the underlying stream 98 #[cfg(feature = "sink")] 99 impl<S, Item> Sink<Item> for Chunks<S> 100 where 101 S: Stream + Sink<Item>, 102 { 103 type Error = S::Error; 104 105 delegate_sink!(stream, Item); 106 } 107