1 //! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s, 2 //! and the `AsyncRead` and `AsyncWrite` traits. 3 4 #![cfg_attr(feature = "read-initializer", feature(read_initializer))] 5 #![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))] 6 #![cfg_attr(not(feature = "std"), no_std)] 7 #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] 8 // It cannot be included in the published code because this lints have false positives in the minimum required version. 9 #![cfg_attr(test, warn(single_use_lifetimes))] 10 #![warn(clippy::all)] 11 #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] 12 #![cfg_attr(docsrs, feature(doc_cfg))] 13 14 #[cfg(all(feature = "bilock", not(feature = "unstable")))] 15 compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); 16 17 #[cfg(all(feature = "read-initializer", not(feature = "unstable")))] 18 compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features"); 19 20 #[cfg(feature = "alloc")] 21 extern crate alloc; 22 23 // Macro re-exports 24 pub use futures_core::ready; 25 pub use pin_utils::pin_mut; 26 27 #[cfg(feature = "async-await")] 28 #[macro_use] 29 mod async_await; 30 #[cfg(feature = "async-await")] 31 #[doc(hidden)] 32 pub use self::async_await::*; 33 34 // Not public API. 35 #[cfg(feature = "async-await")] 36 #[doc(hidden)] 37 pub mod __private { 38 pub use crate::*; 39 pub use core::{ 40 option::Option::{self, None, Some}, 41 pin::Pin, 42 result::Result::{Err, Ok}, 43 }; 44 45 pub mod async_await { 46 pub use crate::async_await::*; 47 } 48 } 49 50 macro_rules! cfg_target_has_atomic { 51 ($($item:item)*) => {$( 52 #[cfg(not(futures_no_atomic_cas))] 53 $item 54 )*}; 55 } 56 57 #[cfg(feature = "sink")] 58 macro_rules! delegate_sink { 59 ($field:ident, $item:ty) => { 60 fn poll_ready( 61 self: core::pin::Pin<&mut Self>, 62 cx: &mut core::task::Context<'_>, 63 ) -> core::task::Poll<Result<(), Self::Error>> { 64 self.project().$field.poll_ready(cx) 65 } 66 67 fn start_send(self: core::pin::Pin<&mut Self>, item: $item) -> Result<(), Self::Error> { 68 self.project().$field.start_send(item) 69 } 70 71 fn poll_flush( 72 self: core::pin::Pin<&mut Self>, 73 cx: &mut core::task::Context<'_>, 74 ) -> core::task::Poll<Result<(), Self::Error>> { 75 self.project().$field.poll_flush(cx) 76 } 77 78 fn poll_close( 79 self: core::pin::Pin<&mut Self>, 80 cx: &mut core::task::Context<'_>, 81 ) -> core::task::Poll<Result<(), Self::Error>> { 82 self.project().$field.poll_close(cx) 83 } 84 }; 85 } 86 87 macro_rules! delegate_future { 88 ($field:ident) => { 89 fn poll( 90 self: core::pin::Pin<&mut Self>, 91 cx: &mut core::task::Context<'_>, 92 ) -> core::task::Poll<Self::Output> { 93 self.project().$field.poll(cx) 94 } 95 }; 96 } 97 98 macro_rules! delegate_stream { 99 ($field:ident) => { 100 fn poll_next( 101 self: core::pin::Pin<&mut Self>, 102 cx: &mut core::task::Context<'_>, 103 ) -> core::task::Poll<Option<Self::Item>> { 104 self.project().$field.poll_next(cx) 105 } 106 fn size_hint(&self) -> (usize, Option<usize>) { 107 self.$field.size_hint() 108 } 109 }; 110 } 111 112 #[cfg(feature = "io")] 113 #[cfg(feature = "std")] 114 macro_rules! delegate_async_write { 115 ($field:ident) => { 116 fn poll_write( 117 self: core::pin::Pin<&mut Self>, 118 cx: &mut core::task::Context<'_>, 119 buf: &[u8], 120 ) -> core::task::Poll<std::io::Result<usize>> { 121 self.project().$field.poll_write(cx, buf) 122 } 123 fn poll_write_vectored( 124 self: core::pin::Pin<&mut Self>, 125 cx: &mut core::task::Context<'_>, 126 bufs: &[std::io::IoSlice<'_>], 127 ) -> core::task::Poll<std::io::Result<usize>> { 128 self.project().$field.poll_write_vectored(cx, bufs) 129 } 130 fn poll_flush( 131 self: core::pin::Pin<&mut Self>, 132 cx: &mut core::task::Context<'_>, 133 ) -> core::task::Poll<std::io::Result<()>> { 134 self.project().$field.poll_flush(cx) 135 } 136 fn poll_close( 137 self: core::pin::Pin<&mut Self>, 138 cx: &mut core::task::Context<'_>, 139 ) -> core::task::Poll<std::io::Result<()>> { 140 self.project().$field.poll_close(cx) 141 } 142 }; 143 } 144 145 #[cfg(feature = "io")] 146 #[cfg(feature = "std")] 147 macro_rules! delegate_async_read { 148 ($field:ident) => { 149 #[cfg(feature = "read-initializer")] 150 unsafe fn initializer(&self) -> $crate::io::Initializer { 151 self.$field.initializer() 152 } 153 154 fn poll_read( 155 self: core::pin::Pin<&mut Self>, 156 cx: &mut core::task::Context<'_>, 157 buf: &mut [u8], 158 ) -> core::task::Poll<std::io::Result<usize>> { 159 self.project().$field.poll_read(cx, buf) 160 } 161 162 fn poll_read_vectored( 163 self: core::pin::Pin<&mut Self>, 164 cx: &mut core::task::Context<'_>, 165 bufs: &mut [std::io::IoSliceMut<'_>], 166 ) -> core::task::Poll<std::io::Result<usize>> { 167 self.project().$field.poll_read_vectored(cx, bufs) 168 } 169 }; 170 } 171 172 #[cfg(feature = "io")] 173 #[cfg(feature = "std")] 174 macro_rules! delegate_async_buf_read { 175 ($field:ident) => { 176 fn poll_fill_buf( 177 self: core::pin::Pin<&mut Self>, 178 cx: &mut core::task::Context<'_>, 179 ) -> core::task::Poll<std::io::Result<&[u8]>> { 180 self.project().$field.poll_fill_buf(cx) 181 } 182 183 fn consume(self: core::pin::Pin<&mut Self>, amt: usize) { 184 self.project().$field.consume(amt) 185 } 186 }; 187 } 188 189 macro_rules! delegate_access_inner { 190 ($field:ident, $inner:ty, ($($ind:tt)*)) => { 191 /// Acquires a reference to the underlying sink or stream that this combinator is 192 /// pulling from. 193 pub fn get_ref(&self) -> &$inner { 194 (&self.$field) $($ind get_ref())* 195 } 196 197 /// Acquires a mutable reference to the underlying sink or stream that this 198 /// combinator is pulling from. 199 /// 200 /// Note that care must be taken to avoid tampering with the state of the 201 /// sink or stream which may otherwise confuse this combinator. 202 pub fn get_mut(&mut self) -> &mut $inner { 203 (&mut self.$field) $($ind get_mut())* 204 } 205 206 /// Acquires a pinned mutable reference to the underlying sink or stream that this 207 /// combinator is pulling from. 208 /// 209 /// Note that care must be taken to avoid tampering with the state of the 210 /// sink or stream which may otherwise confuse this combinator. 211 pub fn get_pin_mut(self: core::pin::Pin<&mut Self>) -> core::pin::Pin<&mut $inner> { 212 self.project().$field $($ind get_pin_mut())* 213 } 214 215 /// Consumes this combinator, returning the underlying sink or stream. 216 /// 217 /// Note that this may discard intermediate state of this combinator, so 218 /// care should be taken to avoid losing resources when this is called. 219 pub fn into_inner(self) -> $inner { 220 self.$field $($ind into_inner())* 221 } 222 } 223 } 224 225 macro_rules! delegate_all { 226 (@trait Future $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 227 impl<$($arg),*> futures_core::future::Future for $name<$($arg),*> where $t: futures_core::future::Future $(, $($bound)*)* { 228 type Output = <$t as futures_core::future::Future>::Output; 229 230 delegate_future!(inner); 231 } 232 }; 233 (@trait FusedFuture $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 234 impl<$($arg),*> futures_core::future::FusedFuture for $name<$($arg),*> where $t: futures_core::future::FusedFuture $(, $($bound)*)* { 235 fn is_terminated(&self) -> bool { 236 self.inner.is_terminated() 237 } 238 } 239 }; 240 (@trait Stream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 241 impl<$($arg),*> futures_core::stream::Stream for $name<$($arg),*> where $t: futures_core::stream::Stream $(, $($bound)*)* { 242 type Item = <$t as futures_core::stream::Stream>::Item; 243 244 delegate_stream!(inner); 245 } 246 }; 247 (@trait FusedStream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 248 impl<$($arg),*> futures_core::stream::FusedStream for $name<$($arg),*> where $t: futures_core::stream::FusedStream $(, $($bound)*)* { 249 fn is_terminated(&self) -> bool { 250 self.inner.is_terminated() 251 } 252 } 253 }; 254 (@trait Sink $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 255 #[cfg(feature = "sink")] 256 impl<_Item, $($arg),*> futures_sink::Sink<_Item> for $name<$($arg),*> where $t: futures_sink::Sink<_Item> $(, $($bound)*)* { 257 type Error = <$t as futures_sink::Sink<_Item>>::Error; 258 259 delegate_sink!(inner, _Item); 260 } 261 }; 262 (@trait Debug $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 263 impl<$($arg),*> core::fmt::Debug for $name<$($arg),*> where $t: core::fmt::Debug $(, $($bound)*)* { 264 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { 265 core::fmt::Debug::fmt(&self.inner, f) 266 } 267 } 268 }; 269 (@trait AccessInner[$inner:ty, ($($ind:tt)*)] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 270 impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* { 271 delegate_access_inner!(inner, $inner, ($($ind)*)); 272 } 273 }; 274 (@trait New[|$($param:ident: $paramt:ty),*| $cons:expr] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 275 impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* { 276 pub(crate) fn new($($param: $paramt),*) -> Self { 277 Self { inner: $cons } 278 } 279 } 280 }; 281 ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($targs:tt)*])* $({$($item:tt)*})* $(where $($bound:tt)*)*) => { 282 pin_project_lite::pin_project! { 283 #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] 284 $(#[$attr])* 285 pub struct $name< $($arg),* > $(where $($bound)*)* { #[pin] inner: $t } 286 } 287 288 impl<$($arg),*> $name< $($arg),* > $(where $($bound)*)* { 289 $($($item)*)* 290 } 291 292 delegate_all!(@trait $ftrait $([$($targs)*])* $name<$($arg),*>($t) $(where $($bound)*)*); 293 }; 294 ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($ftargs:tt)*])* + $strait:ident $([$($stargs:tt)*])* $(+ $trait:ident $([$($targs:tt)*])*)* $({$($item:tt)*})* $(where $($bound:tt)*)*) => { 295 delegate_all!($(#[$attr])* $name<$($arg),*>($t) : $strait $([$($stargs)*])* $(+ $trait $([$($targs)*])*)* $({$($item)*})* $(where $($bound)*)*); 296 297 delegate_all!(@trait $ftrait $([$($ftargs)*])* $name<$($arg),*>($t) $(where $($bound)*)*); 298 }; 299 } 300 301 pub mod future; 302 #[doc(hidden)] 303 pub use crate::future::{Future, FutureExt, TryFuture, TryFutureExt}; 304 305 pub mod stream; 306 #[doc(hidden)] 307 pub use crate::stream::{Stream, StreamExt, TryStream, TryStreamExt}; 308 309 #[cfg(feature = "sink")] 310 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 311 pub mod sink; 312 #[cfg(feature = "sink")] 313 #[doc(hidden)] 314 pub use crate::sink::{Sink, SinkExt}; 315 316 pub mod task; 317 318 pub mod never; 319 320 #[cfg(feature = "compat")] 321 #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] 322 pub mod compat; 323 324 #[cfg(feature = "io")] 325 #[cfg_attr(docsrs, doc(cfg(feature = "io")))] 326 #[cfg(feature = "std")] 327 pub mod io; 328 #[cfg(feature = "io")] 329 #[cfg(feature = "std")] 330 #[doc(hidden)] 331 pub use crate::io::{ 332 AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, 333 AsyncWriteExt, 334 }; 335 336 #[cfg(feature = "alloc")] 337 pub mod lock; 338 339 cfg_target_has_atomic! { 340 #[cfg(feature = "alloc")] 341 mod abortable; 342 } 343 344 mod fns; 345 mod unfold_state; 346