1 use futures_01::executor::{
2     spawn as spawn01, Notify as Notify01, NotifyHandle as NotifyHandle01,
3     Spawn as Spawn01, UnsafeNotify as UnsafeNotify01,
4 };
5 use futures_01::{
6     Async as Async01, Future as Future01,
7     Stream as Stream01,
8 };
9 #[cfg(feature = "sink")]
10 use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01};
11 use futures_core::{task as task03, future::Future as Future03, stream::Stream as Stream03};
12 use std::pin::Pin;
13 use std::task::Context;
14 #[cfg(feature = "sink")]
15 use futures_sink::Sink as Sink03;
16 
17 #[cfg(feature = "io-compat")]
18 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
19 pub use io::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
20 
/// Converts a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
/// object to a futures 0.3-compatible version.
///
/// Construct one with [`Compat01As03::new`]; the wrapped object stays reachable
/// through `get_ref`, `get_mut` and `into_inner`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Compat01As03<T> {
    /// The 0.1 object, wrapped by the futures 0.1 executor's `spawn` so that it can be
    /// driven through `poll_fn_notify` with a notifier built from the 0.3 waker.
    pub(crate) inner: Spawn01<T>,
}
28 
// The wrapper is `Unpin` for every `T`: the wrapped 0.1 object is only ever handed out
// by plain `&mut T`, never through a `Pin`.
impl<T> Unpin for Compat01As03<T> {}
30 
31 impl<T> Compat01As03<T> {
32     /// Wraps a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
33     /// object in a futures 0.3-compatible wrapper.
new(object: T) -> Compat01As03<T>34     pub fn new(object: T) -> Compat01As03<T> {
35         Compat01As03 {
36             inner: spawn01(object),
37         }
38     }
39 
in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R40     fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R {
41         let notify = &WakerToHandle(cx.waker());
42         self.inner.poll_fn_notify(notify, 0, f)
43     }
44 
45     /// Get a reference to 0.1 Future, Stream, AsyncRead, or AsyncWrite object contained within.
get_ref(&self) -> &T46     pub fn get_ref(&self) -> &T {
47         self.inner.get_ref()
48     }
49 
50     /// Get a mutable reference to 0.1 Future, Stream, AsyncRead or AsyncWrite object contained
51     /// within.
get_mut(&mut self) -> &mut T52     pub fn get_mut(&mut self) -> &mut T {
53         self.inner.get_mut()
54     }
55 
56     /// Consume this wrapper to return the underlying 0.1 Future, Stream, AsyncRead, or
57     /// AsyncWrite object.
into_inner(self) -> T58     pub fn into_inner(self) -> T {
59         self.inner.into_inner()
60     }
61 }
62 
/// Extension trait for futures 0.1 [`Future`](futures_01::future::Future).
///
/// A blanket implementation covers every type that implements the 0.1 `Future` trait.
pub trait Future01CompatExt: Future01 {
    /// Converts a futures 0.1
    /// [`Future<Item = T, Error = E>`](futures_01::future::Future)
    /// into a futures 0.3
    /// [`Future<Output = Result<T, E>>`](futures_core::future::Future).
    ///
    /// ```
    /// # futures::executor::block_on(async {
    /// # // TODO: These should be all using `futures::compat`, but that runs up against Cargo
    /// # // feature issues
    /// use futures_util::compat::Future01CompatExt;
    ///
    /// let future = futures_01::future::ok::<u32, ()>(1);
    /// assert_eq!(future.compat().await, Ok(1));
    /// # });
    /// ```
    fn compat(self) -> Compat01As03<Self>
    where
        Self: Sized,
    {
        // Takes `self` by value and moves it into the wrapper.
        Compat01As03::new(self)
    }
}
// Blanket impl: `compat()` is available on every 0.1 future.
impl<Fut: Future01> Future01CompatExt for Fut {}
88 
/// Extension trait for futures 0.1 [`Stream`](futures_01::stream::Stream).
///
/// A blanket implementation covers every type that implements the 0.1 `Stream` trait.
pub trait Stream01CompatExt: Stream01 {
    /// Converts a futures 0.1
    /// [`Stream<Item = T, Error = E>`](futures_01::stream::Stream)
    /// into a futures 0.3
    /// [`Stream<Item = Result<T, E>>`](futures_core::stream::Stream).
    ///
    /// ```
    /// # futures::executor::block_on(async {
    /// use futures::stream::StreamExt;
    /// use futures_util::compat::Stream01CompatExt;
    ///
    /// let stream = futures_01::stream::once::<u32, ()>(Ok(1));
    /// let mut stream = stream.compat();
    /// assert_eq!(stream.next().await, Some(Ok(1)));
    /// assert_eq!(stream.next().await, None);
    /// # });
    /// ```
    fn compat(self) -> Compat01As03<Self>
    where
        Self: Sized,
    {
        // Takes `self` by value and moves it into the wrapper.
        Compat01As03::new(self)
    }
}
// Blanket impl: `compat()` is available on every 0.1 stream.
impl<St: Stream01> Stream01CompatExt for St {}
115 
/// Extension trait for futures 0.1 [`Sink`](futures_01::sink::Sink).
///
/// A blanket implementation covers every type that implements the 0.1 `Sink` trait.
/// Only compiled when the `sink` feature is enabled.
#[cfg(feature = "sink")]
pub trait Sink01CompatExt: Sink01 {
    /// Converts a futures 0.1
    /// [`Sink<SinkItem = T, SinkError = E>`](futures_01::sink::Sink)
    /// into a futures 0.3
    /// [`Sink<T, Error = E>`](futures_sink::Sink).
    ///
    /// ```
    /// # futures::executor::block_on(async {
    /// use futures::{sink::SinkExt, stream::StreamExt};
    /// use futures_util::compat::{Stream01CompatExt, Sink01CompatExt};
    ///
    /// let (tx, rx) = futures_01::unsync::mpsc::channel(1);
    /// let (mut tx, mut rx) = (tx.sink_compat(), rx.compat());
    ///
    /// tx.send(1).await.unwrap();
    /// drop(tx);
    /// assert_eq!(rx.next().await, Some(Ok(1)));
    /// assert_eq!(rx.next().await, None);
    /// # });
    /// ```
    fn sink_compat(self) -> Compat01As03Sink<Self, Self::SinkItem>
    where
        Self: Sized,
    {
        // Takes `self` by value and moves it into the sink wrapper.
        Compat01As03Sink::new(self)
    }
}
// Blanket impl: `sink_compat()` is available on every 0.1 sink.
#[cfg(feature = "sink")]
impl<Si: Sink01> Sink01CompatExt for Si {}
147 
poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>>148 fn poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>> {
149     match x? {
150         Async01::Ready(t) => task03::Poll::Ready(Ok(t)),
151         Async01::NotReady => task03::Poll::Pending,
152     }
153 }
154 
155 impl<Fut: Future01> Future03 for Compat01As03<Fut> {
156     type Output = Result<Fut::Item, Fut::Error>;
157 
poll( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Self::Output>158     fn poll(
159         mut self: Pin<&mut Self>,
160         cx: &mut Context<'_>,
161     ) -> task03::Poll<Self::Output> {
162         poll_01_to_03(self.in_notify(cx, Future01::poll))
163     }
164 }
165 
166 impl<St: Stream01> Stream03 for Compat01As03<St> {
167     type Item = Result<St::Item, St::Error>;
168 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Option<Self::Item>>169     fn poll_next(
170         mut self: Pin<&mut Self>,
171         cx: &mut Context<'_>,
172     ) -> task03::Poll<Option<Self::Item>> {
173         match self.in_notify(cx, Stream01::poll)? {
174             Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))),
175             Async01::Ready(None) => task03::Poll::Ready(None),
176             Async01::NotReady => task03::Poll::Pending,
177         }
178     }
179 }
180 
/// Converts a futures 0.1 Sink object to a futures 0.3-compatible version.
///
/// The wrapper keeps at most one item that `start_send` stored but the inner sink has
/// not accepted yet; `poll_ready`, `poll_flush` and `poll_close` each try to hand it over.
#[cfg(feature = "sink")]
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct Compat01As03Sink<S, SinkItem> {
    /// The 0.1 sink, wrapped by the futures 0.1 executor's `spawn` so that it can be
    /// driven through `poll_fn_notify`.
    pub(crate) inner: Spawn01<S>,
    /// Item stored by `start_send` until the inner sink accepts it.
    pub(crate) buffer: Option<SinkItem>,
    /// Set when `poll_close` has called `close` on the inner sink and it reported
    /// `NotReady`; later `poll_close` calls then skip sending/flushing and go straight
    /// to `close`.
    pub(crate) close_started: bool,
}
190 
// The sink wrapper is `Unpin` for every `S` and `SinkItem`: the inner sink is only ever
// handed out by plain `&mut S`, never through a `Pin`.
#[cfg(feature = "sink")]
impl<S, SinkItem> Unpin for Compat01As03Sink<S, SinkItem> {}
193 
#[cfg(feature = "sink")]
impl<S, SinkItem> Compat01As03Sink<S, SinkItem> {
    /// Creates a futures 0.3-compatible wrapper around a futures 0.1 Sink object.
    ///
    /// The wrapper starts with no buffered item and with closing not yet begun.
    pub fn new(inner: S) -> Self {
        Self {
            inner: spawn01(inner),
            buffer: None,
            close_started: false,
        }
    }

