1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/core/ports/node.h"
6
7 #include <string.h>
8
9 #include <algorithm>
10 #include <atomic>
11 #include <memory>
12 #include <utility>
13 #include <vector>
14
15 #include "mozilla/Mutex.h"
16 #include "mozilla/RandomNum.h"
17 #include "nsTArray.h"
18
19 #include "base/logging.h"
20 #include "mojo/core/ports/event.h"
21 #include "mojo/core/ports/node_delegate.h"
22 #include "mojo/core/ports/port_locker.h"
23
24 namespace mojo {
25 namespace core {
26 namespace ports {
27
28 namespace {
29
// Reports an unexpected internal error (via NOTREACHED, fatal in debug
// builds) and returns |error_code| unchanged so call sites can write
// `return OOPS(x);` and still propagate the error in release builds.
int DebugError(const char* message, int error_code) {
  NOTREACHED() << "Oops: " << message;
  return error_code;
}

// Stringifies the error expression for the log message and evaluates to the
// error value itself.
#define OOPS(x) DebugError(#x, x)
36
// Returns true if |port| may still deliver messages to a reader.
// A closed port accepts nothing. Once the peer has closed (or this proxy is
// marked for removal), delivery continues only up to the last expected
// sequence number; if the peer was lost unexpectedly there is no reliable
// final sequence number, so we simply drain whatever is already queued.
// The caller must hold the port's lock.
bool CanAcceptMoreMessages(const Port* port) {
  // Have we already doled out the last message (i.e., do we expect to NOT
  // receive further messages)?
  uint64_t next_sequence_num = port->message_queue.next_sequence_num();
  if (port->state == Port::kClosed) {
    return false;
  }
  if (port->peer_closed || port->remove_proxy_on_last_message) {
    if (port->peer_lost_unexpectedly) {
      return port->message_queue.HasNextMessage();
    }
    // next_sequence_num - 1 is the sequence number of the most recently
    // consumed message; when it matches the announced final sequence number
    // there is nothing more to accept.
    if (port->last_sequence_num_to_receive == next_sequence_num - 1) {
      return false;
    }
  }
  return true;
}
54
GenerateRandomPortName(PortName * name)55 void GenerateRandomPortName(PortName* name) {
56 // FIXME: Chrome uses a cache to avoid extra calls to the system RNG when
57 // generating port names to keep this overhead down. If this method starts
58 // showing up on profiles we should consider doing the same.
59 *name = PortName{mozilla::RandomUint64OrDie(), mozilla::RandomUint64OrDie()};
60 }
61
62 } // namespace
63
// Constructs a node identified by |name| which reports node-level operations
// (event forwarding, port status changes) to |delegate|.
Node::Node(const NodeName& name, NodeDelegate* delegate)
    : name_(name), delegate_(this, delegate) {}
66
Node::~Node() {
  // All ports should have been closed or transferred before the node is
  // destroyed; anything left behind indicates an unclean shutdown.
  if (!ports_.empty()) {
    DLOG(WARNING) << "Unclean shutdown for node " << name_;
  }
}
72
// Returns true if this node can shut down without abandoning state.
// DONT_ALLOW_LOCAL_PORTS requires that no ports remain at all.
// ALLOW_LOCAL_PORTS additionally tolerates ports that are in the receiving
// state or whose peer lives on this same node.
bool Node::CanShutdownCleanly(ShutdownPolicy policy) {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  mozilla::MutexAutoLock ports_lock(ports_lock_);

  if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) {
#ifdef DEBUG
    // Log every remaining port so unclean shutdowns can be diagnosed.
    for (auto& entry : ports_) {
      DVLOG(2) << "Port " << entry.first << " referencing node "
               << entry.second->peer_node_name << " is blocking shutdown of "
               << "node " << name_ << " (state=" << entry.second->state << ")";
    }
#endif
    return ports_.empty();
  }

  DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS);

  // NOTE: This is not efficient, though it probably doesn't need to be since
  // relatively few ports should be open during shutdown and shutdown doesn't
  // need to be blazingly fast.
  bool can_shutdown = true;
  for (auto& entry : ports_) {
    PortRef port_ref(entry.first, entry.second);
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->peer_node_name != name_ && port->state != Port::kReceiving) {
      can_shutdown = false;
#ifdef DEBUG
      // In debug builds keep scanning so every blocking port gets logged.
      DVLOG(2) << "Port " << entry.first << " referencing node "
               << port->peer_node_name << " is blocking shutdown of "
               << "node " << name_ << " (state=" << port->state << ")";
#else
      // Exit early when not debugging.
      break;
#endif
    }
  }

  return can_shutdown;
}
113
// Looks up the port registered under |port_name| on this node. On success
// fills in |port_ref| and returns OK; returns ERROR_PORT_UNKNOWN if no such
// port exists here.
int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  mozilla::MutexAutoLock lock(ports_lock_);
  auto iter = ports_.find(port_name);
  if (iter == ports_.end()) {
    return ERROR_PORT_UNKNOWN;
  }

#if defined(OS_ANDROID) && defined(ARCH_CPU_ARM64)
  // Workaround for https://crbug.com/665869.
  std::atomic_thread_fence(std::memory_order_seq_cst);
