1 //! A thread pool for isolating blocking I/O in async programs.
2 //!
3 //! Sometimes there's no way to avoid blocking I/O. Consider files or stdin, which have weak async
4 //! support on modern operating systems. While [IOCP], [AIO], and [io_uring] are possible
5 //! solutions, they're not always available or ideal.
6 //!
7 //! Since blocking is not allowed inside futures, we must move blocking I/O onto a special thread
8 //! pool provided by this crate. The pool dynamically spawns and stops threads depending on the
9 //! current number of running I/O jobs.
10 //!
11 //! Note that there is a limit on the number of active threads. Once that limit is hit, a running
12 //! job has to finish before others get a chance to run. When a thread is idle, it waits for the
13 //! next job or shuts down after a certain timeout.
14 //!
15 //! [IOCP]: https://en.wikipedia.org/wiki/Input/output_completion_port
16 //! [AIO]: http://man7.org/linux/man-pages/man2/io_submit.2.html
17 //! [io_uring]: https://lwn.net/Articles/776703
18 //!
19 //! # Examples
20 //!
21 //! Read the contents of a file:
22 //!
23 //! ```no_run
24 //! use blocking::unblock;
25 //! use futures_lite::*;
26 //! use std::fs;
27 //!
28 //! # future::block_on(async {
29 //! let contents = unblock(|| fs::read_to_string("file.txt")).await?;
30 //! println!("{}", contents);
31 //! # io::Result::Ok(()) });
32 //! ```
33 //!
34 //! Read a file and pipe its contents to stdout:
35 //!
36 //! ```no_run
37 //! use blocking::{unblock, Unblock};
38 //! use futures_lite::*;
39 //! use std::fs::File;
40 //!
41 //! # future::block_on(async {
42 //! let input = unblock(|| File::open("file.txt")).await?;
43 //! let input = Unblock::new(input);
44 //! let mut output = Unblock::new(std::io::stdout());
45 //!
46 //! io::copy(input, &mut output).await?;
47 //! # io::Result::Ok(()) });
48 //! ```
49 //!
50 //! Iterate over the contents of a directory:
51 //!
52 //! ```no_run
53 //! use blocking::Unblock;
54 //! use futures_lite::*;
55 //! use std::fs;
56 //!
57 //! # future::block_on(async {
58 //! let mut dir = Unblock::new(fs::read_dir(".")?);
59 //! while let Some(item) = dir.next().await {
60 //! println!("{}", item?.file_name().to_string_lossy());
61 //! }
62 //! # io::Result::Ok(()) });
63 //! ```
64 //!
65 //! Spawn a process:
66 //!
67 //! ```no_run
68 //! use blocking::unblock;
69 //! use std::process::Command;
70 //!
71 //! # futures_lite::future::block_on(async {
72 //! let out = unblock(|| Command::new("dir").output()).await?;
73 //! # std::io::Result::Ok(()) });
74 //! ```
75
76 #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
77
78 use std::any::Any;
79 use std::collections::VecDeque;
80 use std::fmt;
81 use std::io::{self, Read, Seek, SeekFrom, Write};
82 use std::mem;
83 use std::panic;
84 use std::pin::Pin;
85 use std::slice;
86 use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
87 use std::sync::{Arc, Condvar, Mutex, MutexGuard};
88 use std::task::{Context, Poll};
89 use std::thread;
90 use std::time::Duration;
91
92 use async_channel::{bounded, Receiver};
93 use atomic_waker::AtomicWaker;
94 use futures_lite::*;
95 use once_cell::sync::Lazy;
96 use waker_fn::waker_fn;
97
/// Retrieves the output of a spawned future.
///
/// This is the receiving half of the one-slot channel created by `Executor::spawn()`. The
/// spawned future sends its output into that channel when it completes, so a failed receive means
/// the sender was dropped without sending; callers in this file treat that as a panicked task.
type Task<T> = Receiver<T>;
100
/// A spawned future and its current state.
///
/// How this works was explained in a [blog post].
///
/// [blog post]: https://stjepang.github.io/2020/01/31/build-your-own-executor.html
struct Runnable {
    /// Current state of the task.
    ///
    /// A bit set manipulated by `Runnable::run()`: bit `0b01` means "woken" and bit `0b10` means
    /// "running". A value of zero means the task is neither running nor scheduled.
    state: AtomicUsize,

    /// The inner future.
    ///
    /// Kept behind a mutex so it can be polled through a shared `Arc<Runnable>`; `run()` acquires
    /// it with `try_lock()` and panics if the lock is unavailable.
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
113
114 impl Runnable {
115 /// Runs the task.
run(self: Arc<Runnable>)116 fn run(self: Arc<Runnable>) {
117 // Set if the task has been woken.
118 const WOKEN: usize = 0b01;
119 // Set if the task is currently running.
120 const RUNNING: usize = 0b10;
121
122 // The state is now "not woken" and "running".
123 self.state.store(RUNNING, Ordering::SeqCst);
124
125 // Poll the future.
126 let this = self.clone();
127 let waker = waker_fn(move || {
128 if this.state.fetch_or(WOKEN, Ordering::SeqCst) == 0 {
129 EXECUTOR.schedule(this.clone());
130 }
131 });
132 let cx = &mut Context::from_waker(&waker);
133 let poll = self.future.try_lock().unwrap().as_mut().poll(cx);
134
135 // If the future hasn't completed and was woken while running, then reschedule it.
136 if poll.is_pending() {
137 if self.state.fetch_and(!RUNNING, Ordering::SeqCst) == WOKEN | RUNNING {
138 EXECUTOR.schedule(self);
139 }
140 }
141 }
142 }
143
144 /// Lazily initialized global executor.
145 static EXECUTOR: Lazy<Executor> = Lazy::new(|| Executor {
146 inner: Mutex::new(Inner {
147 idle_count: 0,
148 thread_count: 0,
149 queue: VecDeque::new(),
150 }),
151 cvar: Condvar::new(),
152 });
153
/// The blocking executor.
///
/// A single instance lives in the `EXECUTOR` static; its methods take `&'static self` so that
/// worker threads can keep referring to it.
struct Executor {
    /// Inner state of the executor: thread counters and the task queue.
    inner: Mutex<Inner>,

