1 //! An unbounded set of futures.
2 
3 use std::cell::UnsafeCell;
4 use std::fmt::{self, Debug};
5 use std::iter::FromIterator;
6 use std::marker::PhantomData;
7 use std::mem;
8 use std::ptr;
9 use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel};
10 use std::sync::atomic::{AtomicPtr, AtomicBool};
11 use std::sync::{Arc, Weak};
12 use std::usize;
13 
14 use {task, Stream, Future, Poll, Async};
15 use executor::{Notify, UnsafeNotify, NotifyHandle};
16 use task_impl::{self, AtomicTask};
17 
/// An unbounded set of futures.
///
/// This "combinator" also serves a special function in this library, providing
/// the ability to maintain a set of futures and manage driving them all to
/// completion.
///
/// Futures are pushed into this set and their realized values are yielded as
/// they are ready. This structure is optimized to manage a large number of
/// futures. Futures managed by `FuturesUnordered` will only be polled when they
/// generate notifications. This reduces the required amount of work needed to
/// coordinate large numbers of futures.
///
/// When a `FuturesUnordered` is first created, it does not contain any futures.
/// Calling `poll` in this state will result in `Ok(Async::Ready(None))` being
/// returned. Futures are submitted to the set using `push`; however, the
/// future will **not** be polled at this point. `FuturesUnordered` will only
/// poll managed futures when `FuturesUnordered::poll` is called. As such, it
/// is important to call `poll` after pushing new futures.
///
/// If `FuturesUnordered::poll` returns `Ok(Async::Ready(None))` this means that
/// the set is currently not managing any futures. A future may be submitted
/// to the set at a later time. At that point, a call to
/// `FuturesUnordered::poll` will either return the future's resolved value
/// **or** `Ok(Async::NotReady)` if the future has not yet completed.
///
/// Note that you can create a ready-made `FuturesUnordered` via the
/// `futures_unordered` function in the `stream` module, or you can start with an
/// empty set with the `FuturesUnordered::new` constructor.
#[must_use = "streams do nothing unless polled"]
pub struct FuturesUnordered<F> {
    // Shared state (parent task and readiness queue). Every node holds a
    // `Weak` handle back to this allocation so it can enqueue itself when
    // notified.
    inner: Arc<Inner<F>>,
    // Number of nodes currently linked into the "all futures" list; bumped in
    // `link` and decremented in `unlink`.
    len: usize,
    // Head of the doubly linked list of all managed nodes, or null when the
    // list is empty.
    head_all: *const Node<F>,
}
52 
// `FuturesUnordered` holds a raw pointer (`head_all`), so `Send`/`Sync` are not
// derived automatically. They are asserted by hand here and are conditional on
// the stored future type being `Send`/`Sync` respectively.
unsafe impl<T: Send> Send for FuturesUnordered<T> {}
unsafe impl<T: Sync> Sync for FuturesUnordered<T> {}
55 
56 // FuturesUnordered is implemented using two linked lists. One which links all
57 // futures managed by a `FuturesUnordered` and one that tracks futures that have
58 // been scheduled for polling. The first linked list is not thread safe and is
59 // only accessed by the thread that owns the `FuturesUnordered` value. The
60 // second linked list is an implementation of the intrusive MPSC queue algorithm
61 // described by 1024cores.net.
62 //
63 // When a future is submitted to the set a node is allocated and inserted in
64 // both linked lists. The next call to `poll` will (eventually) see this node
65 // and call `poll` on the future.
66 //
67 // Before a managed future is polled, the current task's `Notify` is replaced
68 // with one that is aware of the specific future being run. This ensures that
69 // task notifications generated by that specific future are visible to
70 // `FuturesUnordered`. When a notification is received, the node is scheduled
71 // for polling by being inserted into the concurrent linked list.
72 //
// Each node tracks its state with two pieces of data: the `Arc` reference
// count (the number of outstanding handles to the node) and an `AtomicBool`
// flag (`queued`) recording whether the node is currently inserted in the
// atomic queue. When the future is notified, it will only insert itself into
// the linked list if it isn't currently inserted.
78 
// State shared between the `FuturesUnordered` handle and every node (nodes
// reach it through their `Weak<Inner<T>>` back-reference).
#[allow(missing_debug_implementations)]
struct Inner<T> {
    // The task using `FuturesUnordered`.
    parent: AtomicTask,

