1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <inttypes.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 
10 #include <map>
11 #include <sstream>
12 #include <utility>
13 
14 #include "base/bind.h"
15 #include "base/callback.h"
16 #include "base/containers/queue.h"
17 #include "base/logging.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/strings/string_piece.h"
20 #include "base/strings/stringprintf.h"
21 #include "base/synchronization/lock.h"
22 #include "base/synchronization/waitable_event.h"
23 #include "base/test/task_environment.h"
24 #include "base/threading/thread.h"
25 #include "mojo/core/ports/event.h"
26 #include "mojo/core/ports/node.h"
27 #include "mojo/core/ports/node_delegate.h"
28 #include "mojo/core/ports/user_message.h"
29 #include "testing/gtest/include/gtest/gtest.h"
30 
31 namespace mojo {
32 namespace core {
33 namespace ports {
34 namespace test {
35 
36 namespace {
37 
// TODO(rockot): Remove this unnecessary alias.
// Owning pointer to a UserMessageEvent; this is the message handle type used
// by the helpers and tests in this file.
using ScopedMessage = std::unique_ptr<UserMessageEvent>;
40 
41 class TestMessage : public UserMessage {
42  public:
43   static const TypeInfo kUserMessageTypeInfo;
44 
TestMessage(const base::StringPiece & payload)45   TestMessage(const base::StringPiece& payload)
46       : UserMessage(&kUserMessageTypeInfo), payload_(payload) {}
~TestMessage()47   ~TestMessage() override {}
48 
payload() const49   const std::string& payload() const { return payload_; }
50 
51  private:
52   std::string payload_;
53 };
54 
55 const UserMessage::TypeInfo TestMessage::kUserMessageTypeInfo = {};
56 
NewUserMessageEvent(const base::StringPiece & payload,size_t num_ports)57 ScopedMessage NewUserMessageEvent(const base::StringPiece& payload,
58                                   size_t num_ports) {
59   auto event = std::make_unique<UserMessageEvent>(num_ports);
60   event->AttachMessage(std::make_unique<TestMessage>(payload));
61   return event;
62 }
63 
MessageEquals(const ScopedMessage & message,const base::StringPiece & s)64 bool MessageEquals(const ScopedMessage& message, const base::StringPiece& s) {
65   return message->GetMessage<TestMessage>()->payload() == s;
66 }
67 
68 class TestNode;
69 
70 class MessageRouter {
71  public:
~MessageRouter()72   virtual ~MessageRouter() {}
73 
74   virtual void ForwardEvent(TestNode* from_node,
75                             const NodeName& node_name,
76                             ScopedEvent event) = 0;
77   virtual void BroadcastEvent(TestNode* from_node, ScopedEvent event) = 0;
78 };
79 
// A NodeDelegate that owns a Node plus a dedicated thread. Events queued with
// EnqueueEvent() are handed to Node::AcceptEvent() on that thread by
// ProcessEvents(). Outgoing events are passed to the MessageRouter given to
// Start().
class TestNode : public NodeDelegate {
 public:
  // Names the node (id, 1) and the thread "Node <id> thread".
  // |events_available_event_| is auto-reset and starts unsignaled;
  // |idle_event_| is manual-reset and starts signaled.
  explicit TestNode(uint64_t id)
      : node_name_(id, 1),
        node_(node_name_, this),
        node_thread_(base::StringPrintf("Node %" PRIu64 " thread", id)),
        events_available_event_(
            base::WaitableEvent::ResetPolicy::AUTOMATIC,
            base::WaitableEvent::InitialState::NOT_SIGNALED),
        idle_event_(base::WaitableEvent::ResetPolicy::MANUAL,
                    base::WaitableEvent::InitialState::SIGNALED) {}

  // Asks the event loop to quit, then stops the node thread.
  ~TestNode() override {
    StopWhenIdle();
    node_thread_.Stop();
  }

  const NodeName& name() const { return node_name_; }

  // NOTE: Node is thread-safe.
  Node& node() { return node_; }

  // Signaled by ProcessEvents() after each dispatch pass; reset by
  // EnqueueEvent().
  base::WaitableEvent& idle_event() { return idle_event_; }

  // Returns true once ProcessEvents() has completed at least one pass, is not
  // currently dispatching, and either has no queued events or is parked on a
  // blocked event type (see BlockOnEvent()).
  bool IsIdle() {
    base::AutoLock lock(lock_);
    return started_ && !dispatching_ &&
           (incoming_events_.empty() || (block_on_event_ && blocked_));
  }

  // Makes ProcessEvents() stop dispatching whenever an event of |type| is at
  // the front of the queue, until Unblock() is called.
  void BlockOnEvent(Event::Type type) {
    base::AutoLock lock(lock_);
    blocked_event_type_ = type;
    block_on_event_ = true;
  }

  // Clears the block installed by BlockOnEvent() and wakes the event loop so
  // it can resume dispatching.
  void Unblock() {
    base::AutoLock lock(lock_);
    block_on_event_ = false;
    events_available_event_.Signal();
  }

  // Records |router| for outgoing events, starts the node thread and posts
  // ProcessEvents() to it. |this| is passed unretained; the destructor stops
  // the thread.
  void Start(MessageRouter* router) {
    router_ = router;
    node_thread_.Start();
    node_thread_.task_runner()->PostTask(
        FROM_HERE,
        base::BindOnce(&TestNode::ProcessEvents, base::Unretained(this)));
  }

  // Sets the quit flag and wakes the event loop. ProcessEvents() returns the
  // next time it wakes and observes the flag.
  void StopWhenIdle() {
    base::AutoLock lock(lock_);
    should_quit_ = true;
    events_available_event_.Signal();
  }

  // Wakes the event loop so that it runs another dispatch pass.
  void WakeUp() { events_available_event_.Signal(); }

  // Sends |s| on |port| as a user message with no attached ports. Returns the
  // result of Node::SendUserMessage().
  int SendStringMessage(const PortRef& port, const std::string& s) {
    return node_.SendUserMessage(port, NewUserMessageEvent(s, 0));
  }

  // Sends |num_messages| empty-string messages on |port|. Stops at, and
  // returns, the first result that is not OK.
  int SendMultipleMessages(const PortRef& port, size_t num_messages) {
    for (size_t i = 0; i < num_messages; ++i) {
      int result = SendStringMessage(port, "");
      if (result != OK)
        return result;
    }
    return OK;
  }

  // Sends |s| on |port| with the port named |sent_port_name| attached as the
  // message's only port. Returns the result of Node::SendUserMessage().
  int SendStringMessageWithPort(const PortRef& port,
                                const std::string& s,
                                const PortName& sent_port_name) {
    auto event = NewUserMessageEvent(s, 1);
    event->ports()[0] = sent_port_name;
    return node_.SendUserMessage(port, std::move(event));
  }

  // Convenience overload taking the attached port as a PortRef.
  int SendStringMessageWithPort(const PortRef& port,
                                const std::string& s,
                                const PortRef& sent_port) {
    return SendStringMessageWithPort(port, s, sent_port.name());
  }

  // When |value| is true, ForwardEvent() closes the ports carried by each
  // outgoing event and discards the event instead of routing it.
  void set_drop_messages(bool value) {
    base::AutoLock lock(lock_);
    drop_messages_ = value;
  }

  // When |value| is true, PortStatusChanged() drains readable messages into
  // the saved-message queue (see GetSavedMessage()).
  void set_save_messages(bool value) {
    base::AutoLock lock(lock_);
    save_messages_ = value;
  }

  // Reads one message from |port| into |*message|. Returns true only if
  // Node::GetMessage() returned OK and produced a non-null message.
  bool ReadMessage(const PortRef& port, ScopedMessage* message) {
    return node_.GetMessage(port, message, nullptr) == OK && *message;
  }

  // Reads and discards |num_messages| messages from |port|. Returns false as
  // soon as one read fails.
  bool ReadMultipleMessages(const PortRef& port, size_t num_messages) {
    for (size_t i = 0; i < num_messages; ++i) {
      ScopedMessage message;
      if (!ReadMessage(port, &message))
        return false;
    }
    return true;
  }

  // Pops the oldest saved message into |*message| and returns true. If none
  // are saved, resets |*message| and returns false.
  bool GetSavedMessage(ScopedMessage* message) {
    base::AutoLock lock(lock_);
    if (saved_messages_.empty()) {
      message->reset();
      return false;
    }
    std::swap(*message, saved_messages_.front());
    saved_messages_.pop();
    return true;
  }

  // Queues |event| for the node thread and wakes it. The idle event is reset
  // first so that waiters do not observe a stale idle signal.
  void EnqueueEvent(ScopedEvent event) {
    idle_event_.Reset();

    // NOTE: This may be called from ForwardEvent and thus must not reenter
    // |node_|.
    base::AutoLock lock(lock_);
    incoming_events_.emplace(std::move(event));
    events_available_event_.Signal();
  }

  // NodeDelegate: called to deliver |event| to the node named |node_name|.
  // In drop mode the event's ports are closed and the event is discarded;
  // otherwise it is handed to the router.
  void ForwardEvent(const NodeName& node_name, ScopedEvent event) override {
    {
      base::AutoLock lock(lock_);
      if (drop_messages_) {
        DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to "
                 << node_name;

        // Release |lock_| while closing ports, since that calls into |node_|.
        base::AutoUnlock unlock(lock_);
        ClosePortsInEvent(event.get());
        return;
      }
    }

    DCHECK(router_);
    DVLOG(1) << "ForwardEvent from node " << node_name_ << " to " << node_name;
    router_->ForwardEvent(this, node_name, std::move(event));
  }

  // NodeDelegate: hands |event| to the router for broadcast.
  void BroadcastEvent(ScopedEvent event) override {
    router_->BroadcastEvent(this, std::move(event));
  }

  // NodeDelegate: when message saving is enabled, reads every message
  // currently available on |port| and appends it to the saved-message queue.
  void PortStatusChanged(const PortRef& port) override {
    // The port may be closed, in which case we ignore the notification.
    base::AutoLock lock(lock_);
    if (!save_messages_)
      return;

    for (;;) {
      ScopedMessage message;
      {
        // |lock_| is released while calling into |node_| and re-acquired
        // before touching |saved_messages_|.
        base::AutoUnlock unlock(lock_);
        if (!ReadMessage(port, &message))
          break;
      }

      saved_messages_.emplace(std::move(message));
    }
  }

  // Closes every port attached to |event| if it is a user message event; does
  // nothing for other event types. A failed port lookup is a fatal test
  // failure that ends this helper early.
  void ClosePortsInEvent(Event* event) {
    if (event->type() != Event::Type::kUserMessage)
      return;

    UserMessageEvent* message_event = static_cast<UserMessageEvent*>(event);
    for (size_t i = 0; i < message_event->num_ports(); ++i) {
      PortRef port;
      ASSERT_EQ(OK, node_.GetPort(message_event->ports()[i], &port));
      EXPECT_EQ(OK, node_.ClosePort(port));
    }
  }

  // Returns the unacknowledged message count reported by the status of
  // |port_ref|, or 0 if the status cannot be retrieved.
  uint64_t GetUnacknowledgedMessageCount(const PortRef& port_ref) {
    PortStatus status;
    if (node_.GetStatus(port_ref, &status) != OK)
      return 0;

    return status.unacknowledged_message_count;
  }

 private:
  // Event loop run on |node_thread_|. Each pass waits to be woken, drains
  // |incoming_events_| into Node::AcceptEvent() (stopping early at a blocked
  // event type), then signals |idle_event_|. Returns once |should_quit_| is
  // observed after a wake-up.
  void ProcessEvents() {
    for (;;) {
      events_available_event_.Wait();
      base::AutoLock lock(lock_);

      if (should_quit_)
        return;

      dispatching_ = true;
      while (!incoming_events_.empty()) {
        if (block_on_event_ &&
            incoming_events_.front()->type() == blocked_event_type_) {
          blocked_ = true;
          // Go idle if we hit a blocked event type.
          break;
        } else {
          blocked_ = false;
        }
        ScopedEvent event = std::move(incoming_events_.front());
        incoming_events_.pop();

        // NOTE: AcceptEvent() can re-enter this object to call any of the
        // NodeDelegate interface methods.
        base::AutoUnlock unlock(lock_);
        node_.AcceptEvent(std::move(event));
      }

      dispatching_ = false;
      started_ = true;
      idle_event_.Signal();
    };
  }

  const NodeName node_name_;
  Node node_;
  // Destination for outgoing events; set by Start(). Not owned.
  MessageRouter* router_ = nullptr;

  // Thread on which ProcessEvents() runs.
  base::Thread node_thread_;
  // Wakes ProcessEvents().
  base::WaitableEvent events_available_event_;
  // Signaled at the end of each ProcessEvents() pass.
  base::WaitableEvent idle_event_;

  // Guards fields below.
  base::Lock lock_;
  // True once ProcessEvents() has completed its first pass.
  bool started_ = false;
  // True while ProcessEvents() is draining the queue.
  bool dispatching_ = false;
  bool should_quit_ = false;
  bool drop_messages_ = false;
  bool save_messages_ = false;
  // True while the event at the front of the queue matches the blocked type.
  bool blocked_ = false;
  bool block_on_event_ = false;
  // Only meaningful while |block_on_event_| is true.
  Event::Type blocked_event_type_;
  base::queue<ScopedEvent> incoming_events_;
  base::queue<ScopedMessage> saved_messages_;
};
324 
325 class PortsTest : public testing::Test, public MessageRouter {
326  public:
AddNode(TestNode * node)327   void AddNode(TestNode* node) {
328     {
329       base::AutoLock lock(lock_);
330       nodes_[node->name()] = node;
331     }
332     node->Start(this);
333   }
334 
RemoveNode(TestNode * node)335   void RemoveNode(TestNode* node) {
336     {
337       base::AutoLock lock(lock_);
338       nodes_.erase(node->name());
339     }
340 
341     for (const auto& entry : nodes_)
342       entry.second->node().LostConnectionToNode(node->name());
343   }
344 
345   // Waits until all known Nodes are idle. Message forwarding and processing
346   // is handled in such a way that idleness is a stable state: once all nodes in
347   // the system are idle, they will remain idle until the test explicitly
348   // initiates some further event (e.g. sending a message, closing a port, or
349   // removing a Node).
WaitForIdle()350   void WaitForIdle() {
351     for (;;) {
352       base::AutoLock global_lock(global_lock_);
353       bool all_nodes_idle = true;
354       for (const auto& entry : nodes_) {
355         if (!entry.second->IsIdle())
356           all_nodes_idle = false;
357         entry.second->WakeUp();
358       }
359       if (all_nodes_idle)
360         return;
361 
362       // Wait for any Node to signal that it's idle.
363       base::AutoUnlock global_unlock(global_lock_);
364       std::vector<base::WaitableEvent*> events;
365       for (const auto& entry : nodes_)
366         events.push_back(&entry.second->idle_event());
367       base::WaitableEvent::WaitMany(events.data(), events.size());
368     }
369   }
370 
CreatePortPair(TestNode * node0,PortRef * port0,TestNode * node1,PortRef * port1)371   void CreatePortPair(TestNode* node0,
372                       PortRef* port0,
373                       TestNode* node1,
374                       PortRef* port1) {
375     if (node0 == node1) {
376       EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1));
377     } else {
378       EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0));
379       EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1));
380       EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(),
381                                                  port1->name()));
382       EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(),
383                                                  port0->name()));
384     }
385   }
386 
387  private:
388   // MessageRouter:
ForwardEvent(TestNode * from_node,const NodeName & node_name,ScopedEvent event)389   void ForwardEvent(TestNode* from_node,
390                     const NodeName& node_name,
391                     ScopedEvent event) override {
392     base::AutoLock global_lock(global_lock_);
393     base::AutoLock lock(lock_);
394     // Drop messages from nodes that have been removed.
395     if (nodes_.find(from_node->name()) == nodes_.end()) {
396       from_node->ClosePortsInEvent(event.get());
397       return;
398     }
399 
400     auto it = nodes_.find(node_name);
401     if (it == nodes_.end()) {
402       DVLOG(1) << "Node not found: " << node_name;
403       return;
404     }
405 
406     // Serialize and de-serialize all forwarded events.
407     size_t buf_size = event->GetSerializedSize();
408     std::unique_ptr<char[]> buf(new char[buf_size]);
409     event->Serialize(buf.get());
410     ScopedEvent copy = Event::Deserialize(buf.get(), buf_size);
411     // This should always succeed unless serialization or deserialization
412     // is broken. In that case, the loss of events should cause a test failure.
413     ASSERT_TRUE(copy);
414 
415     // Also copy the payload for user messages.
416     if (event->type() == Event::Type::kUserMessage) {
417       UserMessageEvent* message_event =
418           static_cast<UserMessageEvent*>(event.get());
419       UserMessageEvent* message_copy =
420           static_cast<UserMessageEvent*>(copy.get());
421 
422       message_copy->AttachMessage(std::make_unique<TestMessage>(
423           message_event->GetMessage<TestMessage>()->payload()));
424     }
425 
426     it->second->EnqueueEvent(std::move(event));
427   }
428 
BroadcastEvent(TestNode * from_node,ScopedEvent event)429   void BroadcastEvent(TestNode* from_node, ScopedEvent event) override {
430     base::AutoLock global_lock(global_lock_);
431     base::AutoLock lock(lock_);
432 
433     // Drop messages from nodes that have been removed.
434     if (nodes_.find(from_node->name()) == nodes_.end())
435       return;
436 
437     for (const auto& entry : nodes_) {
438       TestNode* node = entry.second;
439       // Broadcast doesn't deliver to the local node.
440       if (node == from_node)
441         continue;
442       node->EnqueueEvent(event->Clone());
443     }
444   }
445 
446   base::test::TaskEnvironment task_environment_;
447 
448   // Acquired before any operation which makes a Node busy, and before testing
449   // if all nodes are idle.
450   base::Lock global_lock_;
451 
452   base::Lock lock_;
453   std::map<NodeName, TestNode*> nodes_;
454 };
455 
456 }  // namespace
457 
// Transfers port a1 from node0 toward node1 over x0, then closes a0, x0 and
// x1 without node1 ever reading the message. Both nodes must be able to shut
// down cleanly once idle.
TEST_F(PortsTest, Basic1) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // x0 (on node0) <-> x1 (on node1).
  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  // a0 <-> a1, both initially on node0; a1 is sent over x0.
  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
  EXPECT_EQ(OK, node0.node().ClosePort(a0));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
