1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/pubsub/publisher.h"
16 #include "google/cloud/pubsub/subscriber.h"
17 #include "google/cloud/pubsub/subscription.h"
18 #include "google/cloud/pubsub/subscription_admin_client.h"
19 #include "google/cloud/pubsub/testing/random_names.h"
20 #include "google/cloud/pubsub/topic_admin_client.h"
21 #include "google/cloud/pubsub/version.h"
22 #include "google/cloud/internal/getenv.h"
23 #include "google/cloud/internal/random.h"
24 #include "google/cloud/testing_util/assert_ok.h"
25 #include "google/cloud/testing_util/status_matchers.h"
26 #include <gmock/gmock.h>
27 #include <map>
28 
29 namespace google {
30 namespace cloud {
31 namespace pubsub {
32 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
33 namespace {
34 
35 using ::google::cloud::testing_util::StatusIs;
36 using ::testing::AnyOf;
37 
TEST(MessageIntegrationTest,PublishPullAck)38 TEST(MessageIntegrationTest, PublishPullAck) {
39   auto project_id =
40       google::cloud::internal::GetEnv("GOOGLE_CLOUD_PROJECT").value_or("");
41   ASSERT_FALSE(project_id.empty());
42 
43   auto generator = google::cloud::internal::MakeDefaultPRNG();
44   Topic topic(project_id, pubsub_testing::RandomTopicId(generator));
45   Subscription subscription(project_id,
46                             pubsub_testing::RandomSubscriptionId(generator));
47 
48   auto topic_admin = TopicAdminClient(MakeTopicAdminConnection());
49   auto subscription_admin =
50       SubscriptionAdminClient(MakeSubscriptionAdminConnection());
51 
52   auto topic_metadata = topic_admin.CreateTopic(TopicMutationBuilder(topic));
53   ASSERT_THAT(topic_metadata, AnyOf(StatusIs(StatusCode::kOk),
54                                     StatusIs(StatusCode::kAlreadyExists)));
55 
56   struct Cleanup {
57     std::function<void()> action;
58     explicit Cleanup(std::function<void()> a) : action(std::move(a)) {}
59     ~Cleanup() { action(); }
60   };
61   Cleanup cleanup_topic(
62       [topic_admin, &topic]() mutable { topic_admin.DeleteTopic(topic); });
63 
64   auto subscription_metadata = subscription_admin.CreateSubscription(
65       topic, subscription,
66       SubscriptionMutationBuilder{}.set_ack_deadline(std::chrono::seconds(10)));
67   ASSERT_THAT(
68       subscription_metadata,
69       AnyOf(StatusIs(StatusCode::kOk), StatusIs(StatusCode::kAlreadyExists)));
70 
71   auto publisher = Publisher(MakePublisherConnection(topic, {}));
72   auto subscriber = Subscriber(MakeSubscriberConnection());
73 
74   std::mutex mu;
75   std::map<std::string, int> ids;
76   for (auto const* data : {"message-0", "message-1", "message-2"}) {
77     auto response =
78         publisher.Publish(MessageBuilder{}.SetData(data).Build()).get();
79     EXPECT_STATUS_OK(response);
80     if (response) {
81       std::lock_guard<std::mutex> lk(mu);
82       ids.emplace(*std::move(response), 0);
83     }
84   }
85   EXPECT_FALSE(ids.empty());
86 
87   promise<void> ids_empty;
88   auto handler = [&](pubsub::Message const& m, AckHandler h) {
89     SCOPED_TRACE("Search for message " + m.message_id());
90     std::unique_lock<std::mutex> lk(mu);
91     auto i = ids.find(m.message_id());
92     // Remember that Cloud Pub/Sub has "at least once" semantics, so a dup is
93     // perfectly possible, in that case the message would not be in the map of
94     // of pending ids.
95     if (i == ids.end()) return;
96     // The first time just NACK the message to exercise that path, we expect
97     // Cloud Pub/Sub to retry.
98     if (i->second == 0) {
99       std::move(h).nack();
100       ++i->second;
101       return;
102     }
103     ids.erase(i);
104     if (ids.empty()) ids_empty.set_value();
105     lk.unlock();
106     std::move(h).ack();
107   };
108 
109   auto result = subscriber.Subscribe(subscription, handler);
110   // Wait until there are no more ids pending, then cancel the subscription and
111   // get its status.
112   ids_empty.get_future().get();
113   result.cancel();
114   EXPECT_STATUS_OK(result.get());
115 
116   auto delete_response = subscription_admin.DeleteSubscription(subscription);
117   EXPECT_THAT(delete_response, AnyOf(StatusIs(StatusCode::kOk),
118                                      StatusIs(StatusCode::kNotFound)));
119 }
120 
121 }  // namespace
122 }  // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
123 }  // namespace pubsub
124 }  // namespace cloud
125 }  // namespace google
126