1 //! A thread pool for isolating blocking I/O in async programs.
2 //!
3 //! Sometimes there's no way to avoid blocking I/O. Consider files or stdin, which have weak async
4 //! support on modern operating systems. While [IOCP], [AIO], and [io_uring] are possible
5 //! solutions, they're not always available or ideal.
6 //!
7 //! Since blocking is not allowed inside futures, we must move blocking I/O onto a special thread
8 //! pool provided by this crate. The pool dynamically spawns and stops threads depending on the
9 //! current number of running I/O jobs.
10 //!
11 //! Note that there is a limit on the number of active threads. Once that limit is hit, a running
12 //! job has to finish before others get a chance to run. When a thread is idle, it waits for the
13 //! next job or shuts down after a certain timeout.
14 //!
15 //! [IOCP]: https://en.wikipedia.org/wiki/Input/output_completion_port
16 //! [AIO]: http://man7.org/linux/man-pages/man2/io_submit.2.html
17 //! [io_uring]: https://lwn.net/Articles/776703
18 //!
19 //! # Examples
20 //!
21 //! Read the contents of a file:
22 //!
23 //! ```no_run
24 //! use blocking::unblock;
25 //! use futures_lite::*;
26 //! use std::fs;
27 //!
28 //! # future::block_on(async {
29 //! let contents = unblock(|| fs::read_to_string("file.txt")).await?;
30 //! println!("{}", contents);
31 //! # io::Result::Ok(()) });
32 //! ```
33 //!
34 //! Read a file and pipe its contents to stdout:
35 //!
36 //! ```no_run
37 //! use blocking::{unblock, Unblock};
38 //! use futures_lite::*;
39 //! use std::fs::File;
40 //!
41 //! # future::block_on(async {
42 //! let input = unblock(|| File::open("file.txt")).await?;
43 //! let input = Unblock::new(input);
44 //! let mut output = Unblock::new(std::io::stdout());
45 //!
46 //! io::copy(input, &mut output).await?;
47 //! # io::Result::Ok(()) });
48 //! ```
49 //!
50 //! Iterate over the contents of a directory:
51 //!
52 //! ```no_run
53 //! use blocking::Unblock;
54 //! use futures_lite::*;
55 //! use std::fs;
56 //!
57 //! # future::block_on(async {
58 //! let mut dir = Unblock::new(fs::read_dir(".")?);
59 //! while let Some(item) = dir.next().await {
60 //!     println!("{}", item?.file_name().to_string_lossy());
61 //! }
62 //! # io::Result::Ok(()) });
63 //! ```
64 //!
65 //! Spawn a process:
66 //!
67 //! ```no_run
68 //! use blocking::unblock;
69 //! use std::process::Command;
70 //!
71 //! # futures_lite::future::block_on(async {
72 //! let out = unblock(|| Command::new("dir").output()).await?;
73 //! # std::io::Result::Ok(()) });
74 //! ```
75 
76 #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
77 
78 use std::any::Any;
79 use std::collections::VecDeque;
80 use std::fmt;
81 use std::io::{self, Read, Seek, SeekFrom, Write};
82 use std::mem;
83 use std::panic;
84 use std::pin::Pin;
85 use std::slice;
86 use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
87 use std::sync::{Arc, Condvar, Mutex, MutexGuard};
88 use std::task::{Context, Poll};
89 use std::thread;
90 use std::time::Duration;
91 
92 use async_channel::{bounded, Receiver};
93 use atomic_waker::AtomicWaker;
94 use futures_lite::*;
95 use once_cell::sync::Lazy;
96 use waker_fn::waker_fn;
97 
/// Retrieves the output of a spawned future.
///
/// This is the receiving half of the single-slot channel created in `Executor::spawn`. It yields
/// the future's output once; if the spawned future is dropped before sending (for example after
/// a panic was caught by the executor), the channel closes without a value.
type Task<T> = Receiver<T>;
100 
/// A spawned future and its current state.
///
/// How this works was explained in a [blog post].
///
/// [blog post]: https://stjepang.github.io/2020/01/31/build-your-own-executor.html
struct Runnable {
    /// Current state of the task.
    ///
    /// A small set of bit flags ("woken" and "running") interpreted by `Runnable::run`; `0` means
    /// the task is neither running nor scheduled.
    state: AtomicUsize,

    /// The inner future.
    ///
    /// Wrapped in a mutex so the `Runnable` can be shared through an `Arc`; `Runnable::run` locks
    /// it with `try_lock` for the duration of a single poll.
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
113 
114 impl Runnable {
115     /// Runs the task.
run(self: Arc<Runnable>)116     fn run(self: Arc<Runnable>) {
117         // Set if the task has been woken.
118         const WOKEN: usize = 0b01;
119         // Set if the task is currently running.
120         const RUNNING: usize = 0b10;
121 
122         // The state is now "not woken" and "running".
123         self.state.store(RUNNING, Ordering::SeqCst);
124 
125         // Poll the future.
126         let this = self.clone();
127         let waker = waker_fn(move || {
128             if this.state.fetch_or(WOKEN, Ordering::SeqCst) == 0 {
129                 EXECUTOR.schedule(this.clone());
130             }
131         });
132         let cx = &mut Context::from_waker(&waker);
133         let poll = self.future.try_lock().unwrap().as_mut().poll(cx);
134 
135         // If the future hasn't completed and was woken while running, then reschedule it.
136         if poll.is_pending() {
137             if self.state.fetch_and(!RUNNING, Ordering::SeqCst) == WOKEN | RUNNING {
138                 EXECUTOR.schedule(self);
139             }
140         }
141     }
142 }
143 
144 /// Lazily initialized global executor.
145 static EXECUTOR: Lazy<Executor> = Lazy::new(|| Executor {
146     inner: Mutex::new(Inner {
147         idle_count: 0,
148         thread_count: 0,
149         queue: VecDeque::new(),
150     }),
151     cvar: Condvar::new(),
152 });
153 
/// The blocking executor.
///
/// A single global instance lives in `EXECUTOR`. Its worker threads run `Executor::main_loop`.
struct Executor {
    /// Inner state of the executor.
    ///
    /// The lock is not held while a task is being run (see `main_loop`).
    inner: Mutex<Inner>,