481 
// Like Basic1, but a message is also sent on b0 toward the transferred port
// b1 before everything is closed.
TEST_F(PortsTest, Basic2) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // x0 (on node0) <-> x1 (on node1).
  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  // b0 <-> b1, both initially on node0; b1 is sent over x0, then a message is
  // sent to it from b0.
  PortRef b0, b1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1));
  EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again"));

  EXPECT_EQ(OK, node0.node().ClosePort(b0));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
507 
// Transfers both ends of the a0/a1 pair, plus b1 of a second pair, over x0,
// with messages sent on a0 and b0 in between. Nothing is read on node1; both
// nodes must still be able to shut down cleanly once idle.
TEST_F(PortsTest, Basic3) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // x0 (on node0) <-> x1 (on node1).
  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));

  // Send a1 over x0, then a message on a0 toward it.
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
  EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again"));

  // Send a0 itself over x0 as well.
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0));

  PortRef b0, b1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1));
  EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz"));

  EXPECT_EQ(OK, node0.node().ClosePort(b0));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
541 
// Transfers a port to a node that drops all of its outgoing events, then
// removes that node. Both nodes must be able to shut down cleanly after the
// remaining ports are closed.
TEST_F(PortsTest, LostConnectionToNode1) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);
  // Events forwarded by node1 are discarded from here on.
  node1.set_drop_messages(true);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  // Transfer a port to node1 and simulate a lost connection to node1.

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));

  WaitForIdle();

  RemoveNode(&node1);

  WaitForIdle();

  EXPECT_EQ(OK, node0.node().ClosePort(a0));
  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
