1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "google/cloud/storage/client.h"
16 #include "google/cloud/storage/testing/storage_integration_test.h"
17 #include "google/cloud/internal/getenv.h"
18 #include "google/cloud/testing_util/assert_ok.h"
19 #include <gmock/gmock.h>
20 #include <cstdio>
21 #include <fstream>
22 #include <future>
23 #include <regex>
24 #include <thread>
25
26 namespace google {
27 namespace cloud {
28 namespace storage {
29 inline namespace STORAGE_CLIENT_NS {
30 namespace {
31
32 auto const kObjectSize = 16 * 1024;
33
34 class ObjectFileMultiThreadedTest
35 : public google::cloud::storage::testing::StorageIntegrationTest {
36 protected:
SetUp()37 void SetUp() override {
38 bucket_name_ = google::cloud::internal::GetEnv(
39 "GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME")
40 .value_or("");
41 ASSERT_FALSE(bucket_name_.empty());
42 auto object_count = google::cloud::internal::GetEnv(
43 "GOOGLE_CLOUD_CPP_STORAGE_TEST_OBJECT_COUNT");
44 if (object_count) object_count_ = std::stoi(*object_count);
45 }
46
ThreadCount()47 static int ThreadCount() {
48 static int const kCount = [] {
49 auto c = static_cast<int>(std::thread::hardware_concurrency());
50 return c == 0 ? 4 : 4 * c;
51 }();
52 return kCount;
53 }
54
CreateObjectNames()55 std::vector<std::string> CreateObjectNames() {
56 std::vector<std::string> object_names(object_count_);
57 // Use MakeRandomFilename() because the same name is used for
58 // the destination file.
59 std::generate_n(object_names.begin(), object_names.size(),
60 [this] { return MakeRandomFilename(); });
61 return object_names;
62 }
63
CreateSomeObjects(Client client,std::vector<std::string> const & object_names,int thread_count,int modulo)64 Status CreateSomeObjects(Client client,
65 std::vector<std::string> const& object_names,
66 int thread_count, int modulo) {
67 auto contents = [this] {
68 std::unique_lock<std::mutex> lk(mu_);
69 return MakeRandomData(kObjectSize);
70 }();
71 int index = 0;
72 for (auto const& n : object_names) {
73 if (index++ % thread_count != modulo) continue;
74 if (modulo == 0) {
75 std::unique_lock<std::mutex> lk(mu_);
76 std::cout << '.' << std::flush;
77 }
78 auto metadata =
79 client.InsertObject(bucket_name_, n, contents, IfGenerationMatch(0));
80 if (!metadata) return metadata.status();
81 }
82 return Status();
83 }
84
CreateObjects(Client const & client,std::vector<std::string> const & object_names)85 void CreateObjects(Client const& client,
86 std::vector<std::string> const& object_names) {
87 // Parallelize the object creation too because it can be slow.
88 int const thread_count = ThreadCount();
89 auto create_some_objects = [this, &client, &object_names,
90 thread_count](int modulo) {
91 return CreateSomeObjects(client, object_names, thread_count, modulo);
92 };
93 std::vector<std::future<Status>> tasks(thread_count);
94 int modulo = 0;
95 for (auto& t : tasks) {
96 t = std::async(std::launch::async, create_some_objects, modulo++);
97 }
98 for (auto& t : tasks) {
99 auto const status = t.get();
100 EXPECT_STATUS_OK(status);
101 }
102 }
103
DeleteSomeObjects(Client client,std::vector<std::string> const & object_names,int thread_count,int modulo)104 Status DeleteSomeObjects(Client client,
105 std::vector<std::string> const& object_names,
106 int thread_count, int modulo) {
107 int index = 0;
108 Status status;
109 for (auto const& name : object_names) {
110 if (index++ % thread_count != modulo) continue;
111 if (modulo == 0) {
112 std::unique_lock<std::mutex> lk(mu_);
113 std::cout << '.' << std::flush;
114 }
115 auto result = client.DeleteObject(bucket_name_, name);
116 if (!result.ok()) status = result;
117 }
118 return status;
119 }
120
DeleteObjects(Client const & client,std::vector<std::string> const & object_names)121 void DeleteObjects(Client const& client,
122 std::vector<std::string> const& object_names) {
123 // Parallelize the object deletion too because it can be slow.
124 int const thread_count = ThreadCount();
125 auto delete_some_objects = [this, &client, &object_names,
126 thread_count](int modulo) {
127 return DeleteSomeObjects(client, object_names, thread_count, modulo);
128 };
129 std::vector<std::future<Status>> tasks(thread_count);
130 int modulo = 0;
131 for (auto& t : tasks) {
132 t = std::async(std::launch::async, delete_some_objects, modulo++);
133 }
134 for (auto& t : tasks) {
135 auto const status = t.get();
136 EXPECT_STATUS_OK(status);
137 }
138 }
139
140 std::mutex mu_;
141 std::string bucket_name_;
142 int object_count_ = 128;
143 };
144
TEST_F(ObjectFileMultiThreadedTest,Download)145 TEST_F(ObjectFileMultiThreadedTest, Download) {
146 StatusOr<Client> client = MakeIntegrationTestClient();
147 ASSERT_STATUS_OK(client);
148
149 auto const object_names = CreateObjectNames();
150 std::cout << "Create test objects " << std::flush;
151 ASSERT_NO_FATAL_FAILURE(CreateObjects(*client, object_names));
152 std::cout << " DONE\n";
153
154 // Create multiple threads, each downloading a portion of the objects.
155 auto const thread_count = ThreadCount();
156 auto download_some_objects = [this, thread_count, &client,
157 &object_names](int modulo) {
158 std::cout << '+' << std::flush;
159 int index = 0;
160 for (auto const& name : object_names) {
161 if (index++ % thread_count != modulo) continue;
162 if (modulo == 0) {
163 std::unique_lock<std::mutex> lk(mu_);
164 std::cout << '.' << std::flush;
165 }
166 auto status = client->DownloadToFile(bucket_name_, name, name);
167 if (!status.ok()) return status; // stop on the first error
168 }
169 return Status();
170 };
171 std::cout << "Performing downloads " << std::flush;
172 std::vector<std::future<Status>> tasks(thread_count);
173 int modulo = 0;
174 for (auto& t : tasks) {
175 t = std::async(std::launch::async, download_some_objects, modulo++);
176 }
177 for (auto& t : tasks) {
178 auto const status = t.get();
179 EXPECT_STATUS_OK(status);
180 }
181 std::cout << " DONE\n";
182
183 for (auto const& name : object_names) {
184 EXPECT_EQ(0, std::remove(name.c_str()));
185 }
186
187 std::cout << "Delete test objects " << std::flush;
188 ASSERT_NO_FATAL_FAILURE(DeleteObjects(*client, object_names));
189 std::cout << " DONE\n";
190 }
191
192 } // anonymous namespace
193 } // namespace STORAGE_CLIENT_NS
194 } // namespace storage
195 } // namespace cloud
196 } // namespace google
197