1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/core/ports/node.h"
6 
7 #include <string.h>
8 
9 #include <algorithm>
10 #include <atomic>
11 #include <memory>
12 #include <utility>
13 #include <vector>
14 
15 #include "mozilla/Mutex.h"
16 #include "mozilla/RandomNum.h"
17 #include "nsTArray.h"
18 
19 #include "base/logging.h"
20 #include "mojo/core/ports/event.h"
21 #include "mojo/core/ports/node_delegate.h"
22 #include "mojo/core/ports/port_locker.h"
23 
24 namespace mojo {
25 namespace core {
26 namespace ports {
27 
28 namespace {
29 
DebugError(const char * message,int error_code)30 int DebugError(const char* message, int error_code) {
31   NOTREACHED() << "Oops: " << message;
32   return error_code;
33 }
34 
35 #define OOPS(x) DebugError(#x, x)
36 
CanAcceptMoreMessages(const Port * port)37 bool CanAcceptMoreMessages(const Port* port) {
38   // Have we already doled out the last message (i.e., do we expect to NOT
39   // receive further messages)?
40   uint64_t next_sequence_num = port->message_queue.next_sequence_num();
41   if (port->state == Port::kClosed) {
42     return false;
43   }
44   if (port->peer_closed || port->remove_proxy_on_last_message) {
45     if (port->peer_lost_unexpectedly) {
46       return port->message_queue.HasNextMessage();
47     }
48     if (port->last_sequence_num_to_receive == next_sequence_num - 1) {
49       return false;
50     }
51   }
52   return true;
53 }
54 
GenerateRandomPortName(PortName * name)55 void GenerateRandomPortName(PortName* name) {
56   // FIXME: Chrome uses a cache to avoid extra calls to the system RNG when
57   // generating port names to keep this overhead down. If this method starts
58   // showing up on profiles we should consider doing the same.
59   *name = PortName{mozilla::RandomUint64OrDie(), mozilla::RandomUint64OrDie()};
60 }
61 
62 }  // namespace
63 
Node(const NodeName & name,NodeDelegate * delegate)64 Node::Node(const NodeName& name, NodeDelegate* delegate)
65     : name_(name), delegate_(this, delegate) {}
66 
~Node()67 Node::~Node() {
68   if (!ports_.empty()) {
69     DLOG(WARNING) << "Unclean shutdown for node " << name_;
70   }
71 }
72 
CanShutdownCleanly(ShutdownPolicy policy)73 bool Node::CanShutdownCleanly(ShutdownPolicy policy) {
74   PortLocker::AssertNoPortsLockedOnCurrentThread();
75   mozilla::MutexAutoLock ports_lock(ports_lock_);
76 
77   if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) {
78 #ifdef DEBUG
79     for (auto& entry : ports_) {
80       DVLOG(2) << "Port " << entry.first << " referencing node "
81                << entry.second->peer_node_name << " is blocking shutdown of "
82                << "node " << name_ << " (state=" << entry.second->state << ")";
83     }
84 #endif
85     return ports_.empty();
86   }
87 
88   DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS);
89 
90   // NOTE: This is not efficient, though it probably doesn't need to be since
91   // relatively few ports should be open during shutdown and shutdown doesn't
92   // need to be blazingly fast.
93   bool can_shutdown = true;
94   for (auto& entry : ports_) {
95     PortRef port_ref(entry.first, entry.second);
96     SinglePortLocker locker(&port_ref);
97     auto* port = locker.port();
98     if (port->peer_node_name != name_ && port->state != Port::kReceiving) {
99       can_shutdown = false;
100 #ifdef DEBUG
101       DVLOG(2) << "Port " << entry.first << " referencing node "
102                << port->peer_node_name << " is blocking shutdown of "
103                << "node " << name_ << " (state=" << port->state << ")";
104 #else
105       // Exit early when not debugging.
106       break;
107 #endif
108     }
109   }
110 
111   return can_shutdown;
112 }
113 
GetPort(const PortName & port_name,PortRef * port_ref)114 int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
115   PortLocker::AssertNoPortsLockedOnCurrentThread();
116   mozilla::MutexAutoLock lock(ports_lock_);
117   auto iter = ports_.find(port_name);
118   if (iter == ports_.end()) {
119     return ERROR_PORT_UNKNOWN;
120   }
121 
122 #if defined(OS_ANDROID) && defined(ARCH_CPU_ARM64)
123   // Workaround for https://crbug.com/665869.
124   std::atomic_thread_fence(std::memory_order_seq_cst);
125 #endif
126 
127   *port_ref = PortRef(port_name, iter->second);
128   return OK;
129 }
130 
CreateUninitializedPort(PortRef * port_ref)131 int Node::CreateUninitializedPort(PortRef* port_ref) {
132   PortName port_name;
133   GenerateRandomPortName(&port_name);
134 
135   RefPtr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum));
136   int rv = AddPortWithName(port_name, port);
137   if (rv != OK) {
138     return rv;
139   }
140 
141   *port_ref = PortRef(port_name, std::move(port));
142   return OK;
143 }
144 
InitializePort(const PortRef & port_ref,const NodeName & peer_node_name,const PortName & peer_port_name)145 int Node::InitializePort(const PortRef& port_ref,
146                          const NodeName& peer_node_name,
147                          const PortName& peer_port_name) {
148   {
149     // Must be acquired for UpdatePortPeerAddress below.
150     PortLocker::AssertNoPortsLockedOnCurrentThread();
151     mozilla::MutexAutoLock ports_lock(ports_lock_);
152 
153     SinglePortLocker locker(&port_ref);
154     auto* port = locker.port();
155     if (port->state != Port::kUninitialized) {
156       return ERROR_PORT_STATE_UNEXPECTED;
157     }
158 
159     port->state = Port::kReceiving;
160     UpdatePortPeerAddress(port_ref.name(), port, peer_node_name,
161                           peer_port_name);
162   }
163 
164   delegate_->PortStatusChanged(port_ref);
165 
166   return OK;
167 }
168 
CreatePortPair(PortRef * port0_ref,PortRef * port1_ref)169 int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
170   int rv;
171 
172   rv = CreateUninitializedPort(port0_ref);
173   if (rv != OK) {
174     return rv;
175   }
176 
177   rv = CreateUninitializedPort(port1_ref);
178   if (rv != OK) {
179     return rv;
180   }
181 
182   rv = InitializePort(*port0_ref, name_, port1_ref->name());
183   if (rv != OK) {
184     return rv;
185   }
186 
187   rv = InitializePort(*port1_ref, name_, port0_ref->name());
188   if (rv != OK) {
189     return rv;
190   }
191 
192   return OK;
193 }
194 
SetUserData(const PortRef & port_ref,RefPtr<UserData> user_data)195 int Node::SetUserData(const PortRef& port_ref, RefPtr<UserData> user_data) {
196   SinglePortLocker locker(&port_ref);
197   auto* port = locker.port();
198   if (port->state == Port::kClosed) {
199     return ERROR_PORT_STATE_UNEXPECTED;
200   }
201 
202   port->user_data = std::move(user_data);
203 
204   return OK;
205 }
206 
GetUserData(const PortRef & port_ref,RefPtr<UserData> * user_data)207 int Node::GetUserData(const PortRef& port_ref, RefPtr<UserData>* user_data) {
208   SinglePortLocker locker(&port_ref);
209   auto* port = locker.port();
210   if (port->state == Port::kClosed) {
211     return ERROR_PORT_STATE_UNEXPECTED;
212   }
213 
214   *user_data = port->user_data;
215 
216   return OK;
217 }
218 
ClosePort(const PortRef & port_ref)219 int Node::ClosePort(const PortRef& port_ref) {
220   std::vector<mozilla::UniquePtr<UserMessageEvent>> undelivered_messages;
221   NodeName peer_node_name;
222   PortName peer_port_name;
223   uint64_t last_sequence_num = 0;
224   bool was_initialized = false;
225   {
226     SinglePortLocker locker(&port_ref);
227     auto* port = locker.port();
228     switch (port->state) {
229       case Port::kUninitialized:
230         break;
231 
232       case Port::kReceiving:
233         was_initialized = true;
234         port->state = Port::kClosed;
235 
236         // We pass along the sequence number of the last message sent from this
237         // port to allow the peer to have the opportunity to consume all inbound
238         // messages before notifying the embedder that this port is closed.
239         last_sequence_num = port->next_sequence_num_to_send - 1;
240 
241         peer_node_name = port->peer_node_name;
242         peer_port_name = port->peer_port_name;
243 
244         // If the port being closed still has unread messages, then we need to
245         // take care to close those ports so as to avoid leaking memory.
246         port->message_queue.TakeAllMessages(&undelivered_messages);
247         break;
248 
249       default:
250         return ERROR_PORT_STATE_UNEXPECTED;
251     }
252   }
253 
254   ErasePort(port_ref.name());
255 
256   if (was_initialized) {
257     DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@"
258              << name_ << " to " << peer_port_name << "@" << peer_node_name;
259     delegate_->ForwardEvent(peer_node_name,
260                             mozilla::MakeUnique<ObserveClosureEvent>(
261                                 peer_port_name, last_sequence_num));
262     for (const auto& message : undelivered_messages) {
263       for (size_t i = 0; i < message->num_ports(); ++i) {
264         PortRef ref;
265         if (GetPort(message->ports()[i], &ref) == OK) {
266           ClosePort(ref);
267         }
268       }
269     }
270   }
271   return OK;
272 }
273 
GetStatus(const PortRef & port_ref,PortStatus * port_status)274 int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
275   SinglePortLocker locker(&port_ref);
276   auto* port = locker.port();
277   if (port->state != Port::kReceiving) {
278     return ERROR_PORT_STATE_UNEXPECTED;
279   }
280 
281   port_status->has_messages = port->message_queue.HasNextMessage();
282   port_status->receiving_messages = CanAcceptMoreMessages(port);
283   port_status->peer_closed = port->peer_closed;
284   port_status->peer_remote = port->peer_node_name != name_;
285   port_status->queued_message_count =
286       port->message_queue.queued_message_count();
287   port_status->queued_num_bytes = port->message_queue.queued_num_bytes();
288   port_status->unacknowledged_message_count =
289       port->next_sequence_num_to_send - port->last_sequence_num_acknowledged -
290       1;
291 
292   return OK;
293 }
294 
GetMessage(const PortRef & port_ref,mozilla::UniquePtr<UserMessageEvent> * message,MessageFilter * filter)295 int Node::GetMessage(const PortRef& port_ref,
296                      mozilla::UniquePtr<UserMessageEvent>* message,
297                      MessageFilter* filter) {
298   *message = nullptr;
299 
300   DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_;
301 
302   NodeName peer_node_name;
303   ScopedEvent ack_event;
304   {
305     SinglePortLocker locker(&port_ref);
306     auto* port = locker.port();
307 
308     // This could also be treated like the port being unknown since the
309     // embedder should no longer be referring to a port that has been sent.
310     if (port->state != Port::kReceiving) {
311       return ERROR_PORT_STATE_UNEXPECTED;
312     }
313 
314     // Let the embedder get messages until there are no more before reporting
315     // that the peer closed its end.
316     if (!CanAcceptMoreMessages(port)) {
317       return ERROR_PORT_PEER_CLOSED;
318     }
319 
320     port->message_queue.GetNextMessage(message, filter);
321     if (*message &&
322         (*message)->sequence_num() == port->sequence_num_to_acknowledge) {
323       peer_node_name = port->peer_node_name;
324       ack_event = mozilla::MakeUnique<UserMessageReadAckEvent>(
325           port->peer_port_name, port->sequence_num_to_acknowledge);
326     }
327   }
328 
329   if (ack_event) {
330     delegate_->ForwardEvent(peer_node_name, std::move(ack_event));
331   }
332 
333   // Allow referenced ports to trigger PortStatusChanged calls.
334   if (*message) {
335     for (size_t i = 0; i < (*message)->num_ports(); ++i) {
336       PortRef new_port_ref;
337       int rv = GetPort((*message)->ports()[i], &new_port_ref);
338 
339       DCHECK_EQ(OK, rv) << "Port " << new_port_ref.name() << "@" << name_
340                         << " does not exist!";
341 
342       SinglePortLocker locker(&new_port_ref);
343       DCHECK(locker.port()->state == Port::kReceiving);
344       locker.port()->message_queue.set_signalable(true);
345     }
346 
347     // The user may retransmit this message from another port. We reset the
348     // sequence number so that the message will get a new one if that happens.
349     (*message)->set_sequence_num(0);
350   }
351 
352   return OK;
353 }
354 
SendUserMessage(const PortRef & port_ref,mozilla::UniquePtr<UserMessageEvent> message)355 int Node::SendUserMessage(const PortRef& port_ref,
356                           mozilla::UniquePtr<UserMessageEvent> message) {
357   int rv = SendUserMessageInternal(port_ref, &message);
358   if (rv != OK) {
359     // If send failed, close all carried ports. Note that we're careful not to
360     // close the sending port itself if it happened to be one of the encoded
361     // ports (an invalid but possible condition.)
362     for (size_t i = 0; i < message->num_ports(); ++i) {
363       if (message->ports()[i] == port_ref.name()) {
364         continue;
365       }
366 
367       PortRef port;
368       if (GetPort(message->ports()[i], &port) == OK) {
369         ClosePort(port);
370       }
371     }
372   }
373   return rv;
374 }
375 
SetAcknowledgeRequestInterval(const PortRef & port_ref,uint64_t sequence_num_acknowledge_interval)376 int Node::SetAcknowledgeRequestInterval(
377     const PortRef& port_ref, uint64_t sequence_num_acknowledge_interval) {
378   NodeName peer_node_name;
379   PortName peer_port_name;
380   uint64_t sequence_num_to_request_ack = 0;
381   {
382     SinglePortLocker locker(&port_ref);
383     auto* port = locker.port();
384     if (port->state != Port::kReceiving) {
385       return ERROR_PORT_STATE_UNEXPECTED;
386     }
387 
388     port->sequence_num_acknowledge_interval = sequence_num_acknowledge_interval;
389     if (!sequence_num_acknowledge_interval) {
390       return OK;
391     }
392 
393     peer_node_name = port->peer_node_name;
394     peer_port_name = port->peer_port_name;
395 
396     sequence_num_to_request_ack = port->last_sequence_num_acknowledged +
397                                   sequence_num_acknowledge_interval;
398   }
399 
400   delegate_->ForwardEvent(peer_node_name,
401                           mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
402                               peer_port_name, sequence_num_to_request_ack));
403   return OK;
404 }
405 
AcceptEvent(ScopedEvent event)406 int Node::AcceptEvent(ScopedEvent event) {
407   switch (event->type()) {
408     case Event::Type::kUserMessage:
409       return OnUserMessage(Event::Cast<UserMessageEvent>(&event));
410     case Event::Type::kPortAccepted:
411       return OnPortAccepted(Event::Cast<PortAcceptedEvent>(&event));
412     case Event::Type::kObserveProxy:
413       return OnObserveProxy(Event::Cast<ObserveProxyEvent>(&event));
414     case Event::Type::kObserveProxyAck:
415       return OnObserveProxyAck(Event::Cast<ObserveProxyAckEvent>(&event));
416     case Event::Type::kObserveClosure:
417       return OnObserveClosure(Event::Cast<ObserveClosureEvent>(&event));
418     case Event::Type::kMergePort:
419       return OnMergePort(Event::Cast<MergePortEvent>(&event));
420     case Event::Type::kUserMessageReadAckRequest:
421       return OnUserMessageReadAckRequest(
422           Event::Cast<UserMessageReadAckRequestEvent>(&event));
423     case Event::Type::kUserMessageReadAck:
424       return OnUserMessageReadAck(Event::Cast<UserMessageReadAckEvent>(&event));
425   }
426   return OOPS(ERROR_NOT_IMPLEMENTED);
427 }
428 
MergePorts(const PortRef & port_ref,const NodeName & destination_node_name,const PortName & destination_port_name)429 int Node::MergePorts(const PortRef& port_ref,
430                      const NodeName& destination_node_name,
431                      const PortName& destination_port_name) {
432   PortName new_port_name;
433   Event::PortDescriptor new_port_descriptor;
434   {
435     // Must be held for ConvertToProxy.
436     PortLocker::AssertNoPortsLockedOnCurrentThread();
437     mozilla::MutexAutoLock ports_locker(ports_lock_);
438 
439     SinglePortLocker locker(&port_ref);
440 
441     DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
442              << " to " << destination_port_name << "@" << destination_node_name;
443 
444     // Send the port-to-merge over to the destination node so it can be merged
445     // into the port cycle atomically there.
446     new_port_name = port_ref.name();
447     ConvertToProxy(locker.port(), destination_node_name, &new_port_name,
448                    &new_port_descriptor);
449   }
450 
451   if (new_port_descriptor.peer_node_name == name_ &&
452       destination_node_name != name_) {
453     // Ensure that the locally retained peer of the new proxy gets a status
454     // update so it notices that its peer is now remote.
455     PortRef local_peer;
456     if (GetPort(new_port_descriptor.peer_port_name, &local_peer) == OK) {
457       delegate_->PortStatusChanged(local_peer);
458     }
459   }
460 
461   delegate_->ForwardEvent(
462       destination_node_name,
463       mozilla::MakeUnique<MergePortEvent>(destination_port_name, new_port_name,
464                                           new_port_descriptor));
465   return OK;
466 }
467 
MergeLocalPorts(const PortRef & port0_ref,const PortRef & port1_ref)468 int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
469   DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
470            << " and " << port1_ref.name() << "@" << name_;
471   return MergePortsInternal(port0_ref, port1_ref,
472                             true /* allow_close_on_bad_state */);
473 }
474 
LostConnectionToNode(const NodeName & node_name)475 int Node::LostConnectionToNode(const NodeName& node_name) {
476   // We can no longer send events to the given node. We also can't expect any
477   // PortAccepted events.
478 
479   DVLOG(1) << "Observing lost connection from node " << name_ << " to node "
480            << node_name;
481 
482   DestroyAllPortsWithPeer(node_name, kInvalidPortName);
483   return OK;
484 }
485 
OnUserMessage(mozilla::UniquePtr<UserMessageEvent> message)486 int Node::OnUserMessage(mozilla::UniquePtr<UserMessageEvent> message) {
487   PortName port_name = message->port_name();
488 
489 #ifdef DEBUG
490   std::ostringstream ports_buf;
491   for (size_t i = 0; i < message->num_ports(); ++i) {
492     if (i > 0) {
493       ports_buf << ",";
494     }
495     ports_buf << message->ports()[i];
496   }
497 
498   DVLOG(4) << "OnUserMessage " << message->sequence_num()
499            << " [ports=" << ports_buf.str() << "] at " << port_name << "@"
500            << name_;
501 #endif
502 
503   // Even if this port does not exist, cannot receive anymore messages or is
504   // buffering or proxying messages, we still need these ports to be bound to
505   // this node. When the message is forwarded, these ports will get transferred
506   // following the usual method. If the message cannot be accepted, then the
507   // newly bound ports will simply be closed.
508   for (size_t i = 0; i < message->num_ports(); ++i) {
509     Event::PortDescriptor& descriptor = message->port_descriptors()[i];
510     if (descriptor.referring_node_name == kInvalidNodeName) {
511       // If the referring node name is invalid, this descriptor can be ignored
512       // and the port should already exist locally.
513       PortRef port_ref;
514       if (GetPort(message->ports()[i], &port_ref) != OK) {
515         return ERROR_PORT_UNKNOWN;
516       }
517     } else {
518       int rv = AcceptPort(message->ports()[i], descriptor);
519       if (rv != OK) {
520         return rv;
521       }
522 
523       // Ensure that the referring node is wiped out of this descriptor. This
524       // allows the event to be forwarded across multiple local hops without
525       // attempting to accept the port more than once.
526       descriptor.referring_node_name = kInvalidNodeName;
527     }
528   }
529 
530   PortRef port_ref;
531   GetPort(port_name, &port_ref);
532   bool has_next_message = false;
533   bool message_accepted = false;
534   bool should_forward_messages = false;
535   if (port_ref.is_valid()) {
536     SinglePortLocker locker(&port_ref);
537     auto* port = locker.port();
538 
539     // Reject spurious messages if we've already received the last expected
540     // message.
541     if (CanAcceptMoreMessages(port)) {
542       message_accepted = true;
543       port->message_queue.AcceptMessage(std::move(message), &has_next_message);
544 
545       if (port->state == Port::kBuffering) {
546         has_next_message = false;
547       } else if (port->state == Port::kProxying) {
548         has_next_message = false;
549         should_forward_messages = true;
550       }
551     }
552   }
553 
554   if (should_forward_messages) {
555     int rv = ForwardUserMessagesFromProxy(port_ref);
556     if (rv != OK) {
557       return rv;
558     }
559     TryRemoveProxy(port_ref);
560   }
561 
562   if (!message_accepted) {
563     DVLOG(2) << "Message not accepted!\n";
564     // Close all newly accepted ports as they are effectively orphaned.
565     for (size_t i = 0; i < message->num_ports(); ++i) {
566       PortRef attached_port_ref;
567       if (GetPort(message->ports()[i], &attached_port_ref) == OK) {
568         ClosePort(attached_port_ref);
569       } else {
570         DLOG(WARNING) << "Cannot close non-existent port!\n";
571       }
572     }
573   } else if (has_next_message) {
574     delegate_->PortStatusChanged(port_ref);
575   }
576 
577   return OK;
578 }
579 
OnPortAccepted(mozilla::UniquePtr<PortAcceptedEvent> event)580 int Node::OnPortAccepted(mozilla::UniquePtr<PortAcceptedEvent> event) {
581   PortRef port_ref;
582   if (GetPort(event->port_name(), &port_ref) != OK) {
583     return ERROR_PORT_UNKNOWN;
584   }
585 
586 #ifdef DEBUG
587   {
588     SinglePortLocker locker(&port_ref);
589     DVLOG(2) << "PortAccepted at " << port_ref.name() << "@" << name_
590              << " pointing to " << locker.port()->peer_port_name << "@"
591              << locker.port()->peer_node_name;
592   }
593 #endif
594 
595   return BeginProxying(port_ref);
596 }
597 
OnObserveProxy(mozilla::UniquePtr<ObserveProxyEvent> event)598 int Node::OnObserveProxy(mozilla::UniquePtr<ObserveProxyEvent> event) {
599   if (event->port_name() == kInvalidPortName) {
600     // An ObserveProxy with an invalid target port name is a broadcast used to
601     // inform ports when their peer (which was itself a proxy) has become
602     // defunct due to unexpected node disconnection.
603     //
604     // Receiving ports affected by this treat it as equivalent to peer closure.
605     // Proxies affected by this can be removed and will in turn broadcast their
606     // own death with a similar message.
607     DCHECK_EQ(event->proxy_target_node_name(), kInvalidNodeName);
608     DCHECK_EQ(event->proxy_target_port_name(), kInvalidPortName);
609     DestroyAllPortsWithPeer(event->proxy_node_name(), event->proxy_port_name());
610     return OK;
611   }
612 
613   // The port may have already been closed locally, in which case the
614   // ObserveClosure message will contain the last_sequence_num field.
615   // We can then silently ignore this message.
616   PortRef port_ref;
617   if (GetPort(event->port_name(), &port_ref) != OK) {
618     DVLOG(1) << "ObserveProxy: " << event->port_name() << "@" << name_
619              << " not found";
620     return OK;
621   }
622 
623   DVLOG(2) << "ObserveProxy at " << port_ref.name() << "@" << name_
624            << ", proxy at " << event->proxy_port_name() << "@"
625            << event->proxy_node_name() << " pointing to "
626            << event->proxy_target_port_name() << "@"
627            << event->proxy_target_node_name();
628 
629   bool peer_changed = false;
630   ScopedEvent event_to_forward;
631   NodeName event_target_node;
632   {
633     // Must be acquired for UpdatePortPeerAddress below.
634     PortLocker::AssertNoPortsLockedOnCurrentThread();
635     mozilla::MutexAutoLock ports_locker(ports_lock_);
636 
637     SinglePortLocker locker(&port_ref);
638     auto* port = locker.port();
639 
640     if (port->peer_node_name == event->proxy_node_name() &&
641         port->peer_port_name == event->proxy_port_name()) {
642       if (port->state == Port::kReceiving) {
643         UpdatePortPeerAddress(port_ref.name(), port,
644                               event->proxy_target_node_name(),
645                               event->proxy_target_port_name());
646         event_target_node = event->proxy_node_name();
647         event_to_forward = mozilla::MakeUnique<ObserveProxyAckEvent>(
648             event->proxy_port_name(), port->next_sequence_num_to_send - 1);
649         peer_changed = true;
650         DVLOG(2) << "Forwarding ObserveProxyAck from " << event->port_name()
651                  << "@" << name_ << " to " << event->proxy_port_name() << "@"
652                  << event_target_node;
653       } else {
654         // As a proxy ourselves, we don't know how to honor the ObserveProxy
655         // event or to populate the last_sequence_num field of ObserveProxyAck.
656         // Afterall, another port could be sending messages to our peer now
657         // that we've sent out our own ObserveProxy event.  Instead, we will
658         // send an ObserveProxyAck indicating that the ObserveProxy event
659         // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
660         // However, this has to be done after we are removed as a proxy.
661         // Otherwise, we might just find ourselves back here again, which
662         // would be akin to a busy loop.
663 
664         DVLOG(2) << "Delaying ObserveProxyAck to " << event->proxy_port_name()
665                  << "@" << event->proxy_node_name();
666 
667         port->send_on_proxy_removal =
668             mozilla::MakeUnique<std::pair<NodeName, ScopedEvent>>(
669                 event->proxy_node_name(),
670                 mozilla::MakeUnique<ObserveProxyAckEvent>(
671                     event->proxy_port_name(), kInvalidSequenceNum));
672       }
673     } else {
674       // Forward this event along to our peer. Eventually, it should find the
675       // port referring to the proxy.
676       event_target_node = port->peer_node_name;
677       event->set_port_name(port->peer_port_name);
678       event_to_forward = std::move(event);
679     }
680   }
681 
682   if (event_to_forward) {
683     delegate_->ForwardEvent(event_target_node, std::move(event_to_forward));
684   }
685 
686   if (peer_changed) {
687     // Re-send ack and/or ack requests, as the previous peer proxy may not have
688     // forwarded the previous request before it died.
689     MaybeResendAck(port_ref);
690     MaybeResendAckRequest(port_ref);
691 
692     delegate_->PortStatusChanged(port_ref);
693   }
694 
695   return OK;
696 }
697 
OnObserveProxyAck(mozilla::UniquePtr<ObserveProxyAckEvent> event)698 int Node::OnObserveProxyAck(mozilla::UniquePtr<ObserveProxyAckEvent> event) {
699   DVLOG(2) << "ObserveProxyAck at " << event->port_name() << "@" << name_
700            << " (last_sequence_num=" << event->last_sequence_num() << ")";
701 
702   PortRef port_ref;
703   if (GetPort(event->port_name(), &port_ref) != OK) {
704     return ERROR_PORT_UNKNOWN;  // The port may have observed closure first.
705   }
706 
707   bool try_remove_proxy_immediately;
708   {
709     SinglePortLocker locker(&port_ref);
710     auto* port = locker.port();
711     if (port->state != Port::kProxying) {
712       return OOPS(ERROR_PORT_STATE_UNEXPECTED);
713     }
714 
715     // If the last sequence number is invalid, this is a signal that we need to
716     // retransmit the ObserveProxy event for this port rather than flagging the
717     // the proxy for removal ASAP.
718     try_remove_proxy_immediately =
719         event->last_sequence_num() != kInvalidSequenceNum;
720     if (try_remove_proxy_immediately) {
721       // We can now remove this port once we have received and forwarded the
722       // last message addressed to this port.
723       port->remove_proxy_on_last_message = true;
724       port->last_sequence_num_to_receive = event->last_sequence_num();
725     }
726   }
727 
728   if (try_remove_proxy_immediately) {
729     TryRemoveProxy(port_ref);
730   } else {
731     InitiateProxyRemoval(port_ref);
732   }
733 
734   return OK;
735 }
736 
OnObserveClosure(mozilla::UniquePtr<ObserveClosureEvent> event)737 int Node::OnObserveClosure(mozilla::UniquePtr<ObserveClosureEvent> event) {
738   // OK if the port doesn't exist, as it may have been closed already.
739   PortRef port_ref;
740   if (GetPort(event->port_name(), &port_ref) != OK) {
741     return OK;
742   }
743 
744   // This message tells the port that it should no longer expect more messages
745   // beyond last_sequence_num. This message is forwarded along until we reach
746   // the receiving end, and this message serves as an equivalent to
747   // ObserveProxyAck.
748 
749   bool notify_delegate = false;
750   NodeName peer_node_name;
751   PortName peer_port_name;
752   bool try_remove_proxy = false;
753   {
754     SinglePortLocker locker(&port_ref);
755     auto* port = locker.port();
756 
757     port->peer_closed = true;
758     port->last_sequence_num_to_receive = event->last_sequence_num();
759 
760     DVLOG(2) << "ObserveClosure at " << port_ref.name() << "@" << name_
761              << " (state=" << port->state << ") pointing to "
762              << port->peer_port_name << "@" << port->peer_node_name
763              << " (last_sequence_num=" << event->last_sequence_num() << ")";
764 
765     // We always forward ObserveClosure, even beyond the receiving port which
766     // cares about it. This ensures that any dead-end proxies beyond that port
767     // are notified to remove themselves.
768 
769     if (port->state == Port::kReceiving) {
770       notify_delegate = true;
771 
772       // When forwarding along the other half of the port cycle, this will only
773       // reach dead-end proxies. Tell them we've sent our last message so they
774       // can go away.
775       //
776       // TODO: Repurposing ObserveClosure for this has the desired result but
777       // may be semantically confusing since the forwarding port is not actually
778       // closed. Consider replacing this with a new event type.
779       event->set_last_sequence_num(port->next_sequence_num_to_send - 1);
780 
781       // Treat the closure as an acknowledge that all sent messages have been
782       // read from the other end.
783       port->last_sequence_num_acknowledged =
784           port->next_sequence_num_to_send - 1;
785     } else {
786       // We haven't yet reached the receiving peer of the closed port, so we'll
787       // forward the message along as-is.
788       // See about removing the port if it is a proxy as our peer won't be able
789       // to participate in proxy removal.
790       port->remove_proxy_on_last_message = true;
791       if (port->state == Port::kProxying) {
792         try_remove_proxy = true;
793       }
794     }
795 
796     DVLOG(2) << "Forwarding ObserveClosure from " << port_ref.name() << "@"
797              << name_ << " to peer " << port->peer_port_name << "@"
798              << port->peer_node_name
799              << " (last_sequence_num=" << event->last_sequence_num() << ")";
800 
801     peer_node_name = port->peer_node_name;
802     peer_port_name = port->peer_port_name;
803   }
804 
805   if (try_remove_proxy) {
806     TryRemoveProxy(port_ref);
807   }
808 
809   event->set_port_name(peer_port_name);
810   delegate_->ForwardEvent(peer_node_name, std::move(event));
811 
812   if (notify_delegate) {
813     delegate_->PortStatusChanged(port_ref);
814   }
815 
816   return OK;
817 }
818 
OnMergePort(mozilla::UniquePtr<MergePortEvent> event)819 int Node::OnMergePort(mozilla::UniquePtr<MergePortEvent> event) {
820   PortRef port_ref;
821   GetPort(event->port_name(), &port_ref);
822 
823   DVLOG(1) << "MergePort at " << port_ref.name() << "@" << name_
824            << " merging with proxy " << event->new_port_name() << "@" << name_
825            << " pointing to " << event->new_port_descriptor().peer_port_name
826            << "@" << event->new_port_descriptor().peer_node_name
827            << " referred by "
828            << event->new_port_descriptor().referring_port_name << "@"
829            << event->new_port_descriptor().referring_node_name;
830 
831   // Accept the new port. This is now the receiving end of the other port cycle
832   // to be merged with ours. Note that we always attempt to accept the new port
833   // first as otherwise its peer receiving port could be left stranded
834   // indefinitely.
835   if (AcceptPort(event->new_port_name(), event->new_port_descriptor()) != OK) {
836     if (port_ref.is_valid()) {
837       ClosePort(port_ref);
838     }
839     return ERROR_PORT_STATE_UNEXPECTED;
840   }
841 
842   PortRef new_port_ref;
843   GetPort(event->new_port_name(), &new_port_ref);
844   if (!port_ref.is_valid() && new_port_ref.is_valid()) {
845     ClosePort(new_port_ref);
846     return ERROR_PORT_UNKNOWN;
847   }
848   if (port_ref.is_valid() && !new_port_ref.is_valid()) {
849     ClosePort(port_ref);
850     return ERROR_PORT_UNKNOWN;
851   }
852 
853   return MergePortsInternal(port_ref, new_port_ref,
854                             false /* allow_close_on_bad_state */);
855 }
856 
OnUserMessageReadAckRequest(mozilla::UniquePtr<UserMessageReadAckRequestEvent> event)857 int Node::OnUserMessageReadAckRequest(
858     mozilla::UniquePtr<UserMessageReadAckRequestEvent> event) {
859   PortRef port_ref;
860   GetPort(event->port_name(), &port_ref);
861 
862   DVLOG(1) << "AckRequest " << port_ref.name() << "@" << name_ << " sequence "
863            << event->sequence_num_to_acknowledge();
864 
865   if (!port_ref.is_valid()) {
866     return ERROR_PORT_UNKNOWN;
867   }
868 
869   NodeName peer_node_name;
870   mozilla::UniquePtr<Event> event_to_send;
871   {
872     SinglePortLocker locker(&port_ref);
873     auto* port = locker.port();
874 
875     peer_node_name = port->peer_node_name;
876     if (port->state == Port::kProxying) {
877       // Proxies simply forward the ack request to their peer.
878       event->set_port_name(port->peer_port_name);
879       event_to_send = std::move(event);
880     } else {
881       uint64_t current_sequence_num =
882           port->message_queue.next_sequence_num() - 1;
883       // Either this is requesting an ack for a sequence number already read, or
884       // else for a sequence number that is yet to be read.
885       if (current_sequence_num >= event->sequence_num_to_acknowledge()) {
886         // If the current sequence number to read already exceeds the ack
887         // request, send an ack immediately.
888         event_to_send = mozilla::MakeUnique<UserMessageReadAckEvent>(
889             port->peer_port_name, current_sequence_num);
890 
891         // This might be a late or duplicate acknowledge request, that's
892         // requesting acknowledge for an already read message. There may already
893         // have been a request for future reads, so take care not to back up
894         // the requested acknowledge counter.
895         if (current_sequence_num > port->sequence_num_to_acknowledge) {
896           port->sequence_num_to_acknowledge = current_sequence_num;
897         }
898       } else {
899         // This is request to ack a sequence number that hasn't been read yet.
900         // The state of the port can either be that it already has a
901         // future-requested ack, or not. Because ack requests aren't guaranteed
902         // to arrive in order, store the earlier of the current  queued request
903         // and the new one, if one was already requested.
904         bool has_queued_ack_request =
905             port->sequence_num_to_acknowledge > current_sequence_num;
906         if (!has_queued_ack_request ||
907             port->sequence_num_to_acknowledge >
908                 event->sequence_num_to_acknowledge()) {
909           port->sequence_num_to_acknowledge =
910               event->sequence_num_to_acknowledge();
911         }
912         return OK;
913       }
914     }
915   }
916 
917   if (event_to_send) {
918     delegate_->ForwardEvent(peer_node_name, std::move(event_to_send));
919   }
920 
921   return OK;
922 }
923 
OnUserMessageReadAck(mozilla::UniquePtr<UserMessageReadAckEvent> event)924 int Node::OnUserMessageReadAck(
925     mozilla::UniquePtr<UserMessageReadAckEvent> event) {
926   PortRef port_ref;
927   GetPort(event->port_name(), &port_ref);
928 
929   DVLOG(1) << "Acknowledge " << port_ref.name() << "@" << name_ << " sequence "
930            << event->sequence_num_acknowledged();
931 
932   NodeName peer_node_name;
933   ScopedEvent ack_request_event;
934   if (port_ref.is_valid()) {
935     SinglePortLocker locker(&port_ref);
936     auto* port = locker.port();
937 
938     if (event->sequence_num_acknowledged() >= port->next_sequence_num_to_send) {
939       // TODO(http://crbug.com/980952): This is a malformed event.
940       //      This could return a new error "ERROR_MALFORMED_EVENT" which the
941       //      delegate could use as a signal to drop the peer node.
942       return OK;
943     }
944 
945     // Keep the largest acknowledge seen.
946     if (event->sequence_num_acknowledged() <=
947         port->last_sequence_num_acknowledged) {
948       // The acknowledge was late or a duplicate, it's safe to ignore it.
949       return OK;
950     }
951 
952     port->last_sequence_num_acknowledged = event->sequence_num_acknowledged();
953     // Send another ack request if the interval is non-zero and the peer has
954     // not been closed.
955     if (port->sequence_num_acknowledge_interval && !port->peer_closed) {
956       peer_node_name = port->peer_node_name;
957       ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
958           port->peer_port_name, port->last_sequence_num_acknowledged +
959                                     port->sequence_num_acknowledge_interval);
960     }
961   }
962   if (ack_request_event) {
963     delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
964   }
965 
966   if (port_ref.is_valid()) {
967     delegate_->PortStatusChanged(port_ref);
968   }
969 
970   return OK;
971 }
972 
AddPortWithName(const PortName & port_name,RefPtr<Port> port)973 int Node::AddPortWithName(const PortName& port_name, RefPtr<Port> port) {
974   PortLocker::AssertNoPortsLockedOnCurrentThread();
975   mozilla::MutexAutoLock lock(ports_lock_);
976   if (port->peer_port_name != kInvalidPortName) {
977     DCHECK_NE(kInvalidNodeName, port->peer_node_name);
978     peer_port_maps_[port->peer_node_name][port->peer_port_name].emplace(
979         port_name, PortRef(port_name, port));
980   }
981   if (!ports_.emplace(port_name, std::move(port)).second) {
982     return OOPS(ERROR_PORT_EXISTS);  // Suggests a bad UUID generator.
983   }
984   DVLOG(2) << "Created port " << port_name << "@" << name_;
985   return OK;
986 }
987 
ErasePort(const PortName & port_name)988 void Node::ErasePort(const PortName& port_name) {
989   PortLocker::AssertNoPortsLockedOnCurrentThread();
990   RefPtr<Port> port;
991   {
992     mozilla::MutexAutoLock lock(ports_lock_);
993     auto it = ports_.find(port_name);
994     if (it == ports_.end()) {
995       return;
996     }
997     port = std::move(it->second);
998     ports_.erase(it);
999 
1000     RemoveFromPeerPortMap(port_name, port.get());
1001   }
1002   // NOTE: We are careful not to release the port's messages while holding any
1003   // locks, since they may run arbitrary user code upon destruction.
1004   std::vector<mozilla::UniquePtr<UserMessageEvent>> messages;
1005   {
1006     PortRef port_ref(port_name, std::move(port));
1007     SinglePortLocker locker(&port_ref);
1008     locker.port()->message_queue.TakeAllMessages(&messages);
1009   }
1010   DVLOG(2) << "Deleted port " << port_name << "@" << name_;
1011 }
1012 
SendUserMessageInternal(const PortRef & port_ref,mozilla::UniquePtr<UserMessageEvent> * message)1013 int Node::SendUserMessageInternal(
1014     const PortRef& port_ref, mozilla::UniquePtr<UserMessageEvent>* message) {
1015   mozilla::UniquePtr<UserMessageEvent>& m = *message;
1016   for (size_t i = 0; i < m->num_ports(); ++i) {
1017     if (m->ports()[i] == port_ref.name()) {
1018       return ERROR_PORT_CANNOT_SEND_SELF;
1019     }
1020   }
1021 
1022   NodeName target_node;
1023   int rv = PrepareToForwardUserMessage(port_ref, Port::kReceiving,
1024                                        false /* ignore_closed_peer */, m.get(),
1025                                        &target_node);
1026   if (rv != OK) {
1027     return rv;
1028   }
1029 
1030   // Beyond this point there's no sense in returning anything but OK. Even if
1031   // message forwarding or acceptance fails, there's nothing the embedder can
1032   // do to recover. Assume that failure beyond this point must be treated as a
1033   // transport failure.
1034 
1035   DCHECK_NE(kInvalidNodeName, target_node);
1036   if (target_node != name_) {
1037     delegate_->ForwardEvent(target_node, std::move(m));
1038     return OK;
1039   }
1040 
1041   int accept_result = AcceptEvent(std::move(m));
1042   if (accept_result != OK) {
1043     // See comment above for why we don't return an error in this case.
1044     DVLOG(2) << "AcceptEvent failed: " << accept_result;
1045   }
1046 
1047   return OK;
1048 }
1049 
MergePortsInternal(const PortRef & port0_ref,const PortRef & port1_ref,bool allow_close_on_bad_state)1050 int Node::MergePortsInternal(const PortRef& port0_ref, const PortRef& port1_ref,
1051                              bool allow_close_on_bad_state) {
1052   const PortRef* port_refs[2] = {&port0_ref, &port1_ref};
1053   {
1054     // Needed to swap peer map entries below.
1055     PortLocker::AssertNoPortsLockedOnCurrentThread();
1056     mozilla::Maybe<mozilla::MutexAutoLock> ports_locker(std::in_place,
1057                                                         ports_lock_);
1058 
1059     mozilla::Maybe<PortLocker> locker(std::in_place, port_refs, size_t(2));
1060     auto* port0 = locker->GetPort(port0_ref);
1061     auto* port1 = locker->GetPort(port1_ref);
1062 
1063     // There are several conditions which must be met before we'll consider
1064     // merging two ports:
1065     //
1066     // - They must both be in the kReceiving state
1067     // - They must not be each other's peer
1068     // - They must have never sent a user message
1069     //
1070     // If any of these criteria are not met, we fail early.
1071     if (port0->state != Port::kReceiving || port1->state != Port::kReceiving ||
1072         (port0->peer_node_name == name_ &&
1073          port0->peer_port_name == port1_ref.name()) ||
1074         (port1->peer_node_name == name_ &&
1075          port1->peer_port_name == port0_ref.name()) ||
1076         port0->next_sequence_num_to_send != kInitialSequenceNum ||
1077         port1->next_sequence_num_to_send != kInitialSequenceNum) {
1078       // On failure, we only close a port if it was at least properly in the
1079       // |kReceiving| state. This avoids getting the system in an inconsistent
1080       // state by e.g. closing a proxy abruptly.
1081       //
1082       // Note that we must release the port locks before closing ports.
1083       const bool close_port0 =
1084           port0->state == Port::kReceiving || allow_close_on_bad_state;
1085       const bool close_port1 =
1086           port1->state == Port::kReceiving || allow_close_on_bad_state;
1087       locker.reset();
1088       ports_locker.reset();
1089       if (close_port0) {
1090         ClosePort(port0_ref);
1091       }
1092       if (close_port1) {
1093         ClosePort(port1_ref);
1094       }
1095       return ERROR_PORT_STATE_UNEXPECTED;
1096     }
1097 
1098     // Swap the ports' peer information and switch them both to proxying mode.
1099     SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
1100     port0->state = Port::kProxying;
1101     port1->state = Port::kProxying;
1102     if (port0->peer_closed) {
1103       port0->remove_proxy_on_last_message = true;
1104     }
1105     if (port1->peer_closed) {
1106       port1->remove_proxy_on_last_message = true;
1107     }
1108   }
1109 
1110   // Flush any queued messages from the new proxies and, if successful, complete
1111   // the merge by initiating proxy removals.
1112   if (ForwardUserMessagesFromProxy(port0_ref) == OK &&
1113       ForwardUserMessagesFromProxy(port1_ref) == OK) {
1114     for (auto& port_ref : port_refs) {
1115       bool try_remove_proxy_immediately = false;
1116       ScopedEvent closure_event;
1117       NodeName closure_event_target_node;
1118       {
1119         SinglePortLocker locker(port_ref);
1120         auto* port = locker.port();
1121         DCHECK(port->state == Port::kProxying);
1122         try_remove_proxy_immediately = port->remove_proxy_on_last_message;
1123         if (try_remove_proxy_immediately || port->peer_closed) {
1124           // If either end of the port cycle is closed, we propagate an
1125           // ObserveClosure event.
1126           closure_event_target_node = port->peer_node_name;
1127           closure_event = mozilla::MakeUnique<ObserveClosureEvent>(
1128               port->peer_port_name, port->last_sequence_num_to_receive);
1129         }
1130       }
1131       if (try_remove_proxy_immediately) {
1132         TryRemoveProxy(*port_ref);
1133       } else {
1134         InitiateProxyRemoval(*port_ref);
1135       }
1136 
1137       if (closure_event) {
1138         delegate_->ForwardEvent(closure_event_target_node,
1139                                 std::move(closure_event));
1140       }
1141     }
1142 
1143     return OK;
1144   }
1145 
1146   // If we failed to forward proxied messages, we keep the system in a
1147   // consistent state by undoing the peer swap and closing the ports.
1148   {
1149     PortLocker::AssertNoPortsLockedOnCurrentThread();
1150     mozilla::MutexAutoLock ports_locker(ports_lock_);
1151     PortLocker locker(port_refs, 2);
1152     auto* port0 = locker.GetPort(port0_ref);
1153     auto* port1 = locker.GetPort(port1_ref);
1154     SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
1155     port0->remove_proxy_on_last_message = false;
1156     port1->remove_proxy_on_last_message = false;
1157     DCHECK_EQ(Port::kProxying, port0->state);
1158     DCHECK_EQ(Port::kProxying, port1->state);
1159     port0->state = Port::kReceiving;
1160     port1->state = Port::kReceiving;
1161   }
1162 
1163   ClosePort(port0_ref);
1164   ClosePort(port1_ref);
1165   return ERROR_PORT_STATE_UNEXPECTED;
1166 }
1167 
ConvertToProxy(Port * port,const NodeName & to_node_name,PortName * port_name,Event::PortDescriptor * port_descriptor)1168 void Node::ConvertToProxy(Port* port, const NodeName& to_node_name,
1169                           PortName* port_name,
1170                           Event::PortDescriptor* port_descriptor) {
1171   port->AssertLockAcquired();
1172   PortName local_port_name = *port_name;
1173 
1174   PortName new_port_name;
1175   GenerateRandomPortName(&new_port_name);
1176 
1177   // Make sure we don't send messages to the new peer until after we know it
1178   // exists. In the meantime, just buffer messages locally.
1179   DCHECK(port->state == Port::kReceiving);
1180   port->state = Port::kBuffering;
1181 
1182   // If we already know our peer is closed, we already know this proxy can
1183   // be removed once it receives and forwards its last expected message.
1184   if (port->peer_closed) {
1185     port->remove_proxy_on_last_message = true;
1186   }
1187 
1188   *port_name = new_port_name;
1189 
1190   port_descriptor->peer_node_name = port->peer_node_name;
1191   port_descriptor->peer_port_name = port->peer_port_name;
1192   port_descriptor->referring_node_name = name_;
1193   port_descriptor->referring_port_name = local_port_name;
1194   port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
1195   port_descriptor->next_sequence_num_to_receive =
1196       port->message_queue.next_sequence_num();
1197   port_descriptor->last_sequence_num_to_receive =
1198       port->last_sequence_num_to_receive;
1199   port_descriptor->peer_closed = port->peer_closed;
1200   memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));
1201 
1202   // Configure the local port to point to the new port.
1203   UpdatePortPeerAddress(local_port_name, port, to_node_name, new_port_name);
1204 }
1205 
AcceptPort(const PortName & port_name,const Event::PortDescriptor & port_descriptor)1206 int Node::AcceptPort(const PortName& port_name,
1207                      const Event::PortDescriptor& port_descriptor) {
1208   RefPtr<Port> port =
1209       mozilla::MakeRefPtr<Port>(port_descriptor.next_sequence_num_to_send,
1210                                 port_descriptor.next_sequence_num_to_receive);
1211   port->state = Port::kReceiving;
1212   port->peer_node_name = port_descriptor.peer_node_name;
1213   port->peer_port_name = port_descriptor.peer_port_name;
1214   port->last_sequence_num_to_receive =
1215       port_descriptor.last_sequence_num_to_receive;
1216   port->peer_closed = port_descriptor.peer_closed;
1217 
1218   DVLOG(2) << "Accepting port " << port_name
1219            << " [peer_closed=" << port->peer_closed
1220            << "; last_sequence_num_to_receive="
1221            << port->last_sequence_num_to_receive << "]";
1222 
1223   // A newly accepted port is not signalable until the message referencing the
1224   // new port finds its way to the consumer (see GetMessage).
1225   port->message_queue.set_signalable(false);
1226 
1227   int rv = AddPortWithName(port_name, std::move(port));
1228   if (rv != OK) {
1229     return rv;
1230   }
1231 
1232   // Allow referring port to forward messages.
1233   delegate_->ForwardEvent(port_descriptor.referring_node_name,
1234                           mozilla::MakeUnique<PortAcceptedEvent>(
1235                               port_descriptor.referring_port_name));
1236   return OK;
1237 }
1238 
PrepareToForwardUserMessage(const PortRef & forwarding_port_ref,Port::State expected_port_state,bool ignore_closed_peer,UserMessageEvent * message,NodeName * forward_to_node)1239 int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
1240                                       Port::State expected_port_state,
1241                                       bool ignore_closed_peer,
1242                                       UserMessageEvent* message,
1243                                       NodeName* forward_to_node) {
1244   bool target_is_remote = false;
1245   for (;;) {
1246     NodeName target_node_name;
1247     {
1248       SinglePortLocker locker(&forwarding_port_ref);
1249       target_node_name = locker.port()->peer_node_name;
1250     }
1251 
1252     // NOTE: This may call out to arbitrary user code, so it's important to call
1253     // it only while no port locks are held on the calling thread.
1254     if (target_node_name != name_) {
1255       if (!message->NotifyWillBeRoutedExternally()) {
1256         CHROMIUM_LOG(ERROR)
1257             << "NotifyWillBeRoutedExternally failed unexpectedly.";
1258         return ERROR_PORT_STATE_UNEXPECTED;
1259       }
1260     }
1261 
1262     // Must be held because ConvertToProxy needs to update |peer_port_maps_|.
1263     PortLocker::AssertNoPortsLockedOnCurrentThread();
1264     mozilla::MutexAutoLock ports_locker(ports_lock_);
1265 
1266     // Simultaneously lock the forwarding port as well as all attached ports.
1267     AutoTArray<PortRef, 4> attached_port_refs;
1268     AutoTArray<const PortRef*, 5> ports_to_lock;
1269     attached_port_refs.SetCapacity(message->num_ports());
1270     ports_to_lock.SetCapacity(message->num_ports() + 1);
1271     ports_to_lock.AppendElement(&forwarding_port_ref);
1272     for (size_t i = 0; i < message->num_ports(); ++i) {
1273       const PortName& attached_port_name = message->ports()[i];
1274       auto iter = ports_.find(attached_port_name);
1275       DCHECK(iter != ports_.end());
1276       attached_port_refs.AppendElement(
1277           PortRef(attached_port_name, iter->second));
1278       ports_to_lock.AppendElement(&attached_port_refs[i]);
1279     }
1280     PortLocker locker(ports_to_lock.Elements(), ports_to_lock.Length());
1281     auto* forwarding_port = locker.GetPort(forwarding_port_ref);
1282 
1283     if (forwarding_port->peer_node_name != target_node_name) {
1284       // The target node has already changed since we last held the lock.
1285       if (target_node_name == name_) {
1286         // If the target node was previously this local node, we need to restart
1287         // the loop, since that means we may now route the message externally.
1288         continue;
1289       }
1290 
1291       target_node_name = forwarding_port->peer_node_name;
1292     }
1293     target_is_remote = target_node_name != name_;
1294 
1295     if (forwarding_port->state != expected_port_state) {
1296       return ERROR_PORT_STATE_UNEXPECTED;
1297     }
1298     if (forwarding_port->peer_closed && !ignore_closed_peer) {
1299       return ERROR_PORT_PEER_CLOSED;
1300     }
1301 
1302     // Messages may already have a sequence number if they're being forwarded by
1303     // a proxy. Otherwise, use the next outgoing sequence number.
1304     if (message->sequence_num() == 0) {
1305       message->set_sequence_num(forwarding_port->next_sequence_num_to_send++);
1306     }
1307 #ifdef DEBUG
1308     std::ostringstream ports_buf;
1309     for (size_t i = 0; i < message->num_ports(); ++i) {
1310       if (i > 0) {
1311         ports_buf << ",";
1312       }
1313       ports_buf << message->ports()[i];
1314     }
1315 #endif
1316 
1317     if (message->num_ports() > 0) {
1318       // Sanity check to make sure we can actually send all the attached ports.
1319       // They must all be in the |kReceiving| state and must not be the sender's
1320       // own peer.
1321       DCHECK_EQ(message->num_ports(), attached_port_refs.Length());
1322       for (size_t i = 0; i < message->num_ports(); ++i) {
1323         auto* attached_port = locker.GetPort(attached_port_refs[i]);
1324         int error = OK;
1325         if (attached_port->state != Port::kReceiving) {
1326           error = ERROR_PORT_STATE_UNEXPECTED;
1327         } else if (attached_port_refs[i].name() ==
1328                    forwarding_port->peer_port_name) {
1329           error = ERROR_PORT_CANNOT_SEND_PEER;
1330         }
1331 
1332         if (error != OK) {
1333           // Not going to send. Backpedal on the sequence number.
1334           forwarding_port->next_sequence_num_to_send--;
1335           return error;
1336         }
1337       }
1338 
1339       if (target_is_remote) {
1340         // We only bother to proxy and rewrite ports in the event if it's
1341         // going to be routed to an external node. This substantially reduces
1342         // the amount of port churn in the system, as many port-carrying
1343         // events are routed at least 1 or 2 intra-node hops before (if ever)
1344         // being routed externally.
1345         Event::PortDescriptor* port_descriptors = message->port_descriptors();
1346         for (size_t i = 0; i < message->num_ports(); ++i) {
1347           ConvertToProxy(locker.GetPort(attached_port_refs[i]),
1348                          target_node_name, message->ports() + i,
1349                          port_descriptors + i);
1350         }
1351       }
1352     }
1353 
1354 #ifdef DEBUG
1355     DVLOG(4) << "Sending message " << message->sequence_num()
1356              << " [ports=" << ports_buf.str() << "]"
1357              << " from " << forwarding_port_ref.name() << "@" << name_ << " to "
1358              << forwarding_port->peer_port_name << "@" << target_node_name;
1359 #endif
1360 
1361     *forward_to_node = target_node_name;
1362     message->set_port_name(forwarding_port->peer_port_name);
1363     break;
1364   }
1365 
1366   if (target_is_remote) {
1367     for (size_t i = 0; i < message->num_ports(); ++i) {
1368       // For any ports that were converted to proxies above, make sure their
1369       // prior local peer (if applicable) receives a status update so it can be
1370       // made aware of its peer's location.
1371       const Event::PortDescriptor& descriptor = message->port_descriptors()[i];
1372       if (descriptor.peer_node_name == name_) {
1373         PortRef local_peer;
1374         if (GetPort(descriptor.peer_port_name, &local_peer) == OK) {
1375           delegate_->PortStatusChanged(local_peer);
1376         }
1377       }
1378     }
1379   }
1380 
1381   return OK;
1382 }
1383 
BeginProxying(const PortRef & port_ref)1384 int Node::BeginProxying(const PortRef& port_ref) {
1385   {
1386     SinglePortLocker locker(&port_ref);
1387     auto* port = locker.port();
1388     if (port->state != Port::kBuffering) {
1389       return OOPS(ERROR_PORT_STATE_UNEXPECTED);
1390     }
1391     port->state = Port::kProxying;
1392   }
1393 
1394   int rv = ForwardUserMessagesFromProxy(port_ref);
1395   if (rv != OK) {
1396     return rv;
1397   }
1398 
1399   // Forward any pending acknowledge request.
1400   MaybeForwardAckRequest(port_ref);
1401 
1402   bool try_remove_proxy_immediately;
1403   ScopedEvent closure_event;
1404   NodeName closure_target_node;
1405   {
1406     SinglePortLocker locker(&port_ref);
1407     auto* port = locker.port();
1408     if (port->state != Port::kProxying) {
1409       return OOPS(ERROR_PORT_STATE_UNEXPECTED);
1410     }
1411 
1412     try_remove_proxy_immediately = port->remove_proxy_on_last_message;
1413     if (try_remove_proxy_immediately) {
1414       // Make sure we propagate closure to our current peer.
1415       closure_target_node = port->peer_node_name;
1416       closure_event = mozilla::MakeUnique<ObserveClosureEvent>(
1417           port->peer_port_name, port->last_sequence_num_to_receive);
1418     }
1419   }
1420 
1421   if (try_remove_proxy_immediately) {
1422     TryRemoveProxy(port_ref);
1423     delegate_->ForwardEvent(closure_target_node, std::move(closure_event));
1424   } else {
1425     InitiateProxyRemoval(port_ref);
1426   }
1427 
1428   return OK;
1429 }
1430 
ForwardUserMessagesFromProxy(const PortRef & port_ref)1431 int Node::ForwardUserMessagesFromProxy(const PortRef& port_ref) {
1432   for (;;) {
1433     // NOTE: We forward messages in sequential order here so that we maintain
1434     // the message queue's notion of next sequence number. That's useful for the
1435     // proxy removal process as we can tell when this port has seen all of the
1436     // messages it is expected to see.
1437     mozilla::UniquePtr<UserMessageEvent> message;
1438     {
1439       SinglePortLocker locker(&port_ref);
1440       locker.port()->message_queue.GetNextMessage(&message, nullptr);
1441       if (!message) {
1442         break;
1443       }
1444     }
1445 
1446     NodeName target_node;
1447     int rv = PrepareToForwardUserMessage(port_ref, Port::kProxying,
1448                                          true /* ignore_closed_peer */,
1449                                          message.get(), &target_node);
1450     if (rv != OK) {
1451       return rv;
1452     }
1453 
1454     delegate_->ForwardEvent(target_node, std::move(message));
1455   }
1456   return OK;
1457 }
1458 
InitiateProxyRemoval(const PortRef & port_ref)1459 void Node::InitiateProxyRemoval(const PortRef& port_ref) {
1460   NodeName peer_node_name;
1461   PortName peer_port_name;
1462   {
1463     SinglePortLocker locker(&port_ref);
1464     auto* port = locker.port();
1465     peer_node_name = port->peer_node_name;
1466     peer_port_name = port->peer_port_name;
1467   }
1468 
1469   // To remove this node, we start by notifying the connected graph that we are
1470   // a proxy. This allows whatever port is referencing this node to skip it.
1471   // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
1472   // the peer was closed in the meantime).
1473   delegate_->ForwardEvent(peer_node_name,
1474                           mozilla::MakeUnique<ObserveProxyEvent>(
1475                               peer_port_name, name_, port_ref.name(),
1476                               peer_node_name, peer_port_name));
1477 }
1478 
TryRemoveProxy(const PortRef & port_ref)1479 void Node::TryRemoveProxy(const PortRef& port_ref) {
1480   bool should_erase = false;
1481   NodeName removal_target_node;
1482   ScopedEvent removal_event;
1483 
1484   {
1485     SinglePortLocker locker(&port_ref);
1486     auto* port = locker.port();
1487     DCHECK(port->state == Port::kProxying);
1488 
1489     // Make sure we have seen ObserveProxyAck before removing the port.
1490     if (!port->remove_proxy_on_last_message) {
1491       return;
1492     }
1493 
1494     if (!CanAcceptMoreMessages(port)) {
1495       should_erase = true;
1496       if (port->send_on_proxy_removal) {
1497         removal_target_node = port->send_on_proxy_removal->first;
1498         removal_event = std::move(port->send_on_proxy_removal->second);
1499       }
1500     } else {
1501       DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
1502                << " now; waiting for more messages";
1503     }
1504   }
1505 
1506   if (should_erase) {
1507     ErasePort(port_ref.name());
1508   }
1509 
1510   if (removal_event) {
1511     delegate_->ForwardEvent(removal_target_node, std::move(removal_event));
1512   }
1513 }
1514 
DestroyAllPortsWithPeer(const NodeName & node_name,const PortName & port_name)1515 void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
1516                                    const PortName& port_name) {
1517   // Wipes out all ports whose peer node matches |node_name| and whose peer port
1518   // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
1519   // node is matched.
1520 
1521   std::vector<PortRef> ports_to_notify;
1522   std::vector<PortName> dead_proxies_to_broadcast;
1523   std::vector<mozilla::UniquePtr<UserMessageEvent>> undelivered_messages;
1524 
1525   {
1526     PortLocker::AssertNoPortsLockedOnCurrentThread();
1527     mozilla::MutexAutoLock ports_lock(ports_lock_);
1528 
1529     auto node_peer_port_map_iter = peer_port_maps_.find(node_name);
1530     if (node_peer_port_map_iter == peer_port_maps_.end()) {
1531       return;
1532     }
1533 
1534     auto& node_peer_port_map = node_peer_port_map_iter->second;
1535     auto peer_ports_begin = node_peer_port_map.begin();
1536     auto peer_ports_end = node_peer_port_map.end();
1537     if (port_name != kInvalidPortName) {
1538       // If |port_name| is given, we limit the set of local ports to the ones
1539       // with that specific port as their peer.
1540       peer_ports_begin = node_peer_port_map.find(port_name);
1541       if (peer_ports_begin == node_peer_port_map.end()) {
1542         return;
1543       }
1544 
1545       peer_ports_end = peer_ports_begin;
1546       ++peer_ports_end;
1547     }
1548 
1549     for (auto peer_port_iter = peer_ports_begin;
1550          peer_port_iter != peer_ports_end; ++peer_port_iter) {
1551       auto& local_ports = peer_port_iter->second;
1552       // NOTE: This inner loop almost always has only one element. There are
1553       // relatively short-lived cases where more than one local port points to
1554       // the same peer, and this only happens when extra ports are bypassed
1555       // proxies waiting to be torn down.
1556       for (auto& local_port : local_ports) {
1557         auto& local_port_ref = local_port.second;
1558 
1559         SinglePortLocker locker(&local_port_ref);
1560         auto* port = locker.port();
1561 
1562         if (!port->peer_closed) {
1563           // Treat this as immediate peer closure. It's an exceptional
1564           // condition akin to a broken pipe, so we don't care about losing
1565           // messages.
1566 
1567           port->peer_closed = true;
1568           port->peer_lost_unexpectedly = true;
1569           if (port->state == Port::kReceiving) {
1570             ports_to_notify.push_back(local_port_ref);
1571           }
1572         }
1573 
1574         // We don't expect to forward any further messages, and we don't
1575         // expect to receive a Port{Accepted,Rejected} event. Because we're
1576         // a proxy with no active peer, we cannot use the normal proxy removal
1577         // procedure of forward-propagating an ObserveProxy. Instead we
1578         // broadcast our own death so it can be back-propagated. This is
1579         // inefficient but rare.
1580         if (port->state != Port::kReceiving) {
1581           dead_proxies_to_broadcast.push_back(local_port_ref.name());
1582           std::vector<mozilla::UniquePtr<UserMessageEvent>> messages;
1583           port->message_queue.TakeAllMessages(&messages);
1584           for (auto& message : messages) {
1585             undelivered_messages.emplace_back(std::move(message));
1586           }
1587         }
1588       }
1589     }
1590   }
1591 
1592   for (const auto& proxy_name : dead_proxies_to_broadcast) {
1593     ErasePort(proxy_name);
1594     DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
1595   }
1596 
1597   // Wake up any receiving ports who have just observed simulated peer closure.
1598   for (const auto& port : ports_to_notify) {
1599     delegate_->PortStatusChanged(port);
1600   }
1601 
1602   for (const auto& proxy_name : dead_proxies_to_broadcast) {
1603     // Broadcast an event signifying that this proxy is no longer functioning.
1604     delegate_->BroadcastEvent(mozilla::MakeUnique<ObserveProxyEvent>(
1605         kInvalidPortName, name_, proxy_name, kInvalidNodeName,
1606         kInvalidPortName));
1607 
1608     // Also process death locally since the port that points this closed one
1609     // could be on the current node.
1610     // Note: Although this is recursive, only a single port is involved which
1611     // limits the expected branching to 1.
1612     DestroyAllPortsWithPeer(name_, proxy_name);
1613   }
1614 
1615   // Close any ports referenced by undelivered messages.
1616   for (const auto& message : undelivered_messages) {
1617     for (size_t i = 0; i < message->num_ports(); ++i) {
1618       PortRef ref;
1619       if (GetPort(message->ports()[i], &ref) == OK) {
1620         ClosePort(ref);
1621       }
1622     }
1623   }
1624 }
1625 
UpdatePortPeerAddress(const PortName & local_port_name,Port * local_port,const NodeName & new_peer_node,const PortName & new_peer_port)1626 void Node::UpdatePortPeerAddress(const PortName& local_port_name,
1627                                  Port* local_port,
1628                                  const NodeName& new_peer_node,
1629                                  const PortName& new_peer_port) {
1630   ports_lock_.AssertCurrentThreadOwns();
1631   local_port->AssertLockAcquired();
1632 
1633   RemoveFromPeerPortMap(local_port_name, local_port);
1634   local_port->peer_node_name = new_peer_node;
1635   local_port->peer_port_name = new_peer_port;
1636   if (new_peer_port != kInvalidPortName) {
1637     peer_port_maps_[new_peer_node][new_peer_port].emplace(
1638         local_port_name, PortRef(local_port_name, RefPtr<Port>{local_port}));
1639   }
1640 }
1641 
RemoveFromPeerPortMap(const PortName & local_port_name,Port * local_port)1642 void Node::RemoveFromPeerPortMap(const PortName& local_port_name,
1643                                  Port* local_port) {
1644   if (local_port->peer_port_name == kInvalidPortName) {
1645     return;
1646   }
1647 
1648   auto node_iter = peer_port_maps_.find(local_port->peer_node_name);
1649   if (node_iter == peer_port_maps_.end()) {
1650     return;
1651   }
1652 
1653   auto& node_peer_port_map = node_iter->second;
1654   auto ports_iter = node_peer_port_map.find(local_port->peer_port_name);
1655   if (ports_iter == node_peer_port_map.end()) {
1656     return;
1657   }
1658 
1659   auto& local_ports_with_this_peer = ports_iter->second;
1660   local_ports_with_this_peer.erase(local_port_name);
1661   if (local_ports_with_this_peer.empty()) {
1662     node_peer_port_map.erase(ports_iter);
1663   }
1664   if (node_peer_port_map.empty()) {
1665     peer_port_maps_.erase(node_iter);
1666   }
1667 }
1668 
SwapPortPeers(const PortName & port0_name,Port * port0,const PortName & port1_name,Port * port1)1669 void Node::SwapPortPeers(const PortName& port0_name, Port* port0,
1670                          const PortName& port1_name, Port* port1) {
1671   ports_lock_.AssertCurrentThreadOwns();
1672   port0->AssertLockAcquired();
1673   port1->AssertLockAcquired();
1674 
1675   auto& peer0_ports =
1676       peer_port_maps_[port0->peer_node_name][port0->peer_port_name];
1677   auto& peer1_ports =
1678       peer_port_maps_[port1->peer_node_name][port1->peer_port_name];
1679   peer0_ports.erase(port0_name);
1680   peer1_ports.erase(port1_name);
1681   peer0_ports.emplace(port1_name, PortRef(port1_name, RefPtr<Port>{port1}));
1682   peer1_ports.emplace(port0_name, PortRef(port0_name, RefPtr<Port>{port0}));
1683 
1684   std::swap(port0->peer_node_name, port1->peer_node_name);
1685   std::swap(port0->peer_port_name, port1->peer_port_name);
1686 }
1687 
MaybeResendAckRequest(const PortRef & port_ref)1688 void Node::MaybeResendAckRequest(const PortRef& port_ref) {
1689   NodeName peer_node_name;
1690   ScopedEvent ack_request_event;
1691   {
1692     SinglePortLocker locker(&port_ref);
1693     auto* port = locker.port();
1694     if (port->state != Port::kReceiving) {
1695       return;
1696     }
1697 
1698     if (!port->sequence_num_acknowledge_interval) {
1699       return;
1700     }
1701 
1702     peer_node_name = port->peer_node_name;
1703     ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
1704         port->peer_port_name, port->last_sequence_num_acknowledged +
1705                                   port->sequence_num_acknowledge_interval);
1706   }
1707 
1708   delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
1709 }
1710 
MaybeForwardAckRequest(const PortRef & port_ref)1711 void Node::MaybeForwardAckRequest(const PortRef& port_ref) {
1712   NodeName peer_node_name;
1713   ScopedEvent ack_request_event;
1714   {
1715     SinglePortLocker locker(&port_ref);
1716     auto* port = locker.port();
1717     if (port->state != Port::kProxying) {
1718       return;
1719     }
1720 
1721     if (!port->sequence_num_to_acknowledge) {
1722       return;
1723     }
1724 
1725     peer_node_name = port->peer_node_name;
1726     ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
1727         port->peer_port_name, port->sequence_num_to_acknowledge);
1728 
1729     port->sequence_num_to_acknowledge = 0;
1730   }
1731 
1732   delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
1733 }
1734 
MaybeResendAck(const PortRef & port_ref)1735 void Node::MaybeResendAck(const PortRef& port_ref) {
1736   NodeName peer_node_name;
1737   ScopedEvent ack_event;
1738   {
1739     SinglePortLocker locker(&port_ref);
1740     auto* port = locker.port();
1741     if (port->state != Port::kReceiving) {
1742       return;
1743     }
1744 
1745     uint64_t last_sequence_num_read =
1746         port->message_queue.next_sequence_num() - 1;
1747     if (!port->sequence_num_to_acknowledge || !last_sequence_num_read) {
1748       return;
1749     }
1750 
1751     peer_node_name = port->peer_node_name;
1752     ack_event = mozilla::MakeUnique<UserMessageReadAckEvent>(
1753         port->peer_port_name, last_sequence_num_read);
1754   }
1755 
1756   delegate_->ForwardEvent(peer_node_name, std::move(ack_event));
1757 }
1758 
DelegateHolder(Node * node,NodeDelegate * delegate)1759 Node::DelegateHolder::DelegateHolder(Node* node, NodeDelegate* delegate)
1760     : node_(node), delegate_(delegate) {
1761   DCHECK(node_);
1762 }
1763 
1764 Node::DelegateHolder::~DelegateHolder() = default;
1765 
#ifdef DEBUG
// Debug-only check: asserts that no port is locked on the current thread and
// then takes and immediately drops the owning node's |ports_lock_|.
// NOTE(review): presumably the transient acquisition exists so that calling
// in while |ports_lock_| is already held gets caught -- confirm against the
// mutex implementation.
void Node::DelegateHolder::EnsureSafeDelegateAccess() const {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  {
    mozilla::MutexAutoLock ports_guard(node_->ports_lock_);
  }
}
#endif
1772 
1773 }  // namespace ports
1774 }  // namespace core
1775 }  // namespace mojo
1776