1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6
7 #include <stdint.h>
8
9 #include <utility>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/sequenced_task_runner.h"
16 #include "base/stl_util.h"
17 #include "base/synchronization/waitable_event.h"
18 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
19 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
20 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
21 #include "mojo/public/cpp/bindings/lib/message_quota_checker.h"
22 #include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"
23
24 namespace mojo {
25 namespace internal {
26
27 // InterfaceEndpoint stores the information of an interface endpoint registered
28 // with the router.
29 // No one other than the router's |endpoints_| and |tasks_| should hold refs to
30 // this object.
31 class MultiplexRouter::InterfaceEndpoint
32 : public base::RefCountedThreadSafe<InterfaceEndpoint>,
33 public InterfaceEndpointController {
34 public:
InterfaceEndpoint(MultiplexRouter * router,InterfaceId id)35 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
36 : router_(router),
37 id_(id),
38 closed_(false),
39 peer_closed_(false),
40 handle_created_(false),
41 client_(nullptr) {}
42
43 // ---------------------------------------------------------------------------
44 // The following public methods are safe to call from any sequence without
45 // locking.
46
id() const47 InterfaceId id() const { return id_; }
48
49 // ---------------------------------------------------------------------------
50 // The following public methods are called under the router's lock.
51
closed() const52 bool closed() const { return closed_; }
set_closed()53 void set_closed() {
54 router_->AssertLockAcquired();
55 closed_ = true;
56 }
57
peer_closed() const58 bool peer_closed() const { return peer_closed_; }
set_peer_closed()59 void set_peer_closed() {
60 router_->AssertLockAcquired();
61 peer_closed_ = true;
62 }
63
handle_created() const64 bool handle_created() const { return handle_created_; }
set_handle_created()65 void set_handle_created() {
66 router_->AssertLockAcquired();
67 handle_created_ = true;
68 }
69
disconnect_reason() const70 const base::Optional<DisconnectReason>& disconnect_reason() const {
71 return disconnect_reason_;
72 }
set_disconnect_reason(const base::Optional<DisconnectReason> & disconnect_reason)73 void set_disconnect_reason(
74 const base::Optional<DisconnectReason>& disconnect_reason) {
75 router_->AssertLockAcquired();
76 disconnect_reason_ = disconnect_reason;
77 }
78
task_runner() const79 base::SequencedTaskRunner* task_runner() const { return task_runner_.get(); }
80
client() const81 InterfaceEndpointClient* client() const { return client_; }
82
AttachClient(InterfaceEndpointClient * client,scoped_refptr<base::SequencedTaskRunner> runner)83 void AttachClient(InterfaceEndpointClient* client,
84 scoped_refptr<base::SequencedTaskRunner> runner) {
85 router_->AssertLockAcquired();
86 DCHECK(!client_);
87 DCHECK(!closed_);
88 DCHECK(runner->RunsTasksInCurrentSequence());
89
90 task_runner_ = std::move(runner);
91 client_ = client;
92 }
93
94 // This method must be called on the same sequence as the corresponding
95 // AttachClient() call.
DetachClient()96 void DetachClient() {
97 router_->AssertLockAcquired();
98 DCHECK(client_);
99 DCHECK(task_runner_->RunsTasksInCurrentSequence());
100 DCHECK(!closed_);
101
102 task_runner_ = nullptr;
103 client_ = nullptr;
104 sync_watcher_.reset();
105 }
106
SignalSyncMessageEvent()107 void SignalSyncMessageEvent() {
108 router_->AssertLockAcquired();
109 if (sync_message_event_signaled_)
110 return;
111 sync_message_event_signaled_ = true;
112 if (sync_watcher_)
113 sync_watcher_->SignalEvent();
114 }
115
ResetSyncMessageSignal()116 void ResetSyncMessageSignal() {
117 router_->AssertLockAcquired();
118 if (!sync_message_event_signaled_)
119 return;
120 sync_message_event_signaled_ = false;
121 if (sync_watcher_)
122 sync_watcher_->ResetEvent();
123 }
124
125 // ---------------------------------------------------------------------------
126 // The following public methods (i.e., InterfaceEndpointController
127 // implementation) are called by the client on the same sequence as the
128 // AttachClient() call. They are called outside of the router's lock.
129
SendMessage(Message * message)130 bool SendMessage(Message* message) override {
131 DCHECK(task_runner_->RunsTasksInCurrentSequence());
132 message->set_interface_id(id_);
133 return router_->connector_.Accept(message);
134 }
135
AllowWokenUpBySyncWatchOnSameThread()136 void AllowWokenUpBySyncWatchOnSameThread() override {
137 DCHECK(task_runner_->RunsTasksInCurrentSequence());
138
139 EnsureSyncWatcherExists();
140 sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
141 }
142
SyncWatch(const bool * should_stop)143 bool SyncWatch(const bool* should_stop) override {
144 DCHECK(task_runner_->RunsTasksInCurrentSequence());
145
146 EnsureSyncWatcherExists();
147 return sync_watcher_->SyncWatch(should_stop);
148 }
149
150 private:
151 friend class base::RefCountedThreadSafe<InterfaceEndpoint>;
152
~InterfaceEndpoint()153 ~InterfaceEndpoint() override {
154 router_->AssertLockAcquired();
155
156 DCHECK(!client_);
157 }
158
OnSyncEventSignaled()159 void OnSyncEventSignaled() {
160 DCHECK(task_runner_->RunsTasksInCurrentSequence());
161 scoped_refptr<MultiplexRouter> router_protector(router_);
162
163 MayAutoLock locker(&router_->lock_);
164 scoped_refptr<InterfaceEndpoint> self_protector(this);
165
166 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
167
168 if (!more_to_process)
169 ResetSyncMessageSignal();
170
171 // Currently there are no queued sync messages and the peer has closed so
172 // there won't be incoming sync messages in the future.
173 if (!more_to_process && peer_closed_) {
174 // If a SyncWatch() call (or multiple ones) of this interface endpoint is
175 // on the call stack, resetting the sync watcher will allow it to exit
176 // when the call stack unwinds to that frame.
177 sync_watcher_.reset();
178 }
179 }
180
EnsureSyncWatcherExists()181 void EnsureSyncWatcherExists() {
182 DCHECK(task_runner_->RunsTasksInCurrentSequence());
183 if (sync_watcher_)
184 return;
185
186 MayAutoLock locker(&router_->lock_);
187 sync_watcher_ =
188 std::make_unique<SequenceLocalSyncEventWatcher>(base::BindRepeating(
189 &InterfaceEndpoint::OnSyncEventSignaled, base::Unretained(this)));
190 if (sync_message_event_signaled_)
191 sync_watcher_->SignalEvent();
192 }
193
194 // ---------------------------------------------------------------------------
195 // The following members are safe to access from any sequence.
196
197 MultiplexRouter* const router_;
198 const InterfaceId id_;
199
200 // ---------------------------------------------------------------------------
201 // The following members are accessed under the router's lock.
202
203 // Whether the endpoint has been closed.
204 bool closed_;
205 // Whether the peer endpoint has been closed.
206 bool peer_closed_;
207
208 // Whether there is already a ScopedInterfaceEndpointHandle created for this
209 // endpoint.
210 bool handle_created_;
211
212 base::Optional<DisconnectReason> disconnect_reason_;
213
214 // The task runner on which |client_|'s methods can be called.
215 scoped_refptr<base::SequencedTaskRunner> task_runner_;
216 // Not owned. It is null if no client is attached to this endpoint.
217 InterfaceEndpointClient* client_;
218
219 // Indicates whether the sync watcher should be signaled for this endpoint.
220 bool sync_message_event_signaled_ = false;
221
222 // Guarded by the router's lock. Used to synchronously wait on replies.
223 std::unique_ptr<SequenceLocalSyncEventWatcher> sync_watcher_;
224
225 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
226 };
227
228 // MessageWrapper objects are always destroyed under the router's lock. On
229 // destruction, if the message it wrappers contains interface IDs, the wrapper
230 // closes the corresponding endpoints.
231 class MultiplexRouter::MessageWrapper {
232 public:
233 MessageWrapper() = default;
234
MessageWrapper(MultiplexRouter * router,Message message)235 MessageWrapper(MultiplexRouter* router, Message message)
236 : router_(router), value_(std::move(message)) {}
237
MessageWrapper(MessageWrapper && other)238 MessageWrapper(MessageWrapper&& other)
239 : router_(other.router_), value_(std::move(other.value_)) {}
240
~MessageWrapper()241 ~MessageWrapper() {
242 if (!router_ || value_.IsNull())
243 return;
244
245 router_->AssertLockAcquired();
246 // Don't try to close the endpoints if at this point the router is already
247 // half-destructed.
248 if (!router_->being_destructed_)
249 router_->CloseEndpointsForMessage(value_);
250 }
251
operator =(MessageWrapper && other)252 MessageWrapper& operator=(MessageWrapper&& other) {
253 router_ = other.router_;
254 value_ = std::move(other.value_);
255 return *this;
256 }
257
value() const258 const Message& value() const { return value_; }
259
260 // Must be called outside of the router's lock.
261 // Returns a null message if it fails to deseralize the associated endpoint
262 // handles.
DeserializeEndpointHandlesAndTake()263 Message DeserializeEndpointHandlesAndTake() {
264 if (!value_.DeserializeAssociatedEndpointHandles(router_)) {
265 // The previous call may have deserialized part of the associated
266 // interface endpoint handles. They must be destroyed outside of the
267 // router's lock, so we cannot wait until destruction of MessageWrapper.
268 value_.Reset();
269 return Message();
270 }
271 return std::move(value_);
272 }
273
274 private:
275 MultiplexRouter* router_ = nullptr;
276 Message value_;
277
278 DISALLOW_COPY_AND_ASSIGN(MessageWrapper);
279 };
280
281 struct MultiplexRouter::Task {
282 public:
283 // Doesn't take ownership of |message| but takes its contents.
CreateMessageTaskmojo::internal::MultiplexRouter::Task284 static std::unique_ptr<Task> CreateMessageTask(
285 MessageWrapper message_wrapper) {
286 Task* task = new Task(MESSAGE);
287 task->message_wrapper = std::move(message_wrapper);
288 return base::WrapUnique(task);
289 }
CreateNotifyErrorTaskmojo::internal::MultiplexRouter::Task290 static std::unique_ptr<Task> CreateNotifyErrorTask(
291 InterfaceEndpoint* endpoint) {
292 Task* task = new Task(NOTIFY_ERROR);
293 task->endpoint_to_notify = endpoint;
294 return base::WrapUnique(task);
295 }
296
~Taskmojo::internal::MultiplexRouter::Task297 ~Task() {}
298
IsMessageTaskmojo::internal::MultiplexRouter::Task299 bool IsMessageTask() const { return type == MESSAGE; }
IsNotifyErrorTaskmojo::internal::MultiplexRouter::Task300 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
301
302 MessageWrapper message_wrapper;
303 scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
304
305 enum Type { MESSAGE, NOTIFY_ERROR };
306 Type type;
307
308 private:
Taskmojo::internal::MultiplexRouter::Task309 explicit Task(Type in_type) : type(in_type) {}
310
311 DISALLOW_COPY_AND_ASSIGN(Task);
312 };
313
MultiplexRouter(ScopedMessagePipeHandle message_pipe,Config config,bool set_interface_id_namespace_bit,scoped_refptr<base::SequencedTaskRunner> runner)314 MultiplexRouter::MultiplexRouter(
315 ScopedMessagePipeHandle message_pipe,
316 Config config,
317 bool set_interface_id_namespace_bit,
318 scoped_refptr<base::SequencedTaskRunner> runner)
319 : set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
320 task_runner_(runner),
321 dispatcher_(this),
322 connector_(std::move(message_pipe),
323 config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND
324 : Connector::SINGLE_THREADED_SEND,
325 std::move(runner)),
326 control_message_handler_(this),
327 control_message_proxy_(&connector_) {
328 DCHECK(task_runner_->RunsTasksInCurrentSequence());
329
330 if (config == MULTI_INTERFACE)
331 lock_.emplace();
332
333 if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS ||
334 config == MULTI_INTERFACE) {
335 // Always participate in sync handle watching in multi-interface mode,
336 // because even if it doesn't expect sync requests during sync handle
337 // watching, it may still need to dispatch messages to associated endpoints
338 // on a different sequence.
339 connector_.AllowWokenUpBySyncWatchOnSameThread();
340 }
341 connector_.set_incoming_receiver(&dispatcher_);
342 connector_.set_connection_error_handler(
343 base::BindOnce(&MultiplexRouter::OnPipeConnectionError,
344 base::Unretained(this), false /* force_async_dispatch */));
345
346 scoped_refptr<internal::MessageQuotaChecker> quota_checker =
347 internal::MessageQuotaChecker::MaybeCreate();
348 if (quota_checker)
349 connector_.SetMessageQuotaChecker(std::move(quota_checker));
350
351 std::unique_ptr<MessageHeaderValidator> header_validator =
352 std::make_unique<MessageHeaderValidator>();
353 header_validator_ = header_validator.get();
354 dispatcher_.SetValidator(std::move(header_validator));
355 }
356
~MultiplexRouter()357 MultiplexRouter::~MultiplexRouter() {
358 MayAutoLock locker(&lock_);
359
360 being_destructed_ = true;
361
362 sync_message_tasks_.clear();
363 tasks_.clear();
364 endpoints_.clear();
365 }
366
SetIncomingMessageFilter(std::unique_ptr<MessageFilter> filter)367 void MultiplexRouter::SetIncomingMessageFilter(
368 std::unique_ptr<MessageFilter> filter) {
369 dispatcher_.SetFilter(std::move(filter));
370 }
371
SetMasterInterfaceName(const char * name)372 void MultiplexRouter::SetMasterInterfaceName(const char* name) {
373 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
374 header_validator_->SetDescription(std::string(name) +
375 " [master] MessageHeaderValidator");
376 control_message_handler_.SetDescription(
377 std::string(name) + " [master] PipeControlMessageHandler");
378 connector_.SetWatcherHeapProfilerTag(name);
379 }
380
SetConnectionGroup(ConnectionGroup::Ref ref)381 void MultiplexRouter::SetConnectionGroup(ConnectionGroup::Ref ref) {
382 connector_.SetConnectionGroup(std::move(ref));
383 }
384
AssociateInterface(ScopedInterfaceEndpointHandle handle_to_send)385 InterfaceId MultiplexRouter::AssociateInterface(
386 ScopedInterfaceEndpointHandle handle_to_send) {
387 if (!handle_to_send.pending_association())
388 return kInvalidInterfaceId;
389
390 uint32_t id = 0;
391 {
392 MayAutoLock locker(&lock_);
393 do {
394 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
395 next_interface_id_value_ = 1;
396 id = next_interface_id_value_++;
397 if (set_interface_id_namespace_bit_)
398 id |= kInterfaceIdNamespaceMask;
399 } while (base::Contains(endpoints_, id));
400
401 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
402 endpoints_[id] = endpoint;
403 if (encountered_error_)
404 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
405 endpoint->set_handle_created();
406 }
407
408 if (!NotifyAssociation(&handle_to_send, id)) {
409 // The peer handle of |handle_to_send|, which is supposed to join this
410 // associated group, has been closed.
411 {
412 MayAutoLock locker(&lock_);
413 InterfaceEndpoint* endpoint = FindEndpoint(id);
414 if (endpoint)
415 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
416 }
417
418 control_message_proxy_.NotifyPeerEndpointClosed(
419 id, handle_to_send.disconnect_reason());
420 }
421 return id;
422 }
423
CreateLocalEndpointHandle(InterfaceId id)424 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
425 InterfaceId id) {
426 if (!IsValidInterfaceId(id))
427 return ScopedInterfaceEndpointHandle();
428
429 MayAutoLock locker(&lock_);
430 bool inserted = false;
431 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
432 if (inserted) {
433 if (encountered_error_)
434 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
435 } else {
436 if (endpoint->handle_created() || endpoint->closed())
437 return ScopedInterfaceEndpointHandle();
438 }
439
440 endpoint->set_handle_created();
441 return CreateScopedInterfaceEndpointHandle(id);
442 }
443
CloseEndpointHandle(InterfaceId id,const base::Optional<DisconnectReason> & reason)444 void MultiplexRouter::CloseEndpointHandle(
445 InterfaceId id,
446 const base::Optional<DisconnectReason>& reason) {
447 if (!IsValidInterfaceId(id))
448 return;
449
450 MayAutoLock locker(&lock_);
451 DCHECK(base::Contains(endpoints_, id));
452 InterfaceEndpoint* endpoint = endpoints_[id].get();
453 DCHECK(!endpoint->client());
454 DCHECK(!endpoint->closed());
455 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
456
457 if (!IsMasterInterfaceId(id) || reason) {
458 MayAutoUnlock unlocker(&lock_);
459 control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
460 }
461
462 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
463 }
464
AttachEndpointClient(const ScopedInterfaceEndpointHandle & handle,InterfaceEndpointClient * client,scoped_refptr<base::SequencedTaskRunner> runner)465 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
466 const ScopedInterfaceEndpointHandle& handle,
467 InterfaceEndpointClient* client,
468 scoped_refptr<base::SequencedTaskRunner> runner) {
469 const InterfaceId id = handle.id();
470
471 DCHECK(IsValidInterfaceId(id));
472 DCHECK(client);
473
474 MayAutoLock locker(&lock_);
475 DCHECK(base::Contains(endpoints_, id));
476
477 InterfaceEndpoint* endpoint = endpoints_[id].get();
478 endpoint->AttachClient(client, std::move(runner));
479
480 if (endpoint->peer_closed())
481 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
482 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
483
484 return endpoint;
485 }
486
DetachEndpointClient(const ScopedInterfaceEndpointHandle & handle)487 void MultiplexRouter::DetachEndpointClient(
488 const ScopedInterfaceEndpointHandle& handle) {
489 const InterfaceId id = handle.id();
490
491 DCHECK(IsValidInterfaceId(id));
492
493 MayAutoLock locker(&lock_);
494 DCHECK(base::Contains(endpoints_, id));
495
496 InterfaceEndpoint* endpoint = endpoints_[id].get();
497 endpoint->DetachClient();
498 }
499
RaiseError()500 void MultiplexRouter::RaiseError() {
501 if (task_runner_->RunsTasksInCurrentSequence()) {
502 connector_.RaiseError();
503 } else {
504 task_runner_->PostTask(FROM_HERE,
505 base::BindOnce(&MultiplexRouter::RaiseError, this));
506 }
507 }
508
PrefersSerializedMessages()509 bool MultiplexRouter::PrefersSerializedMessages() {
510 MayAutoLock locker(&lock_);
511 return connector_.PrefersSerializedMessages();
512 }
513
CloseMessagePipe()514 void MultiplexRouter::CloseMessagePipe() {
515 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
516 connector_.CloseMessagePipe();
517 flush_pipe_watcher_.reset();
518 active_flush_pipe_.reset();
519 // CloseMessagePipe() above won't trigger connection error handler.
520 // Explicitly call OnPipeConnectionError() so that associated endpoints will
521 // get notified.
522 OnPipeConnectionError(true /* force_async_dispatch */);
523 }
524
PauseIncomingMethodCallProcessing()525 void MultiplexRouter::PauseIncomingMethodCallProcessing() {
526 PauseInternal(/*must_resume_manually=*/true);
527 }
528
ResumeIncomingMethodCallProcessing()529 void MultiplexRouter::ResumeIncomingMethodCallProcessing() {
530 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
531 // If the owner is manually resuming from a previous pause request, the
532 // interface may also still be paused due to waiting on a pending async flush
533 // in the system.
534 //
535 // In that case we ignore the caller, except to subsequently allow implicit
536 // resume once the pending flush operation is finished.
537 if (active_flush_pipe_) {
538 MayAutoLock locker(&lock_);
539 must_resume_manually_ = false;
540 return;
541 }
542
543 connector_.ResumeIncomingMethodCallProcessing();
544
545 MayAutoLock locker(&lock_);
546 paused_ = false;
547 must_resume_manually_ = false;
548
549 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) {
550 auto sync_iter = sync_message_tasks_.find(iter->first);
551 if (iter->second->peer_closed() ||
552 (sync_iter != sync_message_tasks_.end() &&
553 !sync_iter->second.empty())) {
554 iter->second->SignalSyncMessageEvent();
555 }
556 }
557
558 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
559 }
560
FlushAsync(AsyncFlusher flusher)561 void MultiplexRouter::FlushAsync(AsyncFlusher flusher) {
562 control_message_proxy_.FlushAsync(std::move(flusher));
563 }
564
PausePeerUntilFlushCompletes(PendingFlush flush)565 void MultiplexRouter::PausePeerUntilFlushCompletes(PendingFlush flush) {
566 control_message_proxy_.PausePeerUntilFlushCompletes(std::move(flush));
567 }
568
HasAssociatedEndpoints() const569 bool MultiplexRouter::HasAssociatedEndpoints() const {
570 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
571 MayAutoLock locker(&lock_);
572
573 if (endpoints_.size() > 1)
574 return true;
575 if (endpoints_.size() == 0)
576 return false;
577
578 return !base::Contains(endpoints_, kMasterInterfaceId);
579 }
580
EnableBatchDispatch()581 void MultiplexRouter::EnableBatchDispatch() {
582 connector_.set_force_immediate_dispatch(true);
583 }
584
EnableTestingMode()585 void MultiplexRouter::EnableTestingMode() {
586 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
587 MayAutoLock locker(&lock_);
588
589 testing_mode_ = true;
590 connector_.set_enforce_errors_from_incoming_receiver(false);
591 }
592
Accept(Message * message)593 bool MultiplexRouter::Accept(Message* message) {
594 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
595
596 // Insert endpoints for the payload interface IDs as soon as the message
597 // arrives, instead of waiting till the message is dispatched. Consider the
598 // following sequence:
599 // 1) Async message msg1 arrives, containing interface ID x. Msg1 is not
600 // dispatched because a sync call is blocking the thread.
601 // 2) Sync message msg2 arrives targeting interface ID x.
602 //
603 // If we don't insert endpoint for interface ID x, when trying to dispatch
604 // msg2 we don't know whether it is an unexpected message or it is just
605 // because the message containing x hasn't been dispatched.
606 if (!InsertEndpointsForMessage(*message))
607 return false;
608
609 scoped_refptr<MultiplexRouter> protector(this);
610 MayAutoLock locker(&lock_);
611
612 DCHECK(!paused_);
613
614 ClientCallBehavior client_call_behavior =
615 connector_.during_sync_handle_watcher_callback()
616 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
617 : ALLOW_DIRECT_CLIENT_CALLS;
618
619 MessageWrapper message_wrapper(this, std::move(*message));
620 bool processed = tasks_.empty() && ProcessIncomingMessage(
621 &message_wrapper, client_call_behavior,
622 connector_.task_runner());
623
624 if (!processed) {
625 // Either the task queue is not empty or we cannot process the message
626 // directly. In both cases, there is no need to call ProcessTasks().
627 tasks_.push_back(Task::CreateMessageTask(std::move(message_wrapper)));
628 Task* task = tasks_.back().get();
629
630 if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) {
631 InterfaceId id = task->message_wrapper.value().interface_id();
632 sync_message_tasks_[id].push_back(task);
633 InterfaceEndpoint* endpoint = FindEndpoint(id);
634 if (endpoint)
635 endpoint->SignalSyncMessageEvent();
636 }
637 } else if (!tasks_.empty()) {
638 // Processing the message may result in new tasks (for error notification)
639 // being added to the queue. In this case, we have to attempt to process the
640 // tasks.
641 ProcessTasks(client_call_behavior, connector_.task_runner());
642 }
643
644 // Always return true. If we see errors during message processing, we will
645 // explicitly call Connector::RaiseError() to disconnect the message pipe.
646 return true;
647 }
648
OnPeerAssociatedEndpointClosed(InterfaceId id,const base::Optional<DisconnectReason> & reason)649 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(
650 InterfaceId id,
651 const base::Optional<DisconnectReason>& reason) {
652 MayAutoLock locker(&lock_);
653 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
654
655 if (reason)
656 endpoint->set_disconnect_reason(reason);
657
658 // It is possible that this endpoint has been set as peer closed. That is
659 // because when the message pipe is closed, all the endpoints are updated with
660 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue,
661 // as long as there are refs keeping the router alive. If there is a
662 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get
663 // here and see that the endpoint has been marked as peer closed.
664 if (!endpoint->peer_closed()) {
665 if (endpoint->client())
666 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
667 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
668 }
669
670 // No need to trigger a ProcessTasks() because it is already on the stack.
671
672 return true;
673 }
674
WaitForFlushToComplete(ScopedMessagePipeHandle pipe)675 bool MultiplexRouter::WaitForFlushToComplete(ScopedMessagePipeHandle pipe) {
676 // If this MultiplexRouter has an associated interface on some task runner
677 // other than the master interface's task runner, it is possible to process
678 // incoming control messages on that task runner. We don't support this
679 // control message on anything but the main interface though.
680 if (!task_runner_->RunsTasksInCurrentSequence())
681 return false;
682
683 flush_pipe_watcher_.emplace(FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL,
684 task_runner_);
685 flush_pipe_watcher_->Watch(
686 pipe.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
687 MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
688 base::BindRepeating(&MultiplexRouter::OnFlushPipeSignaled, this));
689 if (flush_pipe_watcher_->Arm() != MOJO_RESULT_OK) {
690 // The peer must already be closed, so consider the flush to be complete.
691 flush_pipe_watcher_.reset();
692 return true;
693 }
694
695 active_flush_pipe_ = std::move(pipe);
696 PauseInternal(/*must_resume_manually=*/false);
697 return true;
698 }
699
OnPipeConnectionError(bool force_async_dispatch)700 void MultiplexRouter::OnPipeConnectionError(bool force_async_dispatch) {
701 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
702
703 scoped_refptr<MultiplexRouter> protector(this);
704 MayAutoLock locker(&lock_);
705
706 encountered_error_ = true;
707
708 // Calling UpdateEndpointStateMayRemove() may remove the corresponding value
709 // from |endpoints_| and invalidate any iterator of |endpoints_|. Therefore,
710 // copy the endpoint pointers to a vector and iterate over it instead.
711 std::vector<scoped_refptr<InterfaceEndpoint>> endpoint_vector;
712 endpoint_vector.reserve(endpoints_.size());
713 for (const auto& pair : endpoints_)
714 endpoint_vector.push_back(pair.second);
715
716 for (const auto& endpoint : endpoint_vector) {
717 if (endpoint->client())
718 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint.get()));
719
720 UpdateEndpointStateMayRemove(endpoint.get(), PEER_ENDPOINT_CLOSED);
721 }
722
723 ClientCallBehavior call_behavior = ALLOW_DIRECT_CLIENT_CALLS;
724 if (force_async_dispatch)
725 call_behavior = NO_DIRECT_CLIENT_CALLS;
726 else if (connector_.during_sync_handle_watcher_callback())
727 call_behavior = ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES;
728
729 ProcessTasks(call_behavior, connector_.task_runner());
730 }
731
OnFlushPipeSignaled(MojoResult result,const HandleSignalsState & state)732 void MultiplexRouter::OnFlushPipeSignaled(MojoResult result,
733 const HandleSignalsState& state) {
734 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
735 flush_pipe_watcher_.reset();
736 active_flush_pipe_.reset();
737
738 // If there is not an explicit Pause waiting for a Resume, we can unpause.
739 if (!must_resume_manually_)
740 ResumeIncomingMethodCallProcessing();
741 }
742
PauseInternal(bool must_resume_manually)743 void MultiplexRouter::PauseInternal(bool must_resume_manually) {
744 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
745
746 connector_.PauseIncomingMethodCallProcessing();
747
748 MayAutoLock locker(&lock_);
749
750 paused_ = true;
751 for (auto& entry : endpoints_)
752 entry.second->ResetSyncMessageSignal();
753
754 // We do not want to override this to |false| if it's already |true|. If it's
755 // ever |true|, that means there's been at least one explicit Pause call since
756 // the last Resume and we must never unpause until at least one call to Resume
757 // is made.
758 must_resume_manually_ = must_resume_manually_ || must_resume_manually;
759 }
760
ProcessTasks(ClientCallBehavior client_call_behavior,base::SequencedTaskRunner * current_task_runner)761 void MultiplexRouter::ProcessTasks(
762 ClientCallBehavior client_call_behavior,
763 base::SequencedTaskRunner* current_task_runner) {
764 AssertLockAcquired();
765
766 if (posted_to_process_tasks_)
767 return;
768
769 while (!tasks_.empty() && !paused_) {
770 std::unique_ptr<Task> task(std::move(tasks_.front()));
771 tasks_.pop_front();
772
773 InterfaceId id = kInvalidInterfaceId;
774 bool sync_message =
775 task->IsMessageTask() && !task->message_wrapper.value().IsNull() &&
776 task->message_wrapper.value().has_flag(Message::kFlagIsSync);
777 if (sync_message) {
778 id = task->message_wrapper.value().interface_id();
779 auto& sync_message_queue = sync_message_tasks_[id];
780 DCHECK_EQ(task.get(), sync_message_queue.front());
781 sync_message_queue.pop_front();
782 }
783
784 bool processed =
785 task->IsNotifyErrorTask()
786 ? ProcessNotifyErrorTask(task.get(), client_call_behavior,
787 current_task_runner)
788 : ProcessIncomingMessage(&task->message_wrapper,
789 client_call_behavior, current_task_runner);
790
791 if (!processed) {
792 if (sync_message) {
793 auto& sync_message_queue = sync_message_tasks_[id];
794 sync_message_queue.push_front(task.get());
795 }
796 tasks_.push_front(std::move(task));
797 break;
798 } else {
799 if (sync_message) {
800 auto iter = sync_message_tasks_.find(id);
801 if (iter != sync_message_tasks_.end() && iter->second.empty())
802 sync_message_tasks_.erase(iter);
803 }
804 }
805 }
806 }
807
ProcessFirstSyncMessageForEndpoint(InterfaceId id)808 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
809 AssertLockAcquired();
810
811 auto iter = sync_message_tasks_.find(id);
812 if (iter == sync_message_tasks_.end())
813 return false;
814
815 if (paused_)
816 return true;
817
818 MultiplexRouter::Task* task = iter->second.front();
819 iter->second.pop_front();
820
821 DCHECK(task->IsMessageTask());
822 MessageWrapper message_wrapper = std::move(task->message_wrapper);
823
824 // Note: after this call, |task| and |iter| may be invalidated.
825 bool processed = ProcessIncomingMessage(
826 &message_wrapper, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr);
827 DCHECK(processed);
828
829 iter = sync_message_tasks_.find(id);
830 if (iter == sync_message_tasks_.end())
831 return false;
832
833 if (iter->second.empty()) {
834 sync_message_tasks_.erase(iter);
835 return false;
836 }
837
838 return true;
839 }
840
ProcessNotifyErrorTask(Task * task,ClientCallBehavior client_call_behavior,base::SequencedTaskRunner * current_task_runner)841 bool MultiplexRouter::ProcessNotifyErrorTask(
842 Task* task,
843 ClientCallBehavior client_call_behavior,
844 base::SequencedTaskRunner* current_task_runner) {
845 DCHECK(!current_task_runner ||
846 current_task_runner->RunsTasksInCurrentSequence());
847 DCHECK(!paused_);
848
849 AssertLockAcquired();
850 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
851 if (!endpoint->client())
852 return true;
853
854 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS ||
855 endpoint->task_runner() != current_task_runner) {
856 MaybePostToProcessTasks(endpoint->task_runner());
857 return false;
858 }
859
860 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
861
862 InterfaceEndpointClient* client = endpoint->client();
863 base::Optional<DisconnectReason> disconnect_reason(
864 endpoint->disconnect_reason());
865
866 {
867 // We must unlock before calling into |client| because it may call this
868 // object within NotifyError(). Holding the lock will lead to deadlock.
869 //
870 // It is safe to call into |client| without the lock. Because |client| is
871 // always accessed on the same sequence, including DetachEndpointClient().
872 MayAutoUnlock unlocker(&lock_);
873 client->NotifyError(disconnect_reason);
874 }
875 return true;
876 }
877
ProcessIncomingMessage(MessageWrapper * message_wrapper,ClientCallBehavior client_call_behavior,base::SequencedTaskRunner * current_task_runner)878 bool MultiplexRouter::ProcessIncomingMessage(
879 MessageWrapper* message_wrapper,
880 ClientCallBehavior client_call_behavior,
881 base::SequencedTaskRunner* current_task_runner) {
882 DCHECK(!current_task_runner ||
883 current_task_runner->RunsTasksInCurrentSequence());
884 DCHECK(!paused_);
885 DCHECK(message_wrapper);
886 AssertLockAcquired();
887
888 const Message* message = &message_wrapper->value();
889 if (message->IsNull()) {
890 // This is a sync message and has been processed during sync handle
891 // watching.
892 return true;
893 }
894
895 if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
896 bool result = false;
897
898 {
899 MayAutoUnlock unlocker(&lock_);
900 Message tmp_message =
901 message_wrapper->DeserializeEndpointHandlesAndTake();
902 result = !tmp_message.IsNull() &&
903 control_message_handler_.Accept(&tmp_message);
904 }
905
906 if (!result)
907 RaiseErrorInNonTestingMode();
908
909 return true;
910 }
911
912 InterfaceId id = message->interface_id();
913 DCHECK(IsValidInterfaceId(id));
914
915 InterfaceEndpoint* endpoint = FindEndpoint(id);
916 if (!endpoint || endpoint->closed())
917 return true;
918
919 if (!endpoint->client()) {
920 // We need to wait until a client is attached in order to dispatch further
921 // messages.
922 return false;
923 }
924
925 bool can_direct_call;
926 if (message->has_flag(Message::kFlagIsSync)) {
927 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
928 endpoint->task_runner()->RunsTasksInCurrentSequence();
929 } else {
930 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS &&
931 endpoint->task_runner() == current_task_runner;
932 }
933
934 if (!can_direct_call) {
935 MaybePostToProcessTasks(endpoint->task_runner());
936 return false;
937 }
938
939 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
940
941 InterfaceEndpointClient* client = endpoint->client();
942 bool result = false;
943 {
944 // We must unlock before calling into |client| because it may call this
945 // object within HandleIncomingMessage(). Holding the lock will lead to
946 // deadlock.
947 //
948 // It is safe to call into |client| without the lock. Because |client| is
949 // always accessed on the same sequence, including DetachEndpointClient().
950 MayAutoUnlock unlocker(&lock_);
951 Message tmp_message = message_wrapper->DeserializeEndpointHandlesAndTake();
952 result =
953 !tmp_message.IsNull() && client->HandleIncomingMessage(&tmp_message);
954 }
955 if (!result)
956 RaiseErrorInNonTestingMode();
957
958 return true;
959 }
960
MaybePostToProcessTasks(base::SequencedTaskRunner * task_runner)961 void MultiplexRouter::MaybePostToProcessTasks(
962 base::SequencedTaskRunner* task_runner) {
963 AssertLockAcquired();
964 if (posted_to_process_tasks_)
965 return;
966
967 posted_to_process_tasks_ = true;
968 posted_to_task_runner_ = task_runner;
969 task_runner->PostTask(
970 FROM_HERE,
971 base::BindOnce(&MultiplexRouter::LockAndCallProcessTasks, this));
972 }
973
LockAndCallProcessTasks()974 void MultiplexRouter::LockAndCallProcessTasks() {
975 // There is no need to hold a ref to this class in this case because this is
976 // always called from a bound callback, which holds a ref.
977 MayAutoLock locker(&lock_);
978 posted_to_process_tasks_ = false;
979 scoped_refptr<base::SequencedTaskRunner> runner(
980 std::move(posted_to_task_runner_));
981 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get());
982 }
983
UpdateEndpointStateMayRemove(InterfaceEndpoint * endpoint,EndpointStateUpdateType type)984 void MultiplexRouter::UpdateEndpointStateMayRemove(
985 InterfaceEndpoint* endpoint,
986 EndpointStateUpdateType type) {
987 if (type == ENDPOINT_CLOSED) {
988 endpoint->set_closed();
989 } else {
990 endpoint->set_peer_closed();
991 // If the interface endpoint is performing a sync watch, this makes sure
992 // it is notified and eventually exits the sync watch.
993 endpoint->SignalSyncMessageEvent();
994 }
995 if (endpoint->closed() && endpoint->peer_closed())
996 endpoints_.erase(endpoint->id());
997 }
998
RaiseErrorInNonTestingMode()999 void MultiplexRouter::RaiseErrorInNonTestingMode() {
1000 AssertLockAcquired();
1001 if (!testing_mode_)
1002 RaiseError();
1003 }
1004
FindOrInsertEndpoint(InterfaceId id,bool * inserted)1005 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
1006 InterfaceId id,
1007 bool* inserted) {
1008 AssertLockAcquired();
1009 // Either |inserted| is nullptr or it points to a boolean initialized as
1010 // false.
1011 DCHECK(!inserted || !*inserted);
1012
1013 InterfaceEndpoint* endpoint = FindEndpoint(id);
1014 if (!endpoint) {
1015 endpoint = new InterfaceEndpoint(this, id);
1016 endpoints_[id] = endpoint;
1017 if (inserted)
1018 *inserted = true;
1019 }
1020
1021 return endpoint;
1022 }
1023
FindEndpoint(InterfaceId id)1024 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindEndpoint(
1025 InterfaceId id) {
1026 AssertLockAcquired();
1027 auto iter = endpoints_.find(id);
1028 return iter != endpoints_.end() ? iter->second.get() : nullptr;
1029 }
1030
AssertLockAcquired()1031 void MultiplexRouter::AssertLockAcquired() {
1032 #if DCHECK_IS_ON()
1033 if (lock_)
1034 lock_->AssertAcquired();
1035 #endif
1036 }
1037
InsertEndpointsForMessage(const Message & message)1038 bool MultiplexRouter::InsertEndpointsForMessage(const Message& message) {
1039 if (!message.is_serialized())
1040 return true;
1041
1042 uint32_t num_ids = message.payload_num_interface_ids();
1043 if (num_ids == 0)
1044 return true;
1045
1046 const uint32_t* ids = message.payload_interface_ids();
1047
1048 MayAutoLock locker(&lock_);
1049 for (uint32_t i = 0; i < num_ids; ++i) {
1050 // Message header validation already ensures that the IDs are valid and not
1051 // the master ID.
1052 // The IDs are from the remote side and therefore their namespace bit is
1053 // supposed to be different than the value that this router would use.
1054 if (set_interface_id_namespace_bit_ ==
1055 HasInterfaceIdNamespaceBitSet(ids[i])) {
1056 return false;
1057 }
1058
1059 // It is possible that the endpoint already exists even when the remote side
1060 // is well-behaved: it might have notified us that the peer endpoint has
1061 // closed.
1062 bool inserted = false;
1063 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(ids[i], &inserted);
1064 if (endpoint->closed() || endpoint->handle_created())
1065 return false;
1066 }
1067
1068 return true;
1069 }
1070
CloseEndpointsForMessage(const Message & message)1071 void MultiplexRouter::CloseEndpointsForMessage(const Message& message) {
1072 AssertLockAcquired();
1073
1074 if (!message.is_serialized())
1075 return;
1076
1077 uint32_t num_ids = message.payload_num_interface_ids();
1078 if (num_ids == 0)
1079 return;
1080
1081 const uint32_t* ids = message.payload_interface_ids();
1082 for (uint32_t i = 0; i < num_ids; ++i) {
1083 InterfaceEndpoint* endpoint = FindEndpoint(ids[i]);
1084 // If the remote side maliciously sends the same interface ID in another
1085 // message which has been dispatched, we could get here with no endpoint
1086 // for the ID, a closed endpoint, or an endpoint with handle created.
1087 if (!endpoint || endpoint->closed() || endpoint->handle_created()) {
1088 RaiseErrorInNonTestingMode();
1089 continue;
1090 }
1091
1092 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
1093 MayAutoUnlock unlocker(&lock_);
1094 control_message_proxy_.NotifyPeerEndpointClosed(ids[i], base::nullopt);
1095 }
1096
1097 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
1098 }
1099
1100 } // namespace internal
1101 } // namespace mojo
1102