    /// Used to put idle threads to sleep and wake them up when new work comes in.
    cvar: Condvar,
}
162 
/// Inner state of the blocking executor.
struct Inner {
    /// Number of idle threads in the pool.
    ///
    /// Idle threads are sleeping, waiting to get a task to run. A newly spawned thread is counted
    /// as idle from the moment `grow_pool` spawns it.
    idle_count: usize,

    /// Total number of threads in the pool.
    ///
    /// This is the number of idle threads + the number of active threads. `grow_pool` caps it
    /// at 500.
    thread_count: usize,

    /// The queue of blocking tasks.
    ///
    /// Tasks are pushed to the back by `schedule` and popped from the front by worker threads.
    queue: VecDeque<Arc<Runnable>>,
}
178 
179 impl Executor {
180     /// Spawns a future onto this executor.
181     ///
182     /// Returns a [`Task`] handle for the spawned task.
spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T>183     fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
184         // Wrap the future into one that sends the output into a channel.
185         let (s, r) = bounded(1);
186         let future = async move {
187             let _ = s.send(future.await).await;
188         };
189 
190         // Create a task and schedule it for execution.
191         let runnable = Arc::new(Runnable {
192             state: AtomicUsize::new(0),
193             future: Mutex::new(Box::pin(future)),
194         });
195         EXECUTOR.schedule(runnable);
196 
197         r
198     }
199 
200     /// Runs the main loop on the current thread.
201     ///
202     /// This function runs blocking tasks until it becomes idle and times out.
main_loop(&'static self)203     fn main_loop(&'static self) {
204         let mut inner = self.inner.lock().unwrap();
205         loop {
206             // This thread is not idle anymore because it's going to run tasks.
207             inner.idle_count -= 1;
208 
209             // Run tasks in the queue.
210             while let Some(runnable) = inner.queue.pop_front() {
211                 // We have found a task - grow the pool if needed.
212                 self.grow_pool(inner);
213 
214                 // Run the task.
215                 let _ = panic::catch_unwind(|| runnable.run());
216 
217                 // Re-lock the inner state and continue.
218                 inner = self.inner.lock().unwrap();
219             }
220 
221             // This thread is now becoming idle.
222             inner.idle_count += 1;
223 
224             // Put the thread to sleep until another task is scheduled.
225             let timeout = Duration::from_millis(500);
226             let (lock, res) = self.cvar.wait_timeout(inner, timeout).unwrap();
227             inner = lock;
228 
229             // If there are no tasks after a while, stop this thread.
230             if res.timed_out() && inner.queue.is_empty() {
231                 inner.idle_count -= 1;
232                 inner.thread_count -= 1;
233                 break;
234             }
235         }
236     }
237 
238     /// Schedules a runnable task for execution.
schedule(&'static self, runnable: Arc<Runnable>)239     fn schedule(&'static self, runnable: Arc<Runnable>) {
240         let mut inner = self.inner.lock().unwrap();
241         inner.queue.push_back(runnable);
242 
243         // Notify a sleeping thread and spawn more threads if needed.
244         self.cvar.notify_one();
245         self.grow_pool(inner);
246     }
247 
248     /// Spawns more blocking threads if the pool is overloaded with work.
grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>)249     fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) {
250         // If runnable tasks greatly outnumber idle threads and there aren't too many threads
251         // already, then be aggressive: wake all idle threads and spawn one more thread.
252         while inner.queue.len() > inner.idle_count * 5 && inner.thread_count < 500 {
253             // The new thread starts in idle state.
254             inner.idle_count += 1;
255             inner.thread_count += 1;
256 
257             // Notify all existing idle threads because we need to hurry up.
258             self.cvar.notify_all();
259 
260             // Generate a new thread ID.
261             static ID: AtomicUsize = AtomicUsize::new(1);
262             let id = ID.fetch_add(1, Ordering::Relaxed);
263 
264             // Spawn the new thread.
265             thread::Builder::new()
266                 .name(format!("blocking-{}", id))
267                 .spawn(move || self.main_loop())
268                 .unwrap();
269         }
270     }
271 }
272 
273 /// Runs blocking code on a thread pool.
274 ///
275 /// # Examples
276 ///
277 /// Read the contents of a file:
278 ///
279 /// ```no_run
280 /// use blocking::unblock;
281 /// use std::fs;
282 ///
283 /// # futures_lite::future::block_on(async {
284 /// let contents = unblock(|| fs::read_to_string("file.txt")).await?;
285 /// # std::io::Result::Ok(()) });
286 /// ```
287 ///
288 /// Spawn a process:
289 ///
290 /// ```no_run
291 /// use blocking::unblock;
292 /// use std::process::Command;
293 ///
294 /// # futures_lite::future::block_on(async {
295 /// let out = unblock(|| Command::new("dir").output()).await?;
296 /// # std::io::Result::Ok(()) });
297 /// ```
unblock<T, F>(f: F) -> T where F: FnOnce() -> T + Send + 'static, T: Send + 'static,298 pub async fn unblock<T, F>(f: F) -> T
299 where
300     F: FnOnce() -> T + Send + 'static,
301     T: Send + 'static,
302 {
303     Executor::spawn(async move { f() })
304         .recv()
305         .await
306         .expect("`unblock()` operation has panicked")
307 }
308 
/// Runs blocking code on a thread pool.
///
/// # Desugaring
///
/// Note that `unblock!(expr)` is syntax sugar for:
///
/// ```ignore
/// unblock(move || expr).await
/// ```
///
/// Because the expansion ends in `.await`, the macro can only be used inside an `async` context.
///
/// # Examples
///
/// Read the contents of a file:
///
/// ```no_run
/// use blocking::unblock;
/// use std::fs;
///
/// # futures_lite::future::block_on(async {
/// let contents = unblock!(fs::read_to_string("file.txt"))?;
/// # std::io::Result::Ok(()) });
/// ```
///
/// Spawn a process:
///
/// ```no_run
/// use blocking::unblock;
/// use std::process::Command;
///
/// # futures_lite::future::block_on(async {
/// let out = unblock!(Command::new("dir").output())?;
/// # std::io::Result::Ok(()) });
/// ```
#[macro_export]
macro_rules! unblock {
    ($($body:tt)*) => (
        $crate::unblock(move || { $($body)* }).await
    );
}
348 
/// Runs blocking I/O on a thread pool.
///
/// Blocking I/O must be isolated from async code. This type moves blocking I/O operations onto a
/// special thread pool while exposing a familiar async interface.
///
/// This type implements traits [`Stream`], [`AsyncRead`], [`AsyncWrite`], or [`AsyncSeek`] if the
/// inner type implements [`Iterator`], [`Read`], [`Write`], or [`Seek`], respectively.
///
/// # Notes
///
/// If writing data through the [`AsyncWrite`] trait, make sure to flush before dropping the
/// [`Unblock`] handle or some buffered data might get lost.
///
/// [`Unblock`] communicates with I/O operations on the thread pool through a pipe. That means an
/// async read/write operation simply receives/sends some bytes from/into the pipe. On the other
/// side of the pipe, the inner I/O handle reads bytes in advance until the pipe is full, and it
/// writes all bytes received through the pipe.
///
/// This kind of buffering has some interesting consequences. If [`Unblock`] wraps a
/// [`File`][`std::fs::File`], note that a single read operation may advance the file cursor
/// further than the number of bytes it returns! That's because reading happens in the background
/// until the pipe gets full - blocking reads do not follow async reads byte-for-byte.
///
/// Use [`Unblock::with_capacity()`] to configure the capacity of the pipe.
///
/// # Examples
///
/// ```
/// use blocking::Unblock;
/// use futures_lite::*;
///
/// # futures_lite::future::block_on(async {
/// let mut stdout = Unblock::new(std::io::stdout());
/// stdout.write_all(b"Hello world!").await?;
/// stdout.flush().await?;
/// # std::io::Result::Ok(()) });
/// ```
pub struct Unblock<T> {
    /// Where the inner I/O handle currently is: held here while idle, or moved into a running
    /// blocking task.
    state: State<T>,
    /// Custom capacity for the pipe/channel, or `None` to use the built-in defaults.
    cap: Option<usize>,
}
390 
impl<T> Unblock<T> {
    /// Wraps a blocking I/O handle into the async [`Unblock`] interface.
    ///
    /// The default buffer capacities apply; see [`Unblock::with_capacity()`] for their values.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use blocking::Unblock;
    ///
    /// let stdin = Unblock::new(std::io::stdin());
    /// ```
    pub fn new(io: T) -> Unblock<T> {
        Unblock {
            state: State::Idle(Some(Box::new(io))),
            cap: None,
        }
    }

