// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::spinwait::SpinWait;
use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
use core::{
    cell::Cell,
    mem, ptr,
    sync::atomic::{fence, AtomicUsize, Ordering},
};

struct ThreadData {
    parker: ThreadParker,

    // Linked list of threads in the queue. The queue is split into two parts:
    // the processed part and the unprocessed part. When new nodes are added to
    // the list, they only have the next pointer set, and queue_tail is null.
    //
    // Nodes are processed with the queue lock held, which consists of setting
    // the prev pointer for each node and setting the queue_tail pointer on the
    // first processed node of the list.
    //
    // This setup allows nodes to be added to the queue without a lock, while
    // still allowing O(1) removal of nodes from the processed part of the list.
    // The only cost is the O(n) processing, but this only needs to be done
    // once for each node, and therefore isn't too expensive.
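    //
    // Queue layout after processing, assuming threads A..D were queued in the
    // order D first, A last (the queue head stored in the state word is the
    // most recently queued thread):
    //
    //   state.queue_head -> A -next-> B -next-> C -next-> D
    //                       A <-prev- B <-prev- C <-prev- D
    //                       A.queue_tail ----------------> D
    //
    // Newly pushed nodes appear in front of A with only `next` set; the
    // unlocker fills in their `prev` pointers (and a fresh queue_tail on the
    // new head) the next time it walks the list with the queue lock held.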
    queue_tail: Cell<*const ThreadData>,
    prev: Cell<*const ThreadData>,
    next: Cell<*const ThreadData>,
}

impl ThreadData {
    #[inline]
    fn new() -> ThreadData {
        assert!(mem::align_of::<ThreadData>() > !QUEUE_MASK);
        ThreadData {
            parker: ThreadParker::new(),
            queue_tail: Cell::new(ptr::null()),
            prev: Cell::new(ptr::null()),
            next: Cell::new(ptr::null()),
        }
    }
}

// Invokes the given closure with a reference to the current thread `ThreadData`.
#[inline]
fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
    let mut thread_data_ptr = ptr::null();
    // If ThreadData is expensive to construct, then we want to use a cached
    // version in thread-local storage if possible.
    if !ThreadParker::IS_CHEAP_TO_CONSTRUCT {
        thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
        if let Ok(tls_thread_data) = THREAD_DATA.try_with(|x| x as *const ThreadData) {
            thread_data_ptr = tls_thread_data;
        }
    }
    // Otherwise just create a ThreadData on the stack
    let mut thread_data_storage = None;
    if thread_data_ptr.is_null() {
        thread_data_ptr = thread_data_storage.get_or_insert_with(ThreadData::new);
    }

    f(unsafe { &*thread_data_ptr })
}

const LOCKED_BIT: usize = 1;
const QUEUE_LOCKED_BIT: usize = 2;
const QUEUE_MASK: usize = !3;
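
// The state word packs three things:
//   bit 0 (LOCKED_BIT):       set while the lock is held
//   bit 1 (QUEUE_LOCKED_BIT): set while one thread has exclusive access to the
//                             wait queue
//   bits 2.. (QUEUE_MASK):    the address of the head `ThreadData` in the wait
//                             queue, or 0 if the queue is empty (the assert in
//                             `ThreadData::new` checks the alignment required
//                             to pack a pointer into these bits)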

// Word-sized lock that is used to implement the parking_lot API. Since this
// can't use parking_lot, it instead manages its own queue of waiting threads.
pub struct WordLock {
    state: AtomicUsize,
}

impl WordLock {
    pub const INIT: WordLock = WordLock {
        state: AtomicUsize::new(0),
    };

    #[inline]
    pub fn lock(&self) {
        if self
            .state
            .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.lock_slow();
    }

    /// Must not be called on an already unlocked `WordLock`!
    #[inline]
    pub unsafe fn unlock(&self) {
        let state = self.state.fetch_sub(LOCKED_BIT, Ordering::Release);
        if state.is_queue_locked() || state.queue_head().is_null() {
            return;
        }
        self.unlock_slow();
    }

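    // Slow path: spin for a bounded number of iterations while the queue is
    // empty, then push this thread onto the front of the wait queue and park
    // until an unlocker wakes it. After waking, loop back and retry.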
    #[cold]
    fn lock_slow(&self) {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Grab the lock if it isn't locked, even if there is a queue on it
            if !state.is_locked() {
                match self.state.compare_exchange_weak(
                    state,
                    state | LOCKED_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }
                continue;
            }

            // If there is no queue, try spinning a few times
            if state.queue_head().is_null() && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Get our thread data and prepare it for parking
            state = with_thread_data(|thread_data| {
                // The pthread implementation is still unsafe, so we need to surround `prepare_park`
                // with `unsafe {}`.
                #[allow(unused_unsafe)]
                unsafe {
                    thread_data.parker.prepare_park();
                }

                // Add our thread to the front of the queue
                let queue_head = state.queue_head();
                if queue_head.is_null() {
                    thread_data.queue_tail.set(thread_data);
                    thread_data.prev.set(ptr::null());
                } else {
                    thread_data.queue_tail.set(ptr::null());
                    thread_data.prev.set(ptr::null());
                    thread_data.next.set(queue_head);
                }
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state.with_queue_head(thread_data),
                    Ordering::Release,
                    Ordering::Relaxed,
                ) {
                    return x;
                }

                // Sleep until we are woken up by an unlock
                // Ignoring unused unsafe, since it's only a few platforms where this is unsafe.
                #[allow(unused_unsafe)]
                unsafe {
                    thread_data.parker.park();
                }

                // Loop back and try locking again
                spinwait.reset();
                self.state.load(Ordering::Relaxed)
            });
        }
    }

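    // Slow path for unlocking: take the queue lock, make sure the `prev`
    // pointers and queue_tail are filled in, then remove the thread at the
    // tail of the queue (the oldest waiter) and unpark it. If the lock was
    // re-acquired in the meantime, leave the wakeup to the next unlocker.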
    #[cold]
    fn unlock_slow(&self) {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // We just unlocked the WordLock. Just check if there is a thread
            // to wake up. If the queue is locked then another thread is already
            // taking care of waking up a thread.
            if state.is_queue_locked() || state.queue_head().is_null() {
                return;
            }

            // Try to grab the queue lock
            match self.state.compare_exchange_weak(
                state,
                state | QUEUE_LOCKED_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(x) => state = x,
            }
        }

        // Now we have the queue lock and the queue is non-empty
        'outer: loop {
            // First, we need to fill in the prev pointers for any newly added
            // threads. We do this until we reach a node that we previously
            // processed, which has a non-null queue_tail pointer.
            let queue_head = state.queue_head();
            let mut queue_tail;
            let mut current = queue_head;
            loop {
                queue_tail = unsafe { (*current).queue_tail.get() };
                if !queue_tail.is_null() {
                    break;
                }
                unsafe {
                    let next = (*current).next.get();
                    (*next).prev.set(current);
                    current = next;
                }
            }

            // Set queue_tail on the queue head to indicate that the whole list
            // has prev pointers set correctly.
            unsafe {
                (*queue_head).queue_tail.set(queue_tail);
            }

            // If the WordLock is locked, then there is no point waking up a
            // thread now. Instead we let the next unlocker take care of waking
            // up a thread.
            if state.is_locked() {
                match self.state.compare_exchange_weak(
                    state,
                    state & !QUEUE_LOCKED_BIT,
                    Ordering::Release,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }

                // Need an acquire fence before reading the new queue
                fence(Ordering::Acquire);
                continue;
            }

            // Remove the last thread from the queue and unlock the queue
            let new_tail = unsafe { (*queue_tail).prev.get() };
            if new_tail.is_null() {
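                // The removed thread was the only one in the queue: clear both
                // the queue pointer and QUEUE_LOCKED_BIT in a single CAS,
                // keeping only LOCKED_BIT in case the lock was re-taken.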
                loop {
                    match self.state.compare_exchange_weak(
                        state,
                        state & LOCKED_BIT,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => break,
                        Err(x) => state = x,
                    }

                    // If the compare_exchange failed because a new thread was
                    // added to the queue then we need to re-scan the queue to
                    // find the previous element.
                    if state.queue_head().is_null() {
                        continue;
                    } else {
                        // Need an acquire fence before reading the new queue
                        fence(Ordering::Acquire);
                        continue 'outer;
                    }
                }
            } else {
                unsafe {
                    (*queue_head).queue_tail.set(new_tail);
                }
                self.state.fetch_and(!QUEUE_LOCKED_BIT, Ordering::Release);
            }

            // Finally, wake up the thread we removed from the queue. Note that
            // we don't need to worry about any races here since the thread is
            // guaranteed to be sleeping right now and we are the only one who
            // can wake it up.
            unsafe {
                (*queue_tail).parker.unpark_lock().unpark();
            }
            break;
        }
    }
}

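// Convenience accessors used above for decoding and updating the packed state
// word; the queue head pointer lives in the bits covered by QUEUE_MASK.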
trait LockState {
    fn is_locked(self) -> bool;
    fn is_queue_locked(self) -> bool;
    fn queue_head(self) -> *const ThreadData;
    fn with_queue_head(self, thread_data: *const ThreadData) -> Self;
}

impl LockState for usize {
    #[inline]
    fn is_locked(self) -> bool {
        self & LOCKED_BIT != 0
    }

    #[inline]
    fn is_queue_locked(self) -> bool {
        self & QUEUE_LOCKED_BIT != 0
    }

    #[inline]
    fn queue_head(self) -> *const ThreadData {
        (self & QUEUE_MASK) as *const ThreadData
    }

    #[inline]
    fn with_queue_head(self, thread_data: *const ThreadData) -> Self {
        (self & !QUEUE_MASK) | thread_data as *const _ as usize
    }
}
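
// A couple of smoke tests sketching how the lock is meant to be used; they
// only rely on the `INIT`/`lock`/`unlock` API defined above.
#[cfg(test)]
mod tests {
    use super::WordLock;

    // Minimal single-threaded sketch: acquire and release the lock once.
    // `unlock` is unsafe because it must only be called while the lock is
    // held by the current thread.
    #[test]
    fn lock_unlock_smoke() {
        let lock = WordLock::INIT;
        lock.lock();
        unsafe { lock.unlock() };
    }

    // Contended sketch: a few threads repeatedly bump a plain counter while
    // holding the lock, exercising the park/unpark slow paths. The counter is
    // wrapped in a small Sync cell so it can be shared without a second lock.
    #[test]
    fn contended_lock_unlock() {
        use std::thread;

        struct SyncCell(std::cell::UnsafeCell<usize>);
        unsafe impl Sync for SyncCell {}

        static LOCK: WordLock = WordLock::INIT;
        static COUNTER: SyncCell = SyncCell(std::cell::UnsafeCell::new(0));

        let threads: Vec<_> = (0..4)
            .map(|_| {
                thread::spawn(|| {
                    for _ in 0..1000 {
                        LOCK.lock();
                        // The lock serializes access to the plain counter.
                        unsafe { *COUNTER.0.get() += 1 };
                        unsafe { LOCK.unlock() };
                    }
                })
            })
            .collect();
        for t in threads {
            t.join().unwrap();
        }
        assert_eq!(unsafe { *COUNTER.0.get() }, 4 * 1000);
    }
}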