    /// Used to put idle threads to sleep and wake them up when new work comes in.
    ///
    /// Always waited on together with the `inner` mutex.
    cvar: Condvar,
}
162
/// Inner state of the blocking executor.
///
/// All fields are protected by the `Executor::inner` mutex.
struct Inner {
    /// Number of idle threads in the pool.
    ///
    /// Idle threads are sleeping, waiting to get a task to run. A freshly spawned thread is
    /// counted as idle until it enters its main loop.
    idle_count: usize,

    /// Total number of threads in the pool.
    ///
    /// This is the number of idle threads + the number of active threads. The pool stops growing
    /// once this reaches 500.
    thread_count: usize,

    /// The queue of blocking tasks, run in FIFO order.
    queue: VecDeque<Arc<Runnable>>,
}
178
179 impl Executor {
180 /// Spawns a future onto this executor.
181 ///
182 /// Returns a [`Task`] handle for the spawned task.
spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T>183 fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
184 // Wrap the future into one that sends the output into a channel.
185 let (s, r) = bounded(1);
186 let future = async move {
187 let _ = s.send(future.await).await;
188 };
189
190 // Create a task and schedule it for execution.
191 let runnable = Arc::new(Runnable {
192 state: AtomicUsize::new(0),
193 future: Mutex::new(Box::pin(future)),
194 });
195 EXECUTOR.schedule(runnable);
196
197 r
198 }
199
200 /// Runs the main loop on the current thread.
201 ///
202 /// This function runs blocking tasks until it becomes idle and times out.
main_loop(&'static self)203 fn main_loop(&'static self) {
204 let mut inner = self.inner.lock().unwrap();
205 loop {
206 // This thread is not idle anymore because it's going to run tasks.
207 inner.idle_count -= 1;
208
209 // Run tasks in the queue.
210 while let Some(runnable) = inner.queue.pop_front() {
211 // We have found a task - grow the pool if needed.
212 self.grow_pool(inner);
213
214 // Run the task.
215 let _ = panic::catch_unwind(|| runnable.run());
216
217 // Re-lock the inner state and continue.
218 inner = self.inner.lock().unwrap();
219 }
220
221 // This thread is now becoming idle.
222 inner.idle_count += 1;
223
224 // Put the thread to sleep until another task is scheduled.
225 let timeout = Duration::from_millis(500);
226 let (lock, res) = self.cvar.wait_timeout(inner, timeout).unwrap();
227 inner = lock;
228
229 // If there are no tasks after a while, stop this thread.
230 if res.timed_out() && inner.queue.is_empty() {
231 inner.idle_count -= 1;
232 inner.thread_count -= 1;
233 break;
234 }
235 }
236 }
237
238 /// Schedules a runnable task for execution.
schedule(&'static self, runnable: Arc<Runnable>)239 fn schedule(&'static self, runnable: Arc<Runnable>) {
240 let mut inner = self.inner.lock().unwrap();
241 inner.queue.push_back(runnable);
242
243 // Notify a sleeping thread and spawn more threads if needed.
244 self.cvar.notify_one();
245 self.grow_pool(inner);
246 }
247
248 /// Spawns more blocking threads if the pool is overloaded with work.
grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>)249 fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) {
250 // If runnable tasks greatly outnumber idle threads and there aren't too many threads
251 // already, then be aggressive: wake all idle threads and spawn one more thread.
252 while inner.queue.len() > inner.idle_count * 5 && inner.thread_count < 500 {
253 // The new thread starts in idle state.
254 inner.idle_count += 1;
255 inner.thread_count += 1;
256
257 // Notify all existing idle threads because we need to hurry up.
258 self.cvar.notify_all();
259
260 // Generate a new thread ID.
261 static ID: AtomicUsize = AtomicUsize::new(1);
262 let id = ID.fetch_add(1, Ordering::Relaxed);
263
264 // Spawn the new thread.
265 thread::Builder::new()
266 .name(format!("blocking-{}", id))
267 .spawn(move || self.main_loop())
268 .unwrap();
269 }
270 }
271 }
272
273 /// Runs blocking code on a thread pool.
274 ///
275 /// # Examples
276 ///
277 /// Read the contents of a file:
278 ///
279 /// ```no_run
280 /// use blocking::unblock;
281 /// use std::fs;
282 ///
283 /// # futures_lite::future::block_on(async {
284 /// let contents = unblock(|| fs::read_to_string("file.txt")).await?;
285 /// # std::io::Result::Ok(()) });
286 /// ```
287 ///
288 /// Spawn a process:
289 ///
290 /// ```no_run
291 /// use blocking::unblock;
292 /// use std::process::Command;
293 ///
294 /// # futures_lite::future::block_on(async {
295 /// let out = unblock(|| Command::new("dir").output()).await?;
296 /// # std::io::Result::Ok(()) });
297 /// ```
unblock<T, F>(f: F) -> T where F: FnOnce() -> T + Send + 'static, T: Send + 'static,298 pub async fn unblock<T, F>(f: F) -> T
299 where
300 F: FnOnce() -> T + Send + 'static,
301 T: Send + 'static,
302 {
303 Executor::spawn(async move { f() })
304 .recv()
305 .await
306 .expect("`unblock()` operation has panicked")
307 }
308
/// Runs blocking code on a thread pool.
///
/// # Desugaring
///
/// Note that `unblock!(expr)` is syntax sugar for:
///
/// ```ignore
/// unblock(move || expr).await
/// ```
///
/// Because the expansion ends in `.await`, the macro can only be used inside an async context,
/// and since the closure is a `move` closure, everything `expr` mentions is captured by value.
///
/// # Examples
///
/// Read the contents of a file:
///
/// ```no_run
/// use blocking::unblock;
/// use std::fs;
///
/// # futures_lite::future::block_on(async {
/// let contents = unblock!(fs::read_to_string("file.txt"))?;
/// # std::io::Result::Ok(()) });
/// ```
///
/// Spawn a process:
///
/// ```no_run
/// use blocking::unblock;
/// use std::process::Command;
///
/// # futures_lite::future::block_on(async {
/// let out = unblock!(Command::new("dir").output())?;
/// # std::io::Result::Ok(()) });
/// ```
#[macro_export]
macro_rules! unblock {
    // Accept arbitrary tokens and paste them in as the body of the closure.
    ($($code:tt)*) => {
        $crate::unblock(move || { $($code)* }).await
    };
}
348
/// Runs blocking I/O on a thread pool.
///
/// Blocking I/O must be isolated from async code. This type moves blocking I/O operations onto a
/// special thread pool while exposing a familiar async interface.
///
/// This type implements traits [`Stream`], [`AsyncRead`], [`AsyncWrite`], or [`AsyncSeek`] if the
/// inner type implements [`Iterator`], [`Read`], [`Write`], or [`Seek`], respectively.
///
/// # Notes
///
/// If writing data through the [`AsyncWrite`] trait, make sure to flush before dropping the
/// [`Unblock`] handle or some buffered data might get lost.
///
/// [`Unblock`] communicates with I/O operations on the thread pool through a pipe. That means an
/// async read/write operation simply receives/sends some bytes from/into the pipe. On the other
/// side of the pipe, the inner I/O handle reads bytes in advance until the pipe is full, and it
/// writes all bytes received through the pipe.
///
/// This kind of buffering has some interesting consequences. If [`Unblock`] wraps a
/// [`File`][`std::fs::File`], note that a single read operation may move the file cursor farther
/// than is the span of the operation! That's because reading happens in the background until the
/// pipe gets full - blocking reads do not follow async reads byte-for-byte.
///
/// Use [`Unblock::with_capacity()`] to configure the capacity of the pipe.
///
/// # Examples
///
/// ```
/// use blocking::Unblock;
/// use futures_lite::*;
///
/// # futures_lite::future::block_on(async {
/// let mut stdout = Unblock::new(std::io::stdout());
/// stdout.write_all(b"Hello world!").await?;
/// stdout.flush().await?;
/// # std::io::Result::Ok(()) });
/// ```
pub struct Unblock<T> {
    /// Where the inner value currently is: held by this handle (idle) or moved into a task that
    /// is running on the thread pool.
    state: State<T>,

