1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/pubsub/internal/subscription_flow_control.h"
16 #include "google/cloud/pubsub/testing/mock_subscription_batch_source.h"
17 #include "google/cloud/internal/background_threads_impl.h"
18 #include "google/cloud/internal/random.h"
19 #include "google/cloud/testing_util/status_matchers.h"
20 #include <gmock/gmock.h>
21 
22 namespace google {
23 namespace cloud {
24 namespace pubsub_internal {
25 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
26 namespace {
27 
28 using ::google::cloud::testing_util::StatusIs;
29 using ::testing::ElementsAre;
30 
31 auto constexpr kUnlimitedCallbacks = 1000;
32 auto constexpr kDefaultSizeLwm = 10 * 1024 * 1024L;
33 auto constexpr kDefaultSizeHwm = 20 * 1024 * 1024L;
34 
35 class SubscriptionFlowControlTest : public ::testing::Test {
36  protected:
GenerateMessages(std::string const & prefix,int count)37   static google::pubsub::v1::PullResponse GenerateMessages(
38       std::string const& prefix, int count) {
39     google::pubsub::v1::PullResponse response;
40     for (int i = 0; i != count; ++i) {
41       auto const id = prefix + std::to_string(i);
42       auto& m = *response.add_received_messages();
43       m.set_ack_id("ack-" + id);
44       m.mutable_message()->set_message_id("message-" + id);
45     }
46     return response;
47   }
48 
GenerateSizedMessages(std::string const & prefix,int count,int size)49   google::pubsub::v1::PullResponse GenerateSizedMessages(
50       std::string const& prefix, int count, int size) {
51     // see https://cloud.google.com/pubsub/pricing
52     auto constexpr kMessageOverhead = 20;
53     auto const payload = google::cloud::internal::Sample(
54         generator_, size - kMessageOverhead, "0123456789");
55     google::pubsub::v1::PullResponse response;
56     for (int i = 0; i != count; ++i) {
57       auto const id = prefix + std::to_string(i);
58       auto& m = *response.add_received_messages();
59       m.set_ack_id("ack-" + id);
60       m.mutable_message()->set_message_id(payload);
61     }
62     return response;
63   }
64 
AddAsyncPull()65   future<void> AddAsyncPull() {
66     std::unique_lock<std::mutex> lk(async_pulls_mu_);
67     promise<void> p;
68     auto f = p.get_future();
69     async_pulls_.push_back(std::move(p));
70     async_pulls_cv_.notify_one();
71     return f;
72   }
73 
WaitAsyncPullCount(std::size_t n)74   void WaitAsyncPullCount(std::size_t n) {
75     std::unique_lock<std::mutex> lk(async_pulls_mu_);
76     async_pulls_cv_.wait(lk, [&] { return async_pulls_.size() >= n; });
77   }
78 
CurrentAcks()79   std::vector<std::string> CurrentAcks() {
80     std::unique_lock<std::mutex> lk(acks_mu_);
81     auto tmp = std::move(acks_);
82     acks_ = {};
83     return tmp;
84   }
85 
SaveAcks(google::pubsub::v1::ReceivedMessage const & m)86   void SaveAcks(google::pubsub::v1::ReceivedMessage const& m) {
87     std::unique_lock<std::mutex> lk(acks_mu_);
88     acks_.push_back(m.ack_id());
89     lk.unlock();
90     acks_cv_.notify_one();
91   }
92 
WaitAcks()93   void WaitAcks() { WaitAcksCount(1); }
94 
WaitAcksCount(std::size_t n)95   void WaitAcksCount(std::size_t n) {
96     std::unique_lock<std::mutex> lk(acks_mu_);
97     acks_cv_.wait(lk, [&] { return acks_.size() >= n; });
98   }
99 
100   std::mutex async_pulls_mu_;
101   std::condition_variable async_pulls_cv_;
102   std::vector<promise<void>> async_pulls_;
103 
104   std::mutex acks_mu_;
105   std::condition_variable acks_cv_;
106   std::vector<std::string> acks_;
107 
108   google::cloud::internal::DefaultPRNG generator_ =
109       google::cloud::internal::DefaultPRNG(std::random_device{}());
110 };
111 
TEST_F(SubscriptionFlowControlTest,Basic)112 TEST_F(SubscriptionFlowControlTest, Basic) {
113   auto mock = std::make_shared<pubsub_testing::MockSubscriptionBatchSource>();
114   EXPECT_CALL(*mock, Shutdown);
115 
116   EXPECT_CALL(*mock, Pull)
117       .WillOnce([&](std::int32_t max_messages) {
118         EXPECT_EQ(1, max_messages);
119         return AddAsyncPull().then([](future<void>) {
120           return make_status_or(GenerateMessages("0-", 1));
121         });
122       })
123       .WillOnce([&](std::int32_t max_messages) {
124         EXPECT_EQ(1, max_messages);
125         return AddAsyncPull().then([](future<void>) {
126           return make_status_or(GenerateMessages("1-", 1));
127         });
128       })
129       .WillOnce([&](std::int32_t max_messages) {
130         EXPECT_EQ(1, max_messages);
131         return AddAsyncPull().then([](future<void>) {
132           return make_status_or(GenerateMessages("2-", 1));
133         });
134       });
135   EXPECT_CALL(*mock, AckMessage)
136       .WillOnce([&](std::string const& ack_id, std::size_t) {
137         EXPECT_THAT(ack_id, "ack-0-0");
138         return make_ready_future(Status{});
139       });
140   EXPECT_CALL(*mock, NackMessage)
141       .WillOnce([&](std::string const& ack_id, std::size_t) {
142         EXPECT_THAT(ack_id, "ack-1-0");
143         return make_ready_future(Status{});
144       });
145   // There may be a few of these calls during shutdown. We have a separate test
146   // for this, so we ignore them here.
147   EXPECT_CALL(*mock, BulkNack)
148       .WillRepeatedly([](std::vector<std::string> const&, std::size_t) {
149         return make_ready_future(Status{});
150       });
151 
152   google::cloud::internal::AutomaticallyCreatedBackgroundThreads background;
153   auto shutdown = std::make_shared<SessionShutdownManager>();
154   auto uut = SubscriptionFlowControl::Create(
155       background.cq(), shutdown, mock, /*message_count_lwm=*/0,
156       /*message_count_hwm=*/1,
157       /*message_size_lwm=*/kDefaultSizeLwm,
158       /*message_size_hwm=*/kDefaultSizeHwm);
159 
160   auto callback = [this](google::pubsub::v1::ReceivedMessage const& response) {
161     SaveAcks(response);
162   };
163 
164   auto done = shutdown->Start({});
165 
166   uut->Start(callback);
167   uut->Read(kUnlimitedCallbacks);
168   WaitAsyncPullCount(1);
169   EXPECT_TRUE(CurrentAcks().empty());
170   async_pulls_[0].set_value();
171   WaitAcks();
172   EXPECT_THAT(CurrentAcks(), ElementsAre("ack-0-0"));
173   uut->AckMessage("ack-0-0", 0);
174 
175   WaitAsyncPullCount(2);
176   async_pulls_[1].set_value();
177   WaitAcks();
178   EXPECT_THAT(CurrentAcks(), ElementsAre("ack-1-0"));
179   uut->NackMessage("ack-1-0", 0);
180 
181   WaitAsyncPullCount(3);
182 
183   // Shut down things to prevent more calls, only then complete the last
184   // asynchronous call.
185   shutdown->MarkAsShutdown(__func__, {});
186   uut->Shutdown();
187   async_pulls_[2].set_value();
188 
189   EXPECT_THAT(done.get(), StatusIs(StatusCode::kOk));
190 }
191 
192 /// @test Verify that messages received after a shutdown are nacked.
TEST_F(SubscriptionFlowControlTest,NackOnShutdown)193 TEST_F(SubscriptionFlowControlTest, NackOnShutdown) {
194   auto mock = std::make_shared<pubsub_testing::MockSubscriptionBatchSource>();
195   EXPECT_CALL(*mock, Shutdown);
196 
197   {
198     ::testing::InSequence sequence;
199     EXPECT_CALL(*mock, Pull).WillOnce([&](std::int32_t max_messages) {
200       EXPECT_EQ(1, max_messages);
201       return AddAsyncPull().then([](future<void>) {
202         return make_status_or(GenerateMessages("0-", 3));
203       });
204     });
205     EXPECT_CALL(*mock, BulkNack)
206         .WillOnce([&](std::vector<std::string> const& ack_ids, std::size_t) {
207           EXPECT_THAT(ack_ids, ElementsAre("ack-0-0", "ack-0-1", "ack-0-2"));
208           return make_ready_future(Status{});
209         });
210   }
211 
212   google::cloud::internal::AutomaticallyCreatedBackgroundThreads background;
213   auto shutdown = std::make_shared<SessionShutdownManager>();
214   auto uut =
215       SubscriptionFlowControl::Create(background.cq(), shutdown, mock, 0, 1,
216                                       /*message_size_lwm=*/kDefaultSizeLwm,
217                                       /*message_size_hwm=*/kDefaultSizeHwm);
218 
219   auto done = shutdown->Start({});
220   uut->Start([](google::pubsub::v1::ReceivedMessage const&) {});
221   uut->Read(kUnlimitedCallbacks);
222   WaitAsyncPullCount(1);
223 
224   shutdown->MarkAsShutdown(__func__, {});
225   uut->Shutdown();
226   async_pulls_[0].set_value();
227 
228   EXPECT_THAT(done.get(), StatusIs(StatusCode::kOk));
229 }
230 
231 /// @test Verify that messages received after a shutdown are nacked.
TEST_F(SubscriptionFlowControlTest,HandleOnPullError)232 TEST_F(SubscriptionFlowControlTest, HandleOnPullError) {
233   auto mock = std::make_shared<pubsub_testing::MockSubscriptionBatchSource>();
234   EXPECT_CALL(*mock, Shutdown);
235 
236   {
237     ::testing::InSequence sequence;
238     EXPECT_CALL(*mock, Pull)
239         .WillOnce([&](std::int32_t max_messages) {
240           EXPECT_EQ(10, max_messages);
241           return AddAsyncPull().then([](future<void>) {
242             return make_status_or(GenerateMessages("0-", 3));
243           });
244         })
245         .WillOnce([&](std::int32_t max_messages) {
246           EXPECT_EQ(7, max_messages);
247           return AddAsyncPull().then([](future<void>) {
248             return StatusOr<google::pubsub::v1::PullResponse>(
249                 Status(StatusCode::kUnknown, "uh?"));
250           });
251         });
252     EXPECT_CALL(*mock, BulkNack)
253         .WillOnce([&](std::vector<std::string> const& ack_ids, std::size_t) {
254           EXPECT_THAT(ack_ids, ElementsAre("ack-0-1", "ack-0-2"));
255           return make_ready_future(Status{});
256         });
257   }
258 
259   google::cloud::internal::AutomaticallyCreatedBackgroundThreads background;
260   auto shutdown = std::make_shared<SessionShutdownManager>();
261   auto uut =
262       SubscriptionFlowControl::Create(background.cq(), shutdown, mock,
263                                       /*message_count_lwm=*/0,
264                                       /*message_count_hwm=*/10,
265                                       /*message_size_lwm=*/kDefaultSizeLwm,
266                                       /*message_size_hwm=*/kDefaultSizeHwm);
267 
268   auto done = shutdown->Start({});
269   uut->Start([](google::pubsub::v1::ReceivedMessage const&) {});
270   uut->Read(1);
271   WaitAsyncPullCount(1);
272   async_pulls_[0].set_value();
273   WaitAsyncPullCount(2);
274   // Errors are reported to the ShutdownManager by the LeaseManagement layer,
275   // so let's do that here.
276   shutdown->MarkAsShutdown("test-code", Status(StatusCode::kUnknown, "uh?"));
277   // And then simulate the bulk cancel.
278   async_pulls_[1].set_value();
279 
280   EXPECT_THAT(done.get(), StatusIs(StatusCode::kUnknown, "uh?"));
281 }
282 
TEST_F(SubscriptionFlowControlTest,ReachMessageCountHwm)283 TEST_F(SubscriptionFlowControlTest, ReachMessageCountHwm) {
284   auto mock = std::make_shared<pubsub_testing::MockSubscriptionBatchSource>();
285   EXPECT_CALL(*mock, Shutdown);
286   EXPECT_CALL(*mock, BulkNack)
287       .WillRepeatedly([](std::vector<std::string> const&, std::size_t) {
288         return make_ready_future(Status{});
289       });
290 
291   auto generate_messages = [](std::string const& prefix, int count) {
292     return [prefix, count](future<void>) {
293       return make_status_or(GenerateMessages(prefix, count));
294     };
295   };
296 
297   EXPECT_CALL(*mock, Pull)
298       .WillOnce([&](std::int32_t max_messages) {
299         EXPECT_EQ(10, max_messages);
300         return AddAsyncPull().then(generate_messages("0-", 5));
301       })
302       .WillOnce([&](std::int32_t max_messages) {
303         EXPECT_EQ(5, max_messages);
304         return AddAsyncPull().then(generate_messages("1-", 5));
305       })
306       .WillOnce([&](std::int32_t max_messages) {
307         EXPECT_EQ(8, max_messages);
308         return AddAsyncPull().then(generate_messages("2-", 3));
309       })
310       .WillOnce([&](std::int32_t max_messages) {
311         EXPECT_EQ(5, max_messages);
312         return AddAsyncPull().then(generate_messages("3-", 5));
313       });
314   EXPECT_CALL(*mock, NackMessage)
315       .WillOnce([&](std::string const& ack_id, std::size_t) {
316         EXPECT_EQ("ack-0-1", ack_id);
317         return make_ready_future(Status{});
318       })
319       .WillOnce([&](std::string const& ack_id, std::size_t) {
320         EXPECT_EQ("ack-0-3", ack_id);
321         return make_ready_future(Status{});
322       })
323       .WillOnce([&](std::string const& ack_id, std::size_t) {
324         EXPECT_EQ("ack-1-1", ack_id);
325         return make_ready_future(Status{});
326       });
327 
328   EXPECT_CALL(*mock, AckMessage)
329       .WillOnce([&](std::string const& ack_id, std::size_t) {
330         EXPECT_EQ("ack-0-0", ack_id);
331         return make_ready_future(Status{});
332       })
333       .WillOnce([&](std::string const& ack_id, std::size_t) {
334         EXPECT_EQ("ack-0-2", ack_id);
335         return make_ready_future(Status{});
336       })
337       .WillOnce([&](std::string const& ack_id, std::size_t) {
338         EXPECT_EQ("ack-0-4", ack_id);
339         return make_ready_future(Status{});
340       })
341       .WillOnce([&](std::string const& ack_id, std::size_t) {
342         EXPECT_EQ("ack-1-0", ack_id);
343         return make_ready_future(Status{});
344       })
345       .WillOnce([&](std::string const& ack_id, std::size_t) {
346         EXPECT_EQ("ack-1-2", ack_id);
347         return make_ready_future(Status{});
348       });
349 
350   google::cloud::internal::AutomaticallyCreatedBackgroundThreads background;
351   auto shutdown = std::make_shared<SessionShutdownManager>();
352   auto uut = SubscriptionFlowControl::Create(
353       background.cq(), shutdown, mock, /*message_count_lwm=*/2,
354       /*message_count_hwm=*/10,
355       /*message_size_lwm=*/kDefaultSizeLwm,
356       /*message_size_hwm=*/kDefaultSizeHwm);
357 
358   auto callback = [&](google::pubsub::v1::ReceivedMessage const& response) {
359     this->SaveAcks(response);
360   };
361 
362   auto done = shutdown->Start({});
363   uut->Start(callback);
364   uut->Read(kUnlimitedCallbacks);
365   WaitAsyncPullCount(1);
366   EXPECT_TRUE(acks_.empty());
367   async_pulls_[0].set_value();
368   WaitAcksCount(5);
369   EXPECT_THAT(CurrentAcks(), ElementsAre("ack-0-0", "ack-0-1", "ack-0-2",
370                                          "ack-0-3", "ack-0-4"));
371   WaitAsyncPullCount(2);
372   async_pulls_[1].set_value();
373   WaitAcksCount(5);
374   EXPECT_THAT(CurrentAcks(), ElementsAre("ack-1-0", "ack-1-1", "ack-1-2",
375                                          "ack-1-3", "ack-1-4"));
376   // Acking or Nacking the first 7 messages should have no effect, no new calls
377   // or incoming data.
378   for (auto const* id : {"ack-0-0", "ack-0-2", "ack-0-4", "ack-1-0"}) {
379     uut->AckMessage(id, 0);
380   }
381   for (auto const* id : {"ack-0-1", "ack-0-3", "ack-1-1"}) {
382     uut->NackMessage(id, 0);
383   }
384   EXPECT_EQ(2, async_pulls_.size());
385 
386   // Acking message number 8 should bring the count of messages below the LWM
387   // and trigger a new AsyncPull(), the third expectation from above.
388   uut->AckMessage("ack-1-2", 0);
389   WaitAsyncPullCount(3);
390   async_pulls_[2].set_value();
391   WaitAcksCount(3);
392   EXPECT_THAT(CurrentAcks(), ElementsAre("ack-2-0", "ack-2-1", "ack-2-2"));
393 
394   WaitAsyncPullCount(4);
395 
396   // Shutting things down should prevent any further calls.
397   shutdown->MarkAsShutdown(__func__, {});
398   uut->Shutdown();
399   // Only after shutdown complete the last AsyncPull().
400   async_pulls_[3].set_value();
401 
402   EXPECT_THAT(done.get(), StatusIs(StatusCode::kOk));
403 }
404 
TEST_F(SubscriptionFlowControlTest,ReachMessageSizeHwm)405 TEST_F(SubscriptionFlowControlTest, ReachMessageSizeHwm) {
406   auto mock = std::make_shared<pubsub_testing::MockSubscriptionBatchSource>();
407   EXPECT_CALL(*mock, Shutdown);
408 
409   auto constexpr kMessageSize = 1024;
410   auto generate_messages = [&](std::string const& prefix, int count) {
411     // Workaround GCC/Clang warnings for capturing constexpr vs. MSVC behavior
412     // around not capturing without defaults.
413     auto const size = kMessageSize;
414     return [=](future<void>) {
415       return make_status_or(GenerateSizedMessages(prefix, count, size));
416     };
417   };
418 
419   auto ack_success = [&](std::string const&, std::size_t) {
420     return make_ready_future(Status{});
421   };
422 
423   {
424     ::testing::InSequence sequence;
425     EXPECT_CALL(*mock, Pull(20)).WillOnce([&](std::int32_t) {
426       return AddAsyncPull().then(generate_messages("0-", 5));
427     });
428     EXPECT_CALL(*mock, Pull(15)).WillOnce([&](std::int32_t) {
429       return AddAsyncPull().then(generate_messages("1-", 5));
430     });
431     EXPECT_CALL(*mock, AckMessage("ack-0-0", 1024)).WillOnce(ack_success);
432     EXPECT_CALL(*mock, AckMessage("ack-0-2", 1024)).WillOnce(ack_success);
433     EXPECT_CALL(*mock, Pull(12)).WillOnce([&](std::int32_t) {
434       return AddAsyncPull().then(generate_messages("2-", 3));
435     });
436   }
437 
438   google::cloud::internal::AutomaticallyCreatedBackgroundThreads background;
439   auto shutdown = std::make_shared<SessionShutdownManager>();
440   auto uut =
441       SubscriptionFlowControl::Create(background.cq(), shutdown, mock,
442                                       /*message_count_lwm=*/19,
443                                       /*message_count_hwm=*/20,
444                                       /*message_size_lwm=*/8 * kMessageSize,
445                                       /*message_size_hwm=*/10 * kMessageSize);
446 
447   auto callback = [&](google::pubsub::v1::ReceivedMessage const& response) {
448     this->SaveAcks(response);
449   };
450 
451   auto done = shutdown->Start({});
452   uut->Start(callback);
453   uut->Read(kUnlimitedCallbacks);
454   WaitAsyncPullCount(1);
455   EXPECT_TRUE(acks_.empty());
456   async_pulls_[0].set_value();
457   WaitAcksCount(5);
458   EXPECT_THAT(CurrentAcks(), ElementsAre("ack-0-0", "ack-0-1", "ack-0-2",
459                                          "ack-0-3", "ack-0-4"));
460   WaitAsyncPullCount(2);
461   async_pulls_[1].set_value();
462   WaitAcksCount(5);
463   EXPECT_THAT(CurrentAcks(), ElementsAre("ack-1-0", "ack-1-1", "ack-1-2",
464                                          "ack-1-3", "ack-1-4"));
465 
466   // We are below the HWM for message count, and above the HWM for message size.
467   // That means no more calls until we go below the HWM for mesages.
468   uut->AckMessage("ack-0-0", 1024);
469   uut->AckMessage("ack-0-2", 1024);
470   WaitAsyncPullCount(3);
471   async_pulls_[2].set_value();
472   WaitAcksCount(3);
473   EXPECT_THAT(CurrentAcks(), ElementsAre("ack-2-0", "ack-2-1", "ack-2-2"));
474 
475   // Shutting things down should prevent any further calls.
476   shutdown->MarkAsShutdown(__func__, {});
477   uut->Shutdown();
478 
479   EXPECT_THAT(done.get(), StatusIs(StatusCode::kOk));
480 }
481 
482 }  // namespace
483 }  // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
484 }  // namespace pubsub_internal
485 }  // namespace cloud
486 }  // namespace google
487