1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "google/cloud/pubsub/internal/sequential_batch_sink.h"
16 #include "google/cloud/pubsub/testing/mock_batch_sink.h"
17 #include "google/cloud/pubsub/testing/test_retry_policies.h"
18 #include "google/cloud/pubsub/topic.h"
19 #include "google/cloud/testing_util/async_sequencer.h"
20 #include "google/cloud/testing_util/is_proto_equal.h"
21 #include "google/cloud/testing_util/status_matchers.h"
22 #include <gmock/gmock.h>
23 #include <condition_variable>
24 #include <deque>
25 #include <mutex>
26
27 namespace google {
28 namespace cloud {
29 namespace pubsub_internal {
30 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
31 namespace {
32
33 using ::google::cloud::testing_util::AsyncSequencer;
34 using ::google::cloud::testing_util::IsProtoEqual;
35 using ::google::cloud::testing_util::StatusIs;
36 using ::testing::Unused;
37
TestTopic()38 pubsub::Topic TestTopic() {
39 return pubsub::Topic("test-project", "test-topic");
40 }
41
/// Returns the ordering key assigned to every message built by these tests.
std::string TestOrderingKey() {
  return std::string("test-key");
}
43
MakeRequest(int n)44 google::pubsub::v1::PublishRequest MakeRequest(int n) {
45 google::pubsub::v1::PublishRequest request;
46 request.set_topic(TestTopic().FullName());
47
48 for (int i = 0; i != n; ++i) {
49 auto& m = *request.add_messages();
50 m.set_message_id("message-" + std::to_string(i));
51 m.set_ordering_key(TestOrderingKey());
52 }
53 return request;
54 }
55
MakeResponse(google::pubsub::v1::PublishRequest const & request)56 google::pubsub::v1::PublishResponse MakeResponse(
57 google::pubsub::v1::PublishRequest const& request) {
58 google::pubsub::v1::PublishResponse response;
59 for (auto const& m : request.messages()) {
60 response.add_message_ids("id-" + m.message_id());
61 }
62 return response;
63 }
64
// Verifies the success path: three batches are submitted back to back, and the
// mock sink must receive them one at a time, each only after the previous
// publish has completed. The futures returned to the caller must carry the
// matching responses.
TEST(DefaultBatchSinkTest, BasicNoErrors) {
  // Holds the promises that control when each mocked publish completes.
  AsyncSequencer<void> sequencer;

  // We will use this function to ensure the mock calls happen *after* some
  // code below.
  ::testing::MockFunction<void(int)> barrier;

  // Creates a mock action that checks the incoming request has
  // `message_count` messages and returns a future which is satisfied, with the
  // matching response, once the test releases the next sequencer promise.
  auto expect_batch = [&sequencer](int message_count) {
    return [&sequencer,
            message_count](google::pubsub::v1::PublishRequest const& r) {
      EXPECT_THAT(r, IsProtoEqual(MakeRequest(message_count)));
      return sequencer.PushBack().then(
          [r](future<void>) { return make_status_or(MakeResponse(r)); });
    };
  };

  auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
  {
    // Each publish must be preceded by its own checkpoint; this is what
    // proves the sink does not send a batch before the previous one is done.
    ::testing::InSequence sequence;
    EXPECT_CALL(barrier, Call(0));
    EXPECT_CALL(*mock, AsyncPublish).WillOnce(expect_batch(3));

    EXPECT_CALL(barrier, Call(1));
    EXPECT_CALL(*mock, AsyncPublish).WillOnce(expect_batch(2));

    EXPECT_CALL(barrier, Call(2));
    EXPECT_CALL(*mock, AsyncPublish).WillOnce(expect_batch(1));
  }

  auto checkpoint = barrier.AsStdFunction();
  auto uut = SequentialBatchSink::Create(mock);
  // We only expect the first AsyncPublish() to be called.
  checkpoint(0);
  auto f1 = uut->AsyncPublish(MakeRequest(3));
  auto f2 = uut->AsyncPublish(MakeRequest(2));
  auto f3 = uut->AsyncPublish(MakeRequest(1));
  // The second and third batches are waiting behind the first one.
  EXPECT_EQ(2, uut->QueueDepth());

  // Completing the first publish should release the second batch.
  checkpoint(1);
  sequencer.PopFront().set_value();
  auto r1 = f1.get();
  ASSERT_THAT(r1, StatusIs(StatusCode::kOk));
  EXPECT_THAT(*r1, IsProtoEqual(MakeResponse(MakeRequest(3))));
  EXPECT_EQ(1, uut->QueueDepth());

  // Completing the second publish should release the third batch.
  checkpoint(2);
  sequencer.PopFront().set_value();
  auto r2 = f2.get();
  ASSERT_THAT(r2, StatusIs(StatusCode::kOk));
  EXPECT_THAT(*r2, IsProtoEqual(MakeResponse(MakeRequest(2))));
  EXPECT_EQ(0, uut->QueueDepth());

  // Complete the third publish too, so its promise is not abandoned and the
  // last future is verified like the others.
  sequencer.PopFront().set_value();
  auto r3 = f3.get();
  ASSERT_THAT(r3, StatusIs(StatusCode::kOk));
  EXPECT_THAT(*r3, IsProtoEqual(MakeResponse(MakeRequest(1))));
  EXPECT_EQ(0, uut->QueueDepth());
}
123
// Verifies the failure path: when a publish fails, the batches queued behind
// it and any batches submitted afterwards are rejected with the same status,
// without reaching the mock sink, until ResumePublish() is called.
TEST(DefaultBatchSinkTest, BasicErrorHandling) {
  // Holds the promises that control when each mocked publish completes.
  AsyncSequencer<void> sequencer;

  auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
  {
    ::testing::InSequence sequence;
    // The first batch reaches the sink and fails once its promise is released.
    // The continuation ignores the request, so it captures nothing.
    EXPECT_CALL(*mock, AsyncPublish)
        .WillOnce([&](google::pubsub::v1::PublishRequest const& r) {
          EXPECT_THAT(r, IsProtoEqual(MakeRequest(2)));
          return sequencer.PushBack().then([](future<void>) {
            return StatusOr<google::pubsub::v1::PublishResponse>(
                Status{StatusCode::kPermissionDenied, "uh-oh"});
          });
        });

    // No other AsyncPublish() call is expected before ResumePublish(); the
    // three-message batches submitted below must never reach the mock.
    EXPECT_CALL(*mock, ResumePublish);
    EXPECT_CALL(*mock, AsyncPublish)
        .WillOnce([&](google::pubsub::v1::PublishRequest const& r) {
          EXPECT_THAT(r, IsProtoEqual(MakeRequest(2)));
          return sequencer.PushBack().then(
              [r](future<void>) { return make_status_or(MakeResponse(r)); });
        });
  }

  auto uut = SequentialBatchSink::Create(mock);
  auto f1 = uut->AsyncPublish(MakeRequest(2));
  auto f2 = uut->AsyncPublish(MakeRequest(3));
  auto f3 = uut->AsyncPublish(MakeRequest(3));
  // The second and third batches are waiting behind the first one.
  EXPECT_EQ(2, uut->QueueDepth());

  // Release the first publish; the mock completes it with an error.
  sequencer.PopFront().set_value();
  auto r1 = f1.get();
  ASSERT_THAT(r1, StatusIs(StatusCode::kPermissionDenied));

  // The queued messages should become satisfied with the same error status.
  auto r2 = f2.get();
  ASSERT_THAT(r2, StatusIs(StatusCode::kPermissionDenied));
  auto r3 = f3.get();
  ASSERT_THAT(r3, StatusIs(StatusCode::kPermissionDenied));

  // And new messages should become satisfied with the same error too.
  auto r4 = uut->AsyncPublish(MakeRequest(3)).get();
  ASSERT_THAT(r4, StatusIs(StatusCode::kPermissionDenied));

  // Calling ResumePublish() enables regular messages again.
  uut->ResumePublish(TestOrderingKey());
  auto f5 = uut->AsyncPublish(MakeRequest(2));
  sequencer.PopFront().set_value();
  auto r5 = f5.get();
  ASSERT_THAT(r5, StatusIs(StatusCode::kOk));
  EXPECT_THAT(*r5, IsProtoEqual(MakeResponse(MakeRequest(2))));
}
176
177 } // namespace
178 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
179 } // namespace pubsub_internal
180 } // namespace cloud
181 } // namespace google
182