1 //! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s,
2 //! and the `AsyncRead` and `AsyncWrite` traits.
3 
4 #![cfg_attr(feature = "read-initializer", feature(read_initializer))]
5 #![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))]
6 #![cfg_attr(not(feature = "std"), no_std)]
7 #![warn(
8     missing_debug_implementations,
9     missing_docs,
10     rust_2018_idioms,
11     single_use_lifetimes,
12     unreachable_pub
13 )]
14 #![doc(test(
15     no_crate_inject,
16     attr(
17         deny(warnings, rust_2018_idioms, single_use_lifetimes),
18         allow(dead_code, unused_assignments, unused_variables)
19     )
20 ))]
21 #![cfg_attr(docsrs, feature(doc_cfg))]
22 
// Features that rely on unstable functionality must be paired with an explicit
// `unstable` opt-in; each guard below stops the build with a message otherwise.
#[cfg(all(not(feature = "unstable"), feature = "bilock"))]
compile_error!(
    "The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"
);

#[cfg(all(not(feature = "unstable"), feature = "read-initializer"))]
compile_error!(
    "The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features"
);
28 
29 #[cfg(feature = "alloc")]
30 extern crate alloc;
31 
32 // Macro re-exports
33 pub use futures_core::ready;
34 pub use pin_utils::pin_mut;
35 
36 #[cfg(feature = "async-await")]
37 #[macro_use]
38 mod async_await;
39 #[cfg(feature = "async-await")]
40 #[doc(hidden)]
41 pub use self::async_await::*;
42 
// Not public API.
//
// Hidden module that gathers, under a single path, the crate root, a handful
// of `core` names (`Option`/`None`/`Some`, `Pin`, `Err`/`Ok`) and the
// `async_await` module's items.
#[cfg(feature = "async-await")]
#[doc(hidden)]
pub mod __private {
    pub use core::option::Option::{self, None, Some};
    pub use core::pin::Pin;
    pub use core::result::Result::{Err, Ok};

    pub use crate::*;

