1 //! Async I/O and timers.
2 //!
3 //! This crate provides two tools:
4 //!
5 //! * [`Async`], an adapter for standard networking types (and [many other] types) to use in
6 //!   async programs.
7 //! * [`Timer`], a future or stream that emits timed events.
8 //!
9 //! For concrete async networking types built on top of this crate, see [`async-net`].
10 //!
11 //! [many other]: https://github.com/smol-rs/async-io/tree/master/examples
12 //! [`async-net`]: https://docs.rs/async-net
13 //!
14 //! # Implementation
15 //!
16 //! The first time [`Async`] or [`Timer`] is used, a thread named "async-io" will be spawned.
17 //! The purpose of this thread is to wait for I/O events reported by the operating system, and then
18 //! wake appropriate futures blocked on I/O or timers when they can be resumed.
19 //!
20 //! To wait for the next I/O event, the "async-io" thread uses [epoll] on Linux/Android/illumos,
21 //! [kqueue] on macOS/iOS/BSD, [event ports] on illumos/Solaris, and [wepoll] on Windows. That
22 //! functionality is provided by the [`polling`] crate.
23 //!
24 //! However, note that you can also process I/O events and wake futures on any thread using the
25 //! [`block_on()`] function. The "async-io" thread is therefore just a fallback mechanism
26 //! processing I/O events in case no other threads are.
27 //!
28 //! [epoll]: https://en.wikipedia.org/wiki/Epoll
29 //! [kqueue]: https://en.wikipedia.org/wiki/Kqueue
30 //! [event ports]: https://illumos.org/man/port_create
31 //! [wepoll]: https://github.com/piscisaureus/wepoll
32 //! [`polling`]: https://docs.rs/polling
33 //!
34 //! # Examples
35 //!
36 //! Connect to `example.com:80`, or time out after 10 seconds.
37 //!
38 //! ```
39 //! use async_io::{Async, Timer};
40 //! use futures_lite::{future::FutureExt, io};
41 //!
42 //! use std::net::{TcpStream, ToSocketAddrs};
43 //! use std::time::Duration;
44 //!
45 //! # futures_lite::future::block_on(async {
46 //! let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
47 //!
48 //! let stream = Async::<TcpStream>::connect(addr).or(async {
49 //!     Timer::after(Duration::from_secs(10)).await;
50 //!     Err(io::ErrorKind::TimedOut.into())
51 //! })
52 //! .await?;
53 //! # std::io::Result::Ok(()) });
54 //! ```
55 
56 #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
57 
58 use std::convert::TryFrom;
59 use std::future::Future;
60 use std::io::{self, IoSlice, IoSliceMut, Read, Write};
61 use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket};
62 use std::pin::Pin;
63 use std::sync::Arc;
64 use std::task::{Context, Poll, Waker};
65 use std::time::{Duration, Instant};
66 
67 #[cfg(unix)]
68 use std::{
69     os::unix::io::{AsRawFd, RawFd},
70     os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
71     path::Path,
72 };
73 
74 #[cfg(windows)]
75 use std::os::windows::io::{AsRawSocket, RawSocket};
76 
77 use futures_lite::io::{AsyncRead, AsyncWrite};
78 use futures_lite::stream::{self, Stream};
79 use futures_lite::{future, pin, ready};
80 use socket2::{Domain, Protocol, SockAddr, Socket, Type};
81 
82 use crate::reactor::{Reactor, Source};
83 
84 mod driver;
85 mod reactor;
86 
87 pub use driver::block_on;
88 pub use reactor::{Readable, ReadableOwned, Writable, WritableOwned};
89 
/// Returns the largest value a [`Duration`] can hold.
///
/// Replace with `Duration::MAX` once `duration_constants` are stabilized.
fn duration_max() -> Duration {
    // The sub-second part of a `Duration` tops out one nanosecond below a full second.
    const MAX_SUBSEC_NANOS: u32 = 1_000_000_000 - 1;
    Duration::new(std::u64::MAX, MAX_SUBSEC_NANOS)
}
94 
95 /// A future or stream that emits timed events.
96 ///
97 /// Timers are futures that output a single [`Instant`] when they fire.
98 ///
99 /// Timers are also streams that can output [`Instant`]s periodically.
100 ///
101 /// # Examples
102 ///
103 /// Sleep for 1 second:
104 ///
105 /// ```
106 /// use async_io::Timer;
107 /// use std::time::Duration;
108 ///
109 /// # futures_lite::future::block_on(async {
110 /// Timer::after(Duration::from_secs(1)).await;
111 /// # });
112 /// ```
113 ///
/// Time out after 10 seconds:
115 ///
116 /// ```
117 /// use async_io::Timer;
118 /// use futures_lite::FutureExt;
119 /// use std::time::Duration;
120 ///
121 /// # futures_lite::future::block_on(async {
122 /// let addrs = async_net::resolve("google.com:80")
123 ///     .or(async {
124 ///         Timer::after(Duration::from_secs(10)).await;
125 ///         Err(std::io::ErrorKind::TimedOut.into())
126 ///     })
127 ///     .await?;
128 /// # std::io::Result::Ok(()) });
129 /// ```
#[derive(Debug)]
pub struct Timer {
    /// This timer's ID and last waker that polled it.
    ///
    /// When this field is set to `None`, this timer is not registered in the reactor.
    id_and_waker: Option<(usize, Waker)>,

    /// The next instant at which this timer fires.
    when: Instant,

