1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "google/cloud/pubsub/snapshot_builder.h"
16 #include "google/cloud/pubsub/subscription.h"
17 #include "google/cloud/pubsub/subscription_admin_client.h"
18 #include "google/cloud/pubsub/testing/random_names.h"
19 #include "google/cloud/pubsub/testing/test_retry_policies.h"
20 #include "google/cloud/pubsub/topic_admin_client.h"
21 #include "google/cloud/pubsub/version.h"
22 #include "google/cloud/internal/getenv.h"
23 #include "google/cloud/internal/random.h"
24 #include "google/cloud/testing_util/assert_ok.h"
25 #include "google/cloud/testing_util/is_proto_equal.h"
26 #include "google/cloud/testing_util/scoped_environment.h"
27 #include "google/cloud/testing_util/status_matchers.h"
28 #include <gmock/gmock.h>
29
30 namespace google {
31 namespace cloud {
32 namespace pubsub {
33 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
34 namespace {
35
36 using ::google::cloud::pubsub_testing::TestBackoffPolicy;
37 using ::google::cloud::pubsub_testing::TestRetryPolicy;
38 using ::google::cloud::testing_util::IsProtoEqual;
39 using ::google::cloud::testing_util::ScopedEnvironment;
40 using ::google::cloud::testing_util::StatusIs;
41 using ::testing::AnyOf;
42 using ::testing::Contains;
43 using ::testing::Not;
44
UsingEmulator()45 bool UsingEmulator() {
46 return google::cloud::internal::GetEnv("PUBSUB_EMULATOR_HOST").has_value();
47 }
48
TEST(SubscriptionAdminIntegrationTest,SubscriptionCRUD)49 TEST(SubscriptionAdminIntegrationTest, SubscriptionCRUD) {
50 auto project_id =
51 google::cloud::internal::GetEnv("GOOGLE_CLOUD_PROJECT").value_or("");
52 ASSERT_FALSE(project_id.empty());
53
54 auto subscription_names = [](SubscriptionAdminClient client,
55 std::string const& project_id) {
56 std::vector<std::string> names;
57 for (auto& subscription : client.ListSubscriptions(project_id)) {
58 EXPECT_STATUS_OK(subscription);
59 if (!subscription) break;
60 names.push_back(subscription->name());
61 }
62 return names;
63 };
64
65 auto snapshot_names = [](SubscriptionAdminClient client,
66 std::string const& project_id) {
67 std::vector<std::string> names;
68 for (auto& snapshot : client.ListSnapshots(project_id)) {
69 EXPECT_STATUS_OK(snapshot);
70 if (!snapshot) break;
71 names.push_back(snapshot->name());
72 }
73 return names;
74 };
75
76 auto generator = google::cloud::internal::MakeDefaultPRNG();
77 Topic topic(project_id, pubsub_testing::RandomTopicId(generator));
78 Subscription subscription(project_id,
79 pubsub_testing::RandomSubscriptionId(generator));
80
81 auto topic_admin = TopicAdminClient(MakeTopicAdminConnection());
82 auto subscription_admin =
83 SubscriptionAdminClient(MakeSubscriptionAdminConnection());
84
85 EXPECT_THAT(subscription_names(subscription_admin, project_id),
86 Not(Contains(subscription.FullName())));
87
88 auto topic_metadata = topic_admin.CreateTopic(TopicBuilder(topic));
89 ASSERT_THAT(topic_metadata, AnyOf(StatusIs(StatusCode::kOk),
90 StatusIs(StatusCode::kAlreadyExists)));
91
92 struct Cleanup {
93 std::function<void()> action;
94 explicit Cleanup(std::function<void()> a) : action(std::move(a)) {}
95 ~Cleanup() { action(); }
96 };
97 Cleanup cleanup_topic{
98 [&topic_admin, &topic] { topic_admin.DeleteTopic(topic); }};
99
100 auto endpoint = "https://" + project_id + ".appspot.com/push";
101 auto create_response = subscription_admin.CreateSubscription(
102 topic, subscription,
103 SubscriptionBuilder{}.set_push_config(
104 PushConfigBuilder{}.set_push_endpoint(endpoint)));
105 ASSERT_THAT(create_response, AnyOf(StatusIs(StatusCode::kOk),
106 StatusIs(StatusCode::kAlreadyExists)));
107
108 auto get_response = subscription_admin.GetSubscription(subscription);
109 ASSERT_STATUS_OK(get_response);
110 // We cannot compare the full protos because for push configs `Create...()`
111 // returns less information than `Get` :shrug:
112 EXPECT_EQ(create_response->name(), get_response->name());
113
114 auto constexpr kTestDeadlineSeconds = 20;
115 auto update_response = subscription_admin.UpdateSubscription(
116 subscription, SubscriptionBuilder{}.set_ack_deadline(
117 std::chrono::seconds(kTestDeadlineSeconds)));
118 ASSERT_STATUS_OK(update_response);
119 EXPECT_EQ(kTestDeadlineSeconds, update_response->ack_deadline_seconds());
120
121 EXPECT_THAT(subscription_names(subscription_admin, project_id),
122 Contains(subscription.FullName()));
123
124 auto modify_push_config_response = subscription_admin.ModifyPushSubscription(
125 subscription, PushConfigBuilder{});
126 EXPECT_STATUS_OK(modify_push_config_response);
127
128 auto const topic_subscriptions = [&] {
129 std::vector<std::string> names;
130 for (auto& name : topic_admin.ListTopicSubscriptions(topic)) {
131 EXPECT_STATUS_OK(name);
132 names.push_back(std::move(*name));
133 }
134 return names;
135 }();
136 EXPECT_THAT(topic_subscriptions, Contains(subscription.FullName()));
137
138 // To create snapshots we need at least one subscription, so we test those
139 // here too.
140 // TODO(#4792) - cannot test server-side assigned names, the emulator lacks
141 // support for them.
142 Snapshot snapshot(project_id, pubsub_testing::RandomSnapshotId(generator));
143 auto create_snapshot_response =
144 subscription_admin.CreateSnapshot(subscription, snapshot);
145 ASSERT_STATUS_OK(create_snapshot_response);
146 EXPECT_EQ(snapshot.FullName(), create_snapshot_response->name());
147
148 auto const topic_snapshots = [&] {
149 std::vector<std::string> names;
150 for (auto& name : topic_admin.ListTopicSnapshots(topic)) {
151 EXPECT_STATUS_OK(name);
152 names.push_back(std::move(*name));
153 }
154 return names;
155 }();
156 EXPECT_THAT(topic_snapshots, Contains(snapshot.FullName()));
157
158 auto get_snapshot_response = subscription_admin.GetSnapshot(snapshot);
159 ASSERT_STATUS_OK(get_snapshot_response);
160 EXPECT_THAT(*get_snapshot_response, IsProtoEqual(*create_snapshot_response));
161
162 // TODO(#4792) - the emulator does not support UpdateSnapshot()
163 if (!UsingEmulator()) {
164 auto update_snapshot_response = subscription_admin.UpdateSnapshot(
165 snapshot, SnapshotBuilder{}.add_label("test-label", "test-value"));
166 ASSERT_STATUS_OK(update_snapshot_response);
167 EXPECT_FALSE(update_snapshot_response->labels().empty());
168 }
169
170 auto seek_response = subscription_admin.Seek(subscription, snapshot);
171 EXPECT_STATUS_OK(seek_response);
172
173 EXPECT_THAT(snapshot_names(subscription_admin, project_id),
174 Contains(snapshot.FullName()));
175 auto delete_snapshot = subscription_admin.DeleteSnapshot(snapshot);
176 EXPECT_STATUS_OK(delete_snapshot);
177 EXPECT_THAT(snapshot_names(subscription_admin, project_id),
178 Not(Contains(snapshot.FullName())));
179
180 // TODO(#4792) - the emulator does not support DetachSubscription()
181 if (!UsingEmulator()) {
182 auto detach_response = topic_admin.DetachSubscription(subscription);
183 ASSERT_STATUS_OK(detach_response);
184 }
185
186 auto delete_response = subscription_admin.DeleteSubscription(subscription);
187 EXPECT_THAT(delete_response, AnyOf(StatusIs(StatusCode::kOk),
188 StatusIs(StatusCode::kNotFound)));
189
190 EXPECT_THAT(subscription_names(subscription_admin, project_id),
191 Not(Contains(subscription.FullName())));
192 }
193
TEST(SubscriptionAdminIntegrationTest,CreateSubscriptionFailure)194 TEST(SubscriptionAdminIntegrationTest, CreateSubscriptionFailure) {
195 // Use an invalid endpoint to force a connection error.
196 ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
197 auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
198 {}, TestRetryPolicy(), TestBackoffPolicy()));
199 auto create_response = client.CreateSubscription(
200 Topic("--invalid-project--", "--invalid-topic--"),
201 Subscription("--invalid-project--", "--invalid-subscription--"));
202 ASSERT_FALSE(create_response.ok());
203 }
204
TEST(SubscriptionAdminIntegrationTest,GetSubscriptionFailure)205 TEST(SubscriptionAdminIntegrationTest, GetSubscriptionFailure) {
206 // Use an invalid endpoint to force a connection error.
207 ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
208 auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
209 {}, TestRetryPolicy(), TestBackoffPolicy()));
210 auto create_response = client.GetSubscription(
211 Subscription("--invalid-project--", "--invalid-subscription--"));
212 ASSERT_FALSE(create_response.ok());
213 }
214
TEST(SubscriptionAdminIntegrationTest,UpdateSubscriptionFailure)215 TEST(SubscriptionAdminIntegrationTest, UpdateSubscriptionFailure) {
216 // Use an invalid endpoint to force a connection error.
217 ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
218 auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
219 {}, TestRetryPolicy(), TestBackoffPolicy()));
220 auto create_response = client.UpdateSubscription(
221 Subscription("--invalid-project--", "--invalid-subscription--"),
222 SubscriptionBuilder{}.set_ack_deadline(std::chrono::seconds(20)));
223 ASSERT_FALSE(create_response.ok());
224 }
225
TEST(SubscriptionAdminIntegrationTest,ListSubscriptionsFailure)226 TEST(SubscriptionAdminIntegrationTest, ListSubscriptionsFailure) {
227 // Use an invalid endpoint to force a connection error.
228 ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
229 auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
230 {}, TestRetryPolicy(), TestBackoffPolicy()));
231 auto list = client.ListSubscriptions("--invalid-project--");
232 auto i = list.begin();
233 EXPECT_FALSE(i == list.end());
234 EXPECT_FALSE(*i);
235 }
236
TEST(SubscriptionAdminIntegrationTest,DeleteSubscriptionFailure)237 TEST(SubscriptionAdminIntegrationTest, DeleteSubscriptionFailure) {
238 // Use an invalid endpoint to force a connection error.
239 ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
240 auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
241 {}, TestRetryPolicy(), TestBackoffPolicy()));
242 auto delete_response = client.DeleteSubscription(
243 Subscription("--invalid-project--", "--invalid-subscription--"));
244 ASSERT_FALSE(delete_response.ok());
245 }
246
TEST(SubscriptionAdminIntegrationTest,ModifyPushConfigFailure)247 TEST(SubscriptionAdminIntegrationTest, ModifyPushConfigFailure) {
248 // Use an invalid endpoint to force a connection error.
249 ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
250 auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
251 {}, TestRetryPolicy(), TestBackoffPolicy()));
252 auto delete_response = client.ModifyPushSubscription(
253 Subscription("--invalid-project--", "--invalid-subscription--"),
254 PushConfigBuilder{});
255 ASSERT_FALSE(delete_response.ok());
256 }
257
TEST(SubscriptionAdminIntegrationTest,CreateSnapshotFailure)258 TEST(SubscriptionAdminIntegrationTest, CreateSnapshotFailure) {
259 // Use an invalid endpoint to force a connection error.
260 ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
261 auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
262 {}, TestRetryPolicy(), TestBackoffPolicy()));
263 auto response = client.CreateSnapshot(
264 Subscription("--invalid-project--", "--invalid-subscription--"));
265 ASSERT_FALSE(response.ok());
266 }
267
TEST(SubscriptionAdminIntegrationTest,GetSnapshotFailure)268 TEST(SubscriptionAdminIntegrationTest, GetSnapshotFailure) {
269 // Use an invalid endpoint to force a connection error.
270 ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
271 auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
272 {}, TestRetryPolicy(), TestBackoffPolicy()));
273 auto response = client.GetSnapshot(
274 Snapshot("--invalid-project--", "--invalid-snapshot--"));
275 ASSERT_FALSE(response.ok());
276 }
277
TEST(SubscriptionAdminIntegrationTest,ListSnapshotsFailure)278 TEST(SubscriptionAdminIntegrationTest, ListSnapshotsFailure) {
279 // Use an invalid endpoint to force a connection error.
280 ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
281 auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
282 {}, TestRetryPolicy(), TestBackoffPolicy()));
283 auto list = client.ListSnapshots("--invalid-project--");
284 auto i = list.begin();
285 EXPECT_FALSE(i == list.end());
286 EXPECT_FALSE(*i);
287 }
288
TEST(SubscriptionAdminIntegrationTest,UpdateSnapshotFailure)289 TEST(SubscriptionAdminIntegrationTest, UpdateSnapshotFailure) {
290 // Use an invalid endpoint to force a connection error.
291 ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
292 auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
293 {}, TestRetryPolicy(), TestBackoffPolicy()));
294 auto response = client.UpdateSnapshot(
295 Snapshot("--invalid-project--", "--invalid-snapshot--"),
296 SnapshotBuilder{}.clear_labels());
297 ASSERT_FALSE(response.ok());
298 }
299
TEST(SubscriptionAdminIntegrationTest,DeleteSnapshotFailure)300 TEST(SubscriptionAdminIntegrationTest, DeleteSnapshotFailure) {
301 // Use an invalid endpoint to force a connection error.
302 ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
303 auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
304 {}, TestRetryPolicy(), TestBackoffPolicy()));
305 auto response = client.DeleteSnapshot(
306 Snapshot("--invalid-project--", "--invalid-snapshot--"));
307 ASSERT_FALSE(response.ok());
308 }
309
TEST(SubscriptionAdminIntegrationTest,SeekFailureTimestamp)310 TEST(SubscriptionAdminIntegrationTest, SeekFailureTimestamp) {
311 // Use an invalid endpoint to force a connection error.
312 ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
313 auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
314 {}, TestRetryPolicy(), TestBackoffPolicy()));
315 auto response = client.Seek(
316 Subscription("--invalid-project--", "--invalid-subscription--"),
317 std::chrono::system_clock::now());
318 ASSERT_FALSE(response.ok());
319 }
320
TEST(SubscriptionAdminIntegrationTest,SeekFailureSnapshot)321 TEST(SubscriptionAdminIntegrationTest, SeekFailureSnapshot) {
322 // Use an invalid endpoint to force a connection error.
323 ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
324 auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
325 {}, TestRetryPolicy(), TestBackoffPolicy()));
326 auto response = client.Seek(
327 Subscription("--invalid-project--", "--invalid-subscription--"),
328 Snapshot("--invalid-project--", "--invalid-snapshot--"));
329 ASSERT_FALSE(response.ok());
330 }
331
332 } // namespace
333 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
334 } // namespace pubsub
335 } // namespace cloud
336 } // namespace google
337