    pub mod async_await {
        pub use crate::async_await::*;
    }
}
58 
// Expands to the four `Sink` methods (`poll_ready`, `start_send`,
// `poll_flush`, `poll_close`) for use inside a `Sink<$item>` impl. Each one
// calls `self.project()` and forwards to the projected `$field`.
#[cfg(feature = "sink")]
macro_rules! delegate_sink {
    ($field:ident, $item:ty) => {
        fn poll_ready(
            self: core::pin::Pin<&mut Self>,
            ctx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            let this = self.project();
            this.$field.poll_ready(ctx)
        }

        fn start_send(
            self: core::pin::Pin<&mut Self>,
            item: $item,
        ) -> Result<(), Self::Error> {
            let this = self.project();
            this.$field.start_send(item)
        }

        fn poll_flush(
            self: core::pin::Pin<&mut Self>,
            ctx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            let this = self.project();
            this.$field.poll_flush(ctx)
        }

        fn poll_close(
            self: core::pin::Pin<&mut Self>,
            ctx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            let this = self.project();
            this.$field.poll_close(ctx)
        }
    };
}
88 
// Expands to a `poll` method for use inside a `Future` impl: it projects
// `self` and polls the projected `$field`, returning that result unchanged.
macro_rules! delegate_future {
    ($field:ident) => {
        fn poll(
            self: core::pin::Pin<&mut Self>,
            ctx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Self::Output> {
            let this = self.project();
            this.$field.poll(ctx)
        }
    };
}
99 
// Expands to `poll_next` and `size_hint` for use inside a `Stream` impl.
// `poll_next` goes through `self.project()`; `size_hint` only needs `&self`
// and therefore reads `$field` directly.
macro_rules! delegate_stream {
    ($field:ident) => {
        fn poll_next(
            self: core::pin::Pin<&mut Self>,
            ctx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Option<Self::Item>> {
            let this = self.project();
            this.$field.poll_next(ctx)
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            self.$field.size_hint()
        }
    };
}
113 
// Expands to `poll_write`, `poll_write_vectored`, `poll_flush` and
// `poll_close`, each forwarding to the projected `$field`.
// Only defined when both the `io` and `std` features are enabled.
#[cfg(all(feature = "io", feature = "std"))]
macro_rules! delegate_async_write {
    ($field:ident) => {
        fn poll_write(
            self: core::pin::Pin<&mut Self>,
            ctx: &mut core::task::Context<'_>,
            buf: &[u8],
        ) -> core::task::Poll<std::io::Result<usize>> {
            let this = self.project();
            this.$field.poll_write(ctx, buf)
        }

        fn poll_write_vectored(
            self: core::pin::Pin<&mut Self>,
            ctx: &mut core::task::Context<'_>,
            bufs: &[std::io::IoSlice<'_>],
        ) -> core::task::Poll<std::io::Result<usize>> {
            let this = self.project();
            this.$field.poll_write_vectored(ctx, bufs)
        }

        fn poll_flush(
            self: core::pin::Pin<&mut Self>,
            ctx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<std::io::Result<()>> {
            let this = self.project();
            this.$field.poll_flush(ctx)
        }

        fn poll_close(
            self: core::pin::Pin<&mut Self>,
            ctx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<std::io::Result<()>> {
            let this = self.project();
            this.$field.poll_close(ctx)
        }
    };
}
146 
// Expands to `poll_read` and `poll_read_vectored` (plus `initializer` when the
// `read-initializer` feature is on), each forwarding to `$field`.
// Only defined when both the `io` and `std` features are enabled.
#[cfg(all(feature = "io", feature = "std"))]
macro_rules! delegate_async_read {
    ($field:ident) => {
        // Plain forward: nothing is added on top of the call into `$field`.
        #[cfg(feature = "read-initializer")]
        unsafe fn initializer(&self) -> $crate::io::Initializer {
            self.$field.initializer()
        }

        fn poll_read(
            self: core::pin::Pin<&mut Self>,
            ctx: &mut core::task::Context<'_>,
            buf: &mut [u8],
        ) -> core::task::Poll<std::io::Result<usize>> {
            let this = self.project();
            this.$field.poll_read(ctx, buf)
        }

        fn poll_read_vectored(
            self: core::pin::Pin<&mut Self>,
            ctx: &mut core::task::Context<'_>,
            bufs: &mut [std::io::IoSliceMut<'_>],
        ) -> core::task::Poll<std::io::Result<usize>> {
            let this = self.project();
            this.$field.poll_read_vectored(ctx, bufs)
        }
    };
}
173 
// Expands to `poll_fill_buf` and `consume`, each forwarding to the projected
// `$field`. Only defined when both the `io` and `std` features are enabled.
#[cfg(all(feature = "io", feature = "std"))]
macro_rules! delegate_async_buf_read {
    ($field:ident) => {
        fn poll_fill_buf(
            self: core::pin::Pin<&mut Self>,
            ctx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<std::io::Result<&[u8]>> {
            let this = self.project();
            this.$field.poll_fill_buf(ctx)
        }

        fn consume(self: core::pin::Pin<&mut Self>, amt: usize) {
            let this = self.project();
            this.$field.consume(amt)
        }
    };
}
190 
// Expands to the accessor quartet `get_ref` / `get_mut` / `get_pin_mut` /
// `into_inner` for a wrapper whose wrapped value lives in `$field`.
//
// `$inner` is the type handed back to the caller. `$ind` is a token list
// (e.g. `()` or `(.)`): for every token in it, the matching accessor of the
// field is chained on, so a list of `.` tokens walks through nested wrappers.
macro_rules! delegate_access_inner {
    ($field:ident, $inner:ty, ($($ind:tt)*)) => {
        /// Returns a shared reference to the underlying sink or stream that
        /// this combinator is pulling from.
        pub fn get_ref(&self) -> &$inner {
            let target = &self.$field;
            target $($ind get_ref())*
        }

        /// Returns a mutable reference to the underlying sink or stream that
        /// this combinator is pulling from.
        ///
        /// Take care not to tamper with the state of the sink or stream, as
        /// doing so may confuse this combinator.
        pub fn get_mut(&mut self) -> &mut $inner {
            let target = &mut self.$field;
            target $($ind get_mut())*
        }

        /// Returns a pinned mutable reference to the underlying sink or stream
        /// that this combinator is pulling from.
        ///
        /// Take care not to tamper with the state of the sink or stream, as
        /// doing so may confuse this combinator.
        pub fn get_pin_mut(self: core::pin::Pin<&mut Self>) -> core::pin::Pin<&mut $inner> {
            let this = self.project();
            this.$field $($ind get_pin_mut())*
        }

        /// Consumes this combinator and returns the underlying sink or stream.
        ///
        /// Any intermediate state held by this combinator may be discarded, so
        /// take care not to lose resources when calling this.
        pub fn into_inner(self) -> $inner {
            let owned = self.$field;
            owned $($ind into_inner())*
        }
    }
}
226 
// Declares a wrapper struct `$name<..>` with a single `#[pin] inner` field of
// the given type and forwards a `+`-separated list of traits to that field.
//
// Arms, in matching order:
//   * `@trait <Trait> ...` (internal): emit one forwarding impl.
//   * `Name<..>(Ty) : Trait ...`: only one trait left, so define the struct
//     (through `pin_project_lite::pin_project!`), an inherent impl holding any
//     `{ ... }` items, and that trait's impl.
//   * `Name<..>(Ty) : First + Second ...`: recurse on everything after
//     `First`, then emit `First`'s impl.
macro_rules! delegate_all {
    // `Future`: `Output` and `poll` are taken from the wrapped type.
    (
        @trait Future
        $name:ident < $($arg:ident),* > ($wrapped:ty)
        $(where $($bound:tt)*)*
    ) => {
        impl<$($arg),*> futures_core::future::Future for $name<$($arg),*>
        where
            $wrapped: futures_core::future::Future
            $(, $($bound)*)*
        {
            type Output = <$wrapped as futures_core::future::Future>::Output;

            delegate_future!(inner);
        }
    };

    // `FusedFuture`: termination state is whatever `inner` reports.
    (
        @trait FusedFuture
        $name:ident < $($arg:ident),* > ($wrapped:ty)
        $(where $($bound:tt)*)*
    ) => {
        impl<$($arg),*> futures_core::future::FusedFuture for $name<$($arg),*>
        where
            $wrapped: futures_core::future::FusedFuture
            $(, $($bound)*)*
        {
            fn is_terminated(&self) -> bool {
                self.inner.is_terminated()
            }
        }
    };

    // `Stream`: `Item`, `poll_next` and `size_hint` are taken from the wrapped type.
    (
        @trait Stream
        $name:ident < $($arg:ident),* > ($wrapped:ty)
        $(where $($bound:tt)*)*
    ) => {
        impl<$($arg),*> futures_core::stream::Stream for $name<$($arg),*>
        where
            $wrapped: futures_core::stream::Stream
            $(, $($bound)*)*
        {
            type Item = <$wrapped as futures_core::stream::Stream>::Item;

            delegate_stream!(inner);
        }
    };

    // `FusedStream`: termination state is whatever `inner` reports.
    (
        @trait FusedStream
        $name:ident < $($arg:ident),* > ($wrapped:ty)
        $(where $($bound:tt)*)*
    ) => {
        impl<$($arg),*> futures_core::stream::FusedStream for $name<$($arg),*>
        where
            $wrapped: futures_core::stream::FusedStream
            $(, $($bound)*)*
        {
            fn is_terminated(&self) -> bool {
                self.inner.is_terminated()
            }
        }
    };

    // `Sink`: generic over an extra `_Item` parameter; the impl itself is only
    // compiled with the `sink` feature.
    (
        @trait Sink
        $name:ident < $($arg:ident),* > ($wrapped:ty)
        $(where $($bound:tt)*)*
    ) => {
        #[cfg(feature = "sink")]
        impl<_Item, $($arg),*> futures_sink::Sink<_Item> for $name<$($arg),*>
        where
            $wrapped: futures_sink::Sink<_Item>
            $(, $($bound)*)*
        {
            type Error = <$wrapped as futures_sink::Sink<_Item>>::Error;

            delegate_sink!(inner, _Item);
        }
    };

    // `Debug`: formats exactly as `inner` does (no wrapper name is printed).
    (
        @trait Debug
        $name:ident < $($arg:ident),* > ($wrapped:ty)
        $(where $($bound:tt)*)*
    ) => {
        impl<$($arg),*> core::fmt::Debug for $name<$($arg),*>
        where
            $wrapped: core::fmt::Debug
            $(, $($bound)*)*
        {
            fn fmt(&self, formatter: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
                core::fmt::Debug::fmt(&self.inner, formatter)
            }
        }
    };

    // `AccessInner[Ty, (tokens)]`: inherent `get_ref`/`get_mut`/`get_pin_mut`/
    // `into_inner`, produced by `delegate_access_inner!`.
    (
        @trait AccessInner[$inner:ty, ($($ind:tt)*)]
        $name:ident < $($arg:ident),* > ($wrapped:ty)
        $(where $($bound:tt)*)*
    ) => {
        impl<$($arg),*> $name<$($arg),*>
        $(where $($bound)*)*
        {
            delegate_access_inner!(inner, $inner, ($($ind)*));
        }
    };

    // `New[|params| expr]`: crate-private constructor that stores `expr` in `inner`.
    (
        @trait New[|$($param:ident: $paramt:ty),*| $cons:expr]
        $name:ident < $($arg:ident),* > ($wrapped:ty)
        $(where $($bound:tt)*)*
    ) => {
        impl<$($arg),*> $name<$($arg),*>
        $(where $($bound)*)*
        {
            pub(crate) fn new($($param: $paramt),*) -> Self {
                Self { inner: $cons }
            }
        }
    };

    // Exactly one trait left: define the struct and its inherent items, then
    // emit that trait's impl.
    (
        $(#[$attr:meta])*
        $name:ident<$($arg:ident),*>($wrapped:ty) :
        $ftrait:ident $([$($targs:tt)*])*
        $({$($item:tt)*})*
        $(where $($bound:tt)*)*
    ) => {
        pin_project_lite::pin_project! {
            #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
            $(#[$attr])*
            pub struct $name< $($arg),* > $(where $($bound)*)* {
                #[pin]
                inner: $wrapped
            }
        }

        impl<$($arg),*> $name< $($arg),* >
        $(where $($bound)*)*
        {
            $($($item)*)*
        }

        delegate_all!(
            @trait $ftrait $([$($targs)*])*
            $name<$($arg),*>($wrapped)
            $(where $($bound)*)*
        );
    };

    // Two or more traits: recurse on everything after the first trait (the
    // recursion eventually defines the struct), then emit the first trait's impl.
    (
        $(#[$attr:meta])*
        $name:ident<$($arg:ident),*>($wrapped:ty) :
        $ftrait:ident $([$($ftargs:tt)*])*
        + $strait:ident $([$($stargs:tt)*])*
        $(+ $trait:ident $([$($targs:tt)*])*)*
        $({$($item:tt)*})*
        $(where $($bound:tt)*)*
    ) => {
        delegate_all!(
            $(#[$attr])*
            $name<$($arg),*>($wrapped) :
            $strait $([$($stargs)*])*
            $(+ $trait $([$($targs)*])*)*
            $({$($item)*})*
            $(where $($bound)*)*
        );

        delegate_all!(
            @trait $ftrait $([$($ftargs)*])*
            $name<$($arg),*>($wrapped)
            $(where $($bound)*)*
        );
    };
}
302 
303 pub mod future;
304 #[doc(no_inline)]
305 pub use crate::future::{Future, FutureExt, TryFuture, TryFutureExt};
306 
307 pub mod stream;
308 #[doc(no_inline)]
309 pub use crate::stream::{Stream, StreamExt, TryStream, TryStreamExt};
310 
311 #[cfg(feature = "sink")]
312 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
313 pub mod sink;
314 #[cfg(feature = "sink")]
315 #[doc(no_inline)]
316 pub use crate::sink::{Sink, SinkExt};
317 
318 pub mod task;
319 
320 pub mod never;
321 
322 #[cfg(feature = "compat")]
323 #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
324 pub mod compat;
325 
326 #[cfg(feature = "io")]
327 #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
328 #[cfg(feature = "std")]
329 pub mod io;
330 #[cfg(feature = "io")]
331 #[cfg(feature = "std")]
332 #[doc(no_inline)]
333 pub use crate::io::{
334     AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
335     AsyncWriteExt,
336 };
337 
338 #[cfg(feature = "alloc")]
339 pub mod lock;
340 
341 #[cfg(not(futures_no_atomic_cas))]
342 #[cfg(feature = "alloc")]
343 mod abortable;
344 
345 mod fns;
346 mod unfold_state;
347