    /// Capacity requested through [`Unblock::with_capacity()`].
    ///
    /// `None` selects the default, which is 8192 items for streams and 8 MB for reads and writes.
    cap: Option<usize>,
}
390
391 impl<T> Unblock<T> {
392 /// Wraps a blocking I/O handle into the async [`Unblock`] interface.
393 ///
394 /// # Examples
395 ///
396 /// ```no_run
397 /// use blocking::Unblock;
398 ///
399 /// let stdin = Unblock::new(std::io::stdin());
400 /// ```
new(io: T) -> Unblock<T>401 pub fn new(io: T) -> Unblock<T> {
402 Unblock {
403 state: State::Idle(Some(Box::new(io))),
404 cap: None,
405 }
406 }
407
408 /// Wraps a blocking I/O handle into the async [`Unblock`] interface with a custom buffer
409 /// capacity.
410 ///
411 /// When communicating with the inner [`Stream`]/[`Read`]/[`Write`] type from async code, data
412 /// transferred between blocking and async code goes through a buffer of limited capacity. This
413 /// constructor configures that capacity.
414 ///
415 /// The default capacity is:
416 ///
417 /// * For [`Iterator`] types: 8192 items.
418 /// * For [`Read`]/[`Write`] types: 8 MB.
with_capacity(cap: usize, io: T) -> Unblock<T>419 pub fn with_capacity(cap: usize, io: T) -> Unblock<T> {
420 Unblock {
421 state: State::Idle(Some(Box::new(io))),
422 cap: Some(cap),
423 }
424 }
425
426 /// Gets a mutable reference to the blocking I/O handle.
427 ///
428 /// This is an async method because the I/O handle might be on the thread pool and needs to
429 /// be moved onto the current thread before we can get a reference to it.
430 ///
431 /// # Examples
432 ///
433 /// ```no_run
434 /// use blocking::{unblock, Unblock};
435 /// use std::fs::File;
436 ///
437 /// # futures_lite::future::block_on(async {
438 /// let file = unblock(|| File::create("file.txt")).await?;
439 /// let mut file = Unblock::new(file);
440 ///
441 /// let metadata = file.get_mut().await.metadata()?;
442 /// # std::io::Result::Ok(()) });
443 /// ```
get_mut(&mut self) -> &mut T444 pub async fn get_mut(&mut self) -> &mut T {
445 // Wait for the running task to stop and ignore I/O errors if there are any.
446 let _ = future::poll_fn(|cx| self.poll_stop(cx)).await;
447
448 // Assume idle state and get a reference to the inner value.
449 match &mut self.state {
450 State::Idle(t) => t.as_mut().expect("inner value was taken out"),
451 State::WithMut(..)
452 | State::Streaming(..)
453 | State::Reading(..)
454 | State::Writing(..)
455 | State::Seeking(..) => {
456 unreachable!("when stopped, the state machine must be in idle state");
457 }
458 }
459 }
460
461 /// Performs a blocking operation on the I/O handle.
462 ///
463 /// # Examples
464 ///
465 /// ```no_run
466 /// use blocking::{unblock, Unblock};
467 /// use std::fs::File;
468 ///
469 /// # futures_lite::future::block_on(async {
470 /// let file = unblock(|| File::create("file.txt")).await?;
471 /// let mut file = Unblock::new(file);
472 ///
473 /// let metadata = file.with_mut(|f| f.metadata()).await?;
474 /// # std::io::Result::Ok(()) });
475 /// ```
with_mut<R, F>(&mut self, op: F) -> R where F: FnOnce(&mut T) -> R + Send + 'static, R: Send + 'static, T: Send + 'static,476 pub async fn with_mut<R, F>(&mut self, op: F) -> R
477 where
478 F: FnOnce(&mut T) -> R + Send + 'static,
479 R: Send + 'static,
480 T: Send + 'static,
481 {
482 // Wait for the running task to stop and ignore I/O errors if there are any.
483 let _ = future::poll_fn(|cx| self.poll_stop(cx)).await;
484
485 // Assume idle state and take out the inner value.
486 let mut t = match &mut self.state {
487 State::Idle(t) => t.take().expect("inner value was taken out"),
488 State::WithMut(..)
489 | State::Streaming(..)
490 | State::Reading(..)
491 | State::Writing(..)
492 | State::Seeking(..) => {
493 unreachable!("when stopped, the state machine must be in idle state");
494 }
495 };
496
497 let (sender, receiver) = bounded(1);
498 let task = Executor::spawn(async move {
499 let _ = sender.try_send(op(&mut t));
500 t
501 });
502 self.state = State::WithMut(task);
503
504 receiver
505 .recv()
506 .await
507 .expect("`Unblock::with_mut()` operation has panicked")
508 }
509
510 /// Extracts the inner blocking I/O handle.
511 ///
512 /// This is an async method because the I/O handle might be on the thread pool and needs to
513 /// be moved onto the current thread before we can extract it.
514 ///
515 /// # Examples
516 ///
517 /// ```no_run
518 /// use blocking::{unblock, Unblock};
519 /// use futures_lite::*;
520 /// use std::fs::File;
521 ///
522 /// # futures_lite::future::block_on(async {
523 /// let file = unblock(|| File::create("file.txt")).await?;
524 /// let file = Unblock::new(file);
525 ///
526 /// let file = file.into_inner().await;
527 /// # std::io::Result::Ok(()) });
528 /// ```
into_inner(self) -> T529 pub async fn into_inner(self) -> T {
530 // There's a bug in rustdoc causing it to render `mut self` as `__arg0: Self`, so we just
531 // bind `self` to a local mutable variable.
532 let mut this = self;
533
534 // Wait for the running task to stop and ignore I/O errors if there are any.
535 let _ = future::poll_fn(|cx| this.poll_stop(cx)).await;
536
537 // Assume idle state and extract the inner value.
538 match &mut this.state {
539 State::Idle(t) => *t.take().expect("inner value was taken out"),
540 State::WithMut(..)
541 | State::Streaming(..)
542 | State::Reading(..)
543 | State::Writing(..)
544 | State::Seeking(..) => {
545 unreachable!("when stopped, the state machine must be in idle state");
546 }
547 }
548 }
549
550 /// Waits for the running task to stop.
551 ///
552 /// On success, the state machine is moved into the idle state.
poll_stop(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>>553 fn poll_stop(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
554 loop {
555 match &mut self.state {
556 State::Idle(_) => return Poll::Ready(Ok(())),
557
558 State::WithMut(task) => {
559 // Poll the task to wait for it to finish.
560 let io = ready!(Pin::new(task).poll_next(cx))
561 .expect("`Unblock::with_mut()` operation has panicked");
562 self.state = State::Idle(Some(io));
563 }
564
565 State::Streaming(any, task) => {
566 // Drop the receiver to close the channel. This stops the `send()` operation in
567 // the task, after which the task returns the iterator back.
568 any.take();
569
570 // Poll the task to retrieve the iterator.
571 let iter = ready!(Pin::new(task).poll_next(cx))
572 .expect("`Unblock` stream operation has panicked");
573 self.state = State::Idle(Some(iter));
574 }
575
576 State::Reading(reader, task) => {
577 // Drop the reader to close the pipe. This stops copying inside the task, after
578 // which the task returns the I/O handle back.
579 reader.take();
580
581 // Poll the task to retrieve the I/O handle.
582 let (res, io) = ready!(Pin::new(task).poll_next(cx))
583 .expect("`Unblock` read operation has panicked");
584 // Make sure to move into the idle state before reporting errors.
585 self.state = State::Idle(Some(io));
586 res?;
587 }
588
589 State::Writing(writer, task) => {
590 // Drop the writer to close the pipe. This stops copying inside the task, after
591 // which the task flushes the I/O handle and
592 writer.take();
593
594 // Poll the task to retrieve the I/O handle.
595 let (res, io) = ready!(Pin::new(task).poll_next(cx))
596 .expect("`Unblock` write operation has panicked");
597 // Make sure to move into the idle state before reporting errors.
598 self.state = State::Idle(Some(io));
599 res?;
600 }
601
602 State::Seeking(task) => {
603 // Poll the task to wait for it to finish.
604 let (_, res, io) = ready!(Pin::new(task).poll_next(cx))
605 .expect("`Unblock` seek operation has panicked");
606 // Make sure to move into the idle state before reporting errors.
607 self.state = State::Idle(Some(io));
608 res?;
609 }
610 }
611 }
612 }
613 }
614
615 impl<T: fmt::Debug> fmt::Debug for Unblock<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result616 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
617 struct Closed;
618 impl fmt::Debug for Closed {
619 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
620 f.write_str("<closed>")
621 }
622 }
623
624 struct Blocked;
625 impl fmt::Debug for Blocked {
626 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
627 f.write_str("<blocked>")
628 }
629 }
630
631 match &self.state {
632 State::Idle(None) => f.debug_struct("Unblock").field("io", &Closed).finish(),
633 State::Idle(Some(io)) => {
634 let io: &T = &*io;
635 f.debug_struct("Unblock").field("io", io).finish()
636 }
637 State::WithMut(..)
638 | State::Streaming(..)
639 | State::Reading(..)
640 | State::Writing(..)
641 | State::Seeking(..) => f.debug_struct("Unblock").field("io", &Blocked).finish(),
642 }
643 }
644 }
645
/// Current state of a blocking task.
///
/// In every non-idle state the inner value has been moved into a task on the thread pool, and the
/// [`Task`] handle yields it back once that task finishes.
enum State<T> {
    /// There is no blocking task.
    ///
    /// The inner value is readily available, unless it has already been extracted, for example
    /// by [`Unblock::into_inner()`] or [`AsyncWrite::poll_close()`].
    Idle(Option<Box<T>>),

