1 use futures_01::executor::{
2 spawn as spawn01, Notify as Notify01, NotifyHandle as NotifyHandle01,
3 Spawn as Spawn01, UnsafeNotify as UnsafeNotify01,
4 };
5 use futures_01::{
6 Async as Async01, Future as Future01,
7 Stream as Stream01,
8 };
9 #[cfg(feature = "sink")]
10 use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01};
11 use futures_core::{task as task03, future::Future as Future03, stream::Stream as Stream03};
12 use std::pin::Pin;
13 use std::task::Context;
14 #[cfg(feature = "sink")]
15 use futures_sink::Sink as Sink03;
16
17 #[cfg(feature = "io-compat")]
18 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
19 pub use io::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
20
/// Converts a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
/// object to a futures 0.3-compatible version.
///
/// The wrapped object is stored inside a futures 0.1 executor `Spawn`,
/// created by [`Compat01As03::new`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Compat01As03<T> {
    // The 0.1 object, wrapped by `futures_01::executor::spawn`.
    pub(crate) inner: Spawn01<T>,
}
28
// `Unpin` is implemented for every `T`, with no bound on the wrapped object.
// The 0.3 trait impls in this file rely on it to call `&mut self` methods
// (such as `in_notify`) through a `Pin<&mut Self>` receiver.
impl<T> Unpin for Compat01As03<T> {}
30
31 impl<T> Compat01As03<T> {
32 /// Wraps a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
33 /// object in a futures 0.3-compatible wrapper.
new(object: T) -> Compat01As03<T>34 pub fn new(object: T) -> Compat01As03<T> {
35 Compat01As03 {
36 inner: spawn01(object),
37 }
38 }
39
in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R40 fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R {
41 let notify = &WakerToHandle(cx.waker());
42 self.inner.poll_fn_notify(notify, 0, f)
43 }
44
45 /// Get a reference to 0.1 Future, Stream, AsyncRead, or AsyncWrite object contained within.
get_ref(&self) -> &T46 pub fn get_ref(&self) -> &T {
47 self.inner.get_ref()
48 }
49
50 /// Get a mutable reference to 0.1 Future, Stream, AsyncRead or AsyncWrite object contained
51 /// within.
get_mut(&mut self) -> &mut T52 pub fn get_mut(&mut self) -> &mut T {
53 self.inner.get_mut()
54 }
55
56 /// Consume this wrapper to return the underlying 0.1 Future, Stream, AsyncRead, or
57 /// AsyncWrite object.
into_inner(self) -> T58 pub fn into_inner(self) -> T {
59 self.inner.into_inner()
60 }
61 }
62
63 /// Extension trait for futures 0.1 [`Future`](futures_01::future::Future)
64 pub trait Future01CompatExt: Future01 {
65 /// Converts a futures 0.1
66 /// [`Future<Item = T, Error = E>`](futures_01::future::Future)
67 /// into a futures 0.3
68 /// [`Future<Output = Result<T, E>>`](futures_core::future::Future).
69 ///
70 /// ```
71 /// # futures::executor::block_on(async {
72 /// # // TODO: These should be all using `futures::compat`, but that runs up against Cargo
73 /// # // feature issues
74 /// use futures_util::compat::Future01CompatExt;
75 ///
76 /// let future = futures_01::future::ok::<u32, ()>(1);
77 /// assert_eq!(future.compat().await, Ok(1));
78 /// # });
79 /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,80 fn compat(self) -> Compat01As03<Self>
81 where
82 Self: Sized,
83 {
84 Compat01As03::new(self)
85 }
86 }
87 impl<Fut: Future01> Future01CompatExt for Fut {}
88
89 /// Extension trait for futures 0.1 [`Stream`](futures_01::stream::Stream)
90 pub trait Stream01CompatExt: Stream01 {
91 /// Converts a futures 0.1
92 /// [`Stream<Item = T, Error = E>`](futures_01::stream::Stream)
93 /// into a futures 0.3
94 /// [`Stream<Item = Result<T, E>>`](futures_core::stream::Stream).
95 ///
96 /// ```
97 /// # futures::executor::block_on(async {
98 /// use futures::stream::StreamExt;
99 /// use futures_util::compat::Stream01CompatExt;
100 ///
101 /// let stream = futures_01::stream::once::<u32, ()>(Ok(1));
102 /// let mut stream = stream.compat();
103 /// assert_eq!(stream.next().await, Some(Ok(1)));
104 /// assert_eq!(stream.next().await, None);
105 /// # });
106 /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,107 fn compat(self) -> Compat01As03<Self>
108 where
109 Self: Sized,
110 {
111 Compat01As03::new(self)
112 }
113 }
114 impl<St: Stream01> Stream01CompatExt for St {}
115
/// Extension trait for futures 0.1 [`Sink`](futures_01::sink::Sink)
#[cfg(feature = "sink")]
pub trait Sink01CompatExt: Sink01 {
    /// Wraps a futures 0.1
    /// [`Sink<SinkItem = T, SinkError = E>`](futures_01::sink::Sink)
    /// so that it can be used as a futures 0.3
    /// [`Sink<T, Error = E>`](futures_sink::Sink).
    ///
    /// ```
    /// # futures::executor::block_on(async {
    /// use futures::{sink::SinkExt, stream::StreamExt};
    /// use futures_util::compat::{Stream01CompatExt, Sink01CompatExt};
    ///
    /// let (tx, rx) = futures_01::unsync::mpsc::channel(1);
    /// let (mut tx, mut rx) = (tx.sink_compat(), rx.compat());
    ///
    /// tx.send(1).await.unwrap();
    /// drop(tx);
    /// assert_eq!(rx.next().await, Some(Ok(1)));
    /// assert_eq!(rx.next().await, None);
    /// # });
    /// ```
    fn sink_compat(self) -> Compat01As03Sink<Self, Self::SinkItem>
    where
        Self: Sized,
    {
        // Same initial state as `Compat01As03Sink::new`: nothing buffered,
        // close not started.
        Compat01As03Sink {
            inner: spawn01(self),
            buffer: None,
            close_started: false,
        }
    }
}
// Every futures 0.1 `Sink` gets `.sink_compat()`.
#[cfg(feature = "sink")]
impl<Si: Sink01> Sink01CompatExt for Si {}
147
poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>>148 fn poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>> {
149 match x? {
150 Async01::Ready(t) => task03::Poll::Ready(Ok(t)),
151 Async01::NotReady => task03::Poll::Pending,
152 }
153 }
154
155 impl<Fut: Future01> Future03 for Compat01As03<Fut> {
156 type Output = Result<Fut::Item, Fut::Error>;
157
poll( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Self::Output>158 fn poll(
159 mut self: Pin<&mut Self>,
160 cx: &mut Context<'_>,
161 ) -> task03::Poll<Self::Output> {
162 poll_01_to_03(self.in_notify(cx, Future01::poll))
163 }
164 }
165
166 impl<St: Stream01> Stream03 for Compat01As03<St> {
167 type Item = Result<St::Item, St::Error>;
168
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Option<Self::Item>>169 fn poll_next(
170 mut self: Pin<&mut Self>,
171 cx: &mut Context<'_>,
172 ) -> task03::Poll<Option<Self::Item>> {
173 match self.in_notify(cx, Stream01::poll)? {
174 Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))),
175 Async01::Ready(None) => task03::Poll::Ready(None),
176 Async01::NotReady => task03::Poll::Pending,
177 }
178 }
179 }
180
/// Converts a futures 0.1 Sink object to a futures 0.3-compatible version.
#[cfg(feature = "sink")]
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct Compat01As03Sink<S, SinkItem> {
    // The 0.1 sink, wrapped by `futures_01::executor::spawn`.
    pub(crate) inner: Spawn01<S>,
    // Item accepted by the 0.3 `start_send` that has not yet been accepted by
    // the inner sink's `start_send`; `None` when nothing is pending.
    pub(crate) buffer: Option<SinkItem>,
    // Set by `poll_close` once it has called the inner sink's `close` and
    // got `NotReady`; later `poll_close` calls then go straight to `close`.
    pub(crate) close_started: bool,
}
190
// `Unpin` is implemented for every `S` and `SinkItem`, with no bounds.
// The 0.3 trait impls in this file rely on it to mutate fields and call
// `in_notify` through a `Pin<&mut Self>` receiver.
#[cfg(feature = "sink")]
impl<S, SinkItem> Unpin for Compat01As03Sink<S, SinkItem> {}
193
#[cfg(feature = "sink")]
impl<S, SinkItem> Compat01As03Sink<S, SinkItem> {
    /// Wraps a futures 0.1 Sink object in a futures 0.3-compatible wrapper.
    ///
    /// The wrapper starts with no buffered item and with closing not started.
    pub fn new(inner: S) -> Compat01As03Sink<S, SinkItem> {
        let inner = spawn01(inner);
        Compat01As03Sink { inner, buffer: None, close_started: false }
    }