    /// Wraps a blocking I/O handle into the async [`Unblock`] interface with a custom buffer
    /// capacity.
    ///
    /// When communicating with the inner [`Iterator`]/[`Read`]/[`Write`] type from async code,
    /// data transferred between blocking and async code goes through a buffer of limited
    /// capacity. This constructor configures that capacity.
    ///
    /// The default capacity is:
    ///
    /// * For [`Iterator`] types: 8192 items.
    /// * For [`Read`]/[`Write`] types: 8 MiB (`8 * 1024 * 1024` bytes).
    pub fn with_capacity(cap: usize, io: T) -> Unblock<T> {
        Unblock {
            state: State::Idle(Some(Box::new(io))),
            cap: Some(cap),
        }
    }

    /// Gets a mutable reference to the blocking I/O handle.
    ///
    /// This is an async method because the I/O handle might be on the thread pool and needs to
    /// be moved onto the current thread before we can get a reference to it. Any I/O error
    /// reported by the task that was running is discarded.
    ///
    /// # Panics
    ///
    /// Panics if the inner value has already been taken out.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use blocking::{unblock, Unblock};
    /// use std::fs::File;
    ///
    /// # futures_lite::future::block_on(async {
    /// let file = unblock(|| File::create("file.txt")).await?;
    /// let mut file = Unblock::new(file);
    ///
    /// let metadata = file.get_mut().await.metadata()?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub async fn get_mut(&mut self) -> &mut T {
        // Wait for the running task to stop and ignore I/O errors if there are any.
        let _ = future::poll_fn(|cx| self.poll_stop(cx)).await;

