1 use futures_01::executor::{
2     spawn as spawn01, Notify as Notify01, NotifyHandle as NotifyHandle01, Spawn as Spawn01,
3     UnsafeNotify as UnsafeNotify01,
4 };
5 use futures_01::{Async as Async01, Future as Future01, Stream as Stream01};
6 #[cfg(feature = "sink")]
7 use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01};
8 use futures_core::{future::Future as Future03, stream::Stream as Stream03, task as task03};
9 #[cfg(feature = "sink")]
10 use futures_sink::Sink as Sink03;
11 use std::pin::Pin;
12 use std::task::Context;
13 
14 #[cfg(feature = "io-compat")]
15 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
16 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
17 pub use io::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
18 
19 /// Converts a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
20 /// object to a futures 0.3-compatible version,
21 #[derive(Debug)]
22 #[must_use = "futures do nothing unless you `.await` or poll them"]
23 pub struct Compat01As03<T> {
24     pub(crate) inner: Spawn01<T>,
25 }
26 
27 impl<T> Unpin for Compat01As03<T> {}
28 
29 impl<T> Compat01As03<T> {
30     /// Wraps a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
31     /// object in a futures 0.3-compatible wrapper.
new(object: T) -> Self32     pub fn new(object: T) -> Self {
33         Self { inner: spawn01(object) }
34     }
35 
in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R36     fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R {
37         let notify = &WakerToHandle(cx.waker());
38         self.inner.poll_fn_notify(notify, 0, f)
39     }
40 
41     /// Get a reference to 0.1 Future, Stream, AsyncRead, or AsyncWrite object contained within.
get_ref(&self) -> &T42     pub fn get_ref(&self) -> &T {
43         self.inner.get_ref()
44     }
45 
46     /// Get a mutable reference to 0.1 Future, Stream, AsyncRead or AsyncWrite object contained
47     /// within.
get_mut(&mut self) -> &mut T48     pub fn get_mut(&mut self) -> &mut T {
49         self.inner.get_mut()
50     }
51 
52     /// Consume this wrapper to return the underlying 0.1 Future, Stream, AsyncRead, or
53     /// AsyncWrite object.
into_inner(self) -> T54     pub fn into_inner(self) -> T {
55         self.inner.into_inner()
56     }
57 }
58 
59 /// Extension trait for futures 0.1 [`Future`](futures_01::future::Future)
60 pub trait Future01CompatExt: Future01 {
61     /// Converts a futures 0.1
62     /// [`Future<Item = T, Error = E>`](futures_01::future::Future)
63     /// into a futures 0.3
64     /// [`Future<Output = Result<T, E>>`](futures_core::future::Future).
65     ///
66     /// ```
67     /// # futures::executor::block_on(async {
68     /// # // TODO: These should be all using `futures::compat`, but that runs up against Cargo
69     /// # // feature issues
70     /// use futures_util::compat::Future01CompatExt;
71     ///
72     /// let future = futures_01::future::ok::<u32, ()>(1);
73     /// assert_eq!(future.compat().await, Ok(1));
74     /// # });
75     /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,76     fn compat(self) -> Compat01As03<Self>
77     where
78         Self: Sized,
79     {
80         Compat01As03::new(self)
81     }
82 }
83 impl<Fut: Future01> Future01CompatExt for Fut {}
84 
85 /// Extension trait for futures 0.1 [`Stream`](futures_01::stream::Stream)
86 pub trait Stream01CompatExt: Stream01 {
87     /// Converts a futures 0.1
88     /// [`Stream<Item = T, Error = E>`](futures_01::stream::Stream)
89     /// into a futures 0.3
90     /// [`Stream<Item = Result<T, E>>`](futures_core::stream::Stream).
91     ///
92     /// ```
93     /// # futures::executor::block_on(async {
94     /// use futures::stream::StreamExt;
95     /// use futures_util::compat::Stream01CompatExt;
96     ///
97     /// let stream = futures_01::stream::once::<u32, ()>(Ok(1));
98     /// let mut stream = stream.compat();
99     /// assert_eq!(stream.next().await, Some(Ok(1)));
100     /// assert_eq!(stream.next().await, None);
101     /// # });
102     /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,103     fn compat(self) -> Compat01As03<Self>
104     where
105         Self: Sized,
106     {
107         Compat01As03::new(self)
108     }
109 }
110 impl<St: Stream01> Stream01CompatExt for St {}
111 
112 /// Extension trait for futures 0.1 [`Sink`](futures_01::sink::Sink)
113 #[cfg(feature = "sink")]
114 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
115 pub trait Sink01CompatExt: Sink01 {
116     /// Converts a futures 0.1
117     /// [`Sink<SinkItem = T, SinkError = E>`](futures_01::sink::Sink)
118     /// into a futures 0.3
119     /// [`Sink<T, Error = E>`](futures_sink::Sink).
120     ///
121     /// ```
122     /// # futures::executor::block_on(async {
123     /// use futures::{sink::SinkExt, stream::StreamExt};
124     /// use futures_util::compat::{Stream01CompatExt, Sink01CompatExt};
125     ///
126     /// let (tx, rx) = futures_01::unsync::mpsc::channel(1);
127     /// let (mut tx, mut rx) = (tx.sink_compat(), rx.compat());
128     ///
129     /// tx.send(1).await.unwrap();
130     /// drop(tx);
131     /// assert_eq!(rx.next().await, Some(Ok(1)));
132     /// assert_eq!(rx.next().await, None);
133     /// # });
134     /// ```
sink_compat(self) -> Compat01As03Sink<Self, Self::SinkItem> where Self: Sized,135     fn sink_compat(self) -> Compat01As03Sink<Self, Self::SinkItem>
136     where
137         Self: Sized,
138     {
139         Compat01As03Sink::new(self)
140     }
141 }
142 #[cfg(feature = "sink")]
143 impl<Si: Sink01> Sink01CompatExt for Si {}
144 
poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>>145 fn poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>> {
146     match x? {
147         Async01::Ready(t) => task03::Poll::Ready(Ok(t)),
148         Async01::NotReady => task03::Poll::Pending,
149     }
150 }
151 
152 impl<Fut: Future01> Future03 for Compat01As03<Fut> {
153     type Output = Result<Fut::Item, Fut::Error>;
154 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task03::Poll<Self::Output>155     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task03::Poll<Self::Output> {
156         poll_01_to_03(self.in_notify(cx, Future01::poll))
157     }
158 }
159 
160 impl<St: Stream01> Stream03 for Compat01As03<St> {
161     type Item = Result<St::Item, St::Error>;
162 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Option<Self::Item>>163     fn poll_next(
164         mut self: Pin<&mut Self>,
165         cx: &mut Context<'_>,
166     ) -> task03::Poll<Option<Self::Item>> {
167         match self.in_notify(cx, Stream01::poll)? {
168             Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))),
169             Async01::Ready(None) => task03::Poll::Ready(None),
170             Async01::NotReady => task03::Poll::Pending,
171         }
172     }
173 }
174 
175 /// Converts a futures 0.1 Sink object to a futures 0.3-compatible version
176 #[cfg(feature = "sink")]
177 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
178 #[derive(Debug)]
179 #[must_use = "sinks do nothing unless polled"]
180 pub struct Compat01As03Sink<S, SinkItem> {
181     pub(crate) inner: Spawn01<S>,
182     pub(crate) buffer: Option<SinkItem>,
183     pub(crate) close_started: bool,
184 }
185 
186 #[cfg(feature = "sink")]
187 impl<S, SinkItem> Unpin for Compat01As03Sink<S, SinkItem> {}
188 
189 #[cfg(feature = "sink")]
190 impl<S, SinkItem> Compat01As03Sink<S, SinkItem> {
191     /// Wraps a futures 0.1 Sink object in a futures 0.3-compatible wrapper.
new(inner: S) -> Self192     pub fn new(inner: S) -> Self {
193         Self { inner: spawn01(inner), buffer: None, close_started: false }
194     }
195 
in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut S) -> R) -> R196     fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut S) -> R) -> R {
197         let notify = &WakerToHandle(cx.waker());
198         self.inner.poll_fn_notify(notify, 0, f)
199     }
200 
201     /// Get a reference to 0.1 Sink object contained within.
get_ref(&self) -> &S202     pub fn get_ref(&self) -> &S {
203         self.inner.get_ref()
204     }
205 
206     /// Get a mutable reference to 0.1 Sink contained within.
get_mut(&mut self) -> &mut S207     pub fn get_mut(&mut self) -> &mut S {
208         self.inner.get_mut()
209     }
210 
211     /// Consume this wrapper to return the underlying 0.1 Sink.
into_inner(self) -> S212     pub fn into_inner(self) -> S {
213         self.inner.into_inner()
214     }
215 }
216 
217 #[cfg(feature = "sink")]
218 impl<S, SinkItem> Stream03 for Compat01As03Sink<S, SinkItem>
219 where
220     S: Stream01,
221 {
222     type Item = Result<S::Item, S::Error>;
223 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Option<Self::Item>>224     fn poll_next(
225         mut self: Pin<&mut Self>,
226         cx: &mut Context<'_>,
227     ) -> task03::Poll<Option<Self::Item>> {
228         match self.in_notify(cx, Stream01::poll)? {
229             Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))),
230             Async01::Ready(None) => task03::Poll::Ready(None),
231             Async01::NotReady => task03::Poll::Pending,
232         }
233     }
234 }
235 
236 #[cfg(feature = "sink")]
237 impl<S, SinkItem> Sink03<SinkItem> for Compat01As03Sink<S, SinkItem>
238 where
239     S: Sink01<SinkItem = SinkItem>,
240 {
241     type Error = S::SinkError;
242 
start_send(mut self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error>243     fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> {
244         debug_assert!(self.buffer.is_none());
245         self.buffer = Some(item);
246         Ok(())
247     }
248 
poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Result<(), Self::Error>>249     fn poll_ready(
250         mut self: Pin<&mut Self>,
251         cx: &mut Context<'_>,
252     ) -> task03::Poll<Result<(), Self::Error>> {
253         match self.buffer.take() {
254             Some(item) => match self.in_notify(cx, |f| f.start_send(item))? {
255                 AsyncSink01::Ready => task03::Poll::Ready(Ok(())),
256                 AsyncSink01::NotReady(i) => {
257                     self.buffer = Some(i);
258                     task03::Poll::Pending
259                 }
260             },
261             None => task03::Poll::Ready(Ok(())),
262         }
263     }
264 
poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Result<(), Self::Error>>265     fn poll_flush(
266         mut self: Pin<&mut Self>,
267         cx: &mut Context<'_>,
268     ) -> task03::Poll<Result<(), Self::Error>> {
269         let item = self.buffer.take();
270         match self.in_notify(cx, |f| match item {
271             Some(i) => match f.start_send(i)? {
272                 AsyncSink01::Ready => f.poll_complete().map(|i| (i, None)),
273                 AsyncSink01::NotReady(t) => Ok((Async01::NotReady, Some(t))),
274             },
275             None => f.poll_complete().map(|i| (i, None)),
276         })? {
277             (Async01::Ready(_), _) => task03::Poll::Ready(Ok(())),
278             (Async01::NotReady, item) => {
279                 self.buffer = item;
280                 task03::Poll::Pending
281             }
282         }
283     }
284 
poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Result<(), Self::Error>>285     fn poll_close(
286         mut self: Pin<&mut Self>,
287         cx: &mut Context<'_>,
288     ) -> task03::Poll<Result<(), Self::Error>> {
289         let item = self.buffer.take();
290         let close_started = self.close_started;
291 
292         let result = self.in_notify(cx, |f| {
293             if !close_started {
294                 if let Some(item) = item {
295                     if let AsyncSink01::NotReady(item) = f.start_send(item)? {
296                         return Ok((Async01::NotReady, Some(item), false));
297                     }
298                 }
299 
300                 if let Async01::NotReady = f.poll_complete()? {
301                     return Ok((Async01::NotReady, None, false));
302                 }
303             }
304 
305             Ok((<S as Sink01>::close(f)?, None, true))
306         });
307 
308         match result? {
309             (Async01::Ready(_), _, _) => task03::Poll::Ready(Ok(())),
310             (Async01::NotReady, item, close_started) => {
311                 self.buffer = item;
312                 self.close_started = close_started;
313                 task03::Poll::Pending
314             }
315         }
316     }
317 }
318 
319 struct NotifyWaker(task03::Waker);
320 
321 #[allow(missing_debug_implementations)] // false positive: this is private type
322 #[derive(Clone)]
323 struct WakerToHandle<'a>(&'a task03::Waker);
324 
325 impl From<WakerToHandle<'_>> for NotifyHandle01 {
from(handle: WakerToHandle<'_>) -> Self326     fn from(handle: WakerToHandle<'_>) -> Self {
327         let ptr = Box::new(NotifyWaker(handle.0.clone()));
328 
329         unsafe { Self::new(Box::into_raw(ptr)) }
330     }
331 }
332 
333 impl Notify01 for NotifyWaker {
notify(&self, _: usize)334     fn notify(&self, _: usize) {
335         self.0.wake_by_ref();
336     }
337 }
338 
339 unsafe impl UnsafeNotify01 for NotifyWaker {
clone_raw(&self) -> NotifyHandle01340     unsafe fn clone_raw(&self) -> NotifyHandle01 {
341         WakerToHandle(&self.0).into()
342     }
343 
drop_raw(&self)344     unsafe fn drop_raw(&self) {
345         let ptr: *const dyn UnsafeNotify01 = self;
346         drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
347     }
348 }
349 
350 #[cfg(feature = "io-compat")]
351 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
352 mod io {
353     use super::*;
354     #[cfg(feature = "read-initializer")]
355     use futures_io::Initializer;
356     use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
357     use std::io::Error;
358     use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};
359 
360     /// Extension trait for tokio-io [`AsyncRead`](tokio_io::AsyncRead)
361     #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
362     pub trait AsyncRead01CompatExt: AsyncRead01 {
363         /// Converts a tokio-io [`AsyncRead`](tokio_io::AsyncRead) into a futures-io 0.3
364         /// [`AsyncRead`](futures_io::AsyncRead).
365         ///
366         /// ```
367         /// # futures::executor::block_on(async {
368         /// use futures::io::AsyncReadExt;
369         /// use futures_util::compat::AsyncRead01CompatExt;
370         ///
371         /// let input = b"Hello World!";
372         /// let reader /* : impl tokio_io::AsyncRead */ = std::io::Cursor::new(input);
373         /// let mut reader /* : impl futures::io::AsyncRead + Unpin */ = reader.compat();
374         ///
375         /// let mut output = Vec::with_capacity(12);
376         /// reader.read_to_end(&mut output).await.unwrap();
377         /// assert_eq!(output, input);
378         /// # });
379         /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,380         fn compat(self) -> Compat01As03<Self>
381         where
382             Self: Sized,
383         {
384             Compat01As03::new(self)
385         }
386     }
387     impl<R: AsyncRead01> AsyncRead01CompatExt for R {}
388 
389     /// Extension trait for tokio-io [`AsyncWrite`](tokio_io::AsyncWrite)
390     #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
391     pub trait AsyncWrite01CompatExt: AsyncWrite01 {
392         /// Converts a tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) into a futures-io 0.3
393         /// [`AsyncWrite`](futures_io::AsyncWrite).
394         ///
395         /// ```
396         /// # futures::executor::block_on(async {
397         /// use futures::io::AsyncWriteExt;
398         /// use futures_util::compat::AsyncWrite01CompatExt;
399         ///
400         /// let input = b"Hello World!";
401         /// let mut cursor = std::io::Cursor::new(Vec::with_capacity(12));
402         ///
403         /// let mut writer = (&mut cursor).compat();
404         /// writer.write_all(input).await.unwrap();
405         ///
406         /// assert_eq!(cursor.into_inner(), input);
407         /// # });
408         /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,409         fn compat(self) -> Compat01As03<Self>
410         where
411             Self: Sized,
412         {
413             Compat01As03::new(self)
414         }
415     }
416     impl<W: AsyncWrite01> AsyncWrite01CompatExt for W {}
417 
418     impl<R: AsyncRead01> AsyncRead03 for Compat01As03<R> {
419         #[cfg(feature = "read-initializer")]
initializer(&self) -> Initializer420         unsafe fn initializer(&self) -> Initializer {
421             // check if `prepare_uninitialized_buffer` needs zeroing
422             if self.inner.get_ref().prepare_uninitialized_buffer(&mut [1]) {
423                 Initializer::zeroing()
424             } else {
425                 Initializer::nop()
426             }
427         }
428 
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> task03::Poll<Result<usize, Error>>429         fn poll_read(
430             mut self: Pin<&mut Self>,
431             cx: &mut Context<'_>,
432             buf: &mut [u8],
433         ) -> task03::Poll<Result<usize, Error>> {
434             poll_01_to_03(self.in_notify(cx, |x| x.poll_read(buf)))
435         }
436     }
437 
438     impl<W: AsyncWrite01> AsyncWrite03 for Compat01As03<W> {
poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> task03::Poll<Result<usize, Error>>439         fn poll_write(
440             mut self: Pin<&mut Self>,
441             cx: &mut Context<'_>,
442             buf: &[u8],
443         ) -> task03::Poll<Result<usize, Error>> {
444             poll_01_to_03(self.in_notify(cx, |x| x.poll_write(buf)))
445         }
446 
poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Result<(), Error>>447         fn poll_flush(
448             mut self: Pin<&mut Self>,
449             cx: &mut Context<'_>,
450         ) -> task03::Poll<Result<(), Error>> {
451             poll_01_to_03(self.in_notify(cx, AsyncWrite01::poll_flush))
452         }
453 
poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Result<(), Error>>454         fn poll_close(
455             mut self: Pin<&mut Self>,
456             cx: &mut Context<'_>,
457         ) -> task03::Poll<Result<(), Error>> {
458             poll_01_to_03(self.in_notify(cx, AsyncWrite01::shutdown))
459         }
460     }
461 }
462