1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <inttypes.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9
10 #include <map>
11 #include <sstream>
12 #include <utility>
13
14 #include "base/bind.h"
15 #include "base/callback.h"
16 #include "base/containers/queue.h"
17 #include "base/logging.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/strings/string_piece.h"
20 #include "base/strings/stringprintf.h"
21 #include "base/synchronization/lock.h"
22 #include "base/synchronization/waitable_event.h"
23 #include "base/test/task_environment.h"
24 #include "base/threading/thread.h"
25 #include "mojo/core/ports/event.h"
26 #include "mojo/core/ports/node.h"
27 #include "mojo/core/ports/node_delegate.h"
28 #include "mojo/core/ports/user_message.h"
29 #include "testing/gtest/include/gtest/gtest.h"
30
31 namespace mojo {
32 namespace core {
33 namespace ports {
34 namespace test {
35
36 namespace {
37
38 // TODO(rockot): Remove this unnecessary alias.
39 using ScopedMessage = std::unique_ptr<UserMessageEvent>;
40
41 class TestMessage : public UserMessage {
42 public:
43 static const TypeInfo kUserMessageTypeInfo;
44
TestMessage(const base::StringPiece & payload)45 TestMessage(const base::StringPiece& payload)
46 : UserMessage(&kUserMessageTypeInfo), payload_(payload) {}
~TestMessage()47 ~TestMessage() override {}
48
payload() const49 const std::string& payload() const { return payload_; }
50
51 private:
52 std::string payload_;
53 };
54
55 const UserMessage::TypeInfo TestMessage::kUserMessageTypeInfo = {};
56
NewUserMessageEvent(const base::StringPiece & payload,size_t num_ports)57 ScopedMessage NewUserMessageEvent(const base::StringPiece& payload,
58 size_t num_ports) {
59 auto event = std::make_unique<UserMessageEvent>(num_ports);
60 event->AttachMessage(std::make_unique<TestMessage>(payload));
61 return event;
62 }
63
MessageEquals(const ScopedMessage & message,const base::StringPiece & s)64 bool MessageEquals(const ScopedMessage& message, const base::StringPiece& s) {
65 return message->GetMessage<TestMessage>()->payload() == s;
66 }
67
68 class TestNode;
69
70 class MessageRouter {
71 public:
~MessageRouter()72 virtual ~MessageRouter() {}
73
74 virtual void ForwardEvent(TestNode* from_node,
75 const NodeName& node_name,
76 ScopedEvent event) = 0;
77 virtual void BroadcastEvent(TestNode* from_node, ScopedEvent event) = 0;
78 };
79
80 class TestNode : public NodeDelegate {
81 public:
TestNode(uint64_t id)82 explicit TestNode(uint64_t id)
83 : node_name_(id, 1),
84 node_(node_name_, this),
85 node_thread_(base::StringPrintf("Node %" PRIu64 " thread", id)),
86 events_available_event_(
87 base::WaitableEvent::ResetPolicy::AUTOMATIC,
88 base::WaitableEvent::InitialState::NOT_SIGNALED),
89 idle_event_(base::WaitableEvent::ResetPolicy::MANUAL,
90 base::WaitableEvent::InitialState::SIGNALED) {}
91
~TestNode()92 ~TestNode() override {
93 StopWhenIdle();
94 node_thread_.Stop();
95 }
96
name() const97 const NodeName& name() const { return node_name_; }
98
99 // NOTE: Node is thread-safe.
node()100 Node& node() { return node_; }
101
idle_event()102 base::WaitableEvent& idle_event() { return idle_event_; }
103
IsIdle()104 bool IsIdle() {
105 base::AutoLock lock(lock_);
106 return started_ && !dispatching_ &&
107 (incoming_events_.empty() || (block_on_event_ && blocked_));
108 }
109
BlockOnEvent(Event::Type type)110 void BlockOnEvent(Event::Type type) {
111 base::AutoLock lock(lock_);
112 blocked_event_type_ = type;
113 block_on_event_ = true;
114 }
115
Unblock()116 void Unblock() {
117 base::AutoLock lock(lock_);
118 block_on_event_ = false;
119 events_available_event_.Signal();
120 }
121
Start(MessageRouter * router)122 void Start(MessageRouter* router) {
123 router_ = router;
124 node_thread_.Start();
125 node_thread_.task_runner()->PostTask(
126 FROM_HERE,
127 base::BindOnce(&TestNode::ProcessEvents, base::Unretained(this)));
128 }
129
StopWhenIdle()130 void StopWhenIdle() {
131 base::AutoLock lock(lock_);
132 should_quit_ = true;
133 events_available_event_.Signal();
134 }
135
WakeUp()136 void WakeUp() { events_available_event_.Signal(); }
137
SendStringMessage(const PortRef & port,const std::string & s)138 int SendStringMessage(const PortRef& port, const std::string& s) {
139 return node_.SendUserMessage(port, NewUserMessageEvent(s, 0));
140 }
141
SendMultipleMessages(const PortRef & port,size_t num_messages)142 int SendMultipleMessages(const PortRef& port, size_t num_messages) {
143 for (size_t i = 0; i < num_messages; ++i) {
144 int result = SendStringMessage(port, "");
145 if (result != OK)
146 return result;
147 }
148 return OK;
149 }
150
SendStringMessageWithPort(const PortRef & port,const std::string & s,const PortName & sent_port_name)151 int SendStringMessageWithPort(const PortRef& port,
152 const std::string& s,
153 const PortName& sent_port_name) {
154 auto event = NewUserMessageEvent(s, 1);
155 event->ports()[0] = sent_port_name;
156 return node_.SendUserMessage(port, std::move(event));
157 }
158
SendStringMessageWithPort(const PortRef & port,const std::string & s,const PortRef & sent_port)159 int SendStringMessageWithPort(const PortRef& port,
160 const std::string& s,
161 const PortRef& sent_port) {
162 return SendStringMessageWithPort(port, s, sent_port.name());
163 }
164
set_drop_messages(bool value)165 void set_drop_messages(bool value) {
166 base::AutoLock lock(lock_);
167 drop_messages_ = value;
168 }
169
set_save_messages(bool value)170 void set_save_messages(bool value) {
171 base::AutoLock lock(lock_);
172 save_messages_ = value;
173 }
174
ReadMessage(const PortRef & port,ScopedMessage * message)175 bool ReadMessage(const PortRef& port, ScopedMessage* message) {
176 return node_.GetMessage(port, message, nullptr) == OK && *message;
177 }
178
ReadMultipleMessages(const PortRef & port,size_t num_messages)179 bool ReadMultipleMessages(const PortRef& port, size_t num_messages) {
180 for (size_t i = 0; i < num_messages; ++i) {
181 ScopedMessage message;
182 if (!ReadMessage(port, &message))
183 return false;
184 }
185 return true;
186 }
187
GetSavedMessage(ScopedMessage * message)188 bool GetSavedMessage(ScopedMessage* message) {
189 base::AutoLock lock(lock_);
190 if (saved_messages_.empty()) {
191 message->reset();
192 return false;
193 }
194 std::swap(*message, saved_messages_.front());
195 saved_messages_.pop();
196 return true;
197 }
198
EnqueueEvent(ScopedEvent event)199 void EnqueueEvent(ScopedEvent event) {
200 idle_event_.Reset();
201
202 // NOTE: This may be called from ForwardMessage and thus must not reenter
203 // |node_|.
204 base::AutoLock lock(lock_);
205 incoming_events_.emplace(std::move(event));
206 events_available_event_.Signal();
207 }
208
ForwardEvent(const NodeName & node_name,ScopedEvent event)209 void ForwardEvent(const NodeName& node_name, ScopedEvent event) override {
210 {
211 base::AutoLock lock(lock_);
212 if (drop_messages_) {
213 DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to "
214 << node_name;
215
216 base::AutoUnlock unlock(lock_);
217 ClosePortsInEvent(event.get());
218 return;
219 }
220 }
221
222 DCHECK(router_);
223 DVLOG(1) << "ForwardEvent from node " << node_name_ << " to " << node_name;
224 router_->ForwardEvent(this, node_name, std::move(event));
225 }
226
BroadcastEvent(ScopedEvent event)227 void BroadcastEvent(ScopedEvent event) override {
228 router_->BroadcastEvent(this, std::move(event));
229 }
230
PortStatusChanged(const PortRef & port)231 void PortStatusChanged(const PortRef& port) override {
232 // The port may be closed, in which case we ignore the notification.
233 base::AutoLock lock(lock_);
234 if (!save_messages_)
235 return;
236
237 for (;;) {
238 ScopedMessage message;
239 {
240 base::AutoUnlock unlock(lock_);
241 if (!ReadMessage(port, &message))
242 break;
243 }
244
245 saved_messages_.emplace(std::move(message));
246 }
247 }
248
ClosePortsInEvent(Event * event)249 void ClosePortsInEvent(Event* event) {
250 if (event->type() != Event::Type::kUserMessage)
251 return;
252
253 UserMessageEvent* message_event = static_cast<UserMessageEvent*>(event);
254 for (size_t i = 0; i < message_event->num_ports(); ++i) {
255 PortRef port;
256 ASSERT_EQ(OK, node_.GetPort(message_event->ports()[i], &port));
257 EXPECT_EQ(OK, node_.ClosePort(port));
258 }
259 }
260
GetUnacknowledgedMessageCount(const PortRef & port_ref)261 uint64_t GetUnacknowledgedMessageCount(const PortRef& port_ref) {
262 PortStatus status;
263 if (node_.GetStatus(port_ref, &status) != OK)
264 return 0;
265
266 return status.unacknowledged_message_count;
267 }
268
269 private:
ProcessEvents()270 void ProcessEvents() {
271 for (;;) {
272 events_available_event_.Wait();
273 base::AutoLock lock(lock_);
274
275 if (should_quit_)
276 return;
277
278 dispatching_ = true;
279 while (!incoming_events_.empty()) {
280 if (block_on_event_ &&
281 incoming_events_.front()->type() == blocked_event_type_) {
282 blocked_ = true;
283 // Go idle if we hit a blocked event type.
284 break;
285 } else {
286 blocked_ = false;
287 }
288 ScopedEvent event = std::move(incoming_events_.front());
289 incoming_events_.pop();
290
291 // NOTE: AcceptMessage() can re-enter this object to call any of the
292 // NodeDelegate interface methods.
293 base::AutoUnlock unlock(lock_);
294 node_.AcceptEvent(std::move(event));
295 }
296
297 dispatching_ = false;
298 started_ = true;
299 idle_event_.Signal();
300 };
301 }
302
303 const NodeName node_name_;
304 Node node_;
305 MessageRouter* router_ = nullptr;
306
307 base::Thread node_thread_;
308 base::WaitableEvent events_available_event_;
309 base::WaitableEvent idle_event_;
310
311 // Guards fields below.
312 base::Lock lock_;
313 bool started_ = false;
314 bool dispatching_ = false;
315 bool should_quit_ = false;
316 bool drop_messages_ = false;
317 bool save_messages_ = false;
318 bool blocked_ = false;
319 bool block_on_event_ = false;
320 Event::Type blocked_event_type_;
321 base::queue<ScopedEvent> incoming_events_;
322 base::queue<ScopedMessage> saved_messages_;
323 };
324
325 class PortsTest : public testing::Test, public MessageRouter {
326 public:
AddNode(TestNode * node)327 void AddNode(TestNode* node) {
328 {
329 base::AutoLock lock(lock_);
330 nodes_[node->name()] = node;
331 }
332 node->Start(this);
333 }
334
RemoveNode(TestNode * node)335 void RemoveNode(TestNode* node) {
336 {
337 base::AutoLock lock(lock_);
338 nodes_.erase(node->name());
339 }
340
341 for (const auto& entry : nodes_)
342 entry.second->node().LostConnectionToNode(node->name());
343 }
344
345 // Waits until all known Nodes are idle. Message forwarding and processing
346 // is handled in such a way that idleness is a stable state: once all nodes in
347 // the system are idle, they will remain idle until the test explicitly
348 // initiates some further event (e.g. sending a message, closing a port, or
349 // removing a Node).
WaitForIdle()350 void WaitForIdle() {
351 for (;;) {
352 base::AutoLock global_lock(global_lock_);
353 bool all_nodes_idle = true;
354 for (const auto& entry : nodes_) {
355 if (!entry.second->IsIdle())
356 all_nodes_idle = false;
357 entry.second->WakeUp();
358 }
359 if (all_nodes_idle)
360 return;
361
362 // Wait for any Node to signal that it's idle.
363 base::AutoUnlock global_unlock(global_lock_);
364 std::vector<base::WaitableEvent*> events;
365 for (const auto& entry : nodes_)
366 events.push_back(&entry.second->idle_event());
367 base::WaitableEvent::WaitMany(events.data(), events.size());
368 }
369 }
370
CreatePortPair(TestNode * node0,PortRef * port0,TestNode * node1,PortRef * port1)371 void CreatePortPair(TestNode* node0,
372 PortRef* port0,
373 TestNode* node1,
374 PortRef* port1) {
375 if (node0 == node1) {
376 EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1));
377 } else {
378 EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0));
379 EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1));
380 EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(),
381 port1->name()));
382 EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(),
383 port0->name()));
384 }
385 }
386
387 private:
388 // MessageRouter:
ForwardEvent(TestNode * from_node,const NodeName & node_name,ScopedEvent event)389 void ForwardEvent(TestNode* from_node,
390 const NodeName& node_name,
391 ScopedEvent event) override {
392 base::AutoLock global_lock(global_lock_);
393 base::AutoLock lock(lock_);
394 // Drop messages from nodes that have been removed.
395 if (nodes_.find(from_node->name()) == nodes_.end()) {
396 from_node->ClosePortsInEvent(event.get());
397 return;
398 }
399
400 auto it = nodes_.find(node_name);
401 if (it == nodes_.end()) {
402 DVLOG(1) << "Node not found: " << node_name;
403 return;
404 }
405
406 // Serialize and de-serialize all forwarded events.
407 size_t buf_size = event->GetSerializedSize();
408 std::unique_ptr<char[]> buf(new char[buf_size]);
409 event->Serialize(buf.get());
410 ScopedEvent copy = Event::Deserialize(buf.get(), buf_size);
411 // This should always succeed unless serialization or deserialization
412 // is broken. In that case, the loss of events should cause a test failure.
413 ASSERT_TRUE(copy);
414
415 // Also copy the payload for user messages.
416 if (event->type() == Event::Type::kUserMessage) {
417 UserMessageEvent* message_event =
418 static_cast<UserMessageEvent*>(event.get());
419 UserMessageEvent* message_copy =
420 static_cast<UserMessageEvent*>(copy.get());
421
422 message_copy->AttachMessage(std::make_unique<TestMessage>(
423 message_event->GetMessage<TestMessage>()->payload()));
424 }
425
426 it->second->EnqueueEvent(std::move(event));
427 }
428
BroadcastEvent(TestNode * from_node,ScopedEvent event)429 void BroadcastEvent(TestNode* from_node, ScopedEvent event) override {
430 base::AutoLock global_lock(global_lock_);
431 base::AutoLock lock(lock_);
432
433 // Drop messages from nodes that have been removed.
434 if (nodes_.find(from_node->name()) == nodes_.end())
435 return;
436
437 for (const auto& entry : nodes_) {
438 TestNode* node = entry.second;
439 // Broadcast doesn't deliver to the local node.
440 if (node == from_node)
441 continue;
442 node->EnqueueEvent(event->Clone());
443 }
444 }
445
446 base::test::TaskEnvironment task_environment_;
447
448 // Acquired before any operation which makes a Node busy, and before testing
449 // if all nodes are idle.
450 base::Lock global_lock_;
451
452 base::Lock lock_;
453 std::map<NodeName, TestNode*> nodes_;
454 };
455
456 } // namespace
457
TEST_F(PortsTest,Basic1)458 TEST_F(PortsTest, Basic1) {
459 TestNode node0(0);
460 AddNode(&node0);
461
462 TestNode node1(1);
463 AddNode(&node1);
464
465 PortRef x0, x1;
466 CreatePortPair(&node0, &x0, &node1, &x1);
467
468 PortRef a0, a1;
469 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
470 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
471 EXPECT_EQ(OK, node0.node().ClosePort(a0));
472
473 EXPECT_EQ(OK, node0.node().ClosePort(x0));
474 EXPECT_EQ(OK, node1.node().ClosePort(x1));
475
476 WaitForIdle();
477
478 EXPECT_TRUE(node0.node().CanShutdownCleanly());
479 EXPECT_TRUE(node1.node().CanShutdownCleanly());
480 }
481
TEST_F(PortsTest,Basic2)482 TEST_F(PortsTest, Basic2) {
483 TestNode node0(0);
484 AddNode(&node0);
485
486 TestNode node1(1);
487 AddNode(&node1);
488
489 PortRef x0, x1;
490 CreatePortPair(&node0, &x0, &node1, &x1);
491
492 PortRef b0, b1;
493 EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
494 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1));
495 EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again"));
496
497 EXPECT_EQ(OK, node0.node().ClosePort(b0));
498
499 EXPECT_EQ(OK, node0.node().ClosePort(x0));
500 EXPECT_EQ(OK, node1.node().ClosePort(x1));
501
502 WaitForIdle();
503
504 EXPECT_TRUE(node0.node().CanShutdownCleanly());
505 EXPECT_TRUE(node1.node().CanShutdownCleanly());
506 }
507
TEST_F(PortsTest,Basic3)508 TEST_F(PortsTest, Basic3) {
509 TestNode node0(0);
510 AddNode(&node0);
511
512 TestNode node1(1);
513 AddNode(&node1);
514
515 PortRef x0, x1;
516 CreatePortPair(&node0, &x0, &node1, &x1);
517
518 PortRef a0, a1;
519 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
520
521 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
522 EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again"));
523
524 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0));
525
526 PortRef b0, b1;
527 EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
528 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1));
529 EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz"));
530
531 EXPECT_EQ(OK, node0.node().ClosePort(b0));
532
533 EXPECT_EQ(OK, node0.node().ClosePort(x0));
534 EXPECT_EQ(OK, node1.node().ClosePort(x1));
535
536 WaitForIdle();
537
538 EXPECT_TRUE(node0.node().CanShutdownCleanly());
539 EXPECT_TRUE(node1.node().CanShutdownCleanly());
540 }
541
TEST_F(PortsTest,LostConnectionToNode1)542 TEST_F(PortsTest, LostConnectionToNode1) {
543 TestNode node0(0);
544 AddNode(&node0);
545
546 TestNode node1(1);
547 AddNode(&node1);
548 node1.set_drop_messages(true);
549
550 PortRef x0, x1;
551 CreatePortPair(&node0, &x0, &node1, &x1);
552
553 // Transfer a port to node1 and simulate a lost connection to node1.
554
555 PortRef a0, a1;
556 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
557 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
558
559 WaitForIdle();
560
561 RemoveNode(&node1);
562
563 WaitForIdle();
564
565 EXPECT_EQ(OK, node0.node().ClosePort(a0));
566 EXPECT_EQ(OK, node0.node().ClosePort(x0));
567 EXPECT_EQ(OK, node1.node().ClosePort(x1));
568
569 WaitForIdle();
570
571 EXPECT_TRUE(node0.node().CanShutdownCleanly());
572 EXPECT_TRUE(node1.node().CanShutdownCleanly());
573 }
574
TEST_F(PortsTest,LostConnectionToNode2)575 TEST_F(PortsTest, LostConnectionToNode2) {
576 TestNode node0(0);
577 AddNode(&node0);
578
579 TestNode node1(1);
580 AddNode(&node1);
581
582 PortRef x0, x1;
583 CreatePortPair(&node0, &x0, &node1, &x1);
584
585 PortRef a0, a1;
586 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
587 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1));
588
589 WaitForIdle();
590
591 node1.set_drop_messages(true);
592
593 RemoveNode(&node1);
594
595 WaitForIdle();
596
597 // a0 should have eventually detected peer closure after node loss.
598 ScopedMessage message;
599 EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
600 node0.node().GetMessage(a0, &message, nullptr));
601 EXPECT_FALSE(message);
602
603 EXPECT_EQ(OK, node0.node().ClosePort(a0));
604
605 EXPECT_EQ(OK, node0.node().ClosePort(x0));
606
607 EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr));
608 EXPECT_TRUE(message);
609 node1.ClosePortsInEvent(message.get());
610
611 EXPECT_EQ(OK, node1.node().ClosePort(x1));
612
613 WaitForIdle();
614
615 EXPECT_TRUE(node0.node().CanShutdownCleanly());
616 EXPECT_TRUE(node1.node().CanShutdownCleanly());
617 }
618
TEST_F(PortsTest,LostConnectionToNodeWithSecondaryProxy)619 TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
620 // Tests that a proxy gets cleaned up when its indirect peer lives on a lost
621 // node.
622
623 TestNode node0(0);
624 AddNode(&node0);
625
626 TestNode node1(1);
627 AddNode(&node1);
628
629 TestNode node2(2);
630 AddNode(&node2);
631
632 // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
633 PortRef A, B, C, D;
634 CreatePortPair(&node0, &A, &node1, &B);
635 CreatePortPair(&node1, &C, &node2, &D);
636
637 // Create E-F and send F over A to node 1.
638 PortRef E, F;
639 EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
640 EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));
641
642 WaitForIdle();
643
644 ScopedMessage message;
645 ASSERT_TRUE(node1.ReadMessage(B, &message));
646 ASSERT_EQ(1u, message->num_ports());
647
648 EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));
649
650 // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
651 // will trivially become aware of the loss, and this test verifies that the
652 // port A on node 0 will eventually also become aware of it.
653
654 // Make sure node2 stops processing events when it encounters an ObserveProxy.
655 node2.BlockOnEvent(Event::Type::kObserveProxy);
656
657 EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
658 WaitForIdle();
659
660 // Simulate node 1 and 2 disconnecting.
661 EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));
662
663 // Let node2 continue processing events and wait for everyone to go idle.
664 node2.Unblock();
665 WaitForIdle();
666
667 // Port F should be gone.
668 EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));
669
670 // Port E should have detected peer closure despite the fact that there is
671 // no longer a continuous route from F to E over which the event could travel.
672 PortStatus status;
673 EXPECT_EQ(OK, node0.node().GetStatus(E, &status));
674 EXPECT_TRUE(status.peer_closed);
675
676 EXPECT_EQ(OK, node0.node().ClosePort(A));
677 EXPECT_EQ(OK, node1.node().ClosePort(B));
678 EXPECT_EQ(OK, node1.node().ClosePort(C));
679 EXPECT_EQ(OK, node0.node().ClosePort(E));
680
681 WaitForIdle();
682
683 EXPECT_TRUE(node0.node().CanShutdownCleanly());
684 EXPECT_TRUE(node1.node().CanShutdownCleanly());
685 }
686
TEST_F(PortsTest,LostConnectionToNodeWithLocalProxy)687 TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
688 // Tests that a proxy gets cleaned up when its direct peer lives on a lost
689 // node and it's predecessor lives on the same node.
690
691 TestNode node0(0);
692 AddNode(&node0);
693
694 TestNode node1(1);
695 AddNode(&node1);
696
697 PortRef A, B;
698 CreatePortPair(&node0, &A, &node1, &B);
699
700 PortRef C, D;
701 EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
702
703 // Send D but block node0 on an ObserveProxy event.
704 node0.BlockOnEvent(Event::Type::kObserveProxy);
705 EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D));
706
707 // node0 won't collapse the proxy but node1 will receive the message before
708 // going idle.
709 WaitForIdle();
710
711 ScopedMessage message;
712 ASSERT_TRUE(node1.ReadMessage(B, &message));
713 ASSERT_EQ(1u, message->num_ports());
714 PortRef E;
715 EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));
716
717 RemoveNode(&node1);
718
719 node0.Unblock();
720 WaitForIdle();
721
722 // Port C should have detected peer closure.
723 PortStatus status;
724 EXPECT_EQ(OK, node0.node().GetStatus(C, &status));
725 EXPECT_TRUE(status.peer_closed);
726
727 EXPECT_EQ(OK, node0.node().ClosePort(A));
728 EXPECT_EQ(OK, node1.node().ClosePort(B));
729 EXPECT_EQ(OK, node0.node().ClosePort(C));
730 EXPECT_EQ(OK, node1.node().ClosePort(E));
731
732 EXPECT_TRUE(node0.node().CanShutdownCleanly());
733 EXPECT_TRUE(node1.node().CanShutdownCleanly());
734 }
735
TEST_F(PortsTest,GetMessage1)736 TEST_F(PortsTest, GetMessage1) {
737 TestNode node(0);
738 AddNode(&node);
739
740 PortRef a0, a1;
741 EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
742
743 ScopedMessage message;
744 EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
745 EXPECT_FALSE(message);
746
747 EXPECT_EQ(OK, node.node().ClosePort(a1));
748
749 WaitForIdle();
750
751 EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
752 node.node().GetMessage(a0, &message, nullptr));
753 EXPECT_FALSE(message);
754
755 EXPECT_EQ(OK, node.node().ClosePort(a0));
756
757 WaitForIdle();
758
759 EXPECT_TRUE(node.node().CanShutdownCleanly());
760 }
761
TEST_F(PortsTest,GetMessage2)762 TEST_F(PortsTest, GetMessage2) {
763 TestNode node(0);
764 AddNode(&node);
765
766 PortRef a0, a1;
767 EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
768
769 EXPECT_EQ(OK, node.SendStringMessage(a1, "1"));
770
771 ScopedMessage message;
772 EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
773
774 ASSERT_TRUE(message);
775 EXPECT_TRUE(MessageEquals(message, "1"));
776
777 EXPECT_EQ(OK, node.node().ClosePort(a0));
778 EXPECT_EQ(OK, node.node().ClosePort(a1));
779
780 EXPECT_TRUE(node.node().CanShutdownCleanly());
781 }
782
TEST_F(PortsTest,GetMessage3)783 TEST_F(PortsTest, GetMessage3) {
784 TestNode node(0);
785 AddNode(&node);
786
787 PortRef a0, a1;
788 EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
789
790 const char* kStrings[] = {"1", "2", "3"};
791
792 for (size_t i = 0; i < sizeof(kStrings) / sizeof(kStrings[0]); ++i)
793 EXPECT_EQ(OK, node.SendStringMessage(a1, kStrings[i]));
794
795 ScopedMessage message;
796 for (size_t i = 0; i < sizeof(kStrings) / sizeof(kStrings[0]); ++i) {
797 EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
798 ASSERT_TRUE(message);
799 EXPECT_TRUE(MessageEquals(message, kStrings[i]));
800 }
801
802 EXPECT_EQ(OK, node.node().ClosePort(a0));
803 EXPECT_EQ(OK, node.node().ClosePort(a1));
804
805 EXPECT_TRUE(node.node().CanShutdownCleanly());
806 }
807
TEST_F(PortsTest,Delegation1)808 TEST_F(PortsTest, Delegation1) {
809 TestNode node0(0);
810 AddNode(&node0);
811
812 TestNode node1(1);
813 AddNode(&node1);
814
815 PortRef x0, x1;
816 CreatePortPair(&node0, &x0, &node1, &x1);
817
818 // In this test, we send a message to a port that has been moved.
819
820 PortRef a0, a1;
821 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
822 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1));
823 WaitForIdle();
824
825 ScopedMessage message;
826 ASSERT_TRUE(node1.ReadMessage(x1, &message));
827 ASSERT_EQ(1u, message->num_ports());
828 EXPECT_TRUE(MessageEquals(message, "a1"));
829
830 // This is "a1" from the point of view of node1.
831 PortName a2_name = message->ports()[0];
832 EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name));
833 EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello"));
834
835 WaitForIdle();
836
837 ASSERT_TRUE(node0.ReadMessage(x0, &message));
838 ASSERT_EQ(1u, message->num_ports());
839 EXPECT_TRUE(MessageEquals(message, "a2"));
840
841 // This is "a2" from the point of view of node1.
842 PortName a3_name = message->ports()[0];
843
844 PortRef a3;
845 EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3));
846
847 ASSERT_TRUE(node0.ReadMessage(a3, &message));
848 EXPECT_EQ(0u, message->num_ports());
849 EXPECT_TRUE(MessageEquals(message, "hello"));
850
851 EXPECT_EQ(OK, node0.node().ClosePort(a0));
852 EXPECT_EQ(OK, node0.node().ClosePort(a3));
853
854 EXPECT_EQ(OK, node0.node().ClosePort(x0));
855 EXPECT_EQ(OK, node1.node().ClosePort(x1));
856
857 EXPECT_TRUE(node0.node().CanShutdownCleanly());
858 EXPECT_TRUE(node1.node().CanShutdownCleanly());
859 }
860
TEST_F(PortsTest,Delegation2)861 TEST_F(PortsTest, Delegation2) {
862 TestNode node0(0);
863 AddNode(&node0);
864
865 TestNode node1(1);
866 AddNode(&node1);
867
868 for (int i = 0; i < 100; ++i) {
869 // Setup pipe a<->b between node0 and node1.
870 PortRef A, B;
871 CreatePortPair(&node0, &A, &node1, &B);
872
873 PortRef C, D;
874 EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
875
876 PortRef E, F;
877 EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
878
879 node1.set_save_messages(true);
880
881 // Pass D over A to B.
882 EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D));
883
884 // Pass F over C to D.
885 EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F));
886
887 // This message should find its way to node1.
888 EXPECT_EQ(OK, node0.SendStringMessage(E, "hello"));
889
890 WaitForIdle();
891
892 EXPECT_EQ(OK, node0.node().ClosePort(C));
893 EXPECT_EQ(OK, node0.node().ClosePort(E));
894
895 EXPECT_EQ(OK, node0.node().ClosePort(A));
896 EXPECT_EQ(OK, node1.node().ClosePort(B));
897
898 bool got_hello = false;
899 ScopedMessage message;
900 while (node1.GetSavedMessage(&message)) {
901 node1.ClosePortsInEvent(message.get());
902 if (MessageEquals(message, "hello")) {
903 got_hello = true;
904 break;
905 }
906 }
907
908 EXPECT_TRUE(got_hello);
909
910 WaitForIdle(); // Because closing ports may have generated tasks.
911 }
912
913 EXPECT_TRUE(node0.node().CanShutdownCleanly());
914 EXPECT_TRUE(node1.node().CanShutdownCleanly());
915 }
916
TEST_F(PortsTest,SendUninitialized)917 TEST_F(PortsTest, SendUninitialized) {
918 TestNode node(0);
919 AddNode(&node);
920
921 PortRef x0;
922 EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0));
923 EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops"));
924 EXPECT_EQ(OK, node.node().ClosePort(x0));
925 EXPECT_TRUE(node.node().CanShutdownCleanly());
926 }
927
TEST_F(PortsTest,SendFailure)928 TEST_F(PortsTest, SendFailure) {
929 TestNode node(0);
930 AddNode(&node);
931
932 node.set_save_messages(true);
933
934 PortRef A, B;
935 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
936
937 // Try to send A over itself.
938
939 EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
940 node.SendStringMessageWithPort(A, "oops", A));
941
942 // Try to send B over A.
943
944 EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
945 node.SendStringMessageWithPort(A, "nope", B));
946
947 // B should be closed immediately.
948 EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B));
949
950 WaitForIdle();
951
952 // There should have been no messages accepted.
953 ScopedMessage message;
954 EXPECT_FALSE(node.GetSavedMessage(&message));
955
956 EXPECT_EQ(OK, node.node().ClosePort(A));
957
958 WaitForIdle();
959
960 EXPECT_TRUE(node.node().CanShutdownCleanly());
961 }
962
TEST_F(PortsTest,DontLeakUnreceivedPorts)963 TEST_F(PortsTest, DontLeakUnreceivedPorts) {
964 TestNode node(0);
965 AddNode(&node);
966
967 PortRef A, B, C, D;
968 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
969 EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
970
971 EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
972
973 EXPECT_EQ(OK, node.node().ClosePort(C));
974 EXPECT_EQ(OK, node.node().ClosePort(A));
975 EXPECT_EQ(OK, node.node().ClosePort(B));
976
977 WaitForIdle();
978
979 EXPECT_TRUE(node.node().CanShutdownCleanly());
980 }
981
TEST_F(PortsTest,AllowShutdownWithLocalPortsOpen)982 TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
983 TestNode node(0);
984 AddNode(&node);
985
986 PortRef A, B, C, D;
987 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
988 EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
989
990 EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
991
992 ScopedMessage message;
993 EXPECT_TRUE(node.ReadMessage(B, &message));
994 ASSERT_EQ(1u, message->num_ports());
995 EXPECT_TRUE(MessageEquals(message, "foo"));
996 PortRef E;
997 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
998
999 EXPECT_TRUE(
1000 node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1001
1002 WaitForIdle();
1003
1004 EXPECT_TRUE(
1005 node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1006 EXPECT_FALSE(node.node().CanShutdownCleanly());
1007
1008 EXPECT_EQ(OK, node.node().ClosePort(A));
1009 EXPECT_EQ(OK, node.node().ClosePort(B));
1010 EXPECT_EQ(OK, node.node().ClosePort(C));
1011 EXPECT_EQ(OK, node.node().ClosePort(E));
1012
1013 WaitForIdle();
1014
1015 EXPECT_TRUE(node.node().CanShutdownCleanly());
1016 }
1017
TEST_F(PortsTest,ProxyCollapse1)1018 TEST_F(PortsTest, ProxyCollapse1) {
1019 TestNode node(0);
1020 AddNode(&node);
1021
1022 PortRef A, B;
1023 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1024
1025 PortRef X, Y;
1026 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1027
1028 ScopedMessage message;
1029
1030 // Send B and receive it as C.
1031 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1032 ASSERT_TRUE(node.ReadMessage(Y, &message));
1033 ASSERT_EQ(1u, message->num_ports());
1034 PortRef C;
1035 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
1036
1037 // Send C and receive it as D.
1038 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
1039 ASSERT_TRUE(node.ReadMessage(Y, &message));
1040 ASSERT_EQ(1u, message->num_ports());
1041 PortRef D;
1042 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
1043
1044 // Send D and receive it as E.
1045 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D));
1046 ASSERT_TRUE(node.ReadMessage(Y, &message));
1047 ASSERT_EQ(1u, message->num_ports());
1048 PortRef E;
1049 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
1050
1051 EXPECT_EQ(OK, node.node().ClosePort(X));
1052 EXPECT_EQ(OK, node.node().ClosePort(Y));
1053
1054 EXPECT_EQ(OK, node.node().ClosePort(A));
1055 EXPECT_EQ(OK, node.node().ClosePort(E));
1056
1057 // The node should not idle until all proxies are collapsed.
1058 WaitForIdle();
1059
1060 EXPECT_TRUE(node.node().CanShutdownCleanly());
1061 }
1062
TEST_F(PortsTest,ProxyCollapse2)1063 TEST_F(PortsTest, ProxyCollapse2) {
1064 TestNode node(0);
1065 AddNode(&node);
1066
1067 PortRef A, B;
1068 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1069
1070 PortRef X, Y;
1071 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1072
1073 ScopedMessage message;
1074
1075 // Send B and A to create proxies in each direction.
1076 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1077 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
1078
1079 EXPECT_EQ(OK, node.node().ClosePort(X));
1080 EXPECT_EQ(OK, node.node().ClosePort(Y));
1081
1082 // At this point we have a scenario with:
1083 //
1084 // D -> [B] -> C -> [A]
1085 //
1086 // Ensure that the proxies can collapse. The sent ports will be closed
1087 // eventually as a result of Y's closure.
1088
1089 WaitForIdle();
1090
1091 EXPECT_TRUE(node.node().CanShutdownCleanly());
1092 }
1093
TEST_F(PortsTest,SendWithClosedPeer)1094 TEST_F(PortsTest, SendWithClosedPeer) {
1095 // This tests that if a port is sent when its peer is already known to be
1096 // closed, the newly created port will be aware of that peer closure, and the
1097 // proxy will eventually collapse.
1098
1099 TestNode node(0);
1100 AddNode(&node);
1101
1102 // Send a message from A to B, then close A.
1103 PortRef A, B;
1104 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1105 EXPECT_EQ(OK, node.SendStringMessage(A, "hey"));
1106 EXPECT_EQ(OK, node.node().ClosePort(A));
1107
1108 // Now send B over X-Y as new port C.
1109 PortRef X, Y;
1110 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1111 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1112 ScopedMessage message;
1113 ASSERT_TRUE(node.ReadMessage(Y, &message));
1114 ASSERT_EQ(1u, message->num_ports());
1115 PortRef C;
1116 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
1117
1118 EXPECT_EQ(OK, node.node().ClosePort(X));
1119 EXPECT_EQ(OK, node.node().ClosePort(Y));
1120
1121 WaitForIdle();
1122
1123 // C should have received the message originally sent to B, and it should also
1124 // be aware of A's closure.
1125
1126 ASSERT_TRUE(node.ReadMessage(C, &message));
1127 EXPECT_TRUE(MessageEquals(message, "hey"));
1128
1129 PortStatus status;
1130 EXPECT_EQ(OK, node.node().GetStatus(C, &status));
1131 EXPECT_FALSE(status.receiving_messages);
1132 EXPECT_FALSE(status.has_messages);
1133 EXPECT_TRUE(status.peer_closed);
1134
1135 node.node().ClosePort(C);
1136
1137 WaitForIdle();
1138
1139 EXPECT_TRUE(node.node().CanShutdownCleanly());
1140 }
1141
TEST_F(PortsTest,SendWithClosedPeerSent)1142 TEST_F(PortsTest, SendWithClosedPeerSent) {
1143 // This tests that if a port is closed while some number of proxies are still
1144 // routing messages (directly or indirectly) to it, that the peer port is
1145 // eventually notified of the closure, and the dead-end proxies will
1146 // eventually be removed.
1147
1148 TestNode node(0);
1149 AddNode(&node);
1150
1151 PortRef X, Y;
1152 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1153
1154 PortRef A, B;
1155 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1156
1157 ScopedMessage message;
1158
1159 // Send A as new port C.
1160 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
1161
1162 ASSERT_TRUE(node.ReadMessage(Y, &message));
1163 ASSERT_EQ(1u, message->num_ports());
1164 PortRef C;
1165 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
1166
1167 // Send C as new port D.
1168 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
1169
1170 ASSERT_TRUE(node.ReadMessage(Y, &message));
1171 ASSERT_EQ(1u, message->num_ports());
1172 PortRef D;
1173 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
1174
1175 // Send a message to B through D, then close D.
1176 EXPECT_EQ(OK, node.SendStringMessage(D, "hey"));
1177 EXPECT_EQ(OK, node.node().ClosePort(D));
1178
1179 // Now send B as new port E.
1180
1181 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1182 EXPECT_EQ(OK, node.node().ClosePort(X));
1183
1184 ASSERT_TRUE(node.ReadMessage(Y, &message));
1185 ASSERT_EQ(1u, message->num_ports());
1186 PortRef E;
1187 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
1188
1189 EXPECT_EQ(OK, node.node().ClosePort(Y));
1190
1191 WaitForIdle();
1192
1193 // E should receive the message originally sent to B, and it should also be
1194 // aware of D's closure.
1195
1196 ASSERT_TRUE(node.ReadMessage(E, &message));
1197 EXPECT_TRUE(MessageEquals(message, "hey"));
1198
1199 PortStatus status;
1200 EXPECT_EQ(OK, node.node().GetStatus(E, &status));
1201 EXPECT_FALSE(status.receiving_messages);
1202 EXPECT_FALSE(status.has_messages);
1203 EXPECT_TRUE(status.peer_closed);
1204
1205 EXPECT_EQ(OK, node.node().ClosePort(E));
1206
1207 WaitForIdle();
1208
1209 EXPECT_TRUE(node.node().CanShutdownCleanly());
1210 }
1211
TEST_F(PortsTest,MergePorts)1212 TEST_F(PortsTest, MergePorts) {
1213 TestNode node0(0);
1214 AddNode(&node0);
1215
1216 TestNode node1(1);
1217 AddNode(&node1);
1218
1219 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1220 PortRef A, B, C, D;
1221 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1222 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1223
1224 // Write a message on A.
1225 EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
1226
1227 // Initiate a merge between B and C.
1228 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1229
1230 WaitForIdle();
1231
1232 // Expect all proxies to be gone once idle.
1233 EXPECT_TRUE(
1234 node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1235 EXPECT_TRUE(
1236 node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1237
1238 // Expect D to have received the message sent on A.
1239 ScopedMessage message;
1240 ASSERT_TRUE(node1.ReadMessage(D, &message));
1241 EXPECT_TRUE(MessageEquals(message, "hey"));
1242
1243 EXPECT_EQ(OK, node0.node().ClosePort(A));
1244 EXPECT_EQ(OK, node1.node().ClosePort(D));
1245
1246 // No more ports should be open.
1247 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1248 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1249 }
1250
TEST_F(PortsTest,MergePortWithClosedPeer1)1251 TEST_F(PortsTest, MergePortWithClosedPeer1) {
1252 // This tests that the right thing happens when initiating a merge on a port
1253 // whose peer has already been closed.
1254
1255 TestNode node0(0);
1256 AddNode(&node0);
1257
1258 TestNode node1(1);
1259 AddNode(&node1);
1260
1261 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1262 PortRef A, B, C, D;
1263 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1264 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1265
1266 // Write a message on A.
1267 EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
1268
1269 // Close A.
1270 EXPECT_EQ(OK, node0.node().ClosePort(A));
1271
1272 // Initiate a merge between B and C.
1273 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1274
1275 WaitForIdle();
1276
1277 // Expect all proxies to be gone once idle. node0 should have no ports since
1278 // A was explicitly closed.
1279 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1280 EXPECT_TRUE(
1281 node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1282
1283 // Expect D to have received the message sent on A.
1284 ScopedMessage message;
1285 ASSERT_TRUE(node1.ReadMessage(D, &message));
1286 EXPECT_TRUE(MessageEquals(message, "hey"));
1287
1288 EXPECT_EQ(OK, node1.node().ClosePort(D));
1289
1290 // No more ports should be open.
1291 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1292 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1293 }
1294
TEST_F(PortsTest,MergePortWithClosedPeer2)1295 TEST_F(PortsTest, MergePortWithClosedPeer2) {
1296 // This tests that the right thing happens when merging into a port whose peer
1297 // has already been closed.
1298
1299 TestNode node0(0);
1300 AddNode(&node0);
1301
1302 TestNode node1(1);
1303 AddNode(&node1);
1304
1305 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1306 PortRef A, B, C, D;
1307 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1308 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1309
1310 // Write a message on D and close it.
1311 EXPECT_EQ(OK, node0.SendStringMessage(D, "hey"));
1312 EXPECT_EQ(OK, node1.node().ClosePort(D));
1313
1314 // Initiate a merge between B and C.
1315 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1316
1317 WaitForIdle();
1318
1319 // Expect all proxies to be gone once idle. node1 should have no ports since
1320 // D was explicitly closed.
1321 EXPECT_TRUE(
1322 node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1323 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1324
1325 // Expect A to have received the message sent on D.
1326 ScopedMessage message;
1327 ASSERT_TRUE(node0.ReadMessage(A, &message));
1328 EXPECT_TRUE(MessageEquals(message, "hey"));
1329
1330 EXPECT_EQ(OK, node0.node().ClosePort(A));
1331
1332 // No more ports should be open.
1333 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1334 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1335 }
1336
TEST_F(PortsTest,MergePortsWithClosedPeers)1337 TEST_F(PortsTest, MergePortsWithClosedPeers) {
1338 // This tests that no residual ports are left behind if two ports are merged
1339 // when both of their peers have been closed.
1340
1341 TestNode node0(0);
1342 AddNode(&node0);
1343
1344 TestNode node1(1);
1345 AddNode(&node1);
1346
1347 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1348 PortRef A, B, C, D;
1349 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1350 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1351
1352 // Close A and D.
1353 EXPECT_EQ(OK, node0.node().ClosePort(A));
1354 EXPECT_EQ(OK, node1.node().ClosePort(D));
1355
1356 WaitForIdle();
1357
1358 // Initiate a merge between B and C.
1359 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1360
1361 WaitForIdle();
1362
1363 // Expect everything to have gone away.
1364 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1365 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1366 }
1367
TEST_F(PortsTest,MergePortsWithMovedPeers)1368 TEST_F(PortsTest, MergePortsWithMovedPeers) {
1369 // This tests that ports can be merged successfully even if their peers are
1370 // moved around.
1371
1372 TestNode node0(0);
1373 AddNode(&node0);
1374
1375 TestNode node1(1);
1376 AddNode(&node1);
1377
1378 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1379 PortRef A, B, C, D;
1380 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1381 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1382
1383 // Set up another pair X-Y for moving ports on node0.
1384 PortRef X, Y;
1385 EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y));
1386
1387 ScopedMessage message;
1388
1389 // Move A to new port E.
1390 EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A));
1391 ASSERT_TRUE(node0.ReadMessage(Y, &message));
1392 ASSERT_EQ(1u, message->num_ports());
1393 PortRef E;
1394 ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
1395
1396 EXPECT_EQ(OK, node0.node().ClosePort(X));
1397 EXPECT_EQ(OK, node0.node().ClosePort(Y));
1398
1399 // Write messages on E and D.
1400 EXPECT_EQ(OK, node0.SendStringMessage(E, "hey"));
1401 EXPECT_EQ(OK, node1.SendStringMessage(D, "hi"));
1402
1403 // Initiate a merge between B and C.
1404 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1405
1406 WaitForIdle();
1407
1408 // Expect to receive D's message on E and E's message on D.
1409 ASSERT_TRUE(node0.ReadMessage(E, &message));
1410 EXPECT_TRUE(MessageEquals(message, "hi"));
1411 ASSERT_TRUE(node1.ReadMessage(D, &message));
1412 EXPECT_TRUE(MessageEquals(message, "hey"));
1413
1414 // Close E and D.
1415 EXPECT_EQ(OK, node0.node().ClosePort(E));
1416 EXPECT_EQ(OK, node1.node().ClosePort(D));
1417
1418 WaitForIdle();
1419
1420 // Expect everything to have gone away.
1421 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1422 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1423 }
1424
TEST_F(PortsTest,MergePortsFailsGracefully)1425 TEST_F(PortsTest, MergePortsFailsGracefully) {
1426 // This tests that the system remains in a well-defined state if something
1427 // goes wrong during port merge.
1428
1429 TestNode node0(0);
1430 AddNode(&node0);
1431
1432 TestNode node1(1);
1433 AddNode(&node1);
1434
1435 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1436 PortRef A, B, C, D;
1437 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1438 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1439
1440 ScopedMessage message;
1441 PortRef X, Y;
1442 EXPECT_EQ(OK, node0.node().CreateUninitializedPort(&X));
1443 EXPECT_EQ(OK, node1.node().CreateUninitializedPort(&Y));
1444 EXPECT_EQ(OK, node0.node().InitializePort(X, node1.name(), Y.name()));
1445 EXPECT_EQ(OK, node1.node().InitializePort(Y, node0.name(), X.name()));
1446
1447 // Block the merge from proceeding until we can do something stupid with port
1448 // C. This avoids the test logic racing with async merge logic.
1449 node1.BlockOnEvent(Event::Type::kMergePort);
1450
1451 // Initiate the merge between B and C.
1452 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1453
1454 // Move C to a new port E. This is not a sane use of Node's public API but
1455 // is still hypothetically possible. It allows us to force a merge failure
1456 // because C will be in an invalid state by the time the merge is processed.
1457 // As a result, B should be closed.
1458 EXPECT_EQ(OK, node1.SendStringMessageWithPort(Y, "foo", C));
1459
1460 node1.Unblock();
1461
1462 WaitForIdle();
1463
1464 ASSERT_TRUE(node0.ReadMessage(X, &message));
1465 ASSERT_EQ(1u, message->num_ports());
1466 PortRef E;
1467 ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
1468
1469 EXPECT_EQ(OK, node0.node().ClosePort(X));
1470 EXPECT_EQ(OK, node1.node().ClosePort(Y));
1471
1472 WaitForIdle();
1473
1474 // C goes away as a result of normal proxy removal. B should have been closed
1475 // cleanly by the failed MergePorts.
1476 EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C));
1477 EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B));
1478
1479 // Close A, D, and E.
1480 EXPECT_EQ(OK, node0.node().ClosePort(A));
1481 EXPECT_EQ(OK, node1.node().ClosePort(D));
1482 EXPECT_EQ(OK, node0.node().ClosePort(E));
1483
1484 WaitForIdle();
1485
1486 // Expect everything to have gone away.
1487 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1488 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1489 }
1490
TEST_F(PortsTest,RemotePeerStatus)1491 TEST_F(PortsTest, RemotePeerStatus) {
1492 TestNode node0(0);
1493 AddNode(&node0);
1494
1495 TestNode node1(1);
1496 AddNode(&node1);
1497
1498 // Create a local port pair. Neither port should appear to have a remote peer.
1499 PortRef a, b;
1500 PortStatus status;
1501 node0.node().CreatePortPair(&a, &b);
1502 ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1503 EXPECT_FALSE(status.peer_remote);
1504 ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1505 EXPECT_FALSE(status.peer_remote);
1506
1507 // Create a port pair spanning the two nodes. Both spanning ports should
1508 // immediately appear to have a remote peer.
1509 PortRef x0, x1;
1510 CreatePortPair(&node0, &x0, &node1, &x1);
1511
1512 ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
1513 EXPECT_TRUE(status.peer_remote);
1514 ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
1515 EXPECT_TRUE(status.peer_remote);
1516
1517 PortRef x2, x3;
1518 CreatePortPair(&node0, &x2, &node1, &x3);
1519
1520 // Transfer |b| to |node1| and |x1| to |node0|. i.e., make the local peers
1521 // remote and the remote peers local.
1522 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", b));
1523 EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", x1));
1524 WaitForIdle();
1525
1526 ScopedMessage message;
1527 ASSERT_TRUE(node0.ReadMessage(x2, &message));
1528 ASSERT_EQ(1u, message->num_ports());
1529 ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &x1));
1530
1531 ASSERT_TRUE(node1.ReadMessage(x3, &message));
1532 ASSERT_EQ(1u, message->num_ports());
1533 ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &b));
1534
1535 // Now x0-x1 should be local to node0 and a-b should span the nodes.
1536 ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
1537 EXPECT_FALSE(status.peer_remote);
1538 ASSERT_EQ(OK, node0.node().GetStatus(x1, &status));
1539 EXPECT_FALSE(status.peer_remote);
1540 ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1541 EXPECT_TRUE(status.peer_remote);
1542 ASSERT_EQ(OK, node1.node().GetStatus(b, &status));
1543 EXPECT_TRUE(status.peer_remote);
1544
1545 // And swap them back one more time.
1546 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", x1));
1547 EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", b));
1548 WaitForIdle();
1549
1550 ASSERT_TRUE(node0.ReadMessage(x2, &message));
1551 ASSERT_EQ(1u, message->num_ports());
1552 ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &b));
1553
1554 ASSERT_TRUE(node1.ReadMessage(x3, &message));
1555 ASSERT_EQ(1u, message->num_ports());
1556 ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &x1));
1557
1558 ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
1559 EXPECT_TRUE(status.peer_remote);
1560 ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
1561 EXPECT_TRUE(status.peer_remote);
1562 ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1563 EXPECT_FALSE(status.peer_remote);
1564 ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1565 EXPECT_FALSE(status.peer_remote);
1566
1567 EXPECT_EQ(OK, node0.node().ClosePort(x0));
1568 EXPECT_EQ(OK, node1.node().ClosePort(x1));
1569 EXPECT_EQ(OK, node0.node().ClosePort(x2));
1570 EXPECT_EQ(OK, node1.node().ClosePort(x3));
1571 EXPECT_EQ(OK, node0.node().ClosePort(a));
1572 EXPECT_EQ(OK, node0.node().ClosePort(b));
1573
1574 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1575 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1576 }
1577
TEST_F(PortsTest,RemotePeerStatusAfterLocalPortMerge)1578 TEST_F(PortsTest, RemotePeerStatusAfterLocalPortMerge) {
1579 TestNode node0(0);
1580 AddNode(&node0);
1581
1582 TestNode node1(1);
1583 AddNode(&node1);
1584
1585 // Set up a-b on node0 and c-d spanning node0-node1.
1586 PortRef a, b, c, d;
1587 node0.node().CreatePortPair(&a, &b);
1588 CreatePortPair(&node0, &c, &node1, &d);
1589
1590 PortStatus status;
1591 ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1592 EXPECT_FALSE(status.peer_remote);
1593 ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1594 EXPECT_FALSE(status.peer_remote);
1595 ASSERT_EQ(OK, node0.node().GetStatus(c, &status));
1596 EXPECT_TRUE(status.peer_remote);
1597 ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
1598 EXPECT_TRUE(status.peer_remote);
1599
1600 EXPECT_EQ(OK, node0.node().MergeLocalPorts(b, c));
1601 WaitForIdle();
1602
1603 ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1604 EXPECT_TRUE(status.peer_remote);
1605 ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
1606 EXPECT_TRUE(status.peer_remote);
1607
1608 EXPECT_EQ(OK, node0.node().ClosePort(a));
1609 EXPECT_EQ(OK, node1.node().ClosePort(d));
1610 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1611 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1612 }
1613
TEST_F(PortsTest,RemotePeerStatusAfterRemotePortMerge)1614 TEST_F(PortsTest, RemotePeerStatusAfterRemotePortMerge) {
1615 TestNode node0(0);
1616 AddNode(&node0);
1617
1618 TestNode node1(1);
1619 AddNode(&node1);
1620
1621 // Set up a-b on node0 and c-d on node1.
1622 PortRef a, b, c, d;
1623 node0.node().CreatePortPair(&a, &b);
1624 node1.node().CreatePortPair(&c, &d);
1625
1626 PortStatus status;
1627 ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1628 EXPECT_FALSE(status.peer_remote);
1629 ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1630 EXPECT_FALSE(status.peer_remote);
1631 ASSERT_EQ(OK, node1.node().GetStatus(c, &status));
1632 EXPECT_FALSE(status.peer_remote);
1633 ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
1634 EXPECT_FALSE(status.peer_remote);
1635
1636 EXPECT_EQ(OK, node0.node().MergePorts(b, node1.name(), c.name()));
1637 WaitForIdle();
1638
1639 ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1640 EXPECT_TRUE(status.peer_remote);
1641 ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
1642 EXPECT_TRUE(status.peer_remote);
1643
1644 EXPECT_EQ(OK, node0.node().ClosePort(a));
1645 EXPECT_EQ(OK, node1.node().ClosePort(d));
1646 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1647 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1648 }
1649
TEST_F(PortsTest,RetransmitUserMessageEvents)1650 TEST_F(PortsTest, RetransmitUserMessageEvents) {
1651 // Ensures that user message events can be retransmitted properly.
1652 TestNode node0(0);
1653 AddNode(&node0);
1654
1655 PortRef a, b;
1656 node0.node().CreatePortPair(&a, &b);
1657
1658 // Ping.
1659 const char* kMessage = "hey";
1660 ScopedMessage message;
1661 EXPECT_EQ(OK, node0.SendStringMessage(a, kMessage));
1662 ASSERT_TRUE(node0.ReadMessage(b, &message));
1663 EXPECT_TRUE(MessageEquals(message, kMessage));
1664
1665 // Pong.
1666 EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
1667 EXPECT_FALSE(message);
1668 ASSERT_TRUE(node0.ReadMessage(a, &message));
1669 EXPECT_TRUE(MessageEquals(message, kMessage));
1670
1671 // Ping again.
1672 EXPECT_EQ(OK, node0.node().SendUserMessage(a, std::move(message)));
1673 EXPECT_FALSE(message);
1674 ASSERT_TRUE(node0.ReadMessage(b, &message));
1675 EXPECT_TRUE(MessageEquals(message, kMessage));
1676
1677 // Pong again!
1678 EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
1679 EXPECT_FALSE(message);
1680 ASSERT_TRUE(node0.ReadMessage(a, &message));
1681 EXPECT_TRUE(MessageEquals(message, kMessage));
1682
1683 EXPECT_EQ(OK, node0.node().ClosePort(a));
1684 EXPECT_EQ(OK, node0.node().ClosePort(b));
1685 }
1686
TEST_F(PortsTest,SetAcknowledgeRequestInterval)1687 TEST_F(PortsTest, SetAcknowledgeRequestInterval) {
1688 TestNode node0(0);
1689 AddNode(&node0);
1690
1691 PortRef a0, a1;
1692 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
1693 EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
1694
1695 // Send a batch of messages.
1696 EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 15));
1697 EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));
1698 EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
1699 WaitForIdle();
1700 EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));
1701
1702 // Set to acknowledge every read message, and validate that already-read
1703 // messages are acknowledged.
1704 EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 1));
1705 WaitForIdle();
1706 EXPECT_EQ(10u, node0.GetUnacknowledgedMessageCount(a0));
1707
1708 // Read a third of the messages from the other end.
1709 EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
1710 WaitForIdle();
1711
1712 EXPECT_EQ(5u, node0.GetUnacknowledgedMessageCount(a0));
1713
1714 TestNode node1(1);
1715 AddNode(&node1);
1716
1717 // Transfer a1 across to node1.
1718 PortRef x0, x1;
1719 CreatePortPair(&node0, &x0, &node1, &x1);
1720 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
1721 WaitForIdle();
1722
1723 ScopedMessage message;
1724 ASSERT_TRUE(node1.ReadMessage(x1, &message));
1725 ASSERT_EQ(1u, message->num_ports());
1726 ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &a1));
1727
1728 // Read the last third of the messages from the transferred node, and
1729 // validate that the unacknowledge message count updates correctly.
1730 EXPECT_TRUE(node1.ReadMultipleMessages(a1, 5));
1731 WaitForIdle();
1732 EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
1733
1734 // Turn the acknowledges down and validate that they don't go on indefinitely.
1735 EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 0));
1736 EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 10));
1737 WaitForIdle();
1738 EXPECT_TRUE(node1.ReadMultipleMessages(a1, 10));
1739 WaitForIdle();
1740 EXPECT_NE(0u, node0.GetUnacknowledgedMessageCount(a0));
1741
1742 // Close the far port and validate that the closure updates the unacknowledged
1743 // count.
1744 EXPECT_EQ(OK, node1.node().ClosePort(a1));
1745 WaitForIdle();
1746 EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
1747
1748 EXPECT_EQ(OK, node0.node().ClosePort(a0));
1749 EXPECT_EQ(OK, node0.node().ClosePort(x0));
1750 EXPECT_EQ(OK, node1.node().ClosePort(x1));
1751 }
1752
1753 } // namespace test
1754 } // namespace ports
1755 } // namespace core
1756 } // namespace mojo
1757