    /// A [`Unblock::with_mut()`] closure was spawned and is still running.
    WithMut(Task<Box<T>>),

    /// The inner value is an [`Iterator`] currently iterating in a task.
    ///
    /// The `dyn Any` value here is a type-erased `async_channel` `Receiver<<T as Iterator>::Item>`
    /// through which the task sends items. It becomes `None` once the receiver has been dropped
    /// in order to stop the task.
    Streaming(Option<Box<dyn Any + Send>>, Task<Box<T>>),

    /// The inner value is a [`Read`] currently reading in a task.
    ///
    /// The reader is the consuming end of the pipe filled by the task. It becomes `None` once it
    /// has been dropped in order to stop the task.
    Reading(Option<Reader>, Task<(io::Result<()>, Box<T>)>),

    /// The inner value is a [`Write`] currently writing in a task.
    ///
    /// The writer is the producing end of the pipe drained by the task. It becomes `None` once it
    /// has been dropped in order to stop the task.
    Writing(Option<Writer>, Task<(io::Result<()>, Box<T>)>),

    /// The inner value is a [`Seek`] currently seeking in a task.
    ///
    /// The task yields the requested position, the result of the seek, and the inner value.
    Seeking(Task<(SeekFrom, io::Result<u64>, Box<T>)>),
}
672
673 impl<T: Iterator + Send + 'static> Stream for Unblock<T>
674 where
675 T::Item: Send + 'static,
676 {
677 type Item = T::Item;
678
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>>679 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
680 loop {
681 match &mut self.state {
682 // If not in idle or active streaming state, stop the running task.
683 State::WithMut(..)
684 | State::Streaming(None, _)
685 | State::Reading(..)
686 | State::Writing(..)
687 | State::Seeking(..) => {
688 // Wait for the running task to stop.
689 let _ = ready!(self.poll_stop(cx));
690 }
691
692 // If idle, start a streaming task.
693 State::Idle(iter) => {
694 // Take the iterator out to run it on a blocking task.
695 let mut iter = iter.take().expect("inner iterator was taken out");
696
697 // This channel capacity seems to work well in practice. If it's too low, there
698 // will be too much synchronization between tasks. If too high, memory
699 // consumption increases.
700 let (sender, receiver) = bounded(self.cap.unwrap_or(8 * 1024)); // 8192 items
701
702 // Spawn a blocking task that runs the iterator and returns it when done.
703 let task = Executor::spawn(async move {
704 for item in &mut iter {
705 if sender.send(item).await.is_err() {
706 break;
707 }
708 }
709 iter
710 });
711
712 // Move into the busy state and poll again.
713 self.state = State::Streaming(Some(Box::new(receiver)), task);
714 }
715
716 // If streaming, receive an item.
717 State::Streaming(Some(any), task) => {
718 let receiver = any.downcast_mut::<Receiver<T::Item>>().unwrap();
719
720 // Poll the channel.
721 let opt = ready!(Pin::new(receiver).poll_next(cx));
722
723 // If the channel is closed, retrieve the iterator back from the blocking task.
724 // This is not really a required step, but it's cleaner to drop the iterator on
725 // the same thread that created it.
726 if opt.is_none() {
727 // Poll the task to retrieve the iterator.
728 let iter = ready!(Pin::new(task).poll_next(cx))
729 .expect("`Unblock` stream operation has panicked");
730 self.state = State::Idle(Some(iter));
731 }
732
733 return Poll::Ready(opt);
734 }
735 }
736 }
737 }
738 }
739
740 impl<T: Read + Send + 'static> AsyncRead for Unblock<T> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>741 fn poll_read(
742 mut self: Pin<&mut Self>,
743 cx: &mut Context<'_>,
744 buf: &mut [u8],
745 ) -> Poll<io::Result<usize>> {
746 loop {
747 match &mut self.state {
748 // If not in idle or active reading state, stop the running task.
749 State::WithMut(..)
750 | State::Reading(None, _)
751 | State::Streaming(..)
752 | State::Writing(..)
753 | State::Seeking(..) => {
754 // Wait for the running task to stop.
755 ready!(self.poll_stop(cx))?;
756 }
757
758 // If idle, start a reading task.
759 State::Idle(io) => {
760 // Take the I/O handle out to read it on a blocking task.
761 let mut io = io.take().expect("inner value was taken out");
762
763 // This pipe capacity seems to work well in practice. If it's too low, there
764 // will be too much synchronization between tasks. If too high, memory
765 // consumption increases.
766 let (reader, mut writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB
767
768 // Spawn a blocking task that reads and returns the I/O handle when done.
769 let task = Executor::spawn(async move {
770 // Copy bytes from the I/O handle into the pipe until the pipe is closed or
771 // an error occurs.
772 loop {
773 match future::poll_fn(|cx| writer.fill(cx, &mut io)).await {
774 Ok(0) => return (Ok(()), io),
775 Ok(_) => {}
776 Err(err) => return (Err(err), io),
777 }
778 }
779 });
780
781 // Move into the busy state and poll again.
782 self.state = State::Reading(Some(reader), task);
783 }
784
785 // If reading, read bytes from the pipe.
786 State::Reading(Some(reader), task) => {
787 // Poll the pipe.
788 let n = ready!(reader.drain(cx, buf))?;
789
790 // If the pipe is closed, retrieve the I/O handle back from the blocking task.
791 // This is not really a required step, but it's cleaner to drop the handle on
792 // the same thread that created it.
793 if n == 0 {
794 // Poll the task to retrieve the I/O handle.
795 let (res, io) = ready!(Pin::new(task).poll_next(cx))
796 .expect("`Unblock` read operation has panicked");
797 // Make sure to move into the idle state before reporting errors.
798 self.state = State::Idle(Some(io));
799 res?;
800 }
801
802 return Poll::Ready(Ok(n));
803 }
804 }
805 }
806 }
807 }
808
809 impl<T: Write + Send + 'static> AsyncWrite for Unblock<T> {
poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>810 fn poll_write(
811 mut self: Pin<&mut Self>,
812 cx: &mut Context<'_>,
813 buf: &[u8],
814 ) -> Poll<io::Result<usize>> {
815 loop {
816 match &mut self.state {
817 // If not in idle or active writing state, stop the running task.
818 State::WithMut(..)
819 | State::Writing(None, _)
820 | State::Streaming(..)
821 | State::Reading(..)
822 | State::Seeking(..) => {
823 // Wait for the running task to stop.
824 ready!(self.poll_stop(cx))?;
825 }
826
827 // If idle, start the writing task.
828 State::Idle(io) => {
829 // Take the I/O handle out to write on a blocking task.
830 let mut io = io.take().expect("inner value was taken out");
831
832 // This pipe capacity seems to work well in practice. If it's too low, there will
833 // be too much synchronization between tasks. If too high, memory consumption
834 // increases.
835 let (mut reader, writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB
836
837 // Spawn a blocking task that writes and returns the I/O handle when done.
838 let task = Executor::spawn(async move {
839 // Copy bytes from the pipe into the I/O handle until the pipe is closed or an
840 // error occurs. Flush the I/O handle at the end.
841 loop {
842 match future::poll_fn(|cx| reader.drain(cx, &mut io)).await {
843 Ok(0) => return (io.flush(), io),
844 Ok(_) => {}
845 Err(err) => {
846 let _ = io.flush();
847 return (Err(err), io);
848 }
849 }
850 }
851 });
852
853 // Move into the busy state and poll again.
854 self.state = State::Writing(Some(writer), task);
855 }
856
857 // If writing, write more bytes into the pipe.
858 State::Writing(Some(writer), _) => return writer.fill(cx, buf),
859 }
860 }
861 }
862
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>863 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
864 loop {
865 match &mut self.state {
866 // If not in idle state, stop the running task.
867 State::WithMut(..)
868 | State::Streaming(..)
869 | State::Writing(..)
870 | State::Reading(..)
871 | State::Seeking(..) => {
872 // Wait for the running task to stop.
873 ready!(self.poll_stop(cx))?;
874 }
875
876 // Idle implies flushed.
877 State::Idle(_) => return Poll::Ready(Ok(())),
878 }
879 }
880 }
881
poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>882 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
883 // First, make sure the I/O handle is flushed.
884 ready!(Pin::new(&mut self).poll_flush(cx))?;
885
886 // Then move into the idle state with no I/O handle, thus dropping it.
887 self.state = State::Idle(None);
888 Poll::Ready(Ok(()))
889 }
890 }
891
892 impl<T: Seek + Send + 'static> AsyncSeek for Unblock<T> {
poll_seek( mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll<io::Result<u64>>893 fn poll_seek(
894 mut self: Pin<&mut Self>,
895 cx: &mut Context<'_>,
896 pos: SeekFrom,
897 ) -> Poll<io::Result<u64>> {
898 loop {
899 match &mut self.state {
900 // If not in idle state, stop the running task.
901 State::WithMut(..)
902 | State::Streaming(..)
903 | State::Reading(..)
904 | State::Writing(..) => {
905 // Wait for the running task to stop.
906 ready!(self.poll_stop(cx))?;
907 }
908
909 State::Idle(io) => {
910 // Take the I/O handle out to seek on a blocking task.
911 let mut io = io.take().expect("inner value was taken out");
912
913 let task = Executor::spawn(async move {
914 let res = io.seek(pos);
915 (pos, res, io)
916 });
917 self.state = State::Seeking(task);
918 }
919
920 State::Seeking(task) => {
921 // Poll the task to wait for it to finish.
922 let (original_pos, res, io) = ready!(Pin::new(task).poll_next(cx))
923 .expect("`Unblock` seek operation has panicked");
924 // Make sure to move into the idle state before reporting errors.
925 self.state = State::Idle(Some(io));
926 let current = res?;
927
928 // If the `pos` argument matches the original one, return the result.
929 if original_pos == pos {
930 return Poll::Ready(Ok(current));
931 }
932 }
933 }
934 }
935 }
936 }
937
938 /// Creates a bounded single-producer single-consumer pipe.
939 ///
940 /// A pipe is a ring buffer of `cap` bytes that can be asynchronously read from and written to.
941 ///
942 /// When the sender is dropped, remaining bytes in the pipe can still be read. After that, attempts
943 /// to read will result in `Ok(0)`, i.e. they will always 'successfully' read 0 bytes.
944 ///
945 /// When the receiver is dropped, the pipe is closed and no more bytes and be written into it.
946 /// Further writes will result in `Ok(0)`, i.e. they will always 'successfully' write 0 bytes.
pipe(cap: usize) -> (Reader, Writer)947 fn pipe(cap: usize) -> (Reader, Writer) {
948 assert!(cap > 0, "capacity must be positive");
949 assert!(cap.checked_mul(2).is_some(), "capacity is too large");
950
951 // Allocate the ring buffer.
952 let mut v = Vec::with_capacity(cap);
953 let buffer = v.as_mut_ptr();
954 mem::forget(v);
955
956 let inner = Arc::new(Pipe {
957 head: AtomicUsize::new(0),
958 tail: AtomicUsize::new(0),
959 reader: AtomicWaker::new(),
960 writer: AtomicWaker::new(),
961 closed: AtomicBool::new(false),
962 buffer,
963 cap,
964 });
965
966 let r = Reader {
967 inner: inner.clone(),
968 head: 0,
969 tail: 0,
970 };
971
972 let w = Writer {
973 inner,
974 head: 0,
975 tail: 0,
976 zeroed_until: 0,
977 };
978
979 (r, w)
980 }
981
/// The reading side of a pipe.
///
/// Created together with a `Writer` by `pipe()`. Dropping the reader closes the pipe and wakes
/// the writer.
struct Reader {
    /// The inner ring buffer, shared with the `Writer`.
    inner: Arc<Pipe>,

