1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <inttypes.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9
10 #include <map>
11 #include <sstream>
12 #include <utility>
13
14 #include "base/logging.h"
15 #include "base/waitable_event.h"
16 #include "base/thread.h"
17 #include "base/string_piece.h"
18 #include "base/string_util.h"
19 #include "mojo/core/ports/event.h"
20 #include "mojo/core/ports/node.h"
21 #include "mojo/core/ports/node_delegate.h"
22 #include "mojo/core/ports/user_message.h"
23 #include "testing/gtest/include/gtest/gtest.h"
24
25 #include "mozilla/Mutex.h"
26
27 namespace mojo {
28 namespace core {
29 namespace ports {
30 namespace test {
31
32 namespace {
33
34 // TODO(rockot): Remove this unnecessary alias.
35 using ScopedMessage = mozilla::UniquePtr<UserMessageEvent>;
36
37 class TestMessage : public UserMessage {
38 public:
39 static const TypeInfo kUserMessageTypeInfo;
40
TestMessage(const std::string & payload)41 explicit TestMessage(const std::string& payload)
42 : UserMessage(&kUserMessageTypeInfo), payload_(payload) {}
43 ~TestMessage() override = default;
44
payload() const45 const std::string& payload() const { return payload_; }
46
47 private:
48 std::string payload_;
49 };
50
51 const UserMessage::TypeInfo TestMessage::kUserMessageTypeInfo = {};
52
NewUserMessageEvent(const std::string & payload,size_t num_ports)53 ScopedMessage NewUserMessageEvent(const std::string& payload,
54 size_t num_ports) {
55 auto event = mozilla::MakeUnique<UserMessageEvent>(num_ports);
56 event->AttachMessage(mozilla::MakeUnique<TestMessage>(payload));
57 return event;
58 }
59
MessageEquals(const ScopedMessage & message,const std::string & s)60 bool MessageEquals(const ScopedMessage& message, const std::string& s) {
61 return message->GetMessage<TestMessage>()->payload() == s;
62 }
63
64 class TestNode;
65
66 class MessageRouter {
67 public:
68 virtual ~MessageRouter() = default;
69
70 virtual void ForwardEvent(TestNode* from_node, const NodeName& node_name,
71 ScopedEvent event) = 0;
72 virtual void BroadcastEvent(TestNode* from_node, ScopedEvent event) = 0;
73 };
74
75 class TestNode : public NodeDelegate {
76 public:
TestNode(uint64_t id)77 explicit TestNode(uint64_t id)
78 : node_name_(id, 1),
79 node_(node_name_, this),
80 node_thread_(StringPrintf("Node %" PRIu64 " thread", id).c_str()),
81 events_available_event_(/* manual_reset */ false,
82 /* initially_signaled */ false),
83 idle_event_(/* manual_reset */ true, /* initially_signaled */ true) {}
84
~TestNode()85 ~TestNode() override {
86 StopWhenIdle();
87 node_thread_.Stop();
88 }
89
name() const90 const NodeName& name() const { return node_name_; }
91
92 // NOTE: Node is thread-safe.
node()93 Node& node() { return node_; }
94
idle_event()95 base::WaitableEvent& idle_event() { return idle_event_; }
96
IsIdle()97 bool IsIdle() {
98 mozilla::MutexAutoLock lock(lock_);
99 return started_ && !dispatching_ &&
100 (incoming_events_.empty() || (block_on_event_ && blocked_));
101 }
102
BlockOnEvent(Event::Type type)103 void BlockOnEvent(Event::Type type) {
104 mozilla::MutexAutoLock lock(lock_);
105 blocked_event_type_ = type;
106 block_on_event_ = true;
107 }
108
Unblock()109 void Unblock() {
110 mozilla::MutexAutoLock lock(lock_);
111 block_on_event_ = false;
112 events_available_event_.Signal();
113 }
114
Start(MessageRouter * router)115 void Start(MessageRouter* router) {
116 router_ = router;
117 node_thread_.Start();
118 node_thread_.message_loop()->PostTask(mozilla::NewNonOwningRunnableMethod(
119 "TestNode::ProcessEvents", this, &TestNode::ProcessEvents));
120 }
121
StopWhenIdle()122 void StopWhenIdle() {
123 mozilla::MutexAutoLock lock(lock_);
124 should_quit_ = true;
125 events_available_event_.Signal();
126 }
127
WakeUp()128 void WakeUp() { events_available_event_.Signal(); }
129
SendStringMessage(const PortRef & port,const std::string & s)130 int SendStringMessage(const PortRef& port, const std::string& s) {
131 return node_.SendUserMessage(port, NewUserMessageEvent(s, 0));
132 }
133
SendMultipleMessages(const PortRef & port,size_t num_messages)134 int SendMultipleMessages(const PortRef& port, size_t num_messages) {
135 for (size_t i = 0; i < num_messages; ++i) {
136 int result = SendStringMessage(port, "");
137 if (result != OK) {
138 return result;
139 }
140 }
141 return OK;
142 }
143
SendStringMessageWithPort(const PortRef & port,const std::string & s,const PortName & sent_port_name)144 int SendStringMessageWithPort(const PortRef& port, const std::string& s,
145 const PortName& sent_port_name) {
146 auto event = NewUserMessageEvent(s, 1);
147 event->ports()[0] = sent_port_name;
148 return node_.SendUserMessage(port, std::move(event));
149 }
150
SendStringMessageWithPort(const PortRef & port,const std::string & s,const PortRef & sent_port)151 int SendStringMessageWithPort(const PortRef& port, const std::string& s,
152 const PortRef& sent_port) {
153 return SendStringMessageWithPort(port, s, sent_port.name());
154 }
155
set_drop_messages(bool value)156 void set_drop_messages(bool value) {
157 mozilla::MutexAutoLock lock(lock_);
158 drop_messages_ = value;
159 }
160
set_save_messages(bool value)161 void set_save_messages(bool value) {
162 mozilla::MutexAutoLock lock(lock_);
163 save_messages_ = value;
164 }
165
ReadMessage(const PortRef & port,ScopedMessage * message)166 bool ReadMessage(const PortRef& port, ScopedMessage* message) {
167 return node_.GetMessage(port, message, nullptr) == OK && *message;
168 }
169
ReadMultipleMessages(const PortRef & port,size_t num_messages)170 bool ReadMultipleMessages(const PortRef& port, size_t num_messages) {
171 for (size_t i = 0; i < num_messages; ++i) {
172 ScopedMessage message;
173 if (!ReadMessage(port, &message)) {
174 return false;
175 }
176 }
177 return true;
178 }
179
GetSavedMessage(ScopedMessage * message)180 bool GetSavedMessage(ScopedMessage* message) {
181 mozilla::MutexAutoLock lock(lock_);
182 if (saved_messages_.empty()) {
183 message->reset();
184 return false;
185 }
186 std::swap(*message, saved_messages_.front());
187 saved_messages_.pop();
188 return true;
189 }
190
EnqueueEvent(ScopedEvent event)191 void EnqueueEvent(ScopedEvent event) {
192 idle_event_.Reset();
193
194 // NOTE: This may be called from ForwardMessage and thus must not reenter
195 // |node_|.
196 mozilla::MutexAutoLock lock(lock_);
197 incoming_events_.emplace(std::move(event));
198 events_available_event_.Signal();
199 }
200
ForwardEvent(const NodeName & node_name,ScopedEvent event)201 void ForwardEvent(const NodeName& node_name, ScopedEvent event) override {
202 {
203 mozilla::MutexAutoLock lock(lock_);
204 if (drop_messages_) {
205 DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to "
206 << node_name;
207
208 mozilla::MutexAutoUnlock unlock(lock_);
209 ClosePortsInEvent(event.get());
210 return;
211 }
212 }
213
214 DCHECK(router_);
215 DVLOG(1) << "ForwardEvent from node " << node_name_ << " to " << node_name;
216 router_->ForwardEvent(this, node_name, std::move(event));
217 }
218
BroadcastEvent(ScopedEvent event)219 void BroadcastEvent(ScopedEvent event) override {
220 router_->BroadcastEvent(this, std::move(event));
221 }
222
PortStatusChanged(const PortRef & port)223 void PortStatusChanged(const PortRef& port) override {
224 // The port may be closed, in which case we ignore the notification.
225 mozilla::MutexAutoLock lock(lock_);
226 if (!save_messages_) {
227 return;
228 }
229
230 for (;;) {
231 ScopedMessage message;
232 {
233 mozilla::MutexAutoUnlock unlock(lock_);
234 if (!ReadMessage(port, &message)) {
235 break;
236 }
237 }
238
239 saved_messages_.emplace(std::move(message));
240 }
241 }
242
ClosePortsInEvent(Event * event)243 void ClosePortsInEvent(Event* event) {
244 if (event->type() != Event::Type::kUserMessage) {
245 return;
246 }
247
248 UserMessageEvent* message_event = static_cast<UserMessageEvent*>(event);
249 for (size_t i = 0; i < message_event->num_ports(); ++i) {
250 PortRef port;
251 ASSERT_EQ(OK, node_.GetPort(message_event->ports()[i], &port));
252 EXPECT_EQ(OK, node_.ClosePort(port));
253 }
254 }
255
GetUnacknowledgedMessageCount(const PortRef & port_ref)256 uint64_t GetUnacknowledgedMessageCount(const PortRef& port_ref) {
257 PortStatus status{};
258 if (node_.GetStatus(port_ref, &status) != OK) {
259 return 0;
260 }
261
262 return status.unacknowledged_message_count;
263 }
264
265 private:
ProcessEvents()266 void ProcessEvents() {
267 for (;;) {
268 events_available_event_.Wait();
269 mozilla::MutexAutoLock lock(lock_);
270
271 if (should_quit_) {
272 return;
273 }
274
275 dispatching_ = true;
276 while (!incoming_events_.empty()) {
277 if (block_on_event_ &&
278 incoming_events_.front()->type() == blocked_event_type_) {
279 blocked_ = true;
280 // Go idle if we hit a blocked event type.
281 break;
282 }
283 blocked_ = false;
284
285 ScopedEvent event = std::move(incoming_events_.front());
286 incoming_events_.pop();
287
288 // NOTE: AcceptMessage() can re-enter this object to call any of the
289 // NodeDelegate interface methods.
290 mozilla::MutexAutoUnlock unlock(lock_);
291 node_.AcceptEvent(std::move(event));
292 }
293
294 dispatching_ = false;
295 started_ = true;
296 idle_event_.Signal();
297 };
298 }
299
300 const NodeName node_name_;
301 Node node_;
302 MessageRouter* router_ = nullptr;
303
304 base::Thread node_thread_;
305 base::WaitableEvent events_available_event_;
306 base::WaitableEvent idle_event_;
307
308 // Guards fields below.
309 mozilla::Mutex lock_{"TestNode"};
310 bool started_ = false;
311 bool dispatching_ = false;
312 bool should_quit_ = false;
313 bool drop_messages_ = false;
314 bool save_messages_ = false;
315 bool blocked_ = false;
316 bool block_on_event_ = false;
317 Event::Type blocked_event_type_{};
318 std::queue<ScopedEvent> incoming_events_;
319 std::queue<ScopedMessage> saved_messages_;
320 };
321
322 class PortsTest : public testing::Test, public MessageRouter {
323 public:
AddNode(TestNode * node)324 void AddNode(TestNode* node) {
325 {
326 mozilla::MutexAutoLock lock(lock_);
327 nodes_[node->name()] = node;
328 }
329 node->Start(this);
330 }
331
RemoveNode(TestNode * node)332 void RemoveNode(TestNode* node) {
333 {
334 mozilla::MutexAutoLock lock(lock_);
335 nodes_.erase(node->name());
336 }
337
338 for (const auto& entry : nodes_) {
339 entry.second->node().LostConnectionToNode(node->name());
340 }
341 }
342
343 // Waits until all known Nodes are idle. Message forwarding and processing
344 // is handled in such a way that idleness is a stable state: once all nodes in
345 // the system are idle, they will remain idle until the test explicitly
346 // initiates some further event (e.g. sending a message, closing a port, or
347 // removing a Node).
WaitForIdle()348 void WaitForIdle() {
349 for (;;) {
350 mozilla::MutexAutoLock global_lock(global_lock_);
351 bool all_nodes_idle = true;
352 for (const auto& entry : nodes_) {
353 if (!entry.second->IsIdle()) {
354 all_nodes_idle = false;
355 }
356 entry.second->WakeUp();
357 }
358 if (all_nodes_idle) {
359 return;
360 }
361
362 // Wait for any Node to signal that it's idle.
363 mozilla::MutexAutoUnlock global_unlock(global_lock_);
364 std::vector<base::WaitableEvent*> events;
365 for (const auto& entry : nodes_) {
366 events.push_back(&entry.second->idle_event());
367 }
368 base::WaitableEvent::WaitMany(events.data(), events.size());
369 }
370 }
371
CreatePortPair(TestNode * node0,PortRef * port0,TestNode * node1,PortRef * port1)372 void CreatePortPair(TestNode* node0, PortRef* port0, TestNode* node1,
373 PortRef* port1) {
374 if (node0 == node1) {
375 EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1));
376 } else {
377 EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0));
378 EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1));
379 EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(),
380 port1->name()));
381 EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(),
382 port0->name()));
383 }
384 }
385
386 private:
387 // MessageRouter:
ForwardEvent(TestNode * from_node,const NodeName & node_name,ScopedEvent event)388 void ForwardEvent(TestNode* from_node, const NodeName& node_name,
389 ScopedEvent event) override {
390 mozilla::MutexAutoLock global_lock(global_lock_);
391 mozilla::MutexAutoLock lock(lock_);
392 // Drop messages from nodes that have been removed.
393 if (nodes_.find(from_node->name()) == nodes_.end()) {
394 from_node->ClosePortsInEvent(event.get());
395 return;
396 }
397
398 auto it = nodes_.find(node_name);
399 if (it == nodes_.end()) {
400 DVLOG(1) << "Node not found: " << node_name;
401 return;
402 }
403
404 // Serialize and de-serialize all forwarded events.
405 size_t buf_size = event->GetSerializedSize();
406 mozilla::UniquePtr<char[]> buf(new char[buf_size]);
407 event->Serialize(buf.get());
408 ScopedEvent copy = Event::Deserialize(buf.get(), buf_size);
409 // This should always succeed unless serialization or deserialization
410 // is broken. In that case, the loss of events should cause a test failure.
411 ASSERT_TRUE(copy);
412
413 // Also copy the payload for user messages.
414 if (event->type() == Event::Type::kUserMessage) {
415 UserMessageEvent* message_event =
416 static_cast<UserMessageEvent*>(event.get());
417 UserMessageEvent* message_copy =
418 static_cast<UserMessageEvent*>(copy.get());
419
420 message_copy->AttachMessage(mozilla::MakeUnique<TestMessage>(
421 message_event->GetMessage<TestMessage>()->payload()));
422 }
423
424 it->second->EnqueueEvent(std::move(event));
425 }
426
BroadcastEvent(TestNode * from_node,ScopedEvent event)427 void BroadcastEvent(TestNode* from_node, ScopedEvent event) override {
428 mozilla::MutexAutoLock global_lock(global_lock_);
429 mozilla::MutexAutoLock lock(lock_);
430
431 // Drop messages from nodes that have been removed.
432 if (nodes_.find(from_node->name()) == nodes_.end()) {
433 return;
434 }
435
436 for (const auto& entry : nodes_) {
437 TestNode* node = entry.second;
438 // Broadcast doesn't deliver to the local node.
439 if (node == from_node) {
440 continue;
441 }
442 node->EnqueueEvent(event->Clone());
443 }
444 }
445
446 // Acquired before any operation which makes a Node busy, and before testing
447 // if all nodes are idle.
448 mozilla::Mutex global_lock_{"PortsTest Global Lock"};
449
450 mozilla::Mutex lock_{"PortsTest Lock"};
451 std::map<NodeName, TestNode*> nodes_;
452 };
453
454 } // namespace
455
TEST_F(PortsTest,Basic1)456 TEST_F(PortsTest, Basic1) {
457 TestNode node0(0);
458 AddNode(&node0);
459
460 TestNode node1(1);
461 AddNode(&node1);
462
463 PortRef x0, x1;
464 CreatePortPair(&node0, &x0, &node1, &x1);
465
466 PortRef a0, a1;
467 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
468 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
469 EXPECT_EQ(OK, node0.node().ClosePort(a0));
470
471 EXPECT_EQ(OK, node0.node().ClosePort(x0));
472 EXPECT_EQ(OK, node1.node().ClosePort(x1));
473
474 WaitForIdle();
475
476 EXPECT_TRUE(node0.node().CanShutdownCleanly());
477 EXPECT_TRUE(node1.node().CanShutdownCleanly());
478 }
479
TEST_F(PortsTest,Basic2)480 TEST_F(PortsTest, Basic2) {
481 TestNode node0(0);
482 AddNode(&node0);
483
484 TestNode node1(1);
485 AddNode(&node1);
486
487 PortRef x0, x1;
488 CreatePortPair(&node0, &x0, &node1, &x1);
489
490 PortRef b0, b1;
491 EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
492 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1));
493 EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again"));
494
495 EXPECT_EQ(OK, node0.node().ClosePort(b0));
496
497 EXPECT_EQ(OK, node0.node().ClosePort(x0));
498 EXPECT_EQ(OK, node1.node().ClosePort(x1));
499
500 WaitForIdle();
501
502 EXPECT_TRUE(node0.node().CanShutdownCleanly());
503 EXPECT_TRUE(node1.node().CanShutdownCleanly());
504 }
505
TEST_F(PortsTest,Basic3)506 TEST_F(PortsTest, Basic3) {
507 TestNode node0(0);
508 AddNode(&node0);
509
510 TestNode node1(1);
511 AddNode(&node1);
512
513 PortRef x0, x1;
514 CreatePortPair(&node0, &x0, &node1, &x1);
515
516 PortRef a0, a1;
517 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
518
519 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
520 EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again"));
521
522 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0));
523
524 PortRef b0, b1;
525 EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
526 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1));
527 EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz"));
528
529 EXPECT_EQ(OK, node0.node().ClosePort(b0));
530
531 EXPECT_EQ(OK, node0.node().ClosePort(x0));
532 EXPECT_EQ(OK, node1.node().ClosePort(x1));
533
534 WaitForIdle();
535
536 EXPECT_TRUE(node0.node().CanShutdownCleanly());
537 EXPECT_TRUE(node1.node().CanShutdownCleanly());
538 }
539
TEST_F(PortsTest,LostConnectionToNode1)540 TEST_F(PortsTest, LostConnectionToNode1) {
541 TestNode node0(0);
542 AddNode(&node0);
543
544 TestNode node1(1);
545 AddNode(&node1);
546 node1.set_drop_messages(true);
547
548 PortRef x0, x1;
549 CreatePortPair(&node0, &x0, &node1, &x1);
550
551 // Transfer a port to node1 and simulate a lost connection to node1.
552
553 PortRef a0, a1;
554 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
555 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
556
557 WaitForIdle();
558
559 RemoveNode(&node1);
560
561 WaitForIdle();
562
563 EXPECT_EQ(OK, node0.node().ClosePort(a0));
564 EXPECT_EQ(OK, node0.node().ClosePort(x0));
565 EXPECT_EQ(OK, node1.node().ClosePort(x1));
566
567 WaitForIdle();
568
569 EXPECT_TRUE(node0.node().CanShutdownCleanly());
570 EXPECT_TRUE(node1.node().CanShutdownCleanly());
571 }
572
TEST_F(PortsTest,LostConnectionToNode2)573 TEST_F(PortsTest, LostConnectionToNode2) {
574 TestNode node0(0);
575 AddNode(&node0);
576
577 TestNode node1(1);
578 AddNode(&node1);
579
580 PortRef x0, x1;
581 CreatePortPair(&node0, &x0, &node1, &x1);
582
583 PortRef a0, a1;
584 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
585 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1));
586
587 WaitForIdle();
588
589 node1.set_drop_messages(true);
590
591 RemoveNode(&node1);
592
593 WaitForIdle();
594
595 // a0 should have eventually detected peer closure after node loss.
596 ScopedMessage message;
597 EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
598 node0.node().GetMessage(a0, &message, nullptr));
599 EXPECT_FALSE(message);
600
601 EXPECT_EQ(OK, node0.node().ClosePort(a0));
602
603 EXPECT_EQ(OK, node0.node().ClosePort(x0));
604
605 EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr));
606 EXPECT_TRUE(message);
607 node1.ClosePortsInEvent(message.get());
608
609 EXPECT_EQ(OK, node1.node().ClosePort(x1));
610
611 WaitForIdle();
612
613 EXPECT_TRUE(node0.node().CanShutdownCleanly());
614 EXPECT_TRUE(node1.node().CanShutdownCleanly());
615 }
616
TEST_F(PortsTest,LostConnectionToNodeWithSecondaryProxy)617 TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
618 // Tests that a proxy gets cleaned up when its indirect peer lives on a lost
619 // node.
620
621 TestNode node0(0);
622 AddNode(&node0);
623
624 TestNode node1(1);
625 AddNode(&node1);
626
627 TestNode node2(2);
628 AddNode(&node2);
629
630 // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
631 PortRef A, B, C, D;
632 CreatePortPair(&node0, &A, &node1, &B);
633 CreatePortPair(&node1, &C, &node2, &D);
634
635 // Create E-F and send F over A to node 1.
636 PortRef E, F;
637 EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
638 EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));
639
640 WaitForIdle();
641
642 ScopedMessage message;
643 ASSERT_TRUE(node1.ReadMessage(B, &message));
644 ASSERT_EQ(1u, message->num_ports());
645
646 EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));
647
648 // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
649 // will trivially become aware of the loss, and this test verifies that the
650 // port A on node 0 will eventually also become aware of it.
651
652 // Make sure node2 stops processing events when it encounters an ObserveProxy.
653 node2.BlockOnEvent(Event::Type::kObserveProxy);
654
655 EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
656 WaitForIdle();
657
658 // Simulate node 1 and 2 disconnecting.
659 EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));
660
661 // Let node2 continue processing events and wait for everyone to go idle.
662 node2.Unblock();
663 WaitForIdle();
664
665 // Port F should be gone.
666 EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));
667
668 // Port E should have detected peer closure despite the fact that there is
669 // no longer a continuous route from F to E over which the event could travel.
670 PortStatus status{};
671 EXPECT_EQ(OK, node0.node().GetStatus(E, &status));
672 EXPECT_TRUE(status.peer_closed);
673
674 EXPECT_EQ(OK, node0.node().ClosePort(A));
675 EXPECT_EQ(OK, node1.node().ClosePort(B));
676 EXPECT_EQ(OK, node1.node().ClosePort(C));
677 EXPECT_EQ(OK, node0.node().ClosePort(E));
678
679 WaitForIdle();
680
681 EXPECT_TRUE(node0.node().CanShutdownCleanly());
682 EXPECT_TRUE(node1.node().CanShutdownCleanly());
683 }
684
TEST_F(PortsTest,LostConnectionToNodeWithLocalProxy)685 TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
686 // Tests that a proxy gets cleaned up when its direct peer lives on a lost
687 // node and it's predecessor lives on the same node.
688
689 TestNode node0(0);
690 AddNode(&node0);
691
692 TestNode node1(1);
693 AddNode(&node1);
694
695 PortRef A, B;
696 CreatePortPair(&node0, &A, &node1, &B);
697
698 PortRef C, D;
699 EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
700
701 // Send D but block node0 on an ObserveProxy event.
702 node0.BlockOnEvent(Event::Type::kObserveProxy);
703 EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D));
704
705 // node0 won't collapse the proxy but node1 will receive the message before
706 // going idle.
707 WaitForIdle();
708
709 ScopedMessage message;
710 ASSERT_TRUE(node1.ReadMessage(B, &message));
711 ASSERT_EQ(1u, message->num_ports());
712 PortRef E;
713 EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));
714
715 RemoveNode(&node1);
716
717 node0.Unblock();
718 WaitForIdle();
719
720 // Port C should have detected peer closure.
721 PortStatus status{};
722 EXPECT_EQ(OK, node0.node().GetStatus(C, &status));
723 EXPECT_TRUE(status.peer_closed);
724
725 EXPECT_EQ(OK, node0.node().ClosePort(A));
726 EXPECT_EQ(OK, node1.node().ClosePort(B));
727 EXPECT_EQ(OK, node0.node().ClosePort(C));
728 EXPECT_EQ(OK, node1.node().ClosePort(E));
729
730 EXPECT_TRUE(node0.node().CanShutdownCleanly());
731 EXPECT_TRUE(node1.node().CanShutdownCleanly());
732 }
733
TEST_F(PortsTest,GetMessage1)734 TEST_F(PortsTest, GetMessage1) {
735 TestNode node(0);
736 AddNode(&node);
737
738 PortRef a0, a1;
739 EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
740
741 ScopedMessage message;
742 EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
743 EXPECT_FALSE(message);
744
745 EXPECT_EQ(OK, node.node().ClosePort(a1));
746
747 WaitForIdle();
748
749 EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
750 node.node().GetMessage(a0, &message, nullptr));
751 EXPECT_FALSE(message);
752
753 EXPECT_EQ(OK, node.node().ClosePort(a0));
754
755 WaitForIdle();
756
757 EXPECT_TRUE(node.node().CanShutdownCleanly());
758 }
759
TEST_F(PortsTest,GetMessage2)760 TEST_F(PortsTest, GetMessage2) {
761 TestNode node(0);
762 AddNode(&node);
763
764 PortRef a0, a1;
765 EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
766
767 EXPECT_EQ(OK, node.SendStringMessage(a1, "1"));
768
769 ScopedMessage message;
770 EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
771
772 ASSERT_TRUE(message);
773 EXPECT_TRUE(MessageEquals(message, "1"));
774
775 EXPECT_EQ(OK, node.node().ClosePort(a0));
776 EXPECT_EQ(OK, node.node().ClosePort(a1));
777
778 EXPECT_TRUE(node.node().CanShutdownCleanly());
779 }
780
TEST_F(PortsTest,GetMessage3)781 TEST_F(PortsTest, GetMessage3) {
782 TestNode node(0);
783 AddNode(&node);
784
785 PortRef a0, a1;
786 EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
787
788 const char* kStrings[] = {"1", "2", "3"};
789
790 for (auto& kString : kStrings) {
791 EXPECT_EQ(OK, node.SendStringMessage(a1, kString));
792 }
793
794 ScopedMessage message;
795 for (auto& kString : kStrings) {
796 EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
797 ASSERT_TRUE(message);
798 EXPECT_TRUE(MessageEquals(message, kString));
799 }
800
801 EXPECT_EQ(OK, node.node().ClosePort(a0));
802 EXPECT_EQ(OK, node.node().ClosePort(a1));
803
804 EXPECT_TRUE(node.node().CanShutdownCleanly());
805 }
806
TEST_F(PortsTest,Delegation1)807 TEST_F(PortsTest, Delegation1) {
808 TestNode node0(0);
809 AddNode(&node0);
810
811 TestNode node1(1);
812 AddNode(&node1);
813
814 PortRef x0, x1;
815 CreatePortPair(&node0, &x0, &node1, &x1);
816
817 // In this test, we send a message to a port that has been moved.
818
819 PortRef a0, a1;
820 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
821 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1));
822 WaitForIdle();
823
824 ScopedMessage message;
825 ASSERT_TRUE(node1.ReadMessage(x1, &message));
826 ASSERT_EQ(1u, message->num_ports());
827 EXPECT_TRUE(MessageEquals(message, "a1"));
828
829 // This is "a1" from the point of view of node1.
830 PortName a2_name = message->ports()[0];
831 EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name));
832 EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello"));
833
834 WaitForIdle();
835
836 ASSERT_TRUE(node0.ReadMessage(x0, &message));
837 ASSERT_EQ(1u, message->num_ports());
838 EXPECT_TRUE(MessageEquals(message, "a2"));
839
840 // This is "a2" from the point of view of node1.
841 PortName a3_name = message->ports()[0];
842
843 PortRef a3;
844 EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3));
845
846 ASSERT_TRUE(node0.ReadMessage(a3, &message));
847 EXPECT_EQ(0u, message->num_ports());
848 EXPECT_TRUE(MessageEquals(message, "hello"));
849
850 EXPECT_EQ(OK, node0.node().ClosePort(a0));
851 EXPECT_EQ(OK, node0.node().ClosePort(a3));
852
853 EXPECT_EQ(OK, node0.node().ClosePort(x0));
854 EXPECT_EQ(OK, node1.node().ClosePort(x1));
855
856 EXPECT_TRUE(node0.node().CanShutdownCleanly());
857 EXPECT_TRUE(node1.node().CanShutdownCleanly());
858 }
859
TEST_F(PortsTest,Delegation2)860 TEST_F(PortsTest, Delegation2) {
861 TestNode node0(0);
862 AddNode(&node0);
863
864 TestNode node1(1);
865 AddNode(&node1);
866
867 for (int i = 0; i < 100; ++i) {
868 // Setup pipe a<->b between node0 and node1.
869 PortRef A, B;
870 CreatePortPair(&node0, &A, &node1, &B);
871
872 PortRef C, D;
873 EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
874
875 PortRef E, F;
876 EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
877
878 node1.set_save_messages(true);
879
880 // Pass D over A to B.
881 EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D));
882
883 // Pass F over C to D.
884 EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F));
885
886 // This message should find its way to node1.
887 EXPECT_EQ(OK, node0.SendStringMessage(E, "hello"));
888
889 WaitForIdle();
890
891 EXPECT_EQ(OK, node0.node().ClosePort(C));
892 EXPECT_EQ(OK, node0.node().ClosePort(E));
893
894 EXPECT_EQ(OK, node0.node().ClosePort(A));
895 EXPECT_EQ(OK, node1.node().ClosePort(B));
896
897 bool got_hello = false;
898 ScopedMessage message;
899 while (node1.GetSavedMessage(&message)) {
900 node1.ClosePortsInEvent(message.get());
901 if (MessageEquals(message, "hello")) {
902 got_hello = true;
903 break;
904 }
905 }
906
907 EXPECT_TRUE(got_hello);
908
909 WaitForIdle(); // Because closing ports may have generated tasks.
910 }
911
912 EXPECT_TRUE(node0.node().CanShutdownCleanly());
913 EXPECT_TRUE(node1.node().CanShutdownCleanly());
914 }
915
TEST_F(PortsTest,SendUninitialized)916 TEST_F(PortsTest, SendUninitialized) {
917 TestNode node(0);
918 AddNode(&node);
919
920 PortRef x0;
921 EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0));
922 EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops"));
923 EXPECT_EQ(OK, node.node().ClosePort(x0));
924 EXPECT_TRUE(node.node().CanShutdownCleanly());
925 }
926
TEST_F(PortsTest,SendFailure)927 TEST_F(PortsTest, SendFailure) {
928 TestNode node(0);
929 AddNode(&node);
930
931 node.set_save_messages(true);
932
933 PortRef A, B;
934 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
935
936 // Try to send A over itself.
937
938 EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
939 node.SendStringMessageWithPort(A, "oops", A));
940
941 // Try to send B over A.
942
943 EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
944 node.SendStringMessageWithPort(A, "nope", B));
945
946 // B should be closed immediately.
947 EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B));
948
949 WaitForIdle();
950
951 // There should have been no messages accepted.
952 ScopedMessage message;
953 EXPECT_FALSE(node.GetSavedMessage(&message));
954
955 EXPECT_EQ(OK, node.node().ClosePort(A));
956
957 WaitForIdle();
958
959 EXPECT_TRUE(node.node().CanShutdownCleanly());
960 }
961
TEST_F(PortsTest,DontLeakUnreceivedPorts)962 TEST_F(PortsTest, DontLeakUnreceivedPorts) {
963 TestNode node(0);
964 AddNode(&node);
965
966 PortRef A, B, C, D;
967 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
968 EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
969
970 EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
971
972 EXPECT_EQ(OK, node.node().ClosePort(C));
973 EXPECT_EQ(OK, node.node().ClosePort(A));
974 EXPECT_EQ(OK, node.node().ClosePort(B));
975
976 WaitForIdle();
977
978 EXPECT_TRUE(node.node().CanShutdownCleanly());
979 }
980
TEST_F(PortsTest,AllowShutdownWithLocalPortsOpen)981 TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
982 TestNode node(0);
983 AddNode(&node);
984
985 PortRef A, B, C, D;
986 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
987 EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
988
989 EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
990
991 ScopedMessage message;
992 EXPECT_TRUE(node.ReadMessage(B, &message));
993 ASSERT_EQ(1u, message->num_ports());
994 EXPECT_TRUE(MessageEquals(message, "foo"));
995 PortRef E;
996 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
997
998 EXPECT_TRUE(
999 node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1000
1001 WaitForIdle();
1002
1003 EXPECT_TRUE(
1004 node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1005 EXPECT_FALSE(node.node().CanShutdownCleanly());
1006
1007 EXPECT_EQ(OK, node.node().ClosePort(A));
1008 EXPECT_EQ(OK, node.node().ClosePort(B));
1009 EXPECT_EQ(OK, node.node().ClosePort(C));
1010 EXPECT_EQ(OK, node.node().ClosePort(E));
1011
1012 WaitForIdle();
1013
1014 EXPECT_TRUE(node.node().CanShutdownCleanly());
1015 }
1016
TEST_F(PortsTest,ProxyCollapse1)1017 TEST_F(PortsTest, ProxyCollapse1) {
1018 TestNode node(0);
1019 AddNode(&node);
1020
1021 PortRef A, B;
1022 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1023
1024 PortRef X, Y;
1025 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1026
1027 ScopedMessage message;
1028
1029 // Send B and receive it as C.
1030 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1031 ASSERT_TRUE(node.ReadMessage(Y, &message));
1032 ASSERT_EQ(1u, message->num_ports());
1033 PortRef C;
1034 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
1035
1036 // Send C and receive it as D.
1037 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
1038 ASSERT_TRUE(node.ReadMessage(Y, &message));
1039 ASSERT_EQ(1u, message->num_ports());
1040 PortRef D;
1041 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
1042
1043 // Send D and receive it as E.
1044 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D));
1045 ASSERT_TRUE(node.ReadMessage(Y, &message));
1046 ASSERT_EQ(1u, message->num_ports());
1047 PortRef E;
1048 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
1049
1050 EXPECT_EQ(OK, node.node().ClosePort(X));
1051 EXPECT_EQ(OK, node.node().ClosePort(Y));
1052
1053 EXPECT_EQ(OK, node.node().ClosePort(A));
1054 EXPECT_EQ(OK, node.node().ClosePort(E));
1055
1056 // The node should not idle until all proxies are collapsed.
1057 WaitForIdle();
1058
1059 EXPECT_TRUE(node.node().CanShutdownCleanly());
1060 }
1061
TEST_F(PortsTest,ProxyCollapse2)1062 TEST_F(PortsTest, ProxyCollapse2) {
1063 TestNode node(0);
1064 AddNode(&node);
1065
1066 PortRef A, B;
1067 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1068
1069 PortRef X, Y;
1070 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1071
1072 ScopedMessage message;
1073
1074 // Send B and A to create proxies in each direction.
1075 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1076 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
1077
1078 EXPECT_EQ(OK, node.node().ClosePort(X));
1079 EXPECT_EQ(OK, node.node().ClosePort(Y));
1080
1081 // At this point we have a scenario with:
1082 //
1083 // D -> [B] -> C -> [A]
1084 //
1085 // Ensure that the proxies can collapse. The sent ports will be closed
1086 // eventually as a result of Y's closure.
1087
1088 WaitForIdle();
1089
1090 EXPECT_TRUE(node.node().CanShutdownCleanly());
1091 }
1092
TEST_F(PortsTest,SendWithClosedPeer)1093 TEST_F(PortsTest, SendWithClosedPeer) {
1094 // This tests that if a port is sent when its peer is already known to be
1095 // closed, the newly created port will be aware of that peer closure, and the
1096 // proxy will eventually collapse.
1097
1098 TestNode node(0);
1099 AddNode(&node);
1100
1101 // Send a message from A to B, then close A.
1102 PortRef A, B;
1103 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1104 EXPECT_EQ(OK, node.SendStringMessage(A, "hey"));
1105 EXPECT_EQ(OK, node.node().ClosePort(A));
1106
1107 // Now send B over X-Y as new port C.
1108 PortRef X, Y;
1109 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1110 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1111 ScopedMessage message;
1112 ASSERT_TRUE(node.ReadMessage(Y, &message));
1113 ASSERT_EQ(1u, message->num_ports());
1114 PortRef C;
1115 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
1116
1117 EXPECT_EQ(OK, node.node().ClosePort(X));
1118 EXPECT_EQ(OK, node.node().ClosePort(Y));
1119
1120 WaitForIdle();
1121
1122 // C should have received the message originally sent to B, and it should also
1123 // be aware of A's closure.
1124
1125 ASSERT_TRUE(node.ReadMessage(C, &message));
1126 EXPECT_TRUE(MessageEquals(message, "hey"));
1127
1128 PortStatus status{};
1129 EXPECT_EQ(OK, node.node().GetStatus(C, &status));
1130 EXPECT_FALSE(status.receiving_messages);
1131 EXPECT_FALSE(status.has_messages);
1132 EXPECT_TRUE(status.peer_closed);
1133
1134 node.node().ClosePort(C);
1135
1136 WaitForIdle();
1137
1138 EXPECT_TRUE(node.node().CanShutdownCleanly());
1139 }
1140
TEST_F(PortsTest,SendWithClosedPeerSent)1141 TEST_F(PortsTest, SendWithClosedPeerSent) {
1142 // This tests that if a port is closed while some number of proxies are still
1143 // routing messages (directly or indirectly) to it, that the peer port is
1144 // eventually notified of the closure, and the dead-end proxies will
1145 // eventually be removed.
1146
1147 TestNode node(0);
1148 AddNode(&node);
1149
1150 PortRef X, Y;
1151 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1152
1153 PortRef A, B;
1154 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1155
1156 ScopedMessage message;
1157
1158 // Send A as new port C.
1159 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
1160
1161 ASSERT_TRUE(node.ReadMessage(Y, &message));
1162 ASSERT_EQ(1u, message->num_ports());
1163 PortRef C;
1164 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
1165
1166 // Send C as new port D.
1167 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
1168
1169 ASSERT_TRUE(node.ReadMessage(Y, &message));
1170 ASSERT_EQ(1u, message->num_ports());
1171 PortRef D;
1172 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
1173
1174 // Send a message to B through D, then close D.
1175 EXPECT_EQ(OK, node.SendStringMessage(D, "hey"));
1176 EXPECT_EQ(OK, node.node().ClosePort(D));
1177
1178 // Now send B as new port E.
1179
1180 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1181 EXPECT_EQ(OK, node.node().ClosePort(X));
1182
1183 ASSERT_TRUE(node.ReadMessage(Y, &message));
1184 ASSERT_EQ(1u, message->num_ports());
1185 PortRef E;
1186 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
1187
1188 EXPECT_EQ(OK, node.node().ClosePort(Y));
1189
1190 WaitForIdle();
1191
1192 // E should receive the message originally sent to B, and it should also be
1193 // aware of D's closure.
1194
1195 ASSERT_TRUE(node.ReadMessage(E, &message));
1196 EXPECT_TRUE(MessageEquals(message, "hey"));
1197
1198 PortStatus status{};
1199 EXPECT_EQ(OK, node.node().GetStatus(E, &status));
1200 EXPECT_FALSE(status.receiving_messages);
1201 EXPECT_FALSE(status.has_messages);
1202 EXPECT_TRUE(status.peer_closed);
1203
1204 EXPECT_EQ(OK, node.node().ClosePort(E));
1205
1206 WaitForIdle();
1207
1208 EXPECT_TRUE(node.node().CanShutdownCleanly());
1209 }
1210
TEST_F(PortsTest,MergePorts)1211 TEST_F(PortsTest, MergePorts) {
1212 TestNode node0(0);
1213 AddNode(&node0);
1214
1215 TestNode node1(1);
1216 AddNode(&node1);
1217
1218 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1219 PortRef A, B, C, D;
1220 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1221 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1222
1223 // Write a message on A.
1224 EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
1225
1226 // Initiate a merge between B and C.
1227 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1228
1229 WaitForIdle();
1230
1231 // Expect all proxies to be gone once idle.
1232 EXPECT_TRUE(
1233 node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1234 EXPECT_TRUE(
1235 node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1236
1237 // Expect D to have received the message sent on A.
1238 ScopedMessage message;
1239 ASSERT_TRUE(node1.ReadMessage(D, &message));
1240 EXPECT_TRUE(MessageEquals(message, "hey"));
1241
1242 EXPECT_EQ(OK, node0.node().ClosePort(A));
1243 EXPECT_EQ(OK, node1.node().ClosePort(D));
1244
1245 // No more ports should be open.
1246 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1247 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1248 }
1249
TEST_F(PortsTest,MergePortWithClosedPeer1)1250 TEST_F(PortsTest, MergePortWithClosedPeer1) {
1251 // This tests that the right thing happens when initiating a merge on a port
1252 // whose peer has already been closed.
1253
1254 TestNode node0(0);
1255 AddNode(&node0);
1256
1257 TestNode node1(1);
1258 AddNode(&node1);
1259
1260 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1261 PortRef A, B, C, D;
1262 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1263 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1264
1265 // Write a message on A.
1266 EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
1267
1268 // Close A.
1269 EXPECT_EQ(OK, node0.node().ClosePort(A));
1270
1271 // Initiate a merge between B and C.
1272 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1273
1274 WaitForIdle();
1275
1276 // Expect all proxies to be gone once idle. node0 should have no ports since
1277 // A was explicitly closed.
1278 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1279 EXPECT_TRUE(
1280 node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1281
1282 // Expect D to have received the message sent on A.
1283 ScopedMessage message;
1284 ASSERT_TRUE(node1.ReadMessage(D, &message));
1285 EXPECT_TRUE(MessageEquals(message, "hey"));
1286
1287 EXPECT_EQ(OK, node1.node().ClosePort(D));
1288
1289 // No more ports should be open.
1290 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1291 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1292 }
1293
TEST_F(PortsTest,MergePortWithClosedPeer2)1294 TEST_F(PortsTest, MergePortWithClosedPeer2) {
1295 // This tests that the right thing happens when merging into a port whose peer
1296 // has already been closed.
1297
1298 TestNode node0(0);
1299 AddNode(&node0);
1300
1301 TestNode node1(1);
1302 AddNode(&node1);
1303
1304 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1305 PortRef A, B, C, D;
1306 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1307 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1308
1309 // Write a message on D and close it.
1310 EXPECT_EQ(OK, node0.SendStringMessage(D, "hey"));
1311 EXPECT_EQ(OK, node1.node().ClosePort(D));
1312
1313 // Initiate a merge between B and C.
1314 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1315
1316 WaitForIdle();
1317
1318 // Expect all proxies to be gone once idle. node1 should have no ports since
1319 // D was explicitly closed.
1320 EXPECT_TRUE(
1321 node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1322 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1323
1324 // Expect A to have received the message sent on D.
1325 ScopedMessage message;
1326 ASSERT_TRUE(node0.ReadMessage(A, &message));
1327 EXPECT_TRUE(MessageEquals(message, "hey"));
1328
1329 EXPECT_EQ(OK, node0.node().ClosePort(A));
1330
1331 // No more ports should be open.
1332 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1333 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1334 }
1335
TEST_F(PortsTest,MergePortsWithClosedPeers)1336 TEST_F(PortsTest, MergePortsWithClosedPeers) {
1337 // This tests that no residual ports are left behind if two ports are merged
1338 // when both of their peers have been closed.
1339
1340 TestNode node0(0);
1341 AddNode(&node0);
1342
1343 TestNode node1(1);
1344 AddNode(&node1);
1345
1346 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1347 PortRef A, B, C, D;
1348 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1349 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1350
1351 // Close A and D.
1352 EXPECT_EQ(OK, node0.node().ClosePort(A));
1353 EXPECT_EQ(OK, node1.node().ClosePort(D));
1354
1355 WaitForIdle();
1356
1357 // Initiate a merge between B and C.
1358 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1359
1360 WaitForIdle();
1361
1362 // Expect everything to have gone away.
1363 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1364 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1365 }
1366
TEST_F(PortsTest,MergePortsWithMovedPeers)1367 TEST_F(PortsTest, MergePortsWithMovedPeers) {
1368 // This tests that ports can be merged successfully even if their peers are
1369 // moved around.
1370
1371 TestNode node0(0);
1372 AddNode(&node0);
1373
1374 TestNode node1(1);
1375 AddNode(&node1);
1376
1377 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1378 PortRef A, B, C, D;
1379 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1380 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1381
1382 // Set up another pair X-Y for moving ports on node0.
1383 PortRef X, Y;
1384 EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y));
1385
1386 ScopedMessage message;
1387
1388 // Move A to new port E.
1389 EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A));
1390 ASSERT_TRUE(node0.ReadMessage(Y, &message));
1391 ASSERT_EQ(1u, message->num_ports());
1392 PortRef E;
1393 ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
1394
1395 EXPECT_EQ(OK, node0.node().ClosePort(X));
1396 EXPECT_EQ(OK, node0.node().ClosePort(Y));
1397
1398 // Write messages on E and D.
1399 EXPECT_EQ(OK, node0.SendStringMessage(E, "hey"));
1400 EXPECT_EQ(OK, node1.SendStringMessage(D, "hi"));
1401
1402 // Initiate a merge between B and C.
1403 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1404
1405 WaitForIdle();
1406
1407 // Expect to receive D's message on E and E's message on D.
1408 ASSERT_TRUE(node0.ReadMessage(E, &message));
1409 EXPECT_TRUE(MessageEquals(message, "hi"));
1410 ASSERT_TRUE(node1.ReadMessage(D, &message));
1411 EXPECT_TRUE(MessageEquals(message, "hey"));
1412
1413 // Close E and D.
1414 EXPECT_EQ(OK, node0.node().ClosePort(E));
1415 EXPECT_EQ(OK, node1.node().ClosePort(D));
1416
1417 WaitForIdle();
1418
1419 // Expect everything to have gone away.
1420 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1421 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1422 }
1423
TEST_F(PortsTest,MergePortsFailsGracefully)1424 TEST_F(PortsTest, MergePortsFailsGracefully) {
1425 // This tests that the system remains in a well-defined state if something
1426 // goes wrong during port merge.
1427
1428 TestNode node0(0);
1429 AddNode(&node0);
1430
1431 TestNode node1(1);
1432 AddNode(&node1);
1433
1434 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1435 PortRef A, B, C, D;
1436 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1437 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1438
1439 ScopedMessage message;
1440 PortRef X, Y;
1441 EXPECT_EQ(OK, node0.node().CreateUninitializedPort(&X));
1442 EXPECT_EQ(OK, node1.node().CreateUninitializedPort(&Y));
1443 EXPECT_EQ(OK, node0.node().InitializePort(X, node1.name(), Y.name()));
1444 EXPECT_EQ(OK, node1.node().InitializePort(Y, node0.name(), X.name()));
1445
1446 // Block the merge from proceeding until we can do something stupid with port
1447 // C. This avoids the test logic racing with async merge logic.
1448 node1.BlockOnEvent(Event::Type::kMergePort);
1449
1450 // Initiate the merge between B and C.
1451 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1452
1453 // Move C to a new port E. This is not a sane use of Node's public API but
1454 // is still hypothetically possible. It allows us to force a merge failure
1455 // because C will be in an invalid state by the time the merge is processed.
1456 // As a result, B should be closed.
1457 EXPECT_EQ(OK, node1.SendStringMessageWithPort(Y, "foo", C));
1458
1459 node1.Unblock();
1460
1461 WaitForIdle();
1462
1463 ASSERT_TRUE(node0.ReadMessage(X, &message));
1464 ASSERT_EQ(1u, message->num_ports());
1465 PortRef E;
1466 ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
1467
1468 EXPECT_EQ(OK, node0.node().ClosePort(X));
1469 EXPECT_EQ(OK, node1.node().ClosePort(Y));
1470
1471 WaitForIdle();
1472
1473 // C goes away as a result of normal proxy removal. B should have been closed
1474 // cleanly by the failed MergePorts.
1475 EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C));
1476 EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B));
1477
1478 // Close A, D, and E.
1479 EXPECT_EQ(OK, node0.node().ClosePort(A));
1480 EXPECT_EQ(OK, node1.node().ClosePort(D));
1481 EXPECT_EQ(OK, node0.node().ClosePort(E));
1482
1483 WaitForIdle();
1484
1485 // Expect everything to have gone away.
1486 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1487 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1488 }
1489
TEST_F(PortsTest,RemotePeerStatus)1490 TEST_F(PortsTest, RemotePeerStatus) {
1491 TestNode node0(0);
1492 AddNode(&node0);
1493
1494 TestNode node1(1);
1495 AddNode(&node1);
1496
1497 // Create a local port pair. Neither port should appear to have a remote peer.
1498 PortRef a, b;
1499 PortStatus status{};
1500 node0.node().CreatePortPair(&a, &b);
1501 ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1502 EXPECT_FALSE(status.peer_remote);
1503 ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1504 EXPECT_FALSE(status.peer_remote);
1505
1506 // Create a port pair spanning the two nodes. Both spanning ports should
1507 // immediately appear to have a remote peer.
1508 PortRef x0, x1;
1509 CreatePortPair(&node0, &x0, &node1, &x1);
1510
1511 ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
1512 EXPECT_TRUE(status.peer_remote);
1513 ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
1514 EXPECT_TRUE(status.peer_remote);
1515
1516 PortRef x2, x3;
1517 CreatePortPair(&node0, &x2, &node1, &x3);
1518
1519 // Transfer |b| to |node1| and |x1| to |node0|. i.e., make the local peers
1520 // remote and the remote peers local.
1521 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", b));
1522 EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", x1));
1523 WaitForIdle();
1524
1525 ScopedMessage message;
1526 ASSERT_TRUE(node0.ReadMessage(x2, &message));
1527 ASSERT_EQ(1u, message->num_ports());
1528 ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &x1));
1529
1530 ASSERT_TRUE(node1.ReadMessage(x3, &message));
1531 ASSERT_EQ(1u, message->num_ports());
1532 ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &b));
1533
1534 // Now x0-x1 should be local to node0 and a-b should span the nodes.
1535 ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
1536 EXPECT_FALSE(status.peer_remote);
1537 ASSERT_EQ(OK, node0.node().GetStatus(x1, &status));
1538 EXPECT_FALSE(status.peer_remote);
1539 ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1540 EXPECT_TRUE(status.peer_remote);
1541 ASSERT_EQ(OK, node1.node().GetStatus(b, &status));
1542 EXPECT_TRUE(status.peer_remote);
1543
1544 // And swap them back one more time.
1545 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", x1));
1546 EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", b));
1547 WaitForIdle();
1548
1549 ASSERT_TRUE(node0.ReadMessage(x2, &message));
1550 ASSERT_EQ(1u, message->num_ports());
1551 ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &b));
1552
1553 ASSERT_TRUE(node1.ReadMessage(x3, &message));
1554 ASSERT_EQ(1u, message->num_ports());
1555 ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &x1));
1556
1557 ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
1558 EXPECT_TRUE(status.peer_remote);
1559 ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
1560 EXPECT_TRUE(status.peer_remote);
1561 ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1562 EXPECT_FALSE(status.peer_remote);
1563 ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1564 EXPECT_FALSE(status.peer_remote);
1565
1566 EXPECT_EQ(OK, node0.node().ClosePort(x0));
1567 EXPECT_EQ(OK, node1.node().ClosePort(x1));
1568 EXPECT_EQ(OK, node0.node().ClosePort(x2));
1569 EXPECT_EQ(OK, node1.node().ClosePort(x3));
1570 EXPECT_EQ(OK, node0.node().ClosePort(a));
1571 EXPECT_EQ(OK, node0.node().ClosePort(b));
1572
1573 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1574 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1575 }
1576
TEST_F(PortsTest,RemotePeerStatusAfterLocalPortMerge)1577 TEST_F(PortsTest, RemotePeerStatusAfterLocalPortMerge) {
1578 TestNode node0(0);
1579 AddNode(&node0);
1580
1581 TestNode node1(1);
1582 AddNode(&node1);
1583
1584 // Set up a-b on node0 and c-d spanning node0-node1.
1585 PortRef a, b, c, d;
1586 node0.node().CreatePortPair(&a, &b);
1587 CreatePortPair(&node0, &c, &node1, &d);
1588
1589 PortStatus status{};
1590 ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1591 EXPECT_FALSE(status.peer_remote);
1592 ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1593 EXPECT_FALSE(status.peer_remote);
1594 ASSERT_EQ(OK, node0.node().GetStatus(c, &status));
1595 EXPECT_TRUE(status.peer_remote);
1596 ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
1597 EXPECT_TRUE(status.peer_remote);
1598
1599 EXPECT_EQ(OK, node0.node().MergeLocalPorts(b, c));
1600 WaitForIdle();
1601
1602 ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1603 EXPECT_TRUE(status.peer_remote);
1604 ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
1605 EXPECT_TRUE(status.peer_remote);
1606
1607 EXPECT_EQ(OK, node0.node().ClosePort(a));
1608 EXPECT_EQ(OK, node1.node().ClosePort(d));
1609 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1610 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1611 }
1612
TEST_F(PortsTest,RemotePeerStatusAfterRemotePortMerge)1613 TEST_F(PortsTest, RemotePeerStatusAfterRemotePortMerge) {
1614 TestNode node0(0);
1615 AddNode(&node0);
1616
1617 TestNode node1(1);
1618 AddNode(&node1);
1619
1620 // Set up a-b on node0 and c-d on node1.
1621 PortRef a, b, c, d;
1622 node0.node().CreatePortPair(&a, &b);
1623 node1.node().CreatePortPair(&c, &d);
1624
1625 PortStatus status{};
1626 ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1627 EXPECT_FALSE(status.peer_remote);
1628 ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1629 EXPECT_FALSE(status.peer_remote);
1630 ASSERT_EQ(OK, node1.node().GetStatus(c, &status));
1631 EXPECT_FALSE(status.peer_remote);
1632 ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
1633 EXPECT_FALSE(status.peer_remote);
1634
1635 EXPECT_EQ(OK, node0.node().MergePorts(b, node1.name(), c.name()));
1636 WaitForIdle();
1637
1638 ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1639 EXPECT_TRUE(status.peer_remote);
1640 ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
1641 EXPECT_TRUE(status.peer_remote);
1642
1643 EXPECT_EQ(OK, node0.node().ClosePort(a));
1644 EXPECT_EQ(OK, node1.node().ClosePort(d));
1645 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1646 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1647 }
1648
TEST_F(PortsTest,RetransmitUserMessageEvents)1649 TEST_F(PortsTest, RetransmitUserMessageEvents) {
1650 // Ensures that user message events can be retransmitted properly.
1651 TestNode node0(0);
1652 AddNode(&node0);
1653
1654 PortRef a, b;
1655 node0.node().CreatePortPair(&a, &b);
1656
1657 // Ping.
1658 const char* kMessage = "hey";
1659 ScopedMessage message;
1660 EXPECT_EQ(OK, node0.SendStringMessage(a, kMessage));
1661 ASSERT_TRUE(node0.ReadMessage(b, &message));
1662 EXPECT_TRUE(MessageEquals(message, kMessage));
1663
1664 // Pong.
1665 EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
1666 EXPECT_FALSE(message);
1667 ASSERT_TRUE(node0.ReadMessage(a, &message));
1668 EXPECT_TRUE(MessageEquals(message, kMessage));
1669
1670 // Ping again.
1671 EXPECT_EQ(OK, node0.node().SendUserMessage(a, std::move(message)));
1672 EXPECT_FALSE(message);
1673 ASSERT_TRUE(node0.ReadMessage(b, &message));
1674 EXPECT_TRUE(MessageEquals(message, kMessage));
1675
1676 // Pong again!
1677 EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
1678 EXPECT_FALSE(message);
1679 ASSERT_TRUE(node0.ReadMessage(a, &message));
1680 EXPECT_TRUE(MessageEquals(message, kMessage));
1681
1682 EXPECT_EQ(OK, node0.node().ClosePort(a));
1683 EXPECT_EQ(OK, node0.node().ClosePort(b));
1684 }
1685
TEST_F(PortsTest,SetAcknowledgeRequestInterval)1686 TEST_F(PortsTest, SetAcknowledgeRequestInterval) {
1687 TestNode node0(0);
1688 AddNode(&node0);
1689
1690 PortRef a0, a1;
1691 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
1692 EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
1693
1694 // Send a batch of messages.
1695 EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 15));
1696 EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));
1697 EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
1698 WaitForIdle();
1699 EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));
1700
1701 // Set to acknowledge every read message, and validate that already-read
1702 // messages are acknowledged.
1703 EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 1));
1704 WaitForIdle();
1705 EXPECT_EQ(10u, node0.GetUnacknowledgedMessageCount(a0));
1706
1707 // Read a third of the messages from the other end.
1708 EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
1709 WaitForIdle();
1710
1711 EXPECT_EQ(5u, node0.GetUnacknowledgedMessageCount(a0));
1712
1713 TestNode node1(1);
1714 AddNode(&node1);
1715
1716 // Transfer a1 across to node1.
1717 PortRef x0, x1;
1718 CreatePortPair(&node0, &x0, &node1, &x1);
1719 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
1720 WaitForIdle();
1721
1722 ScopedMessage message;
1723 ASSERT_TRUE(node1.ReadMessage(x1, &message));
1724 ASSERT_EQ(1u, message->num_ports());
1725 ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &a1));
1726
1727 // Read the last third of the messages from the transferred node, and
1728 // validate that the unacknowledge message count updates correctly.
1729 EXPECT_TRUE(node1.ReadMultipleMessages(a1, 5));
1730 WaitForIdle();
1731 EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
1732
1733 // Turn the acknowledges down and validate that they don't go on indefinitely.
1734 EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 0));
1735 EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 10));
1736 WaitForIdle();
1737 EXPECT_TRUE(node1.ReadMultipleMessages(a1, 10));
1738 WaitForIdle();
1739 EXPECT_NE(0u, node0.GetUnacknowledgedMessageCount(a0));
1740
1741 // Close the far port and validate that the closure updates the unacknowledged
1742 // count.
1743 EXPECT_EQ(OK, node1.node().ClosePort(a1));
1744 WaitForIdle();
1745 EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
1746
1747 EXPECT_EQ(OK, node0.node().ClosePort(a0));
1748 EXPECT_EQ(OK, node0.node().ClosePort(x0));
1749 EXPECT_EQ(OK, node1.node().ClosePort(x1));
1750 }
1751
1752 } // namespace test
1753 } // namespace ports
1754 } // namespace core
1755 } // namespace mojo
1756