//! The task module.
//!
//! The task module contains the code that manages spawned tasks and provides a
//! safe API for the rest of the runtime to use. Each task in a runtime is
//! stored in an OwnedTasks or LocalOwnedTasks object.
//!
//! # Task reference types
//!
//! A task is usually referenced by multiple handles, and there are several
//! types of handles.
//!
//!  * OwnedTask - tasks stored in an OwnedTasks or LocalOwnedTasks are of this
//!    reference type.
//!
//!  * JoinHandle - each task has a JoinHandle that allows access to the output
//!    of the task.
//!
//!  * Waker - every waker for a task has this reference type. There can be any
//!    number of waker references.
//!
//!  * Notified - tracks whether the task is notified.
//!
//!  * Unowned - this task reference type is used for tasks not stored in any
//!    runtime. Mainly used for blocking tasks, but also in tests.
//!
//! The task uses a reference count to keep track of how many active references
//! exist. The Unowned reference type takes up two ref-counts. All other
//! reference types take up a single ref-count.
//!
//! Besides the waker type, each task has at most one of each reference type.
//!
//! # State
//!
//! The task stores its state in an atomic usize with various bitfields for the
//! necessary information. The state has the following bitfields:
//!
//!  * RUNNING - Tracks whether the task is currently being polled or cancelled.
//!    This bit functions as a lock around the task.
//!
//!  * COMPLETE - Is one once the future has fully completed and has been
//!    dropped. Never unset once set. Never set together with RUNNING.
//!
//!  * NOTIFIED - Tracks whether a Notified object currently exists.
//!
//!  * CANCELLED - Is set to one for tasks that should be cancelled as soon as
//!    possible. May take any value for completed tasks.
//!
//!  * JOIN_INTEREST - Is set to one if there exists a JoinHandle.
//!
//!  * JOIN_WAKER - Is set to one if the JoinHandle has set a waker.
//!
//! The rest of the bits are used for the ref-count.
//!
//! # Fields in the task
//!
//! The task has various fields. This section describes how and when it is safe
//! to access a field.
//!
//!  * The state field is accessed with atomic instructions.
//!
//!  * The OwnedTask reference has exclusive access to the `owned` field.
//!
//!  * The Notified reference has exclusive access to the `queue_next` field.
//!
//!  * The `owner_id` field can be set as part of construction of the task, but
//!    is otherwise immutable and anyone can access the field immutably without
//!    synchronization.
//!
//!  * If COMPLETE is one, then the JoinHandle has exclusive access to the
//!    stage field. If COMPLETE is zero, then the RUNNING bitfield functions as
//!    a lock for the stage field, and it can be accessed only by the thread
//!    that set RUNNING to one.
//!
//!  * If JOIN_WAKER is zero, then the JoinHandle has exclusive access to the
//!    join handle waker. If JOIN_WAKER and COMPLETE are both one, then the
//!    thread that set COMPLETE to one has exclusive access to the join handle
//!    waker.
//!
//! All other fields are immutable and can be accessed immutably without
//! synchronization by anyone.
//!
//! # Safety
//!
//! This section goes through various situations and explains why the API is
//! safe in that situation.
//!
//! ## Polling or dropping the future
//!
//! Any mutable access to the future happens after obtaining a lock by modifying
//! the RUNNING field, so exclusive access is ensured.
//!
//! When the task completes, exclusive access to the output is transferred to
//! the JoinHandle. If the JoinHandle is already dropped when the transition to
//! complete happens, the thread performing that transition retains exclusive
//! access to the output and should immediately drop it.
//!
//! ## Non-Send futures
//!
//! If a future is not Send, then it is bound to a LocalOwnedTasks. The future
//! will only ever be polled or dropped given a LocalNotified or inside a call
//! to LocalOwnedTasks::shutdown_all. In either case, it is guaranteed that the
//! future is on the right thread.
//!
//! If the task is never removed from the LocalOwnedTasks, then it is leaked, so
//! there is no risk that the task is dropped on some other thread when the last
//! ref-count drops.
//!
//! ## Non-Send output
//!
//! When a task completes, the output is placed in the stage of the task. Then,
//! a transition that sets COMPLETE to true is performed, and the value of
//! JOIN_INTEREST when this transition happens is read.
//!
//! If JOIN_INTEREST is zero when the transition to COMPLETE happens, then the
//! output is immediately dropped.
//!
//! If JOIN_INTEREST is one when the transition to COMPLETE happens, then the
//! JoinHandle is responsible for cleaning up the output. If the output is not
//! Send, then this happens:
//!
//!  1. The output is created on the thread that the future was polled on. Since
//!     only non-Send futures can have non-Send output, the future was polled on
//!     the thread that the future was spawned from.
//!  2. Since JoinHandle<Output> is not Send if Output is not Send, the
//!     JoinHandle is also on the thread that the future was spawned from.
//!  3. Thus, the JoinHandle will not move the output across threads when it
//!     takes or drops the output.
//!
//! ## Recursive poll/shutdown
//!
//! Calling poll from inside a shutdown call or vice-versa is not prevented by
//! the API exposed by the task module, so this has to be safe. In either case,
//! the lock in the RUNNING bitfield makes the inner call return immediately. If
//! the inner call is a `shutdown` call, then the CANCELLED bit is set, and the
//! poll call will notice it when the poll finishes, and the task is cancelled
//! at that point.

