use futures_01::executor::{ spawn as spawn01, Notify as Notify01, NotifyHandle as NotifyHandle01, Spawn as Spawn01, UnsafeNotify as UnsafeNotify01, }; use futures_01::{ Async as Async01, Future as Future01, Stream as Stream01, }; #[cfg(feature = "sink")] use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01}; use futures_core::{task as task03, future::Future as Future03, stream::Stream as Stream03}; use std::pin::Pin; use std::task::Context; #[cfg(feature = "sink")] use futures_sink::Sink as Sink03; #[cfg(feature = "io-compat")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use io::{AsyncRead01CompatExt, AsyncWrite01CompatExt}; /// Converts a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite /// object to a futures 0.3-compatible version, #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Compat01As03 { pub(crate) inner: Spawn01, } impl Unpin for Compat01As03 {} impl Compat01As03 { /// Wraps a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite /// object in a futures 0.3-compatible wrapper. pub fn new(object: T) -> Compat01As03 { Compat01As03 { inner: spawn01(object), } } fn in_notify(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R { let notify = &WakerToHandle(cx.waker()); self.inner.poll_fn_notify(notify, 0, f) } /// Get a reference to 0.1 Future, Stream, AsyncRead, or AsyncWrite object contained within. pub fn get_ref(&self) -> &T { self.inner.get_ref() } /// Get a mutable reference to 0.1 Future, Stream, AsyncRead or AsyncWrite object contained /// within. pub fn get_mut(&mut self) -> &mut T { self.inner.get_mut() } /// Consume this wrapper to return the underlying 0.1 Future, Stream, AsyncRead, or /// AsyncWrite object. pub fn into_inner(self) -> T { self.inner.into_inner() } } /// Extension trait for futures 0.1 [`Future`](futures_01::future::Future) pub trait Future01CompatExt: Future01 { /// Converts a futures 0.1 /// [`Future`](futures_01::future::Future) /// into a futures 0.3 /// [`Future>`](futures_core::future::Future). /// /// ``` /// # futures::executor::block_on(async { /// # // TODO: These should be all using `futures::compat`, but that runs up against Cargo /// # // feature issues /// use futures_util::compat::Future01CompatExt; /// /// let future = futures_01::future::ok::(1); /// assert_eq!(future.compat().await, Ok(1)); /// # }); /// ``` fn compat(self) -> Compat01As03 where Self: Sized, { Compat01As03::new(self) } } impl Future01CompatExt for Fut {} /// Extension trait for futures 0.1 [`Stream`](futures_01::stream::Stream) pub trait Stream01CompatExt: Stream01 { /// Converts a futures 0.1 /// [`Stream`](futures_01::stream::Stream) /// into a futures 0.3 /// [`Stream>`](futures_core::stream::Stream). /// /// ``` /// # futures::executor::block_on(async { /// use futures::stream::StreamExt; /// use futures_util::compat::Stream01CompatExt; /// /// let stream = futures_01::stream::once::(Ok(1)); /// let mut stream = stream.compat(); /// assert_eq!(stream.next().await, Some(Ok(1))); /// assert_eq!(stream.next().await, None); /// # }); /// ``` fn compat(self) -> Compat01As03 where Self: Sized, { Compat01As03::new(self) } } impl Stream01CompatExt for St {} /// Extension trait for futures 0.1 [`Sink`](futures_01::sink::Sink) #[cfg(feature = "sink")] pub trait Sink01CompatExt: Sink01 { /// Converts a futures 0.1 /// [`Sink`](futures_01::sink::Sink) /// into a futures 0.3 /// [`Sink`](futures_sink::Sink). /// /// ``` /// # futures::executor::block_on(async { /// use futures::{sink::SinkExt, stream::StreamExt}; /// use futures_util::compat::{Stream01CompatExt, Sink01CompatExt}; /// /// let (tx, rx) = futures_01::unsync::mpsc::channel(1); /// let (mut tx, mut rx) = (tx.sink_compat(), rx.compat()); /// /// tx.send(1).await.unwrap(); /// drop(tx); /// assert_eq!(rx.next().await, Some(Ok(1))); /// assert_eq!(rx.next().await, None); /// # }); /// ``` fn sink_compat(self) -> Compat01As03Sink where Self: Sized, { Compat01As03Sink::new(self) } } #[cfg(feature = "sink")] impl Sink01CompatExt for Si {} fn poll_01_to_03(x: Result, E>) -> task03::Poll> { match x? { Async01::Ready(t) => task03::Poll::Ready(Ok(t)), Async01::NotReady => task03::Poll::Pending, } } impl Future03 for Compat01As03 { type Output = Result; fn poll( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll { poll_01_to_03(self.in_notify(cx, Future01::poll)) } } impl Stream03 for Compat01As03 { type Item = Result; fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll> { match self.in_notify(cx, Stream01::poll)? { Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))), Async01::Ready(None) => task03::Poll::Ready(None), Async01::NotReady => task03::Poll::Pending, } } } /// Converts a futures 0.1 Sink object to a futures 0.3-compatible version #[cfg(feature = "sink")] #[derive(Debug)] #[must_use = "sinks do nothing unless polled"] pub struct Compat01As03Sink { pub(crate) inner: Spawn01, pub(crate) buffer: Option, pub(crate) close_started: bool, } #[cfg(feature = "sink")] impl Unpin for Compat01As03Sink {} #[cfg(feature = "sink")] impl Compat01As03Sink { /// Wraps a futures 0.1 Sink object in a futures 0.3-compatible wrapper. pub fn new(inner: S) -> Compat01As03Sink { Compat01As03Sink { inner: spawn01(inner), buffer: None, close_started: false } } fn in_notify( &mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut S) -> R, ) -> R { let notify = &WakerToHandle(cx.waker()); self.inner.poll_fn_notify(notify, 0, f) } /// Get a reference to 0.1 Sink object contained within. pub fn get_ref(&self) -> &S { self.inner.get_ref() } /// Get a mutable reference to 0.1 Sink contained within. pub fn get_mut(&mut self) -> &mut S { self.inner.get_mut() } /// Consume this wrapper to return the underlying 0.1 Sink. pub fn into_inner(self) -> S { self.inner.into_inner() } } #[cfg(feature = "sink")] impl Stream03 for Compat01As03Sink where S: Stream01, { type Item = Result; fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll> { match self.in_notify(cx, Stream01::poll)? { Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))), Async01::Ready(None) => task03::Poll::Ready(None), Async01::NotReady => task03::Poll::Pending, } } } #[cfg(feature = "sink")] impl Sink03 for Compat01As03Sink where S: Sink01, { type Error = S::SinkError; fn start_send( mut self: Pin<&mut Self>, item: SinkItem, ) -> Result<(), Self::Error> { debug_assert!(self.buffer.is_none()); self.buffer = Some(item); Ok(()) } fn poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll> { match self.buffer.take() { Some(item) => match self.in_notify(cx, |f| f.start_send(item))? { AsyncSink01::Ready => task03::Poll::Ready(Ok(())), AsyncSink01::NotReady(i) => { self.buffer = Some(i); task03::Poll::Pending } }, None => task03::Poll::Ready(Ok(())), } } fn poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll> { let item = self.buffer.take(); match self.in_notify(cx, |f| match item { Some(i) => match f.start_send(i)? { AsyncSink01::Ready => f.poll_complete().map(|i| (i, None)), AsyncSink01::NotReady(t) => { Ok((Async01::NotReady, Some(t))) } }, None => f.poll_complete().map(|i| (i, None)), })? { (Async01::Ready(_), _) => task03::Poll::Ready(Ok(())), (Async01::NotReady, item) => { self.buffer = item; task03::Poll::Pending } } } fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll> { let item = self.buffer.take(); let close_started = self.close_started; let result = self.in_notify(cx, |f| { if !close_started { if let Some(item) = item { if let AsyncSink01::NotReady(item) = f.start_send(item)? { return Ok((Async01::NotReady, Some(item), false)); } } if let Async01::NotReady = f.poll_complete()? { return Ok((Async01::NotReady, None, false)); } } Ok((::close(f)?, None, true)) }); match result? { (Async01::Ready(_), _, _) => task03::Poll::Ready(Ok(())), (Async01::NotReady, item, close_started) => { self.buffer = item; self.close_started = close_started; task03::Poll::Pending } } } } struct NotifyWaker(task03::Waker); #[allow(missing_debug_implementations)] // false positive: this is private type #[derive(Clone)] struct WakerToHandle<'a>(&'a task03::Waker); impl From> for NotifyHandle01 { fn from(handle: WakerToHandle<'_>) -> NotifyHandle01 { let ptr = Box::new(NotifyWaker(handle.0.clone())); unsafe { NotifyHandle01::new(Box::into_raw(ptr)) } } } impl Notify01 for NotifyWaker { fn notify(&self, _: usize) { self.0.wake_by_ref(); } } unsafe impl UnsafeNotify01 for NotifyWaker { unsafe fn clone_raw(&self) -> NotifyHandle01 { WakerToHandle(&self.0).into() } unsafe fn drop_raw(&self) { let ptr: *const dyn UnsafeNotify01 = self; drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01)); } } #[cfg(feature = "io-compat")] mod io { use super::*; #[cfg(feature = "read-initializer")] use futures_io::Initializer; use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03}; use std::io::Error; use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01}; /// Extension trait for tokio-io [`AsyncRead`](tokio_io::AsyncRead) pub trait AsyncRead01CompatExt: AsyncRead01 { /// Converts a tokio-io [`AsyncRead`](tokio_io::AsyncRead) into a futures-io 0.3 /// [`AsyncRead`](futures_io::AsyncRead). /// /// ``` /// # futures::executor::block_on(async { /// use futures::io::AsyncReadExt; /// use futures_util::compat::AsyncRead01CompatExt; /// /// let input = b"Hello World!"; /// let reader /* : impl tokio_io::AsyncRead */ = std::io::Cursor::new(input); /// let mut reader /* : impl futures::io::AsyncRead + Unpin */ = reader.compat(); /// /// let mut output = Vec::with_capacity(12); /// reader.read_to_end(&mut output).await.unwrap(); /// assert_eq!(output, input); /// # }); /// ``` fn compat(self) -> Compat01As03 where Self: Sized, { Compat01As03::new(self) } } impl AsyncRead01CompatExt for R {} /// Extension trait for tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) pub trait AsyncWrite01CompatExt: AsyncWrite01 { /// Converts a tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) into a futures-io 0.3 /// [`AsyncWrite`](futures_io::AsyncWrite). /// /// ``` /// # futures::executor::block_on(async { /// use futures::io::AsyncWriteExt; /// use futures_util::compat::AsyncWrite01CompatExt; /// /// let input = b"Hello World!"; /// let mut cursor = std::io::Cursor::new(Vec::with_capacity(12)); /// /// let mut writer = (&mut cursor).compat(); /// writer.write_all(input).await.unwrap(); /// /// assert_eq!(cursor.into_inner(), input); /// # }); /// ``` fn compat(self) -> Compat01As03 where Self: Sized, { Compat01As03::new(self) } } impl AsyncWrite01CompatExt for W {} impl AsyncRead03 for Compat01As03 { #[cfg(feature = "read-initializer")] unsafe fn initializer(&self) -> Initializer { // check if `prepare_uninitialized_buffer` needs zeroing if self.inner.get_ref().prepare_uninitialized_buffer(&mut [1]) { Initializer::zeroing() } else { Initializer::nop() } } fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> task03::Poll> { poll_01_to_03(self.in_notify(cx, |x| x.poll_read(buf))) } } impl AsyncWrite03 for Compat01As03 { fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> task03::Poll> { poll_01_to_03(self.in_notify(cx, |x| x.poll_write(buf))) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task03::Poll> { poll_01_to_03(self.in_notify(cx, AsyncWrite01::poll_flush)) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task03::Poll> { poll_01_to_03(self.in_notify(cx, AsyncWrite01::shutdown)) } } }