1 //! Async I/O and timers.
2 //!
3 //! This crate provides two tools:
4 //!
5 //! * [`Async`], an adapter for standard networking types (and [many other] types) to use in
6 //! async programs.
7 //! * [`Timer`], a future or stream that emits timed events.
8 //!
9 //! For concrete async networking types built on top of this crate, see [`async-net`].
10 //!
11 //! [many other]: https://github.com/smol-rs/async-io/tree/master/examples
12 //! [`async-net`]: https://docs.rs/async-net
13 //!
14 //! # Implementation
15 //!
16 //! The first time [`Async`] or [`Timer`] is used, a thread named "async-io" will be spawned.
17 //! The purpose of this thread is to wait for I/O events reported by the operating system, and then
18 //! wake appropriate futures blocked on I/O or timers when they can be resumed.
19 //!
20 //! To wait for the next I/O event, the "async-io" thread uses [epoll] on Linux/Android/illumos,
21 //! [kqueue] on macOS/iOS/BSD, [event ports] on illumos/Solaris, and [wepoll] on Windows. That
22 //! functionality is provided by the [`polling`] crate.
23 //!
24 //! However, note that you can also process I/O events and wake futures on any thread using the
25 //! [`block_on()`] function. The "async-io" thread is therefore just a fallback mechanism
26 //! processing I/O events in case no other threads are.
27 //!
28 //! [epoll]: https://en.wikipedia.org/wiki/Epoll
29 //! [kqueue]: https://en.wikipedia.org/wiki/Kqueue
30 //! [event ports]: https://illumos.org/man/port_create
31 //! [wepoll]: https://github.com/piscisaureus/wepoll
32 //! [`polling`]: https://docs.rs/polling
33 //!
34 //! # Examples
35 //!
36 //! Connect to `example.com:80`, or time out after 10 seconds.
37 //!
38 //! ```
39 //! use async_io::{Async, Timer};
40 //! use futures_lite::{future::FutureExt, io};
41 //!
42 //! use std::net::{TcpStream, ToSocketAddrs};
43 //! use std::time::Duration;
44 //!
45 //! # futures_lite::future::block_on(async {
46 //! let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
47 //!
48 //! let stream = Async::<TcpStream>::connect(addr).or(async {
49 //! Timer::after(Duration::from_secs(10)).await;
50 //! Err(io::ErrorKind::TimedOut.into())
51 //! })
52 //! .await?;
53 //! # std::io::Result::Ok(()) });
54 //! ```
55
56 #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
57
58 use std::convert::TryFrom;
59 use std::future::Future;
60 use std::io::{self, IoSlice, IoSliceMut, Read, Write};
61 use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket};
62 use std::pin::Pin;
63 use std::sync::Arc;
64 use std::task::{Context, Poll, Waker};
65 use std::time::{Duration, Instant};
66
67 #[cfg(unix)]
68 use std::{
69 os::unix::io::{AsRawFd, RawFd},
70 os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
71 path::Path,
72 };
73
74 #[cfg(windows)]
75 use std::os::windows::io::{AsRawSocket, RawSocket};
76
77 use futures_lite::io::{AsyncRead, AsyncWrite};
78 use futures_lite::stream::{self, Stream};
79 use futures_lite::{future, pin, ready};
80 use socket2::{Domain, Protocol, SockAddr, Socket, Type};
81
82 use crate::reactor::{Reactor, Source};
83
84 mod driver;
85 mod reactor;
86
87 pub use driver::block_on;
88 pub use reactor::{Readable, ReadableOwned, Writable, WritableOwned};
89
/// Returns the largest value a [`Duration`] can represent.
///
/// This is a stand-in for `Duration::MAX`; switch to that constant once `duration_constants` are
/// stabilized.
fn duration_max() -> Duration {
    const NANOS_PER_SEC: u32 = 1_000_000_000;
    Duration::new(u64::MAX, NANOS_PER_SEC - 1)
}
94
/// A future or stream that emits timed events.
///
/// Timers are futures that output a single [`Instant`] when they fire.
///
/// Timers are also streams that can output [`Instant`]s periodically.
///
/// # Examples
///
/// Sleep for 1 second:
///
/// ```
/// use async_io::Timer;
/// use std::time::Duration;
///
/// # futures_lite::future::block_on(async {
/// Timer::after(Duration::from_secs(1)).await;
/// # });
/// ```
///
/// Time out after 10 seconds:
///
/// ```
/// use async_io::Timer;
/// use futures_lite::FutureExt;
/// use std::time::Duration;
///
/// # futures_lite::future::block_on(async {
/// let addrs = async_net::resolve("google.com:80")
///     .or(async {
///         Timer::after(Duration::from_secs(10)).await;
///         Err(std::io::ErrorKind::TimedOut.into())
///     })
///     .await?;
/// # std::io::Result::Ok(()) });
/// ```
#[derive(Debug)]
pub struct Timer {
    /// This timer's ID and last waker that polled it.
    ///
    /// When this field is set to `None`, this timer is not registered in the reactor.
    id_and_waker: Option<(usize, Waker)>,

    /// The next instant at which this timer fires.
    when: Instant,