    /// Runs `f` against the wrapped sink while the waker taken from `cx` is
    /// supplied as the futures 0.1 notifier (with notification id `0`), and
    /// returns whatever `f` produced.
    fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut S) -> R) -> R {
        let handle = WakerToHandle(cx.waker());
        self.inner.poll_fn_notify(&handle, 0, f)
    }

    /// Borrows the wrapped 0.1 Sink object.
    pub fn get_ref(&self) -> &S {
        let spawned = &self.inner;
        spawned.get_ref()
    }

    /// Mutably borrows the wrapped 0.1 Sink object.
    pub fn get_mut(&mut self) -> &mut S {
        let spawned = &mut self.inner;
        spawned.get_mut()
    }

    /// Discards the wrapper and hands back the underlying 0.1 Sink.
    pub fn into_inner(self) -> S {
        let spawned = self.inner;
        spawned.into_inner()
    }
}
229 
#[cfg(feature = "sink")]
impl<S, SinkItem> Stream03 for Compat01As03Sink<S, SinkItem>
where
    S: Stream01,
{
    type Item = Result<S::Item, S::Error>;

    /// Polls the wrapped object as a 0.1 stream with the 0.3 waker installed as
    /// its notifier.
    ///
    /// Items are yielded as `Some(Ok(_))`, an error from the 0.1 stream is yielded
    /// as `Some(Err(_))`, and the end of the stream is reported as `None`.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Option<Self::Item>> {
        let polled = self.in_notify(cx, Stream01::poll);
        match polled {
            Ok(Async01::Ready(Some(item))) => task03::Poll::Ready(Some(Ok(item))),
            Ok(Async01::Ready(None)) => task03::Poll::Ready(None),
            Ok(Async01::NotReady) => task03::Poll::Pending,
            Err(error) => task03::Poll::Ready(Some(Err(error))),
        }
    }
}
248 
#[cfg(feature = "sink")]
impl<S, SinkItem> Sink03<SinkItem> for Compat01As03Sink<S, SinkItem>
where
    S: Sink01<SinkItem = SinkItem>,
{
    type Error = S::SinkError;

    /// Stores `item` in the wrapper's one-slot buffer; the inner sink sees it on
    /// the next `poll_ready`, `poll_flush` or `poll_close`. Never fails.
    ///
    /// In debug builds this asserts that the buffer is empty.
    fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> {
        debug_assert!(self.buffer.is_none());
        self.buffer = Some(item);
        Ok(())
    }

    /// Reports readiness once the buffer is empty, first trying to pass any
    /// buffered item to the inner sink.
    fn poll_ready(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        // Nothing buffered: the slot is free, so the wrapper can take another item.
        let pending = match self.buffer.take() {
            Some(item) => item,
            None => return task03::Poll::Ready(Ok(())),
        };

        match self.in_notify(cx, |sink| sink.start_send(pending)) {
            Ok(AsyncSink01::Ready) => task03::Poll::Ready(Ok(())),
            Ok(AsyncSink01::NotReady(rejected)) => {
                // The inner sink gave the item back; keep it for the next attempt.
                self.buffer = Some(rejected);
                task03::Poll::Pending
            }
            Err(error) => task03::Poll::Ready(Err(error)),
        }
    }

    /// Hands any buffered item to the inner sink and then drives its
    /// `poll_complete`.
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        let pending = self.buffer.take();

        // The closure yields the flush state plus the item to put back in the buffer.
        let outcome = self.in_notify(cx, |sink| {
            if let Some(value) = pending {
                if let AsyncSink01::NotReady(rejected) = sink.start_send(value)? {
                    return Ok((Async01::NotReady, Some(rejected)));
                }
            }
            sink.poll_complete().map(|state| (state, None))
        });

        match outcome {
            Ok((Async01::Ready(_), _)) => task03::Poll::Ready(Ok(())),
            Ok((Async01::NotReady, leftover)) => {
                self.buffer = leftover;
                task03::Poll::Pending
            }
            Err(error) => task03::Poll::Ready(Err(error)),
        }
    }

    /// Sends any buffered item, flushes via `poll_complete`, then calls `close`
    /// on the inner sink. Once `close` has been called and was not ready, later
    /// calls skip the send/flush steps.
    fn poll_close(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        let pending = self.buffer.take();
        let already_closing = self.close_started;

        // The closure yields (close state, item to re-buffer, whether `close` was reached).
        let outcome = self.in_notify(cx, |sink| {
            if !already_closing {
                if let Some(value) = pending {
                    match sink.start_send(value)? {
                        AsyncSink01::Ready => {}
                        AsyncSink01::NotReady(rejected) => {
                            return Ok((Async01::NotReady, Some(rejected), false));
                        }
                    }
                }

                match sink.poll_complete()? {
                    Async01::Ready(_) => {}
                    Async01::NotReady => return Ok((Async01::NotReady, None, false)),
                }
            }

            let closed = <S as Sink01>::close(sink)?;
            Ok((closed, None, true))
        });

        match outcome {
            Ok((Async01::Ready(_), _, _)) => task03::Poll::Ready(Ok(())),
            Ok((Async01::NotReady, leftover, closing)) => {
                self.buffer = leftover;
                self.close_started = closing;
                task03::Poll::Pending
            }
            Err(error) => task03::Poll::Ready(Err(error)),
        }
    }
}
336 
// Owns a clone of a 0.3 waker; it implements the futures 0.1 notify traits so that a
// 0.1 notification wakes the 0.3 task.
struct NotifyWaker(task03::Waker);
338 
// Borrowed view of a 0.3 waker that is passed to `poll_fn_notify` as the notifier.
// Its `From` impl turns it into an owned `NotifyHandle01` by cloning the waker.
#[allow(missing_debug_implementations)] // false positive: this is private type
#[derive(Clone)]
struct WakerToHandle<'a>(&'a task03::Waker);
342 
// Converts the borrowed waker into an owned futures 0.1 notify handle.
impl From<WakerToHandle<'_>> for NotifyHandle01 {
    fn from(handle: WakerToHandle<'_>) -> NotifyHandle01 {
        // Clone the waker into its own heap allocation so the handle does not depend on
        // the lifetime of the borrowed `&Waker`.
        let ptr = Box::new(NotifyWaker(handle.0.clone()));

        // SAFETY (review note): the pointer comes straight from `Box::into_raw`, and
        // `NotifyWaker::drop_raw` rebuilds a `Box` from it to free the allocation.
        // Presumably this is what `NotifyHandle::new` requires of its argument --
        // confirm against the futures 0.1 documentation.
        unsafe { NotifyHandle01::new(Box::into_raw(ptr)) }
    }
}
350 
impl Notify01 for NotifyWaker {
    // The 0.1 notification id is ignored: every notification wakes the stored 0.3 waker.
    fn notify(&self, _: usize) {
        self.0.wake_by_ref();
    }
}
356 
// Raw clone/drop hooks used by futures 0.1 notify handles that point at a `NotifyWaker`.
unsafe impl UnsafeNotify01 for NotifyWaker {
    // Builds a new, independent handle by cloning the stored waker into a fresh boxed
    // `NotifyWaker` (through the `From<WakerToHandle<'_>>` conversion).
    unsafe fn clone_raw(&self) -> NotifyHandle01 {
        WakerToHandle(&self.0).into()
    }

