1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "remoting/signaling/ftl_message_reception_channel.h"
6
7 #include <algorithm>
8 #include <memory>
9 #include <string>
10 #include <utility>
11 #include <vector>
12
13 #include "base/bind.h"
14 #include "base/callback_helpers.h"
15 #include "base/memory/weak_ptr.h"
16 #include "base/notreached.h"
17 #include "base/run_loop.h"
18 #include "base/test/bind.h"
19 #include "base/test/mock_callback.h"
20 #include "base/test/task_environment.h"
21 #include "base/threading/sequenced_task_runner_handle.h"
22 #include "remoting/base/protobuf_http_status.h"
23 #include "remoting/base/scoped_protobuf_http_request.h"
24 #include "remoting/proto/ftl/v1/ftl_messages.pb.h"
25 #include "remoting/signaling/ftl_services_context.h"
26 #include "remoting/signaling/signaling_tracker.h"
27 #include "testing/gmock/include/gmock/gmock.h"
28 #include "testing/gtest/include/gtest/gtest.h"
29
30 namespace remoting {
31
32 namespace {
33
34 using ::testing::_;
35 using ::testing::Expectation;
36 using ::testing::Invoke;
37 using ::testing::Property;
38 using ::testing::Return;
39
40 using ReceiveMessagesResponseCallback = base::RepeatingCallback<void(
41 std::unique_ptr<ftl::ReceiveMessagesResponse>)>;
42 using StatusCallback = base::OnceCallback<void(const ProtobufHttpStatus&)>;
43
44 class MockSignalingTracker : public SignalingTracker {
45 public:
46 MOCK_METHOD0(OnSignalingActive, void());
47 };
48
49 // Fake stream implementation to allow probing if a stream is closed by client.
50 class FakeScopedProtobufHttpRequest : public ScopedProtobufHttpRequest {
51 public:
FakeScopedProtobufHttpRequest()52 FakeScopedProtobufHttpRequest()
53 : ScopedProtobufHttpRequest(base::DoNothing::Once()) {}
54 ~FakeScopedProtobufHttpRequest() override = default;
55
GetWeakPtr()56 base::WeakPtr<FakeScopedProtobufHttpRequest> GetWeakPtr() {
57 return weak_factory_.GetWeakPtr();
58 }
59
60 private:
61 base::WeakPtrFactory<FakeScopedProtobufHttpRequest> weak_factory_{this};
62 DISALLOW_COPY_AND_ASSIGN(FakeScopedProtobufHttpRequest);
63 };
64
CreateFakeServerStream()65 std::unique_ptr<FakeScopedProtobufHttpRequest> CreateFakeServerStream() {
66 return std::make_unique<FakeScopedProtobufHttpRequest>();
67 }
68
69 // Creates a gmock EXPECT_CALL action that:
70 // 1. Creates a fake server stream and returns it as the start stream result
71 // 2. Posts a task to call |on_stream_opened| at the end of current sequence
72 // 3. Writes the WeakPtr to the fake server stream to |optional_out_stream|
73 // if it is provided.
74 template <typename OnStreamOpenedLambda>
StartStream(OnStreamOpenedLambda on_stream_opened,base::WeakPtr<FakeScopedProtobufHttpRequest> * optional_out_stream=nullptr)75 decltype(auto) StartStream(OnStreamOpenedLambda on_stream_opened,
76 base::WeakPtr<FakeScopedProtobufHttpRequest>*
77 optional_out_stream = nullptr) {
78 return [=](base::OnceClosure on_channel_ready,
79 const ReceiveMessagesResponseCallback& on_incoming_msg,
80 StatusCallback on_channel_closed) {
81 auto fake_stream = CreateFakeServerStream();
82 if (optional_out_stream) {
83 *optional_out_stream = fake_stream->GetWeakPtr();
84 }
85 auto on_stream_opened_cb = base::BindLambdaForTesting(on_stream_opened);
86 base::SequencedTaskRunnerHandle::Get()->PostTask(
87 FROM_HERE,
88 base::BindOnce(on_stream_opened_cb, std::move(on_channel_ready),
89 on_incoming_msg, std::move(on_channel_closed)));
90 return fake_stream;
91 };
92 }
93
NotReachedClosure()94 base::OnceClosure NotReachedClosure() {
95 return base::BindOnce([]() { NOTREACHED(); });
96 }
97
98 base::RepeatingCallback<void(const ProtobufHttpStatus&)>
NotReachedStatusCallback(const base::Location & location)99 NotReachedStatusCallback(const base::Location& location) {
100 return base::BindLambdaForTesting([=](const ProtobufHttpStatus& status) {
101 NOTREACHED() << "Location: " << location.ToString()
102 << ", status code: " << static_cast<int>(status.error_code());
103 });
104 }
105
106 base::OnceCallback<void(const ProtobufHttpStatus&)>
CheckStatusThenQuitRunLoopCallback(const base::Location & from_here,ProtobufHttpStatus::Code expected_status_code,base::RunLoop * run_loop)107 CheckStatusThenQuitRunLoopCallback(
108 const base::Location& from_here,
109 ProtobufHttpStatus::Code expected_status_code,
110 base::RunLoop* run_loop) {
111 return base::BindLambdaForTesting([=](const ProtobufHttpStatus& status) {
112 ASSERT_EQ(expected_status_code, status.error_code())
113 << "Incorrect status code. Location: " << from_here.ToString();
114 run_loop->QuitWhenIdle();
115 });
116 }
117
118 } // namespace
119
120 class FtlMessageReceptionChannelTest : public testing::Test {
121 public:
122 void SetUp() override;
123 void TearDown() override;
124
125 protected:
126 base::TimeDelta GetTimeUntilRetry() const;
127 int GetRetryFailureCount() const;
128
129 base::test::TaskEnvironment task_environment_{
130 base::test::TaskEnvironment::TimeSource::MOCK_TIME};
131 std::unique_ptr<FtlMessageReceptionChannel> channel_;
132 base::MockCallback<FtlMessageReceptionChannel::StreamOpener>
133 mock_stream_opener_;
134 base::MockCallback<base::RepeatingCallback<void(const ftl::InboxMessage&)>>
135 mock_on_incoming_msg_;
136 MockSignalingTracker mock_signaling_tracker_;
137 };
138
SetUp()139 void FtlMessageReceptionChannelTest::SetUp() {
140 channel_ =
141 std::make_unique<FtlMessageReceptionChannel>(&mock_signaling_tracker_);
142 channel_->Initialize(mock_stream_opener_.Get(), mock_on_incoming_msg_.Get());
143 }
144
TearDown()145 void FtlMessageReceptionChannelTest::TearDown() {
146 channel_.reset();
147 task_environment_.FastForwardUntilNoTasksRemain();
148 }
149
GetTimeUntilRetry() const150 base::TimeDelta FtlMessageReceptionChannelTest::GetTimeUntilRetry() const {
151 return channel_->GetReconnectRetryBackoffEntryForTesting()
152 .GetTimeUntilRelease();
153 }
154
GetRetryFailureCount() const155 int FtlMessageReceptionChannelTest::GetRetryFailureCount() const {
156 return channel_->GetReconnectRetryBackoffEntryForTesting().failure_count();
157 }
158
// Stops the channel as soon as the stream is opened. Both callbacks passed to
// StartReceivingMessages() hit NOTREACHED() if they are ever run.
TEST_F(FtlMessageReceptionChannelTest,
       TestStartReceivingMessages_StoppedImmediately) {
  base::RunLoop run_loop;

  auto stop_on_open = [&](base::OnceClosure on_ready,
                          const ReceiveMessagesResponseCallback& on_message,
                          StatusCallback on_closed) {
    channel_->StopReceivingMessages();
    run_loop.Quit();
  };
  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(StartStream(stop_on_open));

  channel_->StartReceivingMessages(NotReachedClosure(),
                                   NotReachedStatusCallback(FROM_HERE));

  run_loop.Run();
}
177
// The stream closes with UNAUTHENTICATED right after opening; the test
// expects that status to reach the closed callback, and the ready callback
// must never run.
TEST_F(FtlMessageReceptionChannelTest,
       TestStartReceivingMessages_NotAuthenticated) {
  base::RunLoop run_loop;

  auto close_unauthenticated =
      [&](base::OnceClosure on_ready,
          const ReceiveMessagesResponseCallback& on_message,
          StatusCallback on_closed) {
        const ProtobufHttpStatus unauthenticated(
            ProtobufHttpStatus::Code::UNAUTHENTICATED, "");
        std::move(on_closed).Run(unauthenticated);
      };
  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(StartStream(close_unauthenticated));

  auto expect_unauthenticated = CheckStatusThenQuitRunLoopCallback(
      FROM_HERE, ProtobufHttpStatus::Code::UNAUTHENTICATED, &run_loop);
  channel_->StartReceivingMessages(NotReachedClosure(),
                                   std::move(expect_unauthenticated));

  run_loop.Run();
}
199
// The stream signals readiness; the ready callback given to the channel (the
// run loop's quit closure) must then run. The closed callback must not run.
TEST_F(FtlMessageReceptionChannelTest,
       TestStartReceivingMessages_StreamStarted) {
  base::RunLoop run_loop;

  auto signal_ready = [&](base::OnceClosure on_ready,
                          const ReceiveMessagesResponseCallback& on_message,
                          StatusCallback on_closed) {
    std::move(on_ready).Run();
  };
  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(StartStream(signal_ready));

  channel_->StartReceivingMessages(run_loop.QuitClosure(),
                                   NotReachedStatusCallback(FROM_HERE));

  run_loop.Run();
}
217
// The first stream fails with UNAVAILABLE; after the backoff delay the stream
// is reopened, and the failure count returns to zero once it reports ready.
TEST_F(FtlMessageReceptionChannelTest,
       TestStartReceivingMessages_RecoverableStreamError) {
  base::RunLoop run_loop;
  base::WeakPtr<FakeScopedProtobufHttpRequest> old_stream;

  // First attempt: close the stream with UNAVAILABLE and check the backoff.
  auto fail_first_attempt =
      [&](base::OnceClosure on_ready,
          const ReceiveMessagesResponseCallback& on_message,
          StatusCallback on_closed) {
        ASSERT_EQ(0, GetRetryFailureCount());

        const ProtobufHttpStatus unavailable(
            ProtobufHttpStatus::Code::UNAVAILABLE, "");
        std::move(on_closed).Run(unavailable);

        ASSERT_EQ(1, GetRetryFailureCount());
        ASSERT_NEAR(FtlServicesContext::kBackoffInitialDelay.InSecondsF(),
                    GetTimeUntilRetry().InSecondsF(), 0.5);

        // Advancing past the backoff delay makes the channel reopen the
        // stream.
        task_environment_.FastForwardBy(GetTimeUntilRetry());
      };

  // Second attempt: the old stream must be gone; report the stream ready.
  auto succeed_second_attempt =
      [&](base::OnceClosure on_ready,
          const ReceiveMessagesResponseCallback& on_message,
          StatusCallback on_closed) {
        ASSERT_FALSE(old_stream);

        std::move(on_ready).Run();

        ASSERT_EQ(0, GetRetryFailureCount());
      };

  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(StartStream(fail_first_attempt, &old_stream))
      .WillOnce(StartStream(succeed_second_attempt));

  channel_->StartReceivingMessages(run_loop.QuitClosure(),
                                   NotReachedStatusCallback(FROM_HERE));

  run_loop.Run();
}
262
// Calls StartReceivingMessages() three times. The stream opener is expected to
// run once, while the ready callback is expected to run three times.
TEST_F(FtlMessageReceptionChannelTest,
       TestStartReceivingMessages_MultipleCalls) {
  base::RunLoop run_loop;

  base::MockCallback<base::OnceClosure> stream_ready_callback;

  // The run loop is quit by the third invocation of the ready callback.
  EXPECT_CALL(stream_ready_callback, Run())
      .WillOnce(Return())
      .WillOnce(Return())
      .WillOnce([&]() { run_loop.Quit(); });

  auto signal_ready = [&](base::OnceClosure on_ready,
                          const ReceiveMessagesResponseCallback& on_message,
                          StatusCallback on_closed) {
    std::move(on_ready).Run();
  };
  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(StartStream(signal_ready));

  constexpr int kStartCalls = 3;
  for (int call = 0; call < kStartCalls; ++call) {
    channel_->StartReceivingMessages(stream_ready_callback.Get(),
                                     NotReachedStatusCallback(FROM_HERE));
  }

  run_loop.Run();
}
292
// Delivers two inbox messages over the stream and then closes it with
// CANCELLED. Each message must reach the incoming-message callback, and the
// CANCELLED status must reach the closed callback.
TEST_F(FtlMessageReceptionChannelTest, StreamsTwoMessages) {
  base::RunLoop run_loop;

  constexpr char kMessage1Id[] = "msg_1";
  constexpr char kMessage2Id[] = "msg_2";

  ftl::InboxMessage message_1;
  message_1.set_message_id(kMessage1Id);
  ftl::InboxMessage message_2;
  message_2.set_message_id(kMessage2Id);

  EXPECT_CALL(mock_on_incoming_msg_,
              Run(Property(&ftl::InboxMessage::message_id, kMessage1Id)))
      .WillOnce(Return());
  EXPECT_CALL(mock_on_incoming_msg_,
              Run(Property(&ftl::InboxMessage::message_id, kMessage2Id)))
      .WillOnce(Invoke([&](const ftl::InboxMessage&) { run_loop.Quit(); }));

  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(StartStream(
          [&](base::OnceClosure on_channel_ready,
              const ReceiveMessagesResponseCallback& on_incoming_msg,
              StatusCallback on_channel_closed) {
            std::move(on_channel_ready).Run();

            // Wraps |message| in a ReceiveMessagesResponse and delivers it.
            auto deliver = [&on_incoming_msg](
                               const ftl::InboxMessage& message) {
              auto response = std::make_unique<ftl::ReceiveMessagesResponse>();
              *response->mutable_inbox_message() = message;
              on_incoming_msg.Run(std::move(response));
            };
            deliver(message_1);
            deliver(message_2);

            const ProtobufHttpStatus kCancel(
                ProtobufHttpStatus::Code::CANCELLED, "Cancelled");
            std::move(on_channel_closed).Run(kCancel);
          }));

  // The status code is spelled ProtobufHttpStatus::Code::..., matching the
  // rest of this file (no repeated class-name qualifier).
  channel_->StartReceivingMessages(
      base::DoNothing(),
      CheckStatusThenQuitRunLoopCallback(
          FROM_HERE, ProtobufHttpStatus::Code::CANCELLED, &run_loop));

  run_loop.Run();
}
339
// The stream reports ready and then delivers a single pong. The signaling
// tracker is expected to be notified twice; the second notification quits the
// run loop.
TEST_F(FtlMessageReceptionChannelTest, ReceivedOnePong_OnSignalingActiveTwice) {
  base::RunLoop run_loop;

  EXPECT_CALL(mock_signaling_tracker_, OnSignalingActive())
      .WillOnce(Return())
      .WillOnce([&]() { run_loop.Quit(); });

  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(StartStream(
          [&](base::OnceClosure on_channel_ready,
              const ReceiveMessagesResponseCallback& on_incoming_msg,
              StatusCallback on_channel_closed) {
            std::move(on_channel_ready).Run();

            // mutable_pong() makes the response carry a pong payload.
            auto response = std::make_unique<ftl::ReceiveMessagesResponse>();
            response->mutable_pong();
            on_incoming_msg.Run(std::move(response));
          }));

  channel_->StartReceivingMessages(base::DoNothing(),
                                   NotReachedStatusCallback(FROM_HERE));

  run_loop.Run();
}
365
// The stream becomes ready but no pong arrives within kPongTimeout. The test
// expects one recorded failure, a retry delay near the initial backoff delay,
// and a reopened stream after that delay.
TEST_F(FtlMessageReceptionChannelTest, NoPongWithinTimeout_ResetsStream) {
  base::RunLoop run_loop;
  base::WeakPtr<FakeScopedProtobufHttpRequest> old_stream;

  // First stream: report ready, then let the pong timeout elapse.
  auto let_pong_time_out =
      [&](base::OnceClosure on_ready,
          const ReceiveMessagesResponseCallback& on_message,
          StatusCallback on_closed) {
        std::move(on_ready).Run();
        task_environment_.FastForwardBy(
            FtlMessageReceptionChannel::kPongTimeout);

        ASSERT_EQ(1, GetRetryFailureCount());
        ASSERT_NEAR(FtlServicesContext::kBackoffInitialDelay.InSecondsF(),
                    GetTimeUntilRetry().InSecondsF(), 0.5);

        // Advancing past the backoff delay makes the channel reopen the
        // stream.
        task_environment_.FastForwardBy(GetTimeUntilRetry());
      };

  // Second stream: the old one must be gone and the failure count resets.
  auto verify_reopened =
      [&](base::OnceClosure on_ready,
          const ReceiveMessagesResponseCallback& on_message,
          StatusCallback on_closed) {
        ASSERT_FALSE(old_stream);

        std::move(on_ready).Run();
        ASSERT_EQ(0, GetRetryFailureCount());
        run_loop.Quit();
      };

  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(StartStream(let_pong_time_out, &old_stream))
      .WillOnce(StartStream(verify_reopened));

  channel_->StartReceivingMessages(base::DoNothing(),
                                   NotReachedStatusCallback(FROM_HERE));

  run_loop.Run();
}
406
// The server closes a ready stream with an OK status. The test expects a
// second stream to be opened, with the closed callback never invoked.
TEST_F(FtlMessageReceptionChannelTest, ServerClosesStream_ResetsStream) {
  base::RunLoop run_loop;

  base::WeakPtr<FakeScopedProtobufHttpRequest> old_stream;
  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(StartStream(
          [&](base::OnceClosure on_channel_ready,
              const ReceiveMessagesResponseCallback& on_incoming_msg,
              StatusCallback on_channel_closed) {
            std::move(on_channel_ready).Run();

            // Close the stream with OK.
            std::move(on_channel_closed).Run(ProtobufHttpStatus::OK());
          },
          &old_stream))
      .WillOnce(StartStream(
          [&](base::OnceClosure on_channel_ready,
              const ReceiveMessagesResponseCallback& on_incoming_msg,
              StatusCallback on_channel_closed) {
            // The first stream must have been destroyed before reopening.
            ASSERT_FALSE(old_stream);

            std::move(on_channel_ready).Run();
            ASSERT_EQ(0, GetRetryFailureCount());
            run_loop.Quit();
          }));

  channel_->StartReceivingMessages(base::DoNothing(),
                                   NotReachedStatusCallback(FROM_HERE));

  run_loop.Run();
}
439
// Fails the stream with UNAVAILABLE over and over. The failure count must
// grow on every attempt. After the retry delay has been within 500 ms of
// kBackoffMaxDelay three times, the stream is allowed to succeed.
TEST_F(FtlMessageReceptionChannelTest, TimeoutIncreasesToMaximum) {
  base::RunLoop run_loop;

  int failure_count = 0;
  int hitting_max_delay_count = 0;
  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillRepeatedly(StartStream(
          [&](base::OnceClosure on_channel_ready,
              const ReceiveMessagesResponseCallback& on_incoming_msg,
              StatusCallback on_channel_closed) {
            // Quit if delay is ~kBackoffMaxDelay three times.
            if (hitting_max_delay_count == 3) {
              std::move(on_channel_ready).Run();
              ASSERT_EQ(0, GetRetryFailureCount());
              run_loop.Quit();
              return;
            }

            // Otherwise send UNAVAILABLE to reset the stream.
            std::move(on_channel_closed)
                .Run(ProtobufHttpStatus(ProtobufHttpStatus::Code::UNAVAILABLE,
                                        ""));

            // Every failed attempt must increase the recorded failure count.
            const int new_failure_count = GetRetryFailureCount();
            ASSERT_LT(failure_count, new_failure_count);
            failure_count = new_failure_count;

            const base::TimeDelta time_until_retry = GetTimeUntilRetry();
            const base::TimeDelta max_delay_diff =
                time_until_retry - FtlServicesContext::kBackoffMaxDelay;

            // Adjust for fuzziness.
            if (max_delay_diff.magnitude() <
                base::TimeDelta::FromMilliseconds(500)) {
              hitting_max_delay_count++;
            }

            // This will tail-recursively call the stream opener.
            task_environment_.FastForwardBy(time_until_retry);
          }));

  channel_->StartReceivingMessages(base::DoNothing(),
                                   NotReachedStatusCallback(FROM_HERE));

  run_loop.Run();
}
489
// The first stream fails with UNAUTHENTICATED, which is reported to the
// closed callback. That callback starts receiving again. The test expects the
// failure recorded by the first attempt to still be present when the second
// stream opens, and to be cleared once that stream reports ready.
TEST_F(FtlMessageReceptionChannelTest,
       StartStreamFailsWithUnRecoverableErrorAndRetry_TimeoutApplied) {
  base::RunLoop run_loop;

  base::WeakPtr<FakeScopedProtobufHttpRequest> old_stream;
  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(StartStream(
          [&](base::OnceClosure on_channel_ready,
              const ReceiveMessagesResponseCallback& on_incoming_msg,
              StatusCallback on_channel_closed) {
            // The first open stream attempt fails with UNAUTHENTICATED error.
            ASSERT_EQ(0, GetRetryFailureCount());

            std::move(on_channel_closed)
                .Run(ProtobufHttpStatus(
                    ProtobufHttpStatus::Code::UNAUTHENTICATED, ""));

            ASSERT_EQ(1, GetRetryFailureCount());
            ASSERT_NEAR(FtlServicesContext::kBackoffInitialDelay.InSecondsF(),
                        GetTimeUntilRetry().InSecondsF(), 0.5);
          },
          &old_stream))
      .WillOnce(StartStream(
          [&](base::OnceClosure on_channel_ready,
              const ReceiveMessagesResponseCallback& on_incoming_msg,
              StatusCallback on_channel_closed) {
            // Second open stream attempt succeeds.

            // Assert old stream closed.
            ASSERT_FALSE(old_stream);

            ASSERT_EQ(1, GetRetryFailureCount());

            std::move(on_channel_ready).Run();

            ASSERT_EQ(0, GetRetryFailureCount());
          }));

  channel_->StartReceivingMessages(
      base::DoNothing(),
      base::BindLambdaForTesting([&](const ProtobufHttpStatus& status) {
        ASSERT_EQ(ProtobufHttpStatus::Code::UNAUTHENTICATED,
                  status.error_code());
        // Restart from inside the failure callback; the second attempt quits
        // the run loop once its stream is ready.
        channel_->StartReceivingMessages(run_loop.QuitClosure(),
                                         NotReachedStatusCallback(FROM_HERE));
      }));

  run_loop.Run();
}
540
541 } // namespace remoting
542