    /// The interval added to `when` after each firing to schedule the next event.
    ///
    /// One-shot timers created with [`Timer::at()`] store `duration_max()` here.
    period: Duration,
}
143 
144 impl Timer {
145     /// Creates a timer that emits an event once after the given duration of time.
146     ///
147     /// # Examples
148     ///
149     /// ```
150     /// use async_io::Timer;
151     /// use std::time::Duration;
152     ///
153     /// # futures_lite::future::block_on(async {
154     /// Timer::after(Duration::from_secs(1)).await;
155     /// # });
156     /// ```
after(duration: Duration) -> Timer157     pub fn after(duration: Duration) -> Timer {
158         Timer::at(Instant::now() + duration)
159     }
160 
161     /// Creates a timer that emits an event once at the given time instant.
162     ///
163     /// # Examples
164     ///
165     /// ```
166     /// use async_io::Timer;
167     /// use std::time::{Duration, Instant};
168     ///
169     /// # futures_lite::future::block_on(async {
170     /// let now = Instant::now();
171     /// let when = now + Duration::from_secs(1);
172     /// Timer::at(when).await;
173     /// # });
174     /// ```
at(instant: Instant) -> Timer175     pub fn at(instant: Instant) -> Timer {
176         // Use Duration::MAX once duration_constants are stabilized.
177         Timer::interval_at(instant, duration_max())
178     }
179 
180     /// Creates a timer that emits events periodically.
181     ///
182     /// # Examples
183     ///
184     /// ```
185     /// use async_io::Timer;
186     /// use futures_lite::StreamExt;
187     /// use std::time::{Duration, Instant};
188     ///
189     /// # futures_lite::future::block_on(async {
190     /// let period = Duration::from_secs(1);
191     /// Timer::interval(period).next().await;
192     /// # });
193     /// ```
interval(period: Duration) -> Timer194     pub fn interval(period: Duration) -> Timer {
195         Timer::interval_at(Instant::now() + period, period)
196     }
197 
198     /// Creates a timer that emits events periodically, starting at `start`.
199     ///
200     /// # Examples
201     ///
202     /// ```
203     /// use async_io::Timer;
204     /// use futures_lite::StreamExt;
205     /// use std::time::{Duration, Instant};
206     ///
207     /// # futures_lite::future::block_on(async {
208     /// let start = Instant::now();
209     /// let period = Duration::from_secs(1);
210     /// Timer::interval_at(start, period).next().await;
211     /// # });
212     /// ```
interval_at(start: Instant, period: Duration) -> Timer213     pub fn interval_at(start: Instant, period: Duration) -> Timer {
214         Timer {
215             id_and_waker: None,
216             when: start,
217             period,
218         }
219     }
220 
221     /// Sets the timer to emit an en event once after the given duration of time.
222     ///
223     /// Note that resetting a timer is different from creating a new timer because
224     /// [`set_after()`][`Timer::set_after()`] does not remove the waker associated with the task
225     /// that is polling the timer.
226     ///
227     /// # Examples
228     ///
229     /// ```
230     /// use async_io::Timer;
231     /// use std::time::Duration;
232     ///
233     /// # futures_lite::future::block_on(async {
234     /// let mut t = Timer::after(Duration::from_secs(1));
235     /// t.set_after(Duration::from_millis(100));
236     /// # });
237     /// ```
set_after(&mut self, duration: Duration)238     pub fn set_after(&mut self, duration: Duration) {
239         self.set_at(Instant::now() + duration);
240     }
241 
242     /// Sets the timer to emit an event once at the given time instant.
243     ///
244     /// Note that resetting a timer is different from creating a new timer because
245     /// [`set_at()`][`Timer::set_at()`] does not remove the waker associated with the task
246     /// that is polling the timer.
247     ///
248     /// # Examples
249     ///
250     /// ```
251     /// use async_io::Timer;
252     /// use std::time::{Duration, Instant};
253     ///
254     /// # futures_lite::future::block_on(async {
255     /// let mut t = Timer::after(Duration::from_secs(1));
256     ///
257     /// let now = Instant::now();
258     /// let when = now + Duration::from_secs(1);
259     /// t.set_at(when);
260     /// # });
261     /// ```
set_at(&mut self, instant: Instant)262     pub fn set_at(&mut self, instant: Instant) {
263         if let Some((id, _)) = self.id_and_waker.as_ref() {
264             // Deregister the timer from the reactor.
265             Reactor::get().remove_timer(self.when, *id);
266         }
267 
268         // Update the timeout.
269         self.when = instant;
270 
271         if let Some((id, waker)) = self.id_and_waker.as_mut() {
272             // Re-register the timer with the new timeout.
273             *id = Reactor::get().insert_timer(self.when, waker);
274         }
275     }
276 
277     /// Sets the timer to emit events periodically.
278     ///
279     /// Note that resetting a timer is different from creating a new timer because
280     /// [`set_interval()`][`Timer::set_interval()`] does not remove the waker associated with the
281     /// task that is polling the timer.
282     ///
283     /// # Examples
284     ///
285     /// ```
286     /// use async_io::Timer;
287     /// use futures_lite::StreamExt;
288     /// use std::time::{Duration, Instant};
289     ///
290     /// # futures_lite::future::block_on(async {
291     /// let mut t = Timer::after(Duration::from_secs(1));
292     ///
293     /// let period = Duration::from_secs(2);
294     /// t.set_interval(period);
295     /// # });
296     /// ```
set_interval(&mut self, period: Duration)297     pub fn set_interval(&mut self, period: Duration) {
298         self.set_interval_at(Instant::now() + period, period);
299     }
300 
301     /// Sets the timer to emit events periodically, starting at `start`.
302     ///
303     /// Note that resetting a timer is different from creating a new timer because
304     /// [`set_interval_at()`][`Timer::set_interval_at()`] does not remove the waker associated with
305     /// the task that is polling the timer.
306     ///
307     /// # Examples
308     ///
309     /// ```
310     /// use async_io::Timer;
311     /// use futures_lite::StreamExt;
312     /// use std::time::{Duration, Instant};
313     ///
314     /// # futures_lite::future::block_on(async {
315     /// let mut t = Timer::after(Duration::from_secs(1));
316     ///
317     /// let start = Instant::now();
318     /// let period = Duration::from_secs(2);
319     /// t.set_interval_at(start, period);
320     /// # });
321     /// ```
set_interval_at(&mut self, start: Instant, period: Duration)322     pub fn set_interval_at(&mut self, start: Instant, period: Duration) {
323         if let Some((id, _)) = self.id_and_waker.as_ref() {
324             // Deregister the timer from the reactor.
325             Reactor::get().remove_timer(self.when, *id);
326         }
327 
328         self.when = start;
329         self.period = period;
330 
331         if let Some((id, waker)) = self.id_and_waker.as_mut() {
332             // Re-register the timer with the new timeout.
333             *id = Reactor::get().insert_timer(self.when, waker);
334         }
335     }
336 }
337 
338 impl Drop for Timer {
drop(&mut self)339     fn drop(&mut self) {
340         if let Some((id, _)) = self.id_and_waker.take() {
341             // Deregister the timer from the reactor.
342             Reactor::get().remove_timer(self.when, id);
343         }
344     }
345 }
346 
347 impl Future for Timer {
348     type Output = Instant;
349 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>350     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
351         match self.poll_next(cx) {
352             Poll::Ready(Some(when)) => Poll::Ready(when),
353             Poll::Pending => Poll::Pending,
354             Poll::Ready(None) => unreachable!(),
355         }
356     }
357 }
358 
359 impl Stream for Timer {
360     type Item = Instant;
361 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>362     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
363         // Check if the timer has already fired.
364         if Instant::now() >= self.when {
365             if let Some((id, _)) = self.id_and_waker.take() {
366                 // Deregister the timer from the reactor.
367                 Reactor::get().remove_timer(self.when, id);
368             }
369             let when = self.when;
370             if let Some(next) = when.checked_add(self.period) {
371                 self.when = next;
372                 // Register the timer in the reactor.
373                 let id = Reactor::get().insert_timer(self.when, cx.waker());
374                 self.id_and_waker = Some((id, cx.waker().clone()));
375             }
376             return Poll::Ready(Some(when));
377         } else {
378             match &self.id_and_waker {
379                 None => {
380                     // Register the timer in the reactor.
381                     let id = Reactor::get().insert_timer(self.when, cx.waker());
382                     self.id_and_waker = Some((id, cx.waker().clone()));
383                 }
384                 Some((id, w)) if !w.will_wake(cx.waker()) => {
385                     // Deregister the timer from the reactor to remove the old waker.
386                     Reactor::get().remove_timer(self.when, *id);
387 
388                     // Register the timer in the reactor with the new waker.
389                     let id = Reactor::get().insert_timer(self.when, cx.waker());
390                     self.id_and_waker = Some((id, cx.waker().clone()));
391                 }
392                 Some(_) => {}
393             }
394         }
395         Poll::Pending
396     }
397 }
398 
399 /// Async adapter for I/O types.
400 ///
401 /// This type puts an I/O handle into non-blocking mode, registers it in
402 /// [epoll]/[kqueue]/[event ports]/[wepoll], and then provides an async interface for it.
403 ///
404 /// [epoll]: https://en.wikipedia.org/wiki/Epoll
405 /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
406 /// [event ports]: https://illumos.org/man/port_create
407 /// [wepoll]: https://github.com/piscisaureus/wepoll
408 ///
409 /// # Caveats
410 ///
411 /// [`Async`] is a low-level primitive, and as such it comes with some caveats.
412 ///
413 /// For higher-level primitives built on top of [`Async`], look into [`async-net`] or
414 /// [`async-process`] (on Unix).
415 ///
416 /// [`async-net`]: https://github.com/smol-rs/async-net
417 /// [`async-process`]: https://github.com/smol-rs/async-process
418 ///
419 /// ### Supported types
420 ///
421 /// [`Async`] supports all networking types, as well as some OS-specific file descriptors like
422 /// [timerfd] and [inotify].
423 ///
424 /// However, do not use [`Async`] with types like [`File`][`std::fs::File`],
425 /// [`Stdin`][`std::io::Stdin`], [`Stdout`][`std::io::Stdout`], or [`Stderr`][`std::io::Stderr`]
426 /// because all operating systems have issues with them when put in non-blocking mode.
427 ///
428 /// [timerfd]: https://github.com/smol-rs/async-io/blob/master/examples/linux-timerfd.rs
429 /// [inotify]: https://github.com/smol-rs/async-io/blob/master/examples/linux-inotify.rs
430 ///
431 /// ### Concurrent I/O
432 ///
433 /// Note that [`&Async<T>`][`Async`] implements [`AsyncRead`] and [`AsyncWrite`] if `&T`
434 /// implements those traits, which means tasks can concurrently read and write using shared
435 /// references.
436 ///
/// But there is a catch: only one task can read at a time, and only one task can write at a
/// time. It is okay to have two tasks where one is reading and the other is writing at the same
/// time, but it is not okay to have two tasks reading at the same time or writing at the same
/// time. If you try to do that, conflicting tasks will just keep waking each other in turn, thus
/// wasting CPU time.
442 ///
443 /// Besides [`AsyncRead`] and [`AsyncWrite`], this caveat also applies to
444 /// [`poll_readable()`][`Async::poll_readable()`] and
445 /// [`poll_writable()`][`Async::poll_writable()`].
446 ///
447 /// However, any number of tasks can be concurrently calling other methods like
448 /// [`readable()`][`Async::readable()`] or [`read_with()`][`Async::read_with()`].
449 ///
450 /// ### Closing
451 ///
452 /// Closing the write side of [`Async`] with [`close()`][`futures_lite::AsyncWriteExt::close()`]
453 /// simply flushes. If you want to shutdown a TCP or Unix socket, use
454 /// [`Shutdown`][`std::net::Shutdown`].
455 ///
456 /// # Examples
457 ///
458 /// Connect to a server and echo incoming messages back to the server:
459 ///
460 /// ```no_run
461 /// use async_io::Async;
462 /// use futures_lite::io;
463 /// use std::net::TcpStream;
464 ///
465 /// # futures_lite::future::block_on(async {
466 /// // Connect to a local server.
467 /// let stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 8000)).await?;
468 ///
469 /// // Echo all messages from the read side of the stream into the write side.
470 /// io::copy(&stream, &stream).await?;
471 /// # std::io::Result::Ok(()) });
472 /// ```
473 ///
474 /// You can use either predefined async methods or wrap blocking I/O operations in
475 /// [`Async::read_with()`], [`Async::read_with_mut()`], [`Async::write_with()`], and
476 /// [`Async::write_with_mut()`]:
477 ///
478 /// ```no_run
479 /// use async_io::Async;
480 /// use std::net::TcpListener;
481 ///
482 /// # futures_lite::future::block_on(async {
483 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
484 ///
485 /// // These two lines are equivalent:
486 /// let (stream, addr) = listener.accept().await?;
487 /// let (stream, addr) = listener.read_with(|inner| inner.accept()).await?;
488 /// # std::io::Result::Ok(()) });
489 /// ```
#[derive(Debug)]
pub struct Async<T> {
    /// A source registered in the reactor.
    source: Arc<Source>,

