// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use std::sync::atomic::{fence, AtomicUsize, Ordering};
use std::ptr;
use std::mem;
use std::cell::Cell;
use std::thread::LocalKey;
#[cfg(not(feature = "nightly"))]
use std::panic;
use spinwait::SpinWait;
use thread_parker::ThreadParker;

struct ThreadData {
    parker: ThreadParker,

    // Linked list of threads in the queue. The queue is split into two parts:
    // the processed part and the unprocessed part. When new nodes are added to
    // the list, they only have the next pointer set, and queue_tail is null.
    //
    // Nodes are processed with the queue lock held, which consists of setting
    // the prev pointer for each node and setting the queue_tail pointer on the
    // first processed node of the list.
    //
    // This setup allows nodes to be added to the queue without a lock, while
    // still allowing O(1) removal of nodes from the processed part of the list.
    // The only cost is the O(n) processing, but this only needs to be done
    // once for each node, and therefore isn't too expensive.
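    //
    // Illustrative sketch (not from the original comments): suppose A was
    // pushed first, then B, and a queue pass has already run; pushing C and
    // D afterwards gives
    //
    //   head -> D -> C -> B -> A        (next pointers)
    //
    // where B.queue_tail == A and prev is set on B and A, while D and C
    // still have a null queue_tail, so the next pass only walks D and C.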
    queue_tail: Cell<*const ThreadData>,
    prev: Cell<*const ThreadData>,
    next: Cell<*const ThreadData>,
}

impl ThreadData {
    fn new() -> ThreadData {
        ThreadData {
            parker: ThreadParker::new(),
            queue_tail: Cell::new(ptr::null()),
            prev: Cell::new(ptr::null()),
            next: Cell::new(ptr::null()),
        }
    }
}

// Returns a ThreadData structure for the current thread
unsafe fn get_thread_data(local: &mut Option<ThreadData>) -> &ThreadData {
    // Try to read from thread-local storage, but return None if the TLS has
    // already been destroyed.
    #[cfg(feature = "nightly")]
    fn try_get_tls(key: &'static LocalKey<ThreadData>) -> Option<*const ThreadData> {
        key.try_with(|x| x as *const ThreadData).ok()
    }
    #[cfg(not(feature = "nightly"))]
    fn try_get_tls(key: &'static LocalKey<ThreadData>) -> Option<*const ThreadData> {
        panic::catch_unwind(|| key.with(|x| x as *const ThreadData)).ok()
    }

    // If ThreadData is expensive to construct, then we want to use a cached
    // version in thread-local storage if possible.
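    // (On windows, and on linux with the nightly feature, the ThreadParker
    // is presumably cheap enough to construct that a fresh stack-allocated
    // ThreadData beats a TLS lookup, which is why those targets skip the
    // cache below.)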
    if !cfg!(windows) && !cfg!(all(feature = "nightly", target_os = "linux")) {
        thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
        if let Some(tls) = try_get_tls(&THREAD_DATA) {
            return &*tls;
        }
    }

    // Otherwise just create a ThreadData on the stack
    *local = Some(ThreadData::new());
    local.as_ref().unwrap()
}

const LOCKED_BIT: usize = 1;
const QUEUE_LOCKED_BIT: usize = 2;
const QUEUE_MASK: usize = !3;
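// State word layout (implied by the constants above and the alignment
// assert in lock_slow):
//   bit 0   - LOCKED_BIT: the lock itself
//   bit 1   - QUEUE_LOCKED_BIT: protects the wait queue from concurrent wakers
//   bits 2+ - QUEUE_MASK: pointer to the head ThreadData of the wait queue,
//             valid because ThreadData's alignment is at least 4, which
//             leaves the low two bits free for the flags.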

// Word-sized lock that is used to implement the parking_lot API. Since this
// can't use parking_lot, it instead manages its own queue of waiting threads.
pub struct WordLock {
    state: AtomicUsize,
}

impl WordLock {
    #[inline]
    pub fn new() -> WordLock {
        WordLock {
            state: AtomicUsize::new(0),
        }
    }

    #[inline]
    pub unsafe fn lock(&self) {
        if self.state
            .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.lock_slow();
    }

    #[inline]
    pub unsafe fn unlock(&self) {
        let state = self.state.fetch_sub(LOCKED_BIT, Ordering::Release);
        if state & QUEUE_LOCKED_BIT != 0 || state & QUEUE_MASK == 0 {
            return;
        }
        self.unlock_slow();
    }

    #[cold]
    #[inline(never)]
    unsafe fn lock_slow(&self) {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Grab the lock if it isn't locked, even if there is a queue on it
            if state & LOCKED_BIT == 0 {
                match self.state.compare_exchange_weak(
                    state,
                    state | LOCKED_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }
                continue;
            }

            // If there is no queue, try spinning a few times
            if state & QUEUE_MASK == 0 && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Get our thread data and prepare it for parking
            let mut thread_data = None;
            let thread_data = get_thread_data(&mut thread_data);
            assert!(mem::align_of_val(thread_data) > !QUEUE_MASK);
            thread_data.parker.prepare_park();

            // Add our thread to the front of the queue
            let queue_head = (state & QUEUE_MASK) as *const ThreadData;
            if queue_head.is_null() {
                thread_data.queue_tail.set(thread_data);
                thread_data.prev.set(ptr::null());
            } else {
                thread_data.queue_tail.set(ptr::null());
                thread_data.prev.set(ptr::null());
                thread_data.next.set(queue_head);
            }
            if let Err(x) = self.state.compare_exchange_weak(
                state,
                (state & !QUEUE_MASK) | thread_data as *const _ as usize,
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                state = x;
                continue;
            }

            // Sleep until we are woken up by an unlock
            thread_data.parker.park();

            // Loop back and try locking again
            spinwait.reset();
            state = self.state.load(Ordering::Relaxed);
        }
    }

    #[cold]
    #[inline(never)]
    unsafe fn unlock_slow(&self) {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // We just unlocked the WordLock. Just check if there is a thread
            // to wake up. If the queue is locked then another thread is already
            // taking care of waking up a thread.
            if state & QUEUE_LOCKED_BIT != 0 || state & QUEUE_MASK == 0 {
                return;
            }

            // Try to grab the queue lock
            match self.state.compare_exchange_weak(
                state,
                state | QUEUE_LOCKED_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(x) => state = x,
            }
        }

        // Now we have the queue lock and the queue is non-empty
        'outer: loop {
            // First, we need to fill in the prev pointers for any newly added
            // threads. We do this until we reach a node that we previously
            // processed, which has a non-null queue_tail pointer.
            let queue_head = (state & QUEUE_MASK) as *const ThreadData;
            let mut queue_tail;
            let mut current = queue_head;
            loop {
                queue_tail = (*current).queue_tail.get();
                if !queue_tail.is_null() {
                    break;
                }
                let next = (*current).next.get();
                (*next).prev.set(current);
                current = next;
            }
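
            // The scan above always terminates: the first node pushed onto an
            // empty queue sets its own queue_tail (see lock_slow), and each
            // pass records queue_tail on whatever node was the head at the
            // time, so a processed node is always reachable via next pointers.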

            // Set queue_tail on the queue head to indicate that the whole list
            // has prev pointers set correctly.
            (*queue_head).queue_tail.set(queue_tail);

            // If the WordLock is locked, then there is no point waking up a
            // thread now. Instead we let the next unlocker take care of waking
            // up a thread.
            if state & LOCKED_BIT != 0 {
                match self.state.compare_exchange_weak(
                    state,
                    state & !QUEUE_LOCKED_BIT,
                    Ordering::Release,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }

                // Need an acquire fence before reading the new queue
                fence(Ordering::Acquire);
                continue;
            }

            // Remove the last thread from the queue and unlock the queue
            let new_tail = (*queue_tail).prev.get();
            if new_tail.is_null() {
                loop {
                    match self.state.compare_exchange_weak(
                        state,
                        state & LOCKED_BIT,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => break,
                        Err(x) => state = x,
                    }

                    // If the compare_exchange failed because a new thread was
                    // added to the queue then we need to re-scan the queue to
                    // find the previous element.
                    if state & QUEUE_MASK == 0 {
                        continue;
                    } else {
                        // Need an acquire fence before reading the new queue
                        fence(Ordering::Acquire);
                        continue 'outer;
                    }
                }
            } else {
                (*queue_head).queue_tail.set(new_tail);
                self.state.fetch_and(!QUEUE_LOCKED_BIT, Ordering::Release);
            }

            // Finally, wake up the thread we removed from the queue. Note that
            // we don't need to worry about any races here since the thread is
            // guaranteed to be sleeping right now and we are the only one who
            // can wake it up.
            (*queue_tail).parker.unpark_lock().unpark();
            break;
        }
    }
}
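
// A minimal smoke test, added here as a usage sketch rather than taken from
// the original file: it exercises the lock_slow/unlock_slow paths by
// hammering the lock from several threads. The calls are unsafe because the
// caller must keep each lock paired with exactly one unlock.
#[cfg(test)]
mod tests {
    use super::WordLock;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn contended_lock_unlock() {
        let lock = Arc::new(WordLock::new());
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let lock = lock.clone();
                thread::spawn(move || {
                    for _ in 0..10_000 {
                        unsafe {
                            lock.lock();
                            lock.unlock();
                        }
                    }
                })
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }
    }
}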