1 use futures_core::ready; 2 use futures_core::task::{Context, Poll}; 3 #[cfg(feature = "read-initializer")] 4 use futures_io::Initializer; 5 use futures_io::{AsyncBufRead, AsyncRead}; 6 use pin_project_lite::pin_project; 7 use std::pin::Pin; 8 use std::{cmp, io}; 9 10 pin_project! { 11 /// Reader for the [`take`](super::AsyncReadExt::take) method. 12 #[derive(Debug)] 13 #[must_use = "readers do nothing unless you `.await` or poll them"] 14 pub struct Take<R> { 15 #[pin] 16 inner: R, 17 limit: u64, 18 } 19 } 20 21 impl<R: AsyncRead> Take<R> { new(inner: R, limit: u64) -> Self22 pub(super) fn new(inner: R, limit: u64) -> Self { 23 Self { inner, limit } 24 } 25 26 /// Returns the remaining number of bytes that can be 27 /// read before this instance will return EOF. 28 /// 29 /// # Note 30 /// 31 /// This instance may reach `EOF` after reading fewer bytes than indicated by 32 /// this method if the underlying [`AsyncRead`] instance reaches EOF. 33 /// 34 /// # Examples 35 /// 36 /// ``` 37 /// # futures::executor::block_on(async { 38 /// use futures::io::{AsyncReadExt, Cursor}; 39 /// 40 /// let reader = Cursor::new(&b"12345678"[..]); 41 /// let mut buffer = [0; 2]; 42 /// 43 /// let mut take = reader.take(4); 44 /// let n = take.read(&mut buffer).await?; 45 /// 46 /// assert_eq!(take.limit(), 2); 47 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); 48 /// ``` limit(&self) -> u6449 pub fn limit(&self) -> u64 { 50 self.limit 51 } 52 53 /// Sets the number of bytes that can be read before this instance will 54 /// return EOF. This is the same as constructing a new `Take` instance, so 55 /// the amount of bytes read and the previous limit value don't matter when 56 /// calling this method. 57 /// 58 /// # Examples 59 /// 60 /// ``` 61 /// # futures::executor::block_on(async { 62 /// use futures::io::{AsyncReadExt, Cursor}; 63 /// 64 /// let reader = Cursor::new(&b"12345678"[..]); 65 /// let mut buffer = [0; 4]; 66 /// 67 /// let mut take = reader.take(4); 68 /// let n = take.read(&mut buffer).await?; 69 /// 70 /// assert_eq!(n, 4); 71 /// assert_eq!(take.limit(), 0); 72 /// 73 /// take.set_limit(10); 74 /// let n = take.read(&mut buffer).await?; 75 /// assert_eq!(n, 4); 76 /// 77 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); 78 /// ``` set_limit(&mut self, limit: u64)79 pub fn set_limit(&mut self, limit: u64) { 80 self.limit = limit 81 } 82 83 delegate_access_inner!(inner, R, ()); 84 } 85 86 impl<R: AsyncRead> AsyncRead for Take<R> { poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, io::Error>>87 fn poll_read( 88 self: Pin<&mut Self>, 89 cx: &mut Context<'_>, 90 buf: &mut [u8], 91 ) -> Poll<Result<usize, io::Error>> { 92 let this = self.project(); 93 94 if *this.limit == 0 { 95 return Poll::Ready(Ok(0)); 96 } 97 98 let max = cmp::min(buf.len() as u64, *this.limit) as usize; 99 let n = ready!(this.inner.poll_read(cx, &mut buf[..max]))?; 100 *this.limit -= n as u64; 101 Poll::Ready(Ok(n)) 102 } 103 104 #[cfg(feature = "read-initializer")] initializer(&self) -> Initializer105 unsafe fn initializer(&self) -> Initializer { 106 self.inner.initializer() 107 } 108 } 109 110 impl<R: AsyncBufRead> AsyncBufRead for Take<R> { poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>111 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { 112 let this = self.project(); 113 114 // Don't call into inner reader at all at EOF because it may still block 115 if *this.limit == 0 { 116 return Poll::Ready(Ok(&[])); 117 } 118 119 let buf = ready!(this.inner.poll_fill_buf(cx)?); 120 let cap = cmp::min(buf.len() as u64, *this.limit) as usize; 121 Poll::Ready(Ok(&buf[..cap])) 122 } 123 consume(self: Pin<&mut Self>, amt: usize)124 fn consume(self: Pin<&mut Self>, amt: usize) { 125 let this = self.project(); 126 127 // Don't let callers reset the limit by passing an overlarge value 128 let amt = cmp::min(amt as u64, *this.limit) as usize; 129 *this.limit -= amt as u64; 130 this.inner.consume(amt); 131 } 132 } 133