1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <inttypes.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 
10 #include <map>
11 #include <sstream>
12 #include <utility>
13 
14 #include "base/logging.h"
15 #include "base/waitable_event.h"
16 #include "base/thread.h"
17 #include "base/string_piece.h"
18 #include "base/string_util.h"
19 #include "mojo/core/ports/event.h"
20 #include "mojo/core/ports/node.h"
21 #include "mojo/core/ports/node_delegate.h"
22 #include "mojo/core/ports/user_message.h"
23 #include "testing/gtest/include/gtest/gtest.h"
24 
25 #include "mozilla/Mutex.h"
26 
27 namespace mojo {
28 namespace core {
29 namespace ports {
30 namespace test {
31 
32 namespace {
33 
34 // TODO(rockot): Remove this unnecessary alias.
35 using ScopedMessage = mozilla::UniquePtr<UserMessageEvent>;
36 
37 class TestMessage : public UserMessage {
38  public:
39   static const TypeInfo kUserMessageTypeInfo;
40 
TestMessage(const std::string & payload)41   explicit TestMessage(const std::string& payload)
42       : UserMessage(&kUserMessageTypeInfo), payload_(payload) {}
43   ~TestMessage() override = default;
44 
payload() const45   const std::string& payload() const { return payload_; }
46 
47  private:
48   std::string payload_;
49 };
50 
51 const UserMessage::TypeInfo TestMessage::kUserMessageTypeInfo = {};
52 
NewUserMessageEvent(const std::string & payload,size_t num_ports)53 ScopedMessage NewUserMessageEvent(const std::string& payload,
54                                   size_t num_ports) {
55   auto event = mozilla::MakeUnique<UserMessageEvent>(num_ports);
56   event->AttachMessage(mozilla::MakeUnique<TestMessage>(payload));
57   return event;
58 }
59 
MessageEquals(const ScopedMessage & message,const std::string & s)60 bool MessageEquals(const ScopedMessage& message, const std::string& s) {
61   return message->GetMessage<TestMessage>()->payload() == s;
62 }
63 
64 class TestNode;
65 
66 class MessageRouter {
67  public:
68   virtual ~MessageRouter() = default;
69 
70   virtual void ForwardEvent(TestNode* from_node, const NodeName& node_name,
71                             ScopedEvent event) = 0;
72   virtual void BroadcastEvent(TestNode* from_node, ScopedEvent event) = 0;
73 };
74 
75 class TestNode : public NodeDelegate {
76  public:
TestNode(uint64_t id)77   explicit TestNode(uint64_t id)
78       : node_name_(id, 1),
79         node_(node_name_, this),
80         node_thread_(StringPrintf("Node %" PRIu64 " thread", id).c_str()),
81         events_available_event_(/* manual_reset */ false,
82                                 /* initially_signaled */ false),
83         idle_event_(/* manual_reset */ true, /* initially_signaled */ true) {}
84 
~TestNode()85   ~TestNode() override {
86     StopWhenIdle();
87     node_thread_.Stop();
88   }
89 
name() const90   const NodeName& name() const { return node_name_; }
91 
92   // NOTE: Node is thread-safe.
node()93   Node& node() { return node_; }
94 
idle_event()95   base::WaitableEvent& idle_event() { return idle_event_; }
96 
IsIdle()97   bool IsIdle() {
98     mozilla::MutexAutoLock lock(lock_);
99     return started_ && !dispatching_ &&
100            (incoming_events_.empty() || (block_on_event_ && blocked_));
101   }
102 
BlockOnEvent(Event::Type type)103   void BlockOnEvent(Event::Type type) {
104     mozilla::MutexAutoLock lock(lock_);
105     blocked_event_type_ = type;
106     block_on_event_ = true;
107   }
108 
Unblock()109   void Unblock() {
110     mozilla::MutexAutoLock lock(lock_);
111     block_on_event_ = false;
112     events_available_event_.Signal();
113   }
114 
Start(MessageRouter * router)115   void Start(MessageRouter* router) {
116     router_ = router;
117     node_thread_.Start();
118     node_thread_.message_loop()->PostTask(mozilla::NewNonOwningRunnableMethod(
119         "TestNode::ProcessEvents", this, &TestNode::ProcessEvents));
120   }
121 
StopWhenIdle()122   void StopWhenIdle() {
123     mozilla::MutexAutoLock lock(lock_);
124     should_quit_ = true;
125     events_available_event_.Signal();
126   }
127 
WakeUp()128   void WakeUp() { events_available_event_.Signal(); }
129 
SendStringMessage(const PortRef & port,const std::string & s)130   int SendStringMessage(const PortRef& port, const std::string& s) {
131     return node_.SendUserMessage(port, NewUserMessageEvent(s, 0));
132   }
133 
SendMultipleMessages(const PortRef & port,size_t num_messages)134   int SendMultipleMessages(const PortRef& port, size_t num_messages) {
135     for (size_t i = 0; i < num_messages; ++i) {
136       int result = SendStringMessage(port, "");
137       if (result != OK) {
138         return result;
139       }
140     }
141     return OK;
142   }
143 
SendStringMessageWithPort(const PortRef & port,const std::string & s,const PortName & sent_port_name)144   int SendStringMessageWithPort(const PortRef& port, const std::string& s,
145                                 const PortName& sent_port_name) {
146     auto event = NewUserMessageEvent(s, 1);
147     event->ports()[0] = sent_port_name;
148     return node_.SendUserMessage(port, std::move(event));
149   }
150 
SendStringMessageWithPort(const PortRef & port,const std::string & s,const PortRef & sent_port)151   int SendStringMessageWithPort(const PortRef& port, const std::string& s,
152                                 const PortRef& sent_port) {
153     return SendStringMessageWithPort(port, s, sent_port.name());
154   }
155 
set_drop_messages(bool value)156   void set_drop_messages(bool value) {
157     mozilla::MutexAutoLock lock(lock_);
158     drop_messages_ = value;
159   }
160 
set_save_messages(bool value)161   void set_save_messages(bool value) {
162     mozilla::MutexAutoLock lock(lock_);
163     save_messages_ = value;
164   }
165 
ReadMessage(const PortRef & port,ScopedMessage * message)166   bool ReadMessage(const PortRef& port, ScopedMessage* message) {
167     return node_.GetMessage(port, message, nullptr) == OK && *message;
168   }
169 
ReadMultipleMessages(const PortRef & port,size_t num_messages)170   bool ReadMultipleMessages(const PortRef& port, size_t num_messages) {
171     for (size_t i = 0; i < num_messages; ++i) {
172       ScopedMessage message;
173       if (!ReadMessage(port, &message)) {
174         return false;
175       }
176     }
177     return true;
178   }
179 
GetSavedMessage(ScopedMessage * message)180   bool GetSavedMessage(ScopedMessage* message) {
181     mozilla::MutexAutoLock lock(lock_);
182     if (saved_messages_.empty()) {
183       message->reset();
184       return false;
185     }
186     std::swap(*message, saved_messages_.front());
187     saved_messages_.pop();
188     return true;
189   }
190 
EnqueueEvent(ScopedEvent event)191   void EnqueueEvent(ScopedEvent event) {
192     idle_event_.Reset();
193 
194     // NOTE: This may be called from ForwardMessage and thus must not reenter
195     // |node_|.
196     mozilla::MutexAutoLock lock(lock_);
197     incoming_events_.emplace(std::move(event));
198     events_available_event_.Signal();
199   }
200 
ForwardEvent(const NodeName & node_name,ScopedEvent event)201   void ForwardEvent(const NodeName& node_name, ScopedEvent event) override {
202     {
203       mozilla::MutexAutoLock lock(lock_);
204       if (drop_messages_) {
205         DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to "
206                  << node_name;
207 
208         mozilla::MutexAutoUnlock unlock(lock_);
209         ClosePortsInEvent(event.get());
210         return;
211       }
212     }
213 
214     DCHECK(router_);
215     DVLOG(1) << "ForwardEvent from node " << node_name_ << " to " << node_name;
216     router_->ForwardEvent(this, node_name, std::move(event));
217   }
218 
BroadcastEvent(ScopedEvent event)219   void BroadcastEvent(ScopedEvent event) override {
220     router_->BroadcastEvent(this, std::move(event));
221   }
222 
PortStatusChanged(const PortRef & port)223   void PortStatusChanged(const PortRef& port) override {
224     // The port may be closed, in which case we ignore the notification.
225     mozilla::MutexAutoLock lock(lock_);
226     if (!save_messages_) {
227       return;
228     }
229 
230     for (;;) {
231       ScopedMessage message;
232       {
233         mozilla::MutexAutoUnlock unlock(lock_);
234         if (!ReadMessage(port, &message)) {
235           break;
236         }
237       }
238 
239       saved_messages_.emplace(std::move(message));
240     }
241   }
242 
ClosePortsInEvent(Event * event)243   void ClosePortsInEvent(Event* event) {
244     if (event->type() != Event::Type::kUserMessage) {
245       return;
246     }
247 
248     UserMessageEvent* message_event = static_cast<UserMessageEvent*>(event);
249     for (size_t i = 0; i < message_event->num_ports(); ++i) {
250       PortRef port;
251       ASSERT_EQ(OK, node_.GetPort(message_event->ports()[i], &port));
252       EXPECT_EQ(OK, node_.ClosePort(port));
253     }
254   }
255 
GetUnacknowledgedMessageCount(const PortRef & port_ref)256   uint64_t GetUnacknowledgedMessageCount(const PortRef& port_ref) {
257     PortStatus status{};
258     if (node_.GetStatus(port_ref, &status) != OK) {
259       return 0;
260     }
261 
262     return status.unacknowledged_message_count;
263   }
264 
265  private:
ProcessEvents()266   void ProcessEvents() {
267     for (;;) {
268       events_available_event_.Wait();
269       mozilla::MutexAutoLock lock(lock_);
270 
271       if (should_quit_) {
272         return;
273       }
274 
275       dispatching_ = true;
276       while (!incoming_events_.empty()) {
277         if (block_on_event_ &&
278             incoming_events_.front()->type() == blocked_event_type_) {
279           blocked_ = true;
280           // Go idle if we hit a blocked event type.
281           break;
282         }
283         blocked_ = false;
284 
285         ScopedEvent event = std::move(incoming_events_.front());
286         incoming_events_.pop();
287 
288         // NOTE: AcceptMessage() can re-enter this object to call any of the
289         // NodeDelegate interface methods.
290         mozilla::MutexAutoUnlock unlock(lock_);
291         node_.AcceptEvent(std::move(event));
292       }
293 
294       dispatching_ = false;
295       started_ = true;
296       idle_event_.Signal();
297     };
298   }
299 
300   const NodeName node_name_;
301   Node node_;
302   MessageRouter* router_ = nullptr;
303 
304   base::Thread node_thread_;
305   base::WaitableEvent events_available_event_;
306   base::WaitableEvent idle_event_;
307 
308   // Guards fields below.
309   mozilla::Mutex lock_{"TestNode"};
310   bool started_ = false;
311   bool dispatching_ = false;
312   bool should_quit_ = false;
313   bool drop_messages_ = false;
314   bool save_messages_ = false;
315   bool blocked_ = false;
316   bool block_on_event_ = false;
317   Event::Type blocked_event_type_{};
318   std::queue<ScopedEvent> incoming_events_;
319   std::queue<ScopedMessage> saved_messages_;
320 };
321 
322 class PortsTest : public testing::Test, public MessageRouter {
323  public:
AddNode(TestNode * node)324   void AddNode(TestNode* node) {
325     {
326       mozilla::MutexAutoLock lock(lock_);
327       nodes_[node->name()] = node;
328     }
329     node->Start(this);
330   }
331 
RemoveNode(TestNode * node)332   void RemoveNode(TestNode* node) {
333     {
334       mozilla::MutexAutoLock lock(lock_);
335       nodes_.erase(node->name());
336     }
337 
338     for (const auto& entry : nodes_) {
339       entry.second->node().LostConnectionToNode(node->name());
340     }
341   }
342 
343   // Waits until all known Nodes are idle. Message forwarding and processing
344   // is handled in such a way that idleness is a stable state: once all nodes in
345   // the system are idle, they will remain idle until the test explicitly
346   // initiates some further event (e.g. sending a message, closing a port, or
347   // removing a Node).
WaitForIdle()348   void WaitForIdle() {
349     for (;;) {
350       mozilla::MutexAutoLock global_lock(global_lock_);
351       bool all_nodes_idle = true;
352       for (const auto& entry : nodes_) {
353         if (!entry.second->IsIdle()) {
354           all_nodes_idle = false;
355         }
356         entry.second->WakeUp();
357       }
358       if (all_nodes_idle) {
359         return;
360       }
361 
362       // Wait for any Node to signal that it's idle.
363       mozilla::MutexAutoUnlock global_unlock(global_lock_);
364       std::vector<base::WaitableEvent*> events;
365       for (const auto& entry : nodes_) {
366         events.push_back(&entry.second->idle_event());
367       }
368       base::WaitableEvent::WaitMany(events.data(), events.size());
369     }
370   }
371 
CreatePortPair(TestNode * node0,PortRef * port0,TestNode * node1,PortRef * port1)372   void CreatePortPair(TestNode* node0, PortRef* port0, TestNode* node1,
373                       PortRef* port1) {
374     if (node0 == node1) {
375       EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1));
376     } else {
377       EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0));
378       EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1));
379       EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(),
380                                                  port1->name()));
381       EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(),
382                                                  port0->name()));
383     }
384   }
385 
386  private:
387   // MessageRouter:
ForwardEvent(TestNode * from_node,const NodeName & node_name,ScopedEvent event)388   void ForwardEvent(TestNode* from_node, const NodeName& node_name,
389                     ScopedEvent event) override {
390     mozilla::MutexAutoLock global_lock(global_lock_);
391     mozilla::MutexAutoLock lock(lock_);
392     // Drop messages from nodes that have been removed.
393     if (nodes_.find(from_node->name()) == nodes_.end()) {
394       from_node->ClosePortsInEvent(event.get());
395       return;
396     }
397 
398     auto it = nodes_.find(node_name);
399     if (it == nodes_.end()) {
400       DVLOG(1) << "Node not found: " << node_name;
401       return;
402     }
403 
404     // Serialize and de-serialize all forwarded events.
405     size_t buf_size = event->GetSerializedSize();
406     mozilla::UniquePtr<char[]> buf(new char[buf_size]);
407     event->Serialize(buf.get());
408     ScopedEvent copy = Event::Deserialize(buf.get(), buf_size);
409     // This should always succeed unless serialization or deserialization
410     // is broken. In that case, the loss of events should cause a test failure.
411     ASSERT_TRUE(copy);
412 
413     // Also copy the payload for user messages.
414     if (event->type() == Event::Type::kUserMessage) {
415       UserMessageEvent* message_event =
416           static_cast<UserMessageEvent*>(event.get());
417       UserMessageEvent* message_copy =
418           static_cast<UserMessageEvent*>(copy.get());
419 
420       message_copy->AttachMessage(mozilla::MakeUnique<TestMessage>(
421           message_event->GetMessage<TestMessage>()->payload()));
422     }
423 
424     it->second->EnqueueEvent(std::move(event));
425   }
426 
BroadcastEvent(TestNode * from_node,ScopedEvent event)427   void BroadcastEvent(TestNode* from_node, ScopedEvent event) override {
428     mozilla::MutexAutoLock global_lock(global_lock_);
429     mozilla::MutexAutoLock lock(lock_);
430 
431     // Drop messages from nodes that have been removed.
432     if (nodes_.find(from_node->name()) == nodes_.end()) {
433       return;
434     }
435 
436     for (const auto& entry : nodes_) {
437       TestNode* node = entry.second;
438       // Broadcast doesn't deliver to the local node.
439       if (node == from_node) {
440         continue;
441       }
442       node->EnqueueEvent(event->Clone());
443     }
444   }
445 
446   // Acquired before any operation which makes a Node busy, and before testing
447   // if all nodes are idle.
448   mozilla::Mutex global_lock_{"PortsTest Global Lock"};
449 
450   mozilla::Mutex lock_{"PortsTest Lock"};
451   std::map<NodeName, TestNode*> nodes_;
452 };
453 
454 }  // namespace
455 
TEST_F(PortsTest,Basic1)456 TEST_F(PortsTest, Basic1) {
457   TestNode node0(0);
458   AddNode(&node0);
459 
460   TestNode node1(1);
461   AddNode(&node1);
462 
463   PortRef x0, x1;
464   CreatePortPair(&node0, &x0, &node1, &x1);
465 
466   PortRef a0, a1;
467   EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
468   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
469   EXPECT_EQ(OK, node0.node().ClosePort(a0));
470 
471   EXPECT_EQ(OK, node0.node().ClosePort(x0));
472   EXPECT_EQ(OK, node1.node().ClosePort(x1));
473 
474   WaitForIdle();
475 
476   EXPECT_TRUE(node0.node().CanShutdownCleanly());
477   EXPECT_TRUE(node1.node().CanShutdownCleanly());
478 }
479 
TEST_F(PortsTest,Basic2)480 TEST_F(PortsTest, Basic2) {
481   TestNode node0(0);
482   AddNode(&node0);
483 
484   TestNode node1(1);
485   AddNode(&node1);
486 
487   PortRef x0, x1;
488   CreatePortPair(&node0, &x0, &node1, &x1);
489 
490   PortRef b0, b1;
491   EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
492   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1));
493   EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again"));
494 
495   EXPECT_EQ(OK, node0.node().ClosePort(b0));
496 
497   EXPECT_EQ(OK, node0.node().ClosePort(x0));
498   EXPECT_EQ(OK, node1.node().ClosePort(x1));
499 
500   WaitForIdle();
501 
502   EXPECT_TRUE(node0.node().CanShutdownCleanly());
503   EXPECT_TRUE(node1.node().CanShutdownCleanly());
504 }
505 
TEST_F(PortsTest,Basic3)506 TEST_F(PortsTest, Basic3) {
507   TestNode node0(0);
508   AddNode(&node0);
509 
510   TestNode node1(1);
511   AddNode(&node1);
512 
513   PortRef x0, x1;
514   CreatePortPair(&node0, &x0, &node1, &x1);
515 
516   PortRef a0, a1;
517   EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
518 
519   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
520   EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again"));
521 
522   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0));
523 
524   PortRef b0, b1;
525   EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
526   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1));
527   EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz"));
528 
529   EXPECT_EQ(OK, node0.node().ClosePort(b0));
530 
531   EXPECT_EQ(OK, node0.node().ClosePort(x0));
532   EXPECT_EQ(OK, node1.node().ClosePort(x1));
533 
534   WaitForIdle();
535 
536   EXPECT_TRUE(node0.node().CanShutdownCleanly());
537   EXPECT_TRUE(node1.node().CanShutdownCleanly());
538 }
539 
TEST_F(PortsTest,LostConnectionToNode1)540 TEST_F(PortsTest, LostConnectionToNode1) {
541   TestNode node0(0);
542   AddNode(&node0);
543 
544   TestNode node1(1);
545   AddNode(&node1);
546   node1.set_drop_messages(true);
547 
548   PortRef x0, x1;
549   CreatePortPair(&node0, &x0, &node1, &x1);
550 
551   // Transfer a port to node1 and simulate a lost connection to node1.
552 
553   PortRef a0, a1;
554   EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
555   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
556 
557   WaitForIdle();
558 
559   RemoveNode(&node1);
560 
561   WaitForIdle();
562 
563   EXPECT_EQ(OK, node0.node().ClosePort(a0));
564   EXPECT_EQ(OK, node0.node().ClosePort(x0));
565   EXPECT_EQ(OK, node1.node().ClosePort(x1));
566 
567   WaitForIdle();
568 
569   EXPECT_TRUE(node0.node().CanShutdownCleanly());
570   EXPECT_TRUE(node1.node().CanShutdownCleanly());
571 }
572 
TEST_F(PortsTest,LostConnectionToNode2)573 TEST_F(PortsTest, LostConnectionToNode2) {
574   TestNode node0(0);
575   AddNode(&node0);
576 
577   TestNode node1(1);
578   AddNode(&node1);
579 
580   PortRef x0, x1;
581   CreatePortPair(&node0, &x0, &node1, &x1);
582 
583   PortRef a0, a1;
584   EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
585   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1));
586 
587   WaitForIdle();
588 
589   node1.set_drop_messages(true);
590 
591   RemoveNode(&node1);
592 
593   WaitForIdle();
594 
595   // a0 should have eventually detected peer closure after node loss.
596   ScopedMessage message;
597   EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
598             node0.node().GetMessage(a0, &message, nullptr));
599   EXPECT_FALSE(message);
600 
601   EXPECT_EQ(OK, node0.node().ClosePort(a0));
602 
603   EXPECT_EQ(OK, node0.node().ClosePort(x0));
604 
605   EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr));
606   EXPECT_TRUE(message);
607   node1.ClosePortsInEvent(message.get());
608 
609   EXPECT_EQ(OK, node1.node().ClosePort(x1));
610 
611   WaitForIdle();
612 
613   EXPECT_TRUE(node0.node().CanShutdownCleanly());
614   EXPECT_TRUE(node1.node().CanShutdownCleanly());
615 }
616 
TEST_F(PortsTest,LostConnectionToNodeWithSecondaryProxy)617 TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
618   // Tests that a proxy gets cleaned up when its indirect peer lives on a lost
619   // node.
620 
621   TestNode node0(0);
622   AddNode(&node0);
623 
624   TestNode node1(1);
625   AddNode(&node1);
626 
627   TestNode node2(2);
628   AddNode(&node2);
629 
630   // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
631   PortRef A, B, C, D;
632   CreatePortPair(&node0, &A, &node1, &B);
633   CreatePortPair(&node1, &C, &node2, &D);
634 
635   // Create E-F and send F over A to node 1.
636   PortRef E, F;
637   EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
638   EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));
639 
640   WaitForIdle();
641 
642   ScopedMessage message;
643   ASSERT_TRUE(node1.ReadMessage(B, &message));
644   ASSERT_EQ(1u, message->num_ports());
645 
646   EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));
647 
648   // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
649   // will trivially become aware of the loss, and this test verifies that the
650   // port A on node 0 will eventually also become aware of it.
651 
652   // Make sure node2 stops processing events when it encounters an ObserveProxy.
653   node2.BlockOnEvent(Event::Type::kObserveProxy);
654 
655   EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
656   WaitForIdle();
657 
658   // Simulate node 1 and 2 disconnecting.
659   EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));
660 
661   // Let node2 continue processing events and wait for everyone to go idle.
662   node2.Unblock();
663   WaitForIdle();
664 
665   // Port F should be gone.
666   EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));
667 
668   // Port E should have detected peer closure despite the fact that there is
669   // no longer a continuous route from F to E over which the event could travel.
670   PortStatus status{};
671   EXPECT_EQ(OK, node0.node().GetStatus(E, &status));
672   EXPECT_TRUE(status.peer_closed);
673 
674   EXPECT_EQ(OK, node0.node().ClosePort(A));
675   EXPECT_EQ(OK, node1.node().ClosePort(B));
676   EXPECT_EQ(OK, node1.node().ClosePort(C));
677   EXPECT_EQ(OK, node0.node().ClosePort(E));
678 
679   WaitForIdle();
680 
681   EXPECT_TRUE(node0.node().CanShutdownCleanly());
682   EXPECT_TRUE(node1.node().CanShutdownCleanly());
683 }
684 
TEST_F(PortsTest,LostConnectionToNodeWithLocalProxy)685 TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
686   // Tests that a proxy gets cleaned up when its direct peer lives on a lost
687   // node and it's predecessor lives on the same node.
688 
689   TestNode node0(0);
690   AddNode(&node0);
691 
692   TestNode node1(1);
693   AddNode(&node1);
694 
695   PortRef A, B;
696   CreatePortPair(&node0, &A, &node1, &B);
697 
698   PortRef C, D;
699   EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
700 
701   // Send D but block node0 on an ObserveProxy event.
702   node0.BlockOnEvent(Event::Type::kObserveProxy);
703   EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D));
704 
705   // node0 won't collapse the proxy but node1 will receive the message before
706   // going idle.
707   WaitForIdle();
708 
709   ScopedMessage message;
710   ASSERT_TRUE(node1.ReadMessage(B, &message));
711   ASSERT_EQ(1u, message->num_ports());
712   PortRef E;
713   EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));
714 
715   RemoveNode(&node1);
716 
717   node0.Unblock();
718   WaitForIdle();
719 
720   // Port C should have detected peer closure.
721   PortStatus status{};
722   EXPECT_EQ(OK, node0.node().GetStatus(C, &status));
723   EXPECT_TRUE(status.peer_closed);
724 
725   EXPECT_EQ(OK, node0.node().ClosePort(A));
726   EXPECT_EQ(OK, node1.node().ClosePort(B));
727   EXPECT_EQ(OK, node0.node().ClosePort(C));
728   EXPECT_EQ(OK, node1.node().ClosePort(E));
729 
730   EXPECT_TRUE(node0.node().CanShutdownCleanly());
731   EXPECT_TRUE(node1.node().CanShutdownCleanly());
732 }
733 
TEST_F(PortsTest,GetMessage1)734 TEST_F(PortsTest, GetMessage1) {
735   TestNode node(0);
736   AddNode(&node);
737 
738   PortRef a0, a1;
739   EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
740 
741   ScopedMessage message;
742   EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
743   EXPECT_FALSE(message);
744 
745   EXPECT_EQ(OK, node.node().ClosePort(a1));
746 
747   WaitForIdle();
748 
749   EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
750             node.node().GetMessage(a0, &message, nullptr));
751   EXPECT_FALSE(message);
752 
753   EXPECT_EQ(OK, node.node().ClosePort(a0));
754 
755   WaitForIdle();
756 
757   EXPECT_TRUE(node.node().CanShutdownCleanly());
758 }
759 
TEST_F(PortsTest,GetMessage2)760 TEST_F(PortsTest, GetMessage2) {
761   TestNode node(0);
762   AddNode(&node);
763 
764   PortRef a0, a1;
765   EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
766 
767   EXPECT_EQ(OK, node.SendStringMessage(a1, "1"));
768 
769   ScopedMessage message;
770   EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
771 
772   ASSERT_TRUE(message);
773   EXPECT_TRUE(MessageEquals(message, "1"));
774 
775   EXPECT_EQ(OK, node.node().ClosePort(a0));
776   EXPECT_EQ(OK, node.node().ClosePort(a1));
777 
778   EXPECT_TRUE(node.node().CanShutdownCleanly());
779 }
780 
TEST_F(PortsTest,GetMessage3)781 TEST_F(PortsTest, GetMessage3) {
782   TestNode node(0);
783   AddNode(&node);
784 
785   PortRef a0, a1;
786   EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
787 
788   const char* kStrings[] = {"1", "2", "3"};
789 
790   for (auto& kString : kStrings) {
791     EXPECT_EQ(OK, node.SendStringMessage(a1, kString));
792   }
793 
794   ScopedMessage message;
795   for (auto& kString : kStrings) {
796     EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
797     ASSERT_TRUE(message);
798     EXPECT_TRUE(MessageEquals(message, kString));
799   }
800 
801   EXPECT_EQ(OK, node.node().ClosePort(a0));
802   EXPECT_EQ(OK, node.node().ClosePort(a1));
803 
804   EXPECT_TRUE(node.node().CanShutdownCleanly());
805 }
806 
TEST_F(PortsTest,Delegation1)807 TEST_F(PortsTest, Delegation1) {
808   TestNode node0(0);
809   AddNode(&node0);
810 
811   TestNode node1(1);
812   AddNode(&node1);
813 
814   PortRef x0, x1;
815   CreatePortPair(&node0, &x0, &node1, &x1);
816 
817   // In this test, we send a message to a port that has been moved.
818 
819   PortRef a0, a1;
820   EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
821   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1));
822   WaitForIdle();
823 
824   ScopedMessage message;
825   ASSERT_TRUE(node1.ReadMessage(x1, &message));
826   ASSERT_EQ(1u, message->num_ports());
827   EXPECT_TRUE(MessageEquals(message, "a1"));
828 
829   // This is "a1" from the point of view of node1.
830   PortName a2_name = message->ports()[0];
831   EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name));
832   EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello"));
833 
834   WaitForIdle();
835 
836   ASSERT_TRUE(node0.ReadMessage(x0, &message));
837   ASSERT_EQ(1u, message->num_ports());
838   EXPECT_TRUE(MessageEquals(message, "a2"));
839 
840   // This is "a2" from the point of view of node1.
841   PortName a3_name = message->ports()[0];
842 
843   PortRef a3;
844   EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3));
845 
846   ASSERT_TRUE(node0.ReadMessage(a3, &message));
847   EXPECT_EQ(0u, message->num_ports());
848   EXPECT_TRUE(MessageEquals(message, "hello"));
849 
850   EXPECT_EQ(OK, node0.node().ClosePort(a0));
851   EXPECT_EQ(OK, node0.node().ClosePort(a3));
852 
853   EXPECT_EQ(OK, node0.node().ClosePort(x0));
854   EXPECT_EQ(OK, node1.node().ClosePort(x1));
855 
856   EXPECT_TRUE(node0.node().CanShutdownCleanly());
857   EXPECT_TRUE(node1.node().CanShutdownCleanly());
858 }
859 
TEST_F(PortsTest,Delegation2)860 TEST_F(PortsTest, Delegation2) {
861   TestNode node0(0);
862   AddNode(&node0);
863 
864   TestNode node1(1);
865   AddNode(&node1);
866 
867   for (int i = 0; i < 100; ++i) {
868     // Setup pipe a<->b between node0 and node1.
869     PortRef A, B;
870     CreatePortPair(&node0, &A, &node1, &B);
871 
872     PortRef C, D;
873     EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
874 
875     PortRef E, F;
876     EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
877 
878     node1.set_save_messages(true);
879 
880     // Pass D over A to B.
881     EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D));
882 
883     // Pass F over C to D.
884     EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F));
885 
886     // This message should find its way to node1.
887     EXPECT_EQ(OK, node0.SendStringMessage(E, "hello"));
888 
889     WaitForIdle();
890 
891     EXPECT_EQ(OK, node0.node().ClosePort(C));
892     EXPECT_EQ(OK, node0.node().ClosePort(E));
893 
894     EXPECT_EQ(OK, node0.node().ClosePort(A));
895     EXPECT_EQ(OK, node1.node().ClosePort(B));
896 
897     bool got_hello = false;
898     ScopedMessage message;
899     while (node1.GetSavedMessage(&message)) {
900       node1.ClosePortsInEvent(message.get());
901       if (MessageEquals(message, "hello")) {
902         got_hello = true;
903         break;
904       }
905     }
906 
907     EXPECT_TRUE(got_hello);
908 
909     WaitForIdle();  // Because closing ports may have generated tasks.
910   }
911 
912   EXPECT_TRUE(node0.node().CanShutdownCleanly());
913   EXPECT_TRUE(node1.node().CanShutdownCleanly());
914 }
915 
TEST_F(PortsTest,SendUninitialized)916 TEST_F(PortsTest, SendUninitialized) {
917   TestNode node(0);
918   AddNode(&node);
919 
920   PortRef x0;
921   EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0));
922   EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops"));
923   EXPECT_EQ(OK, node.node().ClosePort(x0));
924   EXPECT_TRUE(node.node().CanShutdownCleanly());
925 }
926 
TEST_F(PortsTest,SendFailure)927 TEST_F(PortsTest, SendFailure) {
928   TestNode node(0);
929   AddNode(&node);
930 
931   node.set_save_messages(true);
932 
933   PortRef A, B;
934   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
935 
936   // Try to send A over itself.
937 
938   EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
939             node.SendStringMessageWithPort(A, "oops", A));
940 
941   // Try to send B over A.
942 
943   EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
944             node.SendStringMessageWithPort(A, "nope", B));
945 
946   // B should be closed immediately.
947   EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B));
948 
949   WaitForIdle();
950 
951   // There should have been no messages accepted.
952   ScopedMessage message;
953   EXPECT_FALSE(node.GetSavedMessage(&message));
954 
955   EXPECT_EQ(OK, node.node().ClosePort(A));
956 
957   WaitForIdle();
958 
959   EXPECT_TRUE(node.node().CanShutdownCleanly());
960 }
961 
TEST_F(PortsTest,DontLeakUnreceivedPorts)962 TEST_F(PortsTest, DontLeakUnreceivedPorts) {
963   TestNode node(0);
964   AddNode(&node);
965 
966   PortRef A, B, C, D;
967   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
968   EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
969 
970   EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
971 
972   EXPECT_EQ(OK, node.node().ClosePort(C));
973   EXPECT_EQ(OK, node.node().ClosePort(A));
974   EXPECT_EQ(OK, node.node().ClosePort(B));
975 
976   WaitForIdle();
977 
978   EXPECT_TRUE(node.node().CanShutdownCleanly());
979 }
980 
TEST_F(PortsTest,AllowShutdownWithLocalPortsOpen)981 TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
982   TestNode node(0);
983   AddNode(&node);
984 
985   PortRef A, B, C, D;
986   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
987   EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
988 
989   EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
990 
991   ScopedMessage message;
992   EXPECT_TRUE(node.ReadMessage(B, &message));
993   ASSERT_EQ(1u, message->num_ports());
994   EXPECT_TRUE(MessageEquals(message, "foo"));
995   PortRef E;
996   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
997 
998   EXPECT_TRUE(
999       node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1000 
1001   WaitForIdle();
1002 
1003   EXPECT_TRUE(
1004       node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1005   EXPECT_FALSE(node.node().CanShutdownCleanly());
1006 
1007   EXPECT_EQ(OK, node.node().ClosePort(A));
1008   EXPECT_EQ(OK, node.node().ClosePort(B));
1009   EXPECT_EQ(OK, node.node().ClosePort(C));
1010   EXPECT_EQ(OK, node.node().ClosePort(E));
1011 
1012   WaitForIdle();
1013 
1014   EXPECT_TRUE(node.node().CanShutdownCleanly());
1015 }
1016 
TEST_F(PortsTest,ProxyCollapse1)1017 TEST_F(PortsTest, ProxyCollapse1) {
1018   TestNode node(0);
1019   AddNode(&node);
1020 
1021   PortRef A, B;
1022   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1023 
1024   PortRef X, Y;
1025   EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1026 
1027   ScopedMessage message;
1028 
1029   // Send B and receive it as C.
1030   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1031   ASSERT_TRUE(node.ReadMessage(Y, &message));
1032   ASSERT_EQ(1u, message->num_ports());
1033   PortRef C;
1034   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
1035 
1036   // Send C and receive it as D.
1037   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
1038   ASSERT_TRUE(node.ReadMessage(Y, &message));
1039   ASSERT_EQ(1u, message->num_ports());
1040   PortRef D;
1041   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
1042 
1043   // Send D and receive it as E.
1044   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D));
1045   ASSERT_TRUE(node.ReadMessage(Y, &message));
1046   ASSERT_EQ(1u, message->num_ports());
1047   PortRef E;
1048   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
1049 
1050   EXPECT_EQ(OK, node.node().ClosePort(X));
1051   EXPECT_EQ(OK, node.node().ClosePort(Y));
1052 
1053   EXPECT_EQ(OK, node.node().ClosePort(A));
1054   EXPECT_EQ(OK, node.node().ClosePort(E));
1055 
1056   // The node should not idle until all proxies are collapsed.
1057   WaitForIdle();
1058 
1059   EXPECT_TRUE(node.node().CanShutdownCleanly());
1060 }
1061 
TEST_F(PortsTest,ProxyCollapse2)1062 TEST_F(PortsTest, ProxyCollapse2) {
1063   TestNode node(0);
1064   AddNode(&node);
1065 
1066   PortRef A, B;
1067   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1068 
1069   PortRef X, Y;
1070   EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1071 
1072   ScopedMessage message;
1073 
1074   // Send B and A to create proxies in each direction.
1075   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1076   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
1077 
1078   EXPECT_EQ(OK, node.node().ClosePort(X));
1079   EXPECT_EQ(OK, node.node().ClosePort(Y));
1080 
1081   // At this point we have a scenario with:
1082   //
1083   // D -> [B] -> C -> [A]
1084   //
1085   // Ensure that the proxies can collapse. The sent ports will be closed
1086   // eventually as a result of Y's closure.
1087 
1088   WaitForIdle();
1089 
1090   EXPECT_TRUE(node.node().CanShutdownCleanly());
1091 }
1092 
TEST_F(PortsTest,SendWithClosedPeer)1093 TEST_F(PortsTest, SendWithClosedPeer) {
1094   // This tests that if a port is sent when its peer is already known to be
1095   // closed, the newly created port will be aware of that peer closure, and the
1096   // proxy will eventually collapse.
1097 
1098   TestNode node(0);
1099   AddNode(&node);
1100 
1101   // Send a message from A to B, then close A.
1102   PortRef A, B;
1103   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1104   EXPECT_EQ(OK, node.SendStringMessage(A, "hey"));
1105   EXPECT_EQ(OK, node.node().ClosePort(A));
1106 
1107   // Now send B over X-Y as new port C.
1108   PortRef X, Y;
1109   EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1110   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1111   ScopedMessage message;
1112   ASSERT_TRUE(node.ReadMessage(Y, &message));
1113   ASSERT_EQ(1u, message->num_ports());
1114   PortRef C;
1115   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
1116 
1117   EXPECT_EQ(OK, node.node().ClosePort(X));
1118   EXPECT_EQ(OK, node.node().ClosePort(Y));
1119 
1120   WaitForIdle();
1121 
1122   // C should have received the message originally sent to B, and it should also
1123   // be aware of A's closure.
1124 
1125   ASSERT_TRUE(node.ReadMessage(C, &message));
1126   EXPECT_TRUE(MessageEquals(message, "hey"));
1127 
1128   PortStatus status{};
1129   EXPECT_EQ(OK, node.node().GetStatus(C, &status));
1130   EXPECT_FALSE(status.receiving_messages);
1131   EXPECT_FALSE(status.has_messages);
1132   EXPECT_TRUE(status.peer_closed);
1133 
1134   node.node().ClosePort(C);
1135 
1136   WaitForIdle();
1137 
1138   EXPECT_TRUE(node.node().CanShutdownCleanly());
1139 }
1140 
TEST_F(PortsTest,SendWithClosedPeerSent)1141 TEST_F(PortsTest, SendWithClosedPeerSent) {
1142   // This tests that if a port is closed while some number of proxies are still
1143   // routing messages (directly or indirectly) to it, that the peer port is
1144   // eventually notified of the closure, and the dead-end proxies will
1145   // eventually be removed.
1146 
1147   TestNode node(0);
1148   AddNode(&node);
1149 
1150   PortRef X, Y;
1151   EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1152 
1153   PortRef A, B;
1154   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1155 
1156   ScopedMessage message;
1157 
1158   // Send A as new port C.
1159   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
1160 
1161   ASSERT_TRUE(node.ReadMessage(Y, &message));
1162   ASSERT_EQ(1u, message->num_ports());
1163   PortRef C;
1164   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
1165 
1166   // Send C as new port D.
1167   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
1168 
1169   ASSERT_TRUE(node.ReadMessage(Y, &message));
1170   ASSERT_EQ(1u, message->num_ports());
1171   PortRef D;
1172   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
1173 
1174   // Send a message to B through D, then close D.
1175   EXPECT_EQ(OK, node.SendStringMessage(D, "hey"));
1176   EXPECT_EQ(OK, node.node().ClosePort(D));
1177 
1178   // Now send B as new port E.
1179 
1180   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1181   EXPECT_EQ(OK, node.node().ClosePort(X));
1182 
1183   ASSERT_TRUE(node.ReadMessage(Y, &message));
1184   ASSERT_EQ(1u, message->num_ports());
1185   PortRef E;
1186   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
1187 
1188   EXPECT_EQ(OK, node.node().ClosePort(Y));
1189 
1190   WaitForIdle();
1191 
1192   // E should receive the message originally sent to B, and it should also be
1193   // aware of D's closure.
1194 
1195   ASSERT_TRUE(node.ReadMessage(E, &message));
1196   EXPECT_TRUE(MessageEquals(message, "hey"));
1197 
1198   PortStatus status{};
1199   EXPECT_EQ(OK, node.node().GetStatus(E, &status));
1200   EXPECT_FALSE(status.receiving_messages);
1201   EXPECT_FALSE(status.has_messages);
1202   EXPECT_TRUE(status.peer_closed);
1203 
1204   EXPECT_EQ(OK, node.node().ClosePort(E));
1205 
1206   WaitForIdle();
1207 
1208   EXPECT_TRUE(node.node().CanShutdownCleanly());
1209 }
1210 
TEST_F(PortsTest,MergePorts)1211 TEST_F(PortsTest, MergePorts) {
1212   TestNode node0(0);
1213   AddNode(&node0);
1214 
1215   TestNode node1(1);
1216   AddNode(&node1);
1217 
1218   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1219   PortRef A, B, C, D;
1220   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1221   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1222 
1223   // Write a message on A.
1224   EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
1225 
1226   // Initiate a merge between B and C.
1227   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1228 
1229   WaitForIdle();
1230 
1231   // Expect all proxies to be gone once idle.
1232   EXPECT_TRUE(
1233       node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1234   EXPECT_TRUE(
1235       node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1236 
1237   // Expect D to have received the message sent on A.
1238   ScopedMessage message;
1239   ASSERT_TRUE(node1.ReadMessage(D, &message));
1240   EXPECT_TRUE(MessageEquals(message, "hey"));
1241 
1242   EXPECT_EQ(OK, node0.node().ClosePort(A));
1243   EXPECT_EQ(OK, node1.node().ClosePort(D));
1244 
1245   // No more ports should be open.
1246   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1247   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1248 }
1249 
TEST_F(PortsTest,MergePortWithClosedPeer1)1250 TEST_F(PortsTest, MergePortWithClosedPeer1) {
1251   // This tests that the right thing happens when initiating a merge on a port
1252   // whose peer has already been closed.
1253 
1254   TestNode node0(0);
1255   AddNode(&node0);
1256 
1257   TestNode node1(1);
1258   AddNode(&node1);
1259 
1260   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1261   PortRef A, B, C, D;
1262   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1263   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1264 
1265   // Write a message on A.
1266   EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
1267 
1268   // Close A.
1269   EXPECT_EQ(OK, node0.node().ClosePort(A));
1270 
1271   // Initiate a merge between B and C.
1272   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1273 
1274   WaitForIdle();
1275 
1276   // Expect all proxies to be gone once idle. node0 should have no ports since
1277   // A was explicitly closed.
1278   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1279   EXPECT_TRUE(
1280       node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1281 
1282   // Expect D to have received the message sent on A.
1283   ScopedMessage message;
1284   ASSERT_TRUE(node1.ReadMessage(D, &message));
1285   EXPECT_TRUE(MessageEquals(message, "hey"));
1286 
1287   EXPECT_EQ(OK, node1.node().ClosePort(D));
1288 
1289   // No more ports should be open.
1290   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1291   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1292 }
1293 
TEST_F(PortsTest,MergePortWithClosedPeer2)1294 TEST_F(PortsTest, MergePortWithClosedPeer2) {
1295   // This tests that the right thing happens when merging into a port whose peer
1296   // has already been closed.
1297 
1298   TestNode node0(0);
1299   AddNode(&node0);
1300 
1301   TestNode node1(1);
1302   AddNode(&node1);
1303 
1304   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1305   PortRef A, B, C, D;
1306   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1307   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1308 
1309   // Write a message on D and close it.
1310   EXPECT_EQ(OK, node0.SendStringMessage(D, "hey"));
1311   EXPECT_EQ(OK, node1.node().ClosePort(D));
1312 
1313   // Initiate a merge between B and C.
1314   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1315 
1316   WaitForIdle();
1317 
1318   // Expect all proxies to be gone once idle. node1 should have no ports since
1319   // D was explicitly closed.
1320   EXPECT_TRUE(
1321       node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1322   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1323 
1324   // Expect A to have received the message sent on D.
1325   ScopedMessage message;
1326   ASSERT_TRUE(node0.ReadMessage(A, &message));
1327   EXPECT_TRUE(MessageEquals(message, "hey"));
1328 
1329   EXPECT_EQ(OK, node0.node().ClosePort(A));
1330 
1331   // No more ports should be open.
1332   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1333   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1334 }
1335 
TEST_F(PortsTest,MergePortsWithClosedPeers)1336 TEST_F(PortsTest, MergePortsWithClosedPeers) {
1337   // This tests that no residual ports are left behind if two ports are merged
1338   // when both of their peers have been closed.
1339 
1340   TestNode node0(0);
1341   AddNode(&node0);
1342 
1343   TestNode node1(1);
1344   AddNode(&node1);
1345 
1346   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1347   PortRef A, B, C, D;
1348   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1349   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1350 
1351   // Close A and D.
1352   EXPECT_EQ(OK, node0.node().ClosePort(A));
1353   EXPECT_EQ(OK, node1.node().ClosePort(D));
1354 
1355   WaitForIdle();
1356 
1357   // Initiate a merge between B and C.
1358   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1359 
1360   WaitForIdle();
1361 
1362   // Expect everything to have gone away.
1363   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1364   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1365 }
1366 
TEST_F(PortsTest,MergePortsWithMovedPeers)1367 TEST_F(PortsTest, MergePortsWithMovedPeers) {
1368   // This tests that ports can be merged successfully even if their peers are
1369   // moved around.
1370 
1371   TestNode node0(0);
1372   AddNode(&node0);
1373 
1374   TestNode node1(1);
1375   AddNode(&node1);
1376 
1377   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1378   PortRef A, B, C, D;
1379   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1380   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1381 
1382   // Set up another pair X-Y for moving ports on node0.
1383   PortRef X, Y;
1384   EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y));
1385 
1386   ScopedMessage message;
1387 
1388   // Move A to new port E.
1389   EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A));
1390   ASSERT_TRUE(node0.ReadMessage(Y, &message));
1391   ASSERT_EQ(1u, message->num_ports());
1392   PortRef E;
1393   ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
1394 
1395   EXPECT_EQ(OK, node0.node().ClosePort(X));
1396   EXPECT_EQ(OK, node0.node().ClosePort(Y));
1397 
1398   // Write messages on E and D.
1399   EXPECT_EQ(OK, node0.SendStringMessage(E, "hey"));
1400   EXPECT_EQ(OK, node1.SendStringMessage(D, "hi"));
1401 
1402   // Initiate a merge between B and C.
1403   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1404 
1405   WaitForIdle();
1406 
1407   // Expect to receive D's message on E and E's message on D.
1408   ASSERT_TRUE(node0.ReadMessage(E, &message));
1409   EXPECT_TRUE(MessageEquals(message, "hi"));
1410   ASSERT_TRUE(node1.ReadMessage(D, &message));
1411   EXPECT_TRUE(MessageEquals(message, "hey"));
1412 
1413   // Close E and D.
1414   EXPECT_EQ(OK, node0.node().ClosePort(E));
1415   EXPECT_EQ(OK, node1.node().ClosePort(D));
1416 
1417   WaitForIdle();
1418 
1419   // Expect everything to have gone away.
1420   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1421   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1422 }
1423 
TEST_F(PortsTest,MergePortsFailsGracefully)1424 TEST_F(PortsTest, MergePortsFailsGracefully) {
1425   // This tests that the system remains in a well-defined state if something
1426   // goes wrong during port merge.
1427 
1428   TestNode node0(0);
1429   AddNode(&node0);
1430 
1431   TestNode node1(1);
1432   AddNode(&node1);
1433 
1434   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1435   PortRef A, B, C, D;
1436   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1437   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1438 
1439   ScopedMessage message;
1440   PortRef X, Y;
1441   EXPECT_EQ(OK, node0.node().CreateUninitializedPort(&X));
1442   EXPECT_EQ(OK, node1.node().CreateUninitializedPort(&Y));
1443   EXPECT_EQ(OK, node0.node().InitializePort(X, node1.name(), Y.name()));
1444   EXPECT_EQ(OK, node1.node().InitializePort(Y, node0.name(), X.name()));
1445 
1446   // Block the merge from proceeding until we can do something stupid with port
1447   // C. This avoids the test logic racing with async merge logic.
1448   node1.BlockOnEvent(Event::Type::kMergePort);
1449 
1450   // Initiate the merge between B and C.
1451   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1452 
1453   // Move C to a new port E. This is not a sane use of Node's public API but
1454   // is still hypothetically possible. It allows us to force a merge failure
1455   // because C will be in an invalid state by the time the merge is processed.
1456   // As a result, B should be closed.
1457   EXPECT_EQ(OK, node1.SendStringMessageWithPort(Y, "foo", C));
1458 
1459   node1.Unblock();
1460 
1461   WaitForIdle();
1462 
1463   ASSERT_TRUE(node0.ReadMessage(X, &message));
1464   ASSERT_EQ(1u, message->num_ports());
1465   PortRef E;
1466   ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
1467 
1468   EXPECT_EQ(OK, node0.node().ClosePort(X));
1469   EXPECT_EQ(OK, node1.node().ClosePort(Y));
1470 
1471   WaitForIdle();
1472 
1473   // C goes away as a result of normal proxy removal. B should have been closed
1474   // cleanly by the failed MergePorts.
1475   EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C));
1476   EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B));
1477 
1478   // Close A, D, and E.
1479   EXPECT_EQ(OK, node0.node().ClosePort(A));
1480   EXPECT_EQ(OK, node1.node().ClosePort(D));
1481   EXPECT_EQ(OK, node0.node().ClosePort(E));
1482 
1483   WaitForIdle();
1484 
1485   // Expect everything to have gone away.
1486   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1487   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1488 }
1489 
TEST_F(PortsTest,RemotePeerStatus)1490 TEST_F(PortsTest, RemotePeerStatus) {
1491   TestNode node0(0);
1492   AddNode(&node0);
1493 
1494   TestNode node1(1);
1495   AddNode(&node1);
1496 
1497   // Create a local port pair. Neither port should appear to have a remote peer.
1498   PortRef a, b;
1499   PortStatus status{};
1500   node0.node().CreatePortPair(&a, &b);
1501   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1502   EXPECT_FALSE(status.peer_remote);
1503   ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1504   EXPECT_FALSE(status.peer_remote);
1505 
1506   // Create a port pair spanning the two nodes. Both spanning ports should
1507   // immediately appear to have a remote peer.
1508   PortRef x0, x1;
1509   CreatePortPair(&node0, &x0, &node1, &x1);
1510 
1511   ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
1512   EXPECT_TRUE(status.peer_remote);
1513   ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
1514   EXPECT_TRUE(status.peer_remote);
1515 
1516   PortRef x2, x3;
1517   CreatePortPair(&node0, &x2, &node1, &x3);
1518 
1519   // Transfer |b| to |node1| and |x1| to |node0|. i.e., make the local peers
1520   // remote and the remote peers local.
1521   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", b));
1522   EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", x1));
1523   WaitForIdle();
1524 
1525   ScopedMessage message;
1526   ASSERT_TRUE(node0.ReadMessage(x2, &message));
1527   ASSERT_EQ(1u, message->num_ports());
1528   ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &x1));
1529 
1530   ASSERT_TRUE(node1.ReadMessage(x3, &message));
1531   ASSERT_EQ(1u, message->num_ports());
1532   ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &b));
1533 
1534   // Now x0-x1 should be local to node0 and a-b should span the nodes.
1535   ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
1536   EXPECT_FALSE(status.peer_remote);
1537   ASSERT_EQ(OK, node0.node().GetStatus(x1, &status));
1538   EXPECT_FALSE(status.peer_remote);
1539   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1540   EXPECT_TRUE(status.peer_remote);
1541   ASSERT_EQ(OK, node1.node().GetStatus(b, &status));
1542   EXPECT_TRUE(status.peer_remote);
1543 
1544   // And swap them back one more time.
1545   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", x1));
1546   EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", b));
1547   WaitForIdle();
1548 
1549   ASSERT_TRUE(node0.ReadMessage(x2, &message));
1550   ASSERT_EQ(1u, message->num_ports());
1551   ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &b));
1552 
1553   ASSERT_TRUE(node1.ReadMessage(x3, &message));
1554   ASSERT_EQ(1u, message->num_ports());
1555   ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &x1));
1556 
1557   ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
1558   EXPECT_TRUE(status.peer_remote);
1559   ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
1560   EXPECT_TRUE(status.peer_remote);
1561   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1562   EXPECT_FALSE(status.peer_remote);
1563   ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1564   EXPECT_FALSE(status.peer_remote);
1565 
1566   EXPECT_EQ(OK, node0.node().ClosePort(x0));
1567   EXPECT_EQ(OK, node1.node().ClosePort(x1));
1568   EXPECT_EQ(OK, node0.node().ClosePort(x2));
1569   EXPECT_EQ(OK, node1.node().ClosePort(x3));
1570   EXPECT_EQ(OK, node0.node().ClosePort(a));
1571   EXPECT_EQ(OK, node0.node().ClosePort(b));
1572 
1573   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1574   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1575 }
1576 
TEST_F(PortsTest,RemotePeerStatusAfterLocalPortMerge)1577 TEST_F(PortsTest, RemotePeerStatusAfterLocalPortMerge) {
1578   TestNode node0(0);
1579   AddNode(&node0);
1580 
1581   TestNode node1(1);
1582   AddNode(&node1);
1583 
1584   // Set up a-b on node0 and c-d spanning node0-node1.
1585   PortRef a, b, c, d;
1586   node0.node().CreatePortPair(&a, &b);
1587   CreatePortPair(&node0, &c, &node1, &d);
1588 
1589   PortStatus status{};
1590   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1591   EXPECT_FALSE(status.peer_remote);
1592   ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1593   EXPECT_FALSE(status.peer_remote);
1594   ASSERT_EQ(OK, node0.node().GetStatus(c, &status));
1595   EXPECT_TRUE(status.peer_remote);
1596   ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
1597   EXPECT_TRUE(status.peer_remote);
1598 
1599   EXPECT_EQ(OK, node0.node().MergeLocalPorts(b, c));
1600   WaitForIdle();
1601 
1602   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1603   EXPECT_TRUE(status.peer_remote);
1604   ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
1605   EXPECT_TRUE(status.peer_remote);
1606 
1607   EXPECT_EQ(OK, node0.node().ClosePort(a));
1608   EXPECT_EQ(OK, node1.node().ClosePort(d));
1609   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1610   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1611 }
1612 
TEST_F(PortsTest,RemotePeerStatusAfterRemotePortMerge)1613 TEST_F(PortsTest, RemotePeerStatusAfterRemotePortMerge) {
1614   TestNode node0(0);
1615   AddNode(&node0);
1616 
1617   TestNode node1(1);
1618   AddNode(&node1);
1619 
1620   // Set up a-b on node0 and c-d on node1.
1621   PortRef a, b, c, d;
1622   node0.node().CreatePortPair(&a, &b);
1623   node1.node().CreatePortPair(&c, &d);
1624 
1625   PortStatus status{};
1626   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1627   EXPECT_FALSE(status.peer_remote);
1628   ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1629   EXPECT_FALSE(status.peer_remote);
1630   ASSERT_EQ(OK, node1.node().GetStatus(c, &status));
1631   EXPECT_FALSE(status.peer_remote);
1632   ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
1633   EXPECT_FALSE(status.peer_remote);
1634 
1635   EXPECT_EQ(OK, node0.node().MergePorts(b, node1.name(), c.name()));
1636   WaitForIdle();
1637 
1638   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1639   EXPECT_TRUE(status.peer_remote);
1640   ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
1641   EXPECT_TRUE(status.peer_remote);
1642 
1643   EXPECT_EQ(OK, node0.node().ClosePort(a));
1644   EXPECT_EQ(OK, node1.node().ClosePort(d));
1645   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1646   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1647 }
1648 
TEST_F(PortsTest,RetransmitUserMessageEvents)1649 TEST_F(PortsTest, RetransmitUserMessageEvents) {
1650   // Ensures that user message events can be retransmitted properly.
1651   TestNode node0(0);
1652   AddNode(&node0);
1653 
1654   PortRef a, b;
1655   node0.node().CreatePortPair(&a, &b);
1656 
1657   // Ping.
1658   const char* kMessage = "hey";
1659   ScopedMessage message;
1660   EXPECT_EQ(OK, node0.SendStringMessage(a, kMessage));
1661   ASSERT_TRUE(node0.ReadMessage(b, &message));
1662   EXPECT_TRUE(MessageEquals(message, kMessage));
1663 
1664   // Pong.
1665   EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
1666   EXPECT_FALSE(message);
1667   ASSERT_TRUE(node0.ReadMessage(a, &message));
1668   EXPECT_TRUE(MessageEquals(message, kMessage));
1669 
1670   // Ping again.
1671   EXPECT_EQ(OK, node0.node().SendUserMessage(a, std::move(message)));
1672   EXPECT_FALSE(message);
1673   ASSERT_TRUE(node0.ReadMessage(b, &message));
1674   EXPECT_TRUE(MessageEquals(message, kMessage));
1675 
1676   // Pong again!
1677   EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
1678   EXPECT_FALSE(message);
1679   ASSERT_TRUE(node0.ReadMessage(a, &message));
1680   EXPECT_TRUE(MessageEquals(message, kMessage));
1681 
1682   EXPECT_EQ(OK, node0.node().ClosePort(a));
1683   EXPECT_EQ(OK, node0.node().ClosePort(b));
1684 }
1685 
TEST_F(PortsTest,SetAcknowledgeRequestInterval)1686 TEST_F(PortsTest, SetAcknowledgeRequestInterval) {
1687   TestNode node0(0);
1688   AddNode(&node0);
1689 
1690   PortRef a0, a1;
1691   EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
1692   EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
1693 
1694   // Send a batch of messages.
1695   EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 15));
1696   EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));
1697   EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
1698   WaitForIdle();
1699   EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));
1700 
1701   // Set to acknowledge every read message, and validate that already-read
1702   // messages are acknowledged.
1703   EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 1));
1704   WaitForIdle();
1705   EXPECT_EQ(10u, node0.GetUnacknowledgedMessageCount(a0));
1706 
1707   // Read a third of the messages from the other end.
1708   EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
1709   WaitForIdle();
1710 
1711   EXPECT_EQ(5u, node0.GetUnacknowledgedMessageCount(a0));
1712 
1713   TestNode node1(1);
1714   AddNode(&node1);
1715 
1716   // Transfer a1 across to node1.
1717   PortRef x0, x1;
1718   CreatePortPair(&node0, &x0, &node1, &x1);
1719   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
1720   WaitForIdle();
1721 
1722   ScopedMessage message;
1723   ASSERT_TRUE(node1.ReadMessage(x1, &message));
1724   ASSERT_EQ(1u, message->num_ports());
1725   ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &a1));
1726 
1727   // Read the last third of the messages from the transferred node, and
1728   // validate that the unacknowledge message count updates correctly.
1729   EXPECT_TRUE(node1.ReadMultipleMessages(a1, 5));
1730   WaitForIdle();
1731   EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
1732 
1733   // Turn the acknowledges down and validate that they don't go on indefinitely.
1734   EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 0));
1735   EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 10));
1736   WaitForIdle();
1737   EXPECT_TRUE(node1.ReadMultipleMessages(a1, 10));
1738   WaitForIdle();
1739   EXPECT_NE(0u, node0.GetUnacknowledgedMessageCount(a0));
1740 
1741   // Close the far port and validate that the closure updates the unacknowledged
1742   // count.
1743   EXPECT_EQ(OK, node1.node().ClosePort(a1));
1744   WaitForIdle();
1745   EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
1746 
1747   EXPECT_EQ(OK, node0.node().ClosePort(a0));
1748   EXPECT_EQ(OK, node0.node().ClosePort(x0));
1749   EXPECT_EQ(OK, node1.node().ClosePort(x1));
1750 }
1751 
1752 }  // namespace test
1753 }  // namespace ports
1754 }  // namespace core
1755 }  // namespace mojo
1756