1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/pubsub/snapshot_builder.h"
16 #include "google/cloud/pubsub/subscription.h"
17 #include "google/cloud/pubsub/subscription_admin_client.h"
18 #include "google/cloud/pubsub/testing/random_names.h"
19 #include "google/cloud/pubsub/testing/test_retry_policies.h"
20 #include "google/cloud/pubsub/topic_admin_client.h"
21 #include "google/cloud/pubsub/version.h"
22 #include "google/cloud/internal/getenv.h"
23 #include "google/cloud/internal/random.h"
24 #include "google/cloud/testing_util/assert_ok.h"
25 #include "google/cloud/testing_util/is_proto_equal.h"
26 #include "google/cloud/testing_util/scoped_environment.h"
27 #include "google/cloud/testing_util/status_matchers.h"
28 #include <gmock/gmock.h>
29 
30 namespace google {
31 namespace cloud {
32 namespace pubsub {
33 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
34 namespace {
35 
36 using ::google::cloud::pubsub_testing::TestBackoffPolicy;
37 using ::google::cloud::pubsub_testing::TestRetryPolicy;
38 using ::google::cloud::testing_util::IsProtoEqual;
39 using ::google::cloud::testing_util::ScopedEnvironment;
40 using ::google::cloud::testing_util::StatusIs;
41 using ::testing::AnyOf;
42 using ::testing::Contains;
43 using ::testing::Not;
44 
UsingEmulator()45 bool UsingEmulator() {
46   return google::cloud::internal::GetEnv("PUBSUB_EMULATOR_HOST").has_value();
47 }
48 
TEST(SubscriptionAdminIntegrationTest,SubscriptionCRUD)49 TEST(SubscriptionAdminIntegrationTest, SubscriptionCRUD) {
50   auto project_id =
51       google::cloud::internal::GetEnv("GOOGLE_CLOUD_PROJECT").value_or("");
52   ASSERT_FALSE(project_id.empty());
53 
54   auto subscription_names = [](SubscriptionAdminClient client,
55                                std::string const& project_id) {
56     std::vector<std::string> names;
57     for (auto& subscription : client.ListSubscriptions(project_id)) {
58       EXPECT_STATUS_OK(subscription);
59       if (!subscription) break;
60       names.push_back(subscription->name());
61     }
62     return names;
63   };
64 
65   auto snapshot_names = [](SubscriptionAdminClient client,
66                            std::string const& project_id) {
67     std::vector<std::string> names;
68     for (auto& snapshot : client.ListSnapshots(project_id)) {
69       EXPECT_STATUS_OK(snapshot);
70       if (!snapshot) break;
71       names.push_back(snapshot->name());
72     }
73     return names;
74   };
75 
76   auto generator = google::cloud::internal::MakeDefaultPRNG();
77   Topic topic(project_id, pubsub_testing::RandomTopicId(generator));
78   Subscription subscription(project_id,
79                             pubsub_testing::RandomSubscriptionId(generator));
80 
81   auto topic_admin = TopicAdminClient(MakeTopicAdminConnection());
82   auto subscription_admin =
83       SubscriptionAdminClient(MakeSubscriptionAdminConnection());
84 
85   EXPECT_THAT(subscription_names(subscription_admin, project_id),
86               Not(Contains(subscription.FullName())));
87 
88   auto topic_metadata = topic_admin.CreateTopic(TopicBuilder(topic));
89   ASSERT_THAT(topic_metadata, AnyOf(StatusIs(StatusCode::kOk),
90                                     StatusIs(StatusCode::kAlreadyExists)));
91 
92   struct Cleanup {
93     std::function<void()> action;
94     explicit Cleanup(std::function<void()> a) : action(std::move(a)) {}
95     ~Cleanup() { action(); }
96   };
97   Cleanup cleanup_topic{
98       [&topic_admin, &topic] { topic_admin.DeleteTopic(topic); }};
99 
100   auto endpoint = "https://" + project_id + ".appspot.com/push";
101   auto create_response = subscription_admin.CreateSubscription(
102       topic, subscription,
103       SubscriptionBuilder{}.set_push_config(
104           PushConfigBuilder{}.set_push_endpoint(endpoint)));
105   ASSERT_THAT(create_response, AnyOf(StatusIs(StatusCode::kOk),
106                                      StatusIs(StatusCode::kAlreadyExists)));
107 
108   auto get_response = subscription_admin.GetSubscription(subscription);
109   ASSERT_STATUS_OK(get_response);
110   // We cannot compare the full protos because for push configs `Create...()`
111   // returns less information than `Get` :shrug:
112   EXPECT_EQ(create_response->name(), get_response->name());
113 
114   auto constexpr kTestDeadlineSeconds = 20;
115   auto update_response = subscription_admin.UpdateSubscription(
116       subscription, SubscriptionBuilder{}.set_ack_deadline(
117                         std::chrono::seconds(kTestDeadlineSeconds)));
118   ASSERT_STATUS_OK(update_response);
119   EXPECT_EQ(kTestDeadlineSeconds, update_response->ack_deadline_seconds());
120 
121   EXPECT_THAT(subscription_names(subscription_admin, project_id),
122               Contains(subscription.FullName()));
123 
124   auto modify_push_config_response = subscription_admin.ModifyPushSubscription(
125       subscription, PushConfigBuilder{});
126   EXPECT_STATUS_OK(modify_push_config_response);
127 
128   auto const topic_subscriptions = [&] {
129     std::vector<std::string> names;
130     for (auto& name : topic_admin.ListTopicSubscriptions(topic)) {
131       EXPECT_STATUS_OK(name);
132       names.push_back(std::move(*name));
133     }
134     return names;
135   }();
136   EXPECT_THAT(topic_subscriptions, Contains(subscription.FullName()));
137 
138   // To create snapshots we need at least one subscription, so we test those
139   // here too.
140   // TODO(#4792) - cannot test server-side assigned names, the emulator lacks
141   //    support for them.
142   Snapshot snapshot(project_id, pubsub_testing::RandomSnapshotId(generator));
143   auto create_snapshot_response =
144       subscription_admin.CreateSnapshot(subscription, snapshot);
145   ASSERT_STATUS_OK(create_snapshot_response);
146   EXPECT_EQ(snapshot.FullName(), create_snapshot_response->name());
147 
148   auto const topic_snapshots = [&] {
149     std::vector<std::string> names;
150     for (auto& name : topic_admin.ListTopicSnapshots(topic)) {
151       EXPECT_STATUS_OK(name);
152       names.push_back(std::move(*name));
153     }
154     return names;
155   }();
156   EXPECT_THAT(topic_snapshots, Contains(snapshot.FullName()));
157 
158   auto get_snapshot_response = subscription_admin.GetSnapshot(snapshot);
159   ASSERT_STATUS_OK(get_snapshot_response);
160   EXPECT_THAT(*get_snapshot_response, IsProtoEqual(*create_snapshot_response));
161 
162   // TODO(#4792) - the emulator does not support UpdateSnapshot()
163   if (!UsingEmulator()) {
164     auto update_snapshot_response = subscription_admin.UpdateSnapshot(
165         snapshot, SnapshotBuilder{}.add_label("test-label", "test-value"));
166     ASSERT_STATUS_OK(update_snapshot_response);
167     EXPECT_FALSE(update_snapshot_response->labels().empty());
168   }
169 
170   auto seek_response = subscription_admin.Seek(subscription, snapshot);
171   EXPECT_STATUS_OK(seek_response);
172 
173   EXPECT_THAT(snapshot_names(subscription_admin, project_id),
174               Contains(snapshot.FullName()));
175   auto delete_snapshot = subscription_admin.DeleteSnapshot(snapshot);
176   EXPECT_STATUS_OK(delete_snapshot);
177   EXPECT_THAT(snapshot_names(subscription_admin, project_id),
178               Not(Contains(snapshot.FullName())));
179 
180   // TODO(#4792) - the emulator does not support DetachSubscription()
181   if (!UsingEmulator()) {
182     auto detach_response = topic_admin.DetachSubscription(subscription);
183     ASSERT_STATUS_OK(detach_response);
184   }
185 
186   auto delete_response = subscription_admin.DeleteSubscription(subscription);
187   EXPECT_THAT(delete_response, AnyOf(StatusIs(StatusCode::kOk),
188                                      StatusIs(StatusCode::kNotFound)));
189 
190   EXPECT_THAT(subscription_names(subscription_admin, project_id),
191               Not(Contains(subscription.FullName())));
192 }
193 
TEST(SubscriptionAdminIntegrationTest,CreateSubscriptionFailure)194 TEST(SubscriptionAdminIntegrationTest, CreateSubscriptionFailure) {
195   // Use an invalid endpoint to force a connection error.
196   ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
197   auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
198       {}, TestRetryPolicy(), TestBackoffPolicy()));
199   auto create_response = client.CreateSubscription(
200       Topic("--invalid-project--", "--invalid-topic--"),
201       Subscription("--invalid-project--", "--invalid-subscription--"));
202   ASSERT_FALSE(create_response.ok());
203 }
204 
TEST(SubscriptionAdminIntegrationTest,GetSubscriptionFailure)205 TEST(SubscriptionAdminIntegrationTest, GetSubscriptionFailure) {
206   // Use an invalid endpoint to force a connection error.
207   ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
208   auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
209       {}, TestRetryPolicy(), TestBackoffPolicy()));
210   auto create_response = client.GetSubscription(
211       Subscription("--invalid-project--", "--invalid-subscription--"));
212   ASSERT_FALSE(create_response.ok());
213 }
214 
TEST(SubscriptionAdminIntegrationTest,UpdateSubscriptionFailure)215 TEST(SubscriptionAdminIntegrationTest, UpdateSubscriptionFailure) {
216   // Use an invalid endpoint to force a connection error.
217   ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
218   auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
219       {}, TestRetryPolicy(), TestBackoffPolicy()));
220   auto create_response = client.UpdateSubscription(
221       Subscription("--invalid-project--", "--invalid-subscription--"),
222       SubscriptionBuilder{}.set_ack_deadline(std::chrono::seconds(20)));
223   ASSERT_FALSE(create_response.ok());
224 }
225 
TEST(SubscriptionAdminIntegrationTest,ListSubscriptionsFailure)226 TEST(SubscriptionAdminIntegrationTest, ListSubscriptionsFailure) {
227   // Use an invalid endpoint to force a connection error.
228   ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
229   auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
230       {}, TestRetryPolicy(), TestBackoffPolicy()));
231   auto list = client.ListSubscriptions("--invalid-project--");
232   auto i = list.begin();
233   EXPECT_FALSE(i == list.end());
234   EXPECT_FALSE(*i);
235 }
236 
TEST(SubscriptionAdminIntegrationTest,DeleteSubscriptionFailure)237 TEST(SubscriptionAdminIntegrationTest, DeleteSubscriptionFailure) {
238   // Use an invalid endpoint to force a connection error.
239   ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
240   auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
241       {}, TestRetryPolicy(), TestBackoffPolicy()));
242   auto delete_response = client.DeleteSubscription(
243       Subscription("--invalid-project--", "--invalid-subscription--"));
244   ASSERT_FALSE(delete_response.ok());
245 }
246 
TEST(SubscriptionAdminIntegrationTest,ModifyPushConfigFailure)247 TEST(SubscriptionAdminIntegrationTest, ModifyPushConfigFailure) {
248   // Use an invalid endpoint to force a connection error.
249   ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
250   auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
251       {}, TestRetryPolicy(), TestBackoffPolicy()));
252   auto delete_response = client.ModifyPushSubscription(
253       Subscription("--invalid-project--", "--invalid-subscription--"),
254       PushConfigBuilder{});
255   ASSERT_FALSE(delete_response.ok());
256 }
257 
TEST(SubscriptionAdminIntegrationTest,CreateSnapshotFailure)258 TEST(SubscriptionAdminIntegrationTest, CreateSnapshotFailure) {
259   // Use an invalid endpoint to force a connection error.
260   ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
261   auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
262       {}, TestRetryPolicy(), TestBackoffPolicy()));
263   auto response = client.CreateSnapshot(
264       Subscription("--invalid-project--", "--invalid-subscription--"));
265   ASSERT_FALSE(response.ok());
266 }
267 
TEST(SubscriptionAdminIntegrationTest,GetSnapshotFailure)268 TEST(SubscriptionAdminIntegrationTest, GetSnapshotFailure) {
269   // Use an invalid endpoint to force a connection error.
270   ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
271   auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
272       {}, TestRetryPolicy(), TestBackoffPolicy()));
273   auto response = client.GetSnapshot(
274       Snapshot("--invalid-project--", "--invalid-snapshot--"));
275   ASSERT_FALSE(response.ok());
276 }
277 
TEST(SubscriptionAdminIntegrationTest,ListSnapshotsFailure)278 TEST(SubscriptionAdminIntegrationTest, ListSnapshotsFailure) {
279   // Use an invalid endpoint to force a connection error.
280   ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
281   auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
282       {}, TestRetryPolicy(), TestBackoffPolicy()));
283   auto list = client.ListSnapshots("--invalid-project--");
284   auto i = list.begin();
285   EXPECT_FALSE(i == list.end());
286   EXPECT_FALSE(*i);
287 }
288 
TEST(SubscriptionAdminIntegrationTest,UpdateSnapshotFailure)289 TEST(SubscriptionAdminIntegrationTest, UpdateSnapshotFailure) {
290   // Use an invalid endpoint to force a connection error.
291   ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
292   auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
293       {}, TestRetryPolicy(), TestBackoffPolicy()));
294   auto response = client.UpdateSnapshot(
295       Snapshot("--invalid-project--", "--invalid-snapshot--"),
296       SnapshotBuilder{}.clear_labels());
297   ASSERT_FALSE(response.ok());
298 }
299 
TEST(SubscriptionAdminIntegrationTest,DeleteSnapshotFailure)300 TEST(SubscriptionAdminIntegrationTest, DeleteSnapshotFailure) {
301   // Use an invalid endpoint to force a connection error.
302   ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
303   auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
304       {}, TestRetryPolicy(), TestBackoffPolicy()));
305   auto response = client.DeleteSnapshot(
306       Snapshot("--invalid-project--", "--invalid-snapshot--"));
307   ASSERT_FALSE(response.ok());
308 }
309 
TEST(SubscriptionAdminIntegrationTest,SeekFailureTimestamp)310 TEST(SubscriptionAdminIntegrationTest, SeekFailureTimestamp) {
311   // Use an invalid endpoint to force a connection error.
312   ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
313   auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
314       {}, TestRetryPolicy(), TestBackoffPolicy()));
315   auto response = client.Seek(
316       Subscription("--invalid-project--", "--invalid-subscription--"),
317       std::chrono::system_clock::now());
318   ASSERT_FALSE(response.ok());
319 }
320 
TEST(SubscriptionAdminIntegrationTest,SeekFailureSnapshot)321 TEST(SubscriptionAdminIntegrationTest, SeekFailureSnapshot) {
322   // Use an invalid endpoint to force a connection error.
323   ScopedEnvironment env("PUBSUB_EMULATOR_HOST", "localhost:1");
324   auto client = SubscriptionAdminClient(MakeSubscriptionAdminConnection(
325       {}, TestRetryPolicy(), TestBackoffPolicy()));
326   auto response = client.Seek(
327       Subscription("--invalid-project--", "--invalid-subscription--"),
328       Snapshot("--invalid-project--", "--invalid-snapshot--"));
329   ASSERT_FALSE(response.ok());
330 }
331 
332 }  // namespace
333 }  // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
334 }  // namespace pubsub
335 }  // namespace cloud
336 }  // namespace google
337