1 use crate::future::Future;
2 use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Trailer};
3 use crate::runtime::task::state::Snapshot;
4 use crate::runtime::task::waker::waker_ref;
5 use crate::runtime::task::{JoinError, Notified, Schedule, Task};
6
7 use std::mem;
8 use std::mem::ManuallyDrop;
9 use std::panic;
10 use std::ptr::NonNull;
11 use std::task::{Context, Poll, Waker};
12
13 /// Typed raw task handle.
14 pub(super) struct Harness<T: Future, S: 'static> {
15 cell: NonNull<Cell<T, S>>,
16 }
17
18 impl<T, S> Harness<T, S>
19 where
20 T: Future,
21 S: 'static,
22 {
from_raw(ptr: NonNull<Header>) -> Harness<T, S>23 pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> {
24 Harness {
25 cell: ptr.cast::<Cell<T, S>>(),
26 }
27 }
28
header(&self) -> &Header29 fn header(&self) -> &Header {
30 unsafe { &self.cell.as_ref().header }
31 }
32
trailer(&self) -> &Trailer33 fn trailer(&self) -> &Trailer {
34 unsafe { &self.cell.as_ref().trailer }
35 }
36
core(&self) -> &Core<T, S>37 fn core(&self) -> &Core<T, S> {
38 unsafe { &self.cell.as_ref().core }
39 }
40 }
41
42 impl<T, S> Harness<T, S>
43 where
44 T: Future,
45 S: Schedule,
46 {
47 /// Polls the inner future. A ref-count is consumed.
48 ///
49 /// All necessary state checks and transitions are performed.
50 /// Panics raised while polling the future are handled.
poll(self)51 pub(super) fn poll(self) {
52 // We pass our ref-count to `poll_inner`.
53 match self.poll_inner() {
54 PollFuture::Notified => {
55 // The `poll_inner` call has given us two ref-counts back.
56 // We give one of them to a new task and call `yield_now`.
57 self.core()
58 .scheduler
59 .yield_now(Notified(self.get_new_task()));
60
61 // The remaining ref-count is now dropped. We kept the extra
62 // ref-count until now to ensure that even if the `yield_now`
63 // call drops the provided task, the task isn't deallocated
64 // before after `yield_now` returns.
65 self.drop_reference();
66 }
67 PollFuture::Complete => {
68 self.complete();
69 }
70 PollFuture::Dealloc => {
71 self.dealloc();
72 }
73 PollFuture::Done => (),
74 }
75 }
76
77 /// Polls the task and cancel it if necessary. This takes ownership of a
78 /// ref-count.
79 ///
80 /// If the return value is Notified, the caller is given ownership of two
81 /// ref-counts.
82 ///
83 /// If the return value is Complete, the caller is given ownership of a
84 /// single ref-count, which should be passed on to `complete`.
85 ///
86 /// If the return value is Dealloc, then this call consumed the last
87 /// ref-count and the caller should call `dealloc`.
88 ///
89 /// Otherwise the ref-count is consumed and the caller should not access
90 /// `self` again.
poll_inner(&self) -> PollFuture91 fn poll_inner(&self) -> PollFuture {
92 use super::state::{TransitionToIdle, TransitionToRunning};
93
94 match self.header().state.transition_to_running() {
95 TransitionToRunning::Success => {
96 let waker_ref = waker_ref::<T, S>(self.header());
97 let cx = Context::from_waker(&*waker_ref);
98 let res = poll_future(&self.core().stage, cx);
99
100 if res == Poll::Ready(()) {
101 // The future completed. Move on to complete the task.
102 return PollFuture::Complete;
103 }
104
105 match self.header().state.transition_to_idle() {
106 TransitionToIdle::Ok => PollFuture::Done,
107 TransitionToIdle::OkNotified => PollFuture::Notified,
108 TransitionToIdle::OkDealloc => PollFuture::Dealloc,
109 TransitionToIdle::Cancelled => {
110 // The transition to idle failed because the task was
111 // cancelled during the poll.
112
113 cancel_task(&self.core().stage);
114 PollFuture::Complete
115 }
116 }
117 }
118 TransitionToRunning::Cancelled => {
119 cancel_task(&self.core().stage);
120 PollFuture::Complete
121 }
122 TransitionToRunning::Failed => PollFuture::Done,
123 TransitionToRunning::Dealloc => PollFuture::Dealloc,
124 }
125 }
126
127 /// Forcibly shuts down the task.
128 ///
129 /// Attempt to transition to `Running` in order to forcibly shutdown the
130 /// task. If the task is currently running or in a state of completion, then
131 /// there is nothing further to do. When the task completes running, it will
132 /// notice the `CANCELLED` bit and finalize the task.
shutdown(self)133 pub(super) fn shutdown(self) {
134 if !self.header().state.transition_to_shutdown() {
135 // The task is concurrently running. No further work needed.
136 self.drop_reference();
137 return;
138 }
139
140 // By transitioning the lifecycle to `Running`, we have permission to
141 // drop the future.
142 cancel_task(&self.core().stage);
143 self.complete();
144 }
145
dealloc(self)146 pub(super) fn dealloc(self) {
147 // Release the join waker, if there is one.
148 self.trailer().waker.with_mut(drop);
149
150 // Check causality
151 self.core().stage.with_mut(drop);
152
153 unsafe {
154 drop(Box::from_raw(self.cell.as_ptr()));
155 }
156 }
157
158 // ===== join handle =====
159
160 /// Read the task output into `dst`.
try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker)161 pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
162 if can_read_output(self.header(), self.trailer(), waker) {
163 *dst = Poll::Ready(self.core().stage.take_output());
164 }
165 }
166
drop_join_handle_slow(self)167 pub(super) fn drop_join_handle_slow(self) {
168 let mut maybe_panic = None;
169
170 // Try to unset `JOIN_INTEREST`. This must be done as a first step in
171 // case the task concurrently completed.
172 if self.header().state.unset_join_interested().is_err() {
173 // It is our responsibility to drop the output. This is critical as
174 // the task output may not be `Send` and as such must remain with
175 // the scheduler or `JoinHandle`. i.e. if the output remains in the
176 // task structure until the task is deallocated, it may be dropped
177 // by a Waker on any arbitrary thread.
178 let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
179 self.core().stage.drop_future_or_output();
180 }));
181
182 if let Err(panic) = panic {
183 maybe_panic = Some(panic);
184 }
185 }
186
187 // Drop the `JoinHandle` reference, possibly deallocating the task
188 self.drop_reference();
189
190 if let Some(panic) = maybe_panic {
191 panic::resume_unwind(panic);
192 }
193 }
194
195 /// Remotely aborts the task.
196 ///
197 /// The caller should hold a ref-count, but we do not consume it.
198 ///
199 /// This is similar to `shutdown` except that it asks the runtime to perform
200 /// the shutdown. This is necessary to avoid the shutdown happening in the
201 /// wrong thread for non-Send tasks.
remote_abort(self)202 pub(super) fn remote_abort(self) {
203 if self.header().state.transition_to_notified_and_cancel() {
204 // The transition has created a new ref-count, which we turn into
205 // a Notified and pass to the task.
206 //
207 // Since the caller holds a ref-count, the task cannot be destroyed
208 // before the call to `schedule` returns even if the call drops the
209 // `Notified` internally.
210 self.core()
211 .scheduler
212 .schedule(Notified(self.get_new_task()));
213 }
214 }
215
216 // ===== waker behavior =====
217
218 /// This call consumes a ref-count and notifies the task. This will create a
219 /// new Notified and submit it if necessary.
220 ///
221 /// The caller does not need to hold a ref-count besides the one that was
222 /// passed to this call.
wake_by_val(self)223 pub(super) fn wake_by_val(self) {
224 use super::state::TransitionToNotifiedByVal;
225
226 match self.header().state.transition_to_notified_by_val() {
227 TransitionToNotifiedByVal::Submit => {
228 // The caller has given us a ref-count, and the transition has
229 // created a new ref-count, so we now hold two. We turn the new
230 // ref-count Notified and pass it to the call to `schedule`.
231 //
232 // The old ref-count is retained for now to ensure that the task
233 // is not dropped during the call to `schedule` if the call
234 // drops the task it was given.
235 self.core()
236 .scheduler
237 .schedule(Notified(self.get_new_task()));
238
239 // Now that we have completed the call to schedule, we can
240 // release our ref-count.
241 self.drop_reference();
242 }
243 TransitionToNotifiedByVal::Dealloc => {
244 self.dealloc();
245 }
246 TransitionToNotifiedByVal::DoNothing => {}
247 }
248 }
249
250 /// This call notifies the task. It will not consume any ref-counts, but the
251 /// caller should hold a ref-count. This will create a new Notified and
252 /// submit it if necessary.
wake_by_ref(&self)253 pub(super) fn wake_by_ref(&self) {
254 use super::state::TransitionToNotifiedByRef;
255
256 match self.header().state.transition_to_notified_by_ref() {
257 TransitionToNotifiedByRef::Submit => {
258 // The transition above incremented the ref-count for a new task
259 // and the caller also holds a ref-count. The caller's ref-count
260 // ensures that the task is not destroyed even if the new task
261 // is dropped before `schedule` returns.
262 self.core()
263 .scheduler
264 .schedule(Notified(self.get_new_task()));
265 }
266 TransitionToNotifiedByRef::DoNothing => {}
267 }
268 }
269
drop_reference(self)270 pub(super) fn drop_reference(self) {
271 if self.header().state.ref_dec() {
272 self.dealloc();
273 }
274 }
275
276 #[cfg(all(tokio_unstable, feature = "tracing"))]
id(&self) -> Option<&tracing::Id>277 pub(super) fn id(&self) -> Option<&tracing::Id> {
278 self.header().id.as_ref()
279 }
280
281 // ====== internal ======
282
283 /// Completes the task. This method assumes that the state is RUNNING.
complete(self)284 fn complete(self) {
285 // The future has completed and its output has been written to the task
286 // stage. We transition from running to complete.
287
288 let snapshot = self.header().state.transition_to_complete();
289
290 // We catch panics here in case dropping the future or waking the
291 // JoinHandle panics.
292 let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
293 if !snapshot.is_join_interested() {
294 // The `JoinHandle` is not interested in the output of
295 // this task. It is our responsibility to drop the
296 // output.
297 self.core().stage.drop_future_or_output();
298 } else if snapshot.has_join_waker() {
299 // Notify the join handle. The previous transition obtains the
300 // lock on the waker cell.
301 self.trailer().wake_join();
302 }
303 }));
304
305 // The task has completed execution and will no longer be scheduled.
306 let num_release = self.release();
307
308 if self.header().state.transition_to_terminal(num_release) {
309 self.dealloc();
310 }
311 }
312
313 /// Releases the task from the scheduler. Returns the number of ref-counts
314 /// that should be decremented.
release(&self) -> usize315 fn release(&self) -> usize {
316 // We don't actually increment the ref-count here, but the new task is
317 // never destroyed, so that's ok.
318 let me = ManuallyDrop::new(self.get_new_task());
319
320 if let Some(task) = self.core().scheduler.release(&me) {
321 mem::forget(task);
322 2
323 } else {
324 1
325 }
326 }
327
328 /// Creates a new task that holds its own ref-count.
329 ///
330 /// # Safety
331 ///
332 /// Any use of `self` after this call must ensure that a ref-count to the
333 /// task holds the task alive until after the use of `self`. Passing the
334 /// returned Task to any method on `self` is unsound if dropping the Task
335 /// could drop `self` before the call on `self` returned.
get_new_task(&self) -> Task<S>336 fn get_new_task(&self) -> Task<S> {
337 // safety: The header is at the beginning of the cell, so this cast is
338 // safe.
339 unsafe { Task::from_raw(self.cell.cast()) }
340 }
341 }
342
can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool343 fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool {
344 // Load a snapshot of the current task state
345 let snapshot = header.state.load();
346
347 debug_assert!(snapshot.is_join_interested());
348
349 if !snapshot.is_complete() {
350 // The waker must be stored in the task struct.
351 let res = if snapshot.has_join_waker() {
352 // There already is a waker stored in the struct. If it matches
353 // the provided waker, then there is no further work to do.
354 // Otherwise, the waker must be swapped.
355 let will_wake = unsafe {
356 // Safety: when `JOIN_INTEREST` is set, only `JOIN_HANDLE`
357 // may mutate the `waker` field.
358 trailer.will_wake(waker)
359 };
360
361 if will_wake {
362 // The task is not complete **and** the waker is up to date,
363 // there is nothing further that needs to be done.
364 return false;
365 }
366
367 // Unset the `JOIN_WAKER` to gain mutable access to the `waker`
368 // field then update the field with the new join worker.
369 //
370 // This requires two atomic operations, unsetting the bit and
371 // then resetting it. If the task transitions to complete
372 // concurrently to either one of those operations, then setting
373 // the join waker fails and we proceed to reading the task
374 // output.
375 header
376 .state
377 .unset_waker()
378 .and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot))
379 } else {
380 set_join_waker(header, trailer, waker.clone(), snapshot)
381 };
382
383 match res {
384 Ok(_) => return false,
385 Err(snapshot) => {
386 assert!(snapshot.is_complete());
387 }
388 }
389 }
390 true
391 }
392
set_join_waker( header: &Header, trailer: &Trailer, waker: Waker, snapshot: Snapshot, ) -> Result<Snapshot, Snapshot>393 fn set_join_waker(
394 header: &Header,
395 trailer: &Trailer,
396 waker: Waker,
397 snapshot: Snapshot,
398 ) -> Result<Snapshot, Snapshot> {
399 assert!(snapshot.is_join_interested());
400 assert!(!snapshot.has_join_waker());
401
402 // Safety: Only the `JoinHandle` may set the `waker` field. When
403 // `JOIN_INTEREST` is **not** set, nothing else will touch the field.
404 unsafe {
405 trailer.set_waker(Some(waker));
406 }
407
408 // Update the `JoinWaker` state accordingly
409 let res = header.state.set_join_waker();
410
411 // If the state could not be updated, then clear the join waker
412 if res.is_err() {
413 unsafe {
414 trailer.set_waker(None);
415 }
416 }
417
418 res
419 }
420
421 enum PollFuture {
422 Complete,
423 Notified,
424 Done,
425 Dealloc,
426 }
427
428 /// Cancels the task and store the appropriate error in the stage field.
cancel_task<T: Future>(stage: &CoreStage<T>)429 fn cancel_task<T: Future>(stage: &CoreStage<T>) {
430 // Drop the future from a panic guard.
431 let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
432 stage.drop_future_or_output();
433 }));
434
435 match res {
436 Ok(()) => {
437 stage.store_output(Err(JoinError::cancelled()));
438 }
439 Err(panic) => {
440 stage.store_output(Err(JoinError::panic(panic)));
441 }
442 }
443 }
444
445 /// Polls the future. If the future completes, the output is written to the
446 /// stage field.
poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()>447 fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> {
448 // Poll the future.
449 let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
450 struct Guard<'a, T: Future> {
451 core: &'a CoreStage<T>,
452 }
453 impl<'a, T: Future> Drop for Guard<'a, T> {
454 fn drop(&mut self) {
455 // If the future panics on poll, we drop it inside the panic
456 // guard.
457 self.core.drop_future_or_output();
458 }
459 }
460 let guard = Guard { core };
461 let res = guard.core.poll(cx);
462 mem::forget(guard);
463 res
464 }));
465
466 // Prepare output for being placed in the core stage.
467 let output = match output {
468 Ok(Poll::Pending) => return Poll::Pending,
469 Ok(Poll::Ready(output)) => Ok(output),
470 Err(panic) => Err(JoinError::panic(panic)),
471 };
472
473 // Catch and ignore panics if the future panics on drop.
474 let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
475 core.store_output(output);
476 }));
477
478 Poll::Ready(())
479 }
480