    // Frees the `NotifyWaker` that `self` refers to.
    unsafe fn drop_raw(&self) {
        // SAFETY (review note): rebuilding a `Box` is only sound if `self` lives in an
        // allocation produced by `Box::into_raw`, as done in the
        // `From<WakerToHandle<'_>> for NotifyHandle01` impl, and if this is called at most
        // once per allocation. Presumably the `UnsafeNotify` contract guarantees both --
        // confirm against the futures 0.1 documentation.
        let ptr: *const dyn UnsafeNotify01 = self;
        drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
    }
}
367 
#[cfg(feature = "io-compat")]
mod io {
    use super::*;
    #[cfg(feature = "read-initializer")]
    use futures_io::Initializer;
    use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
    use std::io::Error;
    use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};

    /// Extension trait for tokio-io [`AsyncRead`](tokio_io::AsyncRead)
    pub trait AsyncRead01CompatExt: AsyncRead01 {
        /// Converts a tokio-io [`AsyncRead`](tokio_io::AsyncRead) into a futures-io 0.3
        /// [`AsyncRead`](futures_io::AsyncRead).
        ///
        /// ```
        /// # futures::executor::block_on(async {
        /// use futures::io::AsyncReadExt;
        /// use futures_util::compat::AsyncRead01CompatExt;
        ///
        /// let input = b"Hello World!";
        /// let reader /* : impl tokio_io::AsyncRead */ = std::io::Cursor::new(input);
        /// let mut reader /* : impl futures::io::AsyncRead + Unpin */ = reader.compat();
        ///
        /// let mut output = Vec::with_capacity(12);
        /// reader.read_to_end(&mut output).await.unwrap();
        /// assert_eq!(output, input);
        /// # });
        /// ```
        fn compat(self) -> Compat01As03<Self>
        where
            Self: Sized,
        {
            Compat01As03::new(self)
        }
    }
    impl<R: AsyncRead01> AsyncRead01CompatExt for R {}

    /// Extension trait for tokio-io [`AsyncWrite`](tokio_io::AsyncWrite)
    pub trait AsyncWrite01CompatExt: AsyncWrite01 {
        /// Converts a tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) into a futures-io 0.3
        /// [`AsyncWrite`](futures_io::AsyncWrite).
        ///
        /// ```
        /// # futures::executor::block_on(async {
        /// use futures::io::AsyncWriteExt;
        /// use futures_util::compat::AsyncWrite01CompatExt;
        ///
        /// let input = b"Hello World!";
        /// let mut cursor = std::io::Cursor::new(Vec::with_capacity(12));
        ///
        /// let mut writer = (&mut cursor).compat();
        /// writer.write_all(input).await.unwrap();
        ///
        /// assert_eq!(cursor.into_inner(), input);
        /// # });
        /// ```
        fn compat(self) -> Compat01As03<Self>
        where
            Self: Sized,
        {
            Compat01As03::new(self)
        }
    }
    impl<W: AsyncWrite01> AsyncWrite01CompatExt for W {}

    impl<R: AsyncRead01> AsyncRead03 for Compat01As03<R> {
        /// Picks the zeroing initializer when the wrapped reader's
        /// `prepare_uninitialized_buffer` returns `true` for a probe buffer, and
        /// the no-op initializer otherwise.
        #[cfg(feature = "read-initializer")]
        unsafe fn initializer(&self) -> Initializer {
            // check if `prepare_uninitialized_buffer` needs zeroing
            let mut probe = [1u8];
            let wants_zeroing = self
                .inner
                .get_ref()
                .prepare_uninitialized_buffer(&mut probe);
            if wants_zeroing {
                Initializer::zeroing()
            } else {
                Initializer::nop()
            }
        }

        /// Forwards to the 0.1 `poll_read` with the 0.3 waker installed as notifier.
        fn poll_read(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> task03::Poll<Result<usize, Error>> {
            let polled = self.in_notify(cx, |reader| reader.poll_read(buf));
            poll_01_to_03(polled)
        }
    }

    impl<W: AsyncWrite01> AsyncWrite03 for Compat01As03<W> {
        /// Forwards to the 0.1 `poll_write` with the 0.3 waker installed as notifier.
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> task03::Poll<Result<usize, Error>> {
            let polled = self.in_notify(cx, |writer| writer.poll_write(buf));
            poll_01_to_03(polled)
        }

        /// Forwards to the 0.1 `poll_flush` with the 0.3 waker installed as notifier.
        fn poll_flush(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> task03::Poll<Result<(), Error>> {
            let polled = self.in_notify(cx, AsyncWrite01::poll_flush);
            poll_01_to_03(polled)
        }

        /// Maps the 0.3 close operation onto the 0.1 `shutdown` method.
        fn poll_close(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> task03::Poll<Result<(), Error>> {
            let polled = self.in_notify(cx, AsyncWrite01::shutdown);
            poll_01_to_03(polled)
        }
    }
}
471