1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/pubsub/internal/sequential_batch_sink.h"
16 #include "google/cloud/pubsub/testing/mock_batch_sink.h"
17 #include "google/cloud/pubsub/testing/test_retry_policies.h"
18 #include "google/cloud/pubsub/topic.h"
19 #include "google/cloud/testing_util/async_sequencer.h"
20 #include "google/cloud/testing_util/is_proto_equal.h"
21 #include "google/cloud/testing_util/status_matchers.h"
22 #include <gmock/gmock.h>
23 #include <condition_variable>
24 #include <deque>
25 #include <mutex>
26 
27 namespace google {
28 namespace cloud {
29 namespace pubsub_internal {
30 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
31 namespace {
32 
33 using ::google::cloud::testing_util::AsyncSequencer;
34 using ::google::cloud::testing_util::IsProtoEqual;
35 using ::google::cloud::testing_util::StatusIs;
36 using ::testing::Unused;
37 
TestTopic()38 pubsub::Topic TestTopic() {
39   return pubsub::Topic("test-project", "test-topic");
40 }
41 
// Returns the ordering key stamped on every message built by these tests.
std::string TestOrderingKey() {
  std::string key("test-key");
  return key;
}
43 
MakeRequest(int n)44 google::pubsub::v1::PublishRequest MakeRequest(int n) {
45   google::pubsub::v1::PublishRequest request;
46   request.set_topic(TestTopic().FullName());
47 
48   for (int i = 0; i != n; ++i) {
49     auto& m = *request.add_messages();
50     m.set_message_id("message-" + std::to_string(i));
51     m.set_ordering_key(TestOrderingKey());
52   }
53   return request;
54 }
55 
MakeResponse(google::pubsub::v1::PublishRequest const & request)56 google::pubsub::v1::PublishResponse MakeResponse(
57     google::pubsub::v1::PublishRequest const& request) {
58   google::pubsub::v1::PublishResponse response;
59   for (auto const& m : request.messages()) {
60     response.add_message_ids("id-" + m.message_id());
61   }
62   return response;
63 }
64 
// Verifies the success path: three batches are submitted back-to-back, but
// the mock sink only receives the next batch once the previous one has
// completed. The `barrier` checkpoints, interleaved with the mock
// expectations under InSequence, pin down *when* each AsyncPublish() call on
// the mock is allowed to happen.
TEST(DefaultBatchSinkTest, BasicNoErrors) {
  // Each mocked AsyncPublish() parks a promise here; the test completes them
  // one at a time with PopFront().set_value().
  AsyncSequencer<void> sequencer;

  // We will use this function to ensure the mock calls happen *after* some
  // code below.
  ::testing::MockFunction<void(int)> barrier;

  auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
  {
    ::testing::InSequence sequence;
    EXPECT_CALL(barrier, Call(0));
    EXPECT_CALL(*mock, AsyncPublish)
        .WillOnce([&](google::pubsub::v1::PublishRequest const& r) {
          EXPECT_THAT(r, IsProtoEqual(MakeRequest(3)));
          return sequencer.PushBack().then(
              [r](future<void>) { return make_status_or(MakeResponse(r)); });
        });

    EXPECT_CALL(barrier, Call(1));
    EXPECT_CALL(*mock, AsyncPublish)
        .WillOnce([&](google::pubsub::v1::PublishRequest const& r) {
          EXPECT_THAT(r, IsProtoEqual(MakeRequest(2)));
          return sequencer.PushBack().then(
              [r](future<void>) { return make_status_or(MakeResponse(r)); });
        });

    EXPECT_CALL(barrier, Call(2));
    EXPECT_CALL(*mock, AsyncPublish)
        .WillOnce([&](google::pubsub::v1::PublishRequest const& r) {
          EXPECT_THAT(r, IsProtoEqual(MakeRequest(1)));
          return sequencer.PushBack().then(
              [r](future<void>) { return make_status_or(MakeResponse(r)); });
        });
  }

  auto checkpoint = barrier.AsStdFunction();
  auto uut = SequentialBatchSink::Create(mock);
  // We only expect the first AsyncPublish() to be called.
  checkpoint(0);
  auto f1 = uut->AsyncPublish(MakeRequest(3));
  auto f2 = uut->AsyncPublish(MakeRequest(2));
  auto f3 = uut->AsyncPublish(MakeRequest(1));
  // The first batch is in flight, the other two are waiting behind it.
  EXPECT_EQ(2, uut->QueueDepth());

  // Completing the first batch releases the second one to the mock.
  checkpoint(1);
  sequencer.PopFront().set_value();
  auto r1 = f1.get();
  ASSERT_THAT(r1, StatusIs(StatusCode::kOk));
  EXPECT_THAT(*r1, IsProtoEqual(MakeResponse(MakeRequest(3))));
  EXPECT_EQ(1, uut->QueueDepth());

  // Completing the second batch releases the third one to the mock.
  checkpoint(2);
  sequencer.PopFront().set_value();
  auto r2 = f2.get();
  ASSERT_THAT(r2, StatusIs(StatusCode::kOk));
  EXPECT_THAT(*r2, IsProtoEqual(MakeResponse(MakeRequest(2))));
  EXPECT_EQ(0, uut->QueueDepth());

  // Complete the third batch as well, so its response is verified and no
  // pending operation is left behind when the test objects are destroyed.
  sequencer.PopFront().set_value();
  auto r3 = f3.get();
  ASSERT_THAT(r3, StatusIs(StatusCode::kOk));
  EXPECT_THAT(*r3, IsProtoEqual(MakeResponse(MakeRequest(1))));
  EXPECT_EQ(0, uut->QueueDepth());
}
123 
// Verifies the failure path: when the in-flight batch fails, the batches
// queued behind it and any batches submitted afterwards are completed with
// the same error, without reaching the mock sink. Publishing only resumes
// after ResumePublish() is called.
TEST(DefaultBatchSinkTest, BasicErrorHandling) {
  // Each mocked AsyncPublish() parks a promise here; the test completes them
  // one at a time with PopFront().set_value().
  AsyncSequencer<void> sequencer;

  auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
  {
    ::testing::InSequence sequence;
    EXPECT_CALL(*mock, AsyncPublish)
        .WillOnce([&](google::pubsub::v1::PublishRequest const& r) {
          EXPECT_THAT(r, IsProtoEqual(MakeRequest(2)));
          // The failure does not depend on the request contents, so the
          // continuation does not need its own copy of `r`.
          return sequencer.PushBack().then([](future<void>) {
            return StatusOr<google::pubsub::v1::PublishResponse>(
                Status{StatusCode::kPermissionDenied, "uh-oh"});
          });
        });

    // Only two AsyncPublish() calls are expected on the mock: the failing
    // one above and the one issued after ResumePublish().
    EXPECT_CALL(*mock, ResumePublish);
    EXPECT_CALL(*mock, AsyncPublish)
        .WillOnce([&](google::pubsub::v1::PublishRequest const& r) {
          EXPECT_THAT(r, IsProtoEqual(MakeRequest(2)));
          return sequencer.PushBack().then(
              [r](future<void>) { return make_status_or(MakeResponse(r)); });
        });
  }

  auto uut = SequentialBatchSink::Create(mock);
  auto f1 = uut->AsyncPublish(MakeRequest(2));
  auto f2 = uut->AsyncPublish(MakeRequest(3));
  auto f3 = uut->AsyncPublish(MakeRequest(3));
  // The first batch is in flight, the other two are waiting behind it.
  EXPECT_EQ(2, uut->QueueDepth());

  // Fail the in-flight batch.
  sequencer.PopFront().set_value();
  auto r1 = f1.get();
  ASSERT_THAT(r1, StatusIs(StatusCode::kPermissionDenied));

  // The queued messages should become satisfied with the same error status.
  auto r2 = f2.get();
  ASSERT_THAT(r2, StatusIs(StatusCode::kPermissionDenied));
  auto r3 = f3.get();
  ASSERT_THAT(r3, StatusIs(StatusCode::kPermissionDenied));

  // And new messages should become satisfied with the same error too.
  auto r4 = uut->AsyncPublish(MakeRequest(3)).get();
  ASSERT_THAT(r4, StatusIs(StatusCode::kPermissionDenied));

  // Calling ResumePublish() enables regular messages again.
  uut->ResumePublish(TestOrderingKey());
  auto f5 = uut->AsyncPublish(MakeRequest(2));
  sequencer.PopFront().set_value();
  auto r5 = f5.get();
  ASSERT_THAT(r5, StatusIs(StatusCode::kOk));
  EXPECT_THAT(*r5, IsProtoEqual(MakeResponse(MakeRequest(2))));
}
176 
177 }  // namespace
178 }  // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
179 }  // namespace pubsub_internal
180 }  // namespace cloud
181 }  // namespace google
182