1 use std::prelude::v1::*;
2 
3 use std::cell::Cell;
4 use std::fmt;
5 use std::marker::PhantomData;
6 use std::mem;
7 use std::ptr;
8 use std::sync::{Arc, Mutex, Condvar, Once};
9 #[allow(deprecated)]
10 use std::sync::ONCE_INIT;
11 use std::sync::atomic::{AtomicUsize, Ordering};
12 
13 use {Future, Stream, Sink, Poll, Async, StartSend, AsyncSink};
14 use super::core;
15 use super::{BorrowedTask, NotifyHandle, Spawn, spawn, Notify, UnsafeNotify};
16 
17 mod unpark_mutex;
18 pub use self::unpark_mutex::UnparkMutex;
19 
20 mod data;
21 pub use self::data::*;
22 
23 mod task_rc;
24 #[allow(deprecated)]
25 #[cfg(feature = "with-deprecated")]
26 pub use self::task_rc::TaskRc;
27 
28 pub use task_impl::core::init;
29 
30 thread_local!(static CURRENT_TASK: Cell<*mut u8> = Cell::new(ptr::null_mut()));
31 
32 /// Return whether the caller is running in a task (and so can use task_local!).
is_in_task() -> bool33 pub fn is_in_task() -> bool {
34     CURRENT_TASK.with(|task| !task.get().is_null())
35 }
36 
37 #[allow(deprecated)]
38 static INIT: Once = ONCE_INIT;
39 
get_ptr() -> Option<*mut u8>40 pub fn get_ptr() -> Option<*mut u8> {
41     // Since this condition will always return true when TLS task storage is
42     // used (the default), the branch predictor will be able to optimize the
43     // branching and a dynamic dispatch will be avoided, which makes the
44     // compiler happier.
45     if core::is_get_ptr(0x1) {
46         Some(CURRENT_TASK.with(|c| c.get()))
47     } else {
48         core::get_ptr()
49     }
50 }
51 
tls_slot() -> *const Cell<*mut u8>52 fn tls_slot() -> *const Cell<*mut u8> {
53     CURRENT_TASK.with(|c| c as *const _)
54 }
55 
set<'a, F, R>(task: &BorrowedTask<'a>, f: F) -> R where F: FnOnce() -> R56 pub fn set<'a, F, R>(task: &BorrowedTask<'a>, f: F) -> R
57     where F: FnOnce() -> R
58 {
59     // Lazily initialize the get / set ptrs
60     //
61     // Note that we won't actually use these functions ever, we'll instead be
62     // testing the pointer's value elsewhere and calling our own functions.
63     INIT.call_once(|| unsafe {
64         let get = mem::transmute::<usize, _>(0x1);
65         let set = mem::transmute::<usize, _>(0x2);
66         init(get, set);
67     });
68 
69     // Same as above.
70     if core::is_get_ptr(0x1) {
71         struct Reset(*const Cell<*mut u8>, *mut u8);
72 
73         impl Drop for Reset {
74             #[inline]
75             fn drop(&mut self) {
76                 unsafe {
77                     (*self.0).set(self.1);
78                 }
79             }
80         }
81 
82         unsafe {
83             let slot = tls_slot();
84             let _reset = Reset(slot, (*slot).get());
85             (*slot).set(task as *const _ as *mut u8);
86             f()
87         }
88     } else {
89         core::set(task, f)
90     }
91 }
92 
93 #[derive(Copy, Clone)]
94 #[allow(deprecated)]
95 pub enum BorrowedUnpark<'a> {
96     Old(&'a Arc<Unpark>),
97     New(core::BorrowedUnpark<'a>),
98 }
99 
100 #[derive(Copy, Clone)]
101 #[allow(deprecated)]
102 pub enum BorrowedEvents<'a> {
103     None,
104     One(&'a UnparkEvent, &'a BorrowedEvents<'a>),
105 }
106 
107 #[derive(Clone)]
108 pub enum TaskUnpark {
109     #[allow(deprecated)]
110     Old(Arc<Unpark>),
111     New(core::TaskUnpark),
112 }
113 
114 #[derive(Clone)]
115 #[allow(deprecated)]
116 pub enum UnparkEvents {
117     None,
118     One(UnparkEvent),
119     Many(Box<[UnparkEvent]>),
120 }
121 
122 impl<'a> BorrowedUnpark<'a> {
123     #[inline]
new(f: &'a Fn() -> NotifyHandle, id: usize) -> BorrowedUnpark<'a>124     pub fn new(f: &'a Fn() -> NotifyHandle, id: usize) -> BorrowedUnpark<'a> {
125         BorrowedUnpark::New(core::BorrowedUnpark::new(f, id))
126     }
127 
128     #[inline]
to_owned(&self) -> TaskUnpark129     pub fn to_owned(&self) -> TaskUnpark {
130         match *self {
131             BorrowedUnpark::Old(old) => TaskUnpark::Old(old.clone()),
132             BorrowedUnpark::New(new) => TaskUnpark::New(new.to_owned()),
133         }
134     }
135 }
136 
137 impl<'a> BorrowedEvents<'a> {
138     #[inline]
new() -> BorrowedEvents<'a>139     pub fn new() -> BorrowedEvents<'a> {
140         BorrowedEvents::None
141     }
142 
143     #[inline]
to_owned(&self) -> UnparkEvents144     pub fn to_owned(&self) -> UnparkEvents {
145         let mut one_event = None;
146         let mut list = Vec::new();
147         let mut cur = self;
148         while let BorrowedEvents::One(event, next) = *cur {
149             let event = event.clone();
150             match one_event.take() {
151                 None if list.len() == 0 => one_event = Some(event),
152                 None => list.push(event),
153                 Some(event2) =>  {
154                     list.push(event2);
155                     list.push(event);
156                 }
157             }
158             cur = next;
159         }
160 
161         match one_event {
162             None if list.len() == 0 => UnparkEvents::None,
163             None => UnparkEvents::Many(list.into_boxed_slice()),
164             Some(e) => UnparkEvents::One(e),
165         }
166     }
167 }
168 
169 impl UnparkEvents {
notify(&self)170     pub fn notify(&self) {
171         match *self {
172             UnparkEvents::None => {}
173             UnparkEvents::One(ref e) => e.unpark(),
174             UnparkEvents::Many(ref list) => {
175                 for event in list.iter() {
176                     event.unpark();
177                 }
178             }
179         }
180     }
181 
will_notify(&self, events: &BorrowedEvents) -> bool182     pub fn will_notify(&self, events: &BorrowedEvents) -> bool {
183         // Pessimistically assume that any unpark events mean that we're not
184         // equivalent to the current task.
185         match *self {
186             UnparkEvents::None => {}
187             _ => return false,
188         }
189 
190         match *events {
191             BorrowedEvents::None => return true,
192             _ => {},
193         }
194 
195         return false
196     }
197 }
198 
199 #[allow(deprecated)]
200 impl TaskUnpark {
notify(&self)201     pub fn notify(&self) {
202         match *self {
203             TaskUnpark::Old(ref old) => old.unpark(),
204             TaskUnpark::New(ref new) => new.notify(),
205         }
206     }
207 
will_notify(&self, unpark: &BorrowedUnpark) -> bool208     pub fn will_notify(&self, unpark: &BorrowedUnpark) -> bool {
209         match (unpark, self) {
210             (&BorrowedUnpark::Old(old1), &TaskUnpark::Old(ref old2)) => {
211                 &**old1 as *const Unpark == &**old2 as *const Unpark
212             }
213             (&BorrowedUnpark::New(ref new1), &TaskUnpark::New(ref new2)) => {
214                 new2.will_notify(new1)
215             }
216             _ => false,
217         }
218     }
219 }
220 
221 impl<F: Future> Spawn<F> {
222     #[doc(hidden)]
223     #[deprecated(note = "recommended to use `poll_future_notify` instead")]
224     #[allow(deprecated)]
poll_future(&mut self, unpark: Arc<Unpark>) -> Poll<F::Item, F::Error>225     pub fn poll_future(&mut self, unpark: Arc<Unpark>) -> Poll<F::Item, F::Error> {
226         self.enter(BorrowedUnpark::Old(&unpark), |f| f.poll())
227     }
228 
229     /// Waits for the internal future to complete, blocking this thread's
230     /// execution until it does.
231     ///
232     /// This function will call `poll_future` in a loop, waiting for the future
233     /// to complete. When a future cannot make progress it will use
234     /// `thread::park` to block the current thread.
wait_future(&mut self) -> Result<F::Item, F::Error>235     pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
236         ThreadNotify::with_current(|notify| {
237 
238             loop {
239                 match self.poll_future_notify(notify, 0)? {
240                     Async::NotReady => notify.park(),
241                     Async::Ready(e) => return Ok(e),
242                 }
243             }
244         })
245     }
246 
247 
248     #[doc(hidden)]
249     #[deprecated]
250     #[allow(deprecated)]
execute(self, exec: Arc<Executor>) where F: Future<Item=(), Error=()> + Send + 'static,251     pub fn execute(self, exec: Arc<Executor>)
252         where F: Future<Item=(), Error=()> + Send + 'static,
253     {
254         exec.clone().execute(Run {
255             // Ideally this method would be defined directly on
256             // `Spawn<BoxFuture<(), ()>>` so we wouldn't have to box here and
257             // it'd be more explicit, but unfortunately that currently has a
258             // link error on nightly: rust-lang/rust#36155
259             spawn: spawn(Box::new(self.into_inner())),
260             inner: Arc::new(RunInner {
261                 exec: exec,
262                 mutex: UnparkMutex::new()
263             }),
264         })
265     }
266 }
267 
268 impl<S: Stream> Spawn<S> {
269     #[deprecated(note = "recommended to use `poll_stream_notify` instead")]
270     #[allow(deprecated)]
271     #[doc(hidden)]
poll_stream(&mut self, unpark: Arc<Unpark>) -> Poll<Option<S::Item>, S::Error>272     pub fn poll_stream(&mut self, unpark: Arc<Unpark>)
273                        -> Poll<Option<S::Item>, S::Error> {
274         self.enter(BorrowedUnpark::Old(&unpark), |s| s.poll())
275     }
276 
277     /// Like `wait_future`, except only waits for the next element to arrive on
278     /// the underlying stream.
wait_stream(&mut self) -> Option<Result<S::Item, S::Error>>279     pub fn wait_stream(&mut self) -> Option<Result<S::Item, S::Error>> {
280         ThreadNotify::with_current(|notify| {
281 
282             loop {
283                 match self.poll_stream_notify(notify, 0) {
284                     Ok(Async::NotReady) => notify.park(),
285                     Ok(Async::Ready(Some(e))) => return Some(Ok(e)),
286                     Ok(Async::Ready(None)) => return None,
287                     Err(e) => return Some(Err(e)),
288                 }
289             }
290         })
291     }
292 }
293 
294 impl<S: Sink> Spawn<S> {
295     #[doc(hidden)]
296     #[deprecated(note = "recommended to use `start_send_notify` instead")]
297     #[allow(deprecated)]
start_send(&mut self, value: S::SinkItem, unpark: &Arc<Unpark>) -> StartSend<S::SinkItem, S::SinkError>298     pub fn start_send(&mut self, value: S::SinkItem, unpark: &Arc<Unpark>)
299                        -> StartSend<S::SinkItem, S::SinkError> {
300         self.enter(BorrowedUnpark::Old(unpark), |s| s.start_send(value))
301     }
302 
303     #[deprecated(note = "recommended to use `poll_flush_notify` instead")]
304     #[allow(deprecated)]
305     #[doc(hidden)]
poll_flush(&mut self, unpark: &Arc<Unpark>) -> Poll<(), S::SinkError>306     pub fn poll_flush(&mut self, unpark: &Arc<Unpark>)
307                        -> Poll<(), S::SinkError> {
308         self.enter(BorrowedUnpark::Old(unpark), |s| s.poll_complete())
309     }
310 
311     /// Blocks the current thread until it's able to send `value` on this sink.
312     ///
313     /// This function will send the `value` on the sink that this task wraps. If
314     /// the sink is not ready to send the value yet then the current thread will
315     /// be blocked until it's able to send the value.
wait_send(&mut self, mut value: S::SinkItem) -> Result<(), S::SinkError>316     pub fn wait_send(&mut self, mut value: S::SinkItem)
317                      -> Result<(), S::SinkError> {
318         ThreadNotify::with_current(|notify| {
319 
320             loop {
321                 value = match self.start_send_notify(value, notify, 0)? {
322                     AsyncSink::NotReady(v) => v,
323                     AsyncSink::Ready => return Ok(()),
324                 };
325                 notify.park();
326             }
327         })
328     }
329 
330     /// Blocks the current thread until it's able to flush this sink.
331     ///
332     /// This function will call the underlying sink's `poll_complete` method
333     /// until it returns that it's ready, proxying out errors upwards to the
334     /// caller if one occurs.
335     ///
336     /// The thread will be blocked until `poll_complete` returns that it's
337     /// ready.
wait_flush(&mut self) -> Result<(), S::SinkError>338     pub fn wait_flush(&mut self) -> Result<(), S::SinkError> {
339         ThreadNotify::with_current(|notify| {
340 
341             loop {
342                 if self.poll_flush_notify(notify, 0)?.is_ready() {
343                     return Ok(())
344                 }
345                 notify.park();
346             }
347         })
348     }
349 
350     /// Blocks the current thread until it's able to close this sink.
351     ///
352     /// This function will close the sink that this task wraps. If the sink
353     /// is not ready to be close yet, then the current thread will be blocked
354     /// until it's closed.
wait_close(&mut self) -> Result<(), S::SinkError>355     pub fn wait_close(&mut self) -> Result<(), S::SinkError> {
356         ThreadNotify::with_current(|notify| {
357 
358             loop {
359                 if self.close_notify(notify, 0)?.is_ready() {
360                     return Ok(())
361                 }
362                 notify.park();
363             }
364         })
365     }
366 }
367 
368 /// A trait which represents a sink of notifications that a future is ready to
369 /// make progress.
370 ///
371 /// This trait is provided as an argument to the `Spawn::poll_future` and
372 /// `Spawn::poll_stream` functions. It's transitively used as part of the
373 /// `Task::unpark` method to internally deliver notifications of readiness of a
374 /// future to move forward.
375 #[deprecated(note = "recommended to use `Notify` instead")]
376 pub trait Unpark: Send + Sync {
377     /// Indicates that an associated future and/or task are ready to make
378     /// progress.
379     ///
380     /// Typically this means that the receiver of the notification should
381     /// arrange for the future to get poll'd in a prompt fashion.
unpark(&self)382     fn unpark(&self);
383 }
384 
385 /// A trait representing requests to poll futures.
386 ///
387 /// This trait is an argument to the `Spawn::execute` which is used to run a
388 /// future to completion. An executor will receive requests to run a future and
389 /// an executor is responsible for ensuring that happens in a timely fashion.
390 ///
391 /// Note that this trait is likely to be deprecated and/or renamed to avoid
392 /// clashing with the `future::Executor` trait. If you've got a use case for
393 /// this or would like to comment on the name please let us know!
394 #[deprecated]
395 #[allow(deprecated)]
396 pub trait Executor: Send + Sync + 'static {
397     /// Requests that `Run` is executed soon on the given executor.
execute(&self, r: Run)398     fn execute(&self, r: Run);
399 }
400 
401 /// Units of work submitted to an `Executor`, currently only created
402 /// internally.
403 #[deprecated]
404 pub struct Run {
405     spawn: Spawn<Box<Future<Item = (), Error = ()> + Send>>,
406     inner: Arc<RunInner>,
407 }
408 
409 #[allow(deprecated)]
410 struct RunInner {
411     mutex: UnparkMutex<Run>,
412     exec: Arc<Executor>,
413 }
414 
415 #[allow(deprecated)]
416 impl Run {
417     /// Actually run the task (invoking `poll` on its future) on the current
418     /// thread.
run(self)419     pub fn run(self) {
420         let Run { mut spawn, inner } = self;
421 
422         // SAFETY: the ownership of this `Run` object is evidence that
423         // we are in the `POLLING`/`REPOLL` state for the mutex.
424         unsafe {
425             inner.mutex.start_poll();
426 
427             loop {
428                 match spawn.poll_future_notify(&inner, 0) {
429                     Ok(Async::NotReady) => {}
430                     Ok(Async::Ready(())) |
431                     Err(()) => return inner.mutex.complete(),
432                 }
433                 let run = Run { spawn: spawn, inner: inner.clone() };
434                 match inner.mutex.wait(run) {
435                     Ok(()) => return,            // we've waited
436                     Err(r) => spawn = r.spawn,   // someone's notified us
437                 }
438             }
439         }
440     }
441 }
442 
443 #[allow(deprecated)]
444 impl fmt::Debug for Run {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result445     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
446         f.debug_struct("Run")
447          .field("contents", &"...")
448          .finish()
449     }
450 }
451 
452 #[allow(deprecated)]
453 impl Notify for RunInner {
notify(&self, _id: usize)454     fn notify(&self, _id: usize) {
455         match self.mutex.notify() {
456             Ok(run) => self.exec.execute(run),
457             Err(()) => {}
458         }
459     }
460 }
461 
462 // ===== ThreadNotify =====
463 
464 struct ThreadNotify {
465     state: AtomicUsize,
466     mutex: Mutex<()>,
467     condvar: Condvar,
468 }
469 
470 const IDLE: usize = 0;
471 const NOTIFY: usize = 1;
472 const SLEEP: usize = 2;
473 
474 thread_local! {
475     static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
476         state: AtomicUsize::new(IDLE),
477         mutex: Mutex::new(()),
478         condvar: Condvar::new(),
479     });
480 }
481 
482 impl ThreadNotify {
with_current<F, R>(f: F) -> R where F: FnOnce(&Arc<ThreadNotify>) -> R,483     fn with_current<F, R>(f: F) -> R
484         where F: FnOnce(&Arc<ThreadNotify>) -> R,
485     {
486         CURRENT_THREAD_NOTIFY.with(|notify| f(notify))
487     }
488 
park(&self)489     fn park(&self) {
490         // If currently notified, then we skip sleeping. This is checked outside
491         // of the lock to avoid acquiring a mutex if not necessary.
492         match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
493             NOTIFY => return,
494             IDLE => {},
495             _ => unreachable!(),
496         }
497 
498         // The state is currently idle, so obtain the lock and then try to
499         // transition to a sleeping state.
500         let mut m = self.mutex.lock().unwrap();
501 
502         // Transition to sleeping
503         match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) {
504             NOTIFY => {
505                 // Notified before we could sleep, consume the notification and
506                 // exit
507                 self.state.store(IDLE, Ordering::SeqCst);
508                 return;
509             }
510             IDLE => {},
511             _ => unreachable!(),
512         }
513 
514         // Loop until we've been notified
515         loop {
516             m = self.condvar.wait(m).unwrap();
517 
518             // Transition back to idle, loop otherwise
519             if NOTIFY == self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
520                 return;
521             }
522         }
523     }
524 }
525 
526 impl Notify for ThreadNotify {
notify(&self, _unpark_id: usize)527     fn notify(&self, _unpark_id: usize) {
528         // First, try transitioning from IDLE -> NOTIFY, this does not require a
529         // lock.
530         match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) {
531             IDLE | NOTIFY => return,
532             SLEEP => {}
533             _ => unreachable!(),
534         }
535 
536         // The other half is sleeping, this requires a lock
537         let _m = self.mutex.lock().unwrap();
538 
539         // Transition from SLEEP -> NOTIFY
540         match self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) {
541             SLEEP => {}
542             _ => return,
543         }
544 
545         // Wakeup the sleeper
546         self.condvar.notify_one();
547     }
548 }
549 
550 // ===== UnparkEvent =====
551 
552 /// For the duration of the given callback, add an "unpark event" to be
553 /// triggered when the task handle is used to unpark the task.
554 ///
555 /// Unpark events are used to pass information about what event caused a task to
556 /// be unparked. In some cases, tasks are waiting on a large number of possible
557 /// events, and need precise information about the wakeup to avoid extraneous
558 /// polling.
559 ///
560 /// Every `Task` handle comes with a set of unpark events which will fire when
561 /// `unpark` is called. When fired, these events insert an identifier into a
562 /// concurrent set, which the task can read from to determine what events
563 /// occurred.
564 ///
565 /// This function immediately invokes the closure, `f`, but arranges things so
566 /// that `task::park` will produce a `Task` handle that includes the given
567 /// unpark event.
568 ///
569 /// # Panics
570 ///
571 /// This function will panic if a task is not currently being executed. That
572 /// is, this method can be dangerous to call outside of an implementation of
573 /// `poll`.
574 #[deprecated(note = "recommended to use `FuturesUnordered` instead")]
575 #[allow(deprecated)]
with_unpark_event<F, R>(event: UnparkEvent, f: F) -> R where F: FnOnce() -> R576 pub fn with_unpark_event<F, R>(event: UnparkEvent, f: F) -> R
577     where F: FnOnce() -> R
578 {
579     super::with(|task| {
580         let new_task = BorrowedTask {
581             id: task.id,
582             unpark: task.unpark,
583             events: BorrowedEvents::One(&event, &task.events),
584             map: task.map,
585         };
586 
587         super::set(&new_task, f)
588     })
589 }
590 
591 /// A set insertion to trigger upon `unpark`.
592 ///
593 /// Unpark events are used to communicate information about *why* an unpark
594 /// occurred, in particular populating sets with event identifiers so that the
595 /// unparked task can avoid extraneous polling. See `with_unpark_event` for
596 /// more.
597 #[derive(Clone)]
598 #[deprecated(note = "recommended to use `FuturesUnordered` instead")]
599 #[allow(deprecated)]
600 pub struct UnparkEvent {
601     set: Arc<EventSet>,
602     item: usize,
603 }
604 
605 #[allow(deprecated)]
606 impl UnparkEvent {
607     /// Construct an unpark event that will insert `id` into `set` when
608     /// triggered.
609     #[deprecated(note = "recommended to use `FuturesUnordered` instead")]
new(set: Arc<EventSet>, id: usize) -> UnparkEvent610     pub fn new(set: Arc<EventSet>, id: usize) -> UnparkEvent {
611         UnparkEvent {
612             set: set,
613             item: id,
614         }
615     }
616 
unpark(&self)617     fn unpark(&self) {
618         self.set.insert(self.item);
619     }
620 }
621 
622 #[allow(deprecated)]
623 impl fmt::Debug for UnparkEvent {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result624     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
625         f.debug_struct("UnparkEvent")
626          .field("set", &"...")
627          .field("item", &self.item)
628          .finish()
629     }
630 }
631 
632 /// A concurrent set which allows for the insertion of `usize` values.
633 ///
634 /// `EventSet`s are used to communicate precise information about the event(s)
635 /// that triggered a task notification. See `task::with_unpark_event` for details.
636 #[deprecated(since="0.1.18", note = "recommended to use `FuturesUnordered` instead")]
637 pub trait EventSet: Send + Sync + 'static {
638     /// Insert the given ID into the set
insert(&self, id: usize)639     fn insert(&self, id: usize);
640 }
641 
642 // Safe implementation of `UnsafeNotify` for `Arc` in the standard library.
643 //
644 // Note that this is a very unsafe implementation! The crucial pieces is that
645 // these two values are considered equivalent:
646 //
647 // * Arc<T>
648 // * *const ArcWrapped<T>
649 //
650 // We don't actually know the layout of `ArcWrapped<T>` as it's an
651 // implementation detail in the standard library. We can work, though, by
652 // casting it through and back an `Arc<T>`.
653 //
654 // This also means that you won't actually fine `UnsafeNotify for Arc<T>`
655 // because it's the wrong level of indirection. These methods are sort of
656 // receiving Arc<T>, but not an owned version. It's... complicated. We may be
657 // one of the first users of unsafe trait objects!
658 
659 struct ArcWrapped<T>(PhantomData<T>);
660 
661 impl<T: Notify + 'static> Notify for ArcWrapped<T> {
notify(&self, id: usize)662     fn notify(&self, id: usize) {
663         unsafe {
664             let me: *const ArcWrapped<T> = self;
665             T::notify(&*(&me as *const *const ArcWrapped<T> as *const Arc<T>),
666                       id)
667         }
668     }
669 
clone_id(&self, id: usize) -> usize670     fn clone_id(&self, id: usize) -> usize {
671         unsafe {
672             let me: *const ArcWrapped<T> = self;
673             T::clone_id(&*(&me as *const *const ArcWrapped<T> as *const Arc<T>),
674                         id)
675         }
676     }
677 
drop_id(&self, id: usize)678     fn drop_id(&self, id: usize) {
679         unsafe {
680             let me: *const ArcWrapped<T> = self;
681             T::drop_id(&*(&me as *const *const ArcWrapped<T> as *const Arc<T>),
682                        id)
683         }
684     }
685 }
686 
687 unsafe impl<T: Notify + 'static> UnsafeNotify for ArcWrapped<T> {
clone_raw(&self) -> NotifyHandle688     unsafe fn clone_raw(&self) -> NotifyHandle {
689         let me: *const ArcWrapped<T> = self;
690         let arc = (*(&me as *const *const ArcWrapped<T> as *const Arc<T>)).clone();
691         NotifyHandle::from(arc)
692     }
693 
drop_raw(&self)694     unsafe fn drop_raw(&self) {
695         let mut me: *const ArcWrapped<T> = self;
696         let me = &mut me as *mut *const ArcWrapped<T> as *mut Arc<T>;
697         ptr::drop_in_place(me);
698     }
699 }
700 
701 impl<T> From<Arc<T>> for NotifyHandle
702     where T: Notify + 'static,
703 {
from(rc: Arc<T>) -> NotifyHandle704     fn from(rc: Arc<T>) -> NotifyHandle {
705         unsafe {
706             let ptr = mem::transmute::<Arc<T>, *mut ArcWrapped<T>>(rc);
707             NotifyHandle::new(ptr)
708         }
709     }
710 }
711 
712 #[cfg(feature = "nightly")]
713 mod nightly {
714     use super::{TaskUnpark, UnparkEvents};
715     use core::marker::Unpin;
716 
717     impl Unpin for TaskUnpark {}
718     impl Unpin for UnparkEvents {}
719 }
720