1 //! An unbounded set of futures. 2 //! 3 //! This module is only available when the `std` or `alloc` feature of this 4 //! library is activated, and it is activated by default. 5 6 use futures_core::future::Future; 7 use futures_core::stream::{FusedStream, Stream}; 8 use futures_core::task::{Context, Poll}; 9 use futures_task::{FutureObj, LocalFutureObj, Spawn, LocalSpawn, SpawnError}; 10 use crate::task::AtomicWaker; 11 use core::cell::UnsafeCell; 12 use core::fmt::{self, Debug}; 13 use core::iter::FromIterator; 14 use core::marker::PhantomData; 15 use core::mem; 16 use core::pin::Pin; 17 use core::ptr; 18 use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst}; 19 use core::sync::atomic::{AtomicPtr, AtomicBool}; 20 use alloc::sync::{Arc, Weak}; 21 22 mod abort; 23 24 mod iter; 25 pub use self::iter::{Iter, IterMut, IterPinMut, IterPinRef}; 26 27 mod task; 28 use self::task::Task; 29 30 mod ready_to_run_queue; 31 use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue}; 32 33 34 /// A set of futures which may complete in any order. 35 /// 36 /// This structure is optimized to manage a large number of futures. 37 /// Futures managed by [`FuturesUnordered`] will only be polled when they 38 /// generate wake-up notifications. This reduces the required amount of work 39 /// needed to poll large numbers of futures. 40 /// 41 /// [`FuturesUnordered`] can be filled by [`collect`](Iterator::collect)ing an 42 /// iterator of futures into a [`FuturesUnordered`], or by 43 /// [`push`](FuturesUnordered::push)ing futures onto an existing 44 /// [`FuturesUnordered`]. When new futures are added, 45 /// [`poll_next`](Stream::poll_next) must be called in order to begin receiving 46 /// wake-ups for new futures. 47 /// 48 /// Note that you can create a ready-made [`FuturesUnordered`] via the 49 /// [`collect`](Iterator::collect) method, or you can start with an empty set 50 /// with the [`FuturesUnordered::new`] constructor. 51 /// 52 /// This type is only available when the `std` or `alloc` feature of this 53 /// library is activated, and it is activated by default. 54 #[must_use = "streams do nothing unless polled"] 55 pub struct FuturesUnordered<Fut> { 56 ready_to_run_queue: Arc<ReadyToRunQueue<Fut>>, 57 head_all: AtomicPtr<Task<Fut>>, 58 is_terminated: AtomicBool, 59 } 60 61 unsafe impl<Fut: Send> Send for FuturesUnordered<Fut> {} 62 unsafe impl<Fut: Sync> Sync for FuturesUnordered<Fut> {} 63 impl<Fut> Unpin for FuturesUnordered<Fut> {} 64 65 impl Spawn for FuturesUnordered<FutureObj<'_, ()>> { spawn_obj(&self, future_obj: FutureObj<'static, ()>) -> Result<(), SpawnError>66 fn spawn_obj(&self, future_obj: FutureObj<'static, ()>) 67 -> Result<(), SpawnError> 68 { 69 self.push(future_obj); 70 Ok(()) 71 } 72 } 73 74 impl LocalSpawn for FuturesUnordered<LocalFutureObj<'_, ()>> { spawn_local_obj(&self, future_obj: LocalFutureObj<'static, ()>) -> Result<(), SpawnError>75 fn spawn_local_obj(&self, future_obj: LocalFutureObj<'static, ()>) 76 -> Result<(), SpawnError> 77 { 78 self.push(future_obj); 79 Ok(()) 80 } 81 } 82 83 // FuturesUnordered is implemented using two linked lists. One which links all 84 // futures managed by a `FuturesUnordered` and one that tracks futures that have 85 // been scheduled for polling. The first linked list allows for thread safe 86 // insertion of nodes at the head as well as forward iteration, but is otherwise 87 // not thread safe and is only accessed by the thread that owns the 88 // `FuturesUnordered` value for any other operations. The second linked list is 89 // an implementation of the intrusive MPSC queue algorithm described by 90 // 1024cores.net. 91 // 92 // When a future is submitted to the set, a task is allocated and inserted in 93 // both linked lists. The next call to `poll_next` will (eventually) see this 94 // task and call `poll` on the future. 95 // 96 // Before a managed future is polled, the current context's waker is replaced 97 // with one that is aware of the specific future being run. This ensures that 98 // wake-up notifications generated by that specific future are visible to 99 // `FuturesUnordered`. When a wake-up notification is received, the task is 100 // inserted into the ready to run queue, so that its future can be polled later. 101 // 102 // Each task is wrapped in an `Arc` and thereby atomically reference counted. 103 // Also, each task contains an `AtomicBool` which acts as a flag that indicates 104 // whether the task is currently inserted in the atomic queue. When a wake-up 105 // notifiaction is received, the task will only be inserted into the ready to 106 // run queue if it isn't inserted already. 107 108 impl<Fut> Default for FuturesUnordered<Fut> { default() -> Self109 fn default() -> Self { 110 Self::new() 111 } 112 } 113 114 impl<Fut> FuturesUnordered<Fut> { 115 /// Constructs a new, empty [`FuturesUnordered`]. 116 /// 117 /// The returned [`FuturesUnordered`] does not contain any futures. 118 /// In this state, [`FuturesUnordered::poll_next`](Stream::poll_next) will 119 /// return [`Poll::Ready(None)`](Poll::Ready). new() -> Self120 pub fn new() -> Self { 121 let stub = Arc::new(Task { 122 future: UnsafeCell::new(None), 123 next_all: AtomicPtr::new(ptr::null_mut()), 124 prev_all: UnsafeCell::new(ptr::null()), 125 len_all: UnsafeCell::new(0), 126 next_ready_to_run: AtomicPtr::new(ptr::null_mut()), 127 queued: AtomicBool::new(true), 128 ready_to_run_queue: Weak::new(), 129 }); 130 let stub_ptr = &*stub as *const Task<Fut>; 131 let ready_to_run_queue = Arc::new(ReadyToRunQueue { 132 waker: AtomicWaker::new(), 133 head: AtomicPtr::new(stub_ptr as *mut _), 134 tail: UnsafeCell::new(stub_ptr), 135 stub, 136 }); 137 138 Self { 139 head_all: AtomicPtr::new(ptr::null_mut()), 140 ready_to_run_queue, 141 is_terminated: AtomicBool::new(false), 142 } 143 } 144 145 /// Returns the number of futures contained in the set. 146 /// 147 /// This represents the total number of in-flight futures. len(&self) -> usize148 pub fn len(&self) -> usize { 149 let (_, len) = self.atomic_load_head_and_len_all(); 150 len 151 } 152 153 /// Returns `true` if the set contains no futures. is_empty(&self) -> bool154 pub fn is_empty(&self) -> bool { 155 // Relaxed ordering can be used here since we don't need to read from 156 // the head pointer, only check whether it is null. 157 self.head_all.load(Relaxed).is_null() 158 } 159 160 /// Push a future into the set. 161 /// 162 /// This method adds the given future to the set. This method will not 163 /// call [`poll`](core::future::Future::poll) on the submitted future. The caller must 164 /// ensure that [`FuturesUnordered::poll_next`](Stream::poll_next) is called 165 /// in order to receive wake-up notifications for the given future. push(&self, future: Fut)166 pub fn push(&self, future: Fut) { 167 let task = Arc::new(Task { 168 future: UnsafeCell::new(Some(future)), 169 next_all: AtomicPtr::new(self.pending_next_all()), 170 prev_all: UnsafeCell::new(ptr::null_mut()), 171 len_all: UnsafeCell::new(0), 172 next_ready_to_run: AtomicPtr::new(ptr::null_mut()), 173 queued: AtomicBool::new(true), 174 ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue), 175 }); 176 177 // Reset the `is_terminated` flag if we've previously marked ourselves 178 // as terminated. 179 self.is_terminated.store(false, Relaxed); 180 181 // Right now our task has a strong reference count of 1. We transfer 182 // ownership of this reference count to our internal linked list 183 // and we'll reclaim ownership through the `unlink` method below. 184 let ptr = self.link(task); 185 186 // We'll need to get the future "into the system" to start tracking it, 187 // e.g. getting its wake-up notifications going to us tracking which 188 // futures are ready. To do that we unconditionally enqueue it for 189 // polling here. 190 self.ready_to_run_queue.enqueue(ptr); 191 } 192 193 /// Returns an iterator that allows inspecting each future in the set. iter(&self) -> Iter<'_, Fut> where Fut: Unpin194 pub fn iter(&self) -> Iter<'_, Fut> where Fut: Unpin { 195 Iter(Pin::new(self).iter_pin_ref()) 196 } 197 198 /// Returns an iterator that allows inspecting each future in the set. iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut>199 fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> { 200 let (task, len) = self.atomic_load_head_and_len_all(); 201 202 IterPinRef { 203 task, 204 len, 205 pending_next_all: self.pending_next_all(), 206 _marker: PhantomData, 207 } 208 } 209 210 /// Returns an iterator that allows modifying each future in the set. iter_mut(&mut self) -> IterMut<'_, Fut> where Fut: Unpin211 pub fn iter_mut(&mut self) -> IterMut<'_, Fut> where Fut: Unpin { 212 IterMut(Pin::new(self).iter_pin_mut()) 213 } 214 215 /// Returns an iterator that allows modifying each future in the set. iter_pin_mut(mut self: Pin<&mut Self>) -> IterPinMut<'_, Fut>216 pub fn iter_pin_mut(mut self: Pin<&mut Self>) -> IterPinMut<'_, Fut> { 217 // `head_all` can be accessed directly and we don't need to spin on 218 // `Task::next_all` since we have exclusive access to the set. 219 let task = *self.head_all.get_mut(); 220 let len = if task.is_null() { 221 0 222 } else { 223 unsafe { 224 *(*task).len_all.get() 225 } 226 }; 227 228 IterPinMut { 229 task, 230 len, 231 _marker: PhantomData 232 } 233 } 234 235 /// Returns the current head node and number of futures in the list of all 236 /// futures within a context where access is shared with other threads 237 /// (mostly for use with the `len` and `iter_pin_ref` methods). atomic_load_head_and_len_all(&self) -> (*const Task<Fut>, usize)238 fn atomic_load_head_and_len_all(&self) -> (*const Task<Fut>, usize) { 239 let task = self.head_all.load(Acquire); 240 let len = if task.is_null() { 241 0 242 } else { 243 unsafe { 244 (*task).spin_next_all(self.pending_next_all(), Acquire); 245 *(*task).len_all.get() 246 } 247 }; 248 249 (task, len) 250 } 251 252 /// Releases the task. It destorys the future inside and either drops 253 /// the `Arc<Task>` or transfers ownership to the ready to run queue. 254 /// The task this method is called on must have been unlinked before. release_task(&mut self, task: Arc<Task<Fut>>)255 fn release_task(&mut self, task: Arc<Task<Fut>>) { 256 // `release_task` must only be called on unlinked tasks 257 debug_assert_eq!(task.next_all.load(Relaxed), self.pending_next_all()); 258 unsafe { 259 debug_assert!((*task.prev_all.get()).is_null()); 260 } 261 262 // The future is done, try to reset the queued flag. This will prevent 263 // `wake` from doing any work in the future 264 let prev = task.queued.swap(true, SeqCst); 265 266 // Drop the future, even if it hasn't finished yet. This is safe 267 // because we're dropping the future on the thread that owns 268 // `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and 269 // such. 270 unsafe { 271 // Set to `None` rather than `take()`ing to prevent moving the 272 // future. 273 *task.future.get() = None; 274 } 275 276 // If the queued flag was previously set, then it means that this task 277 // is still in our internal ready to run queue. We then transfer 278 // ownership of our reference count to the ready to run queue, and it'll 279 // come along and free it later, noticing that the future is `None`. 280 // 281 // If, however, the queued flag was *not* set then we're safe to 282 // release our reference count on the task. The queued flag was set 283 // above so all future `enqueue` operations will not actually 284 // enqueue the task, so our task will never see the ready to run queue 285 // again. The task itself will be deallocated once all reference counts 286 // have been dropped elsewhere by the various wakers that contain it. 287 if prev { 288 mem::forget(task); 289 } 290 } 291 292 /// Insert a new task into the internal linked list. link(&self, task: Arc<Task<Fut>>) -> *const Task<Fut>293 fn link(&self, task: Arc<Task<Fut>>) -> *const Task<Fut> { 294 // `next_all` should already be reset to the pending state before this 295 // function is called. 296 debug_assert_eq!(task.next_all.load(Relaxed), self.pending_next_all()); 297 let ptr = Arc::into_raw(task); 298 299 // Atomically swap out the old head node to get the node that should be 300 // assigned to `next_all`. 301 let next = self.head_all.swap(ptr as *mut _, AcqRel); 302 303 unsafe { 304 // Store the new list length in the new node. 305 let new_len = if next.is_null() { 306 1 307 } else { 308 // Make sure `next_all` has been written to signal that it is 309 // safe to read `len_all`. 310 (*next).spin_next_all(self.pending_next_all(), Acquire); 311 *(*next).len_all.get() + 1 312 }; 313 *(*ptr).len_all.get() = new_len; 314 315 // Write the old head as the next node pointer, signaling to other 316 // threads that `len_all` and `next_all` are ready to read. 317 (*ptr).next_all.store(next, Release); 318 319 // `prev_all` updates don't need to be synchronized, as the field is 320 // only ever used after exclusive access has been acquired. 321 if !next.is_null() { 322 *(*next).prev_all.get() = ptr; 323 } 324 } 325 326 ptr 327 } 328 329 /// Remove the task from the linked list tracking all tasks currently 330 /// managed by `FuturesUnordered`. 331 /// This method is unsafe because it has be guaranteed that `task` is a 332 /// valid pointer. unlink(&mut self, task: *const Task<Fut>) -> Arc<Task<Fut>>333 unsafe fn unlink(&mut self, task: *const Task<Fut>) -> Arc<Task<Fut>> { 334 // Compute the new list length now in case we're removing the head node 335 // and won't be able to retrieve the correct length later. 336 let head = *self.head_all.get_mut(); 337 debug_assert!(!head.is_null()); 338 let new_len = *(*head).len_all.get() - 1; 339 340 let task = Arc::from_raw(task); 341 let next = task.next_all.load(Relaxed); 342 let prev = *task.prev_all.get(); 343 task.next_all.store(self.pending_next_all(), Relaxed); 344 *task.prev_all.get() = ptr::null_mut(); 345 346 if !next.is_null() { 347 *(*next).prev_all.get() = prev; 348 } 349 350 if !prev.is_null() { 351 (*prev).next_all.store(next, Relaxed); 352 } else { 353 *self.head_all.get_mut() = next; 354 } 355 356 // Store the new list length in the head node. 357 let head = *self.head_all.get_mut(); 358 if !head.is_null() { 359 *(*head).len_all.get() = new_len; 360 } 361 362 task 363 } 364 365 /// Returns the reserved value for `Task::next_all` to indicate a pending 366 /// assignment from the thread that inserted the task. 367 /// 368 /// `FuturesUnordered::link` needs to update `Task` pointers in an order 369 /// that ensures any iterators created on other threads can correctly 370 /// traverse the entire `Task` list using the chain of `next_all` pointers. 371 /// This could be solved with a compare-exchange loop that stores the 372 /// current `head_all` in `next_all` and swaps out `head_all` with the new 373 /// `Task` pointer if the head hasn't already changed. Under heavy thread 374 /// contention, this compare-exchange loop could become costly. 375 /// 376 /// An alternative is to initialize `next_all` to a reserved pending state 377 /// first, perform an atomic swap on `head_all`, and finally update 378 /// `next_all` with the old head node. Iterators will then either see the 379 /// pending state value or the correct next node pointer, and can reload 380 /// `next_all` as needed until the correct value is loaded. The number of 381 /// retries needed (if any) would be small and will always be finite, so 382 /// this should generally perform better than the compare-exchange loop. 383 /// 384 /// A valid `Task` pointer in the `head_all` list is guaranteed to never be 385 /// this value, so it is safe to use as a reserved value until the correct 386 /// value can be written. pending_next_all(&self) -> *mut Task<Fut>387 fn pending_next_all(&self) -> *mut Task<Fut> { 388 // The `ReadyToRunQueue` stub is never inserted into the `head_all` 389 // list, and its pointer value will remain valid for the lifetime of 390 // this `FuturesUnordered`, so we can make use of its value here. 391 &*self.ready_to_run_queue.stub as *const _ as *mut _ 392 } 393 } 394 395 impl<Fut: Future> Stream for FuturesUnordered<Fut> { 396 type Item = Fut::Output; 397 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>398 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) 399 -> Poll<Option<Self::Item>> 400 { 401 // Variable to determine how many times it is allowed to poll underlying 402 // futures without yielding. 403 // 404 // A single call to `poll_next` may potentially do a lot of work before 405 // yielding. This happens in particular if the underlying futures are awoken 406 // frequently but continue to return `Pending`. This is problematic if other 407 // tasks are waiting on the executor, since they do not get to run. This value 408 // caps the number of calls to `poll` on underlying futures a single call to 409 // `poll_next` is allowed to make. 410 // 411 // The value is the length of FuturesUnordered. This ensures that each 412 // future is polled only once at most per iteration. 413 // 414 // See also https://github.com/rust-lang/futures-rs/issues/2047. 415 let yield_every = self.len(); 416 417 // Keep track of how many child futures we have polled, 418 // in case we want to forcibly yield. 419 let mut polled = 0; 420 421 // Ensure `parent` is correctly set. 422 self.ready_to_run_queue.waker.register(cx.waker()); 423 424 loop { 425 // Safety: &mut self guarantees the mutual exclusion `dequeue` 426 // expects 427 let task = match unsafe { self.ready_to_run_queue.dequeue() } { 428 Dequeue::Empty => { 429 if self.is_empty() { 430 // We can only consider ourselves terminated once we 431 // have yielded a `None` 432 *self.is_terminated.get_mut() = true; 433 return Poll::Ready(None); 434 } else { 435 return Poll::Pending; 436 } 437 } 438 Dequeue::Inconsistent => { 439 // At this point, it may be worth yielding the thread & 440 // spinning a few times... but for now, just yield using the 441 // task system. 442 cx.waker().wake_by_ref(); 443 return Poll::Pending; 444 } 445 Dequeue::Data(task) => task, 446 }; 447 448 debug_assert!(task != self.ready_to_run_queue.stub()); 449 450 // Safety: 451 // - `task` is a valid pointer. 452 // - We are the only thread that accesses the `UnsafeCell` that 453 // contains the future 454 let future = match unsafe { &mut *(*task).future.get() } { 455 Some(future) => future, 456 457 // If the future has already gone away then we're just 458 // cleaning out this task. See the comment in 459 // `release_task` for more information, but we're basically 460 // just taking ownership of our reference count here. 461 None => { 462 // This case only happens when `release_task` was called 463 // for this task before and couldn't drop the task 464 // because it was already enqueued in the ready to run 465 // queue. 466 467 // Safety: `task` is a valid pointer 468 let task = unsafe { Arc::from_raw(task) }; 469 470 // Double check that the call to `release_task` really 471 // happened. Calling it required the task to be unlinked. 472 debug_assert_eq!( 473 task.next_all.load(Relaxed), 474 self.pending_next_all() 475 ); 476 unsafe { 477 debug_assert!((*task.prev_all.get()).is_null()); 478 } 479 continue 480 } 481 }; 482 483 // Safety: `task` is a valid pointer 484 let task = unsafe { self.unlink(task) }; 485 486 // Unset queued flag: This must be done before polling to ensure 487 // that the future's task gets rescheduled if it sends a wake-up 488 // notification **during** the call to `poll`. 489 let prev = task.queued.swap(false, SeqCst); 490 assert!(prev); 491 492 // We're going to need to be very careful if the `poll` 493 // method below panics. We need to (a) not leak memory and 494 // (b) ensure that we still don't have any use-after-frees. To 495 // manage this we do a few things: 496 // 497 // * A "bomb" is created which if dropped abnormally will call 498 // `release_task`. That way we'll be sure the memory management 499 // of the `task` is managed correctly. In particular 500 // `release_task` will drop the future. This ensures that it is 501 // dropped on this thread and not accidentally on a different 502 // thread (bad). 503 // * We unlink the task from our internal queue to preemptively 504 // assume it'll panic, in which case we'll want to discard it 505 // regardless. 506 struct Bomb<'a, Fut> { 507 queue: &'a mut FuturesUnordered<Fut>, 508 task: Option<Arc<Task<Fut>>>, 509 } 510 511 impl<Fut> Drop for Bomb<'_, Fut> { 512 fn drop(&mut self) { 513 if let Some(task) = self.task.take() { 514 self.queue.release_task(task); 515 } 516 } 517 } 518 519 let mut bomb = Bomb { 520 task: Some(task), 521 queue: &mut *self, 522 }; 523 524 // Poll the underlying future with the appropriate waker 525 // implementation. This is where a large bit of the unsafety 526 // starts to stem from internally. The waker is basically just 527 // our `Arc<Task<Fut>>` and can schedule the future for polling by 528 // enqueuing itself in the ready to run queue. 529 // 530 // Critically though `Task<Fut>` won't actually access `Fut`, the 531 // future, while it's floating around inside of wakers. 532 // These structs will basically just use `Fut` to size 533 // the internal allocation, appropriately accessing fields and 534 // deallocating the task if need be. 535 let res = { 536 let waker = Task::waker_ref(bomb.task.as_ref().unwrap()); 537 let mut cx = Context::from_waker(&waker); 538 539 // Safety: We won't move the future ever again 540 let future = unsafe { Pin::new_unchecked(future) }; 541 542 future.poll(&mut cx) 543 }; 544 polled += 1; 545 546 match res { 547 Poll::Pending => { 548 let task = bomb.task.take().unwrap(); 549 bomb.queue.link(task); 550 551 if polled == yield_every { 552 // We have polled a large number of futures in a row without yielding. 553 // To ensure we do not starve other tasks waiting on the executor, 554 // we yield here, but immediately wake ourselves up to continue. 555 cx.waker().wake_by_ref(); 556 return Poll::Pending; 557 } 558 continue 559 } 560 Poll::Ready(output) => { 561 return Poll::Ready(Some(output)) 562 } 563 } 564 } 565 } 566 size_hint(&self) -> (usize, Option<usize>)567 fn size_hint(&self) -> (usize, Option<usize>) { 568 let len = self.len(); 569 (len, Some(len)) 570 } 571 } 572 573 impl<Fut> Debug for FuturesUnordered<Fut> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result574 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 575 write!(f, "FuturesUnordered {{ ... }}") 576 } 577 } 578 579 impl<Fut> Drop for FuturesUnordered<Fut> { drop(&mut self)580 fn drop(&mut self) { 581 // When a `FuturesUnordered` is dropped we want to drop all futures 582 // associated with it. At the same time though there may be tons of 583 // wakers flying around which contain `Task<Fut>` references 584 // inside them. We'll let those naturally get deallocated. 585 unsafe { 586 while !self.head_all.get_mut().is_null() { 587 let head = *self.head_all.get_mut(); 588 let task = self.unlink(head); 589 self.release_task(task); 590 } 591 } 592 593 // Note that at this point we could still have a bunch of tasks in the 594 // ready to run queue. None of those tasks, however, have futures 595 // associated with them so they're safe to destroy on any thread. At 596 // this point the `FuturesUnordered` struct, the owner of the one strong 597 // reference to the ready to run queue will drop the strong reference. 598 // At that point whichever thread releases the strong refcount last (be 599 // it this thread or some other thread as part of an `upgrade`) will 600 // clear out the ready to run queue and free all remaining tasks. 601 // 602 // While that freeing operation isn't guaranteed to happen here, it's 603 // guaranteed to happen "promptly" as no more "blocking work" will 604 // happen while there's a strong refcount held. 605 } 606 } 607 608 impl<Fut> FromIterator<Fut> for FuturesUnordered<Fut> { from_iter<I>(iter: I) -> Self where I: IntoIterator<Item = Fut>,609 fn from_iter<I>(iter: I) -> Self 610 where 611 I: IntoIterator<Item = Fut>, 612 { 613 let acc = Self::new(); 614 iter.into_iter().fold(acc, |acc, item| { acc.push(item); acc }) 615 } 616 } 617 618 impl<Fut: Future> FusedStream for FuturesUnordered<Fut> { is_terminated(&self) -> bool619 fn is_terminated(&self) -> bool { 620 self.is_terminated.load(Relaxed) 621 } 622 } 623 624 impl<Fut> Extend<Fut> for FuturesUnordered<Fut> { extend<I>(&mut self, iter: I) where I: IntoIterator<Item = Fut>,625 fn extend<I>(&mut self, iter: I) 626 where 627 I: IntoIterator<Item = Fut>, 628 { 629 for item in iter { 630 self.push(item); 631 } 632 } 633 } 634