1 // Copyright 2018 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/storage/client.h"
16 #include "google/cloud/storage/testing/storage_integration_test.h"
17 #include "google/cloud/internal/getenv.h"
18 #include "google/cloud/testing_util/assert_ok.h"
19 #include <gmock/gmock.h>
20 #include <thread>
21 
22 namespace google {
23 namespace cloud {
24 namespace storage {
25 inline namespace STORAGE_CLIENT_NS {
26 namespace {
27 
28 using ::testing::AnyOf;
29 using ::testing::Eq;
30 using ::testing::HasSubstr;
31 using ::testing::Not;
32 
33 class ObjectResumableWriteIntegrationTest
34     : public google::cloud::storage::testing::StorageIntegrationTest {
35  protected:
SetUp()36   void SetUp() override {
37     google::cloud::storage::testing::StorageIntegrationTest::SetUp();
38     bucket_name_ = google::cloud::internal::GetEnv(
39                        "GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME")
40                        .value_or("");
41     ASSERT_FALSE(bucket_name_.empty());
42   }
43 
44   std::string bucket_name_;
45 };
46 
// Upload an object through a stream while supplying object metadata, then
// verify that a resumable session was used and that the content type set in
// the request is reflected in the returned metadata.
TEST_F(ObjectResumableWriteIntegrationTest, WriteWithContentType) {
  StatusOr<Client> client = MakeIntegrationTestClient();
  ASSERT_STATUS_OK(client);

  auto object_name = MakeRandomObjectName();

  // Create the object, but only if it does not exist already.
  auto os = client->WriteObject(
      bucket_name_, object_name, IfGenerationMatch(0),
      WithObjectMetadata(ObjectMetadata().set_content_type("text/plain")));
  // Enable exceptions on failbit so a failed write is not silently ignored.
  os.exceptions(std::ios_base::failbit);
  os << LoremIpsum();
  EXPECT_FALSE(os.resumable_session_id().empty());
  os.Close();
  ASSERT_STATUS_OK(os.metadata());
  ObjectMetadata meta = os.metadata().value();
  EXPECT_EQ(object_name, meta.name());
  EXPECT_EQ(bucket_name_, meta.bucket());
  EXPECT_EQ("text/plain", meta.content_type());
  // When running against the emulator, also check the upload type it recorded.
  if (UsingEmulator()) {
    EXPECT_TRUE(meta.has_metadata("x_emulator_upload"));
    EXPECT_EQ("resumable", meta.metadata("x_emulator_upload"));
  }

  // Clean up the object created by this test.
  auto status = client->DeleteObject(bucket_name_, object_name);
  EXPECT_STATUS_OK(status);
}
77 
// Start a metadata-carrying upload against a randomly named bucket and verify
// that the stream is created in a failed state with a non-OK status.
TEST_F(ObjectResumableWriteIntegrationTest, WriteWithContentTypeFailure) {
  StatusOr<Client> client = MakeIntegrationTestClient();
  ASSERT_STATUS_OK(client);

  // NOTE(review): presumably no bucket with this random name exists, which is
  // why the upload below is expected to fail -- confirm.
  auto bucket_name = MakeRandomBucketName();
  auto object_name = MakeRandomObjectName();

  // Attempt to start the upload; the stream should report the failure
  // immediately, without any data being written.
  auto os = client->WriteObject(
      bucket_name, object_name, IfGenerationMatch(0),
      WithObjectMetadata(ObjectMetadata().set_content_type("text/plain")));
  EXPECT_TRUE(os.bad());
  EXPECT_FALSE(os.metadata().status().ok())
      << ", status=" << os.metadata().status();
}
96 
// Upload an object while explicitly requesting a new resumable upload session
// and verify the resulting object metadata.
TEST_F(ObjectResumableWriteIntegrationTest, WriteWithUseResumable) {
  StatusOr<Client> client = MakeIntegrationTestClient();
  ASSERT_STATUS_OK(client);

  auto object_name = MakeRandomObjectName();

  // Create the object, but only if it does not exist already.
  auto os = client->WriteObject(bucket_name_, object_name, IfGenerationMatch(0),
                                NewResumableUploadSession());
  // Enable exceptions on failbit so a failed write is not silently ignored.
  os.exceptions(std::ios_base::failbit);
  os << LoremIpsum();
  EXPECT_FALSE(os.resumable_session_id().empty());
  os.Close();
  ASSERT_STATUS_OK(os.metadata());
  ObjectMetadata meta = os.metadata().value();
  EXPECT_EQ(object_name, meta.name());
  EXPECT_EQ(bucket_name_, meta.bucket());
  // When running against the emulator, also check the upload type it recorded.
  if (UsingEmulator()) {
    EXPECT_TRUE(meta.has_metadata("x_emulator_upload"));
    EXPECT_EQ("resumable", meta.metadata("x_emulator_upload"));
  }

  // Clean up the object created by this test.
  auto status = client->DeleteObject(bucket_name_, object_name);
  EXPECT_STATUS_OK(status);
}
125 
// Start a resumable upload, suspend it without writing data, then restore the
// session by id in a second stream and complete the upload there.
TEST_F(ObjectResumableWriteIntegrationTest, WriteResume) {
  StatusOr<Client> client = MakeIntegrationTestClient();
  ASSERT_STATUS_OK(client);

  auto object_name = MakeRandomObjectName();

  // Open a new session (only if the object does not exist already), remember
  // its id, and suspend the stream instead of closing it.
  std::string session_id;
  {
    auto old_os =
        client->WriteObject(bucket_name_, object_name, IfGenerationMatch(0),
                            NewResumableUploadSession());
    ASSERT_TRUE(old_os.good()) << "status=" << old_os.metadata().status();
    session_id = old_os.resumable_session_id();
    std::move(old_os).Suspend();
  }

  // Restore the session in a fresh stream and finish the upload.
  auto os = client->WriteObject(bucket_name_, object_name,
                                RestoreResumableUploadSession(session_id));
  ASSERT_TRUE(os.good()) << "status=" << os.metadata().status();
  EXPECT_EQ(session_id, os.resumable_session_id());
  os << LoremIpsum();
  os.Close();
  ASSERT_STATUS_OK(os.metadata());
  ObjectMetadata meta = os.metadata().value();
  EXPECT_EQ(object_name, meta.name());
  EXPECT_EQ(bucket_name_, meta.bucket());
  // When running against the emulator, also check the upload type it recorded.
  if (UsingEmulator()) {
    EXPECT_TRUE(meta.has_metadata("x_emulator_upload"));
    EXPECT_EQ("resumable", meta.metadata("x_emulator_upload"));
  }

  // Clean up the object created by this test.
  auto status = client->DeleteObject(bucket_name_, object_name);
  EXPECT_STATUS_OK(status);
}
164 
// Regression test: a small buffered prefix followed by buffer-sized writes
// must not be uploaded using chunked transfer encoding.
TEST_F(ObjectResumableWriteIntegrationTest, WriteNotChunked) {
  StatusOr<Client> client = MakeIntegrationTestClient();
  ASSERT_STATUS_OK(client);

  auto object_name = MakeRandomObjectName();
  auto constexpr kUploadQuantum = 256 * 1024;
  // Each large write is exactly as big as the client's upload buffer.
  auto const buffer_size =
      client->raw_client()->client_options().upload_buffer_size();
  std::string const payload(buffer_size, '*');
  auto const header = MakeRandomData(kUploadQuantum / 2);

  auto os =
      client->WriteObject(bucket_name_, object_name, IfGenerationMatch(0));
  ASSERT_TRUE(os.good()) << "status=" << os.metadata().status();
  // Start with a header that is too small to be flushed on its own...
  os.write(header.data(), header.size());
  for (int pass = 0; pass < 3; ++pass) {
    // ... then append enough data to force a flush. That results in an
    // UploadChunk() call with two buffers, which used to trigger chunked
    // transfer encoding (wasting bandwidth) even though the size is known.
    os.write(payload.data(), payload.size());
    ASSERT_TRUE(os.good());
  }
  os.Close();
  ASSERT_STATUS_OK(os.metadata());
  ObjectMetadata meta = os.metadata().value();
  // These metadata keys are only validated when the response contains them.
  if (meta.has_metadata("x_emulator_upload")) {
    EXPECT_EQ("resumable", meta.metadata("x_emulator_upload"));
  }
  if (meta.has_metadata("x_emulator_transfer_encoding")) {
    EXPECT_THAT(meta.metadata("x_emulator_transfer_encoding"),
                Not(HasSubstr("chunked")));
  }

  // Clean up the object created by this test.
  auto delete_status = client->DeleteObject(bucket_name_, object_name);
  EXPECT_STATUS_OK(delete_status);
}
201 
// Restore a resumable session whose upload has already been finalized and
// verify the restored stream is closed and carries the object metadata.
TEST_F(ObjectResumableWriteIntegrationTest, WriteResumeFinalizedUpload) {
  // TODO(#5460) remove this when the underlying issue is resolved.
  if (UsingGrpc()) GTEST_SKIP();

  StatusOr<Client> client = MakeIntegrationTestClient();
  ASSERT_STATUS_OK(client);

  auto object_name = MakeRandomObjectName();

  // Run a complete resumable upload in an inner scope, keeping only its
  // session id; the upload is finalized by the time the scope ends.
  std::string session_id;
  {
    auto first_stream =
        client->WriteObject(bucket_name_, object_name, IfGenerationMatch(0),
                            NewResumableUploadSession());
    ASSERT_TRUE(first_stream.good())
        << "status=" << first_stream.metadata().status();
    session_id = first_stream.resumable_session_id();
    first_stream << LoremIpsum();
  }

  // Restoring the finalized session should yield an already-closed stream.
  auto restored = client->WriteObject(
      bucket_name_, object_name, RestoreResumableUploadSession(session_id));
  EXPECT_FALSE(restored.IsOpen());
  EXPECT_EQ(session_id, restored.resumable_session_id());
  ASSERT_STATUS_OK(restored.metadata());
  // TODO(b/146890058) - gRPC does not return the object metadata.
  if (!UsingGrpc()) {
    ObjectMetadata meta = restored.metadata().value();
    EXPECT_EQ(object_name, meta.name());
    EXPECT_EQ(bucket_name_, meta.bucket());
    if (UsingEmulator()) {
      EXPECT_TRUE(meta.has_metadata("x_emulator_upload"));
      EXPECT_EQ("resumable", meta.metadata("x_emulator_upload"));
    }
  }

  // Clean up the object created by this test.
  auto delete_status = client->DeleteObject(bucket_name_, object_name);
  EXPECT_STATUS_OK(delete_status);
}
241 
// Verify that a resumable upload guarded by IfGenerationMatch(0) reports an
// error when the destination object already exists.
TEST_F(ObjectResumableWriteIntegrationTest, StreamingWriteFailure) {
  StatusOr<Client> client = MakeIntegrationTestClient();
  ASSERT_STATUS_OK(client);

  auto object_name = MakeRandomObjectName();

  std::string contents = LoremIpsum();

  // Seed the bucket with the object so the streaming upload below collides
  // with it.
  StatusOr<ObjectMetadata> inserted = client->InsertObject(
      bucket_name_, object_name, contents, IfGenerationMatch(0));
  ASSERT_STATUS_OK(inserted);

  EXPECT_EQ(object_name, inserted->name());
  EXPECT_EQ(bucket_name_, inserted->bucket());

  auto writer = client->WriteObject(bucket_name_, object_name,
                                    IfGenerationMatch(0),
                                    NewResumableUploadSession());
  writer << "Expected failure data:\n" << LoremIpsum();

  // Closing the stream should fail, as the object already exists.
  writer.Close();
  EXPECT_TRUE(writer.bad());
  EXPECT_FALSE(writer.metadata().ok());
  // REST and gRPC produce different error codes for this failure, accept
  // either one.
  EXPECT_THAT(
      writer.metadata().status().code(),
      AnyOf(Eq(StatusCode::kFailedPrecondition), Eq(StatusCode::kAborted)))
      << " status=" << writer.metadata().status();

  // Clean up the object inserted at the start of the test.
  auto delete_status = client->DeleteObject(bucket_name_, object_name);
  EXPECT_STATUS_OK(delete_status);
}
276 
// Verify that a streaming upload still succeeds when the application pauses
// between writes for longer than the retry policy's maximum duration.
TEST_F(ObjectResumableWriteIntegrationTest, StreamingWriteSlow) {
  // Use a short time-limited retry policy so the pause below exceeds it.
  std::chrono::seconds timeout(3);
  auto retry_policy =
      LimitedTimeRetryPolicy(/*maximum_duration=*/timeout).clone();
  StatusOr<Client> client = MakeIntegrationTestClient(std::move(retry_policy));
  ASSERT_STATUS_OK(client);

  auto object_name = MakeRandomObjectName();

  auto data = MakeRandomData(1024 * 1024);

  auto os =
      client->WriteObject(bucket_name_, object_name, IfGenerationMatch(0));
  os.write(data.data(), data.size());
  EXPECT_FALSE(os.bad());
  // Stay idle for twice the retry policy's time limit.
  std::cout << "Sleeping to force timeout ... " << std::flush;
  std::this_thread::sleep_for(2 * timeout);
  std::cout << "DONE\n";

  // Writing after the pause must still work.
  os.write(data.data(), data.size());
  EXPECT_FALSE(os.bad());

  // Closing the stream should succeed despite the long pause between writes.
  os.Close();
  EXPECT_FALSE(os.bad());
  EXPECT_STATUS_OK(os.metadata());

  // Clean up the object created by this test.
  auto status = client->DeleteObject(bucket_name_, object_name);
  EXPECT_STATUS_OK(status);
}
307 
// Upload objects of several sizes while announcing the total size through the
// X-Upload-Content-Length header, and verify each object has that size.
TEST_F(ObjectResumableWriteIntegrationTest, WithXUploadContentLength) {
  if (UsingEmulator() || UsingGrpc()) GTEST_SKIP();
  auto constexpr kMiB = 1024 * 1024L;
  auto constexpr kChunkSize = 2 * kMiB;

  // Use an upload buffer that is exactly one chunk in size.
  auto options = ClientOptions::CreateDefaultClientOptions();
  ASSERT_STATUS_OK(options);
  Client client(options->SetUploadBufferSize(kChunkSize));

  auto const chunk = MakeRandomData(kChunkSize);

  for (auto const desired_size : {2 * kMiB, 3 * kMiB, 4 * kMiB}) {
    auto object_name = MakeRandomObjectName();
    SCOPED_TRACE("Testing with desired_size=" + std::to_string(desired_size) +
                 ", name=" + object_name);
    auto os = client.WriteObject(
        bucket_name_, object_name, IfGenerationMatch(0),
        CustomHeader("X-Upload-Content-Length", std::to_string(desired_size)));
    // Write at most one chunk at a time until the desired size is reached.
    auto offset = 0L;
    while (offset < desired_size) {
      auto const n = (std::min)(desired_size - offset, kChunkSize);
      os.write(chunk.data(), n);
      ASSERT_FALSE(os.bad());
      offset += n;
    }

    os.Close();
    EXPECT_FALSE(os.bad());
    // This must be a fatal assertion: the next line dereferences the
    // StatusOr<>, which is only valid when it holds a value.
    ASSERT_STATUS_OK(os.metadata());
    EXPECT_EQ(desired_size, os.metadata()->size());

    // Clean up the object created in this iteration.
    auto status = client.DeleteObject(bucket_name_, object_name);
    EXPECT_STATUS_OK(status);
  }
}
343 
// Upload objects of random sizes while announcing the total size through the
// X-Upload-Content-Length header, and verify each object has that size.
TEST_F(ObjectResumableWriteIntegrationTest, WithXUploadContentLengthRandom) {
  if (UsingGrpc()) GTEST_SKIP();
  auto constexpr kQuantum = 256 * 1024L;
  size_t constexpr kChunkSize = 2 * kQuantum;

  // Use an upload buffer that is exactly one chunk in size.
  auto options = ClientOptions::CreateDefaultClientOptions();
  ASSERT_STATUS_OK(options);
  Client client(options->SetUploadBufferSize(kChunkSize));

  auto const chunk = MakeRandomData(kChunkSize);

  // Sizes are drawn from [kQuantum, 5 * kQuantum], so they are usually not a
  // multiple of the chunk size.
  std::uniform_int_distribution<std::size_t> size_gen(kQuantum, 5 * kQuantum);
  for (int i = 0; i != 10; ++i) {
    auto object_name = MakeRandomObjectName();
    auto const desired_size = size_gen(generator_);
    SCOPED_TRACE("Testing with desired_size=" + std::to_string(desired_size) +
                 ", name=" + object_name);
    auto os = client.WriteObject(
        bucket_name_, object_name, IfGenerationMatch(0),
        CustomHeader("X-Upload-Content-Length", std::to_string(desired_size)));
    // Write at most one chunk at a time until the desired size is reached.
    std::size_t offset = 0;
    while (offset < desired_size) {
      auto const n = (std::min)(desired_size - offset, kChunkSize);
      os.write(chunk.data(), n);
      ASSERT_FALSE(os.bad());
      offset += n;
    }

    os.Close();
    EXPECT_FALSE(os.bad());
    // This must be a fatal assertion: the next line dereferences the
    // StatusOr<>, which is only valid when it holds a value.
    ASSERT_STATUS_OK(os.metadata());
    EXPECT_EQ(desired_size, os.metadata()->size());

    // Clean up the object created in this iteration.
    auto status = client.DeleteObject(bucket_name_, object_name);
    EXPECT_STATUS_OK(status);
  }
}
381 
// Verify that an upload fails when the size announced in the
// X-Upload-Content-Length header differs from the amount of data sent.
TEST_F(ObjectResumableWriteIntegrationTest, WithInvalidXUploadContentLength) {
  if (UsingEmulator() || UsingGrpc()) GTEST_SKIP();
  StatusOr<Client> client = MakeIntegrationTestClient();
  ASSERT_STATUS_OK(client);

  auto constexpr kChunkSize = 256 * 1024L;
  auto const chunk = MakeRandomData(kChunkSize);

  auto object_name = MakeRandomObjectName();
  auto const desired_size = 5 * kChunkSize;
  auto const announced_size = std::to_string(3 * kChunkSize);
  // The header announces three chunks, but five are sent below; the library
  // should return an error for this invalid value.
  auto os = client->WriteObject(
      bucket_name_, object_name, IfGenerationMatch(0),
      CustomHeader("X-Upload-Content-Length", announced_size));
  // Send the data one chunk (or less) at a time.
  auto remaining = desired_size;
  while (remaining > 0) {
    auto const n = (std::min)(remaining, kChunkSize);
    os.write(chunk.data(), n);
    ASSERT_FALSE(os.bad());
    remaining -= n;
  }

  // Closing the stream should fail, as the x-upload-content-length header
  // does not match the amount of data sent in the upload.
  os.Close();
  EXPECT_TRUE(os.bad());
  EXPECT_FALSE(os.metadata().ok());
  // The object is never created, so there is nothing to delete.
}
412 
413 }  // anonymous namespace
414 }  // namespace STORAGE_CLIENT_NS
415 }  // namespace storage
416 }  // namespace cloud
417 }  // namespace google
418