1 // Copyright 2018 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "google/cloud/storage/client.h"
16 #include "google/cloud/storage/testing/storage_integration_test.h"
17 #include "google/cloud/internal/getenv.h"
18 #include "google/cloud/testing_util/assert_ok.h"
19 #include <gmock/gmock.h>
20 #include <thread>
21
22 namespace google {
23 namespace cloud {
24 namespace storage {
25 inline namespace STORAGE_CLIENT_NS {
26 namespace {
27
28 using ::testing::AnyOf;
29 using ::testing::Eq;
30 using ::testing::HasSubstr;
31 using ::testing::Not;
32
33 class ObjectResumableWriteIntegrationTest
34 : public google::cloud::storage::testing::StorageIntegrationTest {
35 protected:
SetUp()36 void SetUp() override {
37 google::cloud::storage::testing::StorageIntegrationTest::SetUp();
38 bucket_name_ = google::cloud::internal::GetEnv(
39 "GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME")
40 .value_or("");
41 ASSERT_FALSE(bucket_name_.empty());
42 }
43
44 std::string bucket_name_;
45 };
46
TEST_F(ObjectResumableWriteIntegrationTest,WriteWithContentType)47 TEST_F(ObjectResumableWriteIntegrationTest, WriteWithContentType) {
48 StatusOr<Client> client = MakeIntegrationTestClient();
49 ASSERT_STATUS_OK(client);
50
51 auto object_name = MakeRandomObjectName();
52
53 // We will construct the expected response while streaming the data up.
54 std::ostringstream expected;
55
56 // Create the object, but only if it does not exist already.
57 auto os = client->WriteObject(
58 bucket_name_, object_name, IfGenerationMatch(0),
59 WithObjectMetadata(ObjectMetadata().set_content_type("text/plain")));
60 os.exceptions(std::ios_base::failbit);
61 os << LoremIpsum();
62 EXPECT_FALSE(os.resumable_session_id().empty());
63 os.Close();
64 ASSERT_STATUS_OK(os.metadata());
65 ObjectMetadata meta = os.metadata().value();
66 EXPECT_EQ(object_name, meta.name());
67 EXPECT_EQ(bucket_name_, meta.bucket());
68 EXPECT_EQ("text/plain", meta.content_type());
69 if (UsingEmulator()) {
70 EXPECT_TRUE(meta.has_metadata("x_emulator_upload"));
71 EXPECT_EQ("resumable", meta.metadata("x_emulator_upload"));
72 }
73
74 auto status = client->DeleteObject(bucket_name_, object_name);
75 EXPECT_STATUS_OK(status);
76 }
77
TEST_F(ObjectResumableWriteIntegrationTest,WriteWithContentTypeFailure)78 TEST_F(ObjectResumableWriteIntegrationTest, WriteWithContentTypeFailure) {
79 StatusOr<Client> client = MakeIntegrationTestClient();
80 ASSERT_STATUS_OK(client);
81
82 auto bucket_name = MakeRandomBucketName();
83 auto object_name = MakeRandomObjectName();
84
85 // We will construct the expected response while streaming the data up.
86 std::ostringstream expected;
87
88 // Create the object, but only if it does not exist already.
89 auto os = client->WriteObject(
90 bucket_name, object_name, IfGenerationMatch(0),
91 WithObjectMetadata(ObjectMetadata().set_content_type("text/plain")));
92 EXPECT_TRUE(os.bad());
93 EXPECT_FALSE(os.metadata().status().ok())
94 << ", status=" << os.metadata().status();
95 }
96
TEST_F(ObjectResumableWriteIntegrationTest,WriteWithUseResumable)97 TEST_F(ObjectResumableWriteIntegrationTest, WriteWithUseResumable) {
98 StatusOr<Client> client = MakeIntegrationTestClient();
99 ASSERT_STATUS_OK(client);
100
101 auto object_name = MakeRandomObjectName();
102
103 // We will construct the expected response while streaming the data up.
104 std::ostringstream expected;
105
106 // Create the object, but only if it does not exist already.
107 auto os = client->WriteObject(bucket_name_, object_name, IfGenerationMatch(0),
108 NewResumableUploadSession());
109 os.exceptions(std::ios_base::failbit);
110 os << LoremIpsum();
111 EXPECT_FALSE(os.resumable_session_id().empty());
112 os.Close();
113 ASSERT_STATUS_OK(os.metadata());
114 ObjectMetadata meta = os.metadata().value();
115 EXPECT_EQ(object_name, meta.name());
116 EXPECT_EQ(bucket_name_, meta.bucket());
117 if (UsingEmulator()) {
118 EXPECT_TRUE(meta.has_metadata("x_emulator_upload"));
119 EXPECT_EQ("resumable", meta.metadata("x_emulator_upload"));
120 }
121
122 auto status = client->DeleteObject(bucket_name_, object_name);
123 EXPECT_STATUS_OK(status);
124 }
125
TEST_F(ObjectResumableWriteIntegrationTest,WriteResume)126 TEST_F(ObjectResumableWriteIntegrationTest, WriteResume) {
127 StatusOr<Client> client = MakeIntegrationTestClient();
128 ASSERT_STATUS_OK(client);
129
130 auto object_name = MakeRandomObjectName();
131
132 // We will construct the expected response while streaming the data up.
133 std::ostringstream expected;
134
135 // Create the object, but only if it does not exist already.
136 std::string session_id;
137 {
138 auto old_os =
139 client->WriteObject(bucket_name_, object_name, IfGenerationMatch(0),
140 NewResumableUploadSession());
141 ASSERT_TRUE(old_os.good()) << "status=" << old_os.metadata().status();
142 session_id = old_os.resumable_session_id();
143 std::move(old_os).Suspend();
144 }
145
146 auto os = client->WriteObject(bucket_name_, object_name,
147 RestoreResumableUploadSession(session_id));
148 ASSERT_TRUE(os.good()) << "status=" << os.metadata().status();
149 EXPECT_EQ(session_id, os.resumable_session_id());
150 os << LoremIpsum();
151 os.Close();
152 ASSERT_STATUS_OK(os.metadata());
153 ObjectMetadata meta = os.metadata().value();
154 EXPECT_EQ(object_name, meta.name());
155 EXPECT_EQ(bucket_name_, meta.bucket());
156 if (UsingEmulator()) {
157 EXPECT_TRUE(meta.has_metadata("x_emulator_upload"));
158 EXPECT_EQ("resumable", meta.metadata("x_emulator_upload"));
159 }
160
161 auto status = client->DeleteObject(bucket_name_, object_name);
162 EXPECT_STATUS_OK(status);
163 }
164
TEST_F(ObjectResumableWriteIntegrationTest,WriteNotChunked)165 TEST_F(ObjectResumableWriteIntegrationTest, WriteNotChunked) {
166 StatusOr<Client> client = MakeIntegrationTestClient();
167 ASSERT_STATUS_OK(client);
168
169 auto object_name = MakeRandomObjectName();
170 auto constexpr kUploadQuantum = 256 * 1024;
171 auto const payload = std::string(
172 client->raw_client()->client_options().upload_buffer_size(), '*');
173 auto const header = MakeRandomData(kUploadQuantum / 2);
174
175 auto os =
176 client->WriteObject(bucket_name_, object_name, IfGenerationMatch(0));
177 ASSERT_TRUE(os.good()) << "status=" << os.metadata().status();
178 // Write a small header that is too small to be flushed...
179 os.write(header.data(), header.size());
180 for (int i = 0; i != 3; ++i) {
181 // Append some data that is large enough to flush, this creates a call to
182 // UploadChunk() with two buffers, and that triggered chunked transfer
183 // encoding, even though the size is known which wastes bandwidth.
184 os.write(payload.data(), payload.size());
185 ASSERT_TRUE(os.good());
186 }
187 os.Close();
188 ASSERT_STATUS_OK(os.metadata());
189 ObjectMetadata meta = os.metadata().value();
190 if (meta.has_metadata("x_emulator_upload")) {
191 EXPECT_EQ("resumable", meta.metadata("x_emulator_upload"));
192 }
193 if (meta.has_metadata("x_emulator_transfer_encoding")) {
194 EXPECT_THAT(meta.metadata("x_emulator_transfer_encoding"),
195 Not(HasSubstr("chunked")));
196 }
197
198 auto status = client->DeleteObject(bucket_name_, object_name);
199 EXPECT_STATUS_OK(status);
200 }
201
TEST_F(ObjectResumableWriteIntegrationTest,WriteResumeFinalizedUpload)202 TEST_F(ObjectResumableWriteIntegrationTest, WriteResumeFinalizedUpload) {
203 // TODO(#5460) remove this when the underlying issue is resolved.
204 if (UsingGrpc()) GTEST_SKIP();
205
206 StatusOr<Client> client = MakeIntegrationTestClient();
207 ASSERT_STATUS_OK(client);
208
209 auto object_name = MakeRandomObjectName();
210
211 // Start a resumable upload and finalize the upload.
212 std::string session_id;
213 {
214 auto old_os =
215 client->WriteObject(bucket_name_, object_name, IfGenerationMatch(0),
216 NewResumableUploadSession());
217 ASSERT_TRUE(old_os.good()) << "status=" << old_os.metadata().status();
218 session_id = old_os.resumable_session_id();
219 old_os << LoremIpsum();
220 }
221
222 auto os = client->WriteObject(bucket_name_, object_name,
223 RestoreResumableUploadSession(session_id));
224 EXPECT_FALSE(os.IsOpen());
225 EXPECT_EQ(session_id, os.resumable_session_id());
226 ASSERT_STATUS_OK(os.metadata());
227 // TODO(b/146890058) - gRPC does not return the object metadata.
228 if (!UsingGrpc()) {
229 ObjectMetadata meta = os.metadata().value();
230 EXPECT_EQ(object_name, meta.name());
231 EXPECT_EQ(bucket_name_, meta.bucket());
232 if (UsingEmulator()) {
233 EXPECT_TRUE(meta.has_metadata("x_emulator_upload"));
234 EXPECT_EQ("resumable", meta.metadata("x_emulator_upload"));
235 }
236 }
237
238 auto status = client->DeleteObject(bucket_name_, object_name);
239 EXPECT_STATUS_OK(status);
240 }
241
TEST_F(ObjectResumableWriteIntegrationTest,StreamingWriteFailure)242 TEST_F(ObjectResumableWriteIntegrationTest, StreamingWriteFailure) {
243 StatusOr<Client> client = MakeIntegrationTestClient();
244 ASSERT_STATUS_OK(client);
245
246 auto object_name = MakeRandomObjectName();
247
248 std::string expected = LoremIpsum();
249
250 // Create the object, but only if it does not exist already.
251 StatusOr<ObjectMetadata> meta = client->InsertObject(
252 bucket_name_, object_name, expected, IfGenerationMatch(0));
253 ASSERT_STATUS_OK(meta);
254
255 EXPECT_EQ(object_name, meta->name());
256 EXPECT_EQ(bucket_name_, meta->bucket());
257
258 auto os = client->WriteObject(bucket_name_, object_name, IfGenerationMatch(0),
259 NewResumableUploadSession());
260 os << "Expected failure data:\n" << LoremIpsum();
261
262 // This operation should fail because the object already exists.
263 os.Close();
264 EXPECT_TRUE(os.bad());
265 EXPECT_FALSE(os.metadata().ok());
266 // The GCS server returns a different error code depending on the
267 // protocol (REST vs. gRPC) used
268 EXPECT_THAT(
269 os.metadata().status().code(),
270 AnyOf(Eq(StatusCode::kFailedPrecondition), Eq(StatusCode::kAborted)))
271 << " status=" << os.metadata().status();
272
273 auto status = client->DeleteObject(bucket_name_, object_name);
274 EXPECT_STATUS_OK(status);
275 }
276
TEST_F(ObjectResumableWriteIntegrationTest,StreamingWriteSlow)277 TEST_F(ObjectResumableWriteIntegrationTest, StreamingWriteSlow) {
278 std::chrono::seconds timeout(3);
279 auto retry_policy =
280 LimitedTimeRetryPolicy(/*maximum_duration=*/timeout).clone();
281 StatusOr<Client> client = MakeIntegrationTestClient(std::move(retry_policy));
282 ASSERT_STATUS_OK(client);
283
284 auto object_name = MakeRandomObjectName();
285
286 auto data = MakeRandomData(1024 * 1024);
287
288 auto os =
289 client->WriteObject(bucket_name_, object_name, IfGenerationMatch(0));
290 os.write(data.data(), data.size());
291 EXPECT_FALSE(os.bad());
292 std::cout << "Sleeping to force timeout ... " << std::flush;
293 std::this_thread::sleep_for(2 * timeout);
294 std::cout << "DONE\n";
295
296 os.write(data.data(), data.size());
297 EXPECT_FALSE(os.bad());
298
299 // This operation should fail because the object already exists.
300 os.Close();
301 EXPECT_FALSE(os.bad());
302 EXPECT_STATUS_OK(os.metadata());
303
304 auto status = client->DeleteObject(bucket_name_, object_name);
305 EXPECT_STATUS_OK(status);
306 }
307
TEST_F(ObjectResumableWriteIntegrationTest,WithXUploadContentLength)308 TEST_F(ObjectResumableWriteIntegrationTest, WithXUploadContentLength) {
309 if (UsingEmulator() || UsingGrpc()) GTEST_SKIP();
310 auto constexpr kMiB = 1024 * 1024L;
311 auto constexpr kChunkSize = 2 * kMiB;
312
313 auto options = ClientOptions::CreateDefaultClientOptions();
314 ASSERT_STATUS_OK(options);
315 Client client(options->SetUploadBufferSize(kChunkSize));
316
317 auto const chunk = MakeRandomData(kChunkSize);
318
319 for (auto const desired_size : {2 * kMiB, 3 * kMiB, 4 * kMiB}) {
320 auto object_name = MakeRandomObjectName();
321 SCOPED_TRACE("Testing with desired_size=" + std::to_string(desired_size) +
322 ", name=" + object_name);
323 auto os = client.WriteObject(
324 bucket_name_, object_name, IfGenerationMatch(0),
325 CustomHeader("X-Upload-Content-Length", std::to_string(desired_size)));
326 auto offset = 0L;
327 while (offset < desired_size) {
328 auto const n = (std::min)(desired_size - offset, kChunkSize);
329 os.write(chunk.data(), n);
330 ASSERT_FALSE(os.bad());
331 offset += n;
332 }
333
334 os.Close();
335 EXPECT_FALSE(os.bad());
336 EXPECT_STATUS_OK(os.metadata());
337 EXPECT_EQ(desired_size, os.metadata()->size());
338
339 auto status = client.DeleteObject(bucket_name_, object_name);
340 EXPECT_STATUS_OK(status);
341 }
342 }
343
TEST_F(ObjectResumableWriteIntegrationTest,WithXUploadContentLengthRandom)344 TEST_F(ObjectResumableWriteIntegrationTest, WithXUploadContentLengthRandom) {
345 if (UsingGrpc()) GTEST_SKIP();
346 auto constexpr kQuantum = 256 * 1024L;
347 size_t constexpr kChunkSize = 2 * kQuantum;
348
349 auto options = ClientOptions::CreateDefaultClientOptions();
350 ASSERT_STATUS_OK(options);
351 Client client(options->SetUploadBufferSize(kChunkSize));
352
353 auto const chunk = MakeRandomData(kChunkSize);
354
355 std::uniform_int_distribution<std::size_t> size_gen(kQuantum, 5 * kQuantum);
356 for (int i = 0; i != 10; ++i) {
357 auto object_name = MakeRandomObjectName();
358 auto const desired_size = size_gen(generator_);
359 SCOPED_TRACE("Testing with desired_size=" + std::to_string(desired_size) +
360 ", name=" + object_name);
361 auto os = client.WriteObject(
362 bucket_name_, object_name, IfGenerationMatch(0),
363 CustomHeader("X-Upload-Content-Length", std::to_string(desired_size)));
364 std::size_t offset = 0L;
365 while (offset < desired_size) {
366 auto const n = (std::min)(desired_size - offset, kChunkSize);
367 os.write(chunk.data(), n);
368 ASSERT_FALSE(os.bad());
369 offset += n;
370 }
371
372 os.Close();
373 EXPECT_FALSE(os.bad());
374 EXPECT_STATUS_OK(os.metadata());
375 EXPECT_EQ(desired_size, os.metadata()->size());
376
377 auto status = client.DeleteObject(bucket_name_, object_name);
378 EXPECT_STATUS_OK(status);
379 }
380 }
381
TEST_F(ObjectResumableWriteIntegrationTest,WithInvalidXUploadContentLength)382 TEST_F(ObjectResumableWriteIntegrationTest, WithInvalidXUploadContentLength) {
383 if (UsingEmulator() || UsingGrpc()) GTEST_SKIP();
384 StatusOr<Client> client = MakeIntegrationTestClient();
385 ASSERT_STATUS_OK(client);
386
387 auto constexpr kChunkSize = 256 * 1024L;
388 auto const chunk = MakeRandomData(kChunkSize);
389
390 auto object_name = MakeRandomObjectName();
391 auto const desired_size = 5 * kChunkSize;
392 // Use an invalid value in the X-Upload-Content-Length header, the library
393 // should return an error.
394 auto os = client->WriteObject(
395 bucket_name_, object_name, IfGenerationMatch(0),
396 CustomHeader("X-Upload-Content-Length", std::to_string(3 * kChunkSize)));
397 auto offset = 0L;
398 while (offset < desired_size) {
399 auto const n = (std::min)(desired_size - offset, kChunkSize);
400 os.write(chunk.data(), n);
401 ASSERT_FALSE(os.bad());
402 offset += n;
403 }
404
405 // This operation should fail because the x-upload-content-length header does
406 // not match the amount of data sent in the upload.
407 os.Close();
408 EXPECT_TRUE(os.bad());
409 EXPECT_FALSE(os.metadata().ok());
410 // No need to delete the object, as it is never created.
411 }
412
413 } // anonymous namespace
414 } // namespace STORAGE_CLIENT_NS
415 } // namespace storage
416 } // namespace cloud
417 } // namespace google
418