    // Head/tail of the readiness queue
    //
    // `head_readiness` is the producer end and is swapped atomically in
    // `enqueue`; `tail_readiness` is the consumer end and is only read and
    // written inside `dequeue`.
    head_readiness: AtomicPtr<Node<T>>,
    tail_readiness: UnsafeCell<*const Node<T>>,
    // Sentinel node that never carries a future. A freshly constructed queue
    // has both head and tail pointing at it, and `dequeue` re-enqueues it when
    // it needs to pop the last real node.
    stub: Arc<Node<T>>,
}
89 
// A single entry managed by `FuturesUnordered`. The same allocation is a
// member of the "all futures" list (`next_all`/`prev_all`) and, while it is
// scheduled, of the readiness queue (`next_readiness`).
struct Node<T> {
    // The future
    //
    // `None` for the stub node, while the future is temporarily taken out
    // during `poll`, and after `release_node` has dropped it.
    future: UnsafeCell<Option<T>>,

    // Next pointer for linked list tracking all active nodes
    next_all: UnsafeCell<*const Node<T>>,

    // Previous node in linked list tracking all active nodes
    prev_all: UnsafeCell<*const Node<T>>,

    // Next pointer in readiness queue
    next_readiness: AtomicPtr<Node<T>>,

    // Queue that we'll be enqueued to when notified
    //
    // Held weakly: if the upgrade fails in `Node::notify` the notification is
    // ignored.
    queue: Weak<Inner<T>>,

