1 use futures_01::executor::{
2     spawn as spawn01, Notify as Notify01, NotifyHandle as NotifyHandle01,
3     Spawn as Spawn01, UnsafeNotify as UnsafeNotify01,
4 };
5 use futures_01::{
6     Async as Async01, Future as Future01,
7     Stream as Stream01,
8 };
9 #[cfg(feature = "sink")]
10 use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01};
11 use futures_core::{task as task03, future::Future as Future03, stream::Stream as Stream03};
12 use std::pin::Pin;
13 use std::task::Context;
14 #[cfg(feature = "sink")]
15 use futures_sink::Sink as Sink03;
16 
17 #[cfg(feature = "io-compat")]
18 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
19 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
20 pub use io::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
21 
22 /// Converts a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
23 /// object to a futures 0.3-compatible version,
24 #[derive(Debug)]
25 #[must_use = "futures do nothing unless you `.await` or poll them"]
26 pub struct Compat01As03<T> {
27     pub(crate) inner: Spawn01<T>,
28 }
29 
30 impl<T> Unpin for Compat01As03<T> {}
31 
32 impl<T> Compat01As03<T> {
33     /// Wraps a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
34     /// object in a futures 0.3-compatible wrapper.
new(object: T) -> Self35     pub fn new(object: T) -> Self {
36         Self {
37             inner: spawn01(object),
38         }
39     }
40 
in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R41     fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R {
42         let notify = &WakerToHandle(cx.waker());
43         self.inner.poll_fn_notify(notify, 0, f)
44     }
45 
46     /// Get a reference to 0.1 Future, Stream, AsyncRead, or AsyncWrite object contained within.
get_ref(&self) -> &T47     pub fn get_ref(&self) -> &T {
48         self.inner.get_ref()
49     }
50 
51     /// Get a mutable reference to 0.1 Future, Stream, AsyncRead or AsyncWrite object contained
52     /// within.
get_mut(&mut self) -> &mut T53     pub fn get_mut(&mut self) -> &mut T {
54         self.inner.get_mut()
55     }
56 
57     /// Consume this wrapper to return the underlying 0.1 Future, Stream, AsyncRead, or
58     /// AsyncWrite object.
into_inner(self) -> T59     pub fn into_inner(self) -> T {
60         self.inner.into_inner()
61     }
62 }
63 
64 /// Extension trait for futures 0.1 [`Future`](futures_01::future::Future)
65 pub trait Future01CompatExt: Future01 {
66     /// Converts a futures 0.1
67     /// [`Future<Item = T, Error = E>`](futures_01::future::Future)
68     /// into a futures 0.3
69     /// [`Future<Output = Result<T, E>>`](futures_core::future::Future).
70     ///
71     /// ```
72     /// # futures::executor::block_on(async {
73     /// # // TODO: These should be all using `futures::compat`, but that runs up against Cargo
74     /// # // feature issues
75     /// use futures_util::compat::Future01CompatExt;
76     ///
77     /// let future = futures_01::future::ok::<u32, ()>(1);
78     /// assert_eq!(future.compat().await, Ok(1));
79     /// # });
80     /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,81     fn compat(self) -> Compat01As03<Self>
82     where
83         Self: Sized,
84     {
85         Compat01As03::new(self)
86     }
87 }
88 impl<Fut: Future01> Future01CompatExt for Fut {}
89 
90 /// Extension trait for futures 0.1 [`Stream`](futures_01::stream::Stream)
91 pub trait Stream01CompatExt: Stream01 {
92     /// Converts a futures 0.1
93     /// [`Stream<Item = T, Error = E>`](futures_01::stream::Stream)
94     /// into a futures 0.3
95     /// [`Stream<Item = Result<T, E>>`](futures_core::stream::Stream).
96     ///
97     /// ```
98     /// # futures::executor::block_on(async {
99     /// use futures::stream::StreamExt;
100     /// use futures_util::compat::Stream01CompatExt;
101     ///
102     /// let stream = futures_01::stream::once::<u32, ()>(Ok(1));
103     /// let mut stream = stream.compat();
104     /// assert_eq!(stream.next().await, Some(Ok(1)));
105     /// assert_eq!(stream.next().await, None);
106     /// # });
107     /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,108     fn compat(self) -> Compat01As03<Self>
109     where
110         Self: Sized,
111     {
112         Compat01As03::new(self)
113     }
114 }
115 impl<St: Stream01> Stream01CompatExt for St {}
116 
117 /// Extension trait for futures 0.1 [`Sink`](futures_01::sink::Sink)
118 #[cfg(feature = "sink")]
119 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
120 pub trait Sink01CompatExt: Sink01 {
121     /// Converts a futures 0.1
122     /// [`Sink<SinkItem = T, SinkError = E>`](futures_01::sink::Sink)
123     /// into a futures 0.3
124     /// [`Sink<T, Error = E>`](futures_sink::Sink).
125     ///
126     /// ```
127     /// # futures::executor::block_on(async {
128     /// use futures::{sink::SinkExt, stream::StreamExt};
129     /// use futures_util::compat::{Stream01CompatExt, Sink01CompatExt};
130     ///
131     /// let (tx, rx) = futures_01::unsync::mpsc::channel(1);
132     /// let (mut tx, mut rx) = (tx.sink_compat(), rx.compat());
133     ///
134     /// tx.send(1).await.unwrap();
135     /// drop(tx);
136     /// assert_eq!(rx.next().await, Some(Ok(1)));
137     /// assert_eq!(rx.next().await, None);
138     /// # });
139     /// ```
sink_compat(self) -> Compat01As03Sink<Self, Self::SinkItem> where Self: Sized,140     fn sink_compat(self) -> Compat01As03Sink<Self, Self::SinkItem>
141     where
142         Self: Sized,
143     {
144         Compat01As03Sink::new(self)
145     }
146 }
147 #[cfg(feature = "sink")]
148 impl<Si: Sink01> Sink01CompatExt for Si {}
149 
poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>>150 fn poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>> {
151     match x? {
152         Async01::Ready(t) => task03::Poll::Ready(Ok(t)),
153         Async01::NotReady => task03::Poll::Pending,
154     }
155 }
156 
157 impl<Fut: Future01> Future03 for Compat01As03<Fut> {
158     type Output = Result<Fut::Item, Fut::Error>;
159 
poll( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Self::Output>160     fn poll(
161         mut self: Pin<&mut Self>,
162         cx: &mut Context<'_>,
163     ) -> task03::Poll<Self::Output> {
164         poll_01_to_03(self.in_notify(cx, Future01::poll))
165     }
166 }
167 
168 impl<St: Stream01> Stream03 for Compat01As03<St> {
169     type Item = Result<St::Item, St::Error>;
170 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Option<Self::Item>>171     fn poll_next(
172         mut self: Pin<&mut Self>,
173         cx: &mut Context<'_>,
174     ) -> task03::Poll<Option<Self::Item>> {
175         match self.in_notify(cx, Stream01::poll)? {
176             Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))),
177             Async01::Ready(None) => task03::Poll::Ready(None),
178             Async01::NotReady => task03::Poll::Pending,
179         }
180     }
181 }
182 
183 /// Converts a futures 0.1 Sink object to a futures 0.3-compatible version
184 #[cfg(feature = "sink")]
185 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
186 #[derive(Debug)]
187 #[must_use = "sinks do nothing unless polled"]
188 pub struct Compat01As03Sink<S, SinkItem> {
189     pub(crate) inner: Spawn01<S>,
190     pub(crate) buffer: Option<SinkItem>,
191     pub(crate) close_started: bool,
192 }
193 
194 #[cfg(feature = "sink")]
195 impl<S, SinkItem> Unpin for Compat01As03Sink<S, SinkItem> {}
196 
197 #[cfg(feature = "sink")]
198 impl<S, SinkItem> Compat01As03Sink<S, SinkItem> {
199     /// Wraps a futures 0.1 Sink object in a futures 0.3-compatible wrapper.
new(inner: S) -> Self200     pub fn new(inner: S) -> Self {
201         Self {
202             inner: spawn01(inner),
203             buffer: None,
204             close_started: false
205         }
206     }
207 
in_notify<R>( &mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut S) -> R, ) -> R208     fn in_notify<R>(
209         &mut self,
210         cx: &mut Context<'_>,
211         f: impl FnOnce(&mut S) -> R,
212     ) -> R {
213         let notify = &WakerToHandle(cx.waker());
214         self.inner.poll_fn_notify(notify, 0, f)
215     }
216 
217     /// Get a reference to 0.1 Sink object contained within.
get_ref(&self) -> &S218     pub fn get_ref(&self) -> &S {
219         self.inner.get_ref()
220     }
221 
222     /// Get a mutable reference to 0.1 Sink contained within.
get_mut(&mut self) -> &mut S223     pub fn get_mut(&mut self) -> &mut S {
224         self.inner.get_mut()
225     }
226 
227     /// Consume this wrapper to return the underlying 0.1 Sink.
into_inner(self) -> S228     pub fn into_inner(self) -> S {
229         self.inner.into_inner()
230     }
231 }
232 
233 #[cfg(feature = "sink")]
234 impl<S, SinkItem> Stream03 for Compat01As03Sink<S, SinkItem>
235 where
236     S: Stream01,
237 {
238     type Item = Result<S::Item, S::Error>;
239 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Option<Self::Item>>240     fn poll_next(
241         mut self: Pin<&mut Self>,
242         cx: &mut Context<'_>,
243     ) -> task03::Poll<Option<Self::Item>> {
244         match self.in_notify(cx, Stream01::poll)? {
245             Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))),
246             Async01::Ready(None) => task03::Poll::Ready(None),
247             Async01::NotReady => task03::Poll::Pending,
248         }
249     }
250 }
251 
252 #[cfg(feature = "sink")]
253 impl<S, SinkItem> Sink03<SinkItem> for Compat01As03Sink<S, SinkItem>
254 where
255     S: Sink01<SinkItem = SinkItem>,
256 {
257     type Error = S::SinkError;
258 
start_send( mut self: Pin<&mut Self>, item: SinkItem, ) -> Result<(), Self::Error>259     fn start_send(
260         mut self: Pin<&mut Self>,
261         item: SinkItem,
262     ) -> Result<(), Self::Error> {
263         debug_assert!(self.buffer.is_none());
264         self.buffer = Some(item);
265         Ok(())
266     }
267 
poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Result<(), Self::Error>>268     fn poll_ready(
269         mut self: Pin<&mut Self>,
270         cx: &mut Context<'_>,
271     ) -> task03::Poll<Result<(), Self::Error>> {
272         match self.buffer.take() {
273             Some(item) => match self.in_notify(cx, |f| f.start_send(item))? {
274                 AsyncSink01::Ready => task03::Poll::Ready(Ok(())),
275                 AsyncSink01::NotReady(i) => {
276                     self.buffer = Some(i);
277                     task03::Poll::Pending
278                 }
279             },
280             None => task03::Poll::Ready(Ok(())),
281         }
282     }
283 
poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Result<(), Self::Error>>284     fn poll_flush(
285         mut self: Pin<&mut Self>,
286         cx: &mut Context<'_>,
287     ) -> task03::Poll<Result<(), Self::Error>> {
288         let item = self.buffer.take();
289         match self.in_notify(cx, |f| match item {
290             Some(i) => match f.start_send(i)? {
291                 AsyncSink01::Ready => f.poll_complete().map(|i| (i, None)),
292                 AsyncSink01::NotReady(t) => {
293                     Ok((Async01::NotReady, Some(t)))
294                 }
295             },
296             None => f.poll_complete().map(|i| (i, None)),
297         })? {
298             (Async01::Ready(_), _) => task03::Poll::Ready(Ok(())),
299             (Async01::NotReady, item) => {
300                 self.buffer = item;
301                 task03::Poll::Pending
302             }
303         }
304     }
305 
poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Result<(), Self::Error>>306     fn poll_close(
307         mut self: Pin<&mut Self>,
308         cx: &mut Context<'_>,
309     ) -> task03::Poll<Result<(), Self::Error>> {
310         let item = self.buffer.take();
311         let close_started = self.close_started;
312 
313         let result = self.in_notify(cx, |f| {
314             if !close_started {
315                 if let Some(item) = item {
316                     if let AsyncSink01::NotReady(item) = f.start_send(item)? {
317                         return Ok((Async01::NotReady, Some(item), false));
318                     }
319                 }
320 
321                 if let Async01::NotReady = f.poll_complete()? {
322                     return Ok((Async01::NotReady, None, false));
323                 }
324             }
325 
326             Ok((<S as Sink01>::close(f)?, None, true))
327         });
328 
329         match result? {
330             (Async01::Ready(_), _, _) => task03::Poll::Ready(Ok(())),
331             (Async01::NotReady, item, close_started) => {
332                 self.buffer = item;
333                 self.close_started = close_started;
334                 task03::Poll::Pending
335             }
336         }
337     }
338 }
339 
340 struct NotifyWaker(task03::Waker);
341 
342 #[allow(missing_debug_implementations)] // false positive: this is private type
343 #[derive(Clone)]
344 struct WakerToHandle<'a>(&'a task03::Waker);
345 
346 impl From<WakerToHandle<'_>> for NotifyHandle01 {
from(handle: WakerToHandle<'_>) -> Self347     fn from(handle: WakerToHandle<'_>) -> Self {
348         let ptr = Box::new(NotifyWaker(handle.0.clone()));
349 
350         unsafe { Self::new(Box::into_raw(ptr)) }
351     }
352 }
353 
354 impl Notify01 for NotifyWaker {
notify(&self, _: usize)355     fn notify(&self, _: usize) {
356         self.0.wake_by_ref();
357     }
358 }
359 
360 unsafe impl UnsafeNotify01 for NotifyWaker {
clone_raw(&self) -> NotifyHandle01361     unsafe fn clone_raw(&self) -> NotifyHandle01 {
362         WakerToHandle(&self.0).into()
363     }
364 
drop_raw(&self)365     unsafe fn drop_raw(&self) {
366         let ptr: *const dyn UnsafeNotify01 = self;
367         drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
368     }
369 }
370 
371 #[cfg(feature = "io-compat")]
372 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
373 mod io {
374     use super::*;
375     #[cfg(feature = "read-initializer")]
376     use futures_io::Initializer;
377     use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
378     use std::io::Error;
379     use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};
380 
381     /// Extension trait for tokio-io [`AsyncRead`](tokio_io::AsyncRead)
382     #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
383     pub trait AsyncRead01CompatExt: AsyncRead01 {
384         /// Converts a tokio-io [`AsyncRead`](tokio_io::AsyncRead) into a futures-io 0.3
385         /// [`AsyncRead`](futures_io::AsyncRead).
386         ///
387         /// ```
388         /// # futures::executor::block_on(async {
389         /// use futures::io::AsyncReadExt;
390         /// use futures_util::compat::AsyncRead01CompatExt;
391         ///
392         /// let input = b"Hello World!";
393         /// let reader /* : impl tokio_io::AsyncRead */ = std::io::Cursor::new(input);
394         /// let mut reader /* : impl futures::io::AsyncRead + Unpin */ = reader.compat();
395         ///
396         /// let mut output = Vec::with_capacity(12);
397         /// reader.read_to_end(&mut output).await.unwrap();
398         /// assert_eq!(output, input);
399         /// # });
400         /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,401         fn compat(self) -> Compat01As03<Self>
402         where
403             Self: Sized,
404         {
405             Compat01As03::new(self)
406         }
407     }
408     impl<R: AsyncRead01> AsyncRead01CompatExt for R {}
409 
410     /// Extension trait for tokio-io [`AsyncWrite`](tokio_io::AsyncWrite)
411     #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
412     pub trait AsyncWrite01CompatExt: AsyncWrite01 {
413         /// Converts a tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) into a futures-io 0.3
414         /// [`AsyncWrite`](futures_io::AsyncWrite).
415         ///
416         /// ```
417         /// # futures::executor::block_on(async {
418         /// use futures::io::AsyncWriteExt;
419         /// use futures_util::compat::AsyncWrite01CompatExt;
420         ///
421         /// let input = b"Hello World!";
422         /// let mut cursor = std::io::Cursor::new(Vec::with_capacity(12));
423         ///
424         /// let mut writer = (&mut cursor).compat();
425         /// writer.write_all(input).await.unwrap();
426         ///
427         /// assert_eq!(cursor.into_inner(), input);
428         /// # });
429         /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,430         fn compat(self) -> Compat01As03<Self>
431         where
432             Self: Sized,
433         {
434             Compat01As03::new(self)
435         }
436     }
437     impl<W: AsyncWrite01> AsyncWrite01CompatExt for W {}
438 
439     impl<R: AsyncRead01> AsyncRead03 for Compat01As03<R> {
440         #[cfg(feature = "read-initializer")]
initializer(&self) -> Initializer441         unsafe fn initializer(&self) -> Initializer {
442             // check if `prepare_uninitialized_buffer` needs zeroing
443             if self.inner.get_ref().prepare_uninitialized_buffer(&mut [1]) {
444                 Initializer::zeroing()
445             } else {
446                 Initializer::nop()
447             }
448         }
449 
poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> task03::Poll<Result<usize, Error>>450         fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
451             -> task03::Poll<Result<usize, Error>>
452         {
453             poll_01_to_03(self.in_notify(cx, |x| x.poll_read(buf)))
454         }
455     }
456 
457     impl<W: AsyncWrite01> AsyncWrite03 for Compat01As03<W> {
poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> task03::Poll<Result<usize, Error>>458         fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8])
459             -> task03::Poll<Result<usize, Error>>
460         {
461             poll_01_to_03(self.in_notify(cx, |x| x.poll_write(buf)))
462         }
463 
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task03::Poll<Result<(), Error>>464         fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
465             -> task03::Poll<Result<(), Error>>
466         {
467             poll_01_to_03(self.in_notify(cx, AsyncWrite01::poll_flush))
468         }
469 
poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task03::Poll<Result<(), Error>>470         fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
471             -> task03::Poll<Result<(), Error>>
472         {
473             poll_01_to_03(self.in_notify(cx, AsyncWrite01::shutdown))
474         }
475     }
476 }
477