1 use crate::loom::sync::atomic::AtomicUsize; 2 3 use std::fmt; 4 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; 5 use std::usize; 6 7 pub(super) struct State { 8 val: AtomicUsize, 9 } 10 11 /// Current state value 12 #[derive(Copy, Clone)] 13 pub(super) struct Snapshot(usize); 14 15 /// The task is currently being run. 16 const RUNNING: usize = 0b00_0001; 17 18 /// The task has been notified by a waker. 19 const NOTIFIED: usize = 0b00_0010; 20 21 /// The task is complete. 22 /// 23 /// Once this bit is set, it is never unset 24 const COMPLETE: usize = 0b00_0100; 25 26 /// The primary task handle has been dropped. 27 const RELEASED: usize = 0b00_1000; 28 29 /// The join handle is still around 30 const JOIN_INTEREST: usize = 0b01_0000; 31 32 /// A join handle waker has been set 33 const JOIN_WAKER: usize = 0b10_0000; 34 35 /// The task has been forcibly canceled. 36 const CANCELLED: usize = 0b100_0000; 37 38 /// All bits 39 const LIFECYCLE_MASK: usize = 40 RUNNING | NOTIFIED | COMPLETE | RELEASED | JOIN_INTEREST | JOIN_WAKER | CANCELLED; 41 42 /// Bits used by the waker ref count portion of the state. 43 /// 44 /// Ref counts only cover **wakers**. Other handles are tracked with other state 45 /// bits. 46 const WAKER_COUNT_MASK: usize = usize::MAX - LIFECYCLE_MASK; 47 48 /// Number of positions to shift the ref count 49 const WAKER_COUNT_SHIFT: usize = WAKER_COUNT_MASK.count_zeros() as usize; 50 51 /// One ref count 52 const WAKER_ONE: usize = 1 << WAKER_COUNT_SHIFT; 53 54 /// Initial state 55 const INITIAL_STATE: usize = NOTIFIED; 56 57 /// All transitions are performed via RMW operations. This establishes an 58 /// unambiguous modification order. 59 impl State { 60 /// Starts with a ref count of 2 new_joinable() -> State61 pub(super) fn new_joinable() -> State { 62 State { 63 val: AtomicUsize::new(INITIAL_STATE | JOIN_INTEREST), 64 } 65 } 66 67 /// Loads the current state, establishes `Acquire` ordering. load(&self) -> Snapshot68 pub(super) fn load(&self) -> Snapshot { 69 Snapshot(self.val.load(Acquire)) 70 } 71 72 /// Transitions a task to the `Running` state. 73 /// 74 /// Returns a snapshot of the state **after** the transition. transition_to_running(&self) -> Snapshot75 pub(super) fn transition_to_running(&self) -> Snapshot { 76 const DELTA: usize = RUNNING | NOTIFIED; 77 78 let prev = Snapshot(self.val.fetch_xor(DELTA, Acquire)); 79 debug_assert!(prev.is_notified()); 80 81 if prev.is_running() { 82 // We were signalled to cancel 83 // 84 // Apply the state 85 let prev = self.val.fetch_or(CANCELLED, AcqRel); 86 return Snapshot(prev | CANCELLED); 87 } 88 89 debug_assert!(!prev.is_running()); 90 91 let next = Snapshot(prev.0 ^ DELTA); 92 93 debug_assert!(next.is_running()); 94 debug_assert!(!next.is_notified()); 95 96 next 97 } 98 99 /// Transitions the task from `Running` -> `Idle`. 100 /// 101 /// Returns a snapshot of the state **after** the transition. transition_to_idle(&self) -> Snapshot102 pub(super) fn transition_to_idle(&self) -> Snapshot { 103 const DELTA: usize = RUNNING; 104 105 let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel)); 106 107 if !prev.is_running() { 108 // We were signaled to cancel. 109 // 110 // Apply the state 111 let prev = self.val.fetch_or(CANCELLED, AcqRel); 112 return Snapshot(prev | CANCELLED); 113 } 114 115 let next = Snapshot(prev.0 ^ DELTA); 116 117 debug_assert!(!next.is_running()); 118 119 next 120 } 121 122 /// Transitions the task from `Running` -> `Complete`. 123 /// 124 /// Returns a snapshot of the state **after** the transition. transition_to_complete(&self) -> Snapshot125 pub(super) fn transition_to_complete(&self) -> Snapshot { 126 const DELTA: usize = RUNNING | COMPLETE; 127 128 let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel)); 129 130 debug_assert!(!prev.is_complete()); 131 132 let next = Snapshot(prev.0 ^ DELTA); 133 134 debug_assert!(next.is_complete()); 135 136 next 137 } 138 139 /// Transitions the task from `Running` -> `Released`. 140 /// 141 /// Returns a snapshot of the state **after** the transition. transition_to_released(&self) -> Snapshot142 pub(super) fn transition_to_released(&self) -> Snapshot { 143 const DELTA: usize = RUNNING | COMPLETE | RELEASED; 144 145 let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel)); 146 147 debug_assert!(prev.is_running()); 148 debug_assert!(!prev.is_complete()); 149 debug_assert!(!prev.is_released()); 150 151 let next = Snapshot(prev.0 ^ DELTA); 152 153 debug_assert!(!next.is_running()); 154 debug_assert!(next.is_complete()); 155 debug_assert!(next.is_released()); 156 157 next 158 } 159 160 /// Transitions the task to the canceled state. 161 /// 162 /// Returns the snapshot of the state **after** the transition **if** the 163 /// transition was made successfully 164 /// 165 /// # States 166 /// 167 /// - Notifed: task may be in a queue, caller must not release. 168 /// - Running: cannot drop. The poll handle will handle releasing. 169 /// - Other prior states do not require cancellation. 170 /// 171 /// If the task has been notified, then it may still be in a queue. The 172 /// caller must not release the task. transition_to_canceled_from_queue(&self) -> Snapshot173 pub(super) fn transition_to_canceled_from_queue(&self) -> Snapshot { 174 let prev = Snapshot(self.val.fetch_or(CANCELLED, AcqRel)); 175 176 debug_assert!(!prev.is_complete()); 177 debug_assert!(!prev.is_running() || prev.is_notified()); 178 179 Snapshot(prev.0 | CANCELLED) 180 } 181 transition_to_canceled_from_list(&self) -> Option<Snapshot>182 pub(super) fn transition_to_canceled_from_list(&self) -> Option<Snapshot> { 183 let mut prev = self.load(); 184 185 loop { 186 if !prev.is_active() { 187 return None; 188 } 189 190 let mut next = prev; 191 192 // Use the running flag to signal cancellation 193 if prev.is_running() { 194 next.0 -= RUNNING; 195 next.0 |= NOTIFIED; 196 } else if prev.is_notified() { 197 next.0 += RUNNING; 198 next.0 |= NOTIFIED; 199 } else { 200 next.0 |= CANCELLED; 201 } 202 203 let res = self.val.compare_exchange(prev.0, next.0, AcqRel, Acquire); 204 205 match res { 206 Ok(_) if next.is_canceled() => return Some(next), 207 Ok(_) => return None, 208 Err(actual) => { 209 prev = Snapshot(actual); 210 } 211 } 212 } 213 } 214 215 /// Transitions to `Released`. Called when primary task handle is 216 /// dropped. This is roughly a "ref decrement" operation. 217 /// 218 /// Returns a snapshot of the state **after** the transition. release_task(&self) -> Snapshot219 pub(super) fn release_task(&self) -> Snapshot { 220 use crate::loom::sync::atomic; 221 222 const DELTA: usize = RELEASED; 223 224 let prev = Snapshot(self.val.fetch_or(DELTA, Release)); 225 226 debug_assert!(!prev.is_released()); 227 debug_assert!(prev.is_terminal(), "state = {:?}", prev); 228 229 let next = Snapshot(prev.0 | DELTA); 230 231 debug_assert!(next.is_released()); 232 233 if next.is_final_ref() || (next.has_join_waker() && !next.is_join_interested()) { 234 // The final reference to the task was dropped, the caller must free the 235 // memory. Establish an acquire ordering. 236 atomic::fence(Acquire); 237 } 238 239 next 240 } 241 242 /// Transitions the state to `Scheduled`. 243 /// 244 /// Returns `true` if the task needs to be submitted to the pool for 245 /// execution transition_to_notified(&self) -> bool246 pub(super) fn transition_to_notified(&self) -> bool { 247 const MASK: usize = RUNNING | NOTIFIED | COMPLETE | CANCELLED; 248 249 let prev = self.val.fetch_or(NOTIFIED, Release); 250 prev & MASK == 0 251 } 252 253 /// Optimistically tries to swap the state assuming the join handle is 254 /// __immediately__ dropped on spawn drop_join_handle_fast(&self) -> bool255 pub(super) fn drop_join_handle_fast(&self) -> bool { 256 use std::sync::atomic::Ordering::Relaxed; 257 258 // Relaxed is acceptable as if this function is called and succeeds, 259 // then nothing has been done w/ the join handle. 260 // 261 // The moment the join handle is used (polled), the `JOIN_WAKER` flag is 262 // set, at which point the CAS will fail. 263 // 264 // Given this, there is no risk if this operation is reordered. 265 self.val 266 .compare_exchange_weak( 267 INITIAL_STATE | JOIN_INTEREST, 268 INITIAL_STATE, 269 Release, 270 Relaxed, 271 ) 272 .is_ok() 273 } 274 275 /// The join handle has completed by reading the output. 276 /// 277 /// Returns a snapshot of the state **after** the transition. complete_join_handle(&self) -> Snapshot278 pub(super) fn complete_join_handle(&self) -> Snapshot { 279 use crate::loom::sync::atomic; 280 281 const DELTA: usize = JOIN_INTEREST; 282 283 let prev = Snapshot(self.val.fetch_sub(DELTA, Release)); 284 285 debug_assert!(prev.is_join_interested()); 286 287 let next = Snapshot(prev.0 - DELTA); 288 289 if !next.is_final_ref() { 290 return next; 291 } 292 293 atomic::fence(Acquire); 294 295 next 296 } 297 298 /// The join handle is being dropped, this fails if the task has been 299 /// completed and the output must be dropped first then 300 /// `complete_join_handle` should be called. 301 /// 302 /// Returns a snapshot of the state **after** the transition. drop_join_handle_slow(&self) -> Result<Snapshot, Snapshot>303 pub(super) fn drop_join_handle_slow(&self) -> Result<Snapshot, Snapshot> { 304 const MASK: usize = COMPLETE | CANCELLED; 305 306 let mut prev = self.val.load(Acquire); 307 308 loop { 309 // Once the complete bit is set, it is never unset. 310 if prev & MASK != 0 { 311 return Err(Snapshot(prev)); 312 } 313 314 debug_assert!(prev & JOIN_INTEREST == JOIN_INTEREST); 315 316 let next = (prev - JOIN_INTEREST) & !JOIN_WAKER; 317 318 let res = self.val.compare_exchange(prev, next, AcqRel, Acquire); 319 320 match res { 321 Ok(_) => { 322 return Ok(Snapshot(next)); 323 } 324 Err(actual) => { 325 prev = actual; 326 } 327 } 328 } 329 } 330 331 /// Stores the join waker. store_join_waker(&self) -> Snapshot332 pub(super) fn store_join_waker(&self) -> Snapshot { 333 use crate::loom::sync::atomic; 334 335 const DELTA: usize = JOIN_WAKER; 336 337 let prev = Snapshot(self.val.fetch_xor(DELTA, Release)); 338 339 debug_assert!(!prev.has_join_waker()); 340 341 let next = Snapshot(prev.0 ^ DELTA); 342 343 debug_assert!(next.has_join_waker()); 344 345 if next.is_complete() { 346 atomic::fence(Acquire); 347 } 348 349 next 350 } 351 unset_waker(&self) -> Snapshot352 pub(super) fn unset_waker(&self) -> Snapshot { 353 const MASK: usize = COMPLETE | CANCELLED; 354 355 let mut prev = self.val.load(Acquire); 356 357 loop { 358 // Once the `COMPLETE` bit is set, it is never unset 359 if prev & MASK != 0 { 360 return Snapshot(prev); 361 } 362 363 debug_assert!(Snapshot(prev).has_join_waker()); 364 365 let next = prev - JOIN_WAKER; 366 367 let res = self.val.compare_exchange(prev, next, AcqRel, Acquire); 368 369 match res { 370 Ok(_) => return Snapshot(next), 371 Err(actual) => { 372 prev = actual; 373 } 374 } 375 } 376 } 377 ref_inc(&self)378 pub(super) fn ref_inc(&self) { 379 use std::process; 380 use std::sync::atomic::Ordering::Relaxed; 381 382 // Using a relaxed ordering is alright here, as knowledge of the 383 // original reference prevents other threads from erroneously deleting 384 // the object. 385 // 386 // As explained in the [Boost documentation][1], Increasing the 387 // reference counter can always be done with memory_order_relaxed: New 388 // references to an object can only be formed from an existing 389 // reference, and passing an existing reference from one thread to 390 // another must already provide any required synchronization. 391 // 392 // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) 393 let prev = self.val.fetch_add(WAKER_ONE, Relaxed); 394 395 // If the reference count overflowed, abort. 396 if prev > isize::max_value() as usize { 397 process::abort(); 398 } 399 } 400 401 /// Returns `true` if the task should be released. ref_dec(&self) -> bool402 pub(super) fn ref_dec(&self) -> bool { 403 use crate::loom::sync::atomic; 404 405 let prev = self.val.fetch_sub(WAKER_ONE, Release); 406 let next = Snapshot(prev - WAKER_ONE); 407 408 if next.is_final_ref() { 409 atomic::fence(Acquire); 410 } 411 412 next.is_final_ref() 413 } 414 } 415 416 impl Snapshot { is_running(self) -> bool417 pub(super) fn is_running(self) -> bool { 418 self.0 & RUNNING == RUNNING 419 } 420 is_notified(self) -> bool421 pub(super) fn is_notified(self) -> bool { 422 self.0 & NOTIFIED == NOTIFIED 423 } 424 is_released(self) -> bool425 pub(super) fn is_released(self) -> bool { 426 self.0 & RELEASED == RELEASED 427 } 428 is_complete(self) -> bool429 pub(super) fn is_complete(self) -> bool { 430 self.0 & COMPLETE == COMPLETE 431 } 432 is_canceled(self) -> bool433 pub(super) fn is_canceled(self) -> bool { 434 self.0 & CANCELLED == CANCELLED 435 } 436 437 /// Used during normal runtime. is_active(self) -> bool438 pub(super) fn is_active(self) -> bool { 439 self.0 & (COMPLETE | CANCELLED) == 0 440 } 441 442 /// Used before dropping the task is_terminal(self) -> bool443 pub(super) fn is_terminal(self) -> bool { 444 // When both the notified & running flags are set, the task was canceled 445 // after being notified, before it was run. 446 // 447 // There is a race where: 448 // - The task state transitions to notified 449 // - The global queue is shutdown 450 // - The waker attempts to push into the global queue and fails. 451 // - The waker holds the last reference to the task, thus drops it. 452 // 453 // In this scenario, the cancelled bit will never get set. 454 !self.is_active() || (self.is_notified() && self.is_running()) 455 } 456 is_join_interested(self) -> bool457 pub(super) fn is_join_interested(self) -> bool { 458 self.0 & JOIN_INTEREST == JOIN_INTEREST 459 } 460 has_join_waker(self) -> bool461 pub(super) fn has_join_waker(self) -> bool { 462 self.0 & JOIN_WAKER == JOIN_WAKER 463 } 464 is_final_ref(self) -> bool465 pub(super) fn is_final_ref(self) -> bool { 466 const MASK: usize = WAKER_COUNT_MASK | RELEASED | JOIN_INTEREST; 467 468 (self.0 & MASK) == RELEASED 469 } 470 } 471 472 impl fmt::Debug for State { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result473 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 474 use std::sync::atomic::Ordering::SeqCst; 475 476 let snapshot = Snapshot(self.val.load(SeqCst)); 477 478 fmt.debug_struct("State") 479 .field("snapshot", &snapshot) 480 .finish() 481 } 482 } 483 484 impl fmt::Debug for Snapshot { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result485 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 486 fmt.debug_struct("Snapshot") 487 .field("is_running", &self.is_running()) 488 .field("is_notified", &self.is_notified()) 489 .field("is_released", &self.is_released()) 490 .field("is_complete", &self.is_complete()) 491 .field("is_canceled", &self.is_canceled()) 492 .field("is_join_interested", &self.is_join_interested()) 493 .field("has_join_waker", &self.has_join_waker()) 494 .field("is_final_ref", &self.is_final_ref()) 495 .finish() 496 } 497 } 498