    // Whether or not this node is currently in the mpsc queue.
    //
    // Nodes are created with this set to `true`; `poll` clears it right before
    // polling the future and `Node::notify` / `release_node` set it again.
    queued: AtomicBool,
}
109 
// Outcome of a single `Inner::dequeue` attempt.
enum Dequeue<T> {
    // A node was popped from the readiness queue.
    Data(*const Node<T>),
    // The queue holds nothing but the stub node.
    Empty,
    // A node could not be popped right now: the head has moved past the tail
    // but the tail's `next_readiness` link is not visible yet (a producer
    // appears to be in the middle of `enqueue`). The caller should retry later.
    Inconsistent,
}
115 
116 impl<T> Default for FuturesUnordered<T> where T: Future {
default() -> Self117     fn default() -> Self {
118         FuturesUnordered::new()
119     }
120 }
121 
122 impl<T> FuturesUnordered<T>
123     where T: Future,
124 {
125     /// Constructs a new, empty `FuturesUnordered`
126     ///
127     /// The returned `FuturesUnordered` does not contain any futures and, in this
128     /// state, `FuturesUnordered::poll` will return `Ok(Async::Ready(None))`.
new() -> FuturesUnordered<T>129     pub fn new() -> FuturesUnordered<T> {
130         let stub = Arc::new(Node {
131             future: UnsafeCell::new(None),
132             next_all: UnsafeCell::new(ptr::null()),
133             prev_all: UnsafeCell::new(ptr::null()),
134             next_readiness: AtomicPtr::new(ptr::null_mut()),
135             queued: AtomicBool::new(true),
136             queue: Weak::new(),
137         });
138         let stub_ptr = &*stub as *const Node<T>;
139         let inner = Arc::new(Inner {
140             parent: AtomicTask::new(),
141             head_readiness: AtomicPtr::new(stub_ptr as *mut _),
142             tail_readiness: UnsafeCell::new(stub_ptr),
143             stub: stub,
144         });
145 
146         FuturesUnordered {
147             len: 0,
148             head_all: ptr::null_mut(),
149             inner: inner,
150         }
151     }
152 }
153 
154 impl<T> FuturesUnordered<T> {
155     /// Returns the number of futures contained in the set.
156     ///
157     /// This represents the total number of in-flight futures.
len(&self) -> usize158     pub fn len(&self) -> usize {
159         self.len
160     }
161 
162     /// Returns `true` if the set contains no futures
is_empty(&self) -> bool163     pub fn is_empty(&self) -> bool {
164         self.len == 0
165     }
166 
167     /// Push a future into the set.
168     ///
169     /// This function submits the given future to the set for managing. This
170     /// function will not call `poll` on the submitted future. The caller must
171     /// ensure that `FuturesUnordered::poll` is called in order to receive task
172     /// notifications.
push(&mut self, future: T)173     pub fn push(&mut self, future: T) {
174         let node = Arc::new(Node {
175             future: UnsafeCell::new(Some(future)),
176             next_all: UnsafeCell::new(ptr::null_mut()),
177             prev_all: UnsafeCell::new(ptr::null_mut()),
178             next_readiness: AtomicPtr::new(ptr::null_mut()),
179             queued: AtomicBool::new(true),
180             queue: Arc::downgrade(&self.inner),
181         });
182 
183         // Right now our node has a strong reference count of 1. We transfer
184         // ownership of this reference count to our internal linked list
185         // and we'll reclaim ownership through the `unlink` function below.
186         let ptr = self.link(node);
187 
188         // We'll need to get the future "into the system" to start tracking it,
189         // e.g. getting its unpark notifications going to us tracking which
190         // futures are ready. To do that we unconditionally enqueue it for
191         // polling here.
192         self.inner.enqueue(ptr);
193     }
194 
195     /// Returns an iterator that allows modifying each future in the set.
iter_mut(&mut self) -> IterMut<T>196     pub fn iter_mut(&mut self) -> IterMut<T> {
197         IterMut {
198             node: self.head_all,
199             len: self.len,
200             _marker: PhantomData
201         }
202     }
203 
release_node(&mut self, node: Arc<Node<T>>)204     fn release_node(&mut self, node: Arc<Node<T>>) {
205         // The future is done, try to reset the queued flag. This will prevent
206         // `notify` from doing any work in the future
207         let prev = node.queued.swap(true, SeqCst);
208 
209         // Drop the future, even if it hasn't finished yet. This is safe
210         // because we're dropping the future on the thread that owns
211         // `FuturesUnordered`, which correctly tracks T's lifetimes and such.
212         unsafe {
213             drop((*node.future.get()).take());
214         }
215 
216         // If the queued flag was previously set then it means that this node
217         // is still in our internal mpsc queue. We then transfer ownership
218         // of our reference count to the mpsc queue, and it'll come along and
219         // free it later, noticing that the future is `None`.
220         //
221         // If, however, the queued flag was *not* set then we're safe to
222         // release our reference count on the internal node. The queued flag
223         // was set above so all future `enqueue` operations will not actually
224         // enqueue the node, so our node will never see the mpsc queue again.
225         // The node itself will be deallocated once all reference counts have
226         // been dropped by the various owning tasks elsewhere.
227         if prev {
228             mem::forget(node);
229         }
230     }
231 
232     /// Insert a new node into the internal linked list.
link(&mut self, node: Arc<Node<T>>) -> *const Node<T>233     fn link(&mut self, node: Arc<Node<T>>) -> *const Node<T> {
234         let ptr = arc2ptr(node);
235         unsafe {
236             *(*ptr).next_all.get() = self.head_all;
237             if !self.head_all.is_null() {
238                 *(*self.head_all).prev_all.get() = ptr;
239             }
240         }
241 
242         self.head_all = ptr;
243         self.len += 1;
244         return ptr
245     }
246 
247     /// Remove the node from the linked list tracking all nodes currently
248     /// managed by `FuturesUnordered`.
unlink(&mut self, node: *const Node<T>) -> Arc<Node<T>>249     unsafe fn unlink(&mut self, node: *const Node<T>) -> Arc<Node<T>> {
250         let node = ptr2arc(node);
251         let next = *node.next_all.get();
252         let prev = *node.prev_all.get();
253         *node.next_all.get() = ptr::null_mut();
254         *node.prev_all.get() = ptr::null_mut();
255 
256         if !next.is_null() {
257             *(*next).prev_all.get() = prev;
258         }
259 
260         if !prev.is_null() {
261             *(*prev).next_all.get() = next;
262         } else {
263             self.head_all = next;
264         }
265         self.len -= 1;
266         return node
267     }
268 }
269 
impl<T> Stream for FuturesUnordered<T>
    where T: Future
{
    type Item = T::Item;
    type Error = T::Error;

    /// Polls futures that have been scheduled on the readiness queue until
    /// one of them completes, the queue runs dry, or the per-call polling
    /// budget is used up.
    ///
    /// Returns `Ok(Async::Ready(Some(item)))` or `Err(error)` as soon as a
    /// managed future resolves, `Ok(Async::Ready(None))` when the readiness
    /// queue is empty and the set holds no futures, and `Ok(Async::NotReady)`
    /// otherwise.
    fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
        // Variable to determine how many times it is allowed to poll underlying
        // futures without yielding.
        //
        // A single call to `poll` may potentially do a lot of work before
        // yielding. This happens in particular if the underlying futures are awoken
        // frequently but continue to return `NotReady`. This is problematic if other
        // tasks are waiting on the executor, since they do not get to run. This value
        // caps the number of calls to `poll` on underlying futures a single call to
        // `poll` is allowed to make.
        //
        // The value is the length of FuturesUnordered. This ensures that each
        // future is polled only once at most per iteration.
        //
        // See also https://github.com/rust-lang/futures-rs/issues/2047.
        let yield_every = self.len();

        // Keep track of how many child futures we have polled,
        // in case we want to forcibly yield.
        let mut polled = 0;

        // Ensure `parent` is correctly set.
        self.inner.parent.register();

        loop {
            let node = match unsafe { self.inner.dequeue() } {
                Dequeue::Empty => {
                    if self.is_empty() {
                        return Ok(Async::Ready(None));
                    } else {
                        return Ok(Async::NotReady)
                    }
                }
                Dequeue::Inconsistent => {
                    // At this point, it may be worth yielding the thread &
                    // spinning a few times... but for now, just yield using the
                    // task system.
                    task::current().notify();
                    return Ok(Async::NotReady);
                }
                Dequeue::Data(node) => node,
            };

            debug_assert!(node != self.inner.stub());

            unsafe {
                let mut future = match (*(*node).future.get()).take() {
                    Some(future) => future,

                    // If the future has already gone away then we're just
                    // cleaning out this node. See the comment in
                    // `release_node` for more information, but we're basically
                    // just taking ownership of our reference count here.
                    None => {
                        let node = ptr2arc(node);
                        assert!((*node.next_all.get()).is_null());
                        assert!((*node.prev_all.get()).is_null());
                        continue
                    }
                };

                // Unset queued flag... this must be done before
                // polling. This ensures that the future gets
                // rescheduled if it is notified **during** a call
                // to `poll`.
                let prev = (*node).queued.swap(false, SeqCst);
                assert!(prev);

                // We're going to need to be very careful if the `poll`
                // function below panics. We need to (a) not leak memory and
                // (b) ensure that we still don't have any use-after-frees. To
                // manage this we do a few things:
                //
                // * This "bomb" here will call `release_node` if dropped
                //   abnormally. That way we'll be sure the memory management
                //   of the `node` is managed correctly.
                // * The future was extracted above (taken ownership). That way
                //   if it panics we're guaranteed that the future is
                //   dropped on this thread and doesn't accidentally get
                //   dropped on a different thread (bad).
                // * We unlink the node from our internal queue to preemptively
                //   assume it'll panic, in which case we'll want to discard it
                //   regardless.
                struct Bomb<'a, T: 'a> {
                    queue: &'a mut FuturesUnordered<T>,
                    node: Option<Arc<Node<T>>>,
                }
                impl<'a, T> Drop for Bomb<'a, T> {
                    fn drop(&mut self) {
                        if let Some(node) = self.node.take() {
                            self.queue.release_node(node);
                        }
                    }
                }
                let mut bomb = Bomb {
                    node: Some(self.unlink(node)),
                    queue: self,
                };

                // Poll the underlying future with the appropriate `notify`
                // implementation. This is where a large bit of the unsafety
                // starts to stem from internally. The `notify` instance itself
                // is basically just our `Arc<Node<T>>` and tracks the mpsc
                // queue of ready futures.
                //
                // Critically though `Node<T>` won't actually access `T`, the
                // future, while it's floating around inside of `Task`
                // instances. These structs will basically just use `T` to size
                // the internal allocation, appropriately accessing fields and
                // deallocating the node if need be.
                let res = {
                    let notify = NodeToHandle(bomb.node.as_ref().unwrap());
                    task_impl::with_notify(&notify, 0, || {
                        future.poll()
                    })
                };
                polled += 1;

                let ret = match res {
                    Ok(Async::NotReady) => {
                        // The future is still pending: defuse the bomb, put
                        // the future back into its node and re-link the node
                        // into the list of all futures.
                        let node = bomb.node.take().unwrap();
                        *node.future.get() = Some(future);
                        bomb.queue.link(node);

                        if polled == yield_every {
                            // We have polled a large number of futures in a row without yielding.
                            // To ensure we do not starve other tasks waiting on the executor,
                            // we yield here, but immediately wake ourselves up to continue.
                            task_impl::current().notify();
                            return Ok(Async::NotReady);
                        }
                        continue
                    }
                    // The future finished (successfully or not). `bomb` still
                    // owns the node here, so dropping it on return releases
                    // the node via `release_node`.
                    Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))),
                    Err(e) => Err(e),
                };
                return ret
            }
        }
    }
}
417 
418 impl<T: Debug> Debug for FuturesUnordered<T> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result419     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
420         write!(fmt, "FuturesUnordered {{ ... }}")
421     }
422 }
423 
424 impl<T> Drop for FuturesUnordered<T> {
drop(&mut self)425     fn drop(&mut self) {
426         // When a `FuturesUnordered` is dropped we want to drop all futures associated
427         // with it. At the same time though there may be tons of `Task` handles
428         // flying around which contain `Node<T>` references inside them. We'll
429         // let those naturally get deallocated when the `Task` itself goes out
430         // of scope or gets notified.
431         unsafe {
432             while !self.head_all.is_null() {
433                 let head = self.head_all;
434                 let node = self.unlink(head);
435                 self.release_node(node);
436             }
437         }
438 
439         // Note that at this point we could still have a bunch of nodes in the
440         // mpsc queue. None of those nodes, however, have futures associated
441         // with them so they're safe to destroy on any thread. At this point
442         // the `FuturesUnordered` struct, the owner of the one strong reference
443         // to `Inner<T>` will drop the strong reference. At that point
444         // whichever thread releases the strong refcount last (be it this
445         // thread or some other thread as part of an `upgrade`) will clear out
446         // the mpsc queue and free all remaining nodes.
447         //
448         // While that freeing operation isn't guaranteed to happen here, it's
449         // guaranteed to happen "promptly" as no more "blocking work" will
450         // happen while there's a strong refcount held.
451     }
452 }
453 
454 impl<F: Future> FromIterator<F> for FuturesUnordered<F> {
from_iter<T>(iter: T) -> Self where T: IntoIterator<Item = F>455     fn from_iter<T>(iter: T) -> Self
456         where T: IntoIterator<Item = F>
457     {
458         let mut new = FuturesUnordered::new();
459         for future in iter.into_iter() {
460             new.push(future);
461         }
462         new
463     }
464 }
465 
#[derive(Debug)]
/// Mutable iterator over all futures in the unordered set.
pub struct IterMut<'a, F: 'a> {
    // Node whose future is yielded next; null once the list is exhausted.
    node: *const Node<F>,
    // Number of futures left to yield (reported through `size_hint`).
    len: usize,
    // Ties the iterator to the `&'a mut FuturesUnordered<F>` borrow it was
    // created from, since the raw pointer above carries no lifetime.
    _marker: PhantomData<&'a mut FuturesUnordered<F>>
}
473 
474 impl<'a, F> Iterator for IterMut<'a, F> {
475     type Item = &'a mut F;
476 
next(&mut self) -> Option<&'a mut F>477     fn next(&mut self) -> Option<&'a mut F> {
478         if self.node.is_null() {
479             return None;
480         }
481         unsafe {
482             let future = (*(*self.node).future.get()).as_mut().unwrap();
483             let next = *(*self.node).next_all.get();
484             self.node = next;
485             self.len -= 1;
486             return Some(future);
487         }
488     }
489 
size_hint(&self) -> (usize, Option<usize>)490     fn size_hint(&self) -> (usize, Option<usize>) {
491         (self.len, Some(self.len))
492     }
493 }
494 
// `IterMut::size_hint` reports the same value for its lower and upper bound,
// so the default `ExactSizeIterator::len` implementation is used as-is.
impl<'a, F> ExactSizeIterator for IterMut<'a, F> {}
496 
impl<T> Inner<T> {
    /// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
    ///
    /// Appends `node` at the head (producer end) of the readiness queue. The
    /// node's `queued` flag is expected to be set already; this is only
    /// checked in debug builds.
    fn enqueue(&self, node: *const Node<T>) {
        unsafe {
            debug_assert!((*node).queued.load(Relaxed));

            // This action does not require any coordination
            (*node).next_readiness.store(ptr::null_mut(), Relaxed);

            // Note that these atomic orderings come from 1024cores
            //
            // First publish `node` as the new head, then link the previous
            // head to it. Between those two steps the previous head's
            // `next_readiness` is still null, which `dequeue` may observe and
            // report as `Dequeue::Inconsistent`.
            let node = node as *mut _;
            let prev = self.head_readiness.swap(node, AcqRel);
            (*prev).next_readiness.store(node, Release);
        }
    }

