1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/pubsub/subscriber_connection.h"
16 #include "google/cloud/pubsub/testing/mock_subscriber_stub.h"
17 #include "google/cloud/internal/api_client_header.h"
18 #include "google/cloud/testing_util/assert_ok.h"
19 #include "google/cloud/testing_util/capture_log_lines_backend.h"
20 #include "google/cloud/testing_util/validate_metadata.h"
21 #include <gmock/gmock.h>
22 #include <atomic>
23 
24 namespace google {
25 namespace cloud {
26 namespace pubsub {
27 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
28 namespace {
29 
30 using ::google::cloud::testing_util::IsContextMDValid;
31 using ::testing::_;
32 using ::testing::AtLeast;
33 using ::testing::Contains;
34 using ::testing::HasSubstr;
35 
TEST(SubscriberConnectionTest,Basic)36 TEST(SubscriberConnectionTest, Basic) {
37   auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();
38   Subscription const subscription("test-project", "test-subscription");
39 
40   EXPECT_CALL(*mock, AsyncPull(_, _, _))
41       .Times(AtLeast(1))
42       .WillRepeatedly([&](google::cloud::CompletionQueue&,
43                           std::unique_ptr<grpc::ClientContext>,
44                           google::pubsub::v1::PullRequest const& request) {
45         EXPECT_EQ(subscription.FullName(), request.subscription());
46         google::pubsub::v1::PullResponse response;
47         auto& m = *response.add_received_messages();
48         m.set_ack_id("test-ack-id-0");
49         m.mutable_message()->set_message_id("test-message-id-0");
50         return make_ready_future(make_status_or(response));
51       });
52   EXPECT_CALL(*mock, AsyncAcknowledge(_, _, _))
53       .Times(AtLeast(1))
54       .WillRepeatedly(
55           [&](google::cloud::CompletionQueue&,
56               std::unique_ptr<grpc::ClientContext>,
57               google::pubsub::v1::AcknowledgeRequest const& request) {
58             EXPECT_EQ(subscription.FullName(), request.subscription());
59             EXPECT_FALSE(request.ack_ids().empty());
60             for (auto const& id : request.ack_ids()) {
61               EXPECT_EQ("test-ack-id-0", id);
62             }
63             return make_ready_future(Status{});
64           });
65   // Depending on timing this might be called, but it is very rare.
66   EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _))
67       .WillRepeatedly([](google::cloud::CompletionQueue&,
68                          std::unique_ptr<grpc::ClientContext>,
69                          google::pubsub::v1::ModifyAckDeadlineRequest const&) {
70         return make_ready_future(Status{});
71       });
72 
73   CompletionQueue cq;
74   auto subscriber = pubsub_internal::MakeSubscriberConnection(
75       mock, ConnectionOptions{grpc::InsecureChannelCredentials()}
76                 .DisableBackgroundThreads(cq));
77   std::atomic_flag received_one{false};
78   promise<void> waiter;
79   auto handler = [&](Message const& m, AckHandler h) {
80     EXPECT_EQ("test-message-id-0", m.message_id());
81     EXPECT_EQ("test-ack-id-0", h.ack_id());
82     ASSERT_NO_FATAL_FAILURE(std::move(h).ack());
83     if (received_one.test_and_set()) return;
84     waiter.set_value();
85   };
86   std::thread t([&cq] { cq.Run(); });
87   auto response = subscriber->Subscribe({subscription.FullName(), handler, {}});
88   waiter.get_future().wait();
89   response.cancel();
90   ASSERT_STATUS_OK(response.get());
91   // We need to explicitly cancel any pending timers (some of which may be quite
92   // long) left by the subscription.
93   cq.CancelAll();
94   cq.Shutdown();
95   t.join();
96 }
97 
/// @test Verify that an error returned by AsyncPull() becomes the result of
/// the future returned by Subscribe().
TEST(SubscriberConnectionTest, PullFailure) {
  auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();
  Subscription const subscription("test-project", "test-subscription");
  Status const expected(StatusCode::kPermissionDenied, "uh-oh");

  using PullResult = StatusOr<google::pubsub::v1::PullResponse>;
  // Every pull attempt fails with the same error.
  auto failing_pull = [&](google::cloud::CompletionQueue&,
                          std::unique_ptr<grpc::ClientContext>,
                          google::pubsub::v1::PullRequest const& request) {
    EXPECT_EQ(subscription.FullName(), request.subscription());
    return make_ready_future(PullResult(expected));
  };
  EXPECT_CALL(*mock, AsyncPull(_, _, _))
      .Times(AtLeast(1))
      .WillRepeatedly(failing_pull);

  auto subscriber = pubsub_internal::MakeSubscriberConnection(
      mock, ConnectionOptions{grpc::InsecureChannelCredentials()});
  // The handler does nothing: the mocked pulls never return a message.
  auto handler = [](Message const&, AckHandler const&) {};
  auto response = subscriber->Subscribe({subscription.FullName(), handler, {}});
  Status const actual = response.get();
  EXPECT_EQ(expected, actual);
}
119 
120 /// @test Verify key events are logged
TEST(SubscriberConnectionTest,MakeSubscriberConnectionSetupsLogging)121 TEST(SubscriberConnectionTest, MakeSubscriberConnectionSetupsLogging) {
122   auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();
123   Subscription const subscription("test-project", "test-subscription");
124 
125   EXPECT_CALL(*mock, AsyncPull)
126       .Times(AtLeast(1))
127       .WillRepeatedly([&](google::cloud::CompletionQueue&,
128                           std::unique_ptr<grpc::ClientContext>,
129                           google::pubsub::v1::PullRequest const&) {
130         google::pubsub::v1::PullResponse response;
131         auto& m = *response.add_received_messages();
132         m.set_ack_id("test-ack-id-0");
133         m.mutable_message()->set_message_id("test-message-id-0");
134         return make_ready_future(make_status_or(response));
135       });
136   EXPECT_CALL(*mock, AsyncAcknowledge)
137       .Times(AtLeast(1))
138       .WillRepeatedly([](google::cloud::CompletionQueue&,
139                          std::unique_ptr<grpc::ClientContext>,
140                          google::pubsub::v1::AcknowledgeRequest const&) {
141         return make_ready_future(Status{});
142       });
143   // Depending on timing this might be called, but it is very rare.
144   EXPECT_CALL(*mock, AsyncModifyAckDeadline)
145       .WillRepeatedly([](google::cloud::CompletionQueue&,
146                          std::unique_ptr<grpc::ClientContext>,
147                          google::pubsub::v1::ModifyAckDeadlineRequest const&) {
148         return make_ready_future(Status{});
149       });
150 
151   auto backend =
152       std::make_shared<google::cloud::testing_util::CaptureLogLinesBackend>();
153   auto id = google::cloud::LogSink::Instance().AddBackend(backend);
154 
155   CompletionQueue cq;
156   auto subscriber = pubsub_internal::MakeSubscriberConnection(
157       mock, ConnectionOptions{grpc::InsecureChannelCredentials()}
158                 .DisableBackgroundThreads(cq)
159                 .enable_tracing("rpc"));
160   std::atomic_flag received_one{false};
161   promise<void> waiter;
162   auto handler = [&](Message const&, AckHandler h) {
163     std::move(h).ack();
164     if (received_one.test_and_set()) return;
165     waiter.set_value();
166   };
167   std::thread t([&cq] { cq.Run(); });
168   auto response = subscriber->Subscribe({subscription.FullName(), handler, {}});
169   waiter.get_future().wait();
170   response.cancel();
171   ASSERT_STATUS_OK(response.get());
172   // We need to explicitly cancel any pending timers (some of which may be quite
173   // long) left by the subscription.
174   cq.CancelAll();
175   cq.Shutdown();
176   t.join();
177 
178   auto const log_lines = backend->ClearLogLines();
179   EXPECT_THAT(log_lines, Contains(HasSubstr("AsyncPull")));
180   EXPECT_THAT(log_lines, Contains(HasSubstr("AsyncAcknowledge")));
181   google::cloud::LogSink::Instance().RemoveBackend(id);
182 }
183 
184 /// @test Verify the metadata decorator is configured
TEST(SubscriberConnectionTest, MakeSubscriberConnectionSetupsMetadata) {
  auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();
  Subscription const subscription("test-project", "test-subscription");

  // Each pull expects IsContextMDValid() to accept the client context for the
  // `google.pubsub.v1.Subscriber.Pull` method with the API client header, and
  // then returns a single message so the handler below gets invoked.
  EXPECT_CALL(*mock, AsyncPull)
      .Times(AtLeast(1))
      .WillRepeatedly([&](google::cloud::CompletionQueue&,
                          std::unique_ptr<grpc::ClientContext> context,
                          google::pubsub::v1::PullRequest const&) {
        EXPECT_STATUS_OK(
            IsContextMDValid(*context, "google.pubsub.v1.Subscriber.Pull",
                             google::cloud::internal::ApiClientHeader()));
        google::pubsub::v1::PullResponse response;
        auto& m = *response.add_received_messages();
        m.set_ack_id("test-ack-id-0");
        m.mutable_message()->set_message_id("test-message-id-0");
        return make_ready_future(make_status_or(response));
      });
  EXPECT_CALL(*mock, AsyncAcknowledge)
      .Times(AtLeast(1))
      .WillRepeatedly([](google::cloud::CompletionQueue&,
                         std::unique_ptr<grpc::ClientContext>,
                         google::pubsub::v1::AcknowledgeRequest const&) {
        return make_ready_future(Status{});
      });
  // Depending on timing this might be called, but it is very rare.
  EXPECT_CALL(*mock, AsyncModifyAckDeadline)
      .WillRepeatedly([](google::cloud::CompletionQueue&,
                         std::unique_ptr<grpc::ClientContext>,
                         google::pubsub::v1::ModifyAckDeadlineRequest const&) {
        return make_ready_future(Status{});
      });

  auto subscriber = pubsub_internal::MakeSubscriberConnection(
      mock, ConnectionOptions{grpc::InsecureChannelCredentials()});
  // Use `std::atomic<bool>` instead of `std::atomic_flag{false}`: initializing
  // an `atomic_flag` from a `bool` is not guaranteed by the standard.
  std::atomic<bool> received_one{false};
  promise<void> waiter;
  auto handler = [&](Message const&, AckHandler h) {
    std::move(h).ack();
    // Only the first message satisfies the promise; satisfying it twice would
    // be an error.
    if (received_one.exchange(true)) return;
    waiter.set_value();
  };
  auto response = subscriber->Subscribe({subscription.FullName(), handler, {}});
  waiter.get_future().wait();
  response.cancel();
  ASSERT_STATUS_OK(response.get());
}
232 
233 }  // namespace
234 }  // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
235 }  // namespace pubsub
236 }  // namespace cloud
237 }  // namespace google
238