1 //! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s, 2 //! and the `AsyncRead` and `AsyncWrite` traits. 3 4 #![cfg_attr(feature = "read-initializer", feature(read_initializer))] 5 #![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))] 6 #![cfg_attr(not(feature = "std"), no_std)] 7 #![warn( 8 missing_debug_implementations, 9 missing_docs, 10 rust_2018_idioms, 11 single_use_lifetimes, 12 unreachable_pub 13 )] 14 #![doc(test( 15 no_crate_inject, 16 attr( 17 deny(warnings, rust_2018_idioms, single_use_lifetimes), 18 allow(dead_code, unused_assignments, unused_variables) 19 ) 20 ))] 21 #![cfg_attr(docsrs, feature(doc_cfg))] 22 23 #[cfg(all(feature = "bilock", not(feature = "unstable")))] 24 compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); 25 26 #[cfg(all(feature = "read-initializer", not(feature = "unstable")))] 27 compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features"); 28 29 #[cfg(feature = "alloc")] 30 extern crate alloc; 31 32 // Macro re-exports 33 pub use futures_core::ready; 34 pub use pin_utils::pin_mut; 35 36 #[cfg(feature = "async-await")] 37 #[macro_use] 38 mod async_await; 39 #[cfg(feature = "async-await")] 40 #[doc(hidden)] 41 pub use self::async_await::*; 42 43 // Not public API. 44 #[cfg(feature = "async-await")] 45 #[doc(hidden)] 46 pub mod __private { 47 pub use crate::*; 48 pub use core::{ 49 option::Option::{self, None, Some}, 50 pin::Pin, 51 result::Result::{Err, Ok}, 52 }; 53 54 pub mod async_await { 55 pub use crate::async_await::*; 56 } 57 } 58 59 #[cfg(feature = "sink")] 60 macro_rules! delegate_sink { 61 ($field:ident, $item:ty) => { 62 fn poll_ready( 63 self: core::pin::Pin<&mut Self>, 64 cx: &mut core::task::Context<'_>, 65 ) -> core::task::Poll<Result<(), Self::Error>> { 66 self.project().$field.poll_ready(cx) 67 } 68 69 fn start_send(self: core::pin::Pin<&mut Self>, item: $item) -> Result<(), Self::Error> { 70 self.project().$field.start_send(item) 71 } 72 73 fn poll_flush( 74 self: core::pin::Pin<&mut Self>, 75 cx: &mut core::task::Context<'_>, 76 ) -> core::task::Poll<Result<(), Self::Error>> { 77 self.project().$field.poll_flush(cx) 78 } 79 80 fn poll_close( 81 self: core::pin::Pin<&mut Self>, 82 cx: &mut core::task::Context<'_>, 83 ) -> core::task::Poll<Result<(), Self::Error>> { 84 self.project().$field.poll_close(cx) 85 } 86 }; 87 } 88 89 macro_rules! delegate_future { 90 ($field:ident) => { 91 fn poll( 92 self: core::pin::Pin<&mut Self>, 93 cx: &mut core::task::Context<'_>, 94 ) -> core::task::Poll<Self::Output> { 95 self.project().$field.poll(cx) 96 } 97 }; 98 } 99 100 macro_rules! delegate_stream { 101 ($field:ident) => { 102 fn poll_next( 103 self: core::pin::Pin<&mut Self>, 104 cx: &mut core::task::Context<'_>, 105 ) -> core::task::Poll<Option<Self::Item>> { 106 self.project().$field.poll_next(cx) 107 } 108 fn size_hint(&self) -> (usize, Option<usize>) { 109 self.$field.size_hint() 110 } 111 }; 112 } 113 114 #[cfg(feature = "io")] 115 #[cfg(feature = "std")] 116 macro_rules! delegate_async_write { 117 ($field:ident) => { 118 fn poll_write( 119 self: core::pin::Pin<&mut Self>, 120 cx: &mut core::task::Context<'_>, 121 buf: &[u8], 122 ) -> core::task::Poll<std::io::Result<usize>> { 123 self.project().$field.poll_write(cx, buf) 124 } 125 fn poll_write_vectored( 126 self: core::pin::Pin<&mut Self>, 127 cx: &mut core::task::Context<'_>, 128 bufs: &[std::io::IoSlice<'_>], 129 ) -> core::task::Poll<std::io::Result<usize>> { 130 self.project().$field.poll_write_vectored(cx, bufs) 131 } 132 fn poll_flush( 133 self: core::pin::Pin<&mut Self>, 134 cx: &mut core::task::Context<'_>, 135 ) -> core::task::Poll<std::io::Result<()>> { 136 self.project().$field.poll_flush(cx) 137 } 138 fn poll_close( 139 self: core::pin::Pin<&mut Self>, 140 cx: &mut core::task::Context<'_>, 141 ) -> core::task::Poll<std::io::Result<()>> { 142 self.project().$field.poll_close(cx) 143 } 144 }; 145 } 146 147 #[cfg(feature = "io")] 148 #[cfg(feature = "std")] 149 macro_rules! delegate_async_read { 150 ($field:ident) => { 151 #[cfg(feature = "read-initializer")] 152 unsafe fn initializer(&self) -> $crate::io::Initializer { 153 self.$field.initializer() 154 } 155 156 fn poll_read( 157 self: core::pin::Pin<&mut Self>, 158 cx: &mut core::task::Context<'_>, 159 buf: &mut [u8], 160 ) -> core::task::Poll<std::io::Result<usize>> { 161 self.project().$field.poll_read(cx, buf) 162 } 163 164 fn poll_read_vectored( 165 self: core::pin::Pin<&mut Self>, 166 cx: &mut core::task::Context<'_>, 167 bufs: &mut [std::io::IoSliceMut<'_>], 168 ) -> core::task::Poll<std::io::Result<usize>> { 169 self.project().$field.poll_read_vectored(cx, bufs) 170 } 171 }; 172 } 173 174 #[cfg(feature = "io")] 175 #[cfg(feature = "std")] 176 macro_rules! delegate_async_buf_read { 177 ($field:ident) => { 178 fn poll_fill_buf( 179 self: core::pin::Pin<&mut Self>, 180 cx: &mut core::task::Context<'_>, 181 ) -> core::task::Poll<std::io::Result<&[u8]>> { 182 self.project().$field.poll_fill_buf(cx) 183 } 184 185 fn consume(self: core::pin::Pin<&mut Self>, amt: usize) { 186 self.project().$field.consume(amt) 187 } 188 }; 189 } 190 191 macro_rules! delegate_access_inner { 192 ($field:ident, $inner:ty, ($($ind:tt)*)) => { 193 /// Acquires a reference to the underlying sink or stream that this combinator is 194 /// pulling from. 195 pub fn get_ref(&self) -> &$inner { 196 (&self.$field) $($ind get_ref())* 197 } 198 199 /// Acquires a mutable reference to the underlying sink or stream that this 200 /// combinator is pulling from. 201 /// 202 /// Note that care must be taken to avoid tampering with the state of the 203 /// sink or stream which may otherwise confuse this combinator. 204 pub fn get_mut(&mut self) -> &mut $inner { 205 (&mut self.$field) $($ind get_mut())* 206 } 207 208 /// Acquires a pinned mutable reference to the underlying sink or stream that this 209 /// combinator is pulling from. 210 /// 211 /// Note that care must be taken to avoid tampering with the state of the 212 /// sink or stream which may otherwise confuse this combinator. 213 pub fn get_pin_mut(self: core::pin::Pin<&mut Self>) -> core::pin::Pin<&mut $inner> { 214 self.project().$field $($ind get_pin_mut())* 215 } 216 217 /// Consumes this combinator, returning the underlying sink or stream. 218 /// 219 /// Note that this may discard intermediate state of this combinator, so 220 /// care should be taken to avoid losing resources when this is called. 221 pub fn into_inner(self) -> $inner { 222 self.$field $($ind into_inner())* 223 } 224 } 225 } 226 227 macro_rules! delegate_all { 228 (@trait Future $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 229 impl<$($arg),*> futures_core::future::Future for $name<$($arg),*> where $t: futures_core::future::Future $(, $($bound)*)* { 230 type Output = <$t as futures_core::future::Future>::Output; 231 232 delegate_future!(inner); 233 } 234 }; 235 (@trait FusedFuture $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 236 impl<$($arg),*> futures_core::future::FusedFuture for $name<$($arg),*> where $t: futures_core::future::FusedFuture $(, $($bound)*)* { 237 fn is_terminated(&self) -> bool { 238 self.inner.is_terminated() 239 } 240 } 241 }; 242 (@trait Stream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 243 impl<$($arg),*> futures_core::stream::Stream for $name<$($arg),*> where $t: futures_core::stream::Stream $(, $($bound)*)* { 244 type Item = <$t as futures_core::stream::Stream>::Item; 245 246 delegate_stream!(inner); 247 } 248 }; 249 (@trait FusedStream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 250 impl<$($arg),*> futures_core::stream::FusedStream for $name<$($arg),*> where $t: futures_core::stream::FusedStream $(, $($bound)*)* { 251 fn is_terminated(&self) -> bool { 252 self.inner.is_terminated() 253 } 254 } 255 }; 256 (@trait Sink $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 257 #[cfg(feature = "sink")] 258 impl<_Item, $($arg),*> futures_sink::Sink<_Item> for $name<$($arg),*> where $t: futures_sink::Sink<_Item> $(, $($bound)*)* { 259 type Error = <$t as futures_sink::Sink<_Item>>::Error; 260 261 delegate_sink!(inner, _Item); 262 } 263 }; 264 (@trait Debug $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 265 impl<$($arg),*> core::fmt::Debug for $name<$($arg),*> where $t: core::fmt::Debug $(, $($bound)*)* { 266 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { 267 core::fmt::Debug::fmt(&self.inner, f) 268 } 269 } 270 }; 271 (@trait AccessInner[$inner:ty, ($($ind:tt)*)] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 272 impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* { 273 delegate_access_inner!(inner, $inner, ($($ind)*)); 274 } 275 }; 276 (@trait New[|$($param:ident: $paramt:ty),*| $cons:expr] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => { 277 impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* { 278 pub(crate) fn new($($param: $paramt),*) -> Self { 279 Self { inner: $cons } 280 } 281 } 282 }; 283 ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($targs:tt)*])* $({$($item:tt)*})* $(where $($bound:tt)*)*) => { 284 pin_project_lite::pin_project! { 285 #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] 286 $(#[$attr])* 287 pub struct $name< $($arg),* > $(where $($bound)*)* { #[pin] inner: $t } 288 } 289 290 impl<$($arg),*> $name< $($arg),* > $(where $($bound)*)* { 291 $($($item)*)* 292 } 293 294 delegate_all!(@trait $ftrait $([$($targs)*])* $name<$($arg),*>($t) $(where $($bound)*)*); 295 }; 296 ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($ftargs:tt)*])* + $strait:ident $([$($stargs:tt)*])* $(+ $trait:ident $([$($targs:tt)*])*)* $({$($item:tt)*})* $(where $($bound:tt)*)*) => { 297 delegate_all!($(#[$attr])* $name<$($arg),*>($t) : $strait $([$($stargs)*])* $(+ $trait $([$($targs)*])*)* $({$($item)*})* $(where $($bound)*)*); 298 299 delegate_all!(@trait $ftrait $([$($ftargs)*])* $name<$($arg),*>($t) $(where $($bound)*)*); 300 }; 301 } 302 303 pub mod future; 304 #[doc(no_inline)] 305 pub use crate::future::{Future, FutureExt, TryFuture, TryFutureExt}; 306 307 pub mod stream; 308 #[doc(no_inline)] 309 pub use crate::stream::{Stream, StreamExt, TryStream, TryStreamExt}; 310 311 #[cfg(feature = "sink")] 312 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 313 pub mod sink; 314 #[cfg(feature = "sink")] 315 #[doc(no_inline)] 316 pub use crate::sink::{Sink, SinkExt}; 317 318 pub mod task; 319 320 pub mod never; 321 322 #[cfg(feature = "compat")] 323 #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] 324 pub mod compat; 325 326 #[cfg(feature = "io")] 327 #[cfg_attr(docsrs, doc(cfg(feature = "io")))] 328 #[cfg(feature = "std")] 329 pub mod io; 330 #[cfg(feature = "io")] 331 #[cfg(feature = "std")] 332 #[doc(no_inline)] 333 pub use crate::io::{ 334 AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, 335 AsyncWriteExt, 336 }; 337 338 #[cfg(feature = "alloc")] 339 pub mod lock; 340 341 #[cfg(not(futures_no_atomic_cas))] 342 #[cfg(feature = "alloc")] 343 mod abortable; 344 345 mod fns; 346 mod unfold_state; 347