    /// The dequeue function from the 1024cores intrusive MPSC queue algorithm
    ///
    /// Note that this is unsafe as it requires mutual exclusion (only one
    /// thread can call this) to be guaranteed elsewhere.
    unsafe fn dequeue(&self) -> Dequeue<T> {
        let mut tail = *self.tail_readiness.get();
        let mut next = (*tail).next_readiness.load(Acquire);

        // The stub is never handed out: if it is at the tail, step over it.
        if tail == self.stub() {
            if next.is_null() {
                return Dequeue::Empty;
            }

            *self.tail_readiness.get() = next;
            tail = next;
            next = (*next).next_readiness.load(Acquire);
        }

        // `tail` has a successor, so it can be popped right away.
        if !next.is_null() {
            *self.tail_readiness.get() = next;
            debug_assert!(tail != self.stub());
            return Dequeue::Data(tail);
        }

        // `tail` has no visible successor. If the head is not `tail`, some
        // other node has been swapped in as head but is not linked yet.
        if self.head_readiness.load(Acquire) as *const _ != tail {
            return Dequeue::Inconsistent;
        }

        // `tail` is the only node in the queue: push the stub behind it so
        // that `tail` gets a successor and can be popped.
        self.enqueue(self.stub());

        next = (*tail).next_readiness.load(Acquire);

        if !next.is_null() {
            *self.tail_readiness.get() = next;
            return Dequeue::Data(tail);
        }

        Dequeue::Inconsistent
    }

