use pool::Pool;
use task::{BlockingState, Task};

use futures::{Async, Poll};

use std::cell::UnsafeCell;
use std::fmt;
use std::ptr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::sync::Arc;
use std::thread;

/// Manages the state around entering a blocking section and tasks that are
/// queued pending the ability to block.
///
/// This is a hybrid counter and intrusive mpsc channel (like `Queue`).
#[derive(Debug)]
pub(crate) struct Blocking {
    /// Queue head.
    ///
    /// This is either the current remaining capacity for blocking sections
    /// **or**, if the max has been reached, the head of a pending blocking
    /// capacity channel of tasks.
    ///
    /// When this points to a task, it represents a strong reference, i.e.
    /// `Arc<Task>`.
    state: AtomicUsize,

    /// Tail pointer. This is `Arc<Task>` unless it points to `stub`.
    tail: UnsafeCell<*mut Task>,

    /// Stub pointer, used as part of the intrusive mpsc channel algorithm
    /// described by 1024cores.
    stub: Box<Task>,

    /// The channel algorithm is MPSC. This means that, in order to pop tasks,
    /// coordination is required.
    ///
    /// Since it doesn't matter *which* task pops & notifies the queued task, we
    /// can avoid a full mutex and make the "lock" lock-free.
    ///
    /// Instead, threads race to set the "entered" bit. When the transition is
    /// successfully made, the thread has permission to pop tasks off of the
    /// queue. If a thread loses the race, instead of waiting to pop a task, it
    /// signals to the winning thread that it should pop an additional task.
    lock: AtomicUsize,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum CanBlock {
    /// Blocking capacity has been allocated to this task.
    ///
    /// The capacity allocation is initially checked before a task is polled. If
    /// capacity has been allocated, it is consumed and tracked as `Allocated`.
    Allocated,

    /// The task has not yet asked for blocking capacity.
    ///
    /// Capacity is either available to a task at the moment it is polled or
    /// not at all, which means a task only asks for capacity once. While in
    /// this state, the task can immediately try to get an allocation when it
    /// needs blocking capacity.
    CanRequest,

    /// The task has requested blocking capacity, but none is available.
    NoCapacity,
}

/// Decorates the `usize` value of `Blocking::state`, providing fns to
/// manipulate the state instead of requiring bit ops.
#[derive(Copy, Clone, Eq, PartialEq)]
struct State(usize);

/// Flag differentiating between remaining capacity and task pointers.
///
/// If we assume pointers are properly aligned, then the least significant bit
/// will always be zero. So, we use that bit to track if the value represents a
/// number.
const NUM_FLAG: usize = 1;

/// When representing "numbers", the state has to be shifted this much (to get
/// rid of the flag bit).
const NUM_SHIFT: usize = 1;
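// An illustrative sketch of the encoding (not part of the algorithm): with a
// capacity of 3, `state` holds `(3 << NUM_SHIFT) | NUM_FLAG == 0b111`.
// Claiming one unit subtracts `1 << NUM_SHIFT`, yielding `0b101`, i.e. 2
// remaining. Because heap allocations are aligned, a `*const Task` always has
// a zero low bit, so any value with `NUM_FLAG` clear is unambiguously a
// pointer into the pending-task channel.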
// ====== impl Blocking =====

impl Blocking {
    /// Create a new `Blocking`.
    pub fn new(capacity: usize) -> Blocking {
        assert!(capacity > 0, "blocking capacity must be greater than zero");

        let stub = Box::new(Task::stub());
        let ptr = &*stub as *const _ as *mut _;

        // Allocations are aligned
        debug_assert!(ptr as usize & NUM_FLAG == 0);

        // The initial state value. This starts at the max capacity.
        let init = State::new(capacity);

        Blocking {
            state: AtomicUsize::new(init.into()),
            tail: UnsafeCell::new(ptr),
            stub: stub,
            lock: AtomicUsize::new(0),
        }
    }
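    // A minimal caller sketch (hypothetical; the real call site lives in the
    // worker's task-polling path):
    //
    //     match blocking.poll_blocking_capacity(&task)? {
    //         Async::Ready(()) => { /* capacity held; run the blocking section */ }
    //         Async::NotReady => { /* task queued; notified when capacity frees up */ }
    //     }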
    /// Atomically either acquire blocking capacity or queue the task to be
    /// notified once capacity becomes available.
    ///
    /// The caller must ensure that `task` has not previously been queued to be
    /// notified when capacity becomes available.
    pub fn poll_blocking_capacity(&self, task: &Arc<Task>) -> Poll<(), ::BlockingError> {
        // This requires atomically claiming blocking capacity and, if none is
        // available, queuing `task`.

        // The task cannot be queued at this point. The caller must ensure this.
        debug_assert!(!BlockingState::from(task.blocking.load(Acquire)).is_queued());

        // Don't bump the ref count unless necessary.
        let mut strong: Option<*const Task> = None;

        // Load the state
        let mut curr: State = self.state.load(Acquire).into();

        loop {
            let mut next = curr;

            if !next.claim_capacity(&self.stub) {
                debug_assert!(curr.ptr().is_some());

                // Unable to claim capacity, so we must queue `task` onto the
                // channel.
                //
                // This guard also ensures that the one-time queuing setup
                // below only runs once.
                if strong.is_none() {
                    // First, transition the task to a "queued" state. This
                    // prevents double queuing.
                    //
                    // This is also the only thread that can set the queued flag
                    // at this point. And, the goal is for this to only be
                    // visible when the task node is polled from the channel.
                    // The memory ordering is established by the MPSC queue
                    // operation.
                    //
                    // Note that, if the task doesn't get queued (because the
                    // CAS fails and capacity is now available) then this flag
                    // must be unset. Again, there is no race because until the
                    // task is queued, no other thread can see it.
                    let prev = BlockingState::toggle_queued(&task.blocking, Relaxed);
                    debug_assert!(!prev.is_queued());

                    // Bump the ref count
                    strong = Some(Arc::into_raw(task.clone()));

                    // Set the next pointer. This does not require an atomic
                    // operation as this node is not currently accessible to
                    // other threads via the queue.
                    task.next_blocking.store(ptr::null_mut(), Relaxed);
                }

                let ptr = strong.unwrap();

                // Update the head to point to the new node. We need to see the
                // previous node in order to update the next pointer as well as
                // release `task` to any other threads calling `push`.
                next.set_ptr(ptr);
            }

            debug_assert_ne!(curr.0, 0);
            debug_assert_ne!(next.0, 0);

            let actual = self
                .state
                .compare_and_swap(curr.into(), next.into(), AcqRel)
                .into();

            if curr == actual {
                break;
            }

            curr = actual;
        }

        match curr.ptr() {
            Some(prev) => {
                let ptr = strong.unwrap();

                // Finish pushing
                unsafe {
                    (*prev).next_blocking.store(ptr as *mut _, Release);
                }

                // The node was queued to be notified once capacity is made
                // available.
                Ok(Async::NotReady)
            }
            None => {
                debug_assert!(curr.remaining_capacity() > 0);

                // If `strong` is set, gotta undo a bunch of work
                if let Some(ptr) = strong {
                    let _ = unsafe { Arc::from_raw(ptr) };

                    // Unset the queued flag.
                    let prev = BlockingState::toggle_queued(&task.blocking, Relaxed);
                    debug_assert!(prev.is_queued());
                }

                // Capacity has been obtained
                Ok(().into())
            }
        }
    }

    unsafe fn push_stub(&self) {
        let task: *mut Task = &*self.stub as *const _ as *mut _;

        // Set the next pointer. This does not require an atomic operation as
        // this node is not accessible. The write will be flushed with the next
        // operation.
        (*task).next_blocking.store(ptr::null_mut(), Relaxed);

        // Update the head to point to the new node. We need to see the previous
        // node in order to update the next pointer as well as release `task`
        // to any other threads calling `push`.
        let prev = self.state.swap(task as usize, AcqRel);

        // The stub is only pushed when there are pending tasks. Because of
        // this, the state must *always* be in pointer mode.
        debug_assert!(State::from(prev).is_ptr());

        let prev = prev as *const Task;

        // We don't want the *existing* pointer to be a stub.
        debug_assert_ne!(prev, task);

        // Release `task` to the consume end.
        (*prev).next_blocking.store(task, Release);
    }

    pub fn notify_task(&self, pool: &Arc<Pool>) {
        let prev = self.lock.fetch_add(1, AcqRel);

        if prev != 0 {
            // Another thread has the lock and will be responsible for notifying
            // pending tasks.
            return;
        }

        let mut dec = 1;

        loop {
            let mut remaining_pops = dec;
            while remaining_pops > 0 {
                remaining_pops -= 1;

                let task = match self.pop(remaining_pops) {
                    Some(t) => t,
                    None => break,
                };

                Task::notify_blocking(task, pool);
            }

            // Decrement the number of handled notifications
            let actual = self.lock.fetch_sub(dec, AcqRel);

            if actual == dec {
                break;
            }

            // This can only be greater than expected as we are the only thread
            // that is decrementing.
            debug_assert!(actual > dec);
            dec = actual - dec;
        }
    }
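    // Worked trace of the `lock` handoff in `notify_task` above (illustrative):
    // thread A runs `fetch_add(1)` and sees 0, so it becomes the consumer with
    // `dec = 1`. Thread B then runs `fetch_add(1)` and sees 1, so it returns,
    // delegating its pop to A. After popping, A's `fetch_sub(1)` returns 2,
    // which differs from `dec`, so A loops again with `dec = 2 - 1 = 1` and
    // pops on B's behalf.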
    /// Pop a task.
    ///
    /// `rem` represents the remaining number of times the caller will pop. If
    /// there are no more tasks to pop, `rem` is used to set the remaining
    /// capacity.
    fn pop(&self, rem: usize) -> Option<Arc<Task>> {
        'outer: loop {
            unsafe {
                let mut tail = *self.tail.get();
                let mut next = (*tail).next_blocking.load(Acquire);

                let stub = &*self.stub as *const _ as *mut _;

                if tail == stub {
                    if next.is_null() {
                        // This loop is not part of the standard intrusive mpsc
                        // channel algorithm. This is where we atomically pop
                        // the last task and add `rem` to the remaining capacity.
                        //
                        // This modification to the pop algorithm works because,
                        // at this point, we have not done any work (only done
                        // reading). We have a *pretty* good idea that there is
                        // no concurrent pusher.
                        //
                        // The capacity is then atomically added by doing an
                        // AcqRel CAS on `state`. The `state` cell is the
                        // linchpin of the algorithm.
                        //
                        // By successfully CASing `head` w/ AcqRel, we ensure
                        // that, if any thread was racing and entered a push, we
                        // see that and abort pop, retrying as it is
                        // "inconsistent".
                        let mut curr: State = self.state.load(Acquire).into();

                        loop {
                            if curr.has_task(&self.stub) {
                                // Inconsistent state, yield the thread and try
                                // again.
                                thread::yield_now();
                                continue 'outer;
                            }

                            let mut after = curr;

                            // +1 here because `rem` represents the number of
                            // pops that will come after the current one.
                            after.add_capacity(rem + 1, &self.stub);

                            let actual: State = self
                                .state
                                .compare_and_swap(curr.into(), after.into(), AcqRel)
                                .into();

                            if actual == curr {
                                // Successfully returned the remaining capacity
                                return None;
                            }

                            curr = actual;
                        }
                    }

                    *self.tail.get() = next;
                    tail = next;
                    next = (*next).next_blocking.load(Acquire);
                }

                if !next.is_null() {
                    *self.tail.get() = next;

                    // No ref_count inc is necessary here as this pop is paired
                    // with a `push` which "forgets" the handle.
                    return Some(Arc::from_raw(tail));
                }

                let state = self.state.load(Acquire);

                // This must always be a pointer
                debug_assert!(State::from(state).is_ptr());

                if state != tail as usize {
                    // Try again
                    thread::yield_now();
                    continue 'outer;
                }

                self.push_stub();

                next = (*tail).next_blocking.load(Acquire);

                if !next.is_null() {
                    *self.tail.get() = next;

                    return Some(Arc::from_raw(tail));
                }

                thread::yield_now();
                // Try again
            }
        }
    }
}

// ====== impl State =====

impl State {
    /// Return a new `State` representing the given remaining capacity.
    fn new(capacity: usize) -> State {
        State((capacity << NUM_SHIFT) | NUM_FLAG)
    }

    fn remaining_capacity(&self) -> usize {
        if !self.has_remaining_capacity() {
            return 0;
        }

        self.0 >> NUM_SHIFT
    }

    fn has_remaining_capacity(&self) -> bool {
        self.0 & NUM_FLAG == NUM_FLAG
    }

    fn has_task(&self, stub: &Task) -> bool {
        !(self.has_remaining_capacity() || self.is_stub(stub))
    }

    fn is_stub(&self, stub: &Task) -> bool {
        self.0 == stub as *const _ as usize
    }

    /// Try to claim blocking capacity.
    ///
    /// # Return
    ///
    /// Returns `true` if the capacity was claimed, `false` otherwise. If
    /// `false` is returned, it can be assumed that `State` represents the head
    /// pointer in the mpsc channel.
    fn claim_capacity(&mut self, stub: &Task) -> bool {
        if !self.has_remaining_capacity() {
            return false;
        }

        // The state is never a number with zero remaining capacity; claiming
        // the last unit swaps in the stub pointer instead.
        debug_assert!(self.0 != NUM_FLAG);

        self.0 -= 1 << NUM_SHIFT;

        if self.0 == NUM_FLAG {
            // Set the state to the stub pointer.
            self.0 = stub as *const _ as usize;
        }

        true
    }
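    // Example transition (illustrative): with capacity 1 the state is `0b11`.
    // `claim_capacity` subtracts `1 << NUM_SHIFT`, leaving exactly `NUM_FLAG`,
    // so the state is replaced with the stub pointer and the channel switches
    // to pointer mode. `add_capacity` below performs the reverse transition.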
    /// Add blocking capacity.
    ///
    /// Returns `true` if the capacity was added, or `false` if the state holds
    /// a non-stub task pointer.
    fn add_capacity(&mut self, capacity: usize, stub: &Task) -> bool {
        debug_assert!(capacity > 0);

        if self.is_stub(stub) {
            self.0 = (capacity << NUM_SHIFT) | NUM_FLAG;
            true
        } else if self.has_remaining_capacity() {
            self.0 += capacity << NUM_SHIFT;
            true
        } else {
            false
        }
    }

    fn is_ptr(&self) -> bool {
        self.0 & NUM_FLAG == 0
    }

    fn ptr(&self) -> Option<*const Task> {
        if self.is_ptr() {
            Some(self.0 as *const Task)
        } else {
            None
        }
    }

    fn set_ptr(&mut self, ptr: *const Task) {
        let ptr = ptr as usize;
        debug_assert!(ptr & NUM_FLAG == 0);
        self.0 = ptr;
    }
}

impl From<usize> for State {
    fn from(src: usize) -> State {
        State(src)
    }
}

impl From<State> for usize {
    fn from(src: State) -> usize {
        src.0
    }
}

impl fmt::Debug for State {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut fmt = fmt.debug_struct("State");

        if self.is_ptr() {
            fmt.field("ptr", &self.0);
        } else {
            fmt.field("remaining", &self.remaining_capacity());
        }

        fmt.finish()
    }
}
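// The tests below are a minimal sketch exercising the `State` tagged-value
// transitions in isolation; they assume `Task::stub()` can be constructed
// outside of a running pool, as `Blocking::new` does above.
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn number_mode_round_trips_capacity() {
        let state = State::new(3);

        assert!(state.has_remaining_capacity());
        assert!(!state.is_ptr());
        assert_eq!(state.remaining_capacity(), 3);
    }

    #[test]
    fn claiming_last_unit_switches_to_stub_pointer() {
        let stub = Box::new(Task::stub());
        let mut state = State::new(1);

        // Claiming the only unit leaves the stub pointer, not the number 0.
        assert!(state.claim_capacity(&stub));
        assert!(state.is_stub(&stub));
        assert_eq!(state.remaining_capacity(), 0);

        // Returning capacity while in stub mode restores number mode.
        assert!(state.add_capacity(1, &stub));
        assert_eq!(state.remaining_capacity(), 1);
    }
}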