1 //! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s,
2 //! and the `AsyncRead` and `AsyncWrite` traits.
3 
4 #![cfg_attr(feature = "read-initializer", feature(read_initializer))]
5 #![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))]
6 #![cfg_attr(not(feature = "std"), no_std)]
7 #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
8 // It cannot be included in the published code because this lints have false positives in the minimum required version.
9 #![cfg_attr(test, warn(single_use_lifetimes))]
10 #![warn(clippy::all)]
11 #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
12 #![cfg_attr(docsrs, feature(doc_cfg))]
13 
14 #[cfg(all(feature = "bilock", not(feature = "unstable")))]
15 compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");
16 
17 #[cfg(all(feature = "read-initializer", not(feature = "unstable")))]
18 compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features");
19 
20 #[cfg(feature = "alloc")]
21 extern crate alloc;
22 
23 // Macro re-exports
24 pub use futures_core::ready;
25 pub use pin_utils::pin_mut;
26 
27 #[cfg(feature = "async-await")]
28 #[macro_use]
29 mod async_await;
30 #[cfg(feature = "async-await")]
31 #[doc(hidden)]
32 pub use self::async_await::*;
33 
34 // Not public API.
35 #[cfg(feature = "async-await")]
36 #[doc(hidden)]
37 pub mod __private {
38     pub use crate::*;
39     pub use core::{
40         option::Option::{self, None, Some},
41         pin::Pin,
42         result::Result::{Err, Ok},
43     };
44 
45     pub mod async_await {
46         pub use crate::async_await::*;
47     }
48 }
49 
50 macro_rules! cfg_target_has_atomic {
51     ($($item:item)*) => {$(
52         #[cfg(not(futures_no_atomic_cas))]
53         $item
54     )*};
55 }
56 
57 #[cfg(feature = "sink")]
58 macro_rules! delegate_sink {
59     ($field:ident, $item:ty) => {
60         fn poll_ready(
61             self: core::pin::Pin<&mut Self>,
62             cx: &mut core::task::Context<'_>,
63         ) -> core::task::Poll<Result<(), Self::Error>> {
64             self.project().$field.poll_ready(cx)
65         }
66 
67         fn start_send(self: core::pin::Pin<&mut Self>, item: $item) -> Result<(), Self::Error> {
68             self.project().$field.start_send(item)
69         }
70 
71         fn poll_flush(
72             self: core::pin::Pin<&mut Self>,
73             cx: &mut core::task::Context<'_>,
74         ) -> core::task::Poll<Result<(), Self::Error>> {
75             self.project().$field.poll_flush(cx)
76         }
77 
78         fn poll_close(
79             self: core::pin::Pin<&mut Self>,
80             cx: &mut core::task::Context<'_>,
81         ) -> core::task::Poll<Result<(), Self::Error>> {
82             self.project().$field.poll_close(cx)
83         }
84     };
85 }
86 
87 macro_rules! delegate_future {
88     ($field:ident) => {
89         fn poll(
90             self: core::pin::Pin<&mut Self>,
91             cx: &mut core::task::Context<'_>,
92         ) -> core::task::Poll<Self::Output> {
93             self.project().$field.poll(cx)
94         }
95     };
96 }
97 
98 macro_rules! delegate_stream {
99     ($field:ident) => {
100         fn poll_next(
101             self: core::pin::Pin<&mut Self>,
102             cx: &mut core::task::Context<'_>,
103         ) -> core::task::Poll<Option<Self::Item>> {
104             self.project().$field.poll_next(cx)
105         }
106         fn size_hint(&self) -> (usize, Option<usize>) {
107             self.$field.size_hint()
108         }
109     };
110 }
111 
112 #[cfg(feature = "io")]
113 #[cfg(feature = "std")]
114 macro_rules! delegate_async_write {
115     ($field:ident) => {
116         fn poll_write(
117             self: core::pin::Pin<&mut Self>,
118             cx: &mut core::task::Context<'_>,
119             buf: &[u8],
120         ) -> core::task::Poll<std::io::Result<usize>> {
121             self.project().$field.poll_write(cx, buf)
122         }
123         fn poll_write_vectored(
124             self: core::pin::Pin<&mut Self>,
125             cx: &mut core::task::Context<'_>,
126             bufs: &[std::io::IoSlice<'_>],
127         ) -> core::task::Poll<std::io::Result<usize>> {
128             self.project().$field.poll_write_vectored(cx, bufs)
129         }
130         fn poll_flush(
131             self: core::pin::Pin<&mut Self>,
132             cx: &mut core::task::Context<'_>,
133         ) -> core::task::Poll<std::io::Result<()>> {
134             self.project().$field.poll_flush(cx)
135         }
136         fn poll_close(
137             self: core::pin::Pin<&mut Self>,
138             cx: &mut core::task::Context<'_>,
139         ) -> core::task::Poll<std::io::Result<()>> {
140             self.project().$field.poll_close(cx)
141         }
142     };
143 }
144 
145 #[cfg(feature = "io")]
146 #[cfg(feature = "std")]
147 macro_rules! delegate_async_read {
148     ($field:ident) => {
149         #[cfg(feature = "read-initializer")]
150         unsafe fn initializer(&self) -> $crate::io::Initializer {
151             self.$field.initializer()
152         }
153 
154         fn poll_read(
155             self: core::pin::Pin<&mut Self>,
156             cx: &mut core::task::Context<'_>,
157             buf: &mut [u8],
158         ) -> core::task::Poll<std::io::Result<usize>> {
159             self.project().$field.poll_read(cx, buf)
160         }
161 
162         fn poll_read_vectored(
163             self: core::pin::Pin<&mut Self>,
164             cx: &mut core::task::Context<'_>,
165             bufs: &mut [std::io::IoSliceMut<'_>],
166         ) -> core::task::Poll<std::io::Result<usize>> {
167             self.project().$field.poll_read_vectored(cx, bufs)
168         }
169     };
170 }
171 
172 #[cfg(feature = "io")]
173 #[cfg(feature = "std")]
174 macro_rules! delegate_async_buf_read {
175     ($field:ident) => {
176         fn poll_fill_buf(
177             self: core::pin::Pin<&mut Self>,
178             cx: &mut core::task::Context<'_>,
179         ) -> core::task::Poll<std::io::Result<&[u8]>> {
180             self.project().$field.poll_fill_buf(cx)
181         }
182 
183         fn consume(self: core::pin::Pin<&mut Self>, amt: usize) {
184             self.project().$field.consume(amt)
185         }
186     };
187 }
188 
189 macro_rules! delegate_access_inner {
190     ($field:ident, $inner:ty, ($($ind:tt)*)) => {
191         /// Acquires a reference to the underlying sink or stream that this combinator is
192         /// pulling from.
193         pub fn get_ref(&self) -> &$inner {
194             (&self.$field) $($ind get_ref())*
195         }
196 
197         /// Acquires a mutable reference to the underlying sink or stream that this
198         /// combinator is pulling from.
199         ///
200         /// Note that care must be taken to avoid tampering with the state of the
201         /// sink or stream which may otherwise confuse this combinator.
202         pub fn get_mut(&mut self) -> &mut $inner {
203             (&mut self.$field) $($ind get_mut())*
204         }
205 
206         /// Acquires a pinned mutable reference to the underlying sink or stream that this
207         /// combinator is pulling from.
208         ///
209         /// Note that care must be taken to avoid tampering with the state of the
210         /// sink or stream which may otherwise confuse this combinator.
211         pub fn get_pin_mut(self: core::pin::Pin<&mut Self>) -> core::pin::Pin<&mut $inner> {
212             self.project().$field $($ind get_pin_mut())*
213         }
214 
215         /// Consumes this combinator, returning the underlying sink or stream.
216         ///
217         /// Note that this may discard intermediate state of this combinator, so
218         /// care should be taken to avoid losing resources when this is called.
219         pub fn into_inner(self) -> $inner {
220             self.$field $($ind into_inner())*
221         }
222     }
223 }
224 
225 macro_rules! delegate_all {
226     (@trait Future $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
227         impl<$($arg),*> futures_core::future::Future for $name<$($arg),*> where $t: futures_core::future::Future $(, $($bound)*)* {
228             type Output = <$t as futures_core::future::Future>::Output;
229 
230             delegate_future!(inner);
231         }
232     };
233     (@trait FusedFuture $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
234         impl<$($arg),*> futures_core::future::FusedFuture for $name<$($arg),*> where $t: futures_core::future::FusedFuture $(, $($bound)*)* {
235             fn is_terminated(&self) -> bool {
236                 self.inner.is_terminated()
237             }
238         }
239     };
240     (@trait Stream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
241         impl<$($arg),*> futures_core::stream::Stream for $name<$($arg),*> where $t: futures_core::stream::Stream $(, $($bound)*)* {
242             type Item = <$t as futures_core::stream::Stream>::Item;
243 
244             delegate_stream!(inner);
245         }
246     };
247     (@trait FusedStream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
248         impl<$($arg),*> futures_core::stream::FusedStream for $name<$($arg),*> where $t: futures_core::stream::FusedStream $(, $($bound)*)* {
249             fn is_terminated(&self) -> bool {
250                 self.inner.is_terminated()
251             }
252         }
253     };
254     (@trait Sink $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
255         #[cfg(feature = "sink")]
256         impl<_Item, $($arg),*> futures_sink::Sink<_Item> for $name<$($arg),*> where $t: futures_sink::Sink<_Item> $(, $($bound)*)* {
257             type Error = <$t as futures_sink::Sink<_Item>>::Error;
258 
259             delegate_sink!(inner, _Item);
260         }
261     };
262     (@trait Debug $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
263         impl<$($arg),*> core::fmt::Debug for $name<$($arg),*> where $t: core::fmt::Debug $(, $($bound)*)* {
264             fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
265                 core::fmt::Debug::fmt(&self.inner, f)
266             }
267         }
268     };
269     (@trait AccessInner[$inner:ty, ($($ind:tt)*)] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
270         impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* {
271             delegate_access_inner!(inner, $inner, ($($ind)*));
272         }
273     };
274     (@trait New[|$($param:ident: $paramt:ty),*| $cons:expr] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
275         impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* {
276             pub(crate) fn new($($param: $paramt),*) -> Self {
277                 Self { inner: $cons }
278             }
279         }
280     };
281     ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($targs:tt)*])* $({$($item:tt)*})* $(where $($bound:tt)*)*) => {
282         pin_project_lite::pin_project! {
283             #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
284             $(#[$attr])*
285             pub struct $name< $($arg),* > $(where $($bound)*)* { #[pin] inner: $t }
286         }
287 
288         impl<$($arg),*> $name< $($arg),* > $(where $($bound)*)* {
289             $($($item)*)*
290         }
291 
292         delegate_all!(@trait $ftrait $([$($targs)*])* $name<$($arg),*>($t) $(where $($bound)*)*);
293     };
294     ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($ftargs:tt)*])* + $strait:ident $([$($stargs:tt)*])* $(+ $trait:ident $([$($targs:tt)*])*)* $({$($item:tt)*})* $(where $($bound:tt)*)*) => {
295         delegate_all!($(#[$attr])* $name<$($arg),*>($t) : $strait $([$($stargs)*])* $(+ $trait $([$($targs)*])*)* $({$($item)*})* $(where $($bound)*)*);
296 
297         delegate_all!(@trait $ftrait $([$($ftargs)*])* $name<$($arg),*>($t) $(where $($bound)*)*);
298     };
299 }
300 
301 pub mod future;
302 #[doc(hidden)]
303 pub use crate::future::{Future, FutureExt, TryFuture, TryFutureExt};
304 
305 pub mod stream;
306 #[doc(hidden)]
307 pub use crate::stream::{Stream, StreamExt, TryStream, TryStreamExt};
308 
309 #[cfg(feature = "sink")]
310 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
311 pub mod sink;
312 #[cfg(feature = "sink")]
313 #[doc(hidden)]
314 pub use crate::sink::{Sink, SinkExt};
315 
316 pub mod task;
317 
318 pub mod never;
319 
320 #[cfg(feature = "compat")]
321 #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
322 pub mod compat;
323 
324 #[cfg(feature = "io")]
325 #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
326 #[cfg(feature = "std")]
327 pub mod io;
328 #[cfg(feature = "io")]
329 #[cfg(feature = "std")]
330 #[doc(hidden)]
331 pub use crate::io::{
332     AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
333     AsyncWriteExt,
334 };
335 
336 #[cfg(feature = "alloc")]
337 pub mod lock;
338 
339 cfg_target_has_atomic! {
340     #[cfg(feature = "alloc")]
341     mod abortable;
342 }
343 
344 mod fns;
345 mod unfold_state;
346