574 
// Transfers a1 to node1, lets the system settle, then removes node1. The
// local peer a0 must observe peer closure, and the undelivered message still
// queued on x1 can be read and its ports closed so that both nodes shut down
// cleanly.
TEST_F(PortsTest, LostConnectionToNode2) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1));

  WaitForIdle();

  // From here on node1 discards everything it tries to forward.
  node1.set_drop_messages(true);

  RemoveNode(&node1);

  WaitForIdle();

  // a0 should have eventually detected peer closure after node loss.
  ScopedMessage message;
  EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
            node0.node().GetMessage(a0, &message, nullptr));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node0.node().ClosePort(a0));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));

  // The "take a1" message is still readable on x1; close the port it carries.
  EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr));
  EXPECT_TRUE(message);
  node1.ClosePortsInEvent(message.get());

  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
618 
TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
  // Tests that a proxy gets cleaned up when its indirect peer lives on a lost
  // node.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  TestNode node2(2);
  AddNode(&node2);

  // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
  PortRef A, B, C, D;
  CreatePortPair(&node0, &A, &node1, &B);
  CreatePortPair(&node1, &C, &node2, &D);

  // Create E-F and send F over A to node 1.
  PortRef E, F;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));

  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(B, &message));
  ASSERT_EQ(1u, message->num_ports());

  // Rebind F to the received port as it is known on node 1.
  EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));

  // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
  // will trivially become aware of the loss, and this test verifies that
  // port E on node 0 (F's original peer) will eventually also become aware of
  // it.

  // Make sure node2 stops processing events when it encounters an ObserveProxy.
  node2.BlockOnEvent(Event::Type::kObserveProxy);

  EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
  WaitForIdle();

  // Simulate node 1 and 2 disconnecting.
  EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));

  // Let node2 continue processing events and wait for everyone to go idle.
  node2.Unblock();
  WaitForIdle();

  // Port F should be gone.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));

  // Port E should have detected peer closure despite the fact that there is
  // no longer a continuous route from F to E over which the event could travel.
  PortStatus status;
  EXPECT_EQ(OK, node0.node().GetStatus(E, &status));
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(B));
  EXPECT_EQ(OK, node1.node().ClosePort(C));
  EXPECT_EQ(OK, node0.node().ClosePort(E));

  WaitForIdle();

  // Only node0 and node1 are checked; node2 is the "lost" node.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
