1 // Copyright 2018 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/storage/client.h"
16 #include "google/cloud/storage/testing/storage_integration_test.h"
17 #include "google/cloud/internal/getenv.h"
18 #include "google/cloud/internal/random.h"
19 #include "google/cloud/log.h"
20 #include "google/cloud/testing_util/assert_ok.h"
21 #include <gmock/gmock.h>
22 #include <future>
23 #include <thread>
24 
25 namespace google {
26 namespace cloud {
27 namespace storage {
28 inline namespace STORAGE_CLIENT_NS {
29 namespace {
30 using ObjectNameList = std::vector<std::string>;
31 
32 class ThreadIntegrationTest
33     : public google::cloud::storage::testing::StorageIntegrationTest {
34  public:
CreateObjects(std::string const & bucket_name,ObjectNameList const & group,std::string const & contents)35   static void CreateObjects(std::string const& bucket_name,
36                             ObjectNameList const& group,
37                             std::string const& contents) {
38     // Create our own client so no state is shared with the other threads.
39     StatusOr<Client> client = MakeIntegrationTestClient();
40     ASSERT_STATUS_OK(client);
41     for (auto const& object_name : group) {
42       StatusOr<ObjectMetadata> meta = client->InsertObject(
43           bucket_name, object_name, contents, IfGenerationMatch(0));
44       ASSERT_STATUS_OK(meta);
45     }
46   }
47 
DeleteObjects(std::string const & bucket_name,ObjectNameList const & group)48   static void DeleteObjects(std::string const& bucket_name,
49                             ObjectNameList const& group) {
50     // Create our own client so no state is shared with the other threads.
51     StatusOr<Client> client = MakeIntegrationTestClient();
52     ASSERT_STATUS_OK(client);
53     for (auto const& object_name : group) {
54       (void)client->DeleteObject(bucket_name, object_name);
55     }
56   }
57 
58  protected:
SetUp()59   void SetUp() override {
60     project_id_ =
61         google::cloud::internal::GetEnv("GOOGLE_CLOUD_PROJECT").value_or("");
62     ASSERT_FALSE(project_id_.empty());
63     region_id_ = google::cloud::internal::GetEnv(
64                      "GOOGLE_CLOUD_CPP_STORAGE_TEST_REGION_ID")
65                      .value_or("");
66     ASSERT_FALSE(region_id_.empty());
67   }
68 
69   std::string project_id_;
70   std::string region_id_;
71 };
72 
73 /**
74  * Divides @p source in to @p count groups of approximately equal size.
75  */
DivideIntoEqualSizedGroups(ObjectNameList const & source,unsigned int count)76 std::vector<ObjectNameList> DivideIntoEqualSizedGroups(
77     ObjectNameList const& source, unsigned int count) {
78   std::vector<ObjectNameList> groups;
79   groups.reserve(count);
80   auto div = source.size() / count;
81   auto rem = source.size() % count;
82   // begin points to the beginning of the next chunk, it is incremented inside
83   // the loop by the number of elements.
84   std::size_t size;
85   for (auto begin = source.begin(); begin != source.end(); begin += size) {
86     size = div;
87     if (rem != 0) {
88       size++;
89       rem--;
90     }
91     auto remaining =
92         static_cast<std::size_t>(std::distance(begin, source.end()));
93     if (remaining < size) {
94       size = remaining;
95     }
96     auto end = begin;
97     std::advance(end, size);
98     groups.emplace_back(ObjectNameList(begin, end));
99   }
100   return groups;
101 }
102 
103 }  // anonymous namespace
104 
TEST_F(ThreadIntegrationTest,Unshared)105 TEST_F(ThreadIntegrationTest, Unshared) {
106   std::string bucket_name =
107       MakeRandomBucketName(/*prefix=*/"cloud-cpp-testing-");
108   auto bucket_client = MakeBucketIntegrationTestClient();
109   ASSERT_STATUS_OK(bucket_client);
110 
111   StatusOr<Client> client = MakeIntegrationTestClient();
112   ASSERT_STATUS_OK(client);
113 
114   StatusOr<BucketMetadata> meta = bucket_client->CreateBucketForProject(
115       bucket_name, project_id_,
116       BucketMetadata()
117           .set_storage_class(storage_class::Standard())
118           .set_location(region_id_)
119           .disable_versioning(),
120       PredefinedAcl("private"), PredefinedDefaultObjectAcl("projectPrivate"),
121       Projection("full"));
122   ASSERT_STATUS_OK(meta);
123   EXPECT_EQ(bucket_name, meta->name());
124 
125   constexpr int kObjectCount = 2000;
126   std::vector<std::string> objects;
127   objects.reserve(kObjectCount);
128   std::generate_n(std::back_inserter(objects), kObjectCount,
129                   [this] { return MakeRandomObjectName(); });
130 
131   unsigned int thread_count = std::thread::hardware_concurrency();
132   if (thread_count == 0) {
133     thread_count = 4;
134   }
135 
136   auto groups = DivideIntoEqualSizedGroups(objects, thread_count);
137   std::vector<std::future<void>> tasks;
138   tasks.reserve(groups.size());
139   for (auto const& g : groups) {
140     tasks.emplace_back(std::async(std::launch::async,
141                                   &ThreadIntegrationTest::CreateObjects,
142                                   bucket_name, g, LoremIpsum()));
143   }
144   for (auto& t : tasks) {
145     t.get();
146   }
147 
148   tasks.clear();
149   tasks.reserve(groups.size());
150   for (auto const& g : groups) {
151     tasks.emplace_back(std::async(std::launch::async,
152                                   &ThreadIntegrationTest::DeleteObjects,
153                                   bucket_name, g));
154   }
155   for (auto& t : tasks) {
156     t.get();
157   }
158 
159   auto delete_status = bucket_client->DeleteBucket(bucket_name);
160   ASSERT_STATUS_OK(delete_status);
161   // This is basically a smoke test, if the test does not crash it was
162   // successful.
163 }
164 
165 class CaptureSendHeaderBackend : public LogBackend {
166  public:
167   std::vector<std::string> log_lines;
168 
Process(LogRecord const & lr)169   void Process(LogRecord const& lr) override {
170     // Break the records in lines, because we will analyze the output per line.
171     std::istringstream is(lr.message);
172     std::string line;
173     while (std::getline(is, line)) {
174       log_lines.emplace_back(line);
175     }
176   }
177 
ProcessWithOwnership(LogRecord lr)178   void ProcessWithOwnership(LogRecord lr) override { Process(lr); }
179 };
180 
TEST_F(ThreadIntegrationTest,ReuseConnections)181 TEST_F(ThreadIntegrationTest, ReuseConnections) {
182   auto log_backend = std::make_shared<CaptureSendHeaderBackend>();
183 
184   auto client_options = ClientOptions::CreateDefaultClientOptions();
185   ASSERT_STATUS_OK(client_options);
186   Client client((*client_options)
187                     .set_enable_raw_client_tracing(true)
188                     .set_enable_http_tracing(true));
189 
190   std::string bucket_name =
191       MakeRandomBucketName(/*prefix=*/"cloud-cpp-testing-");
192 
193   auto id = LogSink::Instance().AddBackend(log_backend);
194   StatusOr<BucketMetadata> meta = client.CreateBucketForProject(
195       bucket_name, project_id_,
196       BucketMetadata()
197           .set_storage_class(storage_class::Standard())
198           .set_location(region_id_)
199           .disable_versioning(),
200       PredefinedAcl("private"), PredefinedDefaultObjectAcl("projectPrivate"),
201       Projection("full"));
202   ASSERT_STATUS_OK(meta);
203   EXPECT_EQ(bucket_name, meta->name());
204 
205   constexpr int kObjectCount = 100;
206   std::vector<std::string> objects;
207   objects.reserve(kObjectCount);
208   std::generate_n(std::back_inserter(objects), kObjectCount,
209                   [this] { return MakeRandomObjectName(); });
210 
211   std::vector<std::chrono::steady_clock::duration> create_elapsed;
212   std::vector<std::chrono::steady_clock::duration> delete_elapsed;
213   for (auto const& name : objects) {
214     auto start = std::chrono::steady_clock::now();
215     StatusOr<ObjectMetadata> meta = client.InsertObject(
216         bucket_name, name, LoremIpsum(), IfGenerationMatch(0));
217     ASSERT_STATUS_OK(meta);
218     create_elapsed.emplace_back(std::chrono::steady_clock::now() - start);
219   }
220   for (auto const& name : objects) {
221     auto start = std::chrono::steady_clock::now();
222     (void)client.DeleteObject(bucket_name, name);
223     delete_elapsed.emplace_back(std::chrono::steady_clock::now() - start);
224   }
225   LogSink::Instance().RemoveBackend(id);
226   auto delete_status = client.DeleteBucket(bucket_name);
227   ASSERT_STATUS_OK(delete_status);
228 
229   std::set<std::string> connected;
230   std::copy_if(
231       log_backend->log_lines.begin(), log_backend->log_lines.end(),
232       std::inserter(connected, connected.end()), [](std::string const& line) {
233         // libcurl prints established connections using this format:
234         //   Connected to <hostname> (<ipaddress>) port <num> (#<connection>)
235         // We capturing the different lines in that form tells us how many
236         // different connections were used.
237         return line.find("== curl(Info): Connected to ") != std::string::npos;
238       });
239   // We expect that at most 5% of the requests required a new connection,
240   // ideally it should be 1 connection, but anything small is acceptable. Recall
241   // that we make two requests per connection, so:
242   std::size_t max_expected_connections = kObjectCount * 2 * 5 / 100;
243   EXPECT_GE(max_expected_connections, connected.size()) << [&log_backend] {
244     return std::accumulate(log_backend->log_lines.begin(),
245                            log_backend->log_lines.end(), std::string{},
246                            [](std::string const& x, std::string const& y) {
247                              return x + "\n" + y;
248                            });
249   }();
250   // Zero connections indicates a bug in the test, the client should have
251   // connected at least once.
252   EXPECT_LT(0U, connected.size());
253 }
254 
255 }  // namespace STORAGE_CLIENT_NS
256 }  // namespace storage
257 }  // namespace cloud
258 }  // namespace google
259