        // Assume idle state and get a reference to the inner value.
        match &mut self.state {
            State::Idle(t) => t.as_mut().expect("inner value was taken out"),
            State::WithMut(..)
            | State::Streaming(..)
            | State::Reading(..)
            | State::Writing(..)
            | State::Seeking(..) => {
                unreachable!("when stopped, the state machine must be in idle state");
            }
        }
    }

    /// Performs a blocking operation on the I/O handle.
    ///
    /// The handle is moved onto the thread pool together with `op`. While `op` runs, the state
    /// machine is in the `WithMut` state; the handle is retrieved by the next operation that stops
    /// the running task.
    ///
    /// # Panics
    ///
    /// Panics if `op` panics, or if the inner value has already been taken out.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use blocking::{unblock, Unblock};
    /// use std::fs::File;
    ///
    /// # futures_lite::future::block_on(async {
    /// let file = unblock(|| File::create("file.txt")).await?;
    /// let mut file = Unblock::new(file);
    ///
    /// let metadata = file.with_mut(|f| f.metadata()).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub async fn with_mut<R, F>(&mut self, op: F) -> R
    where
        F: FnOnce(&mut T) -> R + Send + 'static,
        R: Send + 'static,
        T: Send + 'static,
    {
        // Wait for the running task to stop and ignore I/O errors if there are any.
        let _ = future::poll_fn(|cx| self.poll_stop(cx)).await;

        // Assume idle state and take out the inner value.
        let mut t = match &mut self.state {
            State::Idle(t) => t.take().expect("inner value was taken out"),
            State::WithMut(..)
            | State::Streaming(..)
            | State::Reading(..)
            | State::Writing(..)
            | State::Seeking(..) => {
                unreachable!("when stopped, the state machine must be in idle state");
            }
        };

        // The result of `op` travels through its own channel; the task itself returns the
        // handle so that `poll_stop` can restore the idle state later.
        let (sender, receiver) = bounded(1);
        let task = Executor::spawn(async move {
            let _ = sender.try_send(op(&mut t));
            t
        });
        self.state = State::WithMut(task);

        receiver
            .recv()
            .await
            .expect("`Unblock::with_mut()` operation has panicked")
    }

    /// Extracts the inner blocking I/O handle.
    ///
    /// This is an async method because the I/O handle might be on the thread pool and needs to
    /// be moved onto the current thread before we can extract it. Any I/O error reported by the
    /// task that was running is discarded.
    ///
    /// # Panics
    ///
    /// Panics if the inner value has already been taken out.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use blocking::{unblock, Unblock};
    /// use futures_lite::*;
    /// use std::fs::File;
    ///
    /// # futures_lite::future::block_on(async {
    /// let file = unblock(|| File::create("file.txt")).await?;
    /// let file = Unblock::new(file);
    ///
    /// let file = file.into_inner().await;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub async fn into_inner(self) -> T {
        // There's a bug in rustdoc causing it to render `mut self` as `__arg0: Self`, so we just
        // bind `self` to a local mutable variable.
        let mut this = self;

        // Wait for the running task to stop and ignore I/O errors if there are any.
        let _ = future::poll_fn(|cx| this.poll_stop(cx)).await;

        // Assume idle state and extract the inner value.
        match &mut this.state {
            State::Idle(t) => *t.take().expect("inner value was taken out"),
            State::WithMut(..)
            | State::Streaming(..)
            | State::Reading(..)
            | State::Writing(..)
            | State::Seeking(..) => {
                unreachable!("when stopped, the state machine must be in idle state");
            }
        }
    }

    /// Waits for the running task to stop.
    ///
    /// On success, the state machine is moved into the idle state. If a reading, writing, or
    /// seeking task finished with an I/O error, the state is still moved to idle first and the
    /// error is then returned.
    ///
    /// # Panics
    ///
    /// Panics if the running task panicked (its result channel closed without a value).
    fn poll_stop(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        loop {
            match &mut self.state {
                State::Idle(_) => return Poll::Ready(Ok(())),

                State::WithMut(task) => {
                    // Poll the task to wait for it to finish.
                    let io = ready!(Pin::new(task).poll_next(cx))
                        .expect("`Unblock::with_mut()` operation has panicked");
                    self.state = State::Idle(Some(io));
                }

                State::Streaming(any, task) => {
                    // Drop the receiver to close the channel. This stops the `send()` operation in
                    // the task, after which the task returns the iterator back.
                    any.take();

                    // Poll the task to retrieve the iterator.
                    let iter = ready!(Pin::new(task).poll_next(cx))
                        .expect("`Unblock` stream operation has panicked");
                    self.state = State::Idle(Some(iter));
                }

                State::Reading(reader, task) => {
                    // Drop the reader to close the pipe. This stops copying inside the task, after
                    // which the task returns the I/O handle back.
                    reader.take();

                    // Poll the task to retrieve the I/O handle.
                    let (res, io) = ready!(Pin::new(task).poll_next(cx))
                        .expect("`Unblock` read operation has panicked");
                    // Make sure to move into the idle state before reporting errors.
                    self.state = State::Idle(Some(io));
                    res?;
                }

                State::Writing(writer, task) => {
                    // Drop the writer to close the pipe. This stops copying inside the task, after
                    // which the task flushes the I/O handle and returns it back together with the
                    // result of the writing.
                    writer.take();

                    // Poll the task to retrieve the I/O handle.
                    let (res, io) = ready!(Pin::new(task).poll_next(cx))
                        .expect("`Unblock` write operation has panicked");
                    // Make sure to move into the idle state before reporting errors.
                    self.state = State::Idle(Some(io));
                    res?;
                }

                State::Seeking(task) => {
                    // Poll the task to wait for it to finish.
                    let (_, res, io) = ready!(Pin::new(task).poll_next(cx))
                        .expect("`Unblock` seek operation has panicked");
                    // Make sure to move into the idle state before reporting errors.
                    self.state = State::Idle(Some(io));
                    res?;
                }
            }
        }
    }
}
614 
615 impl<T: fmt::Debug> fmt::Debug for Unblock<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result616     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
617         struct Closed;
618         impl fmt::Debug for Closed {
619             fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
620                 f.write_str("<closed>")
621             }
622         }
623 
624         struct Blocked;
625         impl fmt::Debug for Blocked {
626             fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
627                 f.write_str("<blocked>")
628             }
629         }
630 
631         match &self.state {
632             State::Idle(None) => f.debug_struct("Unblock").field("io", &Closed).finish(),
633             State::Idle(Some(io)) => {
634                 let io: &T = &*io;
635                 f.debug_struct("Unblock").field("io", io).finish()
636             }
637             State::WithMut(..)
638             | State::Streaming(..)
639             | State::Reading(..)
640             | State::Writing(..)
641             | State::Seeking(..) => f.debug_struct("Unblock").field("io", &Blocked).finish(),
642         }
643     }
644 }
645 
/// Current state of a blocking task.
enum State<T> {
    /// There is no blocking task.
    ///
    /// The inner value is readily available, unless it has already been extracted. The value is
    /// extracted out by [`Unblock::into_inner()`], [`AsyncWrite::poll_close()`], or by awaiting
    /// [`Unblock`].
    Idle(Option<Box<T>>),

    /// A [`Unblock::with_mut()`] closure was spawned and is still running.
    ///
    /// The task hands the inner value back when the closure returns.
    WithMut(Task<Box<T>>),

