// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::spinwait::SpinWait;
use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
use core::{
    cell::Cell,
    mem, ptr,
    sync::atomic::{fence, AtomicUsize, Ordering},
};

struct ThreadData {
    parker: ThreadParker,

    // Linked list of threads in the queue. The queue is split into two parts:
    // the processed part and the unprocessed part. When new nodes are added to
    // the list, they only have the next pointer set, and queue_tail is null.
    //
    // Nodes are processed with the queue lock held, which consists of setting
    // the prev pointer for each node and setting the queue_tail pointer on the
    // first processed node of the list.
    //
    // This setup allows nodes to be added to the queue without a lock, while
    // still allowing O(1) removal of nodes from the processed part of the list.
    // The only cost is the O(n) processing, but this only needs to be done
    // once for each node, and therefore isn't too expensive.
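    //
    // A rough sketch of the layout (illustrative only; newer waiters are
    // pushed at the head stored in `state`, the oldest waiter is the tail
    // and is the one woken on unlock):
    //
    //   head                                  tail
    //   [N3] --next--> [N2] --next--> [N1]
    //        <--prev--      <--prev--
    //   N3.queue_tail = N1   (filled in once the list has been processed)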
    queue_tail: Cell<*const ThreadData>,
    prev: Cell<*const ThreadData>,
    next: Cell<*const ThreadData>,
}

impl ThreadData {
    #[inline]
    fn new() -> ThreadData {
        assert!(mem::align_of::<ThreadData>() > !QUEUE_MASK);
        ThreadData {
            parker: ThreadParker::new(),
            queue_tail: Cell::new(ptr::null()),
            prev: Cell::new(ptr::null()),
            next: Cell::new(ptr::null()),
        }
    }
}

// Invokes the given closure with a reference to the current thread's `ThreadData`.
#[inline]
fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
    let mut thread_data_ptr = ptr::null();
    // If ThreadData is expensive to construct, then we want to use a cached
    // version in thread-local storage if possible.
    if !ThreadParker::IS_CHEAP_TO_CONSTRUCT {
        thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
        if let Ok(tls_thread_data) = THREAD_DATA.try_with(|x| x as *const ThreadData) {
            thread_data_ptr = tls_thread_data;
        }
    }
    // Otherwise just create a ThreadData on the stack
    let mut thread_data_storage = None;
    if thread_data_ptr.is_null() {
        thread_data_ptr = thread_data_storage.get_or_insert_with(ThreadData::new);
    }

    f(unsafe { &*thread_data_ptr })
}

const LOCKED_BIT: usize = 1;
const QUEUE_LOCKED_BIT: usize = 2;
const QUEUE_MASK: usize = !3;
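// A sketch of how the state word is laid out (inferred from the constants
// above and the `LockState` helpers below):
//   bit 0   (LOCKED_BIT)       - set while the lock itself is held
//   bit 1   (QUEUE_LOCKED_BIT) - set while a thread is processing the queue
//   bits 2+ (QUEUE_MASK)       - pointer to the `ThreadData` at the head of
//                                the wait queue, or null if the queue is empty
// `ThreadData::new` asserts the alignment needed for the low two bits of the
// pointer to be free for use as flags.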

// Word-sized lock that is used to implement the parking_lot API. Since this
// can't use parking_lot, it instead manages its own queue of waiting threads.
pub struct WordLock {
    state: AtomicUsize,
}

impl WordLock {
    /// Returns a new, unlocked, WordLock.
    pub const fn new() -> Self {
        WordLock {
            state: AtomicUsize::new(0),
        }
    }

    #[inline]
    pub fn lock(&self) {
        if self
            .state
            .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.lock_slow();
    }

    /// Must not be called on an already unlocked `WordLock`!
    #[inline]
    pub unsafe fn unlock(&self) {
        let state = self.state.fetch_sub(LOCKED_BIT, Ordering::Release);
        if state.is_queue_locked() || state.queue_head().is_null() {
            return;
        }
        self.unlock_slow();
    }

    #[cold]
    fn lock_slow(&self) {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Grab the lock if it isn't locked, even if there is a queue on it
            if !state.is_locked() {
                match self.state.compare_exchange_weak(
                    state,
                    state | LOCKED_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }
                continue;
            }

            // If there is no queue, try spinning a few times
            if state.queue_head().is_null() && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Get our thread data and prepare it for parking
            state = with_thread_data(|thread_data| {
                // The pthread implementation is still unsafe, so we need to surround `prepare_park`
                // with `unsafe {}`.
                #[allow(unused_unsafe)]
                unsafe {
                    thread_data.parker.prepare_park();
                }

                // Add our thread to the front of the queue
                let queue_head = state.queue_head();
                if queue_head.is_null() {
                    thread_data.queue_tail.set(thread_data);
                    thread_data.prev.set(ptr::null());
                } else {
                    thread_data.queue_tail.set(ptr::null());
                    thread_data.prev.set(ptr::null());
                    thread_data.next.set(queue_head);
                }
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state.with_queue_head(thread_data),
                    Ordering::Release,
                    Ordering::Relaxed,
                ) {
                    return x;
                }

                // Sleep until we are woken up by an unlock
                // Ignoring unused unsafe, since it's only a few platforms where this is unsafe.
                #[allow(unused_unsafe)]
                unsafe {
                    thread_data.parker.park();
                }

                // Loop back and try locking again
                spinwait.reset();
                self.state.load(Ordering::Relaxed)
            });
        }
    }

    #[cold]
    fn unlock_slow(&self) {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // We just unlocked the WordLock. Just check if there is a thread
            // to wake up. If the queue is locked then another thread is already
            // taking care of waking up a thread.
            if state.is_queue_locked() || state.queue_head().is_null() {
                return;
            }

            // Try to grab the queue lock
            match self.state.compare_exchange_weak(
                state,
                state | QUEUE_LOCKED_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(x) => state = x,
            }
        }

        // Now we have the queue lock and the queue is non-empty
        'outer: loop {
            // First, we need to fill in the prev pointers for any newly added
            // threads. We do this until we reach a node that we previously
            // processed, which has a non-null queue_tail pointer.
            let queue_head = state.queue_head();
            let mut queue_tail;
            let mut current = queue_head;
            loop {
                queue_tail = unsafe { (*current).queue_tail.get() };
                if !queue_tail.is_null() {
                    break;
                }
                unsafe {
                    let next = (*current).next.get();
                    (*next).prev.set(current);
                    current = next;
                }
            }

            // Set queue_tail on the queue head to indicate that the whole list
            // has prev pointers set correctly.
            unsafe {
                (*queue_head).queue_tail.set(queue_tail);
            }

            // If the WordLock is locked, then there is no point waking up a
            // thread now. Instead we let the next unlocker take care of waking
            // up a thread.
            if state.is_locked() {
                match self.state.compare_exchange_weak(
                    state,
                    state & !QUEUE_LOCKED_BIT,
                    Ordering::Release,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }

                // Need an acquire fence before reading the new queue
                fence(Ordering::Acquire);
                continue;
            }

            // Remove the last thread from the queue and unlock the queue
            let new_tail = unsafe { (*queue_tail).prev.get() };
            if new_tail.is_null() {
                loop {
                    match self.state.compare_exchange_weak(
                        state,
                        state & LOCKED_BIT,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => break,
                        Err(x) => state = x,
                    }

                    // If the compare_exchange failed because a new thread was
                    // added to the queue then we need to re-scan the queue to
                    // find the previous element.
                    if state.queue_head().is_null() {
                        continue;
                    } else {
                        // Need an acquire fence before reading the new queue
                        fence(Ordering::Acquire);
                        continue 'outer;
                    }
                }
            } else {
                unsafe {
                    (*queue_head).queue_tail.set(new_tail);
                }
                self.state.fetch_and(!QUEUE_LOCKED_BIT, Ordering::Release);
            }

            // Finally, wake up the thread we removed from the queue. Note that
            // we don't need to worry about any races here since the thread is
            // guaranteed to be sleeping right now and we are the only one who
            // can wake it up.
            unsafe {
                (*queue_tail).parker.unpark_lock().unpark();
            }
            break;
        }
    }
}

trait LockState {
    fn is_locked(self) -> bool;
    fn is_queue_locked(self) -> bool;
    fn queue_head(self) -> *const ThreadData;
    fn with_queue_head(self, thread_data: *const ThreadData) -> Self;
}

impl LockState for usize {
    #[inline]
    fn is_locked(self) -> bool {
        self & LOCKED_BIT != 0
    }

    #[inline]
    fn is_queue_locked(self) -> bool {
        self & QUEUE_LOCKED_BIT != 0
    }

    #[inline]
    fn queue_head(self) -> *const ThreadData {
        (self & QUEUE_MASK) as *const ThreadData
    }

    #[inline]
    fn with_queue_head(self, thread_data: *const ThreadData) -> Self {
        (self & !QUEUE_MASK) | thread_data as *const _ as usize
    }
}
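
// A minimal sanity check added as an illustrative sketch only (not part of the
// original file). It assumes `std` is available in test builds, which the
// `thread_local!` use above already implies, and exercises the lock/unlock
// pair from several threads: the plain load/store increment below is only
// correct because `WordLock` provides mutual exclusion and Acquire/Release
// ordering around the critical section.
#[cfg(test)]
mod tests {
    use super::WordLock;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;

    static LOCK: WordLock = WordLock::new();
    static COUNTER: AtomicUsize = AtomicUsize::new(0);

    #[test]
    fn counts_exactly_under_contention() {
        const THREADS: usize = 4;
        const ITERS: usize = 1000;
        let handles: Vec<_> = (0..THREADS)
            .map(|_| {
                thread::spawn(|| {
                    for _ in 0..ITERS {
                        LOCK.lock();
                        // Unsynchronized read-modify-write, serialized by the lock.
                        let v = COUNTER.load(Ordering::Relaxed);
                        COUNTER.store(v + 1, Ordering::Relaxed);
                        // Safety: the lock is held, so this unlock is balanced.
                        unsafe { LOCK.unlock() };
                    }
                })
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }
        assert_eq!(COUNTER.load(Ordering::Relaxed), THREADS * ITERS);
    }
}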