//! An unbounded set of futures.

use std::cell::UnsafeCell;
use std::fmt::{self, Debug};
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel};
use std::sync::atomic::{AtomicPtr, AtomicBool};
use std::sync::{Arc, Weak};
use std::usize;

use {task, Stream, Future, Poll, Async};
use executor::{Notify, UnsafeNotify, NotifyHandle};
use task_impl::{self, AtomicTask};

/// An unbounded set of futures.
///
/// This "combinator" also serves a special function in this library, providing
/// the ability to maintain a set of futures and drive them all to completion.
///
/// Futures are pushed into this set and their realized values are yielded as
/// they are ready. This structure is optimized to manage a large number of
/// futures. Futures managed by `FuturesUnordered` will only be polled when they
/// generate notifications. This reduces the amount of work needed to
/// coordinate large numbers of futures.
///
/// When a `FuturesUnordered` is first created, it does not contain any futures.
/// Calling `poll` in this state will result in `Ok(Async::Ready(None))` being
/// returned. Futures are submitted to the set using `push`; however, the
/// future will **not** be polled at this point. `FuturesUnordered` will only
/// poll managed futures when `FuturesUnordered::poll` is called. As such, it
/// is important to call `poll` after pushing new futures.
///
/// If `FuturesUnordered::poll` returns `Ok(Async::Ready(None))` this means that
/// the set is currently not managing any futures. A future may be submitted
/// to the set at a later time. At that point, a call to
/// `FuturesUnordered::poll` will either return the future's resolved value
/// **or** `Ok(Async::NotReady)` if the future has not yet completed.
///
/// Note that you can create a ready-made `FuturesUnordered` via the
/// `futures_unordered` function in the `stream` module, or you can start with an
/// empty set with the `FuturesUnordered::new` constructor.
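///
/// A minimal usage sketch (using this crate's `future::ok` constructor and the
/// blocking `Stream::wait` adapter; note that values are yielded in completion
/// order, not insertion order):
///
/// ```rust
/// extern crate futures;
///
/// use futures::future::{self, FutureResult};
/// use futures::stream::{FuturesUnordered, Stream};
///
/// fn main() {
///     let mut set: FuturesUnordered<FutureResult<u32, ()>> = FuturesUnordered::new();
///     set.push(future::ok(1));
///     set.push(future::ok(2));
///
///     // Drive the set to completion by polling it as a `Stream`.
///     let results: Vec<u32> = set.wait().map(|r| r.unwrap()).collect();
///     assert_eq!(results.len(), 2);
/// }
/// ```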
#[must_use = "streams do nothing unless polled"]
pub struct FuturesUnordered<F> {
    inner: Arc<Inner<F>>,
    len: usize,
    head_all: *const Node<F>,
}

unsafe impl<T: Send> Send for FuturesUnordered<T> {}
unsafe impl<T: Sync> Sync for FuturesUnordered<T> {}

// FuturesUnordered is implemented using two linked lists. One which links all
// futures managed by a `FuturesUnordered` and one that tracks futures that have
// been scheduled for polling. The first linked list is not thread safe and is
// only accessed by the thread that owns the `FuturesUnordered` value. The
// second linked list is an implementation of the intrusive MPSC queue algorithm
// described by 1024cores.net.
//
// When a future is submitted to the set a node is allocated and inserted in
// both linked lists. The next call to `poll` will (eventually) see this node
// and call `poll` on the future.
//
// Before a managed future is polled, the current task's `Notify` is replaced
// with one that is aware of the specific future being run. This ensures that
// task notifications generated by that specific future are visible to
// `FuturesUnordered`. When a notification is received, the node is scheduled
// for polling by being inserted into the concurrent linked list.
//
// Each node's reference count (the number of outstanding handles to the node)
// is tracked by its `Arc`, while an `AtomicBool` flag tracks whether the node
// is currently inserted in the atomic queue. When the future is notified, it
// will only insert itself into the linked list if it isn't currently inserted.
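//
// A node's lifecycle, roughly:
//
// 1. `push` allocates the node, links it into the list of all nodes, and
//    unconditionally enqueues it so that the next `poll` will see it.
// 2. `poll` dequeues the node, clears its `queued` flag, and polls the
//    future with a `Notify` handle that points back at the node.
// 3. A notification sets `queued` and re-enqueues the node; completion (or
//    dropping the set) takes the future out and releases the node's
//    references.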

#[allow(missing_debug_implementations)]
struct Inner<T> {
    // The task using `FuturesUnordered`.
    parent: AtomicTask,

    // Head/tail of the readiness queue
    head_readiness: AtomicPtr<Node<T>>,
    tail_readiness: UnsafeCell<*const Node<T>>,
    stub: Arc<Node<T>>,
}

struct Node<T> {
    // The future
    future: UnsafeCell<Option<T>>,

    // Next pointer for linked list tracking all active nodes
    next_all: UnsafeCell<*const Node<T>>,

    // Previous node in linked list tracking all active nodes
    prev_all: UnsafeCell<*const Node<T>>,

    // Next pointer in readiness queue
    next_readiness: AtomicPtr<Node<T>>,

    // Queue that we'll be enqueued to when notified
    queue: Weak<Inner<T>>,

    // Whether or not this node is currently in the mpsc queue.
    queued: AtomicBool,
}

enum Dequeue<T> {
    Data(*const Node<T>),
    Empty,
    Inconsistent,
}

impl<T> FuturesUnordered<T>
    where T: Future,
{
    /// Constructs a new, empty `FuturesUnordered`
    ///
    /// The returned `FuturesUnordered` does not contain any futures and, in this
    /// state, `FuturesUnordered::poll` will return `Ok(Async::Ready(None))`.
    pub fn new() -> FuturesUnordered<T> {
        let stub = Arc::new(Node {
            future: UnsafeCell::new(None),
            next_all: UnsafeCell::new(ptr::null()),
            prev_all: UnsafeCell::new(ptr::null()),
            next_readiness: AtomicPtr::new(ptr::null_mut()),
            queued: AtomicBool::new(true),
            queue: Weak::new(),
        });
        let stub_ptr = &*stub as *const Node<T>;
        let inner = Arc::new(Inner {
            parent: AtomicTask::new(),
            head_readiness: AtomicPtr::new(stub_ptr as *mut _),
            tail_readiness: UnsafeCell::new(stub_ptr),
            stub: stub,
        });

        FuturesUnordered {
            len: 0,
            head_all: ptr::null_mut(),
            inner: inner,
        }
    }
}

impl<T> FuturesUnordered<T> {
    /// Returns the number of futures contained in the set.
    ///
    /// This represents the total number of in-flight futures.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` if the set contains no futures
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Push a future into the set.
    ///
    /// This function submits the given future to the set for managing. This
    /// function will not call `poll` on the submitted future. The caller must
    /// ensure that `FuturesUnordered::poll` is called in order to receive task
    /// notifications.
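    ///
    /// A small sketch of the push-then-poll requirement (here polling happens
    /// through the blocking `Stream::wait` adapter):
    ///
    /// ```rust
    /// extern crate futures;
    ///
    /// use futures::future;
    /// use futures::stream::{FuturesUnordered, Stream};
    ///
    /// fn main() {
    ///     let mut set = FuturesUnordered::new();
    ///     set.push(future::ok::<u32, ()>(7));
    ///     // Nothing runs until the set itself is polled as a stream.
    ///     assert_eq!(set.wait().next(), Some(Ok(7)));
    /// }
    /// ```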
    pub fn push(&mut self, future: T) {
        let node = Arc::new(Node {
            future: UnsafeCell::new(Some(future)),
            next_all: UnsafeCell::new(ptr::null_mut()),
            prev_all: UnsafeCell::new(ptr::null_mut()),
            next_readiness: AtomicPtr::new(ptr::null_mut()),
            queued: AtomicBool::new(true),
            queue: Arc::downgrade(&self.inner),
        });

        // Right now our node has a strong reference count of 1. We transfer
        // ownership of this reference count to our internal linked list
        // and we'll reclaim ownership through the `unlink` function below.
        let ptr = self.link(node);

        // We'll need to get the future "into the system" to start tracking it,
        // e.g. getting its unpark notifications routed to us so we can track
        // which futures are ready. To do that we unconditionally enqueue it
        // for polling here.
        self.inner.enqueue(ptr);
    }

    /// Returns an iterator that allows modifying each future in the set.
    pub fn iter_mut(&mut self) -> IterMut<T> {
        IterMut {
            node: self.head_all,
            len: self.len,
            _marker: PhantomData
        }
    }

    fn release_node(&mut self, node: Arc<Node<T>>) {
        // The future is done, try to reset the queued flag. This will prevent
        // `notify` from doing any work in the future
        let prev = node.queued.swap(true, SeqCst);

        // Drop the future, even if it hasn't finished yet. This is safe
        // because we're dropping the future on the thread that owns
        // `FuturesUnordered`, which correctly tracks T's lifetimes and such.
        unsafe {
            drop((*node.future.get()).take());
        }

        // If the queued flag was previously set then it means that this node
        // is still in our internal mpsc queue. We then transfer ownership
        // of our reference count to the mpsc queue, and it'll come along and
        // free it later, noticing that the future is `None`.
        //
        // If, however, the queued flag was *not* set then we're safe to
        // release our reference count on the internal node. The queued flag
        // was set above so all future `enqueue` operations will not actually
        // enqueue the node, so our node will never see the mpsc queue again.
        // The node itself will be deallocated once all reference counts have
        // been dropped by the various owning tasks elsewhere.
        if prev {
            mem::forget(node);
        }
    }

    /// Insert a new node into the internal linked list.
    fn link(&mut self, node: Arc<Node<T>>) -> *const Node<T> {
        let ptr = arc2ptr(node);
        unsafe {
            *(*ptr).next_all.get() = self.head_all;
            if !self.head_all.is_null() {
                *(*self.head_all).prev_all.get() = ptr;
            }
        }

        self.head_all = ptr;
        self.len += 1;
        return ptr
    }

    /// Remove the node from the linked list tracking all nodes currently
    /// managed by `FuturesUnordered`.
    unsafe fn unlink(&mut self, node: *const Node<T>) -> Arc<Node<T>> {
        let node = ptr2arc(node);
        let next = *node.next_all.get();
        let prev = *node.prev_all.get();
        *node.next_all.get() = ptr::null_mut();
        *node.prev_all.get() = ptr::null_mut();

        if !next.is_null() {
            *(*next).prev_all.get() = prev;
        }

        if !prev.is_null() {
            *(*prev).next_all.get() = next;
        } else {
            self.head_all = next;
        }
        self.len -= 1;
        return node
    }
}

impl<T> Stream for FuturesUnordered<T>
    where T: Future
{
    type Item = T::Item;
    type Error = T::Error;

    fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
        // Ensure `parent` is correctly set.
        self.inner.parent.register();

        loop {
            let node = match unsafe { self.inner.dequeue() } {
                Dequeue::Empty => {
                    if self.is_empty() {
                        return Ok(Async::Ready(None));
                    } else {
                        return Ok(Async::NotReady)
                    }
                }
                Dequeue::Inconsistent => {
                    // At this point, it may be worth yielding the thread &
                    // spinning a few times... but for now, just yield using the
                    // task system.
                    task::current().notify();
                    return Ok(Async::NotReady);
                }
                Dequeue::Data(node) => node,
            };

            debug_assert!(node != self.inner.stub());

            unsafe {
                let mut future = match (*(*node).future.get()).take() {
                    Some(future) => future,

                    // If the future has already gone away then we're just
                    // cleaning out this node. See the comment in
                    // `release_node` for more information, but we're basically
                    // just taking ownership of our reference count here.
                    None => {
                        let node = ptr2arc(node);
                        assert!((*node.next_all.get()).is_null());
                        assert!((*node.prev_all.get()).is_null());
                        continue
                    }
                };

                // Unset queued flag... this must be done before
                // polling. This ensures that the future gets
                // rescheduled if it is notified **during** a call
                // to `poll`.
                let prev = (*node).queued.swap(false, SeqCst);
                assert!(prev);

                // We're going to need to be very careful if the `poll`
                // function below panics. We need to (a) not leak memory and
                // (b) ensure that we still don't have any use-after-frees. To
                // manage this we do a few things:
                //
                // * This "bomb" here will call `release_node` if dropped
                //   abnormally. That way we'll be sure the memory management
                //   of the `node` is handled correctly.
                // * The future was extracted above (taken ownership). That way
                //   if it panics we're guaranteed that the future is
                //   dropped on this thread and doesn't accidentally get
                //   dropped on a different thread (bad).
                // * We unlink the node from our internal queue, preemptively
                //   assuming the poll will panic, in which case we'll want to
                //   discard the node regardless.
                struct Bomb<'a, T: 'a> {
                    queue: &'a mut FuturesUnordered<T>,
                    node: Option<Arc<Node<T>>>,
                }
                impl<'a, T> Drop for Bomb<'a, T> {
                    fn drop(&mut self) {
                        if let Some(node) = self.node.take() {
                            self.queue.release_node(node);
                        }
                    }
                }
                let mut bomb = Bomb {
                    node: Some(self.unlink(node)),
                    queue: self,
                };

                // Poll the underlying future with the appropriate `notify`
                // implementation. This is where a large bit of the unsafety
                // starts to stem from internally. The `notify` instance itself
                // is basically just our `Arc<Node<T>>` and tracks the mpsc
                // queue of ready futures.
                //
                // Critically though `Node<T>` won't actually access `T`, the
                // future, while it's floating around inside of `Task`
                // instances. These structs will basically just use `T` to size
                // the internal allocation, appropriately accessing fields and
                // deallocating the node if need be.
                let res = {
                    let notify = NodeToHandle(bomb.node.as_ref().unwrap());
                    task_impl::with_notify(&notify, 0, || {
                        future.poll()
                    })
                };

                let ret = match res {
                    Ok(Async::NotReady) => {
                        let node = bomb.node.take().unwrap();
                        *node.future.get() = Some(future);
                        bomb.queue.link(node);
                        continue
                    }
                    Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))),
                    Err(e) => Err(e),
                };
                return ret
            }
        }
    }
}

impl<T: Debug> Debug for FuturesUnordered<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        write!(fmt, "FuturesUnordered {{ ... }}")
    }
}

impl<T> Drop for FuturesUnordered<T> {
    fn drop(&mut self) {
        // When a `FuturesUnordered` is dropped we want to drop all futures associated
        // with it. At the same time though there may be tons of `Task` handles
        // flying around which contain `Node<T>` references inside them. We'll
        // let those naturally get deallocated when the `Task` itself goes out
        // of scope or gets notified.
        unsafe {
            while !self.head_all.is_null() {
                let head = self.head_all;
                let node = self.unlink(head);
                self.release_node(node);
            }
        }

        // Note that at this point we could still have a bunch of nodes in the
        // mpsc queue. None of those nodes, however, have futures associated
        // with them so they're safe to destroy on any thread. At this point
        // the `FuturesUnordered` struct, the owner of the one strong reference
        // to `Inner<T>`, will drop the strong reference. At that point
        // whichever thread releases the strong refcount last (be it this
        // thread or some other thread as part of an `upgrade`) will clear out
        // the mpsc queue and free all remaining nodes.
        //
        // While that freeing operation isn't guaranteed to happen here, it's
        // guaranteed to happen "promptly" as no more "blocking work" will
        // happen while there's a strong refcount held.
    }
}

impl<F: Future> FromIterator<F> for FuturesUnordered<F> {
    fn from_iter<T>(iter: T) -> Self
        where T: IntoIterator<Item = F>
    {
        let mut new = FuturesUnordered::new();
        for future in iter.into_iter() {
            new.push(future);
        }
        new
    }
}

#[derive(Debug)]
/// Mutable iterator over all futures in the unordered set.
pub struct IterMut<'a, F: 'a> {
    node: *const Node<F>,
    len: usize,
    _marker: PhantomData<&'a mut FuturesUnordered<F>>
}

impl<'a, F> Iterator for IterMut<'a, F> {
    type Item = &'a mut F;

    fn next(&mut self) -> Option<&'a mut F> {
        if self.node.is_null() {
            return None;
        }
        unsafe {
            let future = (*(*self.node).future.get()).as_mut().unwrap();
            let next = *(*self.node).next_all.get();
            self.node = next;
            self.len -= 1;
            return Some(future);
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.len, Some(self.len))
    }
}

impl<'a, F> ExactSizeIterator for IterMut<'a, F> {}

impl<T> Inner<T> {
    /// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
    fn enqueue(&self, node: *const Node<T>) {
        unsafe {
            debug_assert!((*node).queued.load(Relaxed));

            // This action does not require any coordination
            (*node).next_readiness.store(ptr::null_mut(), Relaxed);

            // Note that these atomic orderings come from 1024cores
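            //
            // The consumer walks `next_readiness` pointers starting from its
            // cached tail, so a newly enqueued node only becomes visible to
            // `dequeue` once the previous head's `next_readiness` is written
            // by the `Release` store below.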
            let node = node as *mut _;
            let prev = self.head_readiness.swap(node, AcqRel);
            (*prev).next_readiness.store(node, Release);
        }
    }

    /// The dequeue function from the 1024cores intrusive MPSC queue algorithm
    ///
    /// Note that this is unsafe as it requires mutual exclusion (only one
    /// thread can call this) to be guaranteed elsewhere.
    unsafe fn dequeue(&self) -> Dequeue<T> {
        let mut tail = *self.tail_readiness.get();
        let mut next = (*tail).next_readiness.load(Acquire);

        if tail == self.stub() {
            if next.is_null() {
                return Dequeue::Empty;
            }

            *self.tail_readiness.get() = next;
            tail = next;
            next = (*next).next_readiness.load(Acquire);
        }

        if !next.is_null() {
            *self.tail_readiness.get() = next;
            debug_assert!(tail != self.stub());
            return Dequeue::Data(tail);
        }

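        // If the head is not the tail at this point then a producer has
        // swapped the head but has not yet linked its node via
        // `next_readiness`; report the queue as momentarily inconsistent so
        // the caller retries.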
        if self.head_readiness.load(Acquire) as *const _ != tail {
            return Dequeue::Inconsistent;
        }

        self.enqueue(self.stub());

        next = (*tail).next_readiness.load(Acquire);

        if !next.is_null() {
            *self.tail_readiness.get() = next;
            return Dequeue::Data(tail);
        }

        Dequeue::Inconsistent
    }

    fn stub(&self) -> *const Node<T> {
        &*self.stub
    }
}

impl<T> Drop for Inner<T> {
    fn drop(&mut self) {
        // Once we're in the destructor for `Inner<T>` we need to clear out the
        // mpsc queue of nodes if there's anything left in there.
        //
        // Note that each node has a strong reference count associated with it
        // which is owned by the mpsc queue. All nodes should have had their
        // futures dropped already by the `FuturesUnordered` destructor above,
        // so we're just pulling out nodes and dropping their refcounts.
        unsafe {
            loop {
                match self.dequeue() {
                    Dequeue::Empty => break,
                    Dequeue::Inconsistent => abort("inconsistent in drop"),
                    Dequeue::Data(ptr) => drop(ptr2arc(ptr)),
                }
            }
        }
    }
}

#[allow(missing_debug_implementations)]
struct NodeToHandle<'a, T: 'a>(&'a Arc<Node<T>>);

impl<'a, T> Clone for NodeToHandle<'a, T> {
    fn clone(&self) -> Self {
        NodeToHandle(self.0)
    }
}

impl<'a, T> From<NodeToHandle<'a, T>> for NotifyHandle {
    fn from(handle: NodeToHandle<'a, T>) -> NotifyHandle {
        unsafe {
            let ptr = handle.0.clone();
            let ptr = mem::transmute::<Arc<Node<T>>, *mut ArcNode<T>>(ptr);
            NotifyHandle::new(hide_lt(ptr))
        }
    }
}

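// `ArcNode<T>` is never constructed directly; a `*mut ArcNode<T>` is simply an
// `Arc<Node<T>>` whose pointer has been smuggled through the `transmute`
// above. The impls below reinterpret that pointer back into an `Arc<Node<T>>`
// (or a reference to one) without ever touching `T`.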
struct ArcNode<T>(PhantomData<T>);

// We should never touch `T` on any thread other than the one owning
// `FuturesUnordered`, so this should be a safe operation.
unsafe impl<T> Send for ArcNode<T> {}
unsafe impl<T> Sync for ArcNode<T> {}

impl<T> Notify for ArcNode<T> {
    fn notify(&self, _id: usize) {
        unsafe {
            let me: *const ArcNode<T> = self;
            let me: *const *const ArcNode<T> = &me;
            let me = me as *const Arc<Node<T>>;
            Node::notify(&*me)
        }
    }
}

unsafe impl<T> UnsafeNotify for ArcNode<T> {
    unsafe fn clone_raw(&self) -> NotifyHandle {
        let me: *const ArcNode<T> = self;
        let me: *const *const ArcNode<T> = &me;
        let me = &*(me as *const Arc<Node<T>>);
        NodeToHandle(me).into()
    }

    unsafe fn drop_raw(&self) {
        let mut me: *const ArcNode<T> = self;
        let me = &mut me as *mut *const ArcNode<T> as *mut Arc<Node<T>>;
        ptr::drop_in_place(me);
    }
}

unsafe fn hide_lt<T>(p: *mut ArcNode<T>) -> *mut UnsafeNotify {
    mem::transmute(p as *mut UnsafeNotify)
}

impl<T> Node<T> {
    fn notify(me: &Arc<Node<T>>) {
        let inner = match me.queue.upgrade() {
            Some(inner) => inner,
            None => return,
        };

        // It's our job to notify the node that it's ready to get polled,
        // meaning that we need to enqueue it into the readiness queue. To
        // do this we flag that we're ready to be queued, and if successful
        // we then do the literal queueing operation, ensuring that we're
        // only queued once.
        //
        // Once the node is inserted we make sure to notify the parent task,
        // as it'll want to come along and pick up our node now.
        //
        // Note that we don't change the reference count of the node here,
        // we're just enqueueing the raw pointer. The `FuturesUnordered`
        // implementation guarantees that if we set the `queued` flag to true
        // then there's still a reference count held by the main
        // `FuturesUnordered` queue.
        let prev = me.queued.swap(true, SeqCst);
        if !prev {
            inner.enqueue(&**me);
            inner.parent.notify();
        }
    }
}

impl<T> Drop for Node<T> {
    fn drop(&mut self) {
        // Currently a `Node<T>` is sent across all threads for any lifetime,
        // regardless of `T`. This means that for memory safety we can't
        // actually touch `T` at any time except when we have a reference to the
        // `FuturesUnordered` itself.
        //
        // Consequently it *should* be the case that we always drop futures from
        // the `FuturesUnordered` instance, but this is a bomb in place to catch
        // any bugs in that logic.
        unsafe {
            if (*self.future.get()).is_some() {
                abort("future still here when dropping");
            }
        }
    }
}

fn arc2ptr<T>(ptr: Arc<T>) -> *const T {
    let addr = &*ptr as *const T;
    mem::forget(ptr);
    return addr
}

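// Reconstruct an `Arc<T>` from the data pointer produced by `arc2ptr`. The
// dummy `Arc` "anchored" at address 0x10 is used only to measure the offset
// between an `Arc`'s internal pointer and the address of its data; that
// offset is then subtracted from `ptr` to recover the original `Arc`.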
unsafe fn ptr2arc<T>(ptr: *const T) -> Arc<T> {
    let anchor = mem::transmute::<usize, Arc<T>>(0x10);
    let addr = &*anchor as *const T;
    mem::forget(anchor);
    let offset = addr as isize - 0x10;
    mem::transmute::<isize, Arc<T>>(ptr as isize - offset)
}

fn abort(s: &str) -> ! {
    struct DoublePanic;

    impl Drop for DoublePanic {
        fn drop(&mut self) {
            panic!("panicking twice to abort the program");
        }
    }

    let _bomb = DoublePanic;
    panic!("{}", s);
}