1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
#include "google/cloud/storage/client.h"
#include "google/cloud/storage/testing/storage_integration_test.h"
#include "google/cloud/internal/getenv.h"
#include "google/cloud/testing_util/assert_ok.h"
#include <gmock/gmock.h>
#include <algorithm>
#include <cstdio>
#include <fstream>
#include <future>
#include <iostream>
#include <mutex>
#include <regex>
#include <string>
#include <thread>
#include <vector>
25 
26 namespace google {
27 namespace cloud {
28 namespace storage {
29 inline namespace STORAGE_CLIENT_NS {
30 namespace {
31 
/// Size argument passed to MakeRandomData() for every test object (16 KiB).
constexpr int kObjectSize = 16 * 1024;
33 
34 class ObjectFileMultiThreadedTest
35     : public google::cloud::storage::testing::StorageIntegrationTest {
36  protected:
SetUp()37   void SetUp() override {
38     bucket_name_ = google::cloud::internal::GetEnv(
39                        "GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME")
40                        .value_or("");
41     ASSERT_FALSE(bucket_name_.empty());
42     auto object_count = google::cloud::internal::GetEnv(
43         "GOOGLE_CLOUD_CPP_STORAGE_TEST_OBJECT_COUNT");
44     if (object_count) object_count_ = std::stoi(*object_count);
45   }
46 
ThreadCount()47   static int ThreadCount() {
48     static int const kCount = [] {
49       auto c = static_cast<int>(std::thread::hardware_concurrency());
50       return c == 0 ? 4 : 4 * c;
51     }();
52     return kCount;
53   }
54 
CreateObjectNames()55   std::vector<std::string> CreateObjectNames() {
56     std::vector<std::string> object_names(object_count_);
57     // Use MakeRandomFilename() because the same name is used for
58     // the destination file.
59     std::generate_n(object_names.begin(), object_names.size(),
60                     [this] { return MakeRandomFilename(); });
61     return object_names;
62   }
63 
CreateSomeObjects(Client client,std::vector<std::string> const & object_names,int thread_count,int modulo)64   Status CreateSomeObjects(Client client,
65                            std::vector<std::string> const& object_names,
66                            int thread_count, int modulo) {
67     auto contents = [this] {
68       std::unique_lock<std::mutex> lk(mu_);
69       return MakeRandomData(kObjectSize);
70     }();
71     int index = 0;
72     for (auto const& n : object_names) {
73       if (index++ % thread_count != modulo) continue;
74       if (modulo == 0) {
75         std::unique_lock<std::mutex> lk(mu_);
76         std::cout << '.' << std::flush;
77       }
78       auto metadata =
79           client.InsertObject(bucket_name_, n, contents, IfGenerationMatch(0));
80       if (!metadata) return metadata.status();
81     }
82     return Status();
83   }
84 
CreateObjects(Client const & client,std::vector<std::string> const & object_names)85   void CreateObjects(Client const& client,
86                      std::vector<std::string> const& object_names) {
87     // Parallelize the object creation too because it can be slow.
88     int const thread_count = ThreadCount();
89     auto create_some_objects = [this, &client, &object_names,
90                                 thread_count](int modulo) {
91       return CreateSomeObjects(client, object_names, thread_count, modulo);
92     };
93     std::vector<std::future<Status>> tasks(thread_count);
94     int modulo = 0;
95     for (auto& t : tasks) {
96       t = std::async(std::launch::async, create_some_objects, modulo++);
97     }
98     for (auto& t : tasks) {
99       auto const status = t.get();
100       EXPECT_STATUS_OK(status);
101     }
102   }
103 
DeleteSomeObjects(Client client,std::vector<std::string> const & object_names,int thread_count,int modulo)104   Status DeleteSomeObjects(Client client,
105                            std::vector<std::string> const& object_names,
106                            int thread_count, int modulo) {
107     int index = 0;
108     Status status;
109     for (auto const& name : object_names) {
110       if (index++ % thread_count != modulo) continue;
111       if (modulo == 0) {
112         std::unique_lock<std::mutex> lk(mu_);
113         std::cout << '.' << std::flush;
114       }
115       auto result = client.DeleteObject(bucket_name_, name);
116       if (!result.ok()) status = result;
117     }
118     return status;
119   }
120 
DeleteObjects(Client const & client,std::vector<std::string> const & object_names)121   void DeleteObjects(Client const& client,
122                      std::vector<std::string> const& object_names) {
123     // Parallelize the object deletion too because it can be slow.
124     int const thread_count = ThreadCount();
125     auto delete_some_objects = [this, &client, &object_names,
126                                 thread_count](int modulo) {
127       return DeleteSomeObjects(client, object_names, thread_count, modulo);
128     };
129     std::vector<std::future<Status>> tasks(thread_count);
130     int modulo = 0;
131     for (auto& t : tasks) {
132       t = std::async(std::launch::async, delete_some_objects, modulo++);
133     }
134     for (auto& t : tasks) {
135       auto const status = t.get();
136       EXPECT_STATUS_OK(status);
137     }
138   }
139 
140   std::mutex mu_;
141   std::string bucket_name_;
142   int object_count_ = 128;
143 };
144 
/// @test Verify that objects can be downloaded to local files from several
/// threads at the same time.
TEST_F(ObjectFileMultiThreadedTest, Download) {
  StatusOr<Client> client = MakeIntegrationTestClient();
  ASSERT_STATUS_OK(client);

  auto const object_names = CreateObjectNames();
  std::cout << "Create test objects " << std::flush;
  ASSERT_NO_FATAL_FAILURE(CreateObjects(*client, object_names));
  std::cout << " DONE\n";

  // Create multiple threads, each downloading a portion of the objects.
  //
  // Each thread receives its own copy of the client, as the fixture helpers
  // do: copies of a `Client` may be used from different threads, while
  // sharing a single instance across threads is not guaranteed to work.
  auto const thread_count = ThreadCount();
  auto download_some_objects = [this, thread_count, &object_names](
                                   Client thread_client, int modulo) {
    {
      // Every thread prints this marker, so serialize it like the other
      // progress output.
      std::lock_guard<std::mutex> lk(mu_);
      std::cout << '+' << std::flush;
    }
    int index = 0;
    for (auto const& name : object_names) {
      if (index++ % thread_count != modulo) continue;
      // Only the first shard reports per-object progress.
      if (modulo == 0) {
        std::lock_guard<std::mutex> lk(mu_);
        std::cout << '.' << std::flush;
      }
      // The object name doubles as the name of the destination file.
      auto status = thread_client.DownloadToFile(bucket_name_, name, name);
      if (!status.ok()) return status;  // stop on the first error
    }
    return Status();
  };
  std::cout << "Performing downloads " << std::flush;
  std::vector<std::future<Status>> tasks(thread_count);
  int modulo = 0;
  for (auto& t : tasks) {
    // std::async() copies `*client` here, on this thread, and hands the copy
    // to the new thread.
    t = std::async(std::launch::async, download_some_objects, *client,
                   modulo++);
  }
  for (auto& t : tasks) {
    auto const status = t.get();
    EXPECT_STATUS_OK(status);
  }
  std::cout << " DONE\n";

  // Remove the local files; this also verifies that each one was created.
  for (auto const& name : object_names) {
    EXPECT_EQ(0, std::remove(name.c_str()));
  }

  std::cout << "Delete test objects " << std::flush;
  ASSERT_NO_FATAL_FAILURE(DeleteObjects(*client, object_names));
  std::cout << " DONE\n";
}
191 
192 }  // anonymous namespace
193 }  // namespace STORAGE_CLIENT_NS
194 }  // namespace storage
195 }  // namespace cloud
196 }  // namespace google
197