686 
TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
  // Tests that a proxy gets cleaned up when its direct peer lives on a lost
  // node and its predecessor lives on the same node.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef A, B;
  CreatePortPair(&node0, &A, &node1, &B);

  PortRef C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));

  // Send D but block node0 on an ObserveProxy event.
  node0.BlockOnEvent(Event::Type::kObserveProxy);
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D));

  // node0 won't collapse the proxy but node1 will receive the message before
  // going idle.
  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(B, &message));
  ASSERT_EQ(1u, message->num_ports());
  // E is the transferred port (sent as D) as it is known on node1.
  PortRef E;
  EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));

  RemoveNode(&node1);

  node0.Unblock();
  WaitForIdle();

  // Port C should have detected peer closure.
  PortStatus status;
  EXPECT_EQ(OK, node0.node().GetStatus(C, &status));
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(B));
  EXPECT_EQ(OK, node0.node().ClosePort(C));
  EXPECT_EQ(OK, node1.node().ClosePort(E));

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
735 
// GetMessage() on a port with no queued messages returns OK with a null
// message while the peer is open, and ERROR_PORT_PEER_CLOSED once the peer
// has been closed.
TEST_F(PortsTest, GetMessage1) {
  TestNode node(0);
  AddNode(&node);

  PortRef a0, a1;
  EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));

  // Nothing has been sent yet.
  ScopedMessage message;
  EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node.node().ClosePort(a1));

  WaitForIdle();

  EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
            node.node().GetMessage(a0, &message, nullptr));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node.node().ClosePort(a0));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}
761 
// A single message sent on a1 can be read back from its local peer a0 with
// its payload intact.
TEST_F(PortsTest, GetMessage2) {
  TestNode node(0);
  AddNode(&node);

  PortRef a0, a1;
  EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));

  EXPECT_EQ(OK, node.SendStringMessage(a1, "1"));

  // Read immediately, without waiting for the node to go idle.
  ScopedMessage message;
  EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));

  ASSERT_TRUE(message);
  EXPECT_TRUE(MessageEquals(message, "1"));

  EXPECT_EQ(OK, node.node().ClosePort(a0));
  EXPECT_EQ(OK, node.node().ClosePort(a1));

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}
782 
// Several messages sent on a1 are read back from a0 in the order in which
// they were sent.
TEST_F(PortsTest, GetMessage3) {
  TestNode node(0);
  AddNode(&node);

  PortRef a0, a1;
  EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));

  const char* kStrings[] = {"1", "2", "3"};

  // Send every string first...
  for (size_t i = 0; i < sizeof(kStrings) / sizeof(kStrings[0]); ++i)
    EXPECT_EQ(OK, node.SendStringMessage(a1, kStrings[i]));

  // ...then read them back and compare against the same sequence.
  ScopedMessage message;
  for (size_t i = 0; i < sizeof(kStrings) / sizeof(kStrings[0]); ++i) {
    EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
    ASSERT_TRUE(message);
    EXPECT_TRUE(MessageEquals(message, kStrings[i]));
  }

  EXPECT_EQ(OK, node.node().ClosePort(a0));
  EXPECT_EQ(OK, node.node().ClosePort(a1));

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}
807 
// Moves port a1 from node0 to node1 and straight back again, while a message
// is sent to it from a0. The message must be readable from the port once it
// has arrived back on node0.
TEST_F(PortsTest, Delegation1) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  // In this test, we send a message to a port that has been moved.

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1));
  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(x1, &message));
  ASSERT_EQ(1u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "a1"));

  // This is "a1" from the point of view of node1.
  PortName a2_name = message->ports()[0];
  EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name));
  EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello"));

  WaitForIdle();

  ASSERT_TRUE(node0.ReadMessage(x0, &message));
  ASSERT_EQ(1u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "a2"));

  // This is the port node1 sent as "a2", as it is named on node0.
  PortName a3_name = message->ports()[0];

  PortRef a3;
  EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3));

  // The "hello" sent on a0 must have followed the port to its new location.
  ASSERT_TRUE(node0.ReadMessage(a3, &message));
  EXPECT_EQ(0u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "hello"));

  EXPECT_EQ(OK, node0.node().ClosePort(a0));
  EXPECT_EQ(OK, node0.node().ClosePort(a3));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
860 
// Repeats a nested transfer 100 times: D is sent over A to node1, F is sent
// over C to D, and a message is sent on E. Each round, the "hello" message
// must turn up among the messages node1 saved.
TEST_F(PortsTest, Delegation2) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  for (int i = 0; i < 100; ++i) {
    // Setup pipe A<->B between node0 and node1.
    PortRef A, B;
    CreatePortPair(&node0, &A, &node1, &B);

    PortRef C, D;
    EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));

    PortRef E, F;
    EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));

    // Have node1 save every message that becomes readable on its ports.
    node1.set_save_messages(true);

    // Pass D over A to B.
    EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D));

    // Pass F over C to D.
    EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F));

    // This message should find its way to node1.
    EXPECT_EQ(OK, node0.SendStringMessage(E, "hello"));

    WaitForIdle();

    EXPECT_EQ(OK, node0.node().ClosePort(C));
    EXPECT_EQ(OK, node0.node().ClosePort(E));

    EXPECT_EQ(OK, node0.node().ClosePort(A));
    EXPECT_EQ(OK, node1.node().ClosePort(B));

    // Drain the saved messages, closing any ports they carry, until "hello"
    // is found.
    bool got_hello = false;
    ScopedMessage message;
    while (node1.GetSavedMessage(&message)) {
      node1.ClosePortsInEvent(message.get());
      if (MessageEquals(message, "hello")) {
        got_hello = true;
        break;
      }
    }

    EXPECT_TRUE(got_hello);

    WaitForIdle();  // Because closing ports may have generated tasks.
  }

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
916 
// Sending on a port that was created but never initialized must fail with
// ERROR_PORT_STATE_UNEXPECTED; the port can still be closed afterwards.
TEST_F(PortsTest, SendUninitialized) {
  TestNode node(0);
  AddNode(&node);

  PortRef x0;
  EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0));
  EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops"));
  EXPECT_EQ(OK, node.node().ClosePort(x0));
  EXPECT_TRUE(node.node().CanShutdownCleanly());
}
927 
// Attaching a port to a message sent over that same port, or over its own
// peer, must be rejected, and neither attempt may result in a delivered
// message.
TEST_F(PortsTest, SendFailure) {
  TestNode node(0);
  AddNode(&node);

  // Save any message that becomes readable so the test can check none did.
  node.set_save_messages(true);

  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));

  // Try to send A over itself.

  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
            node.SendStringMessageWithPort(A, "oops", A));

  // Try to send B over A.

  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
            node.SendStringMessageWithPort(A, "nope", B));

  // B should be closed immediately.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B));

  WaitForIdle();

  // There should have been no messages accepted.
  ScopedMessage message;
  EXPECT_FALSE(node.GetSavedMessage(&message));

  EXPECT_EQ(OK, node.node().ClosePort(A));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}
