use pool::Pool;
use task::{BlockingState, Task};

use futures::{Async, Poll};

use std::cell::UnsafeCell;
use std::fmt;
use std::ptr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::sync::Arc;
use std::thread;

/// Manages the state around entering a blocking section and tasks that are
/// queued pending the ability to block.
///
/// This is a hybrid counter and intrusive mpsc channel (like `Queue`).
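///
/// A rough usage sketch (kept as an `ignore`d example since `Task` and `Pool`
/// are crate-internal and the real call sites live elsewhere in the pool;
/// `max_blocking`, `task`, and `pool` are illustrative names):
///
/// ```ignore
/// let blocking = Blocking::new(max_blocking);
///
/// // Before entering a blocking section, a task polls for capacity.
/// match blocking.poll_blocking_capacity(&task)? {
///     Async::Ready(()) => {
///         // Capacity was claimed; the blocking section may run.
///     }
///     Async::NotReady => {
///         // No capacity; `task` was queued and will be notified later.
///     }
/// }
///
/// // When previously claimed capacity is released, a queued task (if any) is
/// // notified, otherwise the freed slot is returned to the counter.
/// blocking.notify_task(&pool);
/// ```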
#[derive(Debug)]
pub(crate) struct Blocking {
    /// Queue head.
    ///
    /// This is either the current remaining capacity for blocking sections
    /// **or**, if the maximum number of concurrent blocking sections has been
    /// reached, the head of a channel of tasks pending blocking capacity.
    ///
    /// When this points to a task, it represents a strong reference, i.e.
    /// `Arc<Task>`.
    state: AtomicUsize,

    /// Tail pointer. This is `Arc<Task>` unless it points to `stub`.
    tail: UnsafeCell<*mut Task>,

    /// Stub pointer, used as part of the intrusive mpsc channel algorithm
    /// described by 1024cores.
    stub: Box<Task>,

    /// The channel algorithm is MPSC. This means that, in order to pop tasks,
    /// coordination is required.
    ///
    /// Since it doesn't matter *which* thread pops & notifies the queued task,
    /// we can avoid a full mutex and make the "lock" lock-free.
    ///
    /// Instead, threads race to set the "entered" bit. When the transition is
    /// successfully made, the thread has permission to pop tasks off of the
    /// queue. If a thread loses the race, instead of waiting to pop a task, it
    /// signals to the winning thread that it should pop an additional task.
    lock: AtomicUsize,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum CanBlock {
    /// Blocking capacity has been allocated to this task.
    ///
    /// The capacity allocation is initially checked before a task is polled. If
    /// capacity has been allocated, it is consumed and tracked as `Allocated`.
    Allocated,

    /// Blocking capacity is either available to a task at the moment it is
    /// polled or it is not; a task may only ask for capacity once. This state
    /// tracks a task that has not yet asked for blocking capacity. When the
    /// task needs blocking capacity and is in this state, it may immediately
    /// try to acquire an allocation.
    CanRequest,

    /// The task has requested blocking capacity, but none is available.
    NoCapacity,
}
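
// A rough sketch of how these states relate for a single task (the actual
// transitions are managed by `task::BlockingState`):
//
//     CanRequest -- poll_blocking_capacity() --> Allocated   (Ready: capacity claimed)
//     CanRequest -- poll_blocking_capacity() --> NoCapacity  (NotReady: task queued)
//     NoCapacity -- notified once capacity frees up --> Allocated (roughly; see `notify_task`)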

/// Decorates the `usize` value of `Blocking::state`, providing fns to
/// manipulate the state instead of requiring bit ops.
#[derive(Copy, Clone, Eq, PartialEq)]
struct State(usize);

/// Flag differentiating between remaining capacity and task pointers.
///
/// If we assume pointers are properly aligned, then the least significant bit
/// will always be zero. So, we use that bit to track if the value represents a
/// number.
const NUM_FLAG: usize = 1;

/// When representing "numbers", the state has to be shifted this much (to get
/// rid of the flag bit).
const NUM_SHIFT: usize = 1;
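
// A worked example of the encoding (a sketch; it assumes `Task` allocations
// are at least 2-byte aligned, so pointer values always have a zero LSB):
//
//     State::new(3).0   // == (3 << NUM_SHIFT) | NUM_FLAG == 0b111: three slots left
//     State::new(1).0   // == 0b011: one slot left
//     // A pointer value (e.g. the stub's address) has its LSB clear, which is
//     // how `State::is_ptr()` tells the two representations apart.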

// ===== impl Blocking =====

impl Blocking {
    /// Create a new `Blocking`.
    pub fn new(capacity: usize) -> Blocking {
        assert!(capacity > 0, "blocking capacity must be greater than zero");

        let stub = Box::new(Task::stub());
        let ptr = &*stub as *const _ as *mut _;

        // Allocations are aligned
        debug_assert!(ptr as usize & NUM_FLAG == 0);

        // The initial state value. This starts at the max capacity.
        let init = State::new(capacity);

        Blocking {
            state: AtomicUsize::new(init.into()),
            tail: UnsafeCell::new(ptr),
            stub,
            lock: AtomicUsize::new(0),
        }
    }

    /// Atomically either acquire blocking capacity or queue the task to be
    /// notified once capacity becomes available.
    ///
    /// The caller must ensure that `task` has not previously been queued to be
    /// notified when capacity becomes available.
    pub fn poll_blocking_capacity(&self, task: &Arc<Task>) -> Poll<(), ::BlockingError> {
        // This requires atomically claiming blocking capacity and, if none is
        // available, queuing `task`.

        // The task cannot be queued at this point. The caller must ensure this.
        debug_assert!(!BlockingState::from(task.blocking.load(Acquire)).is_queued());

        // Don't bump the ref count unless necessary.
        let mut strong: Option<*const Task> = None;

        // Load the state
        let mut curr: State = self.state.load(Acquire).into();

        loop {
            let mut next = curr;

            if !next.claim_capacity(&self.stub) {
                debug_assert!(curr.ptr().is_some());

                // Unable to claim capacity, so we must queue `task` onto the
                // channel.
                //
                // This guard also ensures that the queuing work, which only
                // needs to happen once, is run only once.
                if strong.is_none() {
                    // First, transition the task to a "queued" state. This
                    // prevents double queuing.
                    //
                    // This is also the only thread that can set the queued flag
                    // at this point. And, the goal is for this to only be
                    // visible when the task node is polled from the channel.
                    // The memory ordering is established by the MPSC queue
                    // operation.
                    //
                    // Note that, if the task doesn't get queued (because the
                    // CAS fails and capacity is now available), then this flag
                    // must be unset. Again, there is no race because, until the
                    // task is queued, no other thread can see it.
                    let prev = BlockingState::toggle_queued(&task.blocking, Relaxed);
                    debug_assert!(!prev.is_queued());

                    // Bump the ref count
                    strong = Some(Arc::into_raw(task.clone()));

                    // Set the next pointer. This does not require an atomic
                    // operation as this node is not currently accessible to
                    // other threads via the queue.
                    task.next_blocking.store(ptr::null_mut(), Relaxed);
                }

                let ptr = strong.unwrap();

                // Update the head to point to the new node. We need to see the
                // previous node in order to update the next pointer as well as
                // release `task` to any other threads calling `push`.
                next.set_ptr(ptr);
            }

            debug_assert_ne!(curr.0, 0);
            debug_assert_ne!(next.0, 0);

            let actual = self
                .state
                .compare_and_swap(curr.into(), next.into(), AcqRel)
                .into();

            if curr == actual {
                break;
            }

            curr = actual;
        }

        match curr.ptr() {
            Some(prev) => {
                let ptr = strong.unwrap();

                // Finish pushing
                unsafe {
                    (*prev).next_blocking.store(ptr as *mut _, Release);
                }

                // The node was queued to be notified once capacity is made
                // available.
                Ok(Async::NotReady)
            }
            None => {
                debug_assert!(curr.remaining_capacity() > 0);

                // If `strong` is set, the speculative queuing work above must
                // be undone.
                if let Some(ptr) = strong {
                    let _ = unsafe { Arc::from_raw(ptr) };

                    // Unset the queued flag.
                    let prev = BlockingState::toggle_queued(&task.blocking, Relaxed);
                    debug_assert!(prev.is_queued());
                }

                // Capacity has been obtained
                Ok(().into())
            }
        }
    }

    unsafe fn push_stub(&self) {
        let task: *mut Task = &*self.stub as *const _ as *mut _;

        // Set the next pointer. This does not require an atomic operation as
        // this node is not yet accessible to other threads. The write will be
        // flushed with the next operation.
        (*task).next_blocking.store(ptr::null_mut(), Relaxed);

        // Update the head to point to the new node. We need to see the previous
        // node in order to update the next pointer as well as release `task`
        // to any other threads calling `push`.
        let prev = self.state.swap(task as usize, AcqRel);

        // The stub is only pushed when there are pending tasks. Because of
        // this, the state must *always* be in pointer mode.
        debug_assert!(State::from(prev).is_ptr());

        let prev = prev as *const Task;

        // We don't want the *existing* pointer to be the stub.
        debug_assert_ne!(prev, task);

        // Release `task` to the consume end.
        (*prev).next_blocking.store(task, Release);
    }
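
    // Both `push_stub` above and the CAS loop in `poll_blocking_capacity`
    // follow the two-step intrusive MPSC push described by 1024cores. A
    // condensed sketch (field names match this module; not a standalone
    // function):
    //
    //     // 1. Publish the new node as the head. A swap suffices here;
    //     //    `poll_blocking_capacity` uses a CAS so it can also handle the
    //     //    counter representation.
    //     let prev = state.swap(node, AcqRel);
    //     // 2. Link the new node in from the previous head. A consumer that
    //     //    observes a null `next_blocking` in this window treats the queue
    //     //    as "inconsistent" and retries (see `pop` below).
    //     (*prev).next_blocking.store(node, Release);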

    pub fn notify_task(&self, pool: &Arc<Pool>) {
        let prev = self.lock.fetch_add(1, AcqRel);

        if prev != 0 {
            // Another thread has the lock and will be responsible for notifying
            // pending tasks.
            return;
        }

        let mut dec = 1;

        loop {
            let mut remaining_pops = dec;
            while remaining_pops > 0 {
                remaining_pops -= 1;

                let task = match self.pop(remaining_pops) {
                    Some(t) => t,
                    None => break,
                };

                Task::notify_blocking(task, pool);
            }

            // Decrement the number of handled notifications
            let actual = self.lock.fetch_sub(dec, AcqRel);

            if actual == dec {
                break;
            }

            // This can only be greater than expected as we are the only thread
            // that is decrementing.
            debug_assert!(actual > dec);
            dec = actual - dec;
        }
    }
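
    // A sketch of the lock-free handoff above, with two threads A and B racing
    // to notify (illustrative values; `lock` starts at 0):
    //
    //     A: lock.fetch_add(1) == 0   // A wins and enters the loop with dec = 1
    //     B: lock.fetch_add(1) == 1   // B loses and returns immediately
    //     A: pops & notifies one task
    //     A: lock.fetch_sub(1) == 2   // actual (2) != dec (1): B requested another
    //                                 // pop, so dec = 2 - 1 = 1 and A loops again
    //     A: pops & notifies another task
    //     A: lock.fetch_sub(1) == 1   // actual == dec: all requests handled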

    /// Pop a task
    ///
    /// `rem` represents the remaining number of times the caller will pop. If
    /// there are no more tasks to pop, `rem` is used to set the remaining
    /// capacity.
    fn pop(&self, rem: usize) -> Option<Arc<Task>> {
        'outer: loop {
            unsafe {
                let mut tail = *self.tail.get();
                let mut next = (*tail).next_blocking.load(Acquire);

                let stub = &*self.stub as *const _ as *mut _;

                if tail == stub {
                    if next.is_null() {
                        // This loop is not part of the standard intrusive mpsc
                        // channel algorithm. This is where we atomically pop
                        // the last task and add `rem` to the remaining capacity.
                        //
                        // This modification to the pop algorithm works because,
                        // at this point, we have not done any work (only done
                        // reading). We have a *pretty* good idea that there is
                        // no concurrent pusher.
                        //
                        // The capacity is then atomically added by doing an
                        // AcqRel CAS on `state`. The `state` cell is the
                        // linchpin of the algorithm.
                        //
                        // By successfully CASing `head` w/ AcqRel, we ensure
                        // that, if any thread was racing and entered a push, we
                        // see that and abort pop, retrying as it is
                        // "inconsistent".
                        let mut curr: State = self.state.load(Acquire).into();

                        loop {
                            if curr.has_task(&self.stub) {
                                // Inconsistent state, yield the thread and try
                                // again.
                                thread::yield_now();
                                continue 'outer;
                            }

                            let mut after = curr;

                            // +1 here because `rem` represents the number of
                            // pops that will come after the current one.
                            after.add_capacity(rem + 1, &self.stub);

                            let actual: State = self
                                .state
                                .compare_and_swap(curr.into(), after.into(), AcqRel)
                                .into();

                            if actual == curr {
                                // Successfully returned the remaining capacity
                                return None;
                            }

                            curr = actual;
                        }
                    }

                    *self.tail.get() = next;
                    tail = next;
                    next = (*next).next_blocking.load(Acquire);
                }

                if !next.is_null() {
                    *self.tail.get() = next;

                    // No ref_count inc is necessary here as this pop is paired
                    // with a `push` which "forgets" the handle.
                    return Some(Arc::from_raw(tail));
                }

                let state = self.state.load(Acquire);

                // This must always be a pointer
                debug_assert!(State::from(state).is_ptr());

                if state != tail as usize {
                    // Try again
                    thread::yield_now();
                    continue 'outer;
                }

                self.push_stub();

                next = (*tail).next_blocking.load(Acquire);

                if !next.is_null() {
                    *self.tail.get() = next;

                    return Some(Arc::from_raw(tail));
                }

                thread::yield_now();
                // Try again
            }
        }
    }
}

// ===== impl State =====

impl State {
    /// Return a new `State` representing the remaining capacity at the maximum
    /// value.
    fn new(capacity: usize) -> State {
        State((capacity << NUM_SHIFT) | NUM_FLAG)
    }

    fn remaining_capacity(&self) -> usize {
        if !self.has_remaining_capacity() {
            return 0;
        }

        self.0 >> NUM_SHIFT
    }

    fn has_remaining_capacity(&self) -> bool {
        self.0 & NUM_FLAG == NUM_FLAG
    }

    fn has_task(&self, stub: &Task) -> bool {
        !(self.has_remaining_capacity() || self.is_stub(stub))
    }

    fn is_stub(&self, stub: &Task) -> bool {
        self.0 == stub as *const _ as usize
    }

    /// Try to claim blocking capacity.
    ///
    /// # Return
    ///
    /// Returns `true` if the capacity was claimed, `false` otherwise. If
    /// `false` is returned, it can be assumed that `State` represents the head
    /// pointer in the mpsc channel.
    fn claim_capacity(&mut self, stub: &Task) -> bool {
        if !self.has_remaining_capacity() {
            return false;
        }

        debug_assert!(self.0 != 1);

        self.0 -= 1 << NUM_SHIFT;

        if self.0 == NUM_FLAG {
            // Set the state to the stub pointer.
            self.0 = stub as *const _ as usize;
        }

        true
    }

    /// Add blocking capacity.
    fn add_capacity(&mut self, capacity: usize, stub: &Task) -> bool {
        debug_assert!(capacity > 0);

        if self.is_stub(stub) {
            self.0 = (capacity << NUM_SHIFT) | NUM_FLAG;
            true
        } else if self.has_remaining_capacity() {
            self.0 += capacity << NUM_SHIFT;
            true
        } else {
            false
        }
    }
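
    // A worked example of the number <-> pointer transitions above (a sketch
    // with a capacity of 2; `stub` stands in for `Blocking::stub`):
    //
    //     let mut s = State::new(2);   // 0b101: two slots remaining
    //     s.claim_capacity(&stub);     // true;  now 0b011: one slot remaining
    //     s.claim_capacity(&stub);     // true;  now the stub pointer (channel mode)
    //     s.claim_capacity(&stub);     // false; the caller must queue its task
    //     s.add_capacity(1, &stub);    // true;  back to 0b011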

    fn is_ptr(&self) -> bool {
        self.0 & NUM_FLAG == 0
    }

    fn ptr(&self) -> Option<*const Task> {
        if self.is_ptr() {
            Some(self.0 as *const Task)
        } else {
            None
        }
    }

    fn set_ptr(&mut self, ptr: *const Task) {
        let ptr = ptr as usize;
        debug_assert!(ptr & NUM_FLAG == 0);
        self.0 = ptr;
    }
}

impl From<usize> for State {
    fn from(src: usize) -> State {
        State(src)
    }
}

impl From<State> for usize {
    fn from(src: State) -> usize {
        src.0
    }
}

impl fmt::Debug for State {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut fmt = fmt.debug_struct("State");

        if self.is_ptr() {
            fmt.field("ptr", &self.0);
        } else {
            fmt.field("remaining", &self.remaining_capacity());
        }

        fmt.finish()
    }
}