1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "remoting/signaling/ftl_message_reception_channel.h"
6 
7 #include <algorithm>
8 #include <memory>
9 #include <string>
10 #include <utility>
11 #include <vector>
12 
13 #include "base/bind.h"
14 #include "base/callback_helpers.h"
15 #include "base/memory/weak_ptr.h"
16 #include "base/notreached.h"
17 #include "base/run_loop.h"
18 #include "base/test/bind.h"
19 #include "base/test/mock_callback.h"
20 #include "base/test/task_environment.h"
21 #include "base/threading/sequenced_task_runner_handle.h"
22 #include "remoting/base/protobuf_http_status.h"
23 #include "remoting/base/scoped_protobuf_http_request.h"
24 #include "remoting/proto/ftl/v1/ftl_messages.pb.h"
25 #include "remoting/signaling/ftl_services_context.h"
26 #include "remoting/signaling/signaling_tracker.h"
27 #include "testing/gmock/include/gmock/gmock.h"
28 #include "testing/gtest/include/gtest/gtest.h"
29 
30 namespace remoting {
31 
32 namespace {
33 
34 using ::testing::_;
35 using ::testing::Expectation;
36 using ::testing::Invoke;
37 using ::testing::Property;
38 using ::testing::Return;
39 
40 using ReceiveMessagesResponseCallback = base::RepeatingCallback<void(
41     std::unique_ptr<ftl::ReceiveMessagesResponse>)>;
42 using StatusCallback = base::OnceCallback<void(const ProtobufHttpStatus&)>;
43 
44 class MockSignalingTracker : public SignalingTracker {
45  public:
46   MOCK_METHOD0(OnSignalingActive, void());
47 };
48 
49 // Fake stream implementation to allow probing if a stream is closed by client.
50 class FakeScopedProtobufHttpRequest : public ScopedProtobufHttpRequest {
51  public:
FakeScopedProtobufHttpRequest()52   FakeScopedProtobufHttpRequest()
53       : ScopedProtobufHttpRequest(base::DoNothing::Once()) {}
54   ~FakeScopedProtobufHttpRequest() override = default;
55 
GetWeakPtr()56   base::WeakPtr<FakeScopedProtobufHttpRequest> GetWeakPtr() {
57     return weak_factory_.GetWeakPtr();
58   }
59 
60  private:
61   base::WeakPtrFactory<FakeScopedProtobufHttpRequest> weak_factory_{this};
62   DISALLOW_COPY_AND_ASSIGN(FakeScopedProtobufHttpRequest);
63 };
64 
CreateFakeServerStream()65 std::unique_ptr<FakeScopedProtobufHttpRequest> CreateFakeServerStream() {
66   return std::make_unique<FakeScopedProtobufHttpRequest>();
67 }
68 
69 // Creates a gmock EXPECT_CALL action that:
70 //   1. Creates a fake server stream and returns it as the start stream result
71 //   2. Posts a task to call |on_stream_opened| at the end of current sequence
72 //   3. Writes the WeakPtr to the fake server stream to |optional_out_stream|
73 //      if it is provided.
74 template <typename OnStreamOpenedLambda>
StartStream(OnStreamOpenedLambda on_stream_opened,base::WeakPtr<FakeScopedProtobufHttpRequest> * optional_out_stream=nullptr)75 decltype(auto) StartStream(OnStreamOpenedLambda on_stream_opened,
76                            base::WeakPtr<FakeScopedProtobufHttpRequest>*
77                                optional_out_stream = nullptr) {
78   return [=](base::OnceClosure on_channel_ready,
79              const ReceiveMessagesResponseCallback& on_incoming_msg,
80              StatusCallback on_channel_closed) {
81     auto fake_stream = CreateFakeServerStream();
82     if (optional_out_stream) {
83       *optional_out_stream = fake_stream->GetWeakPtr();
84     }
85     auto on_stream_opened_cb = base::BindLambdaForTesting(on_stream_opened);
86     base::SequencedTaskRunnerHandle::Get()->PostTask(
87         FROM_HERE,
88         base::BindOnce(on_stream_opened_cb, std::move(on_channel_ready),
89                        on_incoming_msg, std::move(on_channel_closed)));
90     return fake_stream;
91   };
92 }
93 
NotReachedClosure()94 base::OnceClosure NotReachedClosure() {
95   return base::BindOnce([]() { NOTREACHED(); });
96 }
97 
98 base::RepeatingCallback<void(const ProtobufHttpStatus&)>
NotReachedStatusCallback(const base::Location & location)99 NotReachedStatusCallback(const base::Location& location) {
100   return base::BindLambdaForTesting([=](const ProtobufHttpStatus& status) {
101     NOTREACHED() << "Location: " << location.ToString()
102                  << ", status code: " << static_cast<int>(status.error_code());
103   });
104 }
105 
106 base::OnceCallback<void(const ProtobufHttpStatus&)>
CheckStatusThenQuitRunLoopCallback(const base::Location & from_here,ProtobufHttpStatus::Code expected_status_code,base::RunLoop * run_loop)107 CheckStatusThenQuitRunLoopCallback(
108     const base::Location& from_here,
109     ProtobufHttpStatus::Code expected_status_code,
110     base::RunLoop* run_loop) {
111   return base::BindLambdaForTesting([=](const ProtobufHttpStatus& status) {
112     ASSERT_EQ(expected_status_code, status.error_code())
113         << "Incorrect status code. Location: " << from_here.ToString();
114     run_loop->QuitWhenIdle();
115   });
116 }
117 
118 }  // namespace
119 
120 class FtlMessageReceptionChannelTest : public testing::Test {
121  public:
122   void SetUp() override;
123   void TearDown() override;
124 
125  protected:
126   base::TimeDelta GetTimeUntilRetry() const;
127   int GetRetryFailureCount() const;
128 
129   base::test::TaskEnvironment task_environment_{
130       base::test::TaskEnvironment::TimeSource::MOCK_TIME};
131   std::unique_ptr<FtlMessageReceptionChannel> channel_;
132   base::MockCallback<FtlMessageReceptionChannel::StreamOpener>
133       mock_stream_opener_;
134   base::MockCallback<base::RepeatingCallback<void(const ftl::InboxMessage&)>>
135       mock_on_incoming_msg_;
136   MockSignalingTracker mock_signaling_tracker_;
137 };
138 
SetUp()139 void FtlMessageReceptionChannelTest::SetUp() {
140   channel_ =
141       std::make_unique<FtlMessageReceptionChannel>(&mock_signaling_tracker_);
142   channel_->Initialize(mock_stream_opener_.Get(), mock_on_incoming_msg_.Get());
143 }
144 
TearDown()145 void FtlMessageReceptionChannelTest::TearDown() {
146   channel_.reset();
147   task_environment_.FastForwardUntilNoTasksRemain();
148 }
149 
GetTimeUntilRetry() const150 base::TimeDelta FtlMessageReceptionChannelTest::GetTimeUntilRetry() const {
151   return channel_->GetReconnectRetryBackoffEntryForTesting()
152       .GetTimeUntilRelease();
153 }
154 
GetRetryFailureCount() const155 int FtlMessageReceptionChannelTest::GetRetryFailureCount() const {
156   return channel_->GetReconnectRetryBackoffEntryForTesting().failure_count();
157 }
158 
TEST_F(FtlMessageReceptionChannelTest,TestStartReceivingMessages_StoppedImmediately)159 TEST_F(FtlMessageReceptionChannelTest,
160        TestStartReceivingMessages_StoppedImmediately) {
161   base::RunLoop run_loop;
162 
163   EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
164       .WillOnce(StartStream(
165           [&](base::OnceClosure on_channel_ready,
166               const ReceiveMessagesResponseCallback& on_incoming_msg,
167               StatusCallback on_channel_closed) {
168             channel_->StopReceivingMessages();
169             run_loop.Quit();
170           }));
171 
172   channel_->StartReceivingMessages(NotReachedClosure(),
173                                    NotReachedStatusCallback(FROM_HERE));
174 
175   run_loop.Run();
176 }
177 
TEST_F(FtlMessageReceptionChannelTest,TestStartReceivingMessages_NotAuthenticated)178 TEST_F(FtlMessageReceptionChannelTest,
179        TestStartReceivingMessages_NotAuthenticated) {
180   base::RunLoop run_loop;
181 
182   EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
183       .WillOnce(StartStream(
184           [&](base::OnceClosure on_channel_ready,
185               const ReceiveMessagesResponseCallback& on_incoming_msg,
186               StatusCallback on_channel_closed) {
187             std::move(on_channel_closed)
188                 .Run(ProtobufHttpStatus(
189                     ProtobufHttpStatus::Code::UNAUTHENTICATED, ""));
190           }));
191 
192   channel_->StartReceivingMessages(
193       NotReachedClosure(),
194       CheckStatusThenQuitRunLoopCallback(
195           FROM_HERE, ProtobufHttpStatus::Code::UNAUTHENTICATED, &run_loop));
196 
197   run_loop.Run();
198 }
199 
TEST_F(FtlMessageReceptionChannelTest,TestStartReceivingMessages_StreamStarted)200 TEST_F(FtlMessageReceptionChannelTest,
201        TestStartReceivingMessages_StreamStarted) {
202   base::RunLoop run_loop;
203 
204   EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
205       .WillOnce(StartStream(
206           [&](base::OnceClosure on_channel_ready,
207               const ReceiveMessagesResponseCallback& on_incoming_msg,
208               StatusCallback on_channel_closed) {
209             std::move(on_channel_ready).Run();
210           }));
211 
212   channel_->StartReceivingMessages(run_loop.QuitClosure(),
213                                    NotReachedStatusCallback(FROM_HERE));
214 
215   run_loop.Run();
216 }
217 
TEST_F(FtlMessageReceptionChannelTest,TestStartReceivingMessages_RecoverableStreamError)218 TEST_F(FtlMessageReceptionChannelTest,
219        TestStartReceivingMessages_RecoverableStreamError) {
220   base::RunLoop run_loop;
221 
222   base::WeakPtr<FakeScopedProtobufHttpRequest> old_stream;
223   EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
224       .WillOnce(StartStream(
225           [&](base::OnceClosure on_channel_ready,
226               const ReceiveMessagesResponseCallback& on_incoming_msg,
227               StatusCallback on_channel_closed) {
228             // The first open stream attempt fails with UNAVAILABLE error.
229             ASSERT_EQ(0, GetRetryFailureCount());
230 
231             std::move(on_channel_closed)
232                 .Run(ProtobufHttpStatus(ProtobufHttpStatus::Code::UNAVAILABLE,
233                                         ""));
234 
235             ASSERT_EQ(1, GetRetryFailureCount());
236             ASSERT_NEAR(FtlServicesContext::kBackoffInitialDelay.InSecondsF(),
237                         GetTimeUntilRetry().InSecondsF(), 0.5);
238 
239             // This will make the channel reopen the stream.
240             task_environment_.FastForwardBy(GetTimeUntilRetry());
241           },
242           &old_stream))
243       .WillOnce(StartStream(
244           [&](base::OnceClosure on_channel_ready,
245               const ReceiveMessagesResponseCallback& on_incoming_msg,
246               StatusCallback on_channel_closed) {
247             // Second open stream attempt succeeds.
248 
249             // Assert old stream closed.
250             ASSERT_FALSE(old_stream);
251 
252             std::move(on_channel_ready).Run();
253 
254             ASSERT_EQ(0, GetRetryFailureCount());
255           }));
256 
257   channel_->StartReceivingMessages(run_loop.QuitClosure(),
258                                    NotReachedStatusCallback(FROM_HERE));
259 
260   run_loop.Run();
261 }
262 
TEST_F(FtlMessageReceptionChannelTest,TestStartReceivingMessages_MultipleCalls)263 TEST_F(FtlMessageReceptionChannelTest,
264        TestStartReceivingMessages_MultipleCalls) {
265   base::RunLoop run_loop;
266 
267   base::MockCallback<base::OnceClosure> stream_ready_callback;
268 
269   // Exits the run loop iff the callback is called three times with OK.
270   EXPECT_CALL(stream_ready_callback, Run())
271       .WillOnce(Return())
272       .WillOnce(Return())
273       .WillOnce([&]() { run_loop.Quit(); });
274 
275   EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
276       .WillOnce(StartStream(
277           [&](base::OnceClosure on_channel_ready,
278               const ReceiveMessagesResponseCallback& on_incoming_msg,
279               StatusCallback on_channel_closed) {
280             std::move(on_channel_ready).Run();
281           }));
282 
283   channel_->StartReceivingMessages(stream_ready_callback.Get(),
284                                    NotReachedStatusCallback(FROM_HERE));
285   channel_->StartReceivingMessages(stream_ready_callback.Get(),
286                                    NotReachedStatusCallback(FROM_HERE));
287   channel_->StartReceivingMessages(stream_ready_callback.Get(),
288                                    NotReachedStatusCallback(FROM_HERE));
289 
290   run_loop.Run();
291 }
292 
TEST_F(FtlMessageReceptionChannelTest,StreamsTwoMessages)293 TEST_F(FtlMessageReceptionChannelTest, StreamsTwoMessages) {
294   base::RunLoop run_loop;
295 
296   constexpr char kMessage1Id[] = "msg_1";
297   constexpr char kMessage2Id[] = "msg_2";
298 
299   ftl::InboxMessage message_1;
300   message_1.set_message_id(kMessage1Id);
301   ftl::InboxMessage message_2;
302   message_2.set_message_id(kMessage2Id);
303 
304   EXPECT_CALL(mock_on_incoming_msg_,
305               Run(Property(&ftl::InboxMessage::message_id, kMessage1Id)))
306       .WillOnce(Return());
307   EXPECT_CALL(mock_on_incoming_msg_,
308               Run(Property(&ftl::InboxMessage::message_id, kMessage2Id)))
309       .WillOnce(Invoke([&](const ftl::InboxMessage&) { run_loop.Quit(); }));
310 
311   EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
312       .WillOnce(StartStream(
313           [&](base::OnceClosure on_channel_ready,
314               const ReceiveMessagesResponseCallback& on_incoming_msg,
315               StatusCallback on_channel_closed) {
316             std::move(on_channel_ready).Run();
317 
318             auto response = std::make_unique<ftl::ReceiveMessagesResponse>();
319             *response->mutable_inbox_message() = message_1;
320             on_incoming_msg.Run(std::move(response));
321 
322             response = std::make_unique<ftl::ReceiveMessagesResponse>();
323             *response->mutable_inbox_message() = message_2;
324             on_incoming_msg.Run(std::move(response));
325 
326             const ProtobufHttpStatus kCancel(
327                 ProtobufHttpStatus::Code::CANCELLED, "Cancelled");
328             std::move(on_channel_closed).Run(kCancel);
329           }));
330 
331   channel_->StartReceivingMessages(
332       base::DoNothing(),
333       CheckStatusThenQuitRunLoopCallback(
334           FROM_HERE, ProtobufHttpStatus::ProtobufHttpStatus::Code::CANCELLED,
335           &run_loop));
336 
337   run_loop.Run();
338 }
339 
TEST_F(FtlMessageReceptionChannelTest,ReceivedOnePong_OnSignalingActiveTwice)340 TEST_F(FtlMessageReceptionChannelTest, ReceivedOnePong_OnSignalingActiveTwice) {
341   base::RunLoop run_loop;
342 
343   base::MockCallback<base::OnceClosure> stream_ready_callback;
344 
345   EXPECT_CALL(mock_signaling_tracker_, OnSignalingActive())
346       .WillOnce(Return())
347       .WillOnce([&]() { run_loop.Quit(); });
348 
349   EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
350       .WillOnce(StartStream(
351           [&](base::OnceClosure on_channel_ready,
352               const ReceiveMessagesResponseCallback& on_incoming_msg,
353               StatusCallback on_channel_closed) {
354             std::move(on_channel_ready).Run();
355             auto response = std::make_unique<ftl::ReceiveMessagesResponse>();
356             response->mutable_pong();
357             on_incoming_msg.Run(std::move(response));
358           }));
359 
360   channel_->StartReceivingMessages(base::DoNothing(),
361                                    NotReachedStatusCallback(FROM_HERE));
362 
363   run_loop.Run();
364 }
365 
TEST_F(FtlMessageReceptionChannelTest,NoPongWithinTimeout_ResetsStream)366 TEST_F(FtlMessageReceptionChannelTest, NoPongWithinTimeout_ResetsStream) {
367   base::RunLoop run_loop;
368 
369   base::WeakPtr<FakeScopedProtobufHttpRequest> old_stream;
370   EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
371       .WillOnce(StartStream(
372           [&](base::OnceClosure on_channel_ready,
373               const ReceiveMessagesResponseCallback& on_incoming_msg,
374               StatusCallback on_channel_closed) {
375             std::move(on_channel_ready).Run();
376             task_environment_.FastForwardBy(
377                 FtlMessageReceptionChannel::kPongTimeout);
378 
379             ASSERT_EQ(1, GetRetryFailureCount());
380             ASSERT_NEAR(FtlServicesContext::kBackoffInitialDelay.InSecondsF(),
381                         GetTimeUntilRetry().InSecondsF(), 0.5);
382 
383             // This will make the channel reopen the stream.
384             task_environment_.FastForwardBy(GetTimeUntilRetry());
385           },
386           &old_stream))
387       .WillOnce(StartStream(
388           [&](base::OnceClosure on_channel_ready,
389               const ReceiveMessagesResponseCallback& on_incoming_msg,
390               StatusCallback on_channel_closed) {
391             // Stream is reopened.
392 
393             // Assert old stream closed.
394             ASSERT_FALSE(old_stream);
395 
396             std::move(on_channel_ready).Run();
397             ASSERT_EQ(0, GetRetryFailureCount());
398             run_loop.Quit();
399           }));
400 
401   channel_->StartReceivingMessages(base::DoNothing(),
402                                    NotReachedStatusCallback(FROM_HERE));
403 
404   run_loop.Run();
405 }
406 
TEST_F(FtlMessageReceptionChannelTest,ServerClosesStream_ResetsStream)407 TEST_F(FtlMessageReceptionChannelTest, ServerClosesStream_ResetsStream) {
408   base::RunLoop run_loop;
409 
410   base::WeakPtr<FakeScopedProtobufHttpRequest> old_stream;
411   EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
412       .WillOnce(StartStream(
413           [&](base::OnceClosure on_channel_ready,
414               const ReceiveMessagesResponseCallback& on_incoming_msg,
415               StatusCallback on_channel_closed) {
416             auto fake_server_stream = CreateFakeServerStream();
417             std::move(on_channel_ready).Run();
418 
419             // Close the stream with OK.
420             std::move(on_channel_closed).Run(ProtobufHttpStatus::OK());
421           },
422           &old_stream))
423       .WillOnce(StartStream(
424           [&](base::OnceClosure on_channel_ready,
425               const ReceiveMessagesResponseCallback& on_incoming_msg,
426               StatusCallback on_channel_closed) {
427             ASSERT_FALSE(old_stream);
428 
429             std::move(on_channel_ready).Run();
430             ASSERT_EQ(0, GetRetryFailureCount());
431             run_loop.Quit();
432           }));
433 
434   channel_->StartReceivingMessages(base::DoNothing(),
435                                    NotReachedStatusCallback(FROM_HERE));
436 
437   run_loop.Run();
438 }
439 
TEST_F(FtlMessageReceptionChannelTest,TimeoutIncreasesToMaximum)440 TEST_F(FtlMessageReceptionChannelTest, TimeoutIncreasesToMaximum) {
441   base::RunLoop run_loop;
442 
443   int failure_count = 0;
444   int hitting_max_delay_count = 0;
445   EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
446       .WillRepeatedly(StartStream(
447           [&](base::OnceClosure on_channel_ready,
448               const ReceiveMessagesResponseCallback& on_incoming_msg,
449               StatusCallback on_channel_closed) {
450             // Quit if delay is ~kBackoffMaxDelay three times.
451             if (hitting_max_delay_count == 3) {
452               std::move(on_channel_ready).Run();
453               ASSERT_EQ(0, GetRetryFailureCount());
454               run_loop.Quit();
455               return;
456             }
457 
458             // Otherwise send UNAVAILABLE to reset the stream.
459 
460             std::move(on_channel_closed)
461                 .Run(ProtobufHttpStatus(
462                     ProtobufHttpStatus::ProtobufHttpStatus::Code::UNAVAILABLE,
463                     ""));
464 
465             int new_failure_count = GetRetryFailureCount();
466             ASSERT_LT(failure_count, new_failure_count);
467             failure_count = new_failure_count;
468 
469             base::TimeDelta time_until_retry = GetTimeUntilRetry();
470 
471             base::TimeDelta max_delay_diff =
472                 time_until_retry - FtlServicesContext::kBackoffMaxDelay;
473 
474             // Adjust for fuzziness.
475             if (max_delay_diff.magnitude() <
476                 base::TimeDelta::FromMilliseconds(500)) {
477               hitting_max_delay_count++;
478             }
479 
480             // This will tail-recursively call the stream opener.
481             task_environment_.FastForwardBy(time_until_retry);
482           }));
483 
484   channel_->StartReceivingMessages(base::DoNothing(),
485                                    NotReachedStatusCallback(FROM_HERE));
486 
487   run_loop.Run();
488 }
489 
TEST_F(FtlMessageReceptionChannelTest,StartStreamFailsWithUnRecoverableErrorAndRetry_TimeoutApplied)490 TEST_F(FtlMessageReceptionChannelTest,
491        StartStreamFailsWithUnRecoverableErrorAndRetry_TimeoutApplied) {
492   base::RunLoop run_loop;
493 
494   base::WeakPtr<FakeScopedProtobufHttpRequest> old_stream;
495   EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
496       .WillOnce(StartStream(
497           [&](base::OnceClosure on_channel_ready,
498               const ReceiveMessagesResponseCallback& on_incoming_msg,
499               StatusCallback on_channel_closed) {
500             // The first open stream attempt fails with UNAUTHENTICATED error.
501             ASSERT_EQ(0, GetRetryFailureCount());
502 
503             std::move(on_channel_closed)
504                 .Run(ProtobufHttpStatus(ProtobufHttpStatus::ProtobufHttpStatus::
505                                             Code::UNAUTHENTICATED,
506                                         ""));
507 
508             ASSERT_EQ(1, GetRetryFailureCount());
509             ASSERT_NEAR(FtlServicesContext::kBackoffInitialDelay.InSecondsF(),
510                         GetTimeUntilRetry().InSecondsF(), 0.5);
511           },
512           &old_stream))
513       .WillOnce(StartStream(
514           [&](base::OnceClosure on_channel_ready,
515               const ReceiveMessagesResponseCallback& on_incoming_msg,
516               StatusCallback on_channel_closed) {
517             // Second open stream attempt succeeds.
518 
519             // Assert old stream closed.
520             ASSERT_FALSE(old_stream);
521 
522             ASSERT_EQ(1, GetRetryFailureCount());
523 
524             std::move(on_channel_ready).Run();
525 
526             ASSERT_EQ(0, GetRetryFailureCount());
527           }));
528 
529   channel_->StartReceivingMessages(
530       base::DoNothing(),
531       base::BindLambdaForTesting([&](const ProtobufHttpStatus& status) {
532         ASSERT_EQ(ProtobufHttpStatus::ProtobufHttpStatus::Code::UNAUTHENTICATED,
533                   status.error_code());
534         channel_->StartReceivingMessages(run_loop.QuitClosure(),
535                                          NotReachedStatusCallback(FROM_HERE));
536       }));
537 
538   run_loop.Run();
539 }
540 
541 }  // namespace remoting
542