mod core;
use self::core::Cell;
use self::core::Header;

mod error;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::error::JoinError;

mod harness;
use self::harness::Harness;

cfg_rt_multi_thread! {
    mod inject;
    pub(super) use self::inject::Inject;
}

mod join;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::join::JoinHandle;

mod list;
pub(crate) use self::list::{LocalOwnedTasks, OwnedTasks};

mod raw;
use self::raw::RawTask;

mod state;
use self::state::State;

mod waker;

use crate::future::Future;
use crate::util::linked_list;

use std::marker::PhantomData;
use std::ptr::NonNull;
use std::{fmt, mem};

/// An owned handle to the task, tracked by ref count.
#[repr(transparent)]
pub(crate) struct Task<S: 'static> {
    raw: RawTask,
    _p: PhantomData<S>,
}

unsafe impl<S> Send for Task<S> {}
unsafe impl<S> Sync for Task<S> {}

/// A task was notified.
#[repr(transparent)]
pub(crate) struct Notified<S: 'static>(Task<S>);

// safety: This type cannot be used to touch the task without first verifying
// that the value is on a thread where it is safe to poll the task.
unsafe impl<S: Schedule> Send for Notified<S> {}
unsafe impl<S: Schedule> Sync for Notified<S> {}

/// A non-Send variant of Notified with the invariant that it is on a thread
/// where it is safe to poll it.
#[repr(transparent)]
pub(crate) struct LocalNotified<S: 'static> {
    task: Task<S>,
    _not_send: PhantomData<*const ()>,
}

/// A task that is not owned by any OwnedTasks. Used for blocking tasks.
/// This type holds two ref-counts.
pub(crate) struct UnownedTask<S: 'static> {
    raw: RawTask,
    _p: PhantomData<S>,
}

// safety: This type can only be created given a Send task.
unsafe impl<S> Send for UnownedTask<S> {}
unsafe impl<S> Sync for UnownedTask<S> {}

/// Task result sent back.
pub(crate) type Result<T> = std::result::Result<T, JoinError>;

pub(crate) trait Schedule: Sync + Sized + 'static {
    /// The task has completed work and is ready to be released. The scheduler
    /// should release it immediately and return it. The task module will batch
    /// the ref-dec with setting other options.
    ///
    /// If the scheduler has already released the task, then None is returned.
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>>;

    /// Schedule the task
    fn schedule(&self, task: Notified<Self>);

    /// Schedule the task to run in the near future, yielding the thread to
    /// other tasks.
    fn yield_now(&self, task: Notified<Self>) {
        self.schedule(task);
    }
}

cfg_rt! {
    /// This is the constructor for a new task. Three references to the task are
    /// created. The first task reference is usually put into an OwnedTasks
    /// immediately. The Notified is sent to the scheduler as an ordinary
    /// notification.
    fn new_task<T, S>(
        task: T,
        scheduler: S
    ) -> (Task<S>, Notified<S>, JoinHandle<T::Output>)
    where
        S: Schedule,
        T: Future + 'static,
        T::Output: 'static,
    {
        let raw = RawTask::new::<T, S>(task, scheduler);
        let task = Task {
            raw,
            _p: PhantomData,
        };
        let notified = Notified(Task {
            raw,
            _p: PhantomData,
        });
        let join = JoinHandle::new(raw);

        (task, notified, join)
    }

    /// Creates a new task with an associated join handle. This method is used
    /// only when the task is not going to be stored in an `OwnedTasks` list.
    ///
    /// Currently only blocking tasks use this method.
    pub(crate) fn unowned<T, S>(task: T, scheduler: S) -> (UnownedTask<S>, JoinHandle<T::Output>)
    where
        S: Schedule,
        T: Send + Future + 'static,
        T::Output: Send + 'static,
    {
        let (task, notified, join) = new_task(task, scheduler);

        // This transfers the ref-count of task and notified into an UnownedTask.
        // This is valid because an UnownedTask holds two ref-counts.
        let unowned = UnownedTask {
            raw: task.raw,
            _p: PhantomData,
        };
        std::mem::forget(task);
        std::mem::forget(notified);

        (unowned, join)
    }
}

