// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use spinwait::SpinWait;
use std::cell::Cell;
use std::mem;
#[cfg(not(has_localkey_try_with))]
use std::panic;
use std::ptr;
use std::sync::atomic::{fence, AtomicUsize, Ordering};
use std::thread::LocalKey;
use thread_parker::ThreadParker;

struct ThreadData {
    parker: ThreadParker,

    // Linked list of threads in the queue. The queue is split into two parts:
    // the processed part and the unprocessed part. When new nodes are added to
    // the list, they only have the next pointer set, and queue_tail is null.
    //
    // Nodes are processed with the queue lock held, which consists of setting
    // the prev pointer for each node and setting the queue_tail pointer on the
    // first processed node of the list.
    //
    // This setup allows nodes to be added to the queue without a lock, while
    // still allowing O(1) removal of nodes from the processed part of the list.
    // The only cost is the O(n) processing, but this only needs to be done
    // once for each node, and therefore isn't too expensive.
    queue_tail: Cell<*const ThreadData>,
    prev: Cell<*const ThreadData>,
    next: Cell<*const ThreadData>,
}
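
// Illustrative shape of the queue once unlock_slow has processed it (three
// parked threads; the state word's queue bits point at the newest thread H,
// and the oldest thread T is the one that gets woken first):
//
//   state queue bits -> H -> M -> T        (next pointers, head to tail)
//                       H <- M <- T        (prev pointers, filled in during processing)
//                       H.queue_tail == T  (marks the list as fully processed)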

impl ThreadData {
    fn new() -> ThreadData {
        ThreadData {
            parker: ThreadParker::new(),
            queue_tail: Cell::new(ptr::null()),
            prev: Cell::new(ptr::null()),
            next: Cell::new(ptr::null()),
        }
    }
}

// Returns a ThreadData structure for the current thread
unsafe fn get_thread_data(local: &mut Option<ThreadData>) -> &ThreadData {
    // Try to read from thread-local storage, but return None if the TLS has
    // already been destroyed.
    #[cfg(has_localkey_try_with)]
    fn try_get_tls(key: &'static LocalKey<ThreadData>) -> Option<*const ThreadData> {
        key.try_with(|x| x as *const ThreadData).ok()
    }
    #[cfg(not(has_localkey_try_with))]
    fn try_get_tls(key: &'static LocalKey<ThreadData>) -> Option<*const ThreadData> {
        panic::catch_unwind(|| key.with(|x| x as *const ThreadData)).ok()
    }

    // If ThreadData is expensive to construct, then we want to use a cached
    // version in thread-local storage if possible.
    if !cfg!(windows) && !cfg!(all(feature = "nightly", target_os = "linux")) {
        thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
        if let Some(tls) = try_get_tls(&THREAD_DATA) {
            return &*tls;
        }
    }

    // Otherwise just create a ThreadData on the stack
    *local = Some(ThreadData::new());
    local.as_ref().unwrap()
}

const LOCKED_BIT: usize = 1;
const QUEUE_LOCKED_BIT: usize = 2;
const QUEUE_MASK: usize = !3;
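
// Layout of the state word (a descriptive sketch inferred from the constants
// above and from how lock_slow/unlock_slow use them):
//
//   bit 0 (LOCKED_BIT)       - set while the lock is held
//   bit 1 (QUEUE_LOCKED_BIT) - set while a thread is processing the queue
//   bits 2.. (QUEUE_MASK)    - pointer to the head ThreadData of the queue,
//                              or 0 if no threads are parked
//
// Packing the queue head into the upper bits relies on ThreadData being
// aligned to at least 4 bytes, which lock_slow asserts.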

// Word-sized lock that is used to implement the parking_lot API. Since this
// can't use parking_lot, it instead manages its own queue of waiting threads.
pub struct WordLock {
    state: AtomicUsize,
}
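
// A minimal usage sketch (hypothetical caller, not part of this module):
//
//     let lock = WordLock::new();
//     unsafe {
//         lock.lock();
//         // ... critical section ...
//         lock.unlock();
//     }
//
// lock() and unlock() are unsafe: unlock() unconditionally subtracts
// LOCKED_BIT from the state word, so every lock() must be balanced by
// exactly one unlock() on the same WordLock.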

impl WordLock {
    #[inline]
    pub fn new() -> WordLock {
        WordLock {
            state: AtomicUsize::new(0),
        }
    }

    #[inline]
    pub unsafe fn lock(&self) {
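        // Fast path: try to acquire an uncontended lock with a single weak
        // CAS from 0 to LOCKED_BIT. Any failure (contention, an existing
        // queue, or a spurious CAS failure) falls through to the slow path.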
        if self
            .state
            .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.lock_slow();
    }

    #[inline]
    pub unsafe fn unlock(&self) {
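        // Fast path: drop the lock bit. The queue only needs attention if
        // there are parked threads (QUEUE_MASK bits set) and nobody else
        // currently holds the queue lock; a queue-lock holder will notice
        // that the lock is free and wake a thread itself.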
        let state = self.state.fetch_sub(LOCKED_BIT, Ordering::Release);
        if state & QUEUE_LOCKED_BIT != 0 || state & QUEUE_MASK == 0 {
            return;
        }
        self.unlock_slow();
    }

    #[cold]
    #[inline(never)]
    unsafe fn lock_slow(&self) {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Grab the lock if it isn't locked, even if there is a queue on it
            if state & LOCKED_BIT == 0 {
                match self.state.compare_exchange_weak(
                    state,
                    state | LOCKED_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }
                continue;
            }

            // If there is no queue, try spinning a few times
            if state & QUEUE_MASK == 0 && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Get our thread data and prepare it for parking
            let mut thread_data = None;
            let thread_data = get_thread_data(&mut thread_data);
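            // The queue head pointer is stored in the QUEUE_MASK bits of the
            // state word, so the ThreadData address must have its low two
            // bits clear: assert that its alignment is greater than
            // !QUEUE_MASK (i.e. at least 4).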
            assert!(mem::align_of_val(thread_data) > !QUEUE_MASK);
            thread_data.parker.prepare_park();

            // Add our thread to the front of the queue
            let queue_head = (state & QUEUE_MASK) as *const ThreadData;
            if queue_head.is_null() {
                thread_data.queue_tail.set(thread_data);
                thread_data.prev.set(ptr::null());
            } else {
                thread_data.queue_tail.set(ptr::null());
                thread_data.prev.set(ptr::null());
                thread_data.next.set(queue_head);
            }
            if let Err(x) = self.state.compare_exchange_weak(
                state,
                (state & !QUEUE_MASK) | thread_data as *const _ as usize,
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                state = x;
                continue;
            }

            // Sleep until we are woken up by an unlock
            thread_data.parker.park();

            // Loop back and try locking again
            spinwait.reset();
            state = self.state.load(Ordering::Relaxed);
        }
    }

    #[cold]
    #[inline(never)]
    unsafe fn unlock_slow(&self) {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // We just unlocked the WordLock. Just check if there is a thread
            // to wake up. If the queue is locked then another thread is already
            // taking care of waking up a thread.
            if state & QUEUE_LOCKED_BIT != 0 || state & QUEUE_MASK == 0 {
                return;
            }

            // Try to grab the queue lock
            match self.state.compare_exchange_weak(
                state,
                state | QUEUE_LOCKED_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(x) => state = x,
            }
        }

        // Now we have the queue lock and the queue is non-empty
        'outer: loop {
            // First, we need to fill in the prev pointers for any newly added
            // threads. We do this until we reach a node that we previously
            // processed, which has a non-null queue_tail pointer.
            let queue_head = (state & QUEUE_MASK) as *const ThreadData;
            let mut queue_tail;
            let mut current = queue_head;
            loop {
                queue_tail = (*current).queue_tail.get();
                if !queue_tail.is_null() {
                    break;
                }
                let next = (*current).next.get();
                (*next).prev.set(current);
                current = next;
            }

            // Set queue_tail on the queue head to indicate that the whole list
            // has prev pointers set correctly.
            (*queue_head).queue_tail.set(queue_tail);

            // If the WordLock is locked, then there is no point waking up a
            // thread now. Instead we let the next unlocker take care of waking
            // up a thread.
            if state & LOCKED_BIT != 0 {
                match self.state.compare_exchange_weak(
                    state,
                    state & !QUEUE_LOCKED_BIT,
                    Ordering::Release,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }

                // Need an acquire fence before reading the new queue
                fence(Ordering::Acquire);
                continue;
            }

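            // We are about to remove the thread at queue_tail: it is the
            // oldest waiter (newly parked threads are pushed at the head of
            // the queue), so wakeup order is roughly FIFO.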
            // Remove the last thread from the queue and unlock the queue
            let new_tail = (*queue_tail).prev.get();
            if new_tail.is_null() {
                loop {
                    match self.state.compare_exchange_weak(
                        state,
                        state & LOCKED_BIT,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => break,
                        Err(x) => state = x,
                    }

                    // If the compare_exchange failed because a new thread was
                    // added to the queue then we need to re-scan the queue to
                    // find the previous element.
                    if state & QUEUE_MASK == 0 {
                        continue;
                    } else {
                        // Need an acquire fence before reading the new queue
                        fence(Ordering::Acquire);
                        continue 'outer;
                    }
                }
            } else {
                (*queue_head).queue_tail.set(new_tail);
                self.state.fetch_and(!QUEUE_LOCKED_BIT, Ordering::Release);
            }

            // Finally, wake up the thread we removed from the queue. Note that
            // we don't need to worry about any races here since the thread is
            // guaranteed to be sleeping right now and we are the only one who
            // can wake it up.
            (*queue_tail).parker.unpark_lock().unpark();
            break;
        }
    }
}