    /// The period.
    ///
    /// After the timer fires, the next deadline is computed as `when + period` with a checked
    /// addition. One-shot timers store the maximum `Duration` here, for which that addition is
    /// expected to overflow, so they are not registered again after firing.
    period: Duration,
}
143
144 impl Timer {
145 /// Creates a timer that emits an event once after the given duration of time.
146 ///
147 /// # Examples
148 ///
149 /// ```
150 /// use async_io::Timer;
151 /// use std::time::Duration;
152 ///
153 /// # futures_lite::future::block_on(async {
154 /// Timer::after(Duration::from_secs(1)).await;
155 /// # });
156 /// ```
after(duration: Duration) -> Timer157 pub fn after(duration: Duration) -> Timer {
158 Timer::at(Instant::now() + duration)
159 }
160
161 /// Creates a timer that emits an event once at the given time instant.
162 ///
163 /// # Examples
164 ///
165 /// ```
166 /// use async_io::Timer;
167 /// use std::time::{Duration, Instant};
168 ///
169 /// # futures_lite::future::block_on(async {
170 /// let now = Instant::now();
171 /// let when = now + Duration::from_secs(1);
172 /// Timer::at(when).await;
173 /// # });
174 /// ```
at(instant: Instant) -> Timer175 pub fn at(instant: Instant) -> Timer {
176 // Use Duration::MAX once duration_constants are stabilized.
177 Timer::interval_at(instant, duration_max())
178 }
179
180 /// Creates a timer that emits events periodically.
181 ///
182 /// # Examples
183 ///
184 /// ```
185 /// use async_io::Timer;
186 /// use futures_lite::StreamExt;
187 /// use std::time::{Duration, Instant};
188 ///
189 /// # futures_lite::future::block_on(async {
190 /// let period = Duration::from_secs(1);
191 /// Timer::interval(period).next().await;
192 /// # });
193 /// ```
interval(period: Duration) -> Timer194 pub fn interval(period: Duration) -> Timer {
195 Timer::interval_at(Instant::now() + period, period)
196 }
197
198 /// Creates a timer that emits events periodically, starting at `start`.
199 ///
200 /// # Examples
201 ///
202 /// ```
203 /// use async_io::Timer;
204 /// use futures_lite::StreamExt;
205 /// use std::time::{Duration, Instant};
206 ///
207 /// # futures_lite::future::block_on(async {
208 /// let start = Instant::now();
209 /// let period = Duration::from_secs(1);
210 /// Timer::interval_at(start, period).next().await;
211 /// # });
212 /// ```
interval_at(start: Instant, period: Duration) -> Timer213 pub fn interval_at(start: Instant, period: Duration) -> Timer {
214 Timer {
215 id_and_waker: None,
216 when: start,
217 period,
218 }
219 }
220
221 /// Sets the timer to emit an en event once after the given duration of time.
222 ///
223 /// Note that resetting a timer is different from creating a new timer because
224 /// [`set_after()`][`Timer::set_after()`] does not remove the waker associated with the task
225 /// that is polling the timer.
226 ///
227 /// # Examples
228 ///
229 /// ```
230 /// use async_io::Timer;
231 /// use std::time::Duration;
232 ///
233 /// # futures_lite::future::block_on(async {
234 /// let mut t = Timer::after(Duration::from_secs(1));
235 /// t.set_after(Duration::from_millis(100));
236 /// # });
237 /// ```
set_after(&mut self, duration: Duration)238 pub fn set_after(&mut self, duration: Duration) {
239 self.set_at(Instant::now() + duration);
240 }
241
242 /// Sets the timer to emit an event once at the given time instant.
243 ///
244 /// Note that resetting a timer is different from creating a new timer because
245 /// [`set_at()`][`Timer::set_at()`] does not remove the waker associated with the task
246 /// that is polling the timer.
247 ///
248 /// # Examples
249 ///
250 /// ```
251 /// use async_io::Timer;
252 /// use std::time::{Duration, Instant};
253 ///
254 /// # futures_lite::future::block_on(async {
255 /// let mut t = Timer::after(Duration::from_secs(1));
256 ///
257 /// let now = Instant::now();
258 /// let when = now + Duration::from_secs(1);
259 /// t.set_at(when);
260 /// # });
261 /// ```
set_at(&mut self, instant: Instant)262 pub fn set_at(&mut self, instant: Instant) {
263 if let Some((id, _)) = self.id_and_waker.as_ref() {
264 // Deregister the timer from the reactor.
265 Reactor::get().remove_timer(self.when, *id);
266 }
267
268 // Update the timeout.
269 self.when = instant;
270
271 if let Some((id, waker)) = self.id_and_waker.as_mut() {
272 // Re-register the timer with the new timeout.
273 *id = Reactor::get().insert_timer(self.when, waker);
274 }
275 }
276
277 /// Sets the timer to emit events periodically.
278 ///
279 /// Note that resetting a timer is different from creating a new timer because
280 /// [`set_interval()`][`Timer::set_interval()`] does not remove the waker associated with the
281 /// task that is polling the timer.
282 ///
283 /// # Examples
284 ///
285 /// ```
286 /// use async_io::Timer;
287 /// use futures_lite::StreamExt;
288 /// use std::time::{Duration, Instant};
289 ///
290 /// # futures_lite::future::block_on(async {
291 /// let mut t = Timer::after(Duration::from_secs(1));
292 ///
293 /// let period = Duration::from_secs(2);
294 /// t.set_interval(period);
295 /// # });
296 /// ```
set_interval(&mut self, period: Duration)297 pub fn set_interval(&mut self, period: Duration) {
298 self.set_interval_at(Instant::now() + period, period);
299 }
300
301 /// Sets the timer to emit events periodically, starting at `start`.
302 ///
303 /// Note that resetting a timer is different from creating a new timer because
304 /// [`set_interval_at()`][`Timer::set_interval_at()`] does not remove the waker associated with
305 /// the task that is polling the timer.
306 ///
307 /// # Examples
308 ///
309 /// ```
310 /// use async_io::Timer;
311 /// use futures_lite::StreamExt;
312 /// use std::time::{Duration, Instant};
313 ///
314 /// # futures_lite::future::block_on(async {
315 /// let mut t = Timer::after(Duration::from_secs(1));
316 ///
317 /// let start = Instant::now();
318 /// let period = Duration::from_secs(2);
319 /// t.set_interval_at(start, period);
320 /// # });
321 /// ```
set_interval_at(&mut self, start: Instant, period: Duration)322 pub fn set_interval_at(&mut self, start: Instant, period: Duration) {
323 if let Some((id, _)) = self.id_and_waker.as_ref() {
324 // Deregister the timer from the reactor.
325 Reactor::get().remove_timer(self.when, *id);
326 }
327
328 self.when = start;
329 self.period = period;
330
331 if let Some((id, waker)) = self.id_and_waker.as_mut() {
332 // Re-register the timer with the new timeout.
333 *id = Reactor::get().insert_timer(self.when, waker);
334 }
335 }
336 }
337
338 impl Drop for Timer {
drop(&mut self)339 fn drop(&mut self) {
340 if let Some((id, _)) = self.id_and_waker.take() {
341 // Deregister the timer from the reactor.
342 Reactor::get().remove_timer(self.when, id);
343 }
344 }
345 }
346
347 impl Future for Timer {
348 type Output = Instant;
349
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>350 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
351 match self.poll_next(cx) {
352 Poll::Ready(Some(when)) => Poll::Ready(when),
353 Poll::Pending => Poll::Pending,
354 Poll::Ready(None) => unreachable!(),
355 }
356 }
357 }
358
359 impl Stream for Timer {
360 type Item = Instant;
361
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>362 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
363 // Check if the timer has already fired.
364 if Instant::now() >= self.when {
365 if let Some((id, _)) = self.id_and_waker.take() {
366 // Deregister the timer from the reactor.
367 Reactor::get().remove_timer(self.when, id);
368 }
369 let when = self.when;
370 if let Some(next) = when.checked_add(self.period) {
371 self.when = next;
372 // Register the timer in the reactor.
373 let id = Reactor::get().insert_timer(self.when, cx.waker());
374 self.id_and_waker = Some((id, cx.waker().clone()));
375 }
376 return Poll::Ready(Some(when));
377 } else {
378 match &self.id_and_waker {
379 None => {
380 // Register the timer in the reactor.
381 let id = Reactor::get().insert_timer(self.when, cx.waker());
382 self.id_and_waker = Some((id, cx.waker().clone()));
383 }
384 Some((id, w)) if !w.will_wake(cx.waker()) => {
385 // Deregister the timer from the reactor to remove the old waker.
386 Reactor::get().remove_timer(self.when, *id);
387
388 // Register the timer in the reactor with the new waker.
389 let id = Reactor::get().insert_timer(self.when, cx.waker());
390 self.id_and_waker = Some((id, cx.waker().clone()));
391 }
392 Some(_) => {}
393 }
394 }
395 Poll::Pending
396 }
397 }
398
/// Async adapter for I/O types.
///
/// This type puts an I/O handle into non-blocking mode, registers it in
/// [epoll]/[kqueue]/[event ports]/[wepoll], and then provides an async interface for it.
///
/// [epoll]: https://en.wikipedia.org/wiki/Epoll
/// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
/// [event ports]: https://illumos.org/man/port_create
/// [wepoll]: https://github.com/piscisaureus/wepoll
///
/// # Caveats
///
/// [`Async`] is a low-level primitive, and as such it comes with some caveats.
///
/// For higher-level primitives built on top of [`Async`], look into [`async-net`] or
/// [`async-process`] (on Unix).
///
/// [`async-net`]: https://github.com/smol-rs/async-net
/// [`async-process`]: https://github.com/smol-rs/async-process
///
/// ### Supported types
///
/// [`Async`] supports all networking types, as well as some OS-specific file descriptors like
/// [timerfd] and [inotify].
///
/// However, do not use [`Async`] with types like [`File`][`std::fs::File`],
/// [`Stdin`][`std::io::Stdin`], [`Stdout`][`std::io::Stdout`], or [`Stderr`][`std::io::Stderr`]
/// because all operating systems have issues with them when put in non-blocking mode.
///
/// [timerfd]: https://github.com/smol-rs/async-io/blob/master/examples/linux-timerfd.rs
/// [inotify]: https://github.com/smol-rs/async-io/blob/master/examples/linux-inotify.rs
///
/// ### Concurrent I/O
///
/// Note that [`&Async<T>`][`Async`] implements [`AsyncRead`] and [`AsyncWrite`] if `&T`
/// implements those traits, which means tasks can concurrently read and write using shared
/// references.
///
/// But there is a catch: only one task can read at a time, and only one task can write at a
/// time. It is okay to have two tasks where one is reading and the other is writing at the same
/// time, but it is not okay to have two tasks reading at the same time or writing at the same
/// time. If you try to do that, conflicting tasks will just keep waking each other in turn, thus
/// wasting CPU time.
///
/// Besides [`AsyncRead`] and [`AsyncWrite`], this caveat also applies to
/// [`poll_readable()`][`Async::poll_readable()`] and
/// [`poll_writable()`][`Async::poll_writable()`].
///
/// However, any number of tasks can be concurrently calling other methods like
/// [`readable()`][`Async::readable()`] or [`read_with()`][`Async::read_with()`].
///
/// ### Closing
///
/// Closing the write side of [`Async`] with [`close()`][`futures_lite::AsyncWriteExt::close()`]
/// simply flushes. If you want to shutdown a TCP or Unix socket, use
/// [`Shutdown`][`std::net::Shutdown`].
///
/// # Examples
///
/// Connect to a server and echo incoming messages back to the server:
///
/// ```no_run
/// use async_io::Async;
/// use futures_lite::io;
/// use std::net::TcpStream;
///
/// # futures_lite::future::block_on(async {
/// // Connect to a local server.
/// let stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 8000)).await?;
///
/// // Echo all messages from the read side of the stream into the write side.
/// io::copy(&stream, &stream).await?;
/// # std::io::Result::Ok(()) });
/// ```
///
/// You can use either predefined async methods or wrap blocking I/O operations in
/// [`Async::read_with()`], [`Async::read_with_mut()`], [`Async::write_with()`], and
/// [`Async::write_with_mut()`]:
///
/// ```no_run
/// use async_io::Async;
/// use std::net::TcpListener;
///
/// # futures_lite::future::block_on(async {
/// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
///
/// // These two lines are equivalent:
/// let (stream, addr) = listener.accept().await?;
/// let (stream, addr) = listener.read_with(|inner| inner.accept()).await?;
/// # std::io::Result::Ok(()) });
/// ```
#[derive(Debug)]
pub struct Async<T> {
    /// A source registered in the reactor.
    source: Arc<Source>,