    /// The head index, moved by the reader, in the range `0..2*cap`.
    ///
    /// This index always matches `inner.head`.
    head: usize,

    /// The tail index, moved by the writer, in the range `0..2*cap`.
    ///
    /// This index is a snapshot of `inner.tail` that might become stale at any point.
    tail: usize,
}
997
/// The writing side of a pipe.
///
/// Created together with a `Reader` by `pipe()`. Dropping the writer closes the pipe and wakes
/// the reader.
struct Writer {
    /// The inner ring buffer, shared with the `Reader`.
    inner: Arc<Pipe>,

    /// The head index, moved by the reader, in the range `0..2*cap`.
    ///
    /// This index is a snapshot of `inner.head` that might become stale at any point.
    head: usize,

    /// The tail index, moved by the writer, in the range `0..2*cap`.
    ///
    /// This index always matches `inner.tail`.
    tail: usize,

    /// How many bytes at the beginning of the buffer have been zeroed.
    ///
    /// The pipe allocates an uninitialized buffer, and we must be careful about passing
    /// uninitialized data to user code. Zeroing the buffer right after allocation would be too
    /// expensive, so we zero it in smaller chunks as the writer makes progress.
    zeroed_until: usize,
}
1020
// `Pipe` stores a raw `*mut u8`, so `Arc<Pipe>` (and therefore each endpoint) is not `Send`
// automatically. These impls assert that moving an endpoint to another thread is sound.
//
// NOTE(review): soundness appears to rely on the reader only touching bytes between head and
// tail and the writer only touching the remaining space, with the atomic head/tail indices
// providing the synchronization - confirm when changing `drain` or `fill`.
unsafe impl Send for Reader {}
unsafe impl Send for Writer {}
1023
/// The inner ring buffer.
///
/// Head and tail indices are in the range `0..2*cap`, even though they really map onto the
/// `0..cap` range. The distance between head and tail indices is never more than `cap`.
///
/// The reason why indices are not in the range `0..cap` is because we need to distinguish between
/// the pipe being empty and being full. If head and tail were in `0..cap`, then `head == tail`
/// could mean the pipe is either empty or full, but we don't know which!
struct Pipe {
    /// The head index, moved by the reader, in the range `0..2*cap`.
    head: AtomicUsize,