    /// The inner I/O handle.
    ///
    /// Kept in an `Option` so that `into_inner()` can move the handle out of `self`; the
    /// accessors `get_ref()` and `get_mut()` unwrap it.
    io: Option<T>,
}
498 
// `Async<T>` is `Unpin` for every `T`, including a `T` that is not `Unpin` itself.
impl<T> Unpin for Async<T> {}
500 
501 #[cfg(unix)]
502 impl<T: AsRawFd> Async<T> {
503     /// Creates an async I/O handle.
504     ///
505     /// This method will put the handle in non-blocking mode and register it in
506     /// [epoll]/[kqueue]/[event ports]/[wepoll].
507     ///
508     /// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement
509     /// `AsRawSocket`.
510     ///
511     /// [epoll]: https://en.wikipedia.org/wiki/Epoll
512     /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
513     /// [event ports]: https://illumos.org/man/port_create
514     /// [wepoll]: https://github.com/piscisaureus/wepoll
515     ///
516     /// # Examples
517     ///
518     /// ```
519     /// use async_io::Async;
520     /// use std::net::{SocketAddr, TcpListener};
521     ///
522     /// # futures_lite::future::block_on(async {
523     /// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
524     /// let listener = Async::new(listener)?;
525     /// # std::io::Result::Ok(()) });
526     /// ```
new(io: T) -> io::Result<Async<T>>527     pub fn new(io: T) -> io::Result<Async<T>> {
528         let fd = io.as_raw_fd();
529 
530         // Put the file descriptor in non-blocking mode.
531         unsafe {
532             let mut res = libc::fcntl(fd, libc::F_GETFL);
533             if res != -1 {
534                 res = libc::fcntl(fd, libc::F_SETFL, res | libc::O_NONBLOCK);
535             }
536             if res == -1 {
537                 return Err(io::Error::last_os_error());
538             }
539         }
540 
541         Ok(Async {
542             source: Reactor::get().insert_io(fd)?,
543             io: Some(io),
544         })
545     }
546 }
547 
548 #[cfg(unix)]
549 impl<T: AsRawFd> AsRawFd for Async<T> {
as_raw_fd(&self) -> RawFd550     fn as_raw_fd(&self) -> RawFd {
551         self.source.raw
552     }
553 }
554 
#[cfg(windows)]
impl<T: AsRawSocket> Async<T> {
    /// Creates an async I/O handle.
    ///
    /// This method will put the handle in non-blocking mode and register it in
    /// [epoll]/[kqueue]/[event ports]/[wepoll].
    ///
    /// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement
    /// `AsRawSocket`.
    ///
    /// [epoll]: https://en.wikipedia.org/wiki/Epoll
    /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
    /// [event ports]: https://illumos.org/man/port_create
    /// [wepoll]: https://github.com/piscisaureus/wepoll
    ///
    /// # Errors
    ///
    /// Returns the OS error if `ioctlsocket` fails, or the error reported by the reactor when
    /// the socket cannot be registered.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_io::Async;
    /// use std::net::{SocketAddr, TcpListener};
    ///
    /// # futures_lite::future::block_on(async {
    /// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
    /// let listener = Async::new(listener)?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn new(io: T) -> io::Result<Async<T>> {
        use winapi::ctypes::c_ulong;
        use winapi::um::winsock2::{ioctlsocket, FIONBIO, SOCKET};

        let sock = io.as_raw_socket();

        // Put the socket in non-blocking mode by passing a non-zero argument with `FIONBIO`.
        let mut nonblocking: c_ulong = 1;
        // SAFETY: `sock` was just obtained from `io`, which is alive for the call, and the
        // argument pointer refers to a live local variable.
        let status = unsafe { ioctlsocket(sock as SOCKET, FIONBIO, &mut nonblocking) };
        if status != 0 {
            return Err(io::Error::last_os_error());
        }

        // Register the socket in the reactor before wrapping the handle.
        let source = Reactor::get().insert_io(sock)?;
        Ok(Async {
            source,
            io: Some(io),
        })
    }
}
606 
#[cfg(windows)]
impl<T: AsRawSocket> AsRawSocket for Async<T> {
    /// Returns the raw socket stored in this handle's reactor source.
    fn as_raw_socket(&self) -> RawSocket {
        let registered: &Source = &self.source;
        registered.raw
    }
}
613 
614 impl<T> Async<T> {
615     /// Gets a reference to the inner I/O handle.
616     ///
617     /// # Examples
618     ///
619     /// ```
620     /// use async_io::Async;
621     /// use std::net::TcpListener;
622     ///
623     /// # futures_lite::future::block_on(async {
624     /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
625     /// let inner = listener.get_ref();
626     /// # std::io::Result::Ok(()) });
627     /// ```
get_ref(&self) -> &T628     pub fn get_ref(&self) -> &T {
629         self.io.as_ref().unwrap()
630     }
631 
632     /// Gets a mutable reference to the inner I/O handle.
633     ///
634     /// # Examples
635     ///
636     /// ```
637     /// use async_io::Async;
638     /// use std::net::TcpListener;
639     ///
640     /// # futures_lite::future::block_on(async {
641     /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
642     /// let inner = listener.get_mut();
643     /// # std::io::Result::Ok(()) });
644     /// ```
get_mut(&mut self) -> &mut T645     pub fn get_mut(&mut self) -> &mut T {
646         self.io.as_mut().unwrap()
647     }
648 
649     /// Unwraps the inner I/O handle.
650     ///
651     /// This method will **not** put the I/O handle back into blocking mode.
652     ///
653     /// # Examples
654     ///
655     /// ```
656     /// use async_io::Async;
657     /// use std::net::TcpListener;
658     ///
659     /// # futures_lite::future::block_on(async {
660     /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
661     /// let inner = listener.into_inner()?;
662     ///
663     /// // Put the listener back into blocking mode.
664     /// inner.set_nonblocking(false)?;
665     /// # std::io::Result::Ok(()) });
666     /// ```
into_inner(mut self) -> io::Result<T>667     pub fn into_inner(mut self) -> io::Result<T> {
668         let io = self.io.take().unwrap();
669         Reactor::get().remove_io(&self.source)?;
670         Ok(io)
671     }
672 
    /// Waits until the I/O handle is readable.
    ///
    /// This method completes when a read operation on this I/O handle wouldn't block.
    ///
    /// The returned [`Readable`] borrows this handle for as long as it is alive.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use async_io::Async;
    /// use std::net::TcpListener;
    ///
    /// # futures_lite::future::block_on(async {
    /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
    ///
    /// // Wait until a client can be accepted.
    /// listener.readable().await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn readable(&self) -> Readable<'_, T> {
        // The wait itself is implemented by the reactor's `Source`.
        Source::readable(self)
    }
