1 use crate::enter;
2 use futures_core::future::Future;
3 use futures_core::stream::Stream;
4 use futures_core::task::{Context, Poll};
5 use futures_task::{waker_ref, ArcWake};
6 use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
7 use futures_util::pin_mut;
8 use futures_util::stream::FuturesUnordered;
9 use futures_util::stream::StreamExt;
10 use std::cell::RefCell;
11 use std::ops::{Deref, DerefMut};
12 use std::rc::{Rc, Weak};
13 use std::sync::{
14     atomic::{AtomicBool, Ordering},
15     Arc,
16 };
17 use std::thread::{self, Thread};
18 
/// A single-threaded task pool for polling futures to completion.
///
/// This executor allows you to multiplex any number of tasks onto a single
/// thread. It's appropriate to poll strictly I/O-bound futures that do very
/// little work in between I/O actions.
///
/// To get a handle to the pool that implements
/// [`Spawn`](futures_task::Spawn), use the
/// [`spawner()`](LocalPool::spawner) method. Because the executor is
/// single-threaded, it supports a special form of task spawning for non-`Send`
/// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
#[derive(Debug)]
pub struct LocalPool {
    // The tasks currently owned and polled by this pool.
    pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
    // Queue of tasks submitted through `LocalSpawner` handles that have not
    // yet been moved into `pool`. Spawners hold only a `Weak` reference to it.
    incoming: Rc<Incoming>,
}
35 
/// A handle to a [`LocalPool`](LocalPool) that implements
/// [`Spawn`](futures_task::Spawn).
///
/// The handle does not keep the pool alive: once the pool has been dropped,
/// spawning through it fails with a shutdown [`SpawnError`].
#[derive(Clone, Debug)]
pub struct LocalSpawner {
    // Weak reference to the owning pool's queue of newly spawned tasks.
    incoming: Weak<Incoming>,
}
42 
// Queue of spawned-but-not-yet-polled tasks, shared between a `LocalPool` and
// its `LocalSpawner` handles. `RefCell` suffices because the queue is only
// reachable through `Rc`/`Weak`, i.e. from a single thread.
type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
44 
/// Wakeup state backing the waker handed to futures polled by the executor
/// functions in this file: waking it sets a flag and unparks `thread`.
pub(crate) struct ThreadNotify {
    /// The (single) executor thread.
    thread: Thread,
    /// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten"
    /// before the next `park()`, which may otherwise happen if the code
    /// being executed as part of the future(s) being polled makes use of
    /// park / unpark calls of its own, i.e. we cannot assume that no other
    /// code uses park / unpark on the executing `thread`.
    unparked: AtomicBool,
}
55 
thread_local! {
    // One `ThreadNotify` per thread, created on first access and bound to the
    // thread that accesses it. It starts with no pending wakeup recorded.
    static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
        thread: thread::current(),
        unparked: AtomicBool::new(false),
    });
}
62 
63 impl ArcWake for ThreadNotify {
wake_by_ref(arc_self: &Arc<Self>)64     fn wake_by_ref(arc_self: &Arc<Self>) {
65         // Make sure the wakeup is remembered until the next `park()`.
66         let unparked = arc_self.unparked.swap(true, Ordering::Relaxed);
67         if !unparked {
68             // If the thread has not been unparked yet, it must be done
69             // now. If it was actually parked, it will run again,
70             // otherwise the token made available by `unpark`
71             // may be consumed before reaching `park()`, but `unparked`
72             // ensures it is not forgotten.
73             arc_self.thread.unpark();
74         }
75     }
76 }
77 
78 // Set up and run a basic single-threaded spawner loop, invoking `f` on each
79 // turn.
run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T80 fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
81     let _enter = enter().expect(
82         "cannot execute `LocalPool` executor from within \
83          another executor",
84     );
85 
86     CURRENT_THREAD_NOTIFY.with(|thread_notify| {
87         let waker = waker_ref(thread_notify);
88         let mut cx = Context::from_waker(&waker);
89         loop {
90             if let Poll::Ready(t) = f(&mut cx) {
91                 return t;
92             }
93             // Consume the wakeup that occurred while executing `f`, if any.
94             let unparked = thread_notify.unparked.swap(false, Ordering::Acquire);
95             if !unparked {
96                 // No wakeup occurred. It may occur now, right before parking,
97                 // but in that case the token made available by `unpark()`
98                 // is guaranteed to still be available and `park()` is a no-op.
99                 thread::park();
100                 // When the thread is unparked, `unparked` will have been set
101                 // and needs to be unset before the next call to `f` to avoid
102                 // a redundant loop iteration.
103                 thread_notify.unparked.store(false, Ordering::Release);
104             }
105         }
106     })
107 }
108 
poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T109 fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
110     let _enter = enter().expect(
111         "cannot execute `LocalPool` executor from within \
112          another executor",
113     );
114 
115     CURRENT_THREAD_NOTIFY.with(|thread_notify| {
116         let waker = waker_ref(thread_notify);
117         let mut cx = Context::from_waker(&waker);
118         f(&mut cx)
119     })
120 }
121 
122 impl LocalPool {
123     /// Create a new, empty pool of tasks.
new() -> Self124     pub fn new() -> Self {
125         Self { pool: FuturesUnordered::new(), incoming: Default::default() }
126     }
127 
128     /// Get a clonable handle to the pool as a [`Spawn`].
spawner(&self) -> LocalSpawner129     pub fn spawner(&self) -> LocalSpawner {
130         LocalSpawner { incoming: Rc::downgrade(&self.incoming) }
131     }
132 
133     /// Run all tasks in the pool to completion.
134     ///
135     /// ```
136     /// use futures::executor::LocalPool;
137     ///
138     /// let mut pool = LocalPool::new();
139     ///
140     /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()`
141     ///
142     /// // run *all* tasks in the pool to completion, including any newly-spawned ones.
143     /// pool.run();
144     /// ```
145     ///
146     /// The function will block the calling thread until *all* tasks in the pool
147     /// are complete, including any spawned while running existing tasks.
run(&mut self)148     pub fn run(&mut self) {
149         run_executor(|cx| self.poll_pool(cx))
150     }
151 
152     /// Runs all the tasks in the pool until the given future completes.
153     ///
154     /// ```
155     /// use futures::executor::LocalPool;
156     ///
157     /// let mut pool = LocalPool::new();
158     /// # let my_app  = async {};
159     ///
160     /// // run tasks in the pool until `my_app` completes
161     /// pool.run_until(my_app);
162     /// ```
163     ///
164     /// The function will block the calling thread *only* until the future `f`
165     /// completes; there may still be incomplete tasks in the pool, which will
166     /// be inert after the call completes, but can continue with further use of
167     /// one of the pool's run or poll methods. While the function is running,
168     /// however, all tasks in the pool will try to make progress.
run_until<F: Future>(&mut self, future: F) -> F::Output169     pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
170         pin_mut!(future);
171 
172         run_executor(|cx| {
173             {
174                 // if our main task is done, so are we
175                 let result = future.as_mut().poll(cx);
176                 if let Poll::Ready(output) = result {
177                     return Poll::Ready(output);
178                 }
179             }
180 
181             let _ = self.poll_pool(cx);
182             Poll::Pending
183         })
184     }
185 
186     /// Runs all tasks and returns after completing one future or until no more progress
187     /// can be made. Returns `true` if one future was completed, `false` otherwise.
188     ///
189     /// ```
190     /// use futures::executor::LocalPool;
191     /// use futures::task::LocalSpawnExt;
192     /// use futures::future::{ready, pending};
193     ///
194     /// let mut pool = LocalPool::new();
195     /// let spawner = pool.spawner();
196     ///
197     /// spawner.spawn_local(ready(())).unwrap();
198     /// spawner.spawn_local(ready(())).unwrap();
199     /// spawner.spawn_local(pending()).unwrap();
200     ///
201     /// // Run the two ready tasks and return true for them.
202     /// pool.try_run_one(); // returns true after completing one of the ready futures
203     /// pool.try_run_one(); // returns true after completing the other ready future
204     ///
205     /// // the remaining task can not be completed
206     /// assert!(!pool.try_run_one()); // returns false
207     /// ```
208     ///
209     /// This function will not block the calling thread and will return the moment
210     /// that there are no tasks left for which progress can be made or after exactly one
211     /// task was completed; Remaining incomplete tasks in the pool can continue with
212     /// further use of one of the pool's run or poll methods.
213     /// Though only one task will be completed, progress may be made on multiple tasks.
try_run_one(&mut self) -> bool214     pub fn try_run_one(&mut self) -> bool {
215         poll_executor(|ctx| {
216             loop {
217                 let ret = self.poll_pool_once(ctx);
218 
219                 // return if we have executed a future
220                 if let Poll::Ready(Some(_)) = ret {
221                     return true;
222                 }
223 
224                 // if there are no new incoming futures
225                 // then there is no feature that can make progress
226                 // and we can return without having completed a single future
227                 if self.incoming.borrow().is_empty() {
228                     return false;
229                 }
230             }
231         })
232     }
233 
234     /// Runs all tasks in the pool and returns if no more progress can be made
235     /// on any task.
236     ///
237     /// ```
238     /// use futures::executor::LocalPool;
239     /// use futures::task::LocalSpawnExt;
240     /// use futures::future::{ready, pending};
241     ///
242     /// let mut pool = LocalPool::new();
243     /// let spawner = pool.spawner();
244     ///
245     /// spawner.spawn_local(ready(())).unwrap();
246     /// spawner.spawn_local(ready(())).unwrap();
247     /// spawner.spawn_local(pending()).unwrap();
248     ///
249     /// // Runs the two ready task and returns.
250     /// // The empty task remains in the pool.
251     /// pool.run_until_stalled();
252     /// ```
253     ///
254     /// This function will not block the calling thread and will return the moment
255     /// that there are no tasks left for which progress can be made;
256     /// remaining incomplete tasks in the pool can continue with further use of one
257     /// of the pool's run or poll methods. While the function is running, all tasks
258     /// in the pool will try to make progress.
run_until_stalled(&mut self)259     pub fn run_until_stalled(&mut self) {
260         poll_executor(|ctx| {
261             let _ = self.poll_pool(ctx);
262         });
263     }
264 
265     // Make maximal progress on the entire pool of spawned task, returning `Ready`
266     // if the pool is empty and `Pending` if no further progress can be made.
poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()>267     fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
268         // state for the FuturesUnordered, which will never be used
269         loop {
270             let ret = self.poll_pool_once(cx);
271 
272             // we queued up some new tasks; add them and poll again
273             if !self.incoming.borrow().is_empty() {
274                 continue;
275             }
276 
277             // no queued tasks; we may be done
278             match ret {
279                 Poll::Pending => return Poll::Pending,
280                 Poll::Ready(None) => return Poll::Ready(()),
281                 _ => {}
282             }
283         }
284     }
285 
286     // Try make minimal progress on the pool of spawned tasks
poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>287     fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
288         // empty the incoming queue of newly-spawned tasks
289         {
290             let mut incoming = self.incoming.borrow_mut();
291             for task in incoming.drain(..) {
292                 self.pool.push(task)
293             }
294         }
295 
296         // try to execute the next ready future
297         self.pool.poll_next_unpin(cx)
298     }
299 }
300 
301 impl Default for LocalPool {
default() -> Self302     fn default() -> Self {
303         Self::new()
304     }
305 }
306 
307 /// Run a future to completion on the current thread.
308 ///
309 /// This function will block the caller until the given future has completed.
310 ///
311 /// Use a [`LocalPool`](LocalPool) if you need finer-grained control over
312 /// spawned tasks.
block_on<F: Future>(f: F) -> F::Output313 pub fn block_on<F: Future>(f: F) -> F::Output {
314     pin_mut!(f);
315     run_executor(|cx| f.as_mut().poll(cx))
316 }
317 
/// Turn a stream into a blocking iterator.
///
/// When `next` is called on the resulting `BlockingStream`, the caller
/// will be blocked until the next element of the `Stream` becomes available.
///
/// Constructing the iterator does not poll the stream; it only wraps it.
pub fn block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S> {
    BlockingStream { stream }
}
325 
/// An iterator which blocks on values from a stream until they become available.
///
/// The wrapped stream stays accessible through `Deref` / `DerefMut` and can
/// be recovered with [`into_inner`](BlockingStream::into_inner).
#[derive(Debug)]
pub struct BlockingStream<S: Stream + Unpin> {
    // The wrapped stream that `Iterator::next` blocks on.
    stream: S,
}
331 
impl<S: Stream + Unpin> Deref for BlockingStream<S> {
    type Target = S;
    /// Borrows the wrapped stream.
    fn deref(&self) -> &Self::Target {
        &self.stream
    }
}
338 
impl<S: Stream + Unpin> DerefMut for BlockingStream<S> {
    /// Mutably borrows the wrapped stream.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.stream
    }
}
344 
345 impl<S: Stream + Unpin> BlockingStream<S> {
346     /// Convert this `BlockingStream` into the inner `Stream` type.
into_inner(self) -> S347     pub fn into_inner(self) -> S {
348         self.stream
349     }
350 }
351 
352 impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
353     type Item = S::Item;
354 
next(&mut self) -> Option<Self::Item>355     fn next(&mut self) -> Option<Self::Item> {
356         LocalPool::new().run_until(self.stream.next())
357     }
358 
size_hint(&self) -> (usize, Option<usize>)359     fn size_hint(&self) -> (usize, Option<usize>) {
360         self.stream.size_hint()
361     }
362 }
363 
364 impl Spawn for LocalSpawner {
spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError>365     fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
366         if let Some(incoming) = self.incoming.upgrade() {
367             incoming.borrow_mut().push(future.into());
368             Ok(())
369         } else {
370             Err(SpawnError::shutdown())
371         }
372     }
373 
status(&self) -> Result<(), SpawnError>374     fn status(&self) -> Result<(), SpawnError> {
375         if self.incoming.upgrade().is_some() {
376             Ok(())
377         } else {
378             Err(SpawnError::shutdown())
379         }
380     }
381 }
382 
383 impl LocalSpawn for LocalSpawner {
spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError>384     fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
385         if let Some(incoming) = self.incoming.upgrade() {
386             incoming.borrow_mut().push(future);
387             Ok(())
388         } else {
389             Err(SpawnError::shutdown())
390         }
391     }
392 
status_local(&self) -> Result<(), SpawnError>393     fn status_local(&self) -> Result<(), SpawnError> {
394         if self.incoming.upgrade().is_some() {
395             Ok(())
396         } else {
397             Err(SpawnError::shutdown())
398         }
399     }
400 }
401