    /// The inner value is an [`Iterator`] currently iterating in a task.
    ///
    /// The `dyn Any` value here is an `async_channel::Receiver<<T as Iterator>::Item>`. It is set
    /// to `None` once the receiver has been dropped to stop the task.
    Streaming(Option<Box<dyn Any + Send>>, Task<Box<T>>),

    /// The inner value is a [`Read`] currently reading in a task.
    ///
    /// The reader is set to `None` once it has been dropped to close the pipe. The task returns
    /// the result of the reading along with the handle.
    Reading(Option<Reader>, Task<(io::Result<()>, Box<T>)>),

    /// The inner value is a [`Write`] currently writing in a task.
    ///
    /// The writer is set to `None` once it has been dropped to close the pipe. The task returns
    /// the result of the writing along with the handle.
    Writing(Option<Writer>, Task<(io::Result<()>, Box<T>)>),

    /// The inner value is a [`Seek`] currently seeking in a task.
    ///
    /// The task returns the requested position, the seek result, and the handle.
    Seeking(Task<(SeekFrom, io::Result<u64>, Box<T>)>),
}
672 
impl<T: Iterator + Send + 'static> Stream for Unblock<T>
where
    T::Item: Send + 'static,
{
    type Item = T::Item;

    /// Yields the next item produced by the inner iterator on the thread pool.
    ///
    /// On the first poll, the iterator is moved into a blocking task that pushes items into a
    /// bounded channel. Items are then received from that channel. When the channel closes, the
    /// iterator is brought back and `None` is returned.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
        loop {
            match &mut self.state {
                // If not in idle or active streaming state, stop the running task.
                State::WithMut(..)
                | State::Streaming(None, _)
                | State::Reading(..)
                | State::Writing(..)
                | State::Seeking(..) => {
                    // Wait for the running task to stop.
                    // A stream has no error channel, so I/O errors from the stopped task are
                    // discarded here.
                    let _ = ready!(self.poll_stop(cx));
                }

                // If idle, start a streaming task.
                State::Idle(iter) => {
                    // Take the iterator out to run it on a blocking task.
                    let mut iter = iter.take().expect("inner iterator was taken out");

                    // This channel capacity seems to work well in practice. If it's too low, there
                    // will be too much synchronization between tasks. If too high, memory
                    // consumption increases.
                    let (sender, receiver) = bounded(self.cap.unwrap_or(8 * 1024)); // 8192 items

                    // Spawn a blocking task that runs the iterator and returns it when done.
                    let task = Executor::spawn(async move {
                        for item in &mut iter {
                            // A send error means the receiver was dropped, i.e. someone is
                            // stopping this task.
                            if sender.send(item).await.is_err() {
                                break;
                            }
                        }
                        iter
                    });

                    // Move into the busy state and poll again.
                    self.state = State::Streaming(Some(Box::new(receiver)), task);
                }

                // If streaming, receive an item.
                State::Streaming(Some(any), task) => {
                    // The `Streaming` state built in the arm above always stores a
                    // `Receiver<T::Item>`, so this downcast is expected to succeed.
                    let receiver = any.downcast_mut::<Receiver<T::Item>>().unwrap();

                    // Poll the channel.
                    let opt = ready!(Pin::new(receiver).poll_next(cx));

                    // If the channel is closed, retrieve the iterator back from the blocking task.
                    // This is not really a required step, but it's cleaner to drop the iterator on
                    // the same thread that created it.
                    if opt.is_none() {
                        // Poll the task to retrieve the iterator.
                        let iter = ready!(Pin::new(task).poll_next(cx))
                            .expect("`Unblock` stream operation has panicked");
                        self.state = State::Idle(Some(iter));
                    }

                    return Poll::Ready(opt);
                }
            }
        }
    }
}
739 
740 impl<T: Read + Send + 'static> AsyncRead for Unblock<T> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>741     fn poll_read(
742         mut self: Pin<&mut Self>,
743         cx: &mut Context<'_>,
744         buf: &mut [u8],
745     ) -> Poll<io::Result<usize>> {
746         loop {
747             match &mut self.state {
748                 // If not in idle or active reading state, stop the running task.
749                 State::WithMut(..)
750                 | State::Reading(None, _)
751                 | State::Streaming(..)
752                 | State::Writing(..)
753                 | State::Seeking(..) => {
754                     // Wait for the running task to stop.
755                     ready!(self.poll_stop(cx))?;
756                 }
757 
758                 // If idle, start a reading task.
759                 State::Idle(io) => {
760                     // Take the I/O handle out to read it on a blocking task.
761                     let mut io = io.take().expect("inner value was taken out");
762 
763                     // This pipe capacity seems to work well in practice. If it's too low, there
764                     // will be too much synchronization between tasks. If too high, memory
765                     // consumption increases.
766                     let (reader, mut writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB
767 
768                     // Spawn a blocking task that reads and returns the I/O handle when done.
769                     let task = Executor::spawn(async move {
770                         // Copy bytes from the I/O handle into the pipe until the pipe is closed or
771                         // an error occurs.
772                         loop {
773                             match future::poll_fn(|cx| writer.fill(cx, &mut io)).await {
774                                 Ok(0) => return (Ok(()), io),
775                                 Ok(_) => {}
776                                 Err(err) => return (Err(err), io),
777                             }
778                         }
779                     });
780 
781                     // Move into the busy state and poll again.
782                     self.state = State::Reading(Some(reader), task);
783                 }
784 
785                 // If reading, read bytes from the pipe.
786                 State::Reading(Some(reader), task) => {
787                     // Poll the pipe.
788                     let n = ready!(reader.drain(cx, buf))?;
789 
790                     // If the pipe is closed, retrieve the I/O handle back from the blocking task.
791                     // This is not really a required step, but it's cleaner to drop the handle on
792                     // the same thread that created it.
793                     if n == 0 {
794                         // Poll the task to retrieve the I/O handle.
795                         let (res, io) = ready!(Pin::new(task).poll_next(cx))
796                             .expect("`Unblock` read operation has panicked");
797                         // Make sure to move into the idle state before reporting errors.
798                         self.state = State::Idle(Some(io));
799                         res?;
800                     }
801 
802                     return Poll::Ready(Ok(n));
803                 }
804             }
805         }
806     }
807 }
808 
809 impl<T: Write + Send + 'static> AsyncWrite for Unblock<T> {
poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>810     fn poll_write(
811         mut self: Pin<&mut Self>,
812         cx: &mut Context<'_>,
813         buf: &[u8],
814     ) -> Poll<io::Result<usize>> {
815         loop {
816             match &mut self.state {
817                 // If not in idle or active writing state, stop the running task.
818                 State::WithMut(..)
819                 | State::Writing(None, _)
820                 | State::Streaming(..)
821                 | State::Reading(..)
822                 | State::Seeking(..) => {
823                     // Wait for the running task to stop.
824                     ready!(self.poll_stop(cx))?;
825                 }
826 
827                 // If idle, start the writing task.
828                 State::Idle(io) => {
829                     // Take the I/O handle out to write on a blocking task.
830                     let mut io = io.take().expect("inner value was taken out");
831 
832                     // This pipe capacity seems to work well in practice. If it's too low, there will
833                     // be too much synchronization between tasks. If too high, memory consumption
834                     // increases.
835                     let (mut reader, writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB
836 
837                     // Spawn a blocking task that writes and returns the I/O handle when done.
838                     let task = Executor::spawn(async move {
839                         // Copy bytes from the pipe into the I/O handle until the pipe is closed or an
840                         // error occurs. Flush the I/O handle at the end.
841                         loop {
842                             match future::poll_fn(|cx| reader.drain(cx, &mut io)).await {
843                                 Ok(0) => return (io.flush(), io),
844                                 Ok(_) => {}
845                                 Err(err) => {
846                                     let _ = io.flush();
847                                     return (Err(err), io);
848                                 }
849                             }
850                         }
851                     });
852 
853                     // Move into the busy state and poll again.
854                     self.state = State::Writing(Some(writer), task);
855                 }
856 
857                 // If writing, write more bytes into the pipe.
858                 State::Writing(Some(writer), _) => return writer.fill(cx, buf),
859             }
860         }
861     }
862 
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>863     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
864         loop {
865             match &mut self.state {
866                 // If not in idle state, stop the running task.
867                 State::WithMut(..)
868                 | State::Streaming(..)
869                 | State::Writing(..)
870                 | State::Reading(..)
871                 | State::Seeking(..) => {
872                     // Wait for the running task to stop.
873                     ready!(self.poll_stop(cx))?;
874                 }
875 
876                 // Idle implies flushed.
877                 State::Idle(_) => return Poll::Ready(Ok(())),
878             }
879         }
880     }
881 
poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>882     fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
883         // First, make sure the I/O handle is flushed.
884         ready!(Pin::new(&mut self).poll_flush(cx))?;
885 
886         // Then move into the idle state with no I/O handle, thus dropping it.
887         self.state = State::Idle(None);
888         Poll::Ready(Ok(()))
889     }
890 }
891 
892 impl<T: Seek + Send + 'static> AsyncSeek for Unblock<T> {
poll_seek( mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll<io::Result<u64>>893     fn poll_seek(
894         mut self: Pin<&mut Self>,
895         cx: &mut Context<'_>,
896         pos: SeekFrom,
897     ) -> Poll<io::Result<u64>> {
898         loop {
899             match &mut self.state {
900                 // If not in idle state, stop the running task.
901                 State::WithMut(..)
902                 | State::Streaming(..)
903                 | State::Reading(..)
904                 | State::Writing(..) => {
905                     // Wait for the running task to stop.
906                     ready!(self.poll_stop(cx))?;
907                 }
908 
909                 State::Idle(io) => {
910                     // Take the I/O handle out to seek on a blocking task.
911                     let mut io = io.take().expect("inner value was taken out");
912 
913                     let task = Executor::spawn(async move {
914                         let res = io.seek(pos);
915                         (pos, res, io)
916                     });
917                     self.state = State::Seeking(task);
918                 }
919 
920                 State::Seeking(task) => {
921                     // Poll the task to wait for it to finish.
922                     let (original_pos, res, io) = ready!(Pin::new(task).poll_next(cx))
923                         .expect("`Unblock` seek operation has panicked");
924                     // Make sure to move into the idle state before reporting errors.
925                     self.state = State::Idle(Some(io));
926                     let current = res?;
927 
928                     // If the `pos` argument matches the original one, return the result.
929                     if original_pos == pos {
930                         return Poll::Ready(Ok(current));
931                     }
932                 }
933             }
934         }
935     }
936 }
937 
938 /// Creates a bounded single-producer single-consumer pipe.
939 ///
940 /// A pipe is a ring buffer of `cap` bytes that can be asynchronously read from and written to.
941 ///
942 /// When the sender is dropped, remaining bytes in the pipe can still be read. After that, attempts
943 /// to read will result in `Ok(0)`, i.e. they will always 'successfully' read 0 bytes.
944 ///
945 /// When the receiver is dropped, the pipe is closed and no more bytes and be written into it.
946 /// Further writes will result in `Ok(0)`, i.e. they will always 'successfully' write 0 bytes.
pipe(cap: usize) -> (Reader, Writer)947 fn pipe(cap: usize) -> (Reader, Writer) {
948     assert!(cap > 0, "capacity must be positive");
949     assert!(cap.checked_mul(2).is_some(), "capacity is too large");
950 
951     // Allocate the ring buffer.
952     let mut v = Vec::with_capacity(cap);
953     let buffer = v.as_mut_ptr();
954     mem::forget(v);
955 
956     let inner = Arc::new(Pipe {
957         head: AtomicUsize::new(0),
958         tail: AtomicUsize::new(0),
959         reader: AtomicWaker::new(),
960         writer: AtomicWaker::new(),
961         closed: AtomicBool::new(false),
962         buffer,
963         cap,
964     });
965 
966     let r = Reader {
967         inner: inner.clone(),
968         head: 0,
969         tail: 0,
970     };
971 
972     let w = Writer {
973         inner,
974         head: 0,
975         tail: 0,
976         zeroed_until: 0,
977     };
978 
979     (r, w)
980 }
981 
982 /// The reading side of a pipe.
983 struct Reader {
984     /// The inner ring buffer.
985     inner: Arc<Pipe>,
986 
987     /// The head index, moved by the reader, in the range `0..2*cap`.
988     ///
989     /// This index always matches `inner.head`.
990     head: usize,
991 
992     /// The tail index, moved by the writer, in the range `0..2*cap`.
993     ///
994     /// This index is a snapshot of `index.tail` that might become stale at any point.
995     tail: usize,
996 }
997 
998 /// The writing side of a pipe.
999 struct Writer {
1000     /// The inner ring buffer.
1001     inner: Arc<Pipe>,
1002 
1003     /// The head index, moved by the reader, in the range `0..2*cap`.
1004     ///
1005     /// This index is a snapshot of `index.head` that might become stale at any point.
1006     head: usize,
1007 
1008     /// The tail index, moved by the writer, in the range `0..2*cap`.
1009     ///
1010     /// This index always matches `inner.tail`.
1011     tail: usize,
1012 
1013     /// How many bytes at the beginning of the buffer have been zeroed.
1014     ///
1015     /// The pipe allocates an uninitialized buffer, and we must be careful about passing
1016     /// uninitialized data to user code. Zeroing the buffer right after allocation would be too
1017     /// expensive, so we zero it in smaller chunks as the writer makes progress.
1018     zeroed_until: usize,
1019 }
1020 
// SAFETY: `Reader` and `Writer` are not automatically `Send` only because `Pipe` stores the raw
// `buffer: *mut u8`. `pipe` creates exactly one `Reader` and one `Writer` per buffer, and the two
// touch disjoint byte ranges: the reader only reads bytes between `head` and `tail`, the writer
// only writes the free space between `tail` and `head + cap`. Ownership of bytes is handed over
// through `Release` stores and `Acquire` loads of the shared `head`/`tail` indices, so moving
// either half to another thread cannot introduce a data race on the buffer.
unsafe impl Send for Reader {}
unsafe impl Send for Writer {}
1023 
/// The inner ring buffer shared by a `Reader` and a `Writer`.
///
/// Head and tail indices are in the range `0..2*cap`, even though they really map onto the
/// `0..cap` range. The distance between head and tail indices is never more than `cap`.
///
/// The reason why indices are not in the range `0..cap` is because we need to distinguish between
/// the pipe being empty and being full. If head and tail were in `0..cap`, then `head == tail`
/// could mean the pipe is either empty or full, but we don't know which!
///
/// For example, with `cap = 4`, `head = 1, tail = 1` means empty while `head = 1, tail = 5`
/// means full.
struct Pipe {
    /// The head index, moved by the reader, in the range `0..2*cap`.
    head: AtomicUsize,

