1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6 
7 #include <stdint.h>
8 
9 #include <utility>
10 
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/sequenced_task_runner.h"
16 #include "base/stl_util.h"
17 #include "base/synchronization/waitable_event.h"
18 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
19 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
20 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
21 #include "mojo/public/cpp/bindings/lib/message_quota_checker.h"
22 #include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"
23 
24 namespace mojo {
25 namespace internal {
26 
27 // InterfaceEndpoint stores the information of an interface endpoint registered
28 // with the router.
29 // No one other than the router's |endpoints_| and |tasks_| should hold refs to
30 // this object.
31 class MultiplexRouter::InterfaceEndpoint
32     : public base::RefCountedThreadSafe<InterfaceEndpoint>,
33       public InterfaceEndpointController {
34  public:
InterfaceEndpoint(MultiplexRouter * router,InterfaceId id)35   InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
36       : router_(router),
37         id_(id),
38         closed_(false),
39         peer_closed_(false),
40         handle_created_(false),
41         client_(nullptr) {}
42 
43   // ---------------------------------------------------------------------------
44   // The following public methods are safe to call from any sequence without
45   // locking.
46 
id() const47   InterfaceId id() const { return id_; }
48 
49   // ---------------------------------------------------------------------------
50   // The following public methods are called under the router's lock.
51 
closed() const52   bool closed() const { return closed_; }
set_closed()53   void set_closed() {
54     router_->AssertLockAcquired();
55     closed_ = true;
56   }
57 
peer_closed() const58   bool peer_closed() const { return peer_closed_; }
set_peer_closed()59   void set_peer_closed() {
60     router_->AssertLockAcquired();
61     peer_closed_ = true;
62   }
63 
handle_created() const64   bool handle_created() const { return handle_created_; }
set_handle_created()65   void set_handle_created() {
66     router_->AssertLockAcquired();
67     handle_created_ = true;
68   }
69 
disconnect_reason() const70   const base::Optional<DisconnectReason>& disconnect_reason() const {
71     return disconnect_reason_;
72   }
set_disconnect_reason(const base::Optional<DisconnectReason> & disconnect_reason)73   void set_disconnect_reason(
74       const base::Optional<DisconnectReason>& disconnect_reason) {
75     router_->AssertLockAcquired();
76     disconnect_reason_ = disconnect_reason;
77   }
78 
task_runner() const79   base::SequencedTaskRunner* task_runner() const { return task_runner_.get(); }
80 
client() const81   InterfaceEndpointClient* client() const { return client_; }
82 
AttachClient(InterfaceEndpointClient * client,scoped_refptr<base::SequencedTaskRunner> runner)83   void AttachClient(InterfaceEndpointClient* client,
84                     scoped_refptr<base::SequencedTaskRunner> runner) {
85     router_->AssertLockAcquired();
86     DCHECK(!client_);
87     DCHECK(!closed_);
88     DCHECK(runner->RunsTasksInCurrentSequence());
89 
90     task_runner_ = std::move(runner);
91     client_ = client;
92   }
93 
94   // This method must be called on the same sequence as the corresponding
95   // AttachClient() call.
DetachClient()96   void DetachClient() {
97     router_->AssertLockAcquired();
98     DCHECK(client_);
99     DCHECK(task_runner_->RunsTasksInCurrentSequence());
100     DCHECK(!closed_);
101 
102     task_runner_ = nullptr;
103     client_ = nullptr;
104     sync_watcher_.reset();
105   }
106 
SignalSyncMessageEvent()107   void SignalSyncMessageEvent() {
108     router_->AssertLockAcquired();
109     if (sync_message_event_signaled_)
110       return;
111     sync_message_event_signaled_ = true;
112     if (sync_watcher_)
113       sync_watcher_->SignalEvent();
114   }
115 
ResetSyncMessageSignal()116   void ResetSyncMessageSignal() {
117     router_->AssertLockAcquired();
118     if (!sync_message_event_signaled_)
119       return;
120     sync_message_event_signaled_ = false;
121     if (sync_watcher_)
122       sync_watcher_->ResetEvent();
123   }
124 
125   // ---------------------------------------------------------------------------
126   // The following public methods (i.e., InterfaceEndpointController
127   // implementation) are called by the client on the same sequence as the
128   // AttachClient() call. They are called outside of the router's lock.
129 
SendMessage(Message * message)130   bool SendMessage(Message* message) override {
131     DCHECK(task_runner_->RunsTasksInCurrentSequence());
132     message->set_interface_id(id_);
133     return router_->connector_.Accept(message);
134   }
135 
AllowWokenUpBySyncWatchOnSameThread()136   void AllowWokenUpBySyncWatchOnSameThread() override {
137     DCHECK(task_runner_->RunsTasksInCurrentSequence());
138 
139     EnsureSyncWatcherExists();
140     sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
141   }
142 
SyncWatch(const bool * should_stop)143   bool SyncWatch(const bool* should_stop) override {
144     DCHECK(task_runner_->RunsTasksInCurrentSequence());
145 
146     EnsureSyncWatcherExists();
147     return sync_watcher_->SyncWatch(should_stop);
148   }
149 
150  private:
151   friend class base::RefCountedThreadSafe<InterfaceEndpoint>;
152 
~InterfaceEndpoint()153   ~InterfaceEndpoint() override {
154     router_->AssertLockAcquired();
155 
156     DCHECK(!client_);
157   }
158 
OnSyncEventSignaled()159   void OnSyncEventSignaled() {
160     DCHECK(task_runner_->RunsTasksInCurrentSequence());
161     scoped_refptr<MultiplexRouter> router_protector(router_);
162 
163     MayAutoLock locker(&router_->lock_);
164     scoped_refptr<InterfaceEndpoint> self_protector(this);
165 
166     bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
167 
168     if (!more_to_process)
169       ResetSyncMessageSignal();
170 
171     // Currently there are no queued sync messages and the peer has closed so
172     // there won't be incoming sync messages in the future.
173     if (!more_to_process && peer_closed_) {
174       // If a SyncWatch() call (or multiple ones) of this interface endpoint is
175       // on the call stack, resetting the sync watcher will allow it to exit
176       // when the call stack unwinds to that frame.
177       sync_watcher_.reset();
178     }
179   }
180 
EnsureSyncWatcherExists()181   void EnsureSyncWatcherExists() {
182     DCHECK(task_runner_->RunsTasksInCurrentSequence());
183     if (sync_watcher_)
184       return;
185 
186     MayAutoLock locker(&router_->lock_);
187     sync_watcher_ =
188         std::make_unique<SequenceLocalSyncEventWatcher>(base::BindRepeating(
189             &InterfaceEndpoint::OnSyncEventSignaled, base::Unretained(this)));
190     if (sync_message_event_signaled_)
191       sync_watcher_->SignalEvent();
192   }
193 
194   // ---------------------------------------------------------------------------
195   // The following members are safe to access from any sequence.
196 
197   MultiplexRouter* const router_;
198   const InterfaceId id_;
199 
200   // ---------------------------------------------------------------------------
201   // The following members are accessed under the router's lock.
202 
203   // Whether the endpoint has been closed.
204   bool closed_;
205   // Whether the peer endpoint has been closed.
206   bool peer_closed_;
207 
208   // Whether there is already a ScopedInterfaceEndpointHandle created for this
209   // endpoint.
210   bool handle_created_;
211 
212   base::Optional<DisconnectReason> disconnect_reason_;
213 
214   // The task runner on which |client_|'s methods can be called.
215   scoped_refptr<base::SequencedTaskRunner> task_runner_;
216   // Not owned. It is null if no client is attached to this endpoint.
217   InterfaceEndpointClient* client_;
218 
219   // Indicates whether the sync watcher should be signaled for this endpoint.
220   bool sync_message_event_signaled_ = false;
221 
222   // Guarded by the router's lock. Used to synchronously wait on replies.
223   std::unique_ptr<SequenceLocalSyncEventWatcher> sync_watcher_;
224 
225   DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
226 };
227 
228 // MessageWrapper objects are always destroyed under the router's lock. On
229 // destruction, if the message it wrappers contains interface IDs, the wrapper
230 // closes the corresponding endpoints.
231 class MultiplexRouter::MessageWrapper {
232  public:
233   MessageWrapper() = default;
234 
MessageWrapper(MultiplexRouter * router,Message message)235   MessageWrapper(MultiplexRouter* router, Message message)
236       : router_(router), value_(std::move(message)) {}
237 
MessageWrapper(MessageWrapper && other)238   MessageWrapper(MessageWrapper&& other)
239       : router_(other.router_), value_(std::move(other.value_)) {}
240 
~MessageWrapper()241   ~MessageWrapper() {
242     if (!router_ || value_.IsNull())
243       return;
244 
245     router_->AssertLockAcquired();
246     // Don't try to close the endpoints if at this point the router is already
247     // half-destructed.
248     if (!router_->being_destructed_)
249       router_->CloseEndpointsForMessage(value_);
250   }
251 
operator =(MessageWrapper && other)252   MessageWrapper& operator=(MessageWrapper&& other) {
253     router_ = other.router_;
254     value_ = std::move(other.value_);
255     return *this;
256   }
257 
value() const258   const Message& value() const { return value_; }
259 
260   // Must be called outside of the router's lock.
261   // Returns a null message if it fails to deseralize the associated endpoint
262   // handles.
DeserializeEndpointHandlesAndTake()263   Message DeserializeEndpointHandlesAndTake() {
264     if (!value_.DeserializeAssociatedEndpointHandles(router_)) {
265       // The previous call may have deserialized part of the associated
266       // interface endpoint handles. They must be destroyed outside of the
267       // router's lock, so we cannot wait until destruction of MessageWrapper.
268       value_.Reset();
269       return Message();
270     }
271     return std::move(value_);
272   }
273 
274  private:
275   MultiplexRouter* router_ = nullptr;
276   Message value_;
277 
278   DISALLOW_COPY_AND_ASSIGN(MessageWrapper);
279 };
280 
281 struct MultiplexRouter::Task {
282  public:
283   // Doesn't take ownership of |message| but takes its contents.
CreateMessageTaskmojo::internal::MultiplexRouter::Task284   static std::unique_ptr<Task> CreateMessageTask(
285       MessageWrapper message_wrapper) {
286     Task* task = new Task(MESSAGE);
287     task->message_wrapper = std::move(message_wrapper);
288     return base::WrapUnique(task);
289   }
CreateNotifyErrorTaskmojo::internal::MultiplexRouter::Task290   static std::unique_ptr<Task> CreateNotifyErrorTask(
291       InterfaceEndpoint* endpoint) {
292     Task* task = new Task(NOTIFY_ERROR);
293     task->endpoint_to_notify = endpoint;
294     return base::WrapUnique(task);
295   }
296 
~Taskmojo::internal::MultiplexRouter::Task297   ~Task() {}
298 
IsMessageTaskmojo::internal::MultiplexRouter::Task299   bool IsMessageTask() const { return type == MESSAGE; }
IsNotifyErrorTaskmojo::internal::MultiplexRouter::Task300   bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
301 
302   MessageWrapper message_wrapper;
303   scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
304 
305   enum Type { MESSAGE, NOTIFY_ERROR };
306   Type type;
307 
308  private:
Taskmojo::internal::MultiplexRouter::Task309   explicit Task(Type in_type) : type(in_type) {}
310 
311   DISALLOW_COPY_AND_ASSIGN(Task);
312 };
313 
MultiplexRouter(ScopedMessagePipeHandle message_pipe,Config config,bool set_interface_id_namespace_bit,scoped_refptr<base::SequencedTaskRunner> runner)314 MultiplexRouter::MultiplexRouter(
315     ScopedMessagePipeHandle message_pipe,
316     Config config,
317     bool set_interface_id_namespace_bit,
318     scoped_refptr<base::SequencedTaskRunner> runner)
319     : set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
320       task_runner_(runner),
321       dispatcher_(this),
322       connector_(std::move(message_pipe),
323                  config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND
324                                            : Connector::SINGLE_THREADED_SEND,
325                  std::move(runner)),
326       control_message_handler_(this),
327       control_message_proxy_(&connector_) {
328   DCHECK(task_runner_->RunsTasksInCurrentSequence());
329 
330   if (config == MULTI_INTERFACE)
331     lock_.emplace();
332 
333   if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS ||
334       config == MULTI_INTERFACE) {
335     // Always participate in sync handle watching in multi-interface mode,
336     // because even if it doesn't expect sync requests during sync handle
337     // watching, it may still need to dispatch messages to associated endpoints
338     // on a different sequence.
339     connector_.AllowWokenUpBySyncWatchOnSameThread();
340   }
341   connector_.set_incoming_receiver(&dispatcher_);
342   connector_.set_connection_error_handler(
343       base::BindOnce(&MultiplexRouter::OnPipeConnectionError,
344                      base::Unretained(this), false /* force_async_dispatch */));
345 
346   scoped_refptr<internal::MessageQuotaChecker> quota_checker =
347       internal::MessageQuotaChecker::MaybeCreate();
348   if (quota_checker)
349     connector_.SetMessageQuotaChecker(std::move(quota_checker));
350 
351   std::unique_ptr<MessageHeaderValidator> header_validator =
352       std::make_unique<MessageHeaderValidator>();
353   header_validator_ = header_validator.get();
354   dispatcher_.SetValidator(std::move(header_validator));
355 }
356 
~MultiplexRouter()357 MultiplexRouter::~MultiplexRouter() {
358   MayAutoLock locker(&lock_);
359 
360   being_destructed_ = true;
361 
362   sync_message_tasks_.clear();
363   tasks_.clear();
364   endpoints_.clear();
365 }
366 
SetIncomingMessageFilter(std::unique_ptr<MessageFilter> filter)367 void MultiplexRouter::SetIncomingMessageFilter(
368     std::unique_ptr<MessageFilter> filter) {
369   dispatcher_.SetFilter(std::move(filter));
370 }
371 
SetMasterInterfaceName(const char * name)372 void MultiplexRouter::SetMasterInterfaceName(const char* name) {
373   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
374   header_validator_->SetDescription(std::string(name) +
375                                     " [master] MessageHeaderValidator");
376   control_message_handler_.SetDescription(
377       std::string(name) + " [master] PipeControlMessageHandler");
378   connector_.SetWatcherHeapProfilerTag(name);
379 }
380 
SetConnectionGroup(ConnectionGroup::Ref ref)381 void MultiplexRouter::SetConnectionGroup(ConnectionGroup::Ref ref) {
382   connector_.SetConnectionGroup(std::move(ref));
383 }
384 
AssociateInterface(ScopedInterfaceEndpointHandle handle_to_send)385 InterfaceId MultiplexRouter::AssociateInterface(
386     ScopedInterfaceEndpointHandle handle_to_send) {
387   if (!handle_to_send.pending_association())
388     return kInvalidInterfaceId;
389 
390   uint32_t id = 0;
391   {
392     MayAutoLock locker(&lock_);
393     do {
394       if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
395         next_interface_id_value_ = 1;
396       id = next_interface_id_value_++;
397       if (set_interface_id_namespace_bit_)
398         id |= kInterfaceIdNamespaceMask;
399     } while (base::Contains(endpoints_, id));
400 
401     InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
402     endpoints_[id] = endpoint;
403     if (encountered_error_)
404       UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
405     endpoint->set_handle_created();
406   }
407 
408   if (!NotifyAssociation(&handle_to_send, id)) {
409     // The peer handle of |handle_to_send|, which is supposed to join this
410     // associated group, has been closed.
411     {
412       MayAutoLock locker(&lock_);
413       InterfaceEndpoint* endpoint = FindEndpoint(id);
414       if (endpoint)
415         UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
416     }
417 
418     control_message_proxy_.NotifyPeerEndpointClosed(
419         id, handle_to_send.disconnect_reason());
420   }
421   return id;
422 }
423 
CreateLocalEndpointHandle(InterfaceId id)424 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
425     InterfaceId id) {
426   if (!IsValidInterfaceId(id))
427     return ScopedInterfaceEndpointHandle();
428 
429   MayAutoLock locker(&lock_);
430   bool inserted = false;
431   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
432   if (inserted) {
433     if (encountered_error_)
434       UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
435   } else {
436     if (endpoint->handle_created() || endpoint->closed())
437       return ScopedInterfaceEndpointHandle();
438   }
439 
440   endpoint->set_handle_created();
441   return CreateScopedInterfaceEndpointHandle(id);
442 }
443 
CloseEndpointHandle(InterfaceId id,const base::Optional<DisconnectReason> & reason)444 void MultiplexRouter::CloseEndpointHandle(
445     InterfaceId id,
446     const base::Optional<DisconnectReason>& reason) {
447   if (!IsValidInterfaceId(id))
448     return;
449 
450   MayAutoLock locker(&lock_);
451   DCHECK(base::Contains(endpoints_, id));
452   InterfaceEndpoint* endpoint = endpoints_[id].get();
453   DCHECK(!endpoint->client());
454   DCHECK(!endpoint->closed());
455   UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
456 
457   if (!IsMasterInterfaceId(id) || reason) {
458     MayAutoUnlock unlocker(&lock_);
459     control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
460   }
461 
462   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
463 }
464 
AttachEndpointClient(const ScopedInterfaceEndpointHandle & handle,InterfaceEndpointClient * client,scoped_refptr<base::SequencedTaskRunner> runner)465 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
466     const ScopedInterfaceEndpointHandle& handle,
467     InterfaceEndpointClient* client,
468     scoped_refptr<base::SequencedTaskRunner> runner) {
469   const InterfaceId id = handle.id();
470 
471   DCHECK(IsValidInterfaceId(id));
472   DCHECK(client);
473 
474   MayAutoLock locker(&lock_);
475   DCHECK(base::Contains(endpoints_, id));
476 
477   InterfaceEndpoint* endpoint = endpoints_[id].get();
478   endpoint->AttachClient(client, std::move(runner));
479 
480   if (endpoint->peer_closed())
481     tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
482   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
483 
484   return endpoint;
485 }
486 
DetachEndpointClient(const ScopedInterfaceEndpointHandle & handle)487 void MultiplexRouter::DetachEndpointClient(
488     const ScopedInterfaceEndpointHandle& handle) {
489   const InterfaceId id = handle.id();
490 
491   DCHECK(IsValidInterfaceId(id));
492 
493   MayAutoLock locker(&lock_);
494   DCHECK(base::Contains(endpoints_, id));
495 
496   InterfaceEndpoint* endpoint = endpoints_[id].get();
497   endpoint->DetachClient();
498 }
499 
RaiseError()500 void MultiplexRouter::RaiseError() {
501   if (task_runner_->RunsTasksInCurrentSequence()) {
502     connector_.RaiseError();
503   } else {
504     task_runner_->PostTask(FROM_HERE,
505                            base::BindOnce(&MultiplexRouter::RaiseError, this));
506   }
507 }
508 
PrefersSerializedMessages()509 bool MultiplexRouter::PrefersSerializedMessages() {
510   MayAutoLock locker(&lock_);
511   return connector_.PrefersSerializedMessages();
512 }
513 
CloseMessagePipe()514 void MultiplexRouter::CloseMessagePipe() {
515   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
516   connector_.CloseMessagePipe();
517   flush_pipe_watcher_.reset();
518   active_flush_pipe_.reset();
519   // CloseMessagePipe() above won't trigger connection error handler.
520   // Explicitly call OnPipeConnectionError() so that associated endpoints will
521   // get notified.
522   OnPipeConnectionError(true /* force_async_dispatch */);
523 }
524 
PauseIncomingMethodCallProcessing()525 void MultiplexRouter::PauseIncomingMethodCallProcessing() {
526   PauseInternal(/*must_resume_manually=*/true);
527 }
528 
ResumeIncomingMethodCallProcessing()529 void MultiplexRouter::ResumeIncomingMethodCallProcessing() {
530   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
531   // If the owner is manually resuming from a previous pause request, the
532   // interface may also still be paused due to waiting on a pending async flush
533   // in the system.
534   //
535   // In that case we ignore the caller, except to subsequently allow implicit
536   // resume once the pending flush operation is finished.
537   if (active_flush_pipe_) {
538     MayAutoLock locker(&lock_);
539     must_resume_manually_ = false;
540     return;
541   }
542 
543   connector_.ResumeIncomingMethodCallProcessing();
544 
545   MayAutoLock locker(&lock_);
546   paused_ = false;
547   must_resume_manually_ = false;
548 
549   for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) {
550     auto sync_iter = sync_message_tasks_.find(iter->first);
551     if (iter->second->peer_closed() ||
552         (sync_iter != sync_message_tasks_.end() &&
553          !sync_iter->second.empty())) {
554       iter->second->SignalSyncMessageEvent();
555     }
556   }
557 
558   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
559 }
560 
FlushAsync(AsyncFlusher flusher)561 void MultiplexRouter::FlushAsync(AsyncFlusher flusher) {
562   control_message_proxy_.FlushAsync(std::move(flusher));
563 }
564 
PausePeerUntilFlushCompletes(PendingFlush flush)565 void MultiplexRouter::PausePeerUntilFlushCompletes(PendingFlush flush) {
566   control_message_proxy_.PausePeerUntilFlushCompletes(std::move(flush));
567 }
568 
HasAssociatedEndpoints() const569 bool MultiplexRouter::HasAssociatedEndpoints() const {
570   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
571   MayAutoLock locker(&lock_);
572 
573   if (endpoints_.size() > 1)
574     return true;
575   if (endpoints_.size() == 0)
576     return false;
577 
578   return !base::Contains(endpoints_, kMasterInterfaceId);
579 }
580 
EnableBatchDispatch()581 void MultiplexRouter::EnableBatchDispatch() {
582   connector_.set_force_immediate_dispatch(true);
583 }
584 
EnableTestingMode()585 void MultiplexRouter::EnableTestingMode() {
586   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
587   MayAutoLock locker(&lock_);
588 
589   testing_mode_ = true;
590   connector_.set_enforce_errors_from_incoming_receiver(false);
591 }
592 
Accept(Message * message)593 bool MultiplexRouter::Accept(Message* message) {
594   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
595 
596   // Insert endpoints for the payload interface IDs as soon as the message
597   // arrives, instead of waiting till the message is dispatched. Consider the
598   // following sequence:
599   // 1) Async message msg1 arrives, containing interface ID x. Msg1 is not
600   //    dispatched because a sync call is blocking the thread.
601   // 2) Sync message msg2 arrives targeting interface ID x.
602   //
603   // If we don't insert endpoint for interface ID x, when trying to dispatch
604   // msg2 we don't know whether it is an unexpected message or it is just
605   // because the message containing x hasn't been dispatched.
606   if (!InsertEndpointsForMessage(*message))
607     return false;
608 
609   scoped_refptr<MultiplexRouter> protector(this);
610   MayAutoLock locker(&lock_);
611 
612   DCHECK(!paused_);
613 
614   ClientCallBehavior client_call_behavior =
615       connector_.during_sync_handle_watcher_callback()
616           ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
617           : ALLOW_DIRECT_CLIENT_CALLS;
618 
619   MessageWrapper message_wrapper(this, std::move(*message));
620   bool processed = tasks_.empty() && ProcessIncomingMessage(
621                                          &message_wrapper, client_call_behavior,
622                                          connector_.task_runner());
623 
624   if (!processed) {
625     // Either the task queue is not empty or we cannot process the message
626     // directly. In both cases, there is no need to call ProcessTasks().
627     tasks_.push_back(Task::CreateMessageTask(std::move(message_wrapper)));
628     Task* task = tasks_.back().get();
629 
630     if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) {
631       InterfaceId id = task->message_wrapper.value().interface_id();
632       sync_message_tasks_[id].push_back(task);
633       InterfaceEndpoint* endpoint = FindEndpoint(id);
634       if (endpoint)
635         endpoint->SignalSyncMessageEvent();
636     }
637   } else if (!tasks_.empty()) {
638     // Processing the message may result in new tasks (for error notification)
639     // being added to the queue. In this case, we have to attempt to process the
640     // tasks.
641     ProcessTasks(client_call_behavior, connector_.task_runner());
642   }
643 
644   // Always return true. If we see errors during message processing, we will
645   // explicitly call Connector::RaiseError() to disconnect the message pipe.
646   return true;
647 }
648 
OnPeerAssociatedEndpointClosed(InterfaceId id,const base::Optional<DisconnectReason> & reason)649 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(
650     InterfaceId id,
651     const base::Optional<DisconnectReason>& reason) {
652   MayAutoLock locker(&lock_);
653   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
654 
655   if (reason)
656     endpoint->set_disconnect_reason(reason);
657 
658   // It is possible that this endpoint has been set as peer closed. That is
659   // because when the message pipe is closed, all the endpoints are updated with
660   // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue,
661   // as long as there are refs keeping the router alive. If there is a
662   // PeerAssociatedEndpointClosedEvent control message in the queue, we will get
663   // here and see that the endpoint has been marked as peer closed.
664   if (!endpoint->peer_closed()) {
665     if (endpoint->client())
666       tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
667     UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
668   }
669 
670   // No need to trigger a ProcessTasks() because it is already on the stack.
671 
672   return true;
673 }
674 
WaitForFlushToComplete(ScopedMessagePipeHandle pipe)675 bool MultiplexRouter::WaitForFlushToComplete(ScopedMessagePipeHandle pipe) {
676   // If this MultiplexRouter has an associated interface on some task runner
677   // other than the master interface's task runner, it is possible to process
678   // incoming control messages on that task runner. We don't support this
679   // control message on anything but the main interface though.
680   if (!task_runner_->RunsTasksInCurrentSequence())
681     return false;
682 
683   flush_pipe_watcher_.emplace(FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL,
684                               task_runner_);
685   flush_pipe_watcher_->Watch(
686       pipe.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
687       MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
688       base::BindRepeating(&MultiplexRouter::OnFlushPipeSignaled, this));
689   if (flush_pipe_watcher_->Arm() != MOJO_RESULT_OK) {
690     // The peer must already be closed, so consider the flush to be complete.
691     flush_pipe_watcher_.reset();
692     return true;
693   }
694 
695   active_flush_pipe_ = std::move(pipe);
696   PauseInternal(/*must_resume_manually=*/false);
697   return true;
698 }
699 
OnPipeConnectionError(bool force_async_dispatch)700 void MultiplexRouter::OnPipeConnectionError(bool force_async_dispatch) {
701   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
702 
703   scoped_refptr<MultiplexRouter> protector(this);
704   MayAutoLock locker(&lock_);
705 
706   encountered_error_ = true;
707 
708   // Calling UpdateEndpointStateMayRemove() may remove the corresponding value
709   // from |endpoints_| and invalidate any iterator of |endpoints_|. Therefore,
710   // copy the endpoint pointers to a vector and iterate over it instead.
711   std::vector<scoped_refptr<InterfaceEndpoint>> endpoint_vector;
712   endpoint_vector.reserve(endpoints_.size());
713   for (const auto& pair : endpoints_)
714     endpoint_vector.push_back(pair.second);
715 
716   for (const auto& endpoint : endpoint_vector) {
717     if (endpoint->client())
718       tasks_.push_back(Task::CreateNotifyErrorTask(endpoint.get()));
719 
720     UpdateEndpointStateMayRemove(endpoint.get(), PEER_ENDPOINT_CLOSED);
721   }
722 
723   ClientCallBehavior call_behavior = ALLOW_DIRECT_CLIENT_CALLS;
724   if (force_async_dispatch)
725     call_behavior = NO_DIRECT_CLIENT_CALLS;
726   else if (connector_.during_sync_handle_watcher_callback())
727     call_behavior = ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES;
728 
729   ProcessTasks(call_behavior, connector_.task_runner());
730 }
731 
OnFlushPipeSignaled(MojoResult result,const HandleSignalsState & state)732 void MultiplexRouter::OnFlushPipeSignaled(MojoResult result,
733                                           const HandleSignalsState& state) {
734   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
735   flush_pipe_watcher_.reset();
736   active_flush_pipe_.reset();
737 
738   // If there is not an explicit Pause waiting for a Resume, we can unpause.
739   if (!must_resume_manually_)
740     ResumeIncomingMethodCallProcessing();
741 }
742 
PauseInternal(bool must_resume_manually)743 void MultiplexRouter::PauseInternal(bool must_resume_manually) {
744   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
745 
746   connector_.PauseIncomingMethodCallProcessing();
747 
748   MayAutoLock locker(&lock_);
749 
750   paused_ = true;
751   for (auto& entry : endpoints_)
752     entry.second->ResetSyncMessageSignal();
753 
754   // We do not want to override this to |false| if it's already |true|. If it's
755   // ever |true|, that means there's been at least one explicit Pause call since
756   // the last Resume and we must never unpause until at least one call to Resume
757   // is made.
758   must_resume_manually_ = must_resume_manually_ || must_resume_manually;
759 }
760 
ProcessTasks(ClientCallBehavior client_call_behavior,base::SequencedTaskRunner * current_task_runner)761 void MultiplexRouter::ProcessTasks(
762     ClientCallBehavior client_call_behavior,
763     base::SequencedTaskRunner* current_task_runner) {
764   AssertLockAcquired();
765 
766   if (posted_to_process_tasks_)
767     return;
768 
769   while (!tasks_.empty() && !paused_) {
770     std::unique_ptr<Task> task(std::move(tasks_.front()));
771     tasks_.pop_front();
772 
773     InterfaceId id = kInvalidInterfaceId;
774     bool sync_message =
775         task->IsMessageTask() && !task->message_wrapper.value().IsNull() &&
776         task->message_wrapper.value().has_flag(Message::kFlagIsSync);
777     if (sync_message) {
778       id = task->message_wrapper.value().interface_id();
779       auto& sync_message_queue = sync_message_tasks_[id];
780       DCHECK_EQ(task.get(), sync_message_queue.front());
781       sync_message_queue.pop_front();
782     }
783 
784     bool processed =
785         task->IsNotifyErrorTask()
786             ? ProcessNotifyErrorTask(task.get(), client_call_behavior,
787                                      current_task_runner)
788             : ProcessIncomingMessage(&task->message_wrapper,
789                                      client_call_behavior, current_task_runner);
790 
791     if (!processed) {
792       if (sync_message) {
793         auto& sync_message_queue = sync_message_tasks_[id];
794         sync_message_queue.push_front(task.get());
795       }
796       tasks_.push_front(std::move(task));
797       break;
798     } else {
799       if (sync_message) {
800         auto iter = sync_message_tasks_.find(id);
801         if (iter != sync_message_tasks_.end() && iter->second.empty())
802           sync_message_tasks_.erase(iter);
803       }
804     }
805   }
806 }
807 
ProcessFirstSyncMessageForEndpoint(InterfaceId id)808 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
809   AssertLockAcquired();
810 
811   auto iter = sync_message_tasks_.find(id);
812   if (iter == sync_message_tasks_.end())
813     return false;
814 
815   if (paused_)
816     return true;
817 
818   MultiplexRouter::Task* task = iter->second.front();
819   iter->second.pop_front();
820 
821   DCHECK(task->IsMessageTask());
822   MessageWrapper message_wrapper = std::move(task->message_wrapper);
823 
824   // Note: after this call, |task| and |iter| may be invalidated.
825   bool processed = ProcessIncomingMessage(
826       &message_wrapper, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr);
827   DCHECK(processed);
828 
829   iter = sync_message_tasks_.find(id);
830   if (iter == sync_message_tasks_.end())
831     return false;
832 
833   if (iter->second.empty()) {
834     sync_message_tasks_.erase(iter);
835     return false;
836   }
837 
838   return true;
839 }
840 
ProcessNotifyErrorTask(Task * task,ClientCallBehavior client_call_behavior,base::SequencedTaskRunner * current_task_runner)841 bool MultiplexRouter::ProcessNotifyErrorTask(
842     Task* task,
843     ClientCallBehavior client_call_behavior,
844     base::SequencedTaskRunner* current_task_runner) {
845   DCHECK(!current_task_runner ||
846          current_task_runner->RunsTasksInCurrentSequence());
847   DCHECK(!paused_);
848 
849   AssertLockAcquired();
850   InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
851   if (!endpoint->client())
852     return true;
853 
854   if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS ||
855       endpoint->task_runner() != current_task_runner) {
856     MaybePostToProcessTasks(endpoint->task_runner());
857     return false;
858   }
859 
860   DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
861 
862   InterfaceEndpointClient* client = endpoint->client();
863   base::Optional<DisconnectReason> disconnect_reason(
864       endpoint->disconnect_reason());
865 
866   {
867     // We must unlock before calling into |client| because it may call this
868     // object within NotifyError(). Holding the lock will lead to deadlock.
869     //
870     // It is safe to call into |client| without the lock. Because |client| is
871     // always accessed on the same sequence, including DetachEndpointClient().
872     MayAutoUnlock unlocker(&lock_);
873     client->NotifyError(disconnect_reason);
874   }
875   return true;
876 }
877 
ProcessIncomingMessage(MessageWrapper * message_wrapper,ClientCallBehavior client_call_behavior,base::SequencedTaskRunner * current_task_runner)878 bool MultiplexRouter::ProcessIncomingMessage(
879     MessageWrapper* message_wrapper,
880     ClientCallBehavior client_call_behavior,
881     base::SequencedTaskRunner* current_task_runner) {
882   DCHECK(!current_task_runner ||
883          current_task_runner->RunsTasksInCurrentSequence());
884   DCHECK(!paused_);
885   DCHECK(message_wrapper);
886   AssertLockAcquired();
887 
888   const Message* message = &message_wrapper->value();
889   if (message->IsNull()) {
890     // This is a sync message and has been processed during sync handle
891     // watching.
892     return true;
893   }
894 
895   if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
896     bool result = false;
897 
898     {
899       MayAutoUnlock unlocker(&lock_);
900       Message tmp_message =
901           message_wrapper->DeserializeEndpointHandlesAndTake();
902       result = !tmp_message.IsNull() &&
903                control_message_handler_.Accept(&tmp_message);
904     }
905 
906     if (!result)
907       RaiseErrorInNonTestingMode();
908 
909     return true;
910   }
911 
912   InterfaceId id = message->interface_id();
913   DCHECK(IsValidInterfaceId(id));
914 
915   InterfaceEndpoint* endpoint = FindEndpoint(id);
916   if (!endpoint || endpoint->closed())
917     return true;
918 
919   if (!endpoint->client()) {
920     // We need to wait until a client is attached in order to dispatch further
921     // messages.
922     return false;
923   }
924 
925   bool can_direct_call;
926   if (message->has_flag(Message::kFlagIsSync)) {
927     can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
928                       endpoint->task_runner()->RunsTasksInCurrentSequence();
929   } else {
930     can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS &&
931                       endpoint->task_runner() == current_task_runner;
932   }
933 
934   if (!can_direct_call) {
935     MaybePostToProcessTasks(endpoint->task_runner());
936     return false;
937   }
938 
939   DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
940 
941   InterfaceEndpointClient* client = endpoint->client();
942   bool result = false;
943   {
944     // We must unlock before calling into |client| because it may call this
945     // object within HandleIncomingMessage(). Holding the lock will lead to
946     // deadlock.
947     //
948     // It is safe to call into |client| without the lock. Because |client| is
949     // always accessed on the same sequence, including DetachEndpointClient().
950     MayAutoUnlock unlocker(&lock_);
951     Message tmp_message = message_wrapper->DeserializeEndpointHandlesAndTake();
952     result =
953         !tmp_message.IsNull() && client->HandleIncomingMessage(&tmp_message);
954   }
955   if (!result)
956     RaiseErrorInNonTestingMode();
957 
958   return true;
959 }
960 
MaybePostToProcessTasks(base::SequencedTaskRunner * task_runner)961 void MultiplexRouter::MaybePostToProcessTasks(
962     base::SequencedTaskRunner* task_runner) {
963   AssertLockAcquired();
964   if (posted_to_process_tasks_)
965     return;
966 
967   posted_to_process_tasks_ = true;
968   posted_to_task_runner_ = task_runner;
969   task_runner->PostTask(
970       FROM_HERE,
971       base::BindOnce(&MultiplexRouter::LockAndCallProcessTasks, this));
972 }
973 
LockAndCallProcessTasks()974 void MultiplexRouter::LockAndCallProcessTasks() {
975   // There is no need to hold a ref to this class in this case because this is
976   // always called from a bound callback, which holds a ref.
977   MayAutoLock locker(&lock_);
978   posted_to_process_tasks_ = false;
979   scoped_refptr<base::SequencedTaskRunner> runner(
980       std::move(posted_to_task_runner_));
981   ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get());
982 }
983 
UpdateEndpointStateMayRemove(InterfaceEndpoint * endpoint,EndpointStateUpdateType type)984 void MultiplexRouter::UpdateEndpointStateMayRemove(
985     InterfaceEndpoint* endpoint,
986     EndpointStateUpdateType type) {
987   if (type == ENDPOINT_CLOSED) {
988     endpoint->set_closed();
989   } else {
990     endpoint->set_peer_closed();
991     // If the interface endpoint is performing a sync watch, this makes sure
992     // it is notified and eventually exits the sync watch.
993     endpoint->SignalSyncMessageEvent();
994   }
995   if (endpoint->closed() && endpoint->peer_closed())
996     endpoints_.erase(endpoint->id());
997 }
998 
RaiseErrorInNonTestingMode()999 void MultiplexRouter::RaiseErrorInNonTestingMode() {
1000   AssertLockAcquired();
1001   if (!testing_mode_)
1002     RaiseError();
1003 }
1004 
FindOrInsertEndpoint(InterfaceId id,bool * inserted)1005 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
1006     InterfaceId id,
1007     bool* inserted) {
1008   AssertLockAcquired();
1009   // Either |inserted| is nullptr or it points to a boolean initialized as
1010   // false.
1011   DCHECK(!inserted || !*inserted);
1012 
1013   InterfaceEndpoint* endpoint = FindEndpoint(id);
1014   if (!endpoint) {
1015     endpoint = new InterfaceEndpoint(this, id);
1016     endpoints_[id] = endpoint;
1017     if (inserted)
1018       *inserted = true;
1019   }
1020 
1021   return endpoint;
1022 }
1023 
FindEndpoint(InterfaceId id)1024 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindEndpoint(
1025     InterfaceId id) {
1026   AssertLockAcquired();
1027   auto iter = endpoints_.find(id);
1028   return iter != endpoints_.end() ? iter->second.get() : nullptr;
1029 }
1030 
AssertLockAcquired()1031 void MultiplexRouter::AssertLockAcquired() {
1032 #if DCHECK_IS_ON()
1033   if (lock_)
1034     lock_->AssertAcquired();
1035 #endif
1036 }
1037 
InsertEndpointsForMessage(const Message & message)1038 bool MultiplexRouter::InsertEndpointsForMessage(const Message& message) {
1039   if (!message.is_serialized())
1040     return true;
1041 
1042   uint32_t num_ids = message.payload_num_interface_ids();
1043   if (num_ids == 0)
1044     return true;
1045 
1046   const uint32_t* ids = message.payload_interface_ids();
1047 
1048   MayAutoLock locker(&lock_);
1049   for (uint32_t i = 0; i < num_ids; ++i) {
1050     // Message header validation already ensures that the IDs are valid and not
1051     // the master ID.
1052     // The IDs are from the remote side and therefore their namespace bit is
1053     // supposed to be different than the value that this router would use.
1054     if (set_interface_id_namespace_bit_ ==
1055         HasInterfaceIdNamespaceBitSet(ids[i])) {
1056       return false;
1057     }
1058 
1059     // It is possible that the endpoint already exists even when the remote side
1060     // is well-behaved: it might have notified us that the peer endpoint has
1061     // closed.
1062     bool inserted = false;
1063     InterfaceEndpoint* endpoint = FindOrInsertEndpoint(ids[i], &inserted);
1064     if (endpoint->closed() || endpoint->handle_created())
1065       return false;
1066   }
1067 
1068   return true;
1069 }
1070 
CloseEndpointsForMessage(const Message & message)1071 void MultiplexRouter::CloseEndpointsForMessage(const Message& message) {
1072   AssertLockAcquired();
1073 
1074   if (!message.is_serialized())
1075     return;
1076 
1077   uint32_t num_ids = message.payload_num_interface_ids();
1078   if (num_ids == 0)
1079     return;
1080 
1081   const uint32_t* ids = message.payload_interface_ids();
1082   for (uint32_t i = 0; i < num_ids; ++i) {
1083     InterfaceEndpoint* endpoint = FindEndpoint(ids[i]);
1084     // If the remote side maliciously sends the same interface ID in another
1085     // message which has been dispatched, we could get here with no endpoint
1086     // for the ID, a closed endpoint, or an endpoint with handle created.
1087     if (!endpoint || endpoint->closed() || endpoint->handle_created()) {
1088       RaiseErrorInNonTestingMode();
1089       continue;
1090     }
1091 
1092     UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
1093     MayAutoUnlock unlocker(&lock_);
1094     control_message_proxy_.NotifyPeerEndpointClosed(ids[i], base::nullopt);
1095   }
1096 
1097   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
1098 }
1099 
1100 }  // namespace internal
1101 }  // namespace mojo
1102