1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "google/cloud/pubsub/internal/subscription_flow_control.h"
16 #include "google/cloud/pubsub/testing/mock_subscription_batch_source.h"
17 #include "google/cloud/internal/background_threads_impl.h"
18 #include "google/cloud/internal/random.h"
19 #include "google/cloud/testing_util/status_matchers.h"
20 #include <gmock/gmock.h>
21
22 namespace google {
23 namespace cloud {
24 namespace pubsub_internal {
25 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
26 namespace {
27
28 using ::google::cloud::testing_util::StatusIs;
29 using ::testing::ElementsAre;
30
31 auto constexpr kUnlimitedCallbacks = 1000;
32 auto constexpr kDefaultSizeLwm = 10 * 1024 * 1024L;
33 auto constexpr kDefaultSizeHwm = 20 * 1024 * 1024L;
34
35 class SubscriptionFlowControlTest : public ::testing::Test {
36 protected:
GenerateMessages(std::string const & prefix,int count)37 static google::pubsub::v1::PullResponse GenerateMessages(
38 std::string const& prefix, int count) {
39 google::pubsub::v1::PullResponse response;
40 for (int i = 0; i != count; ++i) {
41 auto const id = prefix + std::to_string(i);
42 auto& m = *response.add_received_messages();
43 m.set_ack_id("ack-" + id);
44 m.mutable_message()->set_message_id("message-" + id);
45 }
46 return response;
47 }
48
GenerateSizedMessages(std::string const & prefix,int count,int size)49 google::pubsub::v1::PullResponse GenerateSizedMessages(
50 std::string const& prefix, int count, int size) {
51 // see https://cloud.google.com/pubsub/pricing
52 auto constexpr kMessageOverhead = 20;
53 auto const payload = google::cloud::internal::Sample(
54 generator_, size - kMessageOverhead, "0123456789");
55 google::pubsub::v1::PullResponse response;
56 for (int i = 0; i != count; ++i) {
57 auto const id = prefix + std::to_string(i);
58 auto& m = *response.add_received_messages();
59 m.set_ack_id("ack-" + id);
60 m.mutable_message()->set_message_id(payload);
61 }
62 return response;
63 }
64
AddAsyncPull()65 future<void> AddAsyncPull() {
66 std::unique_lock<std::mutex> lk(async_pulls_mu_);
67 promise<void> p;
68 auto f = p.get_future();
69 async_pulls_.push_back(std::move(p));
70 async_pulls_cv_.notify_one();
71 return f;
72 }
73
WaitAsyncPullCount(std::size_t n)74 void WaitAsyncPullCount(std::size_t n) {
75 std::unique_lock<std::mutex> lk(async_pulls_mu_);
76 async_pulls_cv_.wait(lk, [&] { return async_pulls_.size() >= n; });
77 }
78
CurrentAcks()79 std::vector<std::string> CurrentAcks() {
80 std::unique_lock<std::mutex> lk(acks_mu_);
81 auto tmp = std::move(acks_);
82 acks_ = {};
83 return tmp;
84 }
85
SaveAcks(google::pubsub::v1::ReceivedMessage const & m)86 void SaveAcks(google::pubsub::v1::ReceivedMessage const& m) {
87 std::unique_lock<std::mutex> lk(acks_mu_);
88 acks_.push_back(m.ack_id());
89 lk.unlock();
90 acks_cv_.notify_one();
91 }
92
WaitAcks()93 void WaitAcks() { WaitAcksCount(1); }
94
WaitAcksCount(std::size_t n)95 void WaitAcksCount(std::size_t n) {
96 std::unique_lock<std::mutex> lk(acks_mu_);
97 acks_cv_.wait(lk, [&] { return acks_.size() >= n; });
98 }
99
100 std::mutex async_pulls_mu_;
101 std::condition_variable async_pulls_cv_;
102 std::vector<promise<void>> async_pulls_;
103
104 std::mutex acks_mu_;
105 std::condition_variable acks_cv_;
106 std::vector<std::string> acks_;
107
108 google::cloud::internal::DefaultPRNG generator_ =
109 google::cloud::internal::DefaultPRNG(std::random_device{}());
110 };
111
TEST_F(SubscriptionFlowControlTest,Basic)112 TEST_F(SubscriptionFlowControlTest, Basic) {
113 auto mock = std::make_shared<pubsub_testing::MockSubscriptionBatchSource>();
114 EXPECT_CALL(*mock, Shutdown);
115
116 EXPECT_CALL(*mock, Pull)
117 .WillOnce([&](std::int32_t max_messages) {
118 EXPECT_EQ(1, max_messages);
119 return AddAsyncPull().then([](future<void>) {
120 return make_status_or(GenerateMessages("0-", 1));
121 });
122 })
123 .WillOnce([&](std::int32_t max_messages) {
124 EXPECT_EQ(1, max_messages);
125 return AddAsyncPull().then([](future<void>) {
126 return make_status_or(GenerateMessages("1-", 1));
127 });
128 })
129 .WillOnce([&](std::int32_t max_messages) {
130 EXPECT_EQ(1, max_messages);
131 return AddAsyncPull().then([](future<void>) {
132 return make_status_or(GenerateMessages("2-", 1));
133 });
134 });
135 EXPECT_CALL(*mock, AckMessage)
136 .WillOnce([&](std::string const& ack_id, std::size_t) {
137 EXPECT_THAT(ack_id, "ack-0-0");
138 return make_ready_future(Status{});
139 });
140 EXPECT_CALL(*mock, NackMessage)
141 .WillOnce([&](std::string const& ack_id, std::size_t) {
142 EXPECT_THAT(ack_id, "ack-1-0");
143 return make_ready_future(Status{});
144 });
145 // There may be a few of these calls during shutdown. We have a separate test
146 // for this, so we ignore them here.
147 EXPECT_CALL(*mock, BulkNack)
148 .WillRepeatedly([](std::vector<std::string> const&, std::size_t) {
149 return make_ready_future(Status{});
150 });
151
152 google::cloud::internal::AutomaticallyCreatedBackgroundThreads background;
153 auto shutdown = std::make_shared<SessionShutdownManager>();
154 auto uut = SubscriptionFlowControl::Create(
155 background.cq(), shutdown, mock, /*message_count_lwm=*/0,
156 /*message_count_hwm=*/1,
157 /*message_size_lwm=*/kDefaultSizeLwm,
158 /*message_size_hwm=*/kDefaultSizeHwm);
159
160 auto callback = [this](google::pubsub::v1::ReceivedMessage const& response) {
161 SaveAcks(response);
162 };
163
164 auto done = shutdown->Start({});
165
166 uut->Start(callback);
167 uut->Read(kUnlimitedCallbacks);
168 WaitAsyncPullCount(1);
169 EXPECT_TRUE(CurrentAcks().empty());
170 async_pulls_[0].set_value();
171 WaitAcks();
172 EXPECT_THAT(CurrentAcks(), ElementsAre("ack-0-0"));
173 uut->AckMessage("ack-0-0", 0);
174
175 WaitAsyncPullCount(2);
176 async_pulls_[1].set_value();
177 WaitAcks();
178 EXPECT_THAT(CurrentAcks(), ElementsAre("ack-1-0"));
179 uut->NackMessage("ack-1-0", 0);
180
181 WaitAsyncPullCount(3);
182
183 // Shut down things to prevent more calls, only then complete the last
184 // asynchronous call.
185 shutdown->MarkAsShutdown(__func__, {});
186 uut->Shutdown();
187 async_pulls_[2].set_value();
188
189 EXPECT_THAT(done.get(), StatusIs(StatusCode::kOk));
190 }
191
192 /// @test Verify that messages received after a shutdown are nacked.
TEST_F(SubscriptionFlowControlTest,NackOnShutdown)193 TEST_F(SubscriptionFlowControlTest, NackOnShutdown) {
194 auto mock = std::make_shared<pubsub_testing::MockSubscriptionBatchSource>();
195 EXPECT_CALL(*mock, Shutdown);
196
197 {
198 ::testing::InSequence sequence;
199 EXPECT_CALL(*mock, Pull).WillOnce([&](std::int32_t max_messages) {
200 EXPECT_EQ(1, max_messages);
201 return AddAsyncPull().then([](future<void>) {
202 return make_status_or(GenerateMessages("0-", 3));
203 });
204 });
205 EXPECT_CALL(*mock, BulkNack)
206 .WillOnce([&](std::vector<std::string> const& ack_ids, std::size_t) {
207 EXPECT_THAT(ack_ids, ElementsAre("ack-0-0", "ack-0-1", "ack-0-2"));
208 return make_ready_future(Status{});
209 });
210 }
211
212 google::cloud::internal::AutomaticallyCreatedBackgroundThreads background;
213 auto shutdown = std::make_shared<SessionShutdownManager>();
214 auto uut =
215 SubscriptionFlowControl::Create(background.cq(), shutdown, mock, 0, 1,
216 /*message_size_lwm=*/kDefaultSizeLwm,
217 /*message_size_hwm=*/kDefaultSizeHwm);
218
219 auto done = shutdown->Start({});
220 uut->Start([](google::pubsub::v1::ReceivedMessage const&) {});
221 uut->Read(kUnlimitedCallbacks);
222 WaitAsyncPullCount(1);
223
224 shutdown->MarkAsShutdown(__func__, {});
225 uut->Shutdown();
226 async_pulls_[0].set_value();
227
228 EXPECT_THAT(done.get(), StatusIs(StatusCode::kOk));
229 }
230
231 /// @test Verify that messages received after a shutdown are nacked.
TEST_F(SubscriptionFlowControlTest,HandleOnPullError)232 TEST_F(SubscriptionFlowControlTest, HandleOnPullError) {
233 auto mock = std::make_shared<pubsub_testing::MockSubscriptionBatchSource>();
234 EXPECT_CALL(*mock, Shutdown);
235
236 {
237 ::testing::InSequence sequence;
238 EXPECT_CALL(*mock, Pull)
239 .WillOnce([&](std::int32_t max_messages) {
240 EXPECT_EQ(10, max_messages);
241 return AddAsyncPull().then([](future<void>) {
242 return make_status_or(GenerateMessages("0-", 3));
243 });
244 })
245 .WillOnce([&](std::int32_t max_messages) {
246 EXPECT_EQ(7, max_messages);
247 return AddAsyncPull().then([](future<void>) {
248 return StatusOr<google::pubsub::v1::PullResponse>(
249 Status(StatusCode::kUnknown, "uh?"));
250 });
251 });
252 EXPECT_CALL(*mock, BulkNack)
253 .WillOnce([&](std::vector<std::string> const& ack_ids, std::size_t) {
254 EXPECT_THAT(ack_ids, ElementsAre("ack-0-1", "ack-0-2"));
255 return make_ready_future(Status{});
256 });
257 }
258
259 google::cloud::internal::AutomaticallyCreatedBackgroundThreads background;
260 auto shutdown = std::make_shared<SessionShutdownManager>();
261 auto uut =
262 SubscriptionFlowControl::Create(background.cq(), shutdown, mock,
263 /*message_count_lwm=*/0,
264 /*message_count_hwm=*/10,
265 /*message_size_lwm=*/kDefaultSizeLwm,
266 /*message_size_hwm=*/kDefaultSizeHwm);
267
268 auto done = shutdown->Start({});
269 uut->Start([](google::pubsub::v1::ReceivedMessage const&) {});
270 uut->Read(1);
271 WaitAsyncPullCount(1);
272 async_pulls_[0].set_value();
273 WaitAsyncPullCount(2);
274 // Errors are reported to the ShutdownManager by the LeaseManagement layer,
275 // so let's do that here.
276 shutdown->MarkAsShutdown("test-code", Status(StatusCode::kUnknown, "uh?"));
277 // And then simulate the bulk cancel.
278 async_pulls_[1].set_value();
279
280 EXPECT_THAT(done.get(), StatusIs(StatusCode::kUnknown, "uh?"));
281 }
282
TEST_F(SubscriptionFlowControlTest,ReachMessageCountHwm)283 TEST_F(SubscriptionFlowControlTest, ReachMessageCountHwm) {
284 auto mock = std::make_shared<pubsub_testing::MockSubscriptionBatchSource>();
285 EXPECT_CALL(*mock, Shutdown);
286 EXPECT_CALL(*mock, BulkNack)
287 .WillRepeatedly([](std::vector<std::string> const&, std::size_t) {
288 return make_ready_future(Status{});
289 });
290
291 auto generate_messages = [](std::string const& prefix, int count) {
292 return [prefix, count](future<void>) {
293 return make_status_or(GenerateMessages(prefix, count));
294 };
295 };
296
297 EXPECT_CALL(*mock, Pull)
298 .WillOnce([&](std::int32_t max_messages) {
299 EXPECT_EQ(10, max_messages);
300 return AddAsyncPull().then(generate_messages("0-", 5));
301 })
302 .WillOnce([&](std::int32_t max_messages) {
303 EXPECT_EQ(5, max_messages);
304 return AddAsyncPull().then(generate_messages("1-", 5));
305 })
306 .WillOnce([&](std::int32_t max_messages) {
307 EXPECT_EQ(8, max_messages);
308 return AddAsyncPull().then(generate_messages("2-", 3));
309 })
310 .WillOnce([&](std::int32_t max_messages) {
311 EXPECT_EQ(5, max_messages);
312 return AddAsyncPull().then(generate_messages("3-", 5));
313 });
314 EXPECT_CALL(*mock, NackMessage)
315 .WillOnce([&](std::string const& ack_id, std::size_t) {
316 EXPECT_EQ("ack-0-1", ack_id);
317 return make_ready_future(Status{});
318 })
319 .WillOnce([&](std::string const& ack_id, std::size_t) {
320 EXPECT_EQ("ack-0-3", ack_id);
321 return make_ready_future(Status{});
322 })
323 .WillOnce([&](std::string const& ack_id, std::size_t) {
324 EXPECT_EQ("ack-1-1", ack_id);
325 return make_ready_future(Status{});
326 });
327
328 EXPECT_CALL(*mock, AckMessage)
329 .WillOnce([&](std::string const& ack_id, std::size_t) {
330 EXPECT_EQ("ack-0-0", ack_id);
331 return make_ready_future(Status{});
332 })
333 .WillOnce([&](std::string const& ack_id, std::size_t) {
334 EXPECT_EQ("ack-0-2", ack_id);
335 return make_ready_future(Status{});
336 })
337 .WillOnce([&](std::string const& ack_id, std::size_t) {
338 EXPECT_EQ("ack-0-4", ack_id);
339 return make_ready_future(Status{});
340 })
341 .WillOnce([&](std::string const& ack_id, std::size_t) {
342 EXPECT_EQ("ack-1-0", ack_id);
343 return make_ready_future(Status{});
344 })
345 .WillOnce([&](std::string const& ack_id, std::size_t) {
346 EXPECT_EQ("ack-1-2", ack_id);
347 return make_ready_future(Status{});
348 });
349
350 google::cloud::internal::AutomaticallyCreatedBackgroundThreads background;
351 auto shutdown = std::make_shared<SessionShutdownManager>();
352 auto uut = SubscriptionFlowControl::Create(
353 background.cq(), shutdown, mock, /*message_count_lwm=*/2,
354 /*message_count_hwm=*/10,
355 /*message_size_lwm=*/kDefaultSizeLwm,
356 /*message_size_hwm=*/kDefaultSizeHwm);
357
358 auto callback = [&](google::pubsub::v1::ReceivedMessage const& response) {
359 this->SaveAcks(response);
360 };
361
362 auto done = shutdown->Start({});
363 uut->Start(callback);
364 uut->Read(kUnlimitedCallbacks);
365 WaitAsyncPullCount(1);
366 EXPECT_TRUE(acks_.empty());
367 async_pulls_[0].set_value();
368 WaitAcksCount(5);
369 EXPECT_THAT(CurrentAcks(), ElementsAre("ack-0-0", "ack-0-1", "ack-0-2",
370 "ack-0-3", "ack-0-4"));
371 WaitAsyncPullCount(2);
372 async_pulls_[1].set_value();
373 WaitAcksCount(5);
374 EXPECT_THAT(CurrentAcks(), ElementsAre("ack-1-0", "ack-1-1", "ack-1-2",
375 "ack-1-3", "ack-1-4"));
376 // Acking or Nacking the first 7 messages should have no effect, no new calls
377 // or incoming data.
378 for (auto const* id : {"ack-0-0", "ack-0-2", "ack-0-4", "ack-1-0"}) {
379 uut->AckMessage(id, 0);
380 }
381 for (auto const* id : {"ack-0-1", "ack-0-3", "ack-1-1"}) {
382 uut->NackMessage(id, 0);
383 }
384 EXPECT_EQ(2, async_pulls_.size());
385
386 // Acking message number 8 should bring the count of messages below the LWM
387 // and trigger a new AsyncPull(), the third expectation from above.
388 uut->AckMessage("ack-1-2", 0);
389 WaitAsyncPullCount(3);
390 async_pulls_[2].set_value();
391 WaitAcksCount(3);
392 EXPECT_THAT(CurrentAcks(), ElementsAre("ack-2-0", "ack-2-1", "ack-2-2"));
393
394 WaitAsyncPullCount(4);
395
396 // Shutting things down should prevent any further calls.
397 shutdown->MarkAsShutdown(__func__, {});
398 uut->Shutdown();
399 // Only after shutdown complete the last AsyncPull().
400 async_pulls_[3].set_value();
401
402 EXPECT_THAT(done.get(), StatusIs(StatusCode::kOk));
403 }
404
TEST_F(SubscriptionFlowControlTest,ReachMessageSizeHwm)405 TEST_F(SubscriptionFlowControlTest, ReachMessageSizeHwm) {
406 auto mock = std::make_shared<pubsub_testing::MockSubscriptionBatchSource>();
407 EXPECT_CALL(*mock, Shutdown);
408
409 auto constexpr kMessageSize = 1024;
410 auto generate_messages = [&](std::string const& prefix, int count) {
411 // Workaround GCC/Clang warnings for capturing constexpr vs. MSVC behavior
412 // around not capturing without defaults.
413 auto const size = kMessageSize;
414 return [=](future<void>) {
415 return make_status_or(GenerateSizedMessages(prefix, count, size));
416 };
417 };
418
419 auto ack_success = [&](std::string const&, std::size_t) {
420 return make_ready_future(Status{});
421 };
422
423 {
424 ::testing::InSequence sequence;
425 EXPECT_CALL(*mock, Pull(20)).WillOnce([&](std::int32_t) {
426 return AddAsyncPull().then(generate_messages("0-", 5));
427 });
428 EXPECT_CALL(*mock, Pull(15)).WillOnce([&](std::int32_t) {
429 return AddAsyncPull().then(generate_messages("1-", 5));
430 });
431 EXPECT_CALL(*mock, AckMessage("ack-0-0", 1024)).WillOnce(ack_success);
432 EXPECT_CALL(*mock, AckMessage("ack-0-2", 1024)).WillOnce(ack_success);
433 EXPECT_CALL(*mock, Pull(12)).WillOnce([&](std::int32_t) {
434 return AddAsyncPull().then(generate_messages("2-", 3));
435 });
436 }
437
438 google::cloud::internal::AutomaticallyCreatedBackgroundThreads background;
439 auto shutdown = std::make_shared<SessionShutdownManager>();
440 auto uut =
441 SubscriptionFlowControl::Create(background.cq(), shutdown, mock,
442 /*message_count_lwm=*/19,
443 /*message_count_hwm=*/20,
444 /*message_size_lwm=*/8 * kMessageSize,
445 /*message_size_hwm=*/10 * kMessageSize);
446
447 auto callback = [&](google::pubsub::v1::ReceivedMessage const& response) {
448 this->SaveAcks(response);
449 };
450
451 auto done = shutdown->Start({});
452 uut->Start(callback);
453 uut->Read(kUnlimitedCallbacks);
454 WaitAsyncPullCount(1);
455 EXPECT_TRUE(acks_.empty());
456 async_pulls_[0].set_value();
457 WaitAcksCount(5);
458 EXPECT_THAT(CurrentAcks(), ElementsAre("ack-0-0", "ack-0-1", "ack-0-2",
459 "ack-0-3", "ack-0-4"));
460 WaitAsyncPullCount(2);
461 async_pulls_[1].set_value();
462 WaitAcksCount(5);
463 EXPECT_THAT(CurrentAcks(), ElementsAre("ack-1-0", "ack-1-1", "ack-1-2",
464 "ack-1-3", "ack-1-4"));
465
466 // We are below the HWM for message count, and above the HWM for message size.
467 // That means no more calls until we go below the HWM for mesages.
468 uut->AckMessage("ack-0-0", 1024);
469 uut->AckMessage("ack-0-2", 1024);
470 WaitAsyncPullCount(3);
471 async_pulls_[2].set_value();
472 WaitAcksCount(3);
473 EXPECT_THAT(CurrentAcks(), ElementsAre("ack-2-0", "ack-2-1", "ack-2-2"));
474
475 // Shutting things down should prevent any further calls.
476 shutdown->MarkAsShutdown(__func__, {});
477 uut->Shutdown();
478
479 EXPECT_THAT(done.get(), StatusIs(StatusCode::kOk));
480 }
481
482 } // namespace
483 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
484 } // namespace pubsub_internal
485 } // namespace cloud
486 } // namespace google
487