// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/synchronization/waitable_event.h"

#include <dispatch/dispatch.h>
#include <mach/mach.h>
#include <sys/event.h>

#include "base/debug/activity_tracker.h"
#include "base/files/scoped_file.h"
#include "base/mac/dispatch_source_mach.h"
#include "base/mac/mac_util.h"
#include "base/mac/mach_logging.h"
#include "base/mac/scoped_dispatch_object.h"
#include "base/optional.h"
#include "base/posix/eintr_wrapper.h"
#include "base/threading/scoped_blocking_call.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "base/time/time_override.h"
#include "build/build_config.h"

namespace base {
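// A WaitableEvent on this platform is backed by a Mach port with a queue
// limit of one message. Signaling sends an empty message to the port,
// waiting receives (or, for manual-reset events, peeks at) that message,
// and resetting dequeues any message in the queue.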
WaitableEvent::WaitableEvent(ResetPolicy reset_policy,
                             InitialState initial_state)
    : policy_(reset_policy) {
  mach_port_options_t options{};
  options.flags = MPO_INSERT_SEND_RIGHT;
  options.mpl.mpl_qlimit = 1;

  mach_port_t name;
  kern_return_t kr = mach_port_construct(mach_task_self(), &options, 0, &name);
  MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_construct";

  receive_right_ = new ReceiveRight(name, UseSlowWatchList(policy_));
  send_right_.reset(name);

  if (initial_state == InitialState::SIGNALED)
    Signal();
}

WaitableEvent::~WaitableEvent() = default;

void WaitableEvent::Reset() {
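  // Dequeue the message, if one is queued, returning the event to its
  // unsignaled state.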
  PeekPort(receive_right_->Name(), true);
}

// NO_THREAD_SAFETY_ANALYSIS: Runtime dependent locking.
void WaitableEvent::Signal() NO_THREAD_SAFETY_ANALYSIS {
  // If using the slow watch-list, copy the watchers to a local. After
  // mach_msg(), the event object may be deleted by an awoken thread.
  const bool use_slow_path = UseSlowWatchList(policy_);
  ReceiveRight* receive_right = nullptr;  // Manually reference counted.
  std::unique_ptr<std::list<OnceClosure>> watch_list;
  if (use_slow_path) {
    // To avoid a race condition of a WaitableEventWatcher getting added
    // while another thread is in this method, hold the watch-list lock for
    // the duration of mach_msg(). This requires ref-counting the
    // |receive_right_| object that contains it, in case the event is deleted
    // by a waiting thread after mach_msg().
    receive_right = receive_right_.get();
    receive_right->AddRef();

    ReceiveRight::WatchList* slow_watch_list = receive_right->SlowWatchList();
    slow_watch_list->lock.Acquire();

    if (!slow_watch_list->list.empty()) {
      watch_list.reset(new std::list<OnceClosure>());
      std::swap(*watch_list, slow_watch_list->list);
    }
  }

  mach_msg_empty_send_t msg{};
  msg.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_COPY_SEND);
  msg.header.msgh_size = sizeof(msg);
  msg.header.msgh_remote_port = send_right_.get();
  // If the event is already signaled, this will time out because the queue
  // has a length of one.
  kern_return_t kr =
      mach_msg(&msg.header, MACH_SEND_MSG | MACH_SEND_TIMEOUT, sizeof(msg), 0,
               MACH_PORT_NULL, 0, MACH_PORT_NULL);
  MACH_CHECK(kr == KERN_SUCCESS || kr == MACH_SEND_TIMED_OUT, kr) << "mach_msg";

  if (use_slow_path) {
    // If a WaitableEventWatcher starts watching when the event is already
    // signaled, it runs the callback immediately without adding it to the
    // list. Therefore the watch list can only be non-empty if the event is
    // newly signaled.
    if (watch_list.get()) {
      MACH_CHECK(kr == KERN_SUCCESS, kr);
      for (auto& watcher : *watch_list) {
        std::move(watcher).Run();
      }
    }

    receive_right->SlowWatchList()->lock.Release();
    receive_right->Release();
  }
}

bool WaitableEvent::IsSignaled() {
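  // For auto-reset events, a successful check consumes the signal by
  // dequeuing the message; manual-reset events only peek at it.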
  return PeekPort(receive_right_->Name(), policy_ == ResetPolicy::AUTOMATIC);
}

void WaitableEvent::Wait() {
  bool result = TimedWait(TimeDelta::Max());
  DCHECK(result) << "TimedWait() should never fail with infinite timeout";
}

bool WaitableEvent::TimedWait(const TimeDelta& wait_delta) {
  if (wait_delta <= TimeDelta())
    return IsSignaled();

  // Record the event that this thread is blocking upon (for hang diagnosis)
  // and consider it blocked for scheduling purposes. Ignore this for
  // non-blocking WaitableEvents.
  Optional<debug::ScopedEventWaitActivity> event_activity;
  Optional<internal::ScopedBlockingCallWithBaseSyncPrimitives>
      scoped_blocking_call;
  if (waiting_is_blocking_) {
    event_activity.emplace(this);
    scoped_blocking_call.emplace(FROM_HERE, BlockingType::MAY_BLOCK);
  }

  mach_msg_empty_rcv_t msg{};
  msg.header.msgh_local_port = receive_right_->Name();

  mach_msg_option_t options = MACH_RCV_MSG;

  if (!wait_delta.is_max())
    options |= MACH_RCV_TIMEOUT | MACH_RCV_INTERRUPT;

  mach_msg_size_t rcv_size = sizeof(msg);
  if (policy_ == ResetPolicy::MANUAL) {
    // To avoid dequeuing the message, receive with a size of 0 and set
    // MACH_RCV_LARGE to keep the message in the queue.
    options |= MACH_RCV_LARGE;
    rcv_size = 0;
  }

  // TimeTicks takes care of overflow but we special case is_max() nonetheless
  // to avoid invoking TimeTicksNowIgnoringOverride() unnecessarily (same for
  // the increment step of the for loop if the condition variable returns
  // early). Ref: https://crbug.com/910524#c7
  const TimeTicks end_time =
      wait_delta.is_max() ? TimeTicks::Max()
                          : subtle::TimeTicksNowIgnoringOverride() + wait_delta;
  // Fake |kr| value to bootstrap the for loop.
  kern_return_t kr = MACH_RCV_INTERRUPTED;
  for (mach_msg_timeout_t timeout = wait_delta.is_max()
                                        ? MACH_MSG_TIMEOUT_NONE
                                        : wait_delta.InMillisecondsRoundedUp();
       // If the thread is interrupted during mach_msg(), the system call will
       // be restarted. However, the libsyscall wrapper does not adjust the
       // timeout by the amount of time already waited. Using MACH_RCV_INTERRUPT
       // will instead return from mach_msg(), so that the call can be retried
       // with an adjusted timeout.
       kr == MACH_RCV_INTERRUPTED;
       timeout =
           end_time.is_max()
               ? MACH_MSG_TIMEOUT_NONE
               : std::max<int64_t>(
                     0, (end_time - subtle::TimeTicksNowIgnoringOverride())
                            .InMillisecondsRoundedUp())) {
    kr = mach_msg(&msg.header, options, 0, rcv_size, receive_right_->Name(),
                  timeout, MACH_PORT_NULL);
  }

  if (kr == KERN_SUCCESS) {
    return true;
  } else if (rcv_size == 0 && kr == MACH_RCV_TOO_LARGE) {
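    // With a zero-size buffer and MACH_RCV_LARGE (the manual-reset case),
    // MACH_RCV_TOO_LARGE means a message is queued: the event is signaled
    // and the message was left in the queue.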
    return true;
  } else {
    MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg";
    return false;
  }
}

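// The slow watch-list path applies only to manual-reset events on macOS
// versions before 10.12; when it is in use, Signal() runs the
// WaitableEventWatcher callbacks directly (see above).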
// static
bool WaitableEvent::UseSlowWatchList(ResetPolicy policy) {
#if defined(OS_IOS)
  const bool use_slow_path = false;
#else
  static bool use_slow_path = !mac::IsAtLeastOS10_12();
#endif
  return policy == ResetPolicy::MANUAL && use_slow_path;
}

// static
size_t WaitableEvent::WaitMany(WaitableEvent** raw_waitables, size_t count) {
  DCHECK(count) << "Cannot wait on no events";
  internal::ScopedBlockingCallWithBaseSyncPrimitives scoped_blocking_call(
      FROM_HERE, BlockingType::MAY_BLOCK);
  // Record an event (the first) that this thread is blocking upon.
  debug::ScopedEventWaitActivity event_activity(raw_waitables[0]);

  // On macOS 10.11+, using Mach port sets may cause system instability, per
  // https://crbug.com/756102. On macOS 10.12+, a kqueue can be used
  // instead to work around that. On macOS 10.9 and 10.10, kqueue only works
  // for port sets, so port sets are just used directly. On macOS 10.11,
  // libdispatch sources are used. Therefore, there are three different
  // primitives that can be used to implement WaitMany. Which one to use is
  // selected at run-time by OS version checks.
  enum WaitManyPrimitive {
    KQUEUE,
    DISPATCH,
    PORT_SET,
  };
#if defined(OS_IOS)
  const WaitManyPrimitive kPrimitive = PORT_SET;
#else
  const WaitManyPrimitive kPrimitive =
      mac::IsAtLeastOS10_12() ? KQUEUE
                              : (mac::IsOS10_11() ? DISPATCH : PORT_SET);
#endif
  if (kPrimitive == KQUEUE) {
    std::vector<kevent64_s> events(count);
    for (size_t i = 0; i < count; ++i) {
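      // Stash the index in |udata| so the triggered event can be mapped back
      // to its position in |raw_waitables|.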
      EV_SET64(&events[i], raw_waitables[i]->receive_right_->Name(),
               EVFILT_MACHPORT, EV_ADD, 0, 0, i, 0, 0);
    }

    std::vector<kevent64_s> out_events(count);

    ScopedFD wait_many(kqueue());
    PCHECK(wait_many.is_valid()) << "kqueue";

    int rv = HANDLE_EINTR(kevent64(wait_many.get(), events.data(), count,
                                   out_events.data(), count, 0, nullptr));
    PCHECK(rv > 0) << "kevent64";

    size_t triggered = -1;
    for (size_t i = 0; i < static_cast<size_t>(rv); ++i) {
      // WaitMany should return the lowest index in |raw_waitables| that was
      // triggered.
      size_t index = static_cast<size_t>(out_events[i].udata);
      triggered = std::min(triggered, index);
    }

    if (raw_waitables[triggered]->policy_ == ResetPolicy::AUTOMATIC) {
      // The message needs to be dequeued to reset the event.
      PeekPort(raw_waitables[triggered]->receive_right_->Name(), true);
    }

    return triggered;
  } else if (kPrimitive == DISPATCH) {
    // Each item in |raw_waitables| will be watched using a dispatch source
    // scheduled on the serial |queue|. The first one to be invoked will
    // signal the |semaphore| that this method will wait on.
    ScopedDispatchObject<dispatch_queue_t> queue(dispatch_queue_create(
        "org.chromium.base.WaitableEvent.WaitMany", DISPATCH_QUEUE_SERIAL));
    ScopedDispatchObject<dispatch_semaphore_t> semaphore(
        dispatch_semaphore_create(0));

    // Block capture references. |signaled| will identify the index in
    // |raw_waitables| whose source was invoked.
    dispatch_semaphore_t semaphore_ref = semaphore.get();
    const size_t kUnsignaled = -1;
    __block size_t signaled = kUnsignaled;

    // Create a MACH_RECV dispatch source for each event. These must be
    // destroyed before the |queue| and |semaphore|.
    std::vector<std::unique_ptr<DispatchSourceMach>> sources;
    for (size_t i = 0; i < count; ++i) {
      const bool auto_reset =
          raw_waitables[i]->policy_ == WaitableEvent::ResetPolicy::AUTOMATIC;
      // The block will copy a reference to |right|.
      scoped_refptr<WaitableEvent::ReceiveRight> right =
          raw_waitables[i]->receive_right_;
      auto source =
          std::make_unique<DispatchSourceMach>(queue, right->Name(), ^{
            // After the semaphore is signaled, another event may be signaled
            // and the source may have its block put on the |queue|. WaitMany
            // should only report (and auto-reset) one event, so the first
            // event to signal is reported.
            if (signaled == kUnsignaled) {
              signaled = i;
              if (auto_reset) {
                PeekPort(right->Name(), true);
              }
              dispatch_semaphore_signal(semaphore_ref);
            }
          });
      source->Resume();
      sources.push_back(std::move(source));
    }

    dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
    DCHECK_NE(signaled, kUnsignaled);
    return signaled;
  } else {
    DCHECK_EQ(kPrimitive, PORT_SET);

    kern_return_t kr;

    mac::ScopedMachPortSet port_set;
    {
      mach_port_t name;
      kr =
          mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, &name);
      MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate";
      port_set.reset(name);
    }

    for (size_t i = 0; i < count; ++i) {
      kr = mach_port_insert_member(mach_task_self(),
                                   raw_waitables[i]->receive_right_->Name(),
                                   port_set.get());
      MACH_CHECK(kr == KERN_SUCCESS, kr) << "index " << i;
    }

    mach_msg_empty_rcv_t msg{};
    // Wait on the port set. Only specify space enough for the header, to
    // identify which port in the set is signaled. Otherwise, receiving from the
    // port set may dequeue a message for a manual-reset event object, which
    // would cause it to be reset.
    kr = mach_msg(&msg.header,
                  MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY, 0,
                  sizeof(msg.header), port_set.get(), 0, MACH_PORT_NULL);
    MACH_CHECK(kr == MACH_RCV_TOO_LARGE, kr) << "mach_msg";

    for (size_t i = 0; i < count; ++i) {
      WaitableEvent* event = raw_waitables[i];
      if (msg.header.msgh_local_port == event->receive_right_->Name()) {
        if (event->policy_ == ResetPolicy::AUTOMATIC) {
          // The message needs to be dequeued to reset the event.
          PeekPort(msg.header.msgh_local_port, true);
        }
        return i;
      }
    }

    NOTREACHED();
    return 0;
  }
}

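// Returns true if a message is queued on |port|, i.e. the event is signaled.
// If |dequeue| is true, the message is also removed from the queue, returning
// the event to its unsignaled state.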
// static
bool WaitableEvent::PeekPort(mach_port_t port, bool dequeue) {
  if (dequeue) {
    mach_msg_empty_rcv_t msg{};
    msg.header.msgh_local_port = port;
    kern_return_t kr = mach_msg(&msg.header, MACH_RCV_MSG | MACH_RCV_TIMEOUT, 0,
                                sizeof(msg), port, 0, MACH_PORT_NULL);
    if (kr == KERN_SUCCESS) {
      return true;
    } else {
      MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg";
      return false;
    }
  } else {
    mach_port_seqno_t seqno = 0;
    mach_msg_size_t size;
    mach_msg_id_t id;
    mach_msg_trailer_t trailer;
    mach_msg_type_number_t trailer_size = sizeof(trailer);
    kern_return_t kr = mach_port_peek(
        mach_task_self(), port, MACH_RCV_TRAILER_TYPE(MACH_RCV_TRAILER_NULL),
        &seqno, &size, &id, reinterpret_cast<mach_msg_trailer_info_t>(&trailer),
        &trailer_size);
    if (kr == KERN_SUCCESS) {
      return true;
    } else {
      MACH_CHECK(kr == KERN_FAILURE, kr) << "mach_port_peek";
      return false;
    }
  }
}

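// ReceiveRight takes ownership of the Mach receive right |name| and, for the
// slow Signal() path, allocates the watch-list used to hold
// WaitableEventWatcher callbacks.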
WaitableEvent::ReceiveRight::ReceiveRight(mach_port_t name,
                                          bool create_slow_watch_list)
    : right_(name),
      slow_watch_list_(create_slow_watch_list ? new WatchList() : nullptr) {}

WaitableEvent::ReceiveRight::~ReceiveRight() = default;

WaitableEvent::ReceiveRight::WatchList::WatchList() = default;

WaitableEvent::ReceiveRight::WatchList::~WatchList() = default;

}  // namespace base