    /// Raw pointer to the stub (sentinel) node owned by this queue.
    fn stub(&self) -> *const Node<T> {
        &*self.stub
    }
}
557 
558 impl<T> Drop for Inner<T> {
drop(&mut self)559     fn drop(&mut self) {
560         // Once we're in the destructor for `Inner<T>` we need to clear out the
561         // mpsc queue of nodes if there's anything left in there.
562         //
563         // Note that each node has a strong reference count associated with it
564         // which is owned by the mpsc queue. All nodes should have had their
565         // futures dropped already by the `FuturesUnordered` destructor above,
566         // so we're just pulling out nodes and dropping their refcounts.
567         unsafe {
568             loop {
569                 match self.dequeue() {
570                     Dequeue::Empty => break,
571                     Dequeue::Inconsistent => abort("inconsistent in drop"),
572                     Dequeue::Data(ptr) => drop(ptr2arc(ptr)),
573                 }
574             }
575         }
576     }
577 }
578 
// Borrowed view of a node's `Arc`, passed to `task_impl::with_notify` while
// the node's future is being polled. It is turned into an owning
// `NotifyHandle` through the `From` conversion defined for it.
#[allow(missing_debug_implementations)]
struct NodeToHandle<'a, T: 'a>(&'a Arc<Node<T>>);
581 
582 impl<'a, T> Clone for NodeToHandle<'a, T> {
clone(&self) -> Self583     fn clone(&self) -> Self {
584         NodeToHandle(self.0)
585     }
586 }
587 
impl<'a, T> From<NodeToHandle<'a, T>> for NotifyHandle {
    // Builds an owning `NotifyHandle` for the node: the `Arc` is cloned (one
    // more strong reference) and that clone is reinterpreted bit-for-bit as a
    // `*mut ArcNode<T>`, which becomes the handle's data pointer.
    //
    // NOTE(review): this relies on `Arc<Node<T>>` having the same size as a
    // single raw pointer, otherwise the `transmute` would not compile.
    fn from(handle: NodeToHandle<'a, T>) -> NotifyHandle {
        unsafe {
            let ptr = handle.0.clone();
            let ptr = mem::transmute::<Arc<Node<T>>, *mut ArcNode<T>>(ptr);
            NotifyHandle::new(hide_lt(ptr))
        }
    }
}
597 
// Zero-sized marker type. A `*mut ArcNode<T>` is never a pointer to a real
// `ArcNode` value: it is the bit pattern of an `Arc<Node<T>>` (see the
// `From<NodeToHandle>` conversion), and the trait impls on `ArcNode` turn
// `&self` back into that `Arc`.
struct ArcNode<T>(PhantomData<T>);

// We should never touch `T` on any thread other than the one owning
// `FuturesUnordered`, so this should be a safe operation.
unsafe impl<T> Send for ArcNode<T> {}
unsafe impl<T> Sync for ArcNode<T> {}
604 
impl<T> Notify for ArcNode<T> {
    // Forwards the notification to `Node::notify`; the `_id` argument is
    // ignored.
    fn notify(&self, _id: usize) {
        unsafe {
            // `self`'s address is the bit pattern of an `Arc<Node<T>>`. Store
            // that address in a local, then reinterpret a pointer to the local
            // as a pointer to an `Arc<Node<T>>`. This yields a borrowed `&Arc`
            // without touching the reference count.
            let me: *const ArcNode<T> = self;
            let me: *const *const ArcNode<T> = &me;
            let me = me as *const Arc<Node<T>>;
            Node::notify(&*me)
        }
    }
}
615 
unsafe impl<T> UnsafeNotify for ArcNode<T> {
    // Produces another owning handle to the same node. The `Arc` is recovered
    // as a borrow (same pointer trick as in `notify`) and converted through
    // `NodeToHandle`, whose `From` impl clones the `Arc`.
    unsafe fn clone_raw(&self) -> NotifyHandle {
        let me: *const ArcNode<T> = self;
        let me: *const *const ArcNode<T> = &me;
        let me = &*(me as *const Arc<Node<T>>);
        NodeToHandle(me).into()
    }

    // Releases the strong reference owned by this handle: the local holding
    // `self`'s address is reinterpreted as an `Arc<Node<T>>` and dropped in
    // place.
    unsafe fn drop_raw(&self) {
        let mut me: *const ArcNode<T> = self;
        let me = &mut me as *mut *const ArcNode<T> as *mut Arc<Node<T>>;
        ptr::drop_in_place(me);
    }
}
630 
// Converts the typed `*mut ArcNode<T>` into the `*mut UnsafeNotify` trait
// object pointer that `NotifyHandle::new` is given.
//
// NOTE(review): going by the name, the `transmute` is presumably there to
// erase the lifetime bound that `T` would otherwise impose on the trait
// object — confirm before changing.
unsafe fn hide_lt<T>(p: *mut ArcNode<T>) -> *mut UnsafeNotify {
    mem::transmute(p as *mut UnsafeNotify)
}
634 
635 impl<T> Node<T> {
notify(me: &Arc<Node<T>>)636     fn notify(me: &Arc<Node<T>>) {
637         let inner = match me.queue.upgrade() {
638             Some(inner) => inner,
639             None => return,
640         };
641 
642         // It's our job to notify the node that it's ready to get polled,
643         // meaning that we need to enqueue it into the readiness queue. To
644         // do this we flag that we're ready to be queued, and if successful
645         // we then do the literal queueing operation, ensuring that we're
646         // only queued once.
647         //
648         // Once the node is inserted we be sure to notify the parent task,
649         // as it'll want to come along and pick up our node now.
650         //
651         // Note that we don't change the reference count of the node here,
652         // we're just enqueueing the raw pointer. The `FuturesUnordered`
653         // implementation guarantees that if we set the `queued` flag true that
654         // there's a reference count held by the main `FuturesUnordered` queue
655         // still.
656         let prev = me.queued.swap(true, SeqCst);
657         if !prev {
658             inner.enqueue(&**me);
659             inner.parent.notify();
660         }
661     }
662 }
663 
664 impl<T> Drop for Node<T> {
drop(&mut self)665     fn drop(&mut self) {
666         // Currently a `Node<T>` is sent across all threads for any lifetime,
667         // regardless of `T`. This means that for memory safety we can't
668         // actually touch `T` at any time except when we have a reference to the
669         // `FuturesUnordered` itself.
670         //
671         // Consequently it *should* be the case that we always drop futures from
672         // the `FuturesUnordered` instance, but this is a bomb in place to catch
673         // any bugs in that logic.
674         unsafe {
675             if (*self.future.get()).is_some() {
676                 abort("future still here when dropping");
677             }
678         }
679     }
680 }
681 
/// Gives up ownership of `ptr` without releasing its reference count and
/// returns the address of the value stored inside the `Arc`.
///
/// The strong count is left as it was; the caller is responsible for turning
/// the pointer back into an `Arc` at some point, or the allocation is leaked.
fn arc2ptr<T>(ptr: Arc<T>) -> *const T {
    Arc::into_raw(ptr)
}
687 
/// Rebuilds the owning `Arc<T>` from a pointer to the value stored inside it.
///
/// This is the inverse of `arc2ptr`: the returned `Arc` takes over one strong
/// reference count; the count itself is not modified.
///
/// # Safety
///
/// `ptr` must point at the value inside a live `Arc<T>` allocation (as
/// produced by `arc2ptr`), and the caller must own one strong reference that
/// has not been reclaimed yet. Calling this more often than references were
/// leaked leads to a double free.
unsafe fn ptr2arc<T>(ptr: *const T) -> Arc<T> {
    // `Arc::from_raw` does the "value pointer -> allocation" adjustment using
    // the real layout of `Arc`. The earlier version fabricated an `Arc` out of
    // the address `0x10` and dereferenced it to measure that offset, which
    // creates a reference to memory that was never allocated.
    Arc::from_raw(ptr)
}
695 
// Panics with the message `s` while a `DoublePanic` value is alive. When that
// value is dropped its destructor panics again; as the inner message says,
// the intent is "panicking twice to abort the program" instead of letting a
// single panic unwind normally.
fn abort(s: &str) -> ! {
    struct DoublePanic;

    impl Drop for DoublePanic {
        fn drop(&mut self) {
            panic!("panicking twice to abort the program");
        }
    }

    // Must stay alive until the `panic!` below so that its destructor runs as
    // part of unwinding from that first panic.
    let _bomb = DoublePanic;
    panic!("{}", s);
}
708