1 use crate::future::Future;
2 use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Trailer};
3 use crate::runtime::task::state::Snapshot;
4 use crate::runtime::task::waker::waker_ref;
5 use crate::runtime::task::{JoinError, Notified, Schedule, Task};
6 
7 use std::mem;
8 use std::mem::ManuallyDrop;
9 use std::panic;
10 use std::ptr::NonNull;
11 use std::task::{Context, Poll, Waker};
12 
13 /// Typed raw task handle.
14 pub(super) struct Harness<T: Future, S: 'static> {
15     cell: NonNull<Cell<T, S>>,
16 }
17 
18 impl<T, S> Harness<T, S>
19 where
20     T: Future,
21     S: 'static,
22 {
from_raw(ptr: NonNull<Header>) -> Harness<T, S>23     pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> {
24         Harness {
25             cell: ptr.cast::<Cell<T, S>>(),
26         }
27     }
28 
header(&self) -> &Header29     fn header(&self) -> &Header {
30         unsafe { &self.cell.as_ref().header }
31     }
32 
trailer(&self) -> &Trailer33     fn trailer(&self) -> &Trailer {
34         unsafe { &self.cell.as_ref().trailer }
35     }
36 
core(&self) -> &Core<T, S>37     fn core(&self) -> &Core<T, S> {
38         unsafe { &self.cell.as_ref().core }
39     }
40 }
41 
42 impl<T, S> Harness<T, S>
43 where
44     T: Future,
45     S: Schedule,
46 {
47     /// Polls the inner future. A ref-count is consumed.
48     ///
49     /// All necessary state checks and transitions are performed.
50     /// Panics raised while polling the future are handled.
poll(self)51     pub(super) fn poll(self) {
52         // We pass our ref-count to `poll_inner`.
53         match self.poll_inner() {
54             PollFuture::Notified => {
55                 // The `poll_inner` call has given us two ref-counts back.
56                 // We give one of them to a new task and call `yield_now`.
57                 self.core()
58                     .scheduler
59                     .yield_now(Notified(self.get_new_task()));
60 
61                 // The remaining ref-count is now dropped. We kept the extra
62                 // ref-count until now to ensure that even if the `yield_now`
63                 // call drops the provided task, the task isn't deallocated
64                 // before after `yield_now` returns.
65                 self.drop_reference();
66             }
67             PollFuture::Complete => {
68                 self.complete();
69             }
70             PollFuture::Dealloc => {
71                 self.dealloc();
72             }
73             PollFuture::Done => (),
74         }
75     }
76 
77     /// Polls the task and cancel it if necessary. This takes ownership of a
78     /// ref-count.
79     ///
80     /// If the return value is Notified, the caller is given ownership of two
81     /// ref-counts.
82     ///
83     /// If the return value is Complete, the caller is given ownership of a
84     /// single ref-count, which should be passed on to `complete`.
85     ///
86     /// If the return value is Dealloc, then this call consumed the last
87     /// ref-count and the caller should call `dealloc`.
88     ///
89     /// Otherwise the ref-count is consumed and the caller should not access
90     /// `self` again.
poll_inner(&self) -> PollFuture91     fn poll_inner(&self) -> PollFuture {
92         use super::state::{TransitionToIdle, TransitionToRunning};
93 
94         match self.header().state.transition_to_running() {
95             TransitionToRunning::Success => {
96                 let waker_ref = waker_ref::<T, S>(self.header());
97                 let cx = Context::from_waker(&*waker_ref);
98                 let res = poll_future(&self.core().stage, cx);
99 
100                 if res == Poll::Ready(()) {
101                     // The future completed. Move on to complete the task.
102                     return PollFuture::Complete;
103                 }
104 
105                 match self.header().state.transition_to_idle() {
106                     TransitionToIdle::Ok => PollFuture::Done,
107                     TransitionToIdle::OkNotified => PollFuture::Notified,
108                     TransitionToIdle::OkDealloc => PollFuture::Dealloc,
109                     TransitionToIdle::Cancelled => {
110                         // The transition to idle failed because the task was
111                         // cancelled during the poll.
112 
113                         cancel_task(&self.core().stage);
114                         PollFuture::Complete
115                     }
116                 }
117             }
118             TransitionToRunning::Cancelled => {
119                 cancel_task(&self.core().stage);
120                 PollFuture::Complete
121             }
122             TransitionToRunning::Failed => PollFuture::Done,
123             TransitionToRunning::Dealloc => PollFuture::Dealloc,
124         }
125     }
126 
127     /// Forcibly shuts down the task.
128     ///
129     /// Attempt to transition to `Running` in order to forcibly shutdown the
130     /// task. If the task is currently running or in a state of completion, then
131     /// there is nothing further to do. When the task completes running, it will
132     /// notice the `CANCELLED` bit and finalize the task.
shutdown(self)133     pub(super) fn shutdown(self) {
134         if !self.header().state.transition_to_shutdown() {
135             // The task is concurrently running. No further work needed.
136             self.drop_reference();
137             return;
138         }
139 
140         // By transitioning the lifecycle to `Running`, we have permission to
141         // drop the future.
142         cancel_task(&self.core().stage);
143         self.complete();
144     }
145 
dealloc(self)146     pub(super) fn dealloc(self) {
147         // Release the join waker, if there is one.
148         self.trailer().waker.with_mut(drop);
149 
150         // Check causality
151         self.core().stage.with_mut(drop);
152 
153         unsafe {
154             drop(Box::from_raw(self.cell.as_ptr()));
155         }
156     }
157 
158     // ===== join handle =====
159 
160     /// Read the task output into `dst`.
try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker)161     pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
162         if can_read_output(self.header(), self.trailer(), waker) {
163             *dst = Poll::Ready(self.core().stage.take_output());
164         }
165     }
166 
drop_join_handle_slow(self)167     pub(super) fn drop_join_handle_slow(self) {
168         let mut maybe_panic = None;
169 
170         // Try to unset `JOIN_INTEREST`. This must be done as a first step in
171         // case the task concurrently completed.
172         if self.header().state.unset_join_interested().is_err() {
173             // It is our responsibility to drop the output. This is critical as
174             // the task output may not be `Send` and as such must remain with
175             // the scheduler or `JoinHandle`. i.e. if the output remains in the
176             // task structure until the task is deallocated, it may be dropped
177             // by a Waker on any arbitrary thread.
178             let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
179                 self.core().stage.drop_future_or_output();
180             }));
181 
182             if let Err(panic) = panic {
183                 maybe_panic = Some(panic);
184             }
185         }
186 
187         // Drop the `JoinHandle` reference, possibly deallocating the task
188         self.drop_reference();
189 
190         if let Some(panic) = maybe_panic {
191             panic::resume_unwind(panic);
192         }
193     }
194 
195     /// Remotely aborts the task.
196     ///
197     /// The caller should hold a ref-count, but we do not consume it.
198     ///
199     /// This is similar to `shutdown` except that it asks the runtime to perform
200     /// the shutdown. This is necessary to avoid the shutdown happening in the
201     /// wrong thread for non-Send tasks.
remote_abort(self)202     pub(super) fn remote_abort(self) {
203         if self.header().state.transition_to_notified_and_cancel() {
204             // The transition has created a new ref-count, which we turn into
205             // a Notified and pass to the task.
206             //
207             // Since the caller holds a ref-count, the task cannot be destroyed
208             // before the call to `schedule` returns even if the call drops the
209             // `Notified` internally.
210             self.core()
211                 .scheduler
212                 .schedule(Notified(self.get_new_task()));
213         }
214     }
215 
216     // ===== waker behavior =====
217 
218     /// This call consumes a ref-count and notifies the task. This will create a
219     /// new Notified and submit it if necessary.
220     ///
221     /// The caller does not need to hold a ref-count besides the one that was
222     /// passed to this call.
wake_by_val(self)223     pub(super) fn wake_by_val(self) {
224         use super::state::TransitionToNotifiedByVal;
225 
226         match self.header().state.transition_to_notified_by_val() {
227             TransitionToNotifiedByVal::Submit => {
228                 // The caller has given us a ref-count, and the transition has
229                 // created a new ref-count, so we now hold two. We turn the new
230                 // ref-count Notified and pass it to the call to `schedule`.
231                 //
232                 // The old ref-count is retained for now to ensure that the task
233                 // is not dropped during the call to `schedule` if the call
234                 // drops the task it was given.
235                 self.core()
236                     .scheduler
237                     .schedule(Notified(self.get_new_task()));
238 
239                 // Now that we have completed the call to schedule, we can
240                 // release our ref-count.
241                 self.drop_reference();
242             }
243             TransitionToNotifiedByVal::Dealloc => {
244                 self.dealloc();
245             }
246             TransitionToNotifiedByVal::DoNothing => {}
247         }
248     }
249 
250     /// This call notifies the task. It will not consume any ref-counts, but the
251     /// caller should hold a ref-count.  This will create a new Notified and
252     /// submit it if necessary.
wake_by_ref(&self)253     pub(super) fn wake_by_ref(&self) {
254         use super::state::TransitionToNotifiedByRef;
255 
256         match self.header().state.transition_to_notified_by_ref() {
257             TransitionToNotifiedByRef::Submit => {
258                 // The transition above incremented the ref-count for a new task
259                 // and the caller also holds a ref-count. The caller's ref-count
260                 // ensures that the task is not destroyed even if the new task
261                 // is dropped before `schedule` returns.
262                 self.core()
263                     .scheduler
264                     .schedule(Notified(self.get_new_task()));
265             }
266             TransitionToNotifiedByRef::DoNothing => {}
267         }
268     }
269 
drop_reference(self)270     pub(super) fn drop_reference(self) {
271         if self.header().state.ref_dec() {
272             self.dealloc();
273         }
274     }
275 
276     #[cfg(all(tokio_unstable, feature = "tracing"))]
id(&self) -> Option<&tracing::Id>277     pub(super) fn id(&self) -> Option<&tracing::Id> {
278         self.header().id.as_ref()
279     }
280 
281     // ====== internal ======
282 
283     /// Completes the task. This method assumes that the state is RUNNING.
complete(self)284     fn complete(self) {
285         // The future has completed and its output has been written to the task
286         // stage. We transition from running to complete.
287 
288         let snapshot = self.header().state.transition_to_complete();
289 
290         // We catch panics here in case dropping the future or waking the
291         // JoinHandle panics.
292         let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
293             if !snapshot.is_join_interested() {
294                 // The `JoinHandle` is not interested in the output of
295                 // this task. It is our responsibility to drop the
296                 // output.
297                 self.core().stage.drop_future_or_output();
298             } else if snapshot.has_join_waker() {
299                 // Notify the join handle. The previous transition obtains the
300                 // lock on the waker cell.
301                 self.trailer().wake_join();
302             }
303         }));
304 
305         // The task has completed execution and will no longer be scheduled.
306         let num_release = self.release();
307 
308         if self.header().state.transition_to_terminal(num_release) {
309             self.dealloc();
310         }
311     }
312 
313     /// Releases the task from the scheduler. Returns the number of ref-counts
314     /// that should be decremented.
release(&self) -> usize315     fn release(&self) -> usize {
316         // We don't actually increment the ref-count here, but the new task is
317         // never destroyed, so that's ok.
318         let me = ManuallyDrop::new(self.get_new_task());
319 
320         if let Some(task) = self.core().scheduler.release(&me) {
321             mem::forget(task);
322             2
323         } else {
324             1
325         }
326     }
327 
328     /// Creates a new task that holds its own ref-count.
329     ///
330     /// # Safety
331     ///
332     /// Any use of `self` after this call must ensure that a ref-count to the
333     /// task holds the task alive until after the use of `self`. Passing the
334     /// returned Task to any method on `self` is unsound if dropping the Task
335     /// could drop `self` before the call on `self` returned.
get_new_task(&self) -> Task<S>336     fn get_new_task(&self) -> Task<S> {
337         // safety: The header is at the beginning of the cell, so this cast is
338         // safe.
339         unsafe { Task::from_raw(self.cell.cast()) }
340     }
341 }
342 
can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool343 fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool {
344     // Load a snapshot of the current task state
345     let snapshot = header.state.load();
346 
347     debug_assert!(snapshot.is_join_interested());
348 
349     if !snapshot.is_complete() {
350         // The waker must be stored in the task struct.
351         let res = if snapshot.has_join_waker() {
352             // There already is a waker stored in the struct. If it matches
353             // the provided waker, then there is no further work to do.
354             // Otherwise, the waker must be swapped.
355             let will_wake = unsafe {
356                 // Safety: when `JOIN_INTEREST` is set, only `JOIN_HANDLE`
357                 // may mutate the `waker` field.
358                 trailer.will_wake(waker)
359             };
360 
361             if will_wake {
362                 // The task is not complete **and** the waker is up to date,
363                 // there is nothing further that needs to be done.
364                 return false;
365             }
366 
367             // Unset the `JOIN_WAKER` to gain mutable access to the `waker`
368             // field then update the field with the new join worker.
369             //
370             // This requires two atomic operations, unsetting the bit and
371             // then resetting it. If the task transitions to complete
372             // concurrently to either one of those operations, then setting
373             // the join waker fails and we proceed to reading the task
374             // output.
375             header
376                 .state
377                 .unset_waker()
378                 .and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot))
379         } else {
380             set_join_waker(header, trailer, waker.clone(), snapshot)
381         };
382 
383         match res {
384             Ok(_) => return false,
385             Err(snapshot) => {
386                 assert!(snapshot.is_complete());
387             }
388         }
389     }
390     true
391 }
392 
set_join_waker( header: &Header, trailer: &Trailer, waker: Waker, snapshot: Snapshot, ) -> Result<Snapshot, Snapshot>393 fn set_join_waker(
394     header: &Header,
395     trailer: &Trailer,
396     waker: Waker,
397     snapshot: Snapshot,
398 ) -> Result<Snapshot, Snapshot> {
399     assert!(snapshot.is_join_interested());
400     assert!(!snapshot.has_join_waker());
401 
402     // Safety: Only the `JoinHandle` may set the `waker` field. When
403     // `JOIN_INTEREST` is **not** set, nothing else will touch the field.
404     unsafe {
405         trailer.set_waker(Some(waker));
406     }
407 
408     // Update the `JoinWaker` state accordingly
409     let res = header.state.set_join_waker();
410 
411     // If the state could not be updated, then clear the join waker
412     if res.is_err() {
413         unsafe {
414             trailer.set_waker(None);
415         }
416     }
417 
418     res
419 }
420 
/// Outcome of `poll_inner`, telling `poll` what is left to do with the
/// ref-counts it owns.
enum PollFuture {
    /// The ref-count was consumed; the caller must not touch the task again.
    Done,
    /// The caller owns two ref-counts: one is passed to the scheduler as a
    /// new task and the other is dropped afterwards.
    Notified,
    /// The caller owns a single ref-count, which is passed on to `complete`.
    Complete,
    /// The last ref-count was consumed; the caller should call `dealloc`.
    Dealloc,
}
427 
428 /// Cancels the task and store the appropriate error in the stage field.
cancel_task<T: Future>(stage: &CoreStage<T>)429 fn cancel_task<T: Future>(stage: &CoreStage<T>) {
430     // Drop the future from a panic guard.
431     let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
432         stage.drop_future_or_output();
433     }));
434 
435     match res {
436         Ok(()) => {
437             stage.store_output(Err(JoinError::cancelled()));
438         }
439         Err(panic) => {
440             stage.store_output(Err(JoinError::panic(panic)));
441         }
442     }
443 }
444 
445 /// Polls the future. If the future completes, the output is written to the
446 /// stage field.
poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()>447 fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> {
448     // Poll the future.
449     let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
450         struct Guard<'a, T: Future> {
451             core: &'a CoreStage<T>,
452         }
453         impl<'a, T: Future> Drop for Guard<'a, T> {
454             fn drop(&mut self) {
455                 // If the future panics on poll, we drop it inside the panic
456                 // guard.
457                 self.core.drop_future_or_output();
458             }
459         }
460         let guard = Guard { core };
461         let res = guard.core.poll(cx);
462         mem::forget(guard);
463         res
464     }));
465 
466     // Prepare output for being placed in the core stage.
467     let output = match output {
468         Ok(Poll::Pending) => return Poll::Pending,
469         Ok(Poll::Ready(output)) => Ok(output),
470         Err(panic) => Err(JoinError::panic(panic)),
471     };
472 
473     // Catch and ignore panics if the future panics on drop.
474     let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
475         core.store_output(output);
476     }));
477 
478     Poll::Ready(())
479 }
480