693 
    /// Waits until the I/O handle is readable.
    ///
    /// This method completes when a read operation on this I/O handle wouldn't block.
    ///
    /// Unlike [`readable()`][`Async::readable()`], this method takes the handle as an
    /// `Arc<Self>`, so the returned [`ReadableOwned`] carries no borrowed lifetime.
    pub fn readable_owned(self: Arc<Self>) -> ReadableOwned<T> {
        // The wait itself is implemented by the reactor's `Source`.
        Source::readable_owned(self)
    }
700 
    /// Waits until the I/O handle is writable.
    ///
    /// This method completes when a write operation on this I/O handle wouldn't block.
    ///
    /// The returned [`Writable`] borrows this handle for as long as it is alive.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_io::Async;
    /// use std::net::{TcpStream, ToSocketAddrs};
    ///
    /// # futures_lite::future::block_on(async {
    /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
    /// let stream = Async::<TcpStream>::connect(addr).await?;
    ///
    /// // Wait until the stream is writable.
    /// stream.writable().await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn writable(&self) -> Writable<'_, T> {
        // The wait itself is implemented by the reactor's `Source`.
        Source::writable(self)
    }
722 
    /// Waits until the I/O handle is writable.
    ///
    /// This method completes when a write operation on this I/O handle wouldn't block.
    ///
    /// Unlike [`writable()`][`Async::writable()`], this method takes the handle as an
    /// `Arc<Self>`, so the returned [`WritableOwned`] carries no borrowed lifetime.
    pub fn writable_owned(self: Arc<Self>) -> WritableOwned<T> {
        // The wait itself is implemented by the reactor's `Source`.
        Source::writable_owned(self)
    }
729 
    /// Polls the I/O handle for readability.
    ///
    /// When this method returns [`Poll::Ready`], that means the OS has delivered an event
    /// indicating readability since the last time this task has called the method and received
    /// [`Poll::Pending`].
    ///
    /// # Caveats
    ///
    /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
    /// will just keep waking each other in turn, thus wasting CPU time.
    ///
    /// Note that the [`AsyncRead`] implementation for [`Async`] also uses this method.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use async_io::Async;
    /// use futures_lite::future;
    /// use std::net::TcpListener;
    ///
    /// # futures_lite::future::block_on(async {
    /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
    ///
    /// // Wait until a client can be accepted.
    /// future::poll_fn(|cx| listener.poll_readable(cx)).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Forward the task context to the reactor source registered for this handle.
        self.source.poll_readable(cx)
    }
760 
    /// Polls the I/O handle for writability.
    ///
    /// When this method returns [`Poll::Ready`], that means the OS has delivered an event
    /// indicating writability since the last time this task has called the method and received
    /// [`Poll::Pending`].
    ///
    /// # Caveats
    ///
    /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
    /// will just keep waking each other in turn, thus wasting CPU time.
    ///
    /// Note that the [`AsyncWrite`] implementation for [`Async`] also uses this method.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_io::Async;
    /// use futures_lite::future;
    /// use std::net::{TcpStream, ToSocketAddrs};
    ///
    /// # futures_lite::future::block_on(async {
    /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
    /// let stream = Async::<TcpStream>::connect(addr).await?;
    ///
    /// // Wait until the stream is writable.
    /// future::poll_fn(|cx| stream.poll_writable(cx)).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Forward the task context to the reactor source registered for this handle.
        self.source.poll_writable(cx)
    }
