1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/bindings/connector.h"
6
7 #include <stdint.h>
8
9 #include "base/bind.h"
10 #include "base/check_op.h"
11 #include "base/compiler_specific.h"
12 #include "base/location.h"
13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/metrics/field_trial_params.h"
16 #include "base/metrics/histogram_macros.h"
17 #include "base/no_destructor.h"
18 #include "base/rand_util.h"
19 #include "base/run_loop.h"
20 #include "base/synchronization/lock.h"
21 #include "base/task/current_thread.h"
22 #include "base/threading/sequence_local_storage_slot.h"
23 #include "base/trace_event/trace_event.h"
24 #include "base/trace_event/typed_macros.h"
25 #include "mojo/public/c/system/quota.h"
26 #include "mojo/public/cpp/bindings/features.h"
27 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
28 #include "mojo/public/cpp/bindings/lib/message_quota_checker.h"
29 #include "mojo/public/cpp/bindings/mojo_buildflags.h"
30 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
31 #include "mojo/public/cpp/system/wait.h"
32 #include "third_party/perfetto/protos/perfetto/trace/track_event/chrome_mojo_event_info.pbzero.h"
33
34 #if defined(ENABLE_IPC_FUZZER)
35 #include "mojo/public/cpp/bindings/message_dumper.h"
36 #endif
37
38 namespace mojo {
39
40 namespace {
41
42 // The default outgoing serialization mode for new Connectors.
43 Connector::OutgoingSerializationMode g_default_outgoing_serialization_mode =
44 Connector::OutgoingSerializationMode::kLazy;
45
46 // The default incoming serialization mode for new Connectors.
47 Connector::IncomingSerializationMode g_default_incoming_serialization_mode =
48 Connector::IncomingSerializationMode::kDispatchAsIs;
49
EnableTaskPerMessage()50 bool EnableTaskPerMessage() {
51 // Const since this may be called from any thread. Initialization is
52 // thread-safe. This is a workaround since some consumers of Mojo (e.g. many
53 // browser tests) use base::FeatureList incorrectly and thus cause data races
54 // when features are queried from arbitrary threads.
55 static const bool enable =
56 base::FeatureList::IsEnabled(features::kTaskPerMessage);
57 return enable;
58 }
59
60 } // namespace
61
62 // Used to efficiently maintain a doubly-linked list of all Connectors
63 // currently dispatching on any given thread.
64 class Connector::ActiveDispatchTracker {
65 public:
66 explicit ActiveDispatchTracker(const base::WeakPtr<Connector>& connector);
67 ~ActiveDispatchTracker();
68
69 void NotifyBeginNesting();
70
71 private:
72 const base::WeakPtr<Connector> connector_;
73 RunLoopNestingObserver* const nesting_observer_;
74 ActiveDispatchTracker* outer_tracker_ = nullptr;
75 ActiveDispatchTracker* inner_tracker_ = nullptr;
76
77 DISALLOW_COPY_AND_ASSIGN(ActiveDispatchTracker);
78 };
79
80 // Watches the MessageLoop on the current thread. Notifies the current chain of
81 // ActiveDispatchTrackers when a nested run loop is started.
82 class Connector::RunLoopNestingObserver
83 : public base::RunLoop::NestingObserver {
84 public:
RunLoopNestingObserver()85 RunLoopNestingObserver() {
86 base::RunLoop::AddNestingObserverOnCurrentThread(this);
87 }
88
~RunLoopNestingObserver()89 ~RunLoopNestingObserver() override {
90 base::RunLoop::RemoveNestingObserverOnCurrentThread(this);
91 }
92
93 // base::RunLoop::NestingObserver:
OnBeginNestedRunLoop()94 void OnBeginNestedRunLoop() override {
95 if (top_tracker_)
96 top_tracker_->NotifyBeginNesting();
97 }
98
GetForThread()99 static RunLoopNestingObserver* GetForThread() {
100 if (!base::CurrentThread::Get())
101 return nullptr;
102 // The NestingObserver for each thread. Note that this is always a
103 // Connector::RunLoopNestingObserver; we use the base type here because that
104 // subclass is private to Connector.
105 static base::NoDestructor<
106 base::SequenceLocalStorageSlot<RunLoopNestingObserver>>
107 sls_nesting_observer;
108 return &sls_nesting_observer->GetOrCreateValue();
109 }
110
111 private:
112 friend class ActiveDispatchTracker;
113
114 ActiveDispatchTracker* top_tracker_ = nullptr;
115
116 DISALLOW_COPY_AND_ASSIGN(RunLoopNestingObserver);
117 };
118
ActiveDispatchTracker(const base::WeakPtr<Connector> & connector)119 Connector::ActiveDispatchTracker::ActiveDispatchTracker(
120 const base::WeakPtr<Connector>& connector)
121 : connector_(connector), nesting_observer_(connector_->nesting_observer_) {
122 DCHECK(nesting_observer_);
123 if (nesting_observer_->top_tracker_) {
124 outer_tracker_ = nesting_observer_->top_tracker_;
125 outer_tracker_->inner_tracker_ = this;
126 }
127 nesting_observer_->top_tracker_ = this;
128 }
129
~ActiveDispatchTracker()130 Connector::ActiveDispatchTracker::~ActiveDispatchTracker() {
131 if (nesting_observer_->top_tracker_ == this)
132 nesting_observer_->top_tracker_ = outer_tracker_;
133 else if (inner_tracker_)
134 inner_tracker_->outer_tracker_ = outer_tracker_;
135 if (outer_tracker_)
136 outer_tracker_->inner_tracker_ = inner_tracker_;
137 }
138
NotifyBeginNesting()139 void Connector::ActiveDispatchTracker::NotifyBeginNesting() {
140 if (connector_ && connector_->handle_watcher_)
141 connector_->handle_watcher_->ArmOrNotify();
142 if (outer_tracker_)
143 outer_tracker_->NotifyBeginNesting();
144 }
145
Connector(ScopedMessagePipeHandle message_pipe,ConnectorConfig config,scoped_refptr<base::SequencedTaskRunner> runner,const char * interface_name)146 Connector::Connector(ScopedMessagePipeHandle message_pipe,
147 ConnectorConfig config,
148 scoped_refptr<base::SequencedTaskRunner> runner,
149 const char* interface_name)
150 : message_pipe_(std::move(message_pipe)),
151 task_runner_(std::move(runner)),
152 error_(false),
153 force_immediate_dispatch_(!EnableTaskPerMessage()),
154 outgoing_serialization_mode_(g_default_outgoing_serialization_mode),
155 incoming_serialization_mode_(g_default_incoming_serialization_mode),
156 interface_name_(interface_name),
157 nesting_observer_(RunLoopNestingObserver::GetForThread()) {
158 if (config == MULTI_THREADED_SEND)
159 lock_.emplace();
160
161 #if defined(ENABLE_IPC_FUZZER)
162 if (!MessageDumper::GetMessageDumpDirectory().empty())
163 message_dumper_ = std::make_unique<MessageDumper>();
164 #endif
165
166 weak_self_ = weak_factory_.GetWeakPtr();
167 // Even though we don't have an incoming receiver, we still want to monitor
168 // the message pipe to know if is closed or encounters an error.
169 WaitToReadMore();
170 }
171
~Connector()172 Connector::~Connector() {
173 if (quota_checker_) {
174 // Clear the message pipe handle in the checker.
175 quota_checker_->SetMessagePipe(MessagePipeHandle());
176 UMA_HISTOGRAM_COUNTS_1M("Mojo.Connector.MaxUnreadMessageQuotaUsed",
177 quota_checker_->GetMaxQuotaUsage());
178 }
179
180 {
181 // Allow for quick destruction on any sequence if the pipe is already
182 // closed.
183 base::AutoLock lock(connected_lock_);
184 if (!connected_)
185 return;
186 }
187
188 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
189 CancelWait();
190 }
191
SetOutgoingSerializationMode(OutgoingSerializationMode mode)192 void Connector::SetOutgoingSerializationMode(OutgoingSerializationMode mode) {
193 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
194 outgoing_serialization_mode_ = mode;
195 }
196
SetIncomingSerializationMode(IncomingSerializationMode mode)197 void Connector::SetIncomingSerializationMode(IncomingSerializationMode mode) {
198 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
199 incoming_serialization_mode_ = mode;
200 }
201
CloseMessagePipe()202 void Connector::CloseMessagePipe() {
203 // Throw away the returned message pipe.
204 PassMessagePipe();
205 }
206
PassMessagePipe()207 ScopedMessagePipeHandle Connector::PassMessagePipe() {
208 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
209
210 CancelWait();
211 internal::MayAutoLock locker(&lock_);
212 ScopedMessagePipeHandle message_pipe = std::move(message_pipe_);
213 weak_factory_.InvalidateWeakPtrs();
214 sync_handle_watcher_callback_count_ = 0;
215
216 base::AutoLock lock(connected_lock_);
217 connected_ = false;
218 return message_pipe;
219 }
220
RaiseError()221 void Connector::RaiseError() {
222 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
223
224 HandleError(true, true);
225 }
226
SetConnectionGroup(ConnectionGroup::Ref ref)227 void Connector::SetConnectionGroup(ConnectionGroup::Ref ref) {
228 // If this Connector already belonged to a group, parent the new group to that
229 // one so that the reference is not lost.
230 if (connection_group_)
231 ref.SetParentGroup(std::move(connection_group_));
232 connection_group_ = std::move(ref);
233 }
234
WaitForIncomingMessage()235 bool Connector::WaitForIncomingMessage() {
236 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
237
238 if (error_)
239 return false;
240
241 ResumeIncomingMethodCallProcessing();
242
243 MojoResult rv = Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE);
244 if (rv != MOJO_RESULT_OK) {
245 // Users that call WaitForIncomingMessage() should expect their code to be
246 // re-entered, so we call the error handler synchronously.
247 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION /* force_pipe_reset */,
248 false /* force_async_handler */);
249 return false;
250 }
251
252 Message message;
253 if ((rv = ReadMessage(&message)) != MOJO_RESULT_OK) {
254 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION /* force_pipe_reset */,
255 false /* force_async_handler */);
256 return false;
257 }
258
259 DCHECK(!message.IsNull());
260 return DispatchMessage(std::move(message));
261 }
262
PauseIncomingMethodCallProcessing()263 void Connector::PauseIncomingMethodCallProcessing() {
264 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
265
266 if (paused_)
267 return;
268
269 paused_ = true;
270 CancelWait();
271 }
272
ResumeIncomingMethodCallProcessing()273 void Connector::ResumeIncomingMethodCallProcessing() {
274 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
275
276 if (!paused_)
277 return;
278
279 paused_ = false;
280 WaitToReadMore();
281 }
282
PrefersSerializedMessages()283 bool Connector::PrefersSerializedMessages() {
284 if (outgoing_serialization_mode_ == OutgoingSerializationMode::kEager)
285 return true;
286 DCHECK_EQ(OutgoingSerializationMode::kLazy, outgoing_serialization_mode_);
287 return peer_remoteness_tracker_ &&
288 peer_remoteness_tracker_->last_known_state().peer_remote();
289 }
290
Accept(Message * message)291 bool Connector::Accept(Message* message) {
292 if (!lock_)
293 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
294
295 if (error_)
296 return false;
297
298 internal::MayAutoLock locker(&lock_);
299
300 if (!message_pipe_.is_valid() || drop_writes_)
301 return true;
302
303 #if defined(ENABLE_IPC_FUZZER)
304 if (message_dumper_ && message->is_serialized()) {
305 bool dump_result = message_dumper_->Accept(message);
306 DCHECK(dump_result);
307 }
308 #endif
309
310 if (quota_checker_)
311 quota_checker_->BeforeWrite();
312
313 MojoResult rv =
314 WriteMessageNew(message_pipe_.get(), message->TakeMojoMessage(),
315 MOJO_WRITE_MESSAGE_FLAG_NONE);
316
317 switch (rv) {
318 case MOJO_RESULT_OK:
319 break;
320 case MOJO_RESULT_FAILED_PRECONDITION:
321 // There's no point in continuing to write to this pipe since the other
322 // end is gone. Avoid writing any future messages. Hide write failures
323 // from the caller since we'd like them to continue consuming any backlog
324 // of incoming messages before regarding the message pipe as closed.
325 drop_writes_ = true;
326 break;
327 case MOJO_RESULT_BUSY:
328 // We'd get a "busy" result if one of the message's handles is:
329 // - |message_pipe_|'s own handle;
330 // - simultaneously being used on another sequence; or
331 // - in a "busy" state that prohibits it from being transferred (e.g.,
332 // a data pipe handle in the middle of a two-phase read/write,
333 // regardless of which sequence that two-phase read/write is happening
334 // on).
335 // TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until
336 // crbug.com/389666, etc. are resolved, this will make tests fail quickly
337 // rather than hanging.)
338 CHECK(false) << "Race condition or other bug detected";
339 return false;
340 default:
341 // This particular write was rejected, presumably because of bad input.
342 // The pipe is not necessarily in a bad state.
343 return false;
344 }
345 return true;
346 }
347
AllowWokenUpBySyncWatchOnSameThread()348 void Connector::AllowWokenUpBySyncWatchOnSameThread() {
349 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
350
351 allow_woken_up_by_others_ = true;
352
353 EnsureSyncWatcherExists();
354 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
355 }
356
SetMessageQuotaChecker(scoped_refptr<internal::MessageQuotaChecker> checker)357 void Connector::SetMessageQuotaChecker(
358 scoped_refptr<internal::MessageQuotaChecker> checker) {
359 DCHECK(checker && !quota_checker_);
360
361 quota_checker_ = std::move(checker);
362 quota_checker_->SetMessagePipe(message_pipe_.get());
363 }
364
365 // static
OverrideDefaultSerializationBehaviorForTesting(OutgoingSerializationMode outgoing_mode,IncomingSerializationMode incoming_mode)366 void Connector::OverrideDefaultSerializationBehaviorForTesting(
367 OutgoingSerializationMode outgoing_mode,
368 IncomingSerializationMode incoming_mode) {
369 g_default_outgoing_serialization_mode = outgoing_mode;
370 g_default_incoming_serialization_mode = incoming_mode;
371 }
372
OnWatcherHandleReady(MojoResult result)373 void Connector::OnWatcherHandleReady(MojoResult result) {
374 OnHandleReadyInternal(result);
375 }
376
OnSyncHandleWatcherHandleReady(MojoResult result)377 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
378 base::WeakPtr<Connector> weak_self(weak_self_);
379
380 sync_handle_watcher_callback_count_++;
381 OnHandleReadyInternal(result);
382 // At this point, this object might have been deleted.
383 if (weak_self) {
384 DCHECK_LT(0u, sync_handle_watcher_callback_count_);
385 sync_handle_watcher_callback_count_--;
386 }
387 }
388
OnHandleReadyInternal(MojoResult result)389 void Connector::OnHandleReadyInternal(MojoResult result) {
390 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
391
392 if (result == MOJO_RESULT_FAILED_PRECONDITION) {
393 // No more messages on the pipe and the peer is closed.
394 HandleError(false /* force_pipe_reset */, false /* force_async_handler */);
395 return;
396 } else if (result != MOJO_RESULT_OK) {
397 // Some other fatal error condition was encountered. We can propagate this
398 // immediately.
399 HandleError(true /* force_pipe_reset */, false /* force_async_handler */);
400 return;
401 }
402
403 ReadAllAvailableMessages();
404 // At this point, this object might have been deleted. Return.
405 }
406
WaitToReadMore()407 void Connector::WaitToReadMore() {
408 CHECK(!paused_);
409 DCHECK(!handle_watcher_);
410
411 DCHECK(task_runner_->RunsTasksInCurrentSequence());
412 handle_watcher_ = std::make_unique<SimpleWatcher>(
413 FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL, task_runner_,
414 interface_name_);
415 MojoResult rv = handle_watcher_->Watch(
416 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
417 base::BindRepeating(&Connector::OnWatcherHandleReady,
418 base::Unretained(this)));
419
420 if (message_pipe_.is_valid()) {
421 peer_remoteness_tracker_.emplace(
422 message_pipe_.get(), MOJO_HANDLE_SIGNAL_PEER_REMOTE, task_runner_);
423 }
424
425 if (rv != MOJO_RESULT_OK) {
426 // If the watch failed because the handle is invalid or its conditions can
427 // no longer be met, we signal the error asynchronously to avoid reentry.
428 task_runner_->PostTask(
429 FROM_HERE,
430 base::BindOnce(&Connector::OnWatcherHandleReady, weak_self_, rv));
431 } else {
432 handle_watcher_->ArmOrNotify();
433 }
434
435 if (allow_woken_up_by_others_) {
436 EnsureSyncWatcherExists();
437 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
438 }
439 }
440
QueryPendingMessageCount() const441 uint64_t Connector::QueryPendingMessageCount() const {
442 uint64_t unused_current_limit = 0;
443 uint64_t pending_message_count = 0;
444 MojoQueryQuota(
445 message_pipe_.get().value(), MOJO_QUOTA_TYPE_RECEIVE_QUEUE_LENGTH,
446 /*options=*/nullptr, &unused_current_limit, &pending_message_count);
447 return pending_message_count;
448 }
449
ReadMessage(Message * message)450 MojoResult Connector::ReadMessage(Message* message) {
451 ScopedMessageHandle handle;
452 MojoResult result =
453 ReadMessageNew(message_pipe_.get(), &handle, MOJO_READ_MESSAGE_FLAG_NONE);
454 if (result != MOJO_RESULT_OK)
455 return result;
456
457 *message = Message::CreateFromMessageHandle(&handle);
458 if (message->IsNull()) {
459 // Even if the read was successful, the Message may still be null if there
460 // was a problem extracting handles from it. We treat this essentially as
461 // a bad IPC because we don't really have a better option.
462 //
463 // We include |interface_name_| in the error message since it usually
464 // (via this Connector's owner) provides useful information about which
465 // binding interface is using this Connector.
466 NotifyBadMessage(handle.get(),
467 std::string(interface_name_) +
468 "One or more handle attachments were invalid.");
469 return MOJO_RESULT_ABORTED;
470 }
471
472 return MOJO_RESULT_OK;
473 }
474
DispatchMessage(Message message)475 bool Connector::DispatchMessage(Message message) {
476 DCHECK(!paused_);
477
478 base::WeakPtr<Connector> weak_self = weak_self_;
479 base::Optional<ActiveDispatchTracker> dispatch_tracker;
480 if (!is_dispatching_ && nesting_observer_) {
481 is_dispatching_ = true;
482 dispatch_tracker.emplace(weak_self);
483 }
484
485 if (incoming_serialization_mode_ ==
486 IncomingSerializationMode::kSerializeBeforeDispatchForTesting) {
487 message.SerializeIfNecessary();
488 } else {
489 DCHECK_EQ(IncomingSerializationMode::kDispatchAsIs,
490 incoming_serialization_mode_);
491 }
492
493 TRACE_EVENT_WITH_FLOW0("toplevel.flow", "mojo::Message Receive",
494 message.header()->trace_id, TRACE_EVENT_FLAG_FLOW_IN);
495 #if !BUILDFLAG(MOJO_TRACE_ENABLED)
496 // This emits just full class name, and is inferior to mojo tracing.
497 TRACE_EVENT("toplevel", "Connector::DispatchMessage",
498 [this](perfetto::EventContext ctx) {
499 ctx.event()
500 ->set_chrome_mojo_event_info()
501 ->set_watcher_notify_interface_tag(interface_name_);
502 });
503 #endif
504
505 if (connection_group_)
506 message.set_receiver_connection_group(&connection_group_);
507 bool receiver_result =
508 incoming_receiver_ && incoming_receiver_->Accept(&message);
509 if (!weak_self)
510 return receiver_result;
511
512 if (dispatch_tracker) {
513 is_dispatching_ = false;
514 dispatch_tracker.reset();
515 }
516
517 if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
518 HandleError(true /* force_pipe_reset */, false /* force_async_handler */);
519 return false;
520 }
521
522 return true;
523 }
524
PostDispatchNextMessageFromPipe()525 void Connector::PostDispatchNextMessageFromPipe() {
526 ++num_pending_dispatch_tasks_;
527 task_runner_->PostTask(
528 FROM_HERE,
529 base::BindOnce(&Connector::CallDispatchNextMessageFromPipe, weak_self_));
530 }
531
CallDispatchNextMessageFromPipe()532 void Connector::CallDispatchNextMessageFromPipe() {
533 DCHECK_GT(num_pending_dispatch_tasks_, 0u);
534 --num_pending_dispatch_tasks_;
535 ReadAllAvailableMessages();
536 }
537
ScheduleDispatchOfPendingMessagesOrWaitForMore(uint64_t pending_message_count)538 void Connector::ScheduleDispatchOfPendingMessagesOrWaitForMore(
539 uint64_t pending_message_count) {
540 if (pending_message_count == 0) {
541 // We're done only because there are no more messages to read, so go back to
542 // watching the pipe for more.
543 if (handle_watcher_)
544 handle_watcher_->ArmOrNotify();
545 return;
546 }
547
548 while (pending_message_count > num_pending_dispatch_tasks_) {
549 PostDispatchNextMessageFromPipe();
550 }
551 }
552
ReadAllAvailableMessages()553 void Connector::ReadAllAvailableMessages() {
554 if (paused_ || error_)
555 return;
556
557 base::WeakPtr<Connector> weak_self = weak_self_;
558
559 do {
560 Message message;
561 MojoResult rv = ReadMessage(&message);
562
563 switch (rv) {
564 case MOJO_RESULT_OK:
565 DCHECK(!message.IsNull());
566 if (!DispatchMessage(std::move(message)) || !weak_self || paused_) {
567 return;
568 }
569 break;
570
571 case MOJO_RESULT_SHOULD_WAIT:
572 // No more messages - we need to wait for new ones to arrive.
573 ScheduleDispatchOfPendingMessagesOrWaitForMore(
574 /*pending_message_count*/ 0u);
575 return;
576
577 case MOJO_RESULT_FAILED_PRECONDITION:
578 // The peer endpoint was closed and there are no more messages to read.
579 // We can signal an error right away.
580 HandleError(false /* force_pipe_reset */,
581 false /* force_async_handler */);
582 return;
583
584 default:
585 // A fatal error occurred on the pipe, handle it immediately.
586 HandleError(true /* force_pipe_reset */,
587 false /* force_async_handler */);
588 return;
589 }
590 } while (weak_self && should_dispatch_messages_immediately());
591
592 if (weak_self) {
593 const auto pending_message_count = QueryPendingMessageCount();
594 ScheduleDispatchOfPendingMessagesOrWaitForMore(pending_message_count);
595 }
596 }
597
CancelWait()598 void Connector::CancelWait() {
599 peer_remoteness_tracker_.reset();
600 handle_watcher_.reset();
601 sync_watcher_.reset();
602 }
603
HandleError(bool force_pipe_reset,bool force_async_handler)604 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
605 if (error_ || !message_pipe_.is_valid())
606 return;
607
608 if (paused_) {
609 // Enforce calling the error handler asynchronously if the user has paused
610 // receiving messages. We need to wait until the user starts receiving
611 // messages again.
612 force_async_handler = true;
613 }
614
615 if (!force_pipe_reset && force_async_handler)
616 force_pipe_reset = true;
617
618 if (force_pipe_reset) {
619 CancelWait();
620 internal::MayAutoLock locker(&lock_);
621 message_pipe_.reset();
622 MessagePipe dummy_pipe;
623 message_pipe_ = std::move(dummy_pipe.handle0);
624 } else {
625 CancelWait();
626 }
627
628 if (force_async_handler) {
629 if (!paused_)
630 WaitToReadMore();
631 } else {
632 error_ = true;
633 if (connection_error_handler_)
634 std::move(connection_error_handler_).Run();
635 }
636 }
637
EnsureSyncWatcherExists()638 void Connector::EnsureSyncWatcherExists() {
639 if (sync_watcher_)
640 return;
641 sync_watcher_.reset(new SyncHandleWatcher(
642 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
643 base::BindRepeating(&Connector::OnSyncHandleWatcherHandleReady,
644 base::Unretained(this))));
645 }
646
647 } // namespace mojo
648