// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::spinwait::SpinWait;
use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
use core::{
    cell::Cell,
    mem, ptr,
    sync::atomic::{fence, AtomicUsize, Ordering},
};

struct ThreadData {
    parker: ThreadParker,

    // Linked list of threads in the queue. The queue is split into two parts:
    // the processed part and the unprocessed part. When new nodes are added to
    // the list, they only have the next pointer set, and queue_tail is null.
    //
    // Nodes are processed with the queue lock held, which consists of setting
    // the prev pointer for each node and setting the queue_tail pointer on the
    // first processed node of the list.
    //
    // This setup allows nodes to be added to the queue without a lock, while
    // still allowing O(1) removal of nodes from the processed part of the list.
    // The only cost is the O(n) processing, but this only needs to be done
    // once for each node, and therefore isn't too expensive.
    queue_tail: Cell<*const ThreadData>,
    prev: Cell<*const ThreadData>,
    next: Cell<*const ThreadData>,
}
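
// An illustrative sketch (not part of the implementation): suppose nodes a, b
// and c are pushed in that order. The lock-free pushes only set `next`, so the
// list is c -> b -> a with `prev` unset on b and c (`a.queue_tail` points to
// itself from when it was pushed onto an empty queue). Processing under the
// queue lock walks c -> b -> a, setting `b.prev = c` and `a.prev = b`, and
// finally records `c.queue_tail = a` so later scans never revisit these nodes.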

impl ThreadData {
    #[inline]
    fn new() -> ThreadData {
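        // The lock state stores a pointer to the queue head in the bits
        // covered by QUEUE_MASK, so ThreadData must be aligned to more than
        // !QUEUE_MASK (= 3) bytes, leaving the low two bits of the pointer
        // free for the flag bits.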
        assert!(mem::align_of::<ThreadData>() > !QUEUE_MASK);
        ThreadData {
            parker: ThreadParker::new(),
            queue_tail: Cell::new(ptr::null()),
            prev: Cell::new(ptr::null()),
            next: Cell::new(ptr::null()),
        }
    }
}

// Invokes the given closure with a reference to the current thread's
// `ThreadData`.
#[inline]
fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
    let mut thread_data_ptr = ptr::null();
    // If ThreadData is expensive to construct, then we want to use a cached
    // version in thread-local storage if possible.
    if !ThreadParker::IS_CHEAP_TO_CONSTRUCT {
        thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
        if let Ok(tls_thread_data) = THREAD_DATA.try_with(|x| x as *const ThreadData) {
            thread_data_ptr = tls_thread_data;
        }
    }
    // Otherwise just create a ThreadData on the stack
    let mut thread_data_storage = None;
    if thread_data_ptr.is_null() {
        thread_data_ptr = thread_data_storage.get_or_insert_with(ThreadData::new);
    }

    f(unsafe { &*thread_data_ptr })
}

const LOCKED_BIT: usize = 1;
const QUEUE_LOCKED_BIT: usize = 2;
const QUEUE_MASK: usize = !3;
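
// State word layout (a sketch of the invariant, derived from the constants
// above): bit 0 is LOCKED_BIT, bit 1 is QUEUE_LOCKED_BIT, and the bits covered
// by QUEUE_MASK hold a pointer to the ThreadData at the head of the wait
// queue, or null if the queue is empty. The alignment assertion in
// ThreadData::new() guarantees the pointer's low two bits are zero.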

// Word-sized lock that is used to implement the parking_lot API. Since this
// can't use parking_lot, it instead manages its own queue of waiting threads.
pub struct WordLock {
    state: AtomicUsize,
}

impl WordLock {
    pub const INIT: WordLock = WordLock {
        state: AtomicUsize::new(0),
    };

    #[inline]
    pub fn lock(&self) {
        if self
            .state
            .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.lock_slow();
    }

    /// Must not be called on an already unlocked `WordLock`!
    #[inline]
    pub unsafe fn unlock(&self) {
        let state = self.state.fetch_sub(LOCKED_BIT, Ordering::Release);
        if state.is_queue_locked() || state.queue_head().is_null() {
            return;
        }
        self.unlock_slow();
    }

    #[cold]
    fn lock_slow(&self) {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Grab the lock if it isn't locked, even if there is a queue on it
            if !state.is_locked() {
                match self.state.compare_exchange_weak(
                    state,
                    state | LOCKED_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }
                continue;
            }

            // If there is no queue, try spinning a few times
            if state.queue_head().is_null() && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Get our thread data and prepare it for parking
            state = with_thread_data(|thread_data| {
                // The pthread implementation is still unsafe, so we need to
                // surround `prepare_park` with `unsafe {}`.
                #[allow(unused_unsafe)]
                unsafe {
                    thread_data.parker.prepare_park();
                }

                // Add our thread to the front of the queue
                let queue_head = state.queue_head();
                if queue_head.is_null() {
                    thread_data.queue_tail.set(thread_data);
                    thread_data.prev.set(ptr::null());
                } else {
                    thread_data.queue_tail.set(ptr::null());
                    thread_data.prev.set(ptr::null());
                    thread_data.next.set(queue_head);
                }
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state.with_queue_head(thread_data),
                    Ordering::Release,
                    Ordering::Relaxed,
                ) {
                    return x;
                }

                // Sleep until we are woken up by an unlock.
                // Ignore the unused unsafe, since `park` is only unsafe on a
                // few platforms.
                #[allow(unused_unsafe)]
                unsafe {
                    thread_data.parker.park();
                }

                // Loop back and try locking again
                spinwait.reset();
                self.state.load(Ordering::Relaxed)
            });
        }
    }

    #[cold]
    fn unlock_slow(&self) {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // We just unlocked the WordLock. Check if there is a thread to
            // wake up. If the queue is locked then another thread is already
            // taking care of waking up a thread.
            if state.is_queue_locked() || state.queue_head().is_null() {
                return;
            }

            // Try to grab the queue lock
            match self.state.compare_exchange_weak(
                state,
                state | QUEUE_LOCKED_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(x) => state = x,
            }
        }

        // Now we have the queue lock and the queue is non-empty
        'outer: loop {
            // First, we need to fill in the prev pointers for any newly added
            // threads. We do this until we reach a node that we previously
            // processed, which has a non-null queue_tail pointer.
            let queue_head = state.queue_head();
            let mut queue_tail;
            let mut current = queue_head;
            loop {
                queue_tail = unsafe { (*current).queue_tail.get() };
                if !queue_tail.is_null() {
                    break;
                }
                unsafe {
                    let next = (*current).next.get();
                    (*next).prev.set(current);
                    current = next;
                }
            }

            // Set queue_tail on the queue head to indicate that the whole list
            // has prev pointers set correctly.
            unsafe {
                (*queue_head).queue_tail.set(queue_tail);
            }

            // If the WordLock is locked, then there is no point waking up a
            // thread now. Instead we let the next unlocker take care of waking
            // up a thread.
            if state.is_locked() {
                match self.state.compare_exchange_weak(
                    state,
                    state & !QUEUE_LOCKED_BIT,
                    Ordering::Release,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }

                // Need an acquire fence before reading the new queue
                fence(Ordering::Acquire);
                continue;
            }

            // Remove the last thread from the queue and unlock the queue
            let new_tail = unsafe { (*queue_tail).prev.get() };
            if new_tail.is_null() {
                loop {
                    match self.state.compare_exchange_weak(
                        state,
                        state & LOCKED_BIT,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => break,
                        Err(x) => state = x,
                    }

                    // If the compare_exchange failed because a new thread was
                    // added to the queue then we need to re-scan the queue to
                    // find the previous element.
                    if state.queue_head().is_null() {
                        continue;
                    } else {
                        // Need an acquire fence before reading the new queue
                        fence(Ordering::Acquire);
                        continue 'outer;
                    }
                }
            } else {
                unsafe {
                    (*queue_head).queue_tail.set(new_tail);
                }
                self.state.fetch_and(!QUEUE_LOCKED_BIT, Ordering::Release);
            }

            // Finally, wake up the thread we removed from the queue. Note that
            // we don't need to worry about any races here since the thread is
            // guaranteed to be sleeping right now and we are the only one who
            // can wake it up.
            unsafe {
                (*queue_tail).parker.unpark_lock().unpark();
            }
            break;
        }
    }
}

trait LockState {
    fn is_locked(self) -> bool;
    fn is_queue_locked(self) -> bool;
    fn queue_head(self) -> *const ThreadData;
    fn with_queue_head(self, thread_data: *const ThreadData) -> Self;
}

impl LockState for usize {
    #[inline]
    fn is_locked(self) -> bool {
        self & LOCKED_BIT != 0
    }

    #[inline]
    fn is_queue_locked(self) -> bool {
        self & QUEUE_LOCKED_BIT != 0
    }

    #[inline]
    fn queue_head(self) -> *const ThreadData {
        (self & QUEUE_MASK) as *const ThreadData
    }

    #[inline]
    fn with_queue_head(self, thread_data: *const ThreadData) -> Self {
        (self & !QUEUE_MASK) | thread_data as *const _ as usize
    }
}
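
// A minimal smoke-test sketch (not from the original file) showing the
// intended usage of WordLock: `lock` may be called freely, while `unlock` is
// unsafe and must only be called by a thread that currently holds the lock.
// The `Counter` wrapper and the thread/iteration counts are arbitrary choices
// for illustration.
#[cfg(test)]
mod tests {
    use super::WordLock;
    use std::cell::UnsafeCell;
    use std::sync::Arc;
    use std::thread;

    // A counter guarded by a WordLock. The manual `Sync` impl is sound only
    // because every access to `value` happens with `lock` held.
    struct Counter {
        lock: WordLock,
        value: UnsafeCell<usize>,
    }
    unsafe impl Sync for Counter {}

    #[test]
    fn word_lock_provides_mutual_exclusion() {
        let counter = Arc::new(Counter {
            lock: WordLock::INIT,
            value: UnsafeCell::new(0),
        });
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let counter = Arc::clone(&counter);
                thread::spawn(move || {
                    for _ in 0..1000 {
                        counter.lock.lock();
                        unsafe {
                            // Safety: we hold the lock, so no other thread can
                            // touch `value`, and unlocking a lock we hold
                            // satisfies `unlock`'s contract.
                            *counter.value.get() += 1;
                            counter.lock.unlock();
                        }
                    }
                })
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }
        // Every increment happened under the lock, so none were lost.
        assert_eq!(unsafe { *counter.value.get() }, 4 * 1000);
    }
}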