// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use spinwait::SpinWait;
use std::cell::Cell;
use std::mem;
#[cfg(not(has_localkey_try_with))]
use std::panic;
use std::ptr;
use std::sync::atomic::{fence, AtomicUsize, Ordering};
use std::thread::LocalKey;
use thread_parker::ThreadParker;

struct ThreadData {
    parker: ThreadParker,

    // Linked list of threads in the queue. The queue is split into two parts:
    // the processed part and the unprocessed part. When new nodes are added to
    // the list, they only have the next pointer set, and queue_tail is null.
    //
    // Nodes are processed with the queue lock held, which consists of setting
    // the prev pointer for each node and setting the queue_tail pointer on the
    // first processed node of the list.
    //
    // This setup allows nodes to be added to the queue without a lock, while
    // still allowing O(1) removal of nodes from the processed part of the list.
    // The only cost is the O(n) processing, but this only needs to be done
    // once for each node, and therefore isn't too expensive.
    queue_tail: Cell<*const ThreadData>,
    prev: Cell<*const ThreadData>,
    next: Cell<*const ThreadData>,
}
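
// An illustrative sketch of the queue invariant described above (added for
// clarity, not from the original source). Threads are pushed at the head, so
// with D pushed first and A last the next chain runs A -> B -> C -> D. If the
// last processing pass happened while B was the head (so B.queue_tail = D,
// with C.prev = B and D.prev = C already set), a newly pushed A is still
// unprocessed:
//
//   head                                        tail
//    A --next--> B --next--> C --next--> D
//                B <--prev-- C <--prev-- D
//    A.queue_tail = null     B.queue_tail = D
//
// The next pass with the queue lock held walks from A until it reaches B's
// non-null queue_tail, sets B.prev = A along the way, and finally sets
// A.queue_tail = D.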

impl ThreadData {
    fn new() -> ThreadData {
        ThreadData {
            parker: ThreadParker::new(),
            queue_tail: Cell::new(ptr::null()),
            prev: Cell::new(ptr::null()),
            next: Cell::new(ptr::null()),
        }
    }
}

// Returns a ThreadData structure for the current thread
unsafe fn get_thread_data(local: &mut Option<ThreadData>) -> &ThreadData {
    // Try to read from thread-local storage, but return None if the TLS has
    // already been destroyed.
    #[cfg(has_localkey_try_with)]
    fn try_get_tls(key: &'static LocalKey<ThreadData>) -> Option<*const ThreadData> {
        key.try_with(|x| x as *const ThreadData).ok()
    }
    #[cfg(not(has_localkey_try_with))]
    fn try_get_tls(key: &'static LocalKey<ThreadData>) -> Option<*const ThreadData> {
        panic::catch_unwind(|| key.with(|x| x as *const ThreadData)).ok()
    }

    // If ThreadData is expensive to construct, then we want to use a cached
    // version in thread-local storage if possible.
    if !cfg!(windows) && !cfg!(all(feature = "nightly", target_os = "linux")) {
        thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
        if let Some(tls) = try_get_tls(&THREAD_DATA) {
            return &*tls;
        }
    }

    // Otherwise just create a ThreadData on the stack
    *local = Some(ThreadData::new());
    local.as_ref().unwrap()
}

const LOCKED_BIT: usize = 1;
const QUEUE_LOCKED_BIT: usize = 2;
const QUEUE_MASK: usize = !3;
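
// An illustrative view of the state word (added for clarity, not from the
// original source): the two low bits are flags and the remaining bits hold a
// pointer to the head of the thread queue. ThreadData is at least 4-byte
// aligned (asserted in lock_slow), so the pointer's low bits are always zero:
//
//   +-----------------------------+------------------+------------+
//   | queue head ptr (QUEUE_MASK) | QUEUE_LOCKED_BIT | LOCKED_BIT |
//   +-----------------------------+------------------+------------+
//    bits 2..                       bit 1              bit 0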

// Word-sized lock that is used to implement the parking_lot API. Since this
// can't use parking_lot, it instead manages its own queue of waiting threads.
pub struct WordLock {
    state: AtomicUsize,
}

impl WordLock {
    #[inline]
    pub fn new() -> WordLock {
        WordLock {
            state: AtomicUsize::new(0),
        }
    }

    #[inline]
    pub unsafe fn lock(&self) {
        if self
            .state
            .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.lock_slow();
    }

    #[inline]
    pub unsafe fn unlock(&self) {
        let state = self.state.fetch_sub(LOCKED_BIT, Ordering::Release);
        if state & QUEUE_LOCKED_BIT != 0 || state & QUEUE_MASK == 0 {
            return;
        }
        self.unlock_slow();
    }

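    // Slow path of lock (descriptive summary added for clarity, not from the
    // original source): spin briefly while the queue is empty, then push this
    // thread onto the head of the queue with a Release CAS, park until an
    // unlocker wakes us, and retry from the top.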
    #[cold]
    #[inline(never)]
    unsafe fn lock_slow(&self) {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Grab the lock if it isn't locked, even if there is a queue on it
            if state & LOCKED_BIT == 0 {
                match self.state.compare_exchange_weak(
                    state,
                    state | LOCKED_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }
                continue;
            }

            // If there is no queue, try spinning a few times
            if state & QUEUE_MASK == 0 && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Get our thread data and prepare it for parking
            let mut thread_data = None;
            let thread_data = get_thread_data(&mut thread_data);
            assert!(mem::align_of_val(thread_data) > !QUEUE_MASK);
            thread_data.parker.prepare_park();

            // Add our thread to the front of the queue
            let queue_head = (state & QUEUE_MASK) as *const ThreadData;
            if queue_head.is_null() {
                thread_data.queue_tail.set(thread_data);
                thread_data.prev.set(ptr::null());
            } else {
                thread_data.queue_tail.set(ptr::null());
                thread_data.prev.set(ptr::null());
                thread_data.next.set(queue_head);
            }
            if let Err(x) = self.state.compare_exchange_weak(
                state,
                (state & !QUEUE_MASK) | thread_data as *const _ as usize,
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                state = x;
                continue;
            }

            // Sleep until we are woken up by an unlock
            thread_data.parker.park();

            // Loop back and try locking again
            spinwait.reset();
            state = self.state.load(Ordering::Relaxed);
        }
    }

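    // Slow path of unlock (descriptive summary added for clarity, not from
    // the original source): acquire the queue lock, fix up prev pointers for
    // any newly pushed nodes, then either hand off to the next unlocker (if
    // the lock was re-taken in the meantime) or pop the queue tail and unpark
    // it.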
    #[cold]
    #[inline(never)]
    unsafe fn unlock_slow(&self) {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // We just unlocked the WordLock. Just check if there is a thread
            // to wake up. If the queue is locked then another thread is already
            // taking care of waking up a thread.
            if state & QUEUE_LOCKED_BIT != 0 || state & QUEUE_MASK == 0 {
                return;
            }

            // Try to grab the queue lock
            match self.state.compare_exchange_weak(
                state,
                state | QUEUE_LOCKED_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(x) => state = x,
            }
        }

        // Now we have the queue lock and the queue is non-empty
        'outer: loop {
            // First, we need to fill in the prev pointers for any newly added
            // threads. We do this until we reach a node that we previously
            // processed, which has a non-null queue_tail pointer.
            let queue_head = (state & QUEUE_MASK) as *const ThreadData;
            let mut queue_tail;
            let mut current = queue_head;
            loop {
                queue_tail = (*current).queue_tail.get();
                if !queue_tail.is_null() {
                    break;
                }
                let next = (*current).next.get();
                (*next).prev.set(current);
                current = next;
            }

            // Set queue_tail on the queue head to indicate that the whole list
            // has prev pointers set correctly.
            (*queue_head).queue_tail.set(queue_tail);

            // If the WordLock is locked, then there is no point waking up a
            // thread now. Instead we let the next unlocker take care of waking
            // up a thread.
            if state & LOCKED_BIT != 0 {
                match self.state.compare_exchange_weak(
                    state,
                    state & !QUEUE_LOCKED_BIT,
                    Ordering::Release,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }

                // Need an acquire fence before reading the new queue
                fence(Ordering::Acquire);
                continue;
            }

            // Remove the last thread from the queue and unlock the queue
            let new_tail = (*queue_tail).prev.get();
            if new_tail.is_null() {
                loop {
                    match self.state.compare_exchange_weak(
                        state,
                        state & LOCKED_BIT,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => break,
                        Err(x) => state = x,
                    }

                    // If the compare_exchange failed because a new thread was
                    // added to the queue then we need to re-scan the queue to
                    // find the previous element.
                    if state & QUEUE_MASK == 0 {
                        continue;
                    } else {
                        // Need an acquire fence before reading the new queue
                        fence(Ordering::Acquire);
                        continue 'outer;
                    }
                }
            } else {
                (*queue_head).queue_tail.set(new_tail);
                self.state.fetch_and(!QUEUE_LOCKED_BIT, Ordering::Release);
            }

            // Finally, wake up the thread we removed from the queue. Note that
            // we don't need to worry about any races here since the thread is
            // guaranteed to be sleeping right now and we are the only one who
            // can wake it up.
            (*queue_tail).parker.unpark_lock().unpark();
            break;
        }
    }
}
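
// A minimal smoke test (an illustrative sketch, not part of the original
// file), assuming this module is compiled as part of its crate so that
// WordLock and its ThreadParker/SpinWait dependencies are available.
#[cfg(test)]
mod tests {
    use super::WordLock;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn lock_unlock_across_threads() {
        // WordLock contains only an AtomicUsize, so it is Send + Sync and can
        // be shared between threads through an Arc.
        let lock = Arc::new(WordLock::new());
        let mut handles = Vec::new();
        for _ in 0..4 {
            let lock = lock.clone();
            handles.push(thread::spawn(move || {
                for _ in 0..1000 {
                    // Safety: every lock() is paired with exactly one unlock()
                    // on the same thread, which is the contract these unsafe
                    // methods rely on.
                    unsafe {
                        lock.lock();
                        lock.unlock();
                    }
                }
            }));
        }
        for handle in handles {
            handle.join().unwrap();
        }
    }
}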