792 
793     /// Performs a read operation asynchronously.
794     ///
795     /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
796     /// invokes the `op` closure in a loop until it succeeds or returns an error other than
797     /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
798     /// sends a notification that the I/O handle is readable.
799     ///
800     /// The closure receives a shared reference to the I/O handle.
801     ///
802     /// # Examples
803     ///
804     /// ```no_run
805     /// use async_io::Async;
806     /// use std::net::TcpListener;
807     ///
808     /// # futures_lite::future::block_on(async {
809     /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
810     ///
811     /// // Accept a new client asynchronously.
812     /// let (stream, addr) = listener.read_with(|l| l.accept()).await?;
813     /// # std::io::Result::Ok(()) });
814     /// ```
read_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R>815     pub async fn read_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
816         let mut op = op;
817         loop {
818             match op(self.get_ref()) {
819                 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
820                 res => return res,
821             }
822             optimistic(self.readable()).await?;
823         }
824     }
825 
826     /// Performs a read operation asynchronously.
827     ///
828     /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
829     /// invokes the `op` closure in a loop until it succeeds or returns an error other than
830     /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
831     /// sends a notification that the I/O handle is readable.
832     ///
833     /// The closure receives a mutable reference to the I/O handle.
834     ///
835     /// # Examples
836     ///
837     /// ```no_run
838     /// use async_io::Async;
839     /// use std::net::TcpListener;
840     ///
841     /// # futures_lite::future::block_on(async {
842     /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
843     ///
844     /// // Accept a new client asynchronously.
845     /// let (stream, addr) = listener.read_with_mut(|l| l.accept()).await?;
846     /// # std::io::Result::Ok(()) });
847     /// ```
read_with_mut<R>( &mut self, op: impl FnMut(&mut T) -> io::Result<R>, ) -> io::Result<R>848     pub async fn read_with_mut<R>(
849         &mut self,
850         op: impl FnMut(&mut T) -> io::Result<R>,
851     ) -> io::Result<R> {
852         let mut op = op;
853         loop {
854             match op(self.get_mut()) {
855                 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
856                 res => return res,
857             }
858             optimistic(self.readable()).await?;
859         }
860     }
861 
862     /// Performs a write operation asynchronously.
863     ///
864     /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
865     /// invokes the `op` closure in a loop until it succeeds or returns an error other than
866     /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
867     /// sends a notification that the I/O handle is writable.
868     ///
869     /// The closure receives a shared reference to the I/O handle.
870     ///
871     /// # Examples
872     ///
873     /// ```no_run
874     /// use async_io::Async;
875     /// use std::net::UdpSocket;
876     ///
877     /// # futures_lite::future::block_on(async {
878     /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
879     /// socket.get_ref().connect("127.0.0.1:9000")?;
880     ///
881     /// let msg = b"hello";
882     /// let len = socket.write_with(|s| s.send(msg)).await?;
883     /// # std::io::Result::Ok(()) });
884     /// ```
write_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R>885     pub async fn write_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
886         let mut op = op;
887         loop {
888             match op(self.get_ref()) {
889                 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
890                 res => return res,
891             }
892             optimistic(self.writable()).await?;
893         }
894     }
895 
896     /// Performs a write operation asynchronously.
897     ///
898     /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
899     /// invokes the `op` closure in a loop until it succeeds or returns an error other than
900     /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
901     /// sends a notification that the I/O handle is writable.
902     ///
903     /// The closure receives a mutable reference to the I/O handle.
904     ///
905     /// # Examples
906     ///
907     /// ```no_run
908     /// use async_io::Async;
909     /// use std::net::UdpSocket;
910     ///
911     /// # futures_lite::future::block_on(async {
912     /// let mut socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
913     /// socket.get_ref().connect("127.0.0.1:9000")?;
914     ///
915     /// let msg = b"hello";
916     /// let len = socket.write_with_mut(|s| s.send(msg)).await?;
917     /// # std::io::Result::Ok(()) });
918     /// ```
write_with_mut<R>( &mut self, op: impl FnMut(&mut T) -> io::Result<R>, ) -> io::Result<R>919     pub async fn write_with_mut<R>(
920         &mut self,
921         op: impl FnMut(&mut T) -> io::Result<R>,
922     ) -> io::Result<R> {
923         let mut op = op;
924         loop {
925             match op(self.get_mut()) {
926                 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
927                 res => return res,
928             }
929             optimistic(self.writable()).await?;
930         }
931     }
932 }
933 
934 impl<T> AsRef<T> for Async<T> {
as_ref(&self) -> &T935     fn as_ref(&self) -> &T {
936         self.get_ref()
937     }
938 }
939 
940 impl<T> AsMut<T> for Async<T> {
as_mut(&mut self) -> &mut T941     fn as_mut(&mut self) -> &mut T {
942         self.get_mut()
943     }
944 }
945 
946 impl<T> Drop for Async<T> {
drop(&mut self)947     fn drop(&mut self) {
948         if self.io.is_some() {
949             // Deregister and ignore errors because destructors should not panic.
950             Reactor::get().remove_io(&self.source).ok();
951 
952             // Drop the I/O handle to close it.
953             self.io.take();
954         }
955     }
956 }
957 
958 impl<T: Read> AsyncRead for Async<T> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>959     fn poll_read(
960         mut self: Pin<&mut Self>,
961         cx: &mut Context<'_>,
962         buf: &mut [u8],
963     ) -> Poll<io::Result<usize>> {
964         loop {
965             match (&mut *self).get_mut().read(buf) {
966                 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
967                 res => return Poll::Ready(res),
968             }
969             ready!(self.poll_readable(cx))?;
970         }
971     }
972 
poll_read_vectored( mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<io::Result<usize>>973     fn poll_read_vectored(
974         mut self: Pin<&mut Self>,
975         cx: &mut Context<'_>,
976         bufs: &mut [IoSliceMut<'_>],
977     ) -> Poll<io::Result<usize>> {
978         loop {
979             match (&mut *self).get_mut().read_vectored(bufs) {
980                 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
981                 res => return Poll::Ready(res),
982             }
983             ready!(self.poll_readable(cx))?;
984         }
985     }
986 }
987 
988 impl<T> AsyncRead for &Async<T>
989 where
990     for<'a> &'a T: Read,
991 {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>992     fn poll_read(
993         self: Pin<&mut Self>,
994         cx: &mut Context<'_>,
995         buf: &mut [u8],
996     ) -> Poll<io::Result<usize>> {
997         loop {
998             match (&*self).get_ref().read(buf) {
999                 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1000                 res => return Poll::Ready(res),
1001             }
1002             ready!(self.poll_readable(cx))?;
1003         }
1004     }
1005 
poll_read_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<io::Result<usize>>1006     fn poll_read_vectored(
1007         self: Pin<&mut Self>,
1008         cx: &mut Context<'_>,
1009         bufs: &mut [IoSliceMut<'_>],
1010     ) -> Poll<io::Result<usize>> {
1011         loop {
1012             match (&*self).get_ref().read_vectored(bufs) {
1013                 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1014                 res => return Poll::Ready(res),
1015             }
1016             ready!(self.poll_readable(cx))?;
1017         }
1018     }
1019 }
1020 
1021 impl<T: Write> AsyncWrite for Async<T> {
poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1022     fn poll_write(
1023         mut self: Pin<&mut Self>,
1024         cx: &mut Context<'_>,
1025         buf: &[u8],
1026     ) -> Poll<io::Result<usize>> {
1027         loop {
1028             match (&mut *self).get_mut().write(buf) {
1029                 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1030                 res => return Poll::Ready(res),
1031             }
1032             ready!(self.poll_writable(cx))?;
1033         }
1034     }
1035 
poll_write_vectored( mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>1036     fn poll_write_vectored(
1037         mut self: Pin<&mut Self>,
1038         cx: &mut Context<'_>,
1039         bufs: &[IoSlice<'_>],
1040     ) -> Poll<io::Result<usize>> {
1041         loop {
1042             match (&mut *self).get_mut().write_vectored(bufs) {
1043                 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1044                 res => return Poll::Ready(res),
1045             }
1046             ready!(self.poll_writable(cx))?;
1047         }
1048     }
1049 
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>1050     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1051         loop {
1052             match (&mut *self).get_mut().flush() {
1053                 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1054                 res => return Poll::Ready(res),
1055             }
1056             ready!(self.poll_writable(cx))?;
1057         }
1058     }
1059 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>1060     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1061         self.poll_flush(cx)
1062     }
1063 }
1064 
1065 impl<T> AsyncWrite for &Async<T>
1066 where
1067     for<'a> &'a T: Write,
1068 {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1069     fn poll_write(
1070         self: Pin<&mut Self>,
1071         cx: &mut Context<'_>,
1072         buf: &[u8],
1073     ) -> Poll<io::Result<usize>> {
1074         loop {
1075             match (&*self).get_ref().write(buf) {
1076                 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1077                 res => return Poll::Ready(res),
1078             }
1079             ready!(self.poll_writable(cx))?;
1080         }
1081     }
1082 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>1083     fn poll_write_vectored(
1084         self: Pin<&mut Self>,
1085         cx: &mut Context<'_>,
1086         bufs: &[IoSlice<'_>],
1087     ) -> Poll<io::Result<usize>> {
1088         loop {
1089             match (&*self).get_ref().write_vectored(bufs) {
1090                 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1091                 res => return Poll::Ready(res),
1092             }
1093             ready!(self.poll_writable(cx))?;
1094         }
1095     }
1096 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>1097     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1098         loop {
1099             match (&*self).get_ref().flush() {
1100                 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1101                 res => return Poll::Ready(res),
1102             }
1103             ready!(self.poll_writable(cx))?;
1104         }
1105     }
1106 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>1107     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1108         self.poll_flush(cx)
1109     }
1110 }
1111 
1112 impl Async<TcpListener> {
1113     /// Creates a TCP listener bound to the specified address.
1114     ///
1115     /// Binding with port number 0 will request an available port from the OS.
1116     ///
1117     /// # Examples
1118     ///
1119     /// ```
1120     /// use async_io::Async;
1121     /// use std::net::TcpListener;
1122     ///
1123     /// # futures_lite::future::block_on(async {
1124     /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
1125     /// println!("Listening on {}", listener.get_ref().local_addr()?);
1126     /// # std::io::Result::Ok(()) });
1127     /// ```
bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpListener>>1128     pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpListener>> {
1129         let addr = addr.into();
1130         Async::new(TcpListener::bind(addr)?)
1131     }
1132 
1133     /// Accepts a new incoming TCP connection.
1134     ///
1135     /// When a connection is established, it will be returned as a TCP stream together with its
1136     /// remote address.
1137     ///
1138     /// # Examples
1139     ///
1140     /// ```no_run
1141     /// use async_io::Async;
1142     /// use std::net::TcpListener;
1143     ///
1144     /// # futures_lite::future::block_on(async {
1145     /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
1146     /// let (stream, addr) = listener.accept().await?;
1147     /// println!("Accepted client: {}", addr);
1148     /// # std::io::Result::Ok(()) });
1149     /// ```
accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)>1150     pub async fn accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)> {
1151         let (stream, addr) = self.read_with(|io| io.accept()).await?;
1152         Ok((Async::new(stream)?, addr))
1153     }
1154 
1155     /// Returns a stream of incoming TCP connections.
1156     ///
1157     /// The stream is infinite, i.e. it never stops with a [`None`].
1158     ///
1159     /// # Examples
1160     ///
1161     /// ```no_run
1162     /// use async_io::Async;
1163     /// use futures_lite::{pin, stream::StreamExt};
1164     /// use std::net::TcpListener;
1165     ///
1166     /// # futures_lite::future::block_on(async {
1167     /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
1168     /// let incoming = listener.incoming();
1169     /// pin!(incoming);
1170     ///
1171     /// while let Some(stream) = incoming.next().await {
1172     ///     let stream = stream?;
1173     ///     println!("Accepted client: {}", stream.get_ref().peer_addr()?);
1174     /// }
1175     /// # std::io::Result::Ok(()) });
1176     /// ```
incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Send + '_1177     pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Send + '_ {
1178         stream::unfold(self, |listener| async move {
1179             let res = listener.accept().await.map(|(stream, _)| stream);
1180             Some((res, listener))
1181         })
1182     }
1183 }
1184 
1185 impl TryFrom<std::net::TcpListener> for Async<std::net::TcpListener> {
1186     type Error = io::Error;
1187 
try_from(listener: std::net::TcpListener) -> io::Result<Self>1188     fn try_from(listener: std::net::TcpListener) -> io::Result<Self> {
1189         Async::new(listener)
1190     }
1191 }
1192 
1193 impl Async<TcpStream> {
1194     /// Creates a TCP connection to the specified address.
1195     ///
1196     /// # Examples
1197     ///
1198     /// ```
1199     /// use async_io::Async;
1200     /// use std::net::{TcpStream, ToSocketAddrs};
1201     ///
1202     /// # futures_lite::future::block_on(async {
1203     /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
1204     /// let stream = Async::<TcpStream>::connect(addr).await?;
1205     /// # std::io::Result::Ok(()) });
1206     /// ```
connect<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpStream>>1207     pub async fn connect<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpStream>> {
1208         // Begin async connect.
1209         let addr = addr.into();
1210         let domain = Domain::for_address(addr);
1211         let socket = connect(addr.into(), domain, Some(Protocol::TCP))?;
1212         let stream = Async::new(TcpStream::from(socket))?;
1213 
1214         // The stream becomes writable when connected.
1215         stream.writable().await?;
1216 
1217         // Check if there was an error while connecting.
1218         match stream.get_ref().take_error()? {
1219             None => Ok(stream),
1220             Some(err) => Err(err),
1221         }
1222     }
1223 
1224     /// Reads data from the stream without removing it from the buffer.
1225     ///
1226     /// Returns the number of bytes read. Successive calls of this method read the same data.
1227     ///
1228     /// # Examples
1229     ///
1230     /// ```
1231     /// use async_io::Async;
1232     /// use futures_lite::{io::AsyncWriteExt, stream::StreamExt};
1233     /// use std::net::{TcpStream, ToSocketAddrs};
1234     ///
1235     /// # futures_lite::future::block_on(async {
1236     /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
1237     /// let mut stream = Async::<TcpStream>::connect(addr).await?;
1238     ///
1239     /// stream
1240     ///     .write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
1241     ///     .await?;
1242     ///
1243     /// let mut buf = [0u8; 1024];
1244     /// let len = stream.peek(&mut buf).await?;
1245     /// # std::io::Result::Ok(()) });
1246     /// ```
peek(&self, buf: &mut [u8]) -> io::Result<usize>1247     pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1248         self.read_with(|io| io.peek(buf)).await
1249     }
1250 }
1251 
1252 impl TryFrom<std::net::TcpStream> for Async<std::net::TcpStream> {
1253     type Error = io::Error;
1254 
try_from(stream: std::net::TcpStream) -> io::Result<Self>1255     fn try_from(stream: std::net::TcpStream) -> io::Result<Self> {
1256         Async::new(stream)
1257     }
1258 }
1259 
1260 impl Async<UdpSocket> {
1261     /// Creates a UDP socket bound to the specified address.
1262     ///
1263     /// Binding with port number 0 will request an available port from the OS.
1264     ///
1265     /// # Examples
1266     ///
1267     /// ```
1268     /// use async_io::Async;
1269     /// use std::net::UdpSocket;
1270     ///
1271     /// # futures_lite::future::block_on(async {
1272     /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
1273     /// println!("Bound to {}", socket.get_ref().local_addr()?);
1274     /// # std::io::Result::Ok(()) });
1275     /// ```
bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<UdpSocket>>1276     pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<UdpSocket>> {
1277         let addr = addr.into();
1278         Async::new(UdpSocket::bind(addr)?)
1279     }
1280 
1281     /// Receives a single datagram message.
1282     ///
1283     /// Returns the number of bytes read and the address the message came from.
1284     ///
1285     /// This method must be called with a valid byte slice of sufficient size to hold the message.
1286     /// If the message is too long to fit, excess bytes may get discarded.
1287     ///
1288     /// # Examples
1289     ///
1290     /// ```no_run
1291     /// use async_io::Async;
1292     /// use std::net::UdpSocket;
1293     ///
1294     /// # futures_lite::future::block_on(async {
1295     /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1296     ///
1297     /// let mut buf = [0u8; 1024];
1298     /// let (len, addr) = socket.recv_from(&mut buf).await?;
1299     /// # std::io::Result::Ok(()) });
1300     /// ```
recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>1301     pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1302         self.read_with(|io| io.recv_from(buf)).await
1303     }
1304 
1305     /// Receives a single datagram message without removing it from the queue.
1306     ///
1307     /// Returns the number of bytes read and the address the message came from.
1308     ///
1309     /// This method must be called with a valid byte slice of sufficient size to hold the message.
1310     /// If the message is too long to fit, excess bytes may get discarded.
1311     ///
1312     /// # Examples
1313     ///
1314     /// ```no_run
1315     /// use async_io::Async;
1316     /// use std::net::UdpSocket;
1317     ///
1318     /// # futures_lite::future::block_on(async {
1319     /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1320     ///
1321     /// let mut buf = [0u8; 1024];
1322     /// let (len, addr) = socket.peek_from(&mut buf).await?;
1323     /// # std::io::Result::Ok(()) });
1324     /// ```
peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>1325     pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1326         self.read_with(|io| io.peek_from(buf)).await
1327     }
1328 
1329     /// Sends data to the specified address.
1330     ///
1331     /// Returns the number of bytes writen.
1332     ///
1333     /// # Examples
1334     ///
1335     /// ```no_run
1336     /// use async_io::Async;
1337     /// use std::net::UdpSocket;
1338     ///
1339     /// # futures_lite::future::block_on(async {
1340     /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
1341     /// let addr = socket.get_ref().local_addr()?;
1342     ///
1343     /// let msg = b"hello";
1344     /// let len = socket.send_to(msg, addr).await?;
1345     /// # std::io::Result::Ok(()) });
1346     /// ```
send_to<A: Into<SocketAddr>>(&self, buf: &[u8], addr: A) -> io::Result<usize>1347     pub async fn send_to<A: Into<SocketAddr>>(&self, buf: &[u8], addr: A) -> io::Result<usize> {
1348         let addr = addr.into();
1349         self.write_with(|io| io.send_to(buf, addr)).await
1350     }
1351 
1352     /// Receives a single datagram message from the connected peer.
1353     ///
1354     /// Returns the number of bytes read.
1355     ///
1356     /// This method must be called with a valid byte slice of sufficient size to hold the message.
1357     /// If the message is too long to fit, excess bytes may get discarded.
1358     ///
1359     /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1360     /// This method will fail if the socket is not connected.
1361     ///
1362     /// # Examples
1363     ///
1364     /// ```no_run
1365     /// use async_io::Async;
1366     /// use std::net::UdpSocket;
1367     ///
1368     /// # futures_lite::future::block_on(async {
1369     /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1370     /// socket.get_ref().connect("127.0.0.1:9000")?;
1371     ///
1372     /// let mut buf = [0u8; 1024];
1373     /// let len = socket.recv(&mut buf).await?;
1374     /// # std::io::Result::Ok(()) });
1375     /// ```
recv(&self, buf: &mut [u8]) -> io::Result<usize>1376     pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
1377         self.read_with(|io| io.recv(buf)).await
1378     }
1379 
1380     /// Receives a single datagram message from the connected peer without removing it from the
1381     /// queue.
1382     ///
1383     /// Returns the number of bytes read and the address the message came from.
1384     ///
1385     /// This method must be called with a valid byte slice of sufficient size to hold the message.
1386     /// If the message is too long to fit, excess bytes may get discarded.
1387     ///
1388     /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1389     /// This method will fail if the socket is not connected.
1390     ///
1391     /// # Examples
1392     ///
1393     /// ```no_run
1394     /// use async_io::Async;
1395     /// use std::net::UdpSocket;
1396     ///
1397     /// # futures_lite::future::block_on(async {
1398     /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1399     /// socket.get_ref().connect("127.0.0.1:9000")?;
1400     ///
1401     /// let mut buf = [0u8; 1024];
1402     /// let len = socket.peek(&mut buf).await?;
1403     /// # std::io::Result::Ok(()) });
1404     /// ```
peek(&self, buf: &mut [u8]) -> io::Result<usize>1405     pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1406         self.read_with(|io| io.peek(buf)).await
1407     }
1408 
1409     /// Sends data to the connected peer.
1410     ///
1411     /// Returns the number of bytes written.
1412     ///
1413     /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1414     /// This method will fail if the socket is not connected.
1415     ///
1416     /// # Examples
1417     ///
1418     /// ```no_run
1419     /// use async_io::Async;
1420     /// use std::net::UdpSocket;
1421     ///
1422     /// # futures_lite::future::block_on(async {
1423     /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1424     /// socket.get_ref().connect("127.0.0.1:9000")?;
1425     ///
1426     /// let msg = b"hello";
1427     /// let len = socket.send(msg).await?;
1428     /// # std::io::Result::Ok(()) });
1429     /// ```
send(&self, buf: &[u8]) -> io::Result<usize>1430     pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
1431         self.write_with(|io| io.send(buf)).await
1432     }
1433 }
1434 
1435 impl TryFrom<std::net::UdpSocket> for Async<std::net::UdpSocket> {
1436     type Error = io::Error;
1437 
try_from(socket: std::net::UdpSocket) -> io::Result<Self>1438     fn try_from(socket: std::net::UdpSocket) -> io::Result<Self> {
1439         Async::new(socket)
1440     }
1441 }
1442 
1443 #[cfg(unix)]
1444 impl Async<UnixListener> {
1445     /// Creates a UDS listener bound to the specified path.
1446     ///
1447     /// # Examples
1448     ///
1449     /// ```no_run
1450     /// use async_io::Async;
1451     /// use std::os::unix::net::UnixListener;
1452     ///
1453     /// # futures_lite::future::block_on(async {
1454     /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1455     /// println!("Listening on {:?}", listener.get_ref().local_addr()?);
1456     /// # std::io::Result::Ok(()) });
1457     /// ```
bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixListener>>1458     pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixListener>> {
1459         let path = path.as_ref().to_owned();
1460         Async::new(UnixListener::bind(path)?)
1461     }
1462 
1463     /// Accepts a new incoming UDS stream connection.
1464     ///
1465     /// When a connection is established, it will be returned as a stream together with its remote
1466     /// address.
1467     ///
1468     /// # Examples
1469     ///
1470     /// ```no_run
1471     /// use async_io::Async;
1472     /// use std::os::unix::net::UnixListener;
1473     ///
1474     /// # futures_lite::future::block_on(async {
1475     /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1476     /// let (stream, addr) = listener.accept().await?;
1477     /// println!("Accepted client: {:?}", addr);
1478     /// # std::io::Result::Ok(()) });
1479     /// ```
accept(&self) -> io::Result<(Async<UnixStream>, UnixSocketAddr)>1480     pub async fn accept(&self) -> io::Result<(Async<UnixStream>, UnixSocketAddr)> {
1481         let (stream, addr) = self.read_with(|io| io.accept()).await?;
1482         Ok((Async::new(stream)?, addr))
1483     }
1484 
1485     /// Returns a stream of incoming UDS connections.
1486     ///
1487     /// The stream is infinite, i.e. it never stops with a [`None`] item.
1488     ///
1489     /// # Examples
1490     ///
1491     /// ```no_run
1492     /// use async_io::Async;
1493     /// use futures_lite::{pin, stream::StreamExt};
1494     /// use std::os::unix::net::UnixListener;
1495     ///
1496     /// # futures_lite::future::block_on(async {
1497     /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1498     /// let incoming = listener.incoming();
1499     /// pin!(incoming);
1500     ///
1501     /// while let Some(stream) = incoming.next().await {
1502     ///     let stream = stream?;
1503     ///     println!("Accepted client: {:?}", stream.get_ref().peer_addr()?);
1504     /// }
1505     /// # std::io::Result::Ok(()) });
1506     /// ```
incoming(&self) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Send + '_1507     pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Send + '_ {
1508         stream::unfold(self, |listener| async move {
1509             let res = listener.accept().await.map(|(stream, _)| stream);
1510             Some((res, listener))
1511         })
1512     }
1513 }
1514 
1515 #[cfg(unix)]
1516 impl TryFrom<std::os::unix::net::UnixListener> for Async<std::os::unix::net::UnixListener> {
1517     type Error = io::Error;
1518 
try_from(listener: std::os::unix::net::UnixListener) -> io::Result<Self>1519     fn try_from(listener: std::os::unix::net::UnixListener) -> io::Result<Self> {
1520         Async::new(listener)
1521     }
1522 }
1523 
1524 #[cfg(unix)]
1525 impl Async<UnixStream> {
1526     /// Creates a UDS stream connected to the specified path.
1527     ///
1528     /// # Examples
1529     ///
1530     /// ```no_run
1531     /// use async_io::Async;
1532     /// use std::os::unix::net::UnixStream;
1533     ///
1534     /// # futures_lite::future::block_on(async {
1535     /// let stream = Async::<UnixStream>::connect("/tmp/socket").await?;
1536     /// # std::io::Result::Ok(()) });
1537     /// ```
connect<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixStream>>1538     pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixStream>> {
1539         // Begin async connect.
1540         let socket = connect(SockAddr::unix(path)?, Domain::UNIX, None)?;
1541         let stream = Async::new(UnixStream::from(socket))?;
1542 
1543         // The stream becomes writable when connected.
1544         stream.writable().await?;
1545 
1546         // On Linux, it appears the socket may become writable even when connecting fails, so we
1547         // must do an extra check here and see if the peer address is retrievable.
1548         stream.get_ref().peer_addr()?;
1549         Ok(stream)
1550     }
1551 
1552     /// Creates an unnamed pair of connected UDS stream sockets.
1553     ///
1554     /// # Examples
1555     ///
1556     /// ```no_run
1557     /// use async_io::Async;
1558     /// use std::os::unix::net::UnixStream;
1559     ///
1560     /// # futures_lite::future::block_on(async {
1561     /// let (stream1, stream2) = Async::<UnixStream>::pair()?;
1562     /// # std::io::Result::Ok(()) });
1563     /// ```
pair() -> io::Result<(Async<UnixStream>, Async<UnixStream>)>1564     pub fn pair() -> io::Result<(Async<UnixStream>, Async<UnixStream>)> {
1565         let (stream1, stream2) = UnixStream::pair()?;
1566         Ok((Async::new(stream1)?, Async::new(stream2)?))
1567     }
1568 }
1569 
1570 #[cfg(unix)]
1571 impl TryFrom<std::os::unix::net::UnixStream> for Async<std::os::unix::net::UnixStream> {
1572     type Error = io::Error;
1573 
try_from(stream: std::os::unix::net::UnixStream) -> io::Result<Self>1574     fn try_from(stream: std::os::unix::net::UnixStream) -> io::Result<Self> {
1575         Async::new(stream)
1576     }
1577 }
1578 
1579 #[cfg(unix)]
1580 impl Async<UnixDatagram> {
1581     /// Creates a UDS datagram socket bound to the specified path.
1582     ///
1583     /// # Examples
1584     ///
1585     /// ```no_run
1586     /// use async_io::Async;
1587     /// use std::os::unix::net::UnixDatagram;
1588     ///
1589     /// # futures_lite::future::block_on(async {
1590     /// let socket = Async::<UnixDatagram>::bind("/tmp/socket")?;
1591     /// # std::io::Result::Ok(()) });
1592     /// ```
bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixDatagram>>1593     pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixDatagram>> {
1594         let path = path.as_ref().to_owned();
1595         Async::new(UnixDatagram::bind(path)?)
1596     }
1597 
1598     /// Creates a UDS datagram socket not bound to any address.
1599     ///
1600     /// # Examples
1601     ///
1602     /// ```no_run
1603     /// use async_io::Async;
1604     /// use std::os::unix::net::UnixDatagram;
1605     ///
1606     /// # futures_lite::future::block_on(async {
1607     /// let socket = Async::<UnixDatagram>::unbound()?;
1608     /// # std::io::Result::Ok(()) });
1609     /// ```
unbound() -> io::Result<Async<UnixDatagram>>1610     pub fn unbound() -> io::Result<Async<UnixDatagram>> {
1611         Async::new(UnixDatagram::unbound()?)
1612     }
1613 
1614     /// Creates an unnamed pair of connected Unix datagram sockets.
1615     ///
1616     /// # Examples
1617     ///
1618     /// ```no_run
1619     /// use async_io::Async;
1620     /// use std::os::unix::net::UnixDatagram;
1621     ///
1622     /// # futures_lite::future::block_on(async {
1623     /// let (socket1, socket2) = Async::<UnixDatagram>::pair()?;
1624     /// # std::io::Result::Ok(()) });
1625     /// ```
pair() -> io::Result<(Async<UnixDatagram>, Async<UnixDatagram>)>1626     pub fn pair() -> io::Result<(Async<UnixDatagram>, Async<UnixDatagram>)> {
1627         let (socket1, socket2) = UnixDatagram::pair()?;
1628         Ok((Async::new(socket1)?, Async::new(socket2)?))
1629     }
1630 
1631     /// Receives data from the socket.
1632     ///
1633     /// Returns the number of bytes read and the address the message came from.
1634     ///
1635     /// # Examples
1636     ///
1637     /// ```no_run
1638     /// use async_io::Async;
1639     /// use std::os::unix::net::UnixDatagram;
1640     ///
1641     /// # futures_lite::future::block_on(async {
1642     /// let socket = Async::<UnixDatagram>::bind("/tmp/socket")?;
1643     ///
1644     /// let mut buf = [0u8; 1024];
1645     /// let (len, addr) = socket.recv_from(&mut buf).await?;
1646     /// # std::io::Result::Ok(()) });
1647     /// ```
recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)>1648     pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> {
1649         self.read_with(|io| io.recv_from(buf)).await
1650     }
1651 
1652     /// Sends data to the specified address.
1653     ///
1654     /// Returns the number of bytes written.
1655     ///
1656     /// # Examples
1657     ///
1658     /// ```no_run
1659     /// use async_io::Async;
1660     /// use std::os::unix::net::UnixDatagram;
1661     ///
1662     /// # futures_lite::future::block_on(async {
1663     /// let socket = Async::<UnixDatagram>::unbound()?;
1664     ///
1665     /// let msg = b"hello";
1666     /// let addr = "/tmp/socket";
1667     /// let len = socket.send_to(msg, addr).await?;
1668     /// # std::io::Result::Ok(()) });
1669     /// ```
send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize>1670     pub async fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> {
1671         self.write_with(|io| io.send_to(buf, &path)).await
1672     }
1673 
1674     /// Receives data from the connected peer.
1675     ///
1676     /// Returns the number of bytes read and the address the message came from.
1677     ///
1678     /// The [`connect`][`UnixDatagram::connect()`] method connects this socket to a remote address.
1679     /// This method will fail if the socket is not connected.
1680     ///
1681     /// # Examples
1682     ///
1683     /// ```no_run
1684     /// use async_io::Async;
1685     /// use std::os::unix::net::UnixDatagram;
1686     ///
1687     /// # futures_lite::future::block_on(async {
1688     /// let socket = Async::<UnixDatagram>::bind("/tmp/socket1")?;
1689     /// socket.get_ref().connect("/tmp/socket2")?;
1690     ///
1691     /// let mut buf = [0u8; 1024];
1692     /// let len = socket.recv(&mut buf).await?;
1693     /// # std::io::Result::Ok(()) });
1694     /// ```
recv(&self, buf: &mut [u8]) -> io::Result<usize>1695     pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
1696         self.read_with(|io| io.recv(buf)).await
1697     }
1698 
1699     /// Sends data to the connected peer.
1700     ///
1701     /// Returns the number of bytes written.
1702     ///
1703     /// The [`connect`][`UnixDatagram::connect()`] method connects this socket to a remote address.
1704     /// This method will fail if the socket is not connected.
1705     ///
1706     /// # Examples
1707     ///
1708     /// ```no_run
1709     /// use async_io::Async;
1710     /// use std::os::unix::net::UnixDatagram;
1711     ///
1712     /// # futures_lite::future::block_on(async {
1713     /// let socket = Async::<UnixDatagram>::bind("/tmp/socket1")?;
1714     /// socket.get_ref().connect("/tmp/socket2")?;
1715     ///
1716     /// let msg = b"hello";
1717     /// let len = socket.send(msg).await?;
1718     /// # std::io::Result::Ok(()) });
1719     /// ```
send(&self, buf: &[u8]) -> io::Result<usize>1720     pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
1721         self.write_with(|io| io.send(buf)).await
1722     }
1723 }
1724 
1725 #[cfg(unix)]
1726 impl TryFrom<std::os::unix::net::UnixDatagram> for Async<std::os::unix::net::UnixDatagram> {
1727     type Error = io::Error;
1728 
try_from(socket: std::os::unix::net::UnixDatagram) -> io::Result<Self>1729     fn try_from(socket: std::os::unix::net::UnixDatagram) -> io::Result<Self> {
1730         Async::new(socket)
1731     }
1732 }
1733 
1734 /// Polls a future once, waits for a wakeup, and then optimistically assumes the future is ready.
optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()>1735 async fn optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()> {
1736     let mut polled = false;
1737     pin!(fut);
1738 
1739     future::poll_fn(|cx| {
1740         if !polled {
1741             polled = true;
1742             fut.as_mut().poll(cx)
1743         } else {
1744             Poll::Ready(Ok(()))
1745         }
1746     })
1747     .await
1748 }
1749 
connect(addr: SockAddr, domain: Domain, protocol: Option<Protocol>) -> io::Result<Socket>1750 fn connect(addr: SockAddr, domain: Domain, protocol: Option<Protocol>) -> io::Result<Socket> {
1751     let sock_type = Type::STREAM;
1752     #[cfg(any(
1753         target_os = "android",
1754         target_os = "dragonfly",
1755         target_os = "freebsd",
1756         target_os = "fuchsia",
1757         target_os = "illumos",
1758         target_os = "linux",
1759         target_os = "netbsd",
1760         target_os = "openbsd"
1761     ))]
1762     // If we can, set nonblocking at socket creation for unix
1763     let sock_type = sock_type.nonblocking();
1764     // This automatically handles cloexec on unix, no_inherit on windows and nosigpipe on macos
1765     let socket = Socket::new(domain, sock_type, protocol)?;
1766     #[cfg(not(any(
1767         target_os = "android",
1768         target_os = "dragonfly",
1769         target_os = "freebsd",
1770         target_os = "fuchsia",
1771         target_os = "illumos",
1772         target_os = "linux",
1773         target_os = "netbsd",
1774         target_os = "openbsd"
1775     )))]
1776     // If the current platform doesn't support nonblocking at creation, enable it after creation
1777     socket.set_nonblocking(true)?;
1778     match socket.connect(&addr) {
1779         Ok(_) => {}
1780         #[cfg(unix)]
1781         Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {}
1782         Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1783         Err(err) => return Err(err),
1784     }
1785     Ok(socket)
1786 }
1787