// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use std::sync::atomic::{fence, AtomicUsize, Ordering};
use std::ptr;
use std::mem;
use std::cell::Cell;
use std::thread::LocalKey;
#[cfg(not(feature = "nightly"))]
use std::panic;
use spinwait::SpinWait;
use thread_parker::ThreadParker;

struct ThreadData {
    parker: ThreadParker,

    // Linked list of threads in the queue. The queue is split into two parts:
    // the processed part and the unprocessed part. When new nodes are added to
    // the list, they only have the next pointer set, and queue_tail is null.
    //
    // Nodes are processed with the queue lock held, which consists of setting
    // the prev pointer for each node and setting the queue_tail pointer on the
    // first processed node of the list.
    //
    // This setup allows nodes to be added to the queue without a lock, while
    // still allowing O(1) removal of nodes from the processed part of the list.
    // The only cost is the O(n) processing, but this only needs to be done
    // once for each node, and therefore isn't too expensive.
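    //
    // Illustration (added for clarity, not part of the original comment):
    // with three parked threads, A being the oldest and C the newest, the
    // lock state points at C and a fully processed queue looks like:
    //
    //   C --next--> B --next--> A      (A.prev = B, B.prev = C)
    //   C.queue_tail = A               (set once the list has been processed)
    //
    // A newly pushed thread D would only have D.next = C until the next
    // holder of the queue lock fills in C.prev = D and sets D.queue_tail = A.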
    queue_tail: Cell<*const ThreadData>,
    prev: Cell<*const ThreadData>,
    next: Cell<*const ThreadData>,
}

impl ThreadData {
    fn new() -> ThreadData {
        ThreadData {
            parker: ThreadParker::new(),
            queue_tail: Cell::new(ptr::null()),
            prev: Cell::new(ptr::null()),
            next: Cell::new(ptr::null()),
        }
    }
}

// Returns a ThreadData structure for the current thread
unsafe fn get_thread_data(local: &mut Option<ThreadData>) -> &ThreadData {
    // Try to read from thread-local storage, but return None if the TLS has
    // already been destroyed.
    #[cfg(feature = "nightly")]
    fn try_get_tls(key: &'static LocalKey<ThreadData>) -> Option<*const ThreadData> {
        key.try_with(|x| x as *const ThreadData).ok()
    }
    #[cfg(not(feature = "nightly"))]
    fn try_get_tls(key: &'static LocalKey<ThreadData>) -> Option<*const ThreadData> {
        panic::catch_unwind(|| key.with(|x| x as *const ThreadData)).ok()
    }

    // If ThreadData is expensive to construct, then we want to use a cached
    // version in thread-local storage if possible.
    if !cfg!(windows) && !cfg!(all(feature = "nightly", target_os = "linux")) {
        thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
        if let Some(tls) = try_get_tls(&THREAD_DATA) {
            return &*tls;
        }
    }

    // Otherwise just create a ThreadData on the stack
    *local = Some(ThreadData::new());
    local.as_ref().unwrap()
}

const LOCKED_BIT: usize = 1;
const QUEUE_LOCKED_BIT: usize = 2;
const QUEUE_MASK: usize = !3;
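
// Layout of the lock state word, as inferred from the code below: bit 0 is
// the lock bit, bit 1 is the queue lock bit, and the remaining bits hold a
// pointer to the head of the thread queue. This relies on ThreadData being
// aligned to at least 4 bytes, which lock_slow asserts before enqueuing.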

// Word-sized lock that is used to implement the parking_lot API. Since this
// can't use parking_lot, it instead manages its own queue of waiting threads.
pub struct WordLock {
    state: AtomicUsize,
}

impl WordLock {
    #[inline]
    pub fn new() -> WordLock {
        WordLock {
            state: AtomicUsize::new(0),
        }
    }

    #[inline]
    pub unsafe fn lock(&self) {
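        // Fast path: transition the state from 0 (unlocked, empty queue) to
        // LOCKED_BIT. A weak compare_exchange is fine here since a spurious
        // failure simply falls through to the slow path.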
        if self.state
            .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.lock_slow();
    }

    #[inline]
    pub unsafe fn unlock(&self) {
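        // Fast path: drop the lock bit. A waiter only needs to be woken if
        // the queue is non-empty and no other thread is already holding the
        // queue lock.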
        let state = self.state.fetch_sub(LOCKED_BIT, Ordering::Release);
        if state & QUEUE_LOCKED_BIT != 0 || state & QUEUE_MASK == 0 {
            return;
        }
        self.unlock_slow();
    }

    #[cold]
    #[inline(never)]
    unsafe fn lock_slow(&self) {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Grab the lock if it isn't locked, even if there is a queue on it
            if state & LOCKED_BIT == 0 {
                match self.state.compare_exchange_weak(
                    state,
                    state | LOCKED_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }
                continue;
            }

            // If there is no queue, try spinning a few times
            if state & QUEUE_MASK == 0 && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Get our thread data and prepare it for parking
            let mut thread_data = None;
            let thread_data = get_thread_data(&mut thread_data);
            assert!(mem::align_of_val(thread_data) > !QUEUE_MASK);
            thread_data.parker.prepare_park();

            // Add our thread to the front of the queue
            let queue_head = (state & QUEUE_MASK) as *const ThreadData;
            if queue_head.is_null() {
                thread_data.queue_tail.set(thread_data);
                thread_data.prev.set(ptr::null());
            } else {
                thread_data.queue_tail.set(ptr::null());
                thread_data.prev.set(ptr::null());
                thread_data.next.set(queue_head);
            }
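            // Publish this thread as the new queue head. Release ordering
            // makes the writes to thread_data above visible to whichever
            // thread later walks the queue while holding the queue lock.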
            if let Err(x) = self.state.compare_exchange_weak(
                state,
                (state & !QUEUE_MASK) | thread_data as *const _ as usize,
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                state = x;
                continue;
            }

            // Sleep until we are woken up by an unlock
            thread_data.parker.park();

            // Loop back and try locking again
            spinwait.reset();
            state = self.state.load(Ordering::Relaxed);
        }
    }

    #[cold]
    #[inline(never)]
    unsafe fn unlock_slow(&self) {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // We just unlocked the WordLock. Just check if there is a thread
            // to wake up. If the queue is locked then another thread is already
            // taking care of waking up a thread.
            if state & QUEUE_LOCKED_BIT != 0 || state & QUEUE_MASK == 0 {
                return;
            }

            // Try to grab the queue lock
            match self.state.compare_exchange_weak(
                state,
                state | QUEUE_LOCKED_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(x) => state = x,
            }
        }

        // Now we have the queue lock and the queue is non-empty
        'outer: loop {
            // First, we need to fill in the prev pointers for any newly added
            // threads. We do this until we reach a node that we previously
            // processed, which has a non-null queue_tail pointer.
            let queue_head = (state & QUEUE_MASK) as *const ThreadData;
            let mut queue_tail;
            let mut current = queue_head;
            loop {
                queue_tail = (*current).queue_tail.get();
                if !queue_tail.is_null() {
                    break;
                }
                let next = (*current).next.get();
                (*next).prev.set(current);
                current = next;
            }

            // Set queue_tail on the queue head to indicate that the whole list
            // has prev pointers set correctly.
            (*queue_head).queue_tail.set(queue_tail);

            // If the WordLock is locked, then there is no point waking up a
            // thread now. Instead we let the next unlocker take care of waking
            // up a thread.
            if state & LOCKED_BIT != 0 {
                match self.state.compare_exchange_weak(
                    state,
                    state & !QUEUE_LOCKED_BIT,
                    Ordering::Release,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }

                // Need an acquire fence before reading the new queue
                fence(Ordering::Acquire);
                continue;
            }

            // Remove the last thread from the queue and unlock the queue
            let new_tail = (*queue_tail).prev.get();
            if new_tail.is_null() {
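                // The queue contained a single thread: clear the queue
                // pointer and the queue lock in one step, preserving only the
                // lock bit (which another thread may have set again in the
                // meantime).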
                loop {
                    match self.state.compare_exchange_weak(
                        state,
                        state & LOCKED_BIT,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => break,
                        Err(x) => state = x,
                    }

                    // If the compare_exchange failed because a new thread was
                    // added to the queue then we need to re-scan the queue to
                    // find the previous element.
                    if state & QUEUE_MASK == 0 {
                        continue;
                    } else {
                        // Need an acquire fence before reading the new queue
                        fence(Ordering::Acquire);
                        continue 'outer;
                    }
                }
            } else {
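                // Other threads remain queued: unlink the tail by caching the
                // new tail on the queue head, then release the queue lock.
                // The head pointer stored in the state is unchanged, so a
                // plain fetch_and is sufficient here.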
                (*queue_head).queue_tail.set(new_tail);
                self.state.fetch_and(!QUEUE_LOCKED_BIT, Ordering::Release);
            }

            // Finally, wake up the thread we removed from the queue. Note that
            // we don't need to worry about any races here since the thread is
            // guaranteed to be sleeping right now and we are the only one who
            // can wake it up.
            (*queue_tail).parker.unpark_lock().unpark();
            break;
        }
    }
}
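
// A minimal usage sketch (added for illustration, not part of the original
// file). Per the comment on WordLock above, this lock is a building block for
// the parking_lot API itself; lock() and unlock() are unsafe, so the caller
// is responsible for pairing them correctly:
//
//     let lock = WordLock::new();
//     unsafe {
//         lock.lock();
//         // ... critical section protected by the word lock ...
//         lock.unlock();
//     }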