1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "google/cloud/pubsub/publisher.h"
16 #include "google/cloud/pubsub/subscriber.h"
17 #include "google/cloud/pubsub/subscription.h"
18 #include "google/cloud/pubsub/subscription_admin_client.h"
19 #include "google/cloud/pubsub/testing/random_names.h"
20 #include "google/cloud/pubsub/topic_admin_client.h"
21 #include "google/cloud/pubsub/version.h"
22 #include "google/cloud/internal/getenv.h"
23 #include "google/cloud/internal/random.h"
24 #include "google/cloud/testing_util/assert_ok.h"
25 #include "google/cloud/testing_util/status_matchers.h"
26 #include <gmock/gmock.h>
27 #include <map>
28
29 namespace google {
30 namespace cloud {
31 namespace pubsub {
32 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
33 namespace {
34
35 using ::google::cloud::testing_util::StatusIs;
36 using ::testing::AnyOf;
37
TEST(MessageIntegrationTest,PublishPullAck)38 TEST(MessageIntegrationTest, PublishPullAck) {
39 auto project_id =
40 google::cloud::internal::GetEnv("GOOGLE_CLOUD_PROJECT").value_or("");
41 ASSERT_FALSE(project_id.empty());
42
43 auto generator = google::cloud::internal::MakeDefaultPRNG();
44 Topic topic(project_id, pubsub_testing::RandomTopicId(generator));
45 Subscription subscription(project_id,
46 pubsub_testing::RandomSubscriptionId(generator));
47
48 auto topic_admin = TopicAdminClient(MakeTopicAdminConnection());
49 auto subscription_admin =
50 SubscriptionAdminClient(MakeSubscriptionAdminConnection());
51
52 auto topic_metadata = topic_admin.CreateTopic(TopicMutationBuilder(topic));
53 ASSERT_THAT(topic_metadata, AnyOf(StatusIs(StatusCode::kOk),
54 StatusIs(StatusCode::kAlreadyExists)));
55
56 struct Cleanup {
57 std::function<void()> action;
58 explicit Cleanup(std::function<void()> a) : action(std::move(a)) {}
59 ~Cleanup() { action(); }
60 };
61 Cleanup cleanup_topic(
62 [topic_admin, &topic]() mutable { topic_admin.DeleteTopic(topic); });
63
64 auto subscription_metadata = subscription_admin.CreateSubscription(
65 topic, subscription,
66 SubscriptionMutationBuilder{}.set_ack_deadline(std::chrono::seconds(10)));
67 ASSERT_THAT(
68 subscription_metadata,
69 AnyOf(StatusIs(StatusCode::kOk), StatusIs(StatusCode::kAlreadyExists)));
70
71 auto publisher = Publisher(MakePublisherConnection(topic, {}));
72 auto subscriber = Subscriber(MakeSubscriberConnection());
73
74 std::mutex mu;
75 std::map<std::string, int> ids;
76 for (auto const* data : {"message-0", "message-1", "message-2"}) {
77 auto response =
78 publisher.Publish(MessageBuilder{}.SetData(data).Build()).get();
79 EXPECT_STATUS_OK(response);
80 if (response) {
81 std::lock_guard<std::mutex> lk(mu);
82 ids.emplace(*std::move(response), 0);
83 }
84 }
85 EXPECT_FALSE(ids.empty());
86
87 promise<void> ids_empty;
88 auto handler = [&](pubsub::Message const& m, AckHandler h) {
89 SCOPED_TRACE("Search for message " + m.message_id());
90 std::unique_lock<std::mutex> lk(mu);
91 auto i = ids.find(m.message_id());
92 // Remember that Cloud Pub/Sub has "at least once" semantics, so a dup is
93 // perfectly possible, in that case the message would not be in the map of
94 // of pending ids.
95 if (i == ids.end()) return;
96 // The first time just NACK the message to exercise that path, we expect
97 // Cloud Pub/Sub to retry.
98 if (i->second == 0) {
99 std::move(h).nack();
100 ++i->second;
101 return;
102 }
103 ids.erase(i);
104 if (ids.empty()) ids_empty.set_value();
105 lk.unlock();
106 std::move(h).ack();
107 };
108
109 auto result = subscriber.Subscribe(subscription, handler);
110 // Wait until there are no more ids pending, then cancel the subscription and
111 // get its status.
112 ids_empty.get_future().get();
113 result.cancel();
114 EXPECT_STATUS_OK(result.get());
115
116 auto delete_response = subscription_admin.DeleteSubscription(subscription);
117 EXPECT_THAT(delete_response, AnyOf(StatusIs(StatusCode::kOk),
118 StatusIs(StatusCode::kNotFound)));
119 }
120
121 } // namespace
122 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
123 } // namespace pubsub
124 } // namespace cloud
125 } // namespace google
126