    /// The tail index, moved by the writer, in the range `0..2*cap`.
    tail: AtomicUsize,

    /// A waker representing the blocked reader.
    reader: AtomicWaker,

    /// A waker representing the blocked writer.
    writer: AtomicWaker,

    /// Set to `true` if the reader or writer was dropped.
    closed: AtomicBool,

    /// The byte buffer.
    ///
    /// Points to an allocation of `cap` bytes made in `pipe()` and released when the `Pipe` is
    /// dropped. The bytes start out uninitialized; the writer zeroes them before exposing them.
    buffer: *mut u8,

    /// The buffer capacity.
    cap: usize,
}
1054
1055 impl Drop for Pipe {
drop(&mut self)1056 fn drop(&mut self) {
1057 // Deallocate the byte buffer.
1058 unsafe {
1059 Vec::from_raw_parts(self.buffer, 0, self.cap);
1060 }
1061 }
1062 }
1063
1064 impl Drop for Reader {
drop(&mut self)1065 fn drop(&mut self) {
1066 // Dropping closes the pipe and then wakes the writer.
1067 self.inner.closed.store(true, Ordering::SeqCst);
1068 self.inner.writer.wake();
1069 }
1070 }
1071
1072 impl Drop for Writer {
drop(&mut self)1073 fn drop(&mut self) {
1074 // Dropping closes the pipe and then wakes the reader.
1075 self.inner.closed.store(true, Ordering::SeqCst);
1076 self.inner.reader.wake();
1077 }
1078 }
1079
1080 impl Reader {
1081 /// Reads bytes from this reader and writes into blocking `dest`.
drain(&mut self, cx: &mut Context<'_>, mut dest: impl Write) -> Poll<io::Result<usize>>1082 fn drain(&mut self, cx: &mut Context<'_>, mut dest: impl Write) -> Poll<io::Result<usize>> {
1083 let cap = self.inner.cap;
1084
1085 // Calculates the distance between two indices.
1086 let distance = |a: usize, b: usize| {
1087 if a <= b {
1088 b - a
1089 } else {
1090 2 * cap - (a - b)
1091 }
1092 };
1093
1094 // If the pipe appears to be empty...
1095 if distance(self.head, self.tail) == 0 {
1096 // Reload the tail in case it's become stale.
1097 self.tail = self.inner.tail.load(Ordering::Acquire);
1098
1099 // If the pipe is now really empty...
1100 if distance(self.head, self.tail) == 0 {
1101 // Register the waker.
1102 self.inner.reader.register(cx.waker());
1103 atomic::fence(Ordering::SeqCst);
1104
1105 // Reload the tail after registering the waker.
1106 self.tail = self.inner.tail.load(Ordering::Acquire);
1107
1108 // If the pipe is still empty...
1109 if distance(self.head, self.tail) == 0 {
1110 // Check whether the pipe is closed or just empty.
1111 if self.inner.closed.load(Ordering::Relaxed) {
1112 return Poll::Ready(Ok(0));
1113 } else {
1114 return Poll::Pending;
1115 }
1116 }
1117 }
1118 }
1119
1120 // The pipe is not empty so remove the waker.
1121 self.inner.reader.take();
1122
1123 // Given an index in `0..2*cap`, returns the real index in `0..cap`.
1124 let real_index = |i: usize| {
1125 if i < cap {
1126 i
1127 } else {
1128 i - cap
1129 }
1130 };
1131
1132 // Number of bytes read so far.
1133 let mut count = 0;
1134
1135 loop {
1136 // Calculate how many bytes to read in this iteration.
1137 let n = (128 * 1024) // Not too many bytes in one go - better to wake the writer soon!
1138 .min(distance(self.head, self.tail)) // No more than bytes in the pipe.
1139 .min(cap - real_index(self.head)); // Don't go past the buffer boundary.
1140
1141 // Create a slice of data in the pipe buffer.
1142 let pipe_slice =
1143 unsafe { slice::from_raw_parts(self.inner.buffer.add(real_index(self.head)), n) };
1144
1145 // Copy bytes from the pipe buffer into `dest`.
1146 let n = dest.write(pipe_slice)?;
1147 count += n;
1148
1149 // If pipe is empty or `dest` is full, return.
1150 if n == 0 {
1151 return Poll::Ready(Ok(count));
1152 }
1153
1154 // Move the head forward.
1155 if self.head + n < 2 * cap {
1156 self.head += n;
1157 } else {
1158 self.head = 0;
1159 }
1160
1161 // Store the current head index.
1162 self.inner.head.store(self.head, Ordering::Release);
1163
1164 // Wake the writer because the pipe is not full.
1165 self.inner.writer.wake();
1166 }
1167 }
1168 }
1169
impl Writer {
    /// Reads bytes from blocking `src` and writes into this writer.
    ///
    /// Returns:
    ///
    /// - `Poll::Pending` if the pipe is full but not closed. The waker from `cx` has been
    ///   registered and will be woken when the reader makes progress or is dropped.
    /// - `Poll::Ready(Ok(0))` if the pipe is closed on entry, if it is full and closed, or if
    ///   `src` produced no bytes.
    /// - `Poll::Ready(Ok(n))` with the total number of bytes read from `src`.
    /// - `Poll::Ready(Err(_))` if `src.read()` fails; the error is propagated as is.
    fn fill(&mut self, cx: &mut Context<'_>, mut src: impl Read) -> Poll<io::Result<usize>> {
        // Just a quick check if the pipe is closed, which is why a relaxed load is okay.
        if self.inner.closed.load(Ordering::Relaxed) {
            return Poll::Ready(Ok(0));
        }

        // Calculates the distance between two indices.
        let cap = self.inner.cap;
        let distance = |a: usize, b: usize| {
            if a <= b {
                b - a
            } else {
                2 * cap - (a - b)
            }
        };

        // If the pipe appears to be full...
        if distance(self.head, self.tail) == cap {
            // Reload the head in case it's become stale.
            self.head = self.inner.head.load(Ordering::Acquire);

            // If the pipe is now really full...
            if distance(self.head, self.tail) == cap {
                // Register the waker.
                self.inner.writer.register(cx.waker());
                atomic::fence(Ordering::SeqCst);

                // Reload the head after registering the waker.
                self.head = self.inner.head.load(Ordering::Acquire);

                // If the pipe is still full...
                if distance(self.head, self.tail) == cap {
                    // Check whether the pipe is closed or just full.
                    if self.inner.closed.load(Ordering::Relaxed) {
                        return Poll::Ready(Ok(0));
                    } else {
                        return Poll::Pending;
                    }
                }
            }
        }

        // The pipe is not full so remove the waker.
        self.inner.writer.take();

        // Given an index in `0..2*cap`, returns the real index in `0..cap`.
        let real_index = |i: usize| {
            if i < cap {
                i
            } else {
                i - cap
            }
        };

        // Number of bytes written so far.
        let mut count = 0;

        loop {
            // Calculate how many bytes to write in this iteration.
            let n = (128 * 1024) // Not too many bytes in one go - better to wake the reader soon!
                .min(self.zeroed_until * 2 + 4096) // Don't zero too many bytes when starting.
                .min(cap - distance(self.head, self.tail)) // No more than space in the pipe.
                .min(cap - real_index(self.tail)); // Don't go past the buffer boundary.

            // Create a slice of available space in the pipe buffer.
            //
            // SAFETY: `from < cap` and `n <= cap - from`, so both the zeroed range and the
            // returned slice stay within the `cap`-byte allocation. Every byte of the slice is
            // initialized because everything below `to` has been zeroed at least once.
            let pipe_slice_mut = unsafe {
                let from = real_index(self.tail);
                let to = from + n;

                // Make sure all bytes in the slice are initialized.
                if self.zeroed_until < to {
                    self.inner
                        .buffer
                        .add(self.zeroed_until)
                        .write_bytes(0u8, to - self.zeroed_until);
                    self.zeroed_until = to;
                }

                slice::from_raw_parts_mut(self.inner.buffer.add(from), n)
            };

            // Copy bytes from `src` into the pipe buffer.
            let n = src.read(pipe_slice_mut)?;
            count += n;

            // If the pipe is full or closed, or `src` is empty, return.
            //
            // Note that on this path the bytes just read are included in `count` even though the
            // tail is not advanced for them.
            if n == 0 || self.inner.closed.load(Ordering::Relaxed) {
                return Poll::Ready(Ok(count));
            }

            // Move the tail forward. Since `n` never crosses the buffer boundary, the sum is at
            // most `2 * cap`, in which case the index wraps around to zero.
            if self.tail + n < 2 * cap {
                self.tail += n;
            } else {
                self.tail = 0;
            }

            // Store the current tail index.
            self.inner.tail.store(self.tail, Ordering::Release);

            // Wake the reader because the pipe is not empty.
            self.inner.reader.wake();
        }
    }
}
1277