impl<S: 'static> Task<S> {
    unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
        Task {
            raw: RawTask::from_raw(ptr),
            _p: PhantomData,
        }
    }

    fn header(&self) -> &Header {
        self.raw.header()
    }
}

impl<S: 'static> Notified<S> {
    fn header(&self) -> &Header {
        self.0.header()
    }
}

cfg_rt_multi_thread! {
    impl<S: 'static> Notified<S> {
        unsafe fn from_raw(ptr: NonNull<Header>) -> Notified<S> {
            Notified(Task::from_raw(ptr))
        }
    }

    impl<S: 'static> Task<S> {
        fn into_raw(self) -> NonNull<Header> {
            let ret = self.header().into();
            mem::forget(self);
            ret
        }
    }

    impl<S: 'static> Notified<S> {
        fn into_raw(self) -> NonNull<Header> {
            self.0.into_raw()
        }
    }
}

impl<S: Schedule> Task<S> {
    /// Pre-emptively cancels the task as part of the shutdown process.
    pub(crate) fn shutdown(self) {
        let raw = self.raw;
        mem::forget(self);
        raw.shutdown();
    }
}

impl<S: Schedule> LocalNotified<S> {
    /// Runs the task.
    pub(crate) fn run(self) {
        let raw = self.task.raw;
        mem::forget(self);
        raw.poll();
    }
}

impl<S: Schedule> UnownedTask<S> {
    // Used in test of the inject queue.
    #[cfg(test)]
    #[cfg_attr(target_arch = "wasm32", allow(dead_code))]
    pub(super) fn into_notified(self) -> Notified<S> {
        Notified(self.into_task())
    }

    fn into_task(self) -> Task<S> {
        // Convert into a task.
        let task = Task {
            raw: self.raw,
            _p: PhantomData,
        };
        mem::forget(self);

        // Drop a ref-count since an UnownedTask holds two.
        task.header().state.ref_dec();

        task
    }

    pub(crate) fn run(self) {
        let raw = self.raw;
        mem::forget(self);

        // Transfer one ref-count to a Task object.
        let task = Task::<S> {
            raw,
            _p: PhantomData,
        };

        // Use the other ref-count to poll the task.
        raw.poll();
        // Decrement our extra ref-count
        drop(task);
    }

    pub(crate) fn shutdown(self) {
        self.into_task().shutdown()
    }
}

impl<S: 'static> Drop for Task<S> {
    fn drop(&mut self) {
        // Decrement the ref count
        if self.header().state.ref_dec() {
            // Deallocate if this is the final ref count
            self.raw.dealloc();
        }
    }
}

impl<S: 'static> Drop for UnownedTask<S> {
    fn drop(&mut self) {
        // Decrement the ref count
        if self.raw.header().state.ref_dec_twice() {
            // Deallocate if this is the final ref count
            self.raw.dealloc();
        }
    }
}

impl<S> fmt::Debug for Task<S> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(fmt, "Task({:p})", self.header())
    }
}

impl<S> fmt::Debug for Notified<S> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(fmt, "task::Notified({:p})", self.0.header())
    }
}

/// # Safety
///
/// Tasks are pinned.
unsafe impl<S> linked_list::Link for Task<S> {
    type Handle = Task<S>;
    type Target = Header;

    fn as_raw(handle: &Task<S>) -> NonNull<Header> {
        handle.header().into()
    }

    unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
        Task::from_raw(ptr)
    }

    unsafe fn pointers(target: NonNull<Header>) -> NonNull<linked_list::Pointers<Header>> {
        // Not super great as it avoids some of loom's checking...
        NonNull::from(target.as_ref().owned.with_mut(|ptr| &mut *ptr))
    }
}