    /// The inner I/O handle.
    ///
    /// This is `Some` from construction until `into_inner()` or `drop()` takes the handle out,
    /// which is why the accessors may unwrap it.
    io: Option<T>,
}
498
// `Async<T>` is declared `Unpin` for every `T`, including a `T` that is not `Unpin` itself. The
// `AsyncRead` implementation relies on this to reborrow `Pin<&mut Self>` as `&mut Self`.
impl<T> Unpin for Async<T> {}
500
501 #[cfg(unix)]
502 impl<T: AsRawFd> Async<T> {
503 /// Creates an async I/O handle.
504 ///
505 /// This method will put the handle in non-blocking mode and register it in
506 /// [epoll]/[kqueue]/[event ports]/[wepoll].
507 ///
508 /// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement
509 /// `AsRawSocket`.
510 ///
511 /// [epoll]: https://en.wikipedia.org/wiki/Epoll
512 /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
513 /// [event ports]: https://illumos.org/man/port_create
514 /// [wepoll]: https://github.com/piscisaureus/wepoll
515 ///
516 /// # Examples
517 ///
518 /// ```
519 /// use async_io::Async;
520 /// use std::net::{SocketAddr, TcpListener};
521 ///
522 /// # futures_lite::future::block_on(async {
523 /// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
524 /// let listener = Async::new(listener)?;
525 /// # std::io::Result::Ok(()) });
526 /// ```
new(io: T) -> io::Result<Async<T>>527 pub fn new(io: T) -> io::Result<Async<T>> {
528 let fd = io.as_raw_fd();
529
530 // Put the file descriptor in non-blocking mode.
531 unsafe {
532 let mut res = libc::fcntl(fd, libc::F_GETFL);
533 if res != -1 {
534 res = libc::fcntl(fd, libc::F_SETFL, res | libc::O_NONBLOCK);
535 }
536 if res == -1 {
537 return Err(io::Error::last_os_error());
538 }
539 }
540
541 Ok(Async {
542 source: Reactor::get().insert_io(fd)?,
543 io: Some(io),
544 })
545 }
546 }
547
#[cfg(unix)]
impl<T: AsRawFd> AsRawFd for Async<T> {
    /// Returns the raw file descriptor recorded in this handle's reactor source.
    fn as_raw_fd(&self) -> RawFd {
        // The value is read from the registered source rather than from the inner handle.
        // NOTE(review): presumably this is the descriptor `Async::new` passed to `insert_io` —
        // confirm against the `reactor` module.
        self.source.raw
    }
}
554
#[cfg(windows)]
impl<T: AsRawSocket> Async<T> {
    /// Wraps an I/O handle in an async adapter.
    ///
    /// The handle is switched to non-blocking mode and then registered in
    /// [epoll]/[kqueue]/[event ports]/[wepoll].
    ///
    /// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement
    /// `AsRawSocket`.
    ///
    /// [epoll]: https://en.wikipedia.org/wiki/Epoll
    /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
    /// [event ports]: https://illumos.org/man/port_create
    /// [wepoll]: https://github.com/piscisaureus/wepoll
    ///
    /// # Errors
    ///
    /// Returns the OS error if the socket cannot be switched to non-blocking mode, or the error
    /// reported by the reactor if registration fails.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_io::Async;
    /// use std::net::{SocketAddr, TcpListener};
    ///
    /// # futures_lite::future::block_on(async {
    /// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
    /// let listener = Async::new(listener)?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn new(io: T) -> io::Result<Async<T>> {
        use winapi::ctypes;
        use winapi::um::winsock2;

        let raw = io.as_raw_socket();

        // Put the socket in non-blocking mode by passing a non-zero argument to `FIONBIO`.
        let mut enable = true as ctypes::c_ulong;
        let status =
            unsafe { winsock2::ioctlsocket(raw as winsock2::SOCKET, winsock2::FIONBIO, &mut enable) };
        if status != 0 {
            return Err(io::Error::last_os_error());
        }