    /// Runs `f` on the wrapped 0.1 sink via `Spawn::poll_fn_notify`.
    ///
    /// The waker taken from `cx` is handed over as the notify source, and
    /// the notify id is always `0`. Whatever `f` returns is returned as is.
    fn in_notify<R>(
        &mut self,
        cx: &mut Context<'_>,
        f: impl FnOnce(&mut S) -> R,
    ) -> R {
        let handle = WakerToHandle(cx.waker());
        self.inner.poll_fn_notify(&handle, 0, f)
    }

    /// Borrows the wrapped 0.1 Sink object.
    pub fn get_ref(&self) -> &S {
        let spawned = &self.inner;
        spawned.get_ref()
    }

    /// Mutably borrows the wrapped 0.1 Sink.
    pub fn get_mut(&mut self) -> &mut S {
        let spawned = &mut self.inner;
        spawned.get_mut()
    }

    /// Consumes the wrapper and hands back the underlying 0.1 Sink.
    ///
    /// Any item still held in the wrapper's buffer is dropped with it.
    pub fn into_inner(self) -> S {
        self.inner.into_inner()
    }
}
229
#[cfg(feature = "sink")]
impl<S, SinkItem> Stream03 for Compat01As03Sink<S, SinkItem>
where
    S: Stream01,
{
    type Item = Result<S::Item, S::Error>;

    /// Polls the wrapped object as a 0.1 stream with the waker from `cx`.
    ///
    /// A 0.1 error is yielded as a `Some(Err(..))` item.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Option<Self::Item>> {
        let polled = self.in_notify(cx, Stream01::poll);
        match polled {
            Ok(Async01::Ready(Some(item))) => task03::Poll::Ready(Some(Ok(item))),
            Ok(Async01::Ready(None)) => task03::Poll::Ready(None),
            Ok(Async01::NotReady) => task03::Poll::Pending,
            Err(err) => task03::Poll::Ready(Some(Err(err))),
        }
    }
}
248
#[cfg(feature = "sink")]
impl<S, SinkItem> Sink03<SinkItem> for Compat01As03Sink<S, SinkItem>
where
    S: Sink01<SinkItem = SinkItem>,
{
    type Error = S::SinkError;

    /// Stores `item` in the wrapper's one-slot buffer.
    ///
    /// The inner sink is not touched here; the item is forwarded by a later
    /// `poll_ready`, `poll_flush` or `poll_close`. In debug builds this
    /// asserts that the buffer is empty.
    fn start_send(
        mut self: Pin<&mut Self>,
        item: SinkItem,
    ) -> Result<(), Self::Error> {
        debug_assert!(self.buffer.is_none());
        self.buffer = Some(item);
        Ok(())
    }

    /// Reports ready when the buffer is empty, otherwise tries to hand the
    /// buffered item to the inner sink's `start_send`.
    fn poll_ready(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        let item = match self.buffer.take() {
            Some(item) => item,
            // Nothing pending: a new item can be accepted right away.
            None => return task03::Poll::Ready(Ok(())),
        };

        match self.in_notify(cx, |sink| sink.start_send(item)) {
            Ok(AsyncSink01::Ready) => task03::Poll::Ready(Ok(())),
            Ok(AsyncSink01::NotReady(rejected)) => {
                // The inner sink gave the item back; keep it for the next attempt.
                self.buffer = Some(rejected);
                task03::Poll::Pending
            }
            Err(err) => task03::Poll::Ready(Err(err)),
        }
    }

    /// Forwards the buffered item (if any) and then calls the inner sink's
    /// `poll_complete`.
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        let pending = self.buffer.take();

        // The closure yields the 0.1 readiness plus any item that has to go
        // back into the buffer.
        let outcome = self.in_notify(cx, |sink| {
            if let Some(item) = pending {
                if let AsyncSink01::NotReady(rejected) = sink.start_send(item)? {
                    return Ok((Async01::NotReady, Some(rejected)));
                }
            }
            sink.poll_complete().map(|state| (state, None))
        });

        match outcome {
            Ok((Async01::Ready(_), _)) => task03::Poll::Ready(Ok(())),
            Ok((Async01::NotReady, leftover)) => {
                self.buffer = leftover;
                task03::Poll::Pending
            }
            Err(err) => task03::Poll::Ready(Err(err)),
        }
    }

    /// Forwards the buffered item (if any), calls `poll_complete`, and then
    /// calls the inner sink's `close`.
    ///
    /// Once `close` has been called and returned `NotReady`, subsequent calls
    /// skip the send/complete steps and call `close` again directly.
    fn poll_close(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        let pending = self.buffer.take();
        let already_closing = self.close_started;

        // The closure yields (0.1 readiness, item to re-buffer, new
        // `close_started` value).
        let outcome = self.in_notify(cx, |sink| {
            if !already_closing {
                if let Some(item) = pending {
                    match sink.start_send(item)? {
                        AsyncSink01::Ready => {}
                        AsyncSink01::NotReady(rejected) => {
                            return Ok((Async01::NotReady, Some(rejected), false));
                        }
                    }
                }

                if let Async01::NotReady = sink.poll_complete()? {
                    return Ok((Async01::NotReady, None, false));
                }
            }

            let closed = <S as Sink01>::close(sink)?;
            Ok((closed, None, true))
        });

        match outcome {
            Ok((Async01::Ready(_), _, _)) => task03::Poll::Ready(Ok(())),
            Ok((Async01::NotReady, leftover, closing)) => {
                self.buffer = leftover;
                self.close_started = closing;
                task03::Poll::Pending
            }
            Err(err) => task03::Poll::Ready(Err(err)),
        }
    }
}
336
// Owns a futures 0.3 `Waker`. It implements the 0.1 `Notify` and
// `UnsafeNotify` traits, so a boxed instance can back a 0.1 `NotifyHandle`.
struct NotifyWaker(task03::Waker);
338
// Borrowed view of a futures 0.3 `Waker`. It is `Clone` and converts into a
// 0.1 `NotifyHandle` via the `From` impl; the waker itself is only cloned
// when that conversion happens.
#[allow(missing_debug_implementations)] // false positive: this is private type
#[derive(Clone)]
struct WakerToHandle<'a>(&'a task03::Waker);
342
343 impl From<WakerToHandle<'_>> for NotifyHandle01 {
from(handle: WakerToHandle<'_>) -> NotifyHandle01344 fn from(handle: WakerToHandle<'_>) -> NotifyHandle01 {
345 let ptr = Box::new(NotifyWaker(handle.0.clone()));
346
347 unsafe { NotifyHandle01::new(Box::into_raw(ptr)) }
348 }
349 }
350
351 impl Notify01 for NotifyWaker {
notify(&self, _: usize)352 fn notify(&self, _: usize) {
353 self.0.wake_by_ref();
354 }
355 }
356
// Raw clone/drop hooks used by the 0.1 `NotifyHandle` that points at a boxed
// `NotifyWaker` (see the `From<WakerToHandle>` impl, which creates that box).
unsafe impl UnsafeNotify01 for NotifyWaker {
    // Produces an independent handle: the `From` conversion clones the waker
    // into a fresh boxed `NotifyWaker`.
    unsafe fn clone_raw(&self) -> NotifyHandle01 {
        WakerToHandle(&self.0).into()
    }

