1 //! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s, 2 //! and the `AsyncRead` and `AsyncWrite` traits. 3 4 #![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))] 5 #![cfg_attr(feature = "read-initializer", feature(read_initializer))] 6 #![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))] 7 8 #![cfg_attr(not(feature = "std"), no_std)] 9 #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] 10 #![warn(clippy::all)] 11 12 // The solution for this lint is not available on 1.39 which is the current minimum supported version. 13 // Can be removed as of minimum supported 1.40 or if https://github.com/rust-lang/rust-clippy/issues/3941 14 // get's implemented. 15 #![allow(clippy::mem_replace_with_default)] 16 17 #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] 18 19 #![doc(html_root_url = "https://docs.rs/futures-util/0.3.5")] 20 21 #[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] 22 compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features"); 23 24 #[cfg(all(feature = "bilock", not(feature = "unstable")))] 25 compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); 26 27 #[cfg(all(feature = "read-initializer", not(feature = "unstable")))] 28 compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features"); 29 30 #[cfg(feature = "alloc")] 31 extern crate alloc; 32 33 #[macro_use(ready)] 34 extern crate futures_core; 35 36 // Macro re-exports 37 pub use futures_core::ready; 38 pub use pin_utils::pin_mut; 39 40 // Not public API. 41 #[cfg(feature = "async-await")] 42 #[macro_use] 43 #[doc(hidden)] 44 pub mod async_await; 45 #[cfg(feature = "async-await")] 46 #[doc(hidden)] 47 pub use self::async_await::*; 48 49 // Not public API. 50 #[doc(hidden)] 51 pub use futures_core::core_reexport; 52 53 // Not public API. 54 #[cfg(feature = "async-await")] 55 #[doc(hidden)] 56 pub mod __reexport { 57 #[doc(hidden)] 58 pub use crate::*; 59 } 60 61 macro_rules! cfg_target_has_atomic { 62 ($($item:item)*) => {$( 63 #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] 64 $item 65 )*}; 66 } 67 68 #[cfg(feature = "sink")] 69 macro_rules! delegate_sink { 70 ($field:ident, $item:ty) => { 71 fn poll_ready( 72 self: core::pin::Pin<&mut Self>, 73 cx: &mut $crate::core_reexport::task::Context<'_>, 74 ) -> $crate::core_reexport::task::Poll<Result<(), Self::Error>> { 75 self.project().$field.poll_ready(cx) 76 } 77 78 fn start_send( 79 self: core::pin::Pin<&mut Self>, 80 item: $item, 81 ) -> Result<(), Self::Error> { 82 self.project().$field.start_send(item) 83 } 84 85 fn poll_flush( 86 self: core::pin::Pin<&mut Self>, 87 cx: &mut $crate::core_reexport::task::Context<'_>, 88 ) -> $crate::core_reexport::task::Poll<Result<(), Self::Error>> { 89 self.project().$field.poll_flush(cx) 90 } 91 92 fn poll_close( 93 self: core::pin::Pin<&mut Self>, 94 cx: &mut $crate::core_reexport::task::Context<'_>, 95 ) -> $crate::core_reexport::task::Poll<Result<(), Self::Error>> { 96 self.project().$field.poll_close(cx) 97 } 98 } 99 } 100 101 macro_rules! delegate_future { 102 ($field:ident) => { 103 fn poll( 104 self: core::pin::Pin<&mut Self>, 105 cx: &mut $crate::core_reexport::task::Context<'_>, 106 ) -> $crate::core_reexport::task::Poll<Self::Output> { 107 self.project().$field.poll(cx) 108 } 109 } 110 } 111 112 macro_rules! delegate_stream { 113 ($field:ident) => { 114 fn poll_next( 115 self: core::pin::Pin<&mut Self>, 116 cx: &mut $crate::core_reexport::task::Context<'_>, 117 ) -> $crate::core_reexport::task::Poll<Option<Self::Item>> { 118 self.project().$field.poll_next(cx) 119 } 120 fn size_hint(&self) -> (usize, Option<usize>) { 121 self.$field.size_hint() 122 } 123 } 124 } 125 126 #[cfg(feature = "io")] 127 #[cfg(feature = "std")] 128 macro_rules! delegate_async_write { 129 ($field:ident) => { 130 fn poll_write(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, buf: &[u8]) 131 -> core::task::Poll<std::io::Result<usize>> 132 { 133 self.project().$field.poll_write(cx, buf) 134 } 135 fn poll_write_vectored(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, bufs: &[std::io::IoSlice<'_>]) 136 -> core::task::Poll<std::io::Result<usize>> 137 { 138 self.project().$field.poll_write_vectored(cx, bufs) 139 } 140 fn poll_flush(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>) 141 -> core::task::Poll<std::io::Result<()>> 142 { 143 self.project().$field.poll_flush(cx) 144 } 145 fn poll_close(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>) 146 -> core::task::Poll<std::io::Result<()>> 147 { 148 self.project().$field.poll_close(cx) 149 } 150 } 151 } 152 153 #[cfg(feature = "io")] 154 #[cfg(feature = "std")] 155 macro_rules! delegate_async_read { 156 ($field:ident) => { 157 #[cfg(feature = "read-initializer")] 158 unsafe fn initializer(&self) -> $crate::io::Initializer { 159 self.$field.initializer() 160 } 161 162 fn poll_read(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, buf: &mut [u8]) 163 -> core::task::Poll<std::io::Result<usize>> 164 { 165 self.project().$field.poll_read(cx, buf) 166 } 167 168 fn poll_read_vectored(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, bufs: &mut [std::io::IoSliceMut<'_>]) 169 -> core::task::Poll<std::io::Result<usize>> 170 { 171 self.project().$field.poll_read_vectored(cx, bufs) 172 } 173 } 174 } 175 176 #[cfg(feature = "io")] 177 #[cfg(feature = "std")] 178 macro_rules! delegate_async_buf_read { 179 ($field:ident) => { 180 fn poll_fill_buf( 181 self: core::pin::Pin<&mut Self>, 182 cx: &mut core::task::Context<'_>, 183 ) -> core::task::Poll<std::io::Result<&[u8]>> { 184 self.project().$field.poll_fill_buf(cx) 185 } 186 187 fn consume(self: core::pin::Pin<&mut Self>, amt: usize) { 188 self.project().$field.consume(amt) 189 } 190 } 191 } 192 193 macro_rules! delegate_access_inner { 194 ($field:ident, $inner:ty, ($($ind:tt)*)) => { 195 /// Acquires a reference to the underlying sink or stream that this combinator is 196 /// pulling from. 197 pub fn get_ref(&self) -> &$inner { 198 (&self.$field) $($ind get_ref())* 199 } 200 201 /// Acquires a mutable reference to the underlying sink or stream that this 202 /// combinator is pulling from. 203 /// 204 /// Note that care must be taken to avoid tampering with the state of the 205 /// sink or stream which may otherwise confuse this combinator. 206 pub fn get_mut(&mut self) -> &mut $inner { 207 (&mut self.$field) $($ind get_mut())* 208 } 209 210 /// Acquires a pinned mutable reference to the underlying sink or stream that this 211 /// combinator is pulling from. 212 /// 213 /// Note that care must be taken to avoid tampering with the state of the 214 /// sink or stream which may otherwise confuse this combinator. 215 pub fn get_pin_mut(self: core::pin::Pin<&mut Self>) -> core::pin::Pin<&mut $inner> { 216 self.project().$field $($ind get_pin_mut())* 217 } 218 219 /// Consumes this combinator, returning the underlying sink or stream. 220 /// 221 /// Note that this may discard intermediate state of this combinator, so 222 /// care should be taken to avoid losing resources when this is called. 223 pub fn into_inner(self) -> $inner { 224 self.$field $($ind into_inner())* 225 } 226 } 227 } 228 229 macro_rules! delegate_all { 230 (@trait Future $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 231 impl<$($arg),*> futures_core::future::Future for $name<$($arg),*> where $t: futures_core::future::Future $(, $($bound)*)* { 232 type Output = <$t as futures_core::future::Future>::Output; 233 234 delegate_future!(inner); 235 } 236 }; 237 (@trait FusedFuture $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 238 impl<$($arg),*> futures_core::future::FusedFuture for $name<$($arg),*> where $t: futures_core::future::FusedFuture $(, $($bound)*)* { 239 fn is_terminated(&self) -> bool { 240 self.inner.is_terminated() 241 } 242 } 243 }; 244 (@trait Stream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 245 impl<$($arg),*> futures_core::stream::Stream for $name<$($arg),*> where $t: futures_core::stream::Stream $(, $($bound)*)* { 246 type Item = <$t as futures_core::stream::Stream>::Item; 247 248 delegate_stream!(inner); 249 } 250 }; 251 (@trait FusedStream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 252 impl<$($arg),*> futures_core::stream::FusedStream for $name<$($arg),*> where $t: futures_core::stream::FusedStream $(, $($bound)*)* { 253 fn is_terminated(&self) -> bool { 254 self.inner.is_terminated() 255 } 256 } 257 }; 258 (@trait Sink $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 259 #[cfg(feature = "sink")] 260 impl<_Item, $($arg),*> futures_sink::Sink<_Item> for $name<$($arg),*> where $t: futures_sink::Sink<_Item> $(, $($bound)*)* { 261 type Error = <$t as futures_sink::Sink<_Item>>::Error; 262 263 delegate_sink!(inner, _Item); 264 } 265 }; 266 (@trait Debug $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 267 impl<$($arg),*> core::fmt::Debug for $name<$($arg),*> where $t: core::fmt::Debug $(, $($bound)*)* { 268 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { 269 core::fmt::Debug::fmt(&self.inner, f) 270 } 271 } 272 }; 273 (@trait AccessInner[$inner:ty, ($($ind:tt)*)] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 274 impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* { 275 delegate_access_inner!(inner, $inner, ($($ind)*)); 276 } 277 }; 278 (@trait New[|$($param:ident: $paramt:ty),*| $cons:expr] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 279 impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* { 280 pub(crate) fn new($($param: $paramt),*) -> Self { 281 Self { inner: $cons } 282 } 283 } 284 }; 285 ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($targs:tt)*])* $({$($item:tt)*})* $(where $($bound:tt)*)*) => { 286 #[pin_project::pin_project] 287 #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] 288 $(#[$attr])* 289 pub struct $name< $($arg),* > $(where $($bound)*)* { #[pin] inner:$t } 290 291 impl<$($arg),*> $name< $($arg),* > $(where $($bound)*)* { 292 $($($item)*)* 293 } 294 295 delegate_all!(@trait $ftrait $([$($targs)*])* $name<$($arg),*>($t) $(where $($bound)*)*); 296 }; 297 ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($ftargs:tt)*])* + $strait:ident $([$($stargs:tt)*])* $(+ $trait:ident $([$($targs:tt)*])*)* $({$($item:tt)*})* $(where $($bound:tt)*)*) => { 298 delegate_all!($(#[$attr])* $name<$($arg),*>($t) : $strait $([$($stargs)*])* $(+ $trait $([$($targs)*])*)* $({$($item)*})* $(where $($bound)*)*); 299 300 delegate_all!(@trait $ftrait $([$($ftargs)*])* $name<$($arg),*>($t) $(where $($bound)*)*); 301 }; 302 } 303 304 pub mod future; 305 #[doc(hidden)] pub use crate::future::{FutureExt, TryFutureExt}; 306 307 pub mod stream; 308 #[doc(hidden)] pub use crate::stream::{StreamExt, TryStreamExt}; 309 310 #[cfg(feature = "sink")] 311 pub mod sink; 312 #[cfg(feature = "sink")] 313 #[doc(hidden)] pub use crate::sink::SinkExt; 314 315 pub mod task; 316 317 pub mod never; 318 319 #[cfg(feature = "compat")] 320 pub mod compat; 321 322 #[cfg(feature = "io")] 323 #[cfg(feature = "std")] 324 pub mod io; 325 #[cfg(feature = "io")] 326 #[cfg(feature = "std")] 327 #[doc(hidden)] pub use crate::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt}; 328 329 mod fns; 330 331 332 cfg_target_has_atomic! { 333 #[cfg(feature = "alloc")] 334 pub mod lock; 335 } 336