1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
2 // Licensed under the MIT License:
3 //
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
5 // of this software and associated documentation files (the "Software"), to deal
6 // in the Software without restriction, including without limitation the rights
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 // copies of the Software, and to permit persons to whom the Software is
9 // furnished to do so, subject to the following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included in
12 // all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 // THE SOFTWARE.
21
22 #if _WIN32 || __CYGWIN__
23 #define WIN32_LEAN_AND_MEAN 1 // lolz
24 #define WINVER 0x0600
25 #define _WIN32_WINNT 0x0600
26 #endif
27
28 #include "mutex.h"
29 #include "debug.h"
30
31 #if !_WIN32 && !__CYGWIN__
32 #include <time.h>
33 #include <errno.h>
34 #endif
35
36 #if KJ_USE_FUTEX
37 #include <unistd.h>
38 #include <sys/syscall.h>
39 #include <linux/futex.h>
40 #include <limits.h>
41
42 #ifndef SYS_futex
43 // Missing on Android/Bionic.
44 #define SYS_futex __NR_futex
45 #endif
46
47 #ifndef FUTEX_WAIT_PRIVATE
48 // Missing on Android/Bionic.
49 #define FUTEX_WAIT_PRIVATE FUTEX_WAIT
50 #define FUTEX_WAKE_PRIVATE FUTEX_WAKE
51 #endif
52
53 #elif _WIN32 || __CYGWIN__
54 #include <windows.h>
55 #endif
56
57 namespace kj {
58 namespace _ { // private
59
addWaiter(Waiter & waiter)60 inline void Mutex::addWaiter(Waiter& waiter) {
61 #ifdef KJ_DEBUG
62 assertLockedByCaller(EXCLUSIVE);
63 #endif
64 *waitersTail = waiter;
65 waitersTail = &waiter.next;
66 }
removeWaiter(Waiter & waiter)67 inline void Mutex::removeWaiter(Waiter& waiter) {
68 #ifdef KJ_DEBUG
69 assertLockedByCaller(EXCLUSIVE);
70 #endif
71 *waiter.prev = waiter.next;
72 KJ_IF_MAYBE(next, waiter.next) {
73 next->prev = waiter.prev;
74 } else {
75 KJ_DASSERT(waitersTail == &waiter.next);
76 waitersTail = waiter.prev;
77 }
78 }
79
checkPredicate(Waiter & waiter)80 bool Mutex::checkPredicate(Waiter& waiter) {
81 // Run the predicate from a thread other than the waiting thread, returning true if it's time to
82 // signal the waiting thread. This is not only when the predicate passes, but also when it
83 // throws, in which case we want to propagate the exception to the waiting thread.
84
85 if (waiter.exception != nullptr) return true; // don't run again after an exception
86
87 bool result = false;
88 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
89 result = waiter.predicate.check();
90 })) {
91 // Exception thown.
92 result = true;
93 waiter.exception = kj::heap(kj::mv(*exception));
94 };
95 return result;
96 }
97
98 #if !_WIN32 && !__CYGWIN__
99 namespace {
100
toTimePoint(struct timespec ts)101 TimePoint toTimePoint(struct timespec ts) {
102 return kj::origin<TimePoint>() + ts.tv_sec * kj::SECONDS + ts.tv_nsec * kj::NANOSECONDS;
103 }
now()104 TimePoint now() {
105 struct timespec now;
106 KJ_SYSCALL(clock_gettime(CLOCK_MONOTONIC, &now));
107 return toTimePoint(now);
108 }
toRelativeTimespec(Duration timeout)109 struct timespec toRelativeTimespec(Duration timeout) {
110 struct timespec ts;
111 ts.tv_sec = timeout / kj::SECONDS;
112 ts.tv_nsec = timeout % kj::SECONDS / kj::NANOSECONDS;
113 return ts;
114 }
toAbsoluteTimespec(TimePoint time)115 struct timespec toAbsoluteTimespec(TimePoint time) {
116 return toRelativeTimespec(time - kj::origin<TimePoint>());
117 }
118
119 } // namespace
120 #endif
121
122 #if KJ_USE_FUTEX
123 // =======================================================================================
124 // Futex-based implementation (Linux-only)
125
Mutex()126 Mutex::Mutex(): futex(0) {}
~Mutex()127 Mutex::~Mutex() {
128 // This will crash anyway, might as well crash with a nice error message.
129 KJ_ASSERT(futex == 0, "Mutex destroyed while locked.") { break; }
130 }
131
lock(Exclusivity exclusivity)132 void Mutex::lock(Exclusivity exclusivity) {
133 switch (exclusivity) {
134 case EXCLUSIVE:
135 for (;;) {
136 uint state = 0;
137 if (KJ_LIKELY(__atomic_compare_exchange_n(&futex, &state, EXCLUSIVE_HELD, false,
138 __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) {
139 // Acquired.
140 break;
141 }
142
143 // The mutex is contended. Set the exclusive-requested bit and wait.
144 if ((state & EXCLUSIVE_REQUESTED) == 0) {
145 if (!__atomic_compare_exchange_n(&futex, &state, state | EXCLUSIVE_REQUESTED, false,
146 __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
147 // Oops, the state changed before we could set the request bit. Start over.
148 continue;
149 }
150
151 state |= EXCLUSIVE_REQUESTED;
152 }
153
154 syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, state, nullptr, nullptr, 0);
155 }
156 break;
157 case SHARED: {
158 uint state = __atomic_add_fetch(&futex, 1, __ATOMIC_ACQUIRE);
159 for (;;) {
160 if (KJ_LIKELY((state & EXCLUSIVE_HELD) == 0)) {
161 // Acquired.
162 break;
163 }
164
165 // The mutex is exclusively locked by another thread. Since we incremented the counter
166 // already, we just have to wait for it to be unlocked.
167 syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, state, nullptr, nullptr, 0);
168 state = __atomic_load_n(&futex, __ATOMIC_ACQUIRE);
169 }
170 break;
171 }
172 }
173 }
174
unlock(Exclusivity exclusivity,Waiter * waiterToSkip)175 void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
176 switch (exclusivity) {
177 case EXCLUSIVE: {
178 KJ_DASSERT(futex & EXCLUSIVE_HELD, "Unlocked a mutex that wasn't locked.");
179
180 // First check if there are any conditional waiters. Note we only do this when unlocking an
181 // exclusive lock since under a shared lock the state couldn't have changed.
182 auto nextWaiter = waitersHead;
183 for (;;) {
184 KJ_IF_MAYBE(waiter, nextWaiter) {
185 nextWaiter = waiter->next;
186
187 if (waiter != waiterToSkip && checkPredicate(*waiter)) {
188 // This waiter's predicate now evaluates true, so wake it up.
189 if (waiter->hasTimeout) {
190 // In this case we need to be careful to make sure the target thread isn't already
191 // processing a timeout, so we need to do an atomic CAS rather than just a store.
192 uint expected = 0;
193 if (__atomic_compare_exchange_n(&waiter->futex, &expected, 1, false,
194 __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
195 // Good, we set it to 1, transferring ownership of the mutex. Continue on below.
196 } else {
197 // Looks like the thread already timed out and set its own futex to 1. In that
198 // case it is going to try to lock the mutex itself, so we should NOT attempt an
199 // ownership transfer as this will deadlock.
200 //
201 // We have two options here: We can continue along the waiter list looking for
202 // another waiter that's ready to be signaled, or we could drop out of the list
203 // immediately since we know that another thread is already waiting for the lock
204 // and will re-evaluate the waiter queue itself when it is done. It feels cleaner
205 // to me to continue.
206 continue;
207 }
208 } else {
209 __atomic_store_n(&waiter->futex, 1, __ATOMIC_RELEASE);
210 }
211 syscall(SYS_futex, &waiter->futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
212
213 // We transferred ownership of the lock to this waiter, so we're done now.
214 return;
215 }
216 } else {
217 // No more waiters.
218 break;
219 }
220 }
221
222 // Didn't wake any waiters, so wake normally.
223 uint oldState = __atomic_fetch_and(
224 &futex, ~(EXCLUSIVE_HELD | EXCLUSIVE_REQUESTED), __ATOMIC_RELEASE);
225
226 if (KJ_UNLIKELY(oldState & ~EXCLUSIVE_HELD)) {
227 // Other threads are waiting. If there are any shared waiters, they now collectively hold
228 // the lock, and we must wake them up. If there are any exclusive waiters, we must wake
229 // them up even if readers are waiting so that at the very least they may re-establish the
230 // EXCLUSIVE_REQUESTED bit that we just removed.
231 syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
232 }
233 break;
234 }
235
236 case SHARED: {
237 KJ_DASSERT(futex & SHARED_COUNT_MASK, "Unshared a mutex that wasn't shared.");
238 uint state = __atomic_sub_fetch(&futex, 1, __ATOMIC_RELEASE);
239
240 // The only case where anyone is waiting is if EXCLUSIVE_REQUESTED is set, and the only time
241 // it makes sense to wake up that waiter is if the shared count has reached zero.
242 if (KJ_UNLIKELY(state == EXCLUSIVE_REQUESTED)) {
243 if (__atomic_compare_exchange_n(
244 &futex, &state, 0, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
245 // Wake all exclusive waiters. We have to wake all of them because one of them will
246 // grab the lock while the others will re-establish the exclusive-requested bit.
247 syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
248 }
249 }
250 break;
251 }
252 }
253 }
254
assertLockedByCaller(Exclusivity exclusivity)255 void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
256 switch (exclusivity) {
257 case EXCLUSIVE:
258 KJ_ASSERT(futex & EXCLUSIVE_HELD,
259 "Tried to call getAlreadyLocked*() but lock is not held.");
260 break;
261 case SHARED:
262 KJ_ASSERT(futex & SHARED_COUNT_MASK,
263 "Tried to call getAlreadyLocked*() but lock is not held.");
264 break;
265 }
266 }
267
wait(Predicate & predicate,Maybe<Duration> timeout)268 void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) {
269 // Add waiter to list.
270 Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0, timeout != nullptr };
271 addWaiter(waiter);
272
273 // To guarantee that we've re-locked the mutex before scope exit, keep track of whether it is
274 // currently.
275 bool currentlyLocked = true;
276 KJ_DEFER({
277 if (!currentlyLocked) lock(EXCLUSIVE);
278 removeWaiter(waiter);
279 });
280
281 if (!predicate.check()) {
282 unlock(EXCLUSIVE, &waiter);
283 currentlyLocked = false;
284
285 struct timespec ts;
286 struct timespec* tsp = nullptr;
287 KJ_IF_MAYBE(t, timeout) {
288 ts = toAbsoluteTimespec(now() + *t);
289 tsp = &ts;
290 }
291
292 // Wait for someone to set our futex to 1.
293 for (;;) {
294 // Note we use FUTEX_WAIT_BITSET_PRIVATE + FUTEX_BITSET_MATCH_ANY to get the same effect as
295 // FUTEX_WAIT_PRIVATE except that the timeout is specified as an absolute time based on
296 // CLOCK_MONOTONIC. Otherwise, FUTEX_WAIT_PRIVATE interprets it as a relative time, forcing
297 // us to recompute the time after every iteration.
298 KJ_SYSCALL_HANDLE_ERRORS(syscall(SYS_futex,
299 &waiter.futex, FUTEX_WAIT_BITSET_PRIVATE, 0, tsp, nullptr, FUTEX_BITSET_MATCH_ANY)) {
300 case EAGAIN:
301 // Indicates that the futex was already non-zero by the time the kernal looked at it.
302 // Not an error.
303 break;
304 case ETIMEDOUT: {
305 // Wait timed out. This leaves us in a bit of a pickle: Ownership of the mutex was not
306 // transferred to us from another thread. So, we need to lock it ourselves. But, another
307 // thread might be in the process of signaling us and transferring ownership. So, we
308 // first must atomically take control of our destiny.
309 KJ_ASSERT(timeout != nullptr);
310 uint expected = 0;
311 if (__atomic_compare_exchange_n(&waiter.futex, &expected, 1, false,
312 __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) {
313 // OK, we set our own futex to 1. That means no other thread will, and so we won't be
314 // receiving a mutex ownership transfer. We have to lock the mutex ourselves.
315 lock(EXCLUSIVE);
316 currentlyLocked = true;
317 return;
318 } else {
319 // Oh, someone else actually did signal us, apparently. Let's move on as if the futex
320 // call told us so.
321 break;
322 }
323 }
324 default:
325 KJ_FAIL_SYSCALL("futex(FUTEX_WAIT_PRIVATE)", error);
326 }
327
328 if (__atomic_load_n(&waiter.futex, __ATOMIC_ACQUIRE)) {
329 // We received a lock ownership transfer from another thread.
330 currentlyLocked = true;
331
332 // The other thread checked the predicate before the transfer.
333 #ifdef KJ_DEBUG
334 assertLockedByCaller(EXCLUSIVE);
335 #endif
336
337 KJ_IF_MAYBE(exception, waiter.exception) {
338 // The predicate threw an exception, apparently. Propagate it.
339 // TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
340 // then want MutexGuarded::when() to skip calling the callback, but then what should it
341 // return, since it normally returns the callback's result? Or maybe people who disable
342 // exceptions just really should not write predicates that can throw.
343 kj::throwFatalException(kj::mv(**exception));
344 }
345
346 return;
347 }
348 }
349 }
350 }
351
induceSpuriousWakeupForTest()352 void Mutex::induceSpuriousWakeupForTest() {
353 auto nextWaiter = waitersHead;
354 for (;;) {
355 KJ_IF_MAYBE(waiter, nextWaiter) {
356 nextWaiter = waiter->next;
357 syscall(SYS_futex, &waiter->futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
358 } else {
359 // No more waiters.
360 break;
361 }
362 }
363 }
364
runOnce(Initializer & init)365 void Once::runOnce(Initializer& init) {
366 startOver:
367 uint state = UNINITIALIZED;
368 if (__atomic_compare_exchange_n(&futex, &state, INITIALIZING, false,
369 __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
370 // It's our job to initialize!
371 {
372 KJ_ON_SCOPE_FAILURE({
373 // An exception was thrown by the initializer. We have to revert.
374 if (__atomic_exchange_n(&futex, UNINITIALIZED, __ATOMIC_RELEASE) ==
375 INITIALIZING_WITH_WAITERS) {
376 // Someone was waiting for us to finish.
377 syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
378 }
379 });
380
381 init.run();
382 }
383 if (__atomic_exchange_n(&futex, INITIALIZED, __ATOMIC_RELEASE) ==
384 INITIALIZING_WITH_WAITERS) {
385 // Someone was waiting for us to finish.
386 syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
387 }
388 } else {
389 for (;;) {
390 if (state == INITIALIZED) {
391 break;
392 } else if (state == INITIALIZING) {
393 // Initialization is taking place in another thread. Indicate that we're waiting.
394 if (!__atomic_compare_exchange_n(&futex, &state, INITIALIZING_WITH_WAITERS, true,
395 __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) {
396 // State changed, retry.
397 continue;
398 }
399 } else {
400 KJ_DASSERT(state == INITIALIZING_WITH_WAITERS);
401 }
402
403 // Wait for initialization.
404 syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, INITIALIZING_WITH_WAITERS,
405 nullptr, nullptr, 0);
406 state = __atomic_load_n(&futex, __ATOMIC_ACQUIRE);
407
408 if (state == UNINITIALIZED) {
409 // Oh hey, apparently whoever was trying to initialize gave up. Let's take it from the
410 // top.
411 goto startOver;
412 }
413 }
414 }
415 }
416
reset()417 void Once::reset() {
418 uint state = INITIALIZED;
419 if (!__atomic_compare_exchange_n(&futex, &state, UNINITIALIZED,
420 false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
421 KJ_FAIL_REQUIRE("reset() called while not initialized.");
422 }
423 }
424
425 #elif _WIN32 || __CYGWIN__
426 // =======================================================================================
427 // Win32 implementation
428
429 #define coercedSrwLock (*reinterpret_cast<SRWLOCK*>(&srwLock))
430 #define coercedInitOnce (*reinterpret_cast<INIT_ONCE*>(&initOnce))
431 #define coercedCondvar(var) (*reinterpret_cast<CONDITION_VARIABLE*>(&var))
432
Mutex()433 Mutex::Mutex() {
434 static_assert(sizeof(SRWLOCK) == sizeof(srwLock), "SRWLOCK is not a pointer?");
435 InitializeSRWLock(&coercedSrwLock);
436 }
~Mutex()437 Mutex::~Mutex() {}
438
lock(Exclusivity exclusivity)439 void Mutex::lock(Exclusivity exclusivity) {
440 switch (exclusivity) {
441 case EXCLUSIVE:
442 AcquireSRWLockExclusive(&coercedSrwLock);
443 break;
444 case SHARED:
445 AcquireSRWLockShared(&coercedSrwLock);
446 break;
447 }
448 }
449
wakeReadyWaiter(Waiter * waiterToSkip)450 void Mutex::wakeReadyWaiter(Waiter* waiterToSkip) {
451 // Look for a waiter whose predicate is now evaluating true, and wake it. We wake no more than
452 // one waiter because only one waiter could get the lock anyway, and once it releases that lock
453 // it will awake the next waiter if necessary.
454
455 auto nextWaiter = waitersHead;
456 for (;;) {
457 KJ_IF_MAYBE(waiter, nextWaiter) {
458 nextWaiter = waiter->next;
459
460 if (waiter != waiterToSkip && checkPredicate(*waiter)) {
461 // This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
462 // use Wake vs. WakeAll here since there's always only one thread waiting.
463 WakeConditionVariable(&coercedCondvar(waiter->condvar));
464
465 // We only need to wake one waiter. Note that unlike the futex-based implementation, we
466 // cannot "transfer ownership" of the lock to the waiter, therefore we cannot guarantee
467 // that the condition is still true when that waiter finally awakes. However, if the
468 // condition is no longer true at that point, the waiter will re-check all other
469 // waiters' conditions and possibly wake up any other waiter who is now ready, hence we
470 // still only need to wake one waiter here.
471 return;
472 }
473 } else {
474 // No more waiters.
475 break;
476 }
477 }
478 }
479
unlock(Exclusivity exclusivity,Waiter * waiterToSkip)480 void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
481 switch (exclusivity) {
482 case EXCLUSIVE: {
483 KJ_DEFER(ReleaseSRWLockExclusive(&coercedSrwLock));
484
485 // Check if there are any conditional waiters. Note we only do this when unlocking an
486 // exclusive lock since under a shared lock the state couldn't have changed.
487 wakeReadyWaiter(waiterToSkip);
488 break;
489 }
490
491 case SHARED:
492 ReleaseSRWLockShared(&coercedSrwLock);
493 break;
494 }
495 }
496
assertLockedByCaller(Exclusivity exclusivity)497 void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
498 // We could use TryAcquireSRWLock*() here like we do with the pthread version. However, as of
499 // this writing, my version of Wine (1.6.2) doesn't implement these functions and will abort if
500 // they are called. Since we were only going to use them as a hacky way to check if the lock is
501 // held for debug purposes anyway, we just don't bother.
502 }
503
wait(Predicate & predicate,Maybe<Duration> timeout)504 void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) {
505 // Add waiter to list.
506 Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0 };
507 static_assert(sizeof(waiter.condvar) == sizeof(CONDITION_VARIABLE),
508 "CONDITION_VARIABLE is not a pointer?");
509 InitializeConditionVariable(&coercedCondvar(waiter.condvar));
510
511 addWaiter(waiter);
512 KJ_DEFER(removeWaiter(waiter));
513
514 DWORD sleepMs;
515
516 // Only initialized if `timeout` is non-null.
517 const MonotonicClock* clock = nullptr;
518 kj::Maybe<kj::TimePoint> endTime;
519
520 KJ_IF_MAYBE(t, timeout) {
521 // Windows sleeps are inaccurate -- they can be longer *or shorter* than the requested amount.
522 // For many use cases of our API, a too-short sleep would be unacceptable. Experimentally, it
523 // seems like sleeps can be up to half a millisecond short, so we'll add half a millisecond
524 // (and then we round up, below).
525 *t += 500 * kj::MICROSECONDS;
526
527 // Compute initial sleep time.
528 sleepMs = *t / kj::MILLISECONDS;
529 if (*t % kj::MILLISECONDS > 0 * kj::SECONDS) {
530 // We guarantee we won't wake up too early.
531 ++sleepMs;
532 }
533
534 clock = &systemPreciseMonotonicClock();
535 endTime = clock->now() + *t;
536 } else {
537 sleepMs = INFINITE;
538 }
539
540 while (!predicate.check()) {
541 // SleepConditionVariableSRW() will temporarily release the lock, so we need to signal other
542 // waiters that are now ready.
543 wakeReadyWaiter(&waiter);
544
545 if (SleepConditionVariableSRW(&coercedCondvar(waiter.condvar), &coercedSrwLock, sleepMs, 0)) {
546 // Normal result. Continue loop to check predicate.
547 } else {
548 DWORD error = GetLastError();
549 if (error == ERROR_TIMEOUT) {
550 // Windows may have woken us up too early, so don't return yet. Instead, proceed through the
551 // loop and rely on our sleep time recalculation to detect if we timed out.
552 } else {
553 KJ_FAIL_WIN32("SleepConditionVariableSRW()", error);
554 }
555 }
556
557 KJ_IF_MAYBE(exception, waiter.exception) {
558 // The predicate threw an exception, apparently. Propagate it.
559 // TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
560 // then want MutexGuarded::when() to skip calling the callback, but then what should it
561 // return, since it normally returns the callback's result? Or maybe people who disable
562 // exceptions just really should not write predicates that can throw.
563 kj::throwFatalException(kj::mv(**exception));
564 }
565
566 // Recompute sleep time.
567 KJ_IF_MAYBE(e, endTime) {
568 auto now = clock->now();
569
570 if (*e > now) {
571 auto sleepTime = *e - now;
572 sleepMs = sleepTime / kj::MILLISECONDS;
573 if (sleepTime % kj::MILLISECONDS > 0 * kj::SECONDS) {
574 // We guarantee we won't wake up too early.
575 ++sleepMs;
576 }
577 } else {
578 // Oops, already timed out.
579 return;
580 }
581 }
582 }
583 }
584
induceSpuriousWakeupForTest()585 void Mutex::induceSpuriousWakeupForTest() {
586 auto nextWaiter = waitersHead;
587 for (;;) {
588 KJ_IF_MAYBE(waiter, nextWaiter) {
589 nextWaiter = waiter->next;
590 WakeConditionVariable(&coercedCondvar(waiter->condvar));
591 } else {
592 // No more waiters.
593 break;
594 }
595 }
596 }
597
nullInitializer(PINIT_ONCE initOnce,PVOID parameter,PVOID * context)598 static BOOL WINAPI nullInitializer(PINIT_ONCE initOnce, PVOID parameter, PVOID* context) {
599 return true;
600 }
601
Once(bool startInitialized)602 Once::Once(bool startInitialized) {
603 static_assert(sizeof(INIT_ONCE) == sizeof(initOnce), "INIT_ONCE is not a pointer?");
604 InitOnceInitialize(&coercedInitOnce);
605 if (startInitialized) {
606 InitOnceExecuteOnce(&coercedInitOnce, &nullInitializer, nullptr, nullptr);
607 }
608 }
~Once()609 Once::~Once() {}
610
runOnce(Initializer & init)611 void Once::runOnce(Initializer& init) {
612 BOOL needInit;
613 while (!InitOnceBeginInitialize(&coercedInitOnce, 0, &needInit, nullptr)) {
614 // Init was occurring in another thread, but then failed with an exception. Retry.
615 }
616
617 if (needInit) {
618 {
619 KJ_ON_SCOPE_FAILURE(InitOnceComplete(&coercedInitOnce, INIT_ONCE_INIT_FAILED, nullptr));
620 init.run();
621 }
622
623 KJ_ASSERT(InitOnceComplete(&coercedInitOnce, 0, nullptr));
624 }
625 }
626
isInitialized()627 bool Once::isInitialized() noexcept {
628 BOOL junk;
629 return InitOnceBeginInitialize(&coercedInitOnce, INIT_ONCE_CHECK_ONLY, &junk, nullptr);
630 }
631
reset()632 void Once::reset() {
633 InitOnceInitialize(&coercedInitOnce);
634 }
635
636 #else
637 // =======================================================================================
638 // Generic pthreads-based implementation
639
640 #define KJ_PTHREAD_CALL(code) \
641 { \
642 int pthreadError = code; \
643 if (pthreadError != 0) { \
644 KJ_FAIL_SYSCALL(#code, pthreadError); \
645 } \
646 }
647
648 #define KJ_PTHREAD_CLEANUP(code) \
649 { \
650 int pthreadError = code; \
651 if (pthreadError != 0) { \
652 KJ_LOG(ERROR, #code, strerror(pthreadError)); \
653 } \
654 }
655
Mutex()656 Mutex::Mutex(): mutex(PTHREAD_RWLOCK_INITIALIZER) {}
~Mutex()657 Mutex::~Mutex() {
658 KJ_PTHREAD_CLEANUP(pthread_rwlock_destroy(&mutex));
659 }
660
lock(Exclusivity exclusivity)661 void Mutex::lock(Exclusivity exclusivity) {
662 switch (exclusivity) {
663 case EXCLUSIVE:
664 KJ_PTHREAD_CALL(pthread_rwlock_wrlock(&mutex));
665 break;
666 case SHARED:
667 KJ_PTHREAD_CALL(pthread_rwlock_rdlock(&mutex));
668 break;
669 }
670 }
671
unlock(Exclusivity exclusivity,Waiter * waiterToSkip)672 void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
673 KJ_DEFER(KJ_PTHREAD_CALL(pthread_rwlock_unlock(&mutex)));
674
675 if (exclusivity == EXCLUSIVE) {
676 // Check if there are any conditional waiters. Note we only do this when unlocking an
677 // exclusive lock since under a shared lock the state couldn't have changed.
678 auto nextWaiter = waitersHead;
679 for (;;) {
680 KJ_IF_MAYBE(waiter, nextWaiter) {
681 nextWaiter = waiter->next;
682
683 if (waiter != waiterToSkip && checkPredicate(*waiter)) {
684 // This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
685 // use _signal() vs. _broadcast() here since there's always only one thread waiting.
686 KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter->stupidMutex));
687 KJ_PTHREAD_CALL(pthread_cond_signal(&waiter->condvar));
688 KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter->stupidMutex));
689
690 // We only need to wake one waiter. Note that unlike the futex-based implementation, we
691 // cannot "transfer ownership" of the lock to the waiter, therefore we cannot guarantee
692 // that the condition is still true when that waiter finally awakes. However, if the
693 // condition is no longer true at that point, the waiter will re-check all other waiters'
694 // conditions and possibly wake up any other waiter who is now ready, hence we still only
695 // need to wake one waiter here.
696 break;
697 }
698 } else {
699 // No more waiters.
700 break;
701 }
702 }
703 }
704 }
705
assertLockedByCaller(Exclusivity exclusivity)706 void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
707 switch (exclusivity) {
708 case EXCLUSIVE:
709 // A read lock should fail if the mutex is already held for writing.
710 if (pthread_rwlock_tryrdlock(&mutex) == 0) {
711 pthread_rwlock_unlock(&mutex);
712 KJ_FAIL_ASSERT("Tried to call getAlreadyLocked*() but lock is not held.");
713 }
714 break;
715 case SHARED:
716 // A write lock should fail if the mutex is already held for reading or writing. We don't
717 // have any way to prove that the lock is held only for reading.
718 if (pthread_rwlock_trywrlock(&mutex) == 0) {
719 pthread_rwlock_unlock(&mutex);
720 KJ_FAIL_ASSERT("Tried to call getAlreadyLocked*() but lock is not held.");
721 }
722 break;
723 }
724 }
725
wait(Predicate & predicate,Maybe<Duration> timeout)726 void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) {
727 // Add waiter to list.
728 Waiter waiter {
729 nullptr, waitersTail, predicate, nullptr,
730 PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER
731 };
732 addWaiter(waiter);
733
734 // To guarantee that we've re-locked the mutex before scope exit, keep track of whether it is
735 // currently.
736 bool currentlyLocked = true;
737 KJ_DEFER({
738 if (!currentlyLocked) lock(EXCLUSIVE);
739 removeWaiter(waiter);
740
741 // Destroy pthread objects.
742 KJ_PTHREAD_CLEANUP(pthread_mutex_destroy(&waiter.stupidMutex));
743 KJ_PTHREAD_CLEANUP(pthread_cond_destroy(&waiter.condvar));
744 });
745
746 #if !__APPLE__
747 if (timeout != nullptr) {
748 // Oops, the default condvar uses the wall clock, which is dumb... fix it to use the monotonic
749 // clock. (Except not on macOS, where pthread_condattr_setclock() is unimplemented, but there's
750 // a bizarre pthread_cond_timedwait_relative_np() method we can use instead...)
751 pthread_condattr_t attr;
752 KJ_PTHREAD_CALL(pthread_condattr_init(&attr));
753 KJ_PTHREAD_CALL(pthread_condattr_setclock(&attr, CLOCK_MONOTONIC));
754 pthread_cond_init(&waiter.condvar, &attr);
755 KJ_PTHREAD_CALL(pthread_condattr_destroy(&attr));
756 }
757 #endif
758
759 Maybe<struct timespec> endTime = timeout.map([](Duration d) {
760 return toAbsoluteTimespec(now() + d);
761 });
762
763 while (!predicate.check()) {
764 // pthread condvars only work with basic mutexes, not rwlocks. So, we need to lock a basic
765 // mutex before we unlock the real mutex, and the signaling thread also needs to lock this
766 // mutex, in order to ensure that this thread is actually waiting on the condvar before it is
767 // signaled.
768 KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter.stupidMutex));
769
770 // OK, now we can unlock the main mutex.
771 unlock(EXCLUSIVE, &waiter);
772 currentlyLocked = false;
773
774 bool timedOut = false;
775
776 // Wait for someone to signal the condvar.
777 KJ_IF_MAYBE(t, endTime) {
778 #if __APPLE__
779 // On macOS, the absolute timeout can only be specified in wall time, not monotonic time,
780 // which means modifying the system clock will break the wait. However, macOS happens to
781 // provide an alternative relative-time wait function, so I guess we'll use that. It does
782 // require recomputing the time every iteration...
783 struct timespec ts = toRelativeTimespec(kj::max(toTimePoint(*t) - now(), 0 * kj::SECONDS));
784 int error = pthread_cond_timedwait_relative_np(&waiter.condvar, &waiter.stupidMutex, &ts);
785 #else
786 int error = pthread_cond_timedwait(&waiter.condvar, &waiter.stupidMutex, t);
787 #endif
788 if (error != 0) {
789 if (error == ETIMEDOUT) {
790 timedOut = true;
791 } else {
792 KJ_FAIL_SYSCALL("pthread_cond_timedwait", error);
793 }
794 }
795 } else {
796 KJ_PTHREAD_CALL(pthread_cond_wait(&waiter.condvar, &waiter.stupidMutex));
797 }
798
799 // We have to be very careful about lock ordering here. We need to unlock stupidMutex before
800 // re-locking the main mutex, because another thread may have a lock on the main mutex already
801 // and be waiting for a lock on stupidMutex. Note that other thread may signal the condvar
802 // right after we unlock stupidMutex but before we re-lock the main mutex. That is fine,
803 // because we've already been signaled.
804 KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter.stupidMutex));
805
806 lock(EXCLUSIVE);
807 currentlyLocked = true;
808
809 KJ_IF_MAYBE(exception, waiter.exception) {
810 // The predicate threw an exception, apparently. Propagate it.
811 // TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
812 // then want MutexGuarded::when() to skip calling the callback, but then what should it
813 // return, since it normally returns the callback's result? Or maybe people who disable
814 // exceptions just really should not write predicates that can throw.
815 kj::throwFatalException(kj::mv(**exception));
816 }
817
818 if (timedOut) {
819 return;
820 }
821 }
822 }
823
induceSpuriousWakeupForTest()824 void Mutex::induceSpuriousWakeupForTest() {
825 auto nextWaiter = waitersHead;
826 for (;;) {
827 KJ_IF_MAYBE(waiter, nextWaiter) {
828 nextWaiter = waiter->next;
829 KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter->stupidMutex));
830 KJ_PTHREAD_CALL(pthread_cond_signal(&waiter->condvar));
831 KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter->stupidMutex));
832 } else {
833 // No more waiters.
834 break;
835 }
836 }
837 }
838
Once(bool startInitialized)839 Once::Once(bool startInitialized)
840 : state(startInitialized ? INITIALIZED : UNINITIALIZED),
841 mutex(PTHREAD_MUTEX_INITIALIZER) {}
~Once()842 Once::~Once() {
843 KJ_PTHREAD_CLEANUP(pthread_mutex_destroy(&mutex));
844 }
845
runOnce(Initializer & init)846 void Once::runOnce(Initializer& init) {
847 KJ_PTHREAD_CALL(pthread_mutex_lock(&mutex));
848 KJ_DEFER(KJ_PTHREAD_CALL(pthread_mutex_unlock(&mutex)));
849
850 if (state != UNINITIALIZED) {
851 return;
852 }
853
854 init.run();
855
856 __atomic_store_n(&state, INITIALIZED, __ATOMIC_RELEASE);
857 }
858
reset()859 void Once::reset() {
860 State oldState = INITIALIZED;
861 if (!__atomic_compare_exchange_n(&state, &oldState, UNINITIALIZED,
862 false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
863 KJ_FAIL_REQUIRE("reset() called while not initialized.");
864 }
865 }
866
867 #endif
868
869 } // namespace _ (private)
870 } // namespace kj
871