        // Register the socket in the reactor before taking ownership of the handle.
        let source = Reactor::get().insert_io(raw)?;
        Ok(Async {
            source,
            io: Some(io),
        })
    }
}
606
#[cfg(windows)]
impl<T: AsRawSocket> AsRawSocket for Async<T> {
    /// Returns the raw socket recorded in this handle's reactor source.
    fn as_raw_socket(&self) -> RawSocket {
        // The value is read from the registered source rather than from the inner handle.
        // NOTE(review): presumably this is the socket `Async::new` passed to `insert_io` —
        // confirm against the `reactor` module.
        self.source.raw
    }
}
613
614 impl<T> Async<T> {
615 /// Gets a reference to the inner I/O handle.
616 ///
617 /// # Examples
618 ///
619 /// ```
620 /// use async_io::Async;
621 /// use std::net::TcpListener;
622 ///
623 /// # futures_lite::future::block_on(async {
624 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
625 /// let inner = listener.get_ref();
626 /// # std::io::Result::Ok(()) });
627 /// ```
get_ref(&self) -> &T628 pub fn get_ref(&self) -> &T {
629 self.io.as_ref().unwrap()
630 }
631
632 /// Gets a mutable reference to the inner I/O handle.
633 ///
634 /// # Examples
635 ///
636 /// ```
637 /// use async_io::Async;
638 /// use std::net::TcpListener;
639 ///
640 /// # futures_lite::future::block_on(async {
641 /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
642 /// let inner = listener.get_mut();
643 /// # std::io::Result::Ok(()) });
644 /// ```
get_mut(&mut self) -> &mut T645 pub fn get_mut(&mut self) -> &mut T {
646 self.io.as_mut().unwrap()
647 }
648
649 /// Unwraps the inner I/O handle.
650 ///
651 /// This method will **not** put the I/O handle back into blocking mode.
652 ///
653 /// # Examples
654 ///
655 /// ```
656 /// use async_io::Async;
657 /// use std::net::TcpListener;
658 ///
659 /// # futures_lite::future::block_on(async {
660 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
661 /// let inner = listener.into_inner()?;
662 ///
663 /// // Put the listener back into blocking mode.
664 /// inner.set_nonblocking(false)?;
665 /// # std::io::Result::Ok(()) });
666 /// ```
into_inner(mut self) -> io::Result<T>667 pub fn into_inner(mut self) -> io::Result<T> {
668 let io = self.io.take().unwrap();
669 Reactor::get().remove_io(&self.source)?;
670 Ok(io)
671 }
672
673 /// Waits until the I/O handle is readable.
674 ///
675 /// This method completes when a read operation on this I/O handle wouldn't block.
676 ///
677 /// # Examples
678 ///
679 /// ```no_run
680 /// use async_io::Async;
681 /// use std::net::TcpListener;
682 ///
683 /// # futures_lite::future::block_on(async {
684 /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
685 ///
686 /// // Wait until a client can be accepted.
687 /// listener.readable().await?;
688 /// # std::io::Result::Ok(()) });
689 /// ```
readable(&self) -> Readable<'_, T>690 pub fn readable(&self) -> Readable<'_, T> {
691 Source::readable(self)
692 }
693
694 /// Waits until the I/O handle is readable.
695 ///
696 /// This method completes when a read operation on this I/O handle wouldn't block.
readable_owned(self: Arc<Self>) -> ReadableOwned<T>697 pub fn readable_owned(self: Arc<Self>) -> ReadableOwned<T> {
698 Source::readable_owned(self)
699 }
700
701 /// Waits until the I/O handle is writable.
702 ///
703 /// This method completes when a write operation on this I/O handle wouldn't block.
704 ///
705 /// # Examples
706 ///
707 /// ```
708 /// use async_io::Async;
709 /// use std::net::{TcpStream, ToSocketAddrs};
710 ///
711 /// # futures_lite::future::block_on(async {
712 /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
713 /// let stream = Async::<TcpStream>::connect(addr).await?;
714 ///
715 /// // Wait until the stream is writable.
716 /// stream.writable().await?;
717 /// # std::io::Result::Ok(()) });
718 /// ```
writable(&self) -> Writable<'_, T>719 pub fn writable(&self) -> Writable<'_, T> {
720 Source::writable(self)
721 }
722
723 /// Waits until the I/O handle is writable.
724 ///
725 /// This method completes when a write operation on this I/O handle wouldn't block.
writable_owned(self: Arc<Self>) -> WritableOwned<T>726 pub fn writable_owned(self: Arc<Self>) -> WritableOwned<T> {
727 Source::writable_owned(self)
728 }
729
730 /// Polls the I/O handle for readability.
731 ///
732 /// When this method returns [`Poll::Ready`], that means the OS has delivered an event
733 /// indicating readability since the last time this task has called the method and received
734 /// [`Poll::Pending`].
735 ///
736 /// # Caveats
737 ///
738 /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
739 /// will just keep waking each other in turn, thus wasting CPU time.
740 ///
741 /// Note that the [`AsyncRead`] implementation for [`Async`] also uses this method.
742 ///
743 /// # Examples
744 ///
745 /// ```no_run
746 /// use async_io::Async;
747 /// use futures_lite::future;
748 /// use std::net::TcpListener;
749 ///
750 /// # futures_lite::future::block_on(async {
751 /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
752 ///
753 /// // Wait until a client can be accepted.
754 /// future::poll_fn(|cx| listener.poll_readable(cx)).await?;
755 /// # std::io::Result::Ok(()) });
756 /// ```
poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>757 pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
758 self.source.poll_readable(cx)
759 }
760
761 /// Polls the I/O handle for writability.
762 ///
763 /// When this method returns [`Poll::Ready`], that means the OS has delivered an event
764 /// indicating writability since the last time this task has called the method and received
765 /// [`Poll::Pending`].
766 ///
767 /// # Caveats
768 ///
769 /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
770 /// will just keep waking each other in turn, thus wasting CPU time.
771 ///
772 /// Note that the [`AsyncWrite`] implementation for [`Async`] also uses this method.
773 ///
774 /// # Examples
775 ///
776 /// ```
777 /// use async_io::Async;
778 /// use futures_lite::future;
779 /// use std::net::{TcpStream, ToSocketAddrs};
780 ///
781 /// # futures_lite::future::block_on(async {
782 /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
783 /// let stream = Async::<TcpStream>::connect(addr).await?;
784 ///
785 /// // Wait until the stream is writable.
786 /// future::poll_fn(|cx| stream.poll_writable(cx)).await?;
787 /// # std::io::Result::Ok(()) });
788 /// ```
poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>789 pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
790 self.source.poll_writable(cx)
791 }
792
793 /// Performs a read operation asynchronously.
794 ///
795 /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
796 /// invokes the `op` closure in a loop until it succeeds or returns an error other than
797 /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
798 /// sends a notification that the I/O handle is readable.
799 ///
800 /// The closure receives a shared reference to the I/O handle.
801 ///
802 /// # Examples
803 ///
804 /// ```no_run
805 /// use async_io::Async;
806 /// use std::net::TcpListener;
807 ///
808 /// # futures_lite::future::block_on(async {
809 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
810 ///
811 /// // Accept a new client asynchronously.
812 /// let (stream, addr) = listener.read_with(|l| l.accept()).await?;
813 /// # std::io::Result::Ok(()) });
814 /// ```
read_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R>815 pub async fn read_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
816 let mut op = op;
817 loop {
818 match op(self.get_ref()) {
819 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
820 res => return res,
821 }
822 optimistic(self.readable()).await?;
823 }
824 }
825
826 /// Performs a read operation asynchronously.
827 ///
828 /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
829 /// invokes the `op` closure in a loop until it succeeds or returns an error other than
830 /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
831 /// sends a notification that the I/O handle is readable.
832 ///
833 /// The closure receives a mutable reference to the I/O handle.
834 ///
835 /// # Examples
836 ///
837 /// ```no_run
838 /// use async_io::Async;
839 /// use std::net::TcpListener;
840 ///
841 /// # futures_lite::future::block_on(async {
842 /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
843 ///
844 /// // Accept a new client asynchronously.
845 /// let (stream, addr) = listener.read_with_mut(|l| l.accept()).await?;
846 /// # std::io::Result::Ok(()) });
847 /// ```
read_with_mut<R>( &mut self, op: impl FnMut(&mut T) -> io::Result<R>, ) -> io::Result<R>848 pub async fn read_with_mut<R>(
849 &mut self,
850 op: impl FnMut(&mut T) -> io::Result<R>,
851 ) -> io::Result<R> {
852 let mut op = op;
853 loop {
854 match op(self.get_mut()) {
855 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
856 res => return res,
857 }
858 optimistic(self.readable()).await?;
859 }
860 }
861
862 /// Performs a write operation asynchronously.
863 ///
864 /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
865 /// invokes the `op` closure in a loop until it succeeds or returns an error other than
866 /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
867 /// sends a notification that the I/O handle is writable.
868 ///
869 /// The closure receives a shared reference to the I/O handle.
870 ///
871 /// # Examples
872 ///
873 /// ```no_run
874 /// use async_io::Async;
875 /// use std::net::UdpSocket;
876 ///
877 /// # futures_lite::future::block_on(async {
878 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
879 /// socket.get_ref().connect("127.0.0.1:9000")?;
880 ///
881 /// let msg = b"hello";
882 /// let len = socket.write_with(|s| s.send(msg)).await?;
883 /// # std::io::Result::Ok(()) });
884 /// ```
write_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R>885 pub async fn write_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
886 let mut op = op;
887 loop {
888 match op(self.get_ref()) {
889 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
890 res => return res,
891 }
892 optimistic(self.writable()).await?;
893 }
894 }
895
/// Performs a write operation asynchronously.
///
/// The I/O handle is registered in the reactor and put in non-blocking mode. This method
/// invokes the `op` closure in a loop until it succeeds or returns an error other than
/// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
/// sends a notification that the I/O handle is writable.
///
/// The closure receives a mutable reference to the I/O handle.
///
/// # Errors
///
/// Returns the first error produced by `op` that is not [`io::ErrorKind::WouldBlock`], or
/// any error reported while waiting for the handle to become writable.
///
/// # Examples
///
/// ```no_run
/// use async_io::Async;
/// use std::net::UdpSocket;
///
/// # futures_lite::future::block_on(async {
/// let mut socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
/// socket.get_ref().connect("127.0.0.1:9000")?;
///
/// let msg = b"hello";
/// let len = socket.write_with_mut(|s| s.send(msg)).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub async fn write_with_mut<R>(
    &mut self,
    op: impl FnMut(&mut T) -> io::Result<R>,
) -> io::Result<R> {
    // Rebind mutably so the `FnMut` closure can be invoked on every retry.
    let mut op = op;
    loop {
        // Attempt the operation first. Anything other than `WouldBlock` (a success or a
        // genuine error) is returned to the caller unchanged.
        match op(self.get_mut()) {
            Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
            res => return res,
        }
        // The operation would block: wait on `writable()` through `optimistic`, propagating
        // any error from the wait, and then try the operation again.
        optimistic(self.writable()).await?;
    }
}
932 }
933
934 impl<T> AsRef<T> for Async<T> {
as_ref(&self) -> &T935 fn as_ref(&self) -> &T {
936 self.get_ref()
937 }
938 }
939
940 impl<T> AsMut<T> for Async<T> {
as_mut(&mut self) -> &mut T941 fn as_mut(&mut self) -> &mut T {
942 self.get_mut()
943 }
944 }
945
946 impl<T> Drop for Async<T> {
drop(&mut self)947 fn drop(&mut self) {
948 if self.io.is_some() {
949 // Deregister and ignore errors because destructors should not panic.
950 Reactor::get().remove_io(&self.source).ok();
951
952 // Drop the I/O handle to close it.
953 self.io.take();
954 }
955 }
956 }
957
958 impl<T: Read> AsyncRead for Async<T> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>959 fn poll_read(
960 mut self: Pin<&mut Self>,
961 cx: &mut Context<'_>,
962 buf: &mut [u8],
963 ) -> Poll<io::Result<usize>> {
964 loop {
965 match (&mut *self).get_mut().read(buf) {
966 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
967 res => return Poll::Ready(res),
968 }
969 ready!(self.poll_readable(cx))?;
970 }
971 }
972
poll_read_vectored( mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<io::Result<usize>>973 fn poll_read_vectored(
974 mut self: Pin<&mut Self>,
975 cx: &mut Context<'_>,
976 bufs: &mut [IoSliceMut<'_>],
977 ) -> Poll<io::Result<usize>> {
978 loop {
979 match (&mut *self).get_mut().read_vectored(bufs) {
980 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
981 res => return Poll::Ready(res),
982 }
983 ready!(self.poll_readable(cx))?;
984 }
985 }
986 }
987
988 impl<T> AsyncRead for &Async<T>
989 where
990 for<'a> &'a T: Read,
991 {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>992 fn poll_read(
993 self: Pin<&mut Self>,
994 cx: &mut Context<'_>,
995 buf: &mut [u8],
996 ) -> Poll<io::Result<usize>> {
997 loop {
998 match (&*self).get_ref().read(buf) {
999 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1000 res => return Poll::Ready(res),
1001 }
1002 ready!(self.poll_readable(cx))?;
1003 }
1004 }
1005
poll_read_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<io::Result<usize>>1006 fn poll_read_vectored(
1007 self: Pin<&mut Self>,
1008 cx: &mut Context<'_>,
1009 bufs: &mut [IoSliceMut<'_>],
1010 ) -> Poll<io::Result<usize>> {
1011 loop {
1012 match (&*self).get_ref().read_vectored(bufs) {
1013 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1014 res => return Poll::Ready(res),
1015 }
1016 ready!(self.poll_readable(cx))?;
1017 }
1018 }
1019 }
1020
1021 impl<T: Write> AsyncWrite for Async<T> {
poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1022 fn poll_write(
1023 mut self: Pin<&mut Self>,
1024 cx: &mut Context<'_>,
1025 buf: &[u8],
1026 ) -> Poll<io::Result<usize>> {
1027 loop {
1028 match (&mut *self).get_mut().write(buf) {
1029 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1030 res => return Poll::Ready(res),
1031 }
1032 ready!(self.poll_writable(cx))?;
1033 }
1034 }
1035
poll_write_vectored( mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>1036 fn poll_write_vectored(
1037 mut self: Pin<&mut Self>,
1038 cx: &mut Context<'_>,
1039 bufs: &[IoSlice<'_>],
1040 ) -> Poll<io::Result<usize>> {
1041 loop {
1042 match (&mut *self).get_mut().write_vectored(bufs) {
1043 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1044 res => return Poll::Ready(res),
1045 }
1046 ready!(self.poll_writable(cx))?;
1047 }
1048 }
1049
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>1050 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1051 loop {
1052 match (&mut *self).get_mut().flush() {
1053 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1054 res => return Poll::Ready(res),
1055 }
1056 ready!(self.poll_writable(cx))?;
1057 }
1058 }
1059
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>1060 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1061 self.poll_flush(cx)
1062 }
1063 }
1064
1065 impl<T> AsyncWrite for &Async<T>
1066 where
1067 for<'a> &'a T: Write,
1068 {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1069 fn poll_write(
1070 self: Pin<&mut Self>,
1071 cx: &mut Context<'_>,
1072 buf: &[u8],
1073 ) -> Poll<io::Result<usize>> {
1074 loop {
1075 match (&*self).get_ref().write(buf) {
1076 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1077 res => return Poll::Ready(res),
1078 }
1079 ready!(self.poll_writable(cx))?;
1080 }
1081 }
1082
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>1083 fn poll_write_vectored(
1084 self: Pin<&mut Self>,
1085 cx: &mut Context<'_>,
1086 bufs: &[IoSlice<'_>],
1087 ) -> Poll<io::Result<usize>> {
1088 loop {
1089 match (&*self).get_ref().write_vectored(bufs) {
1090 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1091 res => return Poll::Ready(res),
1092 }
1093 ready!(self.poll_writable(cx))?;
1094 }
1095 }
1096
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>1097 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1098 loop {
1099 match (&*self).get_ref().flush() {
1100 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1101 res => return Poll::Ready(res),
1102 }
1103 ready!(self.poll_writable(cx))?;
1104 }
1105 }
1106
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>1107 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1108 self.poll_flush(cx)
1109 }
1110 }
1111
1112 impl Async<TcpListener> {
1113 /// Creates a TCP listener bound to the specified address.
1114 ///
1115 /// Binding with port number 0 will request an available port from the OS.
1116 ///
1117 /// # Examples
1118 ///
1119 /// ```
1120 /// use async_io::Async;
1121 /// use std::net::TcpListener;
1122 ///
1123 /// # futures_lite::future::block_on(async {
1124 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
1125 /// println!("Listening on {}", listener.get_ref().local_addr()?);
1126 /// # std::io::Result::Ok(()) });
1127 /// ```
bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpListener>>1128 pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpListener>> {
1129 let addr = addr.into();
1130 Async::new(TcpListener::bind(addr)?)
1131 }
1132
1133 /// Accepts a new incoming TCP connection.
1134 ///
1135 /// When a connection is established, it will be returned as a TCP stream together with its
1136 /// remote address.
1137 ///
1138 /// # Examples
1139 ///
1140 /// ```no_run
1141 /// use async_io::Async;
1142 /// use std::net::TcpListener;
1143 ///
1144 /// # futures_lite::future::block_on(async {
1145 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
1146 /// let (stream, addr) = listener.accept().await?;
1147 /// println!("Accepted client: {}", addr);
1148 /// # std::io::Result::Ok(()) });
1149 /// ```
accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)>1150 pub async fn accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)> {
1151 let (stream, addr) = self.read_with(|io| io.accept()).await?;
1152 Ok((Async::new(stream)?, addr))
1153 }
1154
1155 /// Returns a stream of incoming TCP connections.
1156 ///
1157 /// The stream is infinite, i.e. it never stops with a [`None`].
1158 ///
1159 /// # Examples
1160 ///
1161 /// ```no_run
1162 /// use async_io::Async;
1163 /// use futures_lite::{pin, stream::StreamExt};
1164 /// use std::net::TcpListener;
1165 ///
1166 /// # futures_lite::future::block_on(async {
1167 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
1168 /// let incoming = listener.incoming();
1169 /// pin!(incoming);
1170 ///
1171 /// while let Some(stream) = incoming.next().await {
1172 /// let stream = stream?;
1173 /// println!("Accepted client: {}", stream.get_ref().peer_addr()?);
1174 /// }
1175 /// # std::io::Result::Ok(()) });
1176 /// ```
incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Send + '_1177 pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Send + '_ {
1178 stream::unfold(self, |listener| async move {
1179 let res = listener.accept().await.map(|(stream, _)| stream);
1180 Some((res, listener))
1181 })
1182 }
1183 }
1184
1185 impl TryFrom<std::net::TcpListener> for Async<std::net::TcpListener> {
1186 type Error = io::Error;
1187
try_from(listener: std::net::TcpListener) -> io::Result<Self>1188 fn try_from(listener: std::net::TcpListener) -> io::Result<Self> {
1189 Async::new(listener)
1190 }
1191 }
1192
1193 impl Async<TcpStream> {
1194 /// Creates a TCP connection to the specified address.
1195 ///
1196 /// # Examples
1197 ///
1198 /// ```
1199 /// use async_io::Async;
1200 /// use std::net::{TcpStream, ToSocketAddrs};
1201 ///
1202 /// # futures_lite::future::block_on(async {
1203 /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
1204 /// let stream = Async::<TcpStream>::connect(addr).await?;
1205 /// # std::io::Result::Ok(()) });
1206 /// ```
connect<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpStream>>1207 pub async fn connect<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpStream>> {
1208 // Begin async connect.
1209 let addr = addr.into();
1210 let domain = Domain::for_address(addr);
1211 let socket = connect(addr.into(), domain, Some(Protocol::TCP))?;
1212 let stream = Async::new(TcpStream::from(socket))?;
1213
1214 // The stream becomes writable when connected.
1215 stream.writable().await?;
1216
1217 // Check if there was an error while connecting.
1218 match stream.get_ref().take_error()? {
1219 None => Ok(stream),
1220 Some(err) => Err(err),
1221 }
1222 }
1223
1224 /// Reads data from the stream without removing it from the buffer.
1225 ///
1226 /// Returns the number of bytes read. Successive calls of this method read the same data.
1227 ///
1228 /// # Examples
1229 ///
1230 /// ```
1231 /// use async_io::Async;
1232 /// use futures_lite::{io::AsyncWriteExt, stream::StreamExt};
1233 /// use std::net::{TcpStream, ToSocketAddrs};
1234 ///
1235 /// # futures_lite::future::block_on(async {
1236 /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
1237 /// let mut stream = Async::<TcpStream>::connect(addr).await?;
1238 ///
1239 /// stream
1240 /// .write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
1241 /// .await?;
1242 ///
1243 /// let mut buf = [0u8; 1024];
1244 /// let len = stream.peek(&mut buf).await?;
1245 /// # std::io::Result::Ok(()) });
1246 /// ```
peek(&self, buf: &mut [u8]) -> io::Result<usize>1247 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1248 self.read_with(|io| io.peek(buf)).await
1249 }
1250 }
1251
1252 impl TryFrom<std::net::TcpStream> for Async<std::net::TcpStream> {
1253 type Error = io::Error;
1254
try_from(stream: std::net::TcpStream) -> io::Result<Self>1255 fn try_from(stream: std::net::TcpStream) -> io::Result<Self> {
1256 Async::new(stream)
1257 }
1258 }
1259
1260 impl Async<UdpSocket> {
1261 /// Creates a UDP socket bound to the specified address.
1262 ///
1263 /// Binding with port number 0 will request an available port from the OS.
1264 ///
1265 /// # Examples
1266 ///
1267 /// ```
1268 /// use async_io::Async;
1269 /// use std::net::UdpSocket;
1270 ///
1271 /// # futures_lite::future::block_on(async {
1272 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
1273 /// println!("Bound to {}", socket.get_ref().local_addr()?);
1274 /// # std::io::Result::Ok(()) });
1275 /// ```
bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<UdpSocket>>1276 pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<UdpSocket>> {
1277 let addr = addr.into();
1278 Async::new(UdpSocket::bind(addr)?)
1279 }
1280
1281 /// Receives a single datagram message.
1282 ///
1283 /// Returns the number of bytes read and the address the message came from.
1284 ///
1285 /// This method must be called with a valid byte slice of sufficient size to hold the message.
1286 /// If the message is too long to fit, excess bytes may get discarded.
1287 ///
1288 /// # Examples
1289 ///
1290 /// ```no_run
1291 /// use async_io::Async;
1292 /// use std::net::UdpSocket;
1293 ///
1294 /// # futures_lite::future::block_on(async {
1295 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1296 ///
1297 /// let mut buf = [0u8; 1024];
1298 /// let (len, addr) = socket.recv_from(&mut buf).await?;
1299 /// # std::io::Result::Ok(()) });
1300 /// ```
recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>1301 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1302 self.read_with(|io| io.recv_from(buf)).await
1303 }
1304
1305 /// Receives a single datagram message without removing it from the queue.
1306 ///
1307 /// Returns the number of bytes read and the address the message came from.
1308 ///
1309 /// This method must be called with a valid byte slice of sufficient size to hold the message.
1310 /// If the message is too long to fit, excess bytes may get discarded.
1311 ///
1312 /// # Examples
1313 ///
1314 /// ```no_run
1315 /// use async_io::Async;
1316 /// use std::net::UdpSocket;
1317 ///
1318 /// # futures_lite::future::block_on(async {
1319 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1320 ///
1321 /// let mut buf = [0u8; 1024];
1322 /// let (len, addr) = socket.peek_from(&mut buf).await?;
1323 /// # std::io::Result::Ok(()) });
1324 /// ```
peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>1325 pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1326 self.read_with(|io| io.peek_from(buf)).await
1327 }
1328
1329 /// Sends data to the specified address.
1330 ///
1331 /// Returns the number of bytes writen.
1332 ///
1333 /// # Examples
1334 ///
1335 /// ```no_run
1336 /// use async_io::Async;
1337 /// use std::net::UdpSocket;
1338 ///
1339 /// # futures_lite::future::block_on(async {
1340 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
1341 /// let addr = socket.get_ref().local_addr()?;
1342 ///
1343 /// let msg = b"hello";
1344 /// let len = socket.send_to(msg, addr).await?;
1345 /// # std::io::Result::Ok(()) });
1346 /// ```
send_to<A: Into<SocketAddr>>(&self, buf: &[u8], addr: A) -> io::Result<usize>1347 pub async fn send_to<A: Into<SocketAddr>>(&self, buf: &[u8], addr: A) -> io::Result<usize> {
1348 let addr = addr.into();
1349 self.write_with(|io| io.send_to(buf, addr)).await
1350 }
1351
1352 /// Receives a single datagram message from the connected peer.
1353 ///
1354 /// Returns the number of bytes read.
1355 ///
1356 /// This method must be called with a valid byte slice of sufficient size to hold the message.
1357 /// If the message is too long to fit, excess bytes may get discarded.
1358 ///
1359 /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1360 /// This method will fail if the socket is not connected.
1361 ///
1362 /// # Examples
1363 ///
1364 /// ```no_run
1365 /// use async_io::Async;
1366 /// use std::net::UdpSocket;
1367 ///
1368 /// # futures_lite::future::block_on(async {
1369 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1370 /// socket.get_ref().connect("127.0.0.1:9000")?;
1371 ///
1372 /// let mut buf = [0u8; 1024];
1373 /// let len = socket.recv(&mut buf).await?;
1374 /// # std::io::Result::Ok(()) });
1375 /// ```
recv(&self, buf: &mut [u8]) -> io::Result<usize>1376 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
1377 self.read_with(|io| io.recv(buf)).await
1378 }
1379
1380 /// Receives a single datagram message from the connected peer without removing it from the
1381 /// queue.
1382 ///
1383 /// Returns the number of bytes read and the address the message came from.
1384 ///
1385 /// This method must be called with a valid byte slice of sufficient size to hold the message.
1386 /// If the message is too long to fit, excess bytes may get discarded.
1387 ///
1388 /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1389 /// This method will fail if the socket is not connected.
1390 ///
1391 /// # Examples
1392 ///
1393 /// ```no_run
1394 /// use async_io::Async;
1395 /// use std::net::UdpSocket;
1396 ///
1397 /// # futures_lite::future::block_on(async {
1398 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1399 /// socket.get_ref().connect("127.0.0.1:9000")?;
1400 ///
1401 /// let mut buf = [0u8; 1024];
1402 /// let len = socket.peek(&mut buf).await?;
1403 /// # std::io::Result::Ok(()) });
1404 /// ```
peek(&self, buf: &mut [u8]) -> io::Result<usize>1405 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1406 self.read_with(|io| io.peek(buf)).await
1407 }
1408
1409 /// Sends data to the connected peer.
1410 ///
1411 /// Returns the number of bytes written.
1412 ///
1413 /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1414 /// This method will fail if the socket is not connected.
1415 ///
1416 /// # Examples
1417 ///
1418 /// ```no_run
1419 /// use async_io::Async;
1420 /// use std::net::UdpSocket;
1421 ///
1422 /// # futures_lite::future::block_on(async {
1423 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1424 /// socket.get_ref().connect("127.0.0.1:9000")?;
1425 ///
1426 /// let msg = b"hello";
1427 /// let len = socket.send(msg).await?;
1428 /// # std::io::Result::Ok(()) });
1429 /// ```
send(&self, buf: &[u8]) -> io::Result<usize>1430 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
1431 self.write_with(|io| io.send(buf)).await
1432 }
1433 }
1434
1435 impl TryFrom<std::net::UdpSocket> for Async<std::net::UdpSocket> {
1436 type Error = io::Error;
1437
try_from(socket: std::net::UdpSocket) -> io::Result<Self>1438 fn try_from(socket: std::net::UdpSocket) -> io::Result<Self> {
1439 Async::new(socket)
1440 }
1441 }
1442
1443 #[cfg(unix)]
1444 impl Async<UnixListener> {
1445 /// Creates a UDS listener bound to the specified path.
1446 ///
1447 /// # Examples
1448 ///
1449 /// ```no_run
1450 /// use async_io::Async;
1451 /// use std::os::unix::net::UnixListener;
1452 ///
1453 /// # futures_lite::future::block_on(async {
1454 /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1455 /// println!("Listening on {:?}", listener.get_ref().local_addr()?);
1456 /// # std::io::Result::Ok(()) });
1457 /// ```
bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixListener>>1458 pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixListener>> {
1459 let path = path.as_ref().to_owned();
1460 Async::new(UnixListener::bind(path)?)
1461 }
1462
1463 /// Accepts a new incoming UDS stream connection.
1464 ///
1465 /// When a connection is established, it will be returned as a stream together with its remote
1466 /// address.
1467 ///
1468 /// # Examples
1469 ///
1470 /// ```no_run
1471 /// use async_io::Async;
1472 /// use std::os::unix::net::UnixListener;
1473 ///
1474 /// # futures_lite::future::block_on(async {
1475 /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1476 /// let (stream, addr) = listener.accept().await?;
1477 /// println!("Accepted client: {:?}", addr);
1478 /// # std::io::Result::Ok(()) });
1479 /// ```
accept(&self) -> io::Result<(Async<UnixStream>, UnixSocketAddr)>1480 pub async fn accept(&self) -> io::Result<(Async<UnixStream>, UnixSocketAddr)> {
1481 let (stream, addr) = self.read_with(|io| io.accept()).await?;
1482 Ok((Async::new(stream)?, addr))
1483 }
1484
1485 /// Returns a stream of incoming UDS connections.
1486 ///
1487 /// The stream is infinite, i.e. it never stops with a [`None`] item.
1488 ///
1489 /// # Examples
1490 ///
1491 /// ```no_run
1492 /// use async_io::Async;
1493 /// use futures_lite::{pin, stream::StreamExt};
1494 /// use std::os::unix::net::UnixListener;
1495 ///
1496 /// # futures_lite::future::block_on(async {
1497 /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1498 /// let incoming = listener.incoming();
1499 /// pin!(incoming);
1500 ///
1501 /// while let Some(stream) = incoming.next().await {
1502 /// let stream = stream?;
1503 /// println!("Accepted client: {:?}", stream.get_ref().peer_addr()?);
1504 /// }
1505 /// # std::io::Result::Ok(()) });
1506 /// ```
incoming(&self) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Send + '_1507 pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Send + '_ {
1508 stream::unfold(self, |listener| async move {
1509 let res = listener.accept().await.map(|(stream, _)| stream);
1510 Some((res, listener))
1511 })
1512 }
1513 }
1514
1515 #[cfg(unix)]
1516 impl TryFrom<std::os::unix::net::UnixListener> for Async<std::os::unix::net::UnixListener> {
1517 type Error = io::Error;
1518
try_from(listener: std::os::unix::net::UnixListener) -> io::Result<Self>1519 fn try_from(listener: std::os::unix::net::UnixListener) -> io::Result<Self> {
1520 Async::new(listener)
1521 }
1522 }
1523
1524 #[cfg(unix)]
1525 impl Async<UnixStream> {
1526 /// Creates a UDS stream connected to the specified path.
1527 ///
1528 /// # Examples
1529 ///
1530 /// ```no_run
1531 /// use async_io::Async;
1532 /// use std::os::unix::net::UnixStream;
1533 ///
1534 /// # futures_lite::future::block_on(async {
1535 /// let stream = Async::<UnixStream>::connect("/tmp/socket").await?;
1536 /// # std::io::Result::Ok(()) });
1537 /// ```
connect<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixStream>>1538 pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixStream>> {
1539 // Begin async connect.
1540 let socket = connect(SockAddr::unix(path)?, Domain::UNIX, None)?;
1541 let stream = Async::new(UnixStream::from(socket))?;
1542
1543 // The stream becomes writable when connected.
1544 stream.writable().await?;
1545
1546 // On Linux, it appears the socket may become writable even when connecting fails, so we
1547 // must do an extra check here and see if the peer address is retrievable.
1548 stream.get_ref().peer_addr()?;
1549 Ok(stream)
1550 }
1551
1552 /// Creates an unnamed pair of connected UDS stream sockets.
1553 ///
1554 /// # Examples
1555 ///
1556 /// ```no_run
1557 /// use async_io::Async;
1558 /// use std::os::unix::net::UnixStream;
1559 ///
1560 /// # futures_lite::future::block_on(async {
1561 /// let (stream1, stream2) = Async::<UnixStream>::pair()?;
1562 /// # std::io::Result::Ok(()) });
1563 /// ```
pair() -> io::Result<(Async<UnixStream>, Async<UnixStream>)>1564 pub fn pair() -> io::Result<(Async<UnixStream>, Async<UnixStream>)> {
1565 let (stream1, stream2) = UnixStream::pair()?;
1566 Ok((Async::new(stream1)?, Async::new(stream2)?))
1567 }
1568 }
1569
1570 #[cfg(unix)]
1571 impl TryFrom<std::os::unix::net::UnixStream> for Async<std::os::unix::net::UnixStream> {
1572 type Error = io::Error;
1573
try_from(stream: std::os::unix::net::UnixStream) -> io::Result<Self>1574 fn try_from(stream: std::os::unix::net::UnixStream) -> io::Result<Self> {
1575 Async::new(stream)
1576 }
1577 }
1578
1579 #[cfg(unix)]
1580 impl Async<UnixDatagram> {
1581 /// Creates a UDS datagram socket bound to the specified path.
1582 ///
1583 /// # Examples
1584 ///
1585 /// ```no_run
1586 /// use async_io::Async;
1587 /// use std::os::unix::net::UnixDatagram;
1588 ///
1589 /// # futures_lite::future::block_on(async {
1590 /// let socket = Async::<UnixDatagram>::bind("/tmp/socket")?;
1591 /// # std::io::Result::Ok(()) });
1592 /// ```
bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixDatagram>>1593 pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixDatagram>> {
1594 let path = path.as_ref().to_owned();
1595 Async::new(UnixDatagram::bind(path)?)
1596 }
1597
1598 /// Creates a UDS datagram socket not bound to any address.
1599 ///
1600 /// # Examples
1601 ///
1602 /// ```no_run
1603 /// use async_io::Async;
1604 /// use std::os::unix::net::UnixDatagram;
1605 ///
1606 /// # futures_lite::future::block_on(async {
1607 /// let socket = Async::<UnixDatagram>::unbound()?;
1608 /// # std::io::Result::Ok(()) });
1609 /// ```
unbound() -> io::Result<Async<UnixDatagram>>1610 pub fn unbound() -> io::Result<Async<UnixDatagram>> {
1611 Async::new(UnixDatagram::unbound()?)
1612 }
1613
1614 /// Creates an unnamed pair of connected Unix datagram sockets.
1615 ///
1616 /// # Examples
1617 ///
1618 /// ```no_run
1619 /// use async_io::Async;
1620 /// use std::os::unix::net::UnixDatagram;
1621 ///
1622 /// # futures_lite::future::block_on(async {
1623 /// let (socket1, socket2) = Async::<UnixDatagram>::pair()?;
1624 /// # std::io::Result::Ok(()) });
1625 /// ```
pair() -> io::Result<(Async<UnixDatagram>, Async<UnixDatagram>)>1626 pub fn pair() -> io::Result<(Async<UnixDatagram>, Async<UnixDatagram>)> {
1627 let (socket1, socket2) = UnixDatagram::pair()?;
1628 Ok((Async::new(socket1)?, Async::new(socket2)?))
1629 }
1630
1631 /// Receives data from the socket.
1632 ///
1633 /// Returns the number of bytes read and the address the message came from.
1634 ///
1635 /// # Examples
1636 ///
1637 /// ```no_run
1638 /// use async_io::Async;
1639 /// use std::os::unix::net::UnixDatagram;
1640 ///
1641 /// # futures_lite::future::block_on(async {
1642 /// let socket = Async::<UnixDatagram>::bind("/tmp/socket")?;
1643 ///
1644 /// let mut buf = [0u8; 1024];
1645 /// let (len, addr) = socket.recv_from(&mut buf).await?;
1646 /// # std::io::Result::Ok(()) });
1647 /// ```
recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)>1648 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> {
1649 self.read_with(|io| io.recv_from(buf)).await
1650 }
1651
1652 /// Sends data to the specified address.
1653 ///
1654 /// Returns the number of bytes written.
1655 ///
1656 /// # Examples
1657 ///
1658 /// ```no_run
1659 /// use async_io::Async;
1660 /// use std::os::unix::net::UnixDatagram;
1661 ///
1662 /// # futures_lite::future::block_on(async {
1663 /// let socket = Async::<UnixDatagram>::unbound()?;
1664 ///
1665 /// let msg = b"hello";
1666 /// let addr = "/tmp/socket";
1667 /// let len = socket.send_to(msg, addr).await?;
1668 /// # std::io::Result::Ok(()) });
1669 /// ```
send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize>1670 pub async fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> {
1671 self.write_with(|io| io.send_to(buf, &path)).await
1672 }
1673
1674 /// Receives data from the connected peer.
1675 ///
1676 /// Returns the number of bytes read and the address the message came from.
1677 ///
1678 /// The [`connect`][`UnixDatagram::connect()`] method connects this socket to a remote address.
1679 /// This method will fail if the socket is not connected.
1680 ///
1681 /// # Examples
1682 ///
1683 /// ```no_run
1684 /// use async_io::Async;
1685 /// use std::os::unix::net::UnixDatagram;
1686 ///
1687 /// # futures_lite::future::block_on(async {
1688 /// let socket = Async::<UnixDatagram>::bind("/tmp/socket1")?;
1689 /// socket.get_ref().connect("/tmp/socket2")?;
1690 ///
1691 /// let mut buf = [0u8; 1024];
1692 /// let len = socket.recv(&mut buf).await?;
1693 /// # std::io::Result::Ok(()) });
1694 /// ```
recv(&self, buf: &mut [u8]) -> io::Result<usize>1695 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
1696 self.read_with(|io| io.recv(buf)).await
1697 }
1698
1699 /// Sends data to the connected peer.
1700 ///
1701 /// Returns the number of bytes written.
1702 ///
1703 /// The [`connect`][`UnixDatagram::connect()`] method connects this socket to a remote address.
1704 /// This method will fail if the socket is not connected.
1705 ///
1706 /// # Examples
1707 ///
1708 /// ```no_run
1709 /// use async_io::Async;
1710 /// use std::os::unix::net::UnixDatagram;
1711 ///
1712 /// # futures_lite::future::block_on(async {
1713 /// let socket = Async::<UnixDatagram>::bind("/tmp/socket1")?;
1714 /// socket.get_ref().connect("/tmp/socket2")?;
1715 ///
1716 /// let msg = b"hello";
1717 /// let len = socket.send(msg).await?;
1718 /// # std::io::Result::Ok(()) });
1719 /// ```
send(&self, buf: &[u8]) -> io::Result<usize>1720 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
1721 self.write_with(|io| io.send(buf)).await
1722 }
1723 }
1724
1725 #[cfg(unix)]
1726 impl TryFrom<std::os::unix::net::UnixDatagram> for Async<std::os::unix::net::UnixDatagram> {
1727 type Error = io::Error;
1728
try_from(socket: std::os::unix::net::UnixDatagram) -> io::Result<Self>1729 fn try_from(socket: std::os::unix::net::UnixDatagram) -> io::Result<Self> {
1730 Async::new(socket)
1731 }
1732 }
1733
1734 /// Polls a future once, waits for a wakeup, and then optimistically assumes the future is ready.
optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()>1735 async fn optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()> {
1736 let mut polled = false;
1737 pin!(fut);
1738
1739 future::poll_fn(|cx| {
1740 if !polled {
1741 polled = true;
1742 fut.as_mut().poll(cx)
1743 } else {
1744 Poll::Ready(Ok(()))
1745 }
1746 })
1747 .await
1748 }
1749
/// Creates a stream socket for `domain` and `protocol` in non-blocking mode and starts
/// connecting it to `addr`.
///
/// A connection attempt that has not completed yet (`EINPROGRESS` on Unix, or
/// [`io::ErrorKind::WouldBlock`]) is not treated as a failure: the socket is still returned.
/// Any other error from creating, configuring, or connecting the socket is returned to the
/// caller.
fn connect(addr: SockAddr, domain: Domain, protocol: Option<Protocol>) -> io::Result<Socket> {
    let sock_type = Type::STREAM;
    #[cfg(any(
        target_os = "android",
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "fuchsia",
        target_os = "illumos",
        target_os = "linux",
        target_os = "netbsd",
        target_os = "openbsd"
    ))]
    // If we can, set nonblocking at socket creation for unix
    let sock_type = sock_type.nonblocking();
    // This automatically handles cloexec on unix, no_inherit on windows and nosigpipe on macos
    let socket = Socket::new(domain, sock_type, protocol)?;
    #[cfg(not(any(
        target_os = "android",
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "fuchsia",
        target_os = "illumos",
        target_os = "linux",
        target_os = "netbsd",
        target_os = "openbsd"
    )))]
    // If the current platform doesn't support nonblocking at creation, enable it after creation
    socket.set_nonblocking(true)?;
    // Start connecting. An attempt that is merely still in progress counts as success here;
    // only other errors abort.
    match socket.connect(&addr) {
        Ok(_) => {}
        #[cfg(unix)]
        Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {}
        Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
        Err(err) => return Err(err),
    }
    Ok(socket)
}
1787