962 
// A port attached to a message that is never read must not keep the node from
// shutting down cleanly once the remaining ports are closed.
TEST_F(PortsTest, DontLeakUnreceivedPorts) {
  TestNode test_node(0);
  AddNode(&test_node);

  PortRef carrier, carrier_peer, kept, transferred;
  EXPECT_EQ(OK, test_node.node().CreatePortPair(&carrier, &carrier_peer));
  EXPECT_EQ(OK, test_node.node().CreatePortPair(&kept, &transferred));

  // Queue a message carrying |transferred|; this test never reads it.
  EXPECT_EQ(OK,
            test_node.SendStringMessageWithPort(carrier, "foo", transferred));

  EXPECT_EQ(OK, test_node.node().ClosePort(kept));
  EXPECT_EQ(OK, test_node.node().ClosePort(carrier));
  EXPECT_EQ(OK, test_node.node().ClosePort(carrier_peer));

  WaitForIdle();

  EXPECT_TRUE(test_node.node().CanShutdownCleanly());
}
981 
// Verifies that ShutdownPolicy::ALLOW_LOCAL_PORTS lets a node report a clean
// shutdown while ports are still open, whereas the default policy only does so
// after every port has been closed.
TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
  TestNode node(0);
  AddNode(&node);

  PortRef A, B, C, D;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));

  // Send D over A-B; it is received below as E.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));

  ScopedMessage message;
  // ASSERT rather than EXPECT: |message| is dereferenced right below, so a
  // failed read must stop the test instead of dereferencing a null pointer.
  ASSERT_TRUE(node.ReadMessage(B, &message));
  ASSERT_EQ(1u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "foo"));
  PortRef E;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));

  // The relaxed policy is expected to hold both before and after the node has
  // gone idle.
  EXPECT_TRUE(
      node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));

  WaitForIdle();

  EXPECT_TRUE(
      node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
  // With A, B, C and E still open the default policy must refuse.
  EXPECT_FALSE(node.node().CanShutdownCleanly());

  EXPECT_EQ(OK, node.node().ClosePort(A));
  EXPECT_EQ(OK, node.node().ClosePort(B));
  EXPECT_EQ(OK, node.node().ClosePort(C));
  EXPECT_EQ(OK, node.node().ClosePort(E));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}
