1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "google/cloud/pubsub/subscriber_connection.h"
16 #include "google/cloud/pubsub/testing/mock_subscriber_stub.h"
17 #include "google/cloud/internal/api_client_header.h"
18 #include "google/cloud/testing_util/assert_ok.h"
19 #include "google/cloud/testing_util/capture_log_lines_backend.h"
20 #include "google/cloud/testing_util/validate_metadata.h"
21 #include <gmock/gmock.h>
22 #include <atomic>
23
24 namespace google {
25 namespace cloud {
26 namespace pubsub {
27 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
28 namespace {
29
30 using ::google::cloud::testing_util::IsContextMDValid;
31 using ::testing::_;
32 using ::testing::AtLeast;
33 using ::testing::Contains;
34 using ::testing::HasSubstr;
35
TEST(SubscriberConnectionTest,Basic)36 TEST(SubscriberConnectionTest, Basic) {
37 auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();
38 Subscription const subscription("test-project", "test-subscription");
39
40 EXPECT_CALL(*mock, AsyncPull(_, _, _))
41 .Times(AtLeast(1))
42 .WillRepeatedly([&](google::cloud::CompletionQueue&,
43 std::unique_ptr<grpc::ClientContext>,
44 google::pubsub::v1::PullRequest const& request) {
45 EXPECT_EQ(subscription.FullName(), request.subscription());
46 google::pubsub::v1::PullResponse response;
47 auto& m = *response.add_received_messages();
48 m.set_ack_id("test-ack-id-0");
49 m.mutable_message()->set_message_id("test-message-id-0");
50 return make_ready_future(make_status_or(response));
51 });
52 EXPECT_CALL(*mock, AsyncAcknowledge(_, _, _))
53 .Times(AtLeast(1))
54 .WillRepeatedly(
55 [&](google::cloud::CompletionQueue&,
56 std::unique_ptr<grpc::ClientContext>,
57 google::pubsub::v1::AcknowledgeRequest const& request) {
58 EXPECT_EQ(subscription.FullName(), request.subscription());
59 EXPECT_FALSE(request.ack_ids().empty());
60 for (auto const& id : request.ack_ids()) {
61 EXPECT_EQ("test-ack-id-0", id);
62 }
63 return make_ready_future(Status{});
64 });
65 // Depending on timing this might be called, but it is very rare.
66 EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _))
67 .WillRepeatedly([](google::cloud::CompletionQueue&,
68 std::unique_ptr<grpc::ClientContext>,
69 google::pubsub::v1::ModifyAckDeadlineRequest const&) {
70 return make_ready_future(Status{});
71 });
72
73 CompletionQueue cq;
74 auto subscriber = pubsub_internal::MakeSubscriberConnection(
75 mock, ConnectionOptions{grpc::InsecureChannelCredentials()}
76 .DisableBackgroundThreads(cq));
77 std::atomic_flag received_one{false};
78 promise<void> waiter;
79 auto handler = [&](Message const& m, AckHandler h) {
80 EXPECT_EQ("test-message-id-0", m.message_id());
81 EXPECT_EQ("test-ack-id-0", h.ack_id());
82 ASSERT_NO_FATAL_FAILURE(std::move(h).ack());
83 if (received_one.test_and_set()) return;
84 waiter.set_value();
85 };
86 std::thread t([&cq] { cq.Run(); });
87 auto response = subscriber->Subscribe({subscription.FullName(), handler, {}});
88 waiter.get_future().wait();
89 response.cancel();
90 ASSERT_STATUS_OK(response.get());
91 // We need to explicitly cancel any pending timers (some of which may be quite
92 // long) left by the subscription.
93 cq.CancelAll();
94 cq.Shutdown();
95 t.join();
96 }
97
TEST(SubscriberConnectionTest,PullFailure)98 TEST(SubscriberConnectionTest, PullFailure) {
99 auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();
100 Subscription const subscription("test-project", "test-subscription");
101
102 auto const expected = Status(StatusCode::kPermissionDenied, "uh-oh");
103 EXPECT_CALL(*mock, AsyncPull(_, _, _))
104 .Times(AtLeast(1))
105 .WillRepeatedly([&](google::cloud::CompletionQueue&,
106 std::unique_ptr<grpc::ClientContext>,
107 google::pubsub::v1::PullRequest const& request) {
108 EXPECT_EQ(subscription.FullName(), request.subscription());
109 return make_ready_future(
110 StatusOr<google::pubsub::v1::PullResponse>(expected));
111 });
112
113 auto subscriber = pubsub_internal::MakeSubscriberConnection(
114 mock, ConnectionOptions{grpc::InsecureChannelCredentials()});
115 auto handler = [&](Message const&, AckHandler const&) {};
116 auto response = subscriber->Subscribe({subscription.FullName(), handler, {}});
117 EXPECT_EQ(expected, response.get());
118 }
119
120 /// @test Verify key events are logged
TEST(SubscriberConnectionTest,MakeSubscriberConnectionSetupsLogging)121 TEST(SubscriberConnectionTest, MakeSubscriberConnectionSetupsLogging) {
122 auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();
123 Subscription const subscription("test-project", "test-subscription");
124
125 EXPECT_CALL(*mock, AsyncPull)
126 .Times(AtLeast(1))
127 .WillRepeatedly([&](google::cloud::CompletionQueue&,
128 std::unique_ptr<grpc::ClientContext>,
129 google::pubsub::v1::PullRequest const&) {
130 google::pubsub::v1::PullResponse response;
131 auto& m = *response.add_received_messages();
132 m.set_ack_id("test-ack-id-0");
133 m.mutable_message()->set_message_id("test-message-id-0");
134 return make_ready_future(make_status_or(response));
135 });
136 EXPECT_CALL(*mock, AsyncAcknowledge)
137 .Times(AtLeast(1))
138 .WillRepeatedly([](google::cloud::CompletionQueue&,
139 std::unique_ptr<grpc::ClientContext>,
140 google::pubsub::v1::AcknowledgeRequest const&) {
141 return make_ready_future(Status{});
142 });
143 // Depending on timing this might be called, but it is very rare.
144 EXPECT_CALL(*mock, AsyncModifyAckDeadline)
145 .WillRepeatedly([](google::cloud::CompletionQueue&,
146 std::unique_ptr<grpc::ClientContext>,
147 google::pubsub::v1::ModifyAckDeadlineRequest const&) {
148 return make_ready_future(Status{});
149 });
150
151 auto backend =
152 std::make_shared<google::cloud::testing_util::CaptureLogLinesBackend>();
153 auto id = google::cloud::LogSink::Instance().AddBackend(backend);
154
155 CompletionQueue cq;
156 auto subscriber = pubsub_internal::MakeSubscriberConnection(
157 mock, ConnectionOptions{grpc::InsecureChannelCredentials()}
158 .DisableBackgroundThreads(cq)
159 .enable_tracing("rpc"));
160 std::atomic_flag received_one{false};
161 promise<void> waiter;
162 auto handler = [&](Message const&, AckHandler h) {
163 std::move(h).ack();
164 if (received_one.test_and_set()) return;
165 waiter.set_value();
166 };
167 std::thread t([&cq] { cq.Run(); });
168 auto response = subscriber->Subscribe({subscription.FullName(), handler, {}});
169 waiter.get_future().wait();
170 response.cancel();
171 ASSERT_STATUS_OK(response.get());
172 // We need to explicitly cancel any pending timers (some of which may be quite
173 // long) left by the subscription.
174 cq.CancelAll();
175 cq.Shutdown();
176 t.join();
177
178 auto const log_lines = backend->ClearLogLines();
179 EXPECT_THAT(log_lines, Contains(HasSubstr("AsyncPull")));
180 EXPECT_THAT(log_lines, Contains(HasSubstr("AsyncAcknowledge")));
181 google::cloud::LogSink::Instance().RemoveBackend(id);
182 }
183
184 /// @test Verify the metadata decorator is configured
TEST(SubscriberConnectionTest,MakeSubscriberConnectionSetupsMetadata)185 TEST(SubscriberConnectionTest, MakeSubscriberConnectionSetupsMetadata) {
186 auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();
187 Subscription const subscription("test-project", "test-subscription");
188
189 EXPECT_CALL(*mock, AsyncPull)
190 .Times(AtLeast(1))
191 .WillRepeatedly([&](google::cloud::CompletionQueue&,
192 std::unique_ptr<grpc::ClientContext> context,
193 google::pubsub::v1::PullRequest const&) {
194 EXPECT_STATUS_OK(
195 IsContextMDValid(*context, "google.pubsub.v1.Subscriber.Pull",
196 google::cloud::internal::ApiClientHeader()));
197 google::pubsub::v1::PullResponse response;
198 auto& m = *response.add_received_messages();
199 m.set_ack_id("test-ack-id-0");
200 m.mutable_message()->set_message_id("test-message-id-0");
201 return make_ready_future(make_status_or(response));
202 });
203 EXPECT_CALL(*mock, AsyncAcknowledge)
204 .Times(AtLeast(1))
205 .WillRepeatedly([](google::cloud::CompletionQueue&,
206 std::unique_ptr<grpc::ClientContext>,
207 google::pubsub::v1::AcknowledgeRequest const&) {
208 return make_ready_future(Status{});
209 });
210 // Depending on timing this might be called, but it is very rare.
211 EXPECT_CALL(*mock, AsyncModifyAckDeadline)
212 .WillRepeatedly([](google::cloud::CompletionQueue&,
213 std::unique_ptr<grpc::ClientContext>,
214 google::pubsub::v1::ModifyAckDeadlineRequest const&) {
215 return make_ready_future(Status{});
216 });
217
218 auto subscriber = pubsub_internal::MakeSubscriberConnection(
219 mock, ConnectionOptions{grpc::InsecureChannelCredentials()});
220 std::atomic_flag received_one{false};
221 promise<void> waiter;
222 auto handler = [&](Message const&, AckHandler h) {
223 std::move(h).ack();
224 if (received_one.test_and_set()) return;
225 waiter.set_value();
226 };
227 auto response = subscriber->Subscribe({subscription.FullName(), handler, {}});
228 waiter.get_future().wait();
229 response.cancel();
230 ASSERT_STATUS_OK(response.get());
231 }
232
233 } // namespace
234 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
235 } // namespace pubsub
236 } // namespace cloud
237 } // namespace google
238