1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/public/cpp/bindings/connector.h"
6 
7 #include <stdint.h>
8 
9 #include "base/bind.h"
10 #include "base/check_op.h"
11 #include "base/compiler_specific.h"
12 #include "base/location.h"
13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/metrics/field_trial_params.h"
16 #include "base/metrics/histogram_macros.h"
17 #include "base/no_destructor.h"
18 #include "base/rand_util.h"
19 #include "base/run_loop.h"
20 #include "base/synchronization/lock.h"
21 #include "base/task/current_thread.h"
22 #include "base/threading/sequence_local_storage_slot.h"
23 #include "base/trace_event/trace_event.h"
24 #include "base/trace_event/typed_macros.h"
25 #include "mojo/public/c/system/quota.h"
26 #include "mojo/public/cpp/bindings/features.h"
27 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
28 #include "mojo/public/cpp/bindings/lib/message_quota_checker.h"
29 #include "mojo/public/cpp/bindings/mojo_buildflags.h"
30 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
31 #include "mojo/public/cpp/system/wait.h"
32 #include "third_party/perfetto/protos/perfetto/trace/track_event/chrome_mojo_event_info.pbzero.h"
33 
34 #if defined(ENABLE_IPC_FUZZER)
35 #include "mojo/public/cpp/bindings/message_dumper.h"
36 #endif
37 
38 namespace mojo {
39 
40 namespace {
41 
42 // The default outgoing serialization mode for new Connectors.
43 Connector::OutgoingSerializationMode g_default_outgoing_serialization_mode =
44     Connector::OutgoingSerializationMode::kLazy;
45 
46 // The default incoming serialization mode for new Connectors.
47 Connector::IncomingSerializationMode g_default_incoming_serialization_mode =
48     Connector::IncomingSerializationMode::kDispatchAsIs;
49 
EnableTaskPerMessage()50 bool EnableTaskPerMessage() {
51   // Const since this may be called from any thread. Initialization is
52   // thread-safe. This is a workaround since some consumers of Mojo (e.g. many
53   // browser tests) use base::FeatureList incorrectly and thus cause data races
54   // when features are queried from arbitrary threads.
55   static const bool enable =
56       base::FeatureList::IsEnabled(features::kTaskPerMessage);
57   return enable;
58 }
59 
60 }  // namespace
61 
62 // Used to efficiently maintain a doubly-linked list of all Connectors
63 // currently dispatching on any given thread.
64 class Connector::ActiveDispatchTracker {
65  public:
66   explicit ActiveDispatchTracker(const base::WeakPtr<Connector>& connector);
67   ~ActiveDispatchTracker();
68 
69   void NotifyBeginNesting();
70 
71  private:
72   const base::WeakPtr<Connector> connector_;
73   RunLoopNestingObserver* const nesting_observer_;
74   ActiveDispatchTracker* outer_tracker_ = nullptr;
75   ActiveDispatchTracker* inner_tracker_ = nullptr;
76 
77   DISALLOW_COPY_AND_ASSIGN(ActiveDispatchTracker);
78 };
79 
80 // Watches the MessageLoop on the current thread. Notifies the current chain of
81 // ActiveDispatchTrackers when a nested run loop is started.
82 class Connector::RunLoopNestingObserver
83     : public base::RunLoop::NestingObserver {
84  public:
RunLoopNestingObserver()85   RunLoopNestingObserver() {
86     base::RunLoop::AddNestingObserverOnCurrentThread(this);
87   }
88 
~RunLoopNestingObserver()89   ~RunLoopNestingObserver() override {
90     base::RunLoop::RemoveNestingObserverOnCurrentThread(this);
91   }
92 
93   // base::RunLoop::NestingObserver:
OnBeginNestedRunLoop()94   void OnBeginNestedRunLoop() override {
95     if (top_tracker_)
96       top_tracker_->NotifyBeginNesting();
97   }
98 
GetForThread()99   static RunLoopNestingObserver* GetForThread() {
100     if (!base::CurrentThread::Get())
101       return nullptr;
102     // The NestingObserver for each thread. Note that this is always a
103     // Connector::RunLoopNestingObserver; we use the base type here because that
104     // subclass is private to Connector.
105     static base::NoDestructor<
106         base::SequenceLocalStorageSlot<RunLoopNestingObserver>>
107         sls_nesting_observer;
108     return &sls_nesting_observer->GetOrCreateValue();
109   }
110 
111  private:
112   friend class ActiveDispatchTracker;
113 
114   ActiveDispatchTracker* top_tracker_ = nullptr;
115 
116   DISALLOW_COPY_AND_ASSIGN(RunLoopNestingObserver);
117 };
118 
ActiveDispatchTracker(const base::WeakPtr<Connector> & connector)119 Connector::ActiveDispatchTracker::ActiveDispatchTracker(
120     const base::WeakPtr<Connector>& connector)
121     : connector_(connector), nesting_observer_(connector_->nesting_observer_) {
122   DCHECK(nesting_observer_);
123   if (nesting_observer_->top_tracker_) {
124     outer_tracker_ = nesting_observer_->top_tracker_;
125     outer_tracker_->inner_tracker_ = this;
126   }
127   nesting_observer_->top_tracker_ = this;
128 }
129 
~ActiveDispatchTracker()130 Connector::ActiveDispatchTracker::~ActiveDispatchTracker() {
131   if (nesting_observer_->top_tracker_ == this)
132     nesting_observer_->top_tracker_ = outer_tracker_;
133   else if (inner_tracker_)
134     inner_tracker_->outer_tracker_ = outer_tracker_;
135   if (outer_tracker_)
136     outer_tracker_->inner_tracker_ = inner_tracker_;
137 }
138 
NotifyBeginNesting()139 void Connector::ActiveDispatchTracker::NotifyBeginNesting() {
140   if (connector_ && connector_->handle_watcher_)
141     connector_->handle_watcher_->ArmOrNotify();
142   if (outer_tracker_)
143     outer_tracker_->NotifyBeginNesting();
144 }
145 
Connector(ScopedMessagePipeHandle message_pipe,ConnectorConfig config,scoped_refptr<base::SequencedTaskRunner> runner,const char * interface_name)146 Connector::Connector(ScopedMessagePipeHandle message_pipe,
147                      ConnectorConfig config,
148                      scoped_refptr<base::SequencedTaskRunner> runner,
149                      const char* interface_name)
150     : message_pipe_(std::move(message_pipe)),
151       task_runner_(std::move(runner)),
152       error_(false),
153       force_immediate_dispatch_(!EnableTaskPerMessage()),
154       outgoing_serialization_mode_(g_default_outgoing_serialization_mode),
155       incoming_serialization_mode_(g_default_incoming_serialization_mode),
156       interface_name_(interface_name),
157       nesting_observer_(RunLoopNestingObserver::GetForThread()) {
158   if (config == MULTI_THREADED_SEND)
159     lock_.emplace();
160 
161 #if defined(ENABLE_IPC_FUZZER)
162   if (!MessageDumper::GetMessageDumpDirectory().empty())
163     message_dumper_ = std::make_unique<MessageDumper>();
164 #endif
165 
166   weak_self_ = weak_factory_.GetWeakPtr();
167   // Even though we don't have an incoming receiver, we still want to monitor
168   // the message pipe to know if is closed or encounters an error.
169   WaitToReadMore();
170 }
171 
~Connector()172 Connector::~Connector() {
173   if (quota_checker_) {
174     // Clear the message pipe handle in the checker.
175     quota_checker_->SetMessagePipe(MessagePipeHandle());
176     UMA_HISTOGRAM_COUNTS_1M("Mojo.Connector.MaxUnreadMessageQuotaUsed",
177                             quota_checker_->GetMaxQuotaUsage());
178   }
179 
180   {
181     // Allow for quick destruction on any sequence if the pipe is already
182     // closed.
183     base::AutoLock lock(connected_lock_);
184     if (!connected_)
185       return;
186   }
187 
188   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
189   CancelWait();
190 }
191 
SetOutgoingSerializationMode(OutgoingSerializationMode mode)192 void Connector::SetOutgoingSerializationMode(OutgoingSerializationMode mode) {
193   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
194   outgoing_serialization_mode_ = mode;
195 }
196 
SetIncomingSerializationMode(IncomingSerializationMode mode)197 void Connector::SetIncomingSerializationMode(IncomingSerializationMode mode) {
198   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
199   incoming_serialization_mode_ = mode;
200 }
201 
CloseMessagePipe()202 void Connector::CloseMessagePipe() {
203   // Throw away the returned message pipe.
204   PassMessagePipe();
205 }
206 
PassMessagePipe()207 ScopedMessagePipeHandle Connector::PassMessagePipe() {
208   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
209 
210   CancelWait();
211   internal::MayAutoLock locker(&lock_);
212   ScopedMessagePipeHandle message_pipe = std::move(message_pipe_);
213   weak_factory_.InvalidateWeakPtrs();
214   sync_handle_watcher_callback_count_ = 0;
215 
216   base::AutoLock lock(connected_lock_);
217   connected_ = false;
218   return message_pipe;
219 }
220 
RaiseError()221 void Connector::RaiseError() {
222   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
223 
224   HandleError(true, true);
225 }
226 
SetConnectionGroup(ConnectionGroup::Ref ref)227 void Connector::SetConnectionGroup(ConnectionGroup::Ref ref) {
228   // If this Connector already belonged to a group, parent the new group to that
229   // one so that the reference is not lost.
230   if (connection_group_)
231     ref.SetParentGroup(std::move(connection_group_));
232   connection_group_ = std::move(ref);
233 }
234 
WaitForIncomingMessage()235 bool Connector::WaitForIncomingMessage() {
236   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
237 
238   if (error_)
239     return false;
240 
241   ResumeIncomingMethodCallProcessing();
242 
243   MojoResult rv = Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE);
244   if (rv != MOJO_RESULT_OK) {
245     // Users that call WaitForIncomingMessage() should expect their code to be
246     // re-entered, so we call the error handler synchronously.
247     HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION /* force_pipe_reset */,
248                 false /* force_async_handler */);
249     return false;
250   }
251 
252   Message message;
253   if ((rv = ReadMessage(&message)) != MOJO_RESULT_OK) {
254     HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION /* force_pipe_reset */,
255                 false /* force_async_handler */);
256     return false;
257   }
258 
259   DCHECK(!message.IsNull());
260   return DispatchMessage(std::move(message));
261 }
262 
PauseIncomingMethodCallProcessing()263 void Connector::PauseIncomingMethodCallProcessing() {
264   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
265 
266   if (paused_)
267     return;
268 
269   paused_ = true;
270   CancelWait();
271 }
272 
ResumeIncomingMethodCallProcessing()273 void Connector::ResumeIncomingMethodCallProcessing() {
274   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
275 
276   if (!paused_)
277     return;
278 
279   paused_ = false;
280   WaitToReadMore();
281 }
282 
PrefersSerializedMessages()283 bool Connector::PrefersSerializedMessages() {
284   if (outgoing_serialization_mode_ == OutgoingSerializationMode::kEager)
285     return true;
286   DCHECK_EQ(OutgoingSerializationMode::kLazy, outgoing_serialization_mode_);
287   return peer_remoteness_tracker_ &&
288          peer_remoteness_tracker_->last_known_state().peer_remote();
289 }
290 
Accept(Message * message)291 bool Connector::Accept(Message* message) {
292   if (!lock_)
293     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
294 
295   if (error_)
296     return false;
297 
298   internal::MayAutoLock locker(&lock_);
299 
300   if (!message_pipe_.is_valid() || drop_writes_)
301     return true;
302 
303 #if defined(ENABLE_IPC_FUZZER)
304   if (message_dumper_ && message->is_serialized()) {
305     bool dump_result = message_dumper_->Accept(message);
306     DCHECK(dump_result);
307   }
308 #endif
309 
310   if (quota_checker_)
311     quota_checker_->BeforeWrite();
312 
313   MojoResult rv =
314       WriteMessageNew(message_pipe_.get(), message->TakeMojoMessage(),
315                       MOJO_WRITE_MESSAGE_FLAG_NONE);
316 
317   switch (rv) {
318     case MOJO_RESULT_OK:
319       break;
320     case MOJO_RESULT_FAILED_PRECONDITION:
321       // There's no point in continuing to write to this pipe since the other
322       // end is gone. Avoid writing any future messages. Hide write failures
323       // from the caller since we'd like them to continue consuming any backlog
324       // of incoming messages before regarding the message pipe as closed.
325       drop_writes_ = true;
326       break;
327     case MOJO_RESULT_BUSY:
328       // We'd get a "busy" result if one of the message's handles is:
329       //   - |message_pipe_|'s own handle;
330       //   - simultaneously being used on another sequence; or
331       //   - in a "busy" state that prohibits it from being transferred (e.g.,
332       //     a data pipe handle in the middle of a two-phase read/write,
333       //     regardless of which sequence that two-phase read/write is happening
334       //     on).
335       // TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until
336       // crbug.com/389666, etc. are resolved, this will make tests fail quickly
337       // rather than hanging.)
338       CHECK(false) << "Race condition or other bug detected";
339       return false;
340     default:
341       // This particular write was rejected, presumably because of bad input.
342       // The pipe is not necessarily in a bad state.
343       return false;
344   }
345   return true;
346 }
347 
AllowWokenUpBySyncWatchOnSameThread()348 void Connector::AllowWokenUpBySyncWatchOnSameThread() {
349   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
350 
351   allow_woken_up_by_others_ = true;
352 
353   EnsureSyncWatcherExists();
354   sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
355 }
356 
SetMessageQuotaChecker(scoped_refptr<internal::MessageQuotaChecker> checker)357 void Connector::SetMessageQuotaChecker(
358     scoped_refptr<internal::MessageQuotaChecker> checker) {
359   DCHECK(checker && !quota_checker_);
360 
361   quota_checker_ = std::move(checker);
362   quota_checker_->SetMessagePipe(message_pipe_.get());
363 }
364 
365 // static
OverrideDefaultSerializationBehaviorForTesting(OutgoingSerializationMode outgoing_mode,IncomingSerializationMode incoming_mode)366 void Connector::OverrideDefaultSerializationBehaviorForTesting(
367     OutgoingSerializationMode outgoing_mode,
368     IncomingSerializationMode incoming_mode) {
369   g_default_outgoing_serialization_mode = outgoing_mode;
370   g_default_incoming_serialization_mode = incoming_mode;
371 }
372 
OnWatcherHandleReady(MojoResult result)373 void Connector::OnWatcherHandleReady(MojoResult result) {
374   OnHandleReadyInternal(result);
375 }
376 
OnSyncHandleWatcherHandleReady(MojoResult result)377 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
378   base::WeakPtr<Connector> weak_self(weak_self_);
379 
380   sync_handle_watcher_callback_count_++;
381   OnHandleReadyInternal(result);
382   // At this point, this object might have been deleted.
383   if (weak_self) {
384     DCHECK_LT(0u, sync_handle_watcher_callback_count_);
385     sync_handle_watcher_callback_count_--;
386   }
387 }
388 
OnHandleReadyInternal(MojoResult result)389 void Connector::OnHandleReadyInternal(MojoResult result) {
390   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
391 
392   if (result == MOJO_RESULT_FAILED_PRECONDITION) {
393     // No more messages on the pipe and the peer is closed.
394     HandleError(false /* force_pipe_reset */, false /* force_async_handler */);
395     return;
396   } else if (result != MOJO_RESULT_OK) {
397     // Some other fatal error condition was encountered. We can propagate this
398     // immediately.
399     HandleError(true /* force_pipe_reset */, false /* force_async_handler */);
400     return;
401   }
402 
403   ReadAllAvailableMessages();
404   // At this point, this object might have been deleted. Return.
405 }
406 
WaitToReadMore()407 void Connector::WaitToReadMore() {
408   CHECK(!paused_);
409   DCHECK(!handle_watcher_);
410 
411   DCHECK(task_runner_->RunsTasksInCurrentSequence());
412   handle_watcher_ = std::make_unique<SimpleWatcher>(
413       FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL, task_runner_,
414       interface_name_);
415   MojoResult rv = handle_watcher_->Watch(
416       message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
417       base::BindRepeating(&Connector::OnWatcherHandleReady,
418                           base::Unretained(this)));
419 
420   if (message_pipe_.is_valid()) {
421     peer_remoteness_tracker_.emplace(
422         message_pipe_.get(), MOJO_HANDLE_SIGNAL_PEER_REMOTE, task_runner_);
423   }
424 
425   if (rv != MOJO_RESULT_OK) {
426     // If the watch failed because the handle is invalid or its conditions can
427     // no longer be met, we signal the error asynchronously to avoid reentry.
428     task_runner_->PostTask(
429         FROM_HERE,
430         base::BindOnce(&Connector::OnWatcherHandleReady, weak_self_, rv));
431   } else {
432     handle_watcher_->ArmOrNotify();
433   }
434 
435   if (allow_woken_up_by_others_) {
436     EnsureSyncWatcherExists();
437     sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
438   }
439 }
440 
QueryPendingMessageCount() const441 uint64_t Connector::QueryPendingMessageCount() const {
442   uint64_t unused_current_limit = 0;
443   uint64_t pending_message_count = 0;
444   MojoQueryQuota(
445       message_pipe_.get().value(), MOJO_QUOTA_TYPE_RECEIVE_QUEUE_LENGTH,
446       /*options=*/nullptr, &unused_current_limit, &pending_message_count);
447   return pending_message_count;
448 }
449 
ReadMessage(Message * message)450 MojoResult Connector::ReadMessage(Message* message) {
451   ScopedMessageHandle handle;
452   MojoResult result =
453       ReadMessageNew(message_pipe_.get(), &handle, MOJO_READ_MESSAGE_FLAG_NONE);
454   if (result != MOJO_RESULT_OK)
455     return result;
456 
457   *message = Message::CreateFromMessageHandle(&handle);
458   if (message->IsNull()) {
459     // Even if the read was successful, the Message may still be null if there
460     // was a problem extracting handles from it. We treat this essentially as
461     // a bad IPC because we don't really have a better option.
462     //
463     // We include |interface_name_| in the error message since it usually
464     // (via this Connector's owner) provides useful information about which
465     // binding interface is using this Connector.
466     NotifyBadMessage(handle.get(),
467                      std::string(interface_name_) +
468                          "One or more handle attachments were invalid.");
469     return MOJO_RESULT_ABORTED;
470   }
471 
472   return MOJO_RESULT_OK;
473 }
474 
DispatchMessage(Message message)475 bool Connector::DispatchMessage(Message message) {
476   DCHECK(!paused_);
477 
478   base::WeakPtr<Connector> weak_self = weak_self_;
479   base::Optional<ActiveDispatchTracker> dispatch_tracker;
480   if (!is_dispatching_ && nesting_observer_) {
481     is_dispatching_ = true;
482     dispatch_tracker.emplace(weak_self);
483   }
484 
485   if (incoming_serialization_mode_ ==
486       IncomingSerializationMode::kSerializeBeforeDispatchForTesting) {
487     message.SerializeIfNecessary();
488   } else {
489     DCHECK_EQ(IncomingSerializationMode::kDispatchAsIs,
490               incoming_serialization_mode_);
491   }
492 
493   TRACE_EVENT_WITH_FLOW0("toplevel.flow", "mojo::Message Receive",
494                          message.header()->trace_id, TRACE_EVENT_FLAG_FLOW_IN);
495 #if !BUILDFLAG(MOJO_TRACE_ENABLED)
496   // This emits just full class name, and is inferior to mojo tracing.
497   TRACE_EVENT("toplevel", "Connector::DispatchMessage",
498               [this](perfetto::EventContext ctx) {
499                 ctx.event()
500                     ->set_chrome_mojo_event_info()
501                     ->set_watcher_notify_interface_tag(interface_name_);
502               });
503 #endif
504 
505   if (connection_group_)
506     message.set_receiver_connection_group(&connection_group_);
507   bool receiver_result =
508       incoming_receiver_ && incoming_receiver_->Accept(&message);
509   if (!weak_self)
510     return receiver_result;
511 
512   if (dispatch_tracker) {
513     is_dispatching_ = false;
514     dispatch_tracker.reset();
515   }
516 
517   if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
518     HandleError(true /* force_pipe_reset */, false /* force_async_handler */);
519     return false;
520   }
521 
522   return true;
523 }
524 
PostDispatchNextMessageFromPipe()525 void Connector::PostDispatchNextMessageFromPipe() {
526   ++num_pending_dispatch_tasks_;
527   task_runner_->PostTask(
528       FROM_HERE,
529       base::BindOnce(&Connector::CallDispatchNextMessageFromPipe, weak_self_));
530 }
531 
CallDispatchNextMessageFromPipe()532 void Connector::CallDispatchNextMessageFromPipe() {
533   DCHECK_GT(num_pending_dispatch_tasks_, 0u);
534   --num_pending_dispatch_tasks_;
535   ReadAllAvailableMessages();
536 }
537 
ScheduleDispatchOfPendingMessagesOrWaitForMore(uint64_t pending_message_count)538 void Connector::ScheduleDispatchOfPendingMessagesOrWaitForMore(
539     uint64_t pending_message_count) {
540   if (pending_message_count == 0) {
541     // We're done only because there are no more messages to read, so go back to
542     // watching the pipe for more.
543     if (handle_watcher_)
544       handle_watcher_->ArmOrNotify();
545     return;
546   }
547 
548   while (pending_message_count > num_pending_dispatch_tasks_) {
549     PostDispatchNextMessageFromPipe();
550   }
551 }
552 
ReadAllAvailableMessages()553 void Connector::ReadAllAvailableMessages() {
554   if (paused_ || error_)
555     return;
556 
557   base::WeakPtr<Connector> weak_self = weak_self_;
558 
559   do {
560     Message message;
561     MojoResult rv = ReadMessage(&message);
562 
563     switch (rv) {
564       case MOJO_RESULT_OK:
565         DCHECK(!message.IsNull());
566         if (!DispatchMessage(std::move(message)) || !weak_self || paused_) {
567           return;
568         }
569         break;
570 
571       case MOJO_RESULT_SHOULD_WAIT:
572         // No more messages - we need to wait for new ones to arrive.
573         ScheduleDispatchOfPendingMessagesOrWaitForMore(
574             /*pending_message_count*/ 0u);
575         return;
576 
577       case MOJO_RESULT_FAILED_PRECONDITION:
578         // The peer endpoint was closed and there are no more messages to read.
579         // We can signal an error right away.
580         HandleError(false /* force_pipe_reset */,
581                     false /* force_async_handler */);
582         return;
583 
584       default:
585         // A fatal error occurred on the pipe, handle it immediately.
586         HandleError(true /* force_pipe_reset */,
587                     false /* force_async_handler */);
588         return;
589     }
590   } while (weak_self && should_dispatch_messages_immediately());
591 
592   if (weak_self) {
593     const auto pending_message_count = QueryPendingMessageCount();
594     ScheduleDispatchOfPendingMessagesOrWaitForMore(pending_message_count);
595   }
596 }
597 
CancelWait()598 void Connector::CancelWait() {
599   peer_remoteness_tracker_.reset();
600   handle_watcher_.reset();
601   sync_watcher_.reset();
602 }
603 
HandleError(bool force_pipe_reset,bool force_async_handler)604 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
605   if (error_ || !message_pipe_.is_valid())
606     return;
607 
608   if (paused_) {
609     // Enforce calling the error handler asynchronously if the user has paused
610     // receiving messages. We need to wait until the user starts receiving
611     // messages again.
612     force_async_handler = true;
613   }
614 
615   if (!force_pipe_reset && force_async_handler)
616     force_pipe_reset = true;
617 
618   if (force_pipe_reset) {
619     CancelWait();
620     internal::MayAutoLock locker(&lock_);
621     message_pipe_.reset();
622     MessagePipe dummy_pipe;
623     message_pipe_ = std::move(dummy_pipe.handle0);
624   } else {
625     CancelWait();
626   }
627 
628   if (force_async_handler) {
629     if (!paused_)
630       WaitToReadMore();
631   } else {
632     error_ = true;
633     if (connection_error_handler_)
634       std::move(connection_error_handler_).Run();
635   }
636 }
637 
EnsureSyncWatcherExists()638 void Connector::EnsureSyncWatcherExists() {
639   if (sync_watcher_)
640     return;
641   sync_watcher_.reset(new SyncHandleWatcher(
642       message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
643       base::BindRepeating(&Connector::OnSyncHandleWatcherHandleReady,
644                           base::Unretained(this))));
645 }
646 
647 }  // namespace mojo
648