    /// The tail index, moved by the writer, in the range `0..2*cap`.
    tail: AtomicUsize,

    /// A waker representing the blocked reader.
    ///
    /// Registered by the reader when it finds the pipe empty; woken by the writer after it
    /// publishes new bytes or is dropped.
    reader: AtomicWaker,

    /// A waker representing the blocked writer.
    ///
    /// Registered by the writer when it finds the pipe full; woken by the reader after it
    /// consumes bytes or is dropped.
    writer: AtomicWaker,

    /// Set to `true` when either the reader or the writer is dropped.
    closed: AtomicBool,

    /// Pointer to the `cap`-byte allocation backing the ring buffer.
    ///
    /// Allocated (and leaked) by `pipe`, freed by `Pipe`'s `Drop` impl. The bytes start out
    /// uninitialized; the writer zeroes them lazily before handing them out as a slice.
    buffer: *mut u8,

    /// The buffer capacity.
    cap: usize,
}
1054 
1055 impl Drop for Pipe {
drop(&mut self)1056     fn drop(&mut self) {
1057         // Deallocate the byte buffer.
1058         unsafe {
1059             Vec::from_raw_parts(self.buffer, 0, self.cap);
1060         }
1061     }
1062 }
1063 
1064 impl Drop for Reader {
drop(&mut self)1065     fn drop(&mut self) {
1066         // Dropping closes the pipe and then wakes the writer.
1067         self.inner.closed.store(true, Ordering::SeqCst);
1068         self.inner.writer.wake();
1069     }
1070 }
1071 
1072 impl Drop for Writer {
drop(&mut self)1073     fn drop(&mut self) {
1074         // Dropping closes the pipe and then wakes the reader.
1075         self.inner.closed.store(true, Ordering::SeqCst);
1076         self.inner.reader.wake();
1077     }
1078 }
1079 
1080 impl Reader {
1081     /// Reads bytes from this reader and writes into blocking `dest`.
drain(&mut self, cx: &mut Context<'_>, mut dest: impl Write) -> Poll<io::Result<usize>>1082     fn drain(&mut self, cx: &mut Context<'_>, mut dest: impl Write) -> Poll<io::Result<usize>> {
1083         let cap = self.inner.cap;
1084 
1085         // Calculates the distance between two indices.
1086         let distance = |a: usize, b: usize| {
1087             if a <= b {
1088                 b - a
1089             } else {
1090                 2 * cap - (a - b)
1091             }
1092         };
1093 
1094         // If the pipe appears to be empty...
1095         if distance(self.head, self.tail) == 0 {
1096             // Reload the tail in case it's become stale.
1097             self.tail = self.inner.tail.load(Ordering::Acquire);
1098 
1099             // If the pipe is now really empty...
1100             if distance(self.head, self.tail) == 0 {
1101                 // Register the waker.
1102                 self.inner.reader.register(cx.waker());
1103                 atomic::fence(Ordering::SeqCst);
1104 
1105                 // Reload the tail after registering the waker.
1106                 self.tail = self.inner.tail.load(Ordering::Acquire);
1107 
1108                 // If the pipe is still empty...
1109                 if distance(self.head, self.tail) == 0 {
1110                     // Check whether the pipe is closed or just empty.
1111                     if self.inner.closed.load(Ordering::Relaxed) {
1112                         return Poll::Ready(Ok(0));
1113                     } else {
1114                         return Poll::Pending;
1115                     }
1116                 }
1117             }
1118         }
1119 
1120         // The pipe is not empty so remove the waker.
1121         self.inner.reader.take();
1122 
1123         // Given an index in `0..2*cap`, returns the real index in `0..cap`.
1124         let real_index = |i: usize| {
1125             if i < cap {
1126                 i
1127             } else {
1128                 i - cap
1129             }
1130         };
1131 
1132         // Number of bytes read so far.
1133         let mut count = 0;
1134 
1135         loop {
1136             // Calculate how many bytes to read in this iteration.
1137             let n = (128 * 1024) // Not too many bytes in one go - better to wake the writer soon!
1138                 .min(distance(self.head, self.tail)) // No more than bytes in the pipe.
1139                 .min(cap - real_index(self.head)); // Don't go past the buffer boundary.
1140 
1141             // Create a slice of data in the pipe buffer.
1142             let pipe_slice =
1143                 unsafe { slice::from_raw_parts(self.inner.buffer.add(real_index(self.head)), n) };
1144 
1145             // Copy bytes from the pipe buffer into `dest`.
1146             let n = dest.write(pipe_slice)?;
1147             count += n;
1148 
1149             // If pipe is empty or `dest` is full, return.
1150             if n == 0 {
1151                 return Poll::Ready(Ok(count));
1152             }
1153 
1154             // Move the head forward.
1155             if self.head + n < 2 * cap {
1156                 self.head += n;
1157             } else {
1158                 self.head = 0;
1159             }
1160 
1161             // Store the current head index.
1162             self.inner.head.store(self.head, Ordering::Release);
1163 
1164             // Wake the writer because the pipe is not full.
1165             self.inner.writer.wake();
1166         }
1167     }
1168 }
1169 
1170 impl Writer {
1171     /// Reads bytes from blocking `src` and writes into this writer.
fill(&mut self, cx: &mut Context<'_>, mut src: impl Read) -> Poll<io::Result<usize>>1172     fn fill(&mut self, cx: &mut Context<'_>, mut src: impl Read) -> Poll<io::Result<usize>> {
1173         // Just a quick check if the pipe is closed, which is why a relaxed load is okay.
1174         if self.inner.closed.load(Ordering::Relaxed) {
1175             return Poll::Ready(Ok(0));
1176         }
1177 
1178         // Calculates the distance between two indices.
1179         let cap = self.inner.cap;
1180         let distance = |a: usize, b: usize| {
1181             if a <= b {
1182                 b - a
1183             } else {
1184                 2 * cap - (a - b)
1185             }
1186         };
1187 
1188         // If the pipe appears to be full...
1189         if distance(self.head, self.tail) == cap {
1190             // Reload the head in case it's become stale.
1191             self.head = self.inner.head.load(Ordering::Acquire);
1192 
1193             // If the pipe is now really empty...
1194             if distance(self.head, self.tail) == cap {
1195                 // Register the waker.
1196                 self.inner.writer.register(cx.waker());
1197                 atomic::fence(Ordering::SeqCst);
1198 
1199                 // Reload the head after registering the waker.
1200                 self.head = self.inner.head.load(Ordering::Acquire);
1201 
1202                 // If the pipe is still full...
1203                 if distance(self.head, self.tail) == cap {
1204                     // Check whether the pipe is closed or just full.
1205                     if self.inner.closed.load(Ordering::Relaxed) {
1206                         return Poll::Ready(Ok(0));
1207                     } else {
1208                         return Poll::Pending;
1209                     }
1210                 }
1211             }
1212         }
1213 
1214         // The pipe is not full so remove the waker.
1215         self.inner.writer.take();
1216 
1217         // Given an index in `0..2*cap`, returns the real index in `0..cap`.
1218         let real_index = |i: usize| {
1219             if i < cap {
1220                 i
1221             } else {
1222                 i - cap
1223             }
1224         };
1225 
1226         // Number of bytes written so far.
1227         let mut count = 0;
1228 
1229         loop {
1230             // Calculate how many bytes to write in this iteration.
1231             let n = (128 * 1024) // Not too many bytes in one go - better to wake the reader soon!
1232                 .min(self.zeroed_until * 2 + 4096) // Don't zero too many bytes when starting.
1233                 .min(cap - distance(self.head, self.tail)) // No more than space in the pipe.
1234                 .min(cap - real_index(self.tail)); // Don't go past the buffer boundary.
1235 
1236             // Create a slice of available space in the pipe buffer.
1237             let pipe_slice_mut = unsafe {
1238                 let from = real_index(self.tail);
1239                 let to = from + n;
1240 
1241                 // Make sure all bytes in the slice are initialized.
1242                 if self.zeroed_until < to {
1243                     self.inner
1244                         .buffer
1245                         .add(self.zeroed_until)
1246                         .write_bytes(0u8, to - self.zeroed_until);
1247                     self.zeroed_until = to;
1248                 }
1249 
1250                 slice::from_raw_parts_mut(self.inner.buffer.add(from), n)
1251             };
1252 
1253             // Copy bytes from `src` into the piper buffer.
1254             let n = src.read(pipe_slice_mut)?;
1255             count += n;
1256 
1257             // If the pipe is full or closed, or `src` is empty, return.
1258             if n == 0 || self.inner.closed.load(Ordering::Relaxed) {
1259                 return Poll::Ready(Ok(count));
1260             }
1261 
1262             // Move the tail forward.
1263             if self.tail + n < 2 * cap {
1264                 self.tail += n;
1265             } else {
1266                 self.tail = 0;
1267             }
1268 
1269             // Store the current tail index.
1270             self.inner.tail.store(self.tail, Ordering::Release);
1271 
1272             // Wake the reader because the pipe is not empty.
1273             self.inner.reader.wake();
1274         }
1275     }
1276 }
1277