1017 
// Transfers one end of a port pair over the same node three times in a row and
// expects the node to be able to shut down cleanly once everything is closed.
TEST_F(PortsTest, ProxyCollapse1) {
  TestNode test_node(0);
  AddNode(&test_node);

  PortRef fixed_end, moving_end;
  EXPECT_EQ(OK, test_node.node().CreatePortPair(&fixed_end, &moving_end));

  // Pair used purely as the channel over which the port is transferred.
  PortRef channel_tx, channel_rx;
  EXPECT_EQ(OK, test_node.node().CreatePortPair(&channel_tx, &channel_rx));

  ScopedMessage received;

  // First hop: |moving_end| goes in, |hop1| comes out.
  EXPECT_EQ(OK,
            test_node.SendStringMessageWithPort(channel_tx, "foo", moving_end));
  ASSERT_TRUE(test_node.ReadMessage(channel_rx, &received));
  ASSERT_EQ(1u, received->num_ports());
  PortRef hop1;
  ASSERT_EQ(OK, test_node.node().GetPort(received->ports()[0], &hop1));

  // Second hop: |hop1| goes in, |hop2| comes out.
  EXPECT_EQ(OK, test_node.SendStringMessageWithPort(channel_tx, "foo", hop1));
  ASSERT_TRUE(test_node.ReadMessage(channel_rx, &received));
  ASSERT_EQ(1u, received->num_ports());
  PortRef hop2;
  ASSERT_EQ(OK, test_node.node().GetPort(received->ports()[0], &hop2));

  // Third hop: |hop2| goes in, |hop3| comes out.
  EXPECT_EQ(OK, test_node.SendStringMessageWithPort(channel_tx, "foo", hop2));
  ASSERT_TRUE(test_node.ReadMessage(channel_rx, &received));
  ASSERT_EQ(1u, received->num_ports());
  PortRef hop3;
  ASSERT_EQ(OK, test_node.node().GetPort(received->ports()[0], &hop3));

  EXPECT_EQ(OK, test_node.node().ClosePort(channel_tx));
  EXPECT_EQ(OK, test_node.node().ClosePort(channel_rx));

  EXPECT_EQ(OK, test_node.node().ClosePort(fixed_end));
  EXPECT_EQ(OK, test_node.node().ClosePort(hop3));

  // Idle is not expected to be reached before every proxy has collapsed.
  WaitForIdle();

  EXPECT_TRUE(test_node.node().CanShutdownCleanly());
}
1062 
// Sends both ends of a port pair over another pair without ever reading them,
// then checks that the node can still shut down cleanly after the carrying
// pair is closed.
TEST_F(PortsTest, ProxyCollapse2) {
  TestNode node(0);
  AddNode(&node);

  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));

  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));

  // Send B and A to create proxies in each direction.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));

  EXPECT_EQ(OK, node.node().ClosePort(X));
  EXPECT_EQ(OK, node.node().ClosePort(Y));

  // At this point we have a scenario with:
  //
  // D -> [B] -> C -> [A]
  //
  // Ensure that the proxies can collapse. The sent ports will be closed
  // eventually as a result of Y's closure.

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}
1093 
TEST_F(PortsTest, SendWithClosedPeer) {
  // This tests that if a port is sent when its peer is already known to be
  // closed, the newly created port will be aware of that peer closure, and the
  // proxy will eventually collapse.

  TestNode node(0);
  AddNode(&node);

  // Send a message from A to B, then close A.
  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node.SendStringMessage(A, "hey"));
  EXPECT_EQ(OK, node.node().ClosePort(A));

  // Now send B over X-Y as new port C.
  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  ScopedMessage message;
  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));

  EXPECT_EQ(OK, node.node().ClosePort(X));
  EXPECT_EQ(OK, node.node().ClosePort(Y));

  WaitForIdle();

  // C should have received the message originally sent to B, and it should also
  // be aware of A's closure.

  ASSERT_TRUE(node.ReadMessage(C, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  PortStatus status;
  EXPECT_EQ(OK, node.node().GetStatus(C, &status));
  EXPECT_FALSE(status.receiving_messages);
  EXPECT_FALSE(status.has_messages);
  EXPECT_TRUE(status.peer_closed);

  // Check the close result like every other ClosePort call in this file, so a
  // failure here is reported rather than silently ignored.
  EXPECT_EQ(OK, node.node().ClosePort(C));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}
1141 
TEST_F(PortsTest,SendWithClosedPeerSent)1142 TEST_F(PortsTest, SendWithClosedPeerSent) {
1143   // This tests that if a port is closed while some number of proxies are still
1144   // routing messages (directly or indirectly) to it, that the peer port is
1145   // eventually notified of the closure, and the dead-end proxies will
1146   // eventually be removed.
1147 
1148   TestNode node(0);
1149   AddNode(&node);
1150 
1151   PortRef X, Y;
1152   EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1153 
1154   PortRef A, B;
1155   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1156 
1157   ScopedMessage message;
1158 
1159   // Send A as new port C.
1160   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
1161 
1162   ASSERT_TRUE(node.ReadMessage(Y, &message));
1163   ASSERT_EQ(1u, message->num_ports());
1164   PortRef C;
1165   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
1166 
1167   // Send C as new port D.
1168   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
1169 
1170   ASSERT_TRUE(node.ReadMessage(Y, &message));
1171   ASSERT_EQ(1u, message->num_ports());
1172   PortRef D;
1173   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
1174 
1175   // Send a message to B through D, then close D.
1176   EXPECT_EQ(OK, node.SendStringMessage(D, "hey"));
1177   EXPECT_EQ(OK, node.node().ClosePort(D));
1178 
1179   // Now send B as new port E.
1180 
1181   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1182   EXPECT_EQ(OK, node.node().ClosePort(X));
1183 
1184   ASSERT_TRUE(node.ReadMessage(Y, &message));
1185   ASSERT_EQ(1u, message->num_ports());
1186   PortRef E;
1187   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
1188 
1189   EXPECT_EQ(OK, node.node().ClosePort(Y));
1190 
1191   WaitForIdle();
1192 
1193   // E should receive the message originally sent to B, and it should also be
1194   // aware of D's closure.
1195 
1196   ASSERT_TRUE(node.ReadMessage(E, &message));
1197   EXPECT_TRUE(MessageEquals(message, "hey"));
1198 
1199   PortStatus status;
1200   EXPECT_EQ(OK, node.node().GetStatus(E, &status));
1201   EXPECT_FALSE(status.receiving_messages);
1202   EXPECT_FALSE(status.has_messages);
1203   EXPECT_TRUE(status.peer_closed);
1204 
1205   EXPECT_EQ(OK, node.node().ClosePort(E));
1206 
1207   WaitForIdle();
1208 
1209   EXPECT_TRUE(node.node().CanShutdownCleanly());
1210 }
1211 
// Merges one end of a pair on the first node with one end of a pair on the
// second node and checks that a message written before the merge reaches the
// far end.
TEST_F(PortsTest, MergePorts) {
  TestNode first_node(0);
  AddNode(&first_node);

  TestNode second_node(1);
  AddNode(&second_node);

  // Two unrelated pairs: near_end/near_merge on the first node and
  // far_merge/far_end on the second node.
  PortRef near_end, near_merge, far_merge, far_end;
  EXPECT_EQ(OK, first_node.node().CreatePortPair(&near_end, &near_merge));
  EXPECT_EQ(OK, second_node.node().CreatePortPair(&far_merge, &far_end));

  // Queue a message before the merge is requested.
  EXPECT_EQ(OK, first_node.SendStringMessage(near_end, "hey"));

  // Join the two pairs through |near_merge| and |far_merge|.
  EXPECT_EQ(OK, first_node.node().MergePorts(near_merge, second_node.name(),
                                             far_merge.name()));

  WaitForIdle();

  // Once idle, no proxies should be left on either node.
  const auto allow_local = Node::ShutdownPolicy::ALLOW_LOCAL_PORTS;
  EXPECT_TRUE(first_node.node().CanShutdownCleanly(allow_local));
  EXPECT_TRUE(second_node.node().CanShutdownCleanly(allow_local));

  // The message written on |near_end| must now be readable on |far_end|.
  ScopedMessage received;
  ASSERT_TRUE(second_node.ReadMessage(far_end, &received));
  EXPECT_TRUE(MessageEquals(received, "hey"));

  EXPECT_EQ(OK, first_node.node().ClosePort(near_end));
  EXPECT_EQ(OK, second_node.node().ClosePort(far_end));

  // Nothing should remain open on either node.
  EXPECT_TRUE(first_node.node().CanShutdownCleanly());
  EXPECT_TRUE(second_node.node().CanShutdownCleanly());
}
1250 
TEST_F(PortsTest, MergePortWithClosedPeer1) {
  // Exercises a merge that is initiated on a port whose peer was closed before
  // the merge was requested.

  TestNode first_node(0);
  AddNode(&first_node);

  TestNode second_node(1);
  AddNode(&second_node);

  // Two unrelated pairs: near_end/near_merge on the first node and
  // far_merge/far_end on the second node.
  PortRef near_end, near_merge, far_merge, far_end;
  EXPECT_EQ(OK, first_node.node().CreatePortPair(&near_end, &near_merge));
  EXPECT_EQ(OK, second_node.node().CreatePortPair(&far_merge, &far_end));

  // Queue a message on |near_end| ...
  EXPECT_EQ(OK, first_node.SendStringMessage(near_end, "hey"));

  // ... and close it before the merge is requested.
  EXPECT_EQ(OK, first_node.node().ClosePort(near_end));

  // Join the two pairs through |near_merge| and |far_merge|.
  EXPECT_EQ(OK, first_node.node().MergePorts(near_merge, second_node.name(),
                                             far_merge.name()));

  WaitForIdle();

  // Once idle, no proxies should be left. The first node should hold no ports
  // at all because |near_end| was closed explicitly.
  EXPECT_TRUE(first_node.node().CanShutdownCleanly());
  EXPECT_TRUE(second_node.node().CanShutdownCleanly(
      Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));

  // The message written on |near_end| must be readable on |far_end|.
  ScopedMessage received;
  ASSERT_TRUE(second_node.ReadMessage(far_end, &received));
  EXPECT_TRUE(MessageEquals(received, "hey"));

  EXPECT_EQ(OK, second_node.node().ClosePort(far_end));

  // Nothing should remain open on either node.
  EXPECT_TRUE(first_node.node().CanShutdownCleanly());
  EXPECT_TRUE(second_node.node().CanShutdownCleanly());
}
1294 
TEST_F(PortsTest, MergePortWithClosedPeer2) {
  // This tests that the right thing happens when merging into a port whose peer
  // has already been closed.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Write a message on D and close it. D was created on node1, so the message
  // is sent through node1, the node that owns the port.
  EXPECT_EQ(OK, node1.SendStringMessage(D, "hey"));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  // Initiate a merge between B and C.
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Expect all proxies to be gone once idle. node1 should have no ports since
  // D was explicitly closed.
  EXPECT_TRUE(
      node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
  EXPECT_TRUE(node1.node().CanShutdownCleanly());

  // Expect A to have received the message sent on D.
  ScopedMessage message;
  ASSERT_TRUE(node0.ReadMessage(A, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  EXPECT_EQ(OK, node0.node().ClosePort(A));

  // No more ports should be open.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
1336 
TEST_F(PortsTest, MergePortsWithClosedPeers) {
  // Merging two ports whose peers are both already closed must not leave any
  // port behind on either node.

  TestNode first_node(0);
  AddNode(&first_node);

  TestNode second_node(1);
  AddNode(&second_node);

  // Two unrelated pairs: near_end/near_merge on the first node and
  // far_merge/far_end on the second node.
  PortRef near_end, near_merge, far_merge, far_end;
  EXPECT_EQ(OK, first_node.node().CreatePortPair(&near_end, &near_merge));
  EXPECT_EQ(OK, second_node.node().CreatePortPair(&far_merge, &far_end));

  // Close the two outer ends and wait for idle before requesting the merge.
  EXPECT_EQ(OK, first_node.node().ClosePort(near_end));
  EXPECT_EQ(OK, second_node.node().ClosePort(far_end));

  WaitForIdle();

  // Join the two remaining ports.
  EXPECT_EQ(OK, first_node.node().MergePorts(near_merge, second_node.name(),
                                             far_merge.name()));

  WaitForIdle();

  // Neither node should be holding on to anything.
  EXPECT_TRUE(first_node.node().CanShutdownCleanly());
  EXPECT_TRUE(second_node.node().CanShutdownCleanly());
}
1367 
TEST_F(PortsTest, MergePortsWithMovedPeers) {
  // A merge must still succeed when the peer of a merged port has been
  // transferred to a new port beforehand.

  TestNode first_node(0);
  AddNode(&first_node);

  TestNode second_node(1);
  AddNode(&second_node);

  // Two unrelated pairs: near_end/near_merge on the first node and
  // far_merge/far_end on the second node.
  PortRef near_end, near_merge, far_merge, far_end;
  EXPECT_EQ(OK, first_node.node().CreatePortPair(&near_end, &near_merge));
  EXPECT_EQ(OK, second_node.node().CreatePortPair(&far_merge, &far_end));

  // Extra pair on the first node, used only as a transfer channel.
  PortRef channel_tx, channel_rx;
  EXPECT_EQ(OK, first_node.node().CreatePortPair(&channel_tx, &channel_rx));

  ScopedMessage received;

  // Transfer |near_end|; it arrives as |near_moved|.
  EXPECT_EQ(OK,
            first_node.SendStringMessageWithPort(channel_tx, "foo", near_end));
  ASSERT_TRUE(first_node.ReadMessage(channel_rx, &received));
  ASSERT_EQ(1u, received->num_ports());
  PortRef near_moved;
  ASSERT_EQ(OK, first_node.node().GetPort(received->ports()[0], &near_moved));

  EXPECT_EQ(OK, first_node.node().ClosePort(channel_tx));
  EXPECT_EQ(OK, first_node.node().ClosePort(channel_rx));

  // Queue one message in each direction.
  EXPECT_EQ(OK, first_node.SendStringMessage(near_moved, "hey"));
  EXPECT_EQ(OK, second_node.SendStringMessage(far_end, "hi"));

  // Join the two pairs through |near_merge| and |far_merge|.
  EXPECT_EQ(OK, first_node.node().MergePorts(near_merge, second_node.name(),
                                             far_merge.name()));

  WaitForIdle();

  // Each outer end must receive what the other one wrote.
  ASSERT_TRUE(first_node.ReadMessage(near_moved, &received));
  EXPECT_TRUE(MessageEquals(received, "hi"));
  ASSERT_TRUE(second_node.ReadMessage(far_end, &received));
  EXPECT_TRUE(MessageEquals(received, "hey"));

  // Tear down the two outer ends.
  EXPECT_EQ(OK, first_node.node().ClosePort(near_moved));
  EXPECT_EQ(OK, second_node.node().ClosePort(far_end));

  WaitForIdle();

  // Neither node should be holding on to anything.
  EXPECT_TRUE(first_node.node().CanShutdownCleanly());
  EXPECT_TRUE(second_node.node().CanShutdownCleanly());
}
1424 
TEST_F(PortsTest,MergePortsFailsGracefully)1425 TEST_F(PortsTest, MergePortsFailsGracefully) {
1426   // This tests that the system remains in a well-defined state if something
1427   // goes wrong during port merge.
1428 
1429   TestNode node0(0);
1430   AddNode(&node0);
1431 
1432   TestNode node1(1);
1433   AddNode(&node1);
1434 
1435   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1436   PortRef A, B, C, D;
1437   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1438   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1439 
1440   ScopedMessage message;
1441   PortRef X, Y;
1442   EXPECT_EQ(OK, node0.node().CreateUninitializedPort(&X));
1443   EXPECT_EQ(OK, node1.node().CreateUninitializedPort(&Y));
1444   EXPECT_EQ(OK, node0.node().InitializePort(X, node1.name(), Y.name()));
1445   EXPECT_EQ(OK, node1.node().InitializePort(Y, node0.name(), X.name()));
1446 
1447   // Block the merge from proceeding until we can do something stupid with port
1448   // C. This avoids the test logic racing with async merge logic.
1449   node1.BlockOnEvent(Event::Type::kMergePort);
1450 
1451   // Initiate the merge between B and C.
1452   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1453 
1454   // Move C to a new port E. This is not a sane use of Node's public API but
1455   // is still hypothetically possible. It allows us to force a merge failure
1456   // because C will be in an invalid state by the time the merge is processed.
1457   // As a result, B should be closed.
1458   EXPECT_EQ(OK, node1.SendStringMessageWithPort(Y, "foo", C));
1459 
1460   node1.Unblock();
1461 
1462   WaitForIdle();
1463 
1464   ASSERT_TRUE(node0.ReadMessage(X, &message));
1465   ASSERT_EQ(1u, message->num_ports());
1466   PortRef E;
1467   ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
1468 
1469   EXPECT_EQ(OK, node0.node().ClosePort(X));
1470   EXPECT_EQ(OK, node1.node().ClosePort(Y));
1471 
1472   WaitForIdle();
1473 
1474   // C goes away as a result of normal proxy removal. B should have been closed
1475   // cleanly by the failed MergePorts.
1476   EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C));
1477   EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B));
1478 
1479   // Close A, D, and E.
1480   EXPECT_EQ(OK, node0.node().ClosePort(A));
1481   EXPECT_EQ(OK, node1.node().ClosePort(D));
1482   EXPECT_EQ(OK, node0.node().ClosePort(E));
1483 
1484   WaitForIdle();
1485 
1486   // Expect everything to have gone away.
1487   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1488   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1489 }
1490 
TEST_F(PortsTest,RemotePeerStatus)1491 TEST_F(PortsTest, RemotePeerStatus) {
1492   TestNode node0(0);
1493   AddNode(&node0);
1494 
1495   TestNode node1(1);
1496   AddNode(&node1);
1497 
1498   // Create a local port pair. Neither port should appear to have a remote peer.
1499   PortRef a, b;
1500   PortStatus status;
1501   node0.node().CreatePortPair(&a, &b);
1502   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1503   EXPECT_FALSE(status.peer_remote);
1504   ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1505   EXPECT_FALSE(status.peer_remote);
1506 
1507   // Create a port pair spanning the two nodes. Both spanning ports should
1508   // immediately appear to have a remote peer.
1509   PortRef x0, x1;
1510   CreatePortPair(&node0, &x0, &node1, &x1);
1511 
1512   ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
1513   EXPECT_TRUE(status.peer_remote);
1514   ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
1515   EXPECT_TRUE(status.peer_remote);
1516 
1517   PortRef x2, x3;
1518   CreatePortPair(&node0, &x2, &node1, &x3);
1519 
1520   // Transfer |b| to |node1| and |x1| to |node0|. i.e., make the local peers
1521   // remote and the remote peers local.
1522   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", b));
1523   EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", x1));
1524   WaitForIdle();
1525 
1526   ScopedMessage message;
1527   ASSERT_TRUE(node0.ReadMessage(x2, &message));
1528   ASSERT_EQ(1u, message->num_ports());
1529   ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &x1));
1530 
1531   ASSERT_TRUE(node1.ReadMessage(x3, &message));
1532   ASSERT_EQ(1u, message->num_ports());
1533   ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &b));
1534 
1535   // Now x0-x1 should be local to node0 and a-b should span the nodes.
1536   ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
1537   EXPECT_FALSE(status.peer_remote);
1538   ASSERT_EQ(OK, node0.node().GetStatus(x1, &status));
1539   EXPECT_FALSE(status.peer_remote);
1540   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1541   EXPECT_TRUE(status.peer_remote);
1542   ASSERT_EQ(OK, node1.node().GetStatus(b, &status));
1543   EXPECT_TRUE(status.peer_remote);
1544 
1545   // And swap them back one more time.
1546   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", x1));
1547   EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", b));
1548   WaitForIdle();
1549 
1550   ASSERT_TRUE(node0.ReadMessage(x2, &message));
1551   ASSERT_EQ(1u, message->num_ports());
1552   ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &b));
1553 
1554   ASSERT_TRUE(node1.ReadMessage(x3, &message));
1555   ASSERT_EQ(1u, message->num_ports());
1556   ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &x1));
1557 
1558   ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
1559   EXPECT_TRUE(status.peer_remote);
1560   ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
1561   EXPECT_TRUE(status.peer_remote);
1562   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1563   EXPECT_FALSE(status.peer_remote);
1564   ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1565   EXPECT_FALSE(status.peer_remote);
1566 
1567   EXPECT_EQ(OK, node0.node().ClosePort(x0));
1568   EXPECT_EQ(OK, node1.node().ClosePort(x1));
1569   EXPECT_EQ(OK, node0.node().ClosePort(x2));
1570   EXPECT_EQ(OK, node1.node().ClosePort(x3));
1571   EXPECT_EQ(OK, node0.node().ClosePort(a));
1572   EXPECT_EQ(OK, node0.node().ClosePort(b));
1573 
1574   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1575   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1576 }
1577 
// Verifies that PortStatus::peer_remote is updated when a local pair is merged
// with a node-spanning pair via MergeLocalPorts.
TEST_F(PortsTest, RemotePeerStatusAfterLocalPortMerge) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Set up a-b on node0 and c-d spanning node0-node1.
  PortRef a, b, c, d;
  // Check the creation result so that a failure is reported here rather than
  // surfacing later as a confusing status error.
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a, &b));
  CreatePortPair(&node0, &c, &node1, &d);

  PortStatus status;
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(c, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
  EXPECT_TRUE(status.peer_remote);

  EXPECT_EQ(OK, node0.node().MergeLocalPorts(b, c));
  WaitForIdle();

  // After the merge, a (node0) and d (node1) should both report a remote peer.
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
  EXPECT_TRUE(status.peer_remote);

  EXPECT_EQ(OK, node0.node().ClosePort(a));
  EXPECT_EQ(OK, node1.node().ClosePort(d));
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
1613 
// Verifies that PortStatus::peer_remote is updated when two pairs living on
// different nodes are merged via MergePorts.
TEST_F(PortsTest, RemotePeerStatusAfterRemotePortMerge) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Set up a-b on node0 and c-d on node1.
  PortRef a, b, c, d;
  // Check the creation results so that a failure is reported here rather than
  // surfacing later as a confusing status error.
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a, &b));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&c, &d));

  PortStatus status;
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(c, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
  EXPECT_FALSE(status.peer_remote);

  EXPECT_EQ(OK, node0.node().MergePorts(b, node1.name(), c.name()));
  WaitForIdle();

  // After the merge, a (node0) and d (node1) should both report a remote peer.
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
  EXPECT_TRUE(status.peer_remote);

  EXPECT_EQ(OK, node0.node().ClosePort(a));
  EXPECT_EQ(OK, node1.node().ClosePort(d));
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
1649 
TEST_F(PortsTest, RetransmitUserMessageEvents) {
  // Ensures that user message events can be retransmitted properly: the same
  // event object is bounced back and forth between the two ends of a pair.
  TestNode node0(0);
  AddNode(&node0);

  PortRef a, b;
  // Check the creation result so that a failure is reported here rather than
  // surfacing later as a confusing send error.
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a, &b));

  // Ping.
  const char* kMessage = "hey";
  ScopedMessage message;
  EXPECT_EQ(OK, node0.SendStringMessage(a, kMessage));
  ASSERT_TRUE(node0.ReadMessage(b, &message));
  EXPECT_TRUE(MessageEquals(message, kMessage));

  // Pong. Sending is expected to take ownership, leaving |message| null.
  EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
  EXPECT_FALSE(message);
  ASSERT_TRUE(node0.ReadMessage(a, &message));
  EXPECT_TRUE(MessageEquals(message, kMessage));

  // Ping again.
  EXPECT_EQ(OK, node0.node().SendUserMessage(a, std::move(message)));
  EXPECT_FALSE(message);
  ASSERT_TRUE(node0.ReadMessage(b, &message));
  EXPECT_TRUE(MessageEquals(message, kMessage));

  // Pong again!
  EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
  EXPECT_FALSE(message);
  ASSERT_TRUE(node0.ReadMessage(a, &message));
  EXPECT_TRUE(MessageEquals(message, kMessage));

  EXPECT_EQ(OK, node0.node().ClosePort(a));
  EXPECT_EQ(OK, node0.node().ClosePort(b));
}
1686 
TEST_F(PortsTest,SetAcknowledgeRequestInterval)1687 TEST_F(PortsTest, SetAcknowledgeRequestInterval) {
1688   TestNode node0(0);
1689   AddNode(&node0);
1690 
1691   PortRef a0, a1;
1692   EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
1693   EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
1694 
1695   // Send a batch of messages.
1696   EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 15));
1697   EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));
1698   EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
1699   WaitForIdle();
1700   EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));
1701 
1702   // Set to acknowledge every read message, and validate that already-read
1703   // messages are acknowledged.
1704   EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 1));
1705   WaitForIdle();
1706   EXPECT_EQ(10u, node0.GetUnacknowledgedMessageCount(a0));
1707 
1708   // Read a third of the messages from the other end.
1709   EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
1710   WaitForIdle();
1711 
1712   EXPECT_EQ(5u, node0.GetUnacknowledgedMessageCount(a0));
1713 
1714   TestNode node1(1);
1715   AddNode(&node1);
1716 
1717   // Transfer a1 across to node1.
1718   PortRef x0, x1;
1719   CreatePortPair(&node0, &x0, &node1, &x1);
1720   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
1721   WaitForIdle();
1722 
1723   ScopedMessage message;
1724   ASSERT_TRUE(node1.ReadMessage(x1, &message));
1725   ASSERT_EQ(1u, message->num_ports());
1726   ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &a1));
1727 
1728   // Read the last third of the messages from the transferred node, and
1729   // validate that the unacknowledge message count updates correctly.
1730   EXPECT_TRUE(node1.ReadMultipleMessages(a1, 5));
1731   WaitForIdle();
1732   EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
1733 
1734   // Turn the acknowledges down and validate that they don't go on indefinitely.
1735   EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 0));
1736   EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 10));
1737   WaitForIdle();
1738   EXPECT_TRUE(node1.ReadMultipleMessages(a1, 10));
1739   WaitForIdle();
1740   EXPECT_NE(0u, node0.GetUnacknowledgedMessageCount(a0));
1741 
1742   // Close the far port and validate that the closure updates the unacknowledged
1743   // count.
1744   EXPECT_EQ(OK, node1.node().ClosePort(a1));
1745   WaitForIdle();
1746   EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
1747 
1748   EXPECT_EQ(OK, node0.node().ClosePort(a0));
1749   EXPECT_EQ(OK, node0.node().ClosePort(x0));
1750   EXPECT_EQ(OK, node1.node().ClosePort(x1));
1751 }
1752 
1753 }  // namespace test
1754 }  // namespace ports
1755 }  // namespace core
1756 }  // namespace mojo
1757