//! An unbounded set of futures.

use std::cell::UnsafeCell;
use std::fmt::{self, Debug};
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel};
use std::sync::atomic::{AtomicPtr, AtomicBool};
use std::sync::{Arc, Weak};
use std::usize;

use {task, Stream, Future, Poll, Async};
use executor::{Notify, UnsafeNotify, NotifyHandle};
use task_impl::{self, AtomicTask};

/// An unbounded set of futures.
///
/// This "combinator" also serves a special function in this library, providing
/// the ability to maintain a set of futures and drive them all to completion.
///
/// Futures are pushed into this set and their realized values are yielded as
/// they are ready. This structure is optimized to manage a large number of
/// futures. Futures managed by `FuturesUnordered` will only be polled when they
/// generate notifications. This reduces the amount of work needed to
/// coordinate large numbers of futures.
///
/// When a `FuturesUnordered` is first created, it does not contain any futures.
/// Calling `poll` in this state will result in `Ok(Async::Ready(None))` being
/// returned. Futures are submitted to the set using `push`; however, the
/// future will **not** be polled at this point. `FuturesUnordered` will only
/// poll managed futures when `FuturesUnordered::poll` is called. As such, it
/// is important to call `poll` after pushing new futures.
///
/// If `FuturesUnordered::poll` returns `Ok(Async::Ready(None))` this means that
/// the set is currently not managing any futures. A future may be submitted
/// to the set at a later time. At that point, a call to
/// `FuturesUnordered::poll` will either return the future's resolved value
/// **or** `Ok(Async::NotReady)` if the future has not yet completed.
///
/// Note that you can create a ready-made `FuturesUnordered` via the
/// `futures_unordered` function in the `stream` module, or you can start with an
/// empty set with the `FuturesUnordered::new` constructor.
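///
/// A minimal usage sketch (illustrative only; uses trivially ready futures
/// and blocks on the result for brevity):
///
/// ```
/// use futures::future;
/// use futures::stream::{FuturesUnordered, Stream};
/// use futures::Future;
///
/// let mut set = FuturesUnordered::new();
/// set.push(future::ok::<u32, ()>(1));
/// set.push(future::ok::<u32, ()>(2));
///
/// // `collect` polls the set until it yields `Ready(None)`. Values are
/// // yielded in completion order, not insertion order.
/// let values = set.collect().wait().unwrap();
/// assert_eq!(values.len(), 2);
/// ```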
#[must_use = "streams do nothing unless polled"]
pub struct FuturesUnordered<F> {
    inner: Arc<Inner<F>>,
    len: usize,
    head_all: *const Node<F>,
}

unsafe impl<T: Send> Send for FuturesUnordered<T> {}
unsafe impl<T: Sync> Sync for FuturesUnordered<T> {}

// FuturesUnordered is implemented using two linked lists. One which links all
// futures managed by a `FuturesUnordered` and one that tracks futures that have
// been scheduled for polling. The first linked list is not thread safe and is
// only accessed by the thread that owns the `FuturesUnordered` value. The
// second linked list is an implementation of the intrusive MPSC queue algorithm
// described by 1024cores.net.
//
// When a future is submitted to the set a node is allocated and inserted in
// both linked lists. The next call to `poll` will (eventually) see this node
// and call `poll` on the future.
//
// Before a managed future is polled, the current task's `Notify` is replaced
// with one that is aware of the specific future being run. This ensures that
// task notifications generated by that specific future are visible to
// `FuturesUnordered`. When a notification is received, the node is scheduled
// for polling by being inserted into the concurrent linked list.
//
// Each node is reference counted via `Arc` and carries an `AtomicBool` flag
// (`queued`) tracking whether it is currently inserted in the atomic queue.
// When the future is notified, it will only insert itself into the queue if
// it isn't currently inserted.

#[allow(missing_debug_implementations)]
struct Inner<T> {
    // The task using `FuturesUnordered`.
    parent: AtomicTask,

    // Head/tail of the readiness queue
    head_readiness: AtomicPtr<Node<T>>,
    tail_readiness: UnsafeCell<*const Node<T>>,
    stub: Arc<Node<T>>,
}

struct Node<T> {
    // The future
    future: UnsafeCell<Option<T>>,

    // Next pointer for linked list tracking all active nodes
    next_all: UnsafeCell<*const Node<T>>,

    // Previous node in linked list tracking all active nodes
    prev_all: UnsafeCell<*const Node<T>>,

    // Next pointer in readiness queue
    next_readiness: AtomicPtr<Node<T>>,

    // Queue that we'll be enqueued to when notified
    queue: Weak<Inner<T>>,

    // Whether or not this node is currently in the mpsc queue.
    queued: AtomicBool,
}
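
// The result of attempting to dequeue a node from the readiness queue.
// `Inconsistent` indicates that a concurrent `enqueue` was observed mid-update,
// so the queue is momentarily unreadable and the caller should back off and
// retry.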
enum Dequeue<T> {
    Data(*const Node<T>),
    Empty,
    Inconsistent,
}

impl<T> Default for FuturesUnordered<T> where T: Future {
    fn default() -> Self {
        FuturesUnordered::new()
    }
}

impl<T> FuturesUnordered<T>
    where T: Future,
{
    /// Constructs a new, empty `FuturesUnordered`
    ///
    /// The returned `FuturesUnordered` does not contain any futures and, in this
    /// state, `FuturesUnordered::poll` will return `Ok(Async::Ready(None))`.
    pub fn new() -> FuturesUnordered<T> {
        let stub = Arc::new(Node {
            future: UnsafeCell::new(None),
            next_all: UnsafeCell::new(ptr::null()),
            prev_all: UnsafeCell::new(ptr::null()),
            next_readiness: AtomicPtr::new(ptr::null_mut()),
            queued: AtomicBool::new(true),
            queue: Weak::new(),
        });
        let stub_ptr = &*stub as *const Node<T>;
        let inner = Arc::new(Inner {
            parent: AtomicTask::new(),
            head_readiness: AtomicPtr::new(stub_ptr as *mut _),
            tail_readiness: UnsafeCell::new(stub_ptr),
            stub: stub,
        });

        FuturesUnordered {
            len: 0,
            head_all: ptr::null_mut(),
            inner: inner,
        }
    }
}

impl<T> FuturesUnordered<T> {
    /// Returns the number of futures contained in the set.
    ///
    /// This represents the total number of in-flight futures.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` if the set contains no futures
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Push a future into the set.
    ///
    /// This function submits the given future to the set for managing. This
    /// function will not call `poll` on the submitted future. The caller must
    /// ensure that `FuturesUnordered::poll` is called in order to receive task
    /// notifications.
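    ///
    /// A minimal sketch (illustrative only):
    ///
    /// ```
    /// use futures::future;
    /// use futures::stream::FuturesUnordered;
    ///
    /// let mut set = FuturesUnordered::new();
    /// set.push(future::ok::<u32, ()>(1));
    ///
    /// // The future makes no progress until the set itself is polled.
    /// assert_eq!(set.len(), 1);
    /// ```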
    pub fn push(&mut self, future: T) {
        let node = Arc::new(Node {
            future: UnsafeCell::new(Some(future)),
            next_all: UnsafeCell::new(ptr::null_mut()),
            prev_all: UnsafeCell::new(ptr::null_mut()),
            next_readiness: AtomicPtr::new(ptr::null_mut()),
            queued: AtomicBool::new(true),
            queue: Arc::downgrade(&self.inner),
        });

        // Right now our node has a strong reference count of 1. We transfer
        // ownership of this reference count to our internal linked list
        // and we'll reclaim ownership through the `unlink` function below.
        let ptr = self.link(node);

        // We'll need to get the future "into the system" to start tracking it,
        // e.g. getting its unpark notifications routed to us so we can track
        // which futures are ready. To do that we unconditionally enqueue it
        // for polling here.
        self.inner.enqueue(ptr);
    }

    /// Returns an iterator that allows modifying each future in the set.
    pub fn iter_mut(&mut self) -> IterMut<T> {
        IterMut {
            node: self.head_all,
            len: self.len,
            _marker: PhantomData
        }
    }

    fn release_node(&mut self, node: Arc<Node<T>>) {
        // The future is done; set the `queued` flag so that `notify` will not
        // do any further work with this node.
        let prev = node.queued.swap(true, SeqCst);

        // Drop the future, even if it hasn't finished yet. This is safe
        // because we're dropping the future on the thread that owns
        // `FuturesUnordered`, which correctly tracks T's lifetimes and such.
        unsafe {
            drop((*node.future.get()).take());
        }

        // If the queued flag was previously set then it means that this node
        // is still in our internal mpsc queue. We then transfer ownership
        // of our reference count to the mpsc queue, and it'll come along and
        // free it later, noticing that the future is `None`.
        //
        // If, however, the queued flag was *not* set then we're safe to
        // release our reference count on the internal node. The queued flag
        // was set above so all future `enqueue` operations will not actually
        // enqueue the node, so our node will never see the mpsc queue again.
        // The node itself will be deallocated once all reference counts have
        // been dropped by the various owning tasks elsewhere.
        if prev {
            mem::forget(node);
        }
    }

    /// Insert a new node into the internal linked list.
    fn link(&mut self, node: Arc<Node<T>>) -> *const Node<T> {
        let ptr = arc2ptr(node);
        unsafe {
            *(*ptr).next_all.get() = self.head_all;
            if !self.head_all.is_null() {
                *(*self.head_all).prev_all.get() = ptr;
            }
        }

        self.head_all = ptr;
        self.len += 1;
        return ptr
    }

    /// Remove the node from the linked list tracking all nodes currently
    /// managed by `FuturesUnordered`.
    unsafe fn unlink(&mut self, node: *const Node<T>) -> Arc<Node<T>> {
        let node = ptr2arc(node);
        let next = *node.next_all.get();
        let prev = *node.prev_all.get();
        *node.next_all.get() = ptr::null_mut();
        *node.prev_all.get() = ptr::null_mut();

        if !next.is_null() {
            *(*next).prev_all.get() = prev;
        }

        if !prev.is_null() {
            *(*prev).next_all.get() = next;
        } else {
            self.head_all = next;
        }
        self.len -= 1;
        return node
    }
}

impl<T> Stream for FuturesUnordered<T>
    where T: Future
{
    type Item = T::Item;
    type Error = T::Error;

    fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
        // Determines how many times we are allowed to poll underlying futures
        // without yielding.
        //
        // A single call to `poll` may potentially do a lot of work before
        // yielding. This happens in particular if the underlying futures are
        // awoken frequently but continue to return `NotReady`. This is
        // problematic if other tasks are waiting on the executor, since they
        // do not get to run. This value caps the number of calls to `poll` on
        // underlying futures a single call to `Stream::poll` is allowed to
        // make.
        //
        // The value is the length of FuturesUnordered. This ensures that each
        // future is polled at most once per call.
        //
        // See also https://github.com/rust-lang/futures-rs/issues/2047.
        let yield_every = self.len();

        // Keep track of how many child futures we have polled,
        // in case we want to forcibly yield.
        let mut polled = 0;

        // Ensure `parent` is correctly set.
        self.inner.parent.register();

        loop {
            let node = match unsafe { self.inner.dequeue() } {
                Dequeue::Empty => {
                    if self.is_empty() {
                        return Ok(Async::Ready(None));
                    } else {
                        return Ok(Async::NotReady)
                    }
                }
                Dequeue::Inconsistent => {
                    // At this point, it may be worth yielding the thread &
                    // spinning a few times... but for now, just yield using the
                    // task system.
                    task::current().notify();
                    return Ok(Async::NotReady);
                }
                Dequeue::Data(node) => node,
            };

            debug_assert!(node != self.inner.stub());

            unsafe {
                let mut future = match (*(*node).future.get()).take() {
                    Some(future) => future,

                    // If the future has already gone away then we're just
                    // cleaning out this node. See the comment in
                    // `release_node` for more information, but we're basically
                    // just taking ownership of our reference count here.
                    None => {
                        let node = ptr2arc(node);
                        assert!((*node.next_all.get()).is_null());
                        assert!((*node.prev_all.get()).is_null());
                        continue
                    }
                };

                // Unset queued flag... this must be done before
                // polling. This ensures that the future gets
                // rescheduled if it is notified **during** a call
                // to `poll`.
                let prev = (*node).queued.swap(false, SeqCst);
                assert!(prev);

                // We're going to need to be very careful if the `poll`
                // function below panics. We need to (a) not leak memory and
                // (b) ensure that we still don't have any use-after-frees. To
                // manage this we do a few things:
                //
                // * This "bomb" here will call `release_node` if dropped
                //   abnormally. That way we'll be sure the `node`'s memory is
                //   managed correctly.
                // * The future was extracted above (taken ownership). That way
                //   if it panics we're guaranteed that the future is
                //   dropped on this thread and doesn't accidentally get
                //   dropped on a different thread (bad).
                // * We unlink the node from our internal queue to preemptively
                //   assume it'll panic, in which case we'll want to discard it
                //   regardless.
                struct Bomb<'a, T: 'a> {
                    queue: &'a mut FuturesUnordered<T>,
                    node: Option<Arc<Node<T>>>,
                }
                impl<'a, T> Drop for Bomb<'a, T> {
                    fn drop(&mut self) {
                        if let Some(node) = self.node.take() {
                            self.queue.release_node(node);
                        }
                    }
                }
                let mut bomb = Bomb {
                    node: Some(self.unlink(node)),
                    queue: self,
                };

                // Poll the underlying future with the appropriate `notify`
                // implementation. This is where much of the internal unsafety
                // stems from. The `notify` instance itself is basically just
                // our `Arc<Node<T>>` and tracks the mpsc queue of ready
                // futures.
                //
                // Critically though `Node<T>` won't actually access `T`, the
                // future, while it's floating around inside of `Task`
                // instances. These structs will basically just use `T` to size
                // the internal allocation, appropriately accessing fields and
                // deallocating the node if need be.
                let res = {
                    let notify = NodeToHandle(bomb.node.as_ref().unwrap());
                    task_impl::with_notify(&notify, 0, || {
                        future.poll()
                    })
                };
                polled += 1;

                let ret = match res {
                    Ok(Async::NotReady) => {
                        let node = bomb.node.take().unwrap();
                        *node.future.get() = Some(future);
                        bomb.queue.link(node);

                        if polled == yield_every {
                            // We have polled a large number of futures in a row without yielding.
                            // To ensure we do not starve other tasks waiting on the executor,
                            // we yield here, but immediately wake ourselves up to continue.
                            task_impl::current().notify();
                            return Ok(Async::NotReady);
                        }
                        continue
                    }
                    Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))),
                    Err(e) => Err(e),
                };
                return ret
            }
        }
    }
}

impl<T: Debug> Debug for FuturesUnordered<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        write!(fmt, "FuturesUnordered {{ ... }}")
    }
}

impl<T> Drop for FuturesUnordered<T> {
    fn drop(&mut self) {
        // When a `FuturesUnordered` is dropped we want to drop all futures associated
        // with it. At the same time though there may be tons of `Task` handles
        // flying around which contain `Node<T>` references inside them. We'll
        // let those naturally get deallocated when the `Task` itself goes out
        // of scope or gets notified.
        unsafe {
            while !self.head_all.is_null() {
                let head = self.head_all;
                let node = self.unlink(head);
                self.release_node(node);
            }
        }

        // Note that at this point we could still have a bunch of nodes in the
        // mpsc queue. None of those nodes, however, have futures associated
        // with them so they're safe to destroy on any thread. At this point
        // the `FuturesUnordered` struct, the owner of the one strong reference
        // to `Inner<T>`, will drop that strong reference. At that point
        // whichever thread releases the strong refcount last (be it this
        // thread or some other thread as part of an `upgrade`) will clear out
        // the mpsc queue and free all remaining nodes.
        //
        // While that freeing operation isn't guaranteed to happen here, it's
        // guaranteed to happen "promptly" as no more "blocking work" will
        // happen while there's a strong refcount held.
    }
}
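
// Collecting an iterator of futures is a convenient way to build a
// `FuturesUnordered` in bulk, e.g. (a hypothetical sketch, assuming
// `futures::future` is in scope as `future`):
//
//     let set: FuturesUnordered<_> = (0..10)
//         .map(|i| future::ok::<u32, ()>(i))
//         .collect();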
impl<F: Future> FromIterator<F> for FuturesUnordered<F> {
    fn from_iter<T>(iter: T) -> Self
        where T: IntoIterator<Item = F>
    {
        let mut new = FuturesUnordered::new();
        for future in iter.into_iter() {
            new.push(future);
        }
        new
    }
}

#[derive(Debug)]
/// Mutable iterator over all futures in the unordered set.
pub struct IterMut<'a, F: 'a> {
    node: *const Node<F>,
    len: usize,
    _marker: PhantomData<&'a mut FuturesUnordered<F>>
}

impl<'a, F> Iterator for IterMut<'a, F> {
    type Item = &'a mut F;

    fn next(&mut self) -> Option<&'a mut F> {
        if self.node.is_null() {
            return None;
        }
        unsafe {
            let future = (*(*self.node).future.get()).as_mut().unwrap();
            let next = *(*self.node).next_all.get();
            self.node = next;
            self.len -= 1;
            return Some(future);
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.len, Some(self.len))
    }
}

impl<'a, F> ExactSizeIterator for IterMut<'a, F> {}

impl<T> Inner<T> {
    /// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
    fn enqueue(&self, node: *const Node<T>) {
        unsafe {
            debug_assert!((*node).queued.load(Relaxed));

            // This action does not require any coordination
            (*node).next_readiness.store(ptr::null_mut(), Relaxed);

            // Note that these atomic orderings come from 1024cores
            let node = node as *mut _;
            let prev = self.head_readiness.swap(node, AcqRel);
            (*prev).next_readiness.store(node, Release);
        }
    }

    /// The dequeue function from the 1024cores intrusive MPSC queue algorithm
    ///
    /// Note that this is unsafe as it requires mutual exclusion (only one
    /// thread can call this at a time) to be guaranteed elsewhere.
    unsafe fn dequeue(&self) -> Dequeue<T> {
        let mut tail = *self.tail_readiness.get();
        let mut next = (*tail).next_readiness.load(Acquire);

        if tail == self.stub() {
            if next.is_null() {
                return Dequeue::Empty;
            }

            *self.tail_readiness.get() = next;
            tail = next;
            next = (*next).next_readiness.load(Acquire);
        }

        if !next.is_null() {
            *self.tail_readiness.get() = next;
            debug_assert!(tail != self.stub());
            return Dequeue::Data(tail);
        }

        if self.head_readiness.load(Acquire) as *const _ != tail {
            return Dequeue::Inconsistent;
        }

        self.enqueue(self.stub());

        next = (*tail).next_readiness.load(Acquire);

        if !next.is_null() {
            *self.tail_readiness.get() = next;
            return Dequeue::Data(tail);
        }

        Dequeue::Inconsistent
    }

    fn stub(&self) -> *const Node<T> {
        &*self.stub
    }
}

impl<T> Drop for Inner<T> {
    fn drop(&mut self) {
        // Once we're in the destructor for `Inner<T>` we need to clear out the
        // mpsc queue of nodes if there's anything left in there.
        //
        // Note that each node has a strong reference count associated with it
        // which is owned by the mpsc queue. All nodes should have had their
        // futures dropped already by the `FuturesUnordered` destructor above,
        // so we're just pulling out nodes and dropping their refcounts.
        unsafe {
            loop {
                match self.dequeue() {
                    Dequeue::Empty => break,
                    Dequeue::Inconsistent => abort("inconsistent in drop"),
                    Dequeue::Data(ptr) => drop(ptr2arc(ptr)),
                }
            }
        }
    }
}
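
// A borrowed `Arc<Node<T>>` that can be converted into a `NotifyHandle`,
// wiring notifications for this specific node into the task system.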
#[allow(missing_debug_implementations)]
struct NodeToHandle<'a, T: 'a>(&'a Arc<Node<T>>);

impl<'a, T> Clone for NodeToHandle<'a, T> {
    fn clone(&self) -> Self {
        NodeToHandle(self.0)
    }
}

impl<'a, T> From<NodeToHandle<'a, T>> for NotifyHandle {
    fn from(handle: NodeToHandle<'a, T>) -> NotifyHandle {
        unsafe {
            let ptr = handle.0.clone();
            let ptr = mem::transmute::<Arc<Node<T>>, *mut ArcNode<T>>(ptr);
            NotifyHandle::new(hide_lt(ptr))
        }
    }
}

struct ArcNode<T>(PhantomData<T>);

// We should never touch `T` on any thread other than the one owning
// `FuturesUnordered`, so this should be a safe operation.
unsafe impl<T> Send for ArcNode<T> {}
unsafe impl<T> Sync for ArcNode<T> {}
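
// An `Arc<Node<T>>` is represented as a single pointer to its allocation, so
// that pointer doubles as a `*mut ArcNode<T>`, and a pointer to such a
// pointer can in turn be reinterpreted as a `*const Arc<Node<T>>`. The
// conversions below rely on this without touching the reference count.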
impl<T> Notify for ArcNode<T> {
    fn notify(&self, _id: usize) {
        unsafe {
            let me: *const ArcNode<T> = self;
            let me: *const *const ArcNode<T> = &me;
            let me = me as *const Arc<Node<T>>;
            Node::notify(&*me)
        }
    }
}

unsafe impl<T> UnsafeNotify for ArcNode<T> {
    unsafe fn clone_raw(&self) -> NotifyHandle {
        let me: *const ArcNode<T> = self;
        let me: *const *const ArcNode<T> = &me;
        let me = &*(me as *const Arc<Node<T>>);
        NodeToHandle(me).into()
    }

    unsafe fn drop_raw(&self) {
        let mut me: *const ArcNode<T> = self;
        let me = &mut me as *mut *const ArcNode<T> as *mut Arc<Node<T>>;
        ptr::drop_in_place(me);
    }
}

unsafe fn hide_lt<T>(p: *mut ArcNode<T>) -> *mut UnsafeNotify {
    mem::transmute(p as *mut UnsafeNotify)
}

impl<T> Node<T> {
    fn notify(me: &Arc<Node<T>>) {
        let inner = match me.queue.upgrade() {
            Some(inner) => inner,
            None => return,
        };

        // It's our job to notify the node that it's ready to get polled,
        // meaning that we need to enqueue it into the readiness queue. To
        // do this we flag that we're ready to be queued, and if successful
        // we then do the literal queueing operation, ensuring that we're
        // only queued once.
        //
        // Once the node is inserted we make sure to notify the parent task,
        // as it'll want to come along and pick up our node now.
        //
        // Note that we don't change the reference count of the node here,
        // we're just enqueueing the raw pointer. The `FuturesUnordered`
        // implementation guarantees that if we set the `queued` flag to true
        // then there's still a reference count held by the main
        // `FuturesUnordered` queue.
        let prev = me.queued.swap(true, SeqCst);
        if !prev {
            inner.enqueue(&**me);
            inner.parent.notify();
        }
    }
}

impl<T> Drop for Node<T> {
    fn drop(&mut self) {
        // Currently a `Node<T>` is sent across all threads for any lifetime,
        // regardless of `T`. This means that for memory safety we can't
        // actually touch `T` at any time except when we have a reference to the
        // `FuturesUnordered` itself.
        //
        // Consequently it *should* be the case that we always drop futures from
        // the `FuturesUnordered` instance, but this is a bomb in place to catch
        // any bugs in that logic.
        unsafe {
            if (*self.future.get()).is_some() {
                abort("future still here when dropping");
            }
        }
    }
}
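
// Converts an `Arc<T>` into a raw pointer to its data, transferring ownership
// of one strong reference count to the caller.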
fn arc2ptr<T>(ptr: Arc<T>) -> *const T {
    let addr = &*ptr as *const T;
    mem::forget(ptr);
    return addr
}
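
// The inverse of `arc2ptr`: rebuilds an `Arc<T>` from a data pointer, taking
// back ownership of one strong reference count. The forged `anchor` (an `Arc`
// "at" address 0x10) is used only to compute the offset between an `Arc`'s
// internal pointer and its data; no memory at that address is ever read.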
unsafe fn ptr2arc<T>(ptr: *const T) -> Arc<T> {
    let anchor = mem::transmute::<usize, Arc<T>>(0x10);
    let addr = &*anchor as *const T;
    mem::forget(anchor);
    let offset = addr as isize - 0x10;
    mem::transmute::<isize, Arc<T>>(ptr as isize - offset)
}
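
// Aborts the process: the `panic!` below starts unwinding, and `DoublePanic`'s
// destructor panics again during that unwind, which forces a hard abort.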
fn abort(s: &str) -> ! {
    struct DoublePanic;

    impl Drop for DoublePanic {
        fn drop(&mut self) {
            panic!("panicking twice to abort the program");
        }
    }

    let _bomb = DoublePanic;
    panic!("{}", s);
}