#endif

  *port_ref = PortRef(port_name, iter->second);
  return OK;
}
130
CreateUninitializedPort(PortRef * port_ref)131 int Node::CreateUninitializedPort(PortRef* port_ref) {
132 PortName port_name;
133 GenerateRandomPortName(&port_name);
134
135 RefPtr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum));
136 int rv = AddPortWithName(port_name, port);
137 if (rv != OK) {
138 return rv;
139 }
140
141 *port_ref = PortRef(port_name, std::move(port));
142 return OK;
143 }
144
// Transitions an uninitialized port into the kReceiving state, pointing it at
// the given peer. Fails with ERROR_PORT_STATE_UNEXPECTED if the port was
// already initialized.
int Node::InitializePort(const PortRef& port_ref,
                         const NodeName& peer_node_name,
                         const PortName& peer_port_name) {
  {
    // Must be acquired for UpdatePortPeerAddress below.
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::MutexAutoLock ports_lock(ports_lock_);

    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->state != Port::kUninitialized) {
      return ERROR_PORT_STATE_UNEXPECTED;
    }

    port->state = Port::kReceiving;
    UpdatePortPeerAddress(port_ref.name(), port, peer_node_name,
                          peer_port_name);
  }

  // Notify outside of any lock so the embedder may safely re-enter Node.
  delegate_->PortStatusChanged(port_ref);

  return OK;
}
168
CreatePortPair(PortRef * port0_ref,PortRef * port1_ref)169 int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
170 int rv;
171
172 rv = CreateUninitializedPort(port0_ref);
173 if (rv != OK) {
174 return rv;
175 }
176
177 rv = CreateUninitializedPort(port1_ref);
178 if (rv != OK) {
179 return rv;
180 }
181
182 rv = InitializePort(*port0_ref, name_, port1_ref->name());
183 if (rv != OK) {
184 return rv;
185 }
186
187 rv = InitializePort(*port1_ref, name_, port0_ref->name());
188 if (rv != OK) {
189 return rv;
190 }
191
192 return OK;
193 }
194
SetUserData(const PortRef & port_ref,RefPtr<UserData> user_data)195 int Node::SetUserData(const PortRef& port_ref, RefPtr<UserData> user_data) {
196 SinglePortLocker locker(&port_ref);
197 auto* port = locker.port();
198 if (port->state == Port::kClosed) {
199 return ERROR_PORT_STATE_UNEXPECTED;
200 }
201
202 port->user_data = std::move(user_data);
203
204 return OK;
205 }
206
GetUserData(const PortRef & port_ref,RefPtr<UserData> * user_data)207 int Node::GetUserData(const PortRef& port_ref, RefPtr<UserData>* user_data) {
208 SinglePortLocker locker(&port_ref);
209 auto* port = locker.port();
210 if (port->state == Port::kClosed) {
211 return ERROR_PORT_STATE_UNEXPECTED;
212 }
213
214 *user_data = port->user_data;
215
216 return OK;
217 }
218
// Closes a port. A receiving port notifies its peer via ObserveClosure
// (carrying the last sequence number this side sent) and recursively closes
// any ports still attached to undelivered queued messages so they are not
// leaked. An uninitialized port is simply discarded. Ports in any other
// state (proxying, buffering, already closed) cannot be closed directly.
int Node::ClosePort(const PortRef& port_ref) {
  std::vector<mozilla::UniquePtr<UserMessageEvent>> undelivered_messages;
  NodeName peer_node_name;
  PortName peer_port_name;
  uint64_t last_sequence_num = 0;
  bool was_initialized = false;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    switch (port->state) {
      case Port::kUninitialized:
        break;

      case Port::kReceiving:
        was_initialized = true;
        port->state = Port::kClosed;

        // We pass along the sequence number of the last message sent from this
        // port to allow the peer to have the opportunity to consume all inbound
        // messages before notifying the embedder that this port is closed.
        last_sequence_num = port->next_sequence_num_to_send - 1;

        peer_node_name = port->peer_node_name;
        peer_port_name = port->peer_port_name;

        // If the port being closed still has unread messages, then we need to
        // take care to close those ports so as to avoid leaking memory.
        port->message_queue.TakeAllMessages(&undelivered_messages);
        break;

      default:
        return ERROR_PORT_STATE_UNEXPECTED;
    }
  }

  ErasePort(port_ref.name());

  if (was_initialized) {
    DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@"
             << name_ << " to " << peer_port_name << "@" << peer_node_name;
    delegate_->ForwardEvent(peer_node_name,
                            mozilla::MakeUnique<ObserveClosureEvent>(
                                peer_port_name, last_sequence_num));
    // Ports carried by messages this port never delivered are now
    // unreachable; close each one that still exists locally.
    for (const auto& message : undelivered_messages) {
      for (size_t i = 0; i < message->num_ports(); ++i) {
        PortRef ref;
        if (GetPort(message->ports()[i], &ref) == OK) {
          ClosePort(ref);
        }
      }
    }
  }
  return OK;
}
273
// Snapshots the externally observable status of |port_ref| into
// |port_status|. Only valid for ports in the kReceiving state.
int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
  SinglePortLocker locker(&port_ref);
  auto* port = locker.port();
  if (port->state != Port::kReceiving) {
    return ERROR_PORT_STATE_UNEXPECTED;
  }

  port_status->has_messages = port->message_queue.HasNextMessage();
  port_status->receiving_messages = CanAcceptMoreMessages(port);
  port_status->peer_closed = port->peer_closed;
  port_status->peer_remote = port->peer_node_name != name_;
  port_status->queued_message_count =
      port->message_queue.queued_message_count();
  port_status->queued_num_bytes = port->message_queue.queued_num_bytes();
  // Messages sent but not yet acknowledged by the peer;
  // next_sequence_num_to_send is one past the last sent message, hence the
  // trailing -1.
  port_status->unacknowledged_message_count =
      port->next_sequence_num_to_send - port->last_sequence_num_acknowledged -
      1;

  return OK;
}
294
// Retrieves the next available message (if any) from the port's queue,
// optionally constrained by |filter|. On OK, *message may still be null if
// nothing is currently available. Returns ERROR_PORT_PEER_CLOSED only after
// the peer has closed AND all in-flight messages have been consumed. Reading
// the message at the requested acknowledge point also sends a read-ack back
// to the peer.
int Node::GetMessage(const PortRef& port_ref,
                     mozilla::UniquePtr<UserMessageEvent>* message,
                     MessageFilter* filter) {
  *message = nullptr;

  DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_;

  NodeName peer_node_name;
  ScopedEvent ack_event;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    // This could also be treated like the port being unknown since the
    // embedder should no longer be referring to a port that has been sent.
    if (port->state != Port::kReceiving) {
      return ERROR_PORT_STATE_UNEXPECTED;
    }

    // Let the embedder get messages until there are no more before reporting
    // that the peer closed its end.
    if (!CanAcceptMoreMessages(port)) {
      return ERROR_PORT_PEER_CLOSED;
    }

    port->message_queue.GetNextMessage(message, filter);
    // If this read reaches the sequence number the peer asked us to
    // acknowledge, build the ack event now (while locked) but send it after
    // releasing the port lock.
    if (*message &&
        (*message)->sequence_num() == port->sequence_num_to_acknowledge) {
      peer_node_name = port->peer_node_name;
      ack_event = mozilla::MakeUnique<UserMessageReadAckEvent>(
          port->peer_port_name, port->sequence_num_to_acknowledge);
    }
  }

  if (ack_event) {
    delegate_->ForwardEvent(peer_node_name, std::move(ack_event));
  }

  // Allow referenced ports to trigger PortStatusChanged calls.
  if (*message) {
    for (size_t i = 0; i < (*message)->num_ports(); ++i) {
      PortRef new_port_ref;
      int rv = GetPort((*message)->ports()[i], &new_port_ref);

      DCHECK_EQ(OK, rv) << "Port " << new_port_ref.name() << "@" << name_
                        << " does not exist!";

      SinglePortLocker locker(&new_port_ref);
      DCHECK(locker.port()->state == Port::kReceiving);
      locker.port()->message_queue.set_signalable(true);
    }

    // The user may retransmit this message from another port. We reset the
    // sequence number so that the message will get a new one if that happens.
    (*message)->set_sequence_num(0);
  }

  return OK;
}
354
SendUserMessage(const PortRef & port_ref,mozilla::UniquePtr<UserMessageEvent> message)355 int Node::SendUserMessage(const PortRef& port_ref,
356 mozilla::UniquePtr<UserMessageEvent> message) {
357 int rv = SendUserMessageInternal(port_ref, &message);
358 if (rv != OK) {
359 // If send failed, close all carried ports. Note that we're careful not to
360 // close the sending port itself if it happened to be one of the encoded
361 // ports (an invalid but possible condition.)
362 for (size_t i = 0; i < message->num_ports(); ++i) {
363 if (message->ports()[i] == port_ref.name()) {
364 continue;
365 }
366
367 PortRef port;
368 if (GetPort(message->ports()[i], &port) == OK) {
369 ClosePort(port);
370 }
371 }
372 }
373 return rv;
374 }
375
// Configures automatic read-acknowledge requests for |port_ref|: once every
// |sequence_num_acknowledge_interval| messages read by the peer, an ack is
// requested. Passing 0 disables acknowledge requests. When enabling, a
// request is immediately sent for the message that lies one interval beyond
// the last acknowledged sequence number.
int Node::SetAcknowledgeRequestInterval(
    const PortRef& port_ref, uint64_t sequence_num_acknowledge_interval) {
  NodeName peer_node_name;
  PortName peer_port_name;
  uint64_t sequence_num_to_request_ack = 0;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->state != Port::kReceiving) {
      return ERROR_PORT_STATE_UNEXPECTED;
    }

    port->sequence_num_acknowledge_interval = sequence_num_acknowledge_interval;
    if (!sequence_num_acknowledge_interval) {
      // Acks disabled; nothing to request from the peer.
      return OK;
    }

    peer_node_name = port->peer_node_name;
    peer_port_name = port->peer_port_name;

    sequence_num_to_request_ack = port->last_sequence_num_acknowledged +
                                  sequence_num_acknowledge_interval;
  }

  // Forward outside the port lock to respect lock ordering.
  delegate_->ForwardEvent(peer_node_name,
                          mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
                              peer_port_name, sequence_num_to_request_ack));
  return OK;
}
405
// Single entry point for events delivered to this node. Dispatches on the
// event type to the matching handler; Event::Cast transfers ownership of
// |event| into the handler's argument.
int Node::AcceptEvent(ScopedEvent event) {
  switch (event->type()) {
    case Event::Type::kUserMessage:
      return OnUserMessage(Event::Cast<UserMessageEvent>(&event));
    case Event::Type::kPortAccepted:
      return OnPortAccepted(Event::Cast<PortAcceptedEvent>(&event));
    case Event::Type::kObserveProxy:
      return OnObserveProxy(Event::Cast<ObserveProxyEvent>(&event));
    case Event::Type::kObserveProxyAck:
      return OnObserveProxyAck(Event::Cast<ObserveProxyAckEvent>(&event));
    case Event::Type::kObserveClosure:
      return OnObserveClosure(Event::Cast<ObserveClosureEvent>(&event));
    case Event::Type::kMergePort:
      return OnMergePort(Event::Cast<MergePortEvent>(&event));
    case Event::Type::kUserMessageReadAckRequest:
      return OnUserMessageReadAckRequest(
          Event::Cast<UserMessageReadAckRequestEvent>(&event));
    case Event::Type::kUserMessageReadAck:
      return OnUserMessageReadAck(Event::Cast<UserMessageReadAckEvent>(&event));
  }
  // Reached only if an event type slipped past deserialization unhandled.
  return OOPS(ERROR_NOT_IMPLEMENTED);
}
428
// Begins merging |port_ref| into the port cycle containing
// |destination_port_name| on |destination_node_name|. The local port is
// converted into a proxy and its descriptor is shipped to the destination in
// a MergePort event, where the merge completes atomically.
int Node::MergePorts(const PortRef& port_ref,
                     const NodeName& destination_node_name,
                     const PortName& destination_port_name) {
  PortName new_port_name;
  Event::PortDescriptor new_port_descriptor;
  {
    // Must be held for ConvertToProxy.
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::MutexAutoLock ports_locker(ports_lock_);

    SinglePortLocker locker(&port_ref);

    DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
             << " to " << destination_port_name << "@" << destination_node_name;

    // Send the port-to-merge over to the destination node so it can be merged
    // into the port cycle atomically there.
    new_port_name = port_ref.name();
    ConvertToProxy(locker.port(), destination_node_name, &new_port_name,
                   &new_port_descriptor);
  }

  if (new_port_descriptor.peer_node_name == name_ &&
      destination_node_name != name_) {
    // Ensure that the locally retained peer of the new proxy gets a status
    // update so it notices that its peer is now remote.
    PortRef local_peer;
    if (GetPort(new_port_descriptor.peer_port_name, &local_peer) == OK) {
      delegate_->PortStatusChanged(local_peer);
    }
  }

  delegate_->ForwardEvent(
      destination_node_name,
      mozilla::MakeUnique<MergePortEvent>(destination_port_name, new_port_name,
                                          new_port_descriptor));
  return OK;
}
467
MergeLocalPorts(const PortRef & port0_ref,const PortRef & port1_ref)468 int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
469 DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
470 << " and " << port1_ref.name() << "@" << name_;
471 return MergePortsInternal(port0_ref, port1_ref,
472 true /* allow_close_on_bad_state */);
473 }
474
// Called when the transport to |node_name| is lost. All local ports whose
// peer lives on that node are destroyed, since no further events — including
// PortAccepted — can be exchanged with it.
int Node::LostConnectionToNode(const NodeName& node_name) {
  // We can no longer send events to the given node. We also can't expect any
  // PortAccepted events.

  DVLOG(1) << "Observing lost connection from node " << name_ << " to node "
           << node_name;

  DestroyAllPortsWithPeer(node_name, kInvalidPortName);
  return OK;
}
485
// Handles an inbound user message addressed to |message->port_name()|.
// Any ports carried by the message are first bound to this node; the message
// is then queued on the destination port if it can still accept messages.
// If the destination is a proxy, buffered messages are forwarded onward. If
// the message cannot be accepted, the newly bound carried ports are closed
// so they are not leaked.
int Node::OnUserMessage(mozilla::UniquePtr<UserMessageEvent> message) {
  PortName port_name = message->port_name();

#ifdef DEBUG
  std::ostringstream ports_buf;
  for (size_t i = 0; i < message->num_ports(); ++i) {
    if (i > 0) {
      ports_buf << ",";
    }
    ports_buf << message->ports()[i];
  }

  DVLOG(4) << "OnUserMessage " << message->sequence_num()
           << " [ports=" << ports_buf.str() << "] at " << port_name << "@"
           << name_;
#endif

  // Even if this port does not exist, cannot receive anymore messages or is
  // buffering or proxying messages, we still need these ports to be bound to
  // this node. When the message is forwarded, these ports will get transferred
  // following the usual method. If the message cannot be accepted, then the
  // newly bound ports will simply be closed.
  for (size_t i = 0; i < message->num_ports(); ++i) {
    Event::PortDescriptor& descriptor = message->port_descriptors()[i];
    if (descriptor.referring_node_name == kInvalidNodeName) {
      // If the referring node name is invalid, this descriptor can be ignored
      // and the port should already exist locally.
      PortRef port_ref;
      if (GetPort(message->ports()[i], &port_ref) != OK) {
        return ERROR_PORT_UNKNOWN;
      }
    } else {
      int rv = AcceptPort(message->ports()[i], descriptor);
      if (rv != OK) {
        return rv;
      }

      // Ensure that the referring node is wiped out of this descriptor. This
      // allows the event to be forwarded across multiple local hops without
      // attempting to accept the port more than once.
      descriptor.referring_node_name = kInvalidNodeName;
    }
  }

  PortRef port_ref;
  GetPort(port_name, &port_ref);
  bool has_next_message = false;
  bool message_accepted = false;
  bool should_forward_messages = false;
  if (port_ref.is_valid()) {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    // Reject spurious messages if we've already received the last expected
    // message.
    if (CanAcceptMoreMessages(port)) {
      message_accepted = true;
      port->message_queue.AcceptMessage(std::move(message), &has_next_message);

      // Buffering and proxying ports never surface messages to the embedder,
      // so suppress the status notification below.
      if (port->state == Port::kBuffering) {
        has_next_message = false;
      } else if (port->state == Port::kProxying) {
        has_next_message = false;
        should_forward_messages = true;
      }
    }
  }

  if (should_forward_messages) {
    int rv = ForwardUserMessagesFromProxy(port_ref);
    if (rv != OK) {
      return rv;
    }
    TryRemoveProxy(port_ref);
  }

  if (!message_accepted) {
    DVLOG(2) << "Message not accepted!\n";
    // Close all newly accepted ports as they are effectively orphaned.
    // |message| was not consumed above, so it is still valid here.
    for (size_t i = 0; i < message->num_ports(); ++i) {
      PortRef attached_port_ref;
      if (GetPort(message->ports()[i], &attached_port_ref) == OK) {
        ClosePort(attached_port_ref);
      } else {
        DLOG(WARNING) << "Cannot close non-existent port!\n";
      }
    }
  } else if (has_next_message) {
    // A message is now readable at the head of the queue; tell the embedder.
    delegate_->PortStatusChanged(port_ref);
  }

  return OK;
}
579
// Handles PortAccepted: the destination node has taken ownership of a port we
// transferred to it, so the local copy can begin proxying (and eventually be
// removed).
int Node::OnPortAccepted(mozilla::UniquePtr<PortAcceptedEvent> event) {
  PortRef port_ref;
  if (GetPort(event->port_name(), &port_ref) != OK) {
    return ERROR_PORT_UNKNOWN;
  }

#ifdef DEBUG
  {
    SinglePortLocker locker(&port_ref);
    DVLOG(2) << "PortAccepted at " << port_ref.name() << "@" << name_
             << " pointing to " << locker.port()->peer_port_name << "@"
             << locker.port()->peer_node_name;
  }
#endif

  return BeginProxying(port_ref);
}
597
// Handles ObserveProxy: notifies this port that its peer is actually a proxy
// which would like to be removed. A receiving port re-points itself at the
// proxy's target and acks with the last sequence number it will send through
// the proxy. A port that is itself a proxy either forwards the event along
// or defers the ack until its own removal. A broadcast form (invalid target
// port name) announces that the named proxy died unexpectedly.
int Node::OnObserveProxy(mozilla::UniquePtr<ObserveProxyEvent> event) {
  if (event->port_name() == kInvalidPortName) {
    // An ObserveProxy with an invalid target port name is a broadcast used to
    // inform ports when their peer (which was itself a proxy) has become
    // defunct due to unexpected node disconnection.
    //
    // Receiving ports affected by this treat it as equivalent to peer closure.
    // Proxies affected by this can be removed and will in turn broadcast their
    // own death with a similar message.
    DCHECK_EQ(event->proxy_target_node_name(), kInvalidNodeName);
    DCHECK_EQ(event->proxy_target_port_name(), kInvalidPortName);
    DestroyAllPortsWithPeer(event->proxy_node_name(), event->proxy_port_name());
    return OK;
  }

  // The port may have already been closed locally, in which case the
  // ObserveClosure message will contain the last_sequence_num field.
  // We can then silently ignore this message.
  PortRef port_ref;
  if (GetPort(event->port_name(), &port_ref) != OK) {
    DVLOG(1) << "ObserveProxy: " << event->port_name() << "@" << name_
             << " not found";
    return OK;
  }

  DVLOG(2) << "ObserveProxy at " << port_ref.name() << "@" << name_
           << ", proxy at " << event->proxy_port_name() << "@"
           << event->proxy_node_name() << " pointing to "
           << event->proxy_target_port_name() << "@"
           << event->proxy_target_node_name();

  bool peer_changed = false;
  ScopedEvent event_to_forward;
  NodeName event_target_node;
  {
    // Must be acquired for UpdatePortPeerAddress below.
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::MutexAutoLock ports_locker(ports_lock_);

    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    if (port->peer_node_name == event->proxy_node_name() &&
        port->peer_port_name == event->proxy_port_name()) {
      if (port->state == Port::kReceiving) {
        // Skip over the proxy: point directly at its target, then ack so the
        // proxy knows the last message it must still forward.
        UpdatePortPeerAddress(port_ref.name(), port,
                              event->proxy_target_node_name(),
                              event->proxy_target_port_name());
        event_target_node = event->proxy_node_name();
        event_to_forward = mozilla::MakeUnique<ObserveProxyAckEvent>(
            event->proxy_port_name(), port->next_sequence_num_to_send - 1);
        peer_changed = true;
        DVLOG(2) << "Forwarding ObserveProxyAck from " << event->port_name()
                 << "@" << name_ << " to " << event->proxy_port_name() << "@"
                 << event_target_node;
      } else {
        // As a proxy ourselves, we don't know how to honor the ObserveProxy
        // event or to populate the last_sequence_num field of ObserveProxyAck.
        // Afterall, another port could be sending messages to our peer now
        // that we've sent out our own ObserveProxy event. Instead, we will
        // send an ObserveProxyAck indicating that the ObserveProxy event
        // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
        // However, this has to be done after we are removed as a proxy.
        // Otherwise, we might just find ourselves back here again, which
        // would be akin to a busy loop.

        DVLOG(2) << "Delaying ObserveProxyAck to " << event->proxy_port_name()
                 << "@" << event->proxy_node_name();

        port->send_on_proxy_removal =
            mozilla::MakeUnique<std::pair<NodeName, ScopedEvent>>(
                event->proxy_node_name(),
                mozilla::MakeUnique<ObserveProxyAckEvent>(
                    event->proxy_port_name(), kInvalidSequenceNum));
      }
    } else {
      // Forward this event along to our peer. Eventually, it should find the
      // port referring to the proxy.
      event_target_node = port->peer_node_name;
      event->set_port_name(port->peer_port_name);
      event_to_forward = std::move(event);
    }
  }

  if (event_to_forward) {
    delegate_->ForwardEvent(event_target_node, std::move(event_to_forward));
  }

  if (peer_changed) {
    // Re-send ack and/or ack requests, as the previous peer proxy may not have
    // forwarded the previous request before it died.
    MaybeResendAck(port_ref);
    MaybeResendAckRequest(port_ref);

    delegate_->PortStatusChanged(port_ref);
  }

  return OK;
}
697
// Handles ObserveProxyAck: the receiving end acknowledged our ObserveProxy,
// telling this proxy the last sequence number it must still forward before
// it can be removed. kInvalidSequenceNum means the peer could not supply one
// and our ObserveProxy must be retransmitted instead.
int Node::OnObserveProxyAck(mozilla::UniquePtr<ObserveProxyAckEvent> event) {
  DVLOG(2) << "ObserveProxyAck at " << event->port_name() << "@" << name_
           << " (last_sequence_num=" << event->last_sequence_num() << ")";

  PortRef port_ref;
  if (GetPort(event->port_name(), &port_ref) != OK) {
    return ERROR_PORT_UNKNOWN;  // The port may have observed closure first.
  }

  bool try_remove_proxy_immediately;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->state != Port::kProxying) {
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);
    }

    // If the last sequence number is invalid, this is a signal that we need to
    // retransmit the ObserveProxy event for this port rather than flagging the
    // the proxy for removal ASAP.
    try_remove_proxy_immediately =
        event->last_sequence_num() != kInvalidSequenceNum;
    if (try_remove_proxy_immediately) {
      // We can now remove this port once we have received and forwarded the
      // last message addressed to this port.
      port->remove_proxy_on_last_message = true;
      port->last_sequence_num_to_receive = event->last_sequence_num();
    }
  }

  if (try_remove_proxy_immediately) {
    TryRemoveProxy(port_ref);
  } else {
    InitiateProxyRemoval(port_ref);
  }

  return OK;
}
736
// Handles ObserveClosure: the other end of this port's cycle was closed and
// no messages beyond last_sequence_num will arrive. A receiving port records
// peer closure and notifies the embedder; a proxy marks itself for removal.
// The event is always forwarded onward so dead-end proxies beyond the
// receiving port also learn to clean themselves up.
int Node::OnObserveClosure(mozilla::UniquePtr<ObserveClosureEvent> event) {
  // OK if the port doesn't exist, as it may have been closed already.
  PortRef port_ref;
  if (GetPort(event->port_name(), &port_ref) != OK) {
    return OK;
  }

  // This message tells the port that it should no longer expect more messages
  // beyond last_sequence_num. This message is forwarded along until we reach
  // the receiving end, and this message serves as an equivalent to
  // ObserveProxyAck.

  bool notify_delegate = false;
  NodeName peer_node_name;
  PortName peer_port_name;
  bool try_remove_proxy = false;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    port->peer_closed = true;
    port->last_sequence_num_to_receive = event->last_sequence_num();

    DVLOG(2) << "ObserveClosure at " << port_ref.name() << "@" << name_
             << " (state=" << port->state << ") pointing to "
             << port->peer_port_name << "@" << port->peer_node_name
             << " (last_sequence_num=" << event->last_sequence_num() << ")";

    // We always forward ObserveClosure, even beyond the receiving port which
    // cares about it. This ensures that any dead-end proxies beyond that port
    // are notified to remove themselves.

    if (port->state == Port::kReceiving) {
      notify_delegate = true;

      // When forwarding along the other half of the port cycle, this will only
      // reach dead-end proxies. Tell them we've sent our last message so they
      // can go away.
      //
      // TODO: Repurposing ObserveClosure for this has the desired result but
      // may be semantically confusing since the forwarding port is not actually
      // closed. Consider replacing this with a new event type.
      event->set_last_sequence_num(port->next_sequence_num_to_send - 1);

      // Treat the closure as an acknowledge that all sent messages have been
      // read from the other end.
      port->last_sequence_num_acknowledged =
          port->next_sequence_num_to_send - 1;
    } else {
      // We haven't yet reached the receiving peer of the closed port, so we'll
      // forward the message along as-is.
      // See about removing the port if it is a proxy as our peer won't be able
      // to participate in proxy removal.
      port->remove_proxy_on_last_message = true;
      if (port->state == Port::kProxying) {
        try_remove_proxy = true;
      }
    }

    DVLOG(2) << "Forwarding ObserveClosure from " << port_ref.name() << "@"
             << name_ << " to peer " << port->peer_port_name << "@"
             << port->peer_node_name
             << " (last_sequence_num=" << event->last_sequence_num() << ")";

    peer_node_name = port->peer_node_name;
    peer_port_name = port->peer_port_name;
  }

  if (try_remove_proxy) {
    TryRemoveProxy(port_ref);
  }

  event->set_port_name(peer_port_name);
  delegate_->ForwardEvent(peer_node_name, std::move(event));

  if (notify_delegate) {
    delegate_->PortStatusChanged(port_ref);
  }

  return OK;
}
818
// Handles MergePort: binds the proxy port shipped from the initiating node
// and merges it with the local destination port. If either half cannot be
// set up, whatever was successfully bound is closed so neither side of the
// would-be cycle is left stranded.
int Node::OnMergePort(mozilla::UniquePtr<MergePortEvent> event) {
  PortRef port_ref;
  GetPort(event->port_name(), &port_ref);

  DVLOG(1) << "MergePort at " << port_ref.name() << "@" << name_
           << " merging with proxy " << event->new_port_name() << "@" << name_
           << " pointing to " << event->new_port_descriptor().peer_port_name
           << "@" << event->new_port_descriptor().peer_node_name
           << " referred by "
           << event->new_port_descriptor().referring_port_name << "@"
           << event->new_port_descriptor().referring_node_name;

  // Accept the new port. This is now the receiving end of the other port cycle
  // to be merged with ours. Note that we always attempt to accept the new port
  // first as otherwise its peer receiving port could be left stranded
  // indefinitely.
  if (AcceptPort(event->new_port_name(), event->new_port_descriptor()) != OK) {
    if (port_ref.is_valid()) {
      ClosePort(port_ref);
    }
    return ERROR_PORT_STATE_UNEXPECTED;
  }

  PortRef new_port_ref;
  GetPort(event->new_port_name(), &new_port_ref);
  // If exactly one of the two ports is usable, close it — a one-sided merge
  // would leak the surviving port.
  if (!port_ref.is_valid() && new_port_ref.is_valid()) {
    ClosePort(new_port_ref);
    return ERROR_PORT_UNKNOWN;
  }
  if (port_ref.is_valid() && !new_port_ref.is_valid()) {
    ClosePort(port_ref);
    return ERROR_PORT_UNKNOWN;
  }

  return MergePortsInternal(port_ref, new_port_ref,
                            false /* allow_close_on_bad_state */);
}
856
// Handles a request to acknowledge when the message with the given sequence
// number has been read. Proxies forward the request toward the receiving
// end. A receiving port either acks immediately (if the requested sequence
// number was already read) or records the request to be acked when the read
// happens; with multiple outstanding requests, the earliest one wins.
int Node::OnUserMessageReadAckRequest(
    mozilla::UniquePtr<UserMessageReadAckRequestEvent> event) {
  PortRef port_ref;
  GetPort(event->port_name(), &port_ref);

  DVLOG(1) << "AckRequest " << port_ref.name() << "@" << name_ << " sequence "
           << event->sequence_num_to_acknowledge();

  if (!port_ref.is_valid()) {
    return ERROR_PORT_UNKNOWN;
  }

  NodeName peer_node_name;
  mozilla::UniquePtr<Event> event_to_send;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    peer_node_name = port->peer_node_name;
    if (port->state == Port::kProxying) {
      // Proxies simply forward the ack request to their peer.
      event->set_port_name(port->peer_port_name);
      event_to_send = std::move(event);
    } else {
      uint64_t current_sequence_num =
          port->message_queue.next_sequence_num() - 1;
      // Either this is requesting an ack for a sequence number already read, or
      // else for a sequence number that is yet to be read.
      if (current_sequence_num >= event->sequence_num_to_acknowledge()) {
        // If the current sequence number to read already exceeds the ack
        // request, send an ack immediately.
        event_to_send = mozilla::MakeUnique<UserMessageReadAckEvent>(
            port->peer_port_name, current_sequence_num);

        // This might be a late or duplicate acknowledge request, that's
        // requesting acknowledge for an already read message. There may already
        // have been a request for future reads, so take care not to back up
        // the requested acknowledge counter.
        if (current_sequence_num > port->sequence_num_to_acknowledge) {
          port->sequence_num_to_acknowledge = current_sequence_num;
        }
      } else {
        // This is request to ack a sequence number that hasn't been read yet.
        // The state of the port can either be that it already has a
        // future-requested ack, or not. Because ack requests aren't guaranteed
        // to arrive in order, store the earlier of the current queued request
        // and the new one, if one was already requested.
        bool has_queued_ack_request =
            port->sequence_num_to_acknowledge > current_sequence_num;
        if (!has_queued_ack_request ||
            port->sequence_num_to_acknowledge >
                event->sequence_num_to_acknowledge()) {
          port->sequence_num_to_acknowledge =
              event->sequence_num_to_acknowledge();
        }
        return OK;
      }
    }
  }

  if (event_to_send) {
    delegate_->ForwardEvent(peer_node_name, std::move(event_to_send));
  }

  return OK;
}
923
// Handles an acknowledge from the peer that messages up to a given sequence
// number have been read. Keeps only the largest acknowledge seen, re-arms
// the next periodic ack request when an interval is configured, and
// notifies the delegate of the status change.
int Node::OnUserMessageReadAck(
    mozilla::UniquePtr<UserMessageReadAckEvent> event) {
  PortRef port_ref;
  GetPort(event->port_name(), &port_ref);

  DVLOG(1) << "Acknowledge " << port_ref.name() << "@" << name_ << " sequence "
           << event->sequence_num_acknowledged();

  NodeName peer_node_name;
  ScopedEvent ack_request_event;
  if (port_ref.is_valid()) {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    // The peer cannot have read a message we never sent.
    if (event->sequence_num_acknowledged() >= port->next_sequence_num_to_send) {
      // TODO(http://crbug.com/980952): This is a malformed event.
      // This could return a new error "ERROR_MALFORMED_EVENT" which the
      // delegate could use as a signal to drop the peer node.
      return OK;
    }

    // Keep the largest acknowledge seen.
    if (event->sequence_num_acknowledged() <=
        port->last_sequence_num_acknowledged) {
      // The acknowledge was late or a duplicate, it's safe to ignore it.
      return OK;
    }

    port->last_sequence_num_acknowledged = event->sequence_num_acknowledged();
    // Send another ack request if the interval is non-zero and the peer has
    // not been closed.
    if (port->sequence_num_acknowledge_interval && !port->peer_closed) {
      peer_node_name = port->peer_node_name;
      ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
          port->peer_port_name, port->last_sequence_num_acknowledged +
                                    port->sequence_num_acknowledge_interval);
    }
  }
  // Forward outside the port lock; the delegate may run arbitrary code.
  if (ack_request_event) {
    delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
  }

  if (port_ref.is_valid()) {
    delegate_->PortStatusChanged(port_ref);
  }

  return OK;
}
972
// Registers |port| under |port_name| in |ports_| and, when the port already
// has a peer assigned, indexes it in |peer_port_maps_| so peer-based lookups
// (e.g. DestroyAllPortsWithPeer) can find it. Returns ERROR_PORT_EXISTS if
// the name is already in use.
int Node::AddPortWithName(const PortName& port_name, RefPtr<Port> port) {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  mozilla::MutexAutoLock lock(ports_lock_);
  if (port->peer_port_name != kInvalidPortName) {
    DCHECK_NE(kInvalidNodeName, port->peer_node_name);
    peer_port_maps_[port->peer_node_name][port->peer_port_name].emplace(
        port_name, PortRef(port_name, port));
  }
  if (!ports_.emplace(port_name, std::move(port)).second) {
    return OOPS(ERROR_PORT_EXISTS);  // Suggests a bad UUID generator.
  }
  DVLOG(2) << "Created port " << port_name << "@" << name_;
  return OK;
}
987
// Removes |port_name| from this node: unregisters it from |ports_| and the
// peer map, then discards any queued messages. A no-op for unknown ports.
void Node::ErasePort(const PortName& port_name) {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  RefPtr<Port> port;
  {
    mozilla::MutexAutoLock lock(ports_lock_);
    auto it = ports_.find(port_name);
    if (it == ports_.end()) {
      return;
    }
    port = std::move(it->second);
    ports_.erase(it);

    RemoveFromPeerPortMap(port_name, port.get());
  }
  // NOTE: We are careful not to release the port's messages while holding any
  // locks, since they may run arbitrary user code upon destruction.
  std::vector<mozilla::UniquePtr<UserMessageEvent>> messages;
  {
    PortRef port_ref(port_name, std::move(port));
    SinglePortLocker locker(&port_ref);
    locker.port()->message_queue.TakeAllMessages(&messages);
  }
  DVLOG(2) << "Deleted port " << port_name << "@" << name_;
}
1012
// Sends a user message from |port_ref| (expected to be in the kReceiving
// state). The message is either forwarded to a remote node or accepted
// locally. Errors are only reported before the message is committed to a
// sequence number; after that point failures are treated as transport
// failures and OK is returned regardless.
int Node::SendUserMessageInternal(
    const PortRef& port_ref, mozilla::UniquePtr<UserMessageEvent>* message) {
  mozilla::UniquePtr<UserMessageEvent>& m = *message;
  // A port cannot be transferred over itself.
  for (size_t i = 0; i < m->num_ports(); ++i) {
    if (m->ports()[i] == port_ref.name()) {
      return ERROR_PORT_CANNOT_SEND_SELF;
    }
  }

  NodeName target_node;
  int rv = PrepareToForwardUserMessage(port_ref, Port::kReceiving,
                                       false /* ignore_closed_peer */, m.get(),
                                       &target_node);
  if (rv != OK) {
    return rv;
  }

  // Beyond this point there's no sense in returning anything but OK. Even if
  // message forwarding or acceptance fails, there's nothing the embedder can
  // do to recover. Assume that failure beyond this point must be treated as a
  // transport failure.

  DCHECK_NE(kInvalidNodeName, target_node);
  if (target_node != name_) {
    delegate_->ForwardEvent(target_node, std::move(m));
    return OK;
  }

  // The destination port lives on this node; deliver the event directly.
  int accept_result = AcceptEvent(std::move(m));
  if (accept_result != OK) {
    // See comment above for why we don't return an error in this case.
    DVLOG(2) << "AcceptEvent failed: " << accept_result;
  }

  return OK;
}
1049
// Fuses two local receiving ports into a single pipe by swapping their peers
// and turning both into proxies. On validation failure the offending ports
// are closed (when that is safe) and ERROR_PORT_STATE_UNEXPECTED is
// returned; on a failure to flush queued messages the swap is undone before
// closing both ports.
int Node::MergePortsInternal(const PortRef& port0_ref, const PortRef& port1_ref,
                             bool allow_close_on_bad_state) {
  const PortRef* port_refs[2] = {&port0_ref, &port1_ref};
  {
    // Needed to swap peer map entries below.
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::Maybe<mozilla::MutexAutoLock> ports_locker(std::in_place,
                                                        ports_lock_);

    // |Maybe| is used so both locks can be released early (see reset() calls
    // below) before calling ClosePort().
    mozilla::Maybe<PortLocker> locker(std::in_place, port_refs, size_t(2));
    auto* port0 = locker->GetPort(port0_ref);
    auto* port1 = locker->GetPort(port1_ref);

    // There are several conditions which must be met before we'll consider
    // merging two ports:
    //
    // - They must both be in the kReceiving state
    // - They must not be each other's peer
    // - They must have never sent a user message
    //
    // If any of these criteria are not met, we fail early.
    if (port0->state != Port::kReceiving || port1->state != Port::kReceiving ||
        (port0->peer_node_name == name_ &&
         port0->peer_port_name == port1_ref.name()) ||
        (port1->peer_node_name == name_ &&
         port1->peer_port_name == port0_ref.name()) ||
        port0->next_sequence_num_to_send != kInitialSequenceNum ||
        port1->next_sequence_num_to_send != kInitialSequenceNum) {
      // On failure, we only close a port if it was at least properly in the
      // |kReceiving| state. This avoids getting the system in an inconsistent
      // state by e.g. closing a proxy abruptly.
      //
      // Note that we must release the port locks before closing ports.
      const bool close_port0 =
          port0->state == Port::kReceiving || allow_close_on_bad_state;
      const bool close_port1 =
          port1->state == Port::kReceiving || allow_close_on_bad_state;
      locker.reset();
      ports_locker.reset();
      if (close_port0) {
        ClosePort(port0_ref);
      }
      if (close_port1) {
        ClosePort(port1_ref);
      }
      return ERROR_PORT_STATE_UNEXPECTED;
    }

    // Swap the ports' peer information and switch them both to proxying mode.
    SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
    port0->state = Port::kProxying;
    port1->state = Port::kProxying;
    if (port0->peer_closed) {
      port0->remove_proxy_on_last_message = true;
    }
    if (port1->peer_closed) {
      port1->remove_proxy_on_last_message = true;
    }
  }

  // Flush any queued messages from the new proxies and, if successful, complete
  // the merge by initiating proxy removals.
  if (ForwardUserMessagesFromProxy(port0_ref) == OK &&
      ForwardUserMessagesFromProxy(port1_ref) == OK) {
    for (auto& port_ref : port_refs) {
      bool try_remove_proxy_immediately = false;
      ScopedEvent closure_event;
      NodeName closure_event_target_node;
      {
        SinglePortLocker locker(port_ref);
        auto* port = locker.port();
        DCHECK(port->state == Port::kProxying);
        try_remove_proxy_immediately = port->remove_proxy_on_last_message;
        if (try_remove_proxy_immediately || port->peer_closed) {
          // If either end of the port cycle is closed, we propagate an
          // ObserveClosure event.
          closure_event_target_node = port->peer_node_name;
          closure_event = mozilla::MakeUnique<ObserveClosureEvent>(
              port->peer_port_name, port->last_sequence_num_to_receive);
        }
      }
      if (try_remove_proxy_immediately) {
        TryRemoveProxy(*port_ref);
      } else {
        InitiateProxyRemoval(*port_ref);
      }

      if (closure_event) {
        delegate_->ForwardEvent(closure_event_target_node,
                                std::move(closure_event));
      }
    }

    return OK;
  }

  // If we failed to forward proxied messages, we keep the system in a
  // consistent state by undoing the peer swap and closing the ports.
  {
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::MutexAutoLock ports_locker(ports_lock_);
    PortLocker locker(port_refs, 2);
    auto* port0 = locker.GetPort(port0_ref);
    auto* port1 = locker.GetPort(port1_ref);
    SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
    port0->remove_proxy_on_last_message = false;
    port1->remove_proxy_on_last_message = false;
    DCHECK_EQ(Port::kProxying, port0->state);
    DCHECK_EQ(Port::kProxying, port1->state);
    port0->state = Port::kReceiving;
    port1->state = Port::kReceiving;
  }

  ClosePort(port0_ref);
  ClosePort(port1_ref);
  return ERROR_PORT_STATE_UNEXPECTED;
}
1167
// Converts the locked |port| into a buffering proxy toward |to_node_name|:
// assigns it a fresh random name (written back through |port_name|) and
// fills |port_descriptor| with everything the receiving node needs to
// reconstruct this end of the pipe. Caller holds |ports_lock_| (required by
// UpdatePortPeerAddress) and the port's own lock.
void Node::ConvertToProxy(Port* port, const NodeName& to_node_name,
                          PortName* port_name,
                          Event::PortDescriptor* port_descriptor) {
  port->AssertLockAcquired();
  PortName local_port_name = *port_name;

  PortName new_port_name;
  GenerateRandomPortName(&new_port_name);

  // Make sure we don't send messages to the new peer until after we know it
  // exists. In the meantime, just buffer messages locally.
  DCHECK(port->state == Port::kReceiving);
  port->state = Port::kBuffering;

  // If we already know our peer is closed, we already know this proxy can
  // be removed once it receives and forwards its last expected message.
  if (port->peer_closed) {
    port->remove_proxy_on_last_message = true;
  }

  *port_name = new_port_name;

  // Snapshot the port's routing and sequencing state for the receiver.
  port_descriptor->peer_node_name = port->peer_node_name;
  port_descriptor->peer_port_name = port->peer_port_name;
  port_descriptor->referring_node_name = name_;
  port_descriptor->referring_port_name = local_port_name;
  port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
  port_descriptor->next_sequence_num_to_receive =
      port->message_queue.next_sequence_num();
  port_descriptor->last_sequence_num_to_receive =
      port->last_sequence_num_to_receive;
  port_descriptor->peer_closed = port->peer_closed;
  // Zero the padding so no uninitialized bytes end up in serialized events.
  memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));

  // Configure the local port to point to the new port.
  UpdatePortPeerAddress(local_port_name, port, to_node_name, new_port_name);
}
1205
// Installs a port transferred to this node in a user message, seeding its
// state from |port_descriptor|, then sends PortAcceptedEvent back to the
// referring node so the referring (buffering) port may start forwarding.
int Node::AcceptPort(const PortName& port_name,
                     const Event::PortDescriptor& port_descriptor) {
  RefPtr<Port> port =
      mozilla::MakeRefPtr<Port>(port_descriptor.next_sequence_num_to_send,
                                port_descriptor.next_sequence_num_to_receive);
  port->state = Port::kReceiving;
  port->peer_node_name = port_descriptor.peer_node_name;
  port->peer_port_name = port_descriptor.peer_port_name;
  port->last_sequence_num_to_receive =
      port_descriptor.last_sequence_num_to_receive;
  port->peer_closed = port_descriptor.peer_closed;

  DVLOG(2) << "Accepting port " << port_name
           << " [peer_closed=" << port->peer_closed
           << "; last_sequence_num_to_receive="
           << port->last_sequence_num_to_receive << "]";

  // A newly accepted port is not signalable until the message referencing the
  // new port finds its way to the consumer (see GetMessage).
  port->message_queue.set_signalable(false);

  int rv = AddPortWithName(port_name, std::move(port));
  if (rv != OK) {
    return rv;
  }

  // Allow referring port to forward messages.
  delegate_->ForwardEvent(port_descriptor.referring_node_name,
                          mozilla::MakeUnique<PortAcceptedEvent>(
                              port_descriptor.referring_port_name));
  return OK;
}
1238
// Prepares |message| for delivery from |forwarding_port_ref|: assigns a
// sequence number (if not already set), validates port states, converts any
// attached ports into proxies when the message leaves this node, and
// reports the destination in |*forward_to_node|. Runs in a loop because the
// forwarding port's peer may change while no locks are held (e.g. while
// notifying the message of external routing).
int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
                                      Port::State expected_port_state,
                                      bool ignore_closed_peer,
                                      UserMessageEvent* message,
                                      NodeName* forward_to_node) {
  bool target_is_remote = false;
  for (;;) {
    // Snapshot the current target node under the port lock only.
    NodeName target_node_name;
    {
      SinglePortLocker locker(&forwarding_port_ref);
      target_node_name = locker.port()->peer_node_name;
    }

    // NOTE: This may call out to arbitrary user code, so it's important to call
    // it only while no port locks are held on the calling thread.
    if (target_node_name != name_) {
      if (!message->NotifyWillBeRoutedExternally()) {
        CHROMIUM_LOG(ERROR)
            << "NotifyWillBeRoutedExternally failed unexpectedly.";
        return ERROR_PORT_STATE_UNEXPECTED;
      }
    }

    // Must be held because ConvertToProxy needs to update |peer_port_maps_|.
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::MutexAutoLock ports_locker(ports_lock_);

    // Simultaneously lock the forwarding port as well as all attached ports.
    AutoTArray<PortRef, 4> attached_port_refs;
    AutoTArray<const PortRef*, 5> ports_to_lock;
    attached_port_refs.SetCapacity(message->num_ports());
    ports_to_lock.SetCapacity(message->num_ports() + 1);
    ports_to_lock.AppendElement(&forwarding_port_ref);
    for (size_t i = 0; i < message->num_ports(); ++i) {
      const PortName& attached_port_name = message->ports()[i];
      auto iter = ports_.find(attached_port_name);
      DCHECK(iter != ports_.end());
      attached_port_refs.AppendElement(
          PortRef(attached_port_name, iter->second));
      ports_to_lock.AppendElement(&attached_port_refs[i]);
    }
    PortLocker locker(ports_to_lock.Elements(), ports_to_lock.Length());
    auto* forwarding_port = locker.GetPort(forwarding_port_ref);

    if (forwarding_port->peer_node_name != target_node_name) {
      // The target node has already changed since we last held the lock.
      if (target_node_name == name_) {
        // If the target node was previously this local node, we need to restart
        // the loop, since that means we may now route the message externally.
        continue;
      }

      // The old target was already remote, so NotifyWillBeRoutedExternally()
      // has run; simply retarget to the new remote node.
      target_node_name = forwarding_port->peer_node_name;
    }
    target_is_remote = target_node_name != name_;

    if (forwarding_port->state != expected_port_state) {
      return ERROR_PORT_STATE_UNEXPECTED;
    }
    if (forwarding_port->peer_closed && !ignore_closed_peer) {
      return ERROR_PORT_PEER_CLOSED;
    }

    // Messages may already have a sequence number if they're being forwarded by
    // a proxy. Otherwise, use the next outgoing sequence number.
    if (message->sequence_num() == 0) {
      message->set_sequence_num(forwarding_port->next_sequence_num_to_send++);
    }
#ifdef DEBUG
    std::ostringstream ports_buf;
    for (size_t i = 0; i < message->num_ports(); ++i) {
      if (i > 0) {
        ports_buf << ",";
      }
      ports_buf << message->ports()[i];
    }
#endif

    if (message->num_ports() > 0) {
      // Sanity check to make sure we can actually send all the attached ports.
      // They must all be in the |kReceiving| state and must not be the sender's
      // own peer.
      DCHECK_EQ(message->num_ports(), attached_port_refs.Length());
      for (size_t i = 0; i < message->num_ports(); ++i) {
        auto* attached_port = locker.GetPort(attached_port_refs[i]);
        int error = OK;
        if (attached_port->state != Port::kReceiving) {
          error = ERROR_PORT_STATE_UNEXPECTED;
        } else if (attached_port_refs[i].name() ==
                   forwarding_port->peer_port_name) {
          error = ERROR_PORT_CANNOT_SEND_PEER;
        }

        if (error != OK) {
          // Not going to send. Backpedal on the sequence number.
          forwarding_port->next_sequence_num_to_send--;
          return error;
        }
      }

      if (target_is_remote) {
        // We only bother to proxy and rewrite ports in the event if it's
        // going to be routed to an external node. This substantially reduces
        // the amount of port churn in the system, as many port-carrying
        // events are routed at least 1 or 2 intra-node hops before (if ever)
        // being routed externally.
        Event::PortDescriptor* port_descriptors = message->port_descriptors();
        for (size_t i = 0; i < message->num_ports(); ++i) {
          ConvertToProxy(locker.GetPort(attached_port_refs[i]),
                         target_node_name, message->ports() + i,
                         port_descriptors + i);
        }
      }
    }

#ifdef DEBUG
    DVLOG(4) << "Sending message " << message->sequence_num()
             << " [ports=" << ports_buf.str() << "]"
             << " from " << forwarding_port_ref.name() << "@" << name_ << " to "
             << forwarding_port->peer_port_name << "@" << target_node_name;
#endif

    *forward_to_node = target_node_name;
    message->set_port_name(forwarding_port->peer_port_name);
    break;
  }

  if (target_is_remote) {
    for (size_t i = 0; i < message->num_ports(); ++i) {
      // For any ports that were converted to proxies above, make sure their
      // prior local peer (if applicable) receives a status update so it can be
      // made aware of its peer's location.
      const Event::PortDescriptor& descriptor = message->port_descriptors()[i];
      if (descriptor.peer_node_name == name_) {
        PortRef local_peer;
        if (GetPort(descriptor.peer_port_name, &local_peer) == OK) {
          delegate_->PortStatusChanged(local_peer);
        }
      }
    }
  }

  return OK;
}
1383
// Transitions a buffering port into kProxying, flushes any messages it
// buffered, forwards a pending ack request, and starts the proxy removal
// handshake (immediately if the peer is already known to be closed).
int Node::BeginProxying(const PortRef& port_ref) {
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->state != Port::kBuffering) {
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);
    }
    port->state = Port::kProxying;
  }

  int rv = ForwardUserMessagesFromProxy(port_ref);
  if (rv != OK) {
    return rv;
  }

  // Forward any pending acknowledge request.
  MaybeForwardAckRequest(port_ref);

  bool try_remove_proxy_immediately;
  ScopedEvent closure_event;
  NodeName closure_target_node;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    // The port lock was dropped above, so re-validate the state.
    if (port->state != Port::kProxying) {
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);
    }

    try_remove_proxy_immediately = port->remove_proxy_on_last_message;
    if (try_remove_proxy_immediately) {
      // Make sure we propagate closure to our current peer.
      closure_target_node = port->peer_node_name;
      closure_event = mozilla::MakeUnique<ObserveClosureEvent>(
          port->peer_port_name, port->last_sequence_num_to_receive);
    }
  }

  if (try_remove_proxy_immediately) {
    TryRemoveProxy(port_ref);
    delegate_->ForwardEvent(closure_target_node, std::move(closure_event));
  } else {
    InitiateProxyRemoval(port_ref);
  }

  return OK;
}
1430
// Drains the proxying port's message queue, forwarding each message toward
// its destination. Returns the first forwarding error, or OK once the queue
// yields no more messages.
int Node::ForwardUserMessagesFromProxy(const PortRef& port_ref) {
  for (;;) {
    // NOTE: We forward messages in sequential order here so that we maintain
    // the message queue's notion of next sequence number. That's useful for the
    // proxy removal process as we can tell when this port has seen all of the
    // messages it is expected to see.
    mozilla::UniquePtr<UserMessageEvent> message;
    {
      SinglePortLocker locker(&port_ref);
      locker.port()->message_queue.GetNextMessage(&message, nullptr);
      if (!message) {
        break;
      }
    }

    // Prepare and forward with no port locks held.
    NodeName target_node;
    int rv = PrepareToForwardUserMessage(port_ref, Port::kProxying,
                                         true /* ignore_closed_peer */,
                                         message.get(), &target_node);
    if (rv != OK) {
      return rv;
    }

    delegate_->ForwardEvent(target_node, std::move(message));
  }
  return OK;
}
1458
InitiateProxyRemoval(const PortRef & port_ref)1459 void Node::InitiateProxyRemoval(const PortRef& port_ref) {
1460 NodeName peer_node_name;
1461 PortName peer_port_name;
1462 {
1463 SinglePortLocker locker(&port_ref);
1464 auto* port = locker.port();
1465 peer_node_name = port->peer_node_name;
1466 peer_port_name = port->peer_port_name;
1467 }
1468
1469 // To remove this node, we start by notifying the connected graph that we are
1470 // a proxy. This allows whatever port is referencing this node to skip it.
1471 // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
1472 // the peer was closed in the meantime).
1473 delegate_->ForwardEvent(peer_node_name,
1474 mozilla::MakeUnique<ObserveProxyEvent>(
1475 peer_port_name, name_, port_ref.name(),
1476 peer_node_name, peer_port_name));
1477 }
1478
// Erases a proxying port once ObserveProxyAck has been seen
// (remove_proxy_on_last_message) and no further messages are expected;
// otherwise leaves the proxy in place to be retried after more messages
// arrive. Any stashed send-on-removal event is forwarded after erasure.
void Node::TryRemoveProxy(const PortRef& port_ref) {
  bool should_erase = false;
  NodeName removal_target_node;
  ScopedEvent removal_event;

  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    DCHECK(port->state == Port::kProxying);

    // Make sure we have seen ObserveProxyAck before removing the port.
    if (!port->remove_proxy_on_last_message) {
      return;
    }

    if (!CanAcceptMoreMessages(port)) {
      // All expected messages have been received and forwarded.
      should_erase = true;
      if (port->send_on_proxy_removal) {
        removal_target_node = port->send_on_proxy_removal->first;
        removal_event = std::move(port->send_on_proxy_removal->second);
      }
    } else {
      DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
               << " now; waiting for more messages";
    }
  }

  // Erase and forward with the port lock released.
  if (should_erase) {
    ErasePort(port_ref.name());
  }

  if (removal_event) {
    delegate_->ForwardEvent(removal_target_node, std::move(removal_event));
  }
}
1514
// Simulates peer closure for every local port whose peer matches
// |node_name| (and |port_name|, unless it is kInvalidPortName). Receiving
// ports are notified of the closure; non-receiving ports (proxies) are
// forcibly erased, their death broadcast, and their undelivered messages'
// attached ports closed.
void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
                                   const PortName& port_name) {
  // Wipes out all ports whose peer node matches |node_name| and whose peer port
  // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
  // node is matched.

  std::vector<PortRef> ports_to_notify;
  std::vector<PortName> dead_proxies_to_broadcast;
  std::vector<mozilla::UniquePtr<UserMessageEvent>> undelivered_messages;

  {
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::MutexAutoLock ports_lock(ports_lock_);

    auto node_peer_port_map_iter = peer_port_maps_.find(node_name);
    if (node_peer_port_map_iter == peer_port_maps_.end()) {
      return;
    }

    // Restrict [peer_ports_begin, peer_ports_end) to a single entry when a
    // specific peer port was named.
    auto& node_peer_port_map = node_peer_port_map_iter->second;
    auto peer_ports_begin = node_peer_port_map.begin();
    auto peer_ports_end = node_peer_port_map.end();
    if (port_name != kInvalidPortName) {
      // If |port_name| is given, we limit the set of local ports to the ones
      // with that specific port as their peer.
      peer_ports_begin = node_peer_port_map.find(port_name);
      if (peer_ports_begin == node_peer_port_map.end()) {
        return;
      }

      peer_ports_end = peer_ports_begin;
      ++peer_ports_end;
    }

    for (auto peer_port_iter = peer_ports_begin;
         peer_port_iter != peer_ports_end; ++peer_port_iter) {
      auto& local_ports = peer_port_iter->second;
      // NOTE: This inner loop almost always has only one element. There are
      // relatively short-lived cases where more than one local port points to
      // the same peer, and this only happens when extra ports are bypassed
      // proxies waiting to be torn down.
      for (auto& local_port : local_ports) {
        auto& local_port_ref = local_port.second;

        SinglePortLocker locker(&local_port_ref);
        auto* port = locker.port();

        if (!port->peer_closed) {
          // Treat this as immediate peer closure. It's an exceptional
          // condition akin to a broken pipe, so we don't care about losing
          // messages.

          port->peer_closed = true;
          port->peer_lost_unexpectedly = true;
          if (port->state == Port::kReceiving) {
            ports_to_notify.push_back(local_port_ref);
          }
        }

        // We don't expect to forward any further messages, and we don't
        // expect to receive a Port{Accepted,Rejected} event. Because we're
        // a proxy with no active peer, we cannot use the normal proxy removal
        // procedure of forward-propagating an ObserveProxy. Instead we
        // broadcast our own death so it can be back-propagated. This is
        // inefficient but rare.
        if (port->state != Port::kReceiving) {
          dead_proxies_to_broadcast.push_back(local_port_ref.name());
          std::vector<mozilla::UniquePtr<UserMessageEvent>> messages;
          port->message_queue.TakeAllMessages(&messages);
          for (auto& message : messages) {
            undelivered_messages.emplace_back(std::move(message));
          }
        }
      }
    }
  }

  // All of the work below runs with no locks held, since it may re-enter
  // this node or run arbitrary delegate code.
  for (const auto& proxy_name : dead_proxies_to_broadcast) {
    ErasePort(proxy_name);
    DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
  }

  // Wake up any receiving ports who have just observed simulated peer closure.
  for (const auto& port : ports_to_notify) {
    delegate_->PortStatusChanged(port);
  }

  for (const auto& proxy_name : dead_proxies_to_broadcast) {
    // Broadcast an event signifying that this proxy is no longer functioning.
    delegate_->BroadcastEvent(mozilla::MakeUnique<ObserveProxyEvent>(
        kInvalidPortName, name_, proxy_name, kInvalidNodeName,
        kInvalidPortName));

    // Also process death locally since the port that points this closed one
    // could be on the current node.
    // Note: Although this is recursive, only a single port is involved which
    // limits the expected branching to 1.
    DestroyAllPortsWithPeer(name_, proxy_name);
  }

  // Close any ports referenced by undelivered messages.
  for (const auto& message : undelivered_messages) {
    for (size_t i = 0; i < message->num_ports(); ++i) {
      PortRef ref;
      if (GetPort(message->ports()[i], &ref) == OK) {
        ClosePort(ref);
      }
    }
  }
}
1625
// Re-points |local_port| at a new peer while keeping |peer_port_maps_|
// consistent. Requires |ports_lock_| and the port's own lock to be held.
void Node::UpdatePortPeerAddress(const PortName& local_port_name,
                                 Port* local_port,
                                 const NodeName& new_peer_node,
                                 const PortName& new_peer_port) {
  ports_lock_.AssertCurrentThreadOwns();
  local_port->AssertLockAcquired();

  // Remove the old peer-map entry before overwriting the peer fields it is
  // keyed on.
  RemoveFromPeerPortMap(local_port_name, local_port);
  local_port->peer_node_name = new_peer_node;
  local_port->peer_port_name = new_peer_port;
  if (new_peer_port != kInvalidPortName) {
    peer_port_maps_[new_peer_node][new_peer_port].emplace(
        local_port_name, PortRef(local_port_name, RefPtr<Port>{local_port}));
  }
}
1641
RemoveFromPeerPortMap(const PortName & local_port_name,Port * local_port)1642 void Node::RemoveFromPeerPortMap(const PortName& local_port_name,
1643 Port* local_port) {
1644 if (local_port->peer_port_name == kInvalidPortName) {
1645 return;
1646 }
1647
1648 auto node_iter = peer_port_maps_.find(local_port->peer_node_name);
1649 if (node_iter == peer_port_maps_.end()) {
1650 return;
1651 }
1652
1653 auto& node_peer_port_map = node_iter->second;
1654 auto ports_iter = node_peer_port_map.find(local_port->peer_port_name);
1655 if (ports_iter == node_peer_port_map.end()) {
1656 return;
1657 }
1658
1659 auto& local_ports_with_this_peer = ports_iter->second;
1660 local_ports_with_this_peer.erase(local_port_name);
1661 if (local_ports_with_this_peer.empty()) {
1662 node_peer_port_map.erase(ports_iter);
1663 }
1664 if (node_peer_port_map.empty()) {
1665 peer_port_maps_.erase(node_iter);
1666 }
1667 }
1668
// Exchanges the peers of |port0| and |port1|, re-keying their
// |peer_port_maps_| entries to match. Requires |ports_lock_| and both port
// locks to be held.
void Node::SwapPortPeers(const PortName& port0_name, Port* port0,
                         const PortName& port1_name, Port* port1) {
  ports_lock_.AssertCurrentThreadOwns();
  port0->AssertLockAcquired();
  port1->AssertLockAcquired();

  // Move each port's map entry under the other's peer bucket before swapping
  // the peer fields themselves (the buckets are looked up by the old peers).
  auto& peer0_ports =
      peer_port_maps_[port0->peer_node_name][port0->peer_port_name];
  auto& peer1_ports =
      peer_port_maps_[port1->peer_node_name][port1->peer_port_name];
  peer0_ports.erase(port0_name);
  peer1_ports.erase(port1_name);
  peer0_ports.emplace(port1_name, PortRef(port1_name, RefPtr<Port>{port1}));
  peer1_ports.emplace(port0_name, PortRef(port0_name, RefPtr<Port>{port0}));

  std::swap(port0->peer_node_name, port1->peer_node_name);
  std::swap(port0->peer_port_name, port1->peer_port_name);
}
1687
MaybeResendAckRequest(const PortRef & port_ref)1688 void Node::MaybeResendAckRequest(const PortRef& port_ref) {
1689 NodeName peer_node_name;
1690 ScopedEvent ack_request_event;
1691 {
1692 SinglePortLocker locker(&port_ref);
1693 auto* port = locker.port();
1694 if (port->state != Port::kReceiving) {
1695 return;
1696 }
1697
1698 if (!port->sequence_num_acknowledge_interval) {
1699 return;
1700 }
1701
1702 peer_node_name = port->peer_node_name;
1703 ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
1704 port->peer_port_name, port->last_sequence_num_acknowledged +
1705 port->sequence_num_acknowledge_interval);
1706 }
1707
1708 delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
1709 }
1710
MaybeForwardAckRequest(const PortRef & port_ref)1711 void Node::MaybeForwardAckRequest(const PortRef& port_ref) {
1712 NodeName peer_node_name;
1713 ScopedEvent ack_request_event;
1714 {
1715 SinglePortLocker locker(&port_ref);
1716 auto* port = locker.port();
1717 if (port->state != Port::kProxying) {
1718 return;
1719 }
1720
1721 if (!port->sequence_num_to_acknowledge) {
1722 return;
1723 }
1724
1725 peer_node_name = port->peer_node_name;
1726 ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
1727 port->peer_port_name, port->sequence_num_to_acknowledge);
1728
1729 port->sequence_num_to_acknowledge = 0;
1730 }
1731
1732 delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
1733 }
1734
MaybeResendAck(const PortRef & port_ref)1735 void Node::MaybeResendAck(const PortRef& port_ref) {
1736 NodeName peer_node_name;
1737 ScopedEvent ack_event;
1738 {
1739 SinglePortLocker locker(&port_ref);
1740 auto* port = locker.port();
1741 if (port->state != Port::kReceiving) {
1742 return;
1743 }
1744
1745 uint64_t last_sequence_num_read =
1746 port->message_queue.next_sequence_num() - 1;
1747 if (!port->sequence_num_to_acknowledge || !last_sequence_num_read) {
1748 return;
1749 }
1750
1751 peer_node_name = port->peer_node_name;
1752 ack_event = mozilla::MakeUnique<UserMessageReadAckEvent>(
1753 port->peer_port_name, last_sequence_num_read);
1754 }
1755
1756 delegate_->ForwardEvent(peer_node_name, std::move(ack_event));
1757 }
1758
// Binds a delegate to its owning Node; the Node pointer is required (used
// by EnsureSafeDelegateAccess() in debug builds).
Node::DelegateHolder::DelegateHolder(Node* node, NodeDelegate* delegate)
    : node_(node), delegate_(delegate) {
  DCHECK(node_);
}
1763
// Trivial: DelegateHolder holds raw pointers and performs no cleanup.
Node::DelegateHolder::~DelegateHolder() = default;
1765
1766 #ifdef DEBUG
// Debug-only check that calling into the delegate from this thread is safe:
// no port locks may be held, and |ports_lock_| must be acquirable (it is
// briefly taken and released here to assert that).
void Node::DelegateHolder::EnsureSafeDelegateAccess() const {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  mozilla::MutexAutoLock lock(node_->ports_lock_);
}
1771 #endif
1772
1773 } // namespace ports
1774 } // namespace core
1775 } // namespace mojo
1776