    // Frees the allocation behind `self` by rebuilding the `Box` and dropping it.
    //
    // SAFETY (caller contract): `self` must be the value whose pointer was
    // produced by `Box::into_raw` in the `From<WakerToHandle>` impl, and it
    // must not be used again after this call.
    unsafe fn drop_raw(&self) {
        let ptr: *const dyn UnsafeNotify01 = self;
        drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
    }
}
367
#[cfg(feature = "io-compat")]
mod io {
    use super::*;
    #[cfg(feature = "read-initializer")]
    use futures_io::Initializer;
    use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
    use std::io::Error;
    use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};

    /// Extension trait for tokio-io [`AsyncRead`](tokio_io::AsyncRead)
    pub trait AsyncRead01CompatExt: AsyncRead01 {
        /// Wraps a tokio-io [`AsyncRead`](tokio_io::AsyncRead) so that it can be used as a
        /// futures-io 0.3 [`AsyncRead`](futures_io::AsyncRead).
        ///
        /// ```
        /// # futures::executor::block_on(async {
        /// use futures::io::AsyncReadExt;
        /// use futures_util::compat::AsyncRead01CompatExt;
        ///
        /// let input = b"Hello World!";
        /// let reader /* : impl tokio_io::AsyncRead */ = std::io::Cursor::new(input);
        /// let mut reader /* : impl futures::io::AsyncRead + Unpin */ = reader.compat();
        ///
        /// let mut output = Vec::with_capacity(12);
        /// reader.read_to_end(&mut output).await.unwrap();
        /// assert_eq!(output, input);
        /// # });
        /// ```
        fn compat(self) -> Compat01As03<Self>
        where
            Self: Sized,
        {
            Compat01As03::new(self)
        }
    }
    // Every tokio-io `AsyncRead` gets `.compat()`.
    impl<R: AsyncRead01> AsyncRead01CompatExt for R {}

    /// Extension trait for tokio-io [`AsyncWrite`](tokio_io::AsyncWrite)
    pub trait AsyncWrite01CompatExt: AsyncWrite01 {
        /// Wraps a tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) so that it can be used as a
        /// futures-io 0.3 [`AsyncWrite`](futures_io::AsyncWrite).
        ///
        /// ```
        /// # futures::executor::block_on(async {
        /// use futures::io::AsyncWriteExt;
        /// use futures_util::compat::AsyncWrite01CompatExt;
        ///
        /// let input = b"Hello World!";
        /// let mut cursor = std::io::Cursor::new(Vec::with_capacity(12));
        ///
        /// let mut writer = (&mut cursor).compat();
        /// writer.write_all(input).await.unwrap();
        ///
        /// assert_eq!(cursor.into_inner(), input);
        /// # });
        /// ```
        fn compat(self) -> Compat01As03<Self>
        where
            Self: Sized,
        {
            Compat01As03::new(self)
        }
    }
    // Every tokio-io `AsyncWrite` gets `.compat()`.
    impl<W: AsyncWrite01> AsyncWrite01CompatExt for W {}

    impl<R: AsyncRead01> AsyncRead03 for Compat01As03<R> {
        /// Maps the result of the inner reader's `prepare_uninitialized_buffer`
        /// onto an `Initializer`: `true` gives a zeroing initializer, `false`
        /// a no-op one.
        #[cfg(feature = "read-initializer")]
        unsafe fn initializer(&self) -> Initializer {
            // Probe with a one-byte scratch buffer to learn whether zeroing is needed.
            let mut probe = [1u8];
            let needs_zeroing = self.inner.get_ref().prepare_uninitialized_buffer(&mut probe);
            if needs_zeroing {
                Initializer::zeroing()
            } else {
                Initializer::nop()
            }
        }

        /// Reads into `buf` through the inner reader's 0.1 `poll_read`.
        fn poll_read(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> task03::Poll<Result<usize, Error>> {
            let outcome = self.in_notify(cx, |reader| reader.poll_read(buf));
            poll_01_to_03(outcome)
        }
    }

    impl<W: AsyncWrite01> AsyncWrite03 for Compat01As03<W> {
        /// Writes `buf` through the inner writer's 0.1 `poll_write`.
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> task03::Poll<Result<usize, Error>> {
            let outcome = self.in_notify(cx, |writer| writer.poll_write(buf));
            poll_01_to_03(outcome)
        }

        /// Flushes through the inner writer's 0.1 `poll_flush`.
        fn poll_flush(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> task03::Poll<Result<(), Error>> {
            let outcome = self.in_notify(cx, |writer| AsyncWrite01::poll_flush(writer));
            poll_01_to_03(outcome)
        }

        /// Closes by calling the inner writer's 0.1 `shutdown`.
        fn poll_close(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> task03::Poll<Result<(), Error>> {
            let outcome = self.in_notify(cx, |writer| AsyncWrite01::shutdown(writer));
            poll_01_to_03(outcome)
        }
    }
}
471