1 use task::CanBlock; 2 3 use std::fmt; 4 use std::sync::atomic::{AtomicUsize, Ordering}; 5 6 /// State tracking task level state to support `blocking`. 7 /// 8 /// This tracks two separate flags. 9 /// 10 /// a) If the task is queued in the pending blocking channel. This prevents 11 /// double queuing (which would break the linked list). 12 /// 13 /// b) If the task has been allocated capacity to block. 14 #[derive(Eq, PartialEq)] 15 pub(crate) struct BlockingState(usize); 16 17 const QUEUED: usize = 0b01; 18 const ALLOCATED: usize = 0b10; 19 20 impl BlockingState { 21 /// Create a new, default, `BlockingState`. new() -> BlockingState22 pub fn new() -> BlockingState { 23 BlockingState(0) 24 } 25 26 /// Returns `true` if the state represents the associated task being queued 27 /// in the pending blocking capacity channel is_queued(&self) -> bool28 pub fn is_queued(&self) -> bool { 29 self.0 & QUEUED == QUEUED 30 } 31 32 /// Toggle the queued flag 33 /// 34 /// Returns the state before the flag has been toggled. toggle_queued(state: &AtomicUsize, ordering: Ordering) -> BlockingState35 pub fn toggle_queued(state: &AtomicUsize, ordering: Ordering) -> BlockingState { 36 state.fetch_xor(QUEUED, ordering).into() 37 } 38 39 /// Returns `true` if the state represents the associated task having been 40 /// allocated capacity to block. is_allocated(&self) -> bool41 pub fn is_allocated(&self) -> bool { 42 self.0 & ALLOCATED == ALLOCATED 43 } 44 45 /// Atomically consume the capacity allocation and return if the allocation 46 /// was present. 47 /// 48 /// If this returns `true`, then the task has the ability to block for the 49 /// duration of the `poll`. consume_allocation(state: &AtomicUsize, ordering: Ordering) -> CanBlock50 pub fn consume_allocation(state: &AtomicUsize, ordering: Ordering) -> CanBlock { 51 let state: Self = state.fetch_and(!ALLOCATED, ordering).into(); 52 53 if state.is_allocated() { 54 CanBlock::Allocated 55 } else if state.is_queued() { 56 CanBlock::NoCapacity 57 } else { 58 CanBlock::CanRequest 59 } 60 } 61 notify_blocking(state: &AtomicUsize, ordering: Ordering)62 pub fn notify_blocking(state: &AtomicUsize, ordering: Ordering) { 63 let prev: Self = state.fetch_xor(ALLOCATED | QUEUED, ordering).into(); 64 65 debug_assert!(prev.is_queued()); 66 debug_assert!(!prev.is_allocated()); 67 } 68 } 69 70 impl From<usize> for BlockingState { from(src: usize) -> BlockingState71 fn from(src: usize) -> BlockingState { 72 BlockingState(src) 73 } 74 } 75 76 impl From<BlockingState> for usize { from(src: BlockingState) -> usize77 fn from(src: BlockingState) -> usize { 78 src.0 79 } 80 } 81 82 impl fmt::Debug for BlockingState { fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result83 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { 84 fmt.debug_struct("BlockingState") 85 .field("is_queued", &self.is_queued()) 86 .field("is_allocated", &self.is_allocated()) 87 .finish() 88 } 89 } 90