1 //! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s,
2 //! and the `AsyncRead` and `AsyncWrite` traits.
3 
4 #![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]
5 #![cfg_attr(feature = "read-initializer", feature(read_initializer))]
6 #![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))]
7 
8 #![cfg_attr(not(feature = "std"), no_std)]
9 #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
10 #![warn(clippy::all)]
11 
// The fix suggested by this lint is not available on Rust 1.39, which is the current minimum
// supported version. This `allow` can be removed once the minimum supported version is 1.40,
// or if https://github.com/rust-lang/rust-clippy/issues/3941 gets implemented.
15 #![allow(clippy::mem_replace_with_default)]
16 
17 #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
18 
19 #![doc(html_root_url = "https://docs.rs/futures-util/0.3.5")]
20 
21 #[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
22 compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features");
23 
24 #[cfg(all(feature = "bilock", not(feature = "unstable")))]
25 compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");
26 
27 #[cfg(all(feature = "read-initializer", not(feature = "unstable")))]
28 compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features");
29 
30 #[cfg(feature = "alloc")]
31 extern crate alloc;
32 
33 #[macro_use(ready)]
34 extern crate futures_core;
35 
36 // Macro re-exports
37 pub use futures_core::ready;
38 pub use pin_utils::pin_mut;
39 
40 // Not public API.
41 #[cfg(feature = "async-await")]
42 #[macro_use]
43 #[doc(hidden)]
44 pub mod async_await;
45 #[cfg(feature = "async-await")]
46 #[doc(hidden)]
47 pub use self::async_await::*;
48 
49 // Not public API.
50 #[doc(hidden)]
51 pub use futures_core::core_reexport;
52 
// Not public API.
// Hidden module that glob re-exports every public item of the crate root.
// It is only compiled when the `async-await` feature is enabled.
#[cfg(feature = "async-await")]
#[doc(hidden)]
pub mod __reexport {
    #[doc(hidden)]
    pub use crate::*;
}
60 
// Re-emits every item it is given. When the `cfg-target-has-atomic` feature is
// enabled, each item is additionally gated on `target_has_atomic = "ptr"`;
// without that feature the items are emitted unconditionally.
macro_rules! cfg_target_has_atomic {
    ($($gated:item)*) => {
        $(
            #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
            $gated
        )*
    };
}
67 
// Expands to the four `Sink` methods (`poll_ready`, `start_send`, `poll_flush`
// and `poll_close`), each forwarding to the pinned projection of `$inner`.
// `$msg` is the item type accepted by `start_send`. The surrounding `impl`
// must define `Self::Error`, and `Self` must provide a `project()` method.
#[cfg(feature = "sink")]
macro_rules! delegate_sink {
    ($inner:ident, $msg:ty) => {
        fn poll_ready(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            self.project().$inner.poll_ready(cx)
        }

        fn start_send(self: core::pin::Pin<&mut Self>, item: $msg) -> Result<(), Self::Error> {
            self.project().$inner.start_send(item)
        }

        fn poll_flush(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            self.project().$inner.poll_flush(cx)
        }

        fn poll_close(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            self.project().$inner.poll_close(cx)
        }
    };
}
100 
// Expands to a `poll` method that forwards to the pinned projection of
// `$inner`. The surrounding `impl` must define `Self::Output`, and `Self`
// must provide a `project()` method exposing `$inner` as a pinned field.
macro_rules! delegate_future {
    ($inner:ident) => {
        fn poll(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Self::Output> {
            self.project().$inner.poll(cx)
        }
    };
}
111 
// Expands to `poll_next` and `size_hint` methods that forward to `$inner`.
// `poll_next` goes through the pinned projection returned by `project()`,
// while `size_hint` reads the field directly through `&self`. The surrounding
// `impl` must define `Self::Item`.
macro_rules! delegate_stream {
    ($inner:ident) => {
        fn poll_next(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Option<Self::Item>> {
            self.project().$inner.poll_next(cx)
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            self.$inner.size_hint()
        }
    };
}
125 
// Expands to the `poll_write`, `poll_write_vectored`, `poll_flush` and
// `poll_close` methods, each forwarding to the pinned projection of `$inner`.
// Only defined when both the `io` and `std` features are enabled.
#[cfg(all(feature = "io", feature = "std"))]
macro_rules! delegate_async_write {
    ($inner:ident) => {
        fn poll_write(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
            buf: &[u8],
        ) -> core::task::Poll<std::io::Result<usize>> {
            self.project().$inner.poll_write(cx, buf)
        }

        fn poll_write_vectored(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
            bufs: &[std::io::IoSlice<'_>],
        ) -> core::task::Poll<std::io::Result<usize>> {
            self.project().$inner.poll_write_vectored(cx, bufs)
        }

        fn poll_flush(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<std::io::Result<()>> {
            self.project().$inner.poll_flush(cx)
        }

        fn poll_close(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<std::io::Result<()>> {
            self.project().$inner.poll_close(cx)
        }
    };
}
152 
// Expands to the `poll_read` and `poll_read_vectored` methods, each forwarding
// to the pinned projection of `$inner`. When the `read-initializer` feature is
// enabled, it also expands to an `initializer` method that forwards to the
// field through `&self`.
// Only defined when both the `io` and `std` features are enabled.
#[cfg(all(feature = "io", feature = "std"))]
macro_rules! delegate_async_read {
    ($inner:ident) => {
        #[cfg(feature = "read-initializer")]
        unsafe fn initializer(&self) -> $crate::io::Initializer {
            self.$inner.initializer()
        }

        fn poll_read(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
            buf: &mut [u8],
        ) -> core::task::Poll<std::io::Result<usize>> {
            self.project().$inner.poll_read(cx, buf)
        }

        fn poll_read_vectored(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
            bufs: &mut [std::io::IoSliceMut<'_>],
        ) -> core::task::Poll<std::io::Result<usize>> {
            self.project().$inner.poll_read_vectored(cx, bufs)
        }
    };
}
175 
// Expands to the `poll_fill_buf` and `consume` methods, each forwarding to the
// pinned projection of `$inner`.
// Only defined when both the `io` and `std` features are enabled.
#[cfg(all(feature = "io", feature = "std"))]
macro_rules! delegate_async_buf_read {
    ($inner:ident) => {
        fn poll_fill_buf(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<std::io::Result<&[u8]>> {
            self.project().$inner.poll_fill_buf(cx)
        }

        fn consume(self: core::pin::Pin<&mut Self>, amt: usize) {
            self.project().$inner.consume(amt)
        }
    };
}
192 
// Expands to the inherent accessors `get_ref`, `get_mut`, `get_pin_mut` and
// `into_inner`, all returning (a reference to) `$target`.
//
// Each accessor starts from the `$inner` field. Every token in the trailing
// parenthesised list is pasted in front of one extra call to the same
// accessor, so an empty list returns the field itself, while `(.)` expands to
// e.g. `(&self.$inner).get_ref()`.
macro_rules! delegate_access_inner {
    ($inner:ident, $target:ty, ($($step:tt)*)) => {
        /// Returns a shared reference to the underlying sink or stream that this
        /// combinator is pulling from.
        pub fn get_ref(&self) -> &$target {
            (&self.$inner) $($step get_ref())*
        }

        /// Returns a mutable reference to the underlying sink or stream that this
        /// combinator is pulling from.
        ///
        /// Care must be taken not to tamper with the state of the sink or stream,
        /// as doing so may confuse this combinator.
        pub fn get_mut(&mut self) -> &mut $target {
            (&mut self.$inner) $($step get_mut())*
        }

        /// Returns a pinned mutable reference to the underlying sink or stream that
        /// this combinator is pulling from.
        ///
        /// Care must be taken not to tamper with the state of the sink or stream,
        /// as doing so may confuse this combinator.
        pub fn get_pin_mut(self: core::pin::Pin<&mut Self>) -> core::pin::Pin<&mut $target> {
            self.project().$inner $($step get_pin_mut())*
        }

        /// Consumes this combinator and returns the underlying sink or stream.
        ///
        /// This may discard intermediate state held by the combinator, so take
        /// care not to lose resources when calling it.
        pub fn into_inner(self) -> $target {
            self.$inner $($step into_inner())*
        }
    };
}
228 
// Declares a pin-projected wrapper struct with a single pinned `inner` field
// and generates trait impls / inherent items that delegate to that field.
//
// Public invocation shape (handled by the last two arms):
//
//     delegate_all!(
//         #[attrs]
//         Name<A, B>(InnerTy): Trait + Trait[args] + ...
//         { extra inherent items }
//         where bounds
//     );
//
// The `@trait <Kind>` arms are internal: each one emits a single impl block
// for the already-declared wrapper.
macro_rules! delegate_all {
    // `Future`: `poll` is forwarded to `inner`.
    (@trait Future $name:ident < $($generic:ident),* > ($inner_ty:ty) $(where $($pred:tt)*)*) => {
        impl<$($generic),*> futures_core::future::Future for $name<$($generic),*>
        where
            $inner_ty: futures_core::future::Future
            $(, $($pred)*)*
        {
            type Output = <$inner_ty as futures_core::future::Future>::Output;

            delegate_future!(inner);
        }
    };
    // `FusedFuture`: `is_terminated` is forwarded to `inner`.
    (@trait FusedFuture $name:ident < $($generic:ident),* > ($inner_ty:ty) $(where $($pred:tt)*)*) => {
        impl<$($generic),*> futures_core::future::FusedFuture for $name<$($generic),*>
        where
            $inner_ty: futures_core::future::FusedFuture
            $(, $($pred)*)*
        {
            fn is_terminated(&self) -> bool {
                self.inner.is_terminated()
            }
        }
    };
    // `Stream`: `poll_next` and `size_hint` are forwarded to `inner`.
    (@trait Stream $name:ident < $($generic:ident),* > ($inner_ty:ty) $(where $($pred:tt)*)*) => {
        impl<$($generic),*> futures_core::stream::Stream for $name<$($generic),*>
        where
            $inner_ty: futures_core::stream::Stream
            $(, $($pred)*)*
        {
            type Item = <$inner_ty as futures_core::stream::Stream>::Item;

            delegate_stream!(inner);
        }
    };
    // `FusedStream`: `is_terminated` is forwarded to `inner`.
    (@trait FusedStream $name:ident < $($generic:ident),* > ($inner_ty:ty) $(where $($pred:tt)*)*) => {
        impl<$($generic),*> futures_core::stream::FusedStream for $name<$($generic),*>
        where
            $inner_ty: futures_core::stream::FusedStream
            $(, $($pred)*)*
        {
            fn is_terminated(&self) -> bool {
                self.inner.is_terminated()
            }
        }
    };
    // `Sink`: generic over the item type; only emitted with the `sink` feature.
    (@trait Sink $name:ident < $($generic:ident),* > ($inner_ty:ty) $(where $($pred:tt)*)*) => {
        #[cfg(feature = "sink")]
        impl<_Item, $($generic),*> futures_sink::Sink<_Item> for $name<$($generic),*>
        where
            $inner_ty: futures_sink::Sink<_Item>
            $(, $($pred)*)*
        {
            type Error = <$inner_ty as futures_sink::Sink<_Item>>::Error;

            delegate_sink!(inner, _Item);
        }
    };
    // `Debug`: formats exactly like `inner`.
    (@trait Debug $name:ident < $($generic:ident),* > ($inner_ty:ty) $(where $($pred:tt)*)*) => {
        impl<$($generic),*> core::fmt::Debug for $name<$($generic),*>
        where
            $inner_ty: core::fmt::Debug
            $(, $($pred)*)*
        {
            fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
                core::fmt::Debug::fmt(&self.inner, f)
            }
        }
    };
    // `AccessInner[Target, (chain)]`: inherent `get_ref` / `get_mut` /
    // `get_pin_mut` / `into_inner` accessors returning `Target`.
    (@trait AccessInner[$target:ty, ($($ind:tt)*)] $name:ident < $($generic:ident),* > ($inner_ty:ty) $(where $($pred:tt)*)*) => {
        impl<$($generic),*> $name<$($generic),*>
        $(where $($pred)*)*
        {
            delegate_access_inner!(inner, $target, ($($ind)*));
        }
    };
    // `New[|params| expr]`: crate-private constructor that stores `expr` in `inner`.
    (@trait New[|$($param:ident: $paramt:ty),*| $cons:expr] $name:ident < $($generic:ident),* > ($inner_ty:ty) $(where $($pred:tt)*)*) => {
        impl<$($generic),*> $name<$($generic),*>
        $(where $($pred)*)*
        {
            pub(crate) fn new($($param: $paramt),*) -> Self {
                Self { inner: $cons }
            }
        }
    };
    // Exactly one trait left: declare the struct, emit the extra inherent
    // items, then delegate that trait.
    (
        $(#[$attr:meta])*
        $name:ident<$($generic:ident),*>($inner_ty:ty) :
        $ftrait:ident $([$($targs:tt)*])*
        $({$($item:tt)*})*
        $(where $($pred:tt)*)*
    ) => {
        #[pin_project::pin_project]
        #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
        $(#[$attr])*
        pub struct $name<$($generic),*>
        $(where $($pred)*)*
        {
            #[pin]
            inner: $inner_ty,
        }

        impl<$($generic),*> $name<$($generic),*>
        $(where $($pred)*)*
        {
            $($($item)*)*
        }

        delegate_all!(
            @trait $ftrait $([$($targs)*])* $name<$($generic),*>($inner_ty)
            $(where $($pred)*)*
        );
    };
    // Two or more traits: recurse on everything after the first trait (the
    // struct is declared once the list is down to a single trait), then
    // delegate the first trait.
    (
        $(#[$attr:meta])*
        $name:ident<$($generic:ident),*>($inner_ty:ty) :
        $ftrait:ident $([$($ftargs:tt)*])*
        + $strait:ident $([$($stargs:tt)*])*
        $(+ $trait:ident $([$($targs:tt)*])*)*
        $({$($item:tt)*})*
        $(where $($pred:tt)*)*
    ) => {
        delegate_all!(
            $(#[$attr])*
            $name<$($generic),*>($inner_ty) :
            $strait $([$($stargs)*])*
            $(+ $trait $([$($targs)*])*)*
            $({$($item)*})*
            $(where $($pred)*)*
        );

        delegate_all!(
            @trait $ftrait $([$($ftargs)*])* $name<$($generic),*>($inner_ty)
            $(where $($pred)*)*
        );
    };
}
303 
304 pub mod future;
305 #[doc(hidden)] pub use crate::future::{FutureExt, TryFutureExt};
306 
307 pub mod stream;
308 #[doc(hidden)] pub use crate::stream::{StreamExt, TryStreamExt};
309 
310 #[cfg(feature = "sink")]
311 pub mod sink;
312 #[cfg(feature = "sink")]
313 #[doc(hidden)] pub use crate::sink::SinkExt;
314 
315 pub mod task;
316 
317 pub mod never;
318 
319 #[cfg(feature = "compat")]
320 pub mod compat;
321 
322 #[cfg(feature = "io")]
323 #[cfg(feature = "std")]
324 pub mod io;
325 #[cfg(feature = "io")]
326 #[cfg(feature = "std")]
327 #[doc(hidden)] pub use crate::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt};
328 
329 mod fns;
330 
331 
332 cfg_target_has_atomic! {
333     #[cfg(feature = "alloc")]
334     pub mod lock;
335 }
336