1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/public/cpp/system/wait_set.h"
6 
7 #include <algorithm>
8 #include <limits>
9 #include <map>
10 #include <set>
11 #include <vector>
12 
13 #include "base/containers/stack_container.h"
14 #include "base/logging.h"
15 #include "base/macros.h"
16 #include "base/memory/ptr_util.h"
17 #include "base/synchronization/lock.h"
18 #include "base/synchronization/waitable_event.h"
19 #include "mojo/public/cpp/system/trap.h"
20 
21 namespace mojo {
22 
23 class WaitSet::State : public base::RefCountedThreadSafe<State> {
24  public:
State()25   State()
26       : handle_event_(base::WaitableEvent::ResetPolicy::MANUAL,
27                       base::WaitableEvent::InitialState::NOT_SIGNALED) {
28     MojoResult rv = CreateTrap(&Context::OnNotification, &trap_handle_);
29     DCHECK_EQ(MOJO_RESULT_OK, rv);
30   }
31 
ShutDown()32   void ShutDown() {
33     // NOTE: This may immediately invoke Notify for every context.
34     trap_handle_.reset();
35 
36     cancelled_contexts_.clear();
37   }
38 
AddEvent(base::WaitableEvent * event)39   MojoResult AddEvent(base::WaitableEvent* event) {
40     auto result = user_events_.insert(event);
41     if (result.second)
42       return MOJO_RESULT_OK;
43     return MOJO_RESULT_ALREADY_EXISTS;
44   }
45 
RemoveEvent(base::WaitableEvent * event)46   MojoResult RemoveEvent(base::WaitableEvent* event) {
47     auto it = user_events_.find(event);
48     if (it == user_events_.end())
49       return MOJO_RESULT_NOT_FOUND;
50     user_events_.erase(it);
51     return MOJO_RESULT_OK;
52   }
53 
AddHandle(Handle handle,MojoHandleSignals signals)54   MojoResult AddHandle(Handle handle, MojoHandleSignals signals) {
55     DCHECK(trap_handle_.is_valid());
56 
57     scoped_refptr<Context> context = new Context(this, handle);
58 
59     {
60       base::AutoLock lock(lock_);
61 
62       if (handle_to_context_.count(handle))
63         return MOJO_RESULT_ALREADY_EXISTS;
64       DCHECK(!contexts_.count(context->context_value()));
65 
66       handle_to_context_[handle] = context;
67       contexts_[context->context_value()] = context;
68     }
69 
70     // Balanced in State::Notify() with MOJO_RESULT_CANCELLED if
71     // MojoAddTrigger() succeeds. Otherwise balanced immediately below.
72     context->AddRef();
73 
74     // This can notify immediately if the watcher is already armed. Don't hold
75     // |lock_| while calling it.
76     MojoResult rv =
77         MojoAddTrigger(trap_handle_.get().value(), handle.value(), signals,
78                        MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
79                        context->context_value(), nullptr);
80     if (rv == MOJO_RESULT_INVALID_ARGUMENT) {
81       base::AutoLock lock(lock_);
82       handle_to_context_.erase(handle);
83       contexts_.erase(context->context_value());
84 
85       // Balanced above.
86       context->Release();
87       return rv;
88     }
89     DCHECK_EQ(MOJO_RESULT_OK, rv);
90 
91     return rv;
92   }
93 
RemoveHandle(Handle handle)94   MojoResult RemoveHandle(Handle handle) {
95     DCHECK(trap_handle_.is_valid());
96 
97     scoped_refptr<Context> context;
98     {
99       base::AutoLock lock(lock_);
100 
101       // Always clear |cancelled_contexts_| in case it's accumulated any more
102       // entries since the last time we ran.
103       cancelled_contexts_.clear();
104 
105       auto it = handle_to_context_.find(handle);
106       if (it == handle_to_context_.end())
107         return MOJO_RESULT_NOT_FOUND;
108 
109       context = std::move(it->second);
110       handle_to_context_.erase(it);
111 
112       // Ensure that we never return this handle as a ready result again. Note
113       // that it's removal from |handle_to_context_| above ensures it will never
114       // be added back to this map.
115       ready_handles_.erase(handle);
116     }
117 
118     // NOTE: This may enter the notification callback immediately, so don't hold
119     // |lock_| while calling it.
120     MojoResult rv = MojoRemoveTrigger(trap_handle_.get().value(),
121                                       context->context_value(), nullptr);
122 
123     // We don't really care whether or not this succeeds. In either case, the
124     // context was or will imminently be cancelled and moved from |contexts_|
125     // to |cancelled_contexts_|.
126     DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_NOT_FOUND);
127 
128     return rv;
129   }
130 
Wait(base::WaitableEvent ** ready_event,size_t * num_ready_handles,Handle * ready_handles,MojoResult * ready_results,MojoHandleSignalsState * signals_states)131   void Wait(base::WaitableEvent** ready_event,
132             size_t* num_ready_handles,
133             Handle* ready_handles,
134             MojoResult* ready_results,
135             MojoHandleSignalsState* signals_states) {
136     DCHECK(trap_handle_.is_valid());
137     DCHECK(num_ready_handles);
138     DCHECK(ready_handles);
139     DCHECK(ready_results);
140     {
141       base::AutoLock lock(lock_);
142       if (ready_handles_.empty()) {
143         // No handles are currently in the ready set. Make sure the event is
144         // reset and try to arm the watcher.
145         handle_event_.Reset();
146 
147         DCHECK_LE(*num_ready_handles, std::numeric_limits<uint32_t>::max());
148         uint32_t num_blocking_events =
149             static_cast<uint32_t>(*num_ready_handles);
150 
151         base::StackVector<MojoTrapEvent, 4> blocking_events;
152         blocking_events.container().resize(num_blocking_events);
153         for (size_t i = 0; i < num_blocking_events; ++i) {
154           blocking_events.container()[i].struct_size =
155               sizeof(blocking_events.container()[i]);
156         }
157         MojoResult rv = MojoArmTrap(trap_handle_.get().value(), nullptr,
158                                     &num_blocking_events,
159                                     blocking_events.container().data());
160 
161         if (rv == MOJO_RESULT_FAILED_PRECONDITION) {
162           // Simulate the handles becoming ready. We do this in lieu of
163           // returning the results immediately so as to avoid potentially
164           // starving user events. i.e., we always want to call WaitMany()
165           // below.
166           handle_event_.Signal();
167           for (size_t i = 0; i < num_blocking_events; ++i) {
168             const auto& event = blocking_events.container()[i];
169             auto it = contexts_.find(event.trigger_context);
170             DCHECK(it != contexts_.end());
171             ready_handles_[it->second->handle()] = {event.result,
172                                                     event.signals_state};
173           }
174         } else if (rv == MOJO_RESULT_NOT_FOUND) {
175           // Nothing to watch. If there are no user events, always signal to
176           // avoid deadlock.
177           if (user_events_.empty())
178             handle_event_.Signal();
179         } else {
180           // Watcher must be armed now. No need to manually signal.
181           DCHECK_EQ(MOJO_RESULT_OK, rv);
182         }
183       }
184     }
185 
186     // Build a local contiguous array of events to wait on. These are rotated
187     // across Wait() calls to avoid starvation, by virtue of the fact that
188     // WaitMany guarantees left-to-right priority when multiple events are
189     // signaled.
190 
191     base::StackVector<base::WaitableEvent*, 4> events;
192     events.container().resize(user_events_.size() + 1);
193     if (waitable_index_shift_ > user_events_.size())
194       waitable_index_shift_ = 0;
195 
196     size_t dest_index = waitable_index_shift_++;
197     events.container()[dest_index] = &handle_event_;
198     for (auto* e : user_events_) {
199       dest_index = (dest_index + 1) % events.container().size();
200       events.container()[dest_index] = e;
201     }
202 
203     size_t index = base::WaitableEvent::WaitMany(events.container().data(),
204                                                  events.container().size());
205     base::AutoLock lock(lock_);
206 
207     // Pop as many handles as we can out of the ready set and return them. Note
208     // that we do this regardless of which event signaled, as there may be
209     // ready handles in any case and they may be interesting to the caller.
210     *num_ready_handles = std::min(*num_ready_handles, ready_handles_.size());
211     for (size_t i = 0; i < *num_ready_handles; ++i) {
212       auto it = ready_handles_.begin();
213       ready_handles[i] = it->first;
214       ready_results[i] = it->second.result;
215       if (signals_states)
216         signals_states[i] = it->second.signals_state;
217       ready_handles_.erase(it);
218     }
219 
220     // If the caller cares, let them know which user event unblocked us, if any.
221     if (ready_event) {
222       if (events.container()[index] == &handle_event_)
223         *ready_event = nullptr;
224       else
225         *ready_event = events.container()[index];
226     }
227   }
228 
229  private:
230   friend class base::RefCountedThreadSafe<State>;
231 
232   class Context : public base::RefCountedThreadSafe<Context> {
233    public:
Context(scoped_refptr<State> state,Handle handle)234     Context(scoped_refptr<State> state, Handle handle)
235         : state_(state), handle_(handle) {}
236 
handle() const237     Handle handle() const { return handle_; }
238 
context_value() const239     uintptr_t context_value() const {
240       return reinterpret_cast<uintptr_t>(this);
241     }
242 
OnNotification(const MojoTrapEvent * event)243     static void OnNotification(const MojoTrapEvent* event) {
244       reinterpret_cast<Context*>(event->trigger_context)
245           ->Notify(event->result, event->signals_state);
246     }
247 
248    private:
249     friend class base::RefCountedThreadSafe<Context>;
250 
~Context()251     ~Context() {}
252 
Notify(MojoResult result,MojoHandleSignalsState signals_state)253     void Notify(MojoResult result, MojoHandleSignalsState signals_state) {
254       state_->Notify(handle_, result, signals_state, this);
255     }
256 
257     const scoped_refptr<State> state_;
258     const Handle handle_;
259 
260     DISALLOW_COPY_AND_ASSIGN(Context);
261   };
262 
~State()263   ~State() {}
264 
Notify(Handle handle,MojoResult result,MojoHandleSignalsState signals_state,Context * context)265   void Notify(Handle handle,
266               MojoResult result,
267               MojoHandleSignalsState signals_state,
268               Context* context) {
269     base::AutoLock lock(lock_);
270 
271     // This notification may have raced with RemoveHandle() from another
272     // sequence. We only signal the WaitSet if that's not the case.
273     if (handle_to_context_.count(handle)) {
274       ready_handles_[handle] = {result, signals_state};
275       handle_event_.Signal();
276     }
277 
278     // Whether it's an implicit or explicit cancellation, erase from |contexts_|
279     // and append to |cancelled_contexts_|.
280     if (result == MOJO_RESULT_CANCELLED) {
281       contexts_.erase(context->context_value());
282       handle_to_context_.erase(handle);
283 
284       // NOTE: We retain a context ref in |cancelled_contexts_| to ensure that
285       // this Context's heap address is not reused too soon. For example, it
286       // would otherwise be possible for the user to call AddHandle() from the
287       // WaitSet's sequence immediately after this notification has fired on
288       // another sequence, potentially reusing the same heap address for the
289       // newly added Context; and then they may call RemoveHandle() for this
290       // handle (not knowing its context has just been implicitly cancelled) and
291       // cause the new Context to be incorrectly removed from |contexts_|.
292       //
293       // This vector is cleared on the WaitSet's own sequence every time
294       // RemoveHandle is called.
295       cancelled_contexts_.emplace_back(base::WrapRefCounted(context));
296 
297       // Balanced in State::AddHandle().
298       context->Release();
299     }
300   }
301 
302   struct ReadyState {
303     ReadyState() = default;
ReadyStatemojo::WaitSet::State::ReadyState304     ReadyState(MojoResult result, MojoHandleSignalsState signals_state)
305         : result(result), signals_state(signals_state) {}
306     ~ReadyState() = default;
307 
308     MojoResult result = MOJO_RESULT_UNKNOWN;
309     MojoHandleSignalsState signals_state = {0, 0};
310   };
311 
312   // Not guarded by lock. Must only be accessed from the WaitSet's owning
313   // sequence.
314   ScopedTrapHandle trap_handle_;
315 
316   base::Lock lock_;
317   std::map<uintptr_t, scoped_refptr<Context>> contexts_;
318   std::map<Handle, scoped_refptr<Context>> handle_to_context_;
319   std::map<Handle, ReadyState> ready_handles_;
320   std::vector<scoped_refptr<Context>> cancelled_contexts_;
321   std::set<base::WaitableEvent*> user_events_;
322 
323   // Event signaled any time a handle notification is received.
324   base::WaitableEvent handle_event_;
325 
326   // Offset by which to rotate the current set of waitable objects. This is used
327   // to guard against event starvation, as base::WaitableEvent::WaitMany gives
328   // preference to events in left-to-right order.
329   size_t waitable_index_shift_ = 0;
330 
331   DISALLOW_COPY_AND_ASSIGN(State);
332 };
333 
WaitSet()334 WaitSet::WaitSet() : state_(new State) {}
335 
~WaitSet()336 WaitSet::~WaitSet() {
337   state_->ShutDown();
338 }
339 
AddEvent(base::WaitableEvent * event)340 MojoResult WaitSet::AddEvent(base::WaitableEvent* event) {
341   return state_->AddEvent(event);
342 }
343 
RemoveEvent(base::WaitableEvent * event)344 MojoResult WaitSet::RemoveEvent(base::WaitableEvent* event) {
345   return state_->RemoveEvent(event);
346 }
347 
AddHandle(Handle handle,MojoHandleSignals signals)348 MojoResult WaitSet::AddHandle(Handle handle, MojoHandleSignals signals) {
349   return state_->AddHandle(handle, signals);
350 }
351 
RemoveHandle(Handle handle)352 MojoResult WaitSet::RemoveHandle(Handle handle) {
353   return state_->RemoveHandle(handle);
354 }
355 
Wait(base::WaitableEvent ** ready_event,size_t * num_ready_handles,Handle * ready_handles,MojoResult * ready_results,MojoHandleSignalsState * signals_states)356 void WaitSet::Wait(base::WaitableEvent** ready_event,
357                    size_t* num_ready_handles,
358                    Handle* ready_handles,
359                    MojoResult* ready_results,
360                    MojoHandleSignalsState* signals_states) {
361   state_->Wait(ready_event, num_ready_handles, ready_handles, ready_results,
362                signals_states);
363 }
364 
365 }  // namespace mojo
366