1 //! An unbounded set of futures.
2 //!
3 //! This module is only available when the `std` or `alloc` feature of this
4 //! library is activated, and it is activated by default.
5 
6 use crate::task::{AtomicWaker};
7 use futures_core::future::{Future, FutureObj, LocalFutureObj};
8 use futures_core::stream::{FusedStream, Stream};
9 use futures_core::task::{Context, Poll, Spawn, LocalSpawn, SpawnError};
10 use core::cell::UnsafeCell;
11 use core::fmt::{self, Debug};
12 use core::iter::FromIterator;
13 use core::marker::PhantomData;
14 use core::mem;
15 use core::pin::Pin;
16 use core::ptr;
17 use core::sync::atomic::Ordering::SeqCst;
18 use core::sync::atomic::{AtomicPtr, AtomicBool};
19 use alloc::sync::{Arc, Weak};
20 
21 mod abort;
22 
23 mod iter;
24 pub use self::iter::{IterMut, IterPinMut};
25 
26 mod task;
27 use self::task::Task;
28 
29 mod ready_to_run_queue;
30 use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue};
31 
32 /// Constant used for a `FuturesUnordered` to indicate we are empty and have
33 /// yielded a `None` element so can return `true` from
34 /// `FusedStream::is_terminated`
35 ///
36 /// It is safe to not check for this when incrementing as even a ZST future will
37 /// have a `Task` allocated for it, so we cannot ever reach usize::max_value()
38 /// without running out of ram.
39 const TERMINATED_SENTINEL_LENGTH: usize = usize::max_value();
40 
41 /// A set of futures which may complete in any order.
42 ///
43 /// This structure is optimized to manage a large number of futures.
44 /// Futures managed by [`FuturesUnordered`] will only be polled when they
45 /// generate wake-up notifications. This reduces the required amount of work
46 /// needed to poll large numbers of futures.
47 ///
48 /// [`FuturesUnordered`] can be filled by [`collect`](Iterator::collect)ing an
49 /// iterator of futures into a [`FuturesUnordered`], or by
50 /// [`push`](FuturesUnordered::push)ing futures onto an existing
51 /// [`FuturesUnordered`]. When new futures are added,
52 /// [`poll_next`](Stream::poll_next) must be called in order to begin receiving
53 /// wake-ups for new futures.
54 ///
55 /// Note that you can create a ready-made [`FuturesUnordered`] via the
56 /// [`collect`](Iterator::collect) method, or you can start with an empty set
57 /// with the [`FuturesUnordered::new`] constructor.
58 ///
59 /// This type is only available when the `std` or `alloc` feature of this
60 /// library is activated, and it is activated by default.
61 #[must_use = "streams do nothing unless polled"]
62 pub struct FuturesUnordered<Fut> {
63     ready_to_run_queue: Arc<ReadyToRunQueue<Fut>>,
64     len: usize,
65     head_all: *const Task<Fut>,
66 }
67 
68 unsafe impl<Fut: Send> Send for FuturesUnordered<Fut> {}
69 unsafe impl<Fut: Sync> Sync for FuturesUnordered<Fut> {}
70 impl<Fut> Unpin for FuturesUnordered<Fut> {}
71 
72 impl Spawn for FuturesUnordered<FutureObj<'_, ()>> {
spawn_obj(&mut self, future_obj: FutureObj<'static, ()>) -> Result<(), SpawnError>73     fn spawn_obj(&mut self, future_obj: FutureObj<'static, ()>)
74         -> Result<(), SpawnError>
75     {
76         self.push(future_obj);
77         Ok(())
78     }
79 }
80 
81 impl LocalSpawn for FuturesUnordered<LocalFutureObj<'_, ()>> {
spawn_local_obj(&mut self, future_obj: LocalFutureObj<'static, ()>) -> Result<(), SpawnError>82     fn spawn_local_obj(&mut self, future_obj: LocalFutureObj<'static, ()>)
83         -> Result<(), SpawnError>
84     {
85         self.push(future_obj);
86         Ok(())
87     }
88 }
89 
90 // FuturesUnordered is implemented using two linked lists. One which links all
91 // futures managed by a `FuturesUnordered` and one that tracks futures that have
92 // been scheduled for polling. The first linked list is not thread safe and is
93 // only accessed by the thread that owns the `FuturesUnordered` value. The
94 // second linked list is an implementation of the intrusive MPSC queue algorithm
95 // described by 1024cores.net.
96 //
97 // When a future is submitted to the set, a task is allocated and inserted in
98 // both linked lists. The next call to `poll_next` will (eventually) see this
99 // task and call `poll` on the future.
100 //
101 // Before a managed future is polled, the current context's waker is replaced
102 // with one that is aware of the specific future being run. This ensures that
103 // wake-up notifications generated by that specific future are visible to
104 // `FuturesUnordered`. When a wake-up notification is received, the task is
105 // inserted into the ready to run queue, so that its future can be polled later.
106 //
107 // Each task is wrapped in an `Arc` and thereby atomically reference counted.
108 // Also, each task contains an `AtomicBool` which acts as a flag that indicates
109 // whether the task is currently inserted in the atomic queue. When a wake-up
110 // notifiaction is received, the task will only be inserted into the ready to
111 // run queue if it isn't inserted already.
112 
113 impl<Fut: Future> FuturesUnordered<Fut> {
114     /// Constructs a new, empty [`FuturesUnordered`].
115     ///
116     /// The returned [`FuturesUnordered`] does not contain any futures.
117     /// In this state, [`FuturesUnordered::poll_next`](Stream::poll_next) will
118     /// return [`Poll::Ready(None)`](Poll::Ready).
new() -> FuturesUnordered<Fut>119     pub fn new() -> FuturesUnordered<Fut> {
120         let stub = Arc::new(Task {
121             future: UnsafeCell::new(None),
122             next_all: UnsafeCell::new(ptr::null()),
123             prev_all: UnsafeCell::new(ptr::null()),
124             next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
125             queued: AtomicBool::new(true),
126             ready_to_run_queue: Weak::new(),
127         });
128         let stub_ptr = &*stub as *const Task<Fut>;
129         let ready_to_run_queue = Arc::new(ReadyToRunQueue {
130             waker: AtomicWaker::new(),
131             head: AtomicPtr::new(stub_ptr as *mut _),
132             tail: UnsafeCell::new(stub_ptr),
133             stub,
134         });
135 
136         FuturesUnordered {
137             len: 0,
138             head_all: ptr::null_mut(),
139             ready_to_run_queue,
140         }
141     }
142 }
143 
144 impl<Fut: Future> Default for FuturesUnordered<Fut> {
default() -> FuturesUnordered<Fut>145     fn default() -> FuturesUnordered<Fut> {
146         FuturesUnordered::new()
147     }
148 }
149 
150 impl<Fut> FuturesUnordered<Fut> {
151     /// Returns the number of futures contained in the set.
152     ///
153     /// This represents the total number of in-flight futures.
len(&self) -> usize154     pub fn len(&self) -> usize {
155         if self.len == TERMINATED_SENTINEL_LENGTH { 0 } else { self.len }
156     }
157 
158     /// Returns `true` if the set contains no futures.
is_empty(&self) -> bool159     pub fn is_empty(&self) -> bool {
160         self.len == 0 || self.len == TERMINATED_SENTINEL_LENGTH
161     }
162 
163     /// Push a future into the set.
164     ///
165     /// This method adds the given future to the set. This method will not
166     /// call [`poll`](core::future::Future::poll) on the submitted future. The caller must
167     /// ensure that [`FuturesUnordered::poll_next`](Stream::poll_next) is called
168     /// in order to receive wake-up notifications for the given future.
push(&mut self, future: Fut)169     pub fn push(&mut self, future: Fut) {
170         let task = Arc::new(Task {
171             future: UnsafeCell::new(Some(future)),
172             next_all: UnsafeCell::new(ptr::null_mut()),
173             prev_all: UnsafeCell::new(ptr::null_mut()),
174             next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
175             queued: AtomicBool::new(true),
176             ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue),
177         });
178 
179         // If we've previously marked ourselves as terminated we need to reset
180         // len to 0 to track it correctly
181         if self.len == TERMINATED_SENTINEL_LENGTH {
182             self.len = 0;
183         }
184 
185         // Right now our task has a strong reference count of 1. We transfer
186         // ownership of this reference count to our internal linked list
187         // and we'll reclaim ownership through the `unlink` method below.
188         let ptr = self.link(task);
189 
190         // We'll need to get the future "into the system" to start tracking it,
191         // e.g. getting its wake-up notifications going to us tracking which
192         // futures are ready. To do that we unconditionally enqueue it for
193         // polling here.
194         self.ready_to_run_queue.enqueue(ptr);
195     }
196 
197     /// Returns an iterator that allows modifying each future in the set.
iter_mut(&mut self) -> IterMut<'_, Fut> where Fut: Unpin198     pub fn iter_mut(&mut self) -> IterMut<'_, Fut> where Fut: Unpin {
199         IterMut(Pin::new(self).iter_pin_mut())
200     }
201 
202     /// Returns an iterator that allows modifying each future in the set.
iter_pin_mut(self: Pin<&mut Self>) -> IterPinMut<'_, Fut>203     pub fn iter_pin_mut(self: Pin<&mut Self>) -> IterPinMut<'_, Fut> {
204         IterPinMut {
205             task: self.head_all,
206             len: self.len(),
207             _marker: PhantomData
208         }
209     }
210 
211     /// Releases the task. It destorys the future inside and either drops
212     /// the `Arc<Task>` or transfers ownership to the ready to run queue.
213     /// The task this method is called on must have been unlinked before.
release_task(&mut self, task: Arc<Task<Fut>>)214     fn release_task(&mut self, task: Arc<Task<Fut>>) {
215         // `release_task` must only be called on unlinked tasks
216         unsafe {
217             debug_assert!((*task.next_all.get()).is_null());
218             debug_assert!((*task.prev_all.get()).is_null());
219         }
220 
221         // The future is done, try to reset the queued flag. This will prevent
222         // `wake` from doing any work in the future
223         let prev = task.queued.swap(true, SeqCst);
224 
225         // Drop the future, even if it hasn't finished yet. This is safe
226         // because we're dropping the future on the thread that owns
227         // `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and
228         // such.
229         unsafe {
230             // Set to `None` rather than `take()`ing to prevent moving the
231             // future.
232             *task.future.get() = None;
233         }
234 
235         // If the queued flag was previously set, then it means that this task
236         // is still in our internal ready to run queue. We then transfer
237         // ownership of our reference count to the ready to run queue, and it'll
238         // come along and free it later, noticing that the future is `None`.
239         //
240         // If, however, the queued flag was *not* set then we're safe to
241         // release our reference count on the task. The queued flag was set
242         // above so all future `enqueue` operations will not actually
243         // enqueue the task, so our task will never see the ready to run queue
244         // again. The task itself will be deallocated once all reference counts
245         // have been dropped elsewhere by the various wakers that contain it.
246         if prev {
247             mem::forget(task);
248         }
249     }
250 
251     /// Insert a new task into the internal linked list.
link(&mut self, task: Arc<Task<Fut>>) -> *const Task<Fut>252     fn link(&mut self, task: Arc<Task<Fut>>) -> *const Task<Fut> {
253         let ptr = Arc::into_raw(task);
254         unsafe {
255             *(*ptr).next_all.get() = self.head_all;
256             if !self.head_all.is_null() {
257                 *(*self.head_all).prev_all.get() = ptr;
258             }
259         }
260 
261         self.head_all = ptr;
262         self.len += 1;
263         ptr
264     }
265 
266     /// Remove the task from the linked list tracking all tasks currently
267     /// managed by `FuturesUnordered`.
268     /// This method is unsafe because it has be guaranteed that `task` is a
269     /// valid pointer.
unlink(&mut self, task: *const Task<Fut>) -> Arc<Task<Fut>>270     unsafe fn unlink(&mut self, task: *const Task<Fut>) -> Arc<Task<Fut>> {
271         let task = Arc::from_raw(task);
272         let next = *task.next_all.get();
273         let prev = *task.prev_all.get();
274         *task.next_all.get() = ptr::null_mut();
275         *task.prev_all.get() = ptr::null_mut();
276 
277         if !next.is_null() {
278             *(*next).prev_all.get() = prev;
279         }
280 
281         if !prev.is_null() {
282             *(*prev).next_all.get() = next;
283         } else {
284             self.head_all = next;
285         }
286         self.len -= 1;
287         task
288     }
289 }
290 
291 impl<Fut: Future> Stream for FuturesUnordered<Fut> {
292     type Item = Fut::Output;
293 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>294     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
295         -> Poll<Option<Self::Item>>
296     {
297         // Ensure `parent` is correctly set.
298         self.ready_to_run_queue.waker.register(cx.waker());
299 
300         loop {
301             // Safety: &mut self guarantees the mutual exclusion `dequeue`
302             // expects
303             let task = match unsafe { self.ready_to_run_queue.dequeue() } {
304                 Dequeue::Empty => {
305                     if self.is_empty() {
306                         // We can only consider ourselves terminated once we
307                         // have yielded a `None`
308                         self.len = TERMINATED_SENTINEL_LENGTH;
309                         return Poll::Ready(None);
310                     } else {
311                         return Poll::Pending;
312                     }
313                 }
314                 Dequeue::Inconsistent => {
315                     // At this point, it may be worth yielding the thread &
316                     // spinning a few times... but for now, just yield using the
317                     // task system.
318                     cx.waker().wake_by_ref();
319                     return Poll::Pending;
320                 }
321                 Dequeue::Data(task) => task,
322             };
323 
324             debug_assert!(task != self.ready_to_run_queue.stub());
325 
326             // Safety:
327             // - `task` is a valid pointer.
328             // - We are the only thread that accesses the `UnsafeCell` that
329             //   contains the future
330             let future = match unsafe { &mut *(*task).future.get() } {
331                 Some(future) => future,
332 
333                 // If the future has already gone away then we're just
334                 // cleaning out this task. See the comment in
335                 // `release_task` for more information, but we're basically
336                 // just taking ownership of our reference count here.
337                 None => {
338                     // This case only happens when `release_task` was called
339                     // for this task before and couldn't drop the task
340                     // because it was already enqueued in the ready to run
341                     // queue.
342 
343                     // Safety: `task` is a valid pointer
344                     let task = unsafe { Arc::from_raw(task) };
345 
346                     // Double check that the call to `release_task` really
347                     // happened. Calling it required the task to be unlinked.
348                     unsafe {
349                         debug_assert!((*task.next_all.get()).is_null());
350                         debug_assert!((*task.prev_all.get()).is_null());
351                     }
352                     continue
353                 }
354             };
355 
356             // Safety: `task` is a valid pointer
357             let task = unsafe { self.unlink(task) };
358 
359             // Unset queued flag: This must be done before polling to ensure
360             // that the future's task gets rescheduled if it sends a wake-up
361             // notification **during** the call to `poll`.
362             let prev = task.queued.swap(false, SeqCst);
363             assert!(prev);
364 
365             // We're going to need to be very careful if the `poll`
366             // method below panics. We need to (a) not leak memory and
367             // (b) ensure that we still don't have any use-after-frees. To
368             // manage this we do a few things:
369             //
370             // * A "bomb" is created which if dropped abnormally will call
371             //   `release_task`. That way we'll be sure the memory management
372             //   of the `task` is managed correctly. In particular
373             //   `release_task` will drop the future. This ensures that it is
374             //   dropped on this thread and not accidentally on a different
375             //   thread (bad).
376             // * We unlink the task from our internal queue to preemptively
377             //   assume it'll panic, in which case we'll want to discard it
378             //   regardless.
379             struct Bomb<'a, Fut> {
380                 queue: &'a mut FuturesUnordered<Fut>,
381                 task: Option<Arc<Task<Fut>>>,
382             }
383 
384             impl<Fut> Drop for Bomb<'_, Fut> {
385                 fn drop(&mut self) {
386                     if let Some(task) = self.task.take() {
387                         self.queue.release_task(task);
388                     }
389                 }
390             }
391 
392             let mut bomb = Bomb {
393                 task: Some(task),
394                 queue: &mut *self,
395             };
396 
397             // Poll the underlying future with the appropriate waker
398             // implementation. This is where a large bit of the unsafety
399             // starts to stem from internally. The waker is basically just
400             // our `Arc<Task<Fut>>` and can schedule the future for polling by
401             // enqueuing itself in the ready to run queue.
402             //
403             // Critically though `Task<Fut>` won't actually access `Fut`, the
404             // future, while it's floating around inside of wakers.
405             // These structs will basically just use `Fut` to size
406             // the internal allocation, appropriately accessing fields and
407             // deallocating the task if need be.
408             let res = {
409                 let waker = Task::waker_ref(bomb.task.as_ref().unwrap());
410                 let mut cx = Context::from_waker(&waker);
411 
412                 // Safety: We won't move the future ever again
413                 let future = unsafe { Pin::new_unchecked(future) };
414 
415                 future.poll(&mut cx)
416             };
417 
418             match res {
419                 Poll::Pending => {
420                     let task = bomb.task.take().unwrap();
421                     bomb.queue.link(task);
422                     continue
423                 }
424                 Poll::Ready(output) => {
425                     return Poll::Ready(Some(output))
426                 }
427             }
428         }
429     }
430 
size_hint(&self) -> (usize, Option<usize>)431     fn size_hint(&self) -> (usize, Option<usize>) {
432         let len = self.len();
433         (len, Some(len))
434     }
435 }
436 
437 impl<Fut> Debug for FuturesUnordered<Fut> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result438     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
439         write!(f, "FuturesUnordered {{ ... }}")
440     }
441 }
442 
443 impl<Fut> Drop for FuturesUnordered<Fut> {
drop(&mut self)444     fn drop(&mut self) {
445         // When a `FuturesUnordered` is dropped we want to drop all futures
446         // associated with it. At the same time though there may be tons of
447         // wakers flying around which contain `Task<Fut>` references
448         // inside them. We'll let those naturally get deallocated.
449         unsafe {
450             while !self.head_all.is_null() {
451                 let head = self.head_all;
452                 let task = self.unlink(head);
453                 self.release_task(task);
454             }
455         }
456 
457         // Note that at this point we could still have a bunch of tasks in the
458         // ready to run queue. None of those tasks, however, have futures
459         // associated with them so they're safe to destroy on any thread. At
460         // this point the `FuturesUnordered` struct, the owner of the one strong
461         // reference to the ready to run queue will drop the strong reference.
462         // At that point whichever thread releases the strong refcount last (be
463         // it this thread or some other thread as part of an `upgrade`) will
464         // clear out the ready to run queue and free all remaining tasks.
465         //
466         // While that freeing operation isn't guaranteed to happen here, it's
467         // guaranteed to happen "promptly" as no more "blocking work" will
468         // happen while there's a strong refcount held.
469     }
470 }
471 
472 impl<Fut: Future> FromIterator<Fut> for FuturesUnordered<Fut> {
from_iter<I>(iter: I) -> Self where I: IntoIterator<Item = Fut>,473     fn from_iter<I>(iter: I) -> Self
474     where
475         I: IntoIterator<Item = Fut>,
476     {
477         let acc = FuturesUnordered::new();
478         iter.into_iter().fold(acc, |mut acc, item| { acc.push(item); acc })
479     }
480 }
481 
482 impl<Fut: Future> FusedStream for FuturesUnordered<Fut> {
is_terminated(&self) -> bool483     fn is_terminated(&self) -> bool {
484         self.len == TERMINATED_SENTINEL_LENGTH
485     }
486 }
487