// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#if _WIN32 || __CYGWIN__
#define WIN32_LEAN_AND_MEAN 1  // lolz
#define WINVER 0x0600
#define _WIN32_WINNT 0x0600
#endif

#include "mutex.h"
#include "debug.h"

#if !_WIN32 && !__CYGWIN__
#include <time.h>
#include <errno.h>
#endif

#if KJ_USE_FUTEX
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <limits.h>

#ifndef SYS_futex
// Missing on Android/Bionic.
#define SYS_futex __NR_futex
#endif

#ifndef FUTEX_WAIT_PRIVATE
// Missing on Android/Bionic.
#define FUTEX_WAIT_PRIVATE FUTEX_WAIT
#define FUTEX_WAKE_PRIVATE FUTEX_WAKE
#endif

#elif _WIN32 || __CYGWIN__
#include <windows.h>
#endif

namespace kj {
namespace _ {  // private

inline void Mutex::addWaiter(Waiter& waiter) {
#ifdef KJ_DEBUG
  assertLockedByCaller(EXCLUSIVE);
#endif
  *waitersTail = waiter;
  waitersTail = &waiter.next;
}
inline void Mutex::removeWaiter(Waiter& waiter) {
#ifdef KJ_DEBUG
  assertLockedByCaller(EXCLUSIVE);
#endif
  *waiter.prev = waiter.next;
  KJ_IF_MAYBE(next, waiter.next) {
    next->prev = waiter.prev;
  } else {
    KJ_DASSERT(waitersTail == &waiter.next);
    waitersTail = waiter.prev;
  }
}

bool Mutex::checkPredicate(Waiter& waiter) {
  // Run the predicate from a thread other than the waiting thread, returning true if it's time to
  // signal the waiting thread. This is not only when the predicate passes, but also when it
  // throws, in which case we want to propagate the exception to the waiting thread.

  if (waiter.exception != nullptr) return true;  // don't run again after an exception

  bool result = false;
  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
    result = waiter.predicate.check();
  })) {
    // Exception thrown.
    result = true;
    waiter.exception = kj::heap(kj::mv(*exception));
  };
  return result;
}
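
// For context, this predicate machinery is what backs MutexGuarded<T>::when(). A typical use
// looks roughly like this (hedged sketch; the exact signature lives in mutex.h):
//
//     kj::MutexGuarded<uint> counter(0);
//     // Thread A: blocks until the predicate passes, then runs the callback under the lock.
//     counter.when([](const uint& c) { return c > 0; },
//                  [](uint& c) { c = 0; });
//     // Thread B: releasing an exclusive lock re-checks all waiting predicates.
//     *counter.lockExclusive() = 1;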

#if !_WIN32 && !__CYGWIN__
namespace {

TimePoint toTimePoint(struct timespec ts) {
  return kj::origin<TimePoint>() + ts.tv_sec * kj::SECONDS + ts.tv_nsec * kj::NANOSECONDS;
}
TimePoint now() {
  struct timespec now;
  KJ_SYSCALL(clock_gettime(CLOCK_MONOTONIC, &now));
  return toTimePoint(now);
}
struct timespec toRelativeTimespec(Duration timeout) {
  struct timespec ts;
  ts.tv_sec = timeout / kj::SECONDS;
  ts.tv_nsec = timeout % kj::SECONDS / kj::NANOSECONDS;
  return ts;
}
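// Worked example: toRelativeTimespec(2500 * kj::MILLISECONDS) yields {tv_sec = 2,
// tv_nsec = 500000000}: integer division by kj::SECONDS produces the whole seconds, and the
// remainder divided by kj::NANOSECONDS produces the leftover nanoseconds.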
struct timespec toAbsoluteTimespec(TimePoint time) {
  return toRelativeTimespec(time - kj::origin<TimePoint>());
}

}  // namespace
#endif

#if KJ_USE_FUTEX
// =======================================================================================
// Futex-based implementation (Linux-only)
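//
// The entire lock state lives in a single 32-bit futex word. The bit layout is declared in
// mutex.h; repeated here for reference (the header is authoritative): the top bit means a
// writer holds the lock, the next bit means a writer is waiting, and the remaining low bits
// count the shared (reader) holders.
//
//     static constexpr uint EXCLUSIVE_HELD      = 1u << 31;
//     static constexpr uint EXCLUSIVE_REQUESTED = 1u << 30;
//     static constexpr uint SHARED_COUNT_MASK   = EXCLUSIVE_REQUESTED - 1;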

Mutex::Mutex(): futex(0) {}
Mutex::~Mutex() {
  // This will crash anyway, might as well crash with a nice error message.
  KJ_ASSERT(futex == 0, "Mutex destroyed while locked.") { break; }
}

void Mutex::lock(Exclusivity exclusivity) {
  switch (exclusivity) {
    case EXCLUSIVE:
      for (;;) {
        uint state = 0;
        if (KJ_LIKELY(__atomic_compare_exchange_n(&futex, &state, EXCLUSIVE_HELD, false,
                                                  __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) {
          // Acquired.
          break;
        }

        // The mutex is contended.  Set the exclusive-requested bit and wait.
        if ((state & EXCLUSIVE_REQUESTED) == 0) {
          if (!__atomic_compare_exchange_n(&futex, &state, state | EXCLUSIVE_REQUESTED, false,
                                           __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
            // Oops, the state changed before we could set the request bit.  Start over.
            continue;
          }

          state |= EXCLUSIVE_REQUESTED;
        }

        syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, state, nullptr, nullptr, 0);
      }
      break;
    case SHARED: {
      uint state = __atomic_add_fetch(&futex, 1, __ATOMIC_ACQUIRE);
      for (;;) {
        if (KJ_LIKELY((state & EXCLUSIVE_HELD) == 0)) {
          // Acquired.
          break;
        }

        // The mutex is exclusively locked by another thread.  Since we incremented the counter
        // already, we just have to wait for it to be unlocked.
        syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, state, nullptr, nullptr, 0);
        state = __atomic_load_n(&futex, __ATOMIC_ACQUIRE);
      }
      break;
    }
  }
}

void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
  switch (exclusivity) {
    case EXCLUSIVE: {
      KJ_DASSERT(futex & EXCLUSIVE_HELD, "Unlocked a mutex that wasn't locked.");

      // First check if there are any conditional waiters. Note we only do this when unlocking an
      // exclusive lock since under a shared lock the state couldn't have changed.
      auto nextWaiter = waitersHead;
      for (;;) {
        KJ_IF_MAYBE(waiter, nextWaiter) {
          nextWaiter = waiter->next;

          if (waiter != waiterToSkip && checkPredicate(*waiter)) {
            // This waiter's predicate now evaluates true, so wake it up.
            if (waiter->hasTimeout) {
              // In this case we need to be careful to make sure the target thread isn't already
              // processing a timeout, so we need to do an atomic CAS rather than just a store.
              uint expected = 0;
              if (__atomic_compare_exchange_n(&waiter->futex, &expected, 1, false,
                                              __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
                // Good, we set it to 1, transferring ownership of the mutex. Continue on below.
              } else {
                // Looks like the thread already timed out and set its own futex to 1. In that
                // case it is going to try to lock the mutex itself, so we should NOT attempt an
                // ownership transfer as this will deadlock.
                //
                // We have two options here: We can continue along the waiter list looking for
                // another waiter that's ready to be signaled, or we could drop out of the list
                // immediately since we know that another thread is already waiting for the lock
                // and will re-evaluate the waiter queue itself when it is done. It feels cleaner
                // to me to continue.
                continue;
              }
            } else {
              __atomic_store_n(&waiter->futex, 1, __ATOMIC_RELEASE);
            }
            syscall(SYS_futex, &waiter->futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);

            // We transferred ownership of the lock to this waiter, so we're done now.
            return;
          }
        } else {
          // No more waiters.
          break;
        }
      }

      // Didn't wake any waiters, so wake normally.
      uint oldState = __atomic_fetch_and(
          &futex, ~(EXCLUSIVE_HELD | EXCLUSIVE_REQUESTED), __ATOMIC_RELEASE);

      if (KJ_UNLIKELY(oldState & ~EXCLUSIVE_HELD)) {
        // Other threads are waiting.  If there are any shared waiters, they now collectively hold
        // the lock, and we must wake them up.  If there are any exclusive waiters, we must wake
        // them up even if readers are waiting so that at the very least they may re-establish the
        // EXCLUSIVE_REQUESTED bit that we just removed.
        syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
      }
      break;
    }

    case SHARED: {
      KJ_DASSERT(futex & SHARED_COUNT_MASK, "Unshared a mutex that wasn't shared.");
      uint state = __atomic_sub_fetch(&futex, 1, __ATOMIC_RELEASE);

      // The only case where anyone is waiting is if EXCLUSIVE_REQUESTED is set, and the only time
      // it makes sense to wake up that waiter is if the shared count has reached zero.
      if (KJ_UNLIKELY(state == EXCLUSIVE_REQUESTED)) {
        if (__atomic_compare_exchange_n(
            &futex, &state, 0, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
          // Wake all exclusive waiters.  We have to wake all of them because one of them will
          // grab the lock while the others will re-establish the exclusive-requested bit.
          syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
        }
      }
      break;
    }
  }
}
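
// Worked trace of the state word (a sketch, using the bit names above): writer W locks an idle
// mutex, taking futex from 0 to EXCLUSIVE_HELD. Reader R then calls lock(SHARED): it increments
// the shared count, sees EXCLUSIVE_HELD still set, and sleeps on the futex. When W unlocks, the
// fetch_and clears EXCLUSIVE_HELD | EXCLUSIVE_REQUESTED; the old state's remaining bits (R's
// count) are nonzero, so W issues FUTEX_WAKE and R wakes as a shared holder.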

void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
  switch (exclusivity) {
    case EXCLUSIVE:
      KJ_ASSERT(futex & EXCLUSIVE_HELD,
                "Tried to call getAlreadyLocked*() but lock is not held.");
      break;
    case SHARED:
      KJ_ASSERT(futex & SHARED_COUNT_MASK,
                "Tried to call getAlreadyLocked*() but lock is not held.");
      break;
  }
}

void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) {
  // Add waiter to list.
  Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0, timeout != nullptr };
  addWaiter(waiter);

  // To guarantee that we've re-locked the mutex before scope exit, keep track of whether it is
  // currently locked.
  bool currentlyLocked = true;
  KJ_DEFER({
    if (!currentlyLocked) lock(EXCLUSIVE);
    removeWaiter(waiter);
  });

  if (!predicate.check()) {
    unlock(EXCLUSIVE, &waiter);
    currentlyLocked = false;

    struct timespec ts;
    struct timespec* tsp = nullptr;
    KJ_IF_MAYBE(t, timeout) {
      ts = toAbsoluteTimespec(now() + *t);
      tsp = &ts;
    }

    // Wait for someone to set our futex to 1.
    for (;;) {
      // Note we use FUTEX_WAIT_BITSET_PRIVATE + FUTEX_BITSET_MATCH_ANY to get the same effect as
      // FUTEX_WAIT_PRIVATE except that the timeout is specified as an absolute time based on
      // CLOCK_MONOTONIC. Otherwise, FUTEX_WAIT_PRIVATE interprets it as a relative time, forcing
      // us to recompute the time after every iteration.
      KJ_SYSCALL_HANDLE_ERRORS(syscall(SYS_futex,
          &waiter.futex, FUTEX_WAIT_BITSET_PRIVATE, 0, tsp, nullptr, FUTEX_BITSET_MATCH_ANY)) {
        case EAGAIN:
          // Indicates that the futex was already non-zero by the time the kernel looked at it.
          // Not an error.
          break;
        case ETIMEDOUT: {
          // Wait timed out. This leaves us in a bit of a pickle: Ownership of the mutex was not
          // transferred to us from another thread. So, we need to lock it ourselves. But, another
          // thread might be in the process of signaling us and transferring ownership. So, we
          // first must atomically take control of our destiny.
          KJ_ASSERT(timeout != nullptr);
          uint expected = 0;
          if (__atomic_compare_exchange_n(&waiter.futex, &expected, 1, false,
                                          __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) {
            // OK, we set our own futex to 1. That means no other thread will, and so we won't be
            // receiving a mutex ownership transfer. We have to lock the mutex ourselves.
            lock(EXCLUSIVE);
            currentlyLocked = true;
            return;
          } else {
            // Oh, someone else actually did signal us, apparently. Let's move on as if the futex
            // call told us so.
            break;
          }
        }
        default:
          KJ_FAIL_SYSCALL("futex(FUTEX_WAIT_BITSET_PRIVATE)", error);
      }

      if (__atomic_load_n(&waiter.futex, __ATOMIC_ACQUIRE)) {
        // We received a lock ownership transfer from another thread.
        currentlyLocked = true;

        // The other thread checked the predicate before the transfer.
#ifdef KJ_DEBUG
        assertLockedByCaller(EXCLUSIVE);
#endif

        KJ_IF_MAYBE(exception, waiter.exception) {
          // The predicate threw an exception, apparently. Propagate it.
          // TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
          //   then want MutexGuarded::when() to skip calling the callback, but then what should it
          //   return, since it normally returns the callback's result? Or maybe people who disable
          //   exceptions just really should not write predicates that can throw.
          kj::throwFatalException(kj::mv(**exception));
        }

        return;
      }
    }
  }
}

void Mutex::induceSpuriousWakeupForTest() {
  auto nextWaiter = waitersHead;
  for (;;) {
    KJ_IF_MAYBE(waiter, nextWaiter) {
      nextWaiter = waiter->next;
      syscall(SYS_futex, &waiter->futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
    } else {
      // No more waiters.
      break;
    }
  }
}

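// Once's futex word is a small state machine (the constants are declared in mutex.h):
//
//     UNINITIALIZED -> INITIALIZING                 first caller claims the job
//     INITIALIZING  -> INITIALIZING_WITH_WAITERS    another caller starts waiting
//     INITIALIZING(_WITH_WAITERS) -> INITIALIZED    initializer succeeded; wake waiters
//     INITIALIZING(_WITH_WAITERS) -> UNINITIALIZED  initializer threw; wake waiters so they retry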
void Once::runOnce(Initializer& init) {
startOver:
  uint state = UNINITIALIZED;
  if (__atomic_compare_exchange_n(&futex, &state, INITIALIZING, false,
                                  __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
    // It's our job to initialize!
    {
      KJ_ON_SCOPE_FAILURE({
        // An exception was thrown by the initializer.  We have to revert.
        if (__atomic_exchange_n(&futex, UNINITIALIZED, __ATOMIC_RELEASE) ==
            INITIALIZING_WITH_WAITERS) {
          // Someone was waiting for us to finish.
          syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
        }
      });

      init.run();
    }
    if (__atomic_exchange_n(&futex, INITIALIZED, __ATOMIC_RELEASE) ==
        INITIALIZING_WITH_WAITERS) {
      // Someone was waiting for us to finish.
      syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
    }
  } else {
    for (;;) {
      if (state == INITIALIZED) {
        break;
      } else if (state == INITIALIZING) {
        // Initialization is taking place in another thread.  Indicate that we're waiting.
        if (!__atomic_compare_exchange_n(&futex, &state, INITIALIZING_WITH_WAITERS, true,
                                         __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) {
          // State changed, retry.
          continue;
        }
      } else {
        KJ_DASSERT(state == INITIALIZING_WITH_WAITERS);
      }

      // Wait for initialization.
      syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, INITIALIZING_WITH_WAITERS,
              nullptr, nullptr, 0);
      state = __atomic_load_n(&futex, __ATOMIC_ACQUIRE);

      if (state == UNINITIALIZED) {
        // Oh hey, apparently whoever was trying to initialize gave up.  Let's take it from the
        // top.
        goto startOver;
      }
    }
  }
}

void Once::reset() {
  uint state = INITIALIZED;
  if (!__atomic_compare_exchange_n(&futex, &state, UNINITIALIZED,
                                   false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
    KJ_FAIL_REQUIRE("reset() called while not initialized.");
  }
}

#elif _WIN32 || __CYGWIN__
// =======================================================================================
// Win32 implementation

#define coercedSrwLock (*reinterpret_cast<SRWLOCK*>(&srwLock))
#define coercedInitOnce (*reinterpret_cast<INIT_ONCE*>(&initOnce))
#define coercedCondvar(var) (*reinterpret_cast<CONDITION_VARIABLE*>(&var))
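
// The corresponding members are declared in mutex.h as opaque pointer-sized storage, presumably
// so the header need not pull in windows.h; these macros reinterpret that storage as the real
// Win32 types. The static_asserts below check that the sizes actually match.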

Mutex::Mutex() {
  static_assert(sizeof(SRWLOCK) == sizeof(srwLock), "SRWLOCK is not a pointer?");
  InitializeSRWLock(&coercedSrwLock);
}
Mutex::~Mutex() {}

void Mutex::lock(Exclusivity exclusivity) {
  switch (exclusivity) {
    case EXCLUSIVE:
      AcquireSRWLockExclusive(&coercedSrwLock);
      break;
    case SHARED:
      AcquireSRWLockShared(&coercedSrwLock);
      break;
  }
}

void Mutex::wakeReadyWaiter(Waiter* waiterToSkip) {
  // Look for a waiter whose predicate is now evaluating true, and wake it. We wake no more than
  // one waiter because only one waiter could get the lock anyway, and once it releases that lock
  // it will awake the next waiter if necessary.

  auto nextWaiter = waitersHead;
  for (;;) {
    KJ_IF_MAYBE(waiter, nextWaiter) {
      nextWaiter = waiter->next;

      if (waiter != waiterToSkip && checkPredicate(*waiter)) {
        // This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
        // use Wake vs. WakeAll here since there's always only one thread waiting.
        WakeConditionVariable(&coercedCondvar(waiter->condvar));

        // We only need to wake one waiter. Note that unlike the futex-based implementation, we
        // cannot "transfer ownership" of the lock to the waiter, therefore we cannot guarantee
        // that the condition is still true when that waiter finally awakes. However, if the
        // condition is no longer true at that point, the waiter will re-check all other
        // waiters' conditions and possibly wake up any other waiter who is now ready, hence we
        // still only need to wake one waiter here.
        return;
      }
    } else {
      // No more waiters.
      break;
    }
  }
}

void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
  switch (exclusivity) {
    case EXCLUSIVE: {
      KJ_DEFER(ReleaseSRWLockExclusive(&coercedSrwLock));

      // Check if there are any conditional waiters. Note we only do this when unlocking an
      // exclusive lock since under a shared lock the state couldn't have changed.
      wakeReadyWaiter(waiterToSkip);
      break;
    }

    case SHARED:
      ReleaseSRWLockShared(&coercedSrwLock);
      break;
  }
}

void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
  // We could use TryAcquireSRWLock*() here like we do with the pthread version. However, as of
  // this writing, my version of Wine (1.6.2) doesn't implement these functions and will abort if
  // they are called. Since we were only going to use them as a hacky way to check if the lock is
  // held for debug purposes anyway, we just don't bother.
}

void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) {
  // Add waiter to list.
  Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0 };
  static_assert(sizeof(waiter.condvar) == sizeof(CONDITION_VARIABLE),
                "CONDITION_VARIABLE is not a pointer?");
  InitializeConditionVariable(&coercedCondvar(waiter.condvar));

  addWaiter(waiter);
  KJ_DEFER(removeWaiter(waiter));

  DWORD sleepMs;

  // Only initialized if `timeout` is non-null.
  const MonotonicClock* clock = nullptr;
  kj::Maybe<kj::TimePoint> endTime;

  KJ_IF_MAYBE(t, timeout) {
    // Windows sleeps are inaccurate -- they can be longer *or shorter* than the requested amount.
    // For many use cases of our API, a too-short sleep would be unacceptable. Experimentally, it
    // seems like sleeps can be up to half a millisecond short, so we'll add half a millisecond
    // (and then we round up, below).
    *t += 500 * kj::MICROSECONDS;

    // Compute initial sleep time.
    sleepMs = *t / kj::MILLISECONDS;
    if (*t % kj::MILLISECONDS > 0 * kj::SECONDS) {
      // We guarantee we won't wake up too early.
      ++sleepMs;
    }
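    // Worked example of the rounding: a requested 10 ms timeout becomes 10.5 ms after the
    // adjustment above, so sleepMs starts at 10 with a nonzero remainder and rounds up to 11 --
    // strictly longer than requested, never shorter.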

    clock = &systemPreciseMonotonicClock();
    endTime = clock->now() + *t;
  } else {
    sleepMs = INFINITE;
  }

  while (!predicate.check()) {
    // SleepConditionVariableSRW() will temporarily release the lock, so we need to signal other
    // waiters that are now ready.
    wakeReadyWaiter(&waiter);

    if (SleepConditionVariableSRW(&coercedCondvar(waiter.condvar), &coercedSrwLock, sleepMs, 0)) {
      // Normal result. Continue loop to check predicate.
    } else {
      DWORD error = GetLastError();
      if (error == ERROR_TIMEOUT) {
        // Windows may have woken us up too early, so don't return yet. Instead, proceed through
        // the loop and rely on our sleep time recalculation to detect if we timed out.
      } else {
        KJ_FAIL_WIN32("SleepConditionVariableSRW()", error);
      }
    }

    KJ_IF_MAYBE(exception, waiter.exception) {
      // The predicate threw an exception, apparently. Propagate it.
      // TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
      //   then want MutexGuarded::when() to skip calling the callback, but then what should it
      //   return, since it normally returns the callback's result? Or maybe people who disable
      //   exceptions just really should not write predicates that can throw.
      kj::throwFatalException(kj::mv(**exception));
    }

    // Recompute sleep time.
    KJ_IF_MAYBE(e, endTime) {
      auto now = clock->now();

      if (*e > now) {
        auto sleepTime = *e - now;
        sleepMs = sleepTime / kj::MILLISECONDS;
        if (sleepTime % kj::MILLISECONDS > 0 * kj::SECONDS) {
          // We guarantee we won't wake up too early.
          ++sleepMs;
        }
      } else {
        // Oops, already timed out.
        return;
      }
    }
  }
}

void Mutex::induceSpuriousWakeupForTest() {
  auto nextWaiter = waitersHead;
  for (;;) {
    KJ_IF_MAYBE(waiter, nextWaiter) {
      nextWaiter = waiter->next;
      WakeConditionVariable(&coercedCondvar(waiter->condvar));
    } else {
      // No more waiters.
      break;
    }
  }
}

static BOOL WINAPI nullInitializer(PINIT_ONCE initOnce, PVOID parameter, PVOID* context) {
  return true;
}

Once::Once(bool startInitialized) {
  static_assert(sizeof(INIT_ONCE) == sizeof(initOnce), "INIT_ONCE is not a pointer?");
  InitOnceInitialize(&coercedInitOnce);
  if (startInitialized) {
    InitOnceExecuteOnce(&coercedInitOnce, &nullInitializer, nullptr, nullptr);
  }
}
Once::~Once() {}

void Once::runOnce(Initializer& init) {
  BOOL needInit;
  while (!InitOnceBeginInitialize(&coercedInitOnce, 0, &needInit, nullptr)) {
    // Init was occurring in another thread, but then failed with an exception. Retry.
  }

  if (needInit) {
    {
      KJ_ON_SCOPE_FAILURE(InitOnceComplete(&coercedInitOnce, INIT_ONCE_INIT_FAILED, nullptr));
      init.run();
    }

    KJ_ASSERT(InitOnceComplete(&coercedInitOnce, 0, nullptr));
  }
}

bool Once::isInitialized() noexcept {
  BOOL junk;
  return InitOnceBeginInitialize(&coercedInitOnce, INIT_ONCE_CHECK_ONLY, &junk, nullptr);
}

void Once::reset() {
  InitOnceInitialize(&coercedInitOnce);
}

#else
// =======================================================================================
// Generic pthreads-based implementation

#define KJ_PTHREAD_CALL(code) \
  { \
    int pthreadError = code; \
    if (pthreadError != 0) { \
      KJ_FAIL_SYSCALL(#code, pthreadError); \
    } \
  }

#define KJ_PTHREAD_CLEANUP(code) \
  { \
    int pthreadError = code; \
    if (pthreadError != 0) { \
      KJ_LOG(ERROR, #code, strerror(pthreadError)); \
    } \
  }
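
// KJ_PTHREAD_CALL() treats a nonzero return code as a failed syscall and fails hard;
// KJ_PTHREAD_CLEANUP() merely logs the error, since it is used in destructors and other cleanup
// paths where throwing would be unsafe.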

Mutex::Mutex(): mutex(PTHREAD_RWLOCK_INITIALIZER) {}
Mutex::~Mutex() {
  KJ_PTHREAD_CLEANUP(pthread_rwlock_destroy(&mutex));
}

void Mutex::lock(Exclusivity exclusivity) {
  switch (exclusivity) {
    case EXCLUSIVE:
      KJ_PTHREAD_CALL(pthread_rwlock_wrlock(&mutex));
      break;
    case SHARED:
      KJ_PTHREAD_CALL(pthread_rwlock_rdlock(&mutex));
      break;
  }
}

void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
  KJ_DEFER(KJ_PTHREAD_CALL(pthread_rwlock_unlock(&mutex)));

  if (exclusivity == EXCLUSIVE) {
    // Check if there are any conditional waiters. Note we only do this when unlocking an
    // exclusive lock since under a shared lock the state couldn't have changed.
    auto nextWaiter = waitersHead;
    for (;;) {
      KJ_IF_MAYBE(waiter, nextWaiter) {
        nextWaiter = waiter->next;

        if (waiter != waiterToSkip && checkPredicate(*waiter)) {
          // This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
          // use _signal() vs. _broadcast() here since there's always only one thread waiting.
          KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter->stupidMutex));
          KJ_PTHREAD_CALL(pthread_cond_signal(&waiter->condvar));
          KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter->stupidMutex));

          // We only need to wake one waiter. Note that unlike the futex-based implementation, we
          // cannot "transfer ownership" of the lock to the waiter, therefore we cannot guarantee
          // that the condition is still true when that waiter finally awakes. However, if the
          // condition is no longer true at that point, the waiter will re-check all other waiters'
          // conditions and possibly wake up any other waiter who is now ready, hence we still only
          // need to wake one waiter here.
          break;
        }
      } else {
        // No more waiters.
        break;
      }
    }
  }
}

void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
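  // Note: a try-lock can only prove that *some* thread holds the lock in a conflicting mode, not
  // that the caller specifically holds it. That's the best approximation pthreads offers, and
  // this is only used for debug assertions anyway.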
  switch (exclusivity) {
    case EXCLUSIVE:
      // A read lock should fail if the mutex is already held for writing.
      if (pthread_rwlock_tryrdlock(&mutex) == 0) {
        pthread_rwlock_unlock(&mutex);
        KJ_FAIL_ASSERT("Tried to call getAlreadyLocked*() but lock is not held.");
      }
      break;
    case SHARED:
      // A write lock should fail if the mutex is already held for reading or writing.  We don't
      // have any way to prove that the lock is held only for reading.
      if (pthread_rwlock_trywrlock(&mutex) == 0) {
        pthread_rwlock_unlock(&mutex);
        KJ_FAIL_ASSERT("Tried to call getAlreadyLocked*() but lock is not held.");
      }
      break;
  }
}

void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) {
  // Add waiter to list.
  Waiter waiter {
    nullptr, waitersTail, predicate, nullptr,
    PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER
  };
  addWaiter(waiter);

  // To guarantee that we've re-locked the mutex before scope exit, keep track of whether it is
  // currently locked.
  bool currentlyLocked = true;
  KJ_DEFER({
    if (!currentlyLocked) lock(EXCLUSIVE);
    removeWaiter(waiter);

    // Destroy pthread objects.
    KJ_PTHREAD_CLEANUP(pthread_mutex_destroy(&waiter.stupidMutex));
    KJ_PTHREAD_CLEANUP(pthread_cond_destroy(&waiter.condvar));
  });

#if !__APPLE__
  if (timeout != nullptr) {
    // Oops, the default condvar uses the wall clock, which is dumb... fix it to use the monotonic
    // clock. (Except not on macOS, where pthread_condattr_setclock() is unimplemented, but there's
    // a bizarre pthread_cond_timedwait_relative_np() method we can use instead...)
    pthread_condattr_t attr;
    KJ_PTHREAD_CALL(pthread_condattr_init(&attr));
    KJ_PTHREAD_CALL(pthread_condattr_setclock(&attr, CLOCK_MONOTONIC));
    KJ_PTHREAD_CALL(pthread_cond_init(&waiter.condvar, &attr));
    KJ_PTHREAD_CALL(pthread_condattr_destroy(&attr));
  }
#endif

  Maybe<struct timespec> endTime = timeout.map([](Duration d) {
    return toAbsoluteTimespec(now() + d);
  });

  while (!predicate.check()) {
    // pthread condvars only work with basic mutexes, not rwlocks. So, we need to lock a basic
    // mutex before we unlock the real mutex, and the signaling thread also needs to lock this
    // mutex, in order to ensure that this thread is actually waiting on the condvar before it is
    // signaled.
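    //
    // The resulting handshake (sketch): the waiter locks stupidMutex, releases the main mutex,
    // then enters pthread_cond_wait(), which atomically releases stupidMutex. A signaler must
    // itself lock stupidMutex around pthread_cond_signal() (as unlock() does above), so the
    // signal can never fire in the window between the waiter releasing the main mutex and
    // actually entering pthread_cond_wait().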
    KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter.stupidMutex));

    // OK, now we can unlock the main mutex.
    unlock(EXCLUSIVE, &waiter);
    currentlyLocked = false;

    bool timedOut = false;

    // Wait for someone to signal the condvar.
    KJ_IF_MAYBE(t, endTime) {
#if __APPLE__
      // On macOS, the absolute timeout can only be specified in wall time, not monotonic time,
      // which means modifying the system clock will break the wait. However, macOS happens to
      // provide an alternative relative-time wait function, so I guess we'll use that. It does
      // require recomputing the time every iteration...
      struct timespec ts = toRelativeTimespec(kj::max(toTimePoint(*t) - now(), 0 * kj::SECONDS));
      int error = pthread_cond_timedwait_relative_np(&waiter.condvar, &waiter.stupidMutex, &ts);
#else
      int error = pthread_cond_timedwait(&waiter.condvar, &waiter.stupidMutex, t);
#endif
      if (error != 0) {
        if (error == ETIMEDOUT) {
          timedOut = true;
        } else {
          KJ_FAIL_SYSCALL("pthread_cond_timedwait", error);
        }
      }
    } else {
      KJ_PTHREAD_CALL(pthread_cond_wait(&waiter.condvar, &waiter.stupidMutex));
    }

    // We have to be very careful about lock ordering here. We need to unlock stupidMutex before
    // re-locking the main mutex, because another thread may have a lock on the main mutex already
    // and be waiting for a lock on stupidMutex. Note that another thread may signal the condvar
    // right after we unlock stupidMutex but before we re-lock the main mutex. That is fine,
    // because we've already been signaled.
    KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter.stupidMutex));

    lock(EXCLUSIVE);
    currentlyLocked = true;

    KJ_IF_MAYBE(exception, waiter.exception) {
      // The predicate threw an exception, apparently. Propagate it.
      // TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
      //   then want MutexGuarded::when() to skip calling the callback, but then what should it
      //   return, since it normally returns the callback's result? Or maybe people who disable
      //   exceptions just really should not write predicates that can throw.
      kj::throwFatalException(kj::mv(**exception));
    }

    if (timedOut) {
      return;
    }
  }
}

void Mutex::induceSpuriousWakeupForTest() {
  auto nextWaiter = waitersHead;
  for (;;) {
    KJ_IF_MAYBE(waiter, nextWaiter) {
      nextWaiter = waiter->next;
      KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter->stupidMutex));
      KJ_PTHREAD_CALL(pthread_cond_signal(&waiter->condvar));
      KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter->stupidMutex));
    } else {
      // No more waiters.
      break;
    }
  }
}

Once::Once(bool startInitialized)
    : state(startInitialized ? INITIALIZED : UNINITIALIZED),
      mutex(PTHREAD_MUTEX_INITIALIZER) {}
Once::~Once() {
  KJ_PTHREAD_CLEANUP(pthread_mutex_destroy(&mutex));
}

void Once::runOnce(Initializer& init) {
  KJ_PTHREAD_CALL(pthread_mutex_lock(&mutex));
  KJ_DEFER(KJ_PTHREAD_CALL(pthread_mutex_unlock(&mutex)));

  if (state != UNINITIALIZED) {
    return;
  }

  init.run();

  __atomic_store_n(&state, INITIALIZED, __ATOMIC_RELEASE);
}

void Once::reset() {
  State oldState = INITIALIZED;
  if (!__atomic_compare_exchange_n(&state, &oldState, UNINITIALIZED,
                                   false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
    KJ_FAIL_REQUIRE("reset() called while not initialized.");
  